【RabbitMQ】 WorkQueues

消息分發

在【RabbitMQ】 HelloWorld中我們寫了發送/接收消息的程序。這次我們將創建一個Work Queue用來在多個消費者之間分配耗時任務。

Work Queues(又稱為:Task Queues)的主要思想是:盡可能的減少執行資源密集型任務時的等待時間。我們將任務封裝為消息并發送到隊列,在后臺的工作進程將彈出任務并進行作業。當你運行很多worker,任務將在他們之間共享。

這個概念在WEB應用中尤為有效,因為在一個HTTP請求進行復雜操作是不可能的。

準備

在上一節我們發送了一條包含“Hello World”的消息。現在我們將要發送代表復雜任務的字符串。我們沒有真實場景的復雜任務,例如調整圖片大小或呈現PDF文件,讓我們假裝自己很忙 - 通過Thread.sleep()。我們將根據字符串中“.”的數量來衡量任務復雜度;每一個“.”增加1秒鐘的工作時間。例如:一個“Hello...”將消耗3秒鐘。

稍微修改下上一節中Send.java的代碼,讓我們可以從命令行參數中輸入任意字符作為消息。這個程序將給我們的工作隊列安排消息,命名為NewTask.java

String message = getMessage(argv);channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

一些封裝方法來幫助我們從命令行參數中得到消息(簡單來說就是將所有的命令行參數當做一條完整消息):

private static String getMessage(String[] strings){if (strings.length < 1)return "Hello World!";return joinStrings(strings, " ");
}private static String joinStrings(String[] strings, String delimiter) {int length = strings.length;if (length == 0) return "";StringBuilder words = new StringBuilder(strings[0]);for (int i = 1; i < length; i++) {words.append(delimiter).append(strings[i]);}return words.toString();
}

老的Recv.java程序也需要一些修改:他需要為消息中的每一個“.”偽造1秒鐘的工作時間。稱為Worker.java

final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");}}
};
boolean autoAck = true; // acknowledgment is covered below  消息確認,在后面會詳細講解
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

模擬任務執行:

private static void doWork(String task) throws InterruptedException {for (char ch: task.toCharArray()) {if (ch == '.') Thread.sleep(1000);}
}

循環調度

使用Task Queue的優點之一就是可以輕松的進行并行工作。如果我們正在構建一個積壓的工作,我們可以僅僅通過添加更多的workers來解決。

首先,同時運行兩個worker實例,他們都會從隊列中得到消息,但事實上是什么樣的呢?讓我們看一看:

在IDEA中運行兩次Worker.java,然后他們兩個都會處于等待消息狀態。運行NewTask.java,并攜帶命令行參數,可以在Edit Configurations中設置Program arguements。下面為官方教程中的命令行版本:

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask First message.
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Second message..
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Third message...
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fourth message....
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fifth message.....

主要觀察兩個worker的輸出:

worker1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker[*] Waiting for messages. To exit press CTRL+C[x] Received 'First message.'[x] Received 'Third message...'[x] Received 'Fifth message.....'
worker2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker[*] Waiting for messages. To exit press CTRL+C[x] Received 'Second message..'[x] Received 'Fourth message....'

默認的,RabbitMQ將會按照順序,以此發送每一條消息到每一個消費者。平均每個消費者是可以獲得相同數量的消息的。這種分發消息的方式稱為循環。

消息確認

?完成一個任務需要消耗一定時間,你可能想知道如果一個消費者開始了一個很長的任務,在僅僅完成了一部分的時候,死掉了,將會發生什么。在我們當前的代碼中,一旦RabbitMQ分發一條消息給消費者,立即就會將該條消息從內存中刪除。這種情況下,如果你殺掉一個worker,我們將會丟失它正在操作的消息。我們也會失去所有分發給他的還未處理的消息。

但是我們不想丟失任何消息。如果worker死掉,我們期望這個任務被重新分發給另一個worker。

為了確保消息從來沒有丟失,RabbitMQ支持消息確認(acknowledgments)。一個確認是從消費者處發送以告訴RabbitMQ指定的消息收到了,處理完成了,RabbitMQ可以刪除它了。

如果一個消費者宕機(channel關閉,connection關閉,TCP連接丟失等),沒有發送ack,RabbitMQ將會知道這條消息沒有處理完成,將會重新排隊。如果此時存在其它消費者,將會迅速轉發給其它消費者。這樣你就可以確保消息不會丟失,即使進程偶爾宕機。

