RabbitMQ惰性隊列的工作原理、消息持久化機制、同步刷盤的概念、延遲插件的使用方法

惰性隊列工作原理

惰性隊列通過盡可能多地將消息存儲到磁盤上來減少內存的使用。與傳統隊列相比,惰性隊列不會主動將消息加載到內存中,而是盡量讓消息停留在磁盤上,從而降低內存占用。盡管如此,它并不保證所有操作都是同步寫入磁盤的。這意味著消息可能會先被緩存到操作系統的緩沖區中,然后由操作系統決定何時將其真正寫入磁盤。

  • 優點:適合處理大量消息且對內存壓力敏感的場景。
  • 缺點:由于頻繁的磁盤I/O操作,性能可能不如傳統隊列。

同步刷盤的概念

同步刷盤意味著每次寫入操作都會等待數據完全寫入磁盤后才返回確認信息。雖然這種方式提供了更強的數據持久性保證,但它也顯著增加了寫入操作的延遲。對于RabbitMQ而言,可以通過設置消息為持久化來增加數據的安全性,但對于極端情況下的數據安全性要求,還需要結合其他策略如調整操作系統參數或使用文件系統級別的同步寫入配置。

延遲插件的工作原理

RabbitMQ本身沒有內置的延遲隊列功能,但可以通過安裝rabbitmq_delayed_message_exchange插件實現這一功能。該插件允許創建一個自定義交換機類型,該交換機能夠根據消息頭中的延遲時間屬性來延遲消息的傳遞。

在Spring Boot中集成RabbitMQ惰性隊列和延遲消息

1.?項目初始化

首先,確保你的Spring Boot項目中包含必要的依賴:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
</dependencies>
2.?配置RabbitMQ連接

application.yml中配置RabbitMQ連接信息:

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest
3.?定義惰性隊列

創建一個配置類來定義惰性隊列:

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitMqConfig {/*** 定義惰性模式的隊列* @return 返回惰性隊列實例*/@Beanpublic Queue lazyQueue() {Map<String, Object> args = new HashMap<>();// 設置隊列為惰性模式args.put("x-queue-mode", "lazy");return new Queue("my_lazy_queue", true, false, false, args); // durable=true for queue durability}
}
4.?發送持久化消息

創建一個服務類用于發送消息,并確保消息是持久化的:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 發送一條持久化消息到惰性隊列* @param message 要發送的消息內容*/public void sendMessage(String message) {rabbitTemplate.convertAndSend("my_lazy_queue", message);System.out.println(" [x] Sent '" + message + "'");}
}

確保消息持久化可以在application.yml中設置如下:

spring:rabbitmq:template:exchange: ''routing-key: 'my_lazy_queue'mandatory: truepublisher-confirms: truepublisher-returns: true
5.?接收消息

創建一個監聽器來接收消息:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class MessageReceiver {/*** 監聽并接收來自惰性隊列的消息* @param message 接收到的消息內容*/@RabbitListener(queues = "my_lazy_queue")public void receiveMessage(String message) {System.out.println(" [x] Received '" + message + "'");}
}
6.?使用延遲插件發送延遲消息

首先,在RabbitMqConfig中定義延遲交換機:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.Collections;
import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitMqConfig {/*** 定義延遲交換機* @return 返回延遲交換機實例*/@Beanpublic CustomExchange delayExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, args);}/*** 綁定延遲隊列到延遲交換機* @param delayedQueue 延遲隊列* @param delayExchange 延遲交換機* @return 返回綁定實例*/@Beanpublic Binding binding(Queue delayedQueue, CustomExchange delayExchange) {return new Binding("delayed_queue", Binding.DestinationType.QUEUE, "delayed_exchange", "routing.key", Collections.emptyMap());}/*** 定義延遲隊列* @return 返回延遲隊列實例*/@Beanpublic Queue delayedQueue() {return new Queue("delayed_queue");}
}

然后,創建一個服務類來發送延遲消息:

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;@Service
public class DelayedMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 發送帶有延遲的消息* @param message 要發送的消息內容* @param delayTime 延遲時間(毫秒)*/public void sendDelayedMessage(String message, int delayTime) {MessagePostProcessor messagePostProcessor = message -> {message.getMessageProperties().setHeader("x-delay", delayTime);return message;};rabbitTemplate.convertAndSend("delayed_exchange", "routing.key", message, messagePostProcessor);System.out.println(" [x] Sent '" + message + "' with delay.");}
}

