Hello,大家好,我是灰小猿! 在分布式系統中,由于各個服務之間獨立部署,各個服務之間依靠遠程調用完成通信,再加上面對用戶重復點擊時的重復請求等情況,所以如何保證消息消費的冪等性是在分布式或微服務項目中必須要考慮的問題。
常見的保證消息冪等性的策略有以下幾種,根據具體的使用場景選用不同的冪等策略。
1、數據庫唯一索引
這種策略主要通過數據庫的唯一索引約束來保證消息的冪等性主要是用于數據插入的場景,防止數據重復插入,
適用場景:數據強一致性的場景,訂單創建防重,用戶注冊時手機號、郵箱號的唯一性校驗等,如防止用戶重復提交訂單,在訂單表中設置一個唯一標識的字段,如order表的Business_Key(業務ID)字段,當用戶提交重復的訂單時,這些重復的訂單所對應的Business_Key是相同的,此時插入數據數據庫會報索引重復DataIntegrityViolationException
異常,從而避免數據的重復插入。
對于這個Business_Key,可以使用用戶ID+商品ID+下單時間來生成,這個下單時間可能由于用戶點擊的先后順序有所不同,所以可以對時間進行處理,如五分鐘之內使用同一個時間標識,則可以使用下單時間戳除以300000(即5分鐘=300000毫秒),這樣可以有效保證同一業務訂單在一段時間內只會下單一次。
我們以一個商品訂單表為例,舉例數據表結構如下:
CREATE TABLE payment_order (order_id VARCHAR(32) PRIMARY KEY,business_key VARCHAR(64) UNIQUE NOT NULL,user_id BIGINT NOT NULL,amount DECIMAL(10,2),status VARCHAR(20),create_time DATETIME
);
給訂單表的business_key建立唯一索引
ALTER TABLE payment_order ADD UNIQUE INDEX uniq_business_key (business_key);
之后具體的業務實現流程大概如下:
1、用戶發起支付請求
2、生成訂單業務ID(business_key)
3、判斷存在業務ID相同的訂單
-
返回已有訂單
-
生成新的訂單
4、拉取第三方支付接口
生成唯一業務標識的方法如下:
public class BusinessKeyGenerator {// 時間窗口:5分鐘(300000毫秒)private static final long TIME_WINDOW = 300000;public static String generateKey(Long userId, String packageId) {long timeSlot = System.currentTimeMillis() / TIME_WINDOW;String rawKey = userId + ":" + packageId + ":" + timeSlot;return DigestUtils.md5Hex(rawKey);}
}
防止訂單重復創建的冪等性設計
1、通過先查詢訂單是否存在的方式插入
@Service
public class OrderService {@Autowiredprivate OrderRepository orderRepository;@Transactionalpublic Order createOrder(Long userId, String packageId) {String businessKey = BusinessKeyGenerator.generateKey(userId, packageId);// 檢查是否存在未支付的相同業務訂單Order existingOrder = orderRepository.findPendingByBusinessKey(businessKey);if (existingOrder != null) {return existingOrder;}try {// 創建新訂單Order newOrder = new Order();newOrder.setOrderId(generateOrderId()); // 生成唯一訂單號newOrder.setBusinessKey(businessKey);newOrder.setUserId(userId);newOrder.setPackageId(packageId);newOrder.setStatus(OrderStatus.PENDING);return orderRepository.save(newOrder);} catch (DataIntegrityViolationException ex) {// 處理唯一鍵沖突(高并發場景)return orderRepository.findPendingByBusinessKey(businessKey);}}
}
2、通過捕獲唯一索引異常的方式插入
上面這種策略在創建新訂單之前是先通過業務ID的方式去查詢了數據庫中是否已經存在了這個業務ID對應的訂單,還有一種方式是直接生成訂單信息并且執行insert插入,之后通過捕獲唯一索引異常(DuplicateKeyException)的方式來返回已經創建的訂單信息。
具體的實現代碼如下:
@Transactional
public void createOrder(Order order) {try {orderDao.insert(order); // 觸發唯一約束} catch (DataIntegrityViolationException ex) {// 抓取重復提交異常Order existingOrder = orderRepository.findPendingByBusinessKey(businessKey);throw new DuplicateOrderException(existingOrder.getOrderId());}
}
2、樂觀鎖
通過數據庫樂觀鎖的方式保證冪等性,同樣也是基于數據庫的一種實現方式,
首先介紹一下樂觀鎖的概念:
樂觀鎖:即認為死鎖的發生是極小概率的事件,所以在修改數據之前不會對數據進行加鎖,只有在修改數據時通過判斷本次修改的版本和上一次的版本是否相同,相同則表示數據未被修改,不相同則表示數據已經被修改,此時的數據修改失敗。
適用場景:樂觀鎖機制適用于存在版本屬性的更新,這種方式的使用通常需要在數據庫表中增加int類型的versionId字段,每次修改數據時versionId=versionId+1,以此來保證每次更新的版本都是新的。
我們同樣以商品訂單表為例,其中加入version_id字段,用來記錄當前的數據版本。
CREATE TABLE payment_order (order_id VARCHAR(32) PRIMARY KEY,version_id int NOT NULL,user_id BIGINT NOT NULL,amount DECIMAL(10,2),status VARCHAR(20),create_time DATETIME
);
當執行更新時,需要判斷當前查詢到的version和將要更新的version是否相同
#查詢數據
SELECT version_id FROM payment_order WHERE order_id = #{order_id}#更新數據,要求數據當前版本號和已知版本號相同,并且每次更新版本號遞增
UPDATE payment_order SET status=PAID, version_id = version_id+1
WHERE order_id = #{order_id} AND version_id = #{version_id}
3、悲觀鎖
介紹一下悲觀鎖的概念
悲觀鎖:即認為死鎖總是會發生的,所以在每次更新數據時都會對數據進行加鎖,當其他線程想要修改數據時會處于一個阻塞的狀態
這種處理方式一般需要我們在更新數據庫時使用行級鎖的更新方法,即開啟事務并先查詢出數據,同時對數據進行加鎖,更新完成數據之后,再提交事務,從而釋放鎖。
以獲取商品信息并生成訂單,之后進行庫存扣減為例,具體的sql操作如下:
//0.開始事務
begin//1.查詢出商品信息
select number from payment where id=#{payment_id} for update;//2.根據商品信息生成訂單
insert into payment_order (id,其他字段...) values (?,?,?,...);//3.修改商品庫存
update payment set number=#{number} where id=#{payment_id}//4.提交事務
commit
4、狀態機
狀態機的原理是通過狀態機的流轉控制,確保操作只會被執行一次
適用場景:這種機制適用于訂單或工單流程類系統,如訂單狀態變更,狀態機來保證消息冪等性的策略可以說是依據嚴格的業務執行流程來的,換句話來說就是一條數據的狀態只能由一個狀態變為指定的另外一種或多種狀態,
以訂單數據為例,狀態可以分為:待支付、已支付、已超時、已取消這幾種狀態,那么訂單的狀態流向就是固定的一個狀態機制,
以下是一個訂單狀態的狀態機
public enum OrderStateTypeEnum {PENDING, // 待支付PAID, // 已支付EXPIRED, // 已超時CANCELED; // 已取消/*** 狀態機*/private static final Map<OrderStateTypeEnum, Set<OrderStateTypeEnum>> transitions = new HashMap<>();static {//待支付狀態可以轉換為其他三種transitions.put(PENDING, EnumSet.of(PAID, EXPIRED, CANCELED));//已支付狀態不能轉換為其他狀態transitions.put(PAID, EnumSet.noneOf(OrderStateTypeEnum.class));}public boolean canTransitionTo(OrderStateTypeEnum orderStateType) {return transitions.get(this).contains(orderStateType);}
}
通過狀態機的方式去更新數據時,會先查詢訂單當前的狀態,并且判斷當前狀態是否可以轉化為將要更新后的狀態,如果可以再執行數據的更新,否則則認為當前的狀態轉變是不合理的。
5、Redis
基于Redis的原子操作來實現分布式鎖,通過SETNX設置key來標識是否處理過,并且設置過期時間,如果成功則處理,否則則忽略。
適用場景:高并發情況下的快速檢查,比如秒殺活動等,這種方式具有高性能低延遲的特點,但是在使用過程中要注意Redis的高可用問題。
以下是一個通過Redis實現分布式鎖,來避免訂單重復處理的代碼邏輯,
// Redis SETNX 實現分布式鎖
public class RedisLockService {public boolean tryLock(String key, String value, long expireSeconds) {return redis.opsForValue().setIfAbsent(key, value, expireSeconds, TimeUnit.SECONDS);}
}public class OrderService {public void addOrder(String orderId) {String lockKey = "lock:order:" + orderId;String requestId = UUID.randomUUID().toString();try {if (tryLock(lockKey, requestId, 30)) {// 業務處理processOrder(orderId);}} finally {// Lua腳本保證原子刪除String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";redis.execute(script, Collections.singletonList(lockKey), requestId);}}
}
6、token機制
token機制的實現是:客戶端先從服務器上獲取一個token,提交請求時攜帶上這個token,服務器端會驗證這個token是否已經存在,如果已經存在則刪除(因為在客戶端獲取這個Token時,服務器端已經存起來了)并且繼續后面的操作,這樣可以防止重復提交的發生,
適用場景:防止請求重復提交,API接口的短時效防重等,如在用戶下單時生成token,提交時服務器進行驗證,在代碼實現中可以用Redis存儲token,以此可以防止用戶重復提交多個訂單,
基于Token的實現的關鍵代碼處理如下:
// Redis + Token 機制
public class TokenService {@Autowiredprivate RedisTemplate<String, String> redis;/*** 生成Token*/public String generateToken(String userId) {String token = UUID.randomUUID().toString();//存入Redisredis.opsForValue().set(userId + ":" + token, "1", 5, TimeUnit.MINUTES);return token;}/*** 校驗Token的有效性*/public boolean validateToken(String userId, String token) {String key = userId + ":" + token;Long deleted = redis.delete(key); // 原子性刪除return deleted != null && deleted > 0;}
}
具體選用哪種冪等策略,還需要根據具體的業務功能來確定,在一個項目中,可能同時使用了多種冪等策略,這些都需要結合他們不同的特點和業務需求來分析,原則就是以實現功能的前提下以最小代碼成本實現功能。