Tag Archives: Distributed lock

Successful cases of redis distributed lock

1. Custom distributed lock tool class

package com.cache.redis.demo.util;

import org.apache.commons.lang.time.DateFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.connection.ReturnType;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.types.Expiration;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * redisImplementing distributed locks
 * */
public class RedisLockHelper {

    private static final Logger log = LoggerFactory.getLogger(RedisLockHelper.class);

    /**
     * Default interval for lock acquisition by rotation, in milliseconds
     */
    private static final int DEFAULT_ACQUIRE_RESOLUTION_MILLIS = 100;

    private static final String UNLOCK_LUA;

    static {
        StringBuilder sb = new StringBuilder();
        sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] ");
        sb.append("then ");
        sb.append("    return redis.call(\"del\",KEYS[1]) ");
        sb.append("else ");
        sb.append("    return 0 ");
        sb.append("end ");
        UNLOCK_LUA = sb.toString();
    }

    private RedisTemplate redisTemplate;

    private final ThreadLocal<Map<String, LockVO&>&> lockMap = new ThreadLocal<&>();

    public RedisLockHelper(RedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * Get the lock and wait if it is not acquired
     *
     * @param key redis key
     * @param expire lock expiration time, in seconds
     */
    public void lock(final String key, long expire) {
        try {
            acquireLock(key, expire, -1);
        } catch (Exception e) {
            throw new RuntimeException("acquire lock exception", e);
        }
    }

    /**
     * If the lock is not acquired within the specified time, false is returned. otherwise, true is returned.
     *
     * @param key redis key
     * @param expire lock expiration time, in seconds
     * @param acquireTimeout The lock timeout period, -1 means never, in seconds.
     */
    public boolean lock(final String key, long expire, long acquireTimeout) throws RuntimeException {
        try {
            return acquireLock(key, expire, acquireTimeout);
        } catch (Exception e) {
            throw new RuntimeException("acquire lock exception", e);
        }
    }

    /**
     * 
     *
     * @param key redis key
     */
    public void unlock(String key) {
        try {
            release(key);
        } catch (Exception e) {
            throw new RuntimeException("release lock exception", e);
        }
    }


    private boolean acquireLock(String key, long expire, long acquireTimeout) throws InterruptedException {
        //If it was previously fetched and did not time out, then return the fetch success
        boolean acquired = acquired(key);
        if (acquired) {
            return true;
        }

        long acquireTime = acquireTimeout == -1 ?-1 : acquireTimeout * 1000 + System.currentTimeMillis();
        String currentTimeStr = DateFormatUtils.format(System.currentTimeMillis(),"yyyy-MM-dd HH:mm:ss.SSS");
        String acquireTimeStr = DateFormatUtils.format(acquireTime,"yyyy-MM-dd HH:mm:ss.SSS");
        log.info("Current time: {}, Timeout time.{}",currentTimeStr,acquireTimeStr);


        //The same process, for the same key lock, only allows the first one to try to get it.
        synchronized (key.intern()) {
            String lockId = UUID.randomUUID().toString();
            do {
                long before = System.currentTimeMillis();

                boolean hasLock = tryLock(key, expire, lockId);

                //Acquire lock successfully
                if (hasLock) {
                    long after = System.currentTimeMillis();
                    Map<String, LockVO&> map = lockMap.get();
                    if (map == null) {
                        map = new HashMap<&>(2);
                        lockMap.set(map);
                    }
                    map.put(key, new LockVO(1, lockId, expire * 1000 + before, expire * 1000 + after));
                    log.debug("acquire lock {} {} ", key, 1);
                    return true;
                }

                Thread.sleep(DEFAULT_ACQUIRE_RESOLUTION_MILLIS);

            } while (acquireTime == -1 || acquireTime &> System.currentTimeMillis());
        }
        log.debug("acquire lock {} fail,because timeout ", key);
        return false;
    }

    private boolean acquired(String key) {
        Map<String, LockVO&> map = lockMap.get();
        if (map == null || map.size() == 0 || !map.containsKey(key)) {
            return false;
        }

        LockVO vo = map.get(key);

        if (vo.beforeExpireTime < System.currentTimeMillis()) {
            log.debug("lock {} maybe release, because timeout ", key);
            return false;
        }
        int after = ++vo.count;
        log.debug("acquire lock {} {} ", key, after);
        return true;
    }

    private void release(String key) {
        Map<String, LockVO&> map = lockMap.get();
        if (map == null || map.size() == 0 || !map.containsKey(key)) {
            return;
        }

        LockVO vo = map.get(key);

        if (vo.afterExpireTime < System.currentTimeMillis()) {
            log.debug("release lock {}, because timeout ", key);
            map.remove(key);
            return;
        }
        int after = --vo.count;
        log.debug("release lock {} {} ", key, after);

        if (after &> 0) {
            return;
        }

        map.remove(key);
        RedisCallback<Boolean&> callback = (connection) -&>
                connection.eval(UNLOCK_LUA.getBytes(StandardCharsets.UTF_8), ReturnType.BOOLEAN, 1,
                        (RedisPrefix.LOCK_REDIS_PREFIX + key).getBytes(StandardCharsets.UTF_8), vo.value.getBytes(StandardCharsets.UTF_8));
        redisTemplate.execute(callback);
    }


    private boolean tryLock(String key, long expire, String lockId) {
        RedisCallback<Boolean&> callback = (connection) -&>
                connection.set((RedisPrefix.LOCK_REDIS_PREFIX + key).getBytes(StandardCharsets.UTF_8),
                        lockId.getBytes(StandardCharsets.UTF_8), Expiration.seconds(expire), RedisStringCommands.SetOption.SET_IF_ABSENT);
        return (Boolean) redisTemplate.execute(callback);
    }

    private static class LockVO {
        private int count;
        private String value;
        private long beforeExpireTime;
        private long afterExpireTime;

        LockVO(int count, String value, long beforeExpireTime, long afterExpireTime) {
            this.count = count;
            this.value = value;
            this.beforeExpireTime = beforeExpireTime;
            this.afterExpireTime = afterExpireTime;
        }
    }

}

