RabbitMQ延時隊列的兩種實現方式

目錄

一、延時插件實現

1、版本要求

2、為運行新容器時安裝

3、為已運行的容器安裝

4、驗證安裝

5、代碼編寫

1. 配置類

2. 生產者

3. 消費者

二、死信隊列實現

1、代碼編寫

1. 配置類

2. 生產者

3. 消費者

三、踩坑記錄

1、發送消息失敗

2、消息過期后未能轉發到死信隊列

3、消費者消費報錯


一、延時插件實現

1、版本要求

RabbitMQ 3.5.7以上

2、為運行新容器時安裝

# 1. 拉取帶管理界面的鏡像
docker pull rabbitmq:3.11-management
?
# 2. 啟動容器并啟用插件
docker run -d \--name rabbitmq \-p 5672:5672 \-p 15672:15672 \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=password \rabbitmq:3.11-management \bash -c "rabbitmq-plugins enable rabbitmq_delayed_message_exchange && rabbitmq-server"

3、為已運行的容器安裝

# 1. 進入正在運行的容器
docker exec -it rabbitmq /bin/bash
?
# 2. 在容器內執行插件安裝
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
?
# 3. 退出容器
exit
?
# 4. 重啟容器使插件生效
docker restart rabbitmq

4、驗證安裝

# 方法1:檢查插件列表
docker exec rabbitmq rabbitmq-plugins list | grep delayed
?
# 方法2:登錄管理界面
# 訪問 http://localhost:15672 (使用設置的賬號密碼登錄)
# 在 "Exchanges" 標簽頁創建交換機時,Type 下拉框會出現 "x-delayed-message" 選項

5、代碼編寫

1. 配置類

@Configuration
public class RabbitMqConfig {public static final String DELAYED_EXCHANGE = "delayed.exchange";public static final String DELAYED_QUEUE = "delayed.queue";public static final String DELAYED_ROUTING_KEY = "delayed_routing_key";@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct"); // 交換機類型return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message", // 固定類型true,false,args);}
?@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE, true);}
?@Beanpublic Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}}

2. 生產者

public void send(String exchange, String routing_key,Object data, Integer delayMillis) {// 消息后處理器:設置延時和持久化MessagePostProcessor processor = message -> {// 毫秒message.getMessageProperties().setDelay(delayMillis);// 持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;};
?rabbitTemplate.convertAndSend(exchange, routingKey, data, processor);
}

3. 消費者

@Component
@RabbitListener(queues = RabbitMqConfig.DELAYED_QUEUE)
public class DelayedListener {
?@RabbitHandlerpublic void listener(String data, Channel channel, Message message) {log.warn("消息消費成功,消息內容:{}", data);MessageProperties properties = message.getMessageProperties();long deliveryTag = properties.getDeliveryTag()channel.basicAck(deliveryTag, false);}
?
}

二、死信隊列實現

1、代碼編寫

1. 配置類

