RabbitMQ Topic RPC

Topics(通配符模式)

Topics 和Routing模式的區別是:

  1. topics 模式使?的交換機類型為topic(Routing模式使?的交換機類型為direct)
  2. topic 類型的交換機在匹配規則上進?了擴展, Binding Key?持通配符匹配(direct類型的交換機路 由規則是BindingKey和RoutingKey完全匹配)

在topic類型的交換機在匹配規則上, 有些要求:

  1. RoutingKey 是?系列由點( . )分隔的單詞, ?如 " stock.usd.nyse ", " nyse.vmw ", " quick.orange.rabbit "
  2. BindingKey 和RoutingKey?樣, 也是點( . )分割的字符串
  3. Binding Key中可以存在兩種特殊字符串, ?于模糊匹配
  • * 表??個單詞
  • # 表?多個單詞(0-N個)

?如:

  • Binding Key 為"d.a.b" 會同時路由到Q1 和Q2
  • Binding Key 為"d.a.f" 會路由到Q1
  • Binding Key 為"c.e.f" 會路由到Q2
  • Binding Key 為"d.b.f" 會被丟棄, 或者返回給?產者(需要設置mandatory參數)

引?依賴

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version>
</dependency>

編寫?產者代碼 和路由模式, 發布訂閱模式的區別是:

交換機類型不同, 綁定隊列的RoutingKey不同

創建交換機

定義交換機類型為BuiltinExchangeType.TOPIC

channel.exchangeDeclare(Constants.TOPIC_EXCHANGE_NAME,
BuiltinExchangeType.TOPIC, true, false, false, null);

聲明隊列

channel.queueDeclare(Constants.TOPIC_QUEUE_NAME1, true, false, false, null);
channel.queueDeclare(Constants.TOPIC_QUEUE_NAME2, true, false, false, null);

綁定交換機和隊列

//隊列1綁定error, 僅接收error信息
channel.queueBind(Constants.TOPIC_QUEUE_NAME1,Constants.TOPIC_EXCHANGE_NAME,
"*.error");
//隊列2綁定info, error: error,info信息都接收
channel.queueBind(Constants.TOPIC_QUEUE_NAME2,Constants.TOPIC_EXCHANGE_NAME,
"#.info");
channel.queueBind(Constants.TOPIC_QUEUE_NAME2,Constants.TOPIC_EXCHANGE_NAME,
"*.error");

發送消息

String msg = "hello topic, I'm order.error";
channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"order.error",null,msg.getBy
tes());
String msg_black = "hello topic, I'm order.pay.info";
channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"order.pay.info",null,msg_bl
ack.getBytes());
String msg_green= "hello topic, I'm pay.error";
channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"pay.error",null,msg_green.g
etBytes());

完整代碼:

public static String TOPIC_EXCHANGE_NAME = "test_topic";
public static String TOPIC_QUEUE_NAME1 = "topic_queue1";
public static String TOPIC_QUEUE_NAME2 = "topic_queue2";import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import constant.Constants;
public class TopicRabbitProducer {public static void main(String[] args) throws Exception {//1. 創建channel通道ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//ip 默認值localhostfactory.setPort(Constants.PORT); //默認值5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虛擬機名稱, 默認 /factory.setUsername(Constants.USER_NAME);//??名,默認guestfactory.setPassword(Constants.PASSWORD);//密碼, 默認guestConnection connection = factory.newConnection();Channel channel = connection.createChannel();//2. 創建交換機channel.exchangeDeclare(Constants.TOPIC_EXCHANGE_NAME,
BuiltinExchangeType.TOPIC, true, false, false, null);//3. 聲明隊列//如果沒有?個這樣的?個隊列, 會?動創建, 如果有, 則不創建channel.queueDeclare(Constants.TOPIC_QUEUE_NAME1, true, false, false,
null);channel.queueDeclare(Constants.TOPIC_QUEUE_NAME2, true, false, false,
null);//4. 綁定隊列和交換機//隊列1綁定error, 僅接收error信息channel.queueBind(Constants.TOPIC_QUEUE_NAME1,Constants.TOPIC_EXCHANGE_NAME,
"*.error");//隊列2綁定info, error: error,info信息都接收channel.queueBind(Constants.TOPIC_QUEUE_NAME2,Constants.TOPIC_EXCHANGE_NAME,
"#.info");channel.queueBind(Constants.TOPIC_QUEUE_NAME2,Constants.TOPIC_EXCHANGE_NAME,
"*.error");//5. 發送消息String msg = "hello topic, I'm order.error";channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"order.error",null,msg.getBy
tes());String msg_black = "hello topic, I'm order.pay.info";channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"order.pay.info",null,msg_bl
ack.getBytes());String msg_green= "hello topic, I'm pay.error";channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"pay.error",null,msg_green.g
etBytes());//6.釋放資源channel.close();connection.close();}
}

