007 訂單支付超時自動取消訂單(rabbitmq死信隊列 mybatis)

文章目錄

  • 死信隊列
    • RabbitMQ 配置類 RabbitMQConfig.java
    • 生產者 OrderTimeoutProducer.java
    • 消費者 OrderTimeoutConsumer.java
    • 應用配置 application.yml
    • pom.xml 依賴
    • 實體類 Order.java(不變)
    • Mapper 接口 OrderMapper.java(不變)
    • 服務層 OrderService.java(不變)
    • 緩存配置 CacheConfig.java(不變)
    • 對賬服務 ReconciliationTask.java(不變)
    • 控制器 OrderController.java(不變)

死信隊列

在代碼中,processedMsgCache的類型是Cache<String, Boolean>,鍵是消息ID,值是Boolean。當處理成功時,會執行processedMsgCache.put(msgId, true),將消息ID存入緩存,并設置有效期1小時,最大容量10000條。這樣,在1小時內,如果同樣的消息再次被處理,緩存中已經有記錄,就會直接ACK并跳過處理。

不過,這里需要注意的是,消息ID是否唯一。在RabbitMQ中,消息ID默認可能不是唯一的,生產者發送消息時可以設置messageId。如果生產者沒有明確設置,可能無法保證唯一性,這會導致冪等性檢查失效。因此,需要確保生產者發送消息時設置了唯一的messageId,或者在消費者端使用其他唯一標識,比如消息內容中的orderId結合其他屬性。

另外,緩存的過期時間設置為1小時,這可能與業務場景有關。如果消息的存活時間超過1小時,可能會有重復處理的風險。需要根據實際消息的存活時間來調整緩存的過期時間,確保覆蓋消息可能被重新投遞的時間窗口。

還有一個問題是,緩存是本地緩存,如果消費者有多個實例,每個實例的緩存是獨立的。這可能導致不同的實例處理同一條消息,因為一個實例處理過,但另一個實例的緩存中沒有記錄。這種情況下,本地緩存的冪等性檢查可能不夠,需要考慮分布式緩存,比如Redis,來保證全局唯一性。但根據當前代碼,在單實例或允許短暫重復的場景下使用本地緩存。

總結來說,冪等性檢查的邏輯是通過緩存已處理消息的ID,在消息處理前檢查是否已存在,存在則跳過處理,避免重復執行。這適用于消息隊列保證至少一次投遞,但業務需要確保冪等的場景。

                      +---------------------+|   RabbitMQ Message  ||  (攜帶唯一messageId)   |+----------+----------+|v
+----------------+       +-------+-------+       +-----------------+
|  消息到達消費者   | ----> | 檢查緩存是否存在 | ----> | 存在:直接ACK丟棄消息 |
+----------------+       +-------+-------+       +-----------------+|| 不存在v+-------+-------+       +-----------------+| 執行業務邏輯處理  | ----> | 成功:存入緩存并ACK |+---------------+       +-----------------+

緩存過期時間(1小時)> 消息最大存活時間(30分鐘+重試時間)
計算公式:緩存過期時間 = 消息TTL + 最大重試時間 * 重試次數 + 緩沖時間

緩存擊穿空值緩存對不存在的key也進行緩存(需設置較短過期時間)
緩存穿透布隆過濾器在緩存前增加過濾層
消費者重啟持久化存儲配合數據庫記錄處理狀態
網絡分區最終一致性依賴對賬服務修正狀態
組件類型作用說明
processedMsgCacheCaffeine緩存存儲已處理消息的唯一標識
messageId字符串消息唯一標識(需生產者保證唯一性)
deliveryTag長整型RabbitMQ消息投遞標識
sequenceDiagramparticipant RabbitMQparticipant Consumerparticipant Cacheparticipant DBRabbitMQ->>Consumer: 投遞消息(messageId=123)Consumer->>Cache: 查詢messageId=123alt 存在緩存Cache-->>Consumer: 返回trueConsumer->>RabbitMQ: 發送ACKelse 無緩存Consumer->>DB: 執行取消操作alt 操作成功Consumer->>Cache: 寫入messageId=123Consumer->>RabbitMQ: 發送ACKelse 操作失敗Consumer->>RabbitMQ: 發送NACK(requeue=true)endend

