private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}// lock acquiredif (ttlRemaining == null) {scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture;}
Async代表异步函数。在 Redisson 中,所有带有 Async 后缀的方法都是异步版本。
即它不会阻塞当前线程去等待 Redis 命令执行完成,而是立即返回一个 RFuture 对象。
Redisson 是基于 Netty 的 Redis 客户端,它天然是事件驱动 + 非阻塞 I/O。也就是说,一个线程可以同时管理成百上千个 Redis 连接请求,不需要为每个请求单独创建线程。
可重试机制:
boolean success = lock.tryLock(1, TimeUnit.MINUTES);
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
-
如果
ttl为null,代表加锁成功直接返回true否则表明锁被占用,则去拿对方剩余的 TTL -
订阅释放信号
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);if (!subscribeFuture.await(time, MILLISECONDS)) { ... return false; }protected RFuture<RedissonLockEntry> subscribe(long threadId) {return pubSub.subscribe(getEntryName(), getChannelName());}String getChannelName() {return prefixName("redisson_lock__channel", getName());}
当别人unlock() 时会publish() 一条解锁消息。这里先订阅,再判断。
- 等待-重试的循环
while (true) {long cur = now();ttl = tryAcquire(...); // 再抢一次if (ttl == null) return true; // 抢到就直接返回time -= (now() - cur); // 扣减剩余等待时间if (time <= 0) { acquireFailed(...); return false; }// 选择等待多久再试:取“锁的剩余TTL”和“自己的剩余等待time”的较小者cur = now();if (ttl >= 0 && ttl < time) {entry.getLatch().tryAcquire(ttl, MILLISECONDS);} else {entry.getLatch().tryAcquire(time, MILLISECONDS);}time -= (now() - cur);if (time <= 0) { acquireFailed(...); return false; }
}