最后,創建一個監聽器來接收延遲消息:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class DelayedMessageReceiver {/*** 監聽并接收來自延遲隊列的消息* @param message 接收到的消息內容*/@RabbitListener(queues = "delayed_queue")public void receiveDelayedMessage(String message) {System.out.println(" [x] Received delayed message '" + message + "'");}
}

高級特性和最佳實踐

  • 發布確認機制:為了提高可靠性,可以開啟發布確認機制,以確保消息確實被RabbitMQ服務器接受。

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("Message acknowledged");} else {System.err.println("Message not acknowledged due to: " + cause);}
});
  • 預取計數(Prefetch Count):通過設置預取計數限制每個消費者同時處理的消息數量,有助于防止消費者被過多未處理的消息壓垮。

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMqConnectionConfig {@Beanpublic CachingConnectionFactory cachingConnectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setChannelCacheSize(25);connectionFactory.getRabbitConnectionFactory().setRequestedChannelMax(200);return connectionFactory;}
}

可以在application.yml中設置:

spring:rabbitmq:listener:simple:prefetch: 10

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/76567.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/76567.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/76567.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Spark Core(二)

Spark-Core編程&#xff08;二&#xff09; RDD轉換算子 RDD 根據數據處理方式的不同將算子整體上分為 Value 類型、雙 Value 類型和 Key-Value 類型 Value類型 1&#xff09;map 將處理的數據逐條進行映射轉換&#xff0c;這里的轉換可以是類型的轉換&#xff0c;也可以是…

C#打開文件及目錄腳本

如果每天開始工作前都要做一些準備工作&#xff0c;比如打開文件或文件夾&#xff0c;我們可以使用代碼一鍵完成。 using System.Diagnostics; using System.IO;namespace OpenFile {internal class Program{static void Main(string[] args){Console.WriteLine("Hello, …

Python生成exe

其中的 -w 參數是 PyInstaller 用于窗口模式&#xff08;Windowed mode&#xff09;&#xff0c;它會關閉命令行窗口的輸出&#xff0c;這通常用于 圖形界面程序&#xff08;GUI&#xff09;&#xff0c;比如使用 PyQt6, Tkinter, PySide6 等。 所以&#xff1a; 如果你在沒有…

【大模型微調】如何解決llamaFactory微調效果與vllm部署效果不一致如何解決

以下個人沒整理太全 一、生成式語言模型的對話模板介紹 使用Qwen/Qwen1.5-0.5B-Chat訓練 對話模板不一樣。回答的內容就會不一樣。 我們可以看到例如qwen模型的tokenizer_config.json文件&#xff0c;就可以看到對話模板&#xff0c;一般同系列的模型&#xff0c;模板基本都…

Linux網絡編程——詳解網絡層IP協議、網段劃分、路由

目錄 一、前言 二、IP協議的認識 1、什么是IP協議&#xff1f; 2、IP協議報頭 三、網段劃分 1、初步認識IP與路由 2、IP地址 I、DHCP動態主機配置協議 3、IP地址的劃分 I、CIDR設計 II、子網數目的計算 III、子網掩碼的確定 四、特殊的IP地址 五、IP地址的數量限…

ansible+docker+docker-compose快速部署4節點高可用minio集群

目錄 github項目地址 示例服務器列表 安裝前 修改變量文件group_vars/all.yml 修改ansible主機清單 修改setup.sh安裝腳本 用法演示 安裝后驗證 github項目地址 https://github.com/sulibao/ansible_minio_cluster.git 示例服務器列表 安裝前 修改變量文件group_var…

MySql主從相關概念

想象一下&#xff0c;你的業務飛速增長&#xff0c;用戶請求如潮水般涌來&#xff0c;突然數據庫主庫宕機&#xff0c;數據丟失&#xff0c;服務癱瘓——這簡直是開發者的噩夢&#xff01;MySQL主從復制就像一張安全網&#xff0c;通過主庫寫、從庫讀的協作模式&#xff0c;不僅…

機械臂只有位置信息是否可以進行手眼標定?

平常我在做手眼標定時&#xff0c;一般都是通過OpenCV的cv::calibrateHandEye函數進行求解&#xff0c;需要輸入多組不同的機械臂位姿。今天遇到了一款舵機機器人&#xff0c;只能獲取位置&#xff0c;得不到姿態信息&#xff0c;想著那就把姿態都設為0&#xff0c;結果求不出來…

華為數字芯片機考2025合集2已校正

