目錄
概述
Redis分布式鎖的實現方式
1. 基于SETNX命令(String類型)
2. 使用SET命令的NX和EX參數(推薦方式)
3. 基于Lua腳本實現復雜邏輯
4. RedLock算法(多節點Redis實現)
Redisson的分布式鎖
Redisson的鎖類型
Redisson可重入鎖的實現
Redis其他實現分布式鎖的方式
1. 基于Sorted Set實現公平鎖
2. 基于List實現分布式鎖
總結
實際應用場景
場景一:庫存超賣控制
業務場景
分布式鎖解決方案
關鍵實現細節
實際開發注意事項
場景二:訂單防重復提交
業務場景
分布式鎖解決方案
關鍵實現細節
實際開發注意事項
場景三:促銷活動限流與防超賣
業務場景
分布式鎖解決方案
關鍵實現細節
實際開發注意事項
分布式鎖在電商中的最佳實踐
概述
分布式鎖是控制分布式系統之間同步訪問共享資源的一種方式。下面詳細介紹Redis分布式鎖的實現方式以及Redisson的鎖類型。
Redis分布式鎖的實現方式
1. 基于SETNX命令(String類型)
最基本的實現方式是使用Redis的SETNX
(SET if Not eXists)命令:
# 使用Python和Redis-py庫實現簡單分布式鎖
import redis
import timedef acquire_lock(redis_client, lock_name, acquire_timeout=10, lock_timeout=10):end_time = time.time() + acquire_timeoutlock_value = str(time.time() + lock_timeout) # 鎖的過期時間while time.time() < end_time:# 使用SETNX嘗試獲取鎖if redis_client.setnx(lock_name, lock_value):# 設置鎖的過期時間,防止死鎖redis_client.expire(lock_name, lock_timeout)return True# 檢查鎖是否過期current_value = redis_client.get(lock_name)if current_value and float(current_value) < time.time():# 鎖已過期,嘗試競爭并獲取鎖old_value = redis_client.getset(lock_name, lock_value)if old_value == current_value:redis_client.expire(lock_name, lock_timeout)return Truetime.sleep(0.1) # 短暫休眠避免頻繁重試return Falsedef release_lock(redis_client, lock_name):redis_client.delete(lock_name)
這種方式的問題是:
- 鎖的釋放缺乏原子性
- 不支持可重入
- 沒有鎖失效機制(需手動設置過期時間)
2. 使用SET命令的NX和EX參數(推薦方式)
Redis 2.6.12版本后,SET
命令支持原子化操作:
# 使用SET命令的NX和EX參數實現分布式鎖
def acquire_lock(redis_client, lock_name, acquire_timeout=10, lock_timeout=10):end_time = time.time() + acquire_timeoutwhile time.time() < end_time:# 使用SET命令原子化操作:NX表示只有鍵不存在時才設置,EX設置過期時間result = redis_client.set(lock_name, "locked", nx=True, ex=lock_timeout)if result:return Truetime.sleep(0.1)return False
這種方式解決了原子性問題,但仍不支持可重入。
3. 基于Lua腳本實現復雜邏輯
使用Lua腳本可以實現更復雜的原子操作,例如鎖的釋放:
# 使用Lua腳本確保鎖釋放的原子性
RELEASE_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] thenreturn redis.call("del", KEYS[1])
elsereturn 0
end
"""def release_lock(redis_client, lock_name, lock_value):return redis_client.eval(RELEASE_SCRIPT, 1, lock_name, lock_value)
4. RedLock算法(多節點Redis實現)
為了提高可靠性,Redis作者提出了RedLock算法,基于多個Redis節點:
# RedLock算法的簡化實現
def redlock_acquire(redis_clients, lock_name, acquire_timeout=10, lock_timeout=10):start_time = time.time()lock_value = str(uuid.uuid4()) # 唯一標識鎖acquired_locks = 0for client in redis_clients:if time.time() > start_time + acquire_timeout:breakif client.set(lock_name, lock_value, nx=True, px=lock_timeout):acquired_locks += 1# 需要獲得多數節點的鎖才算成功if acquired_locks >= (len(redis_clients) // 2 + 1):return lock_value# 獲取鎖失敗,釋放已獲得的鎖for client in redis_clients:client.eval(RELEASE_SCRIPT, 1, lock_name, lock_value)return False
Redisson的分布式鎖
Redisson是一個基于Redis的Java駐內存數據網格(In-Memory Data Grid),它提供了分布式鎖的實現。
Redisson的鎖類型
Redisson的分布式鎖是可重入鎖,但從鎖的性質上來說,它屬于悲觀鎖。因為:
- 它在獲取鎖后會阻塞其他線程的訪問
- 遵循"先獲取鎖再操作"的悲觀策略
Redisson可重入鎖的實現
Redisson的可重入鎖通過Hash結構實現:
// Redisson可重入鎖的內部實現(簡化版)
public class RedissonLock {// 鎖的名稱private String lockName;// 當前線程IDprivate String threadId;// 重入次數private int reentrantCount = 0;// 嘗試獲取鎖public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long threadId = Thread.currentThread().getId();// 使用Hash結構存儲鎖信息// HSET lockName threadId 1Long currentLockCount = redisClient.hset(lockName, threadId, 1);if (currentLockCount == 0) {// 鎖已存在,檢查是否是當前線程持有String currentValueStr = redisClient.hget(lockName, threadId);if (currentValueStr != null) {int count = Integer.parseInt(currentValueStr);// 鎖重入redisClient.hset(lockName, threadId, count + 1);reentrantCount = count + 1;return true;}return false;}// 設置鎖的過期時間redisClient.expire(lockName, leaseTime, unit);reentrantCount = 1;return true;}// 釋放鎖public void unlock() {String threadId = Thread.currentThread().getId();// 獲取當前鎖的重入次數String currentValueStr = redisClient.hget(lockName, threadId);if (currentValueStr == null) {throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ threadId + " thread-id: " + Thread.currentThread().getId());}int count = Integer.parseInt(currentValueStr);if (count == 1) {// 完全釋放鎖redisClient.hdel(lockName, threadId);return;}// 減少重入次數redisClient.hset(lockName, threadId, count - 1);reentrantCount = count - 1;}
}
Redis其他實現分布式鎖的方式
1. 基于Sorted Set實現公平鎖
# 使用Sorted Set實現公平鎖
def acquire_fair_lock(redis_client, lock_name, client_id, acquire_timeout=10):end_time = time.time() + acquire_timeoutwhile time.time() < end_time:# 使用當前時間戳作為分數timestamp = time.time()# ZADD命令添加成員,NX確保只有不存在時才添加result = redis_client.zadd(lock_name, {client_id: timestamp}, nx=True)if result:# 獲取鎖成功return True# 檢查自己是否是隊列中的第一個first_client = redis_client.zrange(lock_name, 0, 0)if first_client and first_client[0] == client_id:return Truetime.sleep(0.01)return False
2. 基于List實現分布式鎖
# 使用List實現分布式鎖
def acquire_lock_with_list(redis_client, lock_name, acquire_timeout=10):end_time = time.time() + acquire_timeoutwhile time.time() < end_time:# LPUSH返回列表的長度result = redis_client.lpush(lock_name, "locked")if result == 1:# 設置過期時間redis_client.expire(lock_name, 10)return Truetime.sleep(0.1)return False
總結
Redis實現分布式鎖的核心思想是利用其原子操作和單線程特性。不同的實現方式適用于不同的場景:
- 簡單場景:使用SET命令的NX和EX參數
- 復雜場景:使用Redisson等成熟框架
- 高可靠性場景:使用RedLock算法
Redisson的分布式鎖屬于悲觀鎖范疇,同時支持可重入特性,是企業級應用中常用的分布式鎖實現方案。在電商平臺中,分布式鎖是保障數據一致性和業務正確性的關鍵技術。以下是三個典型場景及實際開發中的使用方法:
實際應用場景
場景一:庫存超賣控制
業務場景
用戶下單時需要扣減庫存,若多個用戶同時購買同一商品,可能導致庫存超賣(如庫存1件,卻被10個用戶同時下單成功)。
分布式鎖解決方案
# 使用Redis分布式鎖防止庫存超賣
def deduct_stock(product_id, quantity):lock_name = f"stock_lock:{product_id}"with redisson_client.getLock(lock_name): # 獲取分布式鎖# 查詢當前庫存current_stock = redis_client.get(f"stock:{product_id}")if current_stock >= quantity:# 扣減庫存redis_client.decr(f"stock:{product_id}", quantity)return Truereturn False
關鍵實現細節
- 鎖粒度:使用商品ID作為鎖名,確保同一商品的庫存操作互斥
- 原子性:庫存查詢和扣減操作必須在鎖內完成
- 超時設置:設置合理的鎖超時時間(如30秒),防止死鎖
實際開發注意事項
- 使用Redisson的可重入鎖,避免同一線程重復獲取鎖時阻塞
- 結合Lua腳本進一步優化庫存扣減邏輯,確保原子性
- 庫存預扣減機制:先扣減緩存庫存,異步同步到數據庫
場景二:訂單防重復提交
業務場景
用戶點擊提交訂單按鈕后,由于網絡延遲等原因可能重復提交,導致創建多個相同訂單。
分布式鎖解決方案
// 使用Redis分布式鎖防止訂單重復提交
public Response createOrder(OrderRequest request) {String orderToken = request.getOrderToken(); // 前端生成的唯一標識RLock lock = redissonClient.getLock("order:" + orderToken);try {// 嘗試獲取鎖,1秒后自動釋放boolean isLocked = lock.tryLock(100, 1000, TimeUnit.MILLISECONDS);if (!isLocked) {return Response.failure("請勿重復提交訂單");}// 處理訂單創建邏輯Order order = orderService.createOrder(request);return Response.success(order);} catch (InterruptedException e) {Thread.currentThread().interrupt();return Response.failure("系統繁忙,請稍后再試");} finally {if (lock.isHeldByCurrentThread()) {lock.unlock();}}
}
關鍵實現細節
- 唯一標識:前端生成唯一訂單Token(如UUID),隨請求一起傳遞
- 短超時:設置較短的鎖超時時間(如1秒),防止用戶正常操作被長時間阻塞
- 冪等性:結合數據庫唯一索引,確保訂單號唯一性
實際開發注意事項
- 前端防抖處理:按鈕點擊后禁用,防止用戶手動重復點擊
- 鎖的粒度控制:按訂單Token加鎖,而非全局鎖
- 異常處理:確保鎖在異常情況下仍能釋放
場景三:促銷活動限流與防超賣
業務場景
限時秒殺、優惠券領取等促銷活動中,大量并發請求可能導致系統崩潰或資源超發。
分布式鎖解決方案
# 使用Redis分布式鎖和計數器實現秒殺限流
def seckill(product_id, user_id):lock_name = f"seckill_lock:{product_id}"with redisson_client.getLock(lock_name):# 檢查活動是否結束if not is_activity_active(product_id):return "活動已結束"# 檢查用戶是否已參與if redis_client.sismember(f"seckill_users:{product_id}", user_id):return "每個用戶限參與一次"# 檢查庫存stock = redis_client.get(f"seckill_stock:{product_id}")if stock <= 0:return "已售罄"# 扣減庫存redis_client.decr(f"seckill_stock:{product_id}")# 記錄用戶參與redis_client.sadd(f"seckill_users:{product_id}", user_id)return "秒殺成功"
關鍵實現細節
- 雙重檢查:先檢查庫存再獲取鎖,減少鎖競爭
- 用戶限制:使用Set記錄已參與用戶,防止重復參與
- 庫存預熱:活動開始前將庫存加載到Redis
- 異步處理:訂單創建等耗時操作放入MQ異步處理
實際開發注意事項
- 熱點商品處理:對熱門商品采用多節點RedLock或分段鎖
- 熔斷機制:當并發過高時自動拒絕請求,保護系統
- 降級策略:庫存為0時直接返回,不再加鎖
分布式鎖在電商中的最佳實踐
- 合理設置鎖超時時間:根據業務處理時間設置合理的超時值,避免死鎖
- 鎖粒度控制:盡量使用細粒度鎖(如按商品ID、訂單ID),避免全局鎖
- 異常處理:使用try-finally確保鎖釋放,或依賴Redisson的看門狗機制
- 性能優化:
-
- 減少鎖內操作時間
- 使用讀寫鎖分離讀操作
- 采用分段鎖處理熱點數據
- 監控與報警:監控鎖的持有時間、等待隊列長度等指標,及時發現異常
分布式鎖是電商系統的重要組成部分,但需結合業務場景選擇合適的實現方式,并注意性能與可靠性的平衡。