這里不存在消息超時,RabbitMQ在消費者宕機后會重發消息。即使處理數據用了很長很長的時間這也是沒有問題的。

默認的消息確認是被打開的。上面的例子中我們通過autoAck=true明確關閉了它。下面我們打開它,每當處理完一個任務,就發送回一個適當的確認消息。

channel.basicQos(1); // accept only one unack-ed message at a time (see below)  每次接收一個未處理消息final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");channel.basicAck(envelope.getDeliveryTag(), false);}}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

使用現在的代碼,我們可以保證即使在操作消息的時候通過CTRL+C關閉了一個消費者,也不會丟失消息。不久后,所有未處理完成的消息都會被重新發送。

Forgotten acknowledgment

忘記設置basicAck是很普通的事情,但是結果卻很嚴重。當客戶端退出(這可能聽起來像隨機分發)消息會被重新發送,但是RabbitMQ會吃掉越來越多的內容,因為它不會釋放任何沒有被確認的消息。

調試這種錯誤的使用rabbitmqctl來打印messages_unacknowledged的部分:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

消息持久化

我們學習了如何在消費者宕機的情況下保證數據不丟失。但是在RabbitMQ服務器宕機的情況下,數據依然是會丟失的。

當RabbitMQ退出或崩潰,它會忘記所有的隊列和消息,除非你告訴它不要。兩件事情來確保消息未丟失:我們需要標記隊列和消息為持久化的。

首先,我們需要確保RabbitMQ從來不會丟失隊列。因此我們需要聲明隊列為持久化的:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

這行代碼是沒有問題的,但是在我們的環境下是錯誤的。這是因為我們已經定義了一個叫做hello的非持久化隊列。RabbitMQ不允許重新定義已經存在的隊列(使用不用參數)。這里有一個快速的方法 - 定義一個不同名字的隊列,如task_queue:

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

這個queueDeclare需要同時更改生產者和消費者的代碼。

現在我們確保了task_queue在RabbitMQ重啟的狀態下也不會丟失。現在我們需要去標記我們的消息為持久化的 - 通過設置MessageProperties(實現了BasicProperties)的常量值:PERSISTENT_TEXT_PLAIN。

import com.rabbitmq.client.MessageProperties;channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

注意消息持久化:

標記了消息為持久化也不能完全保證消息不會丟失。盡管告訴了RabbitMQ將消息保存在磁盤中,RabbitMQ剛剛接收數據,還沒有保存的時候,這個時間區間是無法持久化的。同事,RabbitMQ沒有對每條消息都進行fsync(2) -- 也許僅僅保存在緩存中并沒有真正寫入硬盤。持久化并不健壯,但是對于處理簡單的任務隊列已經足夠了。如果你需要更加強健的保證可以使用publisher confirms

公平分發

你可能注意到有時候分發還是無法解決我們的某些問題。例如在某種情況下,有兩個消費者,當所有的奇數消息非常大,偶數消息很小,一個消費者將會持續不斷的工作,另一個消費者基本不工作。但是RabbitMQ并不知道這種情況,依然是依次分發。

這是因為RabbitMQ在消息進入隊列是進行分發。并不探查消息的數量。僅僅是發送第n條消息給第n個消費者。

為了解決這個問題,我們可以使用basicQos方法,設置參數為prefetchCount = 1。這會告訴RabbitMQ每次只給一個消費者一條消息。或者說,不要在消費者正在處理和確認消息的時候發送新的消息給他們。相反,它將分發消息給下一個不忙的消費者。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

注意隊列的大小

如果所有的消費者都處于繁忙狀態,隊列會填滿。可以添加更多的消費者或者其它方案。

Putting it all together

NewTask.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;public class NewTask {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);String message = getMessage(argv);channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();}private static String getMessage(String[] strings) {if (strings.length < 1)return "Hello World!";return joinStrings(strings, " ");}private static String joinStrings(String[] strings, String delimiter) {int length = strings.length;if (length == 0) return "";StringBuilder words = new StringBuilder(strings[0]);for (int i = 1; i < length; i++) {words.append(delimiter).append(strings[i]);}return words.toString();}
}

Worker.java

import com.rabbitmq.client.*;import java.io.IOException;public class Worker {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");final Connection connection = factory.newConnection();final Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");channel.basicQos(1);final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");channel.basicAck(envelope.getDeliveryTag(), false);}}};channel.basicConsume(TASK_QUEUE_NAME, false, consumer);}private static void doWork(String task) {for (char ch : task.toCharArray()) {if (ch == '.') {try {Thread.sleep(1000);} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();}}}}
}

