Redisson分布式锁等待时间实现机制

小豆丁 8小时前 ⋅ 3 阅读
聊天记录
头像

/** * Tries to acquire the lock with defined <code>leaseTime</code>. * Waits up to defined <code>waitTime</code> if necessary until the lock became available. * * Lock will be released automatically after defined <code>leaseTime</code> interval. * * @param waitTime the maximum time to acquire the lock * @param leaseTime lease time * @param unit time unit * @return <code>true</code> if lock is successfully acquired, * otherwise <code>false</code> if lock is already set. * @throws InterruptedException - if the thread is interrupted */ boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException; redis 的锁等待时间,怎么实现的

头像

Redisson 的 tryLock(long waitTime, long leaseTime, TimeUnit unit) 方法实现了锁等待时间,其实现原理涉及客户端轮询+Redis发布/订阅的混合机制。让我详细解释:

1. 核心实现机制

Redisson 的锁等待不是简单的睡眠轮询,而是结合了多种技术:

主要组件

JAVA
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) 
    throws InterruptedException {
    // 实现包含以下关键部分:
    // 1. 尝试直接获取锁
    // 2. 如果失败,订阅锁释放消息
    // 3. 定时重试,直到超时
    // 4. 使用异步操作和Future
}

2. 具体实现步骤

第一步:首次尝试获取锁

JAVA
// 伪代码示意
TTL = tryLockInner(leaseTime, unit);  // 执行Lua脚本尝试获取锁
if (TTL == null) {
    return true;  // 立即获取成功
}

第二步:计算剩余等待时间

JAVA
// 如果第一次尝试失败
long remainTime = waitTime;  // 剩余等待时间
long current = System.currentTimeMillis();

while (remainTime > 0) {
    // 进入循环等待
    // 1. 订阅锁释放的Channel
    // 2. 等待锁释放消息
    // 3. 再次尝试获取锁
    // 4. 更新剩余时间
}

3. 关键技术细节

Lua脚本实现原子操作

Redisson 使用 Lua 脚本在 Redis 中原子性地尝试获取锁:

LUA
-- tryLock 的核心 Lua 脚本
local key = KEYS[1];  -- 锁的key
local threadId = ARGV[1];  -- 线程标识
local leaseTime = ARGV[2];  -- 租约时间
local waitTime = ARGV[3];  -- 等待时间

-- 尝试获取锁
if (redis.call('exists', key) == 0) then
    redis.call('hset', key, threadId, 1);
    redis.call('pexpire', key, leaseTime);
    return nil;  -- 获取成功
end;

-- 锁已存在,检查是否可重入
if (redis.call('hexists', key, threadId) == 1) then
    redis.call('hincrby', key, threadId, 1);
    redis.call('pexpire', key, leaseTime);
    return nil;  -- 可重入,获取成功
end;

-- 获取失败,返回锁的剩余生存时间
return redis.call('pttl', key);

发布/订阅实现通知机制

JAVA
// 订阅锁释放的Channel
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
    // 订阅超时
    return false;
}
  • 每个锁对应一个 Redis Channel
  • 锁释放时,Redis 会发布消息
  • 客户端收到消息后,立即尝试获取锁,而不是等待固定的轮询间隔

自适应重试策略

JAVA
while (remainingWaitTime > 0) {
    // 1. 等待锁释放消息(有超时时间)
    entry.getLatch().tryAcquire(remainingWaitTime, TimeUnit.MILLISECONDS);
    
    // 2. 计算新的剩余时间
    long elapsed = System.currentTimeMillis() - startTime;
    remainingWaitTime = waitTime - elapsed;
    
    if (remainingWaitTime <= 0) {
        return false;  // 等待超时
    }
    
    // 3. 再次尝试获取锁
    ttl = tryLockInner(leaseTime, unit);
    if (ttl == null) {
        return true;  // 获取成功
    }
    
    // 4. 更新剩余时间,继续循环
}

4. 等待时间的优化处理

避免忙等待(Busy Waiting)

JAVA
// 传统的忙等待(不推荐)
while (System.currentTimeMillis() < endTime) {
    if (tryLock()) {
        return true;
    }
    Thread.sleep(100);  // 固定间隔睡眠
}
return false;

