Redisson 源碼解析 —— 分布式鎖實現過程
在分布式系統中,分布式鎖 是非常常見的需求,用來保證多個節點之間的互斥操作。Redisson
是 Redis 的一個 Java 客戶端,它提供了對分布式鎖的良好封裝。本文將從源碼角度剖析 Redisson 的分布式鎖實現過程。
一、分布式鎖的基本需求
一個健壯的分布式鎖需要滿足以下條件:
- 互斥性:同一時間只能有一個客戶端持有鎖。
- 死鎖避免:客戶端宕機后,鎖不會永久被占用。
- 可重入性:同一線程可多次獲取同一把鎖。
- 高可用性:在 Redis 集群模式下仍能正常工作。
- 超時釋放:設置持有鎖時間,時間超過鎖釋放,避免死鎖。
- 鎖時間續約:看門狗機制,避免業務未執行完畢鎖釋放,導致并發問題。
二、Redisson 分布式鎖的核心實現類以及加鎖方法
在源碼中,Redisson 提供了多種鎖的實現,最核心的是:
RedissonLock
—— 基于 Redis 的可重入鎖實現RedissonReadWriteLock
—— 讀寫鎖RedissonFairLock
—— 公平鎖
我們主要關注 RedissonLock
的實現。
RLock lock = redissonClient.getLock("32r");lock.方法名()
常用加鎖方法:
- lock():獲取鎖,獲取不到會一致阻塞直到獲取。通過看門狗機制續期,默認持有鎖是30s,每隔10s續期一次。
- lock(long l, TimeUnit timeUnit):獲取鎖,獲取不到會一致阻塞直到獲取。持有鎖時間是手動入參的timeUnit,到期釋放鎖。
- tryLock(long waite, long l1, TimeUnit timeUnit) :獲取鎖失敗后,自旋,等待 waite 秒,獲取不到返回false,獲取到,持有鎖時間是 l1,單位 timeUnit。
- tryLock():嘗試獲取一次鎖,如果獲取不到,立即返回 false,獲取鎖成功,觸發 看門狗續期機制(和 lock() 一樣)。
- tryLock(long waitTime, TimeUnit unit):在 waitTime 時間窗口內,不斷嘗試執行,范圍內獲取鎖失敗,返回false。獲取成功,啟動看門狗機制。
RLock lock = redissonClient.getLock("32r");
我們可以看到 redissonClient 調用這個方法時候,客戶端返回的是RedissonLock
這個類
所以對應的我們主要關注 RedissonLock
子類和父類RedissonBaseLock
這里我主要分析 lock()
方法的調用,其他鎖的邏輯都是參考這個去完善的。
三、加鎖流程解析
1. 調用入口
當我們執行:
RLock lock = redisson.getLock("myLock");
lock.lock();
進入RedissonLock#lock
方法:
可以看到調用lock方法其實都是調用的另外一個lock(long leaseTime, TimeUnit unit, boolean interruptibly)
方法。
對應真正調用的lock()
方法:
/*** 獲取分布式鎖的核心方法* @param leaseTime 鎖的租約時間* @param unit 時間單位* @param interruptibly 是否允許中斷* @throws InterruptedException 當線程被中斷時拋出*/
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {// 獲取當前線程ID,用于標識鎖的持有者long threadId = Thread.currentThread().getId();// 嘗試獲取鎖,返回剩余的TTL(生存時間)// 如果返回null表示獲取鎖成功,否則返回鎖的剩余過期時間Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);// 如果ttl不為null,說明鎖獲取失敗,需要等待if (ttl != null) {// 訂閱鎖釋放的通知,返回一個Future對象CompletableFuture<RedissonLockEntry> future = this.subscribe(threadId);// 設置訂閱操作的超時時間this.pubSub.timeout(future);// 根據是否允許中斷來獲取訂閱結果RedissonLockEntry entry;if (interruptibly) {// 允許中斷的方式獲取結果entry = (RedissonLockEntry)this.commandExecutor.getInterrupted(future);} else {// 不允許中斷的方式獲取結果entry = (RedissonLockEntry)this.commandExecutor.get(future);}try {// 自旋等待鎖釋放while(true) {// 再次嘗試獲取鎖ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);// 如果獲取鎖成功(ttl為null),則退出循環if (ttl == null) {return;}// 如果ttl大于等于0,說明鎖還存在,需要等待指定的時間if (ttl >= 0L) {try {// 使用信號量等待指定的ttl時間entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {// 如果允許中斷,直接拋出異常if (interruptibly) {throw e;}// 如果不允許中斷,繼續等待entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {// 如果ttl小于0,表示需要無限等待if (interruptibly) {// 允許中斷的無限等待entry.getLatch().acquire();} else {// 不允許中斷的無限等待entry.getLatch().acquireUninterruptibly();}}}} finally {// 無論成功與否,都要取消訂閱,釋放資源this.unsubscribe(entry, threadId);}}
}
這時候我們只需要重點關注對應的this.tryAcquire(-1L, leaseTime, unit, threadId);
這個方法。
源碼圖如下:
對應的Java代碼解釋:
/*** 異步嘗試獲取鎖* @param waitTime 等待時間* @param leaseTime 鎖的租約時間* @param unit 時間單位* @param threadId 線程ID* @return 返回鎖的剩余TTL時間,null表示獲取鎖成功*/
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {// 聲明TTL剩余時間的Future對象RFuture<Long> ttlRemainingFuture;// 判斷是否指定了租約時間if (leaseTime > 0L) {// 使用指定的租約時間嘗試獲取鎖ttlRemainingFuture = this.<Long>tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {// 使用默認的內部鎖租約時間嘗試獲取鎖ttlRemainingFuture = this.<Long>tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}// 對獲取鎖的結果進行后續處理CompletionStage<Long> f = ttlRemainingFuture.thenApply((ttlRemaining) -> {// 如果ttlRemaining為null,說明成功獲取到鎖if (ttlRemaining == null) {// 判斷是否指定了租約時間if (leaseTime > 0L) {// 將指定的租約時間轉換為毫秒并存儲到內部鎖租約時間this.internalLockLeaseTime = unit.toMillis(leaseTime);} else {// 如果沒有指定租約時間,啟動鎖的自動續期機制// 防止鎖因過期而被誤釋放this.scheduleExpirationRenewal(threadId);}}// 返回TTL剩余時間(null表示獲取鎖成功,非null表示需要等待的時間)return ttlRemaining;});// 將CompletionStage包裝成RFuture并返回return new CompletableFutureWrapper(f);
}
這里最重要的是調用對應的tryAcquire
里面的tryLockInnerAsync
方法,方法詳解如下:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1);redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) thenredis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});}
這個tryLockInnerAsync
方法主要是執行對應的腳本,然后返回剩余的時間,如果獲取鎖成功返回 nil ,獲取鎖失敗會返回 持有鎖的鎖過期時間。
核心 Lua 腳本詳解如下:
Redisson 并不是簡單地 SETNX
,而是使用 Lua 腳本 來保證操作的原子性。
加鎖腳本大致邏輯如下:
if (redis.call('exists', KEYS[1]) == 0) then-- 鎖不存在,設置鎖并綁定到線程redis.call('hset', KEYS[1], ARGV[2], 1);redis.call('pexpire', KEYS[1], ARGV[1]);return nil;
end;-- 鎖已存在,判斷是否是當前線程重入
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) thenredis.call('hincrby', KEYS[1], ARGV[2], 1);redis.call('pexpire', KEYS[1], ARGV[1]);return nil;
end;return redis.call('pttl', KEYS[1]);
解釋:
- KEYS[1]: 鎖的 key (如
myLock
) - ARGV[1]: 鎖的過期時間(默認 30s)
- ARGV[2]: 當前線程標識(由 UUID + 線程 ID 組成)
執行流程:
- 如果鎖不存在,設置
hash
,key = 線程標識,value = 1。 - 如果鎖存在且是自己線程,則遞增重入次數。
- 否則返回鎖的剩余過期時間。
問題延伸:
Redis不是單線程嗎,高并發線程下不是線程安全嗎?為什么還需要使用Lua腳本保證原子性?
想想為什么使用lua腳本,你可以想象一下高并發場景下,Redis執行命令是單線程的,Redis只能保證對應的單條命令是原子性的,不能保證多條命令的原子性,假設線程A執行:redis.call('exists', KEYS[1]) == 0
結束后,線程B搶到執行權,然后線程B也執行:redis.call('exists', KEYS[1]) == 0
,然后后續大家都會進行對應的鎖設置,導致線程A上鎖可能會被覆蓋,不過可以用hsetnx解決,但是后續可能判斷還是會有并發問題。使用 lua 腳本可以將多條命令整合成類似一條命令,redis執行,從而保證原子性
WatchDog 自動續期機制
Redisson 的一大亮點是 鎖續期機制:
- 當線程獲取鎖后,會啟動一個 看門狗定時任務,默認每隔
lockWatchdogTimeout / 3
秒續期一次(默認 30s → 10s)。 - 如果業務邏輯執行很久,不用擔心鎖被提前釋放。
- 如果線程宕機,定時任務不再執行,鎖會在超時后自動釋放。
判斷對應的leasetime有沒有指定,然后執行對應的續期或不續期的方法
源碼關鍵點在:scheduleExpirationRenewal()
方法。
關鍵代碼
CompletionStage<Long> f = ttlRemainingFuture.thenApply((ttlRemaining) -> {if (ttlRemaining == null) {if (leaseTime > 0L) {this.internalLockLeaseTime = unit.toMillis(leaseTime);} else {this.scheduleExpirationRenewal(threadId);}}return ttlRemaining;});
根據對應的沒指定leaseTime ,然后執行對應的RedissonBaseLock#scheduleExpirationRenewal
對應的方法邏輯如下:
/*** 調度鎖的過期時間續期任務* 為指定線程啟動自動續期機制,防止鎖因過期而被誤釋放* @param threadId 需要續期的線程ID*/
protected void scheduleExpirationRenewal(long threadId) {// 創建新的過期時間管理條目ExpirationEntry entry = new ExpirationEntry();// 嘗試將新條目放入續期映射表中,如果已存在則返回舊條目// 使用putIfAbsent確保原子性操作,避免并發問題ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);// 判斷是否已經存在續期任務if (oldEntry != null) {// 如果已存在續期任務,只需將當前線程ID添加到現有條目中// 這種情況發生在同一個鎖被多個線程(可重入鎖)或同一線程多次獲取時oldEntry.addThreadId(threadId);} else {// 如果是首次為這個鎖創建續期任務// 將當前線程ID添加到新創建的條目中entry.addThreadId(threadId);try {// 啟動實際的續期任務// 這會創建定時任務,定期延長鎖的過期時間this.renewExpiration();} finally {// 檢查當前線程是否被中斷if (Thread.currentThread().isInterrupted()) {// 如果線程被中斷,取消剛剛啟動的續期任務// 防止資源泄漏和無效的續期操作this.cancelExpirationRenewal(threadId);}}}
}
這個通過一個創建一個ExpirationEntry 然后通過EXPIRATION_RENEWAL_MAP判斷是否存在,如果條目不存在就啟動對應的自動續期機制任務 renewExpiration()
RedissonBaseLock#renewExpiration()
方法如下:
/*** 啟動鎖的自動續期機制* 創建定時任務,定期延長鎖的過期時間,防止鎖因超時而被釋放*/
private void renewExpiration() {// 從續期映射表中獲取當前鎖的過期時間管理條目ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());// 如果條目存在,說明需要為這個鎖設置續期任務if (ee != null) {// 創建定時任務,在鎖租約時間的1/3處執行續期操作// 選擇1/3時間點是為了在鎖過期前有足夠的時間進行續期Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {public void run(Timeout timeout) throws Exception {// 定時任務執行時,重新獲取續期條目(防止在延遲期間被移除)ExpirationEntry ent = (ExpirationEntry)RedissonBaseLock.EXPIRATION_RENEWAL_MAP.get(RedissonBaseLock.this.getEntryName());// 雙重檢查:確保續期條目仍然存在if (ent != null) {// 獲取需要續期的第一個線程ID// 對于可重入鎖,可能有多個線程ID,取第一個進行續期Long threadId = ent.getFirstThreadId();// 如果線程ID有效,執行續期操作if (threadId != null) {// 異步執行鎖的續期操作CompletionStage<Boolean> future = RedissonBaseLock.this.renewExpirationAsync(threadId);// 處理續期結果future.whenComplete((res, e) -> {// 如果續期過程中發生異常if (e != null) {// 記錄錯誤日志RedissonBaseLock.log.error("Can't update lock " + RedissonBaseLock.this.getRawName() + " expiration", e);// 從續期映射表中移除條目,停止續期RedissonBaseLock.EXPIRATION_RENEWAL_MAP.remove(RedissonBaseLock.this.getEntryName());} else {// 續期操作成功完成if (res) {// 如果續期成功(返回true),遞歸調用繼續下一輪續期// 這樣就形成了持續的自動續期循環RedissonBaseLock.this.renewExpiration();} else {// 如果續期失敗(返回false),說明鎖已經不存在或不屬于當前線程// 取消續期任務,清理資源RedissonBaseLock.this.cancelExpirationRenewal((Long)null);}}});}}}}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); // 在租約時間的1/3處執行續期// 將定時任務保存到條目中,用于后續的取消操作ee.setTimeout(task);}
}
最后完美結束對應的獲取鎖的過程,返回一個對應的時間值 ttl
如果返回的是null代表加鎖成功,否則是加鎖失敗,此時會進行訂閱持有鎖者this.subscribe(threadId)
,如果釋放鎖會通知這個獲取鎖失敗的線程,會將這個線程喚醒。
四、解鎖流程解析
解鎖的流程
解鎖時同樣使用 Lua 腳本,保證原子性:
if (redis.call('hexists', KEYS[1], ARGV[2]) == 0) thenreturn nil;
end;local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1);if (counter > 0) thenreturn 0;
elseredis.call('del', KEYS[1]);return 1;
end;
解釋:
- 檢查當前線程是否持有鎖。
- 如果是可重入鎖,計數 -1。
- 如果計數為 0,則刪除鎖。
六、源碼設計亮點
- Lua 腳本保證原子性,避免分布式并發問題。
- 可重入性設計:使用
hash
結構存儲線程標識和重入次數。 - 鎖超時釋放設計:避免死鎖問題。
- 看門狗機制:保證長時間任務也能安全持有鎖。
- 異步化設計:Redisson 提供
lockAsync()
等方法,方便高并發場景。
七、總結
- Redisson 的分布式鎖實現基于 Redis + Lua 腳本,解決了互斥、可重入和死鎖問題。
- 看門狗續期機制 是 Redisson 的亮點,保證了業務執行時間不可預測的情況下的安全性。
- 在生產環境中,Redisson 的分布式鎖相較于
SETNX + EXPIRE
的手寫版本,更加健壯和可靠。