學成在線--13.RabbitMQ工作模式

文章目錄

  • 一.Work queues
  • 二.Publish/subscribe
    • 1.工作模式
    • 2.代碼
      • 1)生產者
      • 2)消費者
    • 3.測試
    • 4.思考
  • 三.Routing
    • 1.工作模式
    • 2.代碼
      • 1)生產者
      • 2)消費者
    • 3.測試
    • 4.思考
  • 四.Topics
    • 1.工作模式
    • 2.代碼
      • 1)生產者
      • 2)消費者
    • 3.測試
    • 4.思考
  • 五.Header模式
    • 1.生產者
    • 2.消費者
  • 六.RPC

RabbitMQ有以下幾種工作模式 :
1、Work queues
2、Publish/Subscribe
3、Routing
4、Topics
5、Header
6、RPC

一.Work queues

work queues與入門程序HelloWord相比,多了一個消費端,兩個消費端共同消費同一個隊列中的消息。

應用場景:對于任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。
在這里插入圖片描述

測試:
1、使用入門程序HelloWord,啟動多個消費者。
2、生產者發送多個消息。

結果:
1、一條消息只會被一個消費者接收;
2、rabbit采用輪詢的方式將消息是平均發送給消費者的;
3、消費者在處理完某條消息后,才會收到下一條消息。

二.Publish/subscribe

1.工作模式

發布訂閱模式:
1、每個消費者監聽自己的隊列。
2、生產者將消息發給broker,由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收
到消息
在這里插入圖片描述

2.代碼

案例:
當用戶充值成功或轉賬完成系統通知用戶,通知方式有短信、郵件多種方法 。

1)生產者

聲明Exchange_fanout_inform交換機。
聲明兩個隊列并且綁定到此交換機,綁定時不需要指定routingkey
發送消息時不需要指定routingkey

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer02_publish {//隊列名稱private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";public static void main(String[] args) {//通過連接工廠創建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當于一個獨立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新連接connection = connectionFactory.newConnection();//創建會話通道,生產者和mq服務所有通信都在channel通道中完成channel = connection.createChannel();//聲明隊列,如果隊列在mq 中沒有則要創建//參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 參數明細* 1、queue 隊列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊列還在* 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用于臨時隊列的創建* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)* 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);//聲明一個交換機//參數:String exchange, String type/*** 參數明細:* 1、交換機的名稱* 2、交換機的類型* fanout:對應的rabbitmq的工作模式是 publish/subscribe* direct:對應的Routing	工作模式* topic:對應的Topics工作模式* headers: 對應的headers工作模式*/channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);//進行交換機和隊列綁定//參數:String queue, String exchange, String routingKey/*** 參數明細:* 1、queue 隊列名稱* 2、exchange 交換機名稱* 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發布訂閱模式中調協為空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");//發送消息//參數:String exchange, String routingKey, BasicProperties props, byte[] body/*** 參數明細:* 1、exchange,交換機,如果不指定將使用mq的默認交換機(設置為"")* 2、routingKey,路由key,交換機根據路由key來將消息轉發到指定的隊列,如果使用默認交換機,routingKey設置為隊列的名稱* 3、props,消息的屬性* 4、body,消息內容*/for(int i=0;i<5;i++){//消息內容String message = "send inform message to user";channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,message.getBytes());System.out.println("send to mq "+message);}} catch (Exception e) {e.printStackTrace();} finally {//關閉連接//先關閉通道try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}

2)消費者

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer02_subscribe_email {//隊列名稱private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";public static void main(String[] args) throws IOException, TimeoutException {//通過連接工廠創建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當于一個獨立的mqconnectionFactory.setVirtualHost("/");//建立新連接Connection connection = connectionFactory.newConnection();//創建會話通道,生產者和mq服務所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 參數明細* 1、queue 隊列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊列還在* 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用于臨時隊列的創建* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)* 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);//聲明一個交換機//參數:String exchange, String type/*** 參數明細:* 1、交換機的名稱* 2、交換機的類型* fanout:對應的rabbitmq的工作模式是 publish/subscribe* direct:對應的Routing	工作模式* topic:對應的Topics工作模式* headers: 對應的headers工作模式*/channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);//進行交換機和隊列綁定//參數:String queue, String exchange, String routingKey/*** 參數明細:* 1、queue 隊列名稱* 2、exchange 交換機名稱* 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發布訂閱模式中調協為空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");//實現消費方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 當接收到消息后此方法將被調用* @param consumerTag  消費者標簽,用來標識消費者的,在監聽隊列時設置channel.basicConsume* @param envelope 信封,通過envelope* @param properties 消息屬性* @param body 消息內容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交換機String exchange = envelope.getExchange();//消息id,mq在channel中用來標識消息的id,可用于確認消息已接收long deliveryTag = envelope.getDeliveryTag();//消息內容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//監聽隊列//參數:String queue, boolean autoAck, Consumer callback/*** 參數明細:* 1、queue 隊列名稱* 2、autoAck 自動回復,當消費者接收到消息后要告訴mq消息已接收,如果將此參數設置為tru表示會自動回復mq,如果設置為false要通過編程實現回復* 3、callback,消費方法,當消費者接收到消息要執行的方法*/channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);}
}

