前期, 我們介紹了什么是分布式鎖及分布式鎖應用場景, 今天我們基于Redis方案來實現分布式鎖的應用。
1. 基于Redis分布式鎖方案介紹
基于Redis實現的分布式鎖是分布式系統中控制資源訪問的常用方案,利用Redis的原子操作和高性能特性實現跨進程的鎖機制。我們需根據業務特性合理設計,避免引入新的問題(如死鎖、鎖失效)。
基于Redis方案的分布式具有以下特點:
- 可重入性:同一線程可多次獲取同一把鎖,通過 ThreadLocal 計數。
- 鎖續期(看門狗機制):自動延長鎖的過期時間,防止業務未執行完鎖就過期。
- 公平 / 非公平鎖:支持通過構造函數指定是否公平鎖。
- 異常處理:包含完整的異常處理和資源釋放邏輯。
- 原子操作:能夠使用 Lua 腳本保證解鎖的原子性。
1.1 核心原理
-
原子性加鎖
通過Redis的原子命令SET key value NX PX timeout
實現:NX
(Not eXists):僅當key不存在時設置值,保證互斥性。PX timeout
:設置鎖的過期時間(毫秒),避免死鎖(如持有鎖的進程崩潰時自動釋放)。
-
唯一標識
鎖的value使用唯一ID(如UUID),確保鎖只能被持有者釋放,防止誤刪。 -
原子性解鎖
使用Lua腳本保證解鎖的原子性:if redis.call("GET", KEYS[1]) == ARGV[1] thenreturn redis.call("DEL", KEYS[1]) elsereturn 0 end
先驗證鎖的持有者,再刪除鎖,避免誤釋放其他進程持有的鎖。
1.2 優缺點
-
優點
- 高性能:Redis基于內存操作,加鎖和解鎖延遲極低。
- 高可用:通過Redis集群(如Sentinel、Cluster)避免單點故障。
- 靈活配置:支持設置鎖的過期時間、重試策略等參數。
-
缺點
- 主從復制延遲:主從架構中,主節點寫入鎖后未同步到從節點就崩潰,可能導致鎖丟失(Redlock算法可部分解決)。
- 過期時間難控制:若業務執行時間超過鎖的過期時間,可能導致多個進程同時持有鎖。
1.3 典型實現方式
-
簡單實現(原生Redis命令)
直接使用Redis客戶端(如Jedis、Lettuce)調用SET
和Lua腳本,適合輕量級場景。 -
Redisson框架
提供分布式鎖的高級抽象(如可重入鎖、公平鎖、讀寫鎖),內置看門狗機制自動續期鎖:// Redisson可重入鎖示例 RLock lock = redissonClient.getLock("myLock"); lock.lock(); // 自動續期,默認30秒 try {// 業務邏輯 } finally {lock.unlock(); }
-
Redlock算法
針對主從復制缺陷,在多個獨立的Redis節點上獲取鎖,多數節點成功時才認為加鎖成功,提升可靠性,但犧牲部分性能。
1.4 關鍵參數配置
- 鎖過期時間:需根據業務執行時間合理設置,避免過短導致鎖提前釋放,或過長導致資源長時間被占用。
- 重試策略:獲取鎖失敗時的重試次數和間隔,避免頻繁重試耗盡資源。
- 續期機制:通過看門狗自動延長鎖的過期時間,確保業務執行期間鎖不會過期。
1.5 適用場景
- 高并發場景:如秒殺、庫存扣減,利用Redis高性能快速響應。
- 異步任務:如定時任務去重執行,通過鎖避免多個節點重復處理。
- 緩存重建:防止緩存失效時多個請求同時重建緩存,造成緩存擊穿。
1.6 注意事項
- 避免鎖粒度過大:只在關鍵操作上加鎖,減少鎖持有時間。
- 異常處理:使用
try-finally
確保鎖最終被釋放。 - 監控與告警:監控鎖的持有時間、競爭情況,及時發現異常。
2. 實現代碼
以下是基于Redis方案的分布式鎖的重要實現代碼片段(僅供參考)。
使用時需要在Maven文件中添加 Jedis 組件依賴:
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>4.4.3</version>
</dependency>
RedisDistributedLock.java
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;/*** Redis分布式鎖實現,基于Redis的SETNX和Lua腳本機制* 支持可重入、鎖續期(看門狗機制)和公平鎖特性*/
public class RedisDistributedLock implements Lock {private static final String LOCK_SUCCESS = "OK";private static final Long RELEASE_SUCCESS = 1L;private static final String SET_IF_NOT_EXIST = "NX";private static final String SET_WITH_EXPIRE_TIME = "PX";// 鎖續期的間隔時間(毫秒),默認是鎖超時時間的1/3private static final long RENEWAL_INTERVAL_RATIO = 3;private final JedisPool jedisPool;private final String lockKey;private final String clientId; // 客戶端唯一標識private final long expireTime; // 鎖超時時間(毫秒)private final boolean isFair; // 是否公平鎖// 可重入計數private final ThreadLocal<AtomicInteger> reentrantCount = ThreadLocal.withInitial(() -> new AtomicInteger(0));// 看門狗線程private ThreadLocal<WatchDog> watchDog = new ThreadLocal<>();public RedisDistributedLock(JedisPool jedisPool, String lockKey, long expireTime, boolean isFair) {this.jedisPool = jedisPool;this.lockKey = lockKey;this.clientId = generateClientId();this.expireTime = expireTime;this.isFair = isFair;}@Overridepublic void lock() {if (!tryLock()) {// 等待并重試waitAndRetry();}}@Overridepublic void lockInterruptibly() throws InterruptedException {if (Thread.interrupted()) {throw new InterruptedException();}if (!tryLock()) {waitAndHandleInterrupt();}}@Overridepublic boolean tryLock() {// 檢查是否已持有鎖(可重入)if (isHeldByCurrentThread()) {reentrantCount.get().incrementAndGet();return true;}try (Jedis jedis = jedisPool.getResource()) {// SET key value NX PX expireTimeString result = jedis.set(lockKey, clientId, new SetParams().nx().px(expireTime));if (LOCK_SUCCESS.equals(result)) {reentrantCount.get().set(1);startWatchDog(); // 啟動看門狗線程return true;}return false;}}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {long startTime = System.currentTimeMillis();long timeoutMillis = unit.toMillis(time);if (tryLock()) {return true;}while (System.currentTimeMillis() - startTime < timeoutMillis) {if (Thread.interrupted()) {throw new InterruptedException();}if (tryLock()) {return true;}Thread.sleep(100); // 避免CPU空轉}return false;}@Overridepublic void unlock() {if (!isHeldByCurrentThread()) {throw new IllegalMonitorStateException("Attempt to unlock lock, not locked by current thread");}// 可重入鎖計數減1int count = reentrantCount.get().decrementAndGet();if (count > 0) {return; // 仍持有鎖,不釋放}try {// 釋放鎖前停止看門狗stopWatchDog();// 使用Lua腳本保證原子性釋放鎖try (Jedis jedis = jedisPool.getResource()) {String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +"return redis.call('del', KEYS[1]) " +"else " +"return 0 " +"end";Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(clientId));if (!RELEASE_SUCCESS.equals(result)) {throw new IllegalMonitorStateException("Failed to release lock");}}} finally {// 清理ThreadLocalreentrantCount.remove();watchDog.remove();}}@Overridepublic Condition newCondition() {throw new UnsupportedOperationException("Conditions are not supported by this lock");}// 生成客戶端唯一標識private String generateClientId() {return Thread.currentThread().getName() + "-" + System.currentTimeMillis() + "-" + (int)(Math.random() * 10000);}// 判斷當前線程是否持有鎖private boolean isHeldByCurrentThread() {AtomicInteger count = reentrantCount.get();return count.get() > 0;}// 等待并重試獲取鎖(非公平)private void waitAndRetry() {while (true) {if (tryLock()) {return;}try {Thread.sleep(100); // 避免CPU空轉} catch (InterruptedException e) {Thread.currentThread().interrupt();return;}}}// 等待并處理中斷private void waitAndHandleInterrupt() throws InterruptedException {while (true) {if (Thread.interrupted()) {throw new InterruptedException();}if (tryLock()) {return;}Thread.sleep(100); // 避免CPU空轉}}// 啟動看門狗線程,自動續期鎖private void startWatchDog() {WatchDog dog = new WatchDog(expireTime / RENEWAL_INTERVAL_RATIO);watchDog.set(dog);dog.start();}// 停止看門狗線程private void stopWatchDog() {WatchDog dog = watchDog.get();if (dog != null) {dog.interrupt();}}/*** 看門狗線程,負責自動續期鎖*/private class WatchDog extends Thread {private final long renewalInterval;private boolean running = true;public WatchDog(long renewalInterval) {this.renewalInterval = renewalInterval;setDaemon(true);setName("LockWatchDog-" + lockKey);}@Overridepublic void run() {while (running && !isInterrupted()) {try {Thread.sleep(renewalInterval);// 續期鎖try (Jedis jedis = jedisPool.getResource()) {String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +"return redis.call('pexpire', KEYS[1], ARGV[2]) " +"else " +"return 0 " +"end";jedis.eval(script, Collections.singletonList(lockKey), List.of(clientId, String.valueOf(expireTime)));}} catch (InterruptedException e) {running = false;Thread.currentThread().interrupt();} catch (Exception e) {// 記錄異常但繼續運行e.printStackTrace();}}}}
}
RedisLockExample.java
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;/*** Redis分布式鎖使用示例*/
public class RedisLockExample {private static final String REDIS_HOST = "localhost";private static final int REDIS_PORT = 6379;private static final String LOCK_KEY = "distributed:lock:example";private static final long LOCK_EXPIRE_TIME = 30000; // 30秒public static void main(String[] args) {// 創建Redis連接池JedisPoolConfig poolConfig = new JedisPoolConfig();poolConfig.setMaxTotal(100);poolConfig.setMaxIdle(20);poolConfig.setMinIdle(5);poolConfig.setTestOnBorrow(true);JedisPool jedisPool = new JedisPool(poolConfig, REDIS_HOST, REDIS_PORT);// 創建分布式鎖實例RedisDistributedLock lock = new RedisDistributedLock(jedisPool, LOCK_KEY, LOCK_EXPIRE_TIME, false);// 模擬多線程競爭ExecutorService executor = Executors.newFixedThreadPool(5);for (int i = 0; i < 10; i++) {executor.submit(() -> {try {// 獲取鎖(帶超時)if (lock.tryLock(5, TimeUnit.SECONDS)) {try {System.out.println(Thread.currentThread().getName() + " 獲取到鎖");// 模擬業務操作Thread.sleep(2000);} finally {lock.unlock();System.out.println(Thread.currentThread().getName() + " 釋放鎖");}} else {System.out.println(Thread.currentThread().getName() + " 獲取鎖超時");}} catch (InterruptedException e) {Thread.currentThread().interrupt();}});}executor.shutdown();try {executor.awaitTermination(1, TimeUnit.MINUTES);} catch (InterruptedException e) {e.printStackTrace();}// 關閉Redis連接池jedisPool.close();}
}