概述
為什么要要分布式鎖
在并發編程中,我們通過鎖,來避免由于競爭而造成的數據不一致問題。
通常,我們以synchronized 、Lock來使用它。Java中的鎖,只能保證在同一個JVM進程內中執行
如果需要在分布式集群環境下的話,便需要分布式鎖
分布式鎖/線程鎖/進程鎖區別
分布式鎖
:當多個進程不在同一個系統中(jvm),用分布式鎖控制多個進程對資源的訪問
。
線程鎖
:主要用來給方法、代碼塊加鎖。當某個方法或代碼使用鎖,在同一時刻僅有一個線程執行該方法或該代碼段。
- 線程鎖
只在同一JVM中有效果
, - 因為線程鎖的實現在
根本上是依靠線程之間共享內存實現
的,比如synchronized是共享對象頭,顯示鎖Lock是共享某個變量(state)。
進程鎖
:為了控制同一操作系統中多個進程訪問某個共享資源,因為進程具有獨立性,各個進程無法訪問其他進程的資源
,因此無法通過synchronized等線程鎖實現進程鎖。
分布式鎖的使用場景
雖然線程間并發問題和進程間并發問題都可以通過分布式鎖解決的,但是不推薦這樣去做,因為采用分布式鎖解決這些小問題是非常消耗資源
分布式鎖應該用來解決分布式情況下的多進程并發的問題
才最合適
情境:線程A和線程B都共享某個變量X。
- 如果是單機情況下(單JVM),線程之間共享內存,只要
使用線程鎖就可以解決
并發問題。 - 如果是分布式情況下(多JVM),線程A和線程B很可能不是在同一JVM中,這樣線程鎖就無法起到作用了,這時候就
要用到分布式鎖來解決
分布式鎖實現邏輯
分布式鎖實現的關鍵是:在分布式的應用服務器外,搭建一個存儲服務器,存儲鎖的信息
實現要點:
鎖信息需要設置過期超時
的,不能讓一個線程長期占有一個鎖而導致死鎖同一時刻只有一個線程可以獲取到鎖
。
實現方式:
數據庫樂觀鎖
;基于Redis的分布式鎖
;使用 Redis 實現鎖,主要是
將狀態放到 Redis 當中,利用其原子性,當其他線程訪問時,如果 Redis 中已經存在這個狀態,就不允許之后的一些操作
基于ZooKeeper的分布式鎖
分布式鎖實現要求
鎖的實現同時滿足以下四個條件
:
互斥性
。在任意時刻,只有一個客戶端能持有鎖
。不會發生死鎖
。即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證后續其他客戶端能加鎖。也就是設置一個超時時間具有容錯性
。只要大部分的Redis節點正常運行,客戶端就可以加鎖和解鎖。解鈴還須系鈴人
。加鎖和解鎖必須是同一個客戶端,客戶端自己不能把別人加的鎖給解了。
實現redis分布式鎖需要的命令/API
redis命令
# “set if not exits”,若該key-value不存在,則成功加入緩存并且返回1,否則返回0。
setnx(key, value)# 獲得key對應的value值,若不存在則返回nil。
get(key)# 先獲取key對應的value值,若不存在則返回nil,然后將舊的value更新為新的value。
getset(key, value)# 設置key-value的有效期為seconds秒。
expire(key, seconds)
Set
語法:
- key: 要設置的鍵。
- value: 與鍵關聯的值。
- EX seconds: 設置鍵的過期時間(以秒為單位)。
- PX milliseconds: 設置鍵的過期時間(以毫秒為單位)。
- NX: 僅在鍵不存在時設置鍵的值。
- XX: 僅在鍵已經存在時設置鍵的值
SET key value [EX seconds] [PX milliseconds] [NX|XX]
使用示例
# 基本使用
SET mykey "Hello, Redis!"# 條件設置(僅在鍵不存在時保存, 如果 mykey 不存在,返回 (nil)。)
SET mykey "Hello, Redis!" NX # 條件更新(僅在鍵已經存在時)
SET mykey "New Value" XX# 設置過期時間(以毫秒為單位) 5000毫秒后過期
SET mykey "Hello, Redis!" PX 5000
# 設置過期時間(以秒為單位) 5000秒后過期
SET mykey "Hello, Redis!" EX 5000
總結:
- SET 命令 是 Redis 中最常用的命令之一,用于存儲鍵值對。
- 通過 EX 和 PX 設置過期時間
- 通過 NX 和 XX 控制設置的條件`。
Del
在 Redis 中,DEL 命令用于刪除一個或多個鍵。這個命令可以用來清除不再需要的數據。
參數說明
- key: 要刪除的鍵,可以指定一個或多個鍵。
返回值:返回被刪除的鍵的數量
。如果指定的鍵不存在,則不會報錯,返回值仍然是被刪除的鍵的數量。
DEL key [key ...]
示例
# 刪除單個鍵。如果 mykey 存在,返回 1;如果不存在,返回 0。
DEL mykey# 刪除多個鍵。如果 key1、key2 和 key3 中的某些鍵存在,返回被刪除的鍵的數量。
DEL key1 key2 key3
注意事項
- 使用 DEL 命令時,
如果鍵不存在,不會報錯,返回值仍然是被刪除的鍵的數量
。 - DEL 命令是一個 O(1) 操作,但在刪除大量鍵時,可能會影響性能。
call
在 Redis 中,CALL 命令并不是一個直接的命令,而是 Lua 腳本中用于調用 Redis 命令的函數
。
通過 redis.call,你可以在 Lua 腳本中執行 Redis 的原生命令。
redis.call('COMMAND_NAME', arg1, arg2, ...)
詳細說明
- KEYS: 在腳本中,KEYS 是一個數組,包含傳遞給腳本的所有鍵。
- ARGV: 你也可以使用 ARGV 數組來傳遞額外的參數。
示例
-- Lua 腳本:將兩個鍵的值相加并返回結果
local value1 = redis.call('GET', KEYS[1]) -- 獲取第一個鍵的值
local value2 = redis.call('GET', KEYS[2]) -- 獲取第二個鍵的值
return value1 + value2 -- 返回兩個值的和
總結
redis.call 是在 Lua 腳本中執行 Redis 命令的方式。
- 通過 KEYS 和 ARGV 數組,可以
靈活地傳遞鍵和參數
。 Lua 腳本的執行是原子性的,可以提高操作的效率
。
Jedis 接口
pom依賴
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version>
</dependency>
eval()
在 Jedis 中,eval 方法用于執行 Lua 腳本
。
通過這個方法,你可以在 Redis 服務器上運行 Lua 腳本,從而實現原子操作和復雜的邏輯處理。
String result = jedis.eval(String script, List<String> keys, List<String> args);
參數說明
- script: 要執行的 Lua 腳本,作為字符串傳入。
- keys: 需要在腳本中使用的鍵的列表。鍵的數量可以在腳本中通過 KEYS 表達式訪問。
- args: 傳遞給腳本的參數列表。參數的數量可以在腳本中通過 ARGV 表達式訪問。
注意事項
原子性: Lua 腳本在 Redis 中是原子執行的
,這意味著在腳本執行期間,其他命令不會干擾。性能: 使用 Lua 腳本可以減少網絡往返次數
,提高性能,尤其是在需要執行多個命令時。- 調試: Lua 腳本的調試相對較難,因此在編寫時要確保邏輯正確。
示例
import redis.clients.jedis.Jedis;public class RedisLuaExample {public static void main(String[] args) {Jedis jedis = new Jedis("localhost");// Lua 腳本:將兩個鍵的值相加并返回結果String script = "return redis.call('GET', KEYS[1]) + redis.call('GET', KEYS[2])";// 需要使用的鍵List<String> keys = Arrays.asList("key1", "key2");// 執行腳本String result = jedis.eval(script, keys, Collections.emptyList()).toString();System.out.println("Result: " + result);jedis.close();}
}
API(Springboot)
Redisson Pom依賴
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.10.1</version>
</dependency>
redis pom依賴
<!-- 引入redis依賴 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
spring boot使用Redis的操作主要是通過RedisTemplate(或StringRedisTemplate )來實現
RedisTemplate和StringRedisTemplate的區別
RedisTemplate和StringRedisTemplate的區別:
- 兩者的關系是
StringRedisTemplate繼承RedisTemplate
。 - 兩者的
數據是不共通的
;也就是說StringRedisTemplate只能管理StringRedisTemplate里面的數據,RedisTemplate只能管理RedisTemplate中的數據。 - SDR默認采用的序列化策略有兩種,一種是String的序列化策略,一種是JDK的序列化策略。
- StringRedisTemplate默認采用的是String的序列化策略,保存的key和value都是采用此策略序列化保存的(StringRedisSerializer)。
- RedisTemplate默認采用的是JDK的序列化策略,保存的key和value都是采用此策略序列化保存的。(JdkSerializationRedisSerializer)
總結:
-
StringRedisTemplate:當你的redis數據庫里面本來
存的是字符串數據或者你要存取的數據就是字符串類型數據的時候
。Redis當中的數據值是以數組形式顯示出來的時候,只能使用RedisTemplate才能獲取到里面的數據
-
RedisTemplate:但是如果你的
數據是復雜的對象類型
,而取出的時候又不想做任何的數據轉換,直接從Redis里面取出一個對象。Redis當中的
數據值是以可讀形式顯示出來的時候,只能使用StringRedisTemplate才能獲取到里面的數據
redisTemplate
// 將鎖狀態放入 Redis:setIfAbsent如果鍵不存在則新增,存在則不改變已經有的值。
redisTemplate.opsForValue().setIfAbsent("lockkey", "value");
// 設置鎖的過期時間
redisTemplate.expire("lockkey", 30000, TimeUnit.MILLISECONDS);//spring-data-redis 2.1 之后版本,加鎖的同時設置過期時間,二者是原子性操作
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111",5, TimeUnit.SECONDS);// 刪除/解鎖
redisTemplate.delete("lockkey");// 獲取鎖
redisTemplate.opsForValue().get("lockkey");
StringRedisTemplate
@Autowired
private StringRedisTemplate stringRedisTemplate;//在設置值的同時指定過期時間, 時間單位 s
stringRedisTemplate.opsForValue().set("key","value",7200, TimeUnit.SECONDS);//刪除key對應的鍵值對
stringRedisTemplate.opsForValue().delete("key");//獲取對應key的value
stringRedisTemplate.opsForValue().get("key");
實現redis分布式鎖
單節點Redis的分布式鎖
如果你的項目中Redis是多機部署的,那么可以嘗試使用Redisson實現分布式鎖,
加鎖 實現
加鎖實際上就是在redis中,給Key鍵設置一個值,為避免死鎖,并給定一個過期時間
但是不建議分別使用加鎖和設置超時這兩個命令去設置值和過期時間,因為違背了原子性,也就是一旦鎖被創建,而沒有設置過期時間,則鎖會一直存在
Jedis 實現
參數說明
第一個為key,我們使用key來當鎖,因為key是唯一的
。
第二個為value,我們傳的是requestId
,可靠性保證,分布式鎖要滿足第四個條件解鈴還須系鈴人,通過給value賦值為requestId,我們就知道這把鎖是哪個請求加的了
,在解鎖的時候就可以有依據。requestId可以使用UUID.randomUUID().toString()方法生成
。
第三個為NX(鍵不存在時設置值),這個參數的意思是SET IF NOT EXIST,即當key不存在時,我們進行set操作;若key已經存在,則不做任何操作;表示僅在鍵不存在時設置值
redis在set時,如果原先有值,SET 命令會返回 "OK"
第四個為PX(毫秒為單位),這個參數我們傳的是PX,意思是我們要給這個key加一個過期時間的設置,具體時間由第五個參數決定。
第五個為expireTime,與第四個參數相呼應,代表key的過期時間大小
代碼實現
public class RedisTool {private static final String LOCK_SUCCESS = "OK";private static final String SET_IF_NOT_EXIST = "NX";private static final String SET_WITH_EXPIRE_TIME = "PX";/*** 嘗試獲取分布式鎖** @param jedis Redis客戶端* @param lockKey 鎖* @param requestId 請求標識* @param expireTime 超期時間* @return 是否獲取成功*/public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);if (LOCK_SUCCESS.equals(result)) {return true;}return false;}
}
redis命令
-- 設置一個鍵 myLock,值為 12345,并且希望在 5000 毫秒后過期
SET myLock 12345 NX PX 5000
springBoot
但是在spring-data-redis 2.1 之后的版本,便可以直接設置過期時間了
spring-data-redis 2.1 前
spring-data-redis 2.1 前的版本
:在jedis當中是有這種原子操作的方法的,需要通過 RedisTemplate 的 execute 方法獲取到jedis里操作命令的對象設置
String result = template.execute(new RedisCallback<String>() {@Overridepublic String doInRedis(RedisConnection connection) throws DataAccessException {JedisCommands commands = (JedisCommands) connection.getNativeConnection();return commands.set(key, "鎖定的資源", "NX", "PX", 3000);}
});
spring-data-redis 2.1 后
spring-data-redis 2.1 之后的版本
//加鎖的同時設置過期時間,二者是原子性操作
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111",5, TimeUnit.SECONDS);
解鎖實現
解鎖的過程就是將Key鍵刪除
。但也不能亂刪,不能說客戶端1的請求將客戶端2的鎖給刪除掉
為什么不直接刪除鎖?這種不先判斷鎖的擁有者而直接解鎖的方式,會導致任何客戶端都可以隨時進行解鎖,即使這把鎖不是它的
Jedis實現
代碼
public class RedisTool {private static final Long RELEASE_SUCCESS = 1L;/*** 釋放分布式鎖** @param jedis Redis客戶端* @param lockKey 鎖* @param requestId 請求標識* @return 是否釋放成功*/public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));if (RELEASE_SUCCESS.equals(result)) {return true;}return false;}
}
為什么不直接刪除
為什么不直接使用jedis.del()方法刪除鎖
?這種不先判斷鎖的擁有者而直接解鎖的方式,會導致任何客戶端都可以隨時進行解鎖,即使這把鎖不是它的
jedis.del(lockKey);
為什么不判斷是后在刪除
如果想判斷是不是這個客戶端的鎖,再去解鎖行不行
?不行
,如果調用jedis.del()方法的時候,這把鎖已經不屬于當前客戶端的時候會解除他人加的鎖。
比如客戶端A加鎖,一段時間之后客戶端A解鎖,在執行jedis.del()之前,鎖突然過期了,此時客戶端B嘗試加鎖成功,然后客戶端A再執行del()方法,則將客戶端B的鎖給解除了。
// 判斷加鎖與解鎖是不是同一個客戶端
if (requestId.equals(jedis.get(lockKey))) {// 若在此時,這把鎖突然不是這個客戶端的,則會誤解鎖jedis.del(lockKey);
}
Springboot
public Map<String, List<Catalog2Vo>> getCatalogJsonDbWithRedisLock() {// 生成一個唯一的 UUID 作為鎖的標識String uuid = UUID.randomUUID().toString();// 獲取 Redis 的 ValueOperations 對象ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();// 嘗試在 Redis 中設置鎖,過期時間為 5 秒Boolean lock = ops.setIfAbsent("lock", uuid, 5, TimeUnit.SECONDS);// 如果成功獲取到鎖if (lock) {// 獲取分類數據Map<String, List<Catalog2Vo>> categoriesDb = getCategoryMap();// 獲取當前鎖的值String lockValue = ops.get("lock");// Lua 腳本,用于安全釋放鎖String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" +" return redis.call(\"del\",KEYS[1])\n" +"else\n" +" return 0\n" +"end";// 執行 Lua 腳本,釋放鎖stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), lockValue);// 返回獲取的分類數據return categoriesDb;} else {// 如果未能獲取到鎖,線程休眠 100 毫秒try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace(); // 打印異常堆棧}// 遞歸調用,嘗試再次獲取鎖return getCatalogJsonDbWithRedisLock();}
}
多節點的redis分布式鎖
概述
如果負責儲存這個分布式鎖的Redisson節點宕機以后,而且這個鎖正好處于鎖住的狀態時,這個鎖會出現鎖死的狀態。
為了避免這種情況的發生,Redisson內部提供了一個監控鎖的看門狗,它的作用是在Redisson實例被關閉前,不斷的延長鎖的有效期
。
默認情況下,看門狗的檢查鎖的超時時間是30秒鐘
,也可以通過修改Config.lockWatchdogTimeout來另行指定。
另外Redisson還通過加鎖的方法提供了leaseTime的參數來指定加鎖的時間。超過這個時間后鎖便自動解開了
。
高并發場景下的問題
高并發場景下如下問題:
- 主從切換后,原從庫被推舉為主庫,當在其他請求加鎖的時候,連接的redis可能還沒有同步到第一次加的鎖,造成鎖失效。
- 主庫發生故障,加鎖完成,還未同步到從節點或者集群中其他節點的時候,當前節點掛掉,鎖就丟失了。
- 兩種情況導致出現的原因就是redis的數據同步是異步的
Redisson
相對于Jedis而言,Redisson強大很多。當然了,隨之而來的就是它的復雜性。它里面也實現了分布式鎖,而且包含多種類型的鎖
,
具體內容:分布式鎖和同步器
可重入鎖(Reentrant Lock)示例
下述已可重入鎖(Reentrant Lock)示例,獲取客戶端進行加解鎖操作如下
public static void main(String[] args) {// 創建 Redis 配置對象Config config = new Config();// 設置 Redis 服務器地址config.useSingleServer().setAddress("redis://127.0.0.1:6379");// 設置 Redis 服務器密碼config.useSingleServer().setPassword("redis1234");// 創建 Redisson 客戶端實例final RedissonClient client = Redisson.create(config); // 獲取名為 "lock1" 的分布式鎖RLock lock = client.getLock("lock1");try {// 嘗試獲取鎖,最多等待 10 秒,鎖定 30 秒if (lock.tryLock(10, 30, TimeUnit.SECONDS)) {try {// 在此處執行需要加鎖的操作// 例如:處理共享資源或執行關鍵業務邏輯} finally {// 確保在操作完成后釋放鎖lock.unlock(); }} else {// 如果無法獲取鎖,輸出提示信息System.out.println("無法獲取鎖,操作被跳過");}} catch (InterruptedException e) {// 如果線程被中斷,恢復中斷狀態Thread.currentThread().interrupt(); // 輸出中斷信息System.out.println("線程被中斷");}
}
加鎖
調用lock方法,定位到lockInterruptibly。在這里,完成了加鎖的邏輯。
代碼
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {// 當前線程的 IDlong threadId = Thread.currentThread().getId();// 嘗試獲取鎖,返回值為鎖的剩余時間(TTL)Long ttl = tryAcquire(leaseTime, unit, threadId);// 如果 ttl 為空,則證明獲取鎖成功if (ttl == null) {return; // 成功獲取鎖,直接返回}// 如果獲取鎖失敗,則訂閱到對應這個鎖的 channelRFuture<RedissonLockEntry> future = subscribe(threadId);commandExecutor.syncSubscription(future); // 同步訂閱,等待鎖的釋放try {while (true) {// 再次嘗試獲取鎖ttl = tryAcquire(leaseTime, unit, threadId);// ttl 為空,說明成功獲取鎖,跳出循環if (ttl == null) {break; // 成功獲取鎖,退出循環}// ttl 大于 0 則等待 ttl 時間后繼續嘗試獲取if (ttl >= 0) {// 嘗試在 ttl 時間內獲取鎖getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {// ttl 小于 0,表示無限期等待getEntry(threadId).getLatch().acquire();}}} finally {// 取消對 channel 的訂閱,確保資源釋放unsubscribe(future, threadId);}// get(lockAsync(leaseTime, unit)); // 可能是異步獲取鎖的邏輯,注釋掉的部分
}// 兩種處理方式,一種是帶有過期時間的鎖,一種是不帶過期時間的鎖。
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {// 如果帶有過期時間,則按照普通方式獲取鎖if (leaseTime != -1) {return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}// 先按照 30 秒的過期時間來執行獲取鎖的方法RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);// 如果還持有這個鎖,則開啟定時任務不斷刷新該鎖的過期時間ttlRemainingFuture.addListener(new FutureListener<Long>() {@Overridepublic void operationComplete(Future<Long> future) throws Exception {if (!future.isSuccess()) {return; // 如果獲取鎖失敗,直接返回}Long ttlRemaining = future.getNow();// 鎖已成功獲取if (ttlRemaining == null) {// 開啟定時任務以刷新鎖的過期時間scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture; // 返回異步獲取鎖的結果
}// 正執行獲取鎖的邏輯,它是一段LUA腳本代碼,hash數據結構。
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {// 將過期時間轉換為毫秒internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,// 如果鎖不存在,則通過 hset 設置它的值,并設置過期時間"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +// 如果鎖已存在,并且鎖是當前線程,則通過 hincrby 給數值遞增 1"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +// 如果鎖已存在,但并非本線程,則返回過期時間 ttl"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
流程圖
解鎖
通過調用unlock方法來解鎖。
代碼如下
// 這是一個公開的異步解鎖方法,接受當前線程的 ID 作為參數,返回一個 RFuture<Void> 對象。
public RFuture<Void> unlockAsync(final long threadId) {// 創建一個 Promise 對象,用于異步操作的結果final RPromise<Void> result = new RedissonPromise<Void>();// 調用解鎖的內部異步方法RFuture<Boolean> future = unlockInnerAsync(threadId);// 添加監聽器以處理解鎖操作的結果future.addListener(new FutureListener<Boolean>() {@Overridepublic void operationComplete(Future<Boolean> future) throws Exception {// 檢查解鎖操作是否成功if (!future.isSuccess()) {// 如果失敗,取消過期時間的續期任務cancelExpirationRenewal(threadId);// 將失敗原因傳遞給 Promiseresult.tryFailure(future.cause());return;}// 獲取解鎖操作的返回值Boolean opStatus = future.getNow();// 如果返回值為空,表示當前線程未持有鎖,拋出異常if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);result.tryFailure(cause);return;}// 解鎖成功,取消刷新過期時間的定時任務if (opStatus) {cancelExpirationRenewal(null);}// 將成功結果傳遞給 Promiseresult.trySuccess(null);}});// 返回 Promise 對象return result;
}// 執行 Redis 腳本:使用 evalWriteAsync 方法執行 Lua 腳本,進行原子操作。
protected RFuture<Boolean> unlockInnerAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL,//如果鎖已經不存在, 發布鎖釋放的消息"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end;" +//如果釋放鎖的線程和已存在鎖的線程不是同一個線程,返回null"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +//通過hincrby遞減1的方式,釋放一次鎖//若剩余次數大于0 ,則刷新過期時間"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +//否則證明鎖已經釋放,刪除key并發布鎖釋放的消息"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; "+"end; " +"return nil;",Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));}
流程圖
redlock
使用流程
redlock的使用流程大致如下:
- 客戶端獲取到當前的時間戳。
- 客戶端按順序向部署的N個Redis實例執行加鎖操作。在設定時間內,不管加鎖成功還是失敗,都會繼續向下一個實例申請加鎖操作。
- 若加鎖成功的實例個數>= (N/2) + 1,并且加鎖的總耗時要<鎖設定的過期時間,Redlock就判斷加鎖成功,反之就是加鎖失敗。
- 加鎖成功了,就繼續往下操作,比如操作MySQL資源;若加鎖失敗,則會向所有節點發起鎖釋放的操作請求。
設計規則
Redlock的設計規則就是:
客戶端要在所有實例上申請加鎖,只有保證大多數節點加鎖成功了才判定為加鎖成功
。加鎖的總耗時要 < 鎖設定的過期時間
。釋放鎖的時候,要向所有節點發起鎖釋放的請求,不管之前加鎖是否成功
( 為了確保只釋放自己的鎖,需要用前面提到的 Lua 腳本來代替直接使用 DEL 命令進行解鎖操作)
代碼
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {// 實現要點之允許加鎖失敗節點限制(N-(N/2+1))int failedLocksLimit = failedLocksLimit();List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());// 實現要點之遍歷所有節點通過EVAL命令執行lua加鎖for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {RLock lock = iterator.next();boolean lockAcquired;try {// 對節點嘗試加鎖lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);} catch (RedisConnectionClosedException|RedisResponseTimeoutException e) {// 如果拋出這類異常,為了防止加鎖成功,但是響應失敗,需要解鎖unlockInner(Arrays.asList(lock));lockAcquired = false;} catch (Exception e) {// 拋出異常表示獲取鎖失敗lockAcquired = false;}if (lockAcquired) {// 成功獲取鎖集合acquiredLocks.add(lock);} else {// 如果達到了允許加鎖失敗節點限制,那么break,即此次Redlock加鎖失敗if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {break;} }}return true;
}
RedLock 紅鎖 已經廢棄
RedLock 紅鎖:RedLock 會對集群的每個節點進行加鎖,如果大多數(N/2+1)加鎖成功了,則認為獲取鎖成功
。
- 這個過程中
可能會因為網絡問題,或節點超時的問題,影響加鎖的性能,故而在最新的 Redisson 版本中中已經正式宣布廢棄 RedLock
。
redisTemplate
為了解決多節點的上述問題,可以使用redisTemplate中的setIfAbsent方法
setIfAbsent方法是原子性的
單個 Redis 實例:在單個 Redis 實例中,setIfAbsent 是原子操作
,確保在鍵不存在時才會設置值。Redis 集群:在 Redis 集群中,setIfAbsent 仍然是原子操作
,但它只在同一個分片(slot)內有效。如果不同的節點(分片)之間存在競爭條件,可能會導致不一致的結果
可以在這個方法中,構造一個和可重用鎖差不多的代碼,及判斷當前線程是否為加鎖線程,去實現多節點先的分布式鎖