Keywords: thread, thread pool, queue, task scheduling
1. Retry utility class
package com.iretry;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Retry Tools Class
*/
@Slf4j
public class RetryUtils {
private final static int DEFAULT_RETRY_TIMES = 6;
private final static int[] DEFAULT_DELAY_SECONDS = {3, 30, 180, 600, 1800, 3600};
private static Queue<RetryRunnable> TASK_QUEUE = new ConcurrentLinkedQueue<>();
public static ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
public static ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
static {
//Configure the number of core threads
executor.setCorePoolSize(10);
// Configure the maximum number of threads
executor.setMaxPoolSize(20);
//configure queue size
executor.setQueueCapacity(1000);
//configure the name prefix of the threads in the thread pool
executor.setThreadNamePrefix("async_");
// rejection-policy: how to handle new tasks when the pool has reached max size
// CALLER_RUNS: instead of executing the task in a new thread, the caller's thread will execute it
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//Perform initialization
executor.initialize();
scheduler.setThreadNamePrefix("scheduler_");
scheduler.setPoolSize(5);
scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
scheduler.initialize();
}
public RetryUtils() {
//Check every second: iterate through the task queue, if needed, the thread pool schedules execution
scheduler.scheduleAtFixedRate(() -> {
for (RetryRunnable task : TASK_QUEUE) {
long nextRetryMillis = task.nextRetryMillis;
if (nextRetryMillis != -1 && nextRetryMillis <= System.currentTimeMillis()) {
task.nextRetryMillis = -1;
executor.execute(task);
}
}
}, 1000);
}
public void doRetry(Task task) {
doRetry(DEFAULT_RETRY_TIMES, DEFAULT_DELAY_SECONDS, task);
}
public void doRetry(int maxRetryTime, Task task) {
doRetry(maxRetryTime, DEFAULT_DELAY_SECONDS, task);
}
public void doRetry(int[] retryDelaySeconds, Task task) {
doRetry(retryDelaySeconds.length, retryDelaySeconds, task);
}
/**
* @param maxRetryTime Maximum number of retries
* @param retryDelaySeconds array of retry interval times
* @param task task
*/
public void doRetry(final int maxRetryTime, final int[] retryDelaySeconds, final Task task) {
Runnable runnable = new RetryRunnable(maxRetryTime, retryDelaySeconds, task);
executor.execute(runnable);
}
/**
* Custom Threads Class
*/
private static class RetryRunnable implements Runnable {
private final Task task;
private final int maxRetryTimes;
private final int[] retryDelaySeconds;
private int retryTimes;
private volatile long nextRetryMillis;
//Constructors
public RetryRunnable(final int maxRetryTimes, final int[] retryDelaySeconds, final Task task) {
this.task = task;
if (maxRetryTimes <= 0) {
this.maxRetryTimes = DEFAULT_RETRY_TIMES;
} else {
this.maxRetryTimes = maxRetryTimes;
}
if (retryDelaySeconds == null || retryDelaySeconds.length == 0) {
this.retryDelaySeconds = DEFAULT_DELAY_SECONDS;
} else {
this.retryDelaySeconds = retryDelaySeconds;
}
}
//Implementation of business methods
@Override
public void run() {
try {
task.run();
} catch (Throwable e) {
int sleepSeconds = retryTimes < retryDelaySeconds.length ?retryDelaySeconds[retryTimes] : retryDelaySeconds[retryDelaySeconds.length - 1];
if (retryTimes < maxRetryTimes) {
if (retryTimes == 0) {
TASK_QUEUE.add(this);
log.error("task executed error, " + sleepSeconds + " seconds do next... ", e);
} else {
log.error("retry " + retryTimes + " times error, " + sleepSeconds + " seconds do next... ", e);
}
nextRetryMillis = System.currentTimeMillis() + sleepSeconds * 1000;
} else {
log.error("retry " + retryTimes + " times error", e);
log.error("retry snapshot: {}", JSON.toJSONStringWithDateFormat(task.snapshot(), "yyyy-MM-dd HH:mm:ss"));
TASK_QUEUE.remove(this);
task.retryFailed(e);
}
retryTimes++;
}
}
}
}
2. Task interface (implemented by the business code)
package com.iretry;
/**
 * A unit of work that may fail and be attempted again.
 *
 * <p>Only {@link #run()} is abstract, so a task can be written as a lambda; the two
 * default methods are optional hooks.
 */
@FunctionalInterface
public interface Task {

    /**
     * Performs the work. Throwing signals that the attempt failed.
     *
     * @throws Exception if the attempt fails
     */
    void run() throws Exception;

    /**
     * Hook invoked when the task has failed for the last time. Does nothing by default.
     *
     * @param e the failure raised by the last attempt
     */
    default void retryFailed(Throwable e) {
    }

    /**
     * Supplies an object describing the task's state for diagnostics.
     *
     * @return the state object; {@code null} by default
     */
    default Object snapshot() {
        return null;
    }
}
3. Test class
package com.iretry;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import java.util.concurrent.ThreadPoolExecutor;
@Slf4j
public class IRetryTest {
public static void main(String[] args) {
RetryUtils retryUtils = new RetryUtils();
IRetryTest iRetryTest = new IRetryTest();
iRetryTest.executeBusiness(retryUtils, new IRetryTest().new UserInfo("Jack"));
}
//Business methods that require retries
private void executeBusiness(RetryUtils retryUtils, UserInfo user) {
retryUtils.doRetry(new int[]{6, 12}, new Task() {
@Override
public void run() throws Exception {
user.setName("Henry");
log.info("Execute the operation, change the name of the employee to: " + user.getName());
Integer a = 1/0;
}
// Support rollback operations after final failure
@Override
public void retryFailed(Throwable e) {
user.setName("Jack");
log.info("Retry failed, business data rolled back...") ;
log.info("Username is called." + user.getName());
}
@Override
public Object snapshot() {
return user;
}
});
}
@Data
@NoArgsConstructor
@AllArgsConstructor
private class UserInfo {
private String name;
}
}
4. Implementation results
Read More:
- The thread implementation of timer in Java
- Java callback function implementation case
- Java implementation of InputStream to Base64
- Redis: How to Implement LRU Caching Mechanism Manually
- Java: How to use itext to export PDF text absolute positioning (implementation method)
- [Solved] Jxls error: Cannot load XLS transformer. Please make sure a Transformer implementation is in classpath
- java: java.lang.IllegalAccessError: class lombok.javac.apt.LombokProcessor (in unnamed module @0x590
- [Solved] java Internal error in the mapping processor java.lang.NullPointerException
- [Solved] java Internal error in the mapping processor java.lang.NullPointerException
- Java uses class array to report error Exception in thread “main” java.lang.NullPointerException solution
- [Solved] java: Internal error in the mapping processor: java.lang.NullPointerException
- C++: Implementation of multi-channel IO transfer with select
- C language: Implementation of dynamic array initialization, insertion, traversal, deletion and destruction
- Three ways of thread sequence alternate execution in Java lock free programming
- [Solved] IDEA springboot Startup Error: java.lang.UnsatisfiedLinkError: no tcnative-1 in java.library.path
- [Solved] The Bean Validation API is on the classpath but no implementation could be found
- How to Close the Current Form in JAVA Swing
- [Solved] Hibernate Error: java.lang.StackOverflowError at java.lang.Integer.toString(Integer.java:402)
- [Solved] IDEA java Cannot resolve method ‘getName‘ in ***
- Ternary operator in Java?: error: not a statement