4.RabbitMQ - 延遲消息

RabbitMQ延遲消息

文章目錄

  • RabbitMQ延遲消息
  • 一、延遲消息介紹
  • 二、實現
    • 2.1 死信交換機
    • 2.2 延遲消息插件
    • 2.3 取消超時訂單

一、延遲消息介紹

延遲消息:生產者發送消息時指定一個時間,消費者不會立刻收到消息,而是在指定時間后才收到消息

用戶下單搶購,搶到了但是沒有付款,此時其實庫存的數量已經扣減了

image-20240524101232243

如果用戶遲遲沒有付款,超過一定的時間,就會將此訂單取消掉,庫存的數量也會重新加回來

我們可以定義一個定時任務掃描數據中訂單的狀態,超過一定時間沒有付款的,我們就將訂單取消

延遲任務:設置在一定時間之后才執行的任務

當用戶下單成功后,立刻向MQ中發送一條延遲消息,設定延遲時間30分鐘,30分鐘到了之后就可以收到此消息,檢查訂單狀態,如果發現未支付,則訂單直接取消。

這樣解決了實效性的問題,同時對數據庫的壓力也很小

二、實現

2.1 死信交換機

當隊列滿足下列的條件之一時就會稱為死信(dead letter)

  • 消費者使用basic.reject或basic.nack聲明消費失敗,并且消息的requeue參數設置為false

    消費者不要這個消息了

  • 消息是一個過期的消息(達到了隊列或消息本身設置的過期時間),超時無人消費

  • 要投遞的隊列消息堆積滿了,最早的消息可能成為死信

如果隊列通過dead-letter-exchange屬性指定了一個交換機,那么該隊列中的死信就會投遞到這個交換機中。這個交換機稱為死信交換機(Dead Letter Exchange,簡稱DLX)

死信交換機只是一種稱呼,和普通的交換機其實是一樣的

我們不給simple.queue隊列綁定消費者,給dlx.queue綁定一個消費者

因為simple.queue隊列沒有消費者,所以不會有人來消費,當有人通過simple.direct交換機向simple.queue隊列發送一條過期時間為30秒的消息,此消息就會在simple.queue隊列卡主

image-20240524103939964

過了30s后,消息就會自動投遞到dlx.direct死信交換機,然后進入dlx.queue隊列,最終消費者拿到后會進行消費

利用死信交換機、死信隊列。過期時間的方式,模擬出了延遲消息的效果

image-20240524104129199

驗證一下

  1. 在控制臺創建simple.direct交換機

image-20240524104741666

  1. 將此交換機與simple.queue綁定

image-20240524104914383

注意!simple.queue并沒有綁定到消費者,進入到simple.queue隊列的消息都會變成死信

  1. 創建隊列dlx.queue和dlx.direct并將其綁定

創建隊列

image-20240524105129163

創建交換機

image-20240524111001370

進行綁定

image-20240524105340163

  1. 給simple.queue隊列設定死信交換機

注意,這個地方只能是在創建隊列的時候進行綁定

image-20240524110834121

  1. 在消費者模塊代碼中定義兩個隊列simple.queue、dlx.queue
//檢查一下,一定不要有simple.queue的消費者
//@RabbitListener(queues = "simple.queue")
//public void listenSimpleQueue(String msg){
//    System.out.println("消費者收到了simple.queue的消息:【" + msg +"】");
//    throw new RuntimeException("拋出異常了");
//}@RabbitListener(queues = "dlx.queue")
public void listenDlxQueue(String msg){log.info("消費者收到了dlx.queue的消息:【" + msg +"】");
}
  1. 發送消息

在控制臺中下面的這個屬性是帶過期時間的屬性

image-20240524111749232

Java代碼中的發送消息如下所示

@Test
void testSendTTLMessage() {Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8)).setExpiration("10000") //過期時間10s.build();//發送到死信隊列rabbitTemplate.convertAndSend("simple.direct", "hi", message);//直接向隊列發送消息log.info("消息發送成功!");
}

simple.queue隊列中10s內始終存在下面的一條消息

