rabbitmq 不同的消費者消費同一個隊列_RabbitMQ 消費端限流、TTL、死信隊列

消費端限流

1. 為什么要對消費端限流

假設一個場景,首先,我們 Rabbitmq 服務器積壓了有上萬條未處理的消息,我們隨便打開一個消費者客戶端,會出現這樣情況: 巨量的消息瞬間全部推送過來,但是我們單個客戶端無法同時處理這么多數據!

當數據量特別大的時候,我們對生產端限流肯定是不科學的,因為有時候并發量就是特別大,有時候并發量又特別少,我們無法約束生產端,這是用戶的行為。所以我們應該對消費端限流,用于保持消費端的穩定,當消息數量激增的時候很有可能造成資源耗盡,以及影響服務的性能,導致系統的卡頓甚至直接崩潰。

2.限流的 api 講解

RabbitMQ 提供了一種 qos (服務質量保證)功能,即在非自動確認消息的前提下,如果一定數目的消息(通過基于 consume 或者 channel 設置 Qos 的值)未被確認前,不進行消費新的消息。

/*** Request specific "quality of service" settings.* These settings impose limits on the amount of data the server* will deliver to consumers before requiring acknowledgements.* Thus they provide a means of consumer-initiated flow control.* @param prefetchSize maximum amount of content (measured in* octets) that the server will deliver, 0 if unlimited* @param prefetchCount maximum number of messages that the server* will deliver, 0 if unlimited* @param global true if the settings should be applied to the* entire channel rather than each consumer* @throws java.io.IOException if an error is encountered*/void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
  • prefetchSize:0,單條消息大小限制,0代表不限制
  • prefetchCount:一次性消費的消息數量。會告訴 RabbitMQ 不要同時給一個消費者推送多于 N 個消息,即一旦有 N 個消息還沒有 ack,則該 consumer 將 block 掉,直到有消息 ack。
  • global:true、false 是否將上面設置應用于 channel,簡單點說,就是上面限制是 channel 級別的還是 consumer 級別。當我們設置為 false 的時候生效,設置為 true 的時候沒有了限流功能,因為 channel 級別尚未實現。
  • 注意:prefetchSize 和 global 這兩項,rabbitmq 沒有實現,暫且不研究。特別注意一點,prefetchCount 在 no_ask=false 的情況下才生效,即在自動應答的情況下這兩個值是不生效的。

3.如何對消費端進行限流

  • 首先第一步,我們既然要使用消費端限流,我們需要關閉自動 ack,將 autoAck 設置為 falsechannel.basicConsume(queueName, false, consumer);
  • 第二步我們來設置具體的限流大小以及數量。channel.basicQos(0, 15, false);
  • 第三步在消費者的 handleDelivery 消費方法中手動 ack,并且設置批量處理 ack 回應為 truechannel.basicAck(envelope.getDeliveryTag(), true);