@Configuration
public class RabbitMqConfig {public static final String DELAYED_EXCHANGE = "delayed.exchange";public static final String DELAYED_QUEUE = "delayed.queue";public static final String DELAYED_ROUTING_KEY = "delayed_routing_key";
?public static final String NORMAL_EXCHANGE = "normal.exchange";public static final String NORMAL_QUEUE = "normal.queue";public static final String NORMAL_ROUTING_KEY = "normal_routing_key";// 死信隊列(延時隊列)@Beanpublic Queue delayedQueue() {return QueueBuilder.durable(DELAYED_QUEUE).build();}
?// 死信交換機@Beanpublic DirectExchange delayedExchange() {return new DirectExchange(DELAYED_EXCHANGE);}
?// 綁定死信隊列到死信交換機@Beanpublic Binding delayedBinding(Queue delayedQueue, DirectExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY);}
?// 普通隊列@Beanpublic Queue normalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DELAYED_EXCHANGE).deadLetterRoutingKey(DELAYED_ROUTING_KEY).build();}
?// 普通交換機@Beanpublic DirectExchange normalExchange() {return new DirectExchange(NORMAL_EXCHANGE);}
?// 綁定普通隊列到普通交換機@Beanpublic Binding normalBinding(Queue normalQueue, DirectExchange normalExchange) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY);}}

2. 生產者

public void send(String exchange, String routing_key, Object data, Integer delayMillis) {String uuid = IdUtil.simpleUUID();// 消息入庫略,uuid為主鍵MessageProperties properties = new MessageProperties();// 設置TTL,單位毫秒properties.setExpiration(String.valueOf(delayMillis));// 消息持久化(2 表示持久化)properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
?Message msg = rabbitTemplate.getMessageConverter().toMessage(data, properties);rabbitTemplate.send(exchange, routingKey, msg, new CorrelationData(uuid));
}

3. 消費者

@Component
@RabbitListener(queues = RabbitMqConfig.DELAYED_QUEUE)
public class DelayedListener {
?@RabbitHandlerpublic void listener(String data, Channel channel, Message message) {log.warn("消息消費成功,消息內容:{}", data);MessageProperties properties = message.getMessageProperties();long deliveryTag = properties.getDeliveryTag()channel.basicAck(deliveryTag, false);}
?
}

三、踩坑記錄

1、發送消息失敗

原因RabbitTemplate 配置了消息抵達確認,消息ID沒有傳值。

RabbitTemplate rabbitTemplate = new RabbitTemplate();
// 消息抵達確認通知
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {String msgId = data.getId();if (ack) {log.info("消息抵達隊列成功:{}", data);} else {log.error("消息未能發送成功,消息ID:{}", data.getId(), cause);}
});

生產者實際發送消息未傳消息ID:

錯誤格式

rabbitTemplate.convertAndSend(exchange, routingKey, data);

正確格式

String uuid = IdUtil.simpleUUID();
rabbitTemplate.convertAndSend(exchange, routingKey, data, new CorrelationData(uuid));

2、消息過期后未能轉發到死信隊列

原因:正常消息未綁定死信隊列,消息過期自動刪除,而不會轉發到死信隊列中。

錯誤格式

@Bean
public Queue delayedNormalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).build();
}

正確格式

@Bean
public Queue delayedNormalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DELAYED_EXCHANGE) // 指定死信交換機.deadLetterRoutingKey(DELAYED_ROUTING_KEY) // 指定死信路由鍵.build();
}

3、消費者消費報錯

原因:發送的消息由于自定義的 MessageProperties ,其中缺失了 contentType 參數,需要使用轉化器進行轉換,而不是直接發送消息。

錯誤格式

MessageProperties properties = new MessageProperties();
properties.setExpiration(String.valueOf(delayMillis));
?
Message msg = new Message(message.getBytes(), properties);
rabbitTemplate.convertAndSend(exchange, routingKey, msg, new CorrelationData(uuid));

正確格式

MessageProperties properties = new MessageProperties();
properties.setExpiration(String.valueOf(delayMillis));
?
Message msg = rabbitTemplate.getMessageConverter().toMessage(message, properties);
rabbitTemplate.send(exchange, routingKey, msg, new CorrelationData(uuid));

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/96405.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/96405.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/96405.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

深度學習在股票量化中的應用

深度學習在股票量化中的具體應用&#xff1a;從時間序列預測到Alpha挖掘深度學習并非量化交易的銀彈&#xff0c;但它是一套強大的工具集&#xff0c;能夠解決傳統量化方法難以處理的復雜問題。其核心價值在于從海量、高維、非結構化的數據中自動提取有效特征并發現非線性關系。…

Web 安全之 HTTP 響應截斷攻擊詳解

