
有時候我們在使用消息隊列的時候,往往需要能夠保證消息的順序消費,而RocketMQ是可以支持消息的順序消費的。
RocketMQ在發送消息的時候,是將消息發送到不同的隊列中,然后消費端從多個隊列中讀取消息進行消費,很明顯,在這種全局模式下,是無法實現順序消費的。
為了實現順序消費,我們需要把有順序的消息按照他的順序,將他們發送到同一個隊列中,這樣消費端在消費的時候,就保證了其順序。
但是順序消費的性能肯定也相對差一些,因為只能使用一個隊列。
一、在pom.xml中添加依賴:
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.0</version>
</dependency>
二、在application.yml中配置RocketMQ地址:
server:port: 8888rocketmq:name-server: 127.0.0.1:9876producer:group: ${spring.application.name}sendMessageTimeout: 300000
備注:官方下載RocketMQ,本地啟動RocketMQ。
三、 一個簡單的生產消費案例:
生產者:向 stringTopic 的主題中發送一個 Hello RecketMQ 的字符串。
@RestController@RequestMapping("/mq")public class ProducerController {@Resourceprivate RocketMQTemplate rocketMQTemplate;@RequestMapping("/sync/send1")public String syncSendString() {//發送一個同步消息,會返回值 ---發送到 stringTopic 主題SendResult sendResult = rocketMQTemplate.syncSend("stringTopic", "Hello RocketMQ");return sendResult.toString();}}
消費者:監聽 stringTopic 主題。
@Service@RocketMQMessageListener(topic = "stringTopic", consumerGroup = "string_consumer")public class StringConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("消費者接收消息:" + message);}}
1、啟動當前服務。
2、用瀏覽器或者HTTP Client工具訪問:http://localhost:8888/mq/sync/send1
3、查看控制臺輸出:【消費者接收消息:Hello RocketMQ】即表示消息消費成功。
四、實現順序消費
生產者: 生產多條消息,方便觀察順序。向 orderTopic 主題發送5條消息,內容分別是 no1 no2 no3 no4 no5。第三個參數是order ,他的作用是會根據他的hash值計算發送到哪一個隊列。用同一個值order,那么他們的hash一樣。可以保證發送到同一個隊列里。
@RestController@RequestMapping("/mq")public class ProducerController {@Resourceprivate RocketMQTemplate rocketMQTemplate;/**************驗證RocketMQ順序消費***************/@RequestMapping("/send/ordered")public String sendOrderedMsg(){/*** hashKey: 為了保證報到同一個隊列中,將消息發送到orderTopic主題上*/rocketMQTemplate.syncSendOrderly("orderTopic","no1","order");rocketMQTemplate.syncSendOrderly("orderTopic","no2","order");rocketMQTemplate.syncSendOrderly("orderTopic","no3","order");rocketMQTemplate.syncSendOrderly("orderTopic","no4","order");rocketMQTemplate.syncSendOrderly("orderTopic","no5","order");return "success";}}
消費者:消費者在消費的時候,默認是異步多線程消費的,所以無法保證順序,需要指定同步消費。指定 consumeMode = ConsumeMode.ORDERLY。默認值是 consumeMode = ConsumeMode.CONCURRENT。
@Service@RocketMQMessageListener(topic = "orderTopic", consumerGroup = "ordered-consumer", consumeMode = ConsumeMode.ORDERLY)public class OrderedConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("順序消費,收到消息:" + message);}}
1、啟動當前服務。
2、用瀏覽器或者HTTP Client工具訪問:http://localhost:8888/mq/send/ordered
3、查看控制臺輸出:【順序打印:no1 no2 no3 no4 no5】即表示消息消費成功。

Spring是一個JavaEE一站式的開發框架。它提供的功能涵蓋了JavaEE程序中的表示層,服務層,持久層功能組件。這意味著,使用了Spring框架,一個框架就可以滿足整個JavaEE程序的開發。
但Spring框架,更加強調的是它的輕量級(模塊的可插拔)!!也就是說,除了內核以外模塊,如果你不想使用可以不用,它能夠整合任何第三方的框架。
所以,在現實開發中,Spring主要用于整合其他框架。我也整理了一份關于spring的面試題,希望可以幫助到大家!
感謝你看到這里,我是程序員青秧,一枚小碼農,從事開發六年了,每天都會分享java相關技術文章或行業資訊
歡迎大家關注我的專欄:
程序員青秧
里面不定期分享Java架構技術知識點及解析,還會不斷更新BATJ面試專題,歡迎大家前來探討交流,如有好的文章也歡迎投稿。(注意專欄簡介的介紹獲取最新一線大廠Java面試題總結資料!)
注意專欄簡介的介紹獲取最新一線大廠Java面試題總結資料!