基于Redis的分布式設備庫存服務設計與實現
概述
本文介紹一個基于Redis實現的分布式設備庫存服務方案,通過分布式鎖、重試機制和事務補償等關鍵技術,保證在并發場景下庫存操作的原子性和一致性。該方案適用于物聯網設備管理、分布式資源調度等場景。
代碼實現
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;// 模擬設備庫存服務
public class DeviceInventoryService {private static final Logger logger = LoggerFactory.getLogger(DeviceInventoryService.class);private final Map<String, Integer> inventoryMap = new HashMap<>();private static final int MAX_RETRIES = 3;private static final int LOCK_EXPIRE_TIME = 10; // 鎖的過期時間,單位:秒private final Jedis jedis;public DeviceInventoryService(Jedis jedis) {this.jedis = jedis;}// 初始化庫存public void initializeInventory(String deviceId, int quantity) {inventoryMap.put(deviceId, quantity);logger.info("設備 {} 初始化庫存為 {}", deviceId, quantity);}// 嘗試獲取分布式鎖private boolean tryLock(String lockKey) {SetParams setParams = SetParams.setParams().nx().ex(LOCK_EXPIRE_TIME);String result = jedis.set(lockKey, "locked", setParams);return "OK".equals(result);}// 釋放分布式鎖private void releaseLock(String lockKey) {jedis.del(lockKey);}// 定時更新庫存public boolean updateInventory(String deviceId, int updateQuantity) {String lockKey = "inventory_lock:" + deviceId;int retries = 0;//重試次數while (retries < MAX_RETRIES) {if (tryLock(lockKey)) {try {return doUpdateInventory(deviceId, updateQuantity);} catch (Exception e) {logger.error("設備 {} 庫存更新失敗,重試第 {} 次", deviceId, retries + 1, e);} finally {releaseLock(lockKey);}}retries++;try {Thread.sleep(100); // 等待一段時間后重試} catch (InterruptedException e) {Thread.currentThread().interrupt();}}logger.error("設備 {} 庫存更新失敗,達到最大重試次數", deviceId);return false;}// 實際執行庫存更新操作private boolean doUpdateInventory(String deviceId, int updateQuantity) {int oldQuantity = inventoryMap.getOrDefault(deviceId, 0);try {// 記錄操作日志logger.info("設備 {} 開始更新庫存,更新前庫存: {}", deviceId, oldQuantity);// 模擬更新操作int newQuantity = oldQuantity + updateQuantity;if (newQuantity < 0) {throw new IllegalArgumentException("庫存不能為負數");}inventoryMap.put(deviceId, newQuantity);logger.info("設備 {} 庫存更新成功,當前庫存: {}", deviceId, newQuantity);return true;} catch (Exception e) {logger.error("設備 {} 庫存更新失敗: {}", deviceId, e.getMessage());// 進行事務補償compensateInventory(deviceId, oldQuantity);return false;}}// 事務補償private void compensateInventory(String deviceId, int oldQuantity) {inventoryMap.put(deviceId, oldQuantity);logger.info("設備 {} 庫存已恢復到更新前的狀態,當前庫存: {}", deviceId, oldQuantity);}// 模擬定時任務public static void main(String[] args) {try (Jedis jedis = new Jedis("localhost", 6379)) {DeviceInventoryService service = new DeviceInventoryService(jedis);service.initializeInventory("device001", 10);// 模擬定時更新庫存service.updateInventory("device001", 5);service.updateInventory("device001", -20); // 模擬更新失敗}}}
核心設計
分布式鎖機制
private boolean tryLock(String lockKey) {SetParams setParams = SetParams.setParams().nx().ex(LOCK_EXPIRE_TIME);String result = jedis.set(lockKey, "locked", setParams);return "OK".equals(result);
}
- 使用Redis的set nx ex命令實現原子性加鎖
- 將鎖的顆粒度設置到了設備上(根據實際業務設置)
- 設置10秒過期時間,防止死鎖(根據實際業務設置過期時間)
重試機制
int retries = 0;//重試次數while (retries < MAX_RETRIES) {if (tryLock(lockKey)) {try {return doUpdateInventory(deviceId, updateQuantity);} catch (Exception e) {logger.error("設備 {} 庫存更新失敗,重試第 {} 次", deviceId, retries + 1, e);} finally {releaseLock(lockKey);}}retries++;try {Thread.sleep(100); // 等待一段時間后重試} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
- 最大重試次數三次(MAX_RETRIES)
- 如果沒有獲取到鎖則等待重試,超過重試次數則終止
補償機制
private void compensateInventory(String deviceId, int oldQuantity) {inventoryMap.put(deviceId, oldQuantity);logger.info("設備 {} 庫存已恢復到更新前的狀態,當前庫存: {}", deviceId, oldQuantity);
}
- 在doUpdateInventory捕獲異常后自動回滾
- 基于版本號/快照的恢復機制
- 保證最終數據一致性
關鍵代碼解析
public boolean updateInventory(String deviceId, int updateQuantity) {String lockKey = "inventory_lock:" + deviceId;int retries = 0;while (retries < MAX_RETRIES) {if (tryLock(lockKey)) {try {return doUpdateInventory(deviceId, updateQuantity);} finally {releaseLock(lockKey);}}// ...重試邏輯...}return false;
}
- 獲取設備級別的分布式鎖
- 執行庫存更新操作
- 無論成功失敗都釋放鎖(finally保證)
- 達到重試上限后返回失敗
核心操作方法
private boolean doUpdateInventory(String deviceId, int updateQuantity) {int oldQuantity = inventoryMap.getOrDefault(deviceId, 0);int newQuantity = oldQuantity + updateQuantity;if (newQuantity < 0) {throw new IllegalArgumentException("庫存不能為負數");}inventoryMap.put(deviceId, newQuantity);return true;
}
- 前置校驗:庫存不能為負數
- 原子性操作:庫存增減計算
- 事務性更新:先計算后寫入
使用示例
初始化與測試
public static void main(String[] args) {try (Jedis jedis = new Jedis("localhost", 6379)) {DeviceInventoryService service = new DeviceInventoryService(jedis);service.initializeInventory("device001", 10);service.updateInventory("device001", 5); // 成功:庫存15service.updateInventory("device001", -20); // 失敗:觸發補償}
}
預期輸出
INFO - 設備 device001 初始化庫存為 10
INFO - 設備 device001 開始更新庫存,更新前庫存: 10
INFO - 設備 device001 庫存更新成功,當前庫存: 15
INFO - 設備 device001 開始更新庫存,更新前庫存: 15
ERROR - 設備 device001 庫存更新失敗: 庫存不能為負數
INFO - 設備 device001 庫存已恢復到更新前的狀態,當前庫存: 15
擴展思考
優化方向
- Redis集群支持:當前為單節點Redis,可升級為Redis Cluster
- 鎖續期機制:添加看門狗線程自動續期鎖
- 庫存持久化:結合數據庫實現庫存持久化存儲
- 監控體系:添加Prometheus監控指標
注意事項
- 網絡分區場景下可能出現鎖狀態不一致
- 庫存更新操作應保持冪等性
- Redis連接需要配置合理的超時參數
- 生產環境建議使用Lua腳本保證原子性
通過本文實現的庫存服務,在保證線程安全的基礎上,能夠有效應對分布式環境下的資源競爭問題。實際部署時建議結合具體業務場景進行壓力測試和參數調優。