Redisson的分布式鎖源碼分析2

文章目錄

  • Redisson的讀寫鎖使用
    • 加鎖源碼分析
    • 釋放鎖源碼分析:
  • Redisson一次加多個鎖
    • RedissonMultiLock加鎖源碼分析:
    • RedissonMultiLock釋放鎖源碼分析:
  • RCountDownLatch介紹:
    • RCountDownLatch源碼分析:
  • RSemaphore分布式信號量
    • RSmaphore源碼分析:

Redisson的讀寫鎖使用

Redisson 的讀寫鎖是 RReadWriteLock,它是基于 Redis 實現的可重入的分布式讀寫鎖,支持跨線程、跨 JVM 的并發讀寫控制。

分布式鎖的使用案例:

public static void main(String[] args) throws InterruptedException {// connects to 127.0.0.1:6379 by defaultRedissonClient redisson = Redisson.create();final RReadWriteLock lock = redisson.getReadWriteLock("lock");lock.writeLock().tryLock();Thread t = new Thread() {public void run() {RLock r = lock.readLock();r.lock();try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}r.unlock();};};t.start();t.join();lock.writeLock().unlock();t.join();redisson.shutdown();}

分布式鎖源碼分析:
RedissonReadWriteLock 的實現

public class RedissonReadWriteLock extends RedissonExpirable implements RReadWriteLock {public RedissonReadWriteLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);}//讀鎖@Overridepublic RLock readLock() {return new RedissonReadLock(commandExecutor, getName());}//寫鎖@Overridepublic RLock writeLock() {return new RedissonWriteLock(commandExecutor, getName());}}

加鎖源碼分析

