本文從一個經典的庫存超賣問題分析說明常見鎖的應用,假設庫存資源存儲在Redis里面。
假設我們的減庫存代碼如下:
@Autowired
StringRedisTemplate redisTemplate;public void deduct(){String stock = redisTemplate.opsForValue().get("stock");if(StringUtils.hasLength(stock)){Integer st = Integer.valueOf(stock);if(st>0){redisTemplate.opsForValue().set("stock",String.valueOf(--st));}}
}
此時方法操作是先讀后寫
,非原子性操作,是存在并發問題的。如何解決該問題,有三種方案:
- JVM本地鎖
- Redis樂觀鎖
- Redis實現分布式鎖
JVM本地鎖的實現與優缺點在從庫存超賣問題分析鎖和分布式鎖的應用(一)已經分析過了,這里不再贅述。
【1】Redis樂觀鎖
也就是watch
、multi
與exec
組合指令的使用。
watch可以監控一個或多個key的值,如果在事務(exec)執行之前,key的值發生變化則取消事務執行。
multi用來開啟事務,exec用來提交/執行事務。
watch stock
multi
set stock 5000
exec
代碼修改如下:
public void deduct(){this.redisTemplate.execute(new SessionCallback() {@Overridepublic Object execute(RedisOperations operations) throws DataAccessException {operations.watch("stock");// 1. 查詢庫存信息Object stock = operations.opsForValue().get("stock");// 2. 判斷庫存是否充足int st = 0;if (stock != null && (st = Integer.parseInt(stock.toString())) > 0) {// 3. 扣減庫存operations.multi();//開啟事務operations.opsForValue().set("stock", String.valueOf(--st));List exec = operations.exec();//執行事務if (exec == null || exec.size() == 0) {try {// 這里睡眠一下,降低競爭,提高樂觀鎖的吞吐量Thread.sleep(50);} catch (InterruptedException e) {e.printStackTrace();}//再次遞歸deduct();}return exec;}return null;}});
}
這種方式確實可以解決并發問題,但也可能在高并發的情況下由于不斷重試(CAS思想)出現性能問題、連接被耗盡的情況。
【2】Redis分布式鎖
① 基于setnx思想簡單實現
借助于redis中的命令setnx(key, value),key不存在就新增,存在就什么都不做。同時有多個客戶端發送setnx命令,只有一個客戶端可以成功,返回1(true);其他的客戶端返回0(false)。
// 遞歸思想
public void deduct(){//獲取鎖Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1111");//如果獲取不到則遞歸重試if(!lock){deduct();}else{try{String stock = redisTemplate.opsForValue().get("stock");if(StringUtils.hasLength(stock)){Integer st = Integer.valueOf(stock);if(st>0){redisTemplate.opsForValue().set("stock",String.valueOf(--st));}}}finally {//釋放鎖redisTemplate.delete("lock");}}}
或者使用while思想:
public void deduct(){
//當setnx剛剛獲取到鎖,當前服務器宕機,導致del釋放鎖無法執行,進而導致鎖無法鎖無法釋放(死鎖)while (Boolean.FALSE.equals(this.redisTemplate.opsForValue().setIfAbsent("lock", "xxx"))){try {Thread.sleep(100);}catch (Exception e){e.printStackTrace();}}try{String stock = redisTemplate.opsForValue().get("stock");if(StringUtils.hasLength(stock)){Integer st = Integer.valueOf(stock);if(st>0){redisTemplate.opsForValue().set("stock",String.valueOf(--st));}}}finally {//釋放鎖redisTemplate.delete("lock");}}
這種方式存在問題:當setnx剛剛獲取到鎖,當前服務器宕機,導致del釋放鎖無法執行,進而導致鎖無法鎖無法釋放(死鎖)
解決方案:給鎖設置過期時間,自動釋放鎖。
設置過期時間兩種方式:
- 通過expire設置過期時間(缺乏原子性:如果在setnx和expire之間出現異常,鎖也無法釋放)
- 使用set指令設置過期時間:
set key value ex 3 nx
(既達到setnx的效果,又設置了過期時間)
② 防死鎖優化
修改while中獲取鎖的邏輯如下所示:
while (Boolean.FALSE.equals(this.redisTemplate.opsForValue().setIfAbsent("lock", "xxx",3, TimeUnit.SECONDS)){try {Thread.sleep(100);}catch (Exception e){e.printStackTrace();}
}
這種方式解決了死鎖問題但是可能會釋放其他服務器的鎖。
場景:如果業務邏輯的執行時間是7s。執行流程如下
- index1業務邏輯沒執行完,3秒后鎖被自動釋放。
- index2獲取到鎖,執行業務邏輯,3秒后鎖被自動釋放。
- index3獲取到鎖,執行業務邏輯
- index1業務邏輯執行完成,開始調用del釋放鎖,這時釋放的是index3的鎖,導致index3的業務只
執行1s就被別人釋放。最終等于沒鎖的情況。
解決:setnx獲取鎖時,設置一個指定的唯一值(例如:uuid);釋放前獲取這個值,判斷是否自己的鎖
③ 防誤刪優化
如下這里設置鎖的密鑰為UUID,加鎖者持有。
public void deduct(){String uuid = UUID.randomUUID().toString();while (Boolean.FALSE.equals(this.redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS)){try {Thread.sleep(100);}catch (Exception e){e.printStackTrace();}}try{String stock = redisTemplate.opsForValue().get("stock");if(StringUtils.hasLength(stock)){Integer st = Integer.valueOf(stock);if(st>0){redisTemplate.opsForValue().set("stock",String.valueOf(--st));}}}finally {//釋放鎖if(uuid.equals(redisTemplate.opsForValue().get("lock"))){redisTemplate.delete("lock");}}
}
這種方式仍舊存在問題:刪除操作缺乏原子性。
場景:
- index1執行刪除時,查詢到的lock值確實和uuid相等
- index1執行刪除前,lock剛好過期時間已到,被redis自動釋放
- index2獲取了lock
- index1執行刪除,此時會把index2的lock刪除
解決方案:沒有一個命令可以同時做到判斷 + 刪除,所有只能通過其他方式實現(LUA腳本)
④ lua腳本保證刪除原子性
redis采用單線程架構,可以保證單個命令的原子性,但是無法保證一組命令在高并發場景下的原子性。
如下AB兩個進程示例:
在串行場景下:A和B的值肯定都是3。在并發場景下:A和B的值可能在0-6之間。
極限情況下1:則A的結果是0,B的結果是3
極限情況下2:則A和B的結果都是6
如果redis客戶端通過lua腳本把3個命令一次性發送給redis服務器,那么這三個指令就不會被其他客戶端指令打斷。Redis 也保證腳本會以原子性(atomic)的方式執行: 當某個腳本正在運行的時候,不會有其他腳本或 Redis 命令被執行。 這和使用 MULTI/ EXEC 包圍的事務很類似。
但是MULTI/ EXEC方法來使用事務功能,將一組命令打包執行,無法進行業務邏輯的操作。這期間有某一條命令執行報錯(例如給字符串自增),其他的命令還是會執行,并不會回滾。
優化代碼如下所示:
public void deduct(){String uuid = UUID.randomUUID().toString();while (Boolean.FALSE.equals(this.redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS))){try {Thread.sleep(100);}catch (Exception e){e.printStackTrace();}}try{String stock = redisTemplate.opsForValue().get("stock");if(StringUtils.hasLength(stock)){int st = Integer.parseInt(stock);if(st>0){redisTemplate.opsForValue().set("stock",String.valueOf(--st));}}}finally {// 先判斷是否自己的鎖,再解鎖String script = "if redis.call('get', KEYS[1]) == ARGV[1] " +"then " +" return redis.call('del', KEYS[1]) " +"else " +" return 0 " +"end";this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Collections.singletonList("lock"), uuid);
// //釋放鎖
// if(uuid.equals(redisTemplate.opsForValue().get("lock"))){
// redisTemplate.delete("lock");
// }}}
到這里似乎完美解決了我們考慮到的幾點問題,那么結束了嗎?
并沒有,目前這種方式不支持可重入性、并且集群環境下也存在失效情況
。更甚者如果由于異常情況,獲取鎖后服務邏輯未執行完畢,鎖就自動釋放了呢
?