package com.ldj.rocketmq.producer;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;/*** User: ldj* Date: 2024/5/26* Time: 15:09* Description: 局部順序消息*/
public class OrderProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("produce-group-order");producer.setNamesrvAddr("192.168.208.190:9876;192.168.208.191:9876;192.168.208.192:9876");producer.start();/*** 局部順序消息的要點的分2級,拿最外層的id作為計算隊列下標,讓相同一級的消息進入同一個隊列*/for (int i = 0; i < 5; i++) {for (int j = 0; j < 5; j++) {int orderId = i;Message message = new Message("OrderTopic", "orderMessage", ("order_step[" + orderId + "-" + j + "]").getBytes(StandardCharsets.UTF_8));producer.send(message, (mqs, msg, arg) -> {Integer id = (Integer) arg;if (id != null) {return mqs.get(id.hashCode() % mqs.size());}throw new RuntimeException("缺少決定消息順序的id!");}, orderId);}}}}
?
將一級分類下的所有信息收集進map,模擬順序消費信息
package com.ldj.rocketmq.consumer;import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.util.CollectionUtils;import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** User: ldj* Date: 2024/5/26* Time: 16:08* Description: 順序消息消費者*/
public class OrderConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group-order");consumer.setNamesrvAddr("192.168.208.190:9876;192.168.208.191:9876;192.168.208.192:9876");consumer.subscribe("OrderTopic", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.setMessageModel(MessageModel.CLUSTERING);//訂單id做為key,消息作為valueMap<String, List<String>> concurrentHashMap = new ConcurrentHashMap<>();consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {msgs.forEach(msg -> {String str = new String(msg.getBody());int left = str.indexOf("[");int right = str.indexOf("]");String substring = str.substring(left + 1, right);String[] k_v = substring.split("-");List<String> messages = concurrentHashMap.get(k_v[0]);if (CollectionUtils.isEmpty(messages)) {List<String> msgFirstList = new ArrayList<>();msgFirstList.add(str);concurrentHashMap.put(k_v[0], msgFirstList);} else {List<String> msgAddList = concurrentHashMap.get(k_v[0]);msgAddList.add(str);concurrentHashMap.put(k_v[0], msgAddList);}});System.out.println(JSON.toJSONString(concurrentHashMap));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();System.out.println("消費者準備就緒...");}
}
?復制最后一行,并格式化