消息隊列 2.RabbitMQ的基本概念與使用

RabbitMQ 是一款基于 AMQP(Advanced Message Queuing Protocol)協議的開源消息中間件,主要用于實現分布式系統中的消息傳遞,支持異步通信、系統解耦、流量削峰等場景。在 Java 生態中,RabbitMQ 被廣泛應用,其 Java 客戶端提供了簡潔的 API,方便開發者快速集成。

AMQP 協議

核心概念

1.?消息模型
AMQP 采用生產者 - 消費者模型,但引入了更復雜的路由機制:
  • 生產者(Producer):發送消息的應用
  • 消費者(Consumer):接收消息的應用
  • 消息中間件(Broker):負責接收、存儲和轉發消息
2.?核心組件

AMQP(Advanced Message Queuing Protocol)是一種開放標準的應用層協議,專為消息隊列設計。它定義了客戶端與消息中間件之間的通信規范,確保不同廠商的實現可以互操作。

+----------+    +---------+    +----------+
| Producer | -> | Exchange| -> | Queue    | -> Consumer
+----------+    +---------+    +----------+|v+---------+| Binding |+---------+
  • Exchange(交換器)

接收生產者的消息

根據規則(Binding)將消息路由到隊列

類型包括:Direct、Topic、Fanout、Headers

  • Queue(隊列)

存儲消息直到被消費

支持多個消費者競爭消費

消息可持久化存儲

  • Binding(綁定)

定義 Exchange 與 Queue 之間的關聯

通過 Binding Key(綁定鍵)和 Routing Key(路由鍵)匹配

工作流程

1.生產者發送消息

????????指定消息的 Routing Key

????????將消息發送到特定的 Exchange

2.Exchange 路由邏輯

????????Direct Exchange:按 Routing Key 精確匹配

