RabbitMQ 的介紹與使用

一. 簡介

1> 什么是MQ

消息隊列(Message Queue,簡稱MQ),從字面意思上看,本質是個隊列,FIFO先入先出,只不過隊列中存放的內容是message而已。
其主要用途:不同進程Process/線程Thread之間通信。

那么為什么會產生消息隊列呢?有幾個原因:

  • 不同進程(process)之間傳遞消息時,兩個進程之間耦合程度過高,改動一個進程,引發必須修改另一個進程,為了隔離這兩個進程,在兩進程間抽離出一層(一個模塊),所有兩進程之間傳遞的消息,都必須通過消息隊列來傳遞,單獨修改某一個進程,不會影響另一個;

  • 不同進程(process)之間傳遞消息時,為了實現標準化,將消息的格式規范化了,并且,某一個進程接受的消息太多,一下子無法處理完,并且也有先后順序,必須對收到的消息進行排隊,因此誕生了事實上的消息隊列;

MQ框架非常之多,比較流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里開源的RocketMQ。本文主要介紹RabbitMq。

2> 什么是RabbitMQ

RabbitMQ 是一個消息代理:它接受和轉發消息。您可以將其視為郵局:當您將要寄的郵件放入郵箱時,您可以確信信使最終會將郵件發送給您的收件人。在本例中,RabbitMQ 是郵箱、郵局和信使。

RabbitMQ 與郵局的主要區別在于它不處理紙張,而是接受、存儲和轉發二進制數據塊——消息

RabbitMQ 和一般意義上的消息傳遞使用了一些術語。

  • 生產_僅僅意味著發送。發送消息的程序稱為_生產者

  • _隊列_是 RabbitMQ 中郵箱的名稱。雖然消息會流經 RabbitMQ 和您的應用程序,但它們只能存儲在_隊列_中。_隊列_僅受主機內存和磁盤限制的約束,它本質上是一個大型消息緩沖區。許多_生產者_可以發送消息到一個隊列,并且許多_消費者_可以嘗試從一個_隊列_中接收數據。這就是我們表示隊列的方式

其是實現 AMQP(高級消息隊列協議)的消息中間件的一種,最初起源于金融系統,用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。 RabbitMQ 主要是為了實現系統之間的雙向解耦而實現的。當生產者大量產生數據時,消費者無法快速消費,那么需要一個中間層。保存這個數據。
AMQP,即 Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。消息中間件主要用于組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
RabbitMQ 是一個開源的 AMQP 實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

3> 相關概念

通常我們談到隊列服務,會有三個概念:發消息者、隊列、收消息者,RabbitMQ 在這個基本概念之上,多做了一層抽象,在發消息者和隊列之間,加入了交換器 (Exchange)。這樣發消息者和隊列就沒有直接聯系,轉而變成發消息者把消息給交換器,交換器根據調度策略再把消息給隊列。那么,其中比較重要的概念有4個,分別為:虛擬主機,交換機,隊列,和綁定。

  • 虛擬主機:一個虛擬主機持有一組交換機、隊列和綁定。為什么需要多個虛擬主機呢?很簡單, RabbitMQ 當中,用戶只能在虛擬主機的 粒度進行權限控制。 因此,如果需要禁止A組訪問B組的交換機/隊列/綁定,必須為A和B分別創建一個虛擬主機。每一個RabbitMQ 服務器 都有一個默認的虛擬主機“/”。
  • 交換機:Exchange 用于轉發消息,但是它不會做存儲 ,如果沒有 Queue bind 到 Exchange 的話,它會直接丟棄掉 Producer 發送過來的 消息。這里有一個比較重要的概念:路由鍵。消息到交換機的時候,交互機會轉發到對應的隊列中,那么究竟轉發到哪個隊列,就要根據
    該路由鍵。
  • 綁定:也就是交換機需要和隊列相綁定,這其中如上圖所示,是多對多的關系。

二. 實現

Spring Boot 集成 RabbitMQ

Spring Boot 集成 RabbitMQ 非常簡單,如果只是簡單的使用配置非常少,Spring Boot 提供了spring-boot-starter-amqp 項目對消息各種支持。

