RocketMq局部順序消息

package com.ldj.rocketmq.producer;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;/*** User: ldj* Date: 2024/5/26* Time: 15:09* Description: 局部順序消息*/
public class OrderProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("produce-group-order");producer.setNamesrvAddr("192.168.208.190:9876;192.168.208.191:9876;192.168.208.192:9876");producer.start();/*** 局部順序消息的要點的分2級,拿最外層的id作為計算隊列下標,讓相同一級的消息進入同一個隊列*/for (int i = 0; i < 5; i++) {for (int j = 0; j < 5; j++) {int orderId = i;Message message = new Message("OrderTopic", "orderMessage", ("order_step[" + orderId + "-" + j + "]").getBytes(StandardCharsets.UTF_8));producer.send(message, (mqs, msg, arg) -> {Integer id = (Integer) arg;if (id != null) {return mqs.get(id.hashCode() % mqs.size());}throw new RuntimeException("缺少決定消息順序的id!");}, orderId);}}}}

?

將一級分類下的所有信息收集進map,模擬順序消費信息

package com.ldj.rocketmq.consumer;import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.util.CollectionUtils;import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** User: ldj* Date: 2024/5/26* Time: 16:08* Description: 順序消息消費者*/
public class OrderConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group-order");consumer.setNamesrvAddr("192.168.208.190:9876;192.168.208.191:9876;192.168.208.192:9876");consumer.subscribe("OrderTopic", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.setMessageModel(MessageModel.CLUSTERING);//訂單id做為key,消息作為valueMap<String, List<String>> concurrentHashMap = new ConcurrentHashMap<>();consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {msgs.forEach(msg -> {String str = new String(msg.getBody());int left = str.indexOf("[");int right = str.indexOf("]");String substring = str.substring(left + 1, right);String[] k_v = substring.split("-");List<String> messages = concurrentHashMap.get(k_v[0]);if (CollectionUtils.isEmpty(messages)) {List<String> msgFirstList = new ArrayList<>();msgFirstList.add(str);concurrentHashMap.put(k_v[0], msgFirstList);} else {List<String> msgAddList = concurrentHashMap.get(k_v[0]);msgAddList.add(str);concurrentHashMap.put(k_v[0], msgAddList);}});System.out.println(JSON.toJSONString(concurrentHashMap));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();System.out.println("消費者準備就緒...");}
}

?復制最后一行,并格式化

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/16122.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/16122.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/16122.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【Linux】$()中的內容與不加$()時有什么區別

$()中的內容與不加$()有什么區別&#xff0c;例如$(/usr/local/hadoop/bin/hadoop classpath)與/usr/local/hadoop/bin/hadoop classpath兩者有何區別&#xff1f;&#xff1f;&#xff1f; 關于這個問題&#xff0c;筆者建議可以參考如下文章&#xff1a; Linux—shell中$((…

css卡片翻轉 父元素翻轉子元素不翻轉效果

css卡片翻轉 父元素翻轉子元素不翻轉效果 vue <div class"moduleBox"><div class"headTitle"><span class"headName">大額案例</span></div><div class"moduleItem"><span class"module…

three.js判斷物體在人的前面,還是后面

three.js判斷物體在人的前面&#xff0c;還是后面 const player new THREE.Vectors(10, 0, 5); const mesh new THREE.Vectors(15, 0, 6);上面&#xff0c;兩個變量分別表示&#xff0c;玩家的位置&#xff0c;物體的位置。 從這發現&#xff0c;當玩家和物體的角度關系 小…

spring boot 整合j2cache 項目啟動警告 Redis mode [null] not defined. Using ‘single‘

好 之前的文章 spring boot 整合j2cache 基礎操作 在spring boot環境中整合了 j2cache 我們 項目啟動時 日志會有一個關鍵信息 Redis的模式 沒有定義 默認使用 single Redis 的這個模式有四種 大家可以自己去網上找一下 做個了解 不用很糾結 我們直接在 j2cache.properties …

一文讀懂Apollo客戶端配置加載流程

本文基于 apollo-client 2.1.0 版本源碼進行分析 Apollo 是攜程開源的配置中心&#xff0c;能夠集中化管理應用不同環境、不同集群的配置&#xff0c;配置修改后能夠實時推送到應用端&#xff0c;并且具備規范的權限、流程治理等特性。 Apollo支持4個維度管理Key-Value格式的配…

Elasticsearch智能數據分析平臺項目

Elasticsearch智能數據分析平臺項目是一個功能強大且靈活的數據分析工具,旨在幫助企業快速、準確地分析和挖掘數據中的價值。以下是關于該項目的一些關鍵特點和功能: 數據搜索: Elasticsearch作為全球下載量最大的搜索引擎,支持從關鍵字搜索到向量搜索等多樣化搜索方式,讓…

比勤奮更重要的是系統思考的能力

不要在接近你問題癥狀的地方尋找解決辦法&#xff0c;要追溯過去&#xff0c;查找問題的根源。通常&#xff0c;最有效的活動是最微妙的。有時最好按兵不動&#xff0c;使系統自我修正&#xff0c;或讓系統引導行動。有時會發現&#xff0c;最好的解決辦法出現在完全出乎預料的…