image-20240524112750348

dlx.queue隊列的消費者在10s后會接收到消息

2.2 延遲消息插件

這種定時功能,都是有一定的性能損耗的(Redis除外)

MQ或者Spring的定時功能是在程序內部維護一個時鐘,比如每隔一秒就往前跳一次,這種時鐘的運行過程中CPU就需要不停地計算,定時任務越多,對于CPU的占用越大,定時任務屬于一種CPU密集型的任務

采用延遲消息帶來的弊端就是給服務器CPU造成的額外壓力比較大

使用交換機實現延遲消息非常的繁瑣,需要定義很多的交換機和隊列,而且死信交換機的目的是為了讓我們人工處理死信消息,并不是為了延遲消息而生的

延遲消息的插件能自動實現延遲效果

RabbitMQ官方也推出了一個插件,原生支持延遲消息功能

該插件的原理是設計了一種支持延遲消息功能的交換機,當消息投遞到交換機后可以暫存一定時間,到期后再投遞到隊列

暫存的時間取決于發消息時配置的時間(也就是延遲時間)

在Java代碼中配置延遲交換機的兩種方式

在聲明交換機的時候,需要多添加一個參數delayed=“true”

  • 注解的方式

    在消費者模塊聲明交換機、隊列

@RabbitListener(bindings = @QueueBinding(//隊列value = @Queue(name = "delay.queue", durable = "true"),//交換機exchange = @Exchange(name = "hmall.direct", delayed = "true"),//Routing keykey = "delay"
))
public void listenDelayMessage(String msg) {log.info("接收到delay.queue的延遲消息【" + msg + "】");
}
  • 注入Bean的方式

    這種方式只聲明了交換機

@Bean
public DirectExchange delayExchange(){return ExchangeBuilder.directExchange("delay.direct").delayed() //設置delay的屬性為true 主要是這個.durable(true) //持久化.build();
}

發送延遲消息的Java代碼

    @Testvoid testSendDelayMessage() {
//        Message message = MessageBuilder
//                .withBody("hello".getBytes(StandardCharsets.UTF_8))
//                .setExpiration("10000") //過期時間10s
//                .build();
//        //發送到死信隊列rabbitTemplate.convertAndSend("dela.direct", "hi", "hello", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("10000");//延遲十秒return message;}});//直接向隊列發送消息log.info("消息發送成功!");}

2.3 取消超時訂單

設置三十分鐘后檢測訂單支付狀態,存在兩個問題

  • 如果并發比較高,30分鐘可能堆積消息過多,對MQ的壓力很大

  • 大多數訂單在下單后1分鐘內就會支付,但是卻需要再MQ內等待30分鐘,浪費資源

    30分鐘太長,可以縮短為10s,10s后立刻來檢查有沒有支付

    假如10s后沒有支付,可以再發一個10s的延遲消息,直到成功后不再發送延遲消息

    這樣的話MQ的壓力會減少很多

image-20240524162518090

處理如下所示:

查詢支付狀態的時候,需要先查詢本地,之后再查詢支付服務,查完之后判斷支付狀態

image-20240524164718756

定義延時消息時間數組

package com.hmall.common.domain;import com.hmall.common.utils.CollUtils;
import lombok.Data;import java.util.List;@Data
public class MultiDelayMessage<T> {/*** 消息體*/private T data;/*** 記錄延遲時間的集合*/private List<Long> delayMillis;public MultiDelayMessage(T data, List<Long> delayMillis) {this.data = data;this.delayMillis = delayMillis;}public static <T> MultiDelayMessage<T> of(T data, Long ... delayMillis){return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));}/*** 獲取并移除下一個延遲時間* @return 隊列中的第一個延遲時間*/public Long removeNextDelay(){return delayMillis.remove(0);}/*** 是否還有下一個延遲時間*/public boolean hasNextDelay(){return !delayMillis.isEmpty();}
}

定義好對應的交換機和隊列