1. 簡單使用
1>配置 pom 包,主要是添加 spring-boot-starter-amqp 的支持
	<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2>配置文件application.yml

配置 RabbitMQ 的安裝地址、端口以及賬戶信息

# 配置文件
spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=falseusername: rootpassword: 123456# RabbitMQ配置rabbitmq:host: 192.168.146.1port: 5672username: adminpassword: 123456

我這里還配置了數據庫

3>隊列配置
package com.nianxi.mybatisplus.config;import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Beanpublic Queue Queue() {return new Queue("hello");}
}
4>發送者

rabbitTemplate 是 Spring Boot 提供的默認實現

package com.nianxi.mybatisplus.mapper;import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Date;@Component
public class HelloSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send() {String context = "hello " + new Date();System.out.println("Sender : " + context);this.rabbitTemplate.convertAndSend("hello", context);}
}
5>接收者
package com.nianxi.mybatisplus.mapper;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {@RabbitHandlerpublic void process(String hello) {System.out.println("Receiver  : " + hello);}
}
6> 測試
package com.nianxi.mybatisplus;import com.nianxi.mybatisplus.mapper.HelloSender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class RabbitMqHelloTest {@Autowiredprivate HelloSender helloSender;@Testpublic void hello() throws Exception {helloSender.send();}
}

注意:發送者和接收者的 queue name 必須一致,不然不能接收

2.RabbitTemplate

**RabbitTemplate**是SpringAMQP提供的一個高級消息操作模板,**用于在與RabbitMQ進行交互時進行消息的發送和接收操作。**它是對底層AMQP協議的封裝,簡化了與RabbitMQ的交互過程, 是SpringAMQP中的核心類,提供聲明式方式處理RabbitMQ,包括發送和接收消息、消息轉換、屬性設置及回調機制。通過配置和正確使用,簡化了RabbitMQ的集成與操作。

1> 發送消息

**RabbitTemplate**提供了多種發送消息的方法,包括同步發送和異步發送。通過指定交換機、路由鍵和消息體,我們可以將消息發送到 RabbitMQ 服務器上的指定位置。此外,RabbitTemplate還支持消息的確認機制,以確保消息被成功發送和接收。

rabbitTemplate.convertAndSend("exchangeName", "routingKey", message);
2> 接收消息

除了發送消息外,**RabbitTemplate**還提供了接收消息的功能。通過調用相關方法,我們可以從指定的隊列中接收消息,并進行相應的處理。這通常涉及到監聽隊列、處理消息和確認消息接收等步驟。

Message receivedMessage = rabbitTemplate.receive("queueName");
MyMessage myMessage = rabbitTemplate.receiveAndConvert("queueName", MyMessage.class);
3> 消息轉換

**RabbitTemplate支持消息的自動轉換。這意味著我們可以將 Java 對象作為消息體發送,而RabbitTemplate會自動將其轉換為可序列化的格式(如 JSON 或 XML)。同樣地,當從隊列中接收消息時,RabbitTemplate**也可以自動將消息體轉換回 Java 對象。

Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
rabbitTemplate.setMessageConverter(messageConverter);
4> 消息屬性設置

在發送消息時,我們可以設置各種消息屬性,如消息的優先級、持久化標志、過期時間等。這些屬性可以通過**MessageProperties對象進行設置,并在發送消息時傳遞給RabbitTemplate**。

import org.springframework.amqp.core.Message;  
import org.springframework.amqp.core.MessageBuilder;  
import org.springframework.amqp.core.MessageProperties;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.stereotype.Service;  @Service  
public class MessageSender {  @Autowired  private RabbitTemplate rabbitTemplate;  public void sendMessage(String exchange, String routingKey, String message, int priority, boolean persistent, int ttl) {  // 創建MessageProperties  MessageProperties properties = new MessageProperties();  // 設置優先級,值范圍0-9,其中0為最低優先級,9為最高優先級  properties.setPriority(priority);  // 設置消息持久化  properties.setDeliveryMode(persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT);  // 設置消息的過期時間,單位為毫秒  properties.setExpiration(String.valueOf(ttl));  // 使用MessageBuilder構建Message對象  Message msg = MessageBuilder.withBody(message.getBytes())  .setContentEncoding("UTF-8")  .setContentType("text/plain")  .setMessageId(UUID.randomUUID().toString()) // 可選,設置消息ID  .setTimestamp(new Date()) // 可選,設置時間戳  .setHeaders(Collections.singletonMap("x-custom-header", "value")) // 可選,設置自定義頭  .andProperties(properties)  .build();  // 發送消息  rabbitTemplate.convertAndSend(exchange, routingKey, msg);  }  
}
5> 回調機制

**RabbitTemplate**支持發送消息時的回調機制。這意味著在發送消息后,我們可以注冊一個回調函數來處理發送結果或接收響應。這對于需要異步處理發送結果或接收響應的場景非常有用。

**setConfirmCallback方法是RabbitTemplate**類中的一個回調方法,用于處理消息的確認(acknowledgment)結果。當消息成功發送到RabbitMQ的交換機時,會觸發確認回調,你可以在該回調中處理相應的邏輯。

  • correlationData:關聯數據,可以是任意類型的對象,通常用于唯一標識消息。

  • ack:布爾值,表示消息是否成功發送到交換機。true表示成功,false表示失敗。

  • cause:失敗的原因,當ackfalse時,此參數會提供一個可選的異常信息。

    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (ack) {
    // 消息發送成功
    System.out.println(“Message sent successfully”);
    } else {
    // 消息發送失敗,進行處理
    System.out.println("Message sent failed: " + cause);
    }
    });

6> 異步消息處理

RabbitTemplate支持異步消息處理,你可以注冊ConfirmCallbackReturnCallback來處理消息的確認和返回結果。ConfirmCallback用于確認消息是否成功發送到交換機,ReturnCallback用于處理無法路由到隊列的消息。

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {// 消息發送成功} else {// 消息發送失敗,進行處理}
});rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 處理無法路由到隊列的消息
});
3.使用 RabbitTemplate 的步驟
1> 配置 RabbitTemplate

在使用**RabbitTemplate**之前,我們需要對其進行配置。這通常涉及到設置連接工廠、交換機、隊列和綁定等。這些配置可以通過 XML 配置或 Java 配置完成。

2> 創建 RabbitTemplate 實例

一旦配置完成,我們可以創建一個**RabbitTemplate**實例。這個實例將使用我們提供的配置來與 RabbitMQ 服務器進行交互。

3> 發送消息

使用**RabbitTemplate**的發送方法,我們可以將消息發送到指定的交換機和路由鍵。我們可以指定消息體、消息屬性和其他發送選項。

4> 接收消息

要接收消息,我們可以使用**RabbitTemplate**的接收方法或結合監聽器來監聽指定的隊列。當消息到達時,我們可以處理消息并執行相應的業務邏輯。

5> 處理異常和錯誤

在使用**RabbitTemplate**時,我們還需要考慮異常和錯誤處理。例如,當發送消息失敗或接收消息時發生異常時,我們需要有相應的處理機制來確保系統的穩定性和可靠性。

4.RabbitTemplate 的優勢與注意事項
優勢
  1. 簡化操作RabbitTemplate封裝了底層細節,使得開發者能夠專注于業務邏輯的實現,而無需關心底層的消息傳輸細節。
  2. 靈活性RabbitTemplate提供了豐富的配置選項和擴展點,使得開發者能夠根據實際需求進行定制和優化。
  3. 性能優化RabbitTemplate內部進行了性能優化,如連接池管理、消息緩存等,以提高消息傳輸的效率和可靠性。
注意事項
  1. 配置正確性:確保RabbitTemplate的配置正確無誤,包括連接工廠、交換機、隊列和綁定等的設置。錯誤的配置可能導致消息無法正確發送或接收。
  2. 異常處理:在使用RabbitTemplate時,要充分考慮異常處理機制,確保在發生異常時能夠及時發現并處理。
  3. 資源釋放:在使用完RabbitTemplate后,要確保釋放相關資源,如關閉連接、釋放連接池中的連接等,以避免資源泄漏和性能問題。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/70876.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/70876.shtml
英文地址,請注明出處:http://en.pswp.cn/web/70876.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

常用的AI文本大語言模型匯總