????????Topic Exchange:按 Routing Key 的模式匹配(支持*#通配符)

????????Fanout Exchange:將消息廣播到所有綁定的隊列

????????Headers Exchange:按消息頭部屬性匹配

3.消費者接收消息

????????從隊列中拉取或訂閱消息

????????處理完成后發送確認(ACK)

RabbitMQ 核心概念

在使用 RabbitMQ 前,需先理解其核心組件和消息流轉邏輯:

組件

作用

生產者(Producer)

消息的發送方,負責創建并發送消息到 RabbitMQ 服務器。

消費者(Consumer)

消息的接收方,監聽隊列并處理接收到的消息。

隊列(Queue)

消息的存儲容器,位于 RabbitMQ 服務器中,消息最終會被投遞到隊列中等待消費。

交換機(Exchange)

接收生產者發送的消息,并根據綁定規則(Binding)將消息路由到對應的隊列。

綁定(Binding)

定義交換機與隊列之間的關聯關系,包含路由鍵(Routing Key)和匹配規則。

路由鍵(Routing Key)

生產者發送消息時指定的鍵,交換機根據該鍵和綁定規則路由消息。

RabbitMQ 消息流轉流程

消息從生產者到消費者的完整路徑為:
生產者 → 交換機(根據 Routing Key 和綁定規則)→ 隊列 → 消費者
  • 生產者發送消息時,需指定交換機名稱路由鍵
  • 交換機根據自身類型(如 Direct、Topic 等)和綁定規則,將消息轉發到匹配的隊列;
  • 消費者監聽隊列,獲取并處理消息。

Java 操作 RabbitMQ 基礎示例

1. 連接 RabbitMQ 服務器

所有操作的前提是建立與 RabbitMQ 的連接,需指定服務器地址、端口、賬號密碼(默認賬號guest僅允許本地連接,遠程連接需配置新用戶)。

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMQConnection {// RabbitMQ連接配置private static final String HOST = "localhost"; // 服務器地址private static final int PORT = 5672; // 默認端口private static final String USERNAME = "guest";private static final String PASSWORD = "guest";// 獲取連接public static Connection getConnection() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost(HOST);factory.setPort(PORT);factory.setUsername(USERNAME);factory.setPassword(PASSWORD);return factory.newConnection();}
}

2. 生產者發送消息

生產者需完成以下步驟:
  1. 創建連接和通道(Channel);
  2. 聲明交換機(可選,若使用默認交換機則無需聲明);
  3. 聲明隊列(指定隊列名稱、是否持久化等);
  4. 綁定交換機與隊列(若使用自定義交換機);
  5. 發送消息(指定交換機、路由鍵、消息內容)。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Producer {// 隊列名稱(需與消費者一致)private static final String QUEUE_NAME = "java_rabbitmq_queue";public static void main(String[] args) throws Exception {// 1. 獲取連接Connection connection = RabbitMQConnection.getConnection();// 2. 創建通道(RabbitMQ的操作大多通過通道完成)Channel channel = connection.createChannel();// 3. 聲明隊列(參數:隊列名、是否持久化、是否排他、是否自動刪除、附加參數)channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 4. 消息內容String message = "Hello, RabbitMQ from Java!";// 5. 發送消息(參數:交換機名、路由鍵、消息屬性、消息字節數組)// 此處使用默認交換機(""),路由鍵需與隊列名一致channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("生產者發送消息:" + message);// 6. 關閉資源channel.close();connection.close();}
}

3. 消費者接收消息

消費者需持續監聽隊列,步驟如下:
  1. 創建連接和通道;
  2. 聲明隊列(需與生產者隊列名一致);
  3. 定義消息處理邏輯(通過DefaultConsumer回調);
  4. 開啟消費(指定隊列、是否自動確認消息)。
import com.rabbitmq.client.*;
import java.io.IOException;public class Consumer {private static final String QUEUE_NAME = "java_rabbitmq_queue";public static void main(String[] args) throws Exception {// 1. 獲取連接Connection connection = RabbitMQConnection.getConnection();// 2. 創建通道Channel channel = connection.createChannel();// 3. 聲明隊列(需與生產者一致,重復聲明不會報錯)channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println("消費者已啟動,等待接收消息...");// 4. 定義消息處理回調DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("消費者接收消息:" + message);};// 5. 開啟消費(參數:隊列名、是否自動確認、消息接收回調、取消消費回調)// 自動確認(autoAck=true):消息被接收后自動從隊列刪除;false則需手動確認channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

Spring AMQP簡化 RabbitMQ

在 Spring Boot 項目中,可通過Spring AMQP簡化 RabbitMQ 的使用,其封裝了底層 API,提供注解驅動開發:

1.引入依賴:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.配置application.yml

spring:rabbitmq:host: localhostport: 5673username: guestpassword: guest

3. 生產者(使用RabbitTemplate):

@Autowired
private RabbitTemplate rabbitTemplate;public void sendMessage(String message) {rabbitTemplate.convertAndSend("queue_name", message);
}

4.消費者(使用@RabbitListener注解):

@RabbitListener(queues = "queue_name")
public void receiveMessage(String message) {System.out.println("接收消息:" + message);
}

交換機類型及 Java 實現

RabbitMQ 的交換機負責消息路由,不同類型的交換機路由規則不同,需根據場景選擇:
1. Direct 交換機(精確匹配)
  • 路由規則:消息的路由鍵與綁定的路由鍵完全一致時,消息被路由到對應隊列。
  • 適用場景:一對一通信(如訂單通知)。
// 生產者聲明Direct交換機并綁定隊列
String EXCHANGE_NAME = "direct_exchange";
String ROUTING_KEY = "order.notify";
// 聲明Direct交換機(參數:交換機名、類型、是否持久化)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false);
// 綁定交換機與隊列(參數:隊列名、交換機名、路由鍵)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 發送消息(指定交換機和路由鍵)
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
2. Topic 交換機(模糊匹配)
  • 路由規則:路由鍵支持通配符(*匹配一個單詞,#匹配多個單詞,單詞以.分隔)。
  • 適用場景:多規則匹配(如日志分類:log.errorlog.warn)。
    // 生產者聲明Topic交換機
    String EXCHANGE_NAME = "topic_exchange";
    // 路由鍵為"log.error"(匹配綁定鍵"log.*"或"log.#")
    String ROUTING_KEY = "log.error";
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, false);
    // 綁定隊列到交換機,綁定鍵為"log.#"(匹配所有以log.開頭的路由鍵)
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.#");
3. Fanout 交換機(廣播)
  • 路由規則:忽略路由鍵,將消息路由到所有綁定的隊列。
  • 適用場景:一對多通信(如廣播通知)。
// 生產者聲明Fanout交換機
String EXCHANGE_NAME = "fanout_exchange";
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false);
// 綁定多個隊列到交換機(無需指定路由鍵)
channel.queueBind(QUEUE1, EXCHANGE_NAME, "");
channel.queueBind(QUEUE2, EXCHANGE_NAME, "");
// 發送消息(路由鍵無效,可設為空)
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

RabbitMQ 應用場景

  • 異步通信:如用戶注冊后異步發送郵件 / 短信通知;
  • 系統解耦:訂單系統與庫存系統通過消息通信,避免直接依賴;
  • 流量削峰:秒殺場景中,通過隊列緩沖請求,避免服務器過載;
  • 日志收集:多服務日志通過 Fanout 交換機廣播到日志處理服務。

總結

RabbitMQ 憑借其靈活的路由機制、可靠的消息傳遞和豐富的特性,成為 Java 分布式系統中消息中間件的首選之一。通過 Java 客戶端或 Spring AMQP,開發者可快速實現消息的生產、消費及高級功能,有效提升系統的可擴展性和穩定性。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/915332.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/915332.shtml
英文地址,請注明出處:http://en.pswp.cn/news/915332.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【web安全】SQL注入與認證繞過

目錄 一、SQL注入漏洞 1.1 基礎注入原理 1.2 實用注入Payload分類 邏輯繞過型 注釋截斷型 聯合查詢型 常見的萬能密碼-CSDN博客 二、登錄繞過實戰技巧 2.1 基礎繞過手法 2.2 高級繞過技巧 編碼繞過 多重注釋 參數污染 三、密碼重置漏洞利用 3.1 常見漏洞模式 3…

Python適配器模式詳解:讓不兼容的接口協同工作

一、模式定義與核心思想 適配器模式&#xff08;Adapter Pattern&#xff09; 是一種結構型設計模式&#xff0c;它通過創建一個中間層&#xff08;適配器&#xff09;&#xff0c;將不兼容的接口轉換為客戶端期望的接口。就像現實中的電源適配器&#xff0c;讓不同國家的插頭…

微信小程序列表數據上拉加載,下拉刷新

1.上拉加載數據&#xff0c;數據 下一頁數據 前面的數據&#xff08;[...this.data.list, ...data.records&#xff09;2.當用戶上拉加載過快時&#xff0c;會不停的調用接口&#xff0c;需要節流閥isLoading3.上拉加載到最后一頁的判斷&#xff0c;isFinish// pages/list.js…

【樹上倍增 LCA DFS 前綴和】P10391 [藍橋杯 2024 省 A] 零食采購|普及+

本文涉及知識點 C算法&#xff1a;前綴和、前綴乘積、前綴異或的原理、源碼及測試用例 包括課程視頻 CDFS 樹上倍增 LCA P10391 [藍橋杯 2024 省 A] 零食采購 題目描述 小藍準備去星際旅行&#xff0c;出發前想在本星系采購一些零食&#xff0c;星系內有 nnn 顆星球&#x…

PDF發票批量打印工具哪個好?高效打印發票的實用工具推薦

開小超市這幾年&#xff0c;每月要打幾十張進貨發票做賬&#xff0c;以前打印時總犯愁&#xff1a;有的發票 PDF 太大&#xff0c;打出來字小得看不清&#xff1b;有的又太窄&#xff0c;白白浪費半張紙。試過手動調整&#xff0c;每張都要改縮放比例&#xff0c;累不說&#x…

4G模塊 A7680通過MQTT協議連接到華為云

命令說明 基礎AT指令 ATi顯示產品的標志信息 ATCIMI查詢IMSI ATCICCID從SIM卡讀取ICCID ATCGSN查詢產品序列號 ATCPIN查詢卡狀態 ATCSQ查詢信號強度 ATCGATT查詢當前PS域狀態 ATCREG查詢GPRS注冊狀態 ATCEREG查詢4G注冊狀態 ATCGPADDR查詢PDP地址 ATCMGF選擇短信格式 ATCMGS發…

大模型詞表設計與作用解析

幾乎所有大型語言模型&#xff08;LLM&#xff09;都有自己獨立的詞表&#xff08;Vocabulary&#xff09;。這是模型設計和訓練過程中的核心組件之一。以下是關于詞表的關鍵點&#xff1a; 1. 詞表的作用 分詞基礎&#xff1a;詞表定義了模型如何將輸入文本拆分成基本單元&…

(一)Eshop(異常處理中間件/grpc)

文章目錄項目地址一、異常處理1.1 自定異常1.2 自定義異常處理中間件1.3 注冊中間件二、grpc服務2.1 創建protos1. 打折的protos2. 設置grpc server3. program配置服務4. docker-compose2.2 CRUD1. 查詢2.3 測試1. 發起查詢請求三、grpc服務消費3.1 創建client1. 添加服務2. 選…

BLIP、InternVL Series(下)

目錄 一、InternVL1.5 1、改進 二、InternVL2 1、漸進式擴展 2、多模態擴展 三、InternVL2.5 1、方法 2、數據優化 四、InternVL3 2、方法 3、訓練后處理 4、測試時擴展 五、BLIP-3o 一、InternVL1.5 1、改進 InternVL1.5在InternVL基礎上&#xff0c;優化了QLLa…

【數據結構】二維差分數組

題目鏈接 【模板】二維差分_牛客題霸_牛客網 牛客網 - 找工作神器|筆試題庫|面試經驗|實習招聘內推&#xff0c;求職就業一站解決_牛客網 描述 給定一個 nmnm 的整數矩陣 bb&#xff0c;矩陣的下標從 11 開始記作 bi,jbi,j?。現在需要支持 qq 次操作&#xff0c;第 tt 次…

【JDK內置工具】常用工具和實戰指令

作者&#xff1a;唐叔在學習 專欄&#xff1a;唐叔的Java實踐 關鍵詞: #JDK工具 #Java性能調優 #JVM調優 #內存泄漏排查 #線程死鎖分析 #Java開發工具 #線上問題排查 #Java診斷工具 Hello&#xff0c;大家好&#xff0c;我是愛學習的唐叔。作為Java開發者&#xff0c;JDK內置工…

一站式PDF轉Markdown解決方案PDF3MD

簡介 什么是 PDF3MD &#xff1f; PDF3MD 是一個現代化、用戶友好的網絡應用程序&#xff0c;旨在將 PDF 文檔轉換為干凈、格式化的 Markdown 文本。它提供了高效的轉換工具&#xff0c;支持多種文件格式之間的轉換。 主要特點 PDF 轉 Markdown&#xff1a;能夠將 PDF 文檔轉…

RocketMQ學習系列之——MQ入門概念

一、什么是MQMQ&#xff08;Message Queue&#xff0c;消息隊列&#xff09;是一種能夠實現跨進程消息傳輸&#xff0c;并且消息緩存符合隊列特性的組件。二、MQ的作用異步&#xff1a;消息發送方無需等待消息接收方收到消息&#xff0c;發送方將消息成功發送到 MQ 之后即可無阻…

血條識別功能實現及原理

從零開始學Python圖像處理 - 血條識別 從實際問題中能快速的學習特定技能&#xff0c;通過完成一個能自動刷怪的工具&#xff0c;達成快速學習python圖像處理和識別。 自動刷怪需要先識別怪物&#xff0c;在游戲中怪物類型很多&#xff0c;同時在移動中形態會一直發生變化&…

網絡地址和主機地址之間進行轉換的類

#pragma once #include "Common.hpp" // 網絡地址和主機地址之間進行轉換的類class InetAddr { public:InetAddr(){}InetAddr(struct sockaddr_in &addr) : _addr(addr){// 網絡轉主機_port ntohs(_addr.sin_port); // 從網絡中拿到的&#xff01;網絡序列// _i…

《Python 項目 CI/CD 實戰指南:從零構建自動化部署流水線》

??《Python 項目 CI/CD 實戰指南:從零構建自動化部署流水線》 一、引言:為什么 Python 項目需要 CI/CD? 在現代軟件開發中,CI/CD(持續集成 / 持續部署)已成為不可或缺的工程實踐。它不僅提升了開發效率,還顯著降低了部署風險。對于 Python 項目而言,CI/CD 的價值尤…

AJAX 技術

AJAX全稱是 Asynchronous JavaScript and XML ( 異步的JavaScript 和 XML )&#xff0c;使用該技術后&#xff0c;可以實現不刷新整個網頁&#xff0c;與服務器進行異步通信并更新部分網頁。一&#xff09;為什么需要AJAX?傳統網頁在與服務器通信時&#xff0c;需要刷新整個頁…

Python爬蟲實戰:研究NLTK庫相關技術

1. 引言 1.1 研究背景與意義 隨著互聯網的快速發展,網絡新聞已成為人們獲取信息的主要來源之一。每天產生的海量新聞文本蘊含著豐富的信息和知識,但也給信息獲取和分析帶來了挑戰。如何從大量非結構化的新聞文本中自動提取有價值的信息,識別熱點話題和趨勢,成為當前自然語…

ARM 學習筆記(二)

參考文獻&#xff1a;《ARM ArchitectureReference Manual ARMv7-A and ARMv7-R edition》1、MMU 1.1 背景早期的內存是比較小的&#xff0c;一般是幾十k&#xff0c;不過相應的程序也是比較小的&#xff0c;這時程序可以直接加載到內存中運行。后來為了支持多個程序的并行&…

Github 貪吃蛇 主頁設置

自動化腳本頂部元信息觸發條件&#xff08;on:&#xff09;作業&#xff08;jobs:&#xff09;步驟&#xff08;steps:&#xff09;1. 生成 SVG2. 推送到 output 分支Commit & Push在 README 里引用參考&#xff1a;https://github.com/Platane/Platane/tree/master 首先寫…