加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

Redisson分布式锁实现

发布时间:2020-12-16 04:36:58 所属栏目:安全 来源:网络整理
导读:1.? 基本用法 dependency groupId org.redisson / artifactId redisson version 3.8.2 Config config = new Config();config.useClusterServers() .setScanInterval( 2000) // cluster state scan interval in milliseconds .addNodeAddress("redis://127.0.

1.? 基本用法

<dependency>
    groupId>org.redisson</artifactId>redissonversion>3.8.2>
>
Config config = new Config();
config.useClusterServers()
    .setScanInterval(2000) // cluster state scan interval in milliseconds
    .addNodeAddress("redis://127.0.0.1:7000","redis://127.0.0.1:7001")
    .addNodeAddress("redis://127.0.0.1:7002");

RedissonClient redisson = Redisson.create(config);

RLock lock = redisson.getLock("anyLock");

lock.lock();

try {
    ...
} finally {
    lock.unlock();
}

针对上面这段代码,重点看一下Redisson是如何基于Redis实现分布式锁的

Redisson中提供的加锁的方法有很多,但大致类似,此处只看lock()方法

更多请参见?https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers

2.? 加锁

可以看到,调用getLock()方法后实际返回一个RedissonLock对象,在RedissonLock对象的lock()方法主要调用tryAcquire()方法

由于leaseTime == -1,于是走tryLockInnerAsync()方法,这个方法才是关键

首先,看一下evalWriteAsync方法的定义

<T,R> RFuture<R> evalWriteAsync(String key,Codec codec,RedisCommand<T> evalCommandType,String script,List<Object> keys,Object ... params);

最后两个参数分别是keys和params

实际调用是这样的:

单独将调用的那一段摘出来看

commandExecutor.evalWriteAsync(getName(),LongCodec.INSTANCE,command,"if (redis.call('exists',KEYS[1]) == 0) then " +
                      "redis.call('hset',KEYS[1],ARGV[2],1); " +
                      "redis.call('pexpire',ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists',ARGV[2]) == 1) then " +
                      "redis.call('hincrby',ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl',KEYS[1]);",Collections.<Object>singletonList(getName()),internalLockLeaseTime,getLockName(threadId));

结合上面的参数声明,我们可以知道,这里KEYS[1]就是getName(),ARGV[2]是getLockName(threadId)

假设前面获取锁时传的name是“abc”,假设调用的线程ID是Thread-1,假设成员变量UUID类型的id是6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c

那么KEYS[1]=abc,ARGV[2]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1

因此,这段脚本的意思是

  1、判断有没有一个叫“abc”的key

  2、如果没有,则在其下设置一个字段为“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”,值为“1”的键值对 ,并设置它的过期时间

  3、如果存在,则进一步判断“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”是否存在,若存在,则其值加1,并重新设置过期时间

  4、返回“abc”的生存时间(毫秒)

这里用的数据结构是hash,hash的结构是: key? 字段1? 值1 字段2? 值2? 。。。

用在锁这个场景下,key就表示锁的名称,也可以理解为临界资源,字段就表示当前获得锁的线程

所有竞争这把锁的线程都要判断在这个key下有没有自己线程的字段,如果没有则不能获得锁,如果有,则相当于重入,字段值加1(次数)

3.? 解锁

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(),RedisCommands.EVAL_BOOLEAN,KEYS[1]) == 0) then " +
                "redis.call('publish',KEYS[2],ARGV[1]); " +
                "return 1; " +
            "end;" +
            "if (redis.call('hexists',ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            "local counter = redis.call('hincrby',ARGV[3],-1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire',ARGV[2]); " +
                "return 0; " +
            "else " +
                "redis.call('del',KEYS[1]); " +
                "redis.call('publish',ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;"asList(getName(),getChannelName()),LockPubSub.unlockMessage,getLockName(threadId));

}

我们还是假设name=abc,假设线程ID是Thread-1

同理,我们可以知道

KEYS[1]是getName(),即KEYS[1]=abc

KEYS[2]是getChannelName(),即KEYS[2]=redisson_lock__channel:{abc}

ARGV[1]是LockPubSub.unlockMessage,即ARGV[1]=0

ARGV[2]是生存时间

ARGV[3]是getLockName(threadId),即ARGV[3]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1

因此,上面脚本的意思是:

  1、判断是否存在一个叫“abc”的key

  2、如果不存在,向Channel中广播一条消息,广播的内容是0,并返回1

  3、如果存在,进一步判断字段6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1是否存在

  4、若字段不存在,返回空,若字段存在,则字段值减1

  5、若减完以后,字段值仍大于0,则返回0

  6、减完后,若字段值小于或等于0,则广播一条消息,广播内容是0,并返回1;

可以猜测,广播0表示资源可用,即通知那些等待获取锁的线程现在可以获得锁了

4.? 等待

以上是正常情况下获取到锁的情况,那么当无法立即获取到锁的时候怎么办呢?

再回到前面获取锁的位置

@Override
public void lockInterruptibly(long leaseTime,TimeUnit unit) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime,unit,threadId);
     lock acquired
    if (ttl == null) {
        ;
    }

        订阅
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    commandExecutor.syncSubscription(future);

     {
        while (true) {
            ttl = lock acquired
            ) {
                break;
            }

             waiting for message
            if (ttl >= 0) {
                getEntry(threadId).getLatch().tryAcquire(ttl,TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().acquire();
            }
        }
    }  {
        unsubscribe(future,threadId);
    }
        get(lockAsync(leaseTime,unit));
}


protected static final LockPubSub PUBSUB =  LockPubSub();

protected RFuture<RedissonLockEntry> subscribe( PUBSUB.subscribe(getEntryName(),getChannelName(),commandExecutor.getConnectionManager().getSubscribeService());
}

void unsubscribe(RFuture<RedissonLockEntry> future, threadId) {
    PUBSUB.unsubscribe(future.getNow(),getEntryName(),commandExecutor.getConnectionManager().getSubscribeService());
}

这里会订阅Channel,当资源可用时可以及时知道,并抢占,防止无效的轮询而浪费资源

当资源可用用的时候,循环去尝试获取锁,由于多个线程同时去竞争资源,所以这里用了信号量,对于同一个资源只允许一个线程获得锁,其它的线程阻塞

5.? 小结

?

?

?

6.? 其它相关

《基于Redis的分布式锁的简单实现》

?

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读