RabbitMQ——消息確認

一、消息確認機制

生產者發送的消息,可能有以下兩種情況:

1> 消息消費成功

2> 消息消費失敗

為了保證消息可靠的到達消費者(!!!注意:消息確認機制和前面的工作模式中的publisher confirms模式有很大區別,消息確認保證的是消息可靠的到達消費者,而publisher confirms保證的是消息可靠的到達RabbitMQServer),RabbitMQ引入了消息確認機制:

消費者在消費消息時,可以指定autoAck參數,對應著兩種確認方式

(1)自動確認:消息只要到達消費者就會自動確認,不會考慮消費者是否正確消費了這些消息,直接從 內存/磁盤 中刪除消息;

(2)手動確認:消息到達消費者,不會自動確認,會等待消費者調用Basic.Ack命令,才會從內存/磁盤 移除這條消息。

DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}
};
channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);

如果將basicConsume中的true改為false,對于RabbitMQ服務器來說,消息分成兩個部分:

1>? 未發送給消費者的消息;

2>? 已經發送給消費者,但還沒有被確認的消息。

對應管理界面:


二、手動確認方法

RabbitMQ提供了兩種手動確認,分別是肯定確認和否定確認,對應著3個方法:

(1)肯定確認:Channel.basicAck(long deliveryTag, boolean multiple)

??? 其中,deliveryTag為消息的唯一表示,在每一個channel中都是唯一的,相當于TCP協議中的序號的作用,用來確認哪條消息已經收到,multiple為true表示是否批量確認,同樣與TCP中確認序號的作用類似,表示在這個deliveryTag前的消息都已收到,為false則表示一次只確認一條消息。

(2)否定確認:?Channel.basicReject(long deliveryTag, boolean requeue)

? ?如果消息到達消費者后,消費者未正確處理這條消息(如發生異常),就可以通過這個方法進行否定確認,其中,參數requeue表示這條消息是否需要重新入隊,這個方法一次只能確認一條消息。

(3)否定確認:Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)

? ?與basicAck一樣,這個方法可以通過multiple來實現批量確認


三、使用Spring Boot演示消息確認機制

? 演示之前,需要先了解Spring-AMQP的確認機制,它和RabbitMQ JDK Client庫的確認機制有些許不同:

1. AcknowledgeMode.NONE

? 和RabbitMQ JDK Client庫的自動確認機制一樣,只要消息到達消費者,這條消息就會從隊列中移除。

2.?AcknowledgeMode.AUTO(和JDK Client庫的區別)

? 消息到達消費者且處理成功,才會自動確認,如果消息在處理過程中發生了異常,則不會自動確認。

3.?AcknowledgeMode.MANUAL

? 和JDK Client庫一樣,屬于手動確認機制,同樣分為肯定確認和否定確認。

接下來,進行準備工作,創建一個Spring Boot項目,添加RabbitMQ依賴:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

添加RabbitMQ相關配置:

spring:rabbitmq:addresses: amqp://study:study@110.41.17.130:5672/extensionlistener:simple:acknowledge-mode: none #表示當前確認機制未AcknowledgeMode.NONE

添加下圖的包,創建對應的類:

?


3.1? AcknowledgeMode.NONE

(1)編寫常量類(由于只是驗證消息確認機制,可以隨便使用一種工作模式)

public class Constants {public static final String ACK_QUEUE = "ack.queue";public static final String ACK_EXCHANGE = "ack.exchange";
}

(2)配置聲明隊列、交換機、交換機與隊列綁定關系

@Configuration
public class RabbitMQConfig {//1.聲明隊列@Bean("ackQueue")public Queue ackQueue(){return QueueBuilder.durable(Constants.ACK_QUEUE).build();}//2.聲明交換機@Bean("ackExchange")public FanoutExchange ackExchange(){return ExchangeBuilder.fanoutExchange(Constants.ACK_EXCHANGE).build();}//3.聲明隊列與交換機的綁定關系@Beanpublic Binding ackBinding(@Qualifier("ackExchange") FanoutExchange fanoutExchange,@Qualifier("ackQueue") Queue queue){return BindingBuilder.bind(queue).to(fanoutExchange);}
}

(3)編寫生產者代碼