RabbitMQ 配置類 RabbitMQConfig.java

import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 訂單超時相關配置public static final String ORDER_DELAY_EXCHANGE = "order.delay.exchange";public static final String ORDER_DELAY_QUEUE = "order.delay.queue";public static final String ORDER_DELAY_ROUTING_KEY = "order.delay";// 死信隊列配置public static final String ORDER_DEAD_LETTER_EXCHANGE = "order.dead.letter.exchange";public static final String ORDER_DEAD_LETTER_QUEUE = "order.dead.letter.queue";public static final String ORDER_DEAD_LETTER_ROUTING_KEY = "order.dead.letter";// 聲明延遲隊列(設置死信參數)@Beanpublic Queue orderDelayQueue() {return QueueBuilder.durable(ORDER_DELAY_QUEUE).withArgument("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE).withArgument("x-dead-letter-routing-key", ORDER_DEAD_LETTER_ROUTING_KEY).build();}// 聲明延遲交換機@Beanpublic DirectExchange orderDelayExchange() {return new DirectExchange(ORDER_DELAY_EXCHANGE);}// 綁定延遲隊列到交換機@Beanpublic Binding delayBinding() {return BindingBuilder.bind(orderDelayQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY);}// 聲明死信隊列@Beanpublic Queue deadLetterQueue() {return new Queue(ORDER_DEAD_LETTER_QUEUE, true);}// 聲明死信交換機@Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange(ORDER_DEAD_LETTER_EXCHANGE);}// 綁定死信隊列到交換機@Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(ORDER_DEAD_LETTER_ROUTING_KEY);}// JSON 消息轉換器@Beanpublic MessageConverter jsonMessageConverter() {return new Jackson2JsonMessageConverter();}
}

生產者 OrderTimeoutProducer.java

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
public class OrderTimeoutProducer {private final RabbitTemplate rabbitTemplate;public OrderTimeoutProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}public void sendTimeoutMessage(String orderId) {// 設置消息過期時間為30分鐘(單位:毫秒)MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("1800000");return message;}};rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_DELAY_EXCHANGE,RabbitMQConfig.ORDER_DELAY_ROUTING_KEY,orderId,messagePostProcessor);}
}

消費者 OrderTimeoutConsumer.java

import com.github.benmanes.caffeine.cache.Cache;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;@Component
public class OrderTimeoutConsumer {private final OrderService orderService;private final Cache<String, Boolean> processedMsgCache;public OrderTimeoutConsumer(OrderService orderService, Cache<String, Boolean> processedMsgCache) {this.orderService = orderService;this.processedMsgCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.HOURS).maximumSize(10000).build();}@RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)public void processMessage(Message message, Channel channel) throws IOException {String orderId = new String(message.getBody(), StandardCharsets.UTF_8);String messageId = message.getMessageProperties().getMessageId();long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 冪等性檢查if (processedMsgCache.getIfPresent(messageId) != null) {channel.basicAck(deliveryTag, false);return;}boolean success = orderService.safeCancel(orderId);if (success) {processedMsgCache.put(messageId, true);System.out.println("訂單超時取消成功: " + orderId);}channel.basicAck(deliveryTag, false);} catch (Exception e) {// 記錄錯誤日志,重新放回隊列channel.basicNack(deliveryTag, false, true);System.err.println("處理訂單超時取消失敗: " + orderId);e.printStackTrace();}}
}

應用配置 application.yml

spring:rabbitmq:host: ${RABBITMQ_HOST:localhost}port: 5672username: ${RABBITMQ_USER:guest}password: ${RABBITMQ_PASSWORD:guest}virtual-host: /connection-timeout: 5000template:retry:enabled: truemax-attempts: 3initial-interval: 1000mslistener:simple:acknowledge-mode: manual # 手動確認模式prefetch: 10 # 每次預取數量retry:enabled: truemax-attempts: 3initial-interval: 1000ms

pom.xml 依賴

<!-- 移除 RocketMQ 依賴 -->
<!-- 添加 RabbitMQ 依賴 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

