Redis實現分布式鎖的原理
Redis分布式鎖基于其單線程執行命令的特性,通過原子操作實現多節點間的互斥訪問。下面從原理、實現、問題及優化四個方面詳細解析:
1.原子性與互斥性
Redis分布式鎖的核心是原子性操作:
-
獲取鎖:使用
SET key value NX EX timeout
命令NX
(Not eXists):僅當key不存在時設置成功EX timeout
:設置過期時間,防止死鎖- 原子性:Redis單線程執行命令,確保多客戶端并發請求時只有一個能成功
-
釋放鎖:先驗證鎖持有者再刪除
- 必須使用Lua腳本保證原子性,避免誤刪其他線程的鎖
-- 釋放鎖的Lua腳本
if redis.call('get', KEYS[1]) == ARGV[1] thenreturn redis.call('del', KEYS[1])
elsereturn 0
end
2.分布式鎖的實現步驟
1. 獲取鎖流程:
- 客戶端生成唯一標識(如UUID)作為鎖的值
- 執行
SET lock_key unique_id NX EX 10
(10秒過期) - 返回
OK
表示獲取鎖成功,否則失敗
2. 釋放鎖流程:
- 客戶端攜帶鎖的唯一標識調用Lua腳本
- 腳本先檢查鎖的值是否與傳入標識一致
- 一致則刪除鎖,返回1;不一致返回0
示例
1. 添加依賴
在pom.xml
中添加Spring Data Redis依賴:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2. 配置Redis連接
在application.yml
中配置Redis服務器信息:
spring:redis:host: localhostport: 6379password: yourpassword # 如果有密碼timeout: 5000mslettuce:pool:max-active: 8max-wait: -1msmax-idle: 8min-idle: 0
3. 創建分布式鎖接口
定義鎖的基本操作:
public interface RedisLock {/*** 嘗試獲取鎖* @param lockKey 鎖的鍵* @param requestId 請求標識(用于釋放鎖時校驗)* @param expireTime 鎖的過期時間* @param timeUnit 時間單位* @return 是否成功獲取鎖*/boolean tryLock(String lockKey, String requestId, long expireTime, TimeUnit timeUnit);/*** 釋放鎖* @param lockKey 鎖的鍵* @param requestId 請求標識* @return 是否成功釋放鎖*/boolean releaseLock(String lockKey, String requestId);
}
4. 實現分布式鎖(重點)
使用RedisTemplate
實現鎖操作,關鍵在于:
- 獲取鎖:使用
setIfAbsent
原子操作 - 釋放鎖:使用Lua腳本保證原子性
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;import java.util.Collections;
import java.util.concurrent.TimeUnit;@Component
public class RedisLockImpl implements RedisLock {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;// 釋放鎖的Lua腳本:先驗證鎖的持有者,再刪除鎖private static final DefaultRedisScript<Long> RELEASE_LOCK_SCRIPT;static {RELEASE_LOCK_SCRIPT = new DefaultRedisScript<>();RELEASE_LOCK_SCRIPT.setScriptText("if redis.call('get', KEYS[1]) == ARGV[1] then " +" return redis.call('del', KEYS[1]) " +"else " +" return 0 " +"end");RELEASE_LOCK_SCRIPT.setResultType(Long.class);}@Overridepublic boolean tryLock(String lockKey, String requestId, long expireTime, TimeUnit timeUnit) {// 核心方法:原子性地設置鎖和過期時間Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, requestId, expireTime, timeUnit);return result != null && result;}@Overridepublic boolean releaseLock(String lockKey, String requestId) {// 使用Lua腳本保證原子性Long result = redisTemplate.execute(RELEASE_LOCK_SCRIPT,Collections.singletonList(lockKey),requestId);return result != null && result == 1L;}
}
5. 使用分布式鎖
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.UUID;
import java.util.concurrent.TimeUnit;@Service
public class OrderService {@Autowiredprivate RedisLock redisLock;public void createOrder(String orderId) {String lockKey = "order-lock:" + orderId;String requestId = UUID.randomUUID().toString();boolean locked = false;try {// 嘗試獲取鎖,設置過期時間為10秒locked = redisLock.tryLock(lockKey, requestId, 10, TimeUnit.SECONDS);if (locked) {// 獲得鎖成功,執行關鍵業務邏輯System.out.println("獲取鎖成功,開始處理訂單: " + orderId);// 模擬業務處理Thread.sleep(2000);} else {// 獲得鎖失敗,處理失敗邏輯System.out.println("獲取鎖失敗,稍后重試或執行其他策略");}} catch (Exception e) {e.printStackTrace();} finally {// 無論如何都嘗試釋放鎖,確保不會死鎖if (locked) {redisLock.releaseLock(lockKey, requestId);}}}
}
setIfAbsent
方法
RedisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit)
是實現分布式鎖的核心方法,它對應Redis的命令:
SET key value NX EX timeout
關鍵點:
-
原子性:該方法會原子性地完成三個操作:
- 檢查key是否存在
- 如果不存在,則設置key的值
- 同時設置key的過期時間
-
防止死鎖:
- 必須設置過期時間,確保即使持有鎖的進程崩潰,鎖也會自動釋放
- 過期時間不宜過短(避免業務未完成鎖就過期)或過長(影響性能)
-
唯一標識:
- value使用唯一的requestId(如UUID),用于標識鎖的持有者
- 釋放鎖時必須驗證requestId,防止誤刪其他線程的鎖
釋放鎖的原子性問題
釋放鎖時不能簡單地直接刪除key,必須先驗證鎖的持有者:
// 錯誤示例(非原子操作,有競態條件)
if (redis.get(lockKey).equals(requestId)) {redis.delete(lockKey);
}// 正確方式:使用Lua腳本保證原子性
Long result = redisTemplate.execute(RELEASE_LOCK_SCRIPT, Collections.singletonList(lockKey), requestId);