// Redisson 的优化等待
// 1. 订阅消息,阻塞等待通知
// 2. 收到通知后立即尝试
// 3. 减少不必要的轮询

时间精度控制

JAVA
// 使用 System.currentTimeMillis() 而不是 System.nanoTime()
// 因为分布式系统中,毫秒级精度足够
// 且需要考虑时钟回拨问题
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < waitTime) {
    // ...
}

5. 完整的 tryLock 流程

时序图

客户端                               Redis
  |---- tryLock(waitTime, leaseTime) -->|
  |<-------- 返回锁的TTL(如果被占用)-----|
  |                                       |
  |-- 订阅锁释放Channel ------------------>|
  |                                       |
  |        (等待期间)                      |
  |<--- 收到锁释放消息(当其他客户端解锁)---|
  |                                       |
  |---- 再次尝试获取锁 ------------------->|
  |<-------- 返回成功/失败 ----------------|

代码流程

JAVA
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) 
    throws InterruptedException {
    
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    
    // 1. 第一次尝试获取锁
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    if (ttl == null) {
        return true;  // 获取成功
    }
    
    // 2. 计算剩余等待时间
    time -= (System.currentTimeMillis() - current);
    if (time <= 0) {
        return false;  // 立即超时
    }
    
    // 3. 订阅锁释放消息
    RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
        // 订阅超时
        unsubscribe(subscribeFuture, threadId);
        return false;
    }
    
    try {
        // 4. 循环尝试获取锁
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            return false;
        }
        
        while (true) {
            long currentTime = System.currentTimeMillis();
            
            // 再次尝试获取锁
            ttl = tryAcquire(leaseTime, unit, threadId);
            if (ttl == null) {
                return true;  // 获取成功
            }
            
            // 计算剩余时间
            time -= (System.currentTimeMillis() - currentTime);
            if (time <= 0) {
                return false;  // 等待超时
            }
            
            // 等待锁释放信号
            currentTime = System.currentTimeMillis();
            if (ttl >= 0 && ttl < time) {
                // 使用信号量等待
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }
            
            // 更新剩余时间
            time -= (System.currentTimeMillis() - currentTime);
            if (time <= 0) {
                return false;
            }
        }
    } finally {
        // 5. 取消订阅
        unsubscribe(subscribeFuture, threadId);
    }
}

6. 与简单轮询的区别

实现方式 优点 缺点
简单轮询 实现简单 1. 网络开销大<br>2. 延迟高<br>3. Redis压力大
Redisson混合模式 1. 减少网络调用<br>2. 响应及时<br>3. Redis压力小 实现复杂

7. 关键优化点

减少网络往返

JAVA
// 使用Lua脚本在一次网络调用中完成多个操作
// 1. 检查锁是否存在
// 2. 设置锁和过期时间
// 3. 返回结果

避免无效轮询

  • 通过发布/订阅机制,只在锁可能释放时才尝试
  • 设置合理的等待超时,避免长时间阻塞

处理网络分区

JAVA
// 考虑网络不稳定的情况
try {
    // 尝试获取锁
} catch (RedisConnectionException e) {
    // 网络异常处理
    // 可能重试或快速失败
}

8. 实际使用示例