@RestController
@RequestMapping("/producer")
public class ProducerController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack(){rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"","consumer ack mode test...");return "發送消息成功";}
}

(4)編寫消費者代碼

@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void ackListener(Message message, Channel channel) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息:%s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//業務邏輯處理System.out.println("業務邏輯處理");
//            int num = 10/0;System.out.println("業務處理完成");}
}

(5)測試正常消費消息的情況


(6)測試消息消費異常情況(將消費者代碼中得 int num? = 10/0? 注解打開)


(7)總結

? 經過確認,可以發現AcknowledgeMode.NONE機制,無論消費者是否正確消費消息,都會自動確認,不會保留異常消費的消息


3.2 AcknowledgeMode.AUTO

(1)?修改配置文件中的 acknowledge-mode: none 為 acknowledge-mode: auto

(2)演示消息正常消費的情況(將 int num = 10/0 注釋)

  @RabbitListener(queues = Constants.ACK_QUEUE)public void ackListener(Message message, Channel channel) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息:%s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//業務邏輯處理System.out.println("業務邏輯處理");
//            int num = 10/0;System.out.println("業務處理完成");}

運行程序,訪問 producer/ack 接口發送消息:


(3)演示消息處理異常情況(取消 int num = 10/0 的注釋)

 @RabbitListener(queues = Constants.ACK_QUEUE)public void ackListener(Message message, Channel channel) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息:%s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//業務邏輯處理System.out.println("業務邏輯處理");int num = 10/0;System.out.println("業務處理完成");}

運行程序,訪問生產者端口:

消息沒有成功消費,如果此時我們修改代碼(將 int num = 10/0 注釋,使程序不再發生異常),再次運行程序,隊列中保存的消息就會被正常消費:


3.3 AcknowledgeMode.MANUAL

? (1) 修改配置文件中?acknowledge-mode: auto 為?acknowledge-mode: manul

