/** * Tries to acquire the lock with defined <code>leaseTime</code>. * Waits up to defined <code>waitTime</code> if necessary until the lock became available. * * Lock will be released automatically after defined <code>leaseTime</code> interval. * * @param waitTime the maximum time to acquire the lock * @param leaseTime lease time * @param unit time unit * @return <code>true</code> if lock is successfully acquired, * otherwise <code>false</code> if lock is already set. * @throws InterruptedException - if the thread is interrupted */ boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException; redis 的锁等待时间,怎么实现的
Redisson 的 tryLock(long waitTime, long leaseTime, TimeUnit unit) 方法实现了锁等待时间,其实现原理涉及客户端轮询+Redis发布/订阅的混合机制。让我详细解释:
1. 核心实现机制
Redisson 的锁等待不是简单的睡眠轮询,而是结合了多种技术:
主要组件
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)
throws InterruptedException {
// 实现包含以下关键部分:
// 1. 尝试直接获取锁
// 2. 如果失败,订阅锁释放消息
// 3. 定时重试,直到超时
// 4. 使用异步操作和Future
}
2. 具体实现步骤
第一步:首次尝试获取锁
// 伪代码示意
TTL = tryLockInner(leaseTime, unit); // 执行Lua脚本尝试获取锁
if (TTL == null) {
return true; // 立即获取成功
}
第二步:计算剩余等待时间
// 如果第一次尝试失败
long remainTime = waitTime; // 剩余等待时间
long current = System.currentTimeMillis();
while (remainTime > 0) {
// 进入循环等待
// 1. 订阅锁释放的Channel
// 2. 等待锁释放消息
// 3. 再次尝试获取锁
// 4. 更新剩余时间
}
3. 关键技术细节
Lua脚本实现原子操作
Redisson 使用 Lua 脚本在 Redis 中原子性地尝试获取锁:
-- tryLock 的核心 Lua 脚本
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程标识
local leaseTime = ARGV[2]; -- 租约时间
local waitTime = ARGV[3]; -- 等待时间
-- 尝试获取锁
if (redis.call('exists', key) == 0) then
redis.call('hset', key, threadId, 1);
redis.call('pexpire', key, leaseTime);
return nil; -- 获取成功
end;
-- 锁已存在,检查是否可重入
if (redis.call('hexists', key, threadId) == 1) then
redis.call('hincrby', key, threadId, 1);
redis.call('pexpire', key, leaseTime);
return nil; -- 可重入,获取成功
end;
-- 获取失败,返回锁的剩余生存时间
return redis.call('pttl', key);
发布/订阅实现通知机制
// 订阅锁释放的Channel
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
// 订阅超时
return false;
}
- 每个锁对应一个 Redis Channel
- 锁释放时,Redis 会发布消息
- 客户端收到消息后,立即尝试获取锁,而不是等待固定的轮询间隔
自适应重试策略
while (remainingWaitTime > 0) {
// 1. 等待锁释放消息(有超时时间)
entry.getLatch().tryAcquire(remainingWaitTime, TimeUnit.MILLISECONDS);
// 2. 计算新的剩余时间
long elapsed = System.currentTimeMillis() - startTime;
remainingWaitTime = waitTime - elapsed;
if (remainingWaitTime <= 0) {
return false; // 等待超时
}
// 3. 再次尝试获取锁
ttl = tryLockInner(leaseTime, unit);
if (ttl == null) {
return true; // 获取成功
}
// 4. 更新剩余时间,继续循环
}
4. 等待时间的优化处理
避免忙等待(Busy Waiting)
// 传统的忙等待(不推荐)
while (System.currentTimeMillis() < endTime) {
if (tryLock()) {
return true;
}
Thread.sleep(100); // 固定间隔睡眠
}
return false;
// Redisson 的优化等待
// 1. 订阅消息,阻塞等待通知
// 2. 收到通知后立即尝试
// 3. 减少不必要的轮询
时间精度控制
// 使用 System.currentTimeMillis() 而不是 System.nanoTime()
// 因为分布式系统中,毫秒级精度足够
// 且需要考虑时钟回拨问题
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < waitTime) {
// ...
}
5. 完整的 tryLock 流程
时序图
客户端 Redis
|---- tryLock(waitTime, leaseTime) -->|
|<-------- 返回锁的TTL(如果被占用)-----|
| |
|-- 订阅锁释放Channel ------------------>|
| |
| (等待期间) |
|<--- 收到锁释放消息(当其他客户端解锁)---|
| |
|---- 再次尝试获取锁 ------------------->|
|<-------- 返回成功/失败 ----------------|
代码流程
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)
throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 1. 第一次尝试获取锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {
return true; // 获取成功
}
// 2. 计算剩余等待时间
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
return false; // 立即超时
}
// 3. 订阅锁释放消息
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
// 订阅超时
unsubscribe(subscribeFuture, threadId);
return false;
}
try {
// 4. 循环尝试获取锁
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
return false;
}
while (true) {
long currentTime = System.currentTimeMillis();
// 再次尝试获取锁
ttl = tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {
return true; // 获取成功
}
// 计算剩余时间
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
return false; // 等待超时
}
// 等待锁释放信号
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
// 使用信号量等待
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// 更新剩余时间
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
return false;
}
}
} finally {
// 5. 取消订阅
unsubscribe(subscribeFuture, threadId);
}
}
6. 与简单轮询的区别
| 实现方式 | 优点 | 缺点 |
|---|---|---|
| 简单轮询 | 实现简单 | 1. 网络开销大<br>2. 延迟高<br>3. Redis压力大 |
| Redisson混合模式 | 1. 减少网络调用<br>2. 响应及时<br>3. Redis压力小 | 实现复杂 |
7. 关键优化点
减少网络往返
// 使用Lua脚本在一次网络调用中完成多个操作
// 1. 检查锁是否存在
// 2. 设置锁和过期时间
// 3. 返回结果
避免无效轮询
- 通过发布/订阅机制,只在锁可能释放时才尝试
- 设置合理的等待超时,避免长时间阻塞
处理网络分区
// 考虑网络不稳定的情况
try {
// 尝试获取锁
} catch (RedisConnectionException e) {
// 网络异常处理
// 可能重试或快速失败
}
8. 实际使用示例
RLock lock = redisson.getLock("myLock");
try {
// 等待最多10秒获取锁,锁租约30秒
if (lock.tryLock(10, 30, TimeUnit.SECONDS)) {
try {
// 获取锁成功,执行业务
doBusiness();
} finally {
lock.unlock();
}
} else {
// 获取锁失败
log.warn("获取锁超时");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 处理中断
}
9. 总结
Redisson 的 tryLock 等待时间实现是混合策略:
- 首次尝试:立即尝试获取锁
- 订阅机制:如果失败,订阅锁释放消息
- 定时重试:在等待时间内循环尝试
- 响应式等待:收到锁释放消息后立即重试
- 时间管理:精确控制剩余等待时间
这种设计既减少了不必要的轮询,又保证了及时响应,是分布式锁中等待机制的优秀实现。
定时重试:在等待时间内循环尝试 循环多久一次?
在 Redisson 的 tryLock 实现中,循环重试的间隔不是固定的,而是采用了智能的、动态调整的策略。让我详细解释其工作原理:
1. 核心机制:信号量等待
Redisson 不是通过简单的 Thread.sleep(fixedInterval) 来循环重试,而是使用了信号量(Semaphore) 来实现高效等待:
// 伪代码展示核心逻辑
while (remainingWaitTime > 0) {
// 尝试获取锁
Long ttl = tryAcquireOnce(leaseTime, unit, threadId);
if (ttl == null) {
return true; // 获取成功
}
// 动态计算等待时间
long waitTime = calculateWaitTime(ttl, remainingWaitTime);
// 使用信号量等待(有超时)
entry.getLatch().tryAcquire(waitTime, TimeUnit.MILLISECONDS);
// 更新剩余时间
remainingWaitTime = waitTime - (System.currentTimeMillis() - startTime);
}
2. 动态等待时间计算
计算逻辑
private long calculateWaitTime(Long ttl, long remainingWaitTime) {
if (ttl != null && ttl > 0) {
// 如果知道锁的TTL(剩余过期时间),取两者最小值
return Math.min(ttl, remainingWaitTime);
} else {
// 否则使用剩余等待时间
return remainingWaitTime;
}
}
实际策略
| 情况 | 等待策略 | 说明 |
|---|---|---|
| 知道TTL | 等待 min(TTL, 剩余等待时间) |
最多等到锁自动过期 |
| 不知道TTL | 等待整个剩余等待时间 | 保守策略 |
| 订阅到消息 | 立即唤醒 | 锁释放时立即重试 |
3. 三种重试时机
时机1:锁释放通知(最快)
// 订阅Redis的channel,当锁释放时立即收到通知
if (subscribeFuture.await(remainingWaitTime, TimeUnit.MILLISECONDS)) {
// 收到锁释放消息,立即重试
ttl = tryAcquireOnce(leaseTime, unit, threadId);
}
时机2:锁自动过期(较慢)
// 如果锁设置了过期时间,会在过期时自动释放
// 客户端等待TTL时间后重试
if (ttl > 0 && ttl < remainingWaitTime) {
// 最多等待锁的剩余生存时间
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
时机3:等待超时(最慢)
// 如果不知道锁何时释放,最多等到waitTime结束
entry.getLatch().tryAcquire(remainingWaitTime, TimeUnit.MILLISECONDS);
4. 源码中的具体实现
RedissonLock.tryLockInnerAsync 方法
private <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit,
long threadId, RedisStrictCommand<T> command) {
return this.evalWriteAsync(
this.getRawName(),
LongCodec.INSTANCE,
command,
// Lua脚本
"if (redis.call('exists', KEYS[1]) == 0) then " +
" redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
" redis.call('pexpire', KEYS[1], ARGV[1]); " +
" return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
" redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
" redis.call('pexpire', KEYS[1], ARGV[1]); " +
" return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);", // 返回锁的TTL
Collections.singletonList(this.getRawName()),
new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)}
);
}
等待循环的关键代码
// 这是简化版的核心循环逻辑
while (true) {
long currentTime = System.currentTimeMillis();
// 尝试获取锁
ttl = tryLockInner(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
// 获取成功
if (ttl == null) {
return true;
}
// 计算剩余时间
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
return false; // 等待超时
}
// 等待信号量
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
// 等待锁自动释放
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
// 等待剩余时间
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// 更新剩余时间
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
return false;
}
}
5. 信号量的工作方式
订阅机制
// 每个线程对应一个RedissonLockEntry
public class RedissonLockEntry {
private final Semaphore latch; // 信号量
private volatile int counter; // 计数器
private final String channelName;
public RedissonLockEntry() {
this.latch = new Semaphore(0); // 初始为0,需要等待
}
public void acquire() {
latch.acquireUninterruptibly();
}
public boolean tryAcquire(long timeout, TimeUnit unit) {
return latch.tryAcquire(timeout, unit);
}
public void release() {
latch.release();
}
}
消息通知
// 当Redis发布锁释放消息时
protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(UNLOCK_MESSAGE)) {
Runnable runnable = value.getListeners().poll();
if (runnable != null) {
runnable.run();
}
// 释放信号量,唤醒等待的线程
value.getLatch().release();
}
}
6. 实际示例分析
场景1:锁很快被释放
线程A: 持有锁
线程B: 尝试获取锁,失败
线程B: 订阅锁释放消息,等待信号量
线程A: 释放锁,发布消息
线程B: 立即收到消息,唤醒,立即重试获取锁
循环间隔: 几毫秒到几十毫秒
场景2:锁持有时间较长
线程A: 持有锁,租约30秒
线程B: 尝试获取锁,失败
线程B: 获取到锁的TTL=29秒
线程B: 等待29秒(信号量等待锁自动过期)
29秒后: 锁自动过期
线程B: 唤醒,重试获取锁
循环间隔: 29秒
场景3:没有TTL信息
线程A: 持有锁(没有设置过期时间)
线程B: 尝试获取锁,失败
线程B: 不知道TTL,等待剩余等待时间(比如10秒)
10秒后: 等待超时
线程B: 返回false
循环间隔: 10秒
7. 与简单轮询对比
| 重试策略 | 间隔 | 网络开销 | Redis负载 | 响应速度 |
|---|---|---|---|---|
| 固定间隔轮询 | 固定,如100ms | 高 | 高 | 慢(最大延迟=间隔) |
| Redisson智能等待 | 动态,0ms~TTL | 低 | 低 | 快(锁释放立即响应) |
| 忙等待 | 接近0ms | 极高 | 极高 | 极快 |
8. 性能优化点
减少网络往返
// 一次Lua脚本调用完成多个操作
// 避免了多个命令的网络往返
String luaScript =
"if (redis.call('exists', KEYS[1]) == 0) then " +
" redis.call('hset', KEYS[1], ARGV[2], 1); " +
" redis.call('pexpire', KEYS[1], ARGV[1]); " +
" return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
" redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
" redis.call('pexpire', KEYS[1], ARGV[1]); " +
" return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);";
避免无效重试
// 只在以下情况重试:
// 1. 收到锁释放消息(最快)
// 2. 锁可能已过期(根据TTL)
// 3. 等待时间耗尽(最慢)
// 不进行盲目重试
9. 总结:循环间隔策略
Redisson 的 tryLock 重试间隔是动态的、智能的:
- 最优情况:锁释放时立即重试(通过发布/订阅)
- 次优情况:等待锁的剩余TTL时间后重试
- 最差情况:等待剩余等待时间后重试
没有固定的循环间隔,而是根据实际情况动态调整,这种设计:
- ✅ 最大化响应速度:锁释放时立即重试
- ✅ 最小化网络开销:避免不必要的重试请求
- ✅ 减少Redis压力:只在必要时才请求Redis
- ✅ 节省客户端资源:通过信号量等待而非忙等待
这就是为什么 Redisson 的分布式锁性能优秀的原因之一。
锁等待不是redis原生支持是吧
锁等待机制不是Redis原生支持的,而是Redisson等客户端库在应用层实现的。
Redis原生支持 vs 客户端实现
Redis原生能力
Redis本身只提供基础的原子操作:
# 1. 原子性加锁(Redis 2.6.12+)
SET lock_key unique_value NX EX 30
# 2. Lua脚本支持原子操作
EVAL "if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then
redis.call('expire', KEYS[1], ARGV[2])
return 1
else
return 0
end" 1 lock_key unique_value 30
Redis原生不支持的功能:
- ❌ 锁等待队列
- ❌ 自动重试机制
- ❌ 锁释放通知
- ❌ 可重入锁
- ❌ 公平锁/非公平锁
Redisson的客户端实现
Redisson在客户端实现了完整的锁等待机制:
1. 等待队列管理
// Redisson在客户端维护等待队列
public class RedissonLock {
private final ConcurrentMap<String, RedissonLockEntry> entries;
// 每个锁对应一个等待队列
public class RedissonLockEntry {
private final Semaphore latch; // 信号量控制等待
private int counter;
}
}
2. 发布/订阅机制
// 订阅锁释放的Redis Channel
public void subscribe() {
// 当其他客户端释放锁时,Redis会发布消息
// 订阅者收到消息后唤醒等待线程
this.pubSub.subscribe(getChannelName(), this);
}
3. 智能等待策略
// 不是简单的轮询,而是智能等待
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) {
// 1. 首次尝试获取锁
// 2. 失败后订阅锁释放消息
// 3. 根据锁的TTL动态计算等待时间
// 4. 使用信号量等待,避免忙轮询
}
实现对比
| 功能 | Redis原生 | Redisson实现 |
|---|---|---|
| 原子性加锁 | ✅ SET NX EX | ✅ 基于Redis命令 |
| 锁等待 | ❌ 不支持 | ✅ 客户端队列+信号量 |
| 锁释放通知 | ❌ 不支持 | ✅ Redis Pub/Sub |
| 可重入锁 | ❌ 不支持 | ✅ 客户端计数 |
| 自动续期 | ❌ 不支持 | ✅ Watch Dog机制 |
| 公平锁 | ❌ 不支持 | ✅ 客户端队列排序 |
为什么Redis不原生支持锁等待?
设计哲学
- 保持简单:Redis核心是内存数据结构存储
- 高性能:避免在服务端维护复杂状态
- 解耦:将业务逻辑放在客户端,Redis只提供基础能力
技术限制
# Redis是单线程的,如果在服务端实现锁等待:
# 1. 需要维护等待队列 -> 增加内存开销
# 2. 需要处理超时和唤醒 -> 增加CPU开销
# 3. 需要保证原子性 -> 增加实现复杂度
Redisson的等待实现原理
核心流程
客户端A尝试获取锁 → Redis: 锁已被B持有 → 返回TTL
客户端A订阅锁释放Channel → 进入等待状态
客户端B释放锁 → Redis发布释放消息
客户端A收到消息 → 立即重试获取锁
代码示例
// Redisson的tryLock实现简化版
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) {
// 1. 尝试获取锁(使用Lua脚本)
Long ttl = tryLockInner(leaseTime, unit);
if (ttl == null) {
return true; // 获取成功
}
// 2. 订阅锁释放消息
subscribeToLockRelease();
// 3. 循环等待(不是固定间隔轮询)
while (remainingTime > 0) {
// 根据锁的TTL动态计算等待时间
long timeToWait = Math.min(ttl, remainingTime);
// 使用信号量等待,避免CPU空转
entry.getLatch().tryAcquire(timeToWait, TimeUnit.MILLISECONDS);
// 锁释放时被唤醒,立即重试
ttl = tryLockInner(leaseTime, unit);
if (ttl == null) {
return true;
}
// 更新剩余时间
remainingTime -= elapsedTime;
}
return false;
}
其他客户端实现
除了Redisson,其他Redis客户端也有类似实现:
| 客户端库 | 锁等待实现 | 特点 |
|---|---|---|
| Redisson | 发布/订阅+信号量 | 最成熟,功能最全 |
| Lettuce | 轮询+退避策略 | 简单直接 |
| Jedis | 需要用户自己实现 | 灵活性高,但复杂 |
总结
- Redis原生只提供原子操作:SET NX EX和Lua脚本
- 锁等待是客户端功能:Redisson在应用层实现完整的锁等待机制
- 实现方式:结合Redis Pub/Sub和客户端信号量
- 优势:解耦、灵活、可定制
- 建议:生产环境推荐使用Redisson,避免自己重复造轮子
这种设计让Redis保持简单高效,同时客户端可以根据需求实现复杂的分布式锁功能。