實體類 Order.java(不變)

public class Order {// 保持原有實現
}

Mapper 接口 OrderMapper.java(不變)

@Mapper
public interface OrderMapper {// 保持原有SQL操作
}

服務層 OrderService.java(不變)

@Service
public class OrderService {// 保持原有業務邏輯
}

緩存配置 CacheConfig.java(不變)

@Configuration
public class CacheConfig {// 保持原有緩存配置
}

對賬服務 ReconciliationTask.java(不變)

@Component
public class ReconciliationTask {// 保持原有定時任務邏輯
}

控制器 OrderController.java(不變)

@RestController
@RequestMapping("/orders")
public class OrderController {// 保持原有API接口
}

關鍵差異對比

功能點RocketMQ 實現RabbitMQ 實現
延遲機制內置延遲級別TTL+死信隊列
消息存儲持久化到CommitLog內存+磁盤持久化
消費確認自動ACK手動ACK+重試機制
消息追蹤原生支持消息軌跡需要額外實現
集群方案主從復制鏡像隊列

部署注意事項
隊列初始化:確保首次啟動時自動創建所需交換機和隊列
消息持久化:所有隊列聲明時設置durable=true
監控配置:需要監控以下指標:
死信隊列消息堆積量
消費者處理耗時
消息TTL過期情況
集群配置:建議至少3節點RabbitMQ集群
故障轉移:配置鏡像隊列實現高可用

性能優化建議
批量處理:

@RabbitListener(queues = QUEUE, containerFactory = "batchFactory")
public void processMessages(List<Message> messages) {// 批量查詢緩存Map<String, Boolean> existMap = cache.getAllPresent(messageIds);// 過濾已處理消息// 批量執行數據庫操作
}

異步更新:

CompletableFuture.runAsync(() -> {processedMsgCache.put(messageId, true);
});

分級緩存:

Cache<String, Boolean> L1 = Caffeine.newBuilder().maximumSize(1000).build();
Cache<String, Boolean> L2 = RedisClient.getCache();

首先看CacheConfig.java,里面有一個orderStatusCache的Bean,使用Caffeine配置,最大大小10000,過期時間10分鐘。然后在OrderService.java中,有一個注入的Cache<String, OrderStatus> orderCache。看起來這里應該用的是這個Bean,因為在OrderService里用了@RequiredArgsConstructor,所以orderCache這個字段會被構造器注入,而對應的Bean應該就是CacheConfig里的orderStatusCache。所以雖然用戶可能沒有顯式地用@Autowired,但是因為用了Lombok的@RequiredArgsConstructor,如果orderCache是final的話,Spring會自動注入同類型的Bean。這時候需要檢查OrderService中的orderCache的類型是否匹配。
看OrderService的定義:private final Cache<String, OrderStatus> orderCache; 而CacheConfig中的Bean也是這個類型,所以應該會被正確注入。所以這個Bean是有被使用的,可能沒注意到OrderService里的注入,導致認為沒有用到。
另外,在OrderTimeoutConsumer.java中,還有一個processedMsgCache,這個是用Caffeine直接new的,并沒有使用Spring管理的Bean。所以這個緩存是消費者自己創建的,和CacheConfig無關。而orderStatusCache是被OrderService使用的,所以這個配置是必須的,否則OrderService中的orderCache會找不到對應的Bean,導致啟動失敗。
所以可能在于誤解了哪里用到了這個Bean。可能在代碼中沒有顯式地看到@Autowired或者@Resource,但實際上因為Lombok的@RequiredArgsConstructor和Spring的自動注入機制,這個Bean是被OrderService使用的。如果沒有CacheConfig中的Bean,Spring啟動時會報錯,提示找不到符合條件的Cache類型的Bean來注入到OrderService中。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/72214.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/72214.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/72214.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

計算機畢業設計SpringBoot+Vue.js智慧圖書管理系統(源碼+文檔+PPT+講解)

溫馨提示&#xff1a;文末有 CSDN 平臺官方提供的學長聯系方式的名片&#xff01; 溫馨提示&#xff1a;文末有 CSDN 平臺官方提供的學長聯系方式的名片&#xff01; 溫馨提示&#xff1a;文末有 CSDN 平臺官方提供的學長聯系方式的名片&#xff01; 作者簡介&#xff1a;Java領…

