🎯 本文介紹了一種優化預訂接口的方法,通過引入本地消息表解決分布式事務中的最終一致性問題。原先的實現是在一個事務中同時扣減庫存和創建訂單,容易因網絡不穩定導致數據不一致。改進后的方法將業務操作和消息發送封裝在本地事務中,并利用MQ進行異步解耦,確保了即使在網絡故障時也能保證系統的數據一致性。此外,還設計了定時任務重試機制以及冪等性保障措施來進一步確保消息被成功處理,從而實現了高效可靠的分布式事務處理。
說明
在前面的預訂實現中,是先開啟一個事務,然后去扣減庫存,再通過RPC調用訂單服務來創建訂單,如果訂單創建成功,就提交事務;否則回滾事務。代碼實現如下:
/*** 執行下單和數據庫庫存扣減操作** @param timePeriodDO* @param courtIndex* @param venueId* @return*/
@Override
public OrderDO executePreserveV1(TimePeriodDO timePeriodDO,Long courtIndex, Long venueId,String stockKey, String freeIndexBitMapKey) {// 編程式開啟事務,減少事務粒度,避免長事務的發生return transactionTemplate.execute(status -> {try {// 扣減當前時間段的庫存,修改空閑場信息baseMapper.updateStockAndBookedSlots(timePeriodDO.getId(), timePeriodDO.getPartitionId(), courtIndex);// 調用遠程服務創建訂單OrderGenerateReqDTO orderGenerateReqDTO = OrderGenerateReqDTO.builder().timePeriodId(timePeriodDO.getId()).partitionId(timePeriodDO.getPartitionId()).periodDate(timePeriodDO.getPeriodDate()).beginTime(timePeriodDO.getBeginTime()).endTime(timePeriodDO.getEndTime()).courtIndex(courtIndex).userId(UserContext.getUserId()).userName(UserContext.getUsername()).venueId(venueId).payAmount(timePeriodDO.getPrice()).build();Result<OrderDO> result;try {result = orderFeignService.generateOrder(orderGenerateReqDTO);if (result == null || !result.isSuccess()) {// --if-- 訂單生成失敗,拋出異常,上面的庫存扣減也會回退throw new ServiceException(BaseErrorCode.ORDER_GENERATE_ERROR);}} catch (Exception e) {// --if-- 訂單生成服務調用失敗// 恢復緩存中的信息this.restoreStockAndBookedSlotsCache(timePeriodDO.getId(),UserContext.getUserId(),courtIndex,stockKey,freeIndexBitMapKey);// todo 如果說由于網絡原因,實際上訂單已經創建成功了,但是因為超時訪問失敗,這里庫存卻回滾了,此時需要將訂單置為廢棄狀態(即刪除)// 發送一個短暫的延時消息(時間過長,用戶可能已經支付),去檢查訂單是否生成,如果生成,將其刪除// 打印錯誤堆棧信息e.printStackTrace();// 把錯誤返回到前端throw new ServiceException(e.getMessage());}return result.getData();} catch (Exception ex) {status.setRollbackOnly();throw ex;}});
}
但是網絡有時候是不穩定的,假如訂單服務創建訂單成功,但是由于網絡原因,沒辦法將訂單數據返回給庫存服務。這時候庫存服務就會誤認為訂單服務出錯,進而回滾了事務。這樣,就出現了訂單創建成功,但是庫存卻沒有扣減,出現了不一致問題,這種不一致會導致超賣。
由于庫存扣減、訂單生成處于不同的服務中,雙方無法使用本地事務來保證兩者的一致性,這屬于分布式事務。常見的分布式事務解決方案有:
- 強一致:2PC、3PC、TCC、Saga模式
- 最終一致:本地消息表、MQ事務消息、最大努力通知
- 工具:Seata
本文使用比較常用的本地消息表來解決
本地消息表介紹
本地消息表的核心思想:將分布式事務拆分為本地事務+異步消息,通過本地事務保證消息的可靠存儲,通過重試機制確保遠程業務最終執行成功。
核心步驟
- 本地事務與消息寫入 業務執行時,先在本地數據庫完成業務操作,同時將待發送的消息(含業務ID、狀態等)插入同一事務的
消息表
,利用本地事務的ACID特性保證兩者原子性。 - 異步輪詢消息 后臺定時任務掃描
消息表
中狀態為"待發送"的消息,調用下游服務的接口。 - 下游服務處理 下游服務執行業務邏輯,成功后返回確認;若失敗或超時,觸發重試(需保證接口冪等性)。
- 消息狀態更新 下游處理成功后,更新本地
消息表
中該消息狀態為"已完成";若多次重試失敗則標記為"失敗",人工介入處理。
關鍵點
- 可靠性:消息表與業務數據同庫,本地事務確保業務執行成功,本地消息就會記錄成功
- 異步解耦:通過異步重試替代同步阻塞,提高系統吞吐量
- 冪等性:下游服務調用要支持冪等性,不然重復消費可能出問題
本文實踐過程
- 預訂接口首先通過緩存驗證用戶是否預訂成功,預訂成功就發送一條預訂消息到MQ
- 訂單服務去消費預訂消息,通過本地事務保證插入訂單、插入本地消息的原子性
- 通過定時任務輪詢本地消息表中還沒有執行成功的消息,將任務投遞到MQ中,后面讓庫存服務去消費,進行庫存扣減(當然這里也可以直接通過RPC調用庫存服務扣減,但是為了解耦兩個服務,本文使用MQ來實現)
- 注意:庫存服務執行庫存扣減的時候,需要保證冪等性。即一個訂單扣減過庫存之后,不允許再扣減第二次。
數據庫設計
首先需要創建一個表,用來記錄本地消息
CREATE TABLE `local_message` (`id` bigint NOT NULL COMMENT '主鍵ID',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間',`is_deleted` tinyint NOT NULL DEFAULT '0' COMMENT '邏輯刪除 0:未刪除 1:已刪除',`msg_id` varchar(64) NOT NULL COMMENT '唯一消息ID',`topic` varchar(200) NOT NULL COMMENT '消息Topic',`tag` varchar(200) NOT NULL DEFAULT '' COMMENT '消息Tag',`content` text NOT NULL COMMENT '消息內容(JSON格式)',`status` tinyint NOT NULL DEFAULT '0' COMMENT '消息狀態 0:待發送 1:消費失敗 2:消費成功 3:超過重試次數',`fail_reason` varchar(1000) DEFAULT NULL COMMENT '失敗原因',`retry_count` int NOT NULL DEFAULT '0' COMMENT '已重試次數',`next_retry_time` bigint NOT NULL DEFAULT '0' COMMENT '下次重試時間戳(毫秒)',`max_retry_count` int NOT NULL DEFAULT '3' COMMENT '最大重試次數',PRIMARY KEY (`id`),UNIQUE KEY `uk_msg_id` (`msg_id`),KEY `idx_status_retry` (`status`, `next_retry_time`),KEY `idx_topic_tag` (`topic`, `tag`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='本地消息表';
業務實現
枚舉
package com.vrs.enums;import lombok.Getter;
import lombok.RequiredArgsConstructor;/*** 場館類型枚舉*/
@RequiredArgsConstructor
public enum LocalMessageStatusEnum {INIT(0, "待發送"),SEND_FAIL(1, "消費失敗"),SEND_SUCCESS(2, "消費成功"),ARRIVE_MAX_RETRY_COUNT(3, "超過重試次數"),;@Getterprivate final int status;@Getterprivate final String msg;}
預訂
首先驗證令牌是否充足,充足就發送一條預訂消息到 MQ
/*** 嘗試獲取令牌,令牌獲取成功之后,發送消息,異步執行庫存扣減和訂單生成* 注意:令牌在極端情況下,如扣減令牌之后,服務宕機了,此時令牌的庫存是小于真實庫存的* 如果查詢令牌發現庫存為0,嘗試去數據庫中加載數據,加載之后庫存還是0,說明時間段確實售罄了* 使用消息隊列異步 扣減庫存,更新緩存,生成訂單** @param timePeriodId* @param courtIndex*/
@Override
public String reserve2(Long timePeriodId, Integer courtIndex) { 參數校驗:使用責任鏈模式校驗數據是否正確TimePeriodReserveReqDTO timePeriodReserveReqDTO = new TimePeriodReserveReqDTO(timePeriodId, courtIndex);chainContext.handler(ChainConstant.RESERVE_CHAIN_NAME, timePeriodReserveReqDTO);Long venueId = timePeriodReserveReqDTO.getVenueId();VenueDO venueDO = timePeriodReserveReqDTO.getVenueDO();PartitionDO partitionDO = timePeriodReserveReqDTO.getPartitionDO();TimePeriodDO timePeriodDO = timePeriodReserveReqDTO.getTimePeriodDO(); 使用lua腳本獲取一個空場地對應的索引,并扣除相應的庫存,同時在里面進行用戶的查重// 首先檢測空閑場號緩存有沒有加載好,沒有的話進行加載this.checkBitMapCache(String.format(RedisCacheConstant.VENUE_TIME_PERIOD_FREE_INDEX_BIT_MAP_TOKEN_KEY, timePeriodReserveReqDTO.getTimePeriodId()),timePeriodId,partitionDO.getNum());// 其次檢測時間段庫存有沒有加載好,沒有的話進行加載this.getStockByTimePeriodId(RedisCacheConstant.VENUE_TIME_PERIOD_STOCK_TOKEN_KEY, timePeriodReserveReqDTO.getTimePeriodId());// todo 判斷是否還有令牌,沒有的話,重新加載(注意要分布式鎖)// 執行lua腳本Long freeCourtIndex = executeStockReduceByLua(timePeriodReserveReqDTO,venueDO,courtIndex, RedisCacheConstant.VENUE_TIME_PERIOD_STOCK_TOKEN_KEY,RedisCacheConstant.VENUE_TIME_PERIOD_FREE_INDEX_BIT_MAP_TOKEN_KEY);if (freeCourtIndex == -2L) {// --if-- 用戶已經購買過該時間段throw new ClientException(BaseErrorCode.TIME_PERIOD_HAVE_BOUGHT_ERROR);} else if (freeCourtIndex == -1L) {// --if-- 沒有空閑的場號,查詢數據庫,如果數據庫中有庫存,刪除緩存,下一個用戶預定時重新加載令牌this.refreshTokenByCheckDataBase(timePeriodId);throw new ServiceException(BaseErrorCode.TIME_PERIOD_SELL_OUT_ERROR);} 發送消息,異步更新庫存并生成訂單String orderSn = SnowflakeIdUtil.nextId() + String.valueOf(UserContext.getUserId() % 1000000);SendResult sendResult = executeReserveProducer.sendMessage(ExecuteReserveMqDTO.builder().orderSn(orderSn).timePeriodId(timePeriodId).courtIndex(freeCourtIndex).venueId(venueId).userId(UserContext.getUserId()).userName(UserContext.getUsername()).partitionId(partitionDO.getId()).price(timePeriodDO.getPrice()).periodDate(timePeriodDO.getPeriodDate()).beginTime(timePeriodDO.getBeginTime()).endTime(timePeriodDO.getEndTime()).build());if (!sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {log.error("消息發送失敗: " + sendResult.getSendStatus());// 恢復令牌緩存this.restoreStockAndBookedSlotsCache(timePeriodId,UserContext.getUserId(),freeCourtIndex,RedisCacheConstant.VENUE_TIME_PERIOD_STOCK_TOKEN_KEY,RedisCacheConstant.VENUE_TIME_PERIOD_FREE_INDEX_BIT_MAP_TOKEN_KEY);throw new ServiceException(BaseErrorCode.MQ_SEND_ERROR);}return orderSn;
}
訂單生成
消息預訂消息,執行訂單創建,并插入本地事務
/*** 消費預訂之后的消息* 生成訂單、生成本地消息** @param message*/
@Override
public void generateOrder(ExecuteReserveMqDTO message) {OrderDO orderDO = OrderDO.builder()// 訂單號使用雪花算法生成分布式ID,然后再拼接用戶ID的后面六位.orderSn(message.getOrderSn()).orderTime(new Date()).venueId(message.getVenueId()).partitionId(message.getPartitionId()).courtIndex(message.getCourtIndex()).timePeriodId(message.getTimePeriodId()).periodDate(message.getPeriodDate()).beginTime(message.getBeginTime()).endTime(message.getEndTime()).userId(message.getUserId()).userName(message.getUserName()).payAmount(message.getPrice()).orderStatus(OrderStatusConstant.UN_PAID).build();TimePeriodStockReduceMqDTO timePeriodStockReduceMqDTO = TimePeriodStockReduceMqDTO.builder().orderSn(message.getOrderSn()).timePeriodId(message.getTimePeriodId()).partitionId(message.getPartitionId()).courtIndex(message.getCourtIndex()).build();LocalMessageDO stockReduceLocalMessageDO = LocalMessageDO.builder().msgId(message.getOrderSn()).topic(RocketMqConstant.VENUE_TOPIC).tag(RocketMqConstant.TIME_PERIOD_STOCK_REDUCE_TAG).content(JSON.toJSONString(timePeriodStockReduceMqDTO)).nextRetryTime(System.currentTimeMillis()).maxRetryCount(5).build();LocalMessageDO delayCloseLocalMessageD0 = LocalMessageDO.builder().msgId(SnowflakeIdUtil.nextIdStr()).topic(RocketMqConstant.ORDER_TOPIC).tag(RocketMqConstant.ORDER_DELAY_CLOSE_TAG).content(JSON.toJSONString(OrderDelayCloseMqDTO.builder().orderSn(orderDO.getOrderSn()).build())).nextRetryTime(System.currentTimeMillis()).maxRetryCount(5).build();// 使用編程式事務,保證訂單創建、本地消息插入的一致性boolean success = transactionTemplate.execute(status -> {try {int insertCount = baseMapper.insert(orderDO);localMessageService.save(stockReduceLocalMessageDO);// 也保存一個本地消息,進行兜底。防止事務提交成功之后就宕機,延時消息沒有發生成功localMessageService.save(delayCloseLocalMessageD0);return insertCount > 0;} catch (Exception ex) {status.setRollbackOnly();throw ex;}});if (success) {// 發送延時消息來關閉未支付的訂單SendResult sendResult = orderDelayCloseProducer.sendMessage(OrderDelayCloseMqDTO.builder().orderSn(orderDO.getOrderSn()).build());if (sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {// 延遲關單已經發生成功,后面掃描的時候,無需再處理LocalMessageDO localMessageDO = new LocalMessageDO();localMessageDO.setId(delayCloseLocalMessageD0.getId());localMessageDO.setStatus(LocalMessageStatusEnum.INIT.getStatus());localMessageService.updateById(localMessageDO);}// todo 如果出現宕機,可能出現宕機,但是 websocket 消息沒有消息,所以前端還要實現一個輪詢來保底// 通過 websocket 發送消息,通知前端websocketSendMessageProducer.sendMessage(WebsocketMqDTO.builder().toUsername(orderDO.getUserName()).message(JSON.toJSONString(orderDO)).build());}
}
定時任務
- 定期掃描本地消息表(
local_message
)中待處理(未處理、上次處理失敗、下次重試時間小于等于現在)的消息 - 根據消息 Topic 和 tag 調用不同的消息處理器,將本地消息投遞到消息隊列中
- 消息投遞成功后更新消息狀態,失敗則通過指數退避算法計算下次重試時間,等待下次重試
- 使用分布式鎖保證集群環境下只有一個實例執行任務
【性能優化】
- 使用流式查詢,避免分頁查詢的無效掃描
- 通過批量修改優化單條修改的效率
【策略模式】
- 通過策略模式,根據不同的 tag 獲得不同的 MQ 生產者,避免
if else
代碼
package com.vrs.service.scheduled;import com.alibaba.fastjson2.JSON;
import com.vrs.constant.RocketMqConstant;
import com.vrs.design_pattern.strategy.MessageProcessor;
import com.vrs.domain.dto.mq.OrderDelayCloseMqDTO;
import com.vrs.domain.dto.mq.TimePeriodStockReduceMqDTO;
import com.vrs.domain.entity.LocalMessageDO;
import com.vrs.enums.LocalMessageStatusEnum;
import com.vrs.rocketMq.producer.OrderDelayCloseProducer;
import com.vrs.rocketMq.producer.TimePeriodStockReduceProducer;
import com.vrs.service.LocalMessageService;
import jakarta.annotation.PostConstruct;
import lombok.Cleanup;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;/*** @Author dam* @create 2024/11/17 16:44*/
@Component
@RequiredArgsConstructor
@Slf4j
public class LocalMessageScheduledScan {private final DataSource dataSource;private final LocalMessageService localMessageService;private final TimePeriodStockReduceProducer timePeriodStockReduceProducer;private final OrderDelayCloseProducer orderDelayCloseProducer;private final RedissonClient redissonClient;/*** 使用策略模式處理消息*/// todo 可以優化策略模式的寫法,方便代碼擴展private final Map<String, MessageProcessor> messageProcessors = new HashMap<>();private final int BATCH_SIZE = 1000;/*** 注冊 tag 和其對應的消息處理器*/@PostConstructpublic void init() {messageProcessors.put(RocketMqConstant.TIME_PERIOD_STOCK_REDUCE_TAG, mqDTO -> {TimePeriodStockReduceMqDTO dto = JSON.parseObject(mqDTO.getContent(), TimePeriodStockReduceMqDTO.class);return timePeriodStockReduceProducer.sendMessage(dto);});messageProcessors.put(RocketMqConstant.ORDER_DELAY_CLOSE_TAG, mqDTO -> {OrderDelayCloseMqDTO dto = JSON.parseObject(mqDTO.getContent(), OrderDelayCloseMqDTO.class);return orderDelayCloseProducer.sendMessage(dto);});}/*** 定時任務:掃描并處理本地消息* 每分鐘執行一次*/@Scheduled(cron = "0 */1 * * * ?")@SneakyThrowspublic void processLocalMessage() {RLock lock = redissonClient.getLock("LocalMessageScan");boolean locked = false;try {locked = lock.tryLock(1, TimeUnit.MINUTES);if (!locked) {log.warn("獲取分布式鎖失敗,跳過本次處理");return;}log.info("開始掃描本地消息表...");long start = System.currentTimeMillis();@Cleanup Connection conn = dataSource.getConnection();@Cleanup Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);stmt.setFetchSize(Integer.MIN_VALUE);// 查詢sql,只查詢關鍵的字段String sql = "SELECT id,msg_id,topic,tag,content,retry_count,max_retry_count,next_retry_time FROM local_message where " +"is_deleted = 0 and (status = 0 OR status = 1) and next_retry_time<" + start;@Cleanup ResultSet rs = stmt.executeQuery(sql);List<LocalMessageDO> localMessageBuffer = new ArrayList<>();while (rs.next()) {// 獲取數據中的屬性LocalMessageDO localMessageDO = new LocalMessageDO();localMessageDO.setId(rs.getLong("id"));localMessageDO.setMsgId(rs.getString("msg_id"));localMessageDO.setTopic(rs.getString("topic"));localMessageDO.setTag(rs.getString("tag"));localMessageDO.setContent(rs.getString("content"));localMessageDO.setRetryCount(rs.getInt("retry_count"));localMessageDO.setMaxRetryCount(rs.getInt("max_retry_count"));localMessageDO.setNextRetryTime(rs.getLong("next_retry_time"));if (localMessageDO.getRetryCount() > localMessageDO.getMaxRetryCount()) continue;localMessageBuffer.add(localMessageDO);if (localMessageBuffer.size() > BATCH_SIZE) {batchProcessMessages(localMessageBuffer);localMessageBuffer.clear();}}if (!localMessageBuffer.isEmpty()) {batchProcessMessages(localMessageBuffer);}log.info("結束掃描本地消息表..." + (System.currentTimeMillis() - start) + "ms");} catch (Exception e) {log.error("處理本地消息表時發生異常", e);throw e; // 或根據業務決定是否拋出} finally {if (locked && lock.isHeldByCurrentThread()) {lock.unlock();}}}/*** 批量處理消息*/private void batchProcessMessages(List<LocalMessageDO> messages) {// 成功和失敗的消息分開處理List<Long> successIds = new ArrayList<>();List<Long> retryIds = new ArrayList<>();List<Long> arriveMaxRetryCountIds = new ArrayList<>();Map<Long, String> failureReasons = new HashMap<>();for (LocalMessageDO message : messages) {try {if (message.getRetryCount() > message.getMaxRetryCount()) {// 已經到達最大重試次數arriveMaxRetryCountIds.add(message.getId());continue;}MessageProcessor processor = messageProcessors.get(message.getTag());SendResult sendResult = processor.process(message);if (sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {successIds.add(message.getId());} else {retryIds.add(message.getId());failureReasons.put(message.getId(), "MQ發送狀態: " + sendResult.getSendStatus());}} catch (Exception e) {log.error("處理消息 {} 時發生異常", message.getMsgId(), e);retryIds.add(message.getId());failureReasons.put(message.getId(), "處理異常: " + e.getMessage());}}// 批量更新狀態if (!successIds.isEmpty()) {batchUpdateMessagesStatus(successIds, LocalMessageStatusEnum.SEND_SUCCESS);}if (!arriveMaxRetryCountIds.isEmpty()) {// todo 通知人工處理batchUpdateMessagesStatus(arriveMaxRetryCountIds, LocalMessageStatusEnum.ARRIVE_MAX_RETRY_COUNT);}if (!retryIds.isEmpty()) {batchUpdateRetryMessages(retryIds, failureReasons);}}/*** 批量更新消息狀態*/private void batchUpdateMessagesStatus(List<Long> ids, LocalMessageStatusEnum status) {if (ids.isEmpty()) return;List<LocalMessageDO> updates = ids.stream().map(id -> {LocalMessageDO update = new LocalMessageDO();update.setId(id);update.setStatus(status.getStatus());if (status == LocalMessageStatusEnum.SEND_FAIL) {update.setRetryCount(localMessageService.getById(id).getMaxRetryCount());}return update;}).collect(Collectors.toList());if (updates.size() > 0) {localMessageService.updateBatchById(updates);}}/*** 批量更新重試消息*/private void batchUpdateRetryMessages(List<Long> ids, Map<Long, String> failReasons) {if (ids.isEmpty()) return;List<LocalMessageDO> messages = localMessageService.listByIds(ids);List<LocalMessageDO> updates = messages.stream().map(message -> {LocalMessageDO update = new LocalMessageDO();update.setId(message.getId());update.setStatus(LocalMessageStatusEnum.SEND_FAIL.getStatus());update.setRetryCount(message.getRetryCount() + 1);update.setNextRetryTime(getNextRetryTime(message.getRetryCount() + 1));update.setFailReason(failReasons.get(message.getId()));return update;}).collect(Collectors.toList());if (updates.size() > 0) {localMessageService.updateBatchById(updates);}}/*** 獲取下次重試時間** @param retryCount* @return*/private long getNextRetryTime(int retryCount) {long interval = (long) Math.min(Math.pow(2, retryCount) * 1000, 3600 * 1000);return System.currentTimeMillis() + interval;}
}
庫存扣減
注意庫存扣減需要通過冪等組件來保證消費冪等性,key 是訂單號,即保證同一個訂單號只能扣減庫存一次
package com.vrs.rocketMq.listener;import com.vrs.annotation.Idempotent;
import com.vrs.constant.RocketMqConstant;
import com.vrs.domain.dto.mq.TimePeriodStockReduceMqDTO;
import com.vrs.enums.IdempotentSceneEnum;
import com.vrs.service.TimePeriodService;
import com.vrs.templateMethod.MessageWrapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** @Author dam* @create 2024/9/20 21:30*/
@Slf4j(topic = RocketMqConstant.VENUE_TOPIC)
@Component
@RocketMQMessageListener(topic = RocketMqConstant.VENUE_TOPIC,consumerGroup = RocketMqConstant.VENUE_CONSUMER_GROUP + "-" + RocketMqConstant.TIME_PERIOD_STOCK_REDUCE_TAG,messageModel = MessageModel.CLUSTERING,// 監聽tagselectorType = SelectorType.TAG,selectorExpression = RocketMqConstant.TIME_PERIOD_STOCK_REDUCE_TAG
)
@RequiredArgsConstructor
public class TimePeriodStockReduceListener implements RocketMQListener<MessageWrapper<TimePeriodStockReduceMqDTO>> {private final TimePeriodService timePeriodService;/*** 消費消息的方法* 方法報錯就會拒收消息** @param messageWrapper 消息內容,類型和上面的泛型一致。如果泛型指定了固定的類型,消息體就是我們的參數*/@Idempotent(uniqueKeyPrefix = "time_period_stock_reduce:",key = "#messageWrapper.getMessage().getOrderSn()+''",scene = IdempotentSceneEnum.MQ,keyTimeout = 3600L)@SneakyThrows@Overridepublic void onMessage(MessageWrapper<TimePeriodStockReduceMqDTO> messageWrapper) {// 開頭打印日志,平常可 Debug 看任務參數,線上可報平安(比如消息是否消費,重新投遞時獲取參數等)log.info("[消費者] 更新時間段的庫存和空閑場號,時間段ID:{}", messageWrapper.getMessage().getTimePeriodId());timePeriodService.reduceStockAndBookedSlots(messageWrapper.getMessage());}
}
【service】
/*** 扣減庫存** @param timePeriodStockReduceMqDTO*/
@Override
public void reduceStockAndBookedSlots(TimePeriodStockReduceMqDTO timePeriodStockReduceMqDTO) {baseMapper.updateStockAndBookedSlots(timePeriodStockReduceMqDTO.getTimePeriodId(), timePeriodStockReduceMqDTO.getPartitionId(), timePeriodStockReduceMqDTO.getCourtIndex());
}
【mapper】
<update id="updateStockAndBookedSlots"><![CDATA[UPDATE time_periodSET booked_slots = booked_slots | (1 << #{partitionIndex}), stock = stock - 1WHERE id = #{timePeriodId} AND stock > 0 AND partition_id = #{partitionId}]]>
</update>