SpringBoot 系列之集成 RabbitMQ 實現高效流量控制

系列博客專欄:

  • JVM系列博客專欄
  • SpringBoot系列博客

Spring Boot 2.2.1 集成 RabbitMQ 實現高效流量控制

在分布式系統中,消息隊列是實現異步通信、解耦服務的重要組件。RabbitMQ 作為一款成熟的開源消息隊列,廣泛應用于各類項目中。本文將結合 Spring Boot 2.2.1,詳細介紹如何集成 RabbitMQ 并實現基于隊列長度、內存和磁盤的流量控制,同時引入服務端限流配置,進一步提升系統的穩定性與可靠性。

一、RabbitMQ 流量控制的重要性

當消息產生速度過快,超過消息隊列的處理能力時,可能會導致隊列積壓、系統性能下降甚至崩潰。通過流量控制,可以有效限制消息的流入速度,使系統能夠在合理的負載下運行,保障服務的穩定性和可靠性。

二、Spring Boot 2.2.1 集成 RabbitMQ 基礎配置

1. 引入依賴

pom.xml 文件中添加 Spring Boot AMQP 和 Web 依賴:

<dependencies><!-- Spring Boot Starter AMQP --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- Spring Boot Starter Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- JSON處理依賴 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!-- Spring Boot Starter Test --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- RabbitMQ測試依賴 --><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>
</dependencies>

2. 配置文件

application.yml 中配置 RabbitMQ 連接信息和相關參數:

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /requested-heartbeat: 30connection-timeout: 10000publisher-confirms: truepublisher-returns: truelistener:simple:acknowledge-mode: autoprefetch: 50concurrency: 3max-concurrency: 10cache:channel:size: 50checkout-timeout: 30000connection:mode: CHANNELsize: 5# 自定義流量控制配置
app:flow-control:max-messages: 1000duration: 5000

3. RabbitMQ 配置類

創建 RabbitMQConfig 類,配置隊列、交換機、綁定關系、消息轉換器以及 RabbitTemplate:

package com.example.springboot.rabbitmq.configuration;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@Slf4j
public class RabbitMQConfig {public static final String QUEUE_NAME = "flow.control.queue";public static final String EXCHANGE_NAME = "flow.control.exchange";public static final String ROUTING_KEY = "flow.control.key";// 配置隊列@Beanpublic Queue queue() {return QueueBuilder.durable(QUEUE_NAME).maxLength(1000).build();}// 配置交換機@Beanpublic DirectExchange exchange() {return new DirectExchange(EXCHANGE_NAME);}// 綁定隊列和交換機@Beanpublic Binding binding(Queue queue, DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);}// 配置消息轉換器@Beanpublic Jackson2JsonMessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}// 配置RabbitTemplate@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,Jackson2JsonMessageConverter messageConverter) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter);// 設置mandatory標志,確保消息在無法路由時返回rabbitTemplate.setMandatory(true);// 設置發布確認回調rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {log.info("消息發送成功: {}",  correlationData);} else {log.warn("消息發送失敗: {}",  cause);}});// 設置返回回調rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.info("消息被退回: {}", new String(message.getBody()));log.info("回復碼: ", replyCode);log.info("回復文本: ", replyText);log.info("交換機: ", exchange);log.info("路由鍵: ", routingKey);});return rabbitTemplate;}// 配置監聽器容器工廠@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,Jackson2JsonMessageConverter messageConverter) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(messageConverter);factory.setConcurrentConsumers(3); // 設置并發消費者數量factory.setMaxConcurrentConsumers(10);factory.setPrefetchCount(50); // 設置 QoSfactory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手動確認模式return factory;}
}

三、基于隊列長度的流量控制

MessageProducer 類中實現基于隊列長度的流量控制邏輯:

