文章目錄
- Redisson的讀寫鎖使用
- 加鎖源碼分析
- 釋放鎖源碼分析:
- Redisson一次加多個鎖
- RedissonMultiLock加鎖源碼分析:
- RedissonMultiLock釋放鎖源碼分析:
- RCountDownLatch介紹:
- RCountDownLatch源碼分析:
- RSemaphore分布式信號量
- RSmaphore源碼分析:
Redisson的讀寫鎖使用
Redisson 的讀寫鎖是 RReadWriteLock,它是基于 Redis 實現的可重入的分布式讀寫鎖,支持跨線程、跨 JVM 的并發讀寫控制。
分布式鎖的使用案例:
public static void main(String[] args) throws InterruptedException {// connects to 127.0.0.1:6379 by defaultRedissonClient redisson = Redisson.create();final RReadWriteLock lock = redisson.getReadWriteLock("lock");lock.writeLock().tryLock();Thread t = new Thread() {public void run() {RLock r = lock.readLock();r.lock();try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}r.unlock();};};t.start();t.join();lock.writeLock().unlock();t.join();redisson.shutdown();}
分布式鎖源碼分析:
RedissonReadWriteLock 的實現
public class RedissonReadWriteLock extends RedissonExpirable implements RReadWriteLock {public RedissonReadWriteLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);}//讀鎖@Overridepublic RLock readLock() {return new RedissonReadLock(commandExecutor, getName());}//寫鎖@Overridepublic RLock writeLock() {return new RedissonWriteLock(commandExecutor, getName());}}
加鎖源碼分析
RedissonLock.lock()源碼分析:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {//線程ID綁定long threadId = Thread.currentThread().getId();//嘗試獲取鎖(非阻塞)//tryAcquire 是執行一段 Lua 腳本,核心做法://如果 Redis 上的鎖 key 不存在,則設置并返回 null;//如果存在,返回剩余 TTL;//所以:返回 null 表示獲取成功,非 null 表示鎖被占用(還剩多少 ms 過期)。Long ttl = tryAcquire(-1, leaseTime, unit, threadId);// lock acquired加鎖成功if (ttl == null) {return;}//訂閱鎖釋放事件,進入阻塞等待CompletableFuture<RedissonLockEntry> future = subscribe(threadId);pubSub.timeout(future);RedissonLockEntry entry;//根據 interruptibly 阻塞當前線程if (interruptibly) {entry = commandExecutor.getInterrupted(future);} else {entry = commandExecutor.get(future);}//循環嘗試重新獲取鎖try {while (true) {//嘗試加鎖ttl = tryAcquire(-1, leaseTime, unit, threadId);// lock acquired加鎖成功if (ttl == null) {//結束break;}// // 阻塞等待 ttl 時間// 這是 Redisson 的“自旋式阻塞重試”機制;//每次重試之前,都會判斷鎖是否釋放;//若未釋放,根據 TTL 設置 CountDownLatch.tryAcquire(ttl),阻塞一段時間;//如果 ttl < 0,表示未知 TTL,則完全阻塞等待鎖釋放信號;if (ttl >= 0) {try {//如果在這段時間內其他線程釋放了鎖,會收到 Redis 發布的消息,喚醒 latch,線程會提前返回。//如果到時間 latch 還沒被釋放,tryAcquire 會返回 false,再進入下一輪 while 循環繼續嘗試。//這就是典型的“阻塞 + 自旋”模式,節省 CPU,又不失活性。entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {if (interruptibly) {throw e;}entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {// ttl < 0 的分支:表示鎖沒有設置過期時間(或 Redis 中已被人為刪除)if (interruptibly) {entry.getLatch().acquire();//// 可中斷阻塞等待} else {entry.getLatch().acquireUninterruptibly();//不可中斷的阻塞}}}} finally {//finally 中取消訂閱unsubscribe(entry, threadId);}
// get(lockAsync(leaseTime, unit));}
嘗試獲取鎖的源碼分析RedissonLock.tryAcquireAsync():
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {//嘗試執行 Redis 腳本加鎖RFuture<Long> ttlRemainingFuture;if (leaseTime > 0) {//如果指定了 leaseTime > 0,則鎖將在此時間后自動過期(無續約機制)ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {//否則使用默認鎖時間(如 30s)+ 開啟「自動續約」ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);ttlRemainingFuture = new CompletableFutureWrapper<>(s);CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {// lock acquiredif (ttlRemaining == null) {if (leaseTime > 0) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {//watch Dog 現成定時給沒有設置過期時間的鎖加鎖,默認10s加一次,最長為30sscheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f);}
區分讀鎖和寫鎖
讀加鎖RedissonReadLock.tryLockInnerAsync():
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return commandExecutor.syncedEvalNoRetry(getRawName(), LongCodec.INSTANCE, command,"local mode = redis.call('hget', KEYS[1], 'mode'); " +"if (mode == false) then " +"redis.call('hset', KEYS[1], 'mode', 'read'); " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('set', KEYS[2] .. ':1', 1); " +"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local key = KEYS[2] .. ':' .. ind;" +"redis.call('set', key, 1); " +"redis.call('pexpire', key, ARGV[1]); " +"local remainTime = redis.call('pttl', KEYS[1]); " +"redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +"return nil; " +"end;" +"return redis.call('pttl', KEYS[1]);",Arrays.<Object>asList(getRawName(), getReadWriteTimeoutNamePrefix(threadId)),unit.toMillis(leaseTime), getLockName(threadId), getWriteLockName(threadId));}
寫加鎖RedissonWriteLock.tryLockInnerAsync():
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return commandExecutor.syncedEvalNoRetry(getRawName(), LongCodec.INSTANCE, command,"local mode = redis.call('hget', KEYS[1], 'mode'); " +"if (mode == false) then " +"redis.call('hset', KEYS[1], 'mode', 'write'); " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (mode == 'write') then " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local currentExpire = redis.call('pttl', KEYS[1]); " +"redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +"return nil; " +"end; " +"end;" +"return redis.call('pttl', KEYS[1]);",Arrays.<Object>asList(getRawName()),unit.toMillis(leaseTime), getLockName(threadId));}
釋放鎖源碼分析:
釋放鎖RedissonBaseLock.unlockInnerAsync():
protected final RFuture<Boolean> unlockInnerAsync(long threadId) {//獲取鎖釋放通知標識 ID 用于創建一個“臨時 Redis key”,幫助其他線程判斷鎖是否真正釋放并通知它們String id = getServiceManager().generateId();//獲取 Redisson 的集群/主從配置,包括超時、重試等參數。MasterSlaveServersConfig config = getServiceManager().getConfig();//計算一個“最大超時值”,用于后續 unlockInnerAsync(threadId, id, timeout) 調用://這個值等于:每次請求 timeout + 每次重試的 delay × 重試次數;//是 Redisson 估算出這次解鎖流程最大可能執行時長,在后續通知訂閱者的時候用到。long timeout = (config.getTimeout() + config.getRetryDelay().calcDelay(config.getRetryAttempts()).toMillis()) * config.getRetryAttempts();timeout = Math.max(timeout, 1);//調用 真正的解鎖邏輯(重載版本),傳入://threadId:驗證是否為持鎖線程;//id:解鎖通知標識;//timeout:通知監聽器等待的超時時間;RFuture<Boolean> r = unlockInnerAsync(threadId, id, (int) timeout);CompletionStage<Boolean> ff = r.thenApply(v -> {//解鎖操作完成后,繼續處理解鎖通知 key 的刪除邏輯。CommandAsyncExecutor ce = commandExecutor;// 判斷當前是否在批量命令上下文中。如果是,則重新包一層 CommandBatchService(防止共享污染);if (ce instanceof CommandBatchService) {ce = new CommandBatchService(commandExecutor);}//刪除解鎖通知 key(如:redisson_lock__unlock_latch__{id})//這個 key 是在 unlock 的 Lua 腳本中設置的,用于通知等待線程“鎖釋放”是否成功。//為什么要顯式刪掉它?//因為訂閱者在收到解鎖通知后,會去檢查這個 key 是否存在;//不存在 = 真釋放成功;//為了避免它一直存在,占用內存,Redisson 主動清除它。ce.writeAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.DEL, getUnlockLatchName(id));if (ce instanceof CommandBatchService) {((CommandBatchService) ce).executeAsync();}return v;});return new CompletableFutureWrapper<>(ff);}
讀鎖釋放鎖RedissonReadLock.unlockInnerAsync()
protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"local val = redis.call('get', KEYS[5]); " +"if val ~= false then " +"return tonumber(val);" +"end; " +"local mode = redis.call('hget', KEYS[1], 'mode'); " +"if (mode == false) then " +"redis.call(ARGV[3], KEYS[2], ARGV[1]); " +"redis.call('set', KEYS[5], 1, 'px', ARGV[4]); " +"return nil; " +"end; " +"local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +"if (lockExists == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + "if (counter == 0) then " +"redis.call('hdel', KEYS[1], ARGV[2]); " + "end;" +"redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +"if (redis.call('hlen', KEYS[1]) > 1) then " +"local maxRemainTime = -3; " + "local keys = redis.call('hkeys', KEYS[1]); " + "for n, key in ipairs(keys) do " + "counter = tonumber(redis.call('hget', KEYS[1], key)); " + "if type(counter) == 'number' then " + "for i=counter, 1, -1 do " + "local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " + "maxRemainTime = math.max(remainTime, maxRemainTime);" + "end; " + "end; " + "end; " +"if maxRemainTime > 0 then " +"redis.call('pexpire', KEYS[1], maxRemainTime); " +"redis.call('set', KEYS[5], 0, 'px', ARGV[4]); " +"return 0; " +"end;" + "if mode == 'write' then " +"redis.call('set', KEYS[5], 0, 'px', ARGV[4]); " +"return 0;" +"end; " +"end; " +"redis.call('del', KEYS[1]); " +"redis.call(ARGV[3], KEYS[2], ARGV[1]); " +"redis.call('set', KEYS[5], 1, 'px', ARGV[4]); " +"return 1; ",Arrays.<Object>asList(getRawName(), getChannelName(), timeoutPrefix, keyPrefix, getUnlockLatchName(requestId)),LockPubSub.UNLOCK_MESSAGE, getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);}
寫鎖釋放鎖RedissonWriteLock.unlockInnerAsync()
protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"local val = redis.call('get', KEYS[3]); " +"if val ~= false then " +"return tonumber(val);" +"end; " +"local mode = redis.call('hget', KEYS[1], 'mode'); " +"if (mode == false) then " +"redis.call(ARGV[4], KEYS[2], ARGV[1]); " +"redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +"return nil; " +"end;" +"if (mode == 'write') then " +"local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " +"if (lockExists == 0) then " +"return nil;" +"else " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +"return 0; " +"else " +"redis.call('hdel', KEYS[1], ARGV[3]); " +"if (redis.call('hlen', KEYS[1]) == 1) then " +"redis.call('del', KEYS[1]); " +"redis.call(ARGV[4], KEYS[2], ARGV[1]); " +"else " +// has unlocked read-locks"redis.call('hset', KEYS[1], 'mode', 'read'); " +"end; " +"redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +"return 1; "+"end; " +"end; " +"end; "+ "return nil;",Arrays.<Object>asList(getRawName(), getChannelName(), getUnlockLatchName(requestId)),LockPubSub.READ_UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);}
讀鎖特點
多個線程可以同時獲取讀鎖;
如果有寫鎖存在,其他線程不能獲取讀鎖;
當前線程已持有寫鎖時,可以獲取讀鎖(讀寫重入);
每獲取一次讀鎖,Redisson 都會創建一個 獨立的 Redis key,用于設置過期時間;
所有讀鎖共享一個主 Hash key,如 rwlock:{myKey}。
使用場景:緩存讀取、多線程查看共享數據時
寫鎖特點
只能被一個線程獨占;
如果有讀鎖或寫鎖,其他線程都必須等待;
支持可重入(同線程反復加鎖);
超時釋放:設置 leaseTime 或依賴 WatchDog;
Redis 中使用 Hash key 存儲鎖狀態及持有者線程。
Redisson一次加多個鎖
RedissonMultiLocks加多個鎖的使用案例:
public static void main(String[] args) throws InterruptedException {// connects to 127.0.0.1:6379 by defaultRedissonClient client = Redisson.create();RLock lock1 = client.getLock("lock1");RLock lock2 = client.getLock("lock2");RLock lock3 = client.getLock("lock3");Thread t = new Thread() {public void run() {RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);lock.lock();try {Thread.sleep(3000);} catch (InterruptedException e) {}lock.unlock();};};t.start();t.join(1000);RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);lock.lock();lock.unlock();}
RedissonMultiLock加鎖源碼分析:
RedissonMultiLock.lock()
public void lock() {try {lockInterruptibly();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {//每個鎖默認等待 1500ms,因此基礎等待時間是鎖數量 × 1500 毫秒;long baseWaitTime = locks.size() * 1500;while (true) {long waitTime;//如果你不傳 leaseTime,那就等 baseWaitTime 毫秒;if (leaseTime <= 0) {waitTime = baseWaitTime;} else {//如果你傳了 leaseTime,就在一個合理范圍內隨機等待一段時間;//這個隨機機制可以緩解多個線程同時競爭鎖時的“驚群效應”(即都在同一時間重試造成 Redis 壓力),提高鎖的公平性和獲取率。waitTime = unit.toMillis(leaseTime);if (waitTime <= baseWaitTime) {waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);} else {waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);}}if (leaseTime > 0) {leaseTime = unit.toMillis(leaseTime);}//嘗試在 waitTime 時間內加鎖成功;//如果成功,就返回;//如果失敗,繼續下一輪(外層 while 循環);//若線程在這期間被中斷,則拋出 InterruptedException。if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {return;}}}
嘗試加鎖,本質還是對單個的對象加鎖,全部加鎖成功,才算成功,一個加鎖失敗,都需要將加鎖成功的釋放,防止出現死鎖的現象。
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// try {
// return tryLockAsync(waitTime, leaseTime, unit).get();
// } catch (ExecutionException e) {
// throw new IllegalStateException(e);
// }//重新計算內部實際鎖定的時間 newLeaseTime。long newLeaseTime = -1;if (leaseTime > 0) {//如果設置了 waitTime,newLeaseTime 會取 waitTime*2,用于確保鎖不會因為 續期等待時間過短 提前釋放。if (waitTime > 0) {newLeaseTime = unit.toMillis(waitTime)*2;} else {// //否則按用戶指定的 leaseTime。newLeaseTime = unit.toMillis(leaseTime);}}//記錄當前時間 time,用于后續計算剩余等待時間long time = System.currentTimeMillis();//如果設置了 waitTime,計算還剩多少時間 remainTime。long remainTime = -1;if (waitTime > 0) {remainTime = unit.toMillis(waitTime);}//lockWaitTime 是為了當前線程嘗試單個鎖時的最大等待時間,內部有可能會縮小單個鎖的等待時間,避免整個鎖組阻塞時間太久。long lockWaitTime = calcLockWaitTime(remainTime);//獲取容忍失敗的鎖個數(在 RedLock 中可能允許某些節點獲取失敗)。int failedLocksLimit = failedLocksLimit();//初始化已獲取鎖的列表。List<RLock> acquiredLocks = new ArrayList<>(locks.size());//遍歷所有要加的鎖(一般用于 RedLock 或 MultiLock)。for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {RLock lock = iterator.next();boolean lockAcquired;try {if (waitTime <= 0 && leaseTime <= 0) {// 非阻塞鎖lockAcquired = lock.tryLock();} else {//如果沒有設置等待時間和租約時間,直接嘗試非阻塞 tryLock()。//否則嘗試阻塞獲取鎖,最大等待時間是 awaitTime。//如果 Redis 響應超時或異常,則判定加鎖失敗,并釋放當前鎖。long awaitTime = Math.min(lockWaitTime, remainTime);lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);}} catch (RedisResponseTimeoutException e) {unlockInner(Arrays.asList(lock));lockAcquired = false;} catch (Exception e) {lockAcquired = false;}//判斷當前鎖是否成功if (lockAcquired) {//加鎖成功acquiredLocks.add(lock);} else {//加鎖失敗 如果失敗鎖已達上限(如 RedLock 只要求超過半數成功),直接退出;if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {break;}//如果不允許失敗if (failedLocksLimit == 0) {//解鎖之前已成功的鎖(防止部分加鎖造成死鎖)。unlockInner(acquiredLocks);//如果超時時間是 0,立即返回失敗。if (waitTime <= 0) {return false;}failedLocksLimit = failedLocksLimit();acquiredLocks.clear();// 重置迭代器,重新加鎖;while (iterator.hasPrevious()) {iterator.previous();}} else {failedLocksLimit--;}}//剩余時間調整if (remainTime > 0) {//每次加鎖后都要更新剩余的可等待時間;remainTime -= System.currentTimeMillis() - time;time = System.currentTimeMillis();//如果耗盡 waitTime,釋放當前已加鎖部分,返回失敗。if (remainTime <= 0) {unlockInner(acquiredLocks);return false;}}}//如果指定了鎖有效期 leaseTime,則設置所有成功加鎖的鎖的過期時間。if (leaseTime > 0) {acquiredLocks.stream().map(l -> (RedissonBaseLock) l).map(l -> l.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS)).forEach(f -> f.toCompletableFuture().join());}return true;}
RedissonMultiLock釋放鎖源碼分析:
RedissonMultiLock.unlock()的源碼流程
public void unlock() {//循環遍歷加鎖的對象,進行鎖的釋放locks.forEach(Lock::unlock);}
RedissonBaseLock.unlockAsync0()
private RFuture<Void> unlockAsync0(long threadId) {CompletionStage<Boolean> future = unlockInnerAsync(threadId);CompletionStage<Void> f = future.handle((res, e) -> {cancelExpirationRenewal(threadId, res);//釋放鎖報錯if (e != null) {if (e instanceof CompletionException) {throw (CompletionException) e;}throw new CompletionException(e);}//返回的響應為nullif (res == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);throw new CompletionException(cause);}return null;});return new CompletableFutureWrapper<>(f);}
RedissonBaseLock.unlockInnerAsync()
protected final RFuture<Boolean> unlockInnerAsync(long threadId) {//獲取鎖釋放通知標識 ID 用于創建一個“臨時 Redis key”,幫助其他線程判斷鎖是否真正釋放并通知它們String id = getServiceManager().generateId();//獲取 Redisson 的集群/主從配置,包括超時、重試等參數。MasterSlaveServersConfig config = getServiceManager().getConfig();//計算一個“最大超時值”,用于后續 unlockInnerAsync(threadId, id, timeout) 調用://這個值等于:每次請求 timeout + 每次重試的 delay × 重試次數;//是 Redisson 估算出這次解鎖流程最大可能執行時長,在后續通知訂閱者的時候用到。long timeout = (config.getTimeout() + config.getRetryDelay().calcDelay(config.getRetryAttempts()).toMillis()) * config.getRetryAttempts();timeout = Math.max(timeout, 1);//調用 真正的解鎖邏輯(重載版本),傳入://threadId:驗證是否為持鎖線程;//id:解鎖通知標識;//timeout:通知監聽器等待的超時時間;RFuture<Boolean> r = unlockInnerAsync(threadId, id, (int) timeout);CompletionStage<Boolean> ff = r.thenApply(v -> {//解鎖操作完成后,繼續處理解鎖通知 key 的刪除邏輯。CommandAsyncExecutor ce = commandExecutor;// 判斷當前是否在批量命令上下文中。如果是,則重新包一層 CommandBatchService(防止共享污染);if (ce instanceof CommandBatchService) {ce = new CommandBatchService(commandExecutor);}//刪除解鎖通知 key(如:redisson_lock__unlock_latch__{id})//這個 key 是在 unlock 的 Lua 腳本中設置的,用于通知等待線程“鎖釋放”是否成功。//為什么要顯式刪掉它?//因為訂閱者在收到解鎖通知后,會去檢查這個 key 是否存在;//不存在 = 真釋放成功;//為了避免它一直存在,占用內存,Redisson 主動清除它。ce.writeAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.DEL, getUnlockLatchName(id));if (ce instanceof CommandBatchService) {((CommandBatchService) ce).executeAsync();}return v;});return new CompletableFutureWrapper<>(ff);}
執行RedissonLock.unlockInnerAsync()的lua腳本:
protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"local val = redis.call('get', KEYS[3]); " +"if val ~= false then " +"return tonumber(val);" +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call(ARGV[4], KEYS[2], ARGV[1]); " +"redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +"return 1; " +"end; ",Arrays.asList(getRawName(), getChannelName(), getUnlockLatchName(requestId)),LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime,getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);}
RCountDownLatch介紹:
RCountDownLatch 是 Redisson 對 Redis 實現的 分布式 CountDownLatch 的封裝,和 Java 標準庫中的 java.util.concurrent.CountDownLatch 類似,但它是跨 JVM、跨服務進程共享狀態的,用于在分布式環境下實現線程/服務間的同步控制。
使用場景:
你有多個微服務或線程并發執行,某個主服務需要等待它們全部完成;
替代本地 CountDownLatch,但要求在集群或多機房環境下同步狀態;、
多個服務節點要協調完成某些“分布式初始化任務”;
實現“多個客戶端等待某個事件發生”這種阻塞機制。
使用案例:
public static void main(String[] args) throws InterruptedException {// connects to 127.0.0.1:6379 by defaultRedissonClient redisson = Redisson.create();ExecutorService executor = Executors.newFixedThreadPool(2);final RCountDownLatch latch = redisson.getCountDownLatch("latch1");latch.trySetCount(1);executor.execute(new Runnable() {@Overridepublic void run() {latch.countDown();}});executor.execute(new Runnable() {@Overridepublic void run() {try {latch.await(550, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {e.printStackTrace();}}});executor.shutdown();executor.awaitTermination(10, TimeUnit.SECONDS);}
方法:
trySetCount(long count) 初始化計數值。只能設置一次(如果已存在則返回 false)
await() 阻塞等待直到計數為 0
await(long time, TimeUnit) 指定時間等待,超時返回 false
countDown() 將計數 -1,當計數為 0 時,喚醒所有 await 的線程
getCount() 獲取當前計數值
delete() 刪除 latch 對象在 Redis 中的 key(非必要)
RCountDownLatch源碼分析:
RCountDownLatch.trySetCount()方法,初始化計數值
public boolean trySetCount(long count) {//設置初始的大小,只能設置一次,如果已經設置了則返回0,否則返回1return get(trySetCountAsync(count));}
根據lua腳本設置初始化值,只能設置一次,如果當前的值為0設置成功,返回結果1,否則不需要設置,返回結果0。
public RFuture<Boolean> trySetCountAsync(long count) {return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if redis.call('exists', KEYS[1]) == 0 then "+ "redis.call('set', KEYS[1], ARGV[2]); "+ "redis.call(ARGV[3], KEYS[2], ARGV[1]); "+ "return 1 "+ "else "+ "return 0 "+ "end",Arrays.asList(getRawName(), getChannelName()),CountDownLatchPubSub.NEW_COUNT_MESSAGE, count, getSubscribeService().getPublishCommand());}
countDown 將計數-1的源碼分析:
public void countDown() {get(countDownAsync());}//通過lua腳本,將計數-1,如果當前的減完后的技術小于等于0,刪除當前的countdownlatch,如果等于0,則喚醒訂閱阻塞的線程,public RFuture<Void> countDownAsync() {return commandExecutor.evalWriteNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"local v = redis.call('decr', KEYS[1]);" +"if v <= 0 then redis.call('del', KEYS[1]) end;" +"if v == 0 then redis.call(ARGV[2], KEYS[2], ARGV[1]) end;",Arrays.<Object>asList(getRawName(), getChannelName()),CountDownLatchPubSub.ZERO_COUNT_MESSAGE, getSubscribeService().getPublishCommand());
}
RSemaphore分布式信號量
RSemaphore 是 Redisson 對 Java 標準 Semaphore 的分布式實現(java.util.concurrent.Semaphore 的 Redis 版本),用于 限量訪問資源 的場景,比如控制并發數、資源配額、限流器等。
功能:
支持分布式 可跨 JVM、服務節點共享資源許可數;
支持可阻塞 acquire() 支持等待直到有許可;
支持可限時 tryAcquire(timeout) 支持超時等待;
支持可釋放 release() 歸還許可;
支持動態調整許可數量 通過 trySetPermits();
方法功能:
trySetPermits(int permits) 初始化許可總數(只能設置一次)
addPermits(int permits) 增加許可(可重復調用)
acquire() 請求一個許可(阻塞)
tryAcquire() 非阻塞地獲取許可,獲取不到返回 false
tryAcquire(timeout, unit) 限時等待許可
release() 釋放一個許可
availablePermits() 當前可用許可數
delete() 刪除 Redis 中的信號量記錄
使用案例:
public static void main(String[] args) throws InterruptedException {// connects to 127.0.0.1:6379 by defaultRedissonClient redisson = Redisson.create();RSemaphore s = redisson.getSemaphore("test");s.trySetPermits(5);s.acquire(3);Thread t = new Thread() {@Overridepublic void run() {RSemaphore s = redisson.getSemaphore("test");s.release();s.release();}};t.start();s.acquire(4);redisson.shutdown();}
RSmaphore源碼分析:
public void acquire() throws InterruptedException {//請求得到一個計數acquire(1);}
public void acquire(int permits) throws InterruptedException {//嘗試獲取計數if (tryAcquire(permits)) {return;}//訂閱CompletableFuture<RedissonLockEntry> future = subscribe();semaphorePubSub.timeout(future);RedissonLockEntry entry = commandExecutor.getInterrupted(future);try {while (true) {//嘗試獲取,如果獲取到就結束,否則就阻塞if (tryAcquire(permits)) {return;}//阻塞,等待通知entry.getLatch().acquire();}} finally {//釋放訂閱信息unsubscribe(entry);}
// get(acquireAsync(permits));}
public RFuture<Boolean> tryAcquireAsync(int permits) {//健壯性校驗if (permits < 0) {throw new IllegalArgumentException("Permits amount can't be negative");}//如果適當的數等于0,直接返回成功if (permits == 0) {return new CompletableFutureWrapper<>(true);}return commandExecutor.getServiceManager().execute(() -> {//核心的方法RFuture<Boolean> future = tryAcquireAsync0(permits);return commandExecutor.handleNoSync(future, e -> releaseAsync(permits));});}//執行我們的lua腳本private RFuture<Boolean> tryAcquireAsync0(int permits) {return commandExecutor.syncedEvalNoRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"local value = redis.call('get', KEYS[1]); " +"if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +"local val = redis.call('decrby', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.<Object>singletonList(getRawName()), permits);}