?

轉載于:https://www.cnblogs.com/shiyu404/p/6251773.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/394240.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/394240.shtml
英文地址,請注明出處:http://en.pswp.cn/news/394240.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

python matplotlib庫安裝出錯_使用pip install Matplotlib時出現內存錯誤

我使用的是Python2.7&#xff0c;如果我試圖安裝Matplotlib&#xff0c;如果我使用“pip install Matplotlib”&#xff0c;就會出現這個錯誤Exception:Traceback (most recent call last):File "/usr/local/lib/python2.7/dist-packages/pip/basecommand.py", line …

笑看職場什么程序員才搶手,什么樣的程序員漲薪多?

?程序員&#xff0c;怎么才算合格&#xff0c;不好說吧&#xff1b;他就像銷售一樣&#xff0c;一名銷售員&#xff0c;比如網絡銷售賣茶葉&#xff0c;他賣茶葉很厲害呀&#xff0c;可是你讓他去銷售房地產&#xff0c;就算他有點銷售的基礎&#xff0c;也要重新去學怎么銷售…

Android畫布Canvas裁剪clipRect,Kotlin

Android畫布Canvas裁剪clipRect&#xff0c;Kotlin private fun mydraw() {val originBmp BitmapFactory.decodeResource(resources, R.mipmap.pic).copy(Bitmap.Config.ARGB_8888, true)val newBmp Bitmap.createBitmap(originBmp.width, originBmp.height, Bitmap.Config.A…

調查|73%的公司正使用存在漏洞的超期服役設備

本文講的是調查&#xff5c;73%的公司正使用存在漏洞的超期服役設備&#xff0c;一份新近的調查覆蓋了北美350家機構的212000臺思科設備。結果顯示&#xff0c;73%的企業正在使用存在漏洞、超期服役的網絡設備。該數字在上一年僅為60%。 Softchoice公司思科部門業務主管大衛魏格…

為什么要做稀疏編碼_為什么我每天都要編碼一年,所以我也學到了什么,以及如何做。...

為什么要做稀疏編碼by Paul Rail由Paul Rail 為什么我每天都要編碼一年&#xff0c;所以我也學到了什么&#xff0c;以及如何做。 (Why I coded every day for a year, what I learned, and how you can do it, too.) I was looking to switch careers. The world today is no…

深度裝機大師一鍵重裝_筆記本怎么重裝系統?筆記本自己如何重裝系統?

如何給筆記本重裝系統呢?筆記本系統使用時間長了難免會運行緩慢&#xff0c;我們第一反應就是重裝系統筆記本了。但是很多小白用戶們就惆悵了&#xff0c;不知道筆記本怎么重裝系統&#xff0c;怎么進行重裝系統筆記本呢?首先&#xff0c;筆記本電腦可以重置系統&#xff0c;…

leetcode劍指 Offer 11. 旋轉數組的最小數字(二分查找)

把一個數組最開始的若干個元素搬到數組的末尾&#xff0c;我們稱之為數組的旋轉。輸入一個遞增排序的數組的一個旋轉&#xff0c;輸出旋轉數組的最小元素。例如&#xff0c;數組 [3,4,5,1,2] 為 [1,2,3,4,5] 的一個旋轉&#xff0c;該數組的最小值為1。 示例 1&#xff1a; 輸…

XMLHttpRequest狀態碼及相關事件

