全面解剖 消息中間件 RocketMQ-(4)
一、RocketMQ 順序消息分析
1、消息有序:指的是可以按照消息的發送順序來消費(FIFO)。RocketMQ 可以嚴格的保證消息有序,可以分為分區有序或者全局有序。
2、順序消費的原理解析
在默認的情況下消息發送會采取 Round Robin 輪詢方式把消息發送到不同的 queue (分區隊列),而消費消息的時候從多個 queue上拉取消息,這種情況發送和消費是不能保證順序。但是如果控制發送的順序消息只依次發送到同一個 queue 中,消費的時候只從這個 queue 上依次拉取,則就保證了順序。當發送和消費參與的 queue 只有一個,則是全局有序;如果多個 queue 參與,則為分區有序,即相對每個 queue,消息都是有序的。
3、下面用訂單進行分區有序的示例。一個訂單的順序流程是:創建、付款、推送、完成。訂單號相同的消息會被先后發送到同一個隊列中,消費時,同一個 Orderld 獲取到的肯定是同一個隊列。
二、RocketMQ 順序消息發送者
1、在工程 rocketmq_demo (模塊)中,創建 訂單構建 實體類 OrderStep.java
/*** D:\java-test\idea2019\rocketmq_demo\src\main\java\djh\it\mq\rocketmq\order\OrderStep.java** 2024-6-2 創建 訂單構建 實體類 OrderStep.java*/package djh.it.mq.rocketmq.order;import java.util.ArrayList;
import java.util.List;public class OrderStep {private long orderId; //訂單 idprivate String desc; //訂單描述public long getOrderId() {return orderId;}public void setOrderId(long orderId) {this.orderId = orderId;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}@Overridepublic String toString() {return "OrderStep{" +"orderId=" + orderId +", desc='" + desc + '\'' +'}';}public static List<OrderStep> buildOrders() {// 1039L : 創建 付款 推送 完成// 1065L : 創建 付款// 7235L : 創建 付款List<OrderStep> orderList = new ArrayList<OrderStep>();OrderStep orderDemo = new OrderStep();orderDemo.setOrderId(1039L);orderDemo.setDesc("創建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1065L);orderDemo.setDesc("創建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1039L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(7235L);orderDemo.setDesc("創建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1065L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(7235L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1065L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1039L);orderDemo.setDesc("推送");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(7235L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1039L);orderDemo.setDesc("完成");orderList.add(orderDemo);return orderList;}
}
2、在工程 rocketmq_demo (模塊)中,創建 順序消息發送 類 Producer.java
/*** rocketmq_demo\src\main\java\djh\it\mq\rocketmq\order\Producer.java** 2024-6-2 創建 順序消息發送 類 Producer.java*/
package djh.it.mq.rocketmq.order;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;import java.util.List;public class Producer {public static void main(String[] args) throws Exception {//1.創建消息生產者 producer,并制定生產者組名DefaultMQProducer producer = new DefaultMQProducer("group1");//2.指定 Nameserver 地址(集群配置)//producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//2.指定 Nameserver 地址(非集群配置)producer.setNamesrvAddr("172.18.30.110:9876");//3.啟動 producerproducer.start();//構建消息集合List<OrderStep> orderSteps = OrderStep.buildOrders();//發送消息for(int i=0; i<orderSteps.size(); i++){String body = orderSteps.get(i)+"";//4.創建消息對象,指定主題 Topic、Tag 和消息體//參數一:消息對象, 參數二:消息隊列選擇器, 參數三:選擇隊列的業務標識(訂單id)Message message = new Message("OrderTopic", "Order", "i"+i, body.getBytes());//5.發送 異步 消息SendResult sendResult = producer.send(message, new MessageQueueSelector(){/**** @param mqs :隊列集合* @param msg :消息對象 ** @param arg :業務標識的參數* @return*/public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {long orderId = (Long) arg;long index = orderId % mqs.size();return mqs.get((int) index);}}, orderSteps.get(i).getOrderId());System.out.println("發送結果:"+sendResult);}//6.關閉生產者 producer。producer.shutdown(); }
}
三、RocketMQ 順序消息消費者
1、在工程 rocketmq_demo (模塊)中,創建 順序消息消費 類 Consumer.java
/*** rocketmq_demo\src\main\java\djh\it\mq\rocketmq\order\Consumer.java** 2024-6-2 創建 順序消息消費 類 Consumer.java 。*/
package djh.it.mq.rocketmq.order;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import java.util.List;public class Consumer {public static void main(String[] args) throws Exception {//1.創建消費者 Consumer,制定消費者組名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定 Nameserver 地址(集群配置)//consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//2.指定 Nameserver 地址(非集群配置)consumer.setNamesrvAddr("172.18.30.110:9876");//3.訂閱主題 Topic 和 Tagconsumer.subscribe("OrderTopic", "*"); //接收所有消息。//4.注冊消息監聽器consumer.registerMessageListener(new MessageListenerOrderly() {public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for(MessageExt msg : msgs){System.out.println("線程名稱:【"+Thread.currentThread().getName() + "】 消費消息:" + new String(msg.getBody())); //轉換為字符串消息}return ConsumeOrderlyStatus.SUCCESS;}});//5.啟動消費者 consumer。consumer.start();System.out.println("消費消息啟動了");}
}
2、先啟動 順序消息發送 類 Producer.java,再啟動 順序消息消費 類 Consumer.java 進行測試。
四、RocketMQ 延遲消息
1、RocketMQ 延遲消息
比如電商里,提交了一個訂單就可以發送一個延時消息,1h 后去檢查這個訂單的狀態,
如果還是未付款就取消訂單釋放庫存。
2、RocketMQ 延遲消息 使用限制
//org/apache/rocketmg/store/config/Messagestoreconfig.java
private string messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”.
現在 RocketMg 并不支持任意時間的延時,需要設置幾個固定的延時等級,從 1s 到 2h 分別對應著等級 1 到 18。
3、在工程 rocketmq_demo (模塊)中,創建 延遲消息 發送 類 Producer.java
/*** rocketmq_demo\src\main\java\djh\it\mq\rocketmq\delay\Producer.java** 2024-6-2 創建 延遲消息 發送 類 Producer.java*/
package djh.it.mq.rocketmq.delay;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit;public class Producer {public static void main(String[] args) throws Exception {//1.創建消息生產者 producer,并制定生產者組名DefaultMQProducer producer = new DefaultMQProducer("group1");//2.指定 Nameserver 地址(集群配置)//producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//2.指定 Nameserver 地址(非集群配置)producer.setNamesrvAddr("172.18.30.110:9876");//3.啟動 producerproducer.start();//發送消息for(int i=0; i<10; i++){//4.創建消息對象,指定主題 Topic、Tag 和消息體//參數一:消息主題 Topic, 參數二:消息 Tag, 參數三:消息內容Message msg = new Message("DelayTopic", "Tag1", ("Hello World"+i).getBytes());//設定延遲發送 時間為 5 秒(目前 rocketmq 支持的延遲等級:"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".)msg.setDelayTimeLevel(2);//5.發送消息SendResult result = producer.send(msg);System.out.println("發送結果:"+result);TimeUnit.SECONDS.sleep(1); //線程睡1秒}//6.關閉生產者 producer。producer.shutdown();}
}
4、在工程 rocketmq_demo (模塊)中,創建 延遲消息 消費 類 Consumer.java 。
/*** rocketmq_demo\src\main\java\djh\it\mq\rocketmq\delay\Consumer.java** 2024-6-2 創建 延遲消息 消費 類 Consumer.java 。*/
package djh.it.mq.rocketmq.delay;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import java.util.List;public class Consumer {public static void main(String[] args) throws Exception {//1.創建消費者 Consumer,制定消費者組名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定 Nameserver 地址(集群配置)//consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//2.指定 Nameserver 地址(非集群配置)consumer.setNamesrvAddr("172.18.30.110:9876");//3.訂閱主題 Topic 和 Tag//consumer.subscribe("base", "Tag1"); //接收同步消息//consumer.subscribe("base", "Tag2"); //接收異步消息前,可以讓先發送異步消息。//consumer.subscribe("base", "Tag1 | Tag2"); //接收同步消息 和 異步消息consumer.subscribe("DelayTopic", "*"); //接收所有消息。//添加消費模式//consumer.setMessageModel(MessageModel.CLUSTERING); //默認是負載均衡模式消費consumer.setMessageModel(MessageModel.BROADCASTING); //廣播模式消費//4.設置回調函數,處理消息consumer.registerMessageListener(new MessageListenerConcurrently(){//接受消息內容public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){//System.out.println(msgs); //接收到的消息是未轉換的字節碼for(MessageExt msg : msgs){System.out.println("消息ID:【" + msg.getMsgId()+"】,延遲時間:"+(System.currentTimeMillis()-msg.getStoreTimestamp())); //轉換為字符串消息}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.啟動消費者 consumer。consumer.start();System.out.println("消費者啟動");}
}
5、先啟動 延遲消息 消費 類 Consumer.java 再啟動 延遲消息 發送 類 Producer.java 進行測試。
五、RocketMQ 批量消息發送
1、批量發送消息能顯著提高傳遞小消息的性能。限制是這些批量消息應該有相同的 topic,相同的 waitstoreMsgOK,而且不能是延時消息。此外,這一批消息的總大小不應超過無 4MB。如果消息的總長度可能大于4MB時,這時候最好把消息進行分割。
2、在工程 rocketmq_demo (模塊)中,創建 批量消息發送 發送 類 Producer.java
/*** rocketmq_demo\src\main\java\djh\it\mq\rocketmq\batch\Producer.java** 2024-6-2 創建 批量消息發送的 發送 類 Producer.java*/
package djh.it.mq.rocketmq.batch;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;public class Producer {public static void main(String[] args) throws Exception {//1.創建消息生產者 producer,并制定生產者組名DefaultMQProducer producer = new DefaultMQProducer("group1");//2.指定 Nameserver 地址(集群配置)//producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//2.指定 Nameserver 地址(非集群配置)producer.setNamesrvAddr("172.18.30.110:9876");//3.啟動 producerproducer.start();//創建一個集合List<Message> msgs = new ArrayList<Message>();//發送消息//4.創建消息對象,指定主題 Topic、Tag 和消息體//參數一:消息主題 Topic, 參數二:消息 Tag, 參數三:消息內容Message msg1 = new Message("BatchTopic", "Tag1", ("Hello World"+1).getBytes());Message msg2 = new Message("BatchTopic", "Tag1", ("Hello World"+2).getBytes());Message msg3 = new Message("BatchTopic", "Tag1", ("Hello World"+3).getBytes());msgs.add(msg1);msgs.add(msg2);msgs.add(msg3);//5.發送消息SendResult result = producer.send(msgs);System.out.println("發送結果:"+result);TimeUnit.SECONDS.sleep(1); //線程睡1秒//6.關閉生產者 producer。producer.shutdown();}
}
3、在工程 rocketmq_demo (模塊)中,創建 批量消息發送 消費 類 Consumer.java 。
/*** rocketmq_demo\src\main\java\djh\it\mq\rocketmq\batch\Consumer.java** 2024-6-2 創建 批量消息發送 消費 類 Consumer.java 。*/
package djh.it.mq.rocketmq.batch;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import java.util.List;public class Consumer {public static void main(String[] args) throws Exception {//1.創建消費者 Consumer,制定消費者組名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定 Nameserver 地址(集群配置)//consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//2.指定 Nameserver 地址(非集群配置)consumer.setNamesrvAddr("172.18.30.110:9876");//3.訂閱主題 Topic 和 Tagconsumer.subscribe("BatchTopic", "*"); //接收所有消息。//4.設置回調函數,處理消息consumer.registerMessageListener(new MessageListenerConcurrently(){//接受消息內容public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){//System.out.println(msgs); //接收到的消息是未轉換的字節碼for(MessageExt msg : msgs){System.out.println("consumeThread=" + Thread.currentThread().getName()+", "+new String(msg.getBody())); //轉換為字符串消息}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.啟動消費者 consumer。consumer.start();System.out.println("消費者啟動");}
}