Jedis中怎么实现分布式锁

本篇文章为大家展示了Jedis中怎么实现分布式锁,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

目前创新互联已为超过千家的企业提供了网站建设、域名、虚拟主机、网站托管、服务器租用、企业网站设计、昆玉网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。

package com.xxx.arch.seq.client.redis;

import java.io.Closeable;
import java.util.*;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.*;

import com.xxx.arch.seq.constant.Constants;

/**
 * Jedis配置实例封装类(兼容单节点连接池和集群节点)
 *
 * @author zhangyang
 * @createDate 2019-01-22
 * @since 2.x
 */
public class JedisConfig {

    private static volatile JedisConfig redisConfig;

    //当前模式:1单例,2哨兵 3集群Cluster
    private int singleton;

    //jedis连接池
    private JedisPool jedisPool;

    private JedisSentinelPool sentinelPool;

    private Jedis jedis;


    //jeids集群
    private JedisCluster jedisCluster;

    private JedisConfig() {
        Properties redisProp = new Properties();
        
		redisProp.setProperty("arch.seq.redis.host", Constants.ARCH_SEQ_REDIS_NODES);
		redisProp.setProperty("arch.seq.redis.password", Constants.ARCH_SEQ_REDIS_PASSWORD);
		redisProp.setProperty("arch.seq.redis.sentinel.master", Constants.ARCH_SEQ_REDIS_SENTINEL_MASTER);

        String hostConf = redisProp.getProperty("arch.seq.redis.host");
        if (hostConf == null) {
            throw new RuntimeException("get redis configuration error");
        }
        if ("${arch.seq.redis.host}".equals(hostConf)) {
            throw new RuntimeException("please check occ var \"arch.seq.redis.host\"");
        }
        if(!hostConf.contains(",")&&!hostConf.contains(">>")){
            singleton = 1;
        }else if(hostConf.contains(">>")){
            singleton=2;
        }else{
            singleton=3;
        }

        if (singleton==1) {
            initJedisPool(redisProp);
        } else if(singleton==2){
            initJedisSentinel(redisProp);
        }else{
            initJedisCluster(redisProp);
        }
    }

    private void initJedisPool(Properties redisProp) {
        String[] hostConf = redisProp.getProperty("arch.seq.redis.host").split(":");
        this.jedisPool = new JedisPool(new JedisPoolConfig(), hostConf[0], Integer.valueOf(hostConf[1]),
                0, redisProp.getProperty("arch.seq.redis.password"));
    }

    private void initJedisCluster(Properties redisProp) {
        String[] hostConfList = redisProp.getProperty("arch.seq.redis.host").split(",");
        Set nodes = new HashSet<>();
        String[] hostConf;
        for (String hc : hostConfList) {
            hostConf = hc.split(":");
            nodes.add(new HostAndPort(hostConf[0], Integer.valueOf(hostConf[1])));
        }
        jedisCluster = new JedisCluster(nodes, 0, 0, 4,
                redisProp.getProperty("arch.seq.redis.password"), new GenericObjectPoolConfig());
    }

    private void initJedisSentinel(Properties redisProp) {
        String[] hostConfList = redisProp.getProperty("arch.seq.redis.host").split(">>");
        Set sentinels = new HashSet();
        String[] hostConf;
        for (String hc : hostConfList) {
            hostConf= hc.split(":");
            sentinels.add(new HostAndPort(hostConf[0], Integer.valueOf(hostConf[1])).toString());
        }

        sentinelPool = new JedisSentinelPool(redisProp.getProperty("arch.seq.redis.sentinel.master"), sentinels,redisProp.getProperty("arch.seq.redis.password"));
        jedis = sentinelPool.getResource();
    }

    public static JedisConfig getInstance() {
        if (redisConfig == null) {
            synchronized (JedisConfig.class) {
                if (redisConfig == null) {
                    redisConfig = new JedisConfig();
                }
            }
        }
        return redisConfig;
    }

    public JedisConn getConn() {
        if(singleton==1){
            return new JedisConn(jedisPool.getResource());
        }
        if(singleton==2){
            return new JedisConn(sentinelPool.getResource());
        }
        if(singleton==3){
            return new JedisConn(jedisCluster);
        }
        return null;
    }

    /**
     * redis连接封装类,支持单机和集群,支持常规操作,支持分布式锁
     */
    public static class JedisConn implements Closeable {

        private JedisCommands invoke;

        public JedisConn(JedisCommands invoke) {
            this.invoke = invoke;
        }