1.創建一個XMLHttpRequest對象 2.對XMLHttpRequest對象進行事件的監聽(定義監聽事件的位置不影響 3.對XMLHttpRequest對象的狀態碼 狀態 名稱描述0Uninitialized初始化狀態。XMLHttpRequest 對象已創建或已被 abort() 方法重置1Open open() 方法已調用&#xff0c;但是 send()…

-code vs 1474 十進制轉m進制

1474 十進制轉m進制 時間限制: 1 s空間限制: 128000 KB題目等級 : 白銀 Silver題解查看運行結果題目描述 Description將十進制數n轉換成m進制數 m<16 n<100 輸入描述 Input Description共一行 n和m 輸出描述 Output Description共一個數 表示n的m進制 樣例輸入 Sample In…

人工智能時代號角已吹響 COMPUTEX如何凝聚AI這股力量?

當前談到人工智能&#xff08;AI&#xff09;&#xff0c;或許大家最直接的反應是Google的AlphaGo&#xff0c;但比起“遙不可及”的圍棋機器人&#xff0c;其實AI早就融入人們生活&#xff0c;就像蘋果手機中的語音助手Siri&#xff0c;如此“平易近人”。從自動駕駛、機器人、…

python寫入文字到txt只寫入最后一行_python文件寫入:向txt寫入內容的設置

創建文本流的最簡單方法是使用 open(),可以選擇指定編碼: f=open("myfile.txt","r",encoding="utf-8") 但是更為安全的方法是: with open("myfile.txt","w",encoding="utf-8") as f: f.write(str) 還可以設置…

python自帶ide和pycharm哪個好_排名前三的Python IDE你選擇哪個?我選PyCharm

世界上最好的 Python 編輯器或 IDE 是什么&#xff1f;炫酷的界面、流暢的體驗&#xff0c;我們投 PyCharm一票&#xff0c;那么你呢&#xff1f;編輯Python程序&#xff0c;您有許多選項。有些人仍然喜歡一個基本的文本編輯器&#xff0c;如Emacs&#xff0c;VIM或Gedit&#…

leetcode1254. 統計封閉島嶼的數目(dfs)

有一個二維矩陣 grid &#xff0c;每個位置要么是陸地&#xff08;記號為 0 &#xff09;要么是水域&#xff08;記號為 1 &#xff09;。 我們從一塊陸地出發&#xff0c;每次可以往上下左右 4 個方向相鄰區域走&#xff0c;能走到的所有陸地區域&#xff0c;我們將其稱為一座…

Dash的快速入門將使您在5分鐘內進入“ Hello World”

by Anuj Pahade由Anuj Pahade Dash的快速入門將使您在5分鐘內進入“ Hello World” (This quick intro to Dash will get you to “Hello World” in under 5 minutes) Dash is an open source library for creating reactive apps in Python. You can create amazing dashboa…

JSON/xml

JSON是什么&#xff1a; JSON(JavaScriptObject Notation, JS 對象簡譜) 是一種輕量級的數據交換格式。它基于ECMAScript(歐洲計算機協會制定的js規范)的一個子集&#xff0c;采用完全獨立于編程語言的文本格式來存儲和表示數據。簡潔和清晰的層次結構使得 JSON 成為理想的數據…

unity開寶箱動畫_[技術博客]Unity3d 動畫控制

在制作游戲時&#xff0c;導入的箱子模型本身自帶動畫。然而&#xff0c;它的動畫是一個從打開到關閉的完整過程&#xff0c;并且沒有給出控制打開關閉的方法。最直接的想法是對該動畫進行拆分&#xff0c;再封裝成不同的動畫狀態&#xff0c;但是不巧的是&#xff0c;這個動畫…

php上傳大文件時,服務器端php.ini文件中需要額外修改的選項

幾個修改點&#xff1a; 1、upload_max_filesize 上傳的最大文件 2、post_max_size 上傳的最大文件 3、max_execution_time 修改為0表示無超時&#xff0c;一直等待 4、max_input_time 參考網址&#xff1a; 在php.ini中把max_input_time 和 max_execution_time設置得特別長…

《中國人工智能學會通訊》——11.21 結束語

11.21 結束語 本文針對交通流的網絡性、異質性和動態性特點&#xff0c;結合當前多任務學習方法的不足提出了相應的解決方案。然而&#xff0c;在實際的應用場景中還存在更多的挑戰&#xff0c;需要進一步深入的研究方向包括&#xff1a;① 高維任務的共同學習方法。在高維數據…

如何把一個軟件嵌入另一個軟件_自動化正在成為一個“軟件”行業

摘要在智能制造時代&#xff0c;自動化行業正在成為一個軟件行業&#xff0c;它正在改變著整個產業的未來&#xff0c;也將為制造業帶來更為廣闊的空間。自動化正在成為一個“軟件”行業&#xff0c;在智能時代&#xff0c;軟件正在成為自動化行業競爭的關鍵。自動化已然成為軟…

leetcode1020. 飛地的數量(dfs)

給出一個二維數組 A&#xff0c;每個單元格為 0&#xff08;代表海&#xff09;或 1&#xff08;代表陸地&#xff09;。 移動是指在陸地上從一個地方走到另一個地方&#xff08;朝四個方向之一&#xff09;或離開網格的邊界。 返回網格中無法在任意次數的移動中離開網格邊界…