Work queues 工作隊列模式
模式說明
通過Helloworld工程我們已經能夠構建一個簡單的消息隊列的基本項目,項目中存在幾個角色:生產 者、消費者、隊列,而對于我們真實的開發中 ,對于消息的消費者通過是有多個的。
比如在實現用戶注冊功能時,用戶注冊成功,會給響對應用戶發送郵件,同時給用戶發送手機短信,告訴用戶已成功注冊網站或者app 應用,這種功能在大部分項目開發中都比較常見 ,而對于helloworld 的應用中雖然能夠對 消息進行消費,但是有很大問題: 消息消費者只有一個,當消息量非常大時,單個消費者處理消息就會變得很慢,同時給節點頁帶來很大壓力,導致消息堆積越來越多。對于這種情況,RabbitMQ 提供了工作 隊列模式,通過工作隊列提供做個消費者,對MQ產生的消息進行消費,提高MQ消息的吞吐率,降低消息的處理時間。處理模型圖如下。
實現步驟
生產者
package cn.wolfcode.java.rabbitmq._02worker;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;/*** Created by wolfcode-fanjialong*/public class NewTask {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.142.129");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);String message = "tasequeue";for(int i=0;i<20;i++){channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,(message+i).getBytes("UTF-8"));}System.out.println(" [x] Sent '" + message + "'");}}
}
消費者
package cn.wolfcode.java.rabbitmq._02worker;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.util.concurrent.TimeUnit;/*** Created by wolfcode-fanjialong*/public class Worker {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.142.129");final Connection connection = factory.newConnection;final Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");//channel.basicQos(1);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}finally {channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });}}
簡單問題說明
從結果可以看出消息被平均分配到兩個消費方,來對消息進行處理,提高了消息處理效率,創建多個消費者來對消息進行處理。這里RabitMQ采用輪詢來對消息進行分發時保證了消息被平均分配到每個消費方 。
但是引入新的問題:真正的生產環境下,對于消息的處理基本不會像我們現在看到的這樣,每個消 費方處理的消息數量是平均分配的,比如因為網絡原因,機器cpu ,內存等硬問題,消費方處理消息時 同類消息不同機器進行處理時消耗時間也是不一樣的,比如1號消費者消費1條消息時1秒,2號消費者消費1條消息是5秒,對于1號消費者比2號消費者處理消息快,那么在分配消息時就應該讓1號消費者多收 到消息進行處理,也即是我們通常所說的”能者多勞”,同樣Rabbitmq對于這種消息分配模式提供了支持。
問題: 任務量很大,消息雖然得到了及時的消費,單位時間內消息處理速度加快,提高了吞吐量,可 是不同消費者處理消息的時間不同,導致部分消費者的資源被浪費。
解決:采用消息公平分發。
總結:工作隊列消息輪詢分發消費者收到的消息數量平均分配,單位時間內消息處理速度加快,提高了吞吐量。
工作模式隊列-消息公平分發(fair dispatch)
在案例01中對于消息分發采用的是默認輪詢分發,消息應答采用的自動應答模式,這是因為當消息進 入隊列,RabbitMQ就會分派消息。它不看消費者為應答的數目,只是盲目的將第n條消息發給第n個消費者。
為了解決這個問題,我們使用 basicQos(prefetchCount = 1) 方法,來限RabbitMQ只發不超過1條的消息給同一個消費者。當消息處理完畢后,有了反饋,才會進行第二次發送。執行模型圖如下:
Pub/Sub 訂閱模式
模式說明
在訂閱模型中,多了一個 Exchange 角色,而且過程略有變化:
P:生產者,也就是要發送消息的程序,但是不再發送到隊列中,而是發給X(交換機)
C:消費者,消息的接收者,會一直等待消息到來
Queue:消息隊列,接收消息、緩存消息
Exchange:交換機(X)。一方面,接收生產者發送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、
遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有常見以下3種類型:
-
Fanout:廣播,將消息交給所有綁定到交換機的隊列
-
Direct:定向,把消息交給符合指定routing key 的隊列
-
Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列
Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與 Exchange 綁定,或者沒有符合路由規則的隊列,那么消息會丟失!
實現步驟
生產者
package cn.wolfcode.java.rabbitmq._03pubsub;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** Created by wolfcode-fanjialong*/
public class EmitLog {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String message = "info: Hello World!";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}
}
消費者
package cn.wolfcode.java.rabbitmq._03pubsub;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;/*** Created by wolfcode-fanjialong*/public class ReceiveLogs {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}
Routing 路由模式
模式說明
隊列與交換機的綁定,不能是任意綁定了,而是要指定一個 RoutingKey(路由key)
消息的發送方在向 Exchange 發送消息時,也必須指定消息的 RoutingKey
Exchange 不再把消息交給每一個綁定的隊列,而是根據消息的 Routing Key 進行判斷,只有隊列的 Routingkey 與消息的 Routing key 完全一致,才會接收到消息
圖解:
P:生產者,向 Exchange 發送消息,發送消息時,會指定一個routing key
X:Exchange(交換機),接收生產者的消息,然后把消息遞交給與 routing key 完全匹配的隊列
C1:消費者,其所在隊列指定了需要 routing key 為 error 的消息
C2:消費者,其所在隊列指定了需要 routing key 為 info、error、warning 的消息
實現步驟
生產者
package cn.wolfcode.java.rabbitmq._04rounting;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionF
actory;
/*** Created by wolfcode-fanjialong*/public class EmitLogDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String severity = "info";String message = "directMsg";channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + severity + "':'" + message + "'");}}
}
消費者
package cn.wolfcode.java.rabbitmq._04rounting;import com.rabbitmq.client.*;/*** Created by wolfcode-fanjialong*/public class ReceiveLogsDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "info");channel.queueBind(queueName, EXCHANGE_NAME, "error");channel.queueBind(queueName, EXCHANGE_NAME, "warning");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}}
Topic 模式
模式介紹
Topic 類型與 Direct 相比,都是可以根據 RoutingKey 把消息路由到不同的隊列。只不過 Topic 類型
Exchange 可以讓隊列在綁定 Routing key 的時候使用通配符!
Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert
通配符規則:# 匹配一個或多個詞,* 匹配不多不少恰好1個詞,例如:item.# 能夠匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert
圖解:
紅色 Queue:綁定的是 usa.# ,因此凡是以 usa. 開頭的 routing key 都會被匹配到
黃色 Queue:綁定的是 #.news ,因此凡是以 .news 結尾的 routing key 都會被匹配
實現步驟
生產者
package cn.wolfcode.java.rabbitmq._05topic;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** Created by wolfcode-fanjialong*/public class EmitLogTopic {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "topic");String routingKey = "order1.save";String message = "topicMsg";channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");}}}
消費者
package cn.wolfcode.java.rabbitmq._05topic;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;/*** Created by wolfcode-fanjialong*/public class ReceiveLogsTopic {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "topic");String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "order.*");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}}