RabbitMQ 知識詳解(Java版)

RabbitMQ 知識詳解(Java版)

RabbitMQ 是一個開源的消息代理,實現了高級消息隊列協議(AMQP)。它用于在分布式系統中實現應用解耦、異步通信和流量削峰。


核心概念
  1. 生產者(Producer):發送消息的應用
  2. 消費者(Consumer):接收消息的應用
  3. 隊列(Queue):消息存儲的緩沖區
  4. 交換機(Exchange):接收消息并路由到隊列
  5. 綁定(Binding):連接交換機和隊列的規則
  6. 路由鍵(Routing Key):消息的路由標識

交換機類型
類型路由規則典型用途
Direct精確匹配Routing Key點對點通信
Topic模式匹配(支持通配符)多條件路由
Fanout廣播到所有綁定隊列發布/訂閱
Headers消息頭鍵值對匹配復雜路由

Java 示例(使用官方客戶端)

依賴:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version>
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-nop</artifactId><version>1.7.30</version>
</dependency>

1. 直連交換機(Direct Exchange)

// Producer
public class DirectExchangeProducer {private static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "direct");// 綁定不同路由鍵String routingKey1 = "red";String message1 = "重要消息";channel.basicPublish(EXCHANGE_NAME, routingKey1, null, message1.getBytes());System.out.println("發送消息: " + message1);String routingKey2 = "blue";String message2 = "普通消息";channel.basicPublish(EXCHANGE_NAME, routingKey2, null, message2.getBytes());System.out.println("發送消息: " + message2);}}
}// Consumer (紅色隊列)
public class DirectConsumerRed {private static final String EXCHANGE_NAME = "direct_exchange";private static final String QUEUE_NAME = "red_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "direct");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "red");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("紅色隊列收到消息: " + message);};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

2. 扇出交換機(Fanout Exchange)

// Producer
public class FanoutExchangeProducer {private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String message = "廣播消息";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println("廣播消息已發送");}}
}// Consumer (郵件隊列)
public class FanoutConsumerEmail {private static final String EXCHANGE_NAME = "fanout_exchange";private static final String QUEUE_NAME = "email_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("郵件服務收到: " + message);};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

3. 主題交換機(Topic Exchange)

// Producer
public class TopicExchangeProducer {private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "topic");// 發送不同主題的消息String routingKey = "order.create";String message = "訂單創建通知";channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());System.out.println("發送訂單創建消息");routingKey = "user.login";message = "用戶登錄通知";channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());System.out.println("發送用戶登錄消息");}}
}// Consumer (訂單服務)
public class TopicConsumerOrder {private static final String EXCHANGE_NAME = "topic_exchange";private static final String QUEUE_NAME = "order_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "topic");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "order.*");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("訂單服務收到: " + message);};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

4. 頭交換機(Headers Exchange)

// Producer
public class HeadersExchangeProducer {private static final String EXCHANGE_NAME = "headers_exchange";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "headers");// 設置消息頭Map<String, Object> headers = new HashMap<>();headers.put("type", "log");headers.put("level", "error");AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(headers).build();String message = "系統錯誤日志";channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());System.out.println("發送錯誤日志消息");}}
}// Consumer (日志服務)
public class HeadersConsumerLog {private static final String EXCHANGE_NAME = "headers_exchange";private static final String QUEUE_NAME = "log_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "headers");channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 設置匹配規則 (必須包含type=log且level=error)Map<String, Object> bindingArgs = new HashMap<>();bindingArgs.put("x-match", "all"); // 全部匹配bindingArgs.put("type", "log");bindingArgs.put("level", "error");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", bindingArgs);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("日志服務收到: " + message);};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

通用配置說明

  1. 交換機類型總結:

    • 直連交換機:路由鍵精確匹配
    • 扇出交換機:忽略路由鍵,廣播所有綁定隊列
    • 主題交換機:使用通配符匹配路由鍵
    • 頭交換機:通過消息頭屬性匹配(非路由鍵)
  2. 重要參數:

    • channel.queueDeclare() 參數說明:
      • durable: 是否持久化
      • exclusive: 是否排他
      • autoDelete: 是否自動刪除
    • x-match參數在頭交換機中有兩種模式:
      • “all”: 需匹配所有指定頭
      • “any”: 匹配任意指定頭

關鍵特性(Java實現)

1. 消息持久化
// 聲明持久化隊列
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);// 發送持久化消息
channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
2. 公平分發(Prefetch)
// 每次只分發一條消息
int prefetchCount = 1;
channel.basicQos(prefetchCount);
3. 消息確認(ACK)
// 消費者關閉自動ACK
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {});// 處理完成后手動ACK
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
4. 持久化消費者
// 重啟后自動恢復的消費者
Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum");
channel.queueDeclare("persistent_queue", true, false, false, args);

