預訂接口優化:使用本地消息表保證訂單生成、庫存扣減的一致性

🎯 本文介紹了一種優化預訂接口的方法,通過引入本地消息表解決分布式事務中的最終一致性問題。原先的實現是在一個事務中同時扣減庫存和創建訂單,容易因網絡不穩定導致數據不一致。改進后的方法將業務操作和消息發送封裝在本地事務中,并利用MQ進行異步解耦,確保了即使在網絡故障時也能保證系統的數據一致性。此外,還設計了定時任務重試機制以及冪等性保障措施來進一步確保消息被成功處理,從而實現了高效可靠的分布式事務處理。

說明

在前面的預訂實現中,是先開啟一個事務,然后去扣減庫存,再通過RPC調用訂單服務來創建訂單,如果訂單創建成功,就提交事務;否則回滾事務。代碼實現如下:

/*** 執行下單和數據庫庫存扣減操作** @param timePeriodDO* @param courtIndex* @param venueId* @return*/
@Override
public OrderDO executePreserveV1(TimePeriodDO timePeriodDO,Long courtIndex, Long venueId,String stockKey, String freeIndexBitMapKey) {// 編程式開啟事務,減少事務粒度,避免長事務的發生return transactionTemplate.execute(status -> {try {// 扣減當前時間段的庫存,修改空閑場信息baseMapper.updateStockAndBookedSlots(timePeriodDO.getId(), timePeriodDO.getPartitionId(), courtIndex);// 調用遠程服務創建訂單OrderGenerateReqDTO orderGenerateReqDTO = OrderGenerateReqDTO.builder().timePeriodId(timePeriodDO.getId()).partitionId(timePeriodDO.getPartitionId()).periodDate(timePeriodDO.getPeriodDate()).beginTime(timePeriodDO.getBeginTime()).endTime(timePeriodDO.getEndTime()).courtIndex(courtIndex).userId(UserContext.getUserId()).userName(UserContext.getUsername()).venueId(venueId).payAmount(timePeriodDO.getPrice()).build();Result<OrderDO> result;try {result = orderFeignService.generateOrder(orderGenerateReqDTO);if (result == null || !result.isSuccess()) {// --if-- 訂單生成失敗,拋出異常,上面的庫存扣減也會回退throw new ServiceException(BaseErrorCode.ORDER_GENERATE_ERROR);}} catch (Exception e) {// --if-- 訂單生成服務調用失敗// 恢復緩存中的信息this.restoreStockAndBookedSlotsCache(timePeriodDO.getId(),UserContext.getUserId(),courtIndex,stockKey,freeIndexBitMapKey);// todo 如果說由于網絡原因,實際上訂單已經創建成功了,但是因為超時訪問失敗,這里庫存卻回滾了,此時需要將訂單置為廢棄狀態(即刪除)// 發送一個短暫的延時消息(時間過長,用戶可能已經支付),去檢查訂單是否生成,如果生成,將其刪除// 打印錯誤堆棧信息e.printStackTrace();// 把錯誤返回到前端throw new ServiceException(e.getMessage());}return result.getData();} catch (Exception ex) {status.setRollbackOnly();throw ex;}});
}

但是網絡有時候是不穩定的,假如訂單服務創建訂單成功,但是由于網絡原因,沒辦法將訂單數據返回給庫存服務。這時候庫存服務就會誤認為訂單服務出錯,進而回滾了事務。這樣,就出現了訂單創建成功,但是庫存卻沒有扣減,出現了不一致問題,這種不一致會導致超賣。

由于庫存扣減、訂單生成處于不同的服務中,雙方無法使用本地事務來保證兩者的一致性,這屬于分布式事務。常見的分布式事務解決方案有:

  • 強一致:2PC、3PC、TCC、Saga模式
  • 最終一致:本地消息表、MQ事務消息、最大努力通知
  • 工具:Seata

本文使用比較常用的本地消息表來解決

本地消息表介紹

本地消息表的核心思想:將分布式事務拆分為本地事務+異步消息,通過本地事務保證消息的可靠存儲,通過重試機制確保遠程業務最終執行成功。

