RabbitMQ 高級特性之消息確認

1. 簡介

RabbitMQ 的消息發送流程:

  • producer 將消息發送給 broker,consumer 從 broker 中獲取消息并消費

那么在這里就涉及到了兩種消息發送,即 producer 與 broker 之間和 consumer 與 broker 之間。

“消息確認” 討論的是 consumer 與 broker 之間的消息發送。

2. 為什么會有這個特性

當 broker 給 consumer 發送消息時,可能會出現下面兩種情況:

  • 消息未成功到達 consumer;
  • 消息成功到達 consumer,但是 consumer 沒有成功消費這條消息,如:在處理消息時發生異常等情況。

這時,就需要有一種解決方案,保證 broker 與 consumer 之間消息傳輸的可靠性,于是就有了消息確認這一特性。

3. 使用 RabbitMQ Java 時如何進行消息確認(不是重點)

public class ConsumerDemo1 {public static void main(String[] args) throws IOException, TimeoutException {//建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USERNAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//創建信道Channel channel = connection.createChannel();//聲明交換機channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);//聲明隊列//如果隊列不存在,就創建channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);//消費消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);}
}

這是一段路由模式的代碼,在這段代碼中,有下面一條語句:

channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);

在這個方法中,有三個參數:

String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
  • queue:consumer 通過哪個隊列獲取 broker 發送的消息;
  • autoAck:是否自動確認;
  • callback:consumer 消費消息的邏輯。

其中,autoAck 就是消息確認的體現:

  • autoAck 為 true:RabbitMQ 會將發送給 consumer 的消息視為已被成功接收和消費(consumer 可能并沒有成功接收到或成功消費,但是 RabbitMQ 不管了),就會被將這條消息刪除;
  • autoAck 為 false:當 RabbitMQ 發送消息后,并不會馬上就將消息刪除,而是會等 consumer 調用 Basic.Ack,收到 ack 后,才會將消息刪除。

將 autoAck 設置為 false 后,若 broker 長時間沒有收到 consumer 發送的 ack 且 consumer 已經斷開連接,就會將這條消息重新入隊列,繼續發送給 consumer 進行消費,此時,隊列中的消息就分為了兩種:

  • 還未被發送的消息;
  • 已經發送了的消息,但是沒有收到 ack 而重新入隊列等待被消費。

4. 在 spring 中使用 RabbitMQ 時如何進行消息確認

4.1 basicAck

在 spring 下的 Channel 類中提供了下面幾種方法:

void basicAck(long deliveryTag, boolean multiple) throws IOException;

在這個方法中,有三個參數:

  • deliveryTag:是 broker 給 consumer 發送消息的唯一標識,在一個 channel 中 deliveryTag 是唯一的;
  • mulitple: 是否批量確認

使用這個方法后,就會告知 broker 這條消息已經成功被消費,可以將其刪除。

4.2 basicNack

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

?在這個方法中,多了一個參數:

  • requeue:是否重新入隊列。

使用這個方法,就相當于給 broker 發送 nack,即這條消息沒有被正確消費。

若 requeue 為 true,就會將這條消息重新入隊列,繼續給 consumer 消費;

若 requeue 為 false,broker 就會這條消息刪除。

4.3 basicReject

void basicReject(long deliveryTag, boolean requeue) throws IOException;

這個方法與 basicNack 大致相同,此處省略。

4.4 配置

在 spring 中,提供了三種配置用于消息確認:

  • none:當消息發送給 consumer,不管 consumer 是否成功消費了消息,broker 都會當作這條消息被成功消費了,然后刪除這條消息;
  • auto:在 consumer 處理消息時沒有拋出異常時,就會確認消息,反之就不會確認,并且將消息重新放入隊列中,進行下一次的消費;
  • manual:手動確認,我們需要在代碼中指定這條消息是消費成功還是消費失敗,分別使用 basicAck 和 basicNack。
spring:rabbitmq:listener:simple:acknowledge-mode: none

5. 代碼測試

@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack() {String messageInfo = "consumer ack mode test...";rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, Constants.ACK_ROUTINGKEY, messageInfo);return "消息發送成功";}
}

這段代碼代表的是一個 producer,下面接收到的消息都是通過這段代碼發送的。

5.1 none

① 無異常時的消費者代碼:

    @RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);}

代碼運行結果如下:

我們可以通過訪問 RabbitMQ 客戶端來觀察這條消息是否成功被消費:

?

可以看到,Messages 這一列中,Ready 和 Unacked 都為 0,表示消息被成功消費。?

?② 有異常時的消費者代碼:

    @RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();int num = 1 / 0;log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);}

代碼運行結果如下:

由于我們使用了除零操作,于是拋出了異常,我們可以通過訪問?RabbitMQ 來觀察這條消息是否被刪除:

