1. RocketMQ 簡介
RocketMQ 是阿里巴巴開源的一款分布式消息隊列,具有高吞吐量、低延遲、可靠性等特點,廣泛應用于金融、電商、物聯網等領域。
- RocketMQ 的核心特性:
- 高可靠性:支持消息存儲、重復消費、失敗重試等
- 高可用性:分布式架構,支持主從復制
- 高性能:高吞吐量、低延遲
2. 引入依賴
首先,在 Spring Boot 項目 中引入 RocketMQ 和 Spring Cloud Alibaba 的依賴。
2.1 配置依賴
在 pom.xml
中添加以下依賴:
<dependencies><!-- Spring Boot --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- Spring Cloud Alibaba RocketMQ --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-rocketmq</artifactId></dependency><!-- Spring Cloud Alibaba --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-alibaba</artifactId></dependency>
</dependencies>
2.2 配置 RocketMQ
在 application.yml
中配置 RocketMQ 連接信息:
spring:cloud:alibaba:rocketmq:name-server: 127.0.0.1:9876 # RocketMQ NameServer 地址producer:group: my-producer-group # 生產者組consumer:group: my-consumer-group # 消費者組
3. 生產者代碼實現
3.1 創建消息生產者
我們可以通過注解 @RocketMQMessageListener
來創建 RocketMQ 生產者,消息可以通過 @Value
傳遞或直接通過 Bean 注入。
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class RocketMQProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;// 發送普通消息public void sendMessage(String topic, String message) {rocketMQTemplate.convertAndSend(topic, message);}
}
3.2 發送消息
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class ProducerController {@Autowiredprivate RocketMQProducer rocketMQProducer;@GetMapping("/send")public String sendMessage() {String message = "Hello, RocketMQ!";rocketMQProducer.sendMessage("my-topic", message);return "Message sent: " + message;}
}
訪問 http://localhost:8080/send
發送消息,RocketMQ 將開始處理消息。
4. 消費者代碼實現
4.1 創建消息消費者
在消費者端,我們需要創建一個消息監聽器,利用 @RocketMQMessageListener
注解監聽 RocketMQ 消息。
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;@Service
@RocketMQMessageListener(topic = "my-topic", consumerGroup = "my-consumer-group")
public class RocketMQConsumer {@org.springframework.messaging.handler.annotation.MessageMappingpublic void listen(String message) {System.out.println("Received message: " + message);}
}
4.2 消費消息
每次生產者發送的消息,消費者都會通過 listen
方法進行接收。控制臺會打印出收到的消息。
5. 順序消息
RocketMQ 支持順序消息,可以通過設置 MessageQueueSelector
來保證消息的順序性。
5.1 發送順序消息
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class OrderProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;// 發送順序消息public void sendOrderedMessage(String topic, String message, int orderId) {rocketMQTemplate.convertAndSend(topic, message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 根據orderId選擇隊列return mqs.get(orderId % mqs.size());}}, orderId);}
}
5.2 接收順序消息
順序消息的消費邏輯和普通消息相似,只不過要保證順序消息的消費順序。
6. 事務消息
RocketMQ 提供了事務消息功能,能夠保證消息在分布式事務中的可靠性。我們通過 @RocketMQTransactionListener
來實現。
6.1 配置事務消息
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransaction;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
@RocketMQTransactionListener
public class TransactionProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendTransactionMessage(String topic, String message) {rocketMQTemplate.sendMessageInTransaction(topic, message, null);}public RocketMQLocalTransaction executeLocalTransaction(String message, Object arg) {// 事務操作try {// 業務操作return RocketMQLocalTransaction.SUCCESS;} catch (Exception e) {return RocketMQLocalTransaction.ROLLBACK;}}
}
6.2 事務回查
RocketMQ 支持事務回查機制,如果事務消息發送后沒有明確的提交或回滾,RocketMQ 會通過回查接口查詢事務狀態。
7. Spring Cloud Alibaba 集成
7.1 配置 Spring Cloud RocketMQ
在 application.yml
配置 Spring Cloud Alibaba 與 RocketMQ 集成。
spring:cloud:alibaba:rocketmq:name-server: 127.0.0.1:9876producer:group: my-producer-groupconsumer:group: my-consumer-group
7.2 集成 OpenFeign 與 RocketMQ
通過 OpenFeign 實現遠程服務調用,可以和 RocketMQ 一起工作。例如,將 RocketMQ 生產者集成到一個微服務中,使用 OpenFeign 調用。
@FeignClient("rocketmq-producer-service")
public interface RocketMQFeignClient {@PostMapping("/send")void sendMessage(@RequestBody String message);
}
通過 Feign 客戶端發送請求并觸發 RocketMQ 生產者的消息發送。
8. 總結
功能總結
功能 | 說明 |
---|---|
消息生產 | 使用 RocketMQTemplate 發送消息 |
消息消費 | 使用 @RocketMQMessageListener 監聽消息 |
順序消息 | 使用 MessageQueueSelector 保證消息順序 |
事務消息 | 使用 RocketMQTransactionListener 保證事務一致性 |
集成 Spring Cloud | 結合 Spring Cloud Alibaba RocketMQ 進行分布式消息通信 |
集成 RocketMQ 的好處
- 提高系統解耦,避免直接調用遠程服務。
- 支持異步、可靠的消息傳遞。
- 通過順序消息保證業務流程的順序性。
- 事務消息保證分布式事務的一致性。
這篇教程將幫助你實現 Spring Boot 和 Spring Cloud Alibaba 集成 RocketMQ 的基本功能,之后可以根據業務需求進行擴展。如果這篇教程對你有幫助,記得點贊、收藏哦! 🚀