5.1 前文鎖問題
基于 setnx 實現的分布式鎖存在下面的問題:
重入問題:重入問題是指 獲得鎖的線程可以再次進入到相同的鎖的代碼塊中,可重入鎖的意義在于防止死鎖,比如 HashTable 這樣的代碼中,他的方法都是使用 synchronized 修飾的,假如他在一個方法內,調用另一個方法,那么此時如果是不可重入的,不就死鎖了嗎?所以可重入鎖他的主要意義是防止死鎖,我們的 synchronized 和 Lock 鎖都是可重入的。
不可重試:是指目前的分布式只能嘗試一次,我們認為合理的情況是:當線程在獲得鎖失敗后,他應該能再次嘗試獲得鎖。
超時釋放:我們在加鎖時增加了過期時間,這樣的我們可以防止死鎖,但是如果卡頓的時間超長,雖然我們采用了 lua 表達式防止刪鎖的時候,誤刪別人的鎖,但是畢竟沒有鎖住,有安全隱患
主從一致性:?如果 Redis 提供了主從集群,當我們向集群寫數據時,主機需要異步的將數據同步給從機,而萬一在同步過去之前,主機宕機了,就會出現死鎖問題。
5.2 Redission
Redisson 是一個在 Redis 的基礎上實現的 Java 駐內存數據網格(In-Memory Data Grid)。它不僅提供了一系列的分布式的 Java 常用對象,還提供了許多分布式服務,其中就包含了各種分布式鎖的實現。
Redission 提供了分布式鎖的多種多樣的功能
5.2.1 快速入門
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version>
</dependency>
@Configuration
public?class?RedissonConfig?{@Beanpublic?RedissonClient?redissonClient(){//?配置Config?config?=?new?Config();config.useSingleServer().setAddress("redis://192.168.150.101:6379").setPassword("123321");//?創建RedissonClient對象return?Redisson.create(config);}
}
@Resource
private?RedissionClient?redissonClient;@Test
void?testRedisson()?throws?Exception{//獲取鎖(可重入),指定鎖的名稱RLock?lock?=?redissonClient.getLock("anyLock");//嘗試獲取鎖,參數分別是:獲取鎖的最大等待時間(期間會重試),鎖自動釋放時間,時間單位boolean?isLock?=?lock.tryLock(1,10,TimeUnit.SECONDS);//判斷獲取鎖成功if(isLock){try{System.out.println("執行業務");}finally{//釋放鎖lock.unlock();}}}
5.2.2 注入 RedissonClient
@Resource
private?RedissonClient?redissonClient;@Override
public?Result?seckillVoucher(Long?voucherId)?{//?1.查詢優惠券SeckillVoucher?voucher?=?seckillVoucherService.getById(voucherId);//?2.判斷秒殺是否開始if?(voucher.getBeginTime().isAfter(LocalDateTime.now()))?{//?尚未開始return?Result.fail("秒殺尚未開始!");}//?3.判斷秒殺是否已經結束if?(voucher.getEndTime().isBefore(LocalDateTime.now()))?{//?尚未開始return?Result.fail("秒殺已經結束!");}//?4.判斷庫存是否充足if?(voucher.getStock()?<?1)?{//?庫存不足return?Result.fail("庫存不足!");}Long?userId?=?UserHolder.getUser().getId();//創建鎖對象?這個代碼不用了,因為我們現在要使用分布式鎖//SimpleRedisLock?lock?=?new?SimpleRedisLock("order:"?+?userId,?stringRedisTemplate);RLock?lock?=?redissonClient.getLock("lock:order:"?+?userId);//獲取鎖對象boolean?isLock?=?lock.tryLock();//加鎖失敗if?(!isLock)?{return?Result.fail("不允許重復下單");}try?{//獲取代理對象(事務)IVoucherOrderService?proxy?=?(IVoucherOrderService)?AopContext.currentProxy();return?proxy.createVoucherOrder(voucherId);}?finally?{//釋放鎖lock.unlock();}}
5.3 可重入原理
- 在 Lock 鎖中,他是借助于底層的一個 voaltile 的一個 state 變量來記錄重入的狀態的
- 比如當前沒有人持有這把鎖,那么 state=0
- 假如有人持有這把鎖,那么 state=1
- 如果持有這把鎖的人再次持有這把鎖
- 那么 state 就會 +1
- 如果是對于 synchronized 而言,他在 c 語言代碼中會有一個 count
- 原理和 state 類似,也是重入一次就加一,釋放一次就 -1,直到減少成 0 時,表示當前這把鎖沒有被人持有。
- 在 redission 中,我們的也支持支持可重入鎖
- 在分布式鎖中,他采用 hash 結構用來存儲鎖
- 其中大 key 表示表示這把鎖是否存在
- 用小 key 表示當前這把鎖被哪個線程持有
- 所以接下來我們一起分析一下當前的這個 lua 表達式
- 這個地方一共有 3 個參數
- KEYS[1]:鎖名稱
- ARGV[1]:鎖失效時間
- ARGV[2]: id + ":" + threadId; 鎖的小 key
- exists: 判斷數據是否存在 name:是 lock 是否存在,如果==0,就表示當前這把鎖不存在
- redis.call('hset', KEYS[1], ARGV[2], 1); 此時他就開始往 redis 里邊去寫數據,寫成一個 hash 結構
Lock{id?+?":"?+?threadId:?1}
- 如果當前這把鎖存在,則第一個條件不滿足,再判斷
- redis.call('hexists', KEYS[1], ARGV[2]) == 1
- 此時需要通過大 key+ 小 key 判斷當前這把鎖是否是屬于自己的,如果是自己的,則進行
- redis.call('hincrby', KEYS[1], ARGV[2], 1)
- 將當前這個鎖的 value 進行 +1,redis.call('pexpire', KEYS[1], ARGV[1])
- 然后再對其設置過期時間,如果以上兩個條件都不滿足,則表示當前這把鎖搶鎖失敗,最后返回 pttl,即為當前這把鎖的失效時間
- 如果小伙幫們看了前邊的源碼,你會發現他會去判斷當前這個方法的返回值是否為 null,如果是 null,則對應則前兩個 if 對應的條件,退出搶鎖邏輯,如果返回的不是 null,即走了第三個分支,在源碼處會進行 while(true) 的自旋搶鎖。
5.4 重試和 WatchDog
搶鎖過程中,獲得當前線程,通過 tryAcquire 進行搶鎖,該搶鎖邏輯和之前邏輯相同
- 1.?先判斷當前這把鎖是否存在,如果不存在,插入一把鎖,返回 null
- 2.?判斷當前這把鎖是否是屬于當前線程,如果是,則返回 null
所以如果返回是 null,則代表著當前這哥們已經搶鎖完畢,或者可重入完畢,但是如果以上兩個條件都不滿足,則進入到第三個條件,返回的是鎖的失效時間,同學們可以自行往下翻一點點,你能發現有個 while(true) 再次進行 tryAcquire 進行搶鎖
long threadId = Thread.currentThread().getId();
Long?ttl?=?tryAcquire(-1,?leaseTime,?unit,?threadId);
//?lock?acquired
if?(ttl?==?null)?{return;
}
接下來會有一個條件分支,因為 lock 方法有重載方法,一個是帶參數,一個是不帶參數,如果帶帶參數傳入的值是 -1,如果傳入參數,則 leaseTime 是他本身,所以如果傳入了參數,此時 leaseTime!= -1 則會進去搶鎖,搶鎖的邏輯就是之前說的那三個邏輯
if?(leaseTime?!=?-1)?{return?tryLockInnerAsync(waitTime,?leaseTime,?unit,?threadId,?RedisCommands.EVAL_LONG);
}
如果是沒有傳入時間,則此時也會進行搶鎖,而且搶鎖時間是默認看門狗時間 commandExecutor.getConnectionManager()。getCfg().getLockWatchdogTimeout()
ttlRemainingFuture.onComplete((ttlRemaining, e) 這句話相當于對以上搶鎖進行了監聽,也就是說當上邊搶鎖完畢后,此方法會被調用,具體調用的邏輯就是去后臺開啟一個線程,進行續約邏輯,也就是看門狗線程
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS,?threadId,?RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining,?e)?->?{if?(e?!=?null)?{return;}//?lock?acquiredif?(ttlRemaining?==?null)?{scheduleExpirationRenewal(threadId);}
});
return ttlRemainingFuture;
此邏輯就是續約邏輯,注意看 commandExecutor.getConnectionManager()。newTimeout()此方法
Method(**new** TimerTask() {},參數 2,參數 3)
指的是:通過參數 2,參數 3 去描述什么時候去做參數 1 的事情,現在的情況是:10s 之后去做參數一的事情
因為鎖的失效時間是 30s,當 10s 之后,此時這個 timeTask 就觸發了,他就去進行續約,把當前這把鎖續約成 30s,如果操作成功,那么此時就會遞歸調用自己,再重新設置一個 timeTask(),于是再過 10s 后又再設置一個 timerTask,完成不停的續約
那么大家可以想一想,假設我們的線程出現了宕機他還會續約嗎?當然不會,因為沒有人再去調用 renewExpiration 這個方法,所以等到時間之后自然就釋放了。
private?void?renewExpiration()?{ExpirationEntry?ee?=?EXPIRATION_RENEWAL_MAP.get(getEntryName());if?(ee?==?null)?{return;}Timeout?task?=?commandExecutor.getConnectionManager().newTimeout(new?TimerTask()?{@Overridepublic?void?run(Timeout?timeout)?throws?Exception?{ExpirationEntry?ent?=?EXPIRATION_RENEWAL_MAP.get(getEntryName());if?(ent?==?null)?{return;}Long?threadId?=?ent.getFirstThreadId();if?(threadId?==?null)?{return;}RFuture<Boolean>?future?=?renewExpirationAsync(threadId);future.onComplete((res,?e)?->?{if?(e?!=?null)?{log.error("Can't?update?lock?"?+?getName()?+?"?expiration",?e);return;}if?(res)?{//?reschedule?itselfrenewExpiration();}});}},?internalLockLeaseTime?/?3,?TimeUnit.MILLISECONDS);ee.setTimeout(task);
}
5.5 MutiLock 原理
為了提高 redis 的可用性,我們會搭建集群或者主從,現在以主從為例
此時我們去寫命令,寫在主機上,主機會將數據同步給從機,但是假設在主機還沒有來得及把數據寫入到從機去的時候,此時主機宕機,哨兵會發現主機宕機,并且選舉一個 slave 變成 master,而此時新的 master 中實際上并沒有鎖信息,此時鎖信息就已經丟掉了。
了解決這個問題,redission 提出來了 MutiLock 鎖,使用這把鎖咱們就不使用主從了,每個節點的地位都是一樣的,這把鎖加鎖的邏輯需要寫入到每一個主叢節點上,只有所有的服務器都寫入成功,此時才是加鎖成功,假設現在某個節點掛了,那么他去獲得鎖的時候,只要有一個節點拿不到,都不能算是加鎖成功,就保證了加鎖的可靠性。
那么 MutiLock 加鎖原理是什么呢?筆者畫了一幅圖來說明
當我們去設置了多個鎖時,redission 會將多個鎖添加到一個集合中,然后用 while 循環去不停去嘗試拿鎖,但是會有一個總共的加鎖時間,這個時間是用需要加鎖的個數 \* 1500ms,假設有 3 個鎖,那么時間就是 4500ms,假設在這 4500ms 內,所有的鎖都加鎖成功,那么此時才算是加鎖成功,如果在 4500ms 有線程加鎖失敗,則會再次去進行重試。