核心步驟

  1. 本地事務與消息寫入 業務執行時,先在本地數據庫完成業務操作,同時將待發送的消息(含業務ID、狀態等)插入同一事務的消息表,利用本地事務的ACID特性保證兩者原子性。
  2. 異步輪詢消息 后臺定時任務掃描消息表中狀態為"待發送"的消息,調用下游服務的接口。
  3. 下游服務處理 下游服務執行業務邏輯,成功后返回確認;若失敗或超時,觸發重試(需保證接口冪等性)。
  4. 消息狀態更新 下游處理成功后,更新本地消息表中該消息狀態為"已完成";若多次重試失敗則標記為"失敗",人工介入處理。

關鍵點

  • 可靠性:消息表與業務數據同庫,本地事務確保業務執行成功,本地消息就會記錄成功
  • 異步解耦:通過異步重試替代同步阻塞,提高系統吞吐量
  • 冪等性:下游服務調用要支持冪等性,不然重復消費可能出問題

本文實踐過程

  • 預訂接口首先通過緩存驗證用戶是否預訂成功,預訂成功就發送一條預訂消息到MQ
  • 訂單服務去消費預訂消息,通過本地事務保證插入訂單、插入本地消息的原子性
  • 通過定時任務輪詢本地消息表中還沒有執行成功的消息,將任務投遞到MQ中,后面讓庫存服務去消費,進行庫存扣減(當然這里也可以直接通過RPC調用庫存服務扣減,但是為了解耦兩個服務,本文使用MQ來實現)
  • 注意:庫存服務執行庫存扣減的時候,需要保證冪等性。即一個訂單扣減過庫存之后,不允許再扣減第二次。

數據庫設計

首先需要創建一個表,用來記錄本地消息