和上面一樣,在 broker 中這條消息已經被刪除,這與 none 配置性質一致。

5.2 auto?

① 無異常時的消費者代碼:

@Component
@Slf4j
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);}}

代碼運行結果如下:

在 RabbitMQ 客戶端中顯示,這條消息已經被成功消費:

?

② 有異常時的消費者代碼:

@Component
@Slf4j
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();int num = 1 / 0;log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);}}

?代碼運行結果如下:

在運行結果中,一直會有報錯產生,并且都是兩個兩個為一組,并且在報錯信息中可以看到,producer 發送的消息一直在被消費,這是因為存在異常,就會導致這條消息一直在隊列中,通過觀察 RabbitMQ 客戶端可以看出,這條消息依然保存在隊列中:

5.3??manual

① 無異常的消費者代碼如下:

@Component
@Slf4j
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, true);}}
}

在這段代碼中,我們使用了 basicAck 和 basicNack 來進行消息確認,當消息處理成功后,就會執行 basicAck,告訴 broker 這條消息已經被成功消費,可以將其刪除;當消息執行發生異常后,就會執行 basicNack,并且根據 requeue 參數決定如何處理這條消息。

代碼運行結果如下:

RabbitMQ 客戶端顯示這條消息被成功消費:

?

?② 有異常的消費者代碼如下:

當 requeue 為 true:

@Component
@Slf4j
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);int n = 1 / 0;channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, true);}}
}

在此處的 basicNack,將 requeue 設置為了 true,當消息處理失敗后,就會將消息重新入隊列,重新被消費:

我們可以看到,這條消息一直在被消費,并且 delivertTag 在遞增。

并且從?RabbitMQ 客戶端中可以看到,這條消息依然存在,等待被成功消費:

?當 requeue 為 false:

當處理消息發生異常后,就會將消息從隊列中刪除。

代碼運行結果如下:

雖然異常依然存在,但是消息卻沒有重復發送,并且 RabbitMQ 中也將這條消息刪除:

?

?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/89704.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/89704.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/89704.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【51單片機用數碼管顯示流水燈的種類是按鈕控制數碼管加一和流水燈】2022-6-14

緣由 #include "REG52.h" unsigned char code smgduan[]{0x3f,0x06,0x5b,0x4f,0x66,0x6d,0x7d,0x07,0x7f,0x6f,0x77,0x7c,0x39,0x5e,0x79,0x71,0,64}; //共陰0~F消隱減號 unsigned char Js0, miao0;//中斷計時 秒 分 時 毫秒 sbit k0P3^0; sbit k1P3^1; void smxs(u…

Android15 開機動畫播放結束之后如何直接啟動應用

問題背景 軟件版本:Android15 在一些需求場景里面,需要開機動畫播放結束立馬去啟動一個應用,下面介紹如何實現這種方案。 解決方案 首選我們需要知道開機動畫播放結束之后的流程,這里會調用到wms里面,也就是一些enableScreen之類的函數,知道這個大概流程之后,再去對應…

AI實踐:大模型痛點和解決方案討論

大家好,我是星野,歡迎來到我的CSDN博客。在這個技術日新月異的時代,我們一起學習,共同進步。 今天想和大家分享的是大模型在實際應用中的痛點以及解決方案,特別是RAG(檢索增強生成)技術。 大模…

Web前端工程化

Web前端工程化 前端工程化是指將軟件工程的方法和原則應用到前端開發中,以提高開發效率、保證代碼質量、便于團隊協作和項目維護的一套體系化實踐。以下是前端工程化的主要內容和實踐: 核心組成部分 1. 模塊化開發 JavaScript模塊化:Comm…

Java 原生 HTTP Client

?介紹 Java 原生 HttpClient 是從 Java 11 開始引入的標準庫,用于簡化 HTTP 請求的發送與響應處理。它支持同步和異步請求,并內置對 HTTP/1.1 和 HTTP/2 協議的支持。HttpClient 提供了易用的 API 來設置請求頭、請求體、處理響應以及配置 SSL/TLS 加密…

【C語言刷題】第十天:加量加餐繼續,代碼題訓練,融會貫通IO模式

🔥個人主頁:艾莉絲努力練劍 ?專欄傳送門:《C語言》、《數據結構與算法》、C語言刷題12天IO強訓、LeetCode代碼強化刷題 🍉學習方向:C/C方向 ??人生格言:為天地立心,為生民立命,為…

【WEB】Polar靶場 6-10題 詳細筆記

六.jwt 這題我又不會寫 先來了解下jwt **JWT(JSON Web Token)**是一種基于JSON的開放標準(RFC 7519),主要用于在網絡應用環境間傳遞聲明信息。JWT通常用于身份驗證和信息交換,確保在各方之間安全地傳輸信…

