rabbitmq的發布確認

生產者將信道設置成 confirm 模式,一旦信道進入 confirm 模式, 所有在該信道上面發布的
消息都將會被指派一個唯一的 ID (從 1 開始),一旦消息被投遞到所有匹配的隊列之后,broker
就會發送一個確認給生產者(包含消息的唯一 ID),這就使得生產者知道消息已經正確到達目的隊
列了,如果消息和隊列是可持久化的,那么確認消息會在將消息寫入磁盤之后發出,broker 回傳
給生產者的確認消息中 delivery-tag 域包含了確認消息的序列號,此外 broker 也可以設置
basic.ack 的 multiple 域,表示到這個序列號之前的所有消息都已經得到了處理。

單個確認發布 ?

這是一種簡單的確認方式,它是一種 同步確認發布 的方式,也就是發布一個消息之后只有它
被確認發布,后續的消息才能繼續發布,waitForConfirmsOrDie(long)這個方法只有在消息被確認
的時候才返回,如果在指定時間范圍內這個消息沒有被確認那么它將拋出異常。
這種確認方式有一個最大的缺點就是: 發布速度特別的慢, 因為如果沒有確認發布的消息就會
阻塞所有后續消息的發布,這種方式最多提供每秒不超過數百條發布消息的吞吐量。當然對于某
些應用程序來說這可能已經足夠了。
import cn.hutool.core.lang.UUID;
import com.rabbitmq.client.Channel;public class publishMessageIndividually {private static final int MESSAGE_COUNT = 5;public static void publishMessageIndividually() throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName, false, false, false, null);//開啟發布確認channel.confirmSelect();long begin = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + "";channel.basicPublish("", queueName, null, message.getBytes());//服務端返回 false 或超時時間內未返回,生產者可以消息重發boolean flag = channel.waitForConfirms();if (flag) {System.out.println("消息發送成功");}}long end = System.currentTimeMillis();System.out.println("發布" + MESSAGE_COUNT + "個單獨確認消息,耗時" + (end - begin) +"ms");}}
}

耗時

?批量確認發布

上面那種方式非常慢,與單個等待確認消息相比,先發布一批消息然后一起確認可以極大地
提高吞吐量,當然這種方式的缺點就是:當發生故障導致發布出現問題時,不知道是哪個消息出現
問題了,我們必須將整個批處理保存在內存中,以記錄重要的信息而后重新發布消息。當然這種
方案仍然是同步的,也一樣阻塞消息的發布。
import cn.hutool.core.lang.UUID;
import com.rabbitmq.client.Channel;public class publishMessageBatch {private static final int MESSAGE_COUNT = 5;public static void publishMessageBatch() throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName, false, false, false, null);//開啟發布確認channel.confirmSelect();//批量確認消息大小int batchSize = 100;//未確認消息個數int outstandingMessageCount = 0;long begin = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + "";channel.basicPublish("", queueName, null, message.getBytes());outstandingMessageCount++;if (outstandingMessageCount == batchSize) {channel.waitForConfirms();outstandingMessageCount = 0;}}//為了確保還有剩余沒有確認消息 再次確認if (outstandingMessageCount > 0) {channel.waitForConfirms();}long end = System.currentTimeMillis();System.out.println("發布" + MESSAGE_COUNT + "個批量確認消息,耗時" + (end - begin) +"ms");}}public static void main(String[] args) throws Exception {publishMessageBatch.publishMessageBatch();}
}

?耗時

?異步確認發布

異步確認雖然編程邏輯比上兩個要復雜,但是性價比最高,無論是可靠性還是效率都沒得說,
他是利用回調函數來達到消息可靠性傳遞的,這個中間件也是通過函數回調來保證是否投遞成功,
下面就讓我們來詳細講解異步確認是怎么實現的。
import cn.hutool.core.lang.UUID;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;public class publishMessageAsync {private static final int MESSAGE_COUNT = 5;public static void publishMessageAsync() throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName, false, false, false, null);//開啟發布確認channel.confirmSelect();/*** 線程安全有序的一個哈希表,適用于高并發的情況* 1.輕松的將序號與消息進行關聯* 2.輕松批量刪除條目 只要給到序列號* 3.支持并發訪問*/ConcurrentSkipListMap<Long, String> outstandingConfirms = newConcurrentSkipListMap<>();/*** 確認收到消息的一個回調* 1.消息序列號* 2.true 可以確認小于等于當前序列號的消息* false 確認當前序列號消息*/ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {if (multiple) {//返回的是小于等于當前序列號的未確認消息集合 是一個 mapConcurrentNavigableMap<Long, String> confirmed =outstandingConfirms.headMap(sequenceNumber, true);//清除該部分未確認消息集合confirmed.clear();}else{//只清除當前序列號的消息outstandingConfirms.remove(sequenceNumber);}};ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {String message = outstandingConfirms.get(sequenceNumber);System.out.println("發布的消息"+message+"未被確認,序列號"+sequenceNumber);};/*** 添加一個異步確認的監聽器* 1.確認收到消息的回調* 2.未收到消息的回調*/channel.addConfirmListener(ackCallback, nackCallback);long begin = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "消息" + i;/*** channel.getNextPublishSeqNo()獲取下一個消息的序列號* 通過序列號與消息體進行一個關聯* 全部都是未確認的消息體*/outstandingConfirms.put(channel.getNextPublishSeqNo(), message);channel.basicPublish("", queueName, null, message.getBytes());}long end = System.currentTimeMillis();System.out.println("發布" + MESSAGE_COUNT + "個異步確認消息,耗時" + (end - begin) +"ms");}}public static void main(String[] args) throws Exception {publishMessageAsync.publishMessageAsync();}
}