3.測試

打開RabbitMQ的管理界面,觀察交換機綁定情況:
使用生產者發送若干條消息,每條消息都轉發到各個隊列,并且每個消費者都接收到了消息。
在這里插入圖片描述

4.思考

1、publish/subscribe與work queues有什么異同。
區別:
1)work queues不用定義交換機,而publish/subscribe需要定義交換機。
2)publish/subscribe的生產方是面向交換機發送消息,work queues的生產方是面向隊列發送消息(底層使用默認交換機)。
3)publish/subscribe需要設置隊列和交換機的綁定,work queues不需要設置,實質上work queues會將隊列綁定到默認的交換機 。
相同點:
所以兩者實現的發布/訂閱的效果是一樣的,多個消費端監聽同一個隊列不會重復消費消息

2、實質工作用什么 publish/subscribe還是work queues。
建議使用 publish/subscribe,發布訂閱模式比工作隊列模式更強大,并且發布訂閱模式可以指定自己專用的交換

三.Routing

1.工作模式

路由模式:
1、每個消費者監聽自己的隊列,并且設置routingkey。
2、生產者將消息發給交換機,由交換機根據routingkey來轉發消息到指定的隊列。
在這里插入圖片描述

2.代碼

聲明exchange_routing_inform交換機。
聲明兩個隊列并且綁定到此交換機,綁定時需要指定routingkey
發送消息時需要指定routingkey

1)生產者

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer03_routing {//隊列名稱private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";private static final String ROUTINGKEY_EMAIL="inform_email";private static final String ROUTINGKEY_SMS="inform_sms";public static void main(String[] args) {//通過連接工廠創建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當于一個獨立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新連接connection = connectionFactory.newConnection();//創建會話通道,生產者和mq服務所有通信都在channel通道中完成channel = connection.createChannel();//聲明隊列,如果隊列在mq 中沒有則要創建//參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 參數明細* 1、queue 隊列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊列還在* 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用于臨時隊列的創建* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)* 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);//聲明一個交換機//參數:String exchange, String type/*** 參數明細:* 1、交換機的名稱* 2、交換機的類型* fanout:對應的rabbitmq的工作模式是 publish/subscribe* direct:對應的Routing	工作模式* topic:對應的Topics工作模式* headers: 對應的headers工作模式*/channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);//進行交換機和隊列綁定//參數:String queue, String exchange, String routingKey/*** 參數明細:* 1、queue 隊列名稱* 2、exchange 交換機名稱* 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發布訂閱模式中調協為空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,"inform");channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS);channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,"inform");//發送消息//參數:String exchange, String routingKey, BasicProperties props, byte[] body/*** 參數明細:* 1、exchange,交換機,如果不指定將使用mq的默認交換機(設置為"")* 2、routingKey,路由key,交換機根據路由key來將消息轉發到指定的隊列,如果使用默認交換機,routingKey設置為隊列的名稱* 3、props,消息的屬性* 4、body,消息內容*/for(int i=0;i<5;i++){//發送消息的時候指定routingKeyString message = "send email inform message to user";channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL,null,message.getBytes());System.out.println("send to mq "+message);}for(int i=0;i<5;i++){//發送消息的時候指定routingKeyString message = "send sms inform message to user";channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS,null,message.getBytes());System.out.println("send to mq "+message);}for(int i=0;i<5;i++){//發送消息的時候指定routingKeyString message = "send inform message to user";channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform",null,message.getBytes());System.out.println("send to mq "+message);}} catch (Exception e) {e.printStackTrace();} finally {//關閉連接//先關閉通道try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}

2)消費者