(2)修改消費者代碼(由于是手動確認,需要在代碼中添加正常消費時的 “肯定確認” 和 消費異常時的 “否定確認”)

    @RabbitListener(queues = Constants.ACK_QUEUE)public void ackListener(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try{System.out.printf("接收到消息:%s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//業務邏輯處理System.out.println("業務邏輯處理");
//                int num = 10/0;System.out.println("業務處理完成");//消息正常消費,肯定確認channel.basicAck(deliveryTag,false);//單條確認}catch (Exception e){//消息消費異常,否定確認channel.basicNack(deliveryTag,false,true);//單條否定,異常后重新入隊}}

(3)消息正常消費時的情況(注釋 int num = 10/0)

接下來注釋掉手動肯定確認的這行代碼,看看會發生什么情況:

 //消息正常消費,肯定確認//channel.basicAck(deliveryTag,false);//單條確認

再次運行程序并通過接口 producer/ack 發送消息:

可以看到,在AcknowledgeMode.MANUAL機制下,如果不手動確認,隊列不會移除已經被正常消費的消息:


(3)消息消費異常的情況下(取消 int num = 10/0 的注釋)

接下來注釋掉channel.basicNack,運行程序,訪問接口發送消息:

如果將參數requeue置為false,會怎么樣?

channel.basicNack(deliveryTag,false,false);//單條否定,重新發送消息

運行程序:


(4)總結

1> 無論是否正常處理消息都要進行手動確認;

2> 正常處理消息但未手動確認,管理界面中的隊列會有 一條/多條 Unacked 的消息(重新啟動程序后會重新消費);

3> 異常處理消息且未手動確認,也會有 一條/多條 Unacked 的消息(重啟程序同樣重新消費);

4> 如果異常處理,將requeue置為false,隊列不會保存 這條/多條 異常消費的消息

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?置為true,隊列會不斷重新發送 這條/多條 消息

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/82333.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/82333.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/82333.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

C++異步(1)

什么是異步? 異步就是多個線程是同時執行的&#xff0c;與之相對的就是線程同步&#xff0c;二者都應用在并發的場景上。 異步的特點 異步執行的任務無需等待其他任務完成&#xff0c;其本身是通過非阻塞的方式執行的&#xff0c;不依賴前驅任務&#xff0c;通常用于IO密集…

向量數據庫Milvus03-高級功能與性能調優

Milvus高級功能與性能調優 目錄 高級特性詳解性能調優技巧生產環境部署最佳實踐總結與展望 1. 高級特性詳解 1.1 多索引兼容 Milvus 支持多種索引類型&#xff08;如 HNSW、IVF_PQ、IVF_FLAT&#xff09;的混合使用&#xff0c;以適應不同場景的需求。 HNSW&#xff08;Hier…

5月24日day35打卡

模型可視化與推理 知識點回顧&#xff1a; 三種不同的模型可視化方法&#xff1a;推薦torchinfo打印summary權重分布可視化進度條功能&#xff1a;手動和自動寫法&#xff0c;讓打印結果更加美觀推理的寫法&#xff1a;評估模式 作業&#xff1a;調整模型定義時的超參數&#x…

野火魯班貓(arrch64架構debian)從零實現用MobileFaceNet算法進行實時人臉識別(三)用yolov5-face算法實現人臉檢測

環境直接使用第一篇中安裝好的環境即可 先clone yolov5-face項目 git clone https://github.com/deepcam-cn/yolov5-face.git 并下載預訓練權重文件yolov5n-face.pt 網盤鏈接: https://pan.baidu.com/s/1xsYns6cyB84aPDgXB7sNDQ 提取碼: lw9j &#xff08;野火官方提供&am…

R語言科研編程-柱狀圖

R語言簡介 R語言是一種開源的統計計算和圖形繪制編程語言&#xff0c;廣泛應用于數據分析、機器學習、數據可視化等領域。它由Ross Ihaka和Robert Gentleman于1993年開發&#xff0c;具有豐富的統計函數庫和圖形功能&#xff0c;尤其適合數據科學研究和可視化任務。 使用R語言…

Android-Handler學習總結

??面試官?&#xff1a;你好&#xff01;我看你簡歷里提到熟悉 Android 的 Handler 機制&#xff0c;能簡單說一下它的作用嗎&#xff1f; ?候選人?&#xff1a; Handler 是 Android 中用來做線程間通信的工具。比如Android 應用的 UI 線程&#xff08;也叫主線程…

【iOS】分類、擴展、關聯對象

分類、擴展、關聯對象 前言分類擴展擴展和分類的區別關聯對象key的幾種用法流程 總結 前言 最近的學習中筆者發現自己對于分類、擴展相關知識并不是很熟悉&#xff0c;剛好看源碼類的加載過程中發現有類擴展與關聯對象詳解。本篇我們來探索一下這部分相關知識&#xff0c;首先…

30.第二階段x64游戲實戰-認識網絡數據包發送流程

免責聲明&#xff1a;內容僅供學習參考&#xff0c;請合法利用知識&#xff0c;禁止進行違法犯罪活動&#xff01; 內容參考于&#xff1a;圖靈Python學院 上一個內容&#xff1a;29.第二階段x64游戲實戰-技能冷卻 發送數據包的方式&#xff08;函數&#xff09;操作系統提供…

【每日一題】【前綴和優化】【前/后綴最值】牛客練習賽139 B/C題 大衛的密碼 (Hard Version) C++

牛客練習賽139 B題 大衛的密碼 (Easy Version) 牛客練習賽139 C題 大衛的密碼 (Hard Version) 大衛的密碼 題目背景 牛客練習賽139 題目描述 給定一個 n m n\times m nm的網格圖&#xff0c;我們使用 ( i , j ) (i,j) (i,j)表示網格中從上往下數第 i i i行和從左往右數第…

文件夾圖像批處理教程

前言 因為經常對圖像要做數據清洗&#xff0c;又很費時間去重新寫一個&#xff0c;我一直在想能不能寫一個通用的腳本或者制作一個可視化的界面對文件夾圖像做批量的修改圖像大小、重命名、劃分數據訓練和驗證集等等。這里我先介紹一下我因為寫過的一些腳本&#xff0c;然后我…

【Unity實戰筆記】第二十四 · 使用 SMB+Animator 實現基礎戰斗系統

轉載請注明出處&#xff1a;&#x1f517;https://blog.csdn.net/weixin_44013533/article/details/146409453 作者&#xff1a;CSDN|Ringleader| 1 結構 1.1 狀態機 1.2 SMB 2 代碼實現 2.1 核心控制 Player_Base_SMB 繼承 StateMachineBehaviour &#xff0c;控制變量初始…

Python虛擬環境再PyCharm中自由切換使用方法

Python開發中的環境隔離是必不可少的步驟,通過使用虛擬環境可以有效地管理不同項目間的依賴,避免包沖突和環境污染。虛擬環境是Python官方提供的一種獨立運行環境,每個項目可以擁有自己單獨的環境,不同項目之間的環境互不影響。在日常開發中,結合PyCharm這樣強大的IDE進行…

大模型智能體入門掃盲——基于camel的概述

前言 本篇博客想帶讀者進行一個智能體入門掃盲&#xff0c;了解基礎知識&#xff0c;為什么用camel呢&#xff0c;因為小洛發現它們文檔對這種智能體的基本組件介紹得很全面深入。 基礎概念 agent 一個典型的agent智能體包含三個核心部分&#xff1a; 感知模塊&#xff1…

目標檢測 RT-DETR(2023)詳細解讀

文章目錄 主干網絡&#xff1a;Encoder&#xff1a;不確定性最小Query選擇Decoder網絡&#xff1a; 將DETR擴展到實時場景&#xff0c;提高了模型的檢測速度。網絡架構分為三部分組成&#xff1a;主干網絡、混合編碼器、帶有輔助預測頭的變換器編碼器。具體來說&#xff0c;先利…

DeepSeek 賦能數字農業:從智慧種植到產業升級的全鏈條革新

目錄 一、數字農業的現狀與挑戰二、DeepSeek 技術解析2.1 DeepSeek 的技術原理與優勢2.2 DeepSeek 在人工智能領域的地位與影響力 三、DeepSeek 在數字農業中的應用場景3.1 精準種植決策3.2 病蟲害監測與防治3.3 智能灌溉與施肥管理3.4 農產品質量追溯與品牌建設 四、DeepSeek …

<uniapp><vuex><狀態管理>在uniapp中,如何使用vuex實現數據共享與傳遞?

前言 本專欄是基于uniapp實現手機端各種小功能的程序&#xff0c;并且基于各種通訊協議如http、websocekt等&#xff0c;實現手機端作為客戶端&#xff08;或者是手持機、PDA等&#xff09;&#xff0c;與服務端進行數據通訊的實例開發。 發文平臺 CSDN 環境配置 系統&…

高速串行差分信號仿真分析及技術發展挑戰續

7.3 3.125Gbps 差分串行信號設計實例仿真分析 7.3.1 設計用例說明 介紹完 Cadence 系統本身所具有的高速差分信號的仿真分析功能之后&#xff0c;我們以一個實例來說明 3.125Gbps 以下的高速差分系統的仿真分析方法。 在網上下載的設計文件“Booksi_Demo_Allegro160_Finishe…

【Golang】部分語法格式和規則

1、時間字符串和時間戳的相互轉換 func main() {t1 : int64(1546926630) // 外部傳入的時間戳&#xff08;秒為單位&#xff09;&#xff0c;必須為int64類型t2 : "2019-01-08 13:50:30" // 外部傳入的時間字符串//時間轉換的模板&#xff0c;golang里面只能是 &quo…

第十六章:數據治理之數據架構:數據模型和數據流轉關系

本章我們說一下數據架構&#xff0c;說到數據架構&#xff0c;就很自然的想到企業架構、業務架構、軟件架構&#xff0c;因為個人并沒有對這些內容進行深入了解&#xff0c;所以這里不做比對是否有相似或者共通的地方&#xff0c;僅僅來說一下我理解的數據架構。 1、什么是架構…

Day126 | 靈神 | 二叉樹 | 層數最深的葉子結點的和

Day126 | 靈神 | 二叉樹 | 層數最深的葉子結點的和 1302.層數最深的葉子結點的和 1302. 層數最深葉子節點的和 - 力扣&#xff08;LeetCode&#xff09; 思路&#xff1a; 這道題用層序遍歷的思路比較好想&#xff0c;就把每層的都算一下&#xff0c;然后返回最后一層的和就…