耗時

?以上 3 種發布確認速度對比

單獨發布消息

同步等待確認,簡單,但吞吐量非常有限。

批量發布消息

批量同步等待確認,簡單,合理的吞吐量,一旦出現問題但很難推斷出是那條

消息出現了問題。

異步處理: 最佳性能和資源使用,在出現錯誤的情況下可以很好地控制,但是實現起來稍微難些

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/40716.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/40716.shtml
英文地址,請注明出處:http://en.pswp.cn/news/40716.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

AI巨浪下,數據技術如何驅動智能未來?

引言 數據技術是大數據時代的核心驅動力&#xff0c;也是推動各行各業數字化轉型和智能化升級的關鍵因素。隨著云計算、人工智能、區塊鏈等新興技術的不斷發展和融合&#xff0c;數據技術也呈現出多模態、混合處理、自動化管理等新的趨勢和特點。 8 月 19 日&#xff08;周六&…

域名和ip的關系

域名和ip的關系 一&#xff1a;什么是域名 域名&#xff0c;簡稱域名、網域&#xff0c;是由一串用點分隔的名字組成的上某一臺計算機或計算機組的名稱&#xff0c;用于在數據傳輸時標識 計算機的電子方位(有時也指地理位置)。網域名稱系統&#xff0c;有時也簡稱為域名…

【寶藏系列】嵌入式 C 語言代碼優化技巧【超詳細版】

【寶藏系列】嵌入式 C 語言代碼優化技巧【超詳細版】 文章目錄 【寶藏系列】嵌入式 C 語言代碼優化技巧【超詳細版】前言整形數除法和取余數合并除法和取余數通過2的冪次進行除法和取余數取模的一種替代方法使用數組下標全局變量使用別名變量的生命周期分割變量類型局部變量指針…

Centos下的tcpdump抓包用法

先查一下是否安裝, 無的話裝一下 (版本低的用yum install) : rpm -qa tcpdump dnf install tcpdump 1. 列出能抓包的網卡: tcpdump -D | --list-interfaces 2. 在eth0網卡上抓來源為10.1.1.1 的包, 只抓一個包 (-n這里是不解析DNS) : tcpdump -i eth0 -n src 10.1.1.1 -…

STM32 F103C8T6學習筆記3:串口配置—串口收發—自定義Printf函數

今日學習使用STM32 C8T6的串口&#xff0c;我們在經過學習筆記2的總結歸納可知&#xff0c;STM32 C8T6最小系統板上有三路串口&#xff0c;如下圖&#xff1a; 今日我們就著手學習如何配置開通這些串口進行收發&#xff0c;這里不講串口通信概念與基礎&#xff0c;可以自行網上…

一文讀懂HTML

文章目錄 HTML的歷史HTML的作用HTML的基本語言 HTML的歷史 HTML&#xff08;HyperText Markup Language&#xff09;的歷史可以追溯到20世紀90年代早期&#xff0c;它是互聯網發展的重要里程碑之一。以下是HTML的歷史概述&#xff1a; 早期階段&#xff08;1980年代末 - 1990年…

FLatten Transformer 簡化版Transformer

今天在找論文時&#xff0c;看到一篇比較新奇的論文&#xff0c;在這里跟大家分享一下&#xff0c;希望可以給一些人提供一些思路。雖然現在Transformer 比較火&#xff0c;在分割上面也應用的比較多&#xff0c;但是我一直不喜歡用&#xff0c;其中一個原因是結構太復雜了&…

golang官方限流器rate包實踐

日常開發中&#xff0c;對于某些接口有請求頻率的限制。比如登錄的接口、發送短信的接口、秒殺商品的接口等等。 官方的golang.org/x/time/rate包中實現了令牌桶的算法。 封裝限流器可以將ip、手機號這種的作為限流器組的標識。 接下來就是實例化限流器和獲取令牌函數的實現…

C++:模擬實現list及迭代器類模板優化方法

文章目錄 迭代器模擬實現 本篇模擬實現簡單的list和一些其他注意的點 迭代器 如下所示是利用拷貝構造將一個鏈表中的數據挪動到另外一個鏈表中&#xff0c;構造兩個相同的鏈表 list(const list<T>& lt) {emptyinit();for (auto e : lt){push_back(e);} }void test_…

