目錄
一、Simple(簡單模式)
1.1 概念
1.2 代碼實現
消費者
運行結果
二、Work Queue(工作隊列)
2.1 概念
1.2 代碼實現
生產者
消費者
?運行結果
三、Publish/Subscribe(發布/訂閱模式)
3.1 概念
3.2 代碼實現
生產者
消費者
運行結果
四、Routing(路由模式)
4.1 概念
4.2 代碼實現
Constants類
生產者
消費者
運行結果
五、Topics(通配符模式)
5.1 概念
5.2 代碼實現
生產者
消費者
運行結果
六、RPC(RPC通信)了解
6.1 概念
6.2 代碼實現
客戶端代碼編寫
編寫服務器代碼
運行結果:
七、Publish Confirms(發布確認模式)???????
publishing Messages Individually(單獨確認)
Publishing Messages in Batches(批量確認)
Handling Publisher Confirms Asynchronously(異步確認)
一、Simple(簡單模式)
1.1 概念
P:?生產者,也就是要發送消息的程序
C:?消費者,消息的接受者
Queue:消息隊列,圖中Queue類似提個郵箱,可以緩存消息;生產者向其中投遞消息,消費者從中取出消息。
特點:一個生產者P,一個消費者C,消息只能被消費一次,也稱為點對點(Point-to-Point)模式
適用場景:消息只能被單個消費者處理
1.2 代碼實現
消費者
package rabbitmq.simple;import com.rabbitmq.client.*;
import java.io.IOException;public class ConsumerDemo {public static void main(String[] args) throws Exception {//1.創建連接// 創建一個ConnectionFactory實例來配置RabbitMQ連接ConnectionFactory connectionFactory = new ConnectionFactory();// 設置RabbitMQ服務器的主機地址connectionFactory.setHost("8.136.108.248");// 設置RabbitMQ服務器的端口號connectionFactory.setPort(5672);// 設置登錄RabbitMQ服務器的用戶名connectionFactory.setUsername("pinkboy");// 設置登錄RabbitMQ服務器的密碼connectionFactory.setPassword("123456");// 設置RabbitMQ服務器的虛擬主機connectionFactory.setVirtualHost("/");// 使用ConnectionFactory創建一個新的連接Connection connection = connectionFactory.newConnection();//2.創建ChannelChannel channel = connection.createChannel();/***3.聲明一個隊列** @param channel RabbitMQ的通道,用于執行隊列操作** 此處使用了queueDeclare方法來聲明一個名為"hello"的隊列該方法的參數分別表示:* 1. 隊列名稱("hello"):指定要聲明的隊列的名稱* 2. true:表示該隊列是持久化的,意味著即使RabbitMQ服務重啟,隊列也會被保留* 3. false:表示該隊列不是排他的,意味著該隊列可以被所有通道共享* 4. false:表示該隊列不會在使用后自動刪除,需要手動刪除* 5. null:表示不設置額外的參數** 選擇這些參數值的原因可能是希望創建一個持久化的、共享的隊列,以便在不同的時間點和不同的消費者之間傳遞消息*/channel.queueDeclare("hello", true, false, false, null);// 4.開始從名為"hello"的隊列中消費消息channel.basicConsume("hello", true, new DefaultConsumer(channel) {/*** 處理接收到的消息** @param consumerTag 消費者標簽,用于標識消費者* @param envelope 包含消息路由信息的信封* @param properties 消息的屬性,如內容類型、內容編碼等* @param body 消息的主體內容,以字節數組形式表示* @throws IOException 如果處理消息時發生I/O錯誤*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主體內容System.out.println("body:" + new String(body));}});Thread.sleep(2000);//5.關閉資源channel.close();connection.close();}
}
運行結果
啟動生產者代碼:
觀察消息隊列
?啟動消費者代碼:
觀察消息隊列?
二、Work Queue(工作隊列)
2.1 概念
一個生產者P,多個消費者C1,C2 在多個消息的情況下,WorkQueue會將消息分派給不同的消費者,每個消費者都會接收到不同的消息
特點:消息不會重復,分配各不同的消費者
適用場景:集群環境中做異步處理
舉個例子:12306短息通知服務,訂票成功后,訂單消息會發送到RabbitMQ,短信服務從RabbitMQ中獲取訂單信息,并發送通知信息(在短信服務之間進行任務分配)
1.2 代碼實現
工作模式就是簡單模式的增強版 和簡單模式的區別就是 簡單模式就一個消費者,工作模式支持多個消費者接收消息,消費者之間是竟爭關系 每個消息只能被一個消費者接收
和簡單模式代碼差不多 為了展示多個消費者競爭的關系 生產者一次生產10條消息
常量類
package rabbitmq.constant;public class Constants {public static final String HOST = "8.136.108.248";public static final Integer PORT = 5672;public static final String USER_NAME = "pinkboy";public static final String PASSWORD = "123456";public static final String VIRTUAL_HOST = "/";//工作隊列模式public static final String WORK_QUEUE = "work_queue";}
生產者
package rabbitmq.work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws Exception {//1、建立連接// 創建一個ConnectionFactory實例來配置RabbitMQ連接ConnectionFactory connectionFactory = new ConnectionFactory();// 設置RabbitMQ服務器的主機地址connectionFactory.setHost(Constants.HOST);// 設置RabbitMQ服務器的端口號connectionFactory.setPort(Constants.PORT);// 設置登錄RabbitMQ服務器的用戶名connectionFactory.setUsername(Constants.USER_NAME);// 設置登錄RabbitMQ服務器的密碼connectionFactory.setPassword(Constants.PASSWORD);// 設置RabbitMQ服務器的虛擬主機connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);// 使用ConnectionFactory創建一個新的連接Connection connection = connectionFactory.newConnection();//2、創建通道Channel channel = connection.createChannel();//3、聲明交換機//4、聲明隊列/*** 聲明一個隊列** @param channel RabbitMQ的通道,用于執行隊列操作** 此處使用了queueDeclare方法來聲明一個名為"hello"的隊列該方法的參數分別表示:* 1. 隊列名稱("hello"):指定要聲明的隊列的名稱* 2. true:表示該隊列是持久化的,意味著即使RabbitMQ服務重啟,隊列也會被保留* 3. false:表示該隊列不是排他的,意味著該隊列可以被所有通道共享* 4. false:表示該隊列不會在使用后自動刪除,需要手動刪除* 5. null:表示不設置額外的參數** 選擇這些參數值的原因可能是希望創建一個持久化的、共享的隊列,以便在不同的時間點和不同的消費者之間傳遞消息*/channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//5、發送消息// 循環發送消息到 RabbitMQ 的 "hello" 隊列中for (int i = 0; i < 10; i++) {// 構造消息內容String msg = "hello work queue ..." + i;/*** 參數1 表示交換機名稱,因為使用默認交換機,所以為空字符串* 參數2 表示隊列名稱* 參數3 :消息的屬性* 參數4:消息內容*/channel.basicPublish("", Constants.WORK_QUEUE, null, msg.getBytes());}System.out.println("消息發送成功!");//6、釋放資源channel.close();connection.close();}
}
消費者
兩個消費者的代碼是一樣的
消費者1
package rabbitmq.work;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {//1、建立連接// 創建一個ConnectionFactory實例來配置RabbitMQ連接ConnectionFactory connectionFactory = new ConnectionFactory();// 設置RabbitMQ服務器的主機地址connectionFactory.setHost(Constants.HOST);// 設置RabbitMQ服務器的端口號connectionFactory.setPort(Constants.PORT);// 設置登錄RabbitMQ服務器的用戶名connectionFactory.setUsername(Constants.USER_NAME);// 設置登錄RabbitMQ服務器的密碼connectionFactory.setPassword(Constants.PASSWORD);// 設置RabbitMQ服務器的虛擬主機connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);// 使用ConnectionFactory創建一個新的連接Connection connection = connectionFactory.newConnection();//2.創建ChannelChannel channel = connection.createChannel();channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);channel.basicConsume(Constants.WORK_QUEUE, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主體內容System.out.println("body:" + new String(body));}});System.out.println("Consumer1 啟動成功!");//5.關閉資源
// channel.close();
// connection.close();}
}
消費者2
package rabbitmq.work;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {//1、建立連接// 創建一個ConnectionFactory實例來配置RabbitMQ連接ConnectionFactory connectionFactory = new ConnectionFactory();// 設置RabbitMQ服務器的主機地址connectionFactory.setHost(Constants.HOST);// 設置RabbitMQ服務器的端口號connectionFactory.setPort(Constants.PORT);// 設置登錄RabbitMQ服務器的用戶名connectionFactory.setUsername(Constants.USER_NAME);// 設置登錄RabbitMQ服務器的密碼connectionFactory.setPassword(Constants.PASSWORD);// 設置RabbitMQ服務器的虛擬主機connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);// 使用ConnectionFactory創建一個新的連接Connection connection = connectionFactory.newConnection();//2.創建ChannelChannel channel = connection.createChannel();channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);channel.basicConsume(Constants.WORK_QUEUE, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主體內容System.out.println("body:" + new String(body));}});System.out.println("Consumer2 啟動成功!");//5.關閉資源
// channel.close();
// connection.close();}
}
?運行結果
生產者
生產者消息隊列
?
消費者
為了避免第一個啟動的消費者會將10條消息消費掉 需要先啟動兩個消費者,再去啟動生產者
消費者1
消費者2
觀察消息隊列?
可以看到管理頁面中有兩個消費者被顯示
三、Publish/Subscribe(發布/訂閱模式)
3.1 概念
生產者發送一條消息,經過交換機轉發到多個不同的隊列,多個不同的隊列就有多個不同的消費者
適合場景:消息需要被多個消費者同時接收的場景.如:實時通知或者廣播消息
3.2 代碼實現
常量類
public class Constants {public static final String HOST = "8.136.108.248";public static final Integer PORT = 5672;public static final String USER_NAME = "pinkboy";public static final String PASSWORD = "123456";public static final String VIRTUAL_HOST = "/";//發布訂閱模式public static final String FANOUT_EXCHANGE = "fanout_exchange";public static final String FANOUT_QUEUE1 = "fanout_queue1";public static final String FANOUT_QUEUE2 = "fanout_queue2";
}
這個模式需要創建交換機,并綁定隊列和交換機?
//3、聲明交換機 /*** 參數1:交換機名稱* 參數2:交換機類型 Fanout類型 -> 廣播機制* 參數3:是否持久化*/ channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
聲明隊列
//4、聲明隊列 /*** 參數1:隊列名稱* 參數2:是否持久化* 參數3:是否獨占隊列,該隊列只允許在該連接中訪問,如果連接關閉隊列則自動刪除,如果將此參數設置true可用于臨時隊列的創建* 參數4:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除* 參數5:其他參數*/ channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null); channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);
綁定隊列和交換機
//5、交換機和隊列綁定 /*** 參數1:隊列名稱* 參數2:交換機名稱* 參數3:路由鍵,綁定規則 如果交換機類型為fanout類型,routingKey設置為空字符串*/ channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, ""); channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");
生產者
package rabbitmq.fanout;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;public class Producer {public static void main(String[] args) throws Exception {//1、建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、打開信道Channel channel = connection.createChannel();//3、聲明交換機/*** 參數1:交換機名稱* 參數2:交換機類型 Fanout類型 -> 廣播機制* 參數3:是否持久化*/channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);//4、聲明隊列/*** 參數1:隊列名稱* 參數2:是否持久化* 參數3:是否獨占隊列,該隊列只允許在該連接中訪問,如果連接關閉隊列則自動刪除,如果將此參數設置true可用于臨時隊列的創建* 參數4:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除* 參數5:其他參數*/channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);//5、交換機和隊列綁定/*** 參數1:隊列名稱* 參數2:交換機名稱* 參數3:路由鍵,綁定規則 如果交換機類型為fanout類型,routingKey設置為空字符串*/channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");//6、發布消息String msg = "hello fanout ...";channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, msg.getBytes());System.out.println("消息發送成功!");//7、釋放資源channel.close();connection.close();}
}
消費者
消費者1
package rabbitmq.fanout;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {//1、建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、創建信道Channel channel = connection.createChannel();//3、聲明隊列channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);//4、消費消息channel.basicConsume(Constants.FANOUT_QUEUE1, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主體內容System.out.println("body:" + new String(body));}});System.out.println("Consumer1 啟動成功!");}
}
消費者2
package rabbitmq.fanout;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {//1、建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、創建信道Channel channel = connection.createChannel();//3、聲明隊列channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);//4、消費消息channel.basicConsume(Constants.FANOUT_QUEUE2, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主體內容System.out.println("body:" + new String(body));}});System.out.println("Consumer2 啟動成功!");}
}
運行結果
啟動生產者
觀察消息隊列
fanout_queue1 和 fanout_queue2 分別有了1條消息
?Exchange中多了隊列的綁定關系
啟動兩個消費者
?
觀察消息隊列
四、Routing(路由模式)
4.1 概念
路由模式是發布訂閱模式的變種,在發布訂閱基礎上,增加路由key
發布訂閱模式是無條件的將所有消息分發給所有的消費者,路由模式是Exchange根據RoutingKey的規則,將數據篩選后發給對應的消費者隊列
適合場景:需要根據特定規則分發消息的場景
?
?
隊列和交換機的綁定,不能是任意的綁定了,而是要制定了一個BindKey(RoutingKey的一種)消息的發送方在向Exchange發送消息時也需要指定消息的RoutingKey
Exchange也不再把消息交給每一個綁定的key,而是根據消息的RountingKey進行判斷,只要隊列的BindingKey和發送消息的RoutingKey完全一致,才會接收到消息
創建交換機,定義交換機類型為BuiltinExchangeType.DIRECT
4.2 代碼實現
Constants類
public class Constants {public static final String HOST = "8.136.108.248";public static final Integer PORT = 5672;public static final String USER_NAME = "pinkboy";public static final String PASSWORD = "123456";public static final String VIRTUAL_HOST = "/";//路由模式public static final String DIRECT_EXCHANGE = "direct.exchange";public static final String DIRECT_QUEUE1 = "direct.queue1";public static final String DIRECT_QUEUE2 = "direct.queue2";
}
生產者
package rabbitmq.direct;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;public class Producer {public static void main(String[] args) throws Exception {//1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2. 創建信道Channel channel = connection.createChannel();//3. 聲明交換機/*** 參數1:交換機名稱* 參數2:交換機類型* 參數3:是否持久化* 參數4:是否自動刪除* 參數5:其他參數*/channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, "direct", true, false, null);//4. 聲明隊列/*** 參數1:隊列名稱* 參數2:是否持久化* 參數3:是否獨占隊列,該隊列只允許在該連接中訪問,如果連接關閉隊列則自動刪除,如果將此參數設置true可用于臨時隊列的創建* 參數4:是否自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和參數2設置為true就可以實現臨時隊列(隊列不用了就自動刪除)* 參數5:其他參數*/channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);//5、交換機和隊列綁定/*** 參數1:隊列名稱* 參數2:交換機名稱* 參數3:路由鍵,綁定規則*/channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");//6、發布消息String msg_a = "hello direct my routingKey is a...";channel.basicPublish(Constants.DIRECT_EXCHANGE, "a", null, msg_a.getBytes());String msg_b = "hello direct my routingKey is b...";channel.basicPublish(Constants.DIRECT_EXCHANGE, "b", null, msg_b.getBytes());String msg_c = "hello direct my routingKey is c...";channel.basicPublish(Constants.DIRECT_EXCHANGE, "c", null, msg_c.getBytes());System.out.println("消息發送成功!");//7、釋放資源channel.close();connection.close();}
}
消費者
package rabbitmq.direct;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {//1、建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、創建信道Channel channel = connection.createChannel();//3、聲明隊列channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);//4、消費消息channel.basicConsume(Constants.DIRECT_QUEUE1, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主體內容System.out.println("body:" + new String(body));}});System.out.println("Consumer1 啟動成功!");}
}
package rabbitmq.direct;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {//1、建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、創建信道Channel channel = connection.createChannel();//3、聲明隊列channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);//4、消費消息channel.basicConsume(Constants.DIRECT_QUEUE2, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主體內容System.out.println("body:" + new String(body));}});System.out.println("Consumer1 啟動成功!");}
}
運行結果
啟動生產者
觀察消息隊列界面
exchange下隊列和Routing Key的綁定關系
啟動消費者
觀察消息隊列界面
五、Topics(通配符模式)
5.1 概念
在路由模式上進行了升級,在routingKey的基礎上,增加可通配符的功能,適之更加靈活
Topic和Routing的基本原同,即:生產者將消息發送給交換機,交換機根據RoutingKey將消息轉發給RoutingKey匹配的隊列,類似于正則表達式的方式來定義RoutingKey的模式.?
適合場景:需要靈活匹配和過濾消息的場景
Topic和Routing模式的區別:
1、topic模式使用的交換機類型為topic(Rounting模式使用的交換機類型為direct)
2、topic類型的交換機在匹配規則上進行了擴展,BingingKey支持通配符匹配(direct類型的將換季路由規則是BingKey和RoutingKey完全匹配)
在topic類型的交換機在匹配規則上有些要求:
1.RoutingKey是一系列由點(.)分隔的單詞,比如“stock.usd.nyse”,"nyse.vmw","quick.organge.rabbit"
2. BingdingKey和RountingKey,也是點(.)分隔的字符串
3. BingdingKey中可以存在兩種特殊的字符串,用于模糊匹配
? * 表示一個單詞
? # 表示0-N個單詞
舉個例子:
5.2 代碼實現
Constants類
public class Constants {public static final String HOST = "8.136.108.248";public static final Integer PORT = 5672;public static final String USER_NAME = "pinkboy";public static final String PASSWORD = "123456";public static final String VIRTUAL_HOST = "/";//通配符模式public static final String TOPIC_EXCHANGE = "topic.exchange";public static final String TOPIC_QUEUE1 = "topic.queue1";public static final String TOPIC_QUEUE2 = "topic.queue2";}
生產者
創建交換機類型為BuiltinExchangeType.TOPIC
?//3.聲明交換機
channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, null);
?
聲明隊列
//4.聲明隊列 channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null); channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);
?綁定交換機和隊列
//5.綁定交換機和隊列 channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*"); channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b"); channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");
package rabbitmq.topic;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1.建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.開啟信道Channel channel = connection.createChannel();//3.聲明交換機channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, null);//4.聲明隊列channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);//5.綁定交換機和隊列channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");//6、發布消息String msg_a = "hello topic my routingKey is ae.a.f...";channel.basicPublish(Constants.TOPIC_EXCHANGE, "ae.a.f", null, msg_a.getBytes());String msg_b = "hello topic my routingKey is ef.a.b...";channel.basicPublish(Constants.TOPIC_EXCHANGE, "ef.a.b", null, msg_b.getBytes());String msg_c = "hello topic my routingKey is c...";channel.basicPublish(Constants.TOPIC_EXCHANGE, "c.ef.d", null, msg_c.getBytes());System.out.println("消息發送成功!");}
}
消費者
package rabbitmq.topic;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {//1、建立連接// 創建一個ConnectionFactory實例來配置RabbitMQ連接ConnectionFactory connectionFactory = new ConnectionFactory();// 設置RabbitMQ服務器的主機地址connectionFactory.setHost(Constants.HOST);// 設置RabbitMQ服務器的端口號connectionFactory.setPort(Constants.PORT);// 設置登錄RabbitMQ服務器的用戶名connectionFactory.setUsername(Constants.USER_NAME);// 設置登錄RabbitMQ服務器的密碼connectionFactory.setPassword(Constants.PASSWORD);// 設置RabbitMQ服務器的虛擬主機connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);// 使用ConnectionFactory創建一個新的連接Connection connection = connectionFactory.newConnection();//2.創建ChannelChannel channel = connection.createChannel();channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);channel.basicConsume(Constants.TOPIC_QUEUE1, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主體內容System.out.println("body:" + new String(body));}});System.out.println("Consumer1 啟動成功!");}
}
public class Consumer2 {public static void main(String[] args) throws Exception {//1、建立連接// 創建一個ConnectionFactory實例來配置RabbitMQ連接ConnectionFactory connectionFactory = new ConnectionFactory();// 設置RabbitMQ服務器的主機地址connectionFactory.setHost(Constants.HOST);// 設置RabbitMQ服務器的端口號connectionFactory.setPort(Constants.PORT);// 設置登錄RabbitMQ服務器的用戶名connectionFactory.setUsername(Constants.USER_NAME);// 設置登錄RabbitMQ服務器的密碼connectionFactory.setPassword(Constants.PASSWORD);// 設置RabbitMQ服務器的虛擬主機connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);// 使用ConnectionFactory創建一個新的連接Connection connection = connectionFactory.newConnection();//2.創建ChannelChannel channel = connection.createChannel();channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);channel.basicConsume(Constants.TOPIC_QUEUE2, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主體內容System.out.println("body:" + new String(body));}});System.out.println("Consumer1 啟動成功!");}
}
運行結果
生產者
觀察消息隊列界面
消費者
六、RPC(RPC通信)了解
6.1 概念
RPC(Remote Procedure Call) 即遠程調用 它是一種通過網絡從遠程計算機上請求服務,而不是需要了解底層網絡的技術,類似HTTP遠程調用
RabbitMQ實現RPC通信的過程,大概率是通過兩個隊列實現一個可回調的過程
大概流程
1.客戶端發消息到一個指定的隊列,并在消息屬性中設置replyTo字段,這個字段指定一個回調隊列,服務端處理后,會把響應結果發送到這個隊列
2.服務端接收到請求后,處理請求并發送響應消息到replyTo指定的回調隊列
3.客戶端在回調隊列上等待響應消息,一旦收到響應,客戶端會檢查消息的correlationId屬性,以確保它是所期望的響應
客戶端:
1 發送請求(攜帶replyTo,CorrelationID)
2 接收響應(校驗correlationID)
服務端:
1 接受請求,進行響應
2 發送響應(按照客戶端指定的replyTo,設置correlationID)
6.2 代碼實現
客戶端代碼編寫
1、聲明兩個隊列 RPC_REQUEST_QUEUE和RPC_RESPONSE_QUEUE,聲明本次請求的唯一標志correlationID
2、將RPC_RESPONSE_QUEUE和correlationID配置到要發送的消息隊列中
3、使用阻塞隊列來阻塞當前進程,監聽回調隊列中的消息,把請求放到阻塞隊列中
4、阻塞隊列有消息后,主線程被喚醒,打印返回內容
Constants類
public class Constants {public static final String HOST = "8.136.108.248";public static final Integer PORT = 5672;public static final String USER_NAME = "pinkboy";public static final String PASSWORD = "123456";public static final String VIRTUAL_HOST = "/";//rpc 模式public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";}
客戶端代碼
package rabbitmq.rpc;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingDeque;/*** rpc 客戶端* 1. 發送請求* 2. 等待響應*/
public class RpcClient {public static void main(String[] args) throws Exception { //1、建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、開啟信道Channel channel = connection.createChannel();channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);//3、發送請求String msg = "hello rpc...";//設置請求唯一標識//設置請求的相關屬性// 生成一個唯一的關聯ID,用于跟蹤請求和響應String correlationID = UUID.randomUUID().toString();// 創建并配置AMQP基本屬性,設置消息的關聯ID和回復隊列AMQP.BasicProperties prop = new AMQP.BasicProperties().builder().correlationId(correlationID).replyTo(Constants.RPC_RESPONSE_QUEUE).build();// 發布消息到指定的請求隊列,攜帶配置的屬性和消息體channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, prop, msg.getBytes());//4、接收響應//使用阻塞隊列,存儲響應信息final ArrayBlockingQueue<String> response = new ArrayBlockingQueue<>(1);channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String respMsg = new String(body);System.out.println("接收到回調消息:" + respMsg);if (correlationID.equals(properties.getCorrelationId())) {//如果correlationID校驗一致,則將響應信息保存在response中response.offer(respMsg);}}});/*** 阻塞等待響應*/String take = response.take();System.out.println("[RPC Client 響應結果]:" + take);}
}
編寫服務器代碼
package rabbitmq.rpc;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;/*** 1、接受請求* 2、發送響應*/
public class RpcServer {public static void main(String[] args) throws Exception {//1、建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、開啟信道Channel channel = connection.createChannel();//3、接受請求channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String request = new String(body);System.out.println("接收到請求:" + request);String response = "針對request:" + request + ",響應成功";AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());channel.basicAck(envelope.getDeliveryTag(), false);}});}
}
運行結果:
啟動客戶端
觀察消息隊列界面
啟動服務器端
客戶端輸出消息?
?觀察消息隊列界面
七、Publish Confirms(發布確認模式)
作為消息中間件,都會面臨消息丟失的問題
消息丟失大概分為三種情況
1、生產者問題 因為應用程序故、障網絡抖動等原因,生產者沒有成功想broker發送消息
2、消息中間件自身問題,生產者成功發送給Broker 但是Broker沒有把消息保存好,導致消息丟失
3、消費者問題,Broker發送到消費者,消費者在消費時,因沒處理好,導致消費者失敗的消息從隊列中刪除了
針對問題1 可以采用發確認(Publisher Cofirms)機制實現
生產者將信道設置成confirm(確認)模式,一但信道進入confirm模式,所有在該信道上面發布的消息都是會被指派一個唯一的ID(從1開始),一但消息被投遞到所有匹配的隊列之后,RabbitMq就會發送一個確認給生產者(包括消息的唯一ID)這就使得生產者知道消息已經正確到達目的隊列了,如果消息和隊列是可持久化的,那么消息確認會在將消息寫入到磁盤之后發出,broker回傳給生產者的確認消息中
deliveryTag包包含了消息的序號,此外broker也可以設置channel basicAck方法中的multiple參數,表示到這個序號之前的所有消息都已經得到處理
發送方確認機制最大的好處是他是異步的,生產者可以同時發布消息和等待信道返回確認消息
1、當消息最終得到確認之后,生產者可以通過回調方法來處理該確認消息
2、如果RabbitMQ因為自身內部錯誤導致消息丟失,就會發送一條nack(Basic.Nack)命令,生產者同樣可以在回調方法中處理該nack命令
使用發送確認機制,必須要將信道設置成confirm(確認)模式
package rabbitmq.comfirms;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;public class PublisherConfirms {private static final Integer MESSAGE_COUNT = 200;static Connection createConnection() throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);return connectionFactory.newConnection();}public static void main(String[] args) throws Exception {// 發布確認publishingMessagesIndividually();// 批量發布確認publishingMessagesInBatchs();// 異步發布確認publishingMessagesAsynchronously();}private static void publishingMessagesAsynchronously() throws Exception {try (Connection connection = createConnection()) {//1 、創建信道Channel channel = connection.createChannel();//2、設置信道為confirm模式channel.confirmSelect();//3、聲明隊列channel.queueDeclare(Constants.PUBLISH_CONFIRM_QUEUE3, true, false, false, null);//4、監聽confirm消息//集合中存儲的是未確認的消息IDlong start = System.currentTimeMillis();SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {if (multiple) {confirmSeqNo.headSet(deliveryTag + 1).clear();} else {confirmSeqNo.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if (multiple) {confirmSeqNo.headSet(deliveryTag + 1).clear();} else {confirmSeqNo.remove(deliveryTag);}//業務需要根據實際場景進行處理,比如重發}});//5.發送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publish confirms" + i;long seqNO = channel.getNextPublishSeqNo();channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE3, null, msg.getBytes());confirmSeqNo.add(seqNO);}while (!confirmSeqNo.isEmpty()) {Thread.sleep(10);}long end = System.currentTimeMillis();System.out.printf("異步確認策略,消息條數:%d,耗時:%d ms \n", MESSAGE_COUNT, (end - start));}}private static void publishingMessagesInBatchs() throws Exception {try (Connection connection = createConnection()) {//1 、創建信道Channel channel = connection.createChannel();//2、設置信道為confirm模式channel.confirmSelect();//3、聲明隊列channel.queueDeclare(Constants.PUBLISH_CONFIRM_QUEUE2, true, false, false, null);//4、發送消息long start = System.currentTimeMillis();int batchSize = 100;int outStandingMessageCount = 0;for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publish confirms" + i;channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE2, null, msg.getBytes());outStandingMessageCount++;if (outStandingMessageCount == batchSize) {//6、等待確認channel.waitForConfirms(5000);outStandingMessageCount = 0;}}if (outStandingMessageCount > 0) {channel.waitForConfirms(5000);}long end = System.currentTimeMillis();System.out.printf("批量確認策略,消息條數:%d,耗時:%d ms \n", MESSAGE_COUNT, (end - start));}}private static void publishingMessagesIndividually() throws Exception {try (Connection connection = createConnection()) {//1 、創建信道Channel channel = connection.createChannel();//2、設置信道為confirm模式channel.confirmSelect();//3、聲明隊列channel.queueDeclare(Constants.PUBLISH_CONFIRM_QUEUE1, true, false, false, null);//4、發送消息long start = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publish confirms" + i;channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE1, null, msg.getBytes());//5、等待確認channel.waitForConfirms(5000);}long end = System.currentTimeMillis();System.out.printf("單獨確認策略,消息條數:%d,耗時:%d ms \n", MESSAGE_COUNT, (end - start));} catch (IOException e) {throw new RuntimeException(e);} catch (TimeoutException | InterruptedException e) {throw new RuntimeException(e);}}
}
publishing Messages Individually(單獨確認)
Publishing Messages in Batches(批量確認)
?