rabbitmq的消息應答

消費者完成一個任務可能需要一段時間,如果其中一個消費者處理一個長的任務并僅只完成
了部分突然它掛掉了,會發生什么情況。RabbitMQ 一旦向消費者傳遞了一條消息,便立即將該消
息標記為刪除。在這種情況下,突然有個消費者掛掉了,我們將丟失正在處理的消息。以及后續
發送給該消費這的消息,因為它無法接收到。
為了保證消息在發送過程中不丟失,rabbitmq 引入消息應答機制,消息應答就是: 消費者在接
收到消息并且處理該消息之后,告訴 rabbitmq 它已經處理了,rabbitmq 可以把該消息刪除了。

消息應答的方法?

Channel.basicAck(用于肯定確認)

RabbitMQ 已知道該消息并且成功的處理消息,可以將其丟棄了

Channel.basicNack(用于否定確認)

Channel.basicReject(用于否定確認)

與 Channel.basicNack 相比少一個參數
不處理該消息了直接拒絕,可以將其丟棄了

自動應答?

消息發送后立即被認為已經傳送成功,這種模式需要在 高吞吐量和數據傳輸安全性方面做權
,因為這種模式如果消息在接收到之前,消費者那邊出現連接或者 channel 關閉,那么消息就丟
失了,當然另一方面這種模式消費者那邊可以傳遞過載的消息, 沒有對傳遞的消息數量進行限制
當然這樣有可能使得消費者這邊由于接收太多還來不及處理的消息,導致這些消息的積壓,最終
使得內存耗盡,最終這些消費者線程被操作系統殺死, 所以這種模式僅適用在消費者可以高效并
以某種速率能夠處理這些消息的情況下使用。默認消息采用的是自動應答.

手動應答

手動應答的好處是可以批量應答并且減少網絡擁堵multiple 的 true 和 false 代表不同意思

true 代表批量應答 channel 上未應答的消息
比如說 channel 上有傳送 tag 的消息 5,6,7,8 當前 tag 是 8 那么此時
5-8 的這些還未應答的消息都會被確認收到消息應答
false 同上面相比
只會應答 tag=8 的消息 5,6,7 這三個消息依然不會被確認收到消息應答

