前言
在分布式系統中,消息中間件是解耦服務、實現異步通信的核心組件。RocketMQ 作為阿里巴巴開源的高性能分布式消息中間件,憑借其高吞吐、低延遲、高可靠等特性,成為企業級應用的首選。而 Spring Boot 通過其“約定優于配置”的設計理念,極大簡化了項目開發的復雜度。本文將通過 手動連接 和 配置連接 兩種方式,詳細講解如何在 Spring Boot 中集成 RocketMQ,實現消息的同步與異步發送,并提供完整示例代碼。
?
?
一、環境準備
在開始前,請確保:
- JDK 17、Maven 3.6+、Spring Boot 2.7+。
- 安裝RocketMQ服務(本地或遠程),推薦使用RocketMQ Docker鏡像快速搭建(可參考之前文章)。
?
二、示例—Springboot集成mq(手動連接)
通過編碼方式初始化生產者,適用于需要動態控制資源的場景。
2.1 新建項目
?
?
?
?
?
2.2 引入依賴
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.4</version></dependency>
?
?
2.3 生產者發送消息
- 構建一個消息生產者DefaultMQProducer實例,然后指定生產者組為jihaiProducer;
- 指定NameServer的地址:服務器的ip:9876,因為需要從NameServer拉取Broker的信息
- producer.start() 啟動生產者
- 構建一個內容為:技海拾貝的消息1,然后指定這個消息往jihaishibei這個topic發送
- producer.send(msg):發送消息,打印結果
- 關閉生產者
public class Producer {public static void main(String[] args) throws Exception {//創建一個生產者,指定生產者組為jihaiProducerDefaultMQProducer producer = new DefaultMQProducer("jihaiProducer");// 指定NameServer的地址producer.setNamesrvAddr("localhost:9876");// 第一次發送可能會超時,設置的比較大producer.setSendMsgTimeout(60000);// 啟動生產者producer.start();// 創建一條消息// topic為 jihaishibei// 消息內容為 技海拾貝的消息1// tags 為 TagAMessage msg = new Message("jihaishibei", "TagA", "技海拾貝的消息1 ".getBytes(RemotingHelper.DEFAULT_CHARSET));// 發送消息并得到消息的發送結果,然后打印SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);// 關閉生產者producer.shutdown();}}
?
?
?
?
啟動,發送消息
?
?
在控制臺可以看到這條消息
?
?
?
?
?
這里就能看到發送消息的詳細信息。
左下角消息的消費的消費,因為我們還沒有消費者訂閱這個topic,所以左下角沒數據。
?
2.4 消費者消費消息
- 創建一個消費者實例對象,指定消費者組為jihaiConsumer
- 指定NameServer的地址:服務器的ip:9876
- 訂閱 jihaishibei這個topic的所有信息
- consumer.registerMessageListener ,這個很重要,是注冊一個監聽器,這個監聽器是當有消息的時候就會回調這個監聽器,處理消息,所以需要用戶實現這個接口,然后處理消息。
- 啟動消費者
?
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {// 通過push模式消費消息,指定消費者組DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("jihaiConsumer");// 指定NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 訂閱這個topic下的所有的消息consumer.subscribe("jihaishibei", "*");// 注冊一個消費的監聽器,當有消息的時候,會回調這個監聽器來消費消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.printf("消費消息:%s", new String(msg.getBody()) + "\n");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 啟動消費者consumer.start();System.out.printf("Consumer Started.%n");}
}
?
?
?
?
啟動服務,進行消費
?
?
在控制臺,發現被jihaiConsumer這個消費者組給消費了。
?
?
?
三、示例2—Springboot集成mq(配置連接)
在 Spring Boot 中,可以通過配置文件簡化 RocketMQ 的連接配置。以下是在 application.yml
? 文件中進行的配置:
3.1 配置文件修改
?
?
?
?
?
?
?
spring:application:name: rocket-mq-demorocketmq:name-server: 127.0.0.1:9876producer:group: rocket-mq-demo-producersend-message-timeout: 10000comsumer:group: rocket-mq-demo-comsumersend-message-timeout: 10000
?
?
?
3.2 添加依賴
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.0</version>
</dependency>
根據需要選擇最新版本,從中央倉庫可以查看
?https://central.sonatype.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter
?
?
?
?
?
備注:如果添加rocketmq-client依賴,先注釋這個依賴
?
3.3 消費者service類
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;/*** messageModel=MessageModel.CLUSTERING* 監聽模式,有消息就會消費*/
@Service
@RocketMQMessageListener(topic = "jihaishibei-topic", consumerGroup = "rocket-mq-demo-comsumer", messageModel = MessageModel.CLUSTERING)
public class RocketMQConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.printf("收到消息: %s\n", s);}
}
?
3.4 生產者service類
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;@Service
public class RocketMQProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;private final String topic = "jihaishibei-topic";// 1.同步發送消息// 同步發送是指發送方發送一條消息后,會等待服務器返回確認信息后再進行后續操作。這種方式適用于需要可靠性保證的場景。public void createAndSend(String message){rocketMQTemplate.convertAndSend(topic, message);System.out.printf("同步發送結果: %s\n", message);}// 1.同步發送消息// 同步發送是指發送方發送一條消息后,會等待服務器返回確認信息后再進行后續操作。這種方式適用于需要可靠性保證的場景。public void sendSyncMessage(String message){SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build());System.out.println(sendResult.getMsgId());System.out.printf("同步發送結果: %s\n", message);}// 2.異步發送消息// 異步發送是指發送方發送消息后,不等待服務器返回確認信息,而是通過回調接口處理返回結果。這種方式適用于對響應時間要求較高的場景。public void sendAsyncMessage(String message){rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(message).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf("異步發送成功: %s\n", sendResult);}@Overridepublic void onException(Throwable throwable) {System.out.printf("異步發送失敗: %s\n", throwable.getMessage());}});}// 3.單向發送消息// 單向發送是指發送方只負責發送消息,不關心服務器的響應。該方式適用于對可靠性要求不高的場景,如日志收集。public void sendOneWayMessage(String message){rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(message).build());System.out.println("單向消息發送成功");}
}
?
3.5 測試controller類
@RequestMapping("api")
@RestController
public class RocketController {@Autowiredprivate RocketMQProducer rocketMQProducer;@GetMapping("/createAndSend")public String createAndSend(@RequestParam String message) {rocketMQProducer.createAndSend(message);return "同步消息發送成功";}@GetMapping("/sendSync")public String sendSync(@RequestParam String message) {rocketMQProducer.sendSyncMessage(message);return "同步消息發送成功";}@GetMapping("/sendAsync")public String sendAsync(@RequestParam String message) {rocketMQProducer.sendAsyncMessage(message);return "異步消息發送中";}@GetMapping("/sendOneWay")public String sendOneWay(@RequestParam String message) {rocketMQProducer.sendOneWayMessage(message);return "單向消息發送成功";}
}
?
3.6 啟動服務
?
?
3.7 測試
同步消息1
?
?
?
?
?
?
?
同步消息2
?
?
?
?
?
?
異步消息
?
?
?
?
?
?
?
?
?
單向發送消息
?
?
?
?
?
?
?
四、結束語
本文通過手動連接與配置連接兩種方式,展示了Spring Boot與RocketMQ的集成實踐。手動連接幫助開發者理解底層API邏輯,而Spring Boot的配置化集成則極大簡化了開發流程。無論是同步消息的可靠性保障,還是異步消息的性能優化,RocketMQ均能與Spring Boot無縫協作,為分布式系統提供高效的消息通信能力。
未來可進一步探索集群部署、消息重試機制及監控告警,以實現更健壯的消息服務。希望本文能為開發者快速構建高可用的消息系統提供參考!
?