《論數據分片技術及其應用》審題技巧 - 系統架構設計師

論數據分片技術及其應用寫作框架 一、考點概述 本論題“論數據分片技術及其應用”主要考察的是軟件工程中數據分片技術的理解、應用及其實際效果分析。考點涵蓋以下幾個方面&#xff1a; 首先&#xff0c;考生需對數據分片的基本概念有清晰的認識&#xff0c;理解數據分片是…

【每日學點HarmnoyOS Next知識】web加載pdf、Toggle禁用、Grid多次渲染問題、Web判斷是否存在title、 List側滑欄關閉

【每日學點HarmnoyOS Next知識】web加載pdf、Toggle禁用、Grid多次渲染問題、Web判斷是否存在title、 List側滑欄關閉 1、HarmonyOS Web組件加載本地pdf文件后&#xff0c;默認顯示標題和下載按鈕&#xff0c;可以隱藏或者有對應的操作這個title的API嗎&#xff1f; 隱藏PDF操…

下載 MindSpore 配置 PyTorch環境

以下是下載 MindSpore 并配置 PyTorch 環境的詳細步驟&#xff0c;適用于常見的 Linux/Windows 系統&#xff08;以 NVIDIA GPU 為例&#xff09;&#xff1a; 一、環境準備 1. 硬件與軟件檢查 GPU 支持&#xff1a;確保使用 NVIDIA 顯卡&#xff0c;通過 nvidia-smi 查看驅動…

三、數據提取

利用 requests 可以獲取網站頁面數據&#xff0c;但是 requests 返回的數據中包含了一些冗余數據&#xff0c;我們需要在這些數據集中提取自己需要的信息。所以我們要學會在數據集中提取自己需要的數據。 需要掌握的知識點如下&#xff1a; json 數據提取 jsonpath 語法 靜態…

Qt | 實戰繼承自QObject的IOThread子類實現TCP客戶端(安全銷毀)

點擊上方"藍字"關注我們 01、QThread >>> start() 啟動線程,調用后會執行 run() 方法。 run() 線程的入口點,子類化 QThread 時需要重寫此方法以定義線程的執行邏輯。 quit() 請求線程退出,線程會在事件循環結束后終止。 exit(int returnCode = 0) 退出…

int new_pos = (pos + delta + 9) % 9 化曲為直算法

公式 int new_pos (pos delta 9) % 9; 是一個常見的 循環數組索引計算 方法&#xff0c;用于處理圓圈排列中的位置計算。這個公式可以總結出一個普遍的規律&#xff0c;適用于任何循環數組或圓圈排列的場景。 普遍規律 假設有一個長度為 ( n ) 的循環數組&#xff08;或圓圈…

生成一個日期時間序列,從‘2024-12-03‘開始,每小時遞增 oracle 轉為達夢

-------------------------------生成一個日期時間序列&#xff0c;從2024-12-03開始&#xff0c;每小時遞增---------------------------- ---原oracle : SELECT to_date(2024-12-03, yyyy-mm-dd) (ROWNUM - 1) / 24 data_time FROM dual CO…

前端學習——HTML

VSCode常用快捷鍵 代碼格式化&#xff1a;ShiftAltF 向上或向下移動一行&#xff1a;AltUp或AltDown 快速復制一行代碼&#xff1a;ShiftAltUp或者ShiftAltDown 快速替換&#xff1a;CtrlH HTML標簽 文本標簽 定義著重文字 定義粗體文字 定義斜體文字 加重語氣 刪除字 無特…

Hadoop之02:MR-圖解

1、不是所有的MR都適合combine 1.1、map端統計出了不同班級的每個學生的年齡 如&#xff1a;(class1, 14)表示class1班的一個學生的年齡是14歲。 第一個map任務&#xff1a; class1 14 class1 15 class1 16 class2 10第二個map任務&#xff1a; class1 16 class2 10 class…

C++核心編程之STL