編寫消費者代碼

Routing模式的消費者代碼和Routing模式代碼?樣, 修改消費的隊列名稱即可

同樣復制出來兩份

消費者1:TopicRabbitmqConsumer1

消費者2: TopicRabbitmqConsumer2

完整代碼:

import com.rabbitmq.client.*;
import constant.Constants;
import java.io.IOException;
public class TopicRabbitmqConsumer1 {public static void main(String[] args) throws Exception {//1. 創建channel通道ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//ip 默認值localhostfactory.setPort(Constants.PORT); //默認值5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虛擬機名稱, 默認 /factory.setUsername(Constants.USER_NAME);//??名,默認guestfactory.setPassword(Constants.PASSWORD);//密碼, 默認guestConnection connection = factory.newConnection();Channel channel = connection.createChannel();//2. 接收消息, 并消費DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);}
}

運?程序, 觀察結果

運??產者, 可以看到隊列的消息數

運?消費者

RPC(RPC通信)

RPC(Remote Procedure Call), 即遠程過程調?. 它是?種通過?絡從遠程計算機上請求服務, ?不需要 了解底層?絡的技術. 類似于Http遠程調?

RabbitMQ實現RPC通信的過程, ?概是通過兩個隊列實現?個可回調的過程

?概流程如下:

  1. 客?端發送消息到?個指定的隊列, 并在消息屬性中設置replyTo字段, 這個字段指定了?個回調隊 列, 服務端處理后, 會把響應結果發送到這個隊列
  2. 服務端接收到請求后, 處理請求并發送響應消息到replyTo指定的回調隊列
  3. 客?端在回調隊列上等待響應消息. ?旦收到響應,客?端會檢查消息的correlationId屬性,以確 保它是所期望的響應

引?依賴

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version>
</dependency>

編寫客?端代碼 客?端代碼主要流程如下:

  1. 聲明兩個隊列, 包含回調隊列replyQueueName, 聲明本次請求的唯?標志corrId
  2. 將replyQueueName和corrId配置到要發送的消息隊列中
  3. 使?阻塞隊列來阻塞當前進程, 監聽回調隊列中的消息, 把請求放到阻塞隊列中
  4. 阻塞隊列有消息后, 主線程被喚醒,打印返回內容

聲明隊列

//2. 聲明隊列, 發送消息
channel.queueDeclare(Constants.RPC_REQUEST_QUEUE_NAME, true, false, false,
null);

定義回調隊列

// 定義臨時隊列,并返回?成的隊列名稱
String replyQueueName = channel.queueDeclare().getQueue();

使?內置交換機發送消息

// 本次請求唯?標志
String corrId = UUID.randomUUID().toString();
// ?成發送消息的屬性
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId) // 唯?標志本次請求.replyTo(replyQueueName) // 設置回調隊列.build();
// 通過內置交換機, 發送消息
String message = "hello rpc...";
channel.basicPublish("", Constants.RPC_REQUEST_QUEUE_NAME, props,
message.getBytes());

使?阻塞隊列, 來存儲回調結果

// 阻塞隊列,?于存儲回調結果
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
//接收服務端的響應
DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到回調消息:"+ new String(body));//如果唯?標識正確, 放到阻塞隊列中if (properties.getCorrelationId().equals(corrId)) {response.offer(new String(body, "UTF-8"));}}
};
channel.basicConsume(replyQueueName, true, consumer);

獲取回調結果

// 獲取回調的結果
String result = response.take();
System.out.println(" [RPCClient] Result:" + result);

完整代碼

public static String RPC_REQUEST_QUEUE_NAME = "rpc_request_queue";import com.rabbitmq.client.*;
import constant.Constants;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class RPCClient {public static void main(String[] args) throws Exception {//1. 創建Channel通道ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//ip 默認值localhostfactory.setPort(Constants.PORT); //默認值5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虛擬機名稱, 默認 /factory.setUsername(Constants.USER_NAME);//??名,默認guestfactory.setPassword(Constants.PASSWORD);//密碼, 默認guestConnection connection = factory.newConnection();Channel channel = connection.createChannel();//2. 聲明隊列channel.queueDeclare(Constants.RPC_REQUEST_QUEUE_NAME, true, false,
false, null);// 唯?標志本次請求String corrId = UUID.randomUUID().toString();// 定義臨時隊列,并返回?成的隊列名稱String replyQueueName = channel.queueDeclare().getQueue();// ?成發送消息的屬性AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId) // 唯?標志本次請求.replyTo(replyQueueName) // 設置回調隊列.build();// 通過內置交換機, 發送消息String message = "hello rpc...";channel.basicPublish("", Constants.RPC_REQUEST_QUEUE_NAME, props,
message.getBytes());// 阻塞隊列,?于存儲回調結果final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);//接收服務端的響應DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到回調消息:"+ new String(body));if (properties.getCorrelationId().equals(corrId)) {response.offer(new String(body, "UTF-8"));}}};channel.basicConsume(replyQueueName, true, consumer);// 獲取回調的結果String result = response.take();System.out.println(" [RPCClient] Result:" + result);//釋放資源channel.close();connection.close();}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/81737.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/81737.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/81737.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

