前言:
在上篇博客中,我們探討了單機模式下如何通過悲觀鎖(synchronized)實現"一人一單"功能。然而,在分布式系統或集群環境下,單純依賴JVM級別的鎖機制會出現線程并發安全問題,因為這些鎖無法跨JVM生效。
(詳情請參考我的上一篇博客:Redis實戰-單機項目下實現優惠劵秒殺【萬字長文】)
那么,在分布式環境下如何正確實現"一人一單"功能呢?本文將深入探討分布式鎖的實現方案,重點介紹基于Redis和Lua腳本的分布式鎖實現,并分析Redission框架的源碼實現。
今日所學:
- 分布式鎖原理及其實現
- redis分布式可能造成的誤刪問題
- 引入lua腳本解決多條命令的原子性問題
- Redisson源碼解析
1.分布式鎖原理及其實現
1.1 什么是分布式鎖
分布式鎖是一種在分布式系統中用于協調多個節點或進程對共享資源進行互斥訪問的機制。它的核心目標是確保在分布式環境下,同一時間只有一個節點能夠執行關鍵操作(如修改共享數據、訪問數據庫等),避免并發導致的數據不一致問題。
一句話總結:
分布式鎖是在分布式系統中協調多個節點對共享資源訪問的一種同步機制。
同俗一點解釋就是,synchronized
只能管住自己JVM里的線程,而Redis分布式鎖相當于一個"全局管理員",能管住所有JVM的線程,讓它們在集群里排隊用資源。就好比單機游戲(只能在本機內存檔)和網游游戲的區別(所有電腦都可以讀取存檔)
分布式鎖的特性有:
- 互斥性:同一時刻只能有一個節點持有鎖。
- 可重入性:同一個節點多次請求鎖時能夠成功(避免死鎖)。
- 超時釋放:鎖需設置超時時間,防止節點崩潰后鎖無法釋放。
- 高可用性:鎖服務需具備容錯能力,避免單點故障。
- 高性能:獲取和釋放鎖的操作應高效。
1.2 常見的分布式鎖
常見的分布式鎖有三種
Mysql:mysql本身就帶有鎖機制,但是由于mysql性能本身一般,所以采用分布式鎖的情況下,其實使用mysql作為分布式鎖比較少見
Redis:redis作為分布式鎖是非常常見的一種使用方式,現在企業級開發中基本都使用redis或者zookeeper作為分布式鎖,利用setnx這個方法,如果插入key成功,則表示獲得到了鎖,如果有人插入成功,其他人插入失敗則表示無法獲得到鎖,利用這套邏輯來實現分布式鎖
Zookeeper:zookeeper也是企業級開發中較好的一個實現分布式鎖的方案,由于黑馬的那套視頻并不講解zookeeper的原理和分布式鎖的實現,所以不過多闡述
這里我們使用redis作為分布式鎖。
1.3 redis分布式鎖的實現
1.3.1 核心思路:
分布式鎖應該說但凡是鎖都要實現的兩個基本方法:
1.獲取鎖:
- ?互斥:確保只能有一個線程獲取鎖
- ?非阻塞:嘗試一次,成功返回true,失敗返回false
2. 釋放鎖:
- ?手動釋放
- ?超時釋放:獲取鎖時添加一個超時時間
那么我們改如何去實現呢,redis中有什么指令或者說方法可以滿足這兩點呢?
我們先說獲取鎖,核心思路是我們利用redis 的setNx 方法,當有多個線程進入時,我們就利用該方法,第一個線程進入時,redis 中就有這個key 了,返回了1,如果結果是1,則表示他搶到了鎖,那么他去執行業務,然后再刪除鎖,退出鎖邏輯,沒有搶到鎖的哥們,等待一定時間后重試即可
想通了這個,釋放鎖就非常簡單了,要么等業務執行完自動刪除,要么就是服務宕機或者業務執行時間過長到達鎖的超時時間,鎖自動刪除。
最后問題就變成了如何設置一個指令(只能一條指令以保證原子性),既能實現setnx保證鎖的互斥性,同時增加過期時間,防止死鎖。
1.3.2 代碼實現:
1.獲取鎖(util包下SimpleRedisLock類)
邏輯思路很簡單:
1.先獲取線程標識
2.獲取鎖(等同于redis中的SET lock_key unique_value NX PX 30000,創建一個redis字段)nx等同于setnx代表互斥,ex設置過期時間,一條指令保證了原子性。
3.根據返回的boolean確定是否獲取鎖成功(成功返回1,失敗返回0),這邊使用Boolean.TRUE是為了防止Boolean包裝類有返回null的情況
private static final String KEY_PREFIX="lock:"
@Override
public boolean tryLock(long timeoutSec) {
? ? // 獲取線程標示
? ? String threadId = Thread.currentThread().getId()
? ? // 獲取鎖
? ? Boolean success = stringRedisTemplate.opsForValue()
? ? ? ? ? ? .setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
? ? return Boolean.TRUE.equals(success);
}
2.釋放鎖(utils下SimpleRedisLock類):
要釋放鎖時給相應的redis鎖刪除就行
public void unlock() {
? ? //通過del刪除鎖
? ? stringRedisTemplate.delete(KEY_PREFIX + name);
}
最后在service層下的VoucherOrderServiceImpl類下修改相應的代碼:
@Override
? ? public Result seckillVoucher(Long voucherId) {
? ? ? ? // 1.查詢優惠券
? ? ? ? SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
? ? ? ? // 2.判斷秒殺是否開始
? ? ? ? if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
? ? ? ? ? ? // 尚未開始
? ? ? ? ? ? return Result.fail("秒殺尚未開始!");
? ? ? ? }
? ? ? ? // 3.判斷秒殺是否已經結束
? ? ? ? if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
? ? ? ? ? ? // 尚未開始
? ? ? ? ? ? return Result.fail("秒殺已經結束!");
? ? ? ? }
? ? ? ? // 4.判斷庫存是否充足
? ? ? ? if (voucher.getStock() < 1) {
? ? ? ? ? ? // 庫存不足
? ? ? ? ? ? return Result.fail("庫存不足!");
? ? ? ? }
? ? ? ? Long userId = UserHolder.getUser().getId();
? ? ? ? //創建鎖對象(新增代碼)
? ? ? ? SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
? ? ? ? //獲取鎖對象
? ? ? ? boolean isLock = lock.tryLock(1200);
? ? ? ? //加鎖失敗
? ? ? ? if (!isLock) {
? ? ? ? ? ? return Result.fail("不允許重復下單");
? ? ? ? }
? ? ? ? try {
? ? ? ? ? ? //獲取代理對象(事務)
? ? ? ? ? ? IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
? ? ? ? ? ? return proxy.createVoucherOrder(voucherId);
? ? ? ? } finally {
? ? ? ? ? ? //釋放鎖
? ? ? ? ? ? lock.unlock();
? ? ? ? }
? ? }
2.redis分布式鎖可能造成的誤刪問題
2.1 問題分析
我們假設持有鎖的線程在鎖的內部出現了阻塞,導致他的鎖自動釋放,這時其他線程,線程2來嘗試獲得鎖,就拿到了這把鎖,然后線程2在持有鎖執行過程中,線程1反應過來,繼續執行,而線程1執行過程中,走到了刪除鎖邏輯,此時就會把本應該屬于線程2的鎖進行刪除,這就是誤刪別人鎖的情況說明
2.2 解決思路
解決方案就是在每個線程釋放鎖的時候,去判斷一下當前這把鎖是否屬于自己,如果屬于自己,則不進行鎖的刪除,假設還是上邊的情況,線程1卡頓,鎖自動釋放,線程2進入到鎖的內部執行邏輯,此時線程1反應過來,然后刪除鎖,但是線程1,一看當前這把鎖不是屬于自己,于是不進行刪除鎖邏輯,當線程2走到刪除鎖邏輯時,如果沒有卡過自動釋放鎖的時間點,則判斷當前這把鎖是屬于自己的,于是刪除這把鎖
2.3 需求分析
修改之前的分布式鎖實現,滿足:
在獲取鎖時存入線程標示(可以用UUID表示,在釋放鎖時先獲取鎖中的線程標示,判斷是否與當前線程標示一致
-
如果一致則釋放鎖
-
如果不一致則不釋放鎖
核心邏輯:在存入鎖時,放入自己線程的標識,在刪除鎖時,判斷當前這把鎖的標識是不是自己存入的,如果是,則進行刪除,如果不是,則不進行刪除
2.4 代碼實現
在util包下SimpleRedisLock類中,對代碼進行修改
具體修改邏輯:
1.生成一個靜態常量UUID隨機數
2.在加鎖操作中,將生成的隨機數與進程編號拼接,得到一個唯一的進程ID(防止多進程下編號相重)
2.在釋放鎖那,從鎖中獲取value值,并跟線程標識進行比較,如果相同。則是自己的鎖,可以刪除,如果不同,則不能刪除
加鎖
```java
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
@Override
public boolean tryLock(long timeoutSec) {
? ?// 獲取線程標示
? ?String threadId = ID_PREFIX + Thread.currentThread().getId();
? ?// 獲取鎖
? ?Boolean success = stringRedisTemplate.opsForValue()
? ? ? ? ? ? ? ? .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
? ?return Boolean.TRUE.equals(success);
}
```
釋放鎖
```java
public void unlock() {
? ? // 獲取線程標示
? ? String threadId = ID_PREFIX + Thread.currentThread().getId();
? ? // 獲取鎖中的標示
? ? String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
? ? // 判斷標示是否一致
? ? if(threadId.equals(id)) {
? ? ? ? // 釋放鎖
? ? ? ? stringRedisTemplate.delete(KEY_PREFIX + name);
? ? }
}
3.?引入lua腳本解決多條命令的原子性問題
3.1 極端情況說明
線程1現在持有鎖之后,在執行業務邏輯過程中,他正準備刪除鎖,而且已經走到了條件判斷的過程中,也就是確定這把鎖就是自己的了。但是此時線程1出現了阻塞,鎖超時,線程2獲得鎖,執行業務,就在此時,線程1停止阻塞,執行刪除操作(因為已經走過了判斷過程),線程2的鎖被刪除。線程3獲得鎖和線程2并發執行。而導致這的原因是因為判斷和執行刪除是兩個操作,沒能實現操作的原子性。
3.2 lua腳本解決原子性問題思路
Redis提供了Lua腳本功能,在一個腳本中編寫多條Redis命令,確保多條命令執行時的原子性。Lua是一種編程語言,它的基本語法大家可以參考網站:https://www.runoob.com/lua/lua-tutorial.html,這里重點介紹Redis提供的調用函數,我們可以使用lua去操作redis,又能保證他的原子性,這樣就可以實現拿鎖比鎖刪鎖是一個原子性動作了。
3.3 lua語言redis調用
這里重點講下如何在lua語言中調用redis,語法如下:
```lua
redis.call('命令名稱', 'key', '其它參數', ...)
```
例如,我們要在lua語言中執行set name jack,腳本是這樣的
redis,call('set', 'name', 'jack')
在比如,我們要先執行set name Rose, 再執行 get name,則腳本如下:
# 先執行set name Rose
redis.call('set', 'name', 'Rose')
# 再執行 get name
local name = redis.call('get', 'name')
# 返回
return name
注意這里lua是一門弱語言(跟某py一樣),所以沒有什么變量類型只說,全局變量就加個local
寫好腳本以后,需要用Redis命令來調用腳本,調用腳本的常見命令如下:
EVAL?script numkeys [keys] arg[arg...]
比如說要調用set name jack 這個腳本
eval 'return redis.call('set', 'name', 'jack')' 0
這個的0代表這傳入了0個KEYS參數,'name'和‘jack’作為普通字符傳入(就相當于傳入常量,不設變量)
如果腳本中的key、value不想寫死,可以作為參數傳遞。key類型參數會放入KEYS數組,其它參數會放入ARGV數組,在腳本中可以從KEYS和ARGV數組獲取這些參數
EVAL"return redis.call('set', KEYS[1], ARGV[1])" 1 name Rose
其中1代表著傳入的KEYS的參數的數量,比如這條語句因為傳入的參數是1,所以KEYS[1]數組長度為1,name作為key,其他的都作為ARGV參數(當然,這里只傳入了一個Rose,所以ARGV[1]長度也為1)
3.4 代碼實現
接下來我們來回顧一下我們釋放鎖的邏輯:
釋放鎖的業務流程是這樣的
- 獲取鎖中的線程標示
- ?判斷是否與指定的標示(當前線程標示)一致
- 如果一致則釋放鎖(刪除)
- 如果不一致則什么都不做
如果用Lua腳本來表示則是這樣的:
1.獲取Java中傳入的key和線程標識
2. 通過key從redis鎖中獲取線程標識value
3.比較,如果相同,則刪除,不然返回0表示false
-- 鎖的key local key = KEYS[1]-- 當前線程標識 local threadId = ARGV[1]-- 獲取鎖中的線程標識 get key local id = redis.call('get', key)-- 比較線程標示與鎖中的標識是否一致 if(id == ARGV[1]) then-- 釋放鎖 del keyreturn redis.call('del', KEYS[1]) end return 0
lua腳本寫好了,那么怎么在Java中調用呢
我們的RedisTemplate中,可以利用execute方法去執行lua腳本,參數對應關系就如下圖
具體代碼如下(大致看懂什么意思就行):
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
? ? static {
? ? ? ? UNLOCK_SCRIPT = new DefaultRedisScript<>();
? ? ? ? UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
? ? ? ? UNLOCK_SCRIPT.setResultType(Long.class);
? ? }
public void unlock() {
? ? // 調用lua腳本
? ? stringRedisTemplate.execute(
? ? ? ? ? ? UNLOCK_SCRIPT,
? ? ? ? ? ? Collections.singletonList(KEY_PREFIX + name),
? ? ? ? ? ? ID_PREFIX + Thread.currentThread().getId());
}
這里可以看到,經過我們的改造,原本幾行代碼被我們改為1行,使用lua腳本,我們就能夠實現 拿鎖和鎖刪鎖的原子性動作了。
3.5 總結
基于Redis的分布式鎖實現思路:
- ?利用set nx ex獲取鎖,并設置過期時間,保存線程標示
- ?釋放鎖時先判斷線程標示是否與自己一致,一致則刪除鎖
?特性:
- ?利用set nx滿足互斥性
- 利用set ex保證故障時鎖依然能釋放,避免死鎖,提高安全性
- 利用Redis集群保證高可用和高并發特性
做到這,我們先是setnx, ex(防止死鎖)實現了一個簡單的分布式鎖,然后為了解決誤刪問題,我們又引入了線程標識作為評判標準,但是只是在Java代碼中加入if()判斷的話,仍可能出現線程安全問題(不是原子性的),因此我們又引入了lua腳本來解決這個問題。
但是問題來了,引入ex設置過期時間是解決了死鎖問題,但是也造成了另一個問題:時間不好把握,如果業務執行時間過長,還有執行完所就給釋放了怎么辦,這就是所謂的鎖不住問題,為了解決這個問題,我們需要一種機制,當過期時間到了但是業務沒有執行完,我們可以刷新下它的過期時間,相當于給它續費。
4. Redission框架介紹
4.1 setnx鎖存在的問題
我們說下使用setnx使用分布式鎖存在的幾個問題:
1.重入問題
重入問題是指 獲得鎖的線程可以再次進入到相同的鎖的代碼塊中,可重入鎖的意義在于防止死鎖,比如HashTable這樣的代碼中,他的方法都是使用synchronized修飾的,假如他在一個方法內,調用另一個方法,那么此時如果是不可重入的,不就死鎖了嗎?所以可重入鎖他的主要意義是防止死鎖,我們的synchronized和Lock鎖都是可重入的
2.不可重試
是指目前的分布式只能嘗試一次,我們認為合理的情況是:當線程在獲得鎖失敗后,他應該能再次嘗試獲得鎖。
3. 超時釋放
我們在加鎖時增加了過期時間,這樣的我們可以防止死鎖,但是如果卡頓的時間超長,雖然我們采用了lua表達式防止刪鎖的時候,誤刪別人的鎖,但是畢竟沒有鎖住,有安全隱患。
4. 主從一致性
如果Redis提供了主從集群,當我們向集群寫數據時,主機需要異步的將數據同步給從機,而萬一在同步過去之前,主機宕機了,就會出現死鎖問題。
那么如何解決這些問題呢?下面我們就來介紹一個框架Redission
4.2 Redission介紹
什么是Redission呢?
Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-Memory Data Grid)。它不僅提供了一系列的分布式的Java常用對象,還提供了許多分布式服務,其中就包含了各種分布式鎖的實現。
一句話總結:
Redisson 是一個基于 Redis 的高級 Java 分布式服務框架,提供分布式鎖、數據結構、遠程服務等企業級功能,大幅簡化分布式系統開發。
Redission提供了分布式鎖的多種多樣的功能。
官網地址:https://redission.org
Github地址:https://github.com/redission/redission
4.3 Redission快速入門
1. 在pom.xml引入相應依賴
<dependency>
? ? <groupId>org.redisson</groupId>
? ? <artifactId>redisson</artifactId>
? ? <version>3.13.6</version>
</dependency>
2.在config包下配置相應的Redission客戶端
@Configuration
public class RedissonConfig {
? ? @Bean
? ? public RedissonClient redissonClient(){
? ? ? ? // 配置
? ? ? ? Config config = new Config();
? ? ? ? config.useSingleServer().setAddress("redis://192.168.150.101:6379")
? ? ? ? ? ? .setPassword("123321");
? ? ? ? // 創建RedissonClient對象
? ? ? ? return Redisson.create(config);
? ? }
}
3.進行測試(test包下新建一個RedissionTest類)
1.創建鎖,指定鎖的名稱
2.嘗試獲取鎖,傳入最大等待時間(用于重試),鎖的自動釋放時間,時間單位
3. 判斷是否獲取鎖成功
4。業務執行完釋放鎖
@Resource
private RedissionClient redissonClient;
@Test
void testRedisson() throws Exception{
? ? //創建鎖(可重入),指定鎖的名稱
? ? RLock lock = redissonClient.getLock("anyLock");
? ? //嘗試獲取鎖,參數分別是:獲取鎖的最大等待時間(期間會重試),鎖自動釋放時間,時間單位
? ? boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);
? ? //判斷獲取鎖成功
? ? if(isLock){
? ? ? ? try{
? ? ? ? ? ? System.out.println("執行業務"); ? ? ? ? ?
? ? ? ? }finally{
? ? ? ? ? ? //釋放鎖
? ? ? ? ? ? lock.unlock();
? ? ? ? }??
? ? }
}
4. 測試成功,更改service包下VoucherOrderServiceImpl類相應的邏輯
主要更改有兩點:
1.依賴注入,注入相應的redissionClient類
2.給自定義的simpleRedisLock類創建鎖對象改成使用redissionClient自帶的內置方法創建鎖對象
@Resource private RedissonClient redissonClient;/*** 搶購優惠卷* @param voucherId* @return*/@Override public Result seckillVoucher(Long voucherId) {// 1.查詢優惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2. 判斷秒殺是否開始if(voucher.getBeginTime().isAfter(LocalDateTime.now())){// 尚未開始return Result.fail("秒殺尚未開始");}// 3.判斷秒殺是否結束if(voucher.getEndTime().isBefore(LocalDateTime.now())){return Result.fail("秒殺已經結束");}// 4. 判斷庫存是否充足if(voucher.getStock() < 1){// 庫存不足return Result.fail("庫存不足");}Long userId = UserHolder.getUser().getId();// 創建鎖對象//SimpleRedisLock simpleRedisLock = new SimpleRedisLock(stringRedisTemplate, "order:" + userId); RLock simpleRedisLock = redissonClient.getLock("lock:order:" + userId);// 獲取鎖boolean isLock = simpleRedisLock.tryLock();// 判斷是否獲取鎖成功if(!isLock){// 獲取鎖失敗, 返回錯誤或者重試return Result.fail("一個人只能下一單");}try {IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);} catch (IllegalStateException e) {e.printStackTrace();} finally {simpleRedisLock.unlock();}return null; }
5. Redission源碼解析
首先,我們回顧下setnx存在的幾個問題,并逐一對這幾個問題進行解決。
5.1 hash結構解決不可重入問題
通過設置一個hash結構,讓filed儲存線程標識,value儲存重入次數。
利用唯一Key保證互斥性(不同線程競爭時只有一個能成功創建鎖,exists判斷),同時通過Field記錄線程ID+Value計數實現可重入(同一線程多次獲取鎖時計數器+1)。底層使用Lua腳本保證「判斷存在→設置鎖→設置過期時間」的原子性操作,既防止并發沖突,又支持鎖的重入,最終通過「計數器歸零刪除Key」來安全釋放鎖。
-- 公共變量 local key = KEYS[1] local threadId = ARGV[1] local releaseTime = ARGV[2]-- 取鎖 -- 判斷是否存在 if (redis.call('exists', key) == 0) then-- 不存在,獲取鎖redis.call('hset', key, threadId, '1');-- 設置有效期redis.call('expire', key, releaseTime);return 1; -- 返回結果 end-- 鎖已經存在,判斷threadId是否是自己的 if (redis.call('hexists', key, threadId) == 1) then-- 獲取鎖 重入次數+1redis.call('hincrby', key, threadId, '1');--設置有效期redis.call('expire', key, releaseTime);return 1; endreturn 0;-- 解鎖
-- 判斷當前鎖是否還是被自己鎖持有 if (redis.call('HINCRBY', key, threadId) == 0) thenreturn nil; -- 如果不是自己的,直接返回 end-- 是自己的鎖,則重入次數-1 local count = redis.call('HINCRBY', key, threadId, -1);-- 判斷是否沖入次數已經為零 if (count > 0) then-- 大于0說明不能釋放鎖redis.call('expire', key, releaseTime);return nil; elseredis.call('del', key);return nil; end
接下來我們打開Redission源碼(RLock下實現類RedissionLock? tryLockInnerAsync方法)
在tryLockInnerAsync方法下可以看到如出一轍的加鎖模式:
1.判斷鎖是否存在,不存在創建鎖
2.存在并且是同一線程則value值加一,并刷新過期時間
3.如果鎖存在并且不是為當前線程所持有(沒有進行if語句),則返回當前鎖的剩余時間,也就是鎖到什么時候過期,以此后續確定是否重試。
5.2 信號量和Pubsub機制解決不可重試問題
不可重試問題,主要指的是鎖競爭失敗后無法自動重試。在Redission框架中,這主要依靠異步等待+訂閱發布機制解決
這里先介紹下什么是異步等待:
異步等待是一種非阻塞的資源競爭處理機制,其核心特點是:當資源不可立即獲取時,不會阻塞當前線程,而是通過事件監聽或回調機制,在資源可用時自動恢復執行。這種模式在分布式系統和高并發場景中至關重要。
比如在解決不可重試問題時,使用異步等待,如果不成功,不會陷入阻塞,而是取執行查看訂閱信息頻道,以此進行超時判斷或者重試
1.回到Redission的源碼,找到tryAcquireAsync方法
leaseTime代表著等待超時時間,如果設置了等待超時時間則用自己的超時時間,不然啟用看門狗機制自動續約(這個后面會講),我們現在主要就重試問題進行分析。
在重入問題中,我們講過如果獲取到了鎖,就返回null,沒有獲取到鎖,返回當前線程的剩余時間
這個方法中返回的結果記錄為ttlRemainingFuture
如果為ttl為null的話記錄等待超時時間或啟用看門狗機制,否則繼續返回剩余時間。
下面是完整代碼:
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;if (leaseTime > 0) {ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);ttlRemainingFuture = new CompletableFutureWrapper<>(s);CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {// lock acquiredif (ttlRemaining == null) {if (leaseTime > 0) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f); }
2.ttl值通過get()同步傳遞到tryAcquire方法,trylock方法通過調用tryAcquire執行重試機制。
這邊注意不是lock方法是trylock方法。
接下來我們來到trylock方法,傳入等待重試時間(waittime),等待超時時間(鎖持有時間leaseTime),還有時間單位。
ttl(當前鎖的剩余時間,也就是鎖到什么時候過期)時間由tryAcquire方法獲取,底層邏輯就是獲取鎖的那段lua腳本
如果ttl為null的話,表示成功獲取到鎖,直接返回
否則看是否超時,執行嘗試獲取鎖的線程后的時間(system.current)減去執行前的時間(current/waitime),如果小于零,表示超過等待重試時間,不再重試,直接返回
如果沒有拿到鎖并且還剩下時間,則向redis發送訂閱消息,監聽該鎖的釋放事件,并等待訂閱完成(讓線程進入阻塞等待狀態)
做完后在檢測waitTime剩余時間,如果小于零,不在重試,直接返回
否則進入while循環,做4件事
1.嘗試獲取鎖
2.查看waitTime剩余時間,判斷是否超時
3.確定最長阻塞等待時間,這里的if對應兩種情況
- ttl(鎖的剩余時間)在waitTime剩余時間之內,阻塞等待最長時間設置為ttl
- 如果不在,則阻塞等待最長時間設置為waitTime剩余時間(超出時間取消訂閱)
然后執行阻塞等待,期間可能被兩種事件喚醒:
- 超時通知(到達時間超過ttl或者time的時間限制)
- 鎖釋放通知(持有鎖線程通過Pubsub(訂閱信息通道)發送信息)
4.判斷剩余時間
最后while循環結束不管沒有沒拿到鎖,都要去取消訂閱
最后流程圖:
總結下:
Redission利用信號量和PubSub功能實現等待、喚醒,獲取鎖失敗的重試機制
5.3?Watchdog機制解決超時釋放問題
什么是WatchDog機制?
Redisson的看門狗機制(Watchdog)是一種用于自動續期分布式鎖的機制,主要解決客戶端在持有鎖期間因業務執行時間過長導致鎖超時釋放的問題。該機制通過后臺線程定期檢查并延長鎖的持有時間,確保鎖的安全性。
一句話總結,Watchdog機制通過一種定時續約邏輯來解決分布式鎖過期的問題。
來到Redission源碼,一樣是在RedissionLock類中,回到tryAcquireOnceAsync方法,之前我們在講解決不可重試問題時講過,鎖的等待超時(也就是過期時間leaseTime)可以設可以不設,如果設置的話,就使用自己設置的過期時間,如果沒有設置過期時間,就走看門狗(watchdog)機制。
我們看到如果沒有設置leaseTime,源碼傳達的時nternalLockLeaseTime,它的初始化是這樣的:
進入getLockWatchdogTimeout()方法
發現return 了一個lockWatchdogTimeout,這個一個long類型的變量,值為30 * 1000
由此我們知道,采用看門狗機制它的初始鎖的超時釋放時間設置的是30s,那么他是如何做到自動續時的呢,我們往后看。
可以看到,紅框部分同樣有個判斷,如果設置了leaseTime,則將其轉換成毫秒單元。如果沒有,則走scheduleExpirationRenewal()方法
進入scheduleExpirationRenewal()方法
當線程成功獲取鎖后,會調用此方法,執行以下邏輯:
1.嘗試為當前鎖新建一個續期條目,將當前線程ID加入其中
2.判斷該鎖是否已經有續期條目(確保同一個鎖的多個線程共享同一個續期任務,防止重復續期)
3.如果已經存在,只需要將當前線程ID添加到該鎖現有條目中
4.如果是新條目,執行續期任務(renewExpiration())
5.最后,如果線程被中斷,取消續期
換言之,對于這個方法,他主要做兩件事:
- 對已有續期條目的鎖,給新的線程ID添加進去(續期任務已開啟,滿足相應條件自動續期)
- 對沒有續期條目的鎖,創建新的續期條目,開啟續期任務
那么續期任務是具體怎么執行的呢?我們打開rennewExpiration()方法
1.獲取當前的續期條目,如果沒有,直接返回(代表著鎖被釋放或者未初始化)
2.創建一個定時任務
其中internalLockLeaseTime默認為30s,也就是每10s默認執行一次定時任務
3.進入到定時任務的具體邏輯中,二次檢查是否存在相應的續約條目和線程
4.調用renewExpirationAsync方法,更新鎖的過期時間,執行續約,返回一個boolean值
下面是renewExpirationAsync具體邏輯:
可以看到執行lua腳本實現過期時間的更新。
5.先是進行異常判斷,然后根據傳遞的boolean值(lua腳本的執行結果)判斷是否進行遞歸(繼續每10s進行續約)還是取消續約任務。
具體執行流程:
5.4?multiLock解決主從一致性問題
原有鎖存在問題:
redis集群主節點獲得鎖后可能立即宕機,沒有及時給數據同步從節點
解決方法:
設立多個獨立的Redis節點,必須在所有節點都獲取重入鎖,才算獲取鎖成功
這樣做,此時就算假設此時有一個主節點宕機,其他線程趁虛而入獲得那個節點的鎖,只要沒有獲得其他所有主節點的鎖,也是獲取失敗的。
最后:
今天的分享就到這里。如果我的內容對你有幫助,請點贊,評論,收藏。創作不易,大家的支持就是我堅持下去的動力!(?`・?・′?)