文章目錄
- 概述
- 一、Redis實現分布式鎖
- 1.1、第一版
- 1.2、第二版
- 1.3、第三版
- 1.3、第四版
- 二、Redisson實現分布式鎖核心源碼分析
- 2.1、加鎖核心源碼
- 2.2、鎖續期核心源碼
- 2.3、重試機制核心源碼
- 2.4、解鎖核心源碼
- 總結
概述
??傳統的單機鎖(Synchronized,ReentrantLock)都是進程級別的鎖,無法應對服務多實例部署的場景(每個服務實例都有自己的進程),如果需要跨進程加鎖,則需要引入第三方工具對于進程統一管理。
??使用Redis可以實現簡易的分布式鎖,而最常見的成熟的分布式鎖方案是Redisson
。
一、Redis實現分布式鎖
??案例工程:減庫存,沒有加鎖控制,在高并發的場景下必然會出現超賣的問題。如果是在單點部署的情況下,可以通過本地鎖
解決,但是目前服務多點部署,本地鎖
的方案無法進行控制。
@Service
public class DistributedLockDemo {@Resourceprivate StringRedisTemplate stringRedisTemplate;private final String STOCK_PREFIX = "stock:";private final String STOCK_LOCK = "stock:lock:";public void deduceStock(int orderNum,int orderId){//業務代碼int stockNumber = Integer.parseInt(stringRedisTemplate.opsForValue().get(STOCK_PREFIX + orderId));if (stockNumber > 0){stockNumber = stockNumber - orderNum;}stringRedisTemplate.opsForValue().set(STOCK_PREFIX + orderId, String.valueOf(stockNumber));}
}
1.1、第一版
??以stock:lock:
前綴加上orderId
作為key,使用setIfAbsent
進行加鎖,setIfAbsent
命令是Redis原生的setNx
命令在客戶端的體現,setNx
命令是僅僅當設置的key不存在時,才可以成功,保證互斥性。
??這樣做存在的問題是,如果在執行業務代碼的過程中,出現了異常,那么解鎖的代碼則永遠無法執行,造成死鎖
@Service
public class DistributedLockDemo {@Resourceprivate StringRedisTemplate stringRedisTemplate;private final String STOCK_PREFIX = "stock:";private final String STOCK_LOCK = "stock:lock:";public void deduceStock(int orderNum,int orderId){Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(STOCK_LOCK + orderId, "lock");if (lock){//業務代碼int stockNumber = Integer.parseInt(stringRedisTemplate.opsForValue().get(STOCK_PREFIX + orderId));if (stockNumber > 0){stockNumber = stockNumber - orderNum;}stringRedisTemplate.opsForValue().set(STOCK_PREFIX + orderId, String.valueOf(stockNumber));stringRedisTemplate.delete(STOCK_LOCK + orderId);}}
}
1.2、第二版
??針對第一版的問題進行改造,將解鎖的代碼放到finally
代碼塊中。這種方案依舊會存在問題,因為finally
代碼塊只能保證程序出錯時最終執行,無法保證服務器宕機造成的死鎖,所以最好在加鎖時設置一個超時時間,到期自動釋放。
public void deduceStock(int orderNum,int orderId){try {Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(STOCK_LOCK + orderId, "lock");if (lock){//業務代碼int stockNumber = Integer.parseInt(stringRedisTemplate.opsForValue().get(STOCK_PREFIX + orderId));if (stockNumber > 0){stockNumber = stockNumber - orderNum;}stringRedisTemplate.opsForValue().set(STOCK_PREFIX + orderId, String.valueOf(stockNumber));stringRedisTemplate.delete(STOCK_LOCK + orderId);}} catch (Exception e) {}finally {stringRedisTemplate.delete(STOCK_LOCK + orderId);}}
1.3、第三版
??設置超時時間,可以使用stringRedisTemplate.expire
方法,但是這樣寫:
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(STOCK_LOCK + orderId, "lock");
stringRedisTemplate.expire(STOCK_LOCK + orderId, 10, TimeUnit.SECONDS);
??是不具有原子性的,需要分為兩條命令執行,如果在執行兩條命令之間出現問題,依舊會造成死鎖的問題。在Redis的層面提供了一條命令 set NX EX
,保證設置超時時間和加鎖是原子性操作:
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(STOCK_LOCK + orderId, "lock",10, TimeUnit.SECONDS);
??加鎖時存在的問題看似是解決了,但是解鎖的代碼:
stringRedisTemplate.delete(STOCK_LOCK + orderId);
??會存在一種情況:
- 線程一獲取到了鎖,然后在執行業務代碼的時候陷入了阻塞。
- 線程一的鎖到期自動釋放。
- 線程二獲取到了鎖,執行業務代碼
- 線程一從阻塞狀態恢復,執行完業務代碼,要執行最終的解鎖邏輯
- 線程一將線程二的鎖解鎖。
1.3、第四版
??為了避免當前線程將其他線程的鎖誤解鎖,需要在加鎖時加入自己的線程唯一標識,并且在解鎖時進行判斷:
??注意,不要用當前Thread.currentThread().getId()
方法去獲取線程ID,因為不同機器上的線程ID可能會重復。Redisson底層也不是直接用上述的API獲取的線程ID,而是和UUID進行了拼接。
//加鎖
String threadId = UUID.randomUUID().toString().replace("-","");
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(STOCK_LOCK + orderId, threadId,10, TimeUnit.SECONDS);//解鎖
String threadIdFromRedis = stringRedisTemplate.opsForValue().get("STOCK_LOCK + orderId");
if (threadId.equals(threadIdFromRedis)){stringRedisTemplate.delete(STOCK_LOCK + orderId);
}
??但是這樣寫, 解鎖和之前的加鎖設置超時時間有同樣的問題,都是操作分為了兩步,不能保證原子性。 在解鎖的判斷上,Redis并沒有提供原子性的命令,需要自己去通過lua腳本實現。
??經過四版改動,自己用Redis實現的分布式鎖已經基本可用了,但是深究下來依舊存在一些問題或不足:
- 如果執行業務代碼的時間,超過了設置的鎖超時時間,當前邏輯是沒有自動續期的。
- 當前的邏輯不支持鎖重入。
- 當前的邏輯沒有實現重試機制,獲取不到鎖的線程無法進行重試。
二、Redisson實現分布式鎖核心源碼分析
??相比較于自己通過set NX EX + lua
腳本實現的分布式緩存鎖,Redisson是更為成熟的方案,也推薦在生產環境使用。Redisson分布式鎖在API層面是非常簡單的:
public class RedissonLockDemo {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate Redisson redisson;private final String STOCK_PREFIX = "stock:";private final String STOCK_LOCK = "stock:lock:";public void deduceStock(int orderNum, int orderId) {//獲取分布式鎖RLock lock = redisson.getLock(STOCK_LOCK + orderId);try {//加分布式鎖,可以指定超時時間,沒有指定超時時間默認30s,底層會自動續期。lock.lock();//業務代碼int stockNumber = Integer.parseInt(stringRedisTemplate.opsForValue().get(STOCK_PREFIX + orderId));if (stockNumber > 0) {stockNumber = stockNumber - orderNum;}stringRedisTemplate.opsForValue().set(STOCK_PREFIX + orderId, String.valueOf(stockNumber));} catch (Exception e) {} finally {lock.unlock();}}
}
??關鍵代碼:
//獲取分布式鎖
RLock lock = redisson.getLock(STOCK_LOCK + orderId);
//加分布式鎖,可以指定超時時間,沒有指定超時時間默認30s,底層會自動續期。
lock.lock();
//解鎖
lock.unlock();
2.1、加鎖核心源碼
??跟蹤lock.lock();
,進入lockInterruptibly
:
??首先第一次加鎖,進入的是tryAcquire
方法,最終的核心邏輯是:
??底層執行的是一段lua腳本,lua腳本和pipeline類似,也是可以將命令批量執行。雖然腳本中分了很多條命令,但是其他客戶端要等到當前客戶端的lua腳本全部執行完,才能執行腳本。
- KEYS[1]:是作為當前分布式鎖的Key,也就是用戶在
redisson.getLock
時傳入的。 - ARGV[1]:是默認的超時時間,30s。
- ARGV[2]:是當前線程的唯一標識,用線程ID拼接上UUID。
# 加鎖的邏輯
# 當前分布式鎖的key不存在
"if (redis.call('exists', KEYS[1]) == 0) then " +
# 調用hset命令,key是分布式鎖的key,value的key是當前線程的唯一標識,用線程ID拼接上UUID。 value是1(重入次數)
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
# 設置超時時間
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
# 返回null
"return nil; " +
"end; " +
# 重入的邏輯
# 當前分布式鎖的key存在,并且是當前線程持有
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
# 重入次數 + 1
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
# 設置超時時間
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
# 返回null
"return nil; " +
"end; " +
# 查詢指定鍵的剩余生存時間,并且返回
"return redis.call('pttl', KEYS[1]);"
??上面這個腳本的執行,包含了可重入鎖
和初次加鎖
的邏輯,最終還會返回當前鎖的剩余時間
。
2.2、鎖續期核心源碼
??Redisson鎖的續期,也稱為看門狗機制。如果要實現鎖續期,常見的設計思想是在業務線程執行時,開啟一個守護線程,對業務線程進行監控,如果鎖到期,業務線程還沒有執行完,就執行續期的邏輯
??在Redisson中的實現,調用完tryLockInnerAsync
方法后,會回調operationComplete
,通過future.getNow();
獲取到加鎖的結果,上面的lua腳本,在加鎖成功和重入成功后,都會返回null。
??進入scheduleExpirationRenewal
方法,該方法就是實現續期的核心方法實現,類似于一個延遲任務的線程池,延遲30/3 = 10s執行,整個方法分為兩部分
??首先依舊是執行一段lua腳本**(KEYS[1],ARGV[2],ARGV[1] 和第一段加鎖時的lua腳本參數含義相同)**
# 當前分布式鎖的key存在,并且是當前線程持有
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
# 進行續期30s
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
# 續期成功就返回1
"return 1; " +
"end; " +
# 否則返回 0
"return 0;",
??第二部分則是拿到lua腳本執行的結果,遞歸調用scheduleExpirationRenewal
方法,延遲10s執行,最終的結果是每隔10s進行一次續期,每次續期30s
2.3、重試機制核心源碼
??加鎖時的lua腳本,如果沒有加鎖或者重入成功,那么最終返回的是key的剩余生存時間
:
??返回到方法的最外層,在lockInterruptibly
中執行自旋重試的邏輯。這里的自旋重試并非是在while循環中不斷地循環,而是有一定的間隔時間。
??在進入while循環后首先會再次嘗試獲取鎖,如果失敗了,就通過Semaphore
的API,在規定的TTL毫秒內嘗試獲取許可,如果有其他線程釋放(即喚醒),當前線程就會繼續執行。如果超時仍未獲取到許可,則返回 false。
??如果業務代碼執行的時間短于設置的鎖超時時間,那么其他等待鎖的線程并不會阻塞到超時時間后再去競爭鎖,在執行while循環之前,會通過redis的發布訂閱模型
,將自身存入一個隊列中。
??喚醒隊列中元素的邏輯,在解鎖中。
2.4、解鎖核心源碼
??解鎖同樣是通過lua腳本,將判斷線程標識和解鎖組成原子性的操作,解鎖的lua腳本在unlockInnerAsync
方法中:
- KEYS[1]:當前分布式鎖的key
- KEYS[2]:當前分布式鎖的key 拼接上
redisson_lock__channel
- ARGV[1]:解鎖消息標識,默認0L
- ARGV[2]:鎖超時釋放時間
- ARGV[3]:當前線程的唯一標識,用線程ID拼接上UUID。
??主線程在解鎖的時候會往隊列中發送給一條消息,喚醒等待線程:
- 當前鎖不存在,超時釋放了
- 存在并且解鎖成功
# 當前key對應的分布式鎖不存在
"if (redis.call('exists', KEYS[1]) == 0) then " +
# 發布解鎖消息標識到當前分布式鎖的key 拼接上 redisson_lock__channel
"redis.call('publish', KEYS[2], ARGV[1]); " +
# 返回1
"return 1; " +
"end;" +
# 當前key對應的分布式鎖非本線程持有
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
# 返回null
"return nil;" +
"end; " +
# 可重入鎖的解鎖,重入次數 - 1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
# 重入次數>0
"if (counter > 0) then " +
# 重新設置超時時間
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
# 返回
"return 0; " +
"else " +
# 刪除當前key對應的分布式鎖
"redis.call('del', KEYS[1]); " +
# 發布解鎖消息標識到當前分布式鎖的key 拼接上 redisson_lock__channel
"redis.call('publish', KEYS[2], ARGV[1]); " +
# 返回1
"return 1; "+
"end; " +
"return nil;",
??消費者 (正在阻塞等待的線程) 接受到了消息,會回調LockPubSub
的onmessage
方法,被喚醒然后重新爭搶鎖。