服務器死機了需要檢查哪些問題

在這個數字化的時代&#xff0c;服務器就像是我們信息世界的“大管家”&#xff0c;可要是它突然死機了&#xff0c;那可真是讓人頭疼。今天咱們就來聊聊&#xff0c;服務器死機了&#xff0c;到底需要檢查哪些問題。 一、硬件問題 電源供應&#xff1a;檢查電源是否穩定&…

【MySQL成神之路】運算符總結

MySQL運算符總結 MySQL提供了豐富的運算符&#xff0c;用于在SQL語句中進行各種計算和比較操作。這些運算符可以分為算術運算符、比較運算符、邏輯運算符、位運算符等幾大類。合理使用這些運算符可以構建復雜的查詢條件和計算表達式。 一、算術運算符 MySQL支持基本的算術運…

自用Vscode 配置c++ debug環境

前言 使用vscode配置c debug環境的好處 1、可以借助vscode方便輕量的擴展和功能 2、避免了傳統使用gdb 復雜按鍵以及不夠直觀的可視化 3、方便一次運行&#xff0c;斷點處查看變量&#xff0c;降低找bug難度 4、某大公司項目采用類似配置&#xff0c;經過實踐檢驗 配置c運行環…

創建一個使用 GPT-4o 和 SERP 數據的 RAG 聊天機器人

亮數據-網絡IP代理及全網數據一站式服務商屢獲殊榮的代理網絡、強大的數據挖掘工具和現成可用的數據集。亮數據&#xff1a;網絡數據平臺領航者https://www.bright.cn/?promogithub15?utm_sourceorganic-social-cn&utm_campaigncsdn 本指南將解釋如何使用 Python、GPT-4…

吳恩達 Deep Learning(1-36)ppt逐行理解

課程地址&#xff1a;(超爽中英!) 2024公認最好的【吳恩達深度學習】教程&#xff01;附課件代碼 Professionalization of Deep Learning_嗶哩嗶哩_bilibili 1.目錄 2.什么是神經網絡 3.用神經網絡進行監督學習 4.為什么深度學習會興起 7.二分分類 適用于二元分類問題的函數&…

三維點云的處理

1 點云原理 https://zh.wikipedia.org/wiki/%E9%BB%9E%E9%9B%B2 點云&#xff08;英語&#xff1a;point cloud&#xff09;是空間中點的數據集&#xff0c;可以表示三維形狀或對象&#xff0c;通常由三維掃描儀獲取。點云中每個點的位置都由一組笛卡爾坐標(X,Y,Z)描述[1]&…

鴻蒙HarmonyOS多設備流轉:分布式的智能協同技術介紹

隨著物聯網和智能設備的普及&#xff0c;多設備間的無縫協作變得越來越重要。鴻蒙&#xff08;HarmonyOS&#xff09;作為華為推出的新一代操作系統&#xff0c;其分布式技術為實現多設備流轉提供了強大的支持。本文將詳細介紹鴻蒙多設備流轉的技術原理、實現方式和應用場景。 …

Spring Boot- 2 (數萬字入門教程 ):數據交互篇

JDBC交互框架: Spring的JDBC操作工具: 依賴: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> JDBC的模版類:JdbcTemplate 引入Mysql的依賴 <depe…

在 Kotlin 中,什么是內聯函數?有什么作用?

在 Kotlin 中&#xff0c;內聯函數是一種通過 inline 關鍵字聲明的函數&#xff0c;其主要目的是優化高階函數&#xff08;即以函數作為參數或返回值的函數&#xff09;的性能。 內聯函數的函數體會在編譯時直接插入到調用處&#xff0c;從而避免函數調用的開銷&#xff0c;并…

LLM筆記(五)概率論

1. 隨機變量與概率分布&#xff1a;模型輸出的基礎 在LLM中&#xff0c;隨機變量最直觀的體現就是模型預測的下一個token。每個時刻&#xff0c;模型都會輸出一個概率分布&#xff0c;表示詞匯表中每個token可能是"下一個詞"的概率。 直觀理解 想象模型在處理句子…