使用場景

  1. 服務解耦:訂單系統與庫存系統分離
  2. 異步處理:耗時操作(如郵件發送)
  3. 流量削峰:突發請求緩沖(秒殺系統)
  4. 分布式事務:最終一致性實現
  5. 日志收集:多系統日志聚合

最佳實踐

  1. 連接管理:使用連接池(如Spring AMQP的CachingConnectionFactory)
  2. 異常處理:實現Consumer和Connection的監聽器
  3. 死信隊列:處理失敗消息
  4. 集群部署:保證高可用性
  5. 監控管理:使用RabbitMQ Management Plugin

提示:生產環境推薦使用Spring AMQP簡化開發,它提供了RabbitTemplate和@RabbitListener等便捷工具。

建議運行測試時:

  1. 先啟動所有消費者
  2. 再運行生產者發送消息
  3. 觀察各消費者接收到的消息是否符合路由規則

以上示例展示了RabbitMQ的核心路由機制,在實際生產環境中需添加異常處理、連接恢復、消息確認等機制。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/84968.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/84968.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/84968.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Flink task、Operator 和 UDF 之間的關系

要真正駕馭 Flink 并構建出高效、穩定、可擴展的流處理應用&#xff0c;僅僅停留在 API 的表面使用是遠遠不夠的。深入理解其內部的運行機制&#xff0c;洞悉數據從代碼到分布式執行的完整生命周期&#xff0c;以及明晰各個核心組件之間錯綜復雜而又協同工作的關系&#xff0c;…

Veeam Backup Replication系統的安裝與使用

Veeam Backup & Replication系統安裝與使用 系統簡介 核心功能 備份與恢復&#xff1a;專注于虛擬化環境&#xff08;如VMware和Hyper-V&#xff09;的備份與恢復&#xff0c;支持物理服務器和云環境。快速恢復&#xff1a;提供即時恢復功能&#xff0c;可在幾分鐘內恢復…

十四、【ESP32全棧開發指南:搭建輕量級HTTP服務器】

一、HTTP協議基礎 HTTP&#xff08;Hyper Text Transfer Protocol&#xff09;作為互聯網基礎協議&#xff0c;采用請求-響應模型工作&#xff1a; 1.1 HTTP請求組成 GET /uri?query1value1 HTTP/1.1 // 請求行&#xff08;方法URI協議版本&#xff09; Host: example…

java中LinkedList和ArrayList的區別和聯系?

我們被要求解釋Java中LinkedList和ArrayList的區別和聯系。下面將分別從實現原理、性能特點、使用場景等方面進行詳細說明&#xff0c;并總結它們的相同點和不同點。 # 一、聯系&#xff08;共同點&#xff09; 1. 都實現了List接口&#xff0c;因此具有List接口的所有方法&…

明遠智睿SD2351核心板:邊緣計算時代的工業級核心引擎深度解析

在工業4.0與物聯網深度融合的背景下&#xff0c;邊緣計算設備正從單一功能模塊向高集成度、智能化平臺演進。明遠智睿推出的SD2351核心板&#xff0c;憑借其異構計算架構、工業級接口資源和全棧技術生態&#xff0c;重新定義了邊緣計算設備的性能邊界。本文將從技術架構、場景適…

Flask 動態模塊注冊

