大家好,我是工藤學編程 🦉 | 一個正在努力學習的小博主,期待你的關注 |
---|---|
實戰代碼系列最新文章😉 | C++實現圖書管理系統(Qt C++ GUI界面版) |
SpringBoot實戰系列🐷 | 【SpringBoot實戰系列】SpringBoot3.X 整合 MinIO 存儲原生方案 |
分庫分表 | 分庫分表之實戰-sharding-JDBC分庫分表執行流程原理剖析 |
消息隊列 | 深入淺出 RabbitMQ-核心概念介紹與容器化部署 |
前情摘要:
1、深入淺出 RabbitMQ-核心概念介紹與容器化部署
【親測寶藏】發現一個讓 AI 學習秒變輕松的神站!不用啃高數、不用怕編程,高中生都能看懂的人工智能教程來啦!
👉點擊跳轉,和 thousands of 小伙伴一起用快樂學習法征服 AI,說不定下一個開發出爆款 AI 程序的就是你!
本文章目錄
- 深入淺出 RabbitMQ:簡單隊列實戰指南
- 一、什么是RabbitMQ簡單隊列
- 二、環境準備
- 三、實戰代碼實現
- 3.1 消息生產者(Send)
- 3.2 消息消費者(Recv)
- 四、關鍵知識點解析
- 五、測試步驟
- 六、總結
深入淺出 RabbitMQ:簡單隊列實戰指南
一、什么是RabbitMQ簡單隊列
RabbitMQ作為主流的消息中間件,其核心功能是實現系統間的異步通信。簡單隊列(Simple Queue)是RabbitMQ中最基礎的通信模式,采用"點對點"的消息傳遞方式:一個生產者發送消息到隊列,一個消費者從隊列中獲取消息,適用于簡單的異步通信場景。
二、環境準備
- RabbitMQ服務:參考深入淺出 RabbitMQ-核心概念介紹與容器化部署)
同時,為了方便環境之間的隔離(開發、測試、上線),我們需要按照如圖添加一個虛擬環境
- 依賴配置:在Maven項目中添加AMQP客戶端依賴
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version>
</dependency>
三、實戰代碼實現
3.1 消息生產者(Send)
生產者負責創建連接、聲明隊列并發送消息,核心代碼如下:
public class Send {// 隊列名稱(生產者和消費者需使用相同名稱)private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {// 1. 創建連接工廠ConnectionFactory factory = new ConnectionFactory();// 配置連接參數factory.setHost("192.168.229.128"); // 服務器地址factory.setPort(5672); // 端口(默認5672)factory.setUsername("admin"); // 用戶名factory.setPassword("password"); // 密碼factory.setVirtualHost("/dev"); // 虛擬主機(需提前創建)// 2. 創建連接和信道(使用try-with-resources自動關閉資源)try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 3. 聲明隊列(不存在則創建)/*** 參數說明:* 1. queue:隊列名稱* 2. durable:是否持久化(重啟RabbitMQ后隊列依然存在)* 3. exclusive:是否獨占(僅當前連接可訪問,連接關閉后自動刪除)* 4. autoDelete:是否自動刪除(當最后一個消費者斷開后自動刪除)* 5. arguments:額外參數(如隊列長度限制等)*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 4. 發送消息String message = "Hello World!";/*** 參數說明:* 1. exchange:交換機名稱(使用默認交換機"")* 2. routingKey:路由鍵(默認交換機下需與隊列名稱一致)* 3. props:消息屬性(如優先級、過期時間等)* 4. body:消息體(字節數組)*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));System.out.println(" [x] 已發送消息: '" + message + "'");}}
}
3.2 消息消費者(Recv)
消費者負責監聽隊列并處理消息,核心代碼如下:
public class Recv {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {// 1. 創建連接工廠(配置與生產者一致)ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.229.128");factory.setUsername("admin");factory.setPassword("password");factory.setVirtualHost("/dev"); // 注意:需與生產者使用相同虛擬主機factory.setPort(5672);// 2. 創建連接和信道(消費者通常保持長連接,不使用自動關閉)Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 3. 聲明隊列(與生產者保持一致,確保隊列存在)channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] 等待接收消息中...(按CTRL+C退出)");// 4. 定義消息處理回調Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 消費標識(可用于標識當前會話)System.out.println("消費標識: " + consumerTag);// 消息元數據(包含交換機、路由鍵、消息ID等)System.out.println("元數據: " + envelope);// 消息屬性(如發送時間、優先級等)System.out.println("屬性: " + properties);// 消息內容System.out.println("接收內容: " + new String(body, "UTF-8"));}};// 5. 開始消費消息/*** 參數說明:* 1. queue:隊列名稱* 2. autoAck:是否自動確認消息(消費成功后自動通知MQ刪除消息)* 3. consumer:消息處理回調*/channel.basicConsume(QUEUE_NAME, true, consumer);}
}
四、關鍵知識點解析
-
連接配置要點
- 虛擬主機(VirtualHost):用于隔離不同項目的消息隊列,生產者和消費者必須使用相同虛擬主機
- 端口說明:5672是AMQP協議端口,15672是管理界面端口
-
隊列聲明參數
- 持久化(durable):僅隊列本身持久化,消息需額外設置持久化屬性
- 獨占性(exclusive):通常用于臨時隊列,生產環境一般設為false
- 自動刪除(autoDelete):當最后一個消費者斷開后自動清理隊列,適合臨時任務
-
消息確認機制
- 代碼中使用
autoAck=true
(自動確認):消息被接收后立即從隊列刪除 - 生產環境建議使用
autoAck=false
(手動確認):確保消息被正確處理后再刪除,避免消息丟失
- 代碼中使用
五、測試步驟
-
啟動RabbitMQ服務:
rabbitmq-server start
-
訪問管理界面(http://localhost:15672),確認虛擬主機
/dev
已創建 -
先運行消費者(Recv):啟動后進入監聽狀態
-
再運行生產者(Send):發送消息后控制臺輸出發送成功
-
查看消費者控制臺:成功接收并打印消息詳情
六、總結
通過本文實戰,我們掌握了RabbitMQ簡單隊列的核心用法:
- 生產者通過信道發送消息到隊列
- 消費者通過回調函數異步處理消息
- 虛擬主機和隊列參數的正確配置是通信基礎
簡單隊列適合一對一的消息通信場景,下一篇我們將講解工作隊列(Work Queue),敬請關注!
覺得有用請點贊收藏!
如果有相關問題,歡迎評論區留言討論~