消費者一:郵件

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer03_routing_email {//隊列名稱private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";private static final String ROUTINGKEY_EMAIL="inform_email";public static void main(String[] args) throws IOException, TimeoutException {//通過連接工廠創建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當于一個獨立的mqconnectionFactory.setVirtualHost("/");//建立新連接Connection connection = connectionFactory.newConnection();//創建會話通道,生產者和mq服務所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 參數明細* 1、queue 隊列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊列還在* 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用于臨時隊列的創建* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)* 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);//聲明一個交換機//參數:String exchange, String type/*** 參數明細:* 1、交換機的名稱* 2、交換機的類型* fanout:對應的rabbitmq的工作模式是 publish/subscribe* direct:對應的Routing	工作模式* topic:對應的Topics工作模式* headers: 對應的headers工作模式*/channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);//進行交換機和隊列綁定//參數:String queue, String exchange, String routingKey/*** 參數明細:* 1、queue 隊列名稱* 2、exchange 交換機名稱* 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發布訂閱模式中調協為空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);//實現消費方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 當接收到消息后此方法將被調用* @param consumerTag  消費者標簽,用來標識消費者的,在監聽隊列時設置channel.basicConsume* @param envelope 信封,通過envelope* @param properties 消息屬性* @param body 消息內容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交換機String exchange = envelope.getExchange();//消息id,mq在channel中用來標識消息的id,可用于確認消息已接收long deliveryTag = envelope.getDeliveryTag();//消息內容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//監聽隊列//參數:String queue, boolean autoAck, Consumer callback/*** 參數明細:* 1、queue 隊列名稱* 2、autoAck 自動回復,當消費者接收到消息后要告訴mq消息已接收,如果將此參數設置為tru表示會自動回復mq,如果設置為false要通過編程實現回復* 3、callback,消費方法,當消費者接收到消息要執行的方法*/channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);}
}

消費者二:短信

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer03_routing_sms {//隊列名稱private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";private static final String ROUTINGKEY_SMS="inform_sms";public static void main(String[] args) throws IOException, TimeoutException {//通過連接工廠創建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當于一個獨立的mqconnectionFactory.setVirtualHost("/");//建立新連接Connection connection = connectionFactory.newConnection();//創建會話通道,生產者和mq服務所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 參數明細* 1、queue 隊列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊列還在* 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用于臨時隊列的創建* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)* 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間*/channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);//聲明一個交換機//參數:String exchange, String type/*** 參數明細:* 1、交換機的名稱* 2、交換機的類型* fanout:對應的rabbitmq的工作模式是 publish/subscribe* direct:對應的Routing	工作模式* topic:對應的Topics工作模式* headers: 對應的headers工作模式*/channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);//進行交換機和隊列綁定//參數:String queue, String exchange, String routingKey/*** 參數明細:* 1、queue 隊列名稱* 2、exchange 交換機名稱* 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發布訂閱模式中調協為空字符串*/channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS);//實現消費方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 當接收到消息后此方法將被調用* @param consumerTag  消費者標簽,用來標識消費者的,在監聽隊列時設置channel.basicConsume* @param envelope 信封,通過envelope* @param properties 消息屬性* @param body 消息內容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交換機String exchange = envelope.getExchange();//消息id,mq在channel中用來標識消息的id,可用于確認消息已接收long deliveryTag = envelope.getDeliveryTag();//消息內容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//監聽隊列//參數:String queue, boolean autoAck, Consumer callback/*** 參數明細:* 1、queue 隊列名稱* 2、autoAck 自動回復,當消費者接收到消息后要告訴mq消息已接收,如果將此參數設置為tru表示會自動回復mq,如果設置為false要通過編程實現回復* 3、callback,消費方法,當消費者接收到消息要執行的方法*/channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);}
}

