分布式鎖之基于redis實現分布式鎖(二)

2. 基于redis實現分布式鎖

2.1. 基本實現

????????借助于redis中的命令setnx(key, value),key不存在就新增,存在就什么都不做。同時有多個客戶端發送setnx命令,只有一個客戶端可以成功,返回1(true);其他的客戶端返回0(false)。

  1. 多個客戶端同時獲取鎖(setnx)

  2. 獲取成功,執行業務邏輯,執行完成釋放鎖(del)

  3. 其他客戶端等待重試

改造StockService方法:

@Service
public class StockService {@Autowiredprivate StockMapper stockMapper;@Autowiredprivate StringRedisTemplate redisTemplate;public void deduct() {// 加鎖setnxBoolean lock = this.redisTemplate.opsForValue().setIfAbsent("lock", "111");// 重試:遞歸調用if (!lock){try {Thread.sleep(50);this.deduct();} catch (InterruptedException e) {e.printStackTrace();}} else {try {// 1. 查詢庫存信息String stock = redisTemplate.opsForValue().get("stock").toString();// 2. 判斷庫存是否充足if (stock != null && stock.length() != 0) {Integer st = Integer.valueOf(stock);if (st > 0) {// 3.扣減庫存redisTemplate.opsForValue().set("stock", String.valueOf(--st));}}} finally {// 解鎖this.redisTemplate.delete("lock");}}}
}

其中,加鎖也可以使用循環:

// 加鎖,獲取鎖失敗重試
while (!this.redisTemplate.opsForValue().setIfAbsent("lock", "111")){try {Thread.sleep(40);} catch (InterruptedException e) {e.printStackTrace();}
}

解鎖:

// 釋放鎖
this.redisTemplate.delete("lock");

使用Jmeter壓力測試如下:

2.2. 防死鎖

問題:setnx剛剛獲取到鎖,當前服務器宕機,導致del釋放鎖無法執行,進而導致鎖無法鎖無法釋放(死鎖)

解決:給鎖設置過期時間,自動釋放鎖。

設置過期時間兩種方式:

  1. 通過expire設置過期時間(缺乏原子性:如果在setnx和expire之間出現異常,鎖也無法釋放)

  2. 使用set指令設置過期時間:set key value ex 3 nx(既達到setnx的效果,又設置了過期時間)

壓力測試肯定也沒有問題。

2.3. 防誤刪

問題:可能會釋放其他服務器的鎖。

場景:如果業務邏輯的執行時間是7s。執行流程如下

  1. index1業務邏輯沒執行完,3秒后鎖被自動釋放。

  2. index2獲取到鎖,執行業務邏輯,3秒后鎖被自動釋放。

  3. index3獲取到鎖,執行業務邏輯

  4. index1業務邏輯執行完成,開始調用del釋放鎖,這時釋放的是index3的鎖,導致index3的業務只執行1s就被別人釋放。

    最終等于沒鎖的情況。

解決:setnx獲取鎖時,設置一個指定的唯一值(例如:uuid);釋放前獲取這個值,判斷是否自己的鎖

實現如下:

問題:刪除操作缺乏原子性。

場景

  1. index1執行刪除時,查詢到的lock值確實和uuid相等

  2. index1執行刪除前,lock剛好過期時間已到,被redis自動釋放

  3. index2獲取了lock