RedissonLock.lock()源碼分析:

 private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {//線程ID綁定long threadId = Thread.currentThread().getId();//嘗試獲取鎖(非阻塞)//tryAcquire 是執行一段 Lua 腳本,核心做法://如果 Redis 上的鎖 key 不存在,則設置并返回 null;//如果存在,返回剩余 TTL;//所以:返回 null 表示獲取成功,非 null 表示鎖被占用(還剩多少 ms 過期)。Long ttl = tryAcquire(-1, leaseTime, unit, threadId);// lock acquired加鎖成功if (ttl == null) {return;}//訂閱鎖釋放事件,進入阻塞等待CompletableFuture<RedissonLockEntry> future = subscribe(threadId);pubSub.timeout(future);RedissonLockEntry entry;//根據 interruptibly 阻塞當前線程if (interruptibly) {entry = commandExecutor.getInterrupted(future);} else {entry = commandExecutor.get(future);}//循環嘗試重新獲取鎖try {while (true) {//嘗試加鎖ttl = tryAcquire(-1, leaseTime, unit, threadId);// lock acquired加鎖成功if (ttl == null) {//結束break;}//  // 阻塞等待 ttl 時間// 這是 Redisson 的“自旋式阻塞重試”機制;//每次重試之前,都會判斷鎖是否釋放;//若未釋放,根據 TTL 設置 CountDownLatch.tryAcquire(ttl),阻塞一段時間;//如果 ttl < 0,表示未知 TTL,則完全阻塞等待鎖釋放信號;if (ttl >= 0) {try {//如果在這段時間內其他線程釋放了鎖,會收到 Redis 發布的消息,喚醒 latch,線程會提前返回。//如果到時間 latch 還沒被釋放,tryAcquire 會返回 false,再進入下一輪 while 循環繼續嘗試。//這就是典型的“阻塞 + 自旋”模式,節省 CPU,又不失活性。entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {if (interruptibly) {throw e;}entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {// ttl < 0 的分支:表示鎖沒有設置過期時間(或 Redis 中已被人為刪除)if (interruptibly) {entry.getLatch().acquire();//// 可中斷阻塞等待} else {entry.getLatch().acquireUninterruptibly();//不可中斷的阻塞}}}} finally {//finally 中取消訂閱unsubscribe(entry, threadId);}
//        get(lockAsync(leaseTime, unit));}

嘗試獲取鎖的源碼分析RedissonLock.tryAcquireAsync():

    private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {//嘗試執行 Redis 腳本加鎖RFuture<Long> ttlRemainingFuture;if (leaseTime > 0) {//如果指定了 leaseTime > 0,則鎖將在此時間后自動過期(無續約機制)ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {//否則使用默認鎖時間(如 30s)+ 開啟「自動續約」ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);ttlRemainingFuture = new CompletableFutureWrapper<>(s);CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {// lock acquiredif (ttlRemaining == null) {if (leaseTime > 0) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {//watch Dog 現成定時給沒有設置過期時間的鎖加鎖,默認10s加一次,最長為30sscheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f);}

區分讀鎖和寫鎖
讀加鎖RedissonReadLock.tryLockInnerAsync():

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return commandExecutor.syncedEvalNoRetry(getRawName(), LongCodec.INSTANCE, command,"local mode = redis.call('hget', KEYS[1], 'mode'); " +"if (mode == false) then " +"redis.call('hset', KEYS[1], 'mode', 'read'); " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('set', KEYS[2] .. ':1', 1); " +"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local key = KEYS[2] .. ':' .. ind;" +"redis.call('set', key, 1); " +"redis.call('pexpire', key, ARGV[1]); " +"local remainTime = redis.call('pttl', KEYS[1]); " +"redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +"return nil; " +"end;" +"return redis.call('pttl', KEYS[1]);",Arrays.<Object>asList(getRawName(), getReadWriteTimeoutNamePrefix(threadId)),unit.toMillis(leaseTime), getLockName(threadId), getWriteLockName(threadId));}

寫加鎖RedissonWriteLock.tryLockInnerAsync():

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return commandExecutor.syncedEvalNoRetry(getRawName(), LongCodec.INSTANCE, command,"local mode = redis.call('hget', KEYS[1], 'mode'); " +"if (mode == false) then " +"redis.call('hset', KEYS[1], 'mode', 'write'); " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (mode == 'write') then " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local currentExpire = redis.call('pttl', KEYS[1]); " +"redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +"return nil; " +"end; " +"end;" +"return redis.call('pttl', KEYS[1]);",Arrays.<Object>asList(getRawName()),unit.toMillis(leaseTime), getLockName(threadId));}

釋放鎖源碼分析:

釋放鎖RedissonBaseLock.unlockInnerAsync():

protected final RFuture<Boolean> unlockInnerAsync(long threadId) {//獲取鎖釋放通知標識 ID 用于創建一個“臨時 Redis key”,幫助其他線程判斷鎖是否真正釋放并通知它們String id = getServiceManager().generateId();//獲取 Redisson 的集群/主從配置,包括超時、重試等參數。MasterSlaveServersConfig config = getServiceManager().getConfig();//計算一個“最大超時值”,用于后續 unlockInnerAsync(threadId, id, timeout) 調用://這個值等于:每次請求 timeout + 每次重試的 delay × 重試次數;//是 Redisson 估算出這次解鎖流程最大可能執行時長,在后續通知訂閱者的時候用到。long timeout = (config.getTimeout() + config.getRetryDelay().calcDelay(config.getRetryAttempts()).toMillis()) * config.getRetryAttempts();timeout = Math.max(timeout, 1);//調用 真正的解鎖邏輯(重載版本),傳入://threadId:驗證是否為持鎖線程;//id:解鎖通知標識;//timeout:通知監聽器等待的超時時間;RFuture<Boolean> r = unlockInnerAsync(threadId, id, (int) timeout);CompletionStage<Boolean> ff = r.thenApply(v -> {//解鎖操作完成后,繼續處理解鎖通知 key 的刪除邏輯。CommandAsyncExecutor ce = commandExecutor;// 判斷當前是否在批量命令上下文中。如果是,則重新包一層 CommandBatchService(防止共享污染);if (ce instanceof CommandBatchService) {ce = new CommandBatchService(commandExecutor);}//刪除解鎖通知 key(如:redisson_lock__unlock_latch__{id})//這個 key 是在 unlock 的 Lua 腳本中設置的,用于通知等待線程“鎖釋放”是否成功。//為什么要顯式刪掉它?//因為訂閱者在收到解鎖通知后,會去檢查這個 key 是否存在;//不存在 = 真釋放成功;//為了避免它一直存在,占用內存,Redisson 主動清除它。ce.writeAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.DEL, getUnlockLatchName(id));if (ce instanceof CommandBatchService) {((CommandBatchService) ce).executeAsync();}return v;});return new CompletableFutureWrapper<>(ff);}

讀鎖釋放鎖RedissonReadLock.unlockInnerAsync()

  protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"local val = redis.call('get', KEYS[5]); " +"if val ~= false then " +"return tonumber(val);" +"end; " +"local mode = redis.call('hget', KEYS[1], 'mode'); " +"if (mode == false) then " +"redis.call(ARGV[3], KEYS[2], ARGV[1]); " +"redis.call('set', KEYS[5], 1, 'px', ARGV[4]); " +"return nil; " +"end; " +"local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +"if (lockExists == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + "if (counter == 0) then " +"redis.call('hdel', KEYS[1], ARGV[2]); " + "end;" +"redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +"if (redis.call('hlen', KEYS[1]) > 1) then " +"local maxRemainTime = -3; " + "local keys = redis.call('hkeys', KEYS[1]); " + "for n, key in ipairs(keys) do " + "counter = tonumber(redis.call('hget', KEYS[1], key)); " + "if type(counter) == 'number' then " + "for i=counter, 1, -1 do " + "local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " + "maxRemainTime = math.max(remainTime, maxRemainTime);" + "end; " + "end; " + "end; " +"if maxRemainTime > 0 then " +"redis.call('pexpire', KEYS[1], maxRemainTime); " +"redis.call('set', KEYS[5], 0, 'px', ARGV[4]); " +"return 0; " +"end;" + "if mode == 'write' then " +"redis.call('set', KEYS[5], 0, 'px', ARGV[4]); " +"return 0;" +"end; " +"end; " +"redis.call('del', KEYS[1]); " +"redis.call(ARGV[3], KEYS[2], ARGV[1]); " +"redis.call('set', KEYS[5], 1, 'px', ARGV[4]); " +"return 1; ",Arrays.<Object>asList(getRawName(), getChannelName(), timeoutPrefix, keyPrefix, getUnlockLatchName(requestId)),LockPubSub.UNLOCK_MESSAGE, getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);}

寫鎖釋放鎖RedissonWriteLock.unlockInnerAsync()

  protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"local val = redis.call('get', KEYS[3]); " +"if val ~= false then " +"return tonumber(val);" +"end; " +"local mode = redis.call('hget', KEYS[1], 'mode'); " +"if (mode == false) then " +"redis.call(ARGV[4], KEYS[2], ARGV[1]); " +"redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +"return nil; " +"end;" +"if (mode == 'write') then " +"local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " +"if (lockExists == 0) then " +"return nil;" +"else " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +"return 0; " +"else " +"redis.call('hdel', KEYS[1], ARGV[3]); " +"if (redis.call('hlen', KEYS[1]) == 1) then " +"redis.call('del', KEYS[1]); " +"redis.call(ARGV[4], KEYS[2], ARGV[1]); " +"else " +// has unlocked read-locks"redis.call('hset', KEYS[1], 'mode', 'read'); " +"end; " +"redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +"return 1; "+"end; " +"end; " +"end; "+ "return nil;",Arrays.<Object>asList(getRawName(), getChannelName(), getUnlockLatchName(requestId)),LockPubSub.READ_UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);}

讀鎖特點
多個線程可以同時獲取讀鎖;
如果有寫鎖存在,其他線程不能獲取讀鎖;
當前線程已持有寫鎖時,可以獲取讀鎖(讀寫重入);
每獲取一次讀鎖,Redisson 都會創建一個 獨立的 Redis key,用于設置過期時間;
所有讀鎖共享一個主 Hash key,如 rwlock:{myKey}。
使用場景:緩存讀取、多線程查看共享數據時

寫鎖特點
只能被一個線程獨占;
如果有讀鎖或寫鎖,其他線程都必須等待;
支持可重入(同線程反復加鎖);
超時釋放:設置 leaseTime 或依賴 WatchDog;
Redis 中使用 Hash key 存儲鎖狀態及持有者線程。

Redisson一次加多個鎖

RedissonMultiLocks加多個鎖的使用案例:

    public static void main(String[] args) throws InterruptedException {// connects to 127.0.0.1:6379 by defaultRedissonClient client = Redisson.create();RLock lock1 = client.getLock("lock1");RLock lock2 = client.getLock("lock2");RLock lock3 = client.getLock("lock3");Thread t = new Thread() {public void run() {RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);lock.lock();try {Thread.sleep(3000);} catch (InterruptedException e) {}lock.unlock();};};t.start();t.join(1000);RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);lock.lock();lock.unlock();}

RedissonMultiLock加鎖源碼分析:

RedissonMultiLock.lock()

   public void lock() {try {lockInterruptibly();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
 public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {//每個鎖默認等待 1500ms,因此基礎等待時間是鎖數量 × 1500 毫秒;long baseWaitTime = locks.size() * 1500;while (true) {long waitTime;//如果你不傳 leaseTime,那就等 baseWaitTime 毫秒;if (leaseTime <= 0) {waitTime = baseWaitTime;} else {//如果你傳了 leaseTime,就在一個合理范圍內隨機等待一段時間;//這個隨機機制可以緩解多個線程同時競爭鎖時的“驚群效應”(即都在同一時間重試造成 Redis 壓力),提高鎖的公平性和獲取率。waitTime = unit.toMillis(leaseTime);if (waitTime <= baseWaitTime) {waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);} else {waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);}}if (leaseTime > 0) {leaseTime = unit.toMillis(leaseTime);}//嘗試在 waitTime 時間內加鎖成功;//如果成功,就返回;//如果失敗,繼續下一輪(外層 while 循環);//若線程在這期間被中斷,則拋出 InterruptedException。if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {return;}}}

嘗試加鎖,本質還是對單個的對象加鎖,全部加鎖成功,才算成功,一個加鎖失敗,都需要將加鎖成功的釋放,防止出現死鎖的現象。

 public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
//        try {
//            return tryLockAsync(waitTime, leaseTime, unit).get();
//        } catch (ExecutionException e) {
//            throw new IllegalStateException(e);
//        }//重新計算內部實際鎖定的時間 newLeaseTime。long newLeaseTime = -1;if (leaseTime > 0) {//如果設置了 waitTime,newLeaseTime 會取 waitTime*2,用于確保鎖不會因為 續期等待時間過短 提前釋放。if (waitTime > 0) {newLeaseTime = unit.toMillis(waitTime)*2;} else {//        //否則按用戶指定的 leaseTime。newLeaseTime = unit.toMillis(leaseTime);}}//記錄當前時間 time,用于后續計算剩余等待時間long time = System.currentTimeMillis();//如果設置了 waitTime,計算還剩多少時間 remainTime。long remainTime = -1;if (waitTime > 0) {remainTime = unit.toMillis(waitTime);}//lockWaitTime 是為了當前線程嘗試單個鎖時的最大等待時間,內部有可能會縮小單個鎖的等待時間,避免整個鎖組阻塞時間太久。long lockWaitTime = calcLockWaitTime(remainTime);//獲取容忍失敗的鎖個數(在 RedLock 中可能允許某些節點獲取失敗)。int failedLocksLimit = failedLocksLimit();//初始化已獲取鎖的列表。List<RLock> acquiredLocks = new ArrayList<>(locks.size());//遍歷所有要加的鎖(一般用于 RedLock 或 MultiLock)。for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {RLock lock = iterator.next();boolean lockAcquired;try {if (waitTime <= 0 && leaseTime <= 0) {// 非阻塞鎖lockAcquired = lock.tryLock();} else {//如果沒有設置等待時間和租約時間,直接嘗試非阻塞 tryLock()。//否則嘗試阻塞獲取鎖,最大等待時間是 awaitTime。//如果 Redis 響應超時或異常,則判定加鎖失敗,并釋放當前鎖。long awaitTime = Math.min(lockWaitTime, remainTime);lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);}} catch (RedisResponseTimeoutException e) {unlockInner(Arrays.asList(lock));lockAcquired = false;} catch (Exception e) {lockAcquired = false;}//判斷當前鎖是否成功if (lockAcquired) {//加鎖成功acquiredLocks.add(lock);} else {//加鎖失敗 如果失敗鎖已達上限(如 RedLock 只要求超過半數成功),直接退出;if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {break;}//如果不允許失敗if (failedLocksLimit == 0) {//解鎖之前已成功的鎖(防止部分加鎖造成死鎖)。unlockInner(acquiredLocks);//如果超時時間是 0,立即返回失敗。if (waitTime <= 0) {return false;}failedLocksLimit = failedLocksLimit();acquiredLocks.clear();// 重置迭代器,重新加鎖;while (iterator.hasPrevious()) {iterator.previous();}} else {failedLocksLimit--;}}//剩余時間調整if (remainTime > 0) {//每次加鎖后都要更新剩余的可等待時間;remainTime -= System.currentTimeMillis() - time;time = System.currentTimeMillis();//如果耗盡 waitTime,釋放當前已加鎖部分,返回失敗。if (remainTime <= 0) {unlockInner(acquiredLocks);return false;}}}//如果指定了鎖有效期 leaseTime,則設置所有成功加鎖的鎖的過期時間。if (leaseTime > 0) {acquiredLocks.stream().map(l -> (RedissonBaseLock) l).map(l -> l.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS)).forEach(f -> f.toCompletableFuture().join());}return true;}

RedissonMultiLock釋放鎖源碼分析:

RedissonMultiLock.unlock()的源碼流程

    public void unlock() {//循環遍歷加鎖的對象,進行鎖的釋放locks.forEach(Lock::unlock);}

RedissonBaseLock.unlockAsync0()

    private RFuture<Void> unlockAsync0(long threadId) {CompletionStage<Boolean> future = unlockInnerAsync(threadId);CompletionStage<Void> f = future.handle((res, e) -> {cancelExpirationRenewal(threadId, res);//釋放鎖報錯if (e != null) {if (e instanceof CompletionException) {throw (CompletionException) e;}throw new CompletionException(e);}//返回的響應為nullif (res == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);throw new CompletionException(cause);}return null;});return new CompletableFutureWrapper<>(f);}

RedissonBaseLock.unlockInnerAsync()

protected final RFuture<Boolean> unlockInnerAsync(long threadId) {//獲取鎖釋放通知標識 ID 用于創建一個“臨時 Redis key”,幫助其他線程判斷鎖是否真正釋放并通知它們String id = getServiceManager().generateId();//獲取 Redisson 的集群/主從配置,包括超時、重試等參數。MasterSlaveServersConfig config = getServiceManager().getConfig();//計算一個“最大超時值”,用于后續 unlockInnerAsync(threadId, id, timeout) 調用://這個值等于:每次請求 timeout + 每次重試的 delay × 重試次數;//是 Redisson 估算出這次解鎖流程最大可能執行時長,在后續通知訂閱者的時候用到。long timeout = (config.getTimeout() + config.getRetryDelay().calcDelay(config.getRetryAttempts()).toMillis()) * config.getRetryAttempts();timeout = Math.max(timeout, 1);//調用 真正的解鎖邏輯(重載版本),傳入://threadId:驗證是否為持鎖線程;//id:解鎖通知標識;//timeout:通知監聽器等待的超時時間;RFuture<Boolean> r = unlockInnerAsync(threadId, id, (int) timeout);CompletionStage<Boolean> ff = r.thenApply(v -> {//解鎖操作完成后,繼續處理解鎖通知 key 的刪除邏輯。CommandAsyncExecutor ce = commandExecutor;// 判斷當前是否在批量命令上下文中。如果是,則重新包一層 CommandBatchService(防止共享污染);if (ce instanceof CommandBatchService) {ce = new CommandBatchService(commandExecutor);}//刪除解鎖通知 key(如:redisson_lock__unlock_latch__{id})//這個 key 是在 unlock 的 Lua 腳本中設置的,用于通知等待線程“鎖釋放”是否成功。//為什么要顯式刪掉它?//因為訂閱者在收到解鎖通知后,會去檢查這個 key 是否存在;//不存在 = 真釋放成功;//為了避免它一直存在,占用內存,Redisson 主動清除它。ce.writeAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.DEL, getUnlockLatchName(id));if (ce instanceof CommandBatchService) {((CommandBatchService) ce).executeAsync();}return v;});return new CompletableFutureWrapper<>(ff);}

執行RedissonLock.unlockInnerAsync()的lua腳本:

    protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"local val = redis.call('get', KEYS[3]); " +"if val ~= false then " +"return tonumber(val);" +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call(ARGV[4], KEYS[2], ARGV[1]); " +"redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +"return 1; " +"end; ",Arrays.asList(getRawName(), getChannelName(), getUnlockLatchName(requestId)),LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime,getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);}

RCountDownLatch介紹:

RCountDownLatch 是 Redisson 對 Redis 實現的 分布式 CountDownLatch 的封裝,和 Java 標準庫中的 java.util.concurrent.CountDownLatch 類似,但它是跨 JVM、跨服務進程共享狀態的,用于在分布式環境下實現線程/服務間的同步控制。

使用場景:
你有多個微服務或線程并發執行,某個主服務需要等待它們全部完成;
替代本地 CountDownLatch,但要求在集群或多機房環境下同步狀態;、
多個服務節點要協調完成某些“分布式初始化任務”;
實現“多個客戶端等待某個事件發生”這種阻塞機制。

使用案例:

public static void main(String[] args) throws InterruptedException {// connects to 127.0.0.1:6379 by defaultRedissonClient redisson = Redisson.create();ExecutorService executor = Executors.newFixedThreadPool(2);final RCountDownLatch latch = redisson.getCountDownLatch("latch1");latch.trySetCount(1);executor.execute(new Runnable() {@Overridepublic void run() {latch.countDown();}});executor.execute(new Runnable() {@Overridepublic void run() {try {latch.await(550, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {e.printStackTrace();}}});executor.shutdown();executor.awaitTermination(10, TimeUnit.SECONDS);}

方法:

trySetCount(long count) 初始化計數值。只能設置一次(如果已存在則返回 false)
await() 阻塞等待直到計數為 0
await(long time, TimeUnit) 指定時間等待,超時返回 false
countDown() 將計數 -1,當計數為 0 時,喚醒所有 await 的線程
getCount() 獲取當前計數值
delete() 刪除 latch 對象在 Redis 中的 key(非必要)

RCountDownLatch源碼分析:

RCountDownLatch.trySetCount()方法,初始化計數值

   public boolean trySetCount(long count) {//設置初始的大小,只能設置一次,如果已經設置了則返回0,否則返回1return get(trySetCountAsync(count));}

根據lua腳本設置初始化值,只能設置一次,如果當前的值為0設置成功,返回結果1,否則不需要設置,返回結果0。

    public RFuture<Boolean> trySetCountAsync(long count) {return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if redis.call('exists', KEYS[1]) == 0 then "+ "redis.call('set', KEYS[1], ARGV[2]); "+ "redis.call(ARGV[3], KEYS[2], ARGV[1]); "+ "return 1 "+ "else "+ "return 0 "+ "end",Arrays.asList(getRawName(), getChannelName()),CountDownLatchPubSub.NEW_COUNT_MESSAGE, count, getSubscribeService().getPublishCommand());}

countDown 將計數-1的源碼分析:

    public void countDown() {get(countDownAsync());}//通過lua腳本,將計數-1,如果當前的減完后的技術小于等于0,刪除當前的countdownlatch,如果等于0,則喚醒訂閱阻塞的線程,public RFuture<Void> countDownAsync() {return commandExecutor.evalWriteNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"local v = redis.call('decr', KEYS[1]);" +"if v <= 0 then redis.call('del', KEYS[1]) end;" +"if v == 0 then redis.call(ARGV[2], KEYS[2], ARGV[1]) end;",Arrays.<Object>asList(getRawName(), getChannelName()),CountDownLatchPubSub.ZERO_COUNT_MESSAGE, getSubscribeService().getPublishCommand());
}

RSemaphore分布式信號量

RSemaphore 是 Redisson 對 Java 標準 Semaphore 的分布式實現(java.util.concurrent.Semaphore 的 Redis 版本),用于 限量訪問資源 的場景,比如控制并發數、資源配額、限流器等。
功能:
支持分布式 可跨 JVM、服務節點共享資源許可數;
支持可阻塞 acquire() 支持等待直到有許可;
支持可限時 tryAcquire(timeout) 支持超時等待;
支持可釋放 release() 歸還許可;
支持動態調整許可數量 通過 trySetPermits();
方法功能:
trySetPermits(int permits) 初始化許可總數(只能設置一次)
addPermits(int permits) 增加許可(可重復調用)
acquire() 請求一個許可(阻塞)
tryAcquire() 非阻塞地獲取許可,獲取不到返回 false
tryAcquire(timeout, unit) 限時等待許可
release() 釋放一個許可
availablePermits() 當前可用許可數
delete() 刪除 Redis 中的信號量記錄
使用案例:

    public static void main(String[] args) throws InterruptedException {// connects to 127.0.0.1:6379 by defaultRedissonClient redisson = Redisson.create();RSemaphore s = redisson.getSemaphore("test");s.trySetPermits(5);s.acquire(3);Thread t = new Thread() {@Overridepublic void run() {RSemaphore s = redisson.getSemaphore("test");s.release();s.release();}};t.start();s.acquire(4);redisson.shutdown();}

RSmaphore源碼分析:

 public void acquire() throws InterruptedException {//請求得到一個計數acquire(1);}
  public void acquire(int permits) throws InterruptedException {//嘗試獲取計數if (tryAcquire(permits)) {return;}//訂閱CompletableFuture<RedissonLockEntry> future = subscribe();semaphorePubSub.timeout(future);RedissonLockEntry entry = commandExecutor.getInterrupted(future);try {while (true) {//嘗試獲取,如果獲取到就結束,否則就阻塞if (tryAcquire(permits)) {return;}//阻塞,等待通知entry.getLatch().acquire();}} finally {//釋放訂閱信息unsubscribe(entry);}
//        get(acquireAsync(permits));}
    public RFuture<Boolean> tryAcquireAsync(int permits) {//健壯性校驗if (permits < 0) {throw new IllegalArgumentException("Permits amount can't be negative");}//如果適當的數等于0,直接返回成功if (permits == 0) {return new CompletableFutureWrapper<>(true);}return commandExecutor.getServiceManager().execute(() -> {//核心的方法RFuture<Boolean> future = tryAcquireAsync0(permits);return commandExecutor.handleNoSync(future, e -> releaseAsync(permits));});}//執行我們的lua腳本private RFuture<Boolean> tryAcquireAsync0(int permits) {return commandExecutor.syncedEvalNoRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"local value = redis.call('get', KEYS[1]); " +"if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +"local val = redis.call('decrby', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.<Object>singletonList(getRawName()), permits);}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/87470.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/87470.shtml
英文地址,請注明出處:http://en.pswp.cn/web/87470.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

系統架構設計師論文分享-論軟件過程模型及應用

我的軟考歷程 摘要 2023年2月&#xff0c;我所在的公司通過了研發紗線MES系統的立項&#xff0c;該系統為國內紗線工廠提供SAAS服務&#xff0c;旨在提升紗線工廠的數字化和智能化水平。我在該項目中擔任架構設計師&#xff0c;負責該項目的架構設計工作。本文結合我在該項目…

云原生Kubernetes系列 | etcd3.5集群部署和使用

云原生Kubernetes系列 | etcd3.5集群部署和使用 1. etcd集群部署2. etcd集群操作3. 新增etcd集群節點1. etcd集群部署 etcd3.5官網站點: ?? https://etcd.io/docs/v3.5/op-guide/clustering/ ?? https://etcd.io/docs/v3.5/tutorials/how-to-setup-cluster/ [root@localh…

helm安裝配置jenkins

1、k8s1.28.2、helm3.12.0&#xff0c;集群搭建 查看節點運行情況 kubectl get node -o wide openebs部署情況 kubectl get sc -n openebs 2、添加Jenkins Helm倉庫 helm repo add jenkins https://charts.jenkins.iohelm repo update# 查看版本 helm search repo -l jen…

Wagtail - Django 內容管理系統

文章目錄 一、關于 Wagtail1、項目概覽2、相關鏈接資源3、功能特性 二、安裝配置三、使用入門1、快速開始2、兼容性 四、其它社區與支持1、社區資源2、商業支持 開發貢獻參考項目參考文獻 一、關于 Wagtail 1、項目概覽 Wagtail 是一個基于 Django 構建的開源內容管理系統&am…

Spring AI Alibaba 來啦!!!

博客標題&#xff1a;Spring AI Alibaba&#xff1a;深度解析其優勢與阿里云生態的無縫集成 引言 隨著人工智能技術的快速發展&#xff0c;越來越多的企業和開發者開始關注如何將 AI 技術融入到現有的應用開發框架中。Spring AI 作為 Spring 框架在 AI 領域的擴展&#xff0c;…

【論文閱讀39】PINN求邊坡內時空變化的地震動響應(位移、速度、加速度)場分布

論文提出了一種基于物理信息神經網絡&#xff08;PINN&#xff09;和極限分析上界定理相結合的巖體邊坡地震穩定性分析框架&#xff0c;重點考慮了邊坡中的預存裂縫對穩定性的影響。 PINN用來求解巖質邊坡內隨時間和空間變化的地震動響應&#xff08;位移、速度、加速度&#…

驅動開發系列59- 再述如何處理硬件中斷

在本文中,我們將重點討論編寫設備驅動程序時一個非常關鍵的方面:什么是硬件中斷,更重要的是,作為驅動開發者,你該如何準確地處理它們。事實上,大量的外設(也就是你可能會為其編寫驅動的設備)在需要操作系統或驅動程序立即響應時,通常會通過觸發硬件中斷的方式發出請求…

【藍牙】Linux Qt4查看已經配對的藍牙信息

在Linux系統中使用Qt4查看已配對的藍牙設備信息&#xff0c;可以基于DBus與BlueZ&#xff08;Linux下的藍牙協議棧&#xff09;進行交互。以下是一個實現方案&#xff1a; 1. 引入必要的庫和頭文件 確保項目中包含DBus相關的頭文件&#xff0c;并鏈接QtDBus模塊&#xff1a; …

企業客戶數據防竊指南:從法律要件到維權實操

作者&#xff1a;邱戈龍、曾建萍 ——上海商業秘密律師 在數字經濟時代&#xff0c;客戶數據已成為企業最核心的資產之一。然而&#xff0c;數據顯示&#xff0c;近三年全國商業秘密侵權案件中&#xff0c;涉及客戶信息的案件占比高達42%&#xff0c;但最終進入刑事程序的不足…

WHAT - React Native 中 Light and Dark mode 深色模式(黑暗模式)機制

文章目錄 一、Light / Dark Mode 的原理1. 操作系統層2. React Native 如何獲取?3. 樣式怎么跟著變?二、關鍵代碼示例講解代碼講解:三、自定義主題四、運行時自動更新五、核心原理一張圖組件應用例子最小示例:動態樣式按鈕的動態樣式如何封裝一套自定義主題四、如何和 Them…

[25-cv-07396、25-cv-07470]Keith代理Anderson這9張版權圖,除此之外原告還有50多個版權!賣家要小心!

Anderson 版權圖 案件號&#xff1a;25-cv-07396、25-cv-07470 立案時間&#xff1a;2025年7月2日 原告&#xff1a;Anderson Design Group, Inc. 代理律所&#xff1a;Keith 原告介紹 原告是美國的創意設計公司&#xff0c;成立于1993年&#xff0c;簡稱ADG&#xff0c;一…

五、代碼生成器:gen項目開發

目錄 1.新建數據庫 2.nacos中配置文件 3.gen項目配置代碼 4.前端項目 我們再項目中需要代碼生成器,這邊自己開發一個gen代碼生成器服務。 1.新建數據庫 CREATE TABLE `gen_table` (`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT 編號,`table_name` varchar(200) DEF…

UI前端大數據處理安全性保障:數據加密與隱私保護策略

hello寶子們...我們是艾斯視覺擅長ui設計、前端開發、數字孿生、大數據、三維建模、三維動畫10年經驗!希望我的分享能幫助到您!如需幫助可以評論關注私信我們一起探討!致敬感謝感恩! 一、引言&#xff1a;大數據時代前端安全的核心挑戰 在數據驅動業務發展的今天&#xff0c;U…

基于 alpine 構建 .net 的基礎鏡像

準備基礎鏡像 alpine:3.22 完整的 Dockerfile 如下&#xff1a; # 使用官方的 Alpine 3.22 鏡像作為基礎鏡像 FROM --platform$TARGETPLATFORM alpine:3.22 AS builder# 設置環境變量 ENV DEBIAN_FRONTENDnoninteractive# 創建目錄結構 WORKDIR /app# 備份原始源文件并更換為…

Blob分析及形態學分析

目錄 Blob分析的基本思想&#xff1a; Blob分析主要流程&#xff1a; Blob分析 分割: Binary Threshold 分割: Histogram 分割: 動態閾值 全局閾值與動態局部閾值的比較 形態學處理 連通區域 connetion 形態學算子 特征提取 提取特征 常用相關算子 區域特征&#…

中小河流雨水情監測預報系統解決方案

一、方案概述 中小河流在防洪減災體系中地位關鍵&#xff0c;但由于其數量眾多、分布廣泛&#xff0c;監測預報基礎相對薄弱&#xff0c;易引發洪水災害&#xff0c;威脅沿岸居民生命財產安全。本系統旨在構建完善的中小河流雨水情監測預報體系&#xff0c;提升防洪減災能力。實…

Abase和ByteKV存儲方案對比

Abase 和 ByteKV 是字節跳動內部自研的兩款分布式 KV 存儲系統&#xff0c;雖然都服務于大規模在線業務&#xff0c;但在設計目標、架構模型、適用場景等方面存在顯著差異。以下是核心區別的詳細分析&#xff1a; &#x1f527; ?1. 設計目標與一致性模型? ?Abase?&#x…

JSON的縮進格式方式和緊湊格式方式

將對象轉化為json格式字符串在以縮進的方式顯示 HxParamMsg hxCommMsg new HxParamMsg() {name "Tom",age 25 }; string json JsonConvert.SerializeObject(hxCommMsg); var parsed JToken.Parse(json); string data parsed.ToString(Formatting.Indented); // …

設計模式篇:靈活多變的策略模式

引言&#xff1a;從現實世界到代碼世界的面向對象在商業策略制定中&#xff0c;企業會根據市場環境選擇不同的競爭策略&#xff1b;在軍事行動中&#xff0c;指揮官會根據敵情選擇不同的戰術&#xff1b;在游戲對戰中&#xff0c;玩家會根據局勢調整作戰方式。這種根據情境選擇…

Bitvisse SSH Client 安裝配置文檔

一、軟件功能介紹? Bitvisse SSH Client 是一款功能強大的 SSH 客戶端軟件&#xff0c;具備以下顯著特點&#xff1a;? 豐富的代理隧道協議支持&#xff1a;支持 socks4、socks4a、socks5 和 http 等多種連接代理隧道協議&#xff0c;為網絡連接提供多樣選擇。?便捷的應用…