JAVA
RLock lock = redisson.getLock("myLock");
try {
    // 等待最多10秒获取锁,锁租约30秒
    if (lock.tryLock(10, 30, TimeUnit.SECONDS)) {
        try {
            // 获取锁成功,执行业务
            doBusiness();
        } finally {
            lock.unlock();
        }
    } else {
        // 获取锁失败
        log.warn("获取锁超时");
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    // 处理中断
}

9. 总结

Redisson 的 tryLock 等待时间实现是混合策略

  1. 首次尝试:立即尝试获取锁
  2. 订阅机制:如果失败,订阅锁释放消息
  3. 定时重试:在等待时间内循环尝试
  4. 响应式等待:收到锁释放消息后立即重试
  5. 时间管理:精确控制剩余等待时间

这种设计既减少了不必要的轮询,又保证了及时响应,是分布式锁中等待机制的优秀实现。

头像

定时重试:在等待时间内循环尝试 循环多久一次?

头像

在 Redisson 的 tryLock 实现中,循环重试的间隔不是固定的,而是采用了智能的、动态调整的策略。让我详细解释其工作原理:

1. 核心机制:信号量等待

Redisson 不是通过简单的 Thread.sleep(fixedInterval) 来循环重试,而是使用了信号量(Semaphore) 来实现高效等待:

JAVA
// 伪代码展示核心逻辑
while (remainingWaitTime > 0) {
    // 尝试获取锁
    Long ttl = tryAcquireOnce(leaseTime, unit, threadId);
    if (ttl == null) {
        return true;  // 获取成功
    }
    
    // 动态计算等待时间
    long waitTime = calculateWaitTime(ttl, remainingWaitTime);
    
    // 使用信号量等待(有超时)
    entry.getLatch().tryAcquire(waitTime, TimeUnit.MILLISECONDS);
    
    // 更新剩余时间
    remainingWaitTime = waitTime - (System.currentTimeMillis() - startTime);
}

2. 动态等待时间计算

计算逻辑

JAVA
private long calculateWaitTime(Long ttl, long remainingWaitTime) {
    if (ttl != null && ttl > 0) {
        // 如果知道锁的TTL(剩余过期时间),取两者最小值
        return Math.min(ttl, remainingWaitTime);
    } else {
        // 否则使用剩余等待时间
        return remainingWaitTime;
    }
}

实际策略

情况 等待策略 说明
知道TTL 等待 min(TTL, 剩余等待时间) 最多等到锁自动过期
不知道TTL 等待整个剩余等待时间 保守策略
订阅到消息 立即唤醒 锁释放时立即重试

3. 三种重试时机

时机1:锁释放通知(最快)

JAVA
// 订阅Redis的channel,当锁释放时立即收到通知
if (subscribeFuture.await(remainingWaitTime, TimeUnit.MILLISECONDS)) {
    // 收到锁释放消息,立即重试
    ttl = tryAcquireOnce(leaseTime, unit, threadId);
}

时机2:锁自动过期(较慢)

JAVA
// 如果锁设置了过期时间,会在过期时自动释放
// 客户端等待TTL时间后重试
if (ttl > 0 && ttl < remainingWaitTime) {
    // 最多等待锁的剩余生存时间
    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}

时机3:等待超时(最慢)

JAVA
// 如果不知道锁何时释放,最多等到waitTime结束
entry.getLatch().tryAcquire(remainingWaitTime, TimeUnit.MILLISECONDS);

4. 源码中的具体实现

RedissonLock.tryLockInnerAsync 方法

JAVA
private <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, 
                                         long threadId, RedisStrictCommand<T> command) {
    return this.evalWriteAsync(
        this.getRawName(), 
        LongCodec.INSTANCE, 
        command,
        // Lua脚本
        "if (redis.call('exists', KEYS[1]) == 0) then " +
        "    redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
        "    redis.call('pexpire', KEYS[1], ARGV[1]); " +
        "    return nil; " +
        "end; " +
        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
        "    redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
        "    redis.call('pexpire', KEYS[1], ARGV[1]); " +
        "    return nil; " +
        "end; " +
        "return redis.call('pttl', KEYS[1]);",  // 返回锁的TTL
        Collections.singletonList(this.getRawName()), 
        new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)}
    );
}

等待循环的关键代码

