#RabbitMQ# 消息隊列入門

目錄

一 MQ技術選型

1 運行rabbitmq

2 基本介紹

3 快速入門

1 交換機負責路由消息給隊列

2 數據隔離

二 Java客戶端

1 快速入門

2 WorkQueue

3 FanOut交換機

4 Direct交換機

5 Topic交換機

*6 聲明隊列交換機

1 在配置類當中聲明

2 使用注解的方式指定

7 消息轉換器


*前景引入

維度異步通訊同步通訊RabbitMQ 的定位
交互方式通過中間件間接通信,無阻塞等待直接通信,需實時響應作為異步通訊的核心載體,支持消息緩存與路由
耦合度低(生產者和消費者解耦)高(調用方依賴被調用方可用性)通過隊列解耦系統,提升容錯性
適用場景高并發、耗時任務、事件驅動架構實時性要求高的簡單交互天然適合異步場景,也可通過 RPC 支持同步需求
性能與擴展性高吞吐,支持水平擴展受限于實時響應能力通過集群、負載均衡優化異步性能

一 MQ技術選型

MQ(message Queue)消息隊列,字面來看就是存放消息的隊列。也就是異步調用中的Broke。

1 運行rabbitmq

在虛擬機上安裝Docker_虛擬機安裝docker-CSDN博客

拉取鏡像

  • docker pull rabbitmq:3-management

在容器當中運行

  • docker run ...

借助端口訪問

2 基本介紹

核心概念總結

角色作用類比
Publisher發送消息的程序寄信人
Exchange按規則將消息分發到隊列郵局分揀員
Queue存儲消息的容器郵箱
Consumer從隊列取消息并處理的程序收信人
Virtual Host隔離不同業務的消息環境(如測試、生產)郵局內的獨立部門

3 快速入門

1 交換機負責路由消息給隊列

添加成功

找到一臺交換機

需要添加綁定隊列從而實現路由給隊列

消息路由成功

2 數據隔離

RabbitMQ 中的 虛擬主機(vhost) 可以用一個簡單的比喻來理解:它就像一臺大型服務器中的“獨立房間”,每個房間都有自己的門禁系統、家具和規則,互不干擾。以下是它的核心作用:

實現:

先添加一個用戶

現在這個用戶還沒有虛擬主機,這里其是無法訪問之前創建的隊列,是與之前的虛擬主機隔離開的

現在退出原先的用戶,以剛剛創建的用戶信息登錄,然后添加一個虛擬主機

現在就可以在現在的用戶之下的虛擬主機上創建新的隊列

二 Java客戶端

1 快速入門

實現:

1 導入spring-amqp依賴

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2 添加隊列

3 配置MQ地址

4 發送消息

    @Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid testSendMessage2Queue() {String queueName = "simple.queue1";String msg = "hello, amqp!";rabbitTemplate.convertAndSend(queueName, msg);}

5 隊列

6 在消費者的相關方法中定義

    @RabbitListener(queues = "simple.queue1")public void listenSimpleQueue(String msg) {System.out.println("消費者收到了simple.queue的消息:【" + msg + "】");}

7 然后將項目啟動,再在測試類中發送消息,控制臺會實時監控到發送的消息

8 隊列當中的消息拿出來在控制臺里面就沒有消息了

2 WorkQueue

任務模型:簡單來說就是讓多個消費者綁定到一個隊列,共同消費隊列當中的消息。

一個隊列多個消費者,可以緩解消息堆積問題。

1 配置項

2 不寫的話(默認一人一半,處理不完在隊列里等待)

3 新增一個隊列

4 兩個消費者(消費能力不同,消費能力相同應該是輪詢消費)

    @RabbitListener(queues = "work.queue")public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消費者1 收到了 work.queue的消息:【" + msg + "】");Thread.sleep(20);}@RabbitListener(queues = "work.queue")public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消費者2 收到了 work.queue的消息...... :【" + msg + "】");Thread.sleep(200);}

5 生產者

    @Testvoid testWorkQueue() throws InterruptedException {String queueName = "work.queue";for (int i = 1; i <= 50; i++) {String msg = "hello, worker, message_" + i;rabbitTemplate.convertAndSend(queueName, msg);Thread.sleep(20);}}

6 測試

3 FanOut交換機

