Redisson 看门狗(watch dog)机制了解吗?

Sherwin.Wei Lv7

Redisson 看门狗(watch dog)机制了解吗?

回答重点

Redisson 的看门狗(watchdog)主要用来避免 Redis 中的锁在超时后业务逻辑还未执行完毕,锁却被自动释放的情况。它通过定期刷新锁的过期时间来实现自动续期。

主要原理

  1. 定时刷新:如果当前分布式锁未设置过期时间,Redisson 基于 Netty 时间轮启动一个定时任务,定期向 Redis 发送命令更新锁的过期时间,默认每 10s 发送一次请求,每次续期 30s。
  2. 释放锁:当客户端主动释放锁时,Redisson 会取消看门狗刷新操作。如果客户端宕机了,定时任务自然也就无法执行了,此时等超时时间到了,锁也会自动释放。

扩展知识

Redisson 看门狗的核心源码分析

定时续期的核心涉及以下两个方法

  • scheduleExpirationRenewal:这个方法用于在客户端获取到锁后,启动锁的过期续期机制。
  • renewExpiration:这个方法的作用是定期刷新锁的过期时间(续期),确保锁不会因过期而被其他客户端抢占。

scheduleExpirationRenewal 方法

1)**创建 ExpirationEntry**:首先创建一个新的 ExpirationEntry 对象,用于存储锁的过期信息。

2)**putIfAbsent 添加条目**:使用 EXPIRATION_RENEWAL_MAP.putIfAbsent() 方法将新的 ExpirationEntry 添加到续期 map 中。如果该条目已存在,说明已有其他线程在续期该锁,将当前线程 ID 添加到现有条目中。

3)首次设置续期任务:如果是第一次创建该条目(即没有其他线程正在续期),则调用 renewExpiration() 方法启动定时任务,开始进行锁的续期。

4)异常处理:在执行续期操作时,若当前线程被中断,则会调用 cancelExpirationRenewal(threadId) 取消续期操作。

renewExpiration 方法

1)获取锁的过期续期条目ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());

  • 通过 getEntryName() 获取锁的唯一标识,再从 EXPIRATION_RENEWAL_MAP 中查找这个锁对应的 ExpirationEntry,该条目保存了该锁的相关信息,包括持锁的线程 ID 等。
  • 如果没有找到对应的条目(即 ee == null),则退出,表示没有需要续期的锁。

2)定时任务:通过 commandExecutor.getServiceManager().newTimeout() 创建一个定时任务,每隔一段时间执行一次续期操作。

  • 任务会在 internalLockLeaseTime / 3 毫秒后执行,定时刷新锁的过期时间。这个时间间隔通常设置为锁超时时间的三分之一,确保锁的过期时间在锁持有过程中得到续期。
  • internalLockLeaseTime 默认时间是 30s
image.png

3)任务执行内容

  1. 重新获取 ExpirationEntry:每次定时任务执行时,都会再次从 EXPIRATION_RENEWAL_MAP 获取锁的过期条目。
  2. 检查线程 ID:获取到的 ExpirationEntry 中包含了持有锁的线程 ID。如果该线程 ID 为空,表示没有线程持有该锁,直接返回。
  3. 异步续期:通过调用 renewExpirationAsync(threadId) 异步续期锁的过期时间。
  4. 续期结果处理:在 whenComplete 中处理续期结果:如果续期成功(restrue),则重新调度续期任务。如果续期失败,调用 cancelExpirationRenewal(null) 来取消续期操作,并清除相关的过期条目。

中文注释版源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// 续期锁的过期时间
private void renewExpiration() {
// 从过期续期映射中获取锁的过期条目
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return; // 如果找不到条目,说明没有锁需要续期
}

// 创建一个定时任务,用于定期续期锁的过期时间
Timeout task = commandExecutor.getServiceManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 重新获取过期条目
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return; // 如果条目已被移除,结束任务
}
// 获取持有锁的线程 ID
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return; // 如果没有线程 ID,说明没有线程持有该锁,结束任务
}

// 异步续期锁的过期时间
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
// 如果续期过程中发生错误,记录日志并移除续期条目
log.error("Can't update lock {} expiration", getRawName(), e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}

if (res) {
// 如果续期成功,重新调度续期任务
renewExpiration();
} else {
// 如果续期失败,取消续期操作
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // 定时任务每 internalLockLeaseTime / 3 毫秒执行一次

// 设置定时任务到过期条目中
ee.setTimeout(task);
}

