Redis分布式鎖實現

概述

為什么要要分布式鎖

在并發編程中,我們通過鎖,來避免由于競爭而造成的數據不一致問題。
通常,我們以synchronized 、Lock來使用它。Java中的鎖,只能保證在同一個JVM進程內中執行
如果需要在分布式集群環境下的話,便需要分布式鎖

分布式鎖/線程鎖/進程鎖區別

分布式鎖:當多個進程不在同一個系統中(jvm),用分布式鎖控制多個進程對資源的訪問

線程鎖:主要用來給方法、代碼塊加鎖。當某個方法或代碼使用鎖,在同一時刻僅有一個線程執行該方法或該代碼段

  • 線程鎖只在同一JVM中有效果
  • 因為線程鎖的實現在根本上是依靠線程之間共享內存實現的,比如synchronized是共享對象頭,顯示鎖Lock是共享某個變量(state)。

進程鎖:為了控制同一操作系統中多個進程訪問某個共享資源,因為進程具有獨立性,各個進程無法訪問其他進程的資源,因此無法通過synchronized等線程鎖實現進程鎖

分布式鎖的使用場景

雖然線程間并發問題和進程間并發問題都可以通過分布式鎖解決的,但是不推薦這樣去做,因為采用分布式鎖解決這些小問題是非常消耗資源

分布式鎖應該用來解決分布式情況下的多進程并發的問題才最合適

情境:線程A和線程B都共享某個變量X。

  • 如果是單機情況下(單JVM),線程之間共享內存,只要使用線程鎖就可以解決并發問題。
  • 如果是分布式情況下(多JVM),線程A和線程B很可能不是在同一JVM中,這樣線程鎖就無法起到作用了,這時候就要用到分布式鎖來解決

分布式鎖實現邏輯

分布式鎖實現的關鍵是:在分布式的應用服務器外,搭建一個存儲服務器,存儲鎖的信息

實現要點:

  • 鎖信息需要設置過期超時的,不能讓一個線程長期占有一個鎖而導致死鎖
  • 同一時刻只有一個線程可以獲取到鎖

實現方式:

  1. 數據庫樂觀鎖
  2. 基于Redis的分布式鎖

    使用 Redis 實現鎖,主要是將狀態放到 Redis 當中,利用其原子性,當其他線程訪問時,如果 Redis 中已經存在這個狀態,就不允許之后的一些操作

  3. 基于ZooKeeper的分布式鎖

分布式鎖實現要求

鎖的實現同時滿足以下四個條件

  1. 互斥性在任意時刻,只有一個客戶端能持有鎖
  2. 不會發生死鎖。即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證后續其他客戶端能加鎖。也就是設置一個超時時間
  3. 具有容錯性只要大部分的Redis節點正常運行,客戶端就可以加鎖和解鎖
  4. 解鈴還須系鈴人加鎖和解鎖必須是同一個客戶端,客戶端自己不能把別人加的鎖給解了。

實現redis分布式鎖需要的命令/API

redis命令

# “set if not exits”,若該key-value不存在,則成功加入緩存并且返回1,否則返回0。
setnx(key, value)# 獲得key對應的value值,若不存在則返回nil。
get(key)# 先獲取key對應的value值,若不存在則返回nil,然后將舊的value更新為新的value。
getset(key, value)# 設置key-value的有效期為seconds秒。
expire(key, seconds)

Set

語法:

  • key: 要設置的鍵。
  • value: 與鍵關聯的值。
  • EX seconds: 設置鍵的過期時間(以秒為單位)。
  • PX milliseconds: 設置鍵的過期時間(以毫秒為單位)。
  • NX: 僅在鍵不存在時設置鍵的值。
  • XX: 僅在鍵已經存在時設置鍵的值
SET key value [EX seconds] [PX milliseconds] [NX|XX]

使用示例

# 基本使用
SET mykey "Hello, Redis!"# 條件設置(僅在鍵不存在時保存, 如果 mykey 不存在,返回 (nil)。)
SET mykey "Hello, Redis!" NX  # 條件更新(僅在鍵已經存在時)
SET mykey "New Value" XX# 設置過期時間(以毫秒為單位) 5000毫秒后過期
SET mykey "Hello, Redis!" PX 5000 
# 設置過期時間(以秒為單位) 5000秒后過期
SET mykey "Hello, Redis!" EX 5000 

