【譯】RabbitMQ教程一
- 主要通過Hello Word對RabbitMQ有初步認識
【譯】RabbitMQ教程二
- 工作隊列,即一個生產者對多個消費者
- 循環分發、消息確認、消息持久、公平分發
【譯】RabbitMQ教程三
- 如何同一個消息同時發給多個消費者
- 開始引入RabbitMQ消息模型中的重要概念路由器Exchange以及綁定等
- 使用了fanout類型的路由器
【譯】RabbitMQ教程四
- 如何選擇性地接收消息
- 使用了direct路由器
【譯】RabbitMQ教程五
- 如何通過多重標準接收消息
- 使用了topic路由器,可通過靈活的路由鍵和綁定鍵的設置,
進一步增強消息選擇的靈活性
【譯】RabbitMQ教程六
- 如何使用RabbitMQ實現一個簡單的RPC系統
- 回調隊列
callback queue
和關聯標識correlation id
各教程代碼
- 官方:GitHub rabbitmq-tutorials
或 - 我整理的:rabbitmq-tutorial-java
RabbitMQ 一般工作流程
生產者和RabbitMQ服務器建立連接和通道,聲明路由器,同時為消息設置路由鍵,這樣,所有的消息就會以特定的路由鍵發給路由器,具體路由器會發送到哪個或哪幾個隊列,生產者在大部分場景中都不知道。(1個路由器,但不同的消息可以有不同的路由鍵)。
消費者和RabbitMQ服務器建立連接和通道,然后聲明隊列,聲明路由器,然后通過設置綁定鍵(或叫路由鍵)為隊列和路由器指定綁定關系,這樣,消費者就可以根據綁定鍵的設置來接收消息。(1個路由器,1個隊列,但不同的消費者可以設置不同的綁定關系)。
主要方法
- 聲明隊列(創建隊列):可以生產者和消費者都聲明,也可以消費者聲明生產者不聲明,也可以生產者聲明而消費者不聲明。最好是都聲明。(生產者未聲明,消費者聲明這種情況如果生產者先啟動,會出現消息丟失的情況,因為隊列未創建)
channel.queueDeclare(String queue, //隊列的名字boolean durable, //該隊列是否持久化(即是否保存到磁盤中)boolean exclusive,//該隊列是否為該通道獨占的,即其他通道是否可以消費該隊列boolean autoDelete,//該隊列不再使用的時候,是否讓RabbitMQ服務器自動刪除掉Map<String, Object> arguments)//其他參數
- 聲明路由器(創建路由器):生產者、消費者都要聲明路由器---如果聲明了隊列,可以不聲明路由器。
channel.exchangeDeclare(String exchange,//路由器的名字String type,//路由器的類型:topic、direct、fanout、headerboolean durable,//是否持久化該路由器boolean autoDelete,//是否自動刪除該路由器boolean internal,//是否是內部使用的,true的話客戶端不能使用該路由器Map<String, Object> arguments) //其他參數
-
綁定隊列和路由器:只用在消費者
channel.queueBind(String queue, //隊列String exchange, //路由器String routingKey, //路由鍵,即綁定鍵Map<String, Object> arguments) //其他綁定參數
-
發布消息:只用在生產者
channel.basicPublish(String exchange, //路由器的名字,即將消息發到哪個路由器String routingKey, //路由鍵,即發布消息時,該消息的路由鍵是什么BasicProperties props, //指定消息的基本屬性byte[] body)//消息體,也就是消息的內容,是字節數組
BasicProperties props
:指定消息的基本屬性,如deliveryMode
為2時表示消息持久,2以外的值表示不持久化消息//BasicProperties介紹 String corrId = ""; String replyQueueName = ""; Integer deliveryMode = 2; String contentType = "application/json"; AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).deliveryMode(deliveryMode).contentType(contentType).build();
- 接收消息:只用在消費者
channel.basicConsume(String queue, //隊列名字,即要從哪個隊列中接收消息boolean autoAck, //是否自動確認,默認trueConsumer callback)//消費者,即誰接收消息
- 消費者中一般會有回調方法來消費消息
Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, //該消費者的標簽Envelope envelope,//字面意思為信封:packaging data for the messageAMQP.BasicProperties properties, //message content header data byte[] body) //message bodythrows IOException {//獲取消息示例String message = new String(body, "UTF-8");//接下來就可以根據消息處理一些事情}};
- 消費者中一般會有回調方法來消費消息
路由器類型
- fanout:會忽視綁定鍵,每個消費者都可以接受到所有的消息(前提是每個消費者都要有各自單獨的隊列,而不是共有同一隊列)。
- direct:只有綁定鍵和路由鍵完全匹配時,才可以接受到消息。
- topic:可以設置多個關鍵詞作為路由鍵,在綁定鍵中可以使用
*
和#
來匹配 - headers:(可以忽視它的存在)
教程一 HelloWorld
看主要代碼
//生產者
channel.queueDeclare(QUEUE_NAME, false, false, false, null); ----①
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
//消費者
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicConsume(QUEUE_NAME, true, consumer);
這里,生產者和消費者都沒有聲明路由器,而是聲明了同名的隊列。生產者發布消息時,使用了默認的無名路由器(""
),并以隊列的名字作為了路由鍵。消費者在消費時,由于沒有聲明路由器,這并不表示沒有路由器的存在,消費者此時使用的是默認的路由器,即Default exchange,該路由器和所有的隊列都進行綁定,并且使用隊列的名字作為了路由鍵進行綁定。所以,生產者使用默認路由器以隊列的名字作為了綁定鍵進行了消息發布,而消費者也使用了默認的路由器,并以隊列的名字作為綁定鍵進行了綁定。而默認路由器是direct類型,路由鍵和綁定鍵完全匹配時,消費者才能接受到消息,所以教程1中的消費者可以接收到消息。(為了認證這一點,可以將代碼①去掉,然后先運行消費者,讓它等待監聽,然后啟動生產者,發送消息,消費者同樣會收到消息。這里的生產者聲明隊列,只是讓RabbitMQ服務器先創建這個隊列,以免發送的消息因為找不到隊列而丟失。)
教程二 Work Queues
看主要代碼
//生產者
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = "1.";
channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
//消費者
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
channel.basicQos(1);---①...channel.basicAck(envelope.getDeliveryTag(), false);...---③
boolean autoAck = false;---②
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
這里也使用了默認的direct路由器。假如啟動多個工作者(消費者),按道理這些工作者應該可以接收到所有的消息啊,但是不要忘了這幾個工作者都是從同一個隊列中取消息,消息取出一個,隊列中就少一個,所以每個工作者都只是收到的消息的一部分。既然這幾個工作者都從同一個隊列中取消息,那每個工作者應該怎么取呢?
如果沒有代碼①,并且②設置為true,即自動確認收到消息,RabbitMQ只要發出消息就認為消費者收到了,此時RabbitMQ采取的是循環分發的策略,在這幾個工作者中循環輪流分發消息。每個工作者接受到的消息數量都是相同的。
如果有代碼①,并且②設置為false,則RabbitMQ會采取公平分發策略,即將消息發給空閑的工作者(空閑,工作者將消息處理完畢,執行了代碼③;不空閑,即工作者還在處理消息,還沒有給RabbitMQ發回確認信息,即還沒有執行代碼③)。
代碼①中的參數1:(prefetchCount)maximum number of messages that the server will deliver
。
為了防止隊列丟失,在聲明隊列的時候指定了durable
為true
。為了防止消息丟失,設置了消息屬性BasicProperties
為MessageProperties.PERSISTENT_TEXT_PLAIN
,讓我們看看值是什么:

可以看出里面包含了deliveryMode=2
。從這張圖也可以看到BasicProperties
屬性的全貌。
如果想讓多個消費者共同消費某些消息,只要讓他們共用同一隊列即可(當然前提是你得保證消息可以都進到這個隊列中來,如本例中使用direct
路由器,消息的路由鍵和隊列的綁定鍵設為一致,當然也可以使用fanout
路由器,路由鍵和綁定鍵隨意設置,不一致也能收到,因為fanout
路由器會忽略路由鍵的設置)。
教程三 Publish/Subscribe
看主要代碼
//生產者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));//消費者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String queueName = channel.queueDeclare().getQueue();---①
channel.queueBind(queueName, EXCHANGE_NAME, "");
channel.basicConsume(queueName, true, consumer);
教程三才引出路由器的概念。生產者和消費者聲明了同樣的路由,并指明路由類型為fanout
,該路由器會忽視路由鍵,將消息發布到所有綁定的隊列中(仍需要綁定,只是綁定時綁定鍵任意就行了)。
假如啟動多個消費者,因為代碼①中調用無參的聲明去惡劣方法channel.queueDeclare()
,就會創建了一個非持久、獨特的、自動刪除的隊列,并返回一個自動生成的名字。所以多個消費者取消息時使用的是各自的隊列,不會存在多個消費者從同一個隊列取消息的情況。
這樣多個消費者就可以接收到同一消息。
如果想實現多個消費者都可以接收到所有的消息,只要讓他們各自使用單獨的隊列即可(當然前提是保證路由鍵和綁定鍵的設置可以讓消息都進入到隊列,如本例中使用fanout
路由器,無需考慮綁定鍵和路由鍵)。
教程4 Routing
看主要代碼:
//生產者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
//消費者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
String[] severities = {"info", "warning", "error"};
for (String severity : severities) {channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
channel.basicConsume(queueName, true, consumer);
可以看出,教程3使用了direct
路由器,該路由器的特點是可以設定路由鍵和綁定鍵,消費者只能從隊列中取出兩者匹配的消息。
在生產者發消息時,為消息設置不同的路由鍵(如例子中severity
可以設為info
、warn
、error
)。
消費者在通過為隊列設置多個綁定關系,來選擇想要接收的消息。
這里有一個概念叫做多重綁定,即多個隊列以相同的綁定鍵binding key綁定到同一個路由器上,此時direct
路由器就會像fanout路由器一樣,將消息廣播給所有符合路由規則的隊列。
教程5 Topics
看主要代碼:
//生產者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String routingKey = "";
String message = "msg...";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
//消費者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
String bingingKeys[] = {""};for (String bindingKey : bingingKeys) {channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);}
channel.basicConsume(queueName, true, consumer);
這里使用了topic
路由器,它與direct
路由器類似,不同在于,topic
路由器可以為路由鍵設置多重標準。一個消息有一個路由鍵,direct
路由器只能為路由鍵指定一個關鍵字,但是topic
路由器可以在路由鍵中通過點號分割多個單詞來組成路由鍵,消費者在綁定的時候,可以設置多重標準來選擇接受。
舉個例子:假如日志根據嚴重級別info
、warn
、error
,也可以根據來源分為cron
、kern
、auth
。某個日志消息設置路由鍵為kern.info
,表示來自kern
的info
級別的日志。想要選擇接收消息的時候,direct
路由器就辦不到,它要么可以根據嚴重級別來篩選,要么根據來源來篩選,而topic
路由器則可以輕松應對,只要將綁定鍵設置為kern.info
就可以精準獲取該類型的日志。