2. 基于redis實現分布式鎖
2.1. 基本實現
????????借助于redis中的命令setnx(key, value),key不存在就新增,存在就什么都不做。同時有多個客戶端發送setnx命令,只有一個客戶端可以成功,返回1(true);其他的客戶端返回0(false)。
-
多個客戶端同時獲取鎖(setnx)
-
獲取成功,執行業務邏輯,執行完成釋放鎖(del)
-
其他客戶端等待重試
改造StockService方法:
@Service
public class StockService {@Autowiredprivate StockMapper stockMapper;@Autowiredprivate StringRedisTemplate redisTemplate;public void deduct() {// 加鎖setnxBoolean lock = this.redisTemplate.opsForValue().setIfAbsent("lock", "111");// 重試:遞歸調用if (!lock){try {Thread.sleep(50);this.deduct();} catch (InterruptedException e) {e.printStackTrace();}} else {try {// 1. 查詢庫存信息String stock = redisTemplate.opsForValue().get("stock").toString();// 2. 判斷庫存是否充足if (stock != null && stock.length() != 0) {Integer st = Integer.valueOf(stock);if (st > 0) {// 3.扣減庫存redisTemplate.opsForValue().set("stock", String.valueOf(--st));}}} finally {// 解鎖this.redisTemplate.delete("lock");}}}
}
其中,加鎖也可以使用循環:
// 加鎖,獲取鎖失敗重試
while (!this.redisTemplate.opsForValue().setIfAbsent("lock", "111")){try {Thread.sleep(40);} catch (InterruptedException e) {e.printStackTrace();}
}
解鎖:
// 釋放鎖
this.redisTemplate.delete("lock");
使用Jmeter壓力測試如下:
2.2. 防死鎖
問題:setnx剛剛獲取到鎖,當前服務器宕機,導致del釋放鎖無法執行,進而導致鎖無法鎖無法釋放(死鎖)
解決:給鎖設置過期時間,自動釋放鎖。
設置過期時間兩種方式:
-
通過expire設置過期時間(缺乏原子性:如果在setnx和expire之間出現異常,鎖也無法釋放)
-
使用set指令設置過期時間:set key value ex 3 nx(既達到setnx的效果,又設置了過期時間)
壓力測試肯定也沒有問題。
2.3. 防誤刪
問題:可能會釋放其他服務器的鎖。
場景:如果業務邏輯的執行時間是7s。執行流程如下
-
index1業務邏輯沒執行完,3秒后鎖被自動釋放。
-
index2獲取到鎖,執行業務邏輯,3秒后鎖被自動釋放。
-
index3獲取到鎖,執行業務邏輯
-
index1業務邏輯執行完成,開始調用del釋放鎖,這時釋放的是index3的鎖,導致index3的業務只執行1s就被別人釋放。
最終等于沒鎖的情況。
解決:setnx獲取鎖時,設置一個指定的唯一值(例如:uuid);釋放前獲取這個值,判斷是否自己的鎖
實現如下:
問題:刪除操作缺乏原子性。
場景:
-
index1執行刪除時,查詢到的lock值確實和uuid相等
-
index1執行刪除前,lock剛好過期時間已到,被redis自動釋放
-
index2獲取了lock
-
index1執行刪除,此時會把index2的lock刪除
解決方案:沒有一個命令可以同時做到判斷 + 刪除,所有只能通過其他方式實現(LUA腳本)
2.4. redis中的lua腳本
2.4.1. 現實問題
????????redis采用單線程架構,可以保證單個命令的原子性,但是無法保證一組命令在高并發場景下的原子性。例如:
在串行場景下:A和B的值肯定都是3
在并發場景下:A和B的值可能在0-6之間。
極限情況下1:
則A的結果是0,B的結果是3
極限情況下2:
則A和B的結果都是6
????????如果redis客戶端通過lua腳本把3個命令一次性發送給redis服務器,那么這三個指令就不會被其他客戶端指令打斷。Redis 也保證腳本會以原子性(atomic)的方式執行: 當某個腳本正在運行的時候,不會有其他腳本或 Redis 命令被執行。 這和使用 MULTI/ EXEC 包圍的事務很類似。
????????但是MULTI/ EXEC方法來使用事務功能,將一組命令打包執行,無法進行業務邏輯的操作。這期間有某一條命令執行報錯(例如給字符串自增),其他的命令還是會執行,并不會回滾。
2.4.2. lua介紹
????????Lua 是一種輕量小巧的腳本語言,用標準C語言編寫并以源代碼形式開放, 其設計目的是為了嵌入應用程序中,從而為應用程序提供靈活的擴展和定制功能。
設計目的
????????其設計目的是為了嵌入應用程序中,從而為應用程序提供靈活的擴展和定制功能。
Lua 特性
-
輕量級:它用標準C語言編寫并以源代碼形式開放,編譯后僅僅一百余K,可以很方便的嵌入別的程序里。
-
可擴展:Lua提供了非常易于使用的擴展接口和機制:由宿主語言(通常是C或C++)提供這些功能,Lua可以使用它們,就像是本來就內置的功能一樣。
-
其它特性:
-
支持面向過程(procedure-oriented)編程和函數式編程(functional programming);
-
自動內存管理;只提供了一種通用類型的表(table),用它可以實現數組,哈希表,集合,對象;
-
語言內置模式匹配;閉包(closure);函數也可以看做一個值;提供多線程(協同進程,并非操作系統所支持的線程)支持;
-
通過閉包和table可以很方便地支持面向對象編程所需要的一些關鍵機制,比如數據抽象,虛函數,繼承和重載等。
-
2.4.3. lua基本語法
????????對lua腳本感興趣的同學,請移步到官方教程或者《菜鳥教程》。這里僅以redis中可能會用到的部分語法作介紹。
a = 5 -- 全局變量
local b = 5 -- 局部變量, redis只支持局部變量
a, b = 10, 2*x -- 等價于 a=10; b=2*x
流程控制:
if( 布爾表達式 1)
then--[ 在布爾表達式 1 為 true 時執行該語句塊 --]
elseif( 布爾表達式 2)
then--[ 在布爾表達式 2 為 true 時執行該語句塊 --]
else --[ 如果以上布爾表達式都不為 true 則執行該語句塊 --]
end
2.4.4. redis執行lua腳本 - EVAL指令
在redis中需要通過eval命令執行lua腳本。
格式:
EVAL script numkeys key [key ...] arg [arg ...]
script:lua腳本字符串,這段Lua腳本不需要(也不應該)定義函數。
numkeys:lua腳本中KEYS數組的大小
key [key ...]:KEYS數組中的元素
arg [arg ...]:ARGV數組中的元素
案例1:基本案例
EVAL "return 10" 0
輸出:(integer) 10
案例2:動態傳參
EVAL "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 5 10 20 30 40 50 60 70 80 90
# 輸出:10 20 60 70EVAL "if KEYS[1] > ARGV[1] then return 1 else return 0 end" 1 10 20
# 輸出:0EVAL "if KEYS[1] > ARGV[1] then return 1 else return 0 end" 1 20 10
# 輸出:1
????????傳入了兩個參數10和20,KEYS的長度是1,所以KEYS中有一個元素10,剩余的一個20就是ARGV數組的元素。
????????redis.call()中的redis是redis中提供的lua腳本類庫,僅在redis環境中可以使用該類庫。
案例3:執行redis類庫方法
set aaa 10 -- 設置一個aaa值為10
EVAL "return redis.call('get', 'aaa')" 0
# 通過return把call方法返回給redis客戶端,打印:"10"
????????注意:腳本里使用的所有鍵都應該由 KEYS 數組來傳遞。但并不是強制性的,代價是這樣寫出的腳本不能被 Redis 集群所兼容。
案例4:給redis類庫方法動態傳參
EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 bbb 20
學到這里基本可以應付redis分布式鎖所需要的腳本知識了。
案例5:pcall函數的使用(了解)
-- 當call() 在執行命令的過程中發生錯誤時,腳本會停止執行,并返回一個腳本錯誤,輸出錯誤信息
EVAL "return redis.call('sets', KEYS[1], ARGV[1]), redis.call('set', KEYS[2], ARGV[2])" 2 bbb ccc 20 30
-- pcall函數不影響后續指令的執行
EVAL "return redis.pcall('sets', KEYS[1], ARGV[1]), redis.pcall('set', KEYS[2], ARGV[2])" 2 bbb ccc 20 30
注意:set方法寫成了sets,肯定會報錯。
2.5. 使用lua保證刪除原子性
刪除LUA腳本:
if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end
代碼實現:
public void deduct() {String uuid = UUID.randomUUID().toString();// 加鎖setnxwhile (!this.redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS)) {// 重試:循環try {Thread.sleep(50);} catch (InterruptedException e) {e.printStackTrace();}}try {// this.redisTemplate.expire("lock", 3, TimeUnit.SECONDS);// 1. 查詢庫存信息String stock = redisTemplate.opsForValue().get("stock").toString();// 2. 判斷庫存是否充足if (stock != null && stock.length() != 0) {Integer st = Integer.valueOf(stock);if (st > 0) {// 3.扣減庫存redisTemplate.opsForValue().set("stock", String.valueOf(--st));}}} finally {// 先判斷是否自己的鎖,再解鎖String script = "if redis.call('get', KEYS[1]) == ARGV[1] " +"then " +" return redis.call('del', KEYS[1]) " +"else " +" return 0 " +"end";this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList("lock"), uuid);}
}
壓力測試,庫存量也沒有問題,截圖略過。。。
2.6. 可重入鎖
????????由于上述加鎖命令使用了 SETNX ,一旦鍵存在就無法再設置成功,這就導致后續同一線程內繼續加鎖,將會加鎖失敗。當一個線程執行一段代碼成功獲取鎖之后,繼續執行時,又遇到加鎖的子任務代碼,可重入性就保證線程能繼續執行,而不可重入就是需要等待鎖釋放之后,再次獲取鎖成功,才能繼續往下執行。
用一段 Java 代碼解釋可重入:
public synchronized void a() {b();
}public synchronized void b() {// pass
}
????????假設 X 線程在 a 方法獲取鎖之后,繼續執行 b 方法,如果此時不可重入,線程就必須等待鎖釋放,再次爭搶鎖。
????????鎖明明是被 X 線程擁有,卻還需要等待自己釋放鎖,然后再去搶鎖,這看起來就很奇怪,我釋放我自己~
????????可重入性就可以解決這個尷尬的問題,當線程擁有鎖之后,往后再遇到加鎖方法,直接將加鎖次數加 1,然后再執行方法邏輯。退出加鎖方法之后,加鎖次數再減 1,當加鎖次數為 0 時,鎖才被真正的釋放。
????????可以看到可重入鎖最大特性就是計數,計算加鎖的次數。所以當可重入鎖需要在分布式環境實現時,我們也就需要統計加鎖次數。
解決方案:redis + Hash
2.6.1. 加鎖腳本
????????Redis 提供了 Hash (哈希表)這種可以存儲鍵值對數據結構。所以我們可以使用 Redis Hash 存儲的鎖的重入次數,然后利用 lua 腳本判斷邏輯。
if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1)
thenredis.call('hincrby', KEYS[1], ARGV[1], 1);redis.call('expire', KEYS[1], ARGV[2]);return 1;
elsereturn 0;
end
假設值為:KEYS:[lock], ARGV[uuid, expire]
????????如果鎖不存在或者這是自己的鎖,就通過hincrby(不存在就新增并加1,存在就加1)獲取鎖或者鎖次數加1。
2.6.2. 解鎖腳本
-- 判斷 hash set 可重入 key 的值是否等于 0
-- 如果為 nil 代表 自己的鎖已不存在,在嘗試解其他線程的鎖,解鎖失敗
-- 如果為 0 代表 可重入次數被減 1
-- 如果為 1 代表 該可重入 key 解鎖成功
if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then return nil;
elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then return 0;
else redis.call('del', KEYS[1]); return 1;
end;
2.6.3. 代碼實現
由于加解鎖代碼量相對較多,這里可以封裝成一個工具類:
DistributedLockClient工廠類具體實現:
@Component
public class DistributedLockClient {@Autowiredprivate StringRedisTemplate redisTemplate;private String uuid;public DistributedLockClient() {this.uuid = UUID.randomUUID().toString();}public DistributedRedisLock getRedisLock(String lockName){return new DistributedRedisLock(redisTemplate, lockName, uuid);}
}
DistributedRedisLock實現如下:
public class DistributedRedisLock implements Lock {private StringRedisTemplate redisTemplate;private String lockName;private String uuid;private long expire = 30;public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) {this.redisTemplate = redisTemplate;this.lockName = lockName;this.uuid = uuid;}@Overridepublic void lock() {this.tryLock();}@Overridepublic void lockInterruptibly() throws InterruptedException {}@Overridepublic boolean tryLock() {try {return this.tryLock(-1L, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}return false;}/*** 加鎖方法* @param time* @param unit* @return* @throws InterruptedException*/@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {if (time != -1){this.expire = unit.toSeconds(time);}String script = "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +"then " +" redis.call('hincrby', KEYS[1], ARGV[1], 1) " +" redis.call('expire', KEYS[1], ARGV[2]) " +" return 1 " +"else " +" return 0 " +"end";while (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), getId(), String.valueOf(expire))){Thread.sleep(50);}return true;}/*** 解鎖方法*/@Overridepublic void unlock() {String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " +"then " +" return nil " +"elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " +"then " +" return redis.call('del', KEYS[1]) " +"else " +" return 0 " +"end";Long flag = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), getId());if (flag == null){throw new IllegalMonitorStateException("this lock doesn't belong to you!");}}@Overridepublic Condition newCondition() {return null;}/*** 給線程拼接唯一標識* @return*/String getId(){return uuid + ":" + Thread.currentThread().getId();}
}
2.6.4. 使用及測試
在業務代碼中使用:
public void deduct() {DistributedRedisLock redisLock = this.distributedLockClient.getRedisLock("lock");redisLock.lock();try {// 1. 查詢庫存信息String stock = redisTemplate.opsForValue().get("stock").toString();// 2. 判斷庫存是否充足if (stock != null && stock.length() != 0) {Integer st = Integer.valueOf(stock);if (st > 0) {// 3.扣減庫存redisTemplate.opsForValue().set("stock", String.valueOf(--st));}}} finally {redisLock.unlock();}
}
測試:
測試可重入性:
2.7. 自動續期
lua腳本:
if(redis.call('hexists', KEYS[1], ARGV[1]) == 1) then redis.call('expire', KEYS[1], ARGV[2]); return 1;
else return 0;
end
在RedisDistributeLock中添加renewExpire方法:
public class DistributedRedisLock implements Lock {private StringRedisTemplate redisTemplate;private String lockName;private String uuid;private long expire = 30;public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) {this.redisTemplate = redisTemplate;this.lockName = lockName;this.uuid = uuid + ":" + Thread.currentThread().getId();}@Overridepublic void lock() {this.tryLock();}@Overridepublic void lockInterruptibly() throws InterruptedException {}@Overridepublic boolean tryLock() {try {return this.tryLock(-1L, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}return false;}/*** 加鎖方法* @param time* @param unit* @return* @throws InterruptedException*/@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {if (time != -1){this.expire = unit.toSeconds(time);}String script = "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +"then " +" redis.call('hincrby', KEYS[1], ARGV[1], 1) " +" redis.call('expire', KEYS[1], ARGV[2]) " +" return 1 " +"else " +" return 0 " +"end";while (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))){Thread.sleep(50);}// 加鎖成功,返回之前,開啟定時器自動續期this.renewExpire();return true;}/*** 解鎖方法*/@Overridepublic void unlock() {String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " +"then " +" return nil " +"elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " +"then " +" return redis.call('del', KEYS[1]) " +"else " +" return 0 " +"end";Long flag = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuid);if (flag == null){throw new IllegalMonitorStateException("this lock doesn't belong to you!");}}@Overridepublic Condition newCondition() {return null;}// String getId(){// return this.uuid + ":" + Thread.currentThread().getId();// }private void renewExpire(){String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +"then " +" return redis.call('expire', KEYS[1], ARGV[2]) " +"else " +" return 0 " +"end";new Timer().schedule(new TimerTask() {@Overridepublic void run() {if (redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))) {renewExpire();}}}, this.expire * 1000 / 3);}
}
在tryLock方法中使用:
構造方法作如下修改:
解鎖方法作如下修改:
2.8. 手寫分步式鎖小結
特征:
-
獨占排他:setnx
-
防死鎖:
redis客戶端程序獲取到鎖之后,立馬宕機。給鎖添加過期時間
不可重入:可重入
-
防誤刪:
先判斷是否自己的鎖才能刪除
-
原子性:
加鎖和過期時間之間:set k v ex 3 nx
判斷和釋放鎖之間:lua腳本
-
可重入性:hash(key field value) + lua腳本
-
自動續期:Timer定時器 + lua腳本
-
在集群情況下,導致鎖機制失效:
-
客戶端程序10010,從主中獲取鎖
-
從還沒來得及同步數據,主掛了
-
于是從升級為主
-
客戶端程序10086就從新主中獲取到鎖,導致鎖機制失效
-
鎖操作:
加鎖:
-
setnx:獨占排他 死鎖、不可重入、原子性
-
set k v ex 30 nx:獨占排他、死鎖 不可重入
-
hash + lua腳本:可重入鎖
-
判斷鎖是否被占用(exists),如果沒有被占用則直接獲取鎖(hset/hincrby)并設置過期時間(expire)
-
如果鎖被占用,則判斷是否當前線程占用的(hexists),如果是則重入(hincrby)并重置過期時間(expire)
-
否則獲取鎖失敗,將來代碼中重試
-
-
Timer定時器 + lua腳本:實現鎖的自動續期
判斷鎖是否自己的鎖(hexists == 1),如果是自己的鎖則執行expire重置過期時間
解鎖
-
del:導致誤刪
-
先判斷再刪除同時保證原子性:lua腳本
-
hash + lua腳本:可重入 1. 判斷當前線程的鎖是否存在,不存在則返回nil,將來拋出異常
-
存在則直接減1(hincrby -1),判斷減1后的值是否為0,為0則釋放鎖(del),并返回1
-
不為0,則返回0
-
重試:遞歸 循環
2.9. 紅鎖算法
redis集群狀態下的問題:
-
客戶端A從master獲取到鎖
-
在master將鎖同步到slave之前,master宕掉了。
-
slave節點被晉級為master節點
-
客戶端B取得了同一個資源被客戶端A已經獲取到的另外一個鎖。
安全失效!
解決集群下鎖失效,參照redis官方網站針對redlock文檔:https://redis.io/docs/manual/patterns/distributed-locks/
????????在算法的分布式版本中,我們假設有N個Redis服務器。這些節點是完全獨立的,因此我們不使用復制或任何其他隱式協調系統。前幾節已經描述了如何在單個實例中安全地獲取和釋放鎖,在分布式鎖算法中,將使用相同的方法在單個實例中獲取和釋放鎖。將N設置為5是一個合理的值,因此需要在不同的計算機或虛擬機上運行5個Redis主服務器,確保它們以獨立的方式發生故障。
為了獲取鎖,客戶端執行以下操作:
-
客戶端以毫秒為單位獲取當前時間的時間戳,作為起始時間。
-
客戶端嘗試在所有N個實例中順序使用相同的鍵名、相同的隨機值來獲取鎖定。每個實例嘗試獲取鎖都需要時間,客戶端應該設置一個遠小于總鎖定時間的超時時間。例如,如果自動釋放時間為10秒,則嘗試獲取鎖的超時時間可能在5到50毫秒之間。這樣可以防止客戶端長時間與處于故障狀態的Redis節點進行通信:如果某個實例不可用,盡快嘗試與下一個實例進行通信。
-
客戶端獲取當前時間 減去在步驟1中獲得的起始時間,來計算獲取鎖所花費的時間。當且僅當客戶端能夠在大多數實例(至少3個)中獲取鎖時,并且獲取鎖所花費的總時間小于鎖有效時間,則認為已獲取鎖。
-
如果獲取了鎖,則將鎖有效時間減去 獲取鎖所花費的時間,如步驟3中所計算。
-
如果客戶端由于某種原因(無法鎖定N / 2 + 1個實例或有效時間為負)而未能獲得該鎖,它將嘗試解鎖所有實例(即使沒有鎖定成功的實例)。
????????每臺計算機都有一個本地時鐘,我們通常可以依靠不同的計算機來產生很小的時鐘漂移。只有在擁有鎖的客戶端將在鎖有效時間內(如步驟3中獲得的)減去一段時間(僅幾毫秒)的情況下終止工作,才能保證這一點。以補償進程之間的時鐘漂移
????????當客戶端無法獲取鎖時,它應該在隨機延遲后重試,以避免同時獲取同一資源的多個客戶端之間不同步(這可能會導致腦裂的情況:沒人勝)。同樣,客戶端在大多數Redis實例中嘗試獲取鎖的速度越快,出現裂腦情況(以及需要重試)的窗口就越小,因此理想情況下,客戶端應嘗試將SET命令發送到N個實例同時使用多路復用。
????????值得強調的是,對于未能獲得大多數鎖的客戶端,盡快釋放(部分)獲得的鎖有多么重要,這樣就不必等待鎖定期滿才能再次獲得鎖(但是,如果發生了網絡分區,并且客戶端不再能夠與Redis實例進行通信,則在等待密鑰到期時需要付出可用性損失)。
2.10. redisson中的分布式鎖
????????Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-Memory Data Grid)。它不僅提供了一系列的分布式的Java常用對象,還提供了許多分布式服務。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最簡單和最便捷的方法。Redisson的宗旨是促進使用者對Redis的關注分離(Separation of Concern),從而讓使用者能夠將精力更集中地放在處理業務邏輯上。
官方文檔地址:Home · redisson/redisson Wiki · GitHub
2.10.1. 可重入鎖(Reentrant Lock)
????????基于Redis的Redisson分布式可重入鎖RLock
Java對象實現了java.util.concurrent.locks.Lock
接口。
????????大家都知道,如果負責儲存這個分布式鎖的Redisson節點宕機以后,而且這個鎖正好處于鎖住的狀態時,這個鎖會出現鎖死的狀態。為了避免這種情況的發生,Redisson內部提供了一個監控鎖的看門狗,它的作用是在Redisson實例被關閉前,不斷的延長鎖的有效期。默認情況下,看門狗檢查鎖的超時時間是30秒鐘,也可以通過修改Config.lockWatchdogTimeout
來另行指定。
????????RLock
對象完全符合Java的Lock規范。也就是說只有擁有鎖的進程才能解鎖,其他進程解鎖則會拋出IllegalMonitorStateException
錯誤。
????????另外Redisson還通過加鎖的方法提供了leaseTime
的參數來指定加鎖的時間。超過這個時間后鎖便自動解開了。
RLock lock = redisson.getLock("anyLock");
// 最常見的使用方法
lock.lock();// 加鎖以后10秒鐘自動解鎖
// 無需調用unlock方法手動解鎖
lock.lock(10, TimeUnit.SECONDS);// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {try {...} finally {lock.unlock();}
}
-
引入依賴
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.11.2</version>
</dependency>
-
添加配置
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient(){Config config = new Config();// 可以用"rediss://"來啟用SSL連接config.useSingleServer().setAddress("redis://172.16.116.100:6379");return Redisson.create(config);}
}
-
代碼中使用
@Autowired
private RedissonClient redissonClient;public void checkAndLock() {// 加鎖,獲取鎖失敗重試RLock lock = this.redissonClient.getLock("lock");lock.lock();// 先查詢庫存是否充足Stock stock = this.stockMapper.selectById(1L);// 再減庫存if (stock != null && stock.getCount() > 0){stock.setCount(stock.getCount() - 1);this.stockMapper.updateById(stock);}// 釋放鎖lock.unlock();
}
-
壓力測試
性能跟我們手寫的區別不大。
數據庫也沒有問題
2.10.2. 公平鎖(Fair Lock)
????????基于Redis的Redisson分布式可重入公平鎖也是實現了java.util.concurrent.locks.Lock
接口的一種RLock
對象。同時還提供了異步(Async)、反射式(Reactive)和RxJava2標準的接口。它保證了當多個Redisson客戶端線程同時請求加鎖時,優先分配給先發出請求的線程。所有請求線程會在一個隊列中排隊,當某個線程出現宕機時,Redisson會等待5秒后繼續下一個線程,也就是說如果前面有5個線程都處于等待狀態,那么后面的線程會等待至少25秒。
RLock fairLock = redisson.getFairLock("anyLock");
// 最常見的使用方法
fairLock.lock();// 10秒鐘以后自動解鎖
// 無需調用unlock方法手動解鎖
fairLock.lock(10, TimeUnit.SECONDS);// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
fairLock.unlock();
2.10.3. 聯鎖(MultiLock)
????????基于Redis的Redisson分布式聯鎖RedissonMultiLock對象可以將多個RLock
對象關聯為一個聯鎖,每個RLock
對象實例可以來自于不同的Redisson實例。
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同時加鎖:lock1 lock2 lock3
// 所有的鎖都上鎖成功才算成功。
lock.lock();
...
lock.unlock();
2.10.4. 紅鎖(RedLock)
????????基于Redis的Redisson紅鎖RedissonRedLock
對象實現了Redlock介紹的加鎖算法。該對象也可以用來將多個RLock
對象關聯為一個紅鎖,每個RLock
對象實例可以來自于不同的Redisson實例。
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同時加鎖:lock1 lock2 lock3
// 紅鎖在大部分節點上加鎖成功就算成功。
lock.lock();
...
lock.unlock();
2.10.5. 讀寫鎖(ReadWriteLock)
????????基于Redis的Redisson分布式可重入讀寫鎖RReadWriteLock Java對象實現了java.util.concurrent.locks.ReadWriteLock
接口。其中讀鎖和寫鎖都繼承了RLock接口。
????????分布式可重入讀寫鎖允許同時有多個讀鎖和一個寫鎖處于加鎖狀態。
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常見的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();// 10秒鐘以后自動解鎖
// 無需調用unlock方法手動解鎖
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
添加StockController方法:
@GetMapping("test/read")
public String testRead(){String msg = stockService.testRead();return "測試讀";
}@GetMapping("test/write")
public String testWrite(){String msg = stockService.testWrite();return "測試寫";
}
添加StockService方法:
public String testRead() {RReadWriteLock rwLock = this.redissonClient.getReadWriteLock("rwLock");rwLock.readLock().lock(10, TimeUnit.SECONDS);System.out.println("測試讀鎖。。。。");// rwLock.readLock().unlock();return null;
}public String testWrite() {RReadWriteLock rwLock = this.redissonClient.getReadWriteLock("rwLock");rwLock.writeLock().lock(10, TimeUnit.SECONDS);System.out.println("測試寫鎖。。。。");// rwLock.writeLock().unlock();return null;
}
打開開兩個瀏覽器窗口測試:
-
同時訪問寫:一個寫完之后,等待一會兒(約10s),另一個寫開始
-
同時訪問讀:不用等待
-
先寫后讀:讀要等待(約10s)寫完成
-
先讀后寫:寫要等待(約10s)讀完成
2.10.6. 信號量(Semaphore)
????????基于Redis的Redisson的分布式信號量(Semaphore)Java對象RSemaphore
采用了與java.util.concurrent.Semaphore
相似的接口和用法。同時還提供了異步(Async)、反射式(Reactive)和RxJava2標準的接口。
RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.trySetPermits(3);
semaphore.acquire();
semaphore.release();
在StockController添加方法:
@GetMapping("test/semaphore")
public String testSemaphore(){this.stockService.testSemaphore();return "測試信號量";
}
在StockService添加方法:
public void testSemaphore() {RSemaphore semaphore = this.redissonClient.getSemaphore("semaphore");semaphore.trySetPermits(3);try {semaphore.acquire();TimeUnit.SECONDS.sleep(5);System.out.println(System.currentTimeMillis());semaphore.release();} catch (InterruptedException e) {e.printStackTrace();}
}
添加測試用例:并發10次,循環一次
控制臺效果:
控制臺1:
1606960790234
1606960800337
1606960800443
1606960805248控制臺2:
1606960790328
1606960795332
1606960800245控制臺3:
1606960790433
1606960795238
1606960795437
由此可知:
1606960790秒有3次請求進來:每個控制臺各1次
1606960795秒有3次請求進來:控制臺2有1次,控制臺3有2次
1606960800秒有3次請求進來:控制臺1有2次,控制臺2有1次
1606960805秒有1次請求進來:控制臺1有1次
2.10.7. 閉鎖(CountDownLatch)
????????基于Redisson的Redisson分布式閉鎖(CountDownLatch)Java對象RCountDownLatch
采用了與java.util.concurrent.CountDownLatch
相似的接口和用法。
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();// 在其他線程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();
需要兩個方法:一個等待,一個計數countDown
給StockController添加測試方法:
@GetMapping("test/latch")
public String testLatch(){this.stockService.testLatch();return "班長鎖門。。。";
}@GetMapping("test/countdown")
public String testCountDown(){this.stockService.testCountDown();return "出來了一位同學";
}
給StockService添加測試方法:
public void testLatch() {RCountDownLatch latch = this.redissonClient.getCountDownLatch("latch");latch.trySetCount(6);try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}
}public void testCountDown() {RCountDownLatch latch = this.redissonClient.getCountDownLatch("latch");latch.trySetCount(6);latch.countDown();
}
重啟測試,打開兩個頁面:當第二個請求執行6次之后,第一個請求才會執行。
?