這不是危言聳聽。 在一次安全審計中&#xff0c;某電商平臺發現&#xff1a; 用戶訪問首頁后&#xff0c;自動跳轉到了賭博網站。 但代碼沒被篡改&#xff0c;服務器沒被入侵&#xff0c;日志一切正常。 最終追查發現—— 罪魁禍首&#xff0c;竟是一個 %0d%0a&#xff08;回車…

Envoy配置ext_proc

介紹 本文將使用gateway api inference extension作為envoy的ext_proc服務端 啟動Ext_Proc 基于Gateway API Inference Extension https://github.com/kubernetes-sigs/gateway-api-inference-extension.git 先clone代碼到本地 git clone https://github.com/kubernetes-…

echarts關系圖(Vue3)

基礎版效果圖&#xff1a;后期請求接口&#xff0c;接入數據即可用<template><div><v-chartref"vChartRef":option"option"style"width: 100%; height: 800px"></v-chart></div> </template><script lan…

【LeetCode】17. 電話號碼的字母組合

文章目錄17. 電話號碼的字母組合題目描述示例 1&#xff1a;示例 2&#xff1a;示例 3&#xff1a;提示&#xff1a;解題思路算法分析問題本質分析回溯法詳解組合生成過程可視化數字映射關系各種解法對比算法流程圖邊界情況處理時間復雜度分析空間復雜度分析關鍵優化點實際應用…

全文 part1 - DGEMM Using Tensor Cores, and Its Accurate and Reproducible Versions

摘要 本文提出了一種在 NVIDIA 圖形處理器&#xff08;GPU&#xff09;的張量核心&#xff08;Tensor Cores&#xff0c;僅含 FP16、INT8 等 GEMM 計算功能&#xff09;上實現 FP64&#xff08;雙精度&#xff0c;DGEMM&#xff09;和 FP32&#xff08;單精度&#xff0c;SGEMM…

Hexo 博客圖片托管:告別本地存儲,用 PicGo + GitHub 打造高速穩定圖床

之前剛開始進行Hexo博客撰寫&#xff0c;圖片都保存在本地Hexo源文件目錄&#xff08;source/images/&#xff09;文件夾&#xff0c;隨著圖片增多&#xff0c;管理起來壓力增大&#xff0c;于是產生了使用圖床&#xff0c;引入外鏈進行圖片存儲的想法 Pros and Cons 提升部署…

關于 VScode 無法連接 Linux 主機并報錯 <未能下載 VScode 服務器> 的解決方案

1. 出現的情況 VScode 遠程登錄 Linux 主機, 出現一下報錯:2. 檢查方案 2.1 VScode 方面 菜單欄: 點擊 <幫助> →\to→ 點擊 <關于> 在出現的彈窗中記錄 [提交: ] 之后的字符串 (暫且將該字符串命名為變量 $commit_id) 2.2 Linux 方面 使用 ssh or MobaXterm 遠程登…

泛型與反射

也是重新溫習了下泛型與反射,反射基本就是一些api理解即可,不過需要注意類加載器原理,而泛型則需要理解其設計思想,可以代替Object,更加靈活,可讀性強。泛型泛型如果指定后,編譯階段就會檢查,不讓亂輸其他類型,必須是引用類型; 如果不指定就默認Object// 如果指定泛型, 就必須存…

Docker端口映射與數據卷完全指南

目錄 Docker端口映射與數據卷完全指南 1. 端口映射:連接Docker容器與外部世界 1.1 為什么需要端口映射 1.2 實現端口映射 1.3 查看端口映射 1.4 修改端口映射(高級操作) 2. 數據卷:Docker數據持久化解決方案 2.1 數據持久化問題 2.2 數據卷的含義 2.3 數據卷的特點 2.4 掛載…

【Linux篇章】穿越網絡迷霧:揭開 HTTP 應用層協議的終極奧秘!從請求響應到實戰編程,從靜態網頁到動態交互,一文帶你全面吃透并征服 HTTP 協議,打造屬于你的 Web 通信利刃!