高階亞馬遜運營秘籍:關鍵詞矩陣打法深度解析與應用

當競爭對手還在為單個大詞競價廝殺時,頭部賣家已悄然構建了一張覆蓋數千長尾關鍵詞的隱形網絡,精準觸達每一個細分需求,以更低的成本撬動更高的轉化率在亞馬遜流量紅利消退、廣告成本高企的2025年,傳統“爆款關鍵詞”打法已顯疲態…

【問題解決】org.springframework.web.util.NestedServletException Handler dispatch failed;

詳細異常信息: org.springframework.web.util.NestedServletException: Handler dispatch failed; nested exception is java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter at org.springframework.web.servlet.DispatcherServlet.doDispatch(Disp…

【已解決】mac 聚焦搜索設置了edge 的地址欄搜索為google,還是跳轉到百度

問題詳情:在macbook的聚焦搜索中點擊edge搜索的時候,跳轉到了百度,即使已經將地址欄的搜索引擎設置為了goole,但是還是會跳轉到百度。解決方案:1、打開safari瀏覽器。(看清了,是打開Safari&…

MimicMotion 讓你的圖片動起來

MimicMotion 是由騰訊公司推出的一款人工智能人像動態視頻生成框架。可以模仿視頻動作再讓圖片模仿動作姿態,最后生成視頻。 MimicMotion 的核心在于其置信度感知的姿態引導技術,確保視頻幀的高質量和時間上的平滑過渡。 以前咱們也手搭過Animate-X讓圖…

云計算考核 - 分析電子銀行需求采用微服務架構對系統進行設計

二、使用的技術以及分析 微服務(Microservices)是一種架構風格,一個大型復雜軟件應用由一個或多個微服務組成。系統中的各個微服務可被獨立部署,各個微服務之間是松耦合的。每個微服務僅關注于完成一件任務并很好地完成該任務。在…

Ionic 安裝使用教程

一、Ionic 簡介 Ionic 是一個基于 Web 技術(HTML、CSS、JavaScript)的跨平臺移動應用開發框架,結合 Angular、React 或 Vue 可快速構建 iOS 和 Android 應用。Ionic 提供豐富的 UI 組件、命令行工具及原生插件封裝,廣泛用于混合應…

滲透測試 - 簡介

Web滲透測試簡介 Web滲透測試(Penetration Testing)是一種模擬黑客攻擊的安全評估方法,旨在發現Web應用程序中的漏洞,幫助開發者修復問題并提升系統安全性。它涉及主動測試目標系統(如網站或API)的弱點&am…

云原生AI研發體系建設路徑

當AI遇上云原生,就像咖啡遇上牛奶,總能擦出不一樣的火花 ?? 📋 文章目錄 引言:為什么要建設云原生AI研發體系整體架構設計:搭建AI研發的"樂高積木"技術棧選擇:選擇合適的"武器裝備"…

【網絡安全】深入理解 IoC 與 IoA:從“事后識別”到“事前防御”

1. 簡介 在網絡安全領域,IoC(Indicators of Compromise,入侵指標) 和 IoA(Indicators of Attack,攻擊指標) 是兩個核心概念。它們是安全分析師識別攻擊行為、調查事件、制定防御策略的重要依據…

貪心專題練習

牛牛學括號題目要求每次操作必須刪除一個左括號和一個右括號,且刪除后序列仍需合法。合法的括號序列要求每個右括號之前必須有對應的左括號。分析輸入的都是合法的括號,即左括號右括號,可利用這一點去解題注意:中間取模是必要的&a…

屏幕分辨率修改工具 SwitchResX(Mac電腦)

蘋果電腦屏幕分辨率修改工具,SwitchResX for Mac,可以為您提供控制顯示器分辨率所需的工具和功能。 原文地址:屏幕分辨率修改工具 SwitchResX(Mac電腦)

【Java編程動手學】Java中的數組與集合

文章目錄 一、Java數組基礎1.1 數組結構概述1.2 一維數組1.2.1 聲明與初始化1.2.2 訪問與修改元素1.2.3 數組遍歷 1.3 二維數組1.3.1 聲明與初始化1.3.2 訪問與遍歷 1.4 三維數組及更高維數組1.5 數組類(Arrays)1.5.1 常用方法 1.6 復制數組1.6.1 系統復制方法1.6.2 手動復制 二…

Linux在線安裝docker

1.切換阿里云鏡像源 備份原有 repo 文件 sudo mv /etc/yum.repos.d/CentOS-Base.repo /etc/yum.repos.d/CentOS-Base.repo.backup 下載阿里云的 CentOS 7 repo 文件 sudo curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo 清華 sudo…