LeetCode-滑動窗口-找到字符串中所有字母異位詞

LeetCode-滑動窗口-找到字符串中所有字母異位詞 ?? 關于專欄&#xff1a;專欄用于記錄 prepare for the coding test。 文章目錄 LeetCode-滑動窗口-找到字符串中所有字母異位詞&#x1f4dd; 找到字符串中所有字母異位詞&#x1f3af;題目描述&#x1f50d; 輸入輸出示例&am…

PostgreSQL 初體驗

目錄 一、PostgreSQL 1. 簡介 2. 特點 &#xff08;1&#xff09; 開源免費&#xff08;Open Source&#xff09; &#xff08;2&#xff09;標準兼容&#xff08;SQL Compliance&#xff09; &#xff08;3&#xff09; 豐富的數據類型&#xff08;Data Types&#xff09…

05_核支持向量機

描述 核支持向量機&#xff08;通常簡稱為SVM&#xff09;可以推廣到更復雜模型的擴展&#xff0c;這些模型無法被輸入空間的超平面定義。 SVM 的核心思想是找到一個最優的超平面&#xff0c;將不同類別的數據分開。這個超平面不僅要能夠正確分類數據&#xff0c;還要使得兩個…

Java + 鴻蒙雙引擎:ZKmall開源商城如何定義下一代B2C商城技術標準?

在 B2C 電商領域持續革新的當下&#xff0c;技術架構的優劣成為決定商城競爭力的核心要素。ZKmall開源商城以其創新融合的 Java 與鴻蒙雙引擎&#xff0c;為下一代 B2C 商城技術標準勾勒出全新藍圖&#xff0c;在性能、兼容性、拓展性等關鍵維度實現了重大突破。 一、Java 技術…

關于 Web 漏洞原理與利用:3. CSRF(跨站請求偽造)

一、原理&#xff1a; 利用用戶登錄態偽造操作 CSRF&#xff08;Cross-Site Request Forgery&#xff0c;跨站請求偽造&#xff09;是攻擊者“借刀殺人”&#xff0c;借用用戶瀏覽器中已有的登錄狀態&#xff0c;誘導用戶完成攻擊者指定的操作。 1. 基本機制分解 1&#xf…

【HTML5】【AJAX的幾種封裝方法詳解】

【HTML5】【AJAX的幾種封裝方法詳解】 AJAX (Asynchronous JavaScript and XML) 封裝是為了簡化重復的異步請求代碼&#xff0c;提高開發效率和代碼復用性。下面我將介紹幾種常見的 AJAX 封裝方式。 方法1. 基于原生 XMLHttpRequest 的封裝 XMLHttpRequest。其主要特點如下…

C++ - 網絡編程之初始連接(Winsock2 概述、初始連接案例、初始連接案例解讀)

一、Winsock2 概述 Winsock2&#xff08;Windows Sockets 2&#xff09;是微軟提供的 Windows 平臺網絡編程庫 二、初始連接案例 1、Server #include <winsock2.h> #include <ws2tcpip.h> #include <iostream>#pragma comment(lib, "ws2_32.lib&quo…

Spring Cloud Gateway深度解析:原理、架構與生產實踐

文章目錄 前言一、概述二、核心架構設計及設計原理2.1 分層架構模型網絡層&#xff08;I/O模型&#xff09;核心處理層 2.2 核心組件協作流程路由定位階段過濾器執行階段 2.3 響應式編程模型實現Reactor上下文傳遞背壓處理機制 2.4 動態路由設計原理2.5 異常處理體系2.6 關鍵路…

游戲開發實戰(一):Python復刻「崩壞星穹鐵道」嗷嗚嗷嗚事務所---源碼級解析該小游戲背后的算法與設計模式【純原創】

文章目錄 奇美拉項目游戲規則奇美拉(Chimeras)檔案領隊成員 結果展示&#xff1a; 奇美拉項目 由于項目工程較大&#xff0c;并且我打算把我的思考過程和實現過程中踩過的坑都分享一下&#xff0c;因此會分3-4篇博文詳細講解本項目。本文首先介紹下游戲規則并給出奇美拉檔案。…

說一下響應狀態碼有哪些?

HTTP響應狀態碼分類(RFC 7231標準) 1. 1xx(信息類) 臨時響應,表示請求已被接收,需要繼續處理 100 Continue:客戶端應繼續發送請求體 101 Switching Protocols:服務器同意升級協議(如WebSocket) 102 Processing(WebDAV):服務器正在處理但未完成 2. 2xx(成功類)…