消費者1?

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Worker01 {private static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.out.println("C1 等待接收消息處理時間較短");DeliverCallback deliverCallback=(consumerTag, delivery)->{String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:"+receivedMessage);try {// 睡眠1秒Thread.sleep(1000*1);} catch (InterruptedException e) {throw new RuntimeException(e);}// 消息標記tag,2.false代表只應答接收到的那個傳遞的消息,true為應答所有消息包括傳遞過來的消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback=(consumerTag)->{System.out.println(consumerTag+"消費者取消消費接口回調邏輯");};System.out.println("C1 消費者啟動等待消費......");boolean autoAck=false;channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);}
}

?消費者2


import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Worker02 {private static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.out.println("C2 等待接收消息處理時間較長");DeliverCallback deliverCallback=(consumerTag, delivery)->{String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:"+receivedMessage);try {// 睡眠30秒Thread.sleep(1000*30);} catch (InterruptedException e) {throw new RuntimeException(e);}// 消息標記tag,2.false代表只應答接收到的那個傳遞的消息,true為應答所有消息包括傳遞過來的消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback=(consumerTag)->{System.out.println(consumerTag+"消費者取消消費接口回調邏輯");};System.out.println("C2 消費者啟動等待消費......");boolean autoAck=false;channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);}
}

生產者?

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {//創建一個連接工廠ConnectionFactory factory = new ConnectionFactory();factory.setHost("43.139.59.28");factory.setUsername("guest");factory.setPassword("guest");//channel 實現了自動 close 接口 自動關閉 不需要顯示關閉try(Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {/*** 生成一個隊列* 1.隊列名稱* 2.隊列里面的消息是否持久化 默認消息存儲在內存中* 3.該隊列是否只供一個消費者進行消費 是否進行共享 true 可以多個消費者消費* 4.是否自動刪除 最后一個消費者端開連接以后 該隊列是否自動刪除 true 自動刪除* 5.其他參數*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);String message="hello world!!!!";/*** 發送一個消息* 1.發送到那個交換機* 2.路由的 key 是哪個* 3.其他的參數信息* 4.發送消息的消息體*/for (int i = 0; i < 10; i++) {channel.basicPublish("",QUEUE_NAME,null,message.getBytes());}System.out.println("消息發送完畢");}}
}

結果

在發送者發送消息 ,發出消息之后的把 C2 消費者停掉,按理說該 C2 來處理該消息,但是
由于它處理時間較長,在還未處理完,也就是說 C2 還沒有執行 ack 代碼的時候,C2 被停掉了,
此時會看到消息被 C1 接收到了,說明消息? 被重新入隊,然后分配給能處理消息的 C1 處理了?

消息自動重新入隊

如果消費者由于某些原因失去連接(其通道已關閉,連接已關閉或 TCP 連接丟失),導致消息
未發送 ACK 確認,RabbitMQ 將了解到消息未完全處理,并將對其重新排隊。如果此時其他消費者
可以處理,它將很快將其重新分發給另一個消費者。這樣,即使某個消費者偶爾死亡,也可以確
保不會丟失任何消息。

?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/37145.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/37145.shtml
英文地址,請注明出處:http://en.pswp.cn/news/37145.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

數據分析兩件套ClickHouse+Metabase(一)

ClickHouse篇 安裝ClickHouse ClickHouse有中文文檔, 安裝簡單 -> 文檔 官方提供了四種包的安裝方式, deb/rpm/tgz/docker, 自行選擇適合自己操作系統的安裝方式 這里我們選deb的方式, 其他方式看文檔 sudo apt-get install -y apt-transport-https ca-certificates dirm…

魔改 axuanup 的 aardio和python 猜拳游戲 代碼

根據 axuanup 的 aardio和python 猜拳游戲 代碼&#xff0c;魔改了一個風格不一樣的代碼。 爭取做到代碼盡量“簡”&#xff0c;但還沒到“變態簡”的程度&#xff0c;因為還能看懂。 原文&#xff1a;aardio和python 猜拳游戲-自由交流樂園-Aardio資源網 代碼如下&#xff…

【Flutter】【基礎】CustomPaint 繪畫功能(一)

功能&#xff1a;CustomPaint 相當于在一個畫布上面畫畫&#xff0c;可以自己繪制不同的顏色形狀等 在各種widget 或者是插件不能滿足到需求的時候&#xff0c;可以自己定義一些形狀 使用實例和代碼&#xff1a; CustomPaint&#xff1a; 能使你繪制的東西顯示在你的ui 上面&a…

競賽項目 酒店評價的情感傾向分析

前言 &#x1f525; 優質競賽項目系列&#xff0c;今天要分享的是 酒店評價的情感傾向分析 該項目較為新穎&#xff0c;適合作為競賽課題方向&#xff0c;學長非常推薦&#xff01; &#x1f9ff; 更多資料, 項目分享&#xff1a; https://gitee.com/dancheng-senior/post…

解決QTabelView無法立即刷新問題

解決QTabelView無法理解刷新問題 在某些時候&#xff0c;Qt的奇葩現象&#xff0c;調試中QTabelView的相關model數據變更了&#xff0c;界面卻沒立即刷新&#xff0c;然而&#xff0c;點擊標題欄等才刷新&#xff0c;奇葩。很多網上資料說QTabelView::update()和QTabelView::r…

用Python做一個滑雪小游戲

游戲是讓人娛樂和放松的好方式&#xff0c;而編寫和玩自己的游戲則是一種特別有趣的體驗。在本文中&#xff0c;我們將使用Python和pygame庫來創建一個簡單的滑雪小游戲。通過這個小游戲項目&#xff0c;我們將學習如何使用Python編程語言來制作自己的游戲&#xff0c;并且享受…

IT運維:使用數據分析平臺監控深信服防火墻

概述 深信服防火墻自身監控可以滿足絕大部分需求&#xff0c;比如哪個應用占了最大帶寬&#xff0c;哪個用戶訪問了哪些網站&#xff1f;這里我們為什么使用鴻鵠呢&#xff1f;因為我們要的是數據的處理和分析&#xff0c;比如某個用戶在某個事件都做了哪些行為&#xff0c;這個…

【設計模式】前端控制器模式

前端控制器模式&#xff08;Front Controller Pattern&#xff09;是用來提供一個集中的請求處理機制&#xff0c;所有的請求都將由一個單一的處理程序處理。該處理程序可以做認證/授權/記錄日志&#xff0c;或者跟蹤請求&#xff0c;然后把請求傳給相應的處理程序。以下是這種…

基于鯤鵬平臺Ceph深度性能調優

劉亮奇 架構師技術聯盟 2021-04-12 07:50 摘自&#xff1a; https://mp.weixin.qq.com/s/o9HH-8TF0DbMqHrvsFh1NA 隨著 IOT、大數據、移動互聯等應用的暴漲&#xff0c;產生的數據也越來越多&#xff0c;整個存儲市場總量也逐年增長&#xff0c;預計到 2021 年分布式存儲會占到…

UNIX基礎知識:UNIX體系結構、登錄、文件和目錄、輸入和輸出、程序和進程、出錯處理、用戶標識、信號、時間值、系統調用和庫函數

引言&#xff1a; 所有的操作系統都為運行在其上的程序提供服務&#xff0c;比如&#xff1a;執行新程序、打開文件、讀寫文件、分配存儲區、獲得系統當前時間等等 1. UNIX體系結構 從嚴格意義上來說&#xff0c;操作系統可被定義為一種軟件&#xff0c;它控制計算機硬件資源&…

CTFshow 限時活動 紅包挑戰7、紅包挑戰8

CTFshow紅包挑戰7 寫不出來一點&#xff0c;還是等了官方wp之后才復現。 直接給了源碼 <?php highlight_file(__FILE__); error_reporting(2);extract($_GET); ini_set($name,$value);system("ls ".filter($_GET[1])."" );function filter($cmd){$cmd…

【圖像分類】理論篇(2)經典卷積神經網絡 Lenet~Densenet

1、卷積運算 在二維卷積運算中&#xff0c;卷積窗口從輸入張量的左上角開始&#xff0c;從左到右、從上到下滑動。 當卷積窗口滑動到新一個位置時&#xff0c;包含在該窗口中的部分張量與卷積核張量進行按元素相乘&#xff0c;得到的張量再求和得到一個單一的標量值&#xff0c…

Java 集合擴容概括

參考博文&#xff1a; java集合的擴容機制_這個名字先用著的博客-CSDN博客 # ArrayList 可隨著元素的增長而自動擴容&#xff0c;正常擴容的話&#xff0c;每次擴容到原來的 1.5倍。 # ArrayList 和Vector擴容機制總結&#xff1a; ArrayList 和Vector,底層都是Object數組…

SQL- 每日一題【1327. 列出指定時間段內所有的下單產品】

題目 表: Products 表: Orders 寫一個解決方案&#xff0c;要求獲取在 2020 年 2 月份下單的數量不少于 100 的產品的名字和數目。 返回結果表單的 順序無要求 。 查詢結果的格式如下。 示例 1: 解題思路 1.題目要求我們獲取在 2020 年 2 月份下單的數量不少于 100 的產品的…

如何重置樹莓派 Pico(重置外圍設備失敗)

有時候需要重置樹莓派 Pico&#xff0c;一種方法是按住 Pico 上的“BOOTSEL”按鈕再插入 USB&#xff1b;或者用按鈕連接“RUN”和“GND”針腳&#xff0c;然后同時按下這個按鈕和“BOOTSEL”按鈕。這樣就可以進入 USB 模式&#xff0c;這樣從一定程度進行了重置。 但是這種方…

IO多路復用

常見的網絡IO模型 網絡 IO 模型分為四種&#xff1a;同步阻塞 IO(Blocking IO, BIO)、同步非阻塞IO(NIO, NewIO)、IO 多路復用、異步非阻塞 IO(Async IO, AIO)&#xff0c;其中AIO為異步IO&#xff0c;其他都是同步IO 同步阻塞IO 同步阻塞IO&#xff1a;在線程處理過程中&am…

劍指Offer10-I.斐波那契數列 C++

1、題目描述 寫一個函數&#xff0c;輸入 n &#xff0c;求斐波那契&#xff08;Fibonacci&#xff09;數列的第 n 項&#xff08;即 F(N)&#xff09;。斐波那契數列的定義如下&#xff1a; F(0) 0, F(1) 1 F(N) F(N - 1) F(N - 2), 其中 N > 1. 斐波那契數列由 0 和 …

Redis_事務操作

13. redis事務操作 13.1事務簡介 原子性(Atomicity) 一致性(Consistency) 隔離性(isolation) 持久性(durabiliby) ACID 13.2 Redis事務 提供了multi、exec命令來完成 第一步&#xff0c;客戶端使用multi命令顯式地開啟事務第二步&#xff0c;客戶端把事務中要執行的指令發…

前沿分享-通過經皮神經刺激來治療糖尿病神經性疼痛

經皮神經電刺激&#xff08;PENS&#xff09;設備用于對糖尿病周圍神經病變引起的慢性、頑固性疼痛進行多次治療。 放在耳朵上的這種可穿戴設備在幾天內持續提供低水平的脈沖電流。 這是一種安全有效的非麻醉性替代治療慢性疼痛的方法。還有一張設備放在糖足上的照片&#xff0…