運動路徑規劃,ROS發布期望運動軌跡

目錄 一、Python實現&#xff08;推薦方法&#xff09; 1.1代碼cubic_spline_path.py 1.2使用方法 二、C實現 參考博客 想讓機器人/智能車無人駕駛&#xff0c;要有期望路徑&#xff0c;最簡單的是一條直線&#xff0c;或者是一條光滑曲線。 生成路徑的方法有兩種&#xf…

【網絡編程(二)】NIO快速入門

NIO Java NIO 三大核心組件 Buffer&#xff08;緩沖區&#xff09;&#xff1a;每個客戶端連接都會對應一個Buffer&#xff0c;讀寫數據通過緩沖區讀寫。Channel&#xff08;通道&#xff09;&#xff1a;每個channel用于連接Buffer和Selector&#xff0c;通道可以進行雙向讀…

Linux下C++開發

Linux下C開發 Linux 系統介紹 簡介 Linux屬于多用戶多任務操作系統&#xff0c;而Windows屬于單用戶多任務操作系統Linux一切皆文件目錄結構 bin 存儲二進制可執行文件dev 存放的是外接設備&#xff0c;例如磁盤&#xff0c;光盤等。在其中的外接設備是不能直接被使用的&…

Redis數據庫的可視化工具AnotherRedisDesktopManager使用+抖音直播小玩法實踐

一、它是什么 Another Redis DeskTop Manager 是一個開源項目&#xff0c;提供了以可視化的方式管理 Redis 的功能&#xff0c;可供免費下載安裝&#xff0c;也可以在此基礎上進行二次開發&#xff0c;主要特點有&#xff1a; 支持 Windows 平臺和 MacOS 平臺 支持查詢 Key、…

2023-08-17力扣每日一題

鏈接&#xff1a; 1444. 切披薩的方案數 題意&#xff1a; 給定一個矩陣&#xff0c;其中含有多個蘋果&#xff0c;需要切割k-1次,每次可以切割多行/多列&#xff0c;需要保證切割兩個部分都有蘋果&#xff0c;移除靠上/靠右的部分&#xff0c;對留下部分進行后續的切割&…

QT中的按鈕控件Buttons介紹

目錄 Buttons 按鈕控件 1、常用屬性介紹 2、按鈕介紹 2.1QPushButton 普通按鈕 2.2QtoolButton 工具按鈕 2.3Radio Button單選按鈕 2.4CheckButton復選按鈕 2.5Commam Link Button命令鏈接按鈕 2.6Dialog Button Box命令鏈接按鈕 Buttons 按鈕控件 在Qt里&#xff0c;…

Viobot開機指南

0.前言 本篇旨在讓每個拿到Viobot設備的用戶都能夠第一時間測試它的效果&#xff0c;以及將設備配置到自己的環境下面。 1.上電 首先&#xff0c;我們先要把設備接上電源線和網線&#xff0c;最簡單的方式就是網線直連電腦。 電源選用12V1.5A設備自帶的電源即可。 2.配置網…

JavaScript中的this指向,call、apply、bind的簡單實現

JavaScript中的this this是JavaScript中一個特殊關鍵字&#xff0c;用于指代當前執行上下文中的對象。它的難以理解之處就是值不是固定的&#xff0c;是再函數被調用時根據調用場景動態確定的&#xff0c;主要根據函數的調用方式來決定this指向的對象。this 的值在函數被調用時…

深入學習前端開發,掌握HTML、CSS、JavaScript等技術

課程鏈接&#xff1a; 鏈接: https://pan.baidu.com/s/1WECwJ4T8UQfs2FyjUMbxig?pwdi654 提取碼: i654 復制這段內容后打開百度網盤手機App&#xff0c;操作更方便哦 --來自百度網盤超級會員v4的分享 課程介紹&#xff1a; 第1周&#xff1a;HTML5基礎語法與標簽 &#x1f…

web集群學習:搭建 LNMP應用環境

目錄 LNMP的介紹&#xff1a; LNMP組合工作流程&#xff1a; FastCGI介紹&#xff1a; 1、什么是 CGI 2、什么是 FastCGI 配置LNMP 1、部署LNMP環境 2、配置LNMP環境 LNMP的介紹&#xff1a; 隨著 Nginx Web 服務的逐漸流行&#xff0c;又岀現了新的 Web 服務環境組合—…

【Spring Cloud 八】Spring Cloud Gateway網關

gateway網關 系列博客背景一、什么是Spring Cloud Gateway二、為什么要使用Spring Cloud Gateway三、 Spring Cloud Gateway 三大核心概念4.1 Route&#xff08;路由&#xff09;4.2 Predicate&#xff08;斷言&#xff09;4.3 Filter&#xff08;過濾&#xff09; 五、Spring …