@Component
@RequiredArgsConstructor
public class PayStatusListener {private final IOrderService orderService;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "mark.order.pay.queue", durable = "true"),exchange = @Exchange(name = "pay.topic", type = ExchangeTypes.TOPIC),key = "pay.success"))public void listenOrderPay(Long orderId) {/* // 1.查詢訂單Order order = orderService.getById(orderId);// 2.判斷訂單狀態是否為未支付if(order == null || order.getStatus() != 1){// 訂單不存在,或者狀態異常return;}// 3.如果未支付,標記訂單狀態為已支付orderService.markOrderPaySuccess(orderId);*/// update order set status = 2 where id = ? AND status = 1orderService.lambdaUpdate().set(Order::getStatus, 2).set(Order::getPayTime, LocalDateTime.now()).eq(Order::getId, orderId).eq(Order::getStatus, 1).update();}
}

在方法中發送延遲檢查訂單狀態的消息

    @Override@GlobalTransactionalpublic Long createOrder(OrderFormDTO orderFormDTO) {// 1.訂單數據Order order = new Order();// 1.1.查詢商品List<OrderDetailDTO> detailDTOS = orderFormDTO.getDetails();// 1.2.獲取商品id和數量的MapMap<Long, Integer> itemNumMap = detailDTOS.stream().collect(Collectors.toMap(OrderDetailDTO::getItemId, OrderDetailDTO::getNum));Set<Long> itemIds = itemNumMap.keySet();// 1.3.查詢商品List<ItemDTO> items = itemClient.queryItemByIds(itemIds);if (items == null || items.size() < itemIds.size()) {throw new BadRequestException("商品不存在");}// 1.4.基于商品價格、購買數量計算商品總價:totalFeeint total = 0;for (ItemDTO item : items) {total += item.getPrice() * itemNumMap.get(item.getId());}order.setTotalFee(total);// 1.5.其它屬性order.setPaymentType(orderFormDTO.getPaymentType());order.setUserId(UserContext.getUser());order.setStatus(1);// 1.6.將Order寫入數據庫order表中save(order);// 2.保存訂單詳情List<OrderDetail> details = buildDetails(order.getId(), items, itemNumMap);detailService.saveBatch(details);// 3.扣減庫存try {itemClient.deductStock(detailDTOS);} catch (Exception e) {throw new RuntimeException("庫存不足!");}// 4.清理購物車商品// cartClient.deleteCartItemByIds(itemIds);try {rabbitTemplate.convertAndSend(MqConstants.TRADE_EXCHANGE_NAME, MqConstants.ORDER_CREATE_KEY,itemIds/*,new RelyUserInfoMessageProcessor()*/);} catch (AmqpException e) {log.error("清理購物車的消息發送異常", e);}// 5.延遲檢測訂單狀態消息try {MultiDelayMessage<Long> msg = MultiDelayMessage.of(order.getId(), 10000L, 10000L, 10000L, 15000L, 15000L, 30000L, 30000L);rabbitTemplate.convertAndSend(MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg,new DelayMessageProcessor(msg.removeNextDelay().intValue()));} catch (AmqpException e) {log.error("延遲消息發送異常!", e);}return order.getId();}

將MessagePostProcessMessage對象提取出來了,不用每次都new了

@RequiredArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {private final int delay;@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(delay);return message;}
}

或者是使用下面視頻里面的代碼

但是下面的代碼每次都要使用一個內部類

image-20240524174351318

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/78716.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/78716.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/78716.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

5.學習筆記-SpringMVC(P53-P60)

1.響應 &#xff08;1&#xff09;響應頁面 &#xff08;2&#xff09;響應數據&#xff08;異步提交&#xff09;&#xff1a;文本數據、json數據 2.REST風格 (1)REST:表現形式狀態轉換。 (2)傳統風格資源描述形式 3.Restful入門案例 5.基于RESTful頁面數據…

Golang | 搜索表達式

// (( A | B | C ) & D ) | E & (( F | G ) & H )import "strings"// 實例化一個搜索表達式 func NewTermQuery(field, keyword string) *TermQuery {return &TermQuery{Keyword: &Keyword{Field: field, Word: keyword},} }func (tq *TermQuery…