3.測試

使用生產者發送若干條消息,交換機根據routingkey轉發消息到指定的隊列
在這里插入圖片描述

4.思考

1、Routing模式和Publish/subscibe有什么區別?
Routing模式要求隊列在綁定交換機時要指定routingkey,消息會轉發到符合routingkey的隊列。

四.Topics

1.工作模式

路由模式:
1、每個消費者監聽自己的隊列,并且設置帶統配符的routingkey。
2、生產者將消息發給broker,由交換機根據routingkey來轉發消息到指定的隊列。
在這里插入圖片描述

2.代碼

案例:
根據用戶的通知設置去通知用戶,設置接收Email的用戶只接收Email,設置接收sms的用戶只接收sms,設置兩種
通知類型都接收的則兩種通知都有效。

1)生產者

聲明交換機,指定topic類型

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer04_topics {//隊列名稱private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";private static final String ROUTINGKEY_EMAIL="inform.#.email.#";private static final String ROUTINGKEY_SMS="inform.#.sms.#";public static void main(String[] args) {//通過連接工廠創建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當于一個獨立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新連接connection = connectionFactory.newConnection();//創建會話通道,生產者和mq服務所有通信都在channel通道中完成channel = connection.createChannel();//聲明隊列,如果隊列在mq 中沒有則要創建//參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 參數明細* 1、queue 隊列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊列還在* 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用于臨時隊列的創建* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)* 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);//聲明一個交換機//參數:String exchange, String type/*** 參數明細:* 1、交換機的名稱* 2、交換機的類型* fanout:對應的rabbitmq的工作模式是 publish/subscribe* direct:對應的Routing	工作模式* topic:對應的Topics工作模式* headers: 對應的headers工作模式*/channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);//進行交換機和隊列綁定//參數:String queue, String exchange, String routingKey/*** 參數明細:* 1、queue 隊列名稱* 2、exchange 交換機名稱* 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發布訂閱模式中調協為空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);//發送消息//參數:String exchange, String routingKey, BasicProperties props, byte[] body/*** 參數明細:* 1、exchange,交換機,如果不指定將使用mq的默認交換機(設置為"")* 2、routingKey,路由key,交換機根據路由key來將消息轉發到指定的隊列,如果使用默認交換機,routingKey設置為隊列的名稱* 3、props,消息的屬性* 4、body,消息內容*/for(int i=0;i<5;i++){//發送消息的時候指定routingKeyString message = "send email inform message to user";channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.email",null,message.getBytes());System.out.println("send to mq "+message);}for(int i=0;i<5;i++){//發送消息的時候指定routingKeyString message = "send sms inform message to user";channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms",null,message.getBytes());System.out.println("send to mq "+message);}for(int i=0;i<5;i++){//發送消息的時候指定routingKeyString message = "send sms and email inform message to user";channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms.email",null,message.getBytes());System.out.println("send to mq "+message);}} catch (Exception e) {e.printStackTrace();} finally {//關閉連接//先關閉通道try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}

2)消費者

