消息分發
在【RabbitMQ】 HelloWorld中我們寫了發送/接收消息的程序。這次我們將創建一個Work Queue用來在多個消費者之間分配耗時任務。
Work Queues(又稱為:Task Queues)的主要思想是:盡可能的減少執行資源密集型任務時的等待時間。我們將任務封裝為消息并發送到隊列,在后臺的工作進程將彈出任務并進行作業。當你運行很多worker,任務將在他們之間共享。
這個概念在WEB應用中尤為有效,因為在一個HTTP請求進行復雜操作是不可能的。
準備
在上一節我們發送了一條包含“Hello World”的消息。現在我們將要發送代表復雜任務的字符串。我們沒有真實場景的復雜任務,例如調整圖片大小或呈現PDF文件,讓我們假裝自己很忙 - 通過Thread.sleep()。我們將根據字符串中“.”的數量來衡量任務復雜度;每一個“.”增加1秒鐘的工作時間。例如:一個“Hello...”將消耗3秒鐘。
稍微修改下上一節中Send.java的代碼,讓我們可以從命令行參數中輸入任意字符作為消息。這個程序將給我們的工作隊列安排消息,命名為NewTask.java:
String message = getMessage(argv);channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
一些封裝方法來幫助我們從命令行參數中得到消息(簡單來說就是將所有的命令行參數當做一條完整消息):
private static String getMessage(String[] strings){if (strings.length < 1)return "Hello World!";return joinStrings(strings, " "); }private static String joinStrings(String[] strings, String delimiter) {int length = strings.length;if (length == 0) return "";StringBuilder words = new StringBuilder(strings[0]);for (int i = 1; i < length; i++) {words.append(delimiter).append(strings[i]);}return words.toString(); }
老的Recv.java程序也需要一些修改:他需要為消息中的每一個“.”偽造1秒鐘的工作時間。稱為Worker.java:
final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");}} }; boolean autoAck = true; // acknowledgment is covered below 消息確認,在后面會詳細講解 channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
模擬任務執行:
private static void doWork(String task) throws InterruptedException {for (char ch: task.toCharArray()) {if (ch == '.') Thread.sleep(1000);} }
循環調度
使用Task Queue的優點之一就是可以輕松的進行并行工作。如果我們正在構建一個積壓的工作,我們可以僅僅通過添加更多的workers來解決。
首先,同時運行兩個worker實例,他們都會從隊列中得到消息,但事實上是什么樣的呢?讓我們看一看:
在IDEA中運行兩次Worker.java,然后他們兩個都會處于等待消息狀態。運行NewTask.java,并攜帶命令行參數,可以在Edit Configurations中設置Program arguements。下面為官方教程中的命令行版本:
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask First message. shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Second message.. shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Third message... shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Fourth message.... shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Fifth message.....
主要觀察兩個worker的輸出:
worker1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker[*] Waiting for messages. To exit press CTRL+C[x] Received 'First message.'[x] Received 'Third message...'[x] Received 'Fifth message.....'
worker2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker[*] Waiting for messages. To exit press CTRL+C[x] Received 'Second message..'[x] Received 'Fourth message....'
默認的,RabbitMQ將會按照順序,以此發送每一條消息到每一個消費者。平均每個消費者是可以獲得相同數量的消息的。這種分發消息的方式稱為循環。
消息確認
?完成一個任務需要消耗一定時間,你可能想知道如果一個消費者開始了一個很長的任務,在僅僅完成了一部分的時候,死掉了,將會發生什么。在我們當前的代碼中,一旦RabbitMQ分發一條消息給消費者,立即就會將該條消息從內存中刪除。這種情況下,如果你殺掉一個worker,我們將會丟失它正在操作的消息。我們也會失去所有分發給他的還未處理的消息。
但是我們不想丟失任何消息。如果worker死掉,我們期望這個任務被重新分發給另一個worker。
為了確保消息從來沒有丟失,RabbitMQ支持消息確認(acknowledgments)。一個確認是從消費者處發送以告訴RabbitMQ指定的消息收到了,處理完成了,RabbitMQ可以刪除它了。
如果一個消費者宕機(channel關閉,connection關閉,TCP連接丟失等),沒有發送ack,RabbitMQ將會知道這條消息沒有處理完成,將會重新排隊。如果此時存在其它消費者,將會迅速轉發給其它消費者。這樣你就可以確保消息不會丟失,即使進程偶爾宕機。
這里不存在消息超時,RabbitMQ在消費者宕機后會重發消息。即使處理數據用了很長很長的時間這也是沒有問題的。
默認的消息確認是被打開的。上面的例子中我們通過autoAck=true明確關閉了它。下面我們打開它,每當處理完一個任務,就發送回一個適當的確認消息。
channel.basicQos(1); // accept only one unack-ed message at a time (see below) 每次接收一個未處理消息final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");channel.basicAck(envelope.getDeliveryTag(), false);}} }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
使用現在的代碼,我們可以保證即使在操作消息的時候通過CTRL+C關閉了一個消費者,也不會丟失消息。不久后,所有未處理完成的消息都會被重新發送。
Forgotten acknowledgment
忘記設置basicAck是很普通的事情,但是結果卻很嚴重。當客戶端退出(這可能聽起來像隨機分發)消息會被重新發送,但是RabbitMQ會吃掉越來越多的內容,因為它不會釋放任何沒有被確認的消息。
調試這種錯誤的使用rabbitmqctl來打印messages_unacknowledged的部分:
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues ... hello 0 0 ...done.
消息持久化
我們學習了如何在消費者宕機的情況下保證數據不丟失。但是在RabbitMQ服務器宕機的情況下,數據依然是會丟失的。
當RabbitMQ退出或崩潰,它會忘記所有的隊列和消息,除非你告訴它不要。兩件事情來確保消息未丟失:我們需要標記隊列和消息為持久化的。
首先,我們需要確保RabbitMQ從來不會丟失隊列。因此我們需要聲明隊列為持久化的:
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
這行代碼是沒有問題的,但是在我們的環境下是錯誤的。這是因為我們已經定義了一個叫做hello的非持久化隊列。RabbitMQ不允許重新定義已經存在的隊列(使用不用參數)。這里有一個快速的方法 - 定義一個不同名字的隊列,如task_queue:
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);
這個queueDeclare需要同時更改生產者和消費者的代碼。
現在我們確保了task_queue在RabbitMQ重啟的狀態下也不會丟失。現在我們需要去標記我們的消息為持久化的 - 通過設置MessageProperties(實現了BasicProperties)的常量值:PERSISTENT_TEXT_PLAIN。
import com.rabbitmq.client.MessageProperties;channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
注意消息持久化:
標記了消息為持久化也不能完全保證消息不會丟失。盡管告訴了RabbitMQ將消息保存在磁盤中,RabbitMQ剛剛接收數據,還沒有保存的時候,這個時間區間是無法持久化的。同事,RabbitMQ沒有對每條消息都進行fsync(2) -- 也許僅僅保存在緩存中并沒有真正寫入硬盤。持久化并不健壯,但是對于處理簡單的任務隊列已經足夠了。如果你需要更加強健的保證可以使用publisher confirms。
公平分發
你可能注意到有時候分發還是無法解決我們的某些問題。例如在某種情況下,有兩個消費者,當所有的奇數消息非常大,偶數消息很小,一個消費者將會持續不斷的工作,另一個消費者基本不工作。但是RabbitMQ并不知道這種情況,依然是依次分發。
這是因為RabbitMQ在消息進入隊列是進行分發。并不探查消息的數量。僅僅是發送第n條消息給第n個消費者。
為了解決這個問題,我們可以使用basicQos方法,設置參數為prefetchCount = 1。這會告訴RabbitMQ每次只給一個消費者一條消息。或者說,不要在消費者正在處理和確認消息的時候發送新的消息給他們。相反,它將分發消息給下一個不忙的消費者。
int prefetchCount = 1; channel.basicQos(prefetchCount);
注意隊列的大小
如果所有的消費者都處于繁忙狀態,隊列會填滿。可以添加更多的消費者或者其它方案。
Putting it all together
NewTask.java
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties;public class NewTask {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);String message = getMessage(argv);channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();}private static String getMessage(String[] strings) {if (strings.length < 1)return "Hello World!";return joinStrings(strings, " ");}private static String joinStrings(String[] strings, String delimiter) {int length = strings.length;if (length == 0) return "";StringBuilder words = new StringBuilder(strings[0]);for (int i = 1; i < length; i++) {words.append(delimiter).append(strings[i]);}return words.toString();} }
Worker.java
import com.rabbitmq.client.*;import java.io.IOException;public class Worker {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");final Connection connection = factory.newConnection();final Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");channel.basicQos(1);final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");channel.basicAck(envelope.getDeliveryTag(), false);}}};channel.basicConsume(TASK_QUEUE_NAME, false, consumer);}private static void doWork(String task) {for (char ch : task.toCharArray()) {if (ch == '.') {try {Thread.sleep(1000);} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();}}}} }
?