AI文本【大語言模型】 1、文心一言https://yiyan.baidu.com/ 2、海螺問問https://hailuoai.com/ 3、通義千問https://tongyi.aliyun.com/qianwen/ 4、KimiChat https://kimi.moonshot.cn/ 5、ChatGPThttps://chatgpt.com/ 6、魔塔GPT https://www.modelscope.cn/studios/iic…

在自己的數據上復現一下LlamaGen

git倉庫&#xff1a;https://github.com/FoundationVision/LlamaGen 數據集準備 如果用ImageFolder讀取&#xff0c;則最好和ImageNet一致。 data_path/class_1/image_001.jpgimage_002.jpg...class_2/image_003.jpgimage_004.jpg......class_n/image_005.jpgimage_006.jpg.…

Go入門之接口

type Usber interface {start()stop() } type Phone struct {Name string }func (p Phone) start() {fmt.Println(p.Name, "啟動") } func (p Phone) stop() {fmt.Println(p.Name, "關機") } func main() {p : Phone{Name: "華為手機",}var p1 U…

【數據結構進階】哈希表

&#x1f31f;&#x1f31f;作者主頁&#xff1a;ephemerals__ &#x1f31f;&#x1f31f;所屬專欄&#xff1a;數據結構 目錄 前言 一、哈希表的概念 二、哈希函數的實現方法 1. 直接定址法 2. 除留余數法 三、哈希沖突 1. 開放定址法&#xff08;閉散列&#xff0…

《深度學習實戰》第4集:Transformer 架構與自然語言處理(NLP)

《深度學習實戰》第4集&#xff1a;Transformer 架構與自然語言處理&#xff08;NLP&#xff09; 在自然語言處理&#xff08;NLP&#xff09;領域&#xff0c;Transformer 架構的出現徹底改變了傳統的序列建模方法。它不僅成為現代 NLP 的核心&#xff0c;還推動了諸如 BERT、…

高效管理 React 狀態和交互:我的自定義 Hooks 實踐

高效管理 React 狀態和交互&#xff1a;自定義 Hooks 實踐 在 React 中&#xff0c;Hooks 是一種使我們能夠在函數組件中使用狀態和副作用的強大工具。隨著項目的增大&#xff0c;重復的邏輯可能會出現在多個組件中&#xff0c;這時使用自定義 Hooks 就非常合適。它們幫助我們…

Exoplayer(MediaX)實現音頻變調和變速播放

在K歌或錄音類應用中變調是個常見需求&#xff0c;比如需要播出蘿莉音/大叔音等。變速播放在影視播放類應用中普遍存在&#xff0c;在傳統播放器Mediaplayer中這兩個功能都比較難以實現&#xff0c;特別在低版本SDK中&#xff0c;而Exoplayer作為google官方推出的Mediaplayer替…

Meta最新研究:從單張照片到3D數字人的革命性突破

隨著人工智能技術的發展,3D建模和虛擬人物生成逐漸變得更加普及和高效。Meta(前身為Facebook)的最新研究成果展示了如何僅通過一張普通手機拍攝的照片就能生成高質量、全方位的3D數字人。這項技術不僅適用于虛擬試衣、游戲角色建模,還能廣泛應用于AR/VR內容生成等領域。本文…

軟件供應鏈安全工具鏈研究系列——RASP自適應威脅免疫平臺(上篇)

1.1 基本能力 RASP是一種安全防護技術&#xff0c;運行在程序執行期間&#xff0c;使程序能夠自我監控和識別有害的輸入和行為。也就是說一個程序如果注入或者引入了RASP技術&#xff0c;那么RASP就和這個程序融為一體&#xff0c;使應用程序具備了自我防護的能力&#xff0c;…

2025-02-27 學習記錄--C/C++-PTA 7-29 刪除字符串中的子串

合抱之木&#xff0c;生于毫末&#xff1b;九層之臺&#xff0c;起于累土&#xff1b;千里之行&#xff0c;始于足下。&#x1f4aa;&#x1f3fb; 一、題目描述 ?? 二、代碼&#xff08;C語言&#xff09;?? #include <stdio.h> // 引入標準輸入輸出庫&#xff0c…

Redis---字符串SDS(簡單動態字符串)底層結構

文章目錄 什么是SDS&#xff08;簡單動態字符串&#xff09;SDS結構SDS的優點O(1) 時間復雜度獲取字符串長度避免緩沖區溢出減少內存重分配次數二進制安全兼容C語言字符串函數 SDS的操作總結 什么是SDS&#xff08;簡單動態字符串&#xff09; redis是由C語言編寫的&#xff0…

Elasticsearch:使用阿里云 AI 服務進行嵌入和重新排名

作者&#xff1a;來自 Elastic Toms Mura 將阿里云 AI 服務功能與 Elastic 結合使用。 更多閱讀&#xff0c;請參閱 “Elasticsearch&#xff1a;使用阿里 infererence API 及 semantic text 進行向量搜索”。 在本文中&#xff0c;我們將介紹如何將阿里云 AI 功能與 Elastics…

Spring Cloud Alibaba與Spring Boot、Spring Cloud版本對應關系

一、前言 在搭建SpringCloud項目環境架構的時候&#xff0c;需要選擇SpringBoot和SpringCloud進行兼容的版本號&#xff0c;因此對于選擇SpringBoot版本與SpringCloud版本的對應關系很重要&#xff0c;如果版本關系不對應&#xff0c;常見的會遇見項目啟動不起來&#xff0c;怪…

【含文檔+PPT+源碼】基于過濾協同算法的旅游推薦管理系統設計與實現

項目介紹 本課程演示的是一款基于過濾協同算法的旅游推薦管理系統設計與實現&#xff0c;主要針對計算機相關專業的正在做畢設的學生與需要項目實戰練習的 Java 學習者。 1.包含&#xff1a;項目源碼、項目文檔、數據庫腳本、軟件工具等所有資料 2.帶你從零開始部署運行本套…

MTK Android12 預裝apk可卸載

文章目錄 需求解決方法1、device/mediatek/mt6761/device.mk2、/vendor/mediatek/proprietary/frameworks/base/data/etc/pms_sysapp_removable_vendor_list.txt3、路徑&#xff1a;4、Android.mk 需求 近期&#xff0c;客戶需要預裝一個apk&#xff0c;同時該apk要可卸載。解…

從 0 到 1,用 Python 構建超實用 Web 實時聊天應用

從 0 到 1&#xff0c;用 Python 構建超實用 Web 實時聊天應用 本文深入剖析如何運用 Python 的 Flask 框架與 SocketIO 擴展&#xff0c;搭建一個功能完備的 Web 實時聊天應用。從環境搭建、前后端代碼實現&#xff0c;到最終運行展示&#xff0c;逐步拆解關鍵步驟&#xff0…

視頻字幕識別和翻譯

下載的視頻很多不是漢語的&#xff0c;我們需要用剪映將語音識別出來作為字幕壓制到視頻中去。 剪映6.0以后語音識別需要收費&#xff0c;但是低版本還是沒有問題。 如果想要非漢語字幕轉成中文&#xff0c;剪映低版本不提供這樣功能。但是&#xff0c;用剪映導出識別字幕&am…

每天一個Flutter開發小項目 (4) : 構建收藏地點應用 - 深入Flutter狀態管理

引言 歡迎回到 每天一個Flutter開發小項目 系列博客!在前三篇博客中,我們從零開始構建了計數器應用、待辦事項列表應用,以及簡易天氣應用。您不僅掌握了 Flutter 的基礎組件和布局,還學習了網絡請求、JSON 解析等實用技能,更重要的是,我們一起探討了高效的 Flutter 學習…

Visual Studio打開文件后,中文變亂碼的解決方案

文件加載 使用Unicode&#xff08;UTF-8&#xff09;編碼加載文件 C:\WorkSpace\Assets\Scripts\UI\View\ExecuteComplateView.cs時&#xff0c;有些字節已用Unicode替換字符替換。保存該文件將不會保留原始文件內容。

OpenGL ES -> GLSurfaceView繪制點、線、三角形、正方形、圓(頂點法繪制)

XML文件 <?xml version"1.0" encoding"utf-8"?> <com.example.myapplication.MyGLSurfaceViewxmlns:android"http://schemas.android.com/apk/res/android"android:layout_width"match_parent"android:layout_height"…