消費者一:email

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer04_topics_email {//隊列名稱private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";private static final String ROUTINGKEY_EMAIL="inform.#.email.#";public static void main(String[] args) throws IOException, TimeoutException {//通過連接工廠創建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當于一個獨立的mqconnectionFactory.setVirtualHost("/");//建立新連接Connection connection = connectionFactory.newConnection();//創建會話通道,生產者和mq服務所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 參數明細* 1、queue 隊列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊列還在* 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用于臨時隊列的創建* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)* 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);//聲明一個交換機//參數:String exchange, String type/*** 參數明細:* 1、交換機的名稱* 2、交換機的類型* fanout:對應的rabbitmq的工作模式是 publish/subscribe* direct:對應的Routing	工作模式* topic:對應的Topics工作模式* headers: 對應的headers工作模式*/channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);//進行交換機和隊列綁定//參數:String queue, String exchange, String routingKey/*** 參數明細:* 1、queue 隊列名稱* 2、exchange 交換機名稱* 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發布訂閱模式中調協為空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);//實現消費方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 當接收到消息后此方法將被調用* @param consumerTag  消費者標簽,用來標識消費者的,在監聽隊列時設置channel.basicConsume* @param envelope 信封,通過envelope* @param properties 消息屬性* @param body 消息內容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交換機String exchange = envelope.getExchange();//消息id,mq在channel中用來標識消息的id,可用于確認消息已接收long deliveryTag = envelope.getDeliveryTag();//消息內容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//監聽隊列//參數:String queue, boolean autoAck, Consumer callback/*** 參數明細:* 1、queue 隊列名稱* 2、autoAck 自動回復,當消費者接收到消息后要告訴mq消息已接收,如果將此參數設置為tru表示會自動回復mq,如果設置為false要通過編程實現回復* 3、callback,消費方法,當消費者接收到消息要執行的方法*/channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);}
}

消費者二:sms

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer04_topics_sms {//隊列名稱private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";private static final String ROUTINGKEY_SMS="inform.#.sms.#";public static void main(String[] args) throws IOException, TimeoutException {//通過連接工廠創建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當于一個獨立的mqconnectionFactory.setVirtualHost("/");//建立新連接Connection connection = connectionFactory.newConnection();//創建會話通道,生產者和mq服務所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 參數明細* 1、queue 隊列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊列還在* 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用于臨時隊列的創建* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)* 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間*/channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);//聲明一個交換機//參數:String exchange, String type/*** 參數明細:* 1、交換機的名稱* 2、交換機的類型* fanout:對應的rabbitmq的工作模式是 publish/subscribe* direct:對應的Routing	工作模式* topic:對應的Topics工作模式* headers: 對應的headers工作模式*/channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);//進行交換機和隊列綁定//參數:String queue, String exchange, String routingKey/*** 參數明細:* 1、queue 隊列名稱* 2、exchange 交換機名稱* 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發布訂閱模式中調協為空字符串*/channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);//實現消費方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 當接收到消息后此方法將被調用* @param consumerTag  消費者標簽,用來標識消費者的,在監聽隊列時設置channel.basicConsume* @param envelope 信封,通過envelope* @param properties 消息屬性* @param body 消息內容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交換機String exchange = envelope.getExchange();//消息id,mq在channel中用來標識消息的id,可用于確認消息已接收long deliveryTag = envelope.getDeliveryTag();//消息內容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//監聽隊列//參數:String queue, boolean autoAck, Consumer callback/*** 參數明細:* 1、queue 隊列名稱* 2、autoAck 自動回復,當消費者接收到消息后要告訴mq消息已接收,如果將此參數設置為tru表示會自動回復mq,如果設置為false要通過編程實現回復* 3、callback,消費方法,當消費者接收到消息要執行的方法*/channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);}
}

3.測試

使用生產者發送若干條消息,交換機根據routingkey統配符匹配并轉發消息到指定的隊列。
在這里插入圖片描述

4.思考

1、本案例的需求使用Routing工作模式能否實現?
使用Routing模式也可以實現本案例,共設置三個 routingkey,分別是email、sms、all,email隊列綁定email和all,sms隊列綁定sms和all,這樣就可以實現上邊案例的功能,實現過程比topics復雜。
Topic模式更多加強大,它可以實現Routing、publish/subscirbe模式的功能。

五.Header模式

header模式與routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(鍵值對)匹配隊列。
案例:
根據用戶的通知設置去通知用戶,設置接收Email的用戶只接收Email,設置接收sms的用戶只接收sms,設置兩種
通知類型都接收的則兩種通知都有效。

1.生產者