目錄 1. 項目概述2. 項目結構3. 核心組件解析3.1 動態模塊注冊系統 (api/__init__.py)3.2 應用程序入口 (setup_demo.py) 4. 模塊開發指南4.1 標準模塊 (*_app.py)4.2 SDK模塊 (sdk/*.py) 5. URL路徑規則6. 如何使用6.1 啟動應用6.2 添加新模塊 7. 工作原理 1. 項目概述 這個項…

JVM 內存、JMM內存與集群機器節點內存的聯系

目錄 1、JVM 內存 1.1、分配機制 1.2、jvm模型位置 1.3、字節碼內存塊 2、JMM內存 2.1、JMM模型 2.2、工作流程圖 1、工作內存與主內存的交互 2. 多線程下的主內存與堆內存交互 2.3、 主內存與工作內存的同步方案 1、volatile 2、synchronized 3、final 3、內存使…

學習昇騰開發的第一天--環境配置

1、昇騰社區官網&#xff1a;昇騰社區官網-昇騰萬里 讓智能無所不及 2、產品-->選擇開發者套件-->點擊制卡工具的下載&#xff1a;資源-Atlas 200I DK A2-昇騰社區 3、如果制卡工具不能使用在線制卡&#xff0c;可以下載鏡像到本地使用本地制卡&#xff1a;Linux系統制…

Android WebView 深色模式適配方案總結

Android WebView 深色模式適配方案總結 在 Android WebView 中適配深色模式&#xff08;Dark Mode&#xff09;是一個常見的需求&#xff0c;尤其是當加載的網頁沒有原生支持 prefers-color-scheme 時。本文將介紹 3 種主流方案&#xff0c;并分析它們的優缺點&#xff0c;幫助…

項目練習:使用mybatis的foreach標簽,實現union all的拼接語句

文章目錄 一、需求說明二、需求分析三、代碼實現四、報表效果 一、需求說明 在sql查詢數據后&#xff0c;對數據分組統計。并最后進行總計。 二、需求分析 最終&#xff0c;我想用sql來實現這個統計和查詢的功能。 那么&#xff0c;怎么又查詢&#xff0c;又統計了&#xf…

7.7 Extracting and saving responses

Chapter 7-Fine-tuning to follow instructions 7.7 Extracting and saving responses 在本節中&#xff0c;我們保存測試集響應以便在下一節中評分&#xff0c;除此之外保存模型的副本以供將來使用。 ? 首先&#xff0c;讓我們簡單看看finetuned模型生成的響應 torch.manu…

計算機網絡第3章(上):數據鏈路層全解析——組幀、差錯控制與信道效率

目錄 一、數據鏈路層的功能二、組幀2.1 字符計數法&#xff08;Character Count&#xff09;2.2 字符填充法&#xff08;Character Stuffing&#xff09;2.3 零比特填充法2.4 違規編碼法 三、差錯控制3.1 檢錯編碼&#xff08;奇偶校驗碼&#xff09;3.2 循環冗余校驗&#xff…

鑄鐵試驗平臺的重要性及應用前景

鑄鐵作為一種重要的金屬材料&#xff0c;在工業生產中扮演著舉足輕重的角色。為了確保鑄鐵制品的質量和性能&#xff0c;鑄鐵材料的試驗是必不可少的環節。而鑄鐵試驗平臺則是進行鑄鐵試驗的關鍵設備之一&#xff0c;它為鑄鐵材料的研究和開發提供了重要的技術支持。本文將探討…

std::shared_ptr引起內存泄漏的例子

目錄 一、循環引用&#xff08;最常見場景&#xff09; 示例代碼 內存泄漏原因 二、共享指針管理的對象包含自身的 shared_ptr 示例代碼 內存泄漏&#xff08;或雙重釋放&#xff09;原因 三、解決方案 1. 循環引用&#xff1a;使用 std::weak_ptr 2. 對象獲取自身的 …

AI 知識數據庫搭建方案:從需求分析到落地實施

AI 知識數據庫的搭建需結合業務場景、數據特性與技術架構&#xff0c;形成系統化解決方案。以下是一套完整的搭建框架&#xff0c;涵蓋規劃、設計、實施及優化全流程&#xff1a; 一、前期規劃&#xff1a;需求分析與目標定義 1. 明確業務場景與知識需求 場景導向&#xff1a…

Tensorflow 基礎知識:變量、常量、占位符、Session 詳解

在深度學習領域,TensorFlow 是一個廣泛使用的開源機器學習框架。想要熟練使用 TensorFlow 進行模型開發,掌握變量、常量、占位符和 Session 這些基礎知識是必不可少的。接下來,我們就深入了解一下它們的概念、用處,并通過代碼示例進行演示。 一、常量(Constant) 常量,顧…

linux 常見問題之如何清除大文件的內容

linux 常見問題之如何清除大文件的內容 在 Linux 系統中&#xff0c;我們有時會遇到文件隨著時間增長變得巨大&#xff0c;最常見的就是服務器的日志文件&#xff0c;隨著時間的推移占用大量的磁盤空間&#xff0c;下面介紹如何清楚大文件的內容&#xff0c;當然避免文件內容過…

薛定諤的貓思想實驗如何推演到量子計算

前言 這是我的選修課作業&#xff0c;但是我并不喜歡小論文方式的寫法&#xff0c;死板又老套。先在這打一份底稿。 薛定諤的貓 可能一說到量子這個關鍵詞&#xff0c;大家第一時間都會想到的是“薛定諤的貓”。 實驗介紹 薛定諤的貓是一個著名的思想實驗&#xff0c;由奧…

嵌入式開發中fmacro-prefix-map選項解析

在嵌入式開發中&#xff0c;-fmacro-prefix-map 是 GCC 和 Clang 等編譯器提供的一個路徑映射選項&#xff0c;主要用于在預處理階段重寫宏定義中出現的絕對路徑。它的核心目的是解決以下問題&#xff1a; 核心作用 構建可重現性 消除編譯輸出&#xff08;如 .o、.d 文件&…

Javaweb學習——day3(Servlet 中處理表單數據)

文章目錄 一、概念學習1. GET vs POST 請求方式的區別2. HttpServletRequest 獲取表單數據 二、代碼講解與練習第 1 步&#xff1a;在 webapp 下創建 login.html第 2 步&#xff1a;在 com.example 包下創建 LoginServlet第 3 步&#xff1a;修改 web.xml 注冊 LoginServlet第 …