分布式鎖的進階實現與優化方案
作為Java高級開發工程師,我將為您提供更完善的Redis分布式鎖實現方案,包含更多生產級考量。
1. 生產級Redis分布式鎖實現
1.1 完整實現類(支持可重入、自動續約)
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;public class RedisDistributedLock {private final JedisPool jedisPool;private final ScheduledExecutorService scheduler;// 本地存儲鎖的持有計數(用于可重入)private final ThreadLocal<Map<String, LockEntry>> lockHoldCounts = ThreadLocal.withInitial(HashMap::new);// 鎖續約的Future集合private final ConcurrentMap<String, ScheduledFuture<?>> renewalFutures = new ConcurrentHashMap<>();private static class LockEntry {final String requestId;final AtomicInteger holdCount;LockEntry(String requestId) {this.requestId = requestId;this.holdCount = new AtomicInteger(1);}}public RedisDistributedLock(JedisPool jedisPool) {this.jedisPool = jedisPool;this.scheduler = Executors.newScheduledThreadPool(4);}public boolean tryLock(String lockKey, String requestId, int expireTime) {return tryLock(lockKey, requestId, expireTime, 0, 0);}public boolean tryLock(String lockKey, String requestId, int expireTime, long maxWaitTime, long retryInterval) {long startTime = System.currentTimeMillis();try {// 檢查是否已經持有鎖(可重入)LockEntry entry = lockHoldCounts.get().get(lockKey);if (entry != null && entry.requestId.equals(requestId)) {entry.holdCount.incrementAndGet();return true;}// 嘗試獲取鎖while (true) {if (acquireLock(lockKey, requestId, expireTime)) {// 獲取成功,記錄持有信息lockHoldCounts.get().put(lockKey, new LockEntry(requestId));// 啟動自動續約scheduleRenewal(lockKey, requestId, expireTime);return true;}// 檢查是否超時if (maxWaitTime > 0 && System.currentTimeMillis() - startTime > maxWaitTime) {return false;}// 等待重試if (retryInterval > 0) {try {Thread.sleep(retryInterval);} catch (InterruptedException e) {Thread.currentThread().interrupt();return false;}}}} catch (Exception e) {// 異常處理return false;}}private boolean acquireLock(String lockKey, String requestId, int expireTime) {try (Jedis jedis = jedisPool.getResource()) {SetParams params = SetParams.setParams().nx().px(expireTime);return "OK".equals(jedis.set(lockKey, requestId, params));}}private void scheduleRenewal(String lockKey, String requestId, int expireTime) {// 計算續約間隔(通常是過期時間的1/3)long renewalInterval = expireTime * 2 / 3;ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {try {renewLock(lockKey, requestId, expireTime);} catch (Exception e) {// 續約失敗,取消任務renewalFutures.remove(lockKey);}}, renewalInterval, renewalInterval, TimeUnit.MILLISECONDS);renewalFutures.put(lockKey, future);}public boolean renewLock(String lockKey, String requestId, int expireTime) {try (Jedis jedis = jedisPool.getResource()) {String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +"return redis.call('pexpire', KEYS[1], ARGV[2]) " +"else " +"return 0 " +"end";Object result = jedis.eval(script, Collections.singletonList(lockKey), Arrays.asList(requestId, String.valueOf(expireTime)));return "OK".equals(result);}}public boolean releaseLock(String lockKey, String requestId) {try {LockEntry entry = lockHoldCounts.get().get(lockKey);if (entry == null || !entry.requestId.equals(requestId)) {return false;}// 減少持有計數if (entry.holdCount.decrementAndGet() > 0) {return true;}// 完全釋放鎖lockHoldCounts.get().remove(lockKey);// 取消續約任務ScheduledFuture<?> future = renewalFutures.remove(lockKey);if (future != null) {future.cancel(false);}// 釋放Redis鎖try (Jedis jedis = jedisPool.getResource()) {String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +"return redis.call('del', KEYS[1]) " +"else " +"return 0 " +"end";Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));return Long.valueOf(1L).equals(result);}} catch (Exception e) {return false;}}public void shutdown() {scheduler.shutdown();try {if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {scheduler.shutdownNow();}} catch (InterruptedException e) {scheduler.shutdownNow();Thread.currentThread().interrupt();}}
}
1.2 鎖工廠模式(支持多種鎖類型)
public interface DistributedLock {boolean tryLock(String lockKey, String requestId, int expireTime);boolean tryLock(String lockKey, String requestId, int expireTime, long maxWaitTime, long retryInterval);boolean releaseLock(String lockKey, String requestId);
}public class RedisLockFactory {private final JedisPool jedisPool;public RedisLockFactory(JedisPool jedisPool) {this.jedisPool = jedisPool;}public DistributedLock createSimpleLock() {return new SimpleRedisLock(jedisPool);}public DistributedLock createReentrantLock() {return new ReentrantRedisLock(jedisPool);}public DistributedLock createReadWriteLock() {return new RedisReadWriteLock(jedisPool);}private static class SimpleRedisLock implements DistributedLock {// 簡單實現(同前面基礎實現)}private static class ReentrantRedisLock extends RedisDistributedLock {// 可重入實現(同前面完整實現)}private static class RedisReadWriteLock implements DistributedLock {// 讀寫鎖實現}
}
2. 高級特性實現
2.1 讀寫鎖實現
public class RedisReadWriteLock {private final JedisPool jedisPool;private static final String READ_LOCK_PREFIX = "READ_LOCK:";private static final String WRITE_LOCK_PREFIX = "WRITE_LOCK:";public RedisReadWriteLock(JedisPool jedisPool) {this.jedisPool = jedisPool;}public boolean tryReadLock(String lockKey, String requestId, int expireTime) {try (Jedis jedis = jedisPool.getResource()) {// 檢查是否有寫鎖if (jedis.exists(WRITE_LOCK_PREFIX + lockKey)) {return false;}// 獲取讀鎖String readLockKey = READ_LOCK_PREFIX + lockKey;Long count = jedis.incr(readLockKey);if (count == 1L) {// 第一次獲取讀鎖,設置過期時間jedis.pexpire(readLockKey, expireTime);}return true;}}public boolean tryWriteLock(String lockKey, String requestId, int expireTime) {try (Jedis jedis = jedisPool.getResource()) {// 檢查是否有讀鎖if (jedis.exists(READ_LOCK_PREFIX + lockKey)) {return false;}// 獲取寫鎖SetParams params = SetParams.setParams().nx().px(expireTime);return "OK".equals(jedis.set(WRITE_LOCK_PREFIX + lockKey, requestId, params));}}// 釋放方法類似...
}
2.2 公平鎖實現(基于Redis列表)
public class RedisFairLock {private final JedisPool jedisPool;private static final String QUEUE_PREFIX = "LOCK_QUEUE:";private static final String LOCK_PREFIX = "LOCK:";public RedisFairLock(JedisPool jedisPool) {this.jedisPool = jedisPool;}public boolean tryLock(String lockKey, String requestId, int expireTime, long maxWaitTime) {long startTime = System.currentTimeMillis();String queueKey = QUEUE_PREFIX + lockKey;String lockRealKey = LOCK_PREFIX + lockKey;try (Jedis jedis = jedisPool.getResource()) {// 加入等待隊列jedis.rpush(queueKey, requestId);try {while (true) {// 檢查是否輪到自己String firstRequestId = jedis.lindex(queueKey, 0);if (requestId.equals(firstRequestId)) {// 嘗試獲取鎖SetParams params = SetParams.setParams().nx().px(expireTime);if ("OK".equals(jedis.set(lockRealKey, requestId, params))) {return true;}}// 檢查超時if (System.currentTimeMillis() - startTime > maxWaitTime) {// 從隊列中移除自己jedis.lrem(queueKey, 0, requestId);return false;}// 短暫等待Thread.sleep(100);}} finally {// 確保最終從隊列中移除(防止異常情況)jedis.lrem(queueKey, 0, requestId);}} catch (InterruptedException e) {Thread.currentThread().interrupt();return false;}}public void releaseLock(String lockKey, String requestId) {try (Jedis jedis = jedisPool.getResource()) {String lockRealKey = LOCK_PREFIX + lockKey;String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +"return redis.call('del', KEYS[1]) " +"else " +"return 0 " +"end";jedis.eval(script, Collections.singletonList(lockRealKey), Collections.singletonList(requestId));}}
}
3. 生產環境最佳實踐
3.1 配置建議
-
鎖過期時間:
- 根據業務操作的最長時間設置,通常為業務平均耗時的3倍
- 例如:業務平均耗時200ms,可設置鎖過期時間為600ms
-
續約間隔:
- 設置為過期時間的1/3到1/2
- 例如:過期時間600ms,續約間隔200-300ms
-
重試策略:
- 初始重試間隔50-100ms
- 可考慮指數退避策略
3.2 監控與告警
public class LockMonitor {private final JedisPool jedisPool;private final MeterRegistry meterRegistry; // 假設使用Micrometerpublic LockMonitor(JedisPool jedisPool, MeterRegistry meterRegistry) {this.jedisPool = jedisPool;this.meterRegistry = meterRegistry;}public void monitorLockStats() {// 監控鎖獲取成功率Timer lockAcquireTimer = Timer.builder("redis.lock.acquire.time").description("Time taken to acquire redis lock").register(meterRegistry);// 監控鎖等待時間DistributionSummary waitTimeSummary = DistributionSummary.builder("redis.lock.wait.time").description("Time spent waiting for redis lock").register(meterRegistry);// 監控鎖競爭情況Gauge.builder("redis.lock.queue.size", () -> {try (Jedis jedis = jedisPool.getResource()) {return jedis.llen("LOCK_QUEUE:important_lock");}}).description("Number of clients waiting for lock").register(meterRegistry);}
}
3.3 異常處理策略
- Redis不可用時的降級策略:
- 本地降級鎖(僅適用于單機或可以接受短暫不一致的場景)
- 快速失敗,避免系統雪崩
public class DegradableRedisLock implements DistributedLock {private final DistributedLock redisLock;private final ReentrantLock localLock = new ReentrantLock();private final CircuitBreaker circuitBreaker;public DegradableRedisLock(DistributedLock redisLock) {this.redisLock = redisLock;this.circuitBreaker = CircuitBreaker.ofDefaults("redisLock");}@Overridepublic boolean tryLock(String lockKey, String requestId, int expireTime) {return CircuitBreaker.decorateSupplier(circuitBreaker, () -> {try {return redisLock.tryLock(lockKey, requestId, expireTime);} catch (Exception e) {// Redis不可用,降級到本地鎖return localLock.tryLock();}}).get();}// 其他方法實現類似...
}
4. 測試方案
4.1 單元測試
public class RedisDistributedLockTest {private RedisDistributedLock lock;private JedisPool jedisPool;@BeforeEachvoid setUp() {jedisPool = new JedisPool("localhost");lock = new RedisDistributedLock(jedisPool);}@Testvoid testLockAndUnlock() {String lockKey = "test_lock";String requestId = UUID.randomUUID().toString();assertTrue(lock.tryLock(lockKey, requestId, 10000));assertTrue(lock.releaseLock(lockKey, requestId));}@Testvoid testReentrantLock() {String lockKey = "test_reentrant_lock";String requestId = UUID.randomUUID().toString();assertTrue(lock.tryLock(lockKey, requestId, 10000));assertTrue(lock.tryLock(lockKey, requestId, 10000)); // 可重入assertTrue(lock.releaseLock(lockKey, requestId));assertTrue(lock.releaseLock(lockKey, requestId)); // 需要釋放兩次}// 更多測試用例...
}
4.2 并發測試
@Test
void testConcurrentLock() throws InterruptedException {String lockKey = "concurrent_test_lock";int threadCount = 10;CountDownLatch latch = new CountDownLatch(threadCount);AtomicInteger successCount = new AtomicInteger();for (int i = 0; i < threadCount; i++) {new Thread(() -> {String requestId = UUID.randomUUID().toString();if (lock.tryLock(lockKey, requestId, 1000, 5000, 100)) {try {successCount.incrementAndGet();Thread.sleep(100); // 模擬業務處理} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {lock.releaseLock(lockKey, requestId);}}latch.countDown();}).start();}latch.await();assertEquals(1, successCount.get()); // 確保只有一個線程獲取到鎖
}
5. 性能優化建議
-
連接池優化:
- 合理配置Jedis連接池大小
- 使用try-with-resources確保連接釋放
-
Lua腳本優化:
- 預加載常用Lua腳本
- 減少腳本復雜度
-
批量操作:
- 對于RedLock等多節點場景,考慮使用管道(pipeline)
-
本地緩存:
- 對于頻繁使用的鎖信息,可考慮本地緩存
6. 替代方案對比
方案 | 優點 | 缺點 | 適用場景 |
---|---|---|---|
Redis單節點 | 實現簡單,性能高 | 單點故障,可靠性較低 | 對可靠性要求不高的場景 |
Redis集群+RedLock | 可靠性較高 | 實現復雜,性能較低 | 對可靠性要求高的場景 |
Zookeeper | 可靠性高,原生支持臨時節點 | 性能較低,依賴Zookeeper | 強一致性要求的場景 |
數據庫實現 | 無需額外組件 | 性能差,容易成為瓶頸 | 簡單場景,并發量低 |
這個進階方案提供了生產環境所需的完整功能,包括可重入鎖、讀寫鎖、公平鎖等高級特性,以及監控、降級等生產級考量。您可以根據實際項目需求選擇合適的實現方式。