HTML藍色愛心

目錄 寫在前面 HTML入門 完整代碼 代碼分析 運行結果 系列推薦 寫在后面 寫在前面 最近好冷吖&#xff0c;小編給大家準備了一個超級炫酷的愛心&#xff0c;一起來看看吧&#xff01; HTML入門 HTML全稱為HyperText Markup Language&#xff0c;是一種標記語言&#…

C++-指針

在C中&#xff0c;指針是至關重要的組成部分。它是C語言最強大的功能之一&#xff0c;也是最棘手的功能之一。 指針具有強大的能力&#xff0c;其本質是協助程序員完成內存的直接操縱。 指針&#xff1a;特定類型數據在內存中的存儲地址&#xff0c;即內存地址。 指針變量的定…

2024.5組隊學習——MetaGPT(0.8.1)智能體理論與實戰(下):多智能體開發

傳送門&#xff1a; 《2024.5組隊學習——MetaGPT&#xff08;0.8.1&#xff09;智能體理論與實戰&#xff08;上&#xff09;&#xff1a;MetaGPT安裝、單智能體開發》《2024.5組隊學習——MetaGPT&#xff08;0.8.1&#xff09;智能體理論與實戰&#xff08;中&#xff09;&…

ModelBuilder之GDP空間化——批量值提取

一、前言 前面明確說到對于空間化過程中其實只有兩個過程可以進行批量操作,一個是我們燈光指數提取過程和批量進行值提取,這里補充一點,對于燈光指數計算可以實現批量計算總燈光指數和平均燈光指數,綜合燈光指數需要用平均燈光指數乘以面積占比求得,面積比就是(DN大于0的…

VS2022通過C++網絡庫Boost.asio搭建一個簡單TCP異步服務器和客戶端

基本介紹 上一篇博客我們介紹了通過Boost.asio搭建一個TCP同步服務器和客戶端&#xff0c;這次我們再通過asio搭建一個異步通信的服務器和客戶端系統&#xff0c;由于這是一個簡單異步服務器&#xff0c;所以我們的異步特指異步服務器而不是異步客戶端&#xff0c;同步服務器在…

BGP選路規則

配置地址&#xff0c;AS123使用ospf保證通訊&#xff0c;修改接口類型保證ospf學習環回20.0,30.0,100.0 地址時&#xff0c;是以24位掩碼學習&#xff0c;R1&#xff0c;R2&#xff0c;R3都處于BGP邊界&#xff0c;各自都需要宣告三者的私網環回 1&#xff0c; [R4]ip ip-prefi…

點分治練習

P3806 【模板】點分治 1 #include <bits/stdc.h> using namespace std;inline long long read() {char ch getchar();long long f 1,x 0;while (ch > 9 || ch < 0) { if (ch -)f -1; ch getchar(); }while (ch > 0 && ch < 9) { x (x <&l…

Thrift學習深入

Thrift學習深入 https://zhuanlan.zhihu.com/p/22934974 https://zhuanlan.zhihu.com/p/26993406 從具體的demo入手,我們需要學習的是三部分 IDLserver端client端一、IDL深入 IDL定義的通用類型有: 基礎數據類型結構體容器 list、set、map異常:語法與結構體無異,不過用…

第十二周筆記

微信小程序的自定義事件是指開發者可以自行定義并觸發的事件&#xff0c;以實現特定的功能或邏輯。通過自定義事件&#xff0c;開發者可以更靈活地管理小程序的交互和數據流動&#xff0c;提升用戶體驗和開發效率。下面我將詳細講解微信小程序自定義事件&#xff0c;包括定義、…

容器化部署

目錄 docker容器化部署 怎樣使用Docker Compose或Kubernetes等容器編排工具來管理和擴展聯邦學習系統 使用Docker Compose

【Qnx 】Qnx IPC通信PPS

Qnx IPC通信PPS Qnx自帶PPS服務&#xff0c;PPS全稱Persistent Publish/Subscribe Service&#xff0c;就是常見的P/S通信模式。 Qnx PPS的通信模式是異步的&#xff0c;Publisher和Subscriber也無需關心對方是否存在。 利用Qnx提供的PPS服務&#xff0c;Publisher可以通知多…

嵌入式進階——LED呼吸燈(PWM)

&#x1f3ac; 秋野醬&#xff1a;《個人主頁》 &#x1f525; 個人專欄:《Java專欄》《Python專欄》 ??心若有所向往,何懼道阻且長 文章目錄 PWM基礎概念STC8H芯片PWMA應用PWM配置詳解占空比 PWM基礎概念 PWM全稱是脈寬調制&#xff08;Pulse Width Modulation&#xff09…

Arduino下載與安裝(Windows 10)

Arduino下載與安裝(Windows 10) 官網 下載安裝 打開官網&#xff0c;點擊SOFTWARE&#xff0c;進入到軟件下載界面&#xff0c;選擇Windows 選擇JUST DOWNLOAD 在彈出的界面中&#xff0c;填入電子郵件地址&#xff0c;勾選Privacy Policy&#xff0c;點擊JUST DOWNLOAD即可 …