JAVA
// 这是简化版的核心循环逻辑
while (true) {
    long currentTime = System.currentTimeMillis();
    
    // 尝试获取锁
    ttl = tryLockInner(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    
    // 获取成功
    if (ttl == null) {
        return true;
    }
    
    // 计算剩余时间
    time -= (System.currentTimeMillis() - currentTime);
    if (time <= 0) {
        return false;  // 等待超时
    }
    
    // 等待信号量
    currentTime = System.currentTimeMillis();
    if (ttl >= 0 && ttl < time) {
        // 等待锁自动释放
        getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    } else {
        // 等待剩余时间
        getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
    }
    
    // 更新剩余时间
    time -= (System.currentTimeMillis() - currentTime);
    if (time <= 0) {
        return false;
    }
}

5. 信号量的工作方式

订阅机制

JAVA
// 每个线程对应一个RedissonLockEntry
public class RedissonLockEntry {
    private final Semaphore latch;  // 信号量
    private volatile int counter;    // 计数器
    private final String channelName;
    
    public RedissonLockEntry() {
        this.latch = new Semaphore(0);  // 初始为0,需要等待
    }
    
    public void acquire() {
        latch.acquireUninterruptibly();
    }
    
    public boolean tryAcquire(long timeout, TimeUnit unit) {
        return latch.tryAcquire(timeout, unit);
    }
    
    public void release() {
        latch.release();
    }
}

消息通知

JAVA
// 当Redis发布锁释放消息时
protected void onMessage(RedissonLockEntry value, Long message) {
    if (message.equals(UNLOCK_MESSAGE)) {
        Runnable runnable = value.getListeners().poll();
        if (runnable != null) {
            runnable.run();
        }
        
        // 释放信号量,唤醒等待的线程
        value.getLatch().release();
    }
}

6. 实际示例分析

场景1:锁很快被释放

线程A: 持有锁
线程B: 尝试获取锁,失败
线程B: 订阅锁释放消息,等待信号量
线程A: 释放锁,发布消息
线程B: 立即收到消息,唤醒,立即重试获取锁
循环间隔: 几毫秒到几十毫秒

场景2:锁持有时间较长

线程A: 持有锁,租约30秒
线程B: 尝试获取锁,失败
线程B: 获取到锁的TTL=29秒
线程B: 等待29秒(信号量等待锁自动过期)
29秒后: 锁自动过期
线程B: 唤醒,重试获取锁
循环间隔: 29秒

场景3:没有TTL信息

线程A: 持有锁(没有设置过期时间)
线程B: 尝试获取锁,失败
线程B: 不知道TTL,等待剩余等待时间(比如10秒)
10秒后: 等待超时
线程B: 返回false
循环间隔: 10秒

7. 与简单轮询对比

重试策略 间隔 网络开销 Redis负载 响应速度
固定间隔轮询 固定,如100ms 慢(最大延迟=间隔)
Redisson智能等待 动态,0ms~TTL 快(锁释放立即响应)
忙等待 接近0ms 极高 极高 极快

8. 性能优化点

减少网络往返

JAVA
// 一次Lua脚本调用完成多个操作
// 避免了多个命令的网络往返
String luaScript = 
    "if (redis.call('exists', KEYS[1]) == 0) then " +
    "    redis.call('hset', KEYS[1], ARGV[2], 1); " +
    "    redis.call('pexpire', KEYS[1], ARGV[1]); " +
    "    return nil; " +
    "end; " +
    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
    "    redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
    "    redis.call('pexpire', KEYS[1], ARGV[1]); " +
    "    return nil; " +
    "end; " +
    "return redis.call('pttl', KEYS[1]);";

避免无效重试

JAVA
// 只在以下情况重试:
// 1. 收到锁释放消息(最快)
// 2. 锁可能已过期(根据TTL)
// 3. 等待时间耗尽(最慢)

// 不进行盲目重试

9. 总结:循环间隔策略

Redisson 的 tryLock 重试间隔是动态的、智能的

  1. 最优情况:锁释放时立即重试(通过发布/订阅)
  2. 次优情况:等待锁的剩余TTL时间后重试
  3. 最差情况:等待剩余等待时间后重试

没有固定的循环间隔,而是根据实际情况动态调整,这种设计:

  • 最大化响应速度:锁释放时立即重试
  • 最小化网络开销:避免不必要的重试请求
  • 减少Redis压力:只在必要时才请求Redis
  • 节省客户端资源:通过信号量等待而非忙等待

这就是为什么 Redisson 的分布式锁性能优秀的原因之一。

头像

锁等待不是redis原生支持是吧

头像

锁等待机制不是Redis原生支持的,而是Redisson等客户端库在应用层实现的。

Redis原生支持 vs 客户端实现

Redis原生能力

Redis本身只提供基础的原子操作:

BASH
# 1. 原子性加锁(Redis 2.6.12+)
SET lock_key unique_value NX EX 30

# 2. Lua脚本支持原子操作
EVAL "if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then 
        redis.call('expire', KEYS[1], ARGV[2]) 
        return 1 
      else 
        return 0 
      end" 1 lock_key unique_value 30

Redis原生不支持的功能:

  • ❌ 锁等待队列
  • ❌ 自动重试机制
  • ❌ 锁释放通知
  • ❌ 可重入锁
  • ❌ 公平锁/非公平锁

Redisson的客户端实现

Redisson在客户端实现了完整的锁等待机制:

1. 等待队列管理

JAVA
// Redisson在客户端维护等待队列
public class RedissonLock {
    private final ConcurrentMap<String, RedissonLockEntry> entries;
    
    // 每个锁对应一个等待队列
    public class RedissonLockEntry {
        private final Semaphore latch;  // 信号量控制等待
        private int counter;
    }
}

2. 发布/订阅机制

JAVA
// 订阅锁释放的Redis Channel
public void subscribe() {
    // 当其他客户端释放锁时,Redis会发布消息
    // 订阅者收到消息后唤醒等待线程
    this.pubSub.subscribe(getChannelName(), this);
}

3. 智能等待策略

JAVA
// 不是简单的轮询,而是智能等待
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) {
    // 1. 首次尝试获取锁
    // 2. 失败后订阅锁释放消息
    // 3. 根据锁的TTL动态计算等待时间
    // 4. 使用信号量等待,避免忙轮询
}