2. Business thread

package com.cache.redis.demo.service;

import com.cache.redis.demo.util.RedisLockHelper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;

@Slf4j
public class MyThread implements Runnable {
    public static Integer count = 0;

    private String threadName;
    private StringRedisTemplate stringRedisTemplate;

    public MyThread() {
    }

    public MyThread(String threadName, StringRedisTemplate stringRedisTemplate) {
        this.threadName = threadName;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public void run() {
        String key = "sync-key";
        RedisLockHelper redisLockHelper = new RedisLockHelper(stringRedisTemplate);
        //Way 1: Do not set the lock timeout time, the lock expires in 2 seconds
        redisLockHelper.lock(key, 2L);

        // mode 2: no lock timeout of 5 seconds and lock expiration of 2 seconds
       /* Boolean flag = redisLockHelper.lock(key, 2L,5L);
        log.info("Obtain synchronization lock identification.{}",flag);
        if(false == flag){
            return;
        }*/

        count++;
        log.info("Action" + this.threadName + "output:" + count);

        redisLockHelper.unlock(key);
    }
}

3. Testing

package com.cache.redis.demo.util;

import com.cache.redis.demo.service.MyThread;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestLock {

    public static void main(String[] args) {
        StringRedisTemplate stringRedisTemplate = getRedisTemplate();
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 100; i++) {
            MyThread myThread = new MyThread("threadName" + i, stringRedisTemplate);
            fixedThreadPool.execute(myThread);
        }

    }

    public static StringRedisTemplate getRedisTemplate() {
        RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
        config.setHostName("127.0.0.1");
        config.setPort(6379);
        config.setPassword("pp@123e");
        JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(config);
       /* JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
        jedisConnectionFactory.setHostName("127.0.0.1");
        jedisConnectionFactory.setPort(6379);
        jedisConnectionFactory.setPassword("pp@123e");*/

        StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
        Jackson2JsonRedisSerializer<Object&> fastJsonRedisSerializer = new Jackson2JsonRedisSerializer<Object&>(Object.class);
        //value fastJsonRedisSerializer
        stringRedisTemplate.setValueSerializer(fastJsonRedisSerializer);
        stringRedisTemplate.setHashValueSerializer(fastJsonRedisSerializer);
        //key StringRedisSerializer
        stringRedisTemplate.setKeySerializer(new StringRedisSerializer());
        stringRedisTemplate.setHashKeySerializer(new StringRedisSerializer());

        stringRedisTemplate.setConnectionFactory(jedisConnectionFactory);

        stringRedisTemplate.afterPropertiesSet();

        return stringRedisTemplate;
    }
}