  4. index1執行刪除,此時會把index2的lock刪除

解決方案:沒有一個命令可以同時做到判斷 + 刪除,所有只能通過其他方式實現(LUA腳本

2.4. redis中的lua腳本

2.4.1. 現實問題

????????redis采用單線程架構,可以保證單個命令的原子性,但是無法保證一組命令在高并發場景下的原子性。例如:

在串行場景下:A和B的值肯定都是3

在并發場景下:A和B的值可能在0-6之間。

極限情況下1:

則A的結果是0,B的結果是3

極限情況下2:

則A和B的結果都是6

????????如果redis客戶端通過lua腳本把3個命令一次性發送給redis服務器,那么這三個指令就不會被其他客戶端指令打斷。Redis 也保證腳本會以原子性(atomic)的方式執行: 當某個腳本正在運行的時候,不會有其他腳本或 Redis 命令被執行。 這和使用 MULTI/ EXEC 包圍的事務很類似。

????????但是MULTI/ EXEC方法來使用事務功能,將一組命令打包執行,無法進行業務邏輯的操作。這期間有某一條命令執行報錯(例如給字符串自增),其他的命令還是會執行,并不會回滾。

2.4.2. lua介紹

????????Lua 是一種輕量小巧的腳本語言,用標準C語言編寫并以源代碼形式開放, 其設計目的是為了嵌入應用程序中,從而為應用程序提供靈活的擴展和定制功能。

設計目的

????????其設計目的是為了嵌入應用程序中,從而為應用程序提供靈活的擴展和定制功能。

Lua 特性

  • 輕量級:它用標準C語言編寫并以源代碼形式開放,編譯后僅僅一百余K,可以很方便的嵌入別的程序里。

  • 可擴展:Lua提供了非常易于使用的擴展接口和機制:由宿主語言(通常是C或C++)提供這些功能,Lua可以使用它們,就像是本來就內置的功能一樣。

  • 其它特性:

    • 支持面向過程(procedure-oriented)編程和函數式編程(functional programming);

    • 自動內存管理;只提供了一種通用類型的表(table),用它可以實現數組,哈希表,集合,對象;

    • 語言內置模式匹配;閉包(closure);函數也可以看做一個值;提供多線程(協同進程,并非操作系統所支持的線程)支持;

    • 通過閉包和table可以很方便地支持面向對象編程所需要的一些關鍵機制,比如數據抽象,虛函數,繼承和重載等。

2.4.3. lua基本語法

????????對lua腳本感興趣的同學,請移步到官方教程或者《菜鳥教程》。這里僅以redis中可能會用到的部分語法作介紹。

a = 5               -- 全局變量
local b = 5         -- 局部變量, redis只支持局部變量
a, b = 10, 2*x      -- 等價于       a=10; b=2*x

流程控制:

if( 布爾表達式 1)
then--[ 在布爾表達式 1 為 true 時執行該語句塊 --]
elseif( 布爾表達式 2)
then--[ 在布爾表達式 2 為 true 時執行該語句塊 --]
else --[ 如果以上布爾表達式都不為 true 則執行該語句塊 --]
end

2.4.4. redis執行lua腳本 - EVAL指令

在redis中需要通過eval命令執行lua腳本。

格式:

EVAL script numkeys key [key ...] arg [arg ...]
script:lua腳本字符串,這段Lua腳本不需要(也不應該)定義函數。
numkeys:lua腳本中KEYS數組的大小
key [key ...]:KEYS數組中的元素
arg [arg ...]:ARGV數組中的元素

案例1:基本案例

EVAL "return 10" 0

輸出:(integer) 10

案例2:動態傳參

EVAL "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 5 10 20 30 40 50 60 70 80 90
# 輸出:10 20 60 70EVAL "if KEYS[1] > ARGV[1] then return 1 else return 0 end" 1 10 20
# 輸出:0EVAL "if KEYS[1] > ARGV[1] then return 1 else return 0 end" 1 20 10
# 輸出:1

????????傳入了兩個參數10和20,KEYS的長度是1,所以KEYS中有一個元素10,剩余的一個20就是ARGV數組的元素。

????????redis.call()中的redis是redis中提供的lua腳本類庫,僅在redis環境中可以使用該類庫。

案例3:執行redis類庫方法

set aaa 10  -- 設置一個aaa值為10
EVAL "return redis.call('get', 'aaa')" 0
# 通過return把call方法返回給redis客戶端,打印:"10"

????????注意:腳本里使用的所有鍵都應該由 KEYS 數組來傳遞。但并不是強制性的,代價是這樣寫出的腳本不能被 Redis 集群所兼容。

案例4:給redis類庫方法動態傳參

EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 bbb 20

學到這里基本可以應付redis分布式鎖所需要的腳本知識了。

案例5:pcall函數的使用(了解)

-- 當call() 在執行命令的過程中發生錯誤時,腳本會停止執行,并返回一個腳本錯誤,輸出錯誤信息
EVAL "return redis.call('sets', KEYS[1], ARGV[1]), redis.call('set', KEYS[2], ARGV[2])" 2 bbb ccc 20 30
-- pcall函數不影響后續指令的執行
EVAL "return redis.pcall('sets', KEYS[1], ARGV[1]), redis.pcall('set', KEYS[2], ARGV[2])" 2 bbb ccc 20 30

注意:set方法寫成了sets,肯定會報錯。

2.5. 使用lua保證刪除原子性

刪除LUA腳本:

if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end

代碼實現:

public void deduct() {String uuid = UUID.randomUUID().toString();// 加鎖setnxwhile (!this.redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS)) {// 重試:循環try {Thread.sleep(50);} catch (InterruptedException e) {e.printStackTrace();}}try {// this.redisTemplate.expire("lock", 3, TimeUnit.SECONDS);// 1. 查詢庫存信息String stock = redisTemplate.opsForValue().get("stock").toString();// 2. 判斷庫存是否充足if (stock != null && stock.length() != 0) {Integer st = Integer.valueOf(stock);if (st > 0) {// 3.扣減庫存redisTemplate.opsForValue().set("stock", String.valueOf(--st));}}} finally {// 先判斷是否自己的鎖,再解鎖String script = "if redis.call('get', KEYS[1]) == ARGV[1] " +"then " +"   return redis.call('del', KEYS[1]) " +"else " +"   return 0 " +"end";this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList("lock"), uuid);}
}

壓力測試,庫存量也沒有問題,截圖略過。。。

2.6. 可重入鎖

????????由于上述加鎖命令使用了 SETNX ,一旦鍵存在就無法再設置成功,這就導致后續同一線程內繼續加鎖,將會加鎖失敗。當一個線程執行一段代碼成功獲取鎖之后,繼續執行時,又遇到加鎖的子任務代碼,可重入性就保證線程能繼續執行,而不可重入就是需要等待鎖釋放之后,再次獲取鎖成功,才能繼續往下執行。

用一段 Java 代碼解釋可重入:

public synchronized void a() {b();
}public synchronized void b() {// pass
}

????????假設 X 線程在 a 方法獲取鎖之后,繼續執行 b 方法,如果此時不可重入,線程就必須等待鎖釋放,再次爭搶鎖。

????????鎖明明是被 X 線程擁有,卻還需要等待自己釋放鎖,然后再去搶鎖,這看起來就很奇怪,我釋放我自己~

????????可重入性就可以解決這個尷尬的問題,當線程擁有鎖之后,往后再遇到加鎖方法,直接將加鎖次數加 1,然后再執行方法邏輯。退出加鎖方法之后,加鎖次數再減 1,當加鎖次數為 0 時,鎖才被真正的釋放。

????????可以看到可重入鎖最大特性就是計數,計算加鎖的次數。所以當可重入鎖需要在分布式環境實現時,我們也就需要統計加鎖次數。

解決方案:redis + Hash

2.6.1. 加鎖腳本

????????Redis 提供了 Hash (哈希表)這種可以存儲鍵值對數據結構。所以我們可以使用 Redis Hash 存儲的鎖的重入次數,然后利用 lua 腳本判斷邏輯。

if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1) 
thenredis.call('hincrby', KEYS[1], ARGV[1], 1);redis.call('expire', KEYS[1], ARGV[2]);return 1;
elsereturn 0;
end

假設值為:KEYS:[lock], ARGV[uuid, expire]

????????如果鎖不存在或者這是自己的鎖,就通過hincrby(不存在就新增并加1,存在就加1)獲取鎖或者鎖次數加1。

2.6.2. 解鎖腳本

-- 判斷 hash set 可重入 key 的值是否等于 0
-- 如果為 nil 代表 自己的鎖已不存在,在嘗試解其他線程的鎖,解鎖失敗
-- 如果為 0 代表 可重入次數被減 1
-- 如果為 1 代表 該可重入 key 解鎖成功
if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then return nil; 
elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then return 0; 
else redis.call('del', KEYS[1]); return 1; 
end;

2.6.3. 代碼實現

由于加解鎖代碼量相對較多,這里可以封裝成一個工具類:

DistributedLockClient工廠類具體實現:

@Component
public class DistributedLockClient {@Autowiredprivate StringRedisTemplate redisTemplate;private String uuid;public DistributedLockClient() {this.uuid = UUID.randomUUID().toString();}public DistributedRedisLock getRedisLock(String lockName){return new DistributedRedisLock(redisTemplate, lockName, uuid);}
}

DistributedRedisLock實現如下:

public class DistributedRedisLock implements Lock {private StringRedisTemplate redisTemplate;private String lockName;private String uuid;private long expire = 30;public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) {this.redisTemplate = redisTemplate;this.lockName = lockName;this.uuid = uuid;}@Overridepublic void lock() {this.tryLock();}@Overridepublic void lockInterruptibly() throws InterruptedException {}@Overridepublic boolean tryLock() {try {return this.tryLock(-1L, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}return false;}/*** 加鎖方法* @param time* @param unit* @return* @throws InterruptedException*/@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {if (time != -1){this.expire = unit.toSeconds(time);}String script = "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +"then " +"   redis.call('hincrby', KEYS[1], ARGV[1], 1) " +"   redis.call('expire', KEYS[1], ARGV[2]) " +"   return 1 " +"else " +"   return 0 " +"end";while (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), getId(), String.valueOf(expire))){Thread.sleep(50);}return true;}/*** 解鎖方法*/@Overridepublic void unlock() {String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " +"then " +"   return nil " +"elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " +"then " +"   return redis.call('del', KEYS[1]) " +"else " +"   return 0 " +"end";Long flag = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), getId());if (flag == null){throw new IllegalMonitorStateException("this lock doesn't belong to you!");}}@Overridepublic Condition newCondition() {return null;}/*** 給線程拼接唯一標識* @return*/String getId(){return uuid + ":" + Thread.currentThread().getId();}
}

2.6.4. 使用及測試

在業務代碼中使用:

public void deduct() {DistributedRedisLock redisLock = this.distributedLockClient.getRedisLock("lock");redisLock.lock();try {// 1. 查詢庫存信息String stock = redisTemplate.opsForValue().get("stock").toString();// 2. 判斷庫存是否充足if (stock != null && stock.length() != 0) {Integer st = Integer.valueOf(stock);if (st > 0) {// 3.扣減庫存redisTemplate.opsForValue().set("stock", String.valueOf(--st));}}} finally {redisLock.unlock();}
}

測試:

測試可重入性:

2.7. 自動續期

lua腳本:

if(redis.call('hexists', KEYS[1], ARGV[1]) == 1) then redis.call('expire', KEYS[1], ARGV[2]); return 1; 
else return 0; 
end

在RedisDistributeLock中添加renewExpire方法:

public class DistributedRedisLock implements Lock {private StringRedisTemplate redisTemplate;private String lockName;private String uuid;private long expire = 30;public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) {this.redisTemplate = redisTemplate;this.lockName = lockName;this.uuid = uuid + ":" + Thread.currentThread().getId();}@Overridepublic void lock() {this.tryLock();}@Overridepublic void lockInterruptibly() throws InterruptedException {}@Overridepublic boolean tryLock() {try {return this.tryLock(-1L, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}return false;}/*** 加鎖方法* @param time* @param unit* @return* @throws InterruptedException*/@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {if (time != -1){this.expire = unit.toSeconds(time);}String script = "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +"then " +"   redis.call('hincrby', KEYS[1], ARGV[1], 1) " +"   redis.call('expire', KEYS[1], ARGV[2]) " +"   return 1 " +"else " +"   return 0 " +"end";while (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))){Thread.sleep(50);}// 加鎖成功,返回之前,開啟定時器自動續期this.renewExpire();return true;}/*** 解鎖方法*/@Overridepublic void unlock() {String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " +"then " +"   return nil " +"elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " +"then " +"   return redis.call('del', KEYS[1]) " +"else " +"   return 0 " +"end";Long flag = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuid);if (flag == null){throw new IllegalMonitorStateException("this lock doesn't belong to you!");}}@Overridepublic Condition newCondition() {return null;}// String getId(){//     return this.uuid + ":" + Thread.currentThread().getId();// }private void renewExpire(){String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +"then " +"   return redis.call('expire', KEYS[1], ARGV[2]) " +"else " +"   return 0 " +"end";new Timer().schedule(new TimerTask() {@Overridepublic void run() {if (redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))) {renewExpire();}}}, this.expire * 1000 / 3);}
}

在tryLock方法中使用:

構造方法作如下修改:

解鎖方法作如下修改:

2.8. 手寫分步式鎖小結

特征

  1. 獨占排他:setnx

  2. 防死鎖:

    redis客戶端程序獲取到鎖之后,立馬宕機。給鎖添加過期時間

    不可重入:可重入

  3. 防誤刪:

    先判斷是否自己的鎖才能刪除

  4. 原子性:

    加鎖和過期時間之間:set k v ex 3 nx

    判斷和釋放鎖之間:lua腳本

  5. 可重入性:hash(key field value) + lua腳本

  6. 自動續期:Timer定時器 + lua腳本

  7. 在集群情況下,導致鎖機制失效:

    1. 客戶端程序10010,從主中獲取鎖

    2. 從還沒來得及同步數據,主掛了

    3. 于是從升級為主

    4. 客戶端程序10086就從新主中獲取到鎖,導致鎖機制失效

鎖操作

加鎖:

  1. setnx:獨占排他 死鎖、不可重入、原子性

  2. set k v ex 30 nx:獨占排他、死鎖 不可重入

  3. hash + lua腳本:可重入鎖

    1. 判斷鎖是否被占用(exists),如果沒有被占用則直接獲取鎖(hset/hincrby)并設置過期時間(expire)

    2. 如果鎖被占用,則判斷是否當前線程占用的(hexists),如果是則重入(hincrby)并重置過期時間(expire)

    3. 否則獲取鎖失敗,將來代碼中重試

  4. Timer定時器 + lua腳本:實現鎖的自動續期

    判斷鎖是否自己的鎖(hexists == 1),如果是自己的鎖則執行expire重置過期時間

解鎖

  1. del:導致誤刪

  2. 先判斷再刪除同時保證原子性:lua腳本

  3. hash + lua腳本:可重入 1. 判斷當前線程的鎖是否存在,不存在則返回nil,將來拋出異常

    1. 存在則直接減1(hincrby -1),判斷減1后的值是否為0,為0則釋放鎖(del),并返回1

    2. 不為0,則返回0

重試:遞歸 循環

2.9. 紅鎖算法

redis集群狀態下的問題:

  1. 客戶端A從master獲取到鎖

  2. 在master將鎖同步到slave之前,master宕掉了。

  3. slave節點被晉級為master節點

  4. 客戶端B取得了同一個資源被客戶端A已經獲取到的另外一個鎖。

安全失效

解決集群下鎖失效,參照redis官方網站針對redlock文檔:https://redis.io/docs/manual/patterns/distributed-locks/

????????在算法的分布式版本中,我們假設有N個Redis服務器。這些節點是完全獨立的,因此我們不使用復制或任何其他隱式協調系統。前幾節已經描述了如何在單個實例中安全地獲取和釋放鎖,在分布式鎖算法中,將使用相同的方法在單個實例中獲取和釋放鎖。將N設置為5是一個合理的值,因此需要在不同的計算機或虛擬機上運行5個Redis主服務器,確保它們以獨立的方式發生故障。

為了獲取鎖,客戶端執行以下操作:

  1. 客戶端以毫秒為單位獲取當前時間的時間戳,作為起始時間

  2. 客戶端嘗試在所有N個實例中順序使用相同的鍵名、相同的隨機值來獲取鎖定。每個實例嘗試獲取鎖都需要時間,客戶端應該設置一個遠小于總鎖定時間的超時時間。例如,如果自動釋放時間為10秒,則嘗試獲取鎖的超時時間可能在5到50毫秒之間。這樣可以防止客戶端長時間與處于故障狀態的Redis節點進行通信:如果某個實例不可用,盡快嘗試與下一個實例進行通信。

  3. 客戶端獲取當前時間 減去在步驟1中獲得的起始時間,來計算獲取鎖所花費的時間。當且僅當客戶端能夠在大多數實例(至少3個)中獲取鎖時,并且獲取鎖所花費的總時間小于鎖有效時間,則認為已獲取鎖。

  4. 如果獲取了鎖,則將鎖有效時間減去 獲取鎖所花費的時間,如步驟3中所計算。

  5. 如果客戶端由于某種原因(無法鎖定N / 2 + 1個實例或有效時間為負)而未能獲得該鎖,它將嘗試解鎖所有實例(即使沒有鎖定成功的實例)。

????????每臺計算機都有一個本地時鐘,我們通常可以依靠不同的計算機來產生很小的時鐘漂移。只有在擁有鎖的客戶端將在鎖有效時間內(如步驟3中獲得的)減去一段時間(僅幾毫秒)的情況下終止工作,才能保證這一點。以補償進程之間的時鐘漂移

????????當客戶端無法獲取鎖時,它應該在隨機延遲后重試,以避免同時獲取同一資源的多個客戶端之間不同步(這可能會導致腦裂的情況:沒人勝)。同樣,客戶端在大多數Redis實例中嘗試獲取鎖的速度越快,出現裂腦情況(以及需要重試)的窗口就越小,因此理想情況下,客戶端應嘗試將SET命令發送到N個實例同時使用多路復用。

????????值得強調的是,對于未能獲得大多數鎖的客戶端,盡快釋放(部分)獲得的鎖有多么重要,這樣就不必等待鎖定期滿才能再次獲得鎖(但是,如果發生了網絡分區,并且客戶端不再能夠與Redis實例進行通信,則在等待密鑰到期時需要付出可用性損失)。

2.10. redisson中的分布式鎖

????????Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-Memory Data Grid)。它不僅提供了一系列的分布式的Java常用對象,還提供了許多分布式服務。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最簡單和最便捷的方法。Redisson的宗旨是促進使用者對Redis的關注分離(Separation of Concern),從而讓使用者能夠將精力更集中地放在處理業務邏輯上。

官方文檔地址:Home · redisson/redisson Wiki · GitHub

2.10.1. 可重入鎖(Reentrant Lock)

????????基于Redis的Redisson分布式可重入鎖RLock Java對象實現了java.util.concurrent.locks.Lock接口。

????????大家都知道,如果負責儲存這個分布式鎖的Redisson節點宕機以后,而且這個鎖正好處于鎖住的狀態時,這個鎖會出現鎖死的狀態。為了避免這種情況的發生,Redisson內部提供了一個監控鎖的看門狗,它的作用是在Redisson實例被關閉前,不斷的延長鎖的有效期。默認情況下,看門狗檢查鎖的超時時間是30秒鐘,也可以通過修改Config.lockWatchdogTimeout來另行指定。

????????RLock對象完全符合Java的Lock規范。也就是說只有擁有鎖的進程才能解鎖,其他進程解鎖則會拋出IllegalMonitorStateException錯誤。

????????另外Redisson還通過加鎖的方法提供了leaseTime的參數來指定加鎖的時間。超過這個時間后鎖便自動解開了。

RLock lock = redisson.getLock("anyLock");
// 最常見的使用方法
lock.lock();// 加鎖以后10秒鐘自動解鎖
// 無需調用unlock方法手動解鎖
lock.lock(10, TimeUnit.SECONDS);// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {try {...} finally {lock.unlock();}
}
  1. 引入依賴

<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.11.2</version>
</dependency>
  1. 添加配置

@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient(){Config config = new Config();// 可以用"rediss://"來啟用SSL連接config.useSingleServer().setAddress("redis://172.16.116.100:6379");return Redisson.create(config);}
}
  1. 代碼中使用

@Autowired
private RedissonClient redissonClient;public void checkAndLock() {// 加鎖,獲取鎖失敗重試RLock lock = this.redissonClient.getLock("lock");lock.lock();// 先查詢庫存是否充足Stock stock = this.stockMapper.selectById(1L);// 再減庫存if (stock != null && stock.getCount() > 0){stock.setCount(stock.getCount() - 1);this.stockMapper.updateById(stock);}// 釋放鎖lock.unlock();
}
  1. 壓力測試

性能跟我們手寫的區別不大。

數據庫也沒有問題

2.10.2. 公平鎖(Fair Lock)

????????基于Redis的Redisson分布式可重入公平鎖也是實現了java.util.concurrent.locks.Lock接口的一種RLock對象。同時還提供了異步(Async)、反射式(Reactive)和RxJava2標準的接口。它保證了當多個Redisson客戶端線程同時請求加鎖時,優先分配給先發出請求的線程。所有請求線程會在一個隊列中排隊,當某個線程出現宕機時,Redisson會等待5秒后繼續下一個線程,也就是說如果前面有5個線程都處于等待狀態,那么后面的線程會等待至少25秒。

RLock fairLock = redisson.getFairLock("anyLock");
// 最常見的使用方法
fairLock.lock();// 10秒鐘以后自動解鎖
// 無需調用unlock方法手動解鎖
fairLock.lock(10, TimeUnit.SECONDS);// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
fairLock.unlock();

2.10.3. 聯鎖(MultiLock)

????????基于Redis的Redisson分布式聯鎖RedissonMultiLock對象可以將多個RLock對象關聯為一個聯鎖,每個RLock對象實例可以來自于不同的Redisson實例。

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同時加鎖:lock1 lock2 lock3
// 所有的鎖都上鎖成功才算成功。
lock.lock();
...
lock.unlock();

2.10.4. 紅鎖(RedLock)

????????基于Redis的Redisson紅鎖RedissonRedLock對象實現了Redlock介紹的加鎖算法。該對象也可以用來將多個RLock對象關聯為一個紅鎖,每個RLock對象實例可以來自于不同的Redisson實例。

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同時加鎖:lock1 lock2 lock3
// 紅鎖在大部分節點上加鎖成功就算成功。
lock.lock();
...
lock.unlock();

2.10.5. 讀寫鎖(ReadWriteLock)

????????基于Redis的Redisson分布式可重入讀寫鎖RReadWriteLock Java對象實現了java.util.concurrent.locks.ReadWriteLock接口。其中讀鎖和寫鎖都繼承了RLock接口。

????????分布式可重入讀寫鎖允許同時有多個讀鎖和一個寫鎖處于加鎖狀態。

RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常見的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();// 10秒鐘以后自動解鎖
// 無需調用unlock方法手動解鎖
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();

添加StockController方法:

@GetMapping("test/read")
public String testRead(){String msg = stockService.testRead();return "測試讀";
}@GetMapping("test/write")
public String testWrite(){String msg = stockService.testWrite();return "測試寫";
}

添加StockService方法:

public String testRead() {RReadWriteLock rwLock = this.redissonClient.getReadWriteLock("rwLock");rwLock.readLock().lock(10, TimeUnit.SECONDS);System.out.println("測試讀鎖。。。。");// rwLock.readLock().unlock();return null;
}public String testWrite() {RReadWriteLock rwLock = this.redissonClient.getReadWriteLock("rwLock");rwLock.writeLock().lock(10, TimeUnit.SECONDS);System.out.println("測試寫鎖。。。。");// rwLock.writeLock().unlock();return null;
}

打開開兩個瀏覽器窗口測試:

  • 同時訪問寫:一個寫完之后,等待一會兒(約10s),另一個寫開始

  • 同時訪問讀:不用等待

  • 先寫后讀:讀要等待(約10s)寫完成

  • 先讀后寫:寫要等待(約10s)讀完成

2.10.6. 信號量(Semaphore)

????????基于Redis的Redisson的分布式信號量(Semaphore)Java對象RSemaphore采用了與java.util.concurrent.Semaphore相似的接口和用法。同時還提供了異步(Async)、反射式(Reactive)和RxJava2標準的接口。

RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.trySetPermits(3);
semaphore.acquire();
semaphore.release();

在StockController添加方法:

@GetMapping("test/semaphore")
public String testSemaphore(){this.stockService.testSemaphore();return "測試信號量";
}

在StockService添加方法:

public void testSemaphore() {RSemaphore semaphore = this.redissonClient.getSemaphore("semaphore");semaphore.trySetPermits(3);try {semaphore.acquire();TimeUnit.SECONDS.sleep(5);System.out.println(System.currentTimeMillis());semaphore.release();} catch (InterruptedException e) {e.printStackTrace();}
}

添加測試用例:并發10次,循環一次

控制臺效果:

控制臺1:
1606960790234
1606960800337
1606960800443
1606960805248控制臺2:
1606960790328
1606960795332
1606960800245控制臺3:
1606960790433
1606960795238
1606960795437

由此可知:

1606960790秒有3次請求進來:每個控制臺各1次

1606960795秒有3次請求進來:控制臺2有1次,控制臺3有2次