本篇摘要 本篇將介紹何為HTTP協議&#xff0c;以及它的請求與答復信息的格式&#xff08;請求行&#xff0c;請求包頭&#xff0c;正文等&#xff09;&#xff0c;對一些比較重要的部分來展開講解&#xff0c;其他不常用的即一概而過&#xff0c;從靜態網頁到動態網頁的過渡&a…

QT的項目pro qmake編譯

使用qmake管理Qt庫的子工程示例-CSDN博客 top_srcdir top_builddir

語音交互系統意圖識別介紹和構建

一、意圖識別簡介**意圖識別&#xff08;Intent Recognition&#xff09;**是語音交互系統的核心組件&#xff0c;用于理解用戶語音輸入背后的真實目的&#xff08;如查詢天氣、播放音樂等&#xff09;。輸入&#xff1a;語音轉文本&#xff08;ASR輸出&#xff09;的語句輸出&…

DINOv3 重磅發布

2025年8月14日 Meta 發布了 DINOv3 。 主頁&#xff1a;https://ai.meta.com/dinov3/ 論文&#xff1a;DINOv3 HuggingFace地址&#xff1a;https://huggingface.co/collections/facebook/dinov3-68924841bd6b561778e31009 官方博客&#xff1a;https://ai.meta.com/blog/d…

ansible playbook 實戰案例roles | 實現基于firewalld添加端口

文章目錄一、核心功能描述二、roles內容2.1 文件結構2.2 主配置文件2.3 tasks文件內容免費個人運維知識庫&#xff0c;歡迎您的訂閱&#xff1a;literator_ray.flowus.cn 一、核心功能描述 這個 Ansible Role (firewalld) 的核心功能是&#xff1a;動態地、安全地配置 firewal…

【深度學習實戰(55)】記錄一次在新服務器上使用docker的流程

使用docker&#xff1a;apt-get install dockersudo usermod -aG docker sliu &#xff08;將用戶 sliu 添加到 docker 用戶組&#xff09;newgrp docker &#xff08;刷新&#xff09;docker imagessudo docker load --input /home/sliu/workspace/env/shuai_docker.tar &…

面試后的跟進策略:如何提高錄用幾率并留下專業印象

面試結束后&#xff0c;許多求職者認為自己的任務已經完成&#xff0c;只需等待結果通知。然而&#xff0c;面試后的跟進策略同樣是求職過程中的關鍵環節&#xff0c;它不僅能提高你的錄用幾率&#xff0c;還能展示你的專業素養和持續興趣。本文將結合酷酷面試平臺的專業建議&a…

深入解析RAGFlow六階段架構

下面用“流程圖 六階段拆解”的方式&#xff0c;把 RAGFlow 的完整流程逐層剖開&#xff0c;力求把每一步的輸入、輸出、可選策略、內部機制都講清楚。 ──────────────────────── 一、總覽圖&#xff08;先建立體感&#xff09; 用戶提問 │ ├─→【…

Go語言中的迭代器模式與安全訪問實踐

Go語言中的迭代器模式與安全訪問實踐 1. 迭代器模式在Go中的演進 1.1 傳統迭代器模式回顧 在傳統面向對象語言中&#xff0c;迭代器模式通常涉及三個核心組件&#xff1a;可迭代集合接口(Iterable)迭代器接口(Iterator)具體實現類// 傳統迭代器模式示例 type Iterator interfac…

從零開始:JDK 在 Windows、macOS 和 Linux 上的下載、安裝與環境變量配置

前言 在進入 Java 世界之前&#xff0c;搭建一個穩定、可用的開發環境是每個開發者必須邁過的第一道門檻。JDK&#xff08;Java Development Kit&#xff09;作為 Java 程序開發的核心工具包&#xff0c;其正確安裝與環境變量配置直接關系到后續編譯、運行、調試等所有開發流程…