總結:

  • SET 命令 是 Redis 中最常用的命令之一,用于存儲鍵值對
  • 通過 EX 和 PX 設置過期時間
  • 通過 NX 和 XX 控制設置的條件`。

Del

在 Redis 中,DEL 命令用于刪除一個或多個鍵。這個命令可以用來清除不再需要的數據。
參數說明

  • key: 要刪除的鍵,可以指定一個或多個鍵。

返回值:返回被刪除的鍵的數量。如果指定的鍵不存在,則不會報錯,返回值仍然是被刪除的鍵的數量

DEL key [key ...]

示例

# 刪除單個鍵。如果 mykey 存在,返回 1;如果不存在,返回 0。
DEL mykey# 刪除多個鍵。如果 key1、key2 和 key3 中的某些鍵存在,返回被刪除的鍵的數量。
DEL key1 key2 key3

注意事項

  • 使用 DEL 命令時,如果鍵不存在,不會報錯,返回值仍然是被刪除的鍵的數量
  • DEL 命令是一個 O(1) 操作,但在刪除大量鍵時,可能會影響性能

call

在 Redis 中,CALL 命令并不是一個直接的命令,而是 Lua 腳本中用于調用 Redis 命令的函數

通過 redis.call,你可以在 Lua 腳本中執行 Redis 的原生命令。

redis.call('COMMAND_NAME', arg1, arg2, ...)

詳細說明

  • KEYS: 在腳本中,KEYS 是一個數組,包含傳遞給腳本的所有鍵
  • ARGV: 你也可以使用 ARGV 數組來傳遞額外的參數

示例

-- Lua 腳本:將兩個鍵的值相加并返回結果
local value1 = redis.call('GET', KEYS[1])  -- 獲取第一個鍵的值
local value2 = redis.call('GET', KEYS[2])  -- 獲取第二個鍵的值
return value1 + value2  -- 返回兩個值的和

總結

  • redis.call 是在 Lua 腳本中執行 Redis 命令的方式。
  • 通過 KEYS 和 ARGV 數組,可以靈活地傳遞鍵和參數
  • Lua 腳本的執行是原子性的,可以提高操作的效率

Jedis 接口

pom依賴

<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version>
</dependency>

eval()

在 Jedis 中,eval 方法用于執行 Lua 腳本

通過這個方法,你可以在 Redis 服務器上運行 Lua 腳本,從而實現原子操作和復雜的邏輯處理。

String result = jedis.eval(String script, List<String> keys, List<String> args);

參數說明

  1. script: 要執行的 Lua 腳本,作為字符串傳入
  2. keys: 需要在腳本中使用的鍵的列表。鍵的數量可以在腳本中通過 KEYS 表達式訪問。
  3. args: 傳遞給腳本的參數列表。參數的數量可以在腳本中通過 ARGV 表達式訪問。

注意事項

  • 原子性: Lua 腳本在 Redis 中是原子執行的,這意味著在腳本執行期間,其他命令不會干擾。
  • 性能: 使用 Lua 腳本可以減少網絡往返次數,提高性能,尤其是在需要執行多個命令時。
  • 調試: Lua 腳本的調試相對較難,因此在編寫時要確保邏輯正確。

示例

import redis.clients.jedis.Jedis;public class RedisLuaExample {public static void main(String[] args) {Jedis jedis = new Jedis("localhost");// Lua 腳本:將兩個鍵的值相加并返回結果String script = "return redis.call('GET', KEYS[1]) + redis.call('GET', KEYS[2])";// 需要使用的鍵List<String> keys = Arrays.asList("key1", "key2");// 執行腳本String result = jedis.eval(script, keys, Collections.emptyList()).toString();System.out.println("Result: " + result);jedis.close();}
}

API(Springboot)

Redisson Pom依賴

<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.10.1</version>
</dependency>

redis pom依賴

<!-- 引入redis依賴 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

spring boot使用Redis的操作主要是通過RedisTemplate(或StringRedisTemplate )來實現

RedisTemplate和StringRedisTemplate的區別

RedisTemplate和StringRedisTemplate的區別:

  • 兩者的關系是StringRedisTemplate繼承RedisTemplate
  • 兩者的數據是不共通的;也就是說StringRedisTemplate只能管理StringRedisTemplate里面的數據,RedisTemplate只能管理RedisTemplate中的數據
  • SDR默認采用的序列化策略有兩種,一種是String的序列化策略,一種是JDK的序列化策略。
  • StringRedisTemplate默認采用的是String的序列化策略,保存的key和value都是采用此策略序列化保存的(StringRedisSerializer)。
  • RedisTemplate默認采用的是JDK的序列化策略,保存的key和value都是采用此策略序列化保存的。(JdkSerializationRedisSerializer)

總結:

  • StringRedisTemplate:當你的redis數據庫里面本來存的是字符串數據或者你要存取的數據就是字符串類型數據的時候

    Redis當中的數據值是以數組形式顯示出來的時候,只能使用RedisTemplate才能獲取到里面的數據

  • RedisTemplate:但是如果你的數據是復雜的對象類型,而取出的時候又不想做任何的數據轉換,直接從Redis里面取出一個對象。

    Redis當中的數據值是以可讀形式顯示出來的時候,只能使用StringRedisTemplate才能獲取到里面的數據

redisTemplate

// 將鎖狀態放入 Redis:setIfAbsent如果鍵不存在則新增,存在則不改變已經有的值。
redisTemplate.opsForValue().setIfAbsent("lockkey", "value"); 
// 設置鎖的過期時間
redisTemplate.expire("lockkey", 30000, TimeUnit.MILLISECONDS);//spring-data-redis 2.1 之后版本,加鎖的同時設置過期時間,二者是原子性操作
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111",5, TimeUnit.SECONDS);// 刪除/解鎖
redisTemplate.delete("lockkey");// 獲取鎖
redisTemplate.opsForValue().get("lockkey");

StringRedisTemplate

@Autowired
private StringRedisTemplate stringRedisTemplate;//在設置值的同時指定過期時間, 時間單位 s
stringRedisTemplate.opsForValue().set("key","value",7200, TimeUnit.SECONDS);//刪除key對應的鍵值對
stringRedisTemplate.opsForValue().delete("key");//獲取對應key的value
stringRedisTemplate.opsForValue().get("key");

實現redis分布式鎖

單節點Redis的分布式鎖

如果你的項目中Redis是多機部署的,那么可以嘗試使用Redisson實現分布式鎖,

加鎖 實現

加鎖實際上就是在redis中,給Key鍵設置一個值,為避免死鎖,并給定一個過期時間

但是不建議分別使用加鎖和設置超時這兩個命令去設置值和過期時間,因為違背了原子性,也就是一旦鎖被創建,而沒有設置過期時間,則鎖會一直存在

Jedis 實現
參數說明

第一個為key,我們使用key來當鎖,因為key是唯一的

第二個為value,我們傳的是requestId,可靠性保證,分布式鎖要滿足第四個條件解鈴還須系鈴人,通過給value賦值為requestId,我們就知道這把鎖是哪個請求加的了,在解鎖的時候就可以有依據。requestId可以使用UUID.randomUUID().toString()方法生成

第三個為NX(鍵不存在時設置值),這個參數的意思是SET IF NOT EXIST,即當key不存在時,我們進行set操作;若key已經存在,則不做任何操作;表示僅在鍵不存在時設置值

redis在set時,如果原先有值,SET 命令會返回 "OK"

第四個為PX(毫秒為單位),這個參數我們傳的是PX,意思是我們要給這個key加一個過期時間的設置,具體時間由第五個參數決定

第五個為expireTime,與第四個參數相呼應,代表key的過期時間大小

代碼實現

public class RedisTool {private static final String LOCK_SUCCESS = "OK";private static final String SET_IF_NOT_EXIST = "NX";private static final String SET_WITH_EXPIRE_TIME = "PX";/*** 嘗試獲取分布式鎖** @param jedis      Redis客戶端* @param lockKey    鎖* @param requestId  請求標識* @param expireTime 超期時間* @return 是否獲取成功*/public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);if (LOCK_SUCCESS.equals(result)) {return true;}return false;}
}
redis命令
-- 設置一個鍵 myLock,值為 12345,并且希望在 5000 毫秒后過期
SET myLock 12345 NX PX 5000
springBoot

但是在spring-data-redis 2.1 之后的版本,便可以直接設置過期時間了

spring-data-redis 2.1 前

spring-data-redis 2.1 前的版本:在jedis當中是有這種原子操作的方法的,需要通過 RedisTemplate 的 execute 方法獲取到jedis里操作命令的對象設置

String result = template.execute(new RedisCallback<String>() {@Overridepublic String doInRedis(RedisConnection connection) throws DataAccessException {JedisCommands commands = (JedisCommands) connection.getNativeConnection();return commands.set(key, "鎖定的資源", "NX", "PX", 3000);}
});
spring-data-redis 2.1 后

spring-data-redis 2.1 之后的版本

//加鎖的同時設置過期時間,二者是原子性操作
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111",5, TimeUnit.SECONDS);

解鎖實現

解鎖的過程就是將Key鍵刪除。但也不能亂刪,不能說客戶端1的請求將客戶端2的鎖給刪除掉

為什么不直接刪除鎖?這種不先判斷鎖的擁有者而直接解鎖的方式,會導致任何客戶端都可以隨時進行解鎖,即使這把鎖不是它的

Jedis實現
代碼
public class RedisTool {private static final Long RELEASE_SUCCESS = 1L;/*** 釋放分布式鎖** @param jedis     Redis客戶端* @param lockKey   鎖* @param requestId 請求標識* @return 是否釋放成功*/public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));if (RELEASE_SUCCESS.equals(result)) {return true;}return false;}
}
為什么不直接刪除

為什么不直接使用jedis.del()方法刪除鎖?這種不先判斷鎖的擁有者而直接解鎖的方式,會導致任何客戶端都可以隨時進行解鎖,即使這把鎖不是它的

 jedis.del(lockKey);
為什么不判斷是后在刪除

如果想判斷是不是這個客戶端的鎖,再去解鎖行不行不行,如果調用jedis.del()方法的時候,這把鎖已經不屬于當前客戶端的時候會解除他人加的鎖。

比如客戶端A加鎖,一段時間之后客戶端A解鎖,在執行jedis.del()之前,鎖突然過期了,此時客戶端B嘗試加鎖成功,然后客戶端A再執行del()方法,則將客戶端B的鎖給解除了

 // 判斷加鎖與解鎖是不是同一個客戶端
if (requestId.equals(jedis.get(lockKey))) {// 若在此時,這把鎖突然不是這個客戶端的,則會誤解鎖jedis.del(lockKey);
}
Springboot
public Map<String, List<Catalog2Vo>> getCatalogJsonDbWithRedisLock() {// 生成一個唯一的 UUID 作為鎖的標識String uuid = UUID.randomUUID().toString();// 獲取 Redis 的 ValueOperations 對象ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();// 嘗試在 Redis 中設置鎖,過期時間為 5 秒Boolean lock = ops.setIfAbsent("lock", uuid, 5, TimeUnit.SECONDS);// 如果成功獲取到鎖if (lock) {// 獲取分類數據Map<String, List<Catalog2Vo>> categoriesDb = getCategoryMap();// 獲取當前鎖的值String lockValue = ops.get("lock");// Lua 腳本,用于安全釋放鎖String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" +"    return redis.call(\"del\",KEYS[1])\n" +"else\n" +"    return 0\n" +"end";// 執行 Lua 腳本,釋放鎖stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), lockValue);// 返回獲取的分類數據return categoriesDb;} else {// 如果未能獲取到鎖,線程休眠 100 毫秒try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace(); // 打印異常堆棧}// 遞歸調用,嘗試再次獲取鎖return getCatalogJsonDbWithRedisLock();}
}

多節點的redis分布式鎖

概述

如果負責儲存這個分布式鎖的Redisson節點宕機以后,而且這個鎖正好處于鎖住的狀態時,這個鎖會出現鎖死的狀態

為了避免這種情況的發生,Redisson內部提供了一個監控鎖的看門狗,它的作用是在Redisson實例被關閉前,不斷的延長鎖的有效期

默認情況下,看門狗的檢查鎖的超時時間是30秒鐘,也可以通過修改Config.lockWatchdogTimeout來另行指定

另外Redisson還通過加鎖的方法提供了leaseTime的參數來指定加鎖的時間。超過這個時間后鎖便自動解開了

高并發場景下的問題

高并發場景下如下問題:

  • 主從切換后,原從庫被推舉為主庫,當在其他請求加鎖的時候,連接的redis可能還沒有同步到第一次加的鎖,造成鎖失效。
  • 主庫發生故障,加鎖完成,還未同步到從節點或者集群中其他節點的時候,當前節點掛掉,鎖就丟失了。
  • 兩種情況導致出現的原因就是redis的數據同步是異步的

Redisson

相對于Jedis而言,Redisson強大很多。當然了,隨之而來的就是它的復雜性。它里面也實現了分布式鎖,而且包含多種類型的鎖

具體內容:分布式鎖和同步器

可重入鎖(Reentrant Lock)示例

下述已可重入鎖(Reentrant Lock)示例,獲取客戶端進行加解鎖操作如下

public static void main(String[] args) {// 創建 Redis 配置對象Config config = new Config();// 設置 Redis 服務器地址config.useSingleServer().setAddress("redis://127.0.0.1:6379");// 設置 Redis 服務器密碼config.useSingleServer().setPassword("redis1234");// 創建 Redisson 客戶端實例final RedissonClient client = Redisson.create(config);  // 獲取名為 "lock1" 的分布式鎖RLock lock = client.getLock("lock1");try {// 嘗試獲取鎖,最多等待 10 秒,鎖定 30 秒if (lock.tryLock(10, 30, TimeUnit.SECONDS)) {try {// 在此處執行需要加鎖的操作// 例如:處理共享資源或執行關鍵業務邏輯} finally {// 確保在操作完成后釋放鎖lock.unlock(); }} else {// 如果無法獲取鎖,輸出提示信息System.out.println("無法獲取鎖,操作被跳過");}} catch (InterruptedException e) {// 如果線程被中斷,恢復中斷狀態Thread.currentThread().interrupt(); // 輸出中斷信息System.out.println("線程被中斷");}
}
加鎖

調用lock方法,定位到lockInterruptibly。在這里,完成了加鎖的邏輯。

代碼
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {// 當前線程的 IDlong threadId = Thread.currentThread().getId();// 嘗試獲取鎖,返回值為鎖的剩余時間(TTL)Long ttl = tryAcquire(leaseTime, unit, threadId);// 如果 ttl 為空,則證明獲取鎖成功if (ttl == null) {return; // 成功獲取鎖,直接返回}// 如果獲取鎖失敗,則訂閱到對應這個鎖的 channelRFuture<RedissonLockEntry> future = subscribe(threadId);commandExecutor.syncSubscription(future); // 同步訂閱,等待鎖的釋放try {while (true) {// 再次嘗試獲取鎖ttl = tryAcquire(leaseTime, unit, threadId);// ttl 為空,說明成功獲取鎖,跳出循環if (ttl == null) {break; // 成功獲取鎖,退出循環}// ttl 大于 0 則等待 ttl 時間后繼續嘗試獲取if (ttl >= 0) {// 嘗試在 ttl 時間內獲取鎖getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {// ttl 小于 0,表示無限期等待getEntry(threadId).getLatch().acquire();}}} finally {// 取消對 channel 的訂閱,確保資源釋放unsubscribe(future, threadId);}// get(lockAsync(leaseTime, unit)); // 可能是異步獲取鎖的邏輯,注釋掉的部分
}// 兩種處理方式,一種是帶有過期時間的鎖,一種是不帶過期時間的鎖。
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {// 如果帶有過期時間,則按照普通方式獲取鎖if (leaseTime != -1) {return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}// 先按照 30 秒的過期時間來執行獲取鎖的方法RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);// 如果還持有這個鎖,則開啟定時任務不斷刷新該鎖的過期時間ttlRemainingFuture.addListener(new FutureListener<Long>() {@Overridepublic void operationComplete(Future<Long> future) throws Exception {if (!future.isSuccess()) {return; // 如果獲取鎖失敗,直接返回}Long ttlRemaining = future.getNow();// 鎖已成功獲取if (ttlRemaining == null) {// 開啟定時任務以刷新鎖的過期時間scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture; // 返回異步獲取鎖的結果
}// 正執行獲取鎖的邏輯,它是一段LUA腳本代碼,hash數據結構。
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit,     long threadId, RedisStrictCommand<T> command) {// 將過期時間轉換為毫秒internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,// 如果鎖不存在,則通過 hset 設置它的值,并設置過期時間"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +// 如果鎖已存在,并且鎖是當前線程,則通過 hincrby 給數值遞增 1"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +// 如果鎖已存在,但并非本線程,則返回過期時間 ttl"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
流程圖
存在
不存在
是當前線程
不是當前線程
失敗
獲取失敗
獲取成功
獲取成功
開始
嘗試獲取鎖
判斷鎖是否存在
設置值和過去時間
加鎖成功
結束
判斷鎖是否為當前線程
1:鎖的值遞增加1, 2:設置過去時間
加鎖失敗,返回當前鎖的過期時間
獲取成功/失敗
訂閱對應鎖的channel
再次嘗試獲取鎖
獲取成功/失敗
等待channel是否鎖信息
解鎖

通過調用unlock方法來解鎖。

代碼如下
// 這是一個公開的異步解鎖方法,接受當前線程的 ID 作為參數,返回一個 RFuture<Void> 對象。
public RFuture<Void> unlockAsync(final long threadId) {// 創建一個 Promise 對象,用于異步操作的結果final RPromise<Void> result = new RedissonPromise<Void>();// 調用解鎖的內部異步方法RFuture<Boolean> future = unlockInnerAsync(threadId);// 添加監聽器以處理解鎖操作的結果future.addListener(new FutureListener<Boolean>() {@Overridepublic void operationComplete(Future<Boolean> future) throws Exception {// 檢查解鎖操作是否成功if (!future.isSuccess()) {// 如果失敗,取消過期時間的續期任務cancelExpirationRenewal(threadId);// 將失敗原因傳遞給 Promiseresult.tryFailure(future.cause());return;}// 獲取解鎖操作的返回值Boolean opStatus = future.getNow();// 如果返回值為空,表示當前線程未持有鎖,拋出異常if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);result.tryFailure(cause);return;}// 解鎖成功,取消刷新過期時間的定時任務if (opStatus) {cancelExpirationRenewal(null);}// 將成功結果傳遞給 Promiseresult.trySuccess(null);}});// 返回 Promise 對象return result;
}// 執行 Redis 腳本:使用 evalWriteAsync 方法執行 Lua 腳本,進行原子操作。
protected RFuture<Boolean> unlockInnerAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL,//如果鎖已經不存在, 發布鎖釋放的消息"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end;" +//如果釋放鎖的線程和已存在鎖的線程不是同一個線程,返回null"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +//通過hincrby遞減1的方式,釋放一次鎖//若剩余次數大于0 ,則刷新過期時間"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +//否則證明鎖已經釋放,刪除key并發布鎖釋放的消息"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; "+"end; " +"return nil;",Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));}
流程圖
存在
同一個線程
小于等于0
大于0
不是同一個線程
不存在
開始
exist判斷鎖是否存在
判斷解鎖線程和當前線程是否為同一個線程
遞減解鎖,獲取剩余次數
剩余次數是否大于等于0
刪除key,并發布鎖釋放信息
結束
解鎖成功
解鎖失敗

redlock

使用流程

redlock的使用流程大致如下:

  1. 客戶端獲取到當前的時間戳。
  2. 客戶端按順序向部署的N個Redis實例執行加鎖操作。在設定時間內,不管加鎖成功還是失敗,都會繼續向下一個實例申請加鎖操作
  3. 加鎖成功的實例個數>= (N/2) + 1,并且加鎖的總耗時要<鎖設定的過期時間,Redlock就判斷加鎖成功,反之就是加鎖失敗。
  4. 加鎖成功了,就繼續往下操作,比如操作MySQL資源;若加鎖失敗,則會向所有節點發起鎖釋放的操作請求。
設計規則

Redlock的設計規則就是:

  • 客戶端要在所有實例上申請加鎖,只有保證大多數節點加鎖成功了才判定為加鎖成功
  • 加鎖的總耗時要 < 鎖設定的過期時間
  • 釋放鎖的時候,要向所有節點發起鎖釋放的請求,不管之前加鎖是否成功為了確保只釋放自己的鎖,需要用前面提到的 Lua 腳本來代替直接使用 DEL 命令進行解鎖操作
代碼
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {// 實現要點之允許加鎖失敗節點限制(N-(N/2+1))int failedLocksLimit = failedLocksLimit();List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());// 實現要點之遍歷所有節點通過EVAL命令執行lua加鎖for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {RLock lock = iterator.next();boolean lockAcquired;try {// 對節點嘗試加鎖lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);} catch (RedisConnectionClosedException|RedisResponseTimeoutException e) {// 如果拋出這類異常,為了防止加鎖成功,但是響應失敗,需要解鎖unlockInner(Arrays.asList(lock));lockAcquired = false;} catch (Exception e) {// 拋出異常表示獲取鎖失敗lockAcquired = false;}if (lockAcquired) {// 成功獲取鎖集合acquiredLocks.add(lock);} else {// 如果達到了允許加鎖失敗節點限制,那么break,即此次Redlock加鎖失敗if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {break;}               }}return true;
}
RedLock 紅鎖 已經廢棄

RedLock 紅鎖:RedLock 會對集群的每個節點進行加鎖,如果大多數(N/2+1)加鎖成功了,則認為獲取鎖成功

  • 這個過程中可能會因為網絡問題,或節點超時的問題,影響加鎖的性能,故而在最新的 Redisson 版本中中已經正式宣布廢棄 RedLock

redisTemplate

為了解決多節點的上述問題,可以使用redisTemplate中的setIfAbsent方法

setIfAbsent方法是原子性的

  • 單個 Redis 實例:在單個 Redis 實例中,setIfAbsent 是原子操作,確保在鍵不存在時才會設置值。
  • Redis 集群:在 Redis 集群中,setIfAbsent 仍然是原子操作,但它只在同一個分片(slot)內有效。如果不同的節點(分片)之間存在競爭條件,可能會導致不一致的結果

可以在這個方法中,構造一個和可重用鎖差不多的代碼,及判斷當前線程是否為加鎖線程,去實現多節點先的分布式鎖

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/80464.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/80464.shtml
英文地址,請注明出處:http://en.pswp.cn/web/80464.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

軟件設計師-錯題筆記-網絡基礎知識

1. 解析&#xff1a; 1.子網劃分相關知識&#xff1a; 在IPv4地址中&#xff0c;/27表示子網掩碼為255.255.255.224&#xff0c;它將一個C類網絡&#xff08;默認子網掩碼255.255.255.0&#xff09;進一步劃分 對于子網掩碼255.255.255.224&#xff0c;其對應的二進制為111…

Fine-Tuning Llama2 with LoRA

Fine-Tuning Llama2 with LoRA 1. What is LoRA?2. How does LoRA work?3. Applying LoRA to Llama2 models4. LoRA finetuning recipe in torchtune5. Trading off memory and model performance with LoRAModel ArgumentsReferences https://docs.pytorch.org/torchtune/ma…

python打卡day29

類的裝飾器 知識點回顧 類的裝飾器裝飾器思想的進一步理解&#xff1a;外部修改、動態類方法的定義&#xff1a;內部定義和外部定義 回顧一下&#xff0c;函數的裝飾器是 &#xff1a;接收一個函數&#xff0c;返回一個修改后的函數。類也有修飾器&#xff0c;類裝飾器本質上確…

十一、STM32入門學習之FREERTOS移植

目錄 一、FreeRTOS1、源碼下載&#xff1a;2、解壓源碼 二、移植步驟一&#xff1a;在需要移植的項目中新建myFreeRTOS的文件夾&#xff0c;用于存放FREERTOS的相關源碼步驟二&#xff1a;keil中包含相關文件夾和文件引用路徑步驟三&#xff1a;修改FreeRTOSConfig.h文件的相關…

2025 年十大網絡安全預測

隨著我們逐步邁向 2026 年&#xff0c;網絡安全領域正處于一個關鍵的轉折點&#xff0c;技術創新與數字威脅以前所未有的復雜態勢交織在一起。 地緣政治環境進一步加劇了這些網絡安全挑戰&#xff0c;國際犯罪組織利用先進的技術能力來追求戰略目標。 人工智能在這一不斷演變…

Mac 環境下 JDK 版本切換全指南

概要 在 macOS 上安裝了多個 JDK 后&#xff0c;可以通過系統自帶的 /usr/libexec/java_home 工具來查詢并切換不同版本的 Java。只需在終端中執行 /usr/libexec/java_home -V 列出所有已安裝的 JDK&#xff0c;然后將你想使用的版本路徑賦值給環境變量 JAVA_HOME&#xff0c;…

中級網絡工程師知識點6

1.堆疊方式可以共享使用交換機背板帶寬&#xff1b;級聯方式可以使用雙絞線將交換機連接在一起 2.光功率計是專門測量光功率大小的儀器&#xff0c;在對光纜進行檢測時&#xff0c;通過在光纜的發送端和接收端分別測量光功率&#xff0c;進而計算出光衰情況。 3.光時域反射計…

動態規劃——烏龜棋

題目描述 解題思路 首先這是一個很明顯的線性dp的題目&#xff0c;很容易發現規律 數據輸入 我們用 h[ N ] 數組存儲每一個格子的分數 用 cnt [ ]&#xff0c;數組表示每一中卡片的數目 1&#xff0c;狀態表示 因為這里一個有4種跳躍方式可以選擇 f[ i ][ a ][ b ][ c ][ d…

C#自定義控件-實現了一個支持平移、縮放、雙擊重置的圖像顯示控件

1. 控件概述 這是一個繼承自 Control 的自定義控件&#xff0c;主要用于圖像的顯示和交互操作&#xff0c;具有以下核心功能&#xff1a; 圖像顯示與縮放&#xff08;支持鼠標滾輪縮放&#xff09;圖像平移&#xff08;支持鼠標拖拽&#xff09;視圖重置&#xff08;雙擊重置…

C++ map multimap 容器:賦值、排序、大小與刪除操作

概述 map和multimap是C STL中的關聯容器&#xff0c;它們存儲的是鍵值對(key-value pairs)&#xff0c;并且會根據鍵(key)自動排序。兩者的主要區別在于&#xff1a; map不允許重復的鍵multimap允許重復的鍵 本文將詳細解析示例代碼中涉及的map操作&#xff0c;包括賦值、排…

AI Agent開發第70課-徹底消除RAG知識庫幻覺(4)-解決知識庫問答時語料“總重復”問題

開篇 “解決知識庫幻覺”系列還在繼續,這是因為:如果只是個人玩玩,像自媒體那些說的什么2小時搭一個知識庫+deepseek不要太香一類的RAG或者是基于知識庫的應用肯定是沒法用在企業級落地上的。 我們真的經歷過或者正在經歷的人都是知道的,怎么可能2小時就搭建完成一個知識…

【DAY22】 復習日

內容來自浙大疏錦行python打卡訓練營 浙大疏錦行 仔細回顧一下之前21天的內容 作業&#xff1a; 自行學習參考如何使用kaggle平臺&#xff0c;寫下使用注意點&#xff0c;并對下述比賽提交代碼 kaggle泰坦里克號人員生還預測

【Docker】Docker Compose方式搭建分布式協調服務(Zookeeper)集群

開發分布式應用時,往往需要高度可靠的分布式協調,Apache ZooKeeper 致力于開發和維護開源服務器&#xff0c;以實現高度可靠的分布式協調。具體內容見zookeeper官網。現代應用往往使用云原生技術進行搭建,如何用Docker搭建Zookeeper集群,這里介紹使用Docker Compose方式搭建分布…

若依框架Consul微服務版本

1、最近使用若依前后端分離框架改造為Consul微服務版本 在這里分享出來供大家參考 # Consul微服務配置參數已經放置/bin/Consul微服務配置目錄 倉庫地址&#xff1a; gitee&#xff1a;https://gitee.com/zlxls/Ruoyi-Consul-Cloud.git gitcode&#xff1a;https://gitcode.c…

BOM知識點

BOM&#xff08;Browser Object Model&#xff09;即瀏覽器對象模型&#xff0c;是用于訪問和操作瀏覽器窗口的編程接口。以下是一些BOM的知識點總結&#xff1a; 核心對象 ? window&#xff1a;BOM的核心對象&#xff0c;代表瀏覽器窗口。它也是全局對象&#xff0c;所有全…

什么是遷移學習(Transfer Learning)?

什么是遷移學習&#xff08;Transfer Learning&#xff09;&#xff1f; 一句話概括 遷移學習研究如何把一個源領域&#xff08;source domain&#xff09;/源任務&#xff08;source task&#xff09;中獲得的知識遷移到目標領域&#xff08;target domain&#xff09;/目標任…

[創業之路-362]:企業戰略管理案例分析-3-戰略制定-華為使命、愿景、價值觀的演變過程

一、華為使命、愿景、價值觀的演變過程 1、創業初期&#xff08;1987 - 1994 年&#xff09;&#xff1a;生存導向&#xff0c;文化萌芽 使命愿景雛形&#xff1a;1994年華為提出“10年之后&#xff0c;世界通信行業三分天下&#xff0c;華為將占一份”的宏偉夢想&#xff0c…

Python黑魔法與底層原理揭秘:突破語言邊界的深度探索

Python黑魔法與底層原理揭秘&#xff1a;突破語言邊界的深度探索 開篇&#xff1a;超越表面的Python Python常被稱為"膠水語言"&#xff0c;但其真正的威力在于對底層的高度可控性。本文將揭示那些鮮為人知的Python黑魔法&#xff0c;帶你深入CPython實現層面&…

Es的text和keyword類型以及如何修改類型

昨天同事觸發定時任務發現es相關服務報了一個序列化問題&#xff0c; 今天早上捕獲異常將異常堆棧全部打出來看&#xff0c;才發現是聚合的字段不是keyword類型的問題。 到kibbna命令行執行也是一樣的錯誤 使用 /_mapping查看索引的字段類型&#xff0c;才發現userUniqueid是te…

大語言模型 07 - 從0開始訓練GPT 0.25B參數量 - MiniMind 實機訓練 預訓練 監督微調

寫在前面 GPT&#xff08;Generative Pre-trained Transformer&#xff09;是目前最廣泛應用的大語言模型架構之一&#xff0c;其強大的自然語言理解與生成能力背后&#xff0c;是一個龐大而精細的訓練流程。本文將從宏觀到微觀&#xff0c;系統講解GPT的訓練過程&#xff0c;…