目录
一、概要
1、核心源码
2、重试逻辑:
3、超时控制:
4、异步处理:
二、源码分析
1、获取锁的lua脚本
2、tryLock方法的可重试逻辑
3、小结:
三、看门狗机制
1、解决锁超时问题:
2、锁续期机制:
3、续约方法里面:
4、Task任务执行内容
5、锁释放
大家好,今天给大家分享一下redisson的锁是如何实现可重试的和锁续期的。
一、概要
1、核心源码
Redisson 的锁可重试机制主要通过tryAcquireAsync
方法和定时任务实现。以下是核心源码的简化版本,展示了重试逻辑的实现原理:
// RedissonLock.java 核心源码简化版
public class RedissonLock implements RLock {// 尝试获取锁,支持重试private <T> RFuture<T> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {// 转换等待时间long currentWaitTime = unit.toMillis(waitTime);// 首次尝试获取锁RFuture<Boolean> tryAcquireFuture = tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);// 处理获取结果return handleAcquireResult(tryAcquireFuture, currentWaitTime, leaseTime, unit, threadId);}// 处理获取锁的结果private <T> RFuture<T> handleAcquireResult(RFuture<Boolean> future, long currentWaitTime, long leaseTime, TimeUnit unit, long threadId) {// 如果获取成功,直接返回if (future.isSuccess() && future.getNow()) {return completedFuture(null);}// 如果等待时间已过,返回失败if (currentWaitTime <= 0) {return new FailedFuture<>(new LockException("Lock acquisition timeout"));}// 订阅锁释放事件,准备重试return subscribeAndRetry(currentWaitTime, leaseTime, unit, threadId);}// 订阅锁释放事件并安排重试private <T> RFuture<T> subscribeAndRetry(long currentWaitTime, long leaseTime, TimeUnit unit, long threadId) {// 创建订阅,当锁释放时会收到通知RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);// 订阅成功后安排重试return subscribeFuture.thenCompose(entry -> {// 计算剩余等待时间long elapsedTime = calculateElapsedTime(subscribeFuture);long currentRemainingTime = currentWaitTime - elapsedTime;// 如果时间已过,取消订阅并返回失败if (currentRemainingTime <= 0) {unsubscribe(entry, threadId);return new FailedFuture<>(new LockException("Lock acquisition timeout"));}// 创建延迟重试任务return scheduleRetryTask(currentRemainingTime, leaseTime, unit, threadId, entry);});}// 安排重试任务private <T> RFuture<T> scheduleRetryTask(long waitTime, long leaseTime, TimeUnit unit, long threadId,RedissonLockEntry entry) {// 创建一个延迟任务,在收到锁释放通知后重试CompletableFuture<T> result = new CompletableFuture<>();// 注册锁释放监听器entry.addListener(new LockListener() {@Overridepublic void onLockReleased() {// 锁释放后,立即尝试重新获取RFuture<Boolean> tryAcquireFuture = tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);// 处理重试结果handleRetryResult(tryAcquireFuture, waitTime, leaseTime, unit, threadId, entry, result);}});return new CompletableFutureWrapper<>(result);}// 处理重试结果private void handleRetryResult(RFuture<Boolean> future, long waitTime, long leaseTime, TimeUnit unit, long threadId,RedissonLockEntry entry,CompletableFuture result) {future.whenComplete((acquired, e) -> {if (e != null) {unsubscribe(entry, threadId);result.completeExceptionally(e);return;}if (acquired) {unsubscribe(entry, threadId);result.complete(null);} else {// 未获取到锁,继续等待或超时long currentWaitTime = calculateRemainingTime(waitTime);if (currentWaitTime <= 0) {unsubscribe(entry, threadId);result.completeExceptionally(new LockException("Lock acquisition timeout"));}// 继续等待锁释放事件...}});}// 其他辅助方法...
}
2、重试逻辑:
- 通过
tryAcquireAsync
方法尝试获取锁,失败时进入重试流程 - 使用
subscribe
方法订阅锁释放事件,当锁被释放时收到通知 - 收到通知后立即重试获取锁,实现快速响应
3、超时控制:
- 每次重试前计算剩余等待时间
currentWaitTime
- 如果时间耗尽,取消订阅并返回超时异常
- 通过
calculateElapsedTime
和calculateRemainingTime
方法精确控制时间
4、异步处理:
- 所有操作都通过
RFuture
异步执行,不阻塞线程 - 使用
thenCompose
和whenComplete
等方法组合异步操作 - 通过
CompletableFuture
实现结果回调
二、源码分析
1、获取锁的lua脚本
2、tryLock方法的可重试逻辑
如果在time时间内获取到了锁释放的通知,继续往下执行:
3、小结:
lua脚本释放锁的时候,会发一个通知。外部获取锁失败时,会计算还有没有剩余等待时间,如果有,那么就订阅锁释放的通知,锁释放了之后才开始重试获取锁。如果还是获取失败,后面还是一样的通过一个while(true)循环,不断地检查是否有剩余时间、订阅锁释放通知、重试获取锁。直到获取到锁或者没有了剩余等待时间才结束。
三、看门狗机制
1、解决锁超时问题:
线程阻塞,锁超时释放后。别的线程获取到了锁。
2、锁续期机制:
3、续约方法里面:
4、Task任务执行内容
5、锁释放
那既然锁是递归地无限刷新锁的剩余时间的,那锁什么时候释放呢?
答案:当调用unlock释放锁的是时候释放。
具体取消到期更新方法: