010 rocketmq批量消息

文章目錄

  • 批量消息
  • BatchProducer.java
  • BatchConsumer.java

批量消息

批量發送可以提?發送性能,但有?定的限制:
topic 相同
waitStoreMsgOK 相同 (?先我們建設消息的iswaitstoremsgok=true(默認為true), 如果沒有異常,我們將始終收到"OK",org.apache.rocketmq.common.message.Message#isWaitStoreMsgOK)
不支持延時發送
?批消息的大小不能?于 4M(DefaultMQProducer.maxMessageSize)
大小限制需要特殊注意,因為消息是動態的,不注意的話就可能超限,就會報錯:
計算消息的大小
= (topic + body + (key + value) * N) * 吞吐量

int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {tmpSize += entry.getKey().length() + entry.getValue().length();
}

BatchProducer.java

package com.example.rocketmq.demo.batch;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;public class BatchProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer = newDefaultMQProducer("please_rename_unique_group_name");// Specify name server addresses.producer.setNamesrvAddr("localhost:9876");//Launch the instance.producer.start();String topic = "TopicTest";List<Message> messages = new ArrayList<>();messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));//then you could split the large list into small ones:ListSplitter splitter = new ListSplitter(messages);while (splitter.hasNext()) {try {List<Message>  listItem = splitter.next();SendResult sendResult = producer.send(listItem);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();//handle the error}}//Shut down once the producer instance is not longer in use.producer.shutdown();}
}class ListSplitter implements Iterator<List<Message>> {private final int SIZE_LIMIT = 1024 * 1024 * 4;private final List<Message> messages;private int currIndex;public ListSplitter(List<Message> messages) {this.messages = messages;}@Override public boolean hasNext() {return currIndex < messages.size();}@Override public List<Message> next() {int nextIndex = currIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {Message message = messages.get(nextIndex);//計算消息的大小 = (topic + body + (key + value) * N) * 吞吐量int tmpSize = message.getTopic().length() + message.getBody().length;//屬性值的添加Map<String, String> properties = message.getProperties();for (Map.Entry<String, String> entry : properties.entrySet()) {//+key + valuetmpSize += entry.getKey().length() + entry.getValue().length();}tmpSize = tmpSize + 20; //for log overheadif (tmpSize > SIZE_LIMIT) {//it is unexpected that single message exceeds the SIZE_LIMIT//here just let it go, otherwise it will block the splitting processif (nextIndex - currIndex == 0) {//if the next sublist has no element, add this one and then break, otherwise just breaknextIndex++;}break;}if (tmpSize + totalSize > SIZE_LIMIT) {break;} else {totalSize += tmpSize;}}List<Message> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}
}

BatchConsumer.java

package com.example.rocketmq.demo.batch;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class BatchConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {// Instantiate with specified consumer group name.DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// Specify name server addresses.consumer.setNamesrvAddr("localhost:9876");// Subscribe one more more topics to consume.consumer.subscribe("TopicTest", "*");// Register callback to execute on arrival of messages fetched from brokers.consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//Launch the consumer instance.consumer.start();System.out.printf("Consumer Started.%n");}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/72219.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/72219.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/72219.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

6.6.6 嵌入式SQL

文章目錄 2個核心問題識別SQL語句主語言和SQL通信完整導圖 2個核心問題 SQL語句嵌入高級語言需要解決的2個核心問題是&#xff1a;如何識別嵌入語句&#xff1f;如何讓主語言&#xff08;比如C,C語言&#xff09;和SQL通信&#xff1f; 識別SQL語句 為了識別主語言中嵌入的SQL…

Windows安裝sql server2017

看了下官網的文檔&#xff0c;似乎只有ubuntu18.04可以安裝&#xff0c;其他debian系的都不行&#xff0c;還有通過docker的方式安裝的。 雙擊進入下載的ISO&#xff0c;點擊執行可執行文件&#xff0c;并選擇“是” 不要勾選 警告而已&#xff0c;不必理會 至少勾選這兩…

RuoYi框架介紹,以及如何基于Python使用RuoYi框架

若依框架&#xff08;RuoYi&#xff09;是一款基于Spring Boot和Vue.js的開源快速開發平臺&#xff0c;廣泛應用于企業級應用開發。它提供了豐富的功能模塊和代碼生成工具&#xff0c;幫助開發者快速搭建后臺管理系統。 主要特點 前后端分離&#xff1a;前端采用Vue.js&#x…

從零搭建Tomcat:深入理解Java Web服務器的工作原理

Tomcat是Java生態中最常用的Web服務器之一&#xff0c;廣泛應用于Java Web應用的部署和運行。本文將帶你從零開始搭建一個簡易的Tomcat服務器&#xff0c;深入理解其工作原理&#xff0c;并通過代碼實現一個基本的Servlet容器。 1. Tomcat的基本概念 Tomcat是一個開源的Servl…

京東云DeepSeek-R1模型一鍵部署教程,基于智算GCS【成本2元】

使用京東云智算一鍵部署DeepSeek-R1模型&#xff0c;京東云智算服務AI平臺GCS支持DeepSeek-R1模型預裝環境&#xff0c;支持1.5B、7B、32B及70B參數模型環境&#xff0c;用戶可在GCS中快速啟動&#xff0c;使用ChatbotUI或者Open-WebUI作為用戶界面&#xff0c;進行測試并接入業…

Jenkins 自動打包項目鏡像部署到服務器 ---(前端項目)

Jenkins 新增前端項目Job 指定運行的節點 選擇部署運行的節點標簽&#xff0c;dev標簽對應開發環境 節點的遠程命令執行配置 jenkins完整流程 配置源碼 拉取 Credentials添加 觸發遠程構建 配置后可以支持遠程觸發jenkins構建&#xff08;比如自建的CICD自動化發布平臺&…

7.2 - 定時器之計算脈沖寬度實驗

文章目錄 1 實驗任務2 系統框圖3 軟件設計 1 實驗任務 本實驗任務是通過CPU私有定時器來計算按鍵按下的時間長短。 2 系統框圖 參見7.1。 3 軟件設計 注意事項&#xff1a; 定時器是遞減計數的&#xff0c;需要考慮StartCount&#xff1c;EndCount的情況。 /***********…

雙機熱備旁掛組網實驗

1拓撲圖 2.要求 1 、 SW3 的流量 正常情況下&#xff1a; SW1_VRF-->FW1--->SW1_Public--->R5 故障情況下&#xff1a; SW2_VRF-->FW2--->SW2_Public--->R6 2 、 SW4 的流量 正常情況下&#xff1a; SW2_VRF-->FW2--->SW2_Public--->R6 故障情…

2025春新生培訓數據結構(樹,圖)

教學目標&#xff1a; 1&#xff0c;清楚什么是樹和圖&#xff0c;了解基本概念&#xff0c;并且理解其應用場景 2&#xff0c;掌握一種建圖&#xff08;樹&#xff09;方法 3&#xff0c;掌握圖的dfs和樹的前中后序遍歷 例題與習題 2025NENU新生培訓&#xff08;樹&#…

HTML 日常開發常用標簽

文章目錄 HTML 日常開發常用標簽1、基本結構標簽2、內容標簽3、多媒體標簽4、表單標簽5、列表和定義標簽6、表格標簽7、鏈接和圖像8、元數據9、語義化標簽&#xff08;HTML5新增&#xff09;10、框架和內聯11、交互12、過時或不推薦使用的標簽 HTML 日常開發常用標簽 1、基本結…

7.1.1 計算機網絡的組成

文章目錄 物理組成功能組成工作方式完整導圖 物理組成 計算機網絡是將分布在不同地域的計算機組織成系統&#xff0c;便于相互之間資源共享、傳遞信息。 計算機網絡的物理組成包括硬件和軟件。硬件中包含主機、前端處理器、連接設備、通信線路。軟件中包含協議和應用軟件。 功…

【AI論文】MedVLM-R1:通過強化學習激勵視覺語言模型(VLMs)的醫療推理能力

摘要&#xff1a;推理是推進醫學影像分析的關鍵前沿領域&#xff0c;其中透明度和可信度對于贏得臨床醫生信任和獲得監管批準起著核心作用。盡管醫學視覺語言模型&#xff08;VLMs&#xff09;在放射學任務中展現出巨大潛力&#xff0c;但大多數現有VLM僅給出最終答案&#xff…

國產RISCV64 也能跑AI

Banana Pi BPI-F3 進控時空 K1開發板 AI人工智能AI 部署工具使用手冊_bianbu software-CSDN博客 文章置頂了 有興趣的可以一起留言探索&#xff0c;非常有意思&#xff1a; 我最近接觸到了進迭時空研發的 Spacengine?&#xff0c;這是一套能在進迭時空 RISC-V 系列芯片上部署…

APISIX Dashboard上的配置操作

文章目錄 登錄配置路由配置消費者創建后端服務項目配置上游再創建一個路由測試 登錄 http://192.168.10.101:9000/user/login?redirect%2Fdashboard 根據docker 容器里的指定端口&#xff1a; 配置路由 通過apisix 的API管理接口來創建&#xff08;此路由&#xff0c;直接…

【WPF】綁定報錯:雙向綁定需要 Path 或 XPath

背景 最開始使用的是 TextBlock: <ItemsControl ItemsSource"{Binding CameraList}"><ItemsControl.ItemsPanel><ItemsPanelTemplate><StackPanel Orientation"Horizontal"/></ItemsPanelTemplate></ItemsControl.Item…

Kotlin協變與逆變區別

在Kotlin中&#xff0c;協變和逆變是泛型編程中的兩個重要概念&#xff0c;它們允許我們在類型系統中更加靈活地處理類型關系。 1.協變&#xff1a;協變允許我們使用比原始類型更具體的類型。在kotlin中&#xff0c;通過在類型參數上加out關鍵字來表示協變,生產者&#xff0c;例…

如何調試Linux內核?

通過創建一個最小的根文件系統&#xff0c;并使用QEMU和GDB進行調試。 1.準備工作環境 確保系統上安裝了所有必要的工具和依賴項。 sudo apt-get update //更新一下軟件包 sudo apt-get install build-essential git libncurses-dev bison flex libssl-dev qemu-system-x…

Java 調試模式下 Redisson 看門狗失效

一、場景分析 前幾天在做分布式鎖測試&#xff1a; 在調試模式下&#xff0c;lock.lock() 之后打上斷點&#xff0c;想測試一下在當前線程放棄鎖之前&#xff0c;別的線程能否獲取得到鎖。 發現調試模式下&#xff0c;看門狗機制失效了&#xff0c;Redis 上 30 秒后&#xff0…

GPT-4.5震撼登場,AI世界再掀波瀾!(3)

GPT-4.5震撼登場&#xff0c;AI世界再掀波瀾! GPT-4.5震撼登場&#xff0c;AI世界再掀波瀾!(2) &#xff08;一&#xff09;倫理困境&#xff1a;如何抉擇 GPT-4.5 的強大功能在為我們帶來諸多便利的同時&#xff0c;也引發了一系列深刻的倫理問題&#xff0c;這些問題猶如高…

【數據挖掘】Pandas

Pandas 是 Python 進行 數據挖掘 和 數據分析 的核心庫之一&#xff0c;提供了強大的 數據清洗、預處理、轉換、分析 和 可視化 功能。它通常與 NumPy、Matplotlib、Seaborn、Scikit-Learn 等庫結合使用&#xff0c;幫助構建高效的數據挖掘流程。 &#x1f4cc; 1. 讀取數據 P…