        /**
         * 设置一个必须是不存在的值
         *
         * @param key   - 关键字
         * @param value
         * @return 1-成功 0-失败
         */
        public Long setnx(String key, String value) {
            return invoke.setnx(key, value);
        }

        /**
         * 获得一个值
         *
         * @param key - 关键字
         * @return
         */
        public String get(String key) {
            return invoke.get(key);
        }

        /**
         * 更新一个值
         *
         * @param key   - 关键字
         * @param value - 值
         * @return
         */
        public String set(String key, String value) {
            return invoke.set(key, value);
        }

        /**
         * 更新一个值,并返回更新前的老值
         *
         * @param key   - 关键字
         * @param value - 值
         * @return 更新前的老值
         */
        public String getSet(String key, String value) {
            return invoke.getSet(key, value);
        }

        /**
         * 删除一个值
         *
         * @param key - 关键字
         */
        public void del(String key) {
            invoke.del(key);
        }

        /**
         * 递增一个值,并返回最新值
         *
         * @param key - 关键字
         * @return 最新值
         */
        public Long incr(String key) {
            return invoke.incr(key);
        }

        /**
         * 递增一个值,并返回最新值
         *
         * @param key - 关键字
         * @return 最新值
         */
        public Long incr(String key, long total) {
            return invoke.incrBy(key, total);
        }

        /**
         * 设置过期时间
         *
         * @param key   - 关键字
         * @param expireTime - 过期时间,毫秒
         * @return
         */
        public Long expire(String key, long expireTime) {
            return invoke.pexpire(key, expireTime);
        }



        private static final String LOCK_SUCCESS = "OK";
        private static final String SET_IF_NOT_EXIST = "NX";//NX是不存在时才set
        private static final String SET_WITH_EXPIRE_TIME = "PX";//默认毫秒, 解释:EX是秒,PX是毫秒

        /**
         * 尝试获取分布式锁
         * @param lockKey 锁
         * @param requestId 请求标识
         * @param expireTime 超期时间
         * @return 是否获取成功
         */
        public boolean tryLock(String lockKey, String requestId, long expireTime) {

            String result = invoke.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

            if (LOCK_SUCCESS.equals(result)) {
                return true;
            }
            return false;

        }


        private static final Long RELEASE_SUCCESS = 1L;

        /**
         * 释放分布式锁
         * @param lockKey 锁
         * @param requestId 请求标识
         * @return 是否释放成功
         */
        public boolean unLock(String lockKey, String requestId) {
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            Object result = evalScript(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
            if (RELEASE_SUCCESS.equals(result)) {
                return true;
            }
            return false;
        }

        private Object evalScript(String script, List keys, List args) {
            return (invoke instanceof Jedis)
                    ? ((Jedis)invoke).eval(script, keys, args)
                    : ((JedisCluster)invoke).eval(script, keys, args);
        }


        public void close() {
            if (invoke instanceof Jedis) {
                ((Jedis) invoke).close();
            }
        }



    }


}
package com.xxx.arch.seq.core;

import com.xxx.arch.seq.client.redis.JedisConfig;
import com.xxx.arch.seq.task.ContinuationOfLifeTask;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.*;

/**
 * 基于redis 的分布式锁
 */
@Slf4j
public final class DistributedLock {

    //续命任务延迟队列
    private static final DelayQueue QUEUE = new DelayQueue<>();
    //续命任务映射缓存
    private static final Map CACHE = new ConcurrentHashMap<>();
    //延长锁时间的守护线程
    private static final ExecutorService CONTINUATION_OF_LIFE_EXECUTOR = Executors.newSingleThreadExecutor();

    private static final long TIMEOUT = 1000;
    //限制最大长度
    private static final int SIZE = 5000;

    static {
        /**
         * 延长锁时间的核心线程代码
         */
        CONTINUATION_OF_LIFE_EXECUTOR.execute(() -> {
            while (true){
                //获取优先级最高的任务
                ContinuationOfLifeTask task;
                try {
                    task = QUEUE.take();
                } catch (InterruptedException e) {
                    continue;
                }
                if (task == null){
                    continue;
                }
                //验证是否活跃
                long nowTime = System.currentTimeMillis();
                if (task.isActive() && !task.isDiscarded(nowTime)){
                    //是否可以执行
                    if (task.isExecute(nowTime)){
                        task.execute();
                        //验证是否还需要续命
                        if (task.isActive() && task.checkCount()){
                            QUEUE.add(task);
                        }else {
                            //清理不需要任务的缓存
                            CACHE.remove(task.getId());
                        }
                    }else {
                        //清理不需要任务的缓存
                        //如果是时间没到不能执行的 不需要删除,一般不存在
                        if (nowTime >= task.getEndTime()){
                            CACHE.remove(task.getId());
                        }
                    }
                }else {
                    //清理过期的或者不活跃的任务
                    CACHE.remove(task.getId());
                }
            }
        });
    }

