Redisson 看门狗(watch dog)机制了解吗?
Redisson 看门狗(watch dog)机制了解吗? 回答重点 Redisson 的看门狗(watchdog)主要用来避免 Redis 中的锁在超时后业务逻辑还未执行完毕,锁却被自动释放的情况。它通过定期刷新锁的过期时间来实现自动续期。
主要原理 :
定时刷新 :如果当前分布式锁未设置过期时间 ,Redisson 基于 Netty 时间轮启动一个定时任务,定期向 Redis 发送命令更新锁的过期时间,默认每 10s 发送一次请求,每次续期 30s。
释放锁 :当客户端主动释放锁时,Redisson 会取消看门狗刷新操作。如果客户端宕机了,定时任务自然也就无法执行了,此时等超时时间到了,锁也会自动释放。
扩展知识 Redisson 看门狗的核心源码分析 定时续期的核心涉及以下两个方法 :
scheduleExpirationRenewal:这个方法用于在客户端获取到锁后,启动锁的过期续期机制。
renewExpiration:这个方法的作用是定期刷新锁的过期时间(续期),确保锁不会因过期而被其他客户端抢占。
scheduleExpirationRenewal 方法 1)**创建 ExpirationEntry**:首先创建一个新的 ExpirationEntry 对象,用于存储锁的过期信息。
2)**putIfAbsent 添加条目**:使用 EXPIRATION_RENEWAL_MAP.putIfAbsent() 方法将新的 ExpirationEntry 添加到续期 map 中。如果该条目已存在,说明已有其他线程在续期该锁,将当前线程 ID 添加到现有条目中。
3)首次设置续期任务 :如果是第一次创建该条目(即没有其他线程正在续期),则调用 renewExpiration() 方法启动定时任务,开始进行锁的续期。
4)异常处理 :在执行续期操作时,若当前线程被中断,则会调用 cancelExpirationRenewal(threadId) 取消续期操作。
renewExpiration 方法 1)获取锁的过期续期条目 :ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
通过 getEntryName() 获取锁的唯一标识,再从 EXPIRATION_RENEWAL_MAP 中查找这个锁对应的 ExpirationEntry,该条目保存了该锁的相关信息,包括持锁的线程 ID 等。
如果没有找到对应的条目(即 ee == null),则退出,表示没有需要续期的锁。
2)定时任务 :通过 commandExecutor.getServiceManager().newTimeout() 创建一个定时任务,每隔一段时间执行一次续期操作。
任务会在 internalLockLeaseTime / 3 毫秒后执行,定时刷新锁的过期时间。这个时间间隔通常设置为锁超时时间的三分之一,确保锁的过期时间在锁持有过程中得到续期。
internalLockLeaseTime 默认时间是 30s
3)任务执行内容 :
重新获取 ExpirationEntry :每次定时任务执行时,都会再次从 EXPIRATION_RENEWAL_MAP 获取锁的过期条目。
检查线程 ID :获取到的 ExpirationEntry 中包含了持有锁的线程 ID。如果该线程 ID 为空,表示没有线程持有该锁,直接返回。
异步续期 :通过调用 renewExpirationAsync(threadId) 异步续期锁的过期时间。
续期结果处理 :在 whenComplete 中处理续期结果:如果续期成功(res 为 true),则重新调度续期任务。如果续期失败,调用 cancelExpirationRenewal(null) 来取消续期操作,并清除相关的过期条目。
中文注释版源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 private void renewExpiration () { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null ) { return ; } Timeout task = commandExecutor.getServiceManager().newTimeout(new TimerTask () { @Override public void run (Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null ) { return ; } Long threadId = ent.getFirstThreadId(); if (threadId == null ) { return ; } CompletionStage<Boolean> future = renewExpirationAsync(threadId); future.whenComplete((res, e) -> { if (e != null ) { log.error("Can't update lock {} expiration" , getRawName(), e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return ; } if (res) { renewExpiration(); } else { cancelExpirationRenewal(null ); } }); } }, internalLockLeaseTime / 3 , TimeUnit.MILLISECONDS); ee.setTimeout(task); } protected void scheduleExpirationRenewal (long threadId) { ExpirationEntry entry = new ExpirationEntry (); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null ) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); try { renewExpiration(); } finally { if (Thread.currentThread().isInterrupted()) { cancelExpirationRenewal(threadId); } } } }
核心流程是 :
客户端在获取锁时,启动一个定时任务,定期刷新锁的过期时间。
定时任务会在过期时间到期之前执行,通过异步方法续期锁的过期时间。
如果续期成功,定时任务会重新调度自己;如果失败,则取消续期操作。
实际续期是使用 lua 脚本实现,具体代码在 renewExpirationAsync 方法内,可以看到续期时间也是 30s:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 protected CompletionStage<Boolean> renewExpirationAsync (long threadId) { return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;" , Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId)); }
再补充下取消续期的方法 cancelExpirationRenewal 的源码分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 protected void cancelExpirationRenewal (Long threadId, Boolean unlockResult) { ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (task == null ) { return ; } if (threadId != null ) { task.removeThreadId(threadId); } if (threadId == null || task.hasNoThreads()) { Timeout timeout = task.getTimeout(); if (timeout != null ) { timeout.cancel(); } EXPIRATION_RENEWAL_MAP.remove(getEntryName()); } }
从上面定时任务续期的逻辑我们可以知道,从 EXPIRATION_RENEWAL_MAP 中移除锁的过期条目,就可以结束续期。
在调用 unlock 解锁的时候,就会触发 cancelExpirationRenewal 的调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 @Override public void unlock () { try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { if (e.getCause() instanceof IllegalMonitorStateException) { throw (IllegalMonitorStateException) e.getCause(); } else { throw e; } } } @Override public RFuture<Void> unlockAsync (long threadId) { return commandExecutor.getServiceManager().execute(() -> unlockAsync0(threadId)); } private RFuture<Void> unlockAsync0 (long threadId) { CompletionStage<Boolean> future = unlockInnerAsync(threadId); CompletionStage<Void> f = future.handle((opStatus, e) -> { cancelExpirationRenewal(threadId); if (e != null ) { if (e instanceof CompletionException) { throw (CompletionException) e; } throw new CompletionException (e); } if (opStatus == null ) { IllegalMonitorStateException cause = new IllegalMonitorStateException ("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); throw new CompletionException (cause); } return null ; }); return new CompletableFutureWrapper <>(f); }
从 unlockAsync0 可以看到,即使出了异常 e!=null 也不影响 cancelExpirationRenewal 的调用,即使解锁出现异常,续期的定时任务也会被取消,防止无限续期 。
锁未设置过期时间才会续期 并不是 redisson 分布式锁都会有看门狗续期机制,只有设置了 leaseTime ,即超时时间才会有自动续期。
我们看下 redisson 加锁方法,例如 tryLock,实际会触发 tryAcquireAsync 方法,而里面会有对 leaseTime 的判断:
scheduleExpirationRenewal 方法上面已经分析了,应该很清晰了。
如果获取锁的客户端挂了怎么办? 从上面的分析我们可以得知,续期是通过定时任务执行的,如果当前客户端宕机,那么定时任务就没了,不会再执行、按照默认每 10s 续期 30s 的情况来看,锁不可能无限续期,等 30s 时间一到,集群中的其他客户端就可以获取锁,不会阻碍正常业务的执行。
客户端宕机还需等待 30s 时间太长怎么办?
1)紧急情况可以直接在 redis 中删除对应的 key,这样对应的锁就释放了
2)可以通过 lockWatchdogTimeout 参数修改看门狗机制的超时时间
1 2 3 4 5 6 7 8 9 Config config = new Config ();config.useSingleServer() .setAddress("redis://127.0.0.1:6379" ) .setConnectionPoolSize(10 ); config.setLockWatchdogTimeout(15000 ); RedissonClient redisson = Redisson.create(config);