1606960800秒有3次請求進來:控制臺1有2次,控制臺2有1次

1606960805秒有1次請求進來:控制臺1有1次

2.10.7. 閉鎖(CountDownLatch)

????????基于Redisson的Redisson分布式閉鎖(CountDownLatch)Java對象RCountDownLatch采用了與java.util.concurrent.CountDownLatch相似的接口和用法。

RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();// 在其他線程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();

需要兩個方法:一個等待,一個計數countDown

給StockController添加測試方法:

@GetMapping("test/latch")
public String testLatch(){this.stockService.testLatch();return "班長鎖門。。。";
}@GetMapping("test/countdown")
public String testCountDown(){this.stockService.testCountDown();return "出來了一位同學";
}

給StockService添加測試方法:

public void testLatch() {RCountDownLatch latch = this.redissonClient.getCountDownLatch("latch");latch.trySetCount(6);try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}
}public void testCountDown() {RCountDownLatch latch = this.redissonClient.getCountDownLatch("latch");latch.trySetCount(6);latch.countDown();
}

重啟測試,打開兩個頁面:當第二個請求執行6次之后,第一個請求才會執行。


?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/163141.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/163141.shtml
英文地址,請注明出處:http://en.pswp.cn/news/163141.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

市場是變化的?這種悖論fpmarkets澳福一秒打破

你是不是始終認為市場是經常變化的&#xff0c;其實這是不對的&#xff0c;這種認識fpmarkets澳福今天一秒打破。 市場經常變化嗎?眾多投資者無需過多思考&#xff0c;就認為答案是肯定的。因為無論是在互聯網的哪個角落&#xff0c;都可以看到這樣的信息。即使我們沒有深入研…

NLP基本知識

NLP基本知識 詞嵌入&詞向量 詞嵌入&#xff08;Word Embedding&#xff09;是一種將單詞或文本轉化為向量表示的技術&#xff0c;它在自然語言處理&#xff08;NLP&#xff09;中廣泛應用。詞嵌入的目標是將文本數據映射到一個低維度的向量空間中&#xff0c;以便計算機可…

Python---函數的嵌套(一個函數里面又調用了另外一個函數)詳解

函數嵌套調用------就是一個函數里面又調用了另外一個函數。 基本語法&#xff1a; # 定義 函數B def funcB():print(這是funcB函數的函數體部分...)# 定義 函數A def funcA():print(- * 80) # 這一行為了更好區分print(這是funcA函數的函數體部分...)# 假設我們在調用funcA…

設計模式-Adapter

定義 適配器設計模式是一種結構型設計模式&#xff0c;用于將一個類的接口變換成客戶端所期待的另一種接口&#xff0c;從而使原本因接口不匹配而無法在一起工作的兩個類能夠在一起工作。 適配器模式包括三種形式&#xff1a;類適配器模式、對象適配器模式、接口適配器模式&a…

Ubuntu18 Opencv3.4.12 viz 3D顯示安裝、編譯、使用、移植

Opencv3.*主模塊默認包括兩個3D庫 calib3d用于相機校準和三維重建 &#xff0c;viz用于三維圖像顯示&#xff0c;其中viz是cmake選配。 參考&#xff1a; https://docs.opencv.org/3.4.12/index.html 下載linux版本的源碼 sources。 查看cmake apt list --installed | grep…

App Cleaner Uninstaller Pro 一鍵清理,徹底卸載Mac應用

隨著科技的不斷發展&#xff0c;Mac電腦已經成為許多用戶工作和娛樂的首選。然而&#xff0c;隨著時間的推移&#xff0c;我們的Mac電腦上可能會堆積大量的無效文件和冗余數據&#xff0c;這不僅占用了寶貴的磁盤空間&#xff0c;還可能影響到系統的運行速度。為了解決這一問題…

基于51單片機zigbee溫室大棚監控系統

**單片機設計介紹&#xff0c;基于51單片機zigbee溫室大棚監控系統 文章目錄 一 概要二、功能設計設計思路 三、 軟件設計原理圖 五、 程序六、 文章目錄 一 概要 基于51單片機和Zigbee技術的溫室大棚監控系統是一種用于監測和控制溫室大棚環境的設備。以下是一個基本的設計介…

STM32 CAN通信自定義數據包多幀連發亂序問題

場景&#xff1a; can標準幀中每一幀只能傳輸8字節&#xff0c;而應用中傳輸一包的內容往往超過8字節&#xff0c;因此需要把一個包拆成多個幀發送&#xff0c;接收端才把收到的多幀重新組裝成一個完整的包 問題描述 在一問一答的兩塊板間通信&#xff0c;多幀連發是能夠按照…

UDP分片和丟包與TCP效果對比