這是生產端代碼,與前幾章的生產端代碼沒有做任何改變,主要的操作集中在消費端。

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class QosProducer { public static void main(String[] args) throws Exception { //1. 創建一個 ConnectionFactory 并進行設置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); //2. 通過連接工廠來創建連接 Connection connection = factory.newConnection(); //3. 通過 Connection 來創建 Channel Channel channel = connection.createChannel(); //4. 聲明 String exchangeName = "test_qos_exchange"; String routingKey = "item.add"; //5. 發送 String msg = "this is qos msg"; for (int i = 0; i < 10; i++) { String tem = msg + " : " + i; channel.basicPublish(exchangeName, routingKey, null, tem.getBytes()); System.out.println("Send message : " + tem); } //6. 關閉連接 channel.close(); connection.close(); }}

這里我們創建了兩個消費者,以方便驗證限流api中的 global 參數設置為 true 時不起作用.。整體結構如下圖所示,兩個 Consumer 都綁定在同一個隊列上,這樣的話兩個消費者將共同消費發送的10條消息。

4dc3878d43629026932a8e33f192e33c.png
import com.rabbitmq.client.*;import java.io.IOException;public class QosConsumer { public static void main(String[] args) throws Exception { //1. 創建一個 ConnectionFactory 并進行設置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(3000); //2. 通過連接工廠來創建連接 Connection connection = factory.newConnection(); //3. 通過 Connection 來創建 Channel final Channel channel = connection.createChannel(); //4. 聲明 String exchangeName = "test_qos_exchange"; String queueName = "test_qos_queue"; String queueName1 = "test_qos_queue_1"; String routingKey = "item.#"; channel.exchangeDeclare(exchangeName, "topic

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/388887.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/388887.shtml
英文地址,請注明出處:http://en.pswp.cn/news/388887.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

動量策略 python_在Python中使用動量通道進行交易

動量策略 pythonMost traders use Bollinger Bands. However, price is not normally distributed. That’s why only 42% of prices will close within one standard deviation. Please go ahead and read this article. However, I have some good news.大多數交易者使用布林…

css3 變換、過渡效果、動畫

1 CSS3 選擇器 1.1 基本選擇器 1.2 層級 空格 > .itemli ~ .item~p 1.3 屬性選擇器 [attr] [attrvalue] [attr^value] [attr$value] [attr*value] [][][] 1.4 偽類選擇器 :link :visited :hover :active :focus :first-child .list li:first-child :last-chi…

webservice 啟用代理服務器

您會發現你寫完了一個webservice在調用的時候發現怎也沒辦法調用&#xff0c;一個簡單的webservice怎么不能使用&#xff0c;一肚子的怨恨&#xff0c;哈哈您可能沒有為webservice設置代理。 下面就給您寫個調用的用例和大家分享下。其實很簡單&#xff0c;但是你沒有想到的時…

mysql常用的存儲引擎_Mysql存儲引擎

什么是存儲引擎&#xff1f;關系數據庫表是用于存儲和組織信息的數據結構&#xff0c;可以將表理解為由行和列組成的表格&#xff0c;類似于Excel的電子表格的形式。有的表簡單&#xff0c;有的表復雜&#xff0c;有的表根本不用來存儲任何長期的數據&#xff0c;有的表讀取時非…

android studio設計模式和文本模式切換

轉載于:https://www.cnblogs.com/judes/p/9437104.html

高斯模糊為什么叫高斯濾波_為什么高斯是所有發行之王?

高斯模糊為什么叫高斯濾波高斯分布及其主要特征&#xff1a; (Gaussian Distribution and its key characteristics:) Gaussian distribution is a continuous probability distribution with symmetrical sides around its center. 高斯分布是連續概率分布&#xff0c;其中心周…

C# webbrowser 代理

百度&#xff0c;google加自己理解后&#xff0c;將所得方法總結一下&#xff1a; 方法1&#xff1a;修改注冊表Software//Microsoft//Windows//CurrentVersion//Internet Settings下 ProxyEnable和ProxyServer。這種方法適用于局域網用戶&#xff0c;撥號用戶無效。 1p…

C MySQL讀寫分離連接串_Mysql讀寫分離

一 什么是讀寫分離MySQL Proxy最強大的一項功能是實現“讀寫分離(Read/Write Splitting)”。基本的原理是讓主數據庫處理事務性查詢&#xff0c;而從數據庫處理SELECT查詢。數據庫復制被用來把事務性查詢導致的變更同步到集群中的從數據庫。當然&#xff0c;主服務器也可以提供…

golang 編寫的在線redis 內存分析工具 rma4go

redis 內存分析工具 rma4go redis是一個很有名的內存型數據庫&#xff0c;這里不做詳細介紹。而rma4go (redis memory analyzer for golang) 是一個redis的內存分析工具&#xff0c;這個工具的主要作用是針對運行時期的redis進行內存的分析&#xff0c;統計redis中key的分布情…

從Jupyter Notebook到腳本

16 Aug: My second article: From Scripts To Prediction API8月16日&#xff1a;我的第二篇文章&#xff1a; 從腳本到預測API As advanced beginners, we know quite a lot: EDA, ML concepts, model architectures etc…… We can write a big Jupyter Notebook, click “Re…

【EasyNetQ】- 使用Future Publish調度事件

許多業務流程要求在將來某個日期安排事件。例如&#xff0c;在與客戶進行初次銷售聯系后&#xff0c;我們可能希望在將來的某個時間安排跟進電話。EasyNetQ可以通過其Future Publish功能幫助您實現此功能。例如&#xff0c;這里我們使用FuturePublish擴展方法來安排未來一個月的…

Java這些多線程基礎知識你會嗎?

0、并發和并行、進程核線程、多進程和多線程的區別&#xff1a; &#xff08;這里的時間和時刻上的概念同物理上的一樣&#xff09; 并發&#xff1a;在一段時間內多個任務同時執行&#xff0c;或者說是在一段很短的時間內可以執行多條程序指令&#xff0c;微觀上看起來好像是可…

MySQL set names 命令_mysql set names 命令和 mysql 字符編碼問題

先看下面的執行結果&#xff1a;(rootlocalhost)[(none)]mysql>show variables like character%;---------------------------------------------------------------------------------------| Variable_name | Value |---------------------------------------------------…

設置Proxy Server和SQL Server實現數據庫安全

首先&#xff0c;我們需要了解一下SQL Server在WinSock上定義協議的步驟&#xff1a; 1. 在”啟動”菜單上&#xff0c;指向”程序/Microsoft Proxy Server”&#xff0c;然后點擊”Microsoft Management Console”。 2. 展開”Internet Information Service”,再展開運行Proxy…

Python django解決跨域請求的問題

解決方案 1.安裝django-cors-headers pip3 install django-cors-headers 2.配置settings.py文件 INSTALLED_APPS [...corsheaders&#xff0c;...] MIDDLEWARE_CLASSES (...corsheaders.middleware.CorsMiddleware,django.middleware.common.CommonMiddleware, # 注意順序...…

加勒比海兔_加勒比海海洋物種趨勢

加勒比海兔Ok, here’s a million dollar question: is the Caribbean really dying? Or, more specifically, are marine species found on Caribbean reefs becoming less abundant?好吧&#xff0c;這是一個百萬美元的問題&#xff1a;加勒比海真的死了嗎&#xff1f; 或者…

mysql 查出相差年數_MySQL計算兩個日期相差的天數、月數、年數

MySQL自帶的日期函數TIMESTAMPDIFF計算兩個日期相差的秒數、分鐘數、小時數、天數、周數、季度數、月數、年數&#xff0c;當前日期增加或者減少一天、一周等等。SELECT TIMESTAMPDIFF(類型,開始時間,結束時間)相差的秒數&#xff1a;SELECT TIMESTAMPDIFF(SECOND,1993-03-23 0…

tornado 簡易教程

引言 回想Django的部署方式 以Django為代表的python web應用部署時采用wsgi協議與服務器對接&#xff08;被服務器托管&#xff09;&#xff0c;而這類服務器通常都是基于多線程的&#xff0c;也就是說每一個網絡請求服務器都會有一個對應的線程來用web應用&#xff08;如Djang…

如果你的電腦是通過代理上網的.就要用端口映射

由于公網IP地址有限&#xff0c;不少ISP都采用多個內網用戶通過代理和網關路由共用一個公網IP上INTERNET的方法&#xff0c; 這樣就限制了這些用戶在自己計算機上架設個人網站&#xff0c;要實現在這些用戶端架設網站&#xff0c;最關鍵的一點是&#xff0c; 怎樣把多用戶的內網…

人口密度可視化_使用GeoPandas可視化菲律賓的人口密度

人口密度可視化GeoVisualization /菲律賓。 (GeoVisualization /Philippines.) Population density is a crucial concept in urban planning. Theories on how it affects economic growth are divided. Some claim, as Rappaport does, that an economy is a form of “spati…