Topics(通配符模式)
Topics 和Routing模式的區別是:
- topics 模式使?的交換機類型為topic(Routing模式使?的交換機類型為direct)
- topic 類型的交換機在匹配規則上進?了擴展, Binding Key?持通配符匹配(direct類型的交換機路 由規則是BindingKey和RoutingKey完全匹配)
在topic類型的交換機在匹配規則上, 有些要求:
- RoutingKey 是?系列由點( . )分隔的單詞, ?如 " stock.usd.nyse ", " nyse.vmw ", " quick.orange.rabbit "
- BindingKey 和RoutingKey?樣, 也是點( . )分割的字符串
- Binding Key中可以存在兩種特殊字符串, ?于模糊匹配
- * 表??個單詞
- # 表?多個單詞(0-N個)
?如:
- Binding Key 為"d.a.b" 會同時路由到Q1 和Q2
- Binding Key 為"d.a.f" 會路由到Q1
- Binding Key 為"c.e.f" 會路由到Q2
- Binding Key 為"d.b.f" 會被丟棄, 或者返回給?產者(需要設置mandatory參數)
引?依賴
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version>
</dependency>
編寫?產者代碼 和路由模式, 發布訂閱模式的區別是:
交換機類型不同, 綁定隊列的RoutingKey不同
創建交換機
定義交換機類型為BuiltinExchangeType.TOPIC
channel.exchangeDeclare(Constants.TOPIC_EXCHANGE_NAME,
BuiltinExchangeType.TOPIC, true, false, false, null);
聲明隊列
channel.queueDeclare(Constants.TOPIC_QUEUE_NAME1, true, false, false, null);
channel.queueDeclare(Constants.TOPIC_QUEUE_NAME2, true, false, false, null);
綁定交換機和隊列
//隊列1綁定error, 僅接收error信息
channel.queueBind(Constants.TOPIC_QUEUE_NAME1,Constants.TOPIC_EXCHANGE_NAME,
"*.error");
//隊列2綁定info, error: error,info信息都接收
channel.queueBind(Constants.TOPIC_QUEUE_NAME2,Constants.TOPIC_EXCHANGE_NAME,
"#.info");
channel.queueBind(Constants.TOPIC_QUEUE_NAME2,Constants.TOPIC_EXCHANGE_NAME,
"*.error");
發送消息
String msg = "hello topic, I'm order.error";
channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"order.error",null,msg.getBy
tes());
String msg_black = "hello topic, I'm order.pay.info";
channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"order.pay.info",null,msg_bl
ack.getBytes());
String msg_green= "hello topic, I'm pay.error";
channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"pay.error",null,msg_green.g
etBytes());
完整代碼:
public static String TOPIC_EXCHANGE_NAME = "test_topic";
public static String TOPIC_QUEUE_NAME1 = "topic_queue1";
public static String TOPIC_QUEUE_NAME2 = "topic_queue2";import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import constant.Constants;
public class TopicRabbitProducer {public static void main(String[] args) throws Exception {//1. 創建channel通道ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//ip 默認值localhostfactory.setPort(Constants.PORT); //默認值5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虛擬機名稱, 默認 /factory.setUsername(Constants.USER_NAME);//??名,默認guestfactory.setPassword(Constants.PASSWORD);//密碼, 默認guestConnection connection = factory.newConnection();Channel channel = connection.createChannel();//2. 創建交換機channel.exchangeDeclare(Constants.TOPIC_EXCHANGE_NAME,
BuiltinExchangeType.TOPIC, true, false, false, null);//3. 聲明隊列//如果沒有?個這樣的?個隊列, 會?動創建, 如果有, 則不創建channel.queueDeclare(Constants.TOPIC_QUEUE_NAME1, true, false, false,
null);channel.queueDeclare(Constants.TOPIC_QUEUE_NAME2, true, false, false,
null);//4. 綁定隊列和交換機//隊列1綁定error, 僅接收error信息channel.queueBind(Constants.TOPIC_QUEUE_NAME1,Constants.TOPIC_EXCHANGE_NAME,
"*.error");//隊列2綁定info, error: error,info信息都接收channel.queueBind(Constants.TOPIC_QUEUE_NAME2,Constants.TOPIC_EXCHANGE_NAME,
"#.info");channel.queueBind(Constants.TOPIC_QUEUE_NAME2,Constants.TOPIC_EXCHANGE_NAME,
"*.error");//5. 發送消息String msg = "hello topic, I'm order.error";channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"order.error",null,msg.getBy
tes());String msg_black = "hello topic, I'm order.pay.info";channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"order.pay.info",null,msg_bl
ack.getBytes());String msg_green= "hello topic, I'm pay.error";channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"pay.error",null,msg_green.g
etBytes());//6.釋放資源channel.close();connection.close();}
}
編寫消費者代碼
Routing模式的消費者代碼和Routing模式代碼?樣, 修改消費的隊列名稱即可
同樣復制出來兩份
消費者1:TopicRabbitmqConsumer1
消費者2: TopicRabbitmqConsumer2
完整代碼:
import com.rabbitmq.client.*;
import constant.Constants;
import java.io.IOException;
public class TopicRabbitmqConsumer1 {public static void main(String[] args) throws Exception {//1. 創建channel通道ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//ip 默認值localhostfactory.setPort(Constants.PORT); //默認值5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虛擬機名稱, 默認 /factory.setUsername(Constants.USER_NAME);//??名,默認guestfactory.setPassword(Constants.PASSWORD);//密碼, 默認guestConnection connection = factory.newConnection();Channel channel = connection.createChannel();//2. 接收消息, 并消費DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);}
}
運?程序, 觀察結果
運??產者, 可以看到隊列的消息數
運?消費者
RPC(RPC通信)
RPC(Remote Procedure Call), 即遠程過程調?. 它是?種通過?絡從遠程計算機上請求服務, ?不需要 了解底層?絡的技術. 類似于Http遠程調?
RabbitMQ實現RPC通信的過程, ?概是通過兩個隊列實現?個可回調的過程
?概流程如下:
- 客?端發送消息到?個指定的隊列, 并在消息屬性中設置replyTo字段, 這個字段指定了?個回調隊 列, 服務端處理后, 會把響應結果發送到這個隊列
- 服務端接收到請求后, 處理請求并發送響應消息到replyTo指定的回調隊列
- 客?端在回調隊列上等待響應消息. ?旦收到響應,客?端會檢查消息的correlationId屬性,以確 保它是所期望的響應
引?依賴
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version>
</dependency>
編寫客?端代碼 客?端代碼主要流程如下:
- 聲明兩個隊列, 包含回調隊列replyQueueName, 聲明本次請求的唯?標志corrId
- 將replyQueueName和corrId配置到要發送的消息隊列中
- 使?阻塞隊列來阻塞當前進程, 監聽回調隊列中的消息, 把請求放到阻塞隊列中
- 阻塞隊列有消息后, 主線程被喚醒,打印返回內容
聲明隊列
//2. 聲明隊列, 發送消息
channel.queueDeclare(Constants.RPC_REQUEST_QUEUE_NAME, true, false, false,
null);
定義回調隊列
// 定義臨時隊列,并返回?成的隊列名稱
String replyQueueName = channel.queueDeclare().getQueue();
使?內置交換機發送消息
// 本次請求唯?標志
String corrId = UUID.randomUUID().toString();
// ?成發送消息的屬性
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId) // 唯?標志本次請求.replyTo(replyQueueName) // 設置回調隊列.build();
// 通過內置交換機, 發送消息
String message = "hello rpc...";
channel.basicPublish("", Constants.RPC_REQUEST_QUEUE_NAME, props,
message.getBytes());
使?阻塞隊列, 來存儲回調結果
// 阻塞隊列,?于存儲回調結果
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
//接收服務端的響應
DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到回調消息:"+ new String(body));//如果唯?標識正確, 放到阻塞隊列中if (properties.getCorrelationId().equals(corrId)) {response.offer(new String(body, "UTF-8"));}}
};
channel.basicConsume(replyQueueName, true, consumer);
獲取回調結果
// 獲取回調的結果
String result = response.take();
System.out.println(" [RPCClient] Result:" + result);
完整代碼
public static String RPC_REQUEST_QUEUE_NAME = "rpc_request_queue";import com.rabbitmq.client.*;
import constant.Constants;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class RPCClient {public static void main(String[] args) throws Exception {//1. 創建Channel通道ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//ip 默認值localhostfactory.setPort(Constants.PORT); //默認值5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虛擬機名稱, 默認 /factory.setUsername(Constants.USER_NAME);//??名,默認guestfactory.setPassword(Constants.PASSWORD);//密碼, 默認guestConnection connection = factory.newConnection();Channel channel = connection.createChannel();//2. 聲明隊列channel.queueDeclare(Constants.RPC_REQUEST_QUEUE_NAME, true, false,
false, null);// 唯?標志本次請求String corrId = UUID.randomUUID().toString();// 定義臨時隊列,并返回?成的隊列名稱String replyQueueName = channel.queueDeclare().getQueue();// ?成發送消息的屬性AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId) // 唯?標志本次請求.replyTo(replyQueueName) // 設置回調隊列.build();// 通過內置交換機, 發送消息String message = "hello rpc...";channel.basicPublish("", Constants.RPC_REQUEST_QUEUE_NAME, props,
message.getBytes());// 阻塞隊列,?于存儲回調結果final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);//接收服務端的響應DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到回調消息:"+ new String(body));if (properties.getCorrelationId().equals(corrId)) {response.offer(new String(body, "UTF-8"));}}};channel.basicConsume(replyQueueName, true, consumer);// 獲取回調的結果String result = response.take();System.out.println(" [RPCClient] Result:" + result);//釋放資源channel.close();connection.close();}
}