实现对比

功能 Redis原生 Redisson实现
原子性加锁 ✅ SET NX EX ✅ 基于Redis命令
锁等待 ❌ 不支持 ✅ 客户端队列+信号量
锁释放通知 ❌ 不支持 ✅ Redis Pub/Sub
可重入锁 ❌ 不支持 ✅ 客户端计数
自动续期 ❌ 不支持 ✅ Watch Dog机制
公平锁 ❌ 不支持 ✅ 客户端队列排序

为什么Redis不原生支持锁等待?

设计哲学

  1. 保持简单:Redis核心是内存数据结构存储
  2. 高性能:避免在服务端维护复杂状态
  3. 解耦:将业务逻辑放在客户端,Redis只提供基础能力

技术限制

BASH
# Redis是单线程的,如果在服务端实现锁等待:
# 1. 需要维护等待队列 -> 增加内存开销
# 2. 需要处理超时和唤醒 -> 增加CPU开销
# 3. 需要保证原子性 -> 增加实现复杂度

Redisson的等待实现原理

核心流程

客户端A尝试获取锁 → Redis: 锁已被B持有 → 返回TTL
客户端A订阅锁释放Channel → 进入等待状态
客户端B释放锁 → Redis发布释放消息
客户端A收到消息 → 立即重试获取锁

代码示例

JAVA
// Redisson的tryLock实现简化版
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) {
    // 1. 尝试获取锁(使用Lua脚本)
    Long ttl = tryLockInner(leaseTime, unit);
    
    if (ttl == null) {
        return true;  // 获取成功
    }
    
    // 2. 订阅锁释放消息
    subscribeToLockRelease();
    
    // 3. 循环等待(不是固定间隔轮询)
    while (remainingTime > 0) {
        // 根据锁的TTL动态计算等待时间
        long timeToWait = Math.min(ttl, remainingTime);
        
        // 使用信号量等待,避免CPU空转
        entry.getLatch().tryAcquire(timeToWait, TimeUnit.MILLISECONDS);
        
        // 锁释放时被唤醒,立即重试
        ttl = tryLockInner(leaseTime, unit);
        if (ttl == null) {
            return true;
        }
        
        // 更新剩余时间
        remainingTime -= elapsedTime;
    }
    
    return false;
}

其他客户端实现

除了Redisson,其他Redis客户端也有类似实现:

客户端库 锁等待实现 特点
Redisson 发布/订阅+信号量 最成熟,功能最全
Lettuce 轮询+退避策略 简单直接
Jedis 需要用户自己实现 灵活性高,但复杂

总结

  1. Redis原生只提供原子操作:SET NX EX和Lua脚本
  2. 锁等待是客户端功能:Redisson在应用层实现完整的锁等待机制
  3. 实现方式:结合Redis Pub/Sub和客户端信号量
  4. 优势:解耦、灵活、可定制
  5. 建议:生产环境推荐使用Redisson,避免自己重复造轮子

这种设计让Redis保持简单高效,同时客户端可以根据需求实现复杂的分布式锁功能。

全部评论: 0

    我有话说: