Redission分布式锁源码简析

Redission获取分布式锁的源码简析

Redission介绍

Redission是一个Java与Redis交互的工具库,与我们常用的StringRedisTemplate类似,但是它们有不同的特点和使用方式

RedisClient

  1. 基于Redis协议的Java Redis客户端
  2. 提供多种分布式的操作与API,支持分布式对象,分布式锁,分布式集合等
  3. 可扩展性和灵活性很高,可以用来构建复杂的分布式应用

    StringRedisTemplate

  4. 基于Jedis或者Lettuce等连接池技术封装的Redis客户端
  5. 简化了Redis中字符串类型数据的操作,提供了一系列的方法来实现对字符串的读写等操作
  6. 基于模板方法的编程模式,将Redis操作封装在模板类中,简化了对Redis的使用
  7. 集成了Spring的事务管理机制,可以方便的进行事务控制

使用Redission分布式锁

Redission实现了RLock接口和RReadWriteLock接口来实现分布式锁和分布式读写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 获取分布式锁对象
RLock lock = redisson.getLock("myLock");
try {
// 尝试获取锁,如果获取成功则执行临界区代码
if (lock.tryLock()) {
// 执行临界区代码
System.out.println("获取到锁,执行临界区代码");
Thread.sleep(1000); // 模拟临界区代码的执行时间
} else {
// 未获取到锁,执行其他逻辑
System.out.println("未获取到锁,执行其他逻辑");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放锁
lock.unlock();
}

源码简析

获取分布式锁

1
2
3
public RLock getLock(String name) {
return new RedissonLock(commandExecutor, name);
}

在获取分布式锁的时候,会新建一个RedissonLock对象,该对象的构造方法有两个参数,其中commandExecutor是Redission提供的一个接口,用于执行异步的Redis命令,name表示获取锁的名称。RedissionLock负责处理分布式锁的各种操作,使用我们的代码中getLock方法会拿到一个RedissionLock对象,然后使用该对象的tryLock()方法来尝试获取锁。tryLock()是非阻塞的获取锁,lock()是阻塞式的获取锁。

1
2
3
public boolean tryLock() {
return get(tryLockAsync());
}

tryLockAsync()方法执行获取锁的操作,但该方法是异步执行的,所以用get方法再来获取结果。源代码中tryLockAsync()方法实际上在执行下面的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Boolean> acquiredFuture;
if (leaseTime > 0) {
acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
acquiredFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}

CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> {
// lock acquired
if (acquired) {
if (leaseTime > 0) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
scheduleExpirationRenewal(threadId);
}
}
return acquired;
});
return new CompletableFutureWrapper<>(f);
}

其中参数含义为:

  • waitTime: 等待获取锁的最大时间
  • leaseTime: 获取锁后持有的时间
  • unit: 时间单位
  • threadId: 线程ID

方法内部:

  1. 实际执行是tryLockInnerAsync()方法,但根据leaseTime的值会使用不同的参数,具体来说,当leaseTime大于0时,会直接使用该值,如果小于等于0(即不会主动释放锁),则会使用配置里面的值。
  2. 处理获取到锁的结果:
    • 如果获取锁成功,则根据leaseTime的值更新内部的锁持有时间:
      • leaseTime > 0,更新锁的持有时间
      • leaseTime < 0, 调用scheduleExpirationRenewal()方法定时续期锁的持有时间
    • 返回获取锁的结果

再来分析核心方法tryLockInnerAsync()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

到这里就很容易看出来,其实该方法也是使用Redis的EVAL命令执行一段lua脚本来尝试获取锁,lua脚本的逻辑为:

  1. 使用exists命令检查锁的键KEYS[1]是否存在,如果不存在则说明锁可用,使用hincrby命令对锁的键进行初始化,并使用pexpire命令设置锁的过期时间,然后返回nil表示获取锁成功。
  2. 如果锁的键KEYS[1]已经存在,则使用hexists命令检查当前线程是否已经持有该锁。如果已持有,使用hincrby命令对锁的键进行更新,并使用pexpire命令重置锁的过期时间,然后返回nil表示获取锁成功。
  3. 如果锁的键已存在且当前线程未持有该锁,则使用pttl命令返回锁的剩余过期时间

问题与思考

  1. 为什么要用hincrby而不是hset
    • 如果使用hset命令设置锁的键,表示当前线程持有锁,那么其他线程需要等待锁释放。但是,在多线程环境中,多个线程可能同时执行到hset命令,导致多个线程都尝试设置锁的键,最终只有一个线程的设置会成功,其他线程会失败。这样就会产生竞态条件,导致多个线程同时认为自己获得了锁,造成问题。
    • 相比于hset命令,hincrby命令具备原子性。通过使用hincrby命令,可以在设置锁的键之前先检查是否已经存在锁的键,如果已存在,则说明锁被其他线程持有,当前线程无法获取锁;如果不存在,则通过hincrby命令设置锁的键,并且可以确保在多线程环境中只有一个线程的设置成功,其他线程会失败。这样就避免了竞态条件的发生。