LangChain構建大模型應用之RAG

RAG(Retrieval-augmented Generation 檢索增強生成)是一種結合信息檢索與生成模型的技術,通過動態整合外部知識庫提升大模型輸出的準確性和時效性。其核心思想是在生成答案前,先檢索外部知識庫中的相關信息作為上下文依據,從而突破傳統生成模型的靜態知識邊界。 為什么我們…

Ubuntu 下 Nginx 1.28.0 源碼編譯安裝與 systemd 管理全流程指南

一、環境與依賴準備 為確保編譯順利&#xff0c;我們首先更新系統并安裝必要的編譯工具和庫&#xff1a; sudo apt update sudo apt install -y build-essential \libpcre3 libpcre3-dev \zlib1g zlib1g-dev \libssl-dev \wgetbuild-essential&#xff1a;提供 gcc、make 等基…

第十二章-PHP文件上傳

第十二章-PHP文件上傳 一&#xff0c;文件上傳原理 一、HTTP協議與文件上傳 1. 請求體結構 當表單設置enctype"multipart/form-data"時&#xff0c;瀏覽器會將表單數據編碼為多部分&#xff08;multipart&#xff09;格式。 Boundary分隔符&#xff1a;隨機生成的…

CSS元素動畫篇:基于當前位置的變換動畫(三)

基于當前位置的變換動畫&#xff08;三&#xff09; 前言縮放效果類元素動畫脈沖動畫效果效果預覽代碼實現 橡皮筋動畫效果效果預覽代碼實現 果凍動畫效果效果預覽代碼實現 歡呼動畫效果效果預覽代碼實現 心跳動畫效果效果預覽代碼實現 結語 前言 CSS元素動畫一般分為兩種&…

Redis ssd是什么?Redis 內存空間優化的點都有哪些?embstr 和 row、intset、ziplist分別是什么?

Redis SSD 是什么&#xff1f; Redis SSD 通常指 Redis 使用 SSD&#xff08;固態硬盤&#xff09;作為持久化存儲介質的場景。雖然 Redis 是內存數據庫&#xff08;數據主要駐留內存&#xff09;&#xff0c;但其持久化機制&#xff08;如 RDB 快照和 AOF 日志&#xff09;需…

【藍橋杯】 數字詩意

數字詩意 在詩人的眼中&#xff0c;數字是生活的韻律&#xff0c;也是詩意的表達。 小藍&#xff0c;當代頂級詩人與數學家&#xff0c;被賦予了”數學詩人”的美譽。他擅長將冰冷的數字與抽象的詩意相融合&#xff0c;并用優雅的文字將數學之美展現于紙上。 某日&#xff0…

DHCP 服務器運行流程圖

以常見的 DHCP v4 為例,其完整流程如下: 一、客戶端請求 IP 地址階段 DHCPDiscover:客戶端啟動后,會以廣播的形式發送 DHCPDiscover 報文,目的是在網絡中尋找可用的 DHCP 服務器。該報文中包含客戶端的 MAC 地址等信息,以便服務器能夠識別客戶端。DHCPOffer:網絡中的 D…

一種企業信息查詢系統設計和實現:xujian.tech/cs

一種企業信息查詢系統設計和實現&#xff1a;xujian.tech/cs 背景與定位 企業在對外合作、風控審查或市場調研時&#xff0c;常需快速獲取公開的工商信息。本文介紹一個企業信息搜索引擎&#xff0c;面向普通用戶與開發者&#xff0c;幫助快速定位企業名稱、統一社會信用代碼…

前端面試高頻算法

前端面試高頻算法 1 排序算法&#xff1b;1.1 如何分析一個排序算法1.1.1 執行效率3.1.2 內存消耗1.1.3 穩定性 1.2 冒泡排序&#xff08;Bubble Sort&#xff09;1.3 插入排序&#xff08;Insertion Sort&#xff09;1.4 選擇排序&#xff08;Selection Sort&#xff09;1.5 歸…

