rabbitmq-amqp事務消息+消費失敗重試機制+prefetch限流

1. 安裝和配置

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency><dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

1.2 yml 配置

### 生產端的配置
spring:rabbitmq:host: localhostport: 5672virtual-host: / # 虛擬主機username: guestpassword: guestpublisher-returns: true  #確認消息已經發送到隊列,生產上無需開啟# simple:同步等待confirm結果,直到超時#開啟消息確認 :correlated:異步回調,MQ返回結果時會回調這個ComfirmCallbackpublisher-confirm-type: correlated #確認消息已發送到交換機
## 生產端的配置
spring:rabbitmq:host: localhostport: 5672virtual-host: / # 虛擬主機username: guestpassword: guestpublisher-returns: true  #確認消息已經發送到隊列,生產上無需開啟# simple:同步等待confirm結果,直到超時#開啟消息確認 :correlated:異步回調,MQ返回結果時會回調這個ComfirmCallbackpublisher-confirm-type: correlated #確認消息已發送到交換機

2.生產端的消息確認發送代碼

/*** (1) RabbitTemplate.ConfirmCallback 這個接口是用來確定消息是否到達交換器的* (2) RabbitTemplate.ReturnsCallback 這個則是用來確定消息是否到達隊列的,未到達隊列時會被調用*/
@Service
@Slf4j
public class RabbitMqConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{private RabbitTemplate rabbitTemplate;public void queueConfirm(Map<String, String> map) {// 第一個參數表示交換機,第二個參數表示 routing key,第三個參數即消息rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1", map, new CorrelationData("111"));// 故意輸入一個不存在的交換機rabbitTemplate.convertAndSend("confirm_exchange_2222", "confirm_key1", map, new CorrelationData("22222"));// 故意輸入一個不存在的隊列rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1_333333", map, new CorrelationData("3333"));log.info("Confirm -- 消息--發送結束");}/*** 需要給ConfirmCallback賦值 不然不會走回調方法,默認是null* //將當前類的實例設置為 RabbitMQ 的確認回調處理器,跟下面的confirm方法聯合使用,* // 還需要打開配置:spring: rabbitmq: publisher-confirm-type: correlated*/@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}@Autowiredpublic RabbitMqConfirmCallback(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;
//        rabbitTemplate.setConfirmCallback(this);}/** 此方法用于監聽消息是否發送到交換機* 回調*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("confirm -- 監聽消息成功發送到交換機--回調id = {}", correlationData);} else {log.info("confirm -- 消息沒有發送到交換機回調id= {},消息發送失敗:{}。", correlationData, cause);}}@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("消息未到達隊列 --- returnedMessage= " + returnedMessage);}
}

2.2 生產端的截圖

3.消費端代碼

@Component
@Slf4j
public class RabbitConfirmConsumer {// 交換機public static final String confirm_exchange_name = "confirm_exchange";// 隊列public static final String confirm_queue_name="confirm_queue";// routingkeypublic static final String confirm_routing_key = "confirm_key1";// 聲明交換機@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(confirm_exchange_name);}// 聲明隊列@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(confirm_queue_name).build();}// 綁定隊列到交換機@Beanpublic Binding queueBingExchange(Queue confirmQueue,DirectExchange confirmExchange){return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(confirm_routing_key);}/*** ack:成功處理消息,RabbitMQ從隊列中刪除該消息* nack:消息處理失敗,RabbitMQ需要再次投遞消息* reject:消息處理失敗并拒絕該消息,RabbitMQ從隊列中刪除該消息*/@RabbitListener(queues = "confirm_queue")public void consumerConfirm(Message message, Channel channel, @Payload Map<String, Object> map,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {//獲取消息的唯一標記long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收的消息為:{},消息的唯一標記={}, 直接注入的tag= {}",message, deliveryTag, tag);if(message.getBody() != null){//獲取消息的內容byte[] body = message.getBody();//basicAck:表示成功確認,使用此回執方法后,消息會被rabbitmq broker 刪除。channel.basicAck(deliveryTag,false);//false 表示僅確認當前消息消費成功log.info("接收的消息為:{}", map);}else{channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);log.info("未消費數據");}}}

3.2消費端截圖

4 消費端重試機制

@Service
@Slf4j
public class RabbitRetryConsumer {@Beanpublic Queue retryQueue(){Map<String,Object> params = new HashMap<>();return QueueBuilder.durable("retry_queue").withArguments(params).build();}@Beanpublic TopicExchange retryTopicExchange(){return new TopicExchange("retry_exchange",true,false);}//隊列與交換機進行綁定@Beanpublic Binding BindingRetryQueueAndRetryTopicExchange(Queue retryQueue, TopicExchange retryTopicExchange){return BindingBuilder.bind(retryQueue).to(retryTopicExchange).with("retry_key");}int count  = 0;//測試重試,需要在yml配置 retry@RabbitListener(queues = "retry_queue")public void retryConsumer(Map<String, String> map, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {log.info("retryConsumer 重試次數 = {},重試接收數據為:{}",count++, map);int i = 10 /0;channel.basicAck(tag,false);}}

4.2 重試機制截圖

5. 限流設置--消費端

spring:rabbitmq:listener:simple:acknowledge-mode: manual # 開啟手動確認模式prefetch: 5 #控制消費者從隊列中預取(prefetch)消息的數量,以此來實現流控制

5.1 生產端--發送19條信息

@GetMapping("/xianliu")public String xianliuTest(){for(int i = 1; i < 20; i++){Map<String, String> map = new HashMap<>();map.put("key","限流測試--" + i);rabbitMqProducer.xianliuTest(map);}return "限流測試發送成功";}/**** 限流消息的發送測試*/public void xianliuTest(Map<String, String> map) {// 第一個參數表示交換機,第二個參數表示 routing key,第三個參數即消息rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1", map, new CorrelationData("111"));}

5.2 消費端

 /*** ack:成功處理消息,RabbitMQ從隊列中刪除該消息* nack:消息處理失敗,RabbitMQ需要再次投遞消息* reject:消息處理失敗并拒絕該消息,RabbitMQ從隊列中刪除該消息*/@RabbitListener(queues = "confirm_queue")public void consumerConfirm(Message message, Channel channel, @Payload Map<String, Object> map,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {//獲取消息的唯一標記long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收的消息為:{},消息的唯一標記={}, 直接注入的tag= {}",message, deliveryTag, tag);if(message.getBody() != null){//basicAck:表示成功確認,使用此回執方法后,消息會被rabbitmq broker 刪除。//channel.basicAck(deliveryTag,false);//false 表示僅確認當前消息消費成功log.info("接收的消息為:{}", map);}else{//否定確認//channel.basicNack(deliverTag,false,true);//requeue為false,則變成死信隊列channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);log.info("未消費數據");}}

5.3 注釋掉channel.basicAck--堵塞了

5.4 注釋掉了?prefetch -- 19條全部被消費,即使沒有ack

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/71490.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/71490.shtml
英文地址,請注明出處:http://en.pswp.cn/web/71490.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【Python】05、Python運算符

文章目錄 1.算術運算符2.賦值運算符3.關系運算符4.邏輯運算符4.1 布爾值邏輯運算4.2 非布爾值的邏輯運算符 5.條件運算符6.運算符優先級 運算符也稱為操作符&#xff0c;可以對一個或多個值進行運算或各種操作。比如、-、都屬于運算符 1.算術運算符 加法 如果是兩個字符串之間…

2025-03-06 學習記錄--C/C++-PTA 習題6-6 使用函數輸出一個整數的逆序數

合抱之木&#xff0c;生于毫末&#xff1b;九層之臺&#xff0c;起于累土&#xff1b;千里之行&#xff0c;始于足下。&#x1f4aa;&#x1f3fb; 一、題目描述 ?? 二、代碼&#xff08;C語言&#xff09;?? #include <stdio.h>int reverse( int number );int main…

簡記_硬件系統設計之需求分析要點

目錄 一、 功能需求 二、 整體性能需求 三、 用戶接口需求 四、 功耗需求 五、 成本需求 六、 IP和NEMA防護等級需求 七、 認證需求 功能需求 供電方式及防護 供電方式&#xff1a;市電供電、外置直流穩壓電源供電、電池供電、PoE&#xff08;Power Over Ether…

[原創](Modern C++)現代C++的關鍵性概念: 利用“概念(Concepts)“,可以優雅地約束模板參數

[作者] 常用網名: 豬頭三 出生日期: 1981.XX.XX 企鵝交流: 643439947 個人網站: 80x86匯編小站 編程生涯: 2001年~至今[共24年] 職業生涯: 22年 開發語言: C/C、80x86ASM、PHP、Perl、Objective-C、Object Pascal、C#、Python 開發工具: Visual Studio、Delphi、XCode、Eclipse…

Self-Pro: A Self-Prompt and Tuning Framework for Graph Neural Networks

Self-Pro: A Self-Prompt and Tuning Framework for Graph Neural Networks ?#paper/GFM/GNN-BASED#? #paper/???#? 注意&#xff1a;這篇文章是每個圖一個GCN模型&#xff0c;而不是所有圖一個GCN 模型 算是最早的涉及異配圖的prompt了 貢獻和動機&#xff1a; 非對…

寶塔 Linux 計劃任務中添加運行項目網站PHP任務-定時任務

一、指定php版運行&#xff0c; cd /www/wwwroot/www.xxx.com/ && /www/server/php/56/bin/php think timedtasks start >> /tmp/timedtasks.log 2>&1 二、不指定php版 cd /www/wwwroot/www.xxx.com/ && php think timedtasks start >> …

【電控筆記z29】擾動估測器DOB估測慣量J-摩擦系數B

基本原理 擾動估測器的核心思想是通過向電機系統施加特定的擾動信號&#xff0c;觀察系統響應的變化&#xff0c;然后利用系統的動態模型和控制理論來估計未知參數&#xff0c;如慣量和摩擦系數 。一般基于電機的運動方程建立數學模型&#xff0c;結合觀測到的電機實際運行數據…

要查看 SQLite 數據庫中的所有表,可以通過查詢 SQLite 的系統表 sqlite_master

要查看 SQLite 數據庫中的所有表&#xff0c;可以查詢 SQLite 的系統表 sqlite_master。 每個 SQLite 數據庫都包含一個名為 sqlite_master 的系統表。該表定義了數據庫的模式&#xff0c;存儲了數據庫中所有表、索引、視圖和觸發器等對象的信息。 通過查詢 sqlite_master&am…

如何在Spring Boot中讀取JAR包內resources目錄下文件

精心整理了最新的面試資料和簡歷模板&#xff0c;有需要的可以自行獲取 點擊前往百度網盤獲取 點擊前往夸克網盤獲取 以下是如何在Spring Boot中讀取JAR包內resources目錄下文件的教程&#xff0c;分為多種方法及詳細說明&#xff1a; 方法1&#xff1a;使用 ClassPathResour…

清華大學DeepSeek賦能家庭教育【附下載鏈接】

核心要點&#xff1a; DeepSeek通過基礎模型&#xff08;V3&#xff09;、深度思考模型&#xff08;R1&#xff09;及聯網模型&#xff0c;為家庭教育提供分層支持&#xff1a;V3用于作業輔導&#xff0c;R1培養批判性思維&#xff0c;聯網模型助力探究性學習。家長需遵循目標導…

C語言番外篇(5)-------------->作用域與生命周期

作用域與生命周期是非常重要的編程知識。本篇文章使用C語言講述作用域與生命周期。 一、作用域 在程序設計中&#xff0c;變量并非總是有效的&#xff0c;可以使用的區域就是作用域。 1.1局部變量的作用域 在{}中的都是局部變量&#xff0c;只是作用大小不一樣而已。我們可…

若依前后端分離版使用Electron打包前端Vue為Exe文件

1.前言 本文詳細介紹如何使用electron將若依框架前后端分離版的前端Vue頁面打包為Exe文件&#xff0c;并且包括如何實現應用更新。使用若依基礎代碼體現不出打包功能&#xff0c;因此我使用開發的文件管理系統&#xff0c;介紹上述過程&#xff0c;具體可以查看我的文章《若依…

Linux——Docker容器內MySQL密碼忘記了如何查看

目錄 查看正在運行的MySQL的容器ID 方法一&#xff1a;查看MySQL容器的日志里的密碼 方法二&#xff1a;通過環境變量密碼登錄 方法三&#xff1a;修改密碼 查看正在運行的MySQL的容器ID docker ps 方法一&#xff1a;查看MySQL容器的日志里的密碼 docker logs [MySQL的容器…

康謀分享 | 3DGS:革新自動駕駛仿真場景重建的關鍵技術

隨著自動駕駛技術的迅猛發展&#xff0c;構建高保真、動態的仿真場景成為了行業的迫切需求。傳統的三維重建方法在處理復雜場景時常常面臨效率和精度的挑戰。在此背景下&#xff0c;3D高斯點陣渲染&#xff08;3DGS&#xff09;技術應運而生&#xff0c;成為自動駕駛仿真場景重…

大模型架構記錄2

一 應用場景 1.1 prompt 示例 1.2 自己搭建一個UI界面&#xff0c;調用接口 可以選用不同的模型&#xff0c;需要對應的API KEY 二 Agent 使用 2.1 構建GPT

【C++】二叉樹相關算法題

一、根據二叉樹創建字符串 題目描述&#xff1a; 給你二叉樹的根節點 root &#xff0c;請你采用前序遍歷的方式&#xff0c;將二叉樹轉化為一個由括號和整數組成的字符串&#xff0c;返回構造出的字符串。 空節點使用一對空括號對 “()” 表示&#xff0c;轉化后需要省略所有…

【機械視覺】C#+visionPro聯合編程———【一、C# + VisionPro 聯合編程詳解以及如何將visionPro工具加載到winform】

機械視覺與 C# VisionPro 聯合編程詳解 目錄 機械視覺與 C# VisionPro 聯合編程詳解 概念 應用場景 1. 工業檢測與質量控制缺陷檢測 2. 定位與機器人引導 3. 識別與分類 4. 復雜流程控制 將visionPro工具加載到winform 環境準備 一、創建winform項目 二、打開窗體…

修改hosts文件,修改安全屬性,建立自己的DNS

初級代碼游戲的專欄介紹與文章目錄-CSDN博客 我的github&#xff1a;codetoys&#xff0c;所有代碼都將會位于ctfc庫中。已經放入庫中我會指出在庫中的位置。 這些代碼大部分以Linux為目標但部分代碼是純C的&#xff0c;可以在任何平臺上使用。 源碼指引&#xff1a;github源…

對NXP提供的BSP里邊所使用的u-boot的環境變量`bootcmd`的解析

為什么我們要解析環境變量bootcmd&#xff1f; 承接博文 https://blog.csdn.net/wenhao_ir/article/details/145902134 繼續解析u-boot的環境變量bootcmd。 為什么要解析u-boot的這個環境變量bootcmd&#xff1f;因為如果u-boot在倒計時完后,首先執行的是就是下面這條命令&am…

NSSCTF [SWPUCTF 2024 秋季新生賽]金絲雀

5948.[SWPUCTF 2024 秋季新生賽]金絲雀 canary繞過和64位的ret2libc(格式化字符串泄露) (1) motalymotaly-VMware-Virtual-Platform:~/桌面$ file xn xn: ELF 64-bit LSB executable, x86-64, version 1 (SYSV), dynamically linked, interpreter /lib64/ld-linux-x86-64.so.…