CREATE TABLE `local_message` (`id` bigint NOT NULL COMMENT '主鍵ID',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間',`is_deleted` tinyint NOT NULL DEFAULT '0' COMMENT '邏輯刪除 0:未刪除 1:已刪除',`msg_id` varchar(64) NOT NULL COMMENT '唯一消息ID',`topic` varchar(200) NOT NULL COMMENT '消息Topic',`tag` varchar(200) NOT NULL DEFAULT '' COMMENT '消息Tag',`content` text NOT NULL COMMENT '消息內容(JSON格式)',`status` tinyint NOT NULL DEFAULT '0' COMMENT '消息狀態 0:待發送 1:消費失敗 2:消費成功 3:超過重試次數',`fail_reason` varchar(1000) DEFAULT NULL COMMENT '失敗原因',`retry_count` int NOT NULL DEFAULT '0' COMMENT '已重試次數',`next_retry_time` bigint NOT NULL DEFAULT '0' COMMENT '下次重試時間戳(毫秒)',`max_retry_count` int NOT NULL DEFAULT '3' COMMENT '最大重試次數',PRIMARY KEY (`id`),UNIQUE KEY `uk_msg_id` (`msg_id`),KEY `idx_status_retry` (`status`, `next_retry_time`),KEY `idx_topic_tag` (`topic`, `tag`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='本地消息表';

業務實現

枚舉

package com.vrs.enums;import lombok.Getter;
import lombok.RequiredArgsConstructor;/*** 場館類型枚舉*/
@RequiredArgsConstructor
public enum LocalMessageStatusEnum {INIT(0, "待發送"),SEND_FAIL(1, "消費失敗"),SEND_SUCCESS(2, "消費成功"),ARRIVE_MAX_RETRY_COUNT(3, "超過重試次數"),;@Getterprivate final int status;@Getterprivate final String msg;}

預訂

首先驗證令牌是否充足,充足就發送一條預訂消息到 MQ

/*** 嘗試獲取令牌,令牌獲取成功之后,發送消息,異步執行庫存扣減和訂單生成* 注意:令牌在極端情況下,如扣減令牌之后,服務宕機了,此時令牌的庫存是小于真實庫存的* 如果查詢令牌發現庫存為0,嘗試去數據庫中加載數據,加載之后庫存還是0,說明時間段確實售罄了* 使用消息隊列異步 扣減庫存,更新緩存,生成訂單** @param timePeriodId* @param courtIndex*/
@Override
public String reserve2(Long timePeriodId, Integer courtIndex) { 參數校驗:使用責任鏈模式校驗數據是否正確TimePeriodReserveReqDTO timePeriodReserveReqDTO = new TimePeriodReserveReqDTO(timePeriodId, courtIndex);chainContext.handler(ChainConstant.RESERVE_CHAIN_NAME, timePeriodReserveReqDTO);Long venueId = timePeriodReserveReqDTO.getVenueId();VenueDO venueDO = timePeriodReserveReqDTO.getVenueDO();PartitionDO partitionDO = timePeriodReserveReqDTO.getPartitionDO();TimePeriodDO timePeriodDO = timePeriodReserveReqDTO.getTimePeriodDO(); 使用lua腳本獲取一個空場地對應的索引,并扣除相應的庫存,同時在里面進行用戶的查重// 首先檢測空閑場號緩存有沒有加載好,沒有的話進行加載this.checkBitMapCache(String.format(RedisCacheConstant.VENUE_TIME_PERIOD_FREE_INDEX_BIT_MAP_TOKEN_KEY, timePeriodReserveReqDTO.getTimePeriodId()),timePeriodId,partitionDO.getNum());// 其次檢測時間段庫存有沒有加載好,沒有的話進行加載this.getStockByTimePeriodId(RedisCacheConstant.VENUE_TIME_PERIOD_STOCK_TOKEN_KEY, timePeriodReserveReqDTO.getTimePeriodId());// todo 判斷是否還有令牌,沒有的話,重新加載(注意要分布式鎖)// 執行lua腳本Long freeCourtIndex = executeStockReduceByLua(timePeriodReserveReqDTO,venueDO,courtIndex, RedisCacheConstant.VENUE_TIME_PERIOD_STOCK_TOKEN_KEY,RedisCacheConstant.VENUE_TIME_PERIOD_FREE_INDEX_BIT_MAP_TOKEN_KEY);if (freeCourtIndex == -2L) {// --if-- 用戶已經購買過該時間段throw new ClientException(BaseErrorCode.TIME_PERIOD_HAVE_BOUGHT_ERROR);} else if (freeCourtIndex == -1L) {// --if-- 沒有空閑的場號,查詢數據庫,如果數據庫中有庫存,刪除緩存,下一個用戶預定時重新加載令牌this.refreshTokenByCheckDataBase(timePeriodId);throw new ServiceException(BaseErrorCode.TIME_PERIOD_SELL_OUT_ERROR);} 發送消息,異步更新庫存并生成訂單String orderSn = SnowflakeIdUtil.nextId() + String.valueOf(UserContext.getUserId() % 1000000);SendResult sendResult = executeReserveProducer.sendMessage(ExecuteReserveMqDTO.builder().orderSn(orderSn).timePeriodId(timePeriodId).courtIndex(freeCourtIndex).venueId(venueId).userId(UserContext.getUserId()).userName(UserContext.getUsername()).partitionId(partitionDO.getId()).price(timePeriodDO.getPrice()).periodDate(timePeriodDO.getPeriodDate()).beginTime(timePeriodDO.getBeginTime()).endTime(timePeriodDO.getEndTime()).build());if (!sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {log.error("消息發送失敗: " + sendResult.getSendStatus());// 恢復令牌緩存this.restoreStockAndBookedSlotsCache(timePeriodId,UserContext.getUserId(),freeCourtIndex,RedisCacheConstant.VENUE_TIME_PERIOD_STOCK_TOKEN_KEY,RedisCacheConstant.VENUE_TIME_PERIOD_FREE_INDEX_BIT_MAP_TOKEN_KEY);throw new ServiceException(BaseErrorCode.MQ_SEND_ERROR);}return orderSn;
}

訂單生成

消息預訂消息,執行訂單創建,并插入本地事務

/*** 消費預訂之后的消息* 生成訂單、生成本地消息** @param message*/
@Override
public void generateOrder(ExecuteReserveMqDTO message) {OrderDO orderDO = OrderDO.builder()// 訂單號使用雪花算法生成分布式ID,然后再拼接用戶ID的后面六位.orderSn(message.getOrderSn()).orderTime(new Date()).venueId(message.getVenueId()).partitionId(message.getPartitionId()).courtIndex(message.getCourtIndex()).timePeriodId(message.getTimePeriodId()).periodDate(message.getPeriodDate()).beginTime(message.getBeginTime()).endTime(message.getEndTime()).userId(message.getUserId()).userName(message.getUserName()).payAmount(message.getPrice()).orderStatus(OrderStatusConstant.UN_PAID).build();TimePeriodStockReduceMqDTO timePeriodStockReduceMqDTO = TimePeriodStockReduceMqDTO.builder().orderSn(message.getOrderSn()).timePeriodId(message.getTimePeriodId()).partitionId(message.getPartitionId()).courtIndex(message.getCourtIndex()).build();LocalMessageDO stockReduceLocalMessageDO = LocalMessageDO.builder().msgId(message.getOrderSn()).topic(RocketMqConstant.VENUE_TOPIC).tag(RocketMqConstant.TIME_PERIOD_STOCK_REDUCE_TAG).content(JSON.toJSONString(timePeriodStockReduceMqDTO)).nextRetryTime(System.currentTimeMillis()).maxRetryCount(5).build();LocalMessageDO delayCloseLocalMessageD0 = LocalMessageDO.builder().msgId(SnowflakeIdUtil.nextIdStr()).topic(RocketMqConstant.ORDER_TOPIC).tag(RocketMqConstant.ORDER_DELAY_CLOSE_TAG).content(JSON.toJSONString(OrderDelayCloseMqDTO.builder().orderSn(orderDO.getOrderSn()).build())).nextRetryTime(System.currentTimeMillis()).maxRetryCount(5).build();// 使用編程式事務,保證訂單創建、本地消息插入的一致性boolean success = transactionTemplate.execute(status -> {try {int insertCount = baseMapper.insert(orderDO);localMessageService.save(stockReduceLocalMessageDO);// 也保存一個本地消息,進行兜底。防止事務提交成功之后就宕機,延時消息沒有發生成功localMessageService.save(delayCloseLocalMessageD0);return insertCount > 0;} catch (Exception ex) {status.setRollbackOnly();throw ex;}});if (success) {// 發送延時消息來關閉未支付的訂單SendResult sendResult = orderDelayCloseProducer.sendMessage(OrderDelayCloseMqDTO.builder().orderSn(orderDO.getOrderSn()).build());if (sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {// 延遲關單已經發生成功,后面掃描的時候,無需再處理LocalMessageDO localMessageDO = new LocalMessageDO();localMessageDO.setId(delayCloseLocalMessageD0.getId());localMessageDO.setStatus(LocalMessageStatusEnum.INIT.getStatus());localMessageService.updateById(localMessageDO);}// todo 如果出現宕機,可能出現宕機,但是 websocket 消息沒有消息,所以前端還要實現一個輪詢來保底// 通過 websocket 發送消息,通知前端websocketSendMessageProducer.sendMessage(WebsocketMqDTO.builder().toUsername(orderDO.getUserName()).message(JSON.toJSONString(orderDO)).build());}
}

定時任務

  • 定期掃描本地消息表(local_message)中待處理(未處理、上次處理失敗、下次重試時間小于等于現在)的消息
  • 根據消息 Topic 和 tag 調用不同的消息處理器,將本地消息投遞到消息隊列中
  • 消息投遞成功后更新消息狀態,失敗則通過指數退避算法計算下次重試時間,等待下次重試
  • 使用分布式鎖保證集群環境下只有一個實例執行任務

【性能優化】

  • 使用流式查詢,避免分頁查詢的無效掃描
  • 通過批量修改優化單條修改的效率

【策略模式】

  • 通過策略模式,根據不同的 tag 獲得不同的 MQ 生產者,避免if else代碼
package com.vrs.service.scheduled;import com.alibaba.fastjson2.JSON;
import com.vrs.constant.RocketMqConstant;
import com.vrs.design_pattern.strategy.MessageProcessor;
import com.vrs.domain.dto.mq.OrderDelayCloseMqDTO;
import com.vrs.domain.dto.mq.TimePeriodStockReduceMqDTO;
import com.vrs.domain.entity.LocalMessageDO;
import com.vrs.enums.LocalMessageStatusEnum;
import com.vrs.rocketMq.producer.OrderDelayCloseProducer;
import com.vrs.rocketMq.producer.TimePeriodStockReduceProducer;
import com.vrs.service.LocalMessageService;
import jakarta.annotation.PostConstruct;
import lombok.Cleanup;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;/*** @Author dam* @create 2024/11/17 16:44*/
@Component
@RequiredArgsConstructor
@Slf4j
public class LocalMessageScheduledScan {private final DataSource dataSource;private final LocalMessageService localMessageService;private final TimePeriodStockReduceProducer timePeriodStockReduceProducer;private final OrderDelayCloseProducer orderDelayCloseProducer;private final RedissonClient redissonClient;/*** 使用策略模式處理消息*/// todo 可以優化策略模式的寫法,方便代碼擴展private final Map<String, MessageProcessor> messageProcessors = new HashMap<>();private final int BATCH_SIZE = 1000;/*** 注冊 tag 和其對應的消息處理器*/@PostConstructpublic void init() {messageProcessors.put(RocketMqConstant.TIME_PERIOD_STOCK_REDUCE_TAG, mqDTO -> {TimePeriodStockReduceMqDTO dto = JSON.parseObject(mqDTO.getContent(), TimePeriodStockReduceMqDTO.class);return timePeriodStockReduceProducer.sendMessage(dto);});messageProcessors.put(RocketMqConstant.ORDER_DELAY_CLOSE_TAG, mqDTO -> {OrderDelayCloseMqDTO dto = JSON.parseObject(mqDTO.getContent(), OrderDelayCloseMqDTO.class);return orderDelayCloseProducer.sendMessage(dto);});}/*** 定時任務:掃描并處理本地消息* 每分鐘執行一次*/@Scheduled(cron = "0 */1 * * * ?")@SneakyThrowspublic void processLocalMessage() {RLock lock = redissonClient.getLock("LocalMessageScan");boolean locked = false;try {locked = lock.tryLock(1, TimeUnit.MINUTES);if (!locked) {log.warn("獲取分布式鎖失敗,跳過本次處理");return;}log.info("開始掃描本地消息表...");long start = System.currentTimeMillis();@Cleanup Connection conn = dataSource.getConnection();@Cleanup Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);stmt.setFetchSize(Integer.MIN_VALUE);// 查詢sql,只查詢關鍵的字段String sql = "SELECT id,msg_id,topic,tag,content,retry_count,max_retry_count,next_retry_time FROM local_message where " +"is_deleted = 0 and (status = 0 OR status = 1) and next_retry_time<" + start;@Cleanup ResultSet rs = stmt.executeQuery(sql);List<LocalMessageDO> localMessageBuffer = new ArrayList<>();while (rs.next()) {// 獲取數據中的屬性LocalMessageDO localMessageDO = new LocalMessageDO();localMessageDO.setId(rs.getLong("id"));localMessageDO.setMsgId(rs.getString("msg_id"));localMessageDO.setTopic(rs.getString("topic"));localMessageDO.setTag(rs.getString("tag"));localMessageDO.setContent(rs.getString("content"));localMessageDO.setRetryCount(rs.getInt("retry_count"));localMessageDO.setMaxRetryCount(rs.getInt("max_retry_count"));localMessageDO.setNextRetryTime(rs.getLong("next_retry_time"));if (localMessageDO.getRetryCount() > localMessageDO.getMaxRetryCount()) continue;localMessageBuffer.add(localMessageDO);if (localMessageBuffer.size() > BATCH_SIZE) {batchProcessMessages(localMessageBuffer);localMessageBuffer.clear();}}if (!localMessageBuffer.isEmpty()) {batchProcessMessages(localMessageBuffer);}log.info("結束掃描本地消息表..." + (System.currentTimeMillis() - start) + "ms");} catch (Exception e) {log.error("處理本地消息表時發生異常", e);throw e; // 或根據業務決定是否拋出} finally {if (locked && lock.isHeldByCurrentThread()) {lock.unlock();}}}/*** 批量處理消息*/private void batchProcessMessages(List<LocalMessageDO> messages) {// 成功和失敗的消息分開處理List<Long> successIds = new ArrayList<>();List<Long> retryIds = new ArrayList<>();List<Long> arriveMaxRetryCountIds = new ArrayList<>();Map<Long, String> failureReasons = new HashMap<>();for (LocalMessageDO message : messages) {try {if (message.getRetryCount() > message.getMaxRetryCount()) {// 已經到達最大重試次數arriveMaxRetryCountIds.add(message.getId());continue;}MessageProcessor processor = messageProcessors.get(message.getTag());SendResult sendResult = processor.process(message);if (sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {successIds.add(message.getId());} else {retryIds.add(message.getId());failureReasons.put(message.getId(), "MQ發送狀態: " + sendResult.getSendStatus());}} catch (Exception e) {log.error("處理消息 {} 時發生異常", message.getMsgId(), e);retryIds.add(message.getId());failureReasons.put(message.getId(), "處理異常: " + e.getMessage());}}// 批量更新狀態if (!successIds.isEmpty()) {batchUpdateMessagesStatus(successIds, LocalMessageStatusEnum.SEND_SUCCESS);}if (!arriveMaxRetryCountIds.isEmpty()) {// todo 通知人工處理batchUpdateMessagesStatus(arriveMaxRetryCountIds, LocalMessageStatusEnum.ARRIVE_MAX_RETRY_COUNT);}if (!retryIds.isEmpty()) {batchUpdateRetryMessages(retryIds, failureReasons);}}/*** 批量更新消息狀態*/private void batchUpdateMessagesStatus(List<Long> ids, LocalMessageStatusEnum status) {if (ids.isEmpty()) return;List<LocalMessageDO> updates = ids.stream().map(id -> {LocalMessageDO update = new LocalMessageDO();update.setId(id);update.setStatus(status.getStatus());if (status == LocalMessageStatusEnum.SEND_FAIL) {update.setRetryCount(localMessageService.getById(id).getMaxRetryCount());}return update;}).collect(Collectors.toList());if (updates.size() > 0) {localMessageService.updateBatchById(updates);}}/*** 批量更新重試消息*/private void batchUpdateRetryMessages(List<Long> ids, Map<Long, String> failReasons) {if (ids.isEmpty()) return;List<LocalMessageDO> messages = localMessageService.listByIds(ids);List<LocalMessageDO> updates = messages.stream().map(message -> {LocalMessageDO update = new LocalMessageDO();update.setId(message.getId());update.setStatus(LocalMessageStatusEnum.SEND_FAIL.getStatus());update.setRetryCount(message.getRetryCount() + 1);update.setNextRetryTime(getNextRetryTime(message.getRetryCount() + 1));update.setFailReason(failReasons.get(message.getId()));return update;}).collect(Collectors.toList());if (updates.size() > 0) {localMessageService.updateBatchById(updates);}}/*** 獲取下次重試時間** @param retryCount* @return*/private long getNextRetryTime(int retryCount) {long interval = (long) Math.min(Math.pow(2, retryCount) * 1000, 3600 * 1000);return System.currentTimeMillis() + interval;}
}

庫存扣減

注意庫存扣減需要通過冪等組件來保證消費冪等性,key 是訂單號,即保證同一個訂單號只能扣減庫存一次

package com.vrs.rocketMq.listener;import com.vrs.annotation.Idempotent;
import com.vrs.constant.RocketMqConstant;
import com.vrs.domain.dto.mq.TimePeriodStockReduceMqDTO;
import com.vrs.enums.IdempotentSceneEnum;
import com.vrs.service.TimePeriodService;
import com.vrs.templateMethod.MessageWrapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** @Author dam* @create 2024/9/20 21:30*/
@Slf4j(topic = RocketMqConstant.VENUE_TOPIC)
@Component
@RocketMQMessageListener(topic = RocketMqConstant.VENUE_TOPIC,consumerGroup = RocketMqConstant.VENUE_CONSUMER_GROUP + "-" + RocketMqConstant.TIME_PERIOD_STOCK_REDUCE_TAG,messageModel = MessageModel.CLUSTERING,// 監聽tagselectorType = SelectorType.TAG,selectorExpression = RocketMqConstant.TIME_PERIOD_STOCK_REDUCE_TAG
)
@RequiredArgsConstructor
public class TimePeriodStockReduceListener implements RocketMQListener<MessageWrapper<TimePeriodStockReduceMqDTO>> {private final TimePeriodService timePeriodService;/*** 消費消息的方法* 方法報錯就會拒收消息** @param messageWrapper 消息內容,類型和上面的泛型一致。如果泛型指定了固定的類型,消息體就是我們的參數*/@Idempotent(uniqueKeyPrefix = "time_period_stock_reduce:",key = "#messageWrapper.getMessage().getOrderSn()+''",scene = IdempotentSceneEnum.MQ,keyTimeout = 3600L)@SneakyThrows@Overridepublic void onMessage(MessageWrapper<TimePeriodStockReduceMqDTO> messageWrapper) {// 開頭打印日志,平常可 Debug 看任務參數,線上可報平安(比如消息是否消費,重新投遞時獲取參數等)log.info("[消費者] 更新時間段的庫存和空閑場號,時間段ID:{}", messageWrapper.getMessage().getTimePeriodId());timePeriodService.reduceStockAndBookedSlots(messageWrapper.getMessage());}
}

【service】

/*** 扣減庫存** @param timePeriodStockReduceMqDTO*/
@Override
public void reduceStockAndBookedSlots(TimePeriodStockReduceMqDTO timePeriodStockReduceMqDTO) {baseMapper.updateStockAndBookedSlots(timePeriodStockReduceMqDTO.getTimePeriodId(), timePeriodStockReduceMqDTO.getPartitionId(), timePeriodStockReduceMqDTO.getCourtIndex());
}

【mapper】

<update id="updateStockAndBookedSlots"><![CDATA[UPDATE time_periodSET booked_slots = booked_slots | (1 << #{partitionIndex}), stock = stock - 1WHERE id = #{timePeriodId} AND stock > 0 AND partition_id = #{partitionId}]]>
</update>

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/81376.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/81376.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/81376.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

計算機網絡——客戶端/服務端,URI與URL的區別,以及TCP/IP核心機制全解析

文章目錄 客戶端/服務端&#xff0c;URI與URL的區別&#xff0c;以及TCP/IP核心機制全解析一、客戶端/服務端通信模型概述二、URI 與 URL 的概念與區別1. URL&#xff08;統一資源定位符&#xff09;2. URI&#xff08;統一資源標識符&#xff09;3. URI 與 URL 的關系 三、SYN…

柔性PZT壓電薄膜多維力傳感器在微創手術機器人的應用

隨著醫療技術的迅速發展&#xff0c;微創手術機器人正在成為外科手術的重要助手。與傳統開放式手術相比&#xff0c;微創手術創傷小、恢復快、感染率低&#xff0c;對手術器械的精細操控性和感知能力提出了更高要求。多維力傳感器作為機器人“觸覺”的核心部件&#xff0c;對提…

SpringAI整合DeepSeek生成圖表

利用Spring-ai-openai集成DeepSeek ①、在DeepSeek開放平臺創建API KEY ②、創建springboot項目&#xff0c;引入spring-ai-openai依賴&#xff0c;創建配置文件&#xff0c;配置deepseek的url和api key ③、具體的實現業務應用 RestController public class ChatD…

xss-lab靶場基礎詳解第1~3關

第一關 我去&#xff0c;還是得多學基礎啊 http://127.0.0.1/xss-labs/level1.php?name<u>a</u> 這個看他的網站源碼&#xff0c;可以看到他沒有過濾&#xff0c;沒有被編碼 然后在name<script>alert(1)</script>&#xff0c;就算過關了 第二關 …

【MySQL】聚合查詢 和 分組查詢

個人主頁&#xff1a;?喜歡做夢 歡迎 &#x1f44d;點贊 ?關注 ??收藏 &#x1f4ac;評論 目錄 &#x1f334; 一、聚合查詢 &#x1f332;1.概念 &#x1f332;2.聚合查詢函數 COUNT&#xff08;&#xff09; SUM&#xff08;&#xff09; AVG&#xff08;&…

計算機啟動流程中,都干了啥事。比如文件掛在,操作系統加載,中斷向量表加載,磁盤初始化在哪階段。

建議在電腦上看&#xff0c;手機上格式有點問題&#xff0c;認真讀&#xff0c;這方面沒問題的&#xff0c;肝了一天。 目錄.計算機啟動詳解 一.計算機啟動直觀圖二.步驟詳解前置準備磁盤初始化1.開機階段2.執行BIOS階段3.執行引導記錄&#xff08;MBR&#xff09;階段4.操作系…

后端開發技術之Log日志框架

第一章 日志原理 1.1 log發展歷史 從JDK1.4開始提供java.until.logging&#xff0c;后來大佬發現JUL太難用了&#xff0c;就自己手擼了個log4j&#xff0c;后來log4j發現安全漏洞&#xff0c;加上代碼結構問題難以維護&#xff0c;于是從1.2就停止更新log4j&#xff0c;并又重…

美麗天天秒鏈動2+1源碼(新零售商城搭建)

什么是鏈動21模式&#xff1f; 鏈動21主要是建立團隊模式&#xff0c;同時快速提升銷量。是目前成員中速度最快的裂變模式。鏈動21模式合理合規&#xff0c;同時激勵用戶 公司的利潤分享機制&#xff0c;讓您在享受購物折扣的同時&#xff0c;也能促進并獲得客觀收益。 鏈動21模…

Python10天沖刺-設計模型之策略模式

策略模式是一種行為設計模式&#xff0c;它允許你在運行時動態地改變對象的行為。這種模式的核心思想是將一組相關的算法封裝在一起&#xff0c;并讓它們相互替換。 下面是使用 Python 實現策略模式的一個示例&#xff1a; 示例代碼 假設我們有一個簡單的購物車系統&#xf…

【CTFer成長之路】XSS的魔力

XSS闖關 level1 訪問url&#xff1a; http://c884a553-d874-4514-9c32-c19c7d7b6e1c.node3.buuoj.cn/level1?usernamexss 因為是xss&#xff0c;所以對傳參進行測試&#xff0c;修改?username1&#xff0c;進行訪問 會發現username參數傳入什么&#xff0c;welcome之后就…

自主機器人模擬系統

一、系統概述 本代碼實現了一個基于Pygame的2D自主機器人模擬系統&#xff0c;具備以下核心功能&#xff1a; 雙模式控制&#xff1a;支持手動控制&#xff08;WASD鍵&#xff09;和自動導航模式&#xff08;鼠標左鍵設定目標&#xff09; 智能路徑規劃&#xff1a;采用改進型…

快速上手非關系型數據庫-MongoDB

簡介 MongoDB 是一個基于文檔的 NoSQL 數據庫&#xff0c;由 MongoDB Inc. 開發。 NoSQL&#xff0c;指的是非關系型的數據庫。NoSQL有時也稱作Not Only SQL的縮寫&#xff0c;是對不同于傳統的關系型數據庫的數據庫管理系統的統稱。 MongoDB 的設計理念是為了應對大數據量、…

性能優化實踐:啟動優化方案

性能優化實踐&#xff1a;啟動優化方案 在Flutter應用開發中&#xff0c;啟動性能是用戶體驗的第一印象&#xff0c;也是應用性能優化的重要環節。本文將從理論到實踐&#xff0c;深入探討Flutter應用的啟動優化方案。 一、Flutter應用啟動流程分析 1. 啟動類型 冷啟動&…

在文本廢墟中打撈月光

在文本廢墟中打撈月光 ----再讀三三的《山頂上是海》之“暗室”所理 今天是2025年5月1日&#xff0c;傳統的“五一”小長假。當我早飯后“坐”在衛生間的那幾分鐘里&#xff0c;閨女和兒子就騎著家中僅有的兩輛電動車去了圖書館。我是該做些什么&#xff1f; 于是我左手拿著三…

C++之類和對象基礎

?向對象三?特性&#xff1a;封裝、繼承、多態 類和對象 一.類的定義1. 類的定義格式2.類域 二.實例化1.對象2.對象的大小 三.this指針 在 C 的世界里&#xff0c;類和對象構成了面向對象編程&#xff08;Object-Oriented Programming&#xff0c;OOP&#xff09;的核心框架&…

計算機網絡——HTTP/IP 協議通俗入門詳解

HTTP/IP 協議通俗入門詳解 一、什么是 HTTP 協議&#xff1f;1. 基本定義2. HTTP 是怎么工作的&#xff1f; 二、HTTP 協議的特點三、HTTPS 是什么&#xff1f;它和 HTTP 有啥區別&#xff1f;1. HTTPS 概述2. HTTP vs HTTPS 四、HTTP 的通信過程步驟詳解&#xff1a; 五、常見…

使用 Java 實現一個簡單且高效的任務調度框架

目錄 一、任務調度系統概述 &#xff08;一&#xff09;任務調度的目標 &#xff08;二&#xff09;任務調度框架的關鍵組成 二、任務狀態設計 &#xff08;一&#xff09;任務狀態流轉設計 &#xff08;二&#xff09;任務表設計&#xff08;SQL&#xff09; 三、單機任…

基于GPT 模板開發智能寫作輔助應用

目錄 項目說明 1. 項目背景 2. 項目目標 3. 功能需求 4. 技術選型 項目結構 詳細代碼實現 前端代碼(client) client/src/main.js client/src/App.vue client/src/components/HistoryList.vue 后端代碼(server) server/app.js server/routes/api.js server/mo…

linux 使用nginx部署next.js項目,并使用pm2守護進程

前言 本文基于&#xff1a;操作系統 CentOS Stream 8 使用工具&#xff1a;Xshell8、Xftp8 服務器基礎環境&#xff1a; node - 請查看 linux安裝node并全局可用pm2 - 請查看 linux安裝pm2并全局可用nginx - 請查看 linux 使用nginx部署vue、react項目 所需服務器基礎環境&…

使用huggingface_hub需要注意的事項

在安裝huggingface_hub的時候要注意如果你的python是放在c盤下時記得用管理員模式命令行來安裝huggingface_hub&#xff0c;否則安裝過程會報錯&#xff0c;之后也不會有huggingface-cli命令。 如果安裝時因為沒有用管理員權限安裝而報錯了&#xff0c;可以先卸載huggingface-…