    private DistributedLock(){}

    /**
     * 获得分布式锁
     *
     * @param lockKey    - 分布式锁的key,保证全局唯一
     * @param requestId  - 本次请求的唯一ID,可用UUID等生成
     * @param expireTime - 锁获取后,使用的最长时间,毫秒
     * @param flagCount -  延续锁的次数
     * @return - 是否成功获取锁
     */
    public static boolean getDistributeLock(String lockKey, String requestId, long expireTime,int flagCount) {
        JedisConfig.JedisConn conn = null;
        try {
            conn = JedisConfig.getInstance().getConn();
            //获取锁
            if (QUEUE.size() < SIZE && conn.tryLock(lockKey, requestId, expireTime)){
                //创建一个续命任务
                ContinuationOfLifeTask task = ContinuationOfLifeTask.build(lockKey, requestId, expireTime, flagCount);
                //如果放入队列超时 或者失败
                if (!QUEUE.offer(task, TIMEOUT, TimeUnit.MILLISECONDS)){
                    //释放锁
                    releaseDistributeLock(lockKey, requestId);
                    //返回锁获取失败
                    return false;
                }
                //设置缓存
                CACHE.put(lockKey + requestId, task);
                return true;
            }
            return false;
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
    }

    /**
     * 获取分布式锁
     * 默认是延长3次锁寿命
     * @param lockKey  分布式锁的key,保证全局唯一
     * @param requestId  本次请求的唯一ID,可用UUID等生成
     * @param expireTime 锁获取后,使用的最长时间,毫秒
     * @return
     */
    public static boolean getDefaultDistributeLock(String lockKey, String requestId, long expireTime) {
        return getDistributeLock(lockKey, requestId, expireTime, 3);
    }

    /**
     * 获取永久分布式锁(默认24小时)
     * 使用时候记得一定要释放锁
     * @param lockKey
     * @param requestId
     * @return
     */
    public static boolean getPermanentDistributedLock(String lockKey, String requestId){
        return getDistributeLock(lockKey, requestId, 10000, 6 * 60 * 24);
    }

    /**
     * 释放分布式锁
     *
     * @param lockKey   - 分布式锁的key,保证全局唯一
     * @param requestId - 本次请求的唯一ID,可用UUID等生成
     * @return
     */
    public static boolean releaseDistributeLock(String lockKey, String requestId) {
        JedisConfig.JedisConn conn = null;
        try {
            ContinuationOfLifeTask task = CACHE.remove(lockKey + requestId);
            if (task != null){
                task.setActive(false);
                QUEUE.remove(task);
            }
            conn = JedisConfig.getInstance().getConn();
            return conn.unLock(lockKey, requestId);
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
    }
}
package com.xxx.arch.seq.task;

import com.xxx.arch.seq.client.redis.JedisConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * 续命任务类
 */
@Slf4j
public class ContinuationOfLifeTask implements Delayed {
    private String id;
    //结束时间 即为需要续命的时间
    private long endTime;
    //是否还存活
    private volatile boolean active;
    //锁的key
    private String lockKey;
    //锁超时时间
    private long timeout;
    //锁的持续时间
    private long expireTime;
    //锁的续命次数 -1 代表无限
    private int flagCount;
    //续命次数统计 count 不能大于 flagCount
    private int count;

    private ContinuationOfLifeTask(String id, String lockKey, long expireTime, long endTime, long timeout, int flagCount) {
        this.id = id;
        this.lockKey = lockKey;
        this.expireTime = expireTime;
        this.endTime = endTime;
        this.timeout = timeout;
        this.flagCount = flagCount;
        this.active = true;
        this.count = 0;
    }

    public void execute() {
        //该续命任务是否还存活
        if (active) {
            JedisConfig.JedisConn conn = null;
            // 当前次数是否小于指定续命次数
            // 当前时间是否大于结束时间
            if (flagCount > count) {
                //重试次数
                int retryCount = 0;
                // 当前时间是否大于过期时间
                while (System.currentTimeMillis() >= endTime && retryCount < 3) {
                    try {
                        // 续命延期锁的过期时间
                        (conn = JedisConfig.getInstance().getConn()).expire(lockKey, expireTime);
                        long expiration = expireTime / 10;
                        //保证最少提前100毫秒
                        timeout = System.currentTimeMillis() + expireTime;
                        //更新结束时间
                        endTime = timeout - (expiration > 100 ? expiration : 100);
                        //增加执行次数
                        count++;
                        if (log.isDebugEnabled()) {
                            log.debug("【续命】锁关键字:{},续期:{}毫秒,计数:{}", lockKey, expireTime, count);
                        }
                        break;
                    } catch (Exception e) {
                        try {
                            log.error(e.getMessage(), e);
                            retryCount++;
                            Thread.sleep(100L);
                        } catch (InterruptedException ie) {
                            log.error(e.getMessage(), e);
                        }
                    } finally {
                        if (conn != null) {
                            conn.close();
                        }
                    }
                }
            }
        }
    }

    /**
     * 是否可以执行 必须是活跃且执行次数没有到最大值
     * 且时间没有过期的任务才能执行
     *
     * @return
     */
    public boolean isExecute(long nowTime) {
        return nowTime >= endTime && nowTime <= timeout && flagCount >= count;
    }

    /**
     * 是否丢弃
     *
     * @return
     */
    public boolean isDiscarded(long nowTime) {
        return nowTime > timeout || flagCount <= count;
    }

    public boolean checkCount() {
        return count < flagCount;
    }

    public static final ContinuationOfLifeTask build(String lockKey, String requestId, long expireTime, int flagCount) {
        if (StringUtils.isAnyBlank(lockKey, requestId)) {
            throw new IllegalArgumentException("lockKey Can't be blank !");
        }
        //校验入参如果锁定时间低于 1000 毫秒 延长到 1000 毫秒
        if (expireTime < 1000) {
            expireTime = 1000;
        }
        //校验 锁的续命次数 如果小于 -1 则默认等于3
        if (flagCount < -1) {
            flagCount = 3;
        }
        long expiration = expireTime / 10;
        //保证最少提前100毫秒
        long timeout = System.currentTimeMillis() + expireTime;
        long endTime = timeout - (expiration > 500 ? expiration : 500);
        return new ContinuationOfLifeTask(lockKey + requestId, lockKey, expireTime, endTime, timeout, flagCount);
    }

    public long getEndTime() {
        return endTime;
    }

    public ContinuationOfLifeTask setEndTime(long endTime) {
        this.endTime = endTime;
        return this;
    }

    public boolean isActive() {
        return active;
    }

    public ContinuationOfLifeTask setActive(boolean active) {
        this.active = active;
        return this;
    }

    public String getLockKey() {
        return lockKey;
    }

    public ContinuationOfLifeTask setLockKey(String lockKey) {
        this.lockKey = lockKey;
        return this;
    }

    public long getExpireTime() {
        return expireTime;
    }

    public ContinuationOfLifeTask setExpireTime(long expireTime) {
        this.expireTime = expireTime;
        return this;
    }

    public int getFlagCount() {
        return flagCount;
    }

    public ContinuationOfLifeTask setFlagCount(int flagCount) {
        this.flagCount = flagCount;
        return this;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert((endTime) - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }
}
package com.xxx.arch.seq.constant;

import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;
import org.apache.commons.lang3.StringUtils;

public class Constants {

	//apollo公共的ZK配置集群NameSpace
	public static final String ZK_NAME_SPACE = "33.zk";
	public static final String REDIS_SEQUEN_NAME_SPACE = "33.sequence-redis";
//	public static final String REDIS_SEQUEN_NAME_SPACE = "33.sequence";

	public static final String ARCH_SEQ_ZOOKEEPER_CONNECT_STRING = getConfig(ZK_NAME_SPACE,"zk.address", "");
	public static final String ARCH_SEQ_REDIS_NODES = getConfig(REDIS_SEQUEN_NAME_SPACE,"arch.seq.redis.nodes", "");
	public static final String ARCH_SEQ_REDIS_SENTINEL_MASTER = getConfig(REDIS_SEQUEN_NAME_SPACE,"arch.seq.redis.sentinel.master", "");
	public static final String ARCH_SEQ_REDIS_PASSWORD = getConfig(REDIS_SEQUEN_NAME_SPACE,"arch.seq.redis.common.key", "");

	public static String getConfig(String nameSpace,String key,String defultValue){
		if(StringUtils.isBlank(nameSpace)){
			return "";
		}
		Config config = ConfigService.getConfig(nameSpace);
		return config.getProperty(key,defultValue);
	}
}

上述内容就是Jedis中怎么实现分布式锁,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注创新互联行业资讯频道。


分享标题:Jedis中怎么实现分布式锁
网站路径:http://scyanting.com/article/jhdope.html