RabbitMQ個人理解與基本使用

目錄

一. 作用:

二. RabbitMQ的5中隊列模式:

1. 簡單模式

2. Work模式

3. 發布/訂閱模式

4. 路由模式

5. 主題模式

三. 消息持久化:

消息過期時間

ACK應答?

四. 同步接收和異步接收:

應用場景

五. 基本使用 :

引入依賴庫:

配置文件RabbitMQConfig:?

創建消息任務類:?

解析:


一. 作用:

????????RabbitMQ主要用于消息隊列的實現。

二. RabbitMQ的5中隊列模式:

1. 簡單模式

一個生產者(發送方)對應一個消費者(接收方)

2. Work模式

一個生產者對應多個消費者,但是只能有一個消費者獲得消息(排他)

3. 發布/訂閱模式

一個消費者將消息首先發送到fanout交換器,交換器綁定到多個隊列,然后與之對應的所有消費者都能接收到消息(不排他)

4. 路由模式

生產者將消息發送到direct交換器,交換器按照關鍵字(Key),把消息路由到某個隊列

5. 主題模式

生產者將消息發送到Topic交換器,交換器按照復雜的規則,把消息路由到某個隊列

三. 消息持久化:

????????消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保證消息可靠性的呢?答案就是消息持久化。持久化可以防止在異常情況下丟失數據。除了消息持久化之外,甚至交換器和隊列都能持久化。也就是說rabbitmq的消息會被存儲在磁盤上,只有當消費收到消息,rabbitmq確認消費者收到消息(Acknowledgments--簡稱ACK)后才會將消息從隊列中刪除。??

  • 消息過期時間

? ? ? ? 如果消費者一直不接收消息,消息會一直保存在消息隊列當中,短期內可能不會有什么影響,但是如果經過長時間的積累后,消息會變得很多很多?,浪費大量的資源,內存。

? ? ? ? 為了應對這種情況,就可以對rabbitmq設置消息的過期時間,在規定時間內消息沒有被接收,就會刪除掉該消息。

  • ACK應答?

? ? ? ? 消費者接收到消息后,為了讓RabbitMQ?知道,就需要返回一個ACK應答,告訴RabbitMQ消費者已經收到了消息,如果收到消息后我們需要刪除該消息,只需要在ACK應答中加上deliveryTag標志位。

四. 同步接收和異步接收:

? ? ? ? 同步接收:指消費者調用方法時,會阻塞來等待消息,直到消息被成功消費或者隊列為空。(沒有消息等待消息再接著處理)。

? ? ? ? 異步接收:?指消費者不會在接收消息時阻塞,而是通過回調函數處理消息。消費者在等待消息的同時不會停下,可以處理其他任務。(當有消息時才來處理消息)。

  • 應用場景

????????同步接收 :當消息的處理順序對業務邏輯非常重要,就使用同步接收,消費者一次只處理一個消息,確保了每條消息的處理順序。

? ? ? ? 異步接收:當處理消息的時間比較長,或者系統的并發量大時,采用異步接收會更好。

RabbitMQ還有一個殺手锏——同時使用異步收發和同步收發。

五. 基本使用 :

引入依賴庫:

<dependency>
?? ?<groupId>com.rabbitmq</groupId>
? ? <artifactId>amqp-client</artifactId>
? ? <version>5.9.0</version>
</dependency>
<dependency>
?? ?<groupId>org.springframework.boot</groupId>
? ? <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>?

配置文件RabbitMQConfig:?

import com.rabbitmq.client.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Value("${rabbitmq.factoryHost}")private String host;@Beanpublic ConnectionFactory connectionFactory() {ConnectionFactory factory = new ConnectionFactory();factory.setHost(host);factory.setPort(5672);return factory;}
}

?host配置,我將rabbitMQ放在虛擬機上的,所有ip是虛擬機的地址:

創建消息任務類:?