單選 1. 題目內容 關于亞穩態的描述錯誤的是&#xff08; &#xff09;。 1. 解題步驟 1.1 理解亞穩態&#xff08;Metastability&#xff09;的核心特性 亞穩態是指觸發器無法在指定時間內穩定輸出有效邏輯電平&#xff08;0或1&#xff09;的狀態&#xff0c;其關鍵特點…

T-Box車載系統介紹及其應用

定義 T-Box汽車系統&#xff0c;全稱為Telematics - BOX&#xff0c;也常簡稱為車載T - BOX&#xff0c;是汽車智能系統及車聯網系統中的核心組成部分&#xff0c;是安裝在車輛上的一種高科技遠程信息處理器。 工作原理 T-Box的核心功能主要通過MPU和MCU實現。MPU負責應用程序功…

[redis進階一]redis的持久化(1)RDB篇章

目錄 一 認識持久化 (1)先看總結圖 (2)什么是持久化? (3)redis是怎么進行持久化的呢 (4)簡單分析一下RDB持久化和AOF持久化的不同 二 RDB持久化 (1)RDB的觸發機制 (2)RDB的bgsave執行流程 (3)RDB文件的處理 (4)RDB的優缺點 (5)RDB效果演示板書 三 溫習Linux文件…

uniapp日常總結--uniapp頁面跳轉方式

uniapp日常總結--uniapp頁面跳轉方式_uniapp 跳轉-CSDN博客

《汽車電器與電子技術》實驗報告

SRS系統結構原理與故障檢測診斷 車輛上為什么要配安全氣囊&#xff1f;——解析汽車被動安全的關鍵防線 一、安全氣囊的核心作用&#xff1a;應對高速碰撞的“救命緩沖墊” 車輛在高速碰撞時&#xff08;如正面碰撞、側面碰撞&#xff09;&#xff0c;人體會因慣性以極高速度…

ffmpeg編解碼器相關函數

文章目錄 &#x1f3af; 你需要理解的核心結構體&#xff1a;&#x1f4e6; 常用函數及使用順序&#xff08;以解碼為例&#xff09;1?? avcodec_find_decoder() / avcodec_find_encoder()2?? avcodec_alloc_context3()3?? avcodec_parameters_to_context()4?? avcodec…

尚硅谷2019版Java網絡編程筆記

第14章 網絡編程 網絡編程概述 什么是網絡編程&#xff1f; 網絡編程是通過網絡協議實現計算機之間的數據交換。Java提供了強大的網絡編程支持&#xff0c;隱藏了底層細節&#xff0c;開發者可以輕松實現網絡通信。 網絡編程的核心問題 如何定位網絡上的主機&#xff1a;通…

解決【遠程主機可能不符合 glibc 和 libstdc++ Vs code 服務器的先決條件】

可能是因為vscode不支持遠程操作系統的版本&#xff0c;要么升級操作系統要么回退vscode版本 vscode回退1.97版本下載地址&#xff1a; 1.97版本VSCODE

forms+windows添加激活水印

formswindows添加激活水印 多語言水印文本&#xff0c;根據系統語言自動切換。水印顯示在每個屏幕的右下角&#xff0c;位置動態調整。半透明灰色文字&#xff0c;微軟雅黑字體。窗口無邊框、置頂、透明背景&#xff0c;不干擾用戶操作。支持多顯示器。高DPI適配。 效果圖&am…

LeetCode --- 444 周賽

題目列表 3507. 移除最小數對使數組有序 I 3508. 設計路由器 3509. 最大化交錯和為 K 的子序列乘積 3510. 移除最小數對使數組有序 II 一、移除最小數對使數組有序 I & II 由于數組是給定的&#xff0c;所以本題的操作步驟是固定的&#xff0c;我們只要能快速模擬操作的過…

限流、降級、熔斷、隔離?

在微服務架構中&#xff0c;服務限流、降級、熔斷和隔離是保障系統高可用性的核心手段&#xff0c;但它們解決的問題和應用場景不同。以下是它們的區別、解決方案和實際案例的詳細說明&#xff1a; 一、服務限流&#xff08;Rate Limiting&#xff09; 定義&#xff1a;通過限…

Day22 -php開發01--留言板+知識點(超全局變量 文件包含 數據庫操作 第三方插件)

環境要求&#xff1a;php7.0.9 小皮 navicat phpstorm24.1 知識點&#xff1a;會寫&#xff08;留言板 留言板后臺&#xff09; 超全局變量 三方插件的使用 文件包含 1、開啟小皮并利用navicat新建一個數據庫 注意&#xff1a;本地的服務mysql關閉后 才可打開小皮。屬…