隊列與交換機綁定的代碼與之前不同,如下:

Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_type", "email");
Map<String, Object> headers_sms = new Hashtable<String, Object>();
headers_sms.put("inform_type", "sms");
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);
String message = "email inform to user"+i;
Map<String,Object> headers = new Hashtable<String, Object>();
headers.put("inform_type", "email");//匹配email通知消費者綁定的header
//headers.put("inform_type", "sms");//匹配sms通知消費者綁定的header
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
properties.headers(headers);
//Email通知
channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());

2.消費者

channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS);
Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_email", "email");
//交換機和隊列綁定
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
//指定消費隊列
channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);

六.RPC

RPC即客戶端遠程調用服務端的方法 ,使用MQ可以實現RPC的異步調用,基于Direct交換機實現,流程如下:
1、客戶端即是生產者就是消費者,向RPC請求隊列發送RPC調用消息,同時監聽RPC響應隊列。
2、服務端監聽RPC請求隊列的消息,收到消息后執行服務端的方法,得到方法返回的結果
3、服務端將RPC方法的結果發送到RPC響應隊列
4、客戶端(RPC調用方)監聽RPC響應隊列,接收到RPC調用結果。
在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/452219.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/452219.shtml
英文地址,請注明出處:http://en.pswp.cn/news/452219.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

《C++字符串完全指南——第一部分:win32 字符編碼》

《C字符串完全指南--第一部分:win32 字符編碼》 原作者:Michael Dun 譯 者:Dingqiao Wang 引言 毫無疑問&#xff0c;你肯定見過像TCHAR, std::string, BSTR等等這類字符串類型.也包括一些以_tcs開頭的奇怪的宏。也許你正盯著屏幕"哇哇"的發愁&#xff0c;然…

Spring、Spring MVC、MyBatis整合文件配置詳解

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 web.xml的配置 web.xml應該是整個項目最重要的配置文件了&#xff0c;不過servlet3.0中已經支持注解配置方式了。在servlet3.0以前每…

19.C++-(=)賦值操作符、初步編寫智能指針

()賦值操作符 編譯器為每個類默認重載了()賦值操作符默認的()賦值操作符僅完成淺拷貝默認的賦值操作符和默認的拷貝構造函數有相同的存在意義()賦值操作符注意事項 首先要判斷兩個操作數是否相等 返回值一定是 return *this; 返回類型是Type&型,避免連續使用后,出現bug 比如…

windows mysqldump 不成功 1049 1064 報錯

1064 路徑不對&#xff0c;需要cd選到mysql bin目錄下 1049 在cmd里面不需要分號 以下是正確的 E:\phpStudy\PHPTutorial\MySQL\bin>mysqldump -uroot -proot db >db.sql 轉載于:https://www.cnblogs.com/JANCHAN/p/9227388.html

學成在線--14.使用RabbitMQ完成頁面發布

文章目錄一.技術方案二.頁面發布——消費方1.需求分析2.創建Cms Client工程1&#xff09;創建maven工程2&#xff09;配置文件3&#xff09;啟動類3.RabbitmqConfig配置類4.定義消息格式5.PageDao1&#xff09;使用CmsPageRepository 查詢頁面信息2&#xff09;使用CmsSiteRepo…

對象模型中類與類間的關系

類與類之間通常有關聯、聚集、泛化(繼承)、依賴和細化4種關系 1.關聯 關聯表示兩個類的對象之間存在某種語義上的聯系。 (1) 普通關聯 只要在類與類之間存在連接關系就可以用普通關聯表示。普通關聯的圖示符號是連接兩個類之間的直線&#xff0c;如下圖所示。關聯…

記憶講師石偉華微信公眾號2017所有文章匯總(待更新)

17-10-24-不勝光榮的記憶 17-10-26-每日一個超長英文單詞&#xff08;2&#xff09; 17-10-27-每日一個超長英文單詞&#xff08;3&#xff09; 17-10-28-每日一個超長英文單詞&#xff08;4&#xff09; 轉載于:https://www.cnblogs.com/bakblog/p/9228096.html

Log4J日志配置詳解

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 一、Log4j簡介 Log4j有三個主要的組件&#xff1a;Loggers(記錄器)&#xff0c;Appenders (輸出源)和Layouts(布局)。這里可簡單理解為日…

中文編碼雜談

