Implementing a Retry Mechanism in Java

Keywords: thread, thread pool, queue, task scheduling

1. Retry utility class

package com.iretry;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Retry Tools Class
 */
@Slf4j
public class RetryUtils {

    private final static int DEFAULT_RETRY_TIMES = 6;
    private final static int[] DEFAULT_DELAY_SECONDS = {3, 30, 180, 600, 1800, 3600};

    private static Queue<RetryRunnable> TASK_QUEUE = new ConcurrentLinkedQueue<>();

    public static ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    public static ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();

    static {
        //Configure the number of core threads
        executor.setCorePoolSize(10);
        // Configure the maximum number of threads
        executor.setMaxPoolSize(20);
        //configure queue size
        executor.setQueueCapacity(1000);
        //configure the name prefix of the threads in the thread pool
        executor.setThreadNamePrefix("async_");
        // rejection-policy: how to handle new tasks when the pool has reached max size
        // CALLER_RUNS: instead of executing the task in a new thread, the caller's thread will execute it
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //Perform initialization
        executor.initialize();

        scheduler.setThreadNamePrefix("scheduler_");
        scheduler.setPoolSize(5);
        scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        scheduler.initialize();
    }

    public RetryUtils() {
        //Check every second: iterate through the task queue, if needed, the thread pool schedules execution
        scheduler.scheduleAtFixedRate(() -> {
            for (RetryRunnable task : TASK_QUEUE) {
                long nextRetryMillis = task.nextRetryMillis;
                if (nextRetryMillis != -1 && nextRetryMillis <= System.currentTimeMillis()) {
                    task.nextRetryMillis = -1;
                    executor.execute(task);
                }
            }
        }, 1000);
    }

    public void doRetry(Task task) {
        doRetry(DEFAULT_RETRY_TIMES, DEFAULT_DELAY_SECONDS, task);
    }

    public void doRetry(int maxRetryTime, Task task) {
        doRetry(maxRetryTime, DEFAULT_DELAY_SECONDS, task);
    }

    public void doRetry(int[] retryDelaySeconds, Task task) {
        doRetry(retryDelaySeconds.length, retryDelaySeconds, task);
    }

    /**
     * @param maxRetryTime      Maximum number of retries
     * @param retryDelaySeconds array of retry interval times
     * @param task task
     */
    public void doRetry(final int maxRetryTime, final int[] retryDelaySeconds, final Task task) {
        Runnable runnable = new RetryRunnable(maxRetryTime, retryDelaySeconds, task);
        executor.execute(runnable);
    }

    /**
     * Custom Threads Class
     */
    private static class RetryRunnable implements Runnable {

        private final Task task;
        private final int maxRetryTimes;
        private final int[] retryDelaySeconds;

        private int retryTimes;
        private volatile long nextRetryMillis;

        //Constructors
        public RetryRunnable(final int maxRetryTimes, final int[] retryDelaySeconds, final Task task) {
            this.task = task;
            if (maxRetryTimes <= 0) {
                this.maxRetryTimes = DEFAULT_RETRY_TIMES;
            } else {
                this.maxRetryTimes = maxRetryTimes;
            }
            if (retryDelaySeconds == null || retryDelaySeconds.length == 0) {
                this.retryDelaySeconds = DEFAULT_DELAY_SECONDS;
            } else {
                this.retryDelaySeconds = retryDelaySeconds;
            }
        }

        //Implementation of business methods
        @Override
        public void run() {
            try {

                task.run();

            } catch (Throwable e) {
                int sleepSeconds = retryTimes < retryDelaySeconds.length ?retryDelaySeconds[retryTimes] : retryDelaySeconds[retryDelaySeconds.length - 1];

                if (retryTimes < maxRetryTimes) {
                    if (retryTimes == 0) {
                        TASK_QUEUE.add(this);
                        log.error("task executed error, " + sleepSeconds + " seconds do next... ", e);
                    } else {
                        log.error("retry " + retryTimes + " times error, " + sleepSeconds + " seconds do next... ", e);
                    }
                    nextRetryMillis = System.currentTimeMillis() + sleepSeconds * 1000;

                } else {
                    log.error("retry " + retryTimes + " times error", e);
                    log.error("retry snapshot: {}", JSON.toJSONStringWithDateFormat(task.snapshot(), "yyyy-MM-dd HH:mm:ss"));

                    TASK_QUEUE.remove(this);
                    task.retryFailed(e);

                }

                retryTimes++;

            }
        }
    }

}

2. Task interface (the business logic to be retried)

package com.iretry;

/**
 * A unit of business work that can be executed and, on failure, retried.
 *
 * <p>Only {@link #run()} has to be implemented; the other methods are optional hooks with
 * do-nothing defaults.
 */
public interface Task {

    /**
     * Performs the business operation.
     *
     * @throws Exception if the operation fails
     */
    void run() throws Exception;

    /**
     * Supplies an object describing the task's state, e.g. for diagnostic logging.
     *
     * @return the state to report; {@code null} by default
     */
    default Object snapshot() {
        return null;
    }

    /**
     * Hook for reacting to a final failure, e.g. to roll back partial changes.
     * Does nothing by default.
     *
     * @param e the failure that was raised by {@link #run()}
     */
    default void retryFailed(Throwable e) {
        // intentionally empty: implementations override this only when they need a callback
    }
}

3. Test class

package com.iretry;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import java.util.concurrent.ThreadPoolExecutor;

@Slf4j
public class IRetryTest {

    public static void main(String[] args) {
        RetryUtils retryUtils = new RetryUtils();

        IRetryTest iRetryTest = new IRetryTest();
        iRetryTest.executeBusiness(retryUtils, new IRetryTest().new UserInfo("Jack"));

    }

    //Business methods that require retries
    private void executeBusiness(RetryUtils retryUtils, UserInfo user) {

        retryUtils.doRetry(new int[]{6, 12}, new Task() {
            @Override
            public void run() throws Exception {
                user.setName("Henry");
                log.info("Execute the operation, change the name of the employee to: " + user.getName());
                Integer a = 1/0;

            }

            // Support rollback operations after final failure
            @Override
            public void retryFailed(Throwable e) {
                user.setName("Jack");
                log.info("Retry failed, business data rolled back...") ;
                log.info("Username is called." + user.getName());
            }

            @Override
            public Object snapshot() {
                return user;
            }
        });
    }


    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    private class UserInfo {
        private String name;
    }


}

4. Execution results

Read More: