文章目錄
- 消費模式
- 同步消息
- 異步消息
- 單向消息
- 延遲消息
- 批量消息
- 順序消息
- 事務消息
- Tag標簽和Key鍵
- Tag的使用
- Key的使用
首先引入rocketmq的依賴
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.2</version>
</dependency>
然后我們編寫一個簡單的生產者和消費者
@SpringBootTest
public class RocketMQTest {/*** 對于生產者 同一組的生產者可以向不同的topic隊列發送消息*/@Testpublic void produce() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");producer.setNamesrvAddr(MQConstant.NAMESRV);producer.start();Message message = new Message("testTopic","一個簡單的消息".getBytes());SendResult sendResult = producer.send(message);System.out.println(sendResult.getSendStatus());producer.shutdown();}/*** 對于消費者 同一組的消費者只能接收同一個topic的消息* 并且如果存在多個消費者組,他們都監聽同一個topic的消息* 那么就可以選擇使用 負載均衡策略 或者 廣播策略*/@Testpublic void consume() throws MQClientException, IOException {//創建一個消費者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-producer-group");consumer.setNamesrvAddr(MQConstant.NAMESRV);// * 標識訂閱這個主題中的所有消息 后期會有消息過濾consumer.subscribe("testTopic", "*");//設置一個監聽器 (他會一直監聽,然后是一個異步回調的機制)//那么我們就不能讓他start之后這個方法就返回結束 需要掛起當前的JVM(test模式得這樣子)//正常運行項目的時候項目的JVM會正常運行的 不需要掛起consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {//這個就是對應的消費方法 業務處理//消息如果消費失敗 那么就要重新放入到消費隊列System.out.println("我是消費者");System.out.println(list.get(0).toString());System.out.println("消息上下文"+context);//返回值如果為null/報錯/RECONSUMER_LATER 代表消費失敗//消息會重新回到隊列 然后過一會在投遞給當前消費者或者其他消費者return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});//啟動consumer.start();//掛起當前的JVMSystem.in.read();}
}
這里需要注意的是,對于Rocketmq,如果在你的監聽器中,也就是這個MessageListenerConcurrently中,你的返回值為null,或者ConsumeConcurrentlyStatus.RECONSUME_LATER,亦或者拋出了一個異常,那么這條消息都會重新的被放回到我們的隊列中,等待其他消費者或者當前消費者再一次消費。
消費模式
MQ的消費模式可以大致分為兩種,一種是推Push,一種是拉Pull。
Push是服務端【MQ】主動推送消息給客戶端,優點是及時性較好,但如果客戶端沒有做好流控,一旦服務端推送大量消息到客戶端時,就會導致客戶端消息堆積甚至崩潰。
Pull是客戶端需要主動到服務端取數據,優點是客戶端可以依據自己的消費能力進行消費,但拉取的頻率也需要用戶自己控制,拉取頻繁容易造成服務端和客戶端的壓力,拉取間隔長又容易造成消費不及時。
Push模式也是基于pull模式的,只能客戶端內部封裝了api,一般場景下,上游消息生產量小或者均速的時候,選擇push模式。在特殊場景下,例如電商大促,搶優惠券等場景可以選擇pull模式
同步消息
上面的快速入門就是發送同步消息,發送過后會有一個返回值,也就是mq服務器接收到消息后返回的一個確認,這種方式非常安全,但是性能上并沒有這么高,而且在mq集群中,也是要等到所有的從機都復制了消息以后才會返回,所以針對重要的消息可以選擇這種方式
異步消息
異步消息通常用在對響應時間敏感的業務場景,即發送端不能容忍長時間地等待Broker的響應。發送完以后會有一個異步消息通知。
@Test
public void testAsyncProducer() throws Exception {// 創建默認的生產者DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");// 設置nameServer地址producer.setNamesrvAddr(MQConstant.NAMESRV);// 啟動實例producer.start();Message msg = new Message("testTopic", ("異步消息").getBytes());producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("發送成功");}@Overridepublic void onException(Throwable e) {System.out.println("發送失敗");}});System.out.println("看看誰先執行");// 掛起jvm 因為回調是異步的不然測試不出來System.in.read();// 關閉實例producer.shutdown();
}
單向消息
這種方式主要用在不關心發送結果的場景,這種方式吞吐量很大,但是存在消息丟失的風險,例如日志信息的發送。
@Test
public void testOnewayProducer() throws Exception {// 創建默認的生產者DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");// 設置nameServer地址producer.setNamesrvAddr(MQConstant.NAMESRV);// 啟動實例producer.start();Message msg = new Message("testTopic", ("單向消息").getBytes());// 發送單向消息producer.sendOneway(msg);// 關閉實例producer.shutdown();
}
延遲消息
消息放入mq后,過一段時間,才會被監聽到,然后消費
比如下訂單業務,提交了一個訂單就可以發送一個延時消息,30min后去檢查這個訂單的狀態,如果還是未付款就取消訂單釋放庫存。
這里注意的是RocketMQ不支持任意時間的延時
只支持以下幾個固定的延時等級,等級1就對應1s,以此類推,最高支持2h延遲
private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
@Test
public void testDelayProducer() throws Exception {// 創建默認的生產者DefaultMQProducer producer = new DefaultMQProducer("test-group");// 設置nameServer地址producer.setNamesrvAddr("localhost:9876");// 啟動實例producer.start();Message msg = new Message("TopicTest", ("延遲消息").getBytes());// 給這個消息設定一個延遲等級// messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hmsg.setDelayTimeLevel(3);// 發送單向消息producer.send(msg);// 打印時間System.out.println(new Date());// 關閉實例producer.shutdown();
}
批量消息
批量消息就是一次性發送一個消息集合出去。
@Test
public void testBatchProducer() throws Exception {// 創建默認的生產者DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");// 設置nameServer地址producer.setNamesrvAddr(MQConstant.NAMESRV);// 啟動實例producer.start();List<Message> messages = Arrays.asList(new Message("testTopic", "批量消息1".getBytes()),new Message("testTopic", "批量消息2".getBytes()),new Message("testTopic", "批量消息3".getBytes()));producer.send(messages);System.out.println("批量執行任務");// 掛起jvm 因為回調是異步的不然測試不出來System.in.read();// 關閉實例producer.shutdown();
}
順序消息
我們知道一個topic中可以有多個隊列,那么如果我們的消息發送到多個隊列中去,那么很明顯我們的消息消費就是并行消費的,也就是沒有了順序性。
因此如果我們需要發送順序消息,也就是希望MQ那邊的消費者順序的消費一些消息,我們就得按照如下方式發送順序消息。
消息有序指的是可以按照消息的發送順序來消費(FIFO)。RocketMQ可以嚴格的保證消息有序,可以分為:分區有序或者全局有序。
可能大家會有疑問,mq不就是FIFO嗎?
rocketMq的broker的機制,導致了rocketMq會有這個問題
因為一個broker中對應了四個queue。
不同的queue(分區隊列);而消費消息的時候從多個queue上拉取消息,這種情況發送和消費是不能保證順序。但是如果控制發送的順序消息只依次發送到同一個queue中,消費的時候只從這個queue上依次拉取,則就保證了順序。當發送和消費參與的queue只有一個,則是全局有序;如果多個queue參與,則為分區有序,即相對每個queue,消息都是有序的。
下面用訂單進行分區有序的示例。一個訂單的順序流程是:下訂單、發短信通知、物流、簽收。訂單順序號相同的消息會被先后發送到同一個隊列中,消費時,同一個順序獲取到的肯定是同一個隊列。
package zhang.blossom.seckillbyrocketmq;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import zhang.blossom.seckillbyrocketmq.constant.MQConstant;
import zhang.blossom.seckillbyrocketmq.entity.MsgModel;import java.io.IOException;
import java.util.Arrays;
import java.util.List;/*** @author: 張錦標* @date: 2023/8/17 9:58* OrderedRocketMQTest類*/@SpringBootTest
public class OrderedRocketMQTest {private List<MsgModel> msgModels = Arrays.asList(new MsgModel("qwer", 1L, "下單"),new MsgModel("qwer", 1L, "短信"),new MsgModel("qwer", 1L, "物流"),new MsgModel("zxcv", 2L, "下單"),new MsgModel("zxcv", 2L, "短信"),new MsgModel("zxcv", 2L, "物流"));//發送順序消息@Testpublic void orderedProducer() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");producer.setNamesrvAddr(MQConstant.NAMESRV);producer.start();//發送順序消息 發送時要確保有序 并且要發送到同一個隊列下面去msgModels.forEach(msgModel -> {Message message = new Message("testTopic",msgModel.toString().getBytes());try {//發送 相同的訂單號應該去相同的隊列producer.send(message, new MessageQueueSelector() {//這里的send方法的第三個參數arg 就是這個隊列選擇器的第三個參數 會傳遞過來@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object arg) {//這個方法的返回值就是要選擇的隊列//這里可以用hash的方式就可以選擇到同樣的隊列了int hash = arg.toString().hashCode();int index = hash % list.size();return list.get(index);}}, msgModel.getOrderSn());} catch (MQClientException e) {throw new RuntimeException(e);} catch (RemotingException e) {throw new RuntimeException(e);} catch (MQBrokerException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);}});producer.shutdown();System.out.println("發送完畢");}@Testpublic void orderedConsumer() throws MQClientException, IOException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-producer-group");consumer.setNamesrvAddr(MQConstant.NAMESRV);consumer.subscribe("testTopic", "*");//MessageListenerConcurrently 并發模式 多線程的 失敗后最多重試16次 然后放入死信隊列//MessageListenerOrderly 順序模式 單線程的 失敗后無限次重試 Integer.MAX_VALUEconsumer.registerMessageListener(new MessageListenerOrderly() {//順序模式只有一個線程來執行消費@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list,ConsumeOrderlyContext consumeOrderlyContext) {//這里的一個線程是一個隊列一個線程System.out.println(new String(list.get(0).getBody()));return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.in.read();}
}
事務消息
一般我們不使用RocketMQ的事務消息,所以有興趣的可以看看其他的實現。
Tag標簽和Key鍵
Rocketmq提供消息過濾功能,通過tag或者key進行區分。
我們往一個主題里面發送消息的時候,根據業務邏輯,可能需要區分,比如帶有tagA標簽的被A消費,帶有tagB標簽的被B消費,還有在事務監聽的類里面,只要是事務消息都要走同一個監聽,我們也需要通過過濾才區別對待。
Tag的使用
@Test
public void tagProducer() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");producer.setNamesrvAddr(MQConstant.NAMESRV);producer.start();Message message1 = new Message("testTopic", "test1","test1的消息".getBytes() );Message message2 = new Message("testTopic", "test2","test2的消息".getBytes() );producer.send(message1);producer.send(message2);producer.shutdown();System.out.println("消息發送成功");
}@Test
public void test1Consumer() throws MQClientException, IOException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-producer-group");consumer.setNamesrvAddr(MQConstant.NAMESRV);consumer.subscribe("testTopic", "test1");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println("消費test1的消息"+new String(list.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}
@Test
public void test2Consumer() throws MQClientException, IOException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-producer-group");consumer.setNamesrvAddr(MQConstant.NAMESRV);consumer.subscribe("testTopic", "test1 || test2");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println("消費test1/test2的消息"+new String(list.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}
什么時候該用Topic,什么時候該用 Tag?
總結:不同的業務應該使用不同的Topic如果是相同的業務里面有不同表的表現形式,那么我們要使用tag進行區分
可以從以下幾個方面進行判斷:
1.消息類型是否一致:如普通消息、事務消息、定時(延時)消息、順序消息,不同的消息類型使用不同的 Topic,無法通過 Tag 進行區分。
2.業務是否相關聯:沒有直接關聯的消息,如淘寶交易消息,京東物流消息使用不同的 Topic 進行區分;而同樣是天貓交易消息,電器類訂單、女裝類訂單、化妝品類訂單的消息可以用 Tag 進行區分。
3.消息優先級是否一致:如同樣是物流消息,盒馬必須小時內送達,天貓超市 24 小時內送達,淘寶物流則相對會慢一些,不同優先級的消息用不同的 Topic 進行區分。
4.消息量級是否相當:有些業務消息雖然量小但是實時性要求高,如果跟某些萬億量級的消息使用同一個 Topic,則有可能會因為過長的等待時間而“餓死”,此時需要將不同量級的消息進行拆分,使用不同的 Topic。
總的來說,針對消息分類,您可以選擇創建多個Topic,或者在同一個 Topic 下創建多個 Tag。但通常情況下,不同的 Topic 之間的消息沒有必然的聯系,而 Tag 則用來區分同一個 Topic 下相互關聯的消息,例如全集和子集的關系、流程先后的關系。
Key的使用
在rocketmq中的消息,默認會有一個messageId當做消息的唯一標識,我們也可以給消息攜帶一個key,用作唯一標識或者業務標識,包括在控制面板查詢的時候也可以使用messageId或者key來進行查詢。
@Test
public void testKeyProducer() throws Exception {// 創建默認的生產者DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");// 設置nameServer地址producer.setNamesrvAddr(MQConstant.NAMESRV);// 啟動實例producer.start();Message msg = new Message("testTopic","test1","key", "我是一個帶標記和key的消息".getBytes());SendResult send = producer.send(msg);System.out.println(send);// 關閉實例producer.shutdown();
}@Test
public void testKeyConsumer() throws Exception {// 創建默認消費者組DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-producer-group");// 設置nameServer地址consumer.setNamesrvAddr(MQConstant.NAMESRV);// 訂閱一個主題來消費 表達式,默認是*,支持"tagA || tagB || tagC" 這樣或者的寫法 只要是符合任何一個標簽都可以消費consumer.subscribe("testTopic", "test1 || test2 || test3");// 注冊一個消費監聽 MessageListenerConcurrently是并發消費// 默認是20個線程一起消費,可以參看 consumer.setConsumeThreadMax()consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {// 這里執行消費的代碼 默認是多線程消費System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));System.out.println(msgs.get(0).getTags());System.out.println(msgs.get(0).getKeys());return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}