package com.example.demo.service;import com.example.demo.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;private final AtomicInteger messageCount = new AtomicInteger(0);private static final int MAX_MESSAGES = 1000;private volatile boolean flowControlEnabled = false;public void sendMessage(String message) {if (flowControlEnabled) {System.out.println("流量控制已啟用,暫停發送消息");return;}if (messageCount.get() >= MAX_MESSAGES) {System.out.println("達到最大消息數量,觸發流量控制");enableFlowControl(5000);return;}String correlationId = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(correlationId);rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,RabbitMQConfig.ROUTING_KEY,message,correlationData);messageCount.incrementAndGet();System.out.println("發送消息: " + message + ", 消息ID: " + correlationId);}public void enableFlowControl(long durationMillis) {flowControlEnabled = true;System.out.println("流量控制已啟用,持續時間: " + durationMillis + "ms");new Thread(() -> {try {Thread.sleep(durationMillis);} catch (InterruptedException e) {Thread.currentThread().interrupt();}flowControlEnabled = false;messageCount.set(0);System.out.println("流量控制已禁用");}).start();}
}

除了用代碼限制外,可以用maxLength設置,示例代碼:

 // 配置隊列@Beanpublic Queue queue() {return QueueBuilder.durable(QUEUE_NAME).maxLength(1000).build();}

四、x-max-length-bytes 參數詳解

x-max-length-bytes 用于限制隊列中消息的總字節數。在創建隊列時,可以通過代碼配置:

@Bean
public Queue queue() {return QueueBuilder.durable(QUEUE_NAME).maxLength(1000).maxLengthBytes(1024 * 1024 * 10) // 設置隊列消息總字節數上限為10MB.build();
}

當隊列中消息的總字節數達到設定的閾值時,后續新消息的處理策略由 x-overflow 參數決定:

  • drop-head:丟棄隊列頭部的消息,為新消息騰出空間。
  • reject-publish:拒絕接收新消息,并向生產者返回 Basic.Reject 響應。

五、基于內存和磁盤的流量控制

通過配置 RabbitMQ 服務器的內存和磁盤告警閾值,當服務器內存使用或磁盤空間達到閾值時,會自動觸發流量控制。例如:

rabbitmqctl set_vm_memory_high_watermark 0.6

此命令將內存高水位線設置為系統內存的 60%。

六、服務端限流配置

1. 基于 Guava 的限流實現

添加 Guava 依賴:

<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>28.2-jre</version>
</dependency>

使用 RateLimiter 進行限流:

package com.example.demo.service;import com.google.common.util.concurrent.RateLimiter;
import org.springframework.stereotype.Service;@Service
public class LimitedService {private final RateLimiter rateLimiter = RateLimiter.create(5);public void limitedMethod() {if (rateLimiter.tryAcquire()) {System.out.println("請求被處理");} else {System.out.println("請求被限流");}}
}

七、 消費端限流

默認情況下,如果不進行配置,RabbitMQ會盡可能快速地把隊列中的消息發送到消費者。如果消息數量過多,可能會導致OOM或者影響其他進程的正常運行

1. 消費端限流示例

