rocketmq批量執行跑批任務報錯

rocketmq批量執行跑批任務,報下面的錯誤,怎么處理一下呢?是修改配置還是修改代碼還是?? org.apache.rocketmq.client.exception.MQBrokerException: CODE: 215 DESC: [FLOW]client has exhausted the send quota for the current time slot, start flow control for a while. BROKER:

這個錯誤是?RocketMQ 服務端對客戶端進行了流量控制,說明你的批量任務發送消息的速度超過了Broker的配置限制。需要從多個方面進行處理:

🎯 錯誤原因分析

text

CODE: 215  DESC: [FLOW]client has exhausted the send quota for the current time slot

這是RocketMQ的流控機制,Broker為了保護自己不被壓垮,對生產者進行了限流。

🛠? 解決方案

方案1:調整發送策略(代碼層面)?- 推薦首選

java

// 1. 添加發送間隔,避免瞬間大量發送
public void sendBatchMessages(List<Message> messages) {for (Message message : messages) {try {producer.send(message);// 添加適當間隔Thread.sleep(50); // 50ms間隔} catch (Exception e) {// 異常處理}}
}// 2. 使用批量發送(如果支持)
public void sendBatch(List<Message> messages) {try {// 分批發送,每批100條int batchSize = 100;for (int i = 0; i < messages.size(); i += batchSize) {List<Message> batch = messages.subList(i, Math.min(i + batchSize, messages.size()));producer.send(batch);Thread.sleep(100); // 批次間間隔}} catch (Exception e) {// 異常處理}
}// 3. 添加重試機制
public void sendWithRetry(Message message, int maxRetries) {int retryCount = 0;while (retryCount < maxRetries) {try {producer.send(message);return;} catch (MQBrokerException e) {if (e.getResponseCode() == 215) { // 流控錯誤retryCount++;try {// 指數退避重試long waitTime = (long) (Math.pow(2, retryCount) * 100);Thread.sleep(waitTime);} catch (InterruptedException ie) {Thread.currentThread().interrupt();}} else {throw e;}}}
}

方案2:調整生產者配置(配置層面)

java

// 修改Producer配置
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 設置發送超時時間
producer.setSendMsgTimeout(10000);
// 設置發送重試次數
producer.setRetryTimesWhenSendFailed(3);
// 設置異步發送不重試
producer.setRetryTimesWhenSendAsyncFailed(0);
// 開啟VIP通道(減少網絡開銷)
producer.setVipChannelEnabled(false);

方案3:調整Broker配置(需要運維權限)

如果可能,讓運維調整Broker的流控參數:

properties

# broker.conf 中的相關配置
sendMessageThreadPoolNums=16  # 發送消息線程數
waitTimeMillsInSendQueue=500  # 發送隊列等待時間
maxMessageSize=1024*1024*4    # 最大消息大小
defaultTopicQueueNums=8       # 默認主題隊列數

方案4:優化消息發送

java

// 1. 壓縮消息大小
Message message = new Message(topic, tags, compressIfNeeded(messageBody.getBytes()));// 2. 減少不必要的消息屬性
message.setKeys(""); // 清除不必要的keys// 3. 使用更高效的消息格式
public byte[] compressIfNeeded(byte[] data) {if (data.length > 1024) { // 大于1KB才壓縮// 使用GZIP壓縮return compressGZIP(data);}return data;
}

方案5:分布式限流控制

java

// 使用Redis或其他分布式協調工具控制發送速率
public class RateLimiter {private final RedisTemplate<String, String> redisTemplate;private final String rateLimitKey = "rocketmq:rate:limit";public boolean acquirePermission() {// 使用令牌桶算法控制速率Long current = redisTemplate.opsForValue().increment(rateLimitKey, 1);if (current == 1) {redisTemplate.expire(rateLimitKey, 1, TimeUnit.SECONDS);}return current <= 1000; // 每秒1000條}
}// 使用方式
public void sendWithRateLimit(Message message) {if (rateLimiter.acquirePermission()) {producer.send(message);} else {// 等待或進入隊列}
}

🔧 立即修復方案

在你的?QueueTemplateServiceImpl.sendObj?方法中添加流控處理:

java

// 修改發送方法,添加重試和流控處理
public class QueueTemplateServiceImpl {private static final int MAX_RETRIES = 3;public void sendObj(String topic, String tag, Object obj) {Message message = createMessage(topic, tag, obj);sendWithRetry(message, MAX_RETRIES);}private void sendWithRetry(Message message, int maxRetries) {int retryCount = 0;while (retryCount < maxRetries) {try {producer.send(message);return;} catch (MQBrokerException e) {if (e.getResponseCode() == 215) { // 流控錯誤retryCount++;log.warn("RocketMQ流控,第{}次重試", retryCount);try {// 指數退避:1s, 2s, 4sThread.sleep(1000L * (1 << (retryCount - 1)));} catch (InterruptedException ie) {Thread.currentThread().interrupt();throw new RuntimeException("發送中斷", ie);}} else {throw new RuntimeException("發送消息失敗", e);}} catch (Exception e) {throw new RuntimeException("發送消息失敗", e);}}throw new RuntimeException("發送消息失敗,超過最大重試次數");}
}

📊 監控和預警

java

// 添加監控指標
public class RocketMQMonitor {private static final MeterRegistry meterRegistry;public static void recordFlowControl() {meterRegistry.counter("rocketmq.flow_control.count").increment();}public static void recordSendLatency(long latency) {meterRegistry.timer("rocketmq.send.latency").record(latency, TimeUnit.MILLISECONDS);}
}// 在發送方法中添加監控
try {long start = System.currentTimeMillis();producer.send(message);long latency = System.currentTimeMillis() - start;RocketMQMonitor.recordSendLatency(latency);
} catch (MQBrokerException e) {if (e.getResponseCode() == 215) {RocketMQMonitor.recordFlowControl();}
}

? 推薦處理順序

  1. 立即修復:在代碼中添加重試機制和指數退避

  2. 優化發送:調整發送策略,添加適當間隔

  3. 監控預警:添加流控監控,便于及時發現問題

  4. 配置調整:如果問題持續,聯系運維調整Broker配置

  5. 架構優化:考慮使用批量發送或異步發送

首選方案:在?sendObj?方法中添加流控重試機制,這是最快速有效的解決方案。

您遇到的錯誤:

ERROR: CODE: 215 DESC: [FLOW] client has exhausted the send quota for the current time slot, start flow control for a while.

這是 RocketMQ 的客戶端流量控制(Flow Control)機制觸發的限流錯誤,表示 在當前時間窗口內,客戶端發送消息的速率超過了 Broker 設置的配額(quota),因此被強制限流。


? 錯誤原因分析

🔍 錯誤碼?215?含義:

  • CODE: 215:RocketMQ 內置的?客戶端流控錯誤
  • [FLOW]client has exhausted the send quota...:明確指出是?客戶端發送配額耗盡
  • 觸發條件:在?單位時間(如 1 秒)內發送消息數量或字節數?超過了 Broker 配置的閾值。

📌 常見于以下場景:

  • 批量跑批任務:短時間內發送大量消息(如 1000+ 條/秒)。
  • Producer 并發高:多個線程同時發送。
  • 消息體較大:雖然條數不多,但總字節數超限。

? 解決方案(分優先級)

? 方案一:【推薦】調整?發送速率(代碼層優化,最安全)

不要一次性發送太多消息,加入 限流或延遲

方法 1:加?Thread.sleep()?控制頻率
List<Message> messages = buildMessages(); // 批量消息
int batchSize = 10; // 每批發送 10 條
for (int i = 0; i < messages.size(); i += batchSize) {List<Message> subList = messages.subList(i, Math.min(i + batchSize, messages.size()));try {producer.send(subList); // 批量發送} catch (Exception e) {e.printStackTrace();}// 控制發送頻率,避免觸發流控Thread.sleep(100); // 每批間隔 100ms
}
方法 2:使用?令牌桶/計數器限流(更優雅)
<!-- 引入 Guava -->
<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>32.0.0-jre</version>
</dependency>
import com.google.common.util.concurrent.RateLimiter;RateLimiter rateLimiter = RateLimiter.create(50); // 每秒最多 50 次發送for (Message msg : messages) {rateLimiter.acquire(); // 阻塞直到允許發送producer.send(msg);
}

? 方案二:調整?Broker 配置(需運維配合)

如果業務確實需要高吞吐,可以 調大 Broker 的客戶端流控閾值

修改?broker.conf?配置文件:
# 客戶端每秒最多發送消息條數(默認值可能為 1000 或更低)
clientManage.clientMaxMessageSendRequestSizePerSecond=2000# 或者:控制每秒最大發送字節數(單位:字節)
# clientManage.clientMaxMessageSendRequestSizePerSecond=10485760  # 10MB/s

?? 注意:該參數名稱在不同 RocketMQ 版本中可能略有不同,如:

  • clientManage.clientMaxMessageSendRequestSizePerSecond
  • clientManage.sendRequestLimit(舊版本)

📌 修改后重啟 Broker 生效


? 方案三:優化?Producer 發送方式

1. 使用?異步發送?替代同步

同步發送阻塞線程,容易觸發流控。

producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("發送成功");}@Overridepublic void onException(Throwable e) {System.out.println("發送失敗: " + e.getMessage());}
});
2. 使用?批量發送(Batch)
MessageBatch batch = MessageBatch.generateFromList(messageList);
try {producer.send(batch);
} catch (Exception e) {e.printStackTrace();
}

? 批量發送減少網絡請求次數,降低流控概率。


? 方案四:檢查是否?Producer 實例過多

  • 每個?DefaultMQProducer?實例都會占用連接和流控配額。
  • 避免在循環中創建 Producer,應?復用單個 Producer 實例
// ? 正確:單例復用
public class RocketMQClient {private static DefaultMQProducer producer;public static DefaultMQProducer getProducer() {if (producer == null) {synchronized (RocketMQClient.class) {if (producer == null) {producer = new DefaultMQProducer("GROUP_TEST");producer.start();}}}return producer;}
}

? 方案五:升級 RocketMQ 版本

  • 低版本 RocketMQ 的流控策略較嚴格。
  • 推薦使用 4.9.4 或 5.x 版本,流控更智能,支持動態調整。

? 總結:如何選擇?

場景推薦方案
臨時修復加?Thread.sleep()?或?RateLimiter?限流
長期穩定運行使用異步 + 批量發送 + 限流
業務必須高吞吐調整 Broker 配置?clientMaxMessageSendRequestSizePerSecond
避免問題復用 Producer 實例、避免短連接
根本解決代碼優化為主,配置調整為輔

🚨 建議最終處理步驟

  1. 立即修復:在跑批任務中加入?RateLimiter?或?sleep,控制發送速率。
  2. 優化代碼:改用?異步發送 + 批量提交
  3. 聯系運維:評估是否需要調大 Broker 的?clientMaxMessageSendRequestSizePerSecond
  4. 監控:觀察?broker.log?是否還有?CODE: 215?錯誤。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/95544.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/95544.shtml
英文地址,請注明出處:http://en.pswp.cn/web/95544.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

大語言模型(LLM)簡介與應用分享

1. 什么是大語言模型&#xff08;LLM&#xff09; 大語言模型&#xff08;Large Language Model&#xff0c;簡稱 LLM&#xff09;是基于 深度學習 和 海量文本數據 訓練而成的人工智能模型。 采用 Transformer 架構參數規模巨大&#xff08;數十億到數千億&#xff09;能夠 理…

【算法筆記】選擇排序、插入排序、冒泡排序、二分查找問題

算法的筆記&#xff0c;直接上代碼&#xff0c;思路和問題這些&#xff0c;都在代碼注釋上面 1、工具類 為了生成測試代碼和比較器&#xff0c;專門寫了一個數組工具類&#xff0c;代碼如下&#xff1a; /*** 數組工具類*/ public class ArrUtil {/*** 生成隨機數組* 長度是[0,…

行業分享丨基于SimSolid的大型汽車連續沖壓模具剛度分析

*本文投稿自機械零部件制造業用戶 汽車連續模具的剛度直接決定了沖壓件質量&#xff08;尺寸精度、表面缺陷&#xff09;與模具壽命。傳統有限元分析&#xff08;FEA&#xff09;在面對大型復雜模具裝配體時&#xff0c;存在網格劃分困難、計算資源消耗大、周期長等瓶頸。本文以…

用AI生成的html頁面設計放到到Axure上實現再改造的方法

要將 AI 生成的 HTML 原型導入 Axure&#xff0c;該方法的核心邏輯是以 Figma 為 “中間橋梁”&#xff08;因 Axure 無法直接讀取 HTML&#xff0c;需通過 Figma 轉換格式&#xff09;&#xff0c;分 3 步即可完成&#xff0c;以下是詳細操作指南&#xff08;含每步目標、具體…

從入門到實戰:Linux sed命令全攻略,文本處理效率翻倍

從入門到實戰&#xff1a;Linux sed命令全攻略&#xff0c;文本處理效率翻倍 文章目錄從入門到實戰&#xff1a;Linux sed命令全攻略&#xff0c;文本處理效率翻倍一、認識sed&#xff1a;什么是流編輯器&#xff1f;二、吃透sed工作原理&#xff1a;為什么它能高效處理文本&am…

TIOBE 8月編程語言榜深度解析:Python占比突破26%,Perl成最大黑馬

根據TIOBE最新發布的2025年8月編程語言排行榜&#xff0c;一場靜默的技術變革正在上演&#xff1a;Python以26.14%的占比首次突破26%大關&#xff0c;連續12個月穩居榜首。這一數據不僅刷新了Python自身的歷史紀錄&#xff0c;更成為TIOBE指數自2001年創立以來的最高單語言占比…

從發現到恢復,看瑞數信息如何構建“抗毀重構”實戰路徑

在信息化社會&#xff0c;“韌性”“彈性”這些詞匯常被用來形容系統抵御和應對風險的能力&#xff0c;但對于身處關鍵基礎設施行業的運營者來說&#xff0c;這些概念往往過于抽象&#xff0c;難以直接指導實踐。 相比之下&#xff0c;“抗毀重構”更具畫面感。它不僅是一個管理…

深入理解 jemalloc:從內存分配機制到技術選型

在高性能服務&#xff08;如數據庫、緩存、JVM&#xff09;的底層優化中&#xff0c;內存分配效率直接影響系統整體性能。本文將從操作系統底層的malloc機制切入&#xff0c;詳解 jemalloc 的設計理念、開源應用場景、實戰案例&#xff0c;技術選型分析 一、操作系統底層的內存…

websoket使用記錄

1.項目使用記錄1.醫療項目中渲染回收柜溫濕度&#xff0c;需要實時更新2.回收柜安瓿回收和余液回收時&#xff0c;需要前端發送指令給回收柜&#xff0c;比如開門、關門等。還需要收到回收柜結果&#xff0c;比如回收的藥品信息等。我項目中用的是瀏覽器自帶的websoket&#xf…

DevOps篇之通過GitLab CI 流水線實現k8s集群中helm應用發布

一. 設計思路 構建一個 GitLab CI 流水線&#xff0c;并且要集成到 K8s 集群中的 Helm 應用發布流程。首先&#xff0c;需要了解 GitLab CI 的基本結構&#xff0c;比如.gitlab-ci.yml 文件的配置&#xff0c;包括 stages、jobs、變量設置等。然后&#xff0c;結合之前討論的 H…

詳盡 | Deeplabv3+結構理解

https://arxiv.org/pdf/1802.02611.pdf https://link.springer.com/chapter/10.1007/978-3-319-10578-9_23 目錄 Deeplabv3 Encoder部分 Decoder部分 補充摘要 SPP 空間金字塔池化層模塊 Dilated/Atrous Conv 空洞卷積 Deeplabv3 deeplab-v3是語義分割網絡&#xff0c;組…

【51單片機】【protues仿真】基于51單片機音樂盒(8首歌曲)系統

目錄 一、主要功能 二、使用步驟 三、硬件資源 四、軟件設計 五、實驗現象 一、主要功能 1、數碼管顯示當前歌曲序號 2、按鍵切換歌曲和播放暫停? 3、內置8首音樂 二、使用步驟 基于51單片機的音樂盒是一種能夠存儲和播放多首歌曲的電子設備&#xff0c;通過定時器產…

@ZooKeeper 詳細介紹部署與使用詳細指南

文章目錄 **ZooKeeper 詳細介紹、部署與使用** 1. 概述 & 核心介紹 1.1 什么是 ZooKeeper? 1.2 核心特性 1.3 核心概念 1.4 典型應用場景 2. 部署 (以 3 節點集群為例) 2.1 環境準備 2.2 安裝步驟 (在所有節點執行) 2.3 啟動與停止集群 2.4 防火墻配置 (如果開啟) 3. 基本…

騰訊Hunyuan-MT-7B翻譯模型完全指南:2025年開源AI翻譯的新標桿

&#x1f3af; 核心要點 (TL;DR) 突破性成就&#xff1a;騰訊混元MT-7B在WMT25全球翻譯競賽中獲得30/31項第一名雙模型架構&#xff1a;Hunyuan-MT-7B基礎翻譯模型 Hunyuan-MT-Chimera-7B集成優化模型廣泛語言支持&#xff1a;支持33種語言互譯&#xff0c;包括5種中國少數民…

Web 集群高可用全方案:Keepalived+LVS (DR) 負載均衡 + Apache 服務 + NFS 共享存儲搭建指南

文章目錄Keepalived LVS&#xff08;DR&#xff09; Apache NFS項目背景業務場景與核心需求傳統架構的痛點與局限技術方案的選型邏輯項目價值與預期目標項目實踐項目環境基礎配置配置 router配置免密登錄-可選配置 nfs配置 web配置 LVS-RS配置 HA 和 LVS-DS配置 ha1配置 ha2測…

Prometheus監控預警系統深度解析:架構、優劣、成本與競品

目錄 一、Prometheus是什么&#xff1f;核心定位與架構 二、競品分析&#xff08;Prometheus vs. Zabbix vs. Nagios vs. Commercial SaaS&#xff09; 三、部署成本分析 四、服務器資源消耗分析 五、給您的最終建議 一、Prometheus是什么&#xff1f;核心定位與架構 Prom…

Nginx反向代理及配置

Nginx反向代理 二級域名系統 顧名思義&#xff0c;我們有很多的這個不同的二級域名的用戶來訪問我們&#xff0c;比如說微博。它有一個主域名weibo.com。如果我叫一鳴,申請了一個微博&#xff0c;然后我就可以在微博這個主系統上申請一個二級域名來訪問我微博的主頁&#xff0…

嵌入式系統通信總線全景探秘:從板內到云端

引言 在嵌入式系統設計中&#xff0c;選擇合適的通信總線是決定系統性能、成本和可靠性的關鍵因素。從簡單的芯片間通信到復雜的工業網絡&#xff0c;不同的總線技術各司其職&#xff0c;形成了嵌入式世界的"交通網絡"。本文將深入探討五種經典且重要的通信技術&…

2022版Unity創建時沒有2D燈光(2D Light),沒有Global LIght2D怎么辦?

簡單來說就是你的渲染管線沒有升級到URP管線&#xff0c;所以才沒有這些2D燈光 如果你的創建燈光和我一樣&#xff0c;沒有紅線劃掉的部分&#xff0c;說明你和我的問題一樣&#xff0c;看下面的教程可以解決。 1. 確保Unity版本 確保你的Unity版本至少為2019.4或更高版本&…

技術小白如何快速的了解opentenbase?--把握四大特色

1.基本介紹 作為一名計算機專業相關背景的學生&#xff0c;我們或多或者接觸過一些數據庫&#xff0c;對于數據庫肯定是有所了解的&#xff1b; 你可能學習的是和微軟的sql server這樣的數據庫&#xff1b; 你可能接觸的更多的是企業級項目開發里面使用的這個mysql數據庫&#…