UDP 分片 與 丟包&#xff0c;UDP 真的比 TCP 高效嗎&#xff1f; UDP&#xff08;用戶數據報協議&#xff09;和TCP&#xff08;傳輸控制協議&#xff09;在很多方面都有顯著的區別。總體來說&#xff0c;TCP更適合需要可靠傳輸的應用&#xff0c;例如網頁瀏覽、電子郵件等&a…

信創系列之大數據,分布式數據庫產業鏈跟蹤梳理筆記…

并購優塾 投行界的大叔&#xff0c;大叔界的投行 【產業鏈地圖&#xff0c;版權、內容與免責聲明】1&#xff09;版權&#xff1a;版權所有&#xff0c;違者必究&#xff0c;未經許可不得翻版、摘編、拷貝、復制、傳播。2&#xff09;尊重原創&#xff1a;如有引用未標注來源…

CentOS 7啟動時報“Started Crash recovery kernel arming.....shutdown....”問題處理過程

有臺虛擬機由于CPU負載過高而宕機&#xff0c;宕機重啟后停在“Started Crash recovery kernel arming…shutdown…”階段&#xff0c;如下所示&#xff1a; 重置虛擬機&#xff0c;進入grub菜單&#xff0c;按e編輯啟動選項&#xff0c;在linux16 行末&#xff0c;加上&…

【考研】數據結構(更新到循環鏈表)

聲明&#xff1a;所有代碼都可以運行&#xff0c;可以直接粘貼運行&#xff08;只有庫函數沒有聲明&#xff09; 線性表的定義和基本操作 基本操作 定義 靜態&#xff1a; #include<stdio.h> #include<stdlib.h>#define MaxSize 10//靜態 typedef struct{int d…

【追求卓越02】數據結構--鏈表

引導 今天我們進入鏈表的學習&#xff0c;我相信大家對鏈表都很熟悉。鏈表和數組一樣&#xff0c;作為最基礎的數據結構。在我們的工作中常常會使用到。但是我們真的了解到數組和鏈表的區別嗎&#xff1f;什么時候使用數組&#xff0c;什么時候使用鏈表&#xff0c;能夠正確的選…

監控員工上網有什么軟件

監控員工上網的軟件主要用于監控員工在工作時間內的網絡行為&#xff0c;包括瀏覽網頁、使用社交媒體、發送郵件等。通過監控員工上網行為&#xff0c;企業管理者可以更好地了解員工的工作狀態和行為&#xff0c;規范員工的上網行為&#xff0c;提高工作效率&#xff0c;同時也…

SSL證書對網站的作用及影響?

SSL證書作為當下互聯網的重要安全件&#xff0c;包括搜索引擎的收錄、網站是否具備信任的條件以及HTTP2.0傳輸協議的相互作用等&#xff0c;尤其是瀏覽器對古老的http協議警告提示不安全將直接影響到用戶的信任度以及品牌形象&#xff0c;對于網站來說可謂是必不可少。 SSL證書…

Webstorm 插件文件目錄顏色分析——白藍綠紅黃灰

Webstorm 插件文件目錄【白色、藍色、綠色、紅色、黃色、灰色】對應當前文件發生什么了&#xff0c;即文件夾當前狀態。 WebStrom配置好git或SVN后文件顏色代表的含義&#xff1a; 白色&#xff1a;本地無修改內容 藍色&#xff1a;文件內容有修改&#xff0c;暫未提交到git…

python命令行 引導用戶填寫可用的ip地址和端口號

字多不看&#xff0c;直接體驗 待補充 演示代碼 # -*- coding:UTF-8 -*- """ author: dyy contact: douyaoyuan126.com time: 2023/11/23 10:29 file: 引導用戶填寫可用的ip地址和端口號.py desc: xxxxxx """# region 引入必要的依賴 import …

C語言-判斷上三角矩陣

上三角矩陣指主對角線以下的元素都為0的矩陣&#xff1b;主對角線為從矩陣的左上角至右下角的連線。 本題要求編寫程序&#xff0c;判斷一個給定的方陣是否上三角矩陣。 輸入格式&#xff1a; 輸入第一行給出一個正整數T&#xff0c;為待測矩陣的個數。接下來給出T個矩陣的信…

【LeetCode:2304. 網格中的最小路徑代價 | dijkstra(迪杰斯特拉)】

&#x1f680; 算法題 &#x1f680; &#x1f332; 算法刷題專欄 | 面試必備算法 | 面試高頻算法 &#x1f340; &#x1f332; 越難的東西,越要努力堅持&#xff0c;因為它具有很高的價值&#xff0c;算法就是這樣? &#x1f332; 作者簡介&#xff1a;碩風和煒&#xff0c;…

Vue中使用Echarts實現數據可視化

文章目錄 引言一、安裝Echarts二、引入Echarts三、創建圖表容器四、初始化Echarts實例五、配置圖表選項和數據六、實現圖表更新七、Vue實例代碼結語我是將軍&#xff0c;我一直都在&#xff0c;。&#xff01; 引言 接著上一篇內容&#xff0c;我將繼續分享有關數據可視化的相…