Keywords: thread, thread pool, queue, task scheduling
1. Retry utility class
package com.iretry;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Retry Tools Class
*/
@Slf4j
public class RetryUtils {
private final static int DEFAULT_RETRY_TIMES = 6;
private final static int[] DEFAULT_DELAY_SECONDS = {3, 30, 180, 600, 1800, 3600};
private static Queue<RetryRunnable> TASK_QUEUE = new ConcurrentLinkedQueue<>();
public static ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
public static ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
static {
//Configure the number of core threads
executor.setCorePoolSize(10);
// Configure the maximum number of threads
executor.setMaxPoolSize(20);
//configure queue size
executor.setQueueCapacity(1000);
//configure the name prefix of the threads in the thread pool
executor.setThreadNamePrefix("async_");
// rejection-policy: how to handle new tasks when the pool has reached max size
// CALLER_RUNS: instead of executing the task in a new thread, the caller's thread will execute it
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//Perform initialization
executor.initialize();
scheduler.setThreadNamePrefix("scheduler_");
scheduler.setPoolSize(5);
scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
scheduler.initialize();
}
public RetryUtils() {
//Check every second: iterate through the task queue, if needed, the thread pool schedules execution
scheduler.scheduleAtFixedRate(() -> {
for (RetryRunnable task : TASK_QUEUE) {
long nextRetryMillis = task.nextRetryMillis;
if (nextRetryMillis != -1 && nextRetryMillis <= System.currentTimeMillis()) {
task.nextRetryMillis = -1;
executor.execute(task);
}
}
}, 1000);
}
public void doRetry(Task task) {
doRetry(DEFAULT_RETRY_TIMES, DEFAULT_DELAY_SECONDS, task);
}
public void doRetry(int maxRetryTime, Task task) {
doRetry(maxRetryTime, DEFAULT_DELAY_SECONDS, task);
}
public void doRetry(int[] retryDelaySeconds, Task task) {
doRetry(retryDelaySeconds.length, retryDelaySeconds, task);
}
/**
* @param maxRetryTime Maximum number of retries
* @param retryDelaySeconds array of retry interval times
* @param task task
*/
public void doRetry(final int maxRetryTime, final int[] retryDelaySeconds, final Task task) {
Runnable runnable = new RetryRunnable(maxRetryTime, retryDelaySeconds, task);
executor.execute(runnable);
}
/**
* Custom Threads Class
*/
private static class RetryRunnable implements Runnable {
private final Task task;
private final int maxRetryTimes;
private final int[] retryDelaySeconds;
private int retryTimes;
private volatile long nextRetryMillis;
//Constructors
public RetryRunnable(final int maxRetryTimes, final int[] retryDelaySeconds, final Task task) {
this.task = task;
if (maxRetryTimes <= 0) {
this.maxRetryTimes = DEFAULT_RETRY_TIMES;
} else {
this.maxRetryTimes = maxRetryTimes;
}
if (retryDelaySeconds == null || retryDelaySeconds.length == 0) {
this.retryDelaySeconds = DEFAULT_DELAY_SECONDS;
} else {
this.retryDelaySeconds = retryDelaySeconds;
}
}
//Implementation of business methods
@Override
public void run() {
try {
task.run();
} catch (Throwable e) {
int sleepSeconds = retryTimes < retryDelaySeconds.length ?retryDelaySeconds[retryTimes] : retryDelaySeconds[retryDelaySeconds.length - 1];
if (retryTimes < maxRetryTimes) {
if (retryTimes == 0) {
TASK_QUEUE.add(this);
log.error("task executed error, " + sleepSeconds + " seconds do next... ", e);
} else {
log.error("retry " + retryTimes + " times error, " + sleepSeconds + " seconds do next... ", e);
}
nextRetryMillis = System.currentTimeMillis() + sleepSeconds * 1000;
} else {
log.error("retry " + retryTimes + " times error", e);
log.error("retry snapshot: {}", JSON.toJSONStringWithDateFormat(task.snapshot(), "yyyy-MM-dd HH:mm:ss"));
TASK_QUEUE.remove(this);
task.retryFailed(e);
}
retryTimes++;
}
}
}
}
2. Task interface implemented by business code
package com.iretry;
/**
 * A unit of work that may be retried.
 *
 * <p>Only {@link #run()} has to be implemented; the other methods are optional hooks with
 * do-nothing defaults.
 */
public interface Task {

    /**
     * Returns an object describing the task's current state.
     *
     * @return the state object; {@code null} unless overridden
     */
    default Object snapshot() {
        return null;
    }

    /**
     * Hook for reacting to a failure that will not be retried; does nothing unless overridden.
     *
     * @param e the failure being reported
     */
    default void retryFailed(Throwable e) {
        // intentionally empty
    }

    /**
     * Performs the work.
     *
     * @throws Exception if the work fails
     */
    void run() throws Exception;
}
3. Test class
package com.iretry;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import java.util.concurrent.ThreadPoolExecutor;
@Slf4j
public class IRetryTest {
public static void main(String[] args) {
RetryUtils retryUtils = new RetryUtils();
IRetryTest iRetryTest = new IRetryTest();
iRetryTest.executeBusiness(retryUtils, new IRetryTest().new UserInfo("Jack"));
}
//Business methods that require retries
private void executeBusiness(RetryUtils retryUtils, UserInfo user) {
retryUtils.doRetry(new int[]{6, 12}, new Task() {
@Override
public void run() throws Exception {
user.setName("Henry");
log.info("Execute the operation, change the name of the employee to: " + user.getName());
Integer a = 1/0;
}
// Support rollback operations after final failure
@Override
public void retryFailed(Throwable e) {
user.setName("Jack");
log.info("Retry failed, business data rolled back...") ;
log.info("Username is called." + user.getName());
}
@Override
public Object snapshot() {
return user;
}
});
}
@Data
@NoArgsConstructor
@AllArgsConstructor
private class UserInfo {
private String name;
}
}
4. Execution results