// 启动续期操作,首次获取锁时会调用此方法
protected void scheduleExpirationRenewal(long threadId) {
// 创建新的过期条目
ExpirationEntry entry = new ExpirationEntry();
// 尝试将新的条目加入到续期映射中
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
// 如果条目已存在,说明已有其他线程在续期,添加当前线程 ID 到条目中
oldEntry.addThreadId(threadId);
} else {
// 如果是首次添加,开始进行续期操作
entry.addThreadId(threadId);
try {
// 启动锁过期续期任务
renewExpiration();
} finally {
// 如果当前线程被中断,取消续期操作
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}

核心流程是

  1. 客户端在获取锁时,启动一个定时任务,定期刷新锁的过期时间。
  2. 定时任务会在过期时间到期之前执行,通过异步方法续期锁的过期时间。
  3. 如果续期成功,定时任务会重新调度自己;如果失败,则取消续期操作。

实际续期是使用 lua 脚本实现,具体代码在 renewExpirationAsync 方法内,可以看到续期时间也是 30s:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 异步续期锁的过期时间
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
// 执行 Redis Lua 脚本进行锁的过期时间续期
return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// Lua 脚本,检查锁的持有者并延长过期时间
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
// 锁的名称,传入 Redis 脚本
Collections.singletonList(getRawName()),
// 续期时间(单位:毫秒)和当前线程 ID
internalLockLeaseTime, getLockName(threadId));
}

再补充下取消续期的方法 cancelExpirationRenewal 的源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 取消锁的续期操作
protected void cancelExpirationRenewal(Long threadId, Boolean unlockResult) {
// 获取锁的过期续期条目
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (task == null) {
return; // 如果找不到过期条目,退出方法
}

// 如果提供了线程 ID,移除该线程 ID
if (threadId != null) {
task.removeThreadId(threadId);
}

// 如果没有线程 ID 或没有线程持有该锁,取消续期任务
if (threadId == null || task.hasNoThreads()) {
Timeout timeout = task.getTimeout();
if (timeout != null) {
timeout.cancel(); // 取消定时续期任务
}
EXPIRATION_RENEWAL_MAP.remove(getEntryName()); // 移除过期条目
}
}

从上面定时任务续期的逻辑我们可以知道,从 EXPIRATION_RENEWAL_MAP 中移除锁的过期条目,就可以结束续期。

在调用 unlock 解锁的时候,就会触发 cancelExpirationRenewal 的调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}

@Override
public RFuture<Void> unlockAsync(long threadId) {
return commandExecutor.getServiceManager().execute(() -> unlockAsync0(threadId));
}

private RFuture<Void> unlockAsync0(long threadId) {
CompletionStage<Boolean> future = unlockInnerAsync(threadId);
CompletionStage<Void> f = future.handle((opStatus, e) -> {
cancelExpirationRenewal(threadId);

if (e != null) {
if (e instanceof CompletionException) {
throw (CompletionException) e;
}
throw new CompletionException(e);
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
throw new CompletionException(cause);
}

return null;
});

return new CompletableFutureWrapper<>(f);
}

unlockAsync0 可以看到,即使出了异常 e!=null 也不影响 cancelExpirationRenewal 的调用,即使解锁出现异常,续期的定时任务也会被取消,防止无限续期

锁未设置过期时间才会续期

并不是 redisson 分布式锁都会有看门狗续期机制,只有设置了 leaseTime ,即超时时间才会有自动续期。

我们看下 redisson 加锁方法,例如 tryLock,实际会触发 tryAcquireAsync 方法,而里面会有对 leaseTime 的判断:

image.png

scheduleExpirationRenewal 方法上面已经分析了,应该很清晰了。

如果获取锁的客户端挂了怎么办?

从上面的分析我们可以得知,续期是通过定时任务执行的,如果当前客户端宕机,那么定时任务就没了,不会再执行、按照默认每 10s 续期 30s 的情况来看,锁不可能无限续期,等 30s 时间一到,集群中的其他客户端就可以获取锁,不会阻碍正常业务的执行。

客户端宕机还需等待 30s 时间太长怎么办?

1)紧急情况可以直接在 redis 中删除对应的 key,这样对应的锁就释放了

2)可以通过 lockWatchdogTimeout 参数修改看门狗机制的超时时间

1
2
3
4
5
6
7
8
9
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setConnectionPoolSize(10);

// 设置 Redisson 锁的看门狗超时时间为 15 秒
config.setLockWatchdogTimeout(15000); // 时间单位:毫秒

RedissonClient redisson = Redisson.create(config);
Comments