package com.example.springboot.rabbitmq.service;import com.example.springboot.rabbitmq.configuration.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;@Service
@Slf4j
public class MessageConsumer {@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)@Retryable(value = {IOException.class}, maxAttempts = 3,backoff = @Backoff(delay = 2000, multiplier = 2))public void receiveMessage(Message message, Channel channel) throws IOException {try {if (channel == null || !channel.isOpen()) {log.warn("Channel is closed or null, unable to process message");return;}// 動態設置預取計數channel.basicQos(calculatePrefetchCount());String content = new String(message.getBody());log.info("接收到消息:{} ", content);// 模擬消息處理時間Thread.sleep(100);// 發送消息確認channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);log.info("消息處理完成");} catch (Exception e) {log.error("處理消息時發生錯誤: {}", e.getMessage(), e);if (channel != null && channel.isOpen()) {channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // 失敗后重新入隊}}}// 根據系統負載動態計算預取計數private int calculatePrefetchCount() {double cpuLoad = getSystemCpuLoad();int basePrefetch = 10;return (int) Math.max(1, basePrefetch * (1 - cpuLoad));}// 獲取當前系統 CPU 負載private double getSystemCpuLoad() {OperatingSystemMXBean osBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();return osBean.getSystemLoadAverage() / osBean.getAvailableProcessors();}}

八、總結

通過上述配置和代碼示例,您可以實現對 RabbitMQ 的高效流量控制,從而提升系統的穩定性和可靠性。合理利用隊列長度限制、內存和磁盤流量控制,以及服務端限流策略,可以幫助系統在高負載情況下保持良好的運行狀態。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/83490.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/83490.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/83490.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

新視訊影視官網入口,影視動漫在線播放網站

新視訊影視是一個免費為廣大追劇迷提供在線播放服務的影視平臺&#xff0c;深受眾多影視愛好者的喜愛。它涵蓋了大量免費的VIP電視劇資源、最新上映的大片、好看的綜藝節目以及動漫視頻&#xff0c;是一個播放速度快、資源多的免費影視網站。用戶無需注冊或登錄&#xff0c;即可…

【使用】【經驗】docker 清理未使用的鏡像的命令

docker images prune在 Docker 中清理未使用的鏡像&#xff08;包括懸空鏡像和完全未被引用的鏡像&#xff09;&#xff0c;可以使用以下命令&#xff1a; 1. ?刪除所有懸空鏡像?&#xff08;推薦常用&#xff09; docker image prune?懸空鏡像 (dangling images)?? 是指…

OpenCV CUDA模塊圖像處理------圖像融合函數blendLinear()

操作系統&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 編程語言&#xff1a;C11 算法描述 該函數執行 線性融合&#xff08;加權平均&#xff09; 兩個圖像 img1 和 img2&#xff0c;使用對應的權重圖 weights1 和 weights2。 融合公式…

【Typst】6.布局函數

概述 上節我們介紹了文檔結構元素的函數&#xff0c;本節介紹一些控制布局使用的函數&#xff0c;掌握他們之后你可以更進一步的控制頁面元素的布局。 系列目錄 1.Typst概述2.Typst標記語法和基礎樣式3.Typst腳本語法4.導入、包含和讀取5.文檔結構元素與函數6.布局函數 對齊…

【音視頻】FFmpeg 編碼H265

一、概述 實現了讀入本地yuv文件&#xff0c;通過libx265編碼為H265格式&#xff0c;并存儲到本地文件中 二、實現流程 準備文件 在build路徑下準備yuv文件 在項目中添加文件參數&#xff0c;輸出為h265文件&#xff0c;使用libx265編碼 初始化解碼器 通過傳進來的libx265…

ECreator低代碼平臺-文件管理器的使用說明

Ecreator是中山華拓信息技術公司旗下的一款低代碼平臺&#xff0c;主要功能包含&#xff1a;文件管理器&#xff0c;表單數據管理器&#xff0c;儀表盤設計界面&#xff0c;內容頁面自定義等功能&#xff0c;可以用于快速低成本的構建網站和企業內部應用。 下面介紹一下文件管…

高考加油!UI界面生成器!

這個高考助力標語生成器具有以下特點&#xff1a; 視覺設計&#xff1a;采用了藍色為主色調&#xff0c;搭配漸變背景和圓形裝飾元素&#xff0c;營造出寧靜而充滿希望的氛圍&#xff0c;非常適合高考主題。 標語生成&#xff1a;內置了超過 100 條精心挑選的高考加油標語&a…

阿姆達爾定律的演進:古斯塔夫森定律

前言 在上一篇文章《使用阿姆達爾定律來提升效率》中提到的阿姆達爾定律前提是假設問題的規模保持不變&#xff0c;并且給定一臺速度更快的機器&#xff0c;目標是更快地解決問題。然而&#xff0c;在大多數情況下&#xff0c;這并不完全正確。當有一臺更快的機器時&#xff0…

【RabbitMQ】- Channel和Delivery Tag機制

在 RabbitMQ 的消費者代碼中&#xff0c;Channel 和 tag 參數的存在是為了實現消息確認機制&#xff08;Acknowledgment&#xff09;和精細化的消息控制。 Channel 參數 作用 Channel 是 AMQP 協議的核心操作接口&#xff0c;通過它可以直接與 RabbitMQ 交互&#xff1a; 手…

核心機制:流量控制

搭配滑動窗口使用的 窗口大小 窗口越大,傳輸速度就越快,但是也不能無限大,太大了,對于可靠性會有影響 比如發生方以非常快的速度,發送,接收方的處理速度跟不上,也就會導致有效數據被接受方丟棄(又得重傳) 流量控制,就是根據接收方的處理能力(如何衡量?),干預到發送方的發送…

深度強化學習賦能城市消防優化,中科院團隊提出DRL新方法破解設施配置難題

在城市建設與發展中&#xff0c;地理空間優化至關重要。從工業園區選址&#xff0c;到公共服務設施布局&#xff0c;它都發揮著關鍵作用。但傳統求解方法存在諸多局限&#xff0c;如今&#xff0c;深度學習技術為其帶來了新的轉機。 近日&#xff0c;在中國地理學會地理模型與…

安科電動機保護器通過ModbusRTU轉profinet網關與PLC通訊

安科電動機保護器通過ModbusRTU轉profinet網關與PLC通訊 在工業自動化領域&#xff0c;設備間的通信和數據交互至關重要。Modbus作為一種常用的通訊協議&#xff0c;廣泛應用于各種工業現場&#xff1b;而Profinet則憑借其高效、實時性&#xff0c;在工業以太網通訊中占據重要…

python直方圖

在Python中&#xff0c;繪制直方圖&#xff08;Histogram&#xff09;是一項非常常見的任務&#xff0c;通常用于數據可視化&#xff0c;以展示數據的分布情況。Python中有多種庫可以繪制直方圖&#xff0c;其中最常用的兩個庫是Matplotlib和Seaborn。此外&#xff0c;Pandas庫…

在Oxygen編輯器中使用DeepSeek

羅馬尼亞公司研制開發的Oxygen編輯器怎樣與國產大模型結合&#xff0c;這是今年我在tcworld大會上給大家的分享&#xff0c;需要ppt的朋友請私信聯系 - 1 - Oxygen編輯器中的人工智能助手 Oxygen編輯器是羅馬尼亞的Syncro Soft公司開發的一款結構化文檔編輯器。 它是用來編寫…

neo4j 5.19.0安裝、apoc csv導入導出 及相關問題處理

前言 突然有需求需要用apoc 導入 低版本的圖譜數據&#xff0c;網上資料又比較少&#xff0c;所以就看官網資料并處理了apoc 導入的一些問題。 相關地址 apoc 官方安裝網址 apoc 官方導出csv 教程地址 apoc 官方 導入 csv 地址 docker 安裝 執行如下命令啟動鏡像 doc…

macos常見且應該避免被覆蓋的系統環境變量(避免用 USERNAME 作為你的自定義變量名)

文章目錄 macos避免用 USERNAME 作為你的自定義變量名macos常見且應該避免被覆蓋的系統環境變量 macos避免用 USERNAME 作為你的自定義變量名 問題&#xff1a; 你執行了&#xff1a;export USERNAME“admin” 然后執行&#xff1a;echo ${USERNAME} 輸出卻是&#xff1a;xxx …

Python訓練打卡Day41

簡單CNN 知識回顧 數據增強卷積神經網絡定義的寫法batch歸一化&#xff1a;調整一個批次的分布&#xff0c;常用與圖像數據特征圖&#xff1a;只有卷積操作輸出的才叫特征圖調度器&#xff1a;直接修改基礎學習率 卷積操作常見流程如下&#xff1a; 1. 輸入 → 卷積層 → Batch…

【親測有效】Mybatis-Plus中更新字段為null

Mybatis-Plus中更新字段為null 遇到問題 Mybatis-Plus更新的默認行為如下: Mybatis-Plus默認如果某個字段為null, 默認不更新這個字段, 例如有個Double類型的字段, 當前數據庫數據為10, 然后傳參時當前字段為null, 實際上Mybatis-Plus是不會覆蓋該字段為null的 在傳參的時候如…

如何使用插件和子主題添加WordPress自定義CSS(附:常見錯誤)

您是否曾經想更改網站外觀的某些方面&#xff0c;但不知道怎么做&#xff1f;有一個解決方案——您可以將自定義 CSS&#xff08;層疊樣式表&#xff09;添加到您的WordPress網站&#xff01; 在本文中&#xff0c;我們將討論您需要了解的有關CSS的所有知識以及如何使用它來修…

左值引用和右值引用

一、基本概念 左值&#xff08;lvalue&#xff09;和右值&#xff08;rvalue&#xff09; 左值指的是有確定存儲位置&#xff08;地址&#xff09;的對象&#xff0c;通常可以出現在賦值語句左側。例如&#xff1a;變量名、解引用指針得到的對象、數組元素等都屬于左值。 右值…