編碼問題的例子 在windows自帶的notepad&#xff08;記事本&#xff09;程序中輸入“聯通”兩個字&#xff0c;保存后再次打開&#xff0c;會發現“聯通”不見了&#xff0c;代之以“”的亂碼。這是windows平臺上典型的中文編碼問題。即文件保存的時候是按照ANSI編碼&#xff…

Java NIO (十四)NIO 和 IO 的區別和適用場景分析

在研究Java NIO和IO API時&#xff0c;很快就會想到一個問題&#xff1a; 什么時候應該使用IO&#xff0c;什么時候應該使用NIO&#xff1f; 在本文中&#xff0c;我將嘗試闡明Java NIO和IO之間的區別&#xff0c;它們的用例以及它們如何影響代碼的設計。 ###Java NIO和IO之間的…

面向對象三種模型之間的關系

功能模型指明了系統應該“做什么”&#xff1b;動態模型明確規定了什么時候(即在何種狀態下接受了什么事件的觸發)做&#xff1b;對象模型則定義了做事情的實體。在面向對象方法學中&#xff0c;對象模型是最基本最重要的&#xff0c;它為其他兩種模型奠定了基礎&#xff0c;人…

android node

pkg install nodejs-current轉載于:https://www.cnblogs.com/insight0912/p/9231342.html

springmvc 中@Controller和@RestController的區別

1.Controller, RestController的共同點 都是用來表示Spring某個類的是否可以接收HTTP請求 2.Controller, RestController的不同點 Controller標識一個Spring類是Spring MVC controller處理器 RestController&#xff1a; a convenience annotation that does nothing more …

easyUI 日期控件修改...

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 個人覺得easyUI挺好用的。 它的中文文檔地址&#xff1a; http://www.zi-han.net/case/easyui/ 日期本來效果是這樣的&#xff1a; 改…

面向對象分析的三個模型與5個層次

在面向對象分析中&#xff0c;主要由對象模型、動態模型和功能模型組成。對象模型是最基本、最重要、最核心的。 面向對象建模得到的模型包含系統的3個要素&#xff0c;即靜態結構(對象模型)、交互次序(動態模型)和數據變換(功能模型)。解決的問題不同&#xff0c;這3個子模型…

學成在線--15.課程計劃查詢

文章目錄一.需求分析二.頁面原型1.tree組件介紹2.webstorm配置jsx三.API接口1.數據模型2.自定義模型類3.接口定義四.sql語句五.服務器端1.Dao1&#xff09;Mapper接口2&#xff09;Mapper映射文件2.Service3.Controller4.測試六.前端1.Api方法2.Api調用1&#xff09;定義查詢課…

團隊作業-項目答辯

1. 王書磊 1600802063 http://www.cnblogs.com/wsl-1117/ 劉令斌 1600802017 http://www.cnblogs.com/liulingbin/ 許浩然 1600802066 https://www.cnblogs.com/xuhaoran1/ 成明龍 1600802038 http://www.cnblogs.com/CMLCML/ 2這是我們的效果圖. 3.&#xff08;1&#xff09;修…

Java構造和解析Json數據的兩種方法詳解一

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 在www.json.org上公布了很多JAVA下的json構造和解析工具&#xff0c;其中org.json和json-lib比較簡單&#xff0c;兩者使用上差不多但還是…

面向對象方法開發的方法

面向對象分析首要的工作&#xff0c;是建立問題域的對象模型。 這個模型描述了現實世界中的“類與對象”以及它們之間的關系&#xff0c;表示了目標系統的靜態數據結構。靜態數據結構對應用細節依賴較少&#xff0c;比較容易確定。因此&#xff0c;用面向對象方法開發絕大多數…

程序員編程需要多少個小時?

Michael Arrington曾發表一篇博文說&#xff0c;創業者必須加倍的努力工作&#xff0c;甚至不惜趴在辦公桌上睡覺&#xff0c;這樣才能成功。對此&#xff0c;我并不贊同其觀點&#xff0c;我看了很多評論都是關于這樣工作會適得其反&#xff0c;不但沒有獲得成功&#xff0c;相…