@Slf4j
@Component
public class MessageTask {@Autowiredprivate ConnectionFactory factory;@Autowiredprivate MessageService messageService;/** 同步發送消息* */public void send(String topic, MessageEntity entity) {//向MongoDB保存消息數據,返回消息IDString id = messageService.insertMessage(entity);//向RabbitMQ發送消息try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){//連接到某個topicchannel.queueDeclare(topic, true, false, false, null);HashMap header = new HashMap();header.put("messageId",id);//創建AMQP協議參與對象,添加附加屬性AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(header).build();channel.basicPublish("",topic,properties,entity.getMsg().getBytes());log.debug("消息發送成功");} catch (Exception e){log.error(e.getMessage());throw new EmosException("向MQ發送消息失敗");}}/** 異步發送消息* */@Async("AsyncTaskExecutor")public void sendAsync(String topic, MessageEntity entity) {send(topic, entity);}/** 同步接收消息* */public int receive(String topic) {int i = 0;try (//接收消息數據Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 從隊列中獲取消息,不自動確認channel.queueDeclare(topic, true, false, false, null);//Topic中有多少條數據未知,所以使用死循環接收數據,直到接收不到消息,退出死循環while (true) {//創建響應接收數據,禁止自動發送Ack應答GetResponse response = channel.basicGet(topic, false);if (response != null) {AMQP.BasicProperties properties = response.getProps();Map<String, Object> header = properties.getHeaders(); //獲取附加屬性對象String messageId = header.get("messageId").toString();byte[] body = response.getBody();//獲取消息正文String message = new String(body);log.debug("從RabbitMQ接收的消息:" + message);MessageRefEntity entity = new MessageRefEntity();entity.setMessageId(messageId);entity.setReceiverId(Integer.parseInt(topic));entity.setReadFlag(false);entity.setLastFlag(true);messageService.insertRef(entity); //把消息存儲在MongoDB中//數據保存到MongoDB后,才發送Ack應答,讓Topic刪除這條消息long deliveryTag = response.getEnvelope().getDeliveryTag();channel.basicAck(deliveryTag, false);i++;} else {break; //接收不到消息,則退出死循環}}} catch (Exception e) {log.error("執行異常", e);}return i;}/** 異步接收消息* */@Asyncpublic int receiveAsync(String topic) {return receive(topic);}/** 同步刪除消息* */public void deleteQueue(String topic) {try(//接收消息數據Connection connection = factory.newConnection();Channel channel = connection.createChannel()){channel.queueDelete(topic);log.debug("成功刪除消息隊列:"+topic);} catch (Exception e){log.error("刪除消息隊列失敗:",e);throw new EmosException("刪除消息隊列失敗");}}/** 異步刪除消息* */@Asyncpublic void deleteAsync(String topic) {deleteQueue(topic);}
}

解析:

channel.queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments);
  • queueName:隊列的名稱,用于標識消息的存儲位置。
  • durable:

????????true,表示隊列是持久化的。

????????false,表示隊列是非持久化的。

  • exclusive:

????????true:隊列僅供當前連接使用,連接斷開時隊列會自動刪除。

????????false:隊列可供多個連接共享。

  • autoDelete:
    true:當隊列不再被任何消費者訂閱時,隊列會自動刪除。
    false:隊列即使沒有消費者訂閱也會一直存在,直到手動刪除。

  • arguments:額外的參數,null表示沒有額外參數

Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 60000); // 設置隊列中消息的過期時間為 60 秒(60000 毫秒)

channel.queueDeclare("myQueue", true, false, false, arguments);

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/62808.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/62808.shtml
英文地址,請注明出處:http://en.pswp.cn/web/62808.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

前端怎么預覽pdf

1.背景 后臺返回了一個在線的pdf地址&#xff0c;需要我這邊去做一個pdf的預覽&#xff08;需求1&#xff09;&#xff0c;并且支持配置是否可以下載&#xff08;需求2&#xff09;&#xff0c;需要在當前頁就能預覽&#xff08;需求3&#xff09;。之前我寫過一篇預覽pdf的文…

Python 參數配置使用 XML 文件的教程:輕松管理你的項目配置

Python 參數配置使用 XML 文件的教程&#xff1a;輕松管理你的項目配置 一句話總結&#xff1a;當配置項存儲在外部文件&#xff08;如 XML、JSON&#xff09;時&#xff0c;修改配置無需重新編譯和發布代碼。通過更新 XML 文件即可調整參數&#xff0c;無需更改源代碼&#xf…

解決 MySQL 啟動失敗與大小寫問題,重置數據庫

技術文檔&#xff1a;解決 MySQL 啟動失敗與大小寫問題&#xff0c;重置數據庫 1. 問題背景 在使用 MySQL 時&#xff0c;可能遇到以下問題&#xff1a; MySQL 啟動失敗&#xff0c;日志顯示 “permission denied” 或 “Can’t create directory” 錯誤。MySQL 在修改配置文…

python webdriver-manager 實現selenium 免下載安裝webdriver

python webdriver-manager 實現selenium 免下載安裝webdriver selenium在自動化測試中,通常需要使用瀏覽器驅動來與瀏覽器進行交互。然而,手動下載、安裝、以及管理這些驅動非常麻煩,尤其是當驅動版本頻繁更新時。為此,webdriver-manager庫提供了一個極簡的方案,自動幫我…

滑動窗口算法專題

滑動窗口簡介 滑動窗口就是利用單調性&#xff0c;配合同向雙指針來優化暴力枚舉的一種算法。 該算法主要有四個步驟 1. 先進進窗口 2. 判斷條件&#xff0c;后續根據條件來判斷是出窗口還是進窗口 3. 出窗口 4.更新結果&#xff0c;更新結果這個步驟是不確定的&#xff0c…

C# 中的Task

文章目錄 前言一、Task 的基本概念二、創建 Task使用異步方法使用 Task.Run 方法 三、等待 Task 完成使用 await 關鍵字使用 Task.Wait 方法 四、處理 Task 的異常使用 try-catch 塊使用 Task.Exception 屬性 五、Task 的延續使用 ContinueWith 方法使用 await 關鍵字和異步方法…

【AIGC】如何高效使用ChatGPT挖掘AI最大潛能?26個Prompt提問秘訣幫你提升300%效率的!

還記得第一次使用ChatGPT時&#xff0c;那種既興奮又困惑的心情嗎&#xff1f;我是從一個對AI一知半解的普通用戶&#xff0c;逐步成長為現在的“ChatGPT大神”。這一過程并非一蹴而就&#xff0c;而是通過不斷的探索和實踐&#xff0c;掌握了一系列高效使用的技巧。今天&#…

浩辰CAD教程004:柱梁板

文章目錄 柱梁板標準柱角柱構造柱柱齊墻邊繪制梁繪制樓板 柱梁板 標準柱 繪制標準柱&#xff1a; ①&#xff1a;點選插入柱子②&#xff1a;沿著一根軸線布置柱子③&#xff1a;指定的矩形區域內的軸線交點插入柱子 替換現有柱子&#xff1a;選擇替換之后的柱子形狀&#x…

UNIX數據恢復—UNIX系統常見故障問題和數據恢復方案

UNIX系統常見故障表現&#xff1a; 1、存儲結構出錯&#xff1b; 2、數據刪除&#xff1b; 3、文件系統格式化&#xff1b; 4、其他原因數據丟失。 UNIX系統常見故障解決方案&#xff1a; 1、檢測UNIX系統故障涉及的設備是否存在硬件故障&#xff0c;如果存在硬件故障&#xf…

橋接模式的理解和實踐

橋接模式&#xff08;Bridge Pattern&#xff09;&#xff0c;又稱橋梁模式&#xff0c;是一種結構型設計模式。它的核心思想是將抽象部分與實現部分分離&#xff0c;使它們可以獨立地進行變化&#xff0c;從而提高系統的靈活性和可擴展性。本文將詳細介紹橋接模式的概念、原理…

HTML綜合

一.HTML的初始結構 <!DOCTYPE html> <html lang"en"><head><!-- 設置文本字符 --><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><!-- 設置網頁…

二維碼數據集,使用yolov,voc,coco標注,3044張各種二維碼原始圖片(未圖像增強)

二維碼數據集&#xff0c;使用yolov&#xff0c;voc&#xff0c;coco標注&#xff0c;3044張各種二維碼原始圖片&#xff08;未圖像增強&#xff09; 數據集分割 訓練組70&#xff05; 2132圖片 有效集20&#xff05; 607圖片 測試集10&#xff05; 305圖…

Python爬蟲技術的最新發展

在互聯網的海洋中&#xff0c;數據就像是一顆顆珍珠&#xff0c;而爬蟲技術就是我們手中的潛水艇。2024年&#xff0c;爬蟲技術有了哪些新花樣&#xff1f;讓我們一起潛入這個話題&#xff0c;看看最新的發展和趨勢。 1. 異步爬蟲&#xff1a;速度與激情 隨著現代Web應用的復…

用豆包MarsCode IDE,從0到1畫出精美數據大屏!

豆包MarsCode IDE 是一個云端 AI IDE 平臺&#xff0c;通過內置的 AI 編程助手&#xff0c;開箱即用的開發環境&#xff0c;可以幫助開發者更專注于各類項目的開發。 作為一名前端開發工程師&#xff0c;今天想嘗試利用豆包MarsCode IDE&#xff0c;選擇 Vue Echarts 創建一個…

游戲引擎學習第42天

倉庫: https://gitee.com/mrxiao_com/2d_game 簡介 目前我們正在研究的內容是如何構建一個基本的游戲引擎。我們將深入了解游戲開發的每一個環節&#xff0c;從最基礎的技術實現到高級的游戲編程。 角色移動代碼 我們主要討論的是角色的移動代碼。我一直希望能夠使用一些基…

Redis是什么?Redis和MongoDB的區別在那里?

Redis介紹 Redis&#xff08;Remote Dictionary Server&#xff09;是一個開源的、基于內存的數據結構存儲系統&#xff0c;它可以用作數據庫、緩存和消息中間件。以下是關于Redis的詳細介紹&#xff1a; 一、數據結構支持 字符串&#xff08;String&#xff09; 這是Redis最…

計算機網絡中的三大交換技術詳解與實現

目錄 計算機網絡中的三大交換技術詳解與實現1. 計算機網絡中的交換技術概述1.1 交換技術的意義1.2 三大交換技術簡介 2. 電路交換技術2.1 理論介紹2.2 Python實現及代碼詳解2.3 案例分析 3. 分組交換技術3.1 理論介紹3.2 Python實現及代碼詳解3.3 案例分析 4. 報文交換技術4.1 …

[Python] 操作redis使用pipeline保證原子性

1. pipeline介紹 在Python中使用Redis的Pipeline可以使多個Redis命令在一個請求中批量執行&#xff0c;從而提高效率。redis-py庫提供了對Redis Pipeline的支持&#xff0c;下面是一個基本的例子&#xff1a; 首先&#xff0c;確保你已安裝了redis庫&#xff1a; pip instal…

Bug 解決 無法正常登錄或獲取不到用戶信息

目錄 1、跨域問題 2、后端代碼問題 3、前端代碼問題 我相信登錄這個功能是很多人做項目時候遇到第一個檻&#xff01; **看起來好像很簡單的登錄功能&#xff0c;實際上還是有點坑的&#xff0c;比如明明賬號密碼都填寫正確了&#xff0c;**為什么登錄后請求接口又說我沒登…

論文翻譯 | ChunkRAG: Novel LLM-Chunk Filtering Method for RAG Systems

摘要 使用大型語言模型&#xff08;LLM&#xff09;的檢索-增強生成&#xff08;RAG&#xff09;系統經常由于檢索不相關或松散相關的信息而生成不準確的響應。現有的在文檔級別操作的方法無法有效地過濾掉此類內容。我們提出了LLM驅動的塊過濾&#xff0c;ChunkRAG&#xff0…