真正生產環境都會經過exchange來發送消息,而不是直接發送到隊列,交換機的類型有以下三種

Fanout模式會將接受到的消息廣播到跟其綁定的每一個隊列,廣播模式。

例子

1 先將隊列聲明好

2 再聲明交換機同時與隊列綁定

3 消費者

    @RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg) throws InterruptedException {System.out.println("消費者1 收到了 fanout.queue1的消息:【" + msg + "】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg) throws InterruptedException {System.out.println("消費者2 收到了 fanout.queue2的消息:【" + msg + "】");}

4 生產者

    @Testvoid testSendFanout() {String exchangeName = "hmall.fanout";String msg = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, null, msg);}

測試結果:

為什么第二個參數是?null

在你的代碼中,第二個參數是 null,這是為了配合 Fanout 交換機 的特性。以下是關鍵點:

Fanout 交換機的特性

  • Fanout 交換機(也稱為廣播交換機)會將消息?無條件廣播到所有綁定到該交換機的隊列完全忽略路由鍵
  • 因此,在使用 Fanout 交換機時,路由鍵(routingKey)可以設為?null,因為交換機不會使用它來決定消息的路由規則。

4 Direct交換機

這種交換機可以實現與Fanout交換機相同的效果同時也可以實現定向的效果。

需求

1 創建隊列與交換機

(交換機需要給routingKey值)

2 消費者

@RabbitListener(queues = "direct.queue1") // 直接監聽名為 direct.queue1 的隊列
public void listenDirectQueue1(String msg) {System.out.println("消費者1 收到了 direct.queue1的消息:【" + msg + "】");
}@RabbitListener(queues = "direct.queue2") // 直接監聽名為 direct.queue2 的隊列
public void listenDirectQueue2(String msg) {System.out.println("消費者2 收到了 direct.queue2的消息:【" + msg + "】");
}

3 生產者

    @Testvoid testSendDirect() {String exchangeName = "hmall.direct";String msg = "藍色通知,警報解除,哥斯拉是放的氣球";rabbitTemplate.convertAndSend(exchangeName, "blue", msg);}

測試:

發送的路由鍵接收隊列觸發的消費者
reddirect.queue1, direct.queue2消費者1 + 消費者2
bluedirect.queue1消費者1
yellowdirect.queue2消費者2

可以根據需求更改生產者的代碼邏輯:

5 Topic交換機

Topic 交換機是 RabbitMQ 中基于模式匹配的路由機制,允許通過通配符(*?和?#)實現靈活的路由規則。

需求

實現:

聲明隊列和交換機

消費者

    @RabbitListener(queues = "topic.queue1")public void listenTopicQueue1(String msg) throws InterruptedException {System.out.println("消費者1 收到了 topic.queue1的消息:【" + msg + "】");}@RabbitListener(queues = "topic.queue2")public void listenTopicQueue2(String msg) throws InterruptedException {System.out.println("消費者2 收到了 topic.queue2的消息:【" + msg + "】");}

生產者

    @Testvoid testSendTopic() {String exchangeName = "hmall.topic";String msg = "今天天氣挺不錯,我的心情的挺好的";rabbitTemplate.convertAndSend(exchangeName, "china.weather", msg);}

測試:可以根據需求修改發送的RoutingKey

Direct交換機與Topic的差異

特性Direct 交換機Topic 交換機
路由鍵匹配方式精確匹配(完全一致)模式匹配(支持通配符?*?和?#
靈活性低(適合簡單路由)高(適合復雜路由場景)
典型場景訂單狀態變更、任務分發日志分類、多維度消息分發

*6 聲明隊列交換機

為了改善在控制臺創建隊列交換機的笨重,可以使用相關接口

聲明隊列和交換機

實現:

1 在配置類當中聲明

Fanout的

package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfiguration {// fanoutExchange 定義交換機@Beanpublic FanoutExchange fanoutExchange(){// ExchangeBuilder.fanoutExchange("").build();return new FanoutExchange("hmall.fanout2");}//  queue 創建隊列@Beanpublic Queue fanoutQueue3(){// QueueBuilder.durable("ff").build();//持久化return new Queue("fanout.queue3");}// 綁定隊列和交換機@Beanpublic Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);}// 創建隊列@Beanpublic Queue fanoutQueue4(){return new Queue("fanout.queue4");}// 綁定隊列和交換機@Beanpublic Binding fanoutBinding4(){return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());}
}

Direct的

package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;// @Configuration
public class DirectConfiguration {//   定義交換機@Beanpublic DirectExchange directExchange() {return new DirectExchange("hmall.direct");}//   創建隊列@Beanpublic Queue directQueue1() {return new Queue("direct.queue1");}// 隊列與交換機進行綁定@Beanpublic Binding directQueue1BindingRed(Queue directQueue1, DirectExchange directExchange) {return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}//  隊列與交換機進行綁定@Beanpublic Binding directQueue1BindingBlue(Queue directQueue1, DirectExchange directExchange) {return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}//   創建隊列@Beanpublic Queue directQueue2() {return new Queue("direct.queue2");}//  隊列與交換機進行綁定@Beanpublic Binding directQueue2BindingRed(Queue directQueue2, DirectExchange directExchange) {return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}//  隊列與交換機進行綁定@Beanpublic Binding directQueue2BindingBlue(Queue directQueue2, DirectExchange directExchange) {return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}}

2 使用注解的方式指定

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1", durable = "true"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}))public void listenDirectQueue1(String msg) throws InterruptedException {System.out.println("消費者1 收到了 direct.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2", durable = "true"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}))public void listenDirectQueue2(String msg) throws InterruptedException {System.out.println("消費者2 收到了 direct.queue2的消息:【" + msg + "】");}

通過使用?@RabbitListener?的?bindings?+?@QueueBinding?注解的方式,不需要手動創建隊列、交換機或綁定關系

  1. 檢查資源是否存在
    Spring 會通過?RabbitAdmin?組件向 RabbitMQ 服務器發起檢查,確認隊列、交換機是否已存在。

  2. 自動創建缺失的資源

    • 若隊列?direct.queue1?或?direct.queue2?不存在,會根據?@Queue?注解的配置(如?namedurable自動創建隊列

    • 若交換機?hmall.direct?不存在,會根據?@Exchange?注解的配置(如?nametype自動創建交換機

  3. 自動綁定隊列到交換機
    根據?key?指定的路由鍵,將隊列與交換機綁定(如?direct.queue1?綁定?red?和?blue?路由鍵)。

7 消息轉換器

使用

1.?SimpleMessageConverter(默認)

  • 行為

    • 支持?Stringbyte[]Serializable?對象。

    • 若消息是?Serializable?對象,使用 Java 原生序列化。

  • 問題

    • 強耦合:發送方和接收方必須有相同的類路徑(否則反序列化失敗)。

    • 安全性差:Java 原生序列化易受攻擊(如反序列化漏洞)。

2.?Jackson2JsonMessageConverter(推薦)

  • 行為

    • 將對象轉換為 JSON 字符串,再轉為?byte[]

    • 反序列化時,將 JSON 還原為對象(需指定目標類型)。

  • 優勢

    • 跨語言兼容:JSON 是通用格式,非 Java 客戶端也可解析。

    • 松耦合:不強制要求發送方和接收方的類路徑一致。

    • 安全性高:避免 Java 原生序列化漏洞

1 依賴引入

        <!--Jackson--><dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId></dependency>

2 Bean的創建

    // 消息轉換器@Beanpublic MessageConverter jacksonMessageConvertor(){return new Jackson2JsonMessageConverter();}

3 消費者

    @RabbitListener(queues = "object.queue")public void listenObject(Map<String, Object> msg) throws InterruptedException {System.out.println("消費者 收到了 object.queue的消息:【" + msg + "】");}

4 生產者

    @Testvoid testSendObject() {Map<String, Object> msg = new HashMap<>(2);msg.put("name", "jack");msg.put("age", 21);rabbitTemplate.convertAndSend("object.queue", msg);}

5 在實際業務當中的使用

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/81303.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/81303.shtml
英文地址,請注明出處:http://en.pswp.cn/web/81303.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【深度學習】多目標融合算法(六):漸進式分層提取模型PLE(Progressive Layered Extraction)

目錄 一、引言 二、PLE&#xff08;Progressive Layered Extraction&#xff0c;漸進式分層提取模型&#xff09; 2.1 技術原理 2.2 技術優缺點 2.3 業務代碼實踐 2.3.1 業務場景與建模 2.3.2 模型代碼實現 2.3.3 模型訓練與推理測試 2.3.4 打印模型結構 三、總結 一…

【Java開發日記】如何使用Java開發在線生成 pdf 文檔

一、介紹 在實際的業務開發的時候&#xff0c;研發人員往往會碰到很多這樣的一些場景&#xff0c;需要提供相關的電子憑證信息給用戶&#xff0c;例如網銀&#xff0f;支付寶&#xff0f;微信購物支付的電子發票、訂單的庫存打印單、各種電子簽署合同等等&#xff0c;以方便用…

Oracle 11g 單實例使用+asm修改主機名導致ORA-29701 故障分析

解決 把服務器名修改為原來的&#xff0c;重啟服務器。 故障 建表空間失敗。 分析 查看告警日志 ORA-1119 signalled during: create tablespace splex datafile ‘DATA’ size 2000M… Tue May 20 18:04:28 2025 create tablespace splex datafile ‘DATA/option/dataf…

消息隊列的使用

使用內存隊列來處理基于內存的【生產者-消費者】場景 思考和使用Disruptor Disruptor可以實現單個或多個生產者生產消息&#xff0c;單個或多個消費者消息&#xff0c;且消費者之間可以存在消費消息的依賴關系 使用Disruptor需要結合業務特性&#xff0c;設計要靈活 什么業務…

《帝國時代1》游戲秘籍

資源類 PEPPERONI PIZZA&#xff1a;獲得 1000 食物。COINAGE&#xff1a;獲得 1000 金。WOODSTOCK&#xff1a;獲得 1000 木頭。QUARRY&#xff1a;獲得 1000 石頭。 建筑與生產類 STEROIDS&#xff1a;快速建筑。 地圖類 REVEAL MAP&#xff1a;顯示所有地圖。NO FOG&#xf…

使用JSP踩過的坑

雖然說jsp已經過時了&#xff0c;但是有時維護比較老的項目還是需要的。 下面說下&#xff0c;我使用jsp踩過的坑&#xff1a; 1.關于打印輸出 在jsp中輸出使用 out.println("hello");而不是 System.out.println("hello");如果在定義函數部分需要打印…

redis集群創建時手動指定主從關系的方法

適用場景&#xff1a; 創建主從關系時默認參數 --cluster-replicas 1 會自動分配從節點。 為了能精確控制 Redis Cluster 的主從拓撲結構&#xff0c;我們通過 Redis Cluster 的手動分片功能來實現 一、手動指定主從關系的方法 使用 redis-cli --cluster-replicas 0 先創建純…

ROS合集(七)SVIn2聲吶模塊分析

文章目錄 一、整體思想二、具體誤差建模流程三、總結明確&#xff08;預測值與觀測值&#xff09;四、選點邏輯五、Sonar 數據處理流水線1. ROS Launch 配置&#xff08;imagenex831l.launch&#xff09;2. SonarNode 節點&#xff08;sonar_node.py&#xff09;3. Subscriber …

Python爬蟲實戰:研究PySpider框架相關技術

1. 引言 1.1 研究背景與意義 網絡爬蟲作為互聯網數據采集的重要工具,在信息檢索、輿情分析、市場調研等領域發揮著重要作用。隨著互聯網信息的爆炸式增長,如何高效、穩定地獲取所需數據成為了一個關鍵挑戰。PySpider 作為一款功能強大的 Python 爬蟲框架,提供了豐富的功能…

《大模型開源與閉源的深度博弈:科技新生態下的權衡與抉擇》

開源智能體大模型的核心魅力&#xff0c;在于它構建起了一個全球開發者共同參與的超級協作網絡。想象一下&#xff0c;來自世界各個角落的開發者、研究者&#xff0c;無論身處繁華都市還是偏遠小鎮&#xff0c;只要心懷對技術的熱愛與追求&#xff0c;就能加入到這場技術狂歡中…

大數據模型對陌生場景圖像的識別能力研究 —— 以 DEEPSEEK 私有化部署模型為例

摘要 本研究聚焦于已訓練的大數據模型能否識別未包含在樣本數據集中的陌生場景圖像這一問題&#xff0c;以 DEEPSEEK 私有化部署模型為研究對象&#xff0c;結合機器學習理論&#xff0c;分析模型識別陌生場景圖像的影響因素&#xff0c;并通過理論探討與實際應用場景分析&…

STM32——從點燈到傳感器控制

STM32基礎外設開發&#xff1a;從點燈到傳感器控制 一、前言 本篇文章總結STM32F10x系列基礎外設開發實例&#xff0c;涵蓋GPIO控制、按鍵檢測、傳感器應用等。所有代碼基于標準庫開發&#xff0c;適合STM32初學者參考。 二、硬件準備 STM32F10x系列開發板LED模塊有源蜂鳴器…

[特殊字符] 使用增量同步+MQ機制將用戶數據同步到Elasticsearch

在開發用戶搜索功能時&#xff0c;我們通常會將用戶信息存儲到 Elasticsearch&#xff08;簡稱 ES&#xff09; 中&#xff0c;以提高搜索效率。本篇文章將詳細介紹我們是如何實現 MySQL 到 Elasticsearch 的增量同步&#xff0c;以及如何通過 MQ 消息隊列實現用戶信息實時更新…

MyBatis緩存機制全解析

在MyBatis中&#xff0c;緩存分為一級緩存和二級緩存&#xff0c;它們的主要目的是減少數據庫的訪問次數&#xff0c;提高查詢效率。下面簡述這兩種緩存的工作原理&#xff1a; 一、 一級緩存&#xff08;SqlSession級別的緩存&#xff09; 一級緩存是MyBatis默認開啟的緩存機…

【短距離通信】【WiFi】WiFi7關鍵技術之4096-QAM、MRU

目錄 3. 4096-QAM 3.1 4096-QAM 3.2 QAM 的階數越高越好嗎&#xff1f; 4. MRU 4.1 OFDMA 和 RU 4.2 MRU 資源分配 3. 4096-QAM 摘要 本章主要介紹了Wi-Fi 7引入的4096-QAM對數據傳輸速率的提升。 3.1 4096-QAM 對速率的提升 Wi-Fi 標準一直致力于提升數據傳輸速率&a…

【二刷力扣】【力扣熱題100】今天的題目是:283.移動零

題目&#xff1a; 給定一個數組 nums&#xff0c;編寫一個函數將所有 0 移動到數組的末尾&#xff0c;同時保持非零元素的相對順序。 請注意 &#xff0c;必須在不復制數組的情況下原地對數組進行操作。 示例 1: 輸入: nums [0,1,0,3,12] 輸出: [1,3,12,0,0] 示例 2: 輸…

機器學習中的多GPU訓練模式

文章目錄 一、數據并行&#xff08;Data Parallelism&#xff09;二、模型并行&#xff08;Model Parallelism&#xff09;1. 模型并行2. 張量并行&#xff08;Tensor Parallelism&#xff09; 三、流水線并行&#xff08;Pipeline Parallelism&#xff09;四、混合并行&#x…

《JavaScript 性能優化:從原理到實戰的全面指南》

《JavaScript 性能優化&#xff1a;從原理到實戰的全面指南》 一、JavaScript 性能優化基礎理論 在深入探討 JavaScript 性能優化技術之前&#xff0c;我們需要明白JavaScript 的執行機制和性能瓶頸產生的根本原因。JavaScript 是一種單線程、非阻塞的腳本語言&#xff0c;其…

選擇合適的Azure數據庫監控工具

Azure云為組織提供了眾多服務&#xff0c;使其能夠無縫運行應用程序、Web服務和服務器部署&#xff0c;其中包括云端數據庫部署。Azure數據庫能夠與云應用程序實現無縫集成&#xff0c;具備可靠、易擴展和易管理的特性&#xff0c;不僅能提升數據庫可用性與性能&#xff0c;同時…

9.4在 VS Code 中配置 Maven

在 VS Code 中配置 Maven 需要完成 Maven 環境安裝 一、安裝 Maven&#xff08;如果未安裝&#xff09; 下載 Maven 訪問 Apache Maven 官網&#xff0c;下載最新版本的 Maven&#xff08;如apache-maven-3.9.9-bin.zip&#xff09;。 解壓文件 將下載的 ZIP 文件解壓到本地目…