C++初階-模板初階

目錄 1.泛型編程 2.函數模板 2.1函數模板概念 2.2實現函數模板 2.3模板的原理 2.4函數模板的實例化 2.4.1隱式實例化 2.4.2顯式初始化 2.5模板參數的匹配原則 3.類模板 3.1類模板定義格式 3.2類模板的實例化 4.總結 1.泛型編程 對廣泛的類型法寫代碼&#xff0c;我…

「Mac暢玩AIGC與多模態02」部署篇01 - 在 Mac 上部署 Ollama + Open WebUI

一、概述 本篇介紹如何在 macOS 環境下本地部署 Ollama 推理服務,并通過 Open WebUI 實現可視化交互界面。該流程無需 CUDA 或專用驅動,適用于 M 系列或 Intel 芯片的 Mac,便于快速測試本地大語言模型能力。 二、部署流程 1. 環境準備 安裝 Homebrew(如尚未安裝):/bin…

JavaScript 中 undefined 和 not defined 的區別

在 JavaScript 的調試過程中&#xff0c;你是否經常看到 undefined 卻不知其來源&#xff1f;是否曾被 ReferenceError: xxx is not defined 的錯誤提示困擾&#xff1f;這兩個看似相似的概念&#xff0c;實際上是 JavaScript 類型系統中最重要的分水嶺。本文將帶你撥開迷霧&am…

django admin AttributeError: ‘UserResorce‘ object has no attribute ‘ID‘

在 Django 中遇到 AttributeError: ‘UserResource’ object has no attribute ‘ID’ 這類錯誤通常是因為你在代碼中嘗試訪問一個不存在的屬性。在你的例子中&#xff0c;錯誤提示表明 UserResource 類中沒有名為 ID 的屬性。這可能是由以下幾個原因造成的&#xff1a; 拼寫錯…

對鴻蒙 Next 系統“成熟論”的深度剖析-優雅草卓伊凡

對鴻蒙 Next 系統“成熟論”的深度剖析-優雅草卓伊凡 在科技飛速發展的當下&#xff0c;鴻蒙 Next 系統無疑成為了眾多科技愛好者與行業人士關注的焦點。今日&#xff0c;卓伊凡便收到這樣一個饒有趣味的問題&#xff1a;鴻蒙 Next 系統究竟需要多長時間才能完全成熟&#xff…

快速上手GO的net/http包,個人學習筆記

更多個人筆記&#xff1a;&#xff08;僅供參考&#xff0c;非盈利&#xff09; gitee&#xff1a; https://gitee.com/harryhack/it_note github&#xff1a; https://github.com/ZHLOVEYY/IT_note 針對GO中net/http包的學習筆記 基礎快速了解 創建簡單的GOHTTP服務 func …

AI-Browser適用于 ChatGPT、Gemini、Claude、DeepSeek、Grok的客戶端開源應用程序,集成了 Monaco 編輯器。

一、軟件介紹 文末提供程序和源碼下載學習 AI-Browser適用于 ChatGPT、Gemini、Claude、DeepSeek、Grok、Felo、Cody、JENOVA、Phind、Perplexity、Genspark 和 Google AI Studio 的客戶端應用程序&#xff0c;集成了 Monaco 編輯器。使用 Electron 構建的強大桌面應用程序&a…

Dify框架面試內容整理-Dify如何處理知識庫的集成?

Dify 在知識庫集成方面采用了“檢索增強生成(RAG)”的技術架構,核心實現思路如下: 一、知識庫集成的整體流程 Dify處理知識庫集成通常包括以下關鍵步驟: 文檔上傳↓

Laravel 模型使用全局作用域和局部作用域

一. 需要解決什么問題 最近Laravel 項目中遇到一個需求&#xff0c;我有一個客戶表&#xff0c;每個員工都有自己的客戶&#xff0c;但是自己只能看自己的客戶。 項目中&#xff0c;有很多功能需要查詢客戶列表&#xff0c;客戶詳情&#xff0c;查詢客戶入口很多&#xff0c;…