STL初識&#xff1a;從零開始的奇幻冒險 1 STL的誕生&#xff1a;一場代碼復用的革命 很久很久以前&#xff0c;在編程的世界里&#xff0c;開發者們每天都在重復造輪子。無論是數據結構還是算法&#xff0c;每個人都得從頭開始寫&#xff0c;仿佛在無盡的沙漠中尋找綠洲。直到…

【Python】OpenCV算法使用案例全解

OpenCV算法使用案例全解 前言 OpenCV&#xff08;Open Source Computer Vision Library&#xff09;是一個開源的計算機視覺和機器學習軟件庫&#xff0c;它提供了大量的圖像和視頻處理功能。從簡單的圖像濾波到復雜的三維重建&#xff0c;OpenCV涵蓋了計算機視覺領域的眾多算…

Redis的持久化-RDBAOF

文章目錄 一、 RDB1. 觸發機制2. 流程說明3. RDB 文件的處理4. RDB 的優缺點 二、AOF1. 使用 AOF2. 命令寫?3. 文件同步4. 重寫機制5 啟動時數據恢復 一、 RDB RDB 持久化是把當前進程數據生成快照保存到硬盤的過程&#xff0c;觸發 RDB 持久化過程分為手動觸發和自動觸發。 …

Python Cookbook-2.29 帶版本號的文件名

任務 如果你想在改寫某文件之前對其做個備份&#xff0c;可以在老文件的名字后面根據慣例加上三個數字的版本號。 解決方案 我們需要編寫一個函數來完成備份工作: def VersionFile(file_spec, vtypecopy):import os,shutilif os.path.isfile(file_spec):#檢查vtype參數if v…

CCF-CSP認證 202104-1灰度直方圖

題目描述 思路 首先輸入矩陣長度、矩陣寬度和灰度范圍&#xff0c;結果數組長度可固定&#xff0c;其中的元素要初始化為0。在輸入灰度值的時候&#xff0c;結果數組中以該灰度值為索引的元素值1&#xff0c;即可統計每個灰度值的數量。 代碼 C版&#xff1a; #include <…

水果識別系統 | BP神經網絡水果識別系統,含GUI界面(Matlab)

使用說明 代碼下載&#xff1a;BP神經網絡水果識別系統&#xff0c;含GUI界面&#xff08;Matlab&#xff09; BP神經網絡水果識別系統 一、引言 1.1、研究背景及意義 在當今科技迅速發展的背景下&#xff0c;人工智能技術尤其是在圖像識別領域的應用日益廣泛。水果識別作為…

如何在網頁上顯示3D CAD PMI

在現代制造業中&#xff0c;3D CAD模型已成為產品設計和制造的核心。為了更有效地傳達設計意圖和制造信息&#xff0c;產品和制造信息&#xff08;PMI&#xff09;被嵌入到3D模型中。然而&#xff0c;如何在網頁上清晰、準確地顯示這些3D CAD PMI&#xff0c;成為了一個重要的技…

Git基本命令索引

GIT基本命令索引 創建代碼庫修改和提交代碼日志管理遠程操作操作分支 創建代碼庫 操作指令初始化倉庫git init克隆遠程倉庫git clone 修改和提交代碼 操作指令查看文件狀態git status文件暫存git add文件比較git diff文件提交git commit回滾版本git reset重命名或者移動工作…

基于Selenium的Python淘寶評論爬取教程

文章目錄 前言1. 環境準備安裝 Python&#xff1a;安裝 Selenium&#xff1a;下載瀏覽器驅動&#xff1a; 2. 實現思路3. 代碼實現4. 代碼解釋5. 注意事項 前言 以下是一個基于 Selenium 的 Python 淘寶評論爬取教程&#xff0c;需要注意的是&#xff0c;爬取網站數據應當遵守…

GenBI 可視化選誰:Python Matplotlib?HTML ?Tableau?

引言 生成式 BI(Generative BI,GenBI)通過自然語言交互和自動化內容生成,革新了數據分析和商業智能(BI)領域。用戶可以通過自然語言提問,GenBI 系統自動生成相應的 SQL 查詢、獲取數據,并以可視化圖表、表格、自然語言摘要等形式呈現分析結果。 可視化是 GenBI 的關鍵…