RocketMQ 提供了多種消息類型,以滿足不同業務場景對?順序性、事務性、時效性?的要求。其核心設計思想是通過解耦 “消息傳遞模式” 與 “業務邏輯”,實現高性能、高可靠的分布式通信。
一、主要類型包括
- 普通消息(基礎類型)
- 順序消息(保證消費順序)
- 定時 / 延遲消息(控制投遞時間)
- 事務消息(分布式事務最終一致性)
- 批量消息(提升吞吐量)
二、消息類型及代碼示例
1. 普通消息(Normal Message)
描述:最基礎的消息類型,支持異步發送、批量發送等模式,適用于無需嚴格順序和事務保證的場景(如日志收集、通知推送)。
核心優勢:高吞吐量、低延遲,生產端通過負載均衡自動選擇 Broker。
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;public class NormalMessageExample {public static void main(String[] args) throws ClientException, InterruptedException {// 1. 加載服務提供者(支持SPI擴展)ClientServiceProvider provider = ClientServiceProvider.loadService();// 2. 配置認證信息(密鑰管理)String accessKey = "yourAccessKey";String secretKey = "yourSecretKey";StaticSessionCredentialsProvider credentialsProvider =new StaticSessionCredentialsProvider(accessKey, secretKey);// 3. 構建客戶端配置(支持多協議、TLS加密)ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints("localhost:8081") // 支持域名或IP:端口列表.setCredentialProvider(credentialsProvider).setRequestTimeout(Duration.ofSeconds(3)) // 請求超時時間.build();// 4. 創建生產者(支持自動重試、批量發送)String topic = "normal-message-topic";Producer producer = provider.newProducerBuilder().setClientConfiguration(clientConfiguration).setTopics(topic).setMaxAttempts(3) // 發送失敗最大重試次數.build();// 5. 構建并發送消息Message message = provider.newMessageBuilder().setTopic(topic).setBody("Hello RocketMQ 5.0!".getBytes()).setTag("order") // 可選標簽,用于消息過濾.setKeys("key123") // 消息業務鍵,用于查詢.build();// 同步發送(阻塞當前線程直到返回結果)SendReceipt receipt = producer.send(message);System.out.println("消息發送成功: " + receipt.getMessageId());// 異步發送示例/*producer.sendAsync(message).thenAccept(sendReceipt -> {System.out.println("異步發送成功: " + sendReceipt.getMessageId());}).exceptionally(throwable -> {System.out.println("異步發送失敗: " + throwable.getMessage());return null;});*/// 6. 關閉資源(重要!避免內存泄漏)producer.close();}
}
2. 順序消息(Ordered Message)
描述:通過將同一業務主鍵的消息路由到相同隊列,保證消息消費順序與發送順序一致。
應用場景:金融交易流水、訂單狀態變更、時序數據處理。
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;import java.util.List;public class OrderedMessageExample {public static void main(String[] args) throws ClientException, InterruptedException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration config = ClientConfiguration.newBuilder().setEndpoints("localhost:8081").build();String topic = "ordered-message-topic";Producer producer = provider.newProducerBuilder().setClientConfiguration(config).setTopics(topic).build();// 模擬訂單狀態變更(同一訂單ID的消息必須順序處理)String[] orderIds = {"order1001", "order1002", "order1001"};String[] orderStatus = {"CREATED", "PAYED", "SHIPPED"};for (int i = 0; i < orderIds.length; i++) {String orderId = orderIds[i];String status = orderStatus[i % orderStatus.length];// 關鍵:通過MessageGroup確保相同訂單的消息發送到同一隊列Message message = provider.newMessageBuilder().setTopic(topic).setBody(("訂單[" + orderId + "]狀態變更為: " + status).getBytes()).setMessageGroup(orderId) // 消息組決定消息路由的隊列.setKeys(orderId) // 設置業務鍵便于查詢.build();SendReceipt receipt = producer.send(message);System.out.println("發送順序消息: " + receipt.getMessageId() + ", 訂單ID: " + orderId + ", 狀態: " + status);}producer.close();}
}
3. 定時 / 延遲消息(Scheduled/Delay Message)
描述:消息發送后,需等待指定時間(或到達指定時間點)才會被消費者可見。
應用場景:訂單超時自動關閉、任務調度、延遲重試。
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;import java.time.Duration;
import java.time.Instant;public class DelayMessageExample {public static void main(String[] args) throws ClientException, InterruptedException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration config = ClientConfiguration.newBuilder().setEndpoints("localhost:8081").build();String topic = "delay-message-topic";Producer producer = provider.newProducerBuilder().setClientConfiguration(config).setTopics(topic).build();// 方式一:使用絕對時間戳(精確到毫秒)long timestamp = Instant.now().plus(Duration.ofMinutes(5)).toEpochMilli();Message messageByTimestamp = provider.newMessageBuilder().setTopic(topic).setBody("5分鐘后執行的定時消息".getBytes()).setDeliveryTimestamp(timestamp) // 設置投遞時間戳.build();// 方式二:使用預定義延遲級別(需Broker配置支持)Message messageByLevel = provider.newMessageBuilder().setTopic(topic).setBody("延遲30秒的消息".getBytes()).addProperty("DELAY", "3") // 假設3對應30秒(需Broker配置).build();// 發送延遲消息SendReceipt receipt = producer.send(messageByTimestamp);System.out.println("延遲消息發送成功: " + receipt.getMessageId() + ", 將于 " + Instant.ofEpochMilli(timestamp) + " 可見");producer.close();}
}
4. 事務消息(Transactional Message)
描述:
通過兩階段提交機制,保證本地事務與消息發送的最終一致性。
核心流程:
發送半消息(對消費者不可見)
- 執行本地事務
- 根據事務結果提交或回滾半消息
- 支持事務狀態回查(處理超時情況)
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.apache.rocketmq.client.apis.producer.TransactionalProducer;public class TransactionalMessageExample {public static void main(String[] args) throws ClientException, InterruptedException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration config = ClientConfiguration.newBuilder().setEndpoints("localhost:8081").build();String topic = "transactional-message-topic";TransactionalProducer producer = provider.newTransactionalProducerBuilder().setClientConfiguration(config).setTopics(topic)// 關鍵:設置事務狀態回查處理器(當Broker長時間未收到事務狀態時觸發).setTransactionChecker(messageView -> {System.out.println("回查事務狀態: " + messageView.getBodyAsString());// 根據業務ID查詢本地事務狀態String bizId = messageView.getKeys().iterator().next();boolean transactionStatus = checkLocalTransactionStatus(bizId);return transactionStatus ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;}).build();try {// 1. 開啟事務上下文producer.beginTransaction();// 2. 發送半消息(未提交狀態)Message message = provider.newMessageBuilder().setTopic(topic).setBody("用戶賬戶扣款成功,通知庫存系統扣減".getBytes()).setKeys("order_12345") // 設置業務鍵,用于回查.build();producer.send(message);// 3. 執行本地事務(如數據庫操作)boolean localTransactionResult = executeLocalTransaction();// 4. 根據本地事務結果提交或回滾if (localTransactionResult) {producer.commit(); // 提交事務,消息對消費者可見System.out.println("本地事務執行成功,消息提交");} else {producer.rollback(); // 回滾事務,消息被丟棄System.out.println("本地事務執行失敗,消息回滾");}} catch (Exception e) {producer.rollback(); // 異常時回滾e.printStackTrace();} finally {producer.close();}}private static boolean executeLocalTransaction() {// 模擬本地事務:如用戶賬戶扣款System.out.println("執行本地事務...");return true; // 返回事務執行結果}private static boolean checkLocalTransactionStatus(String bizId) {// 模擬查詢本地事務狀態(如查詢數據庫訂單狀態)System.out.println("查詢本地事務狀態: " + bizId);return true; // 實際應根據業務ID查詢真實狀態}
}
5. 批量消息(Batch Message)
描述:將多條消息打包為一個批次發送,減少網絡開銷,提升吞吐量。
注意事項:
- 所有消息必須屬于同一 Topic
- 總大小不能超過 4MB(默認限制,可配置)
- 不支持事務和延遲屬性
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;import java.util.ArrayList;
import java.util.List;public class BatchMessageExample {public static void main(String[] args) throws ClientException, InterruptedException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration config = ClientConfiguration.newBuilder().setEndpoints("localhost:8081").build();String topic = "batch-message-topic";Producer producer = provider.newProducerBuilder().setClientConfiguration(config).setTopics(topic).build();// 創建批量消息集合List<Message> messages = new ArrayList<>();for (int i = 0; i < 100; i++) { // 示例:批量發送100條消息Message message = provider.newMessageBuilder().setTopic(topic).setBody(("批量消息-" + i).getBytes()).setKeys("key-" + i).build();messages.add(message);}// 智能拆分大批次(避免超過4MB限制)List<List<Message>> batches = splitMessages(messages);// 發送所有批次for (List<Message> batch : batches) {List<SendReceipt> receipts = producer.send(batch);System.out.println("批量發送成功,共" + receipts.size() + "條消息");}producer.close();}// 智能拆分大批次消息(實際生產中建議實現)private static List<List<Message>> splitMessages(List<Message> messages) {// 簡單實現:實際應根據消息大小動態拆分List<List<Message>> result = new ArrayList<>();result.add(messages);return result;}
}
6. 消費者示例(通用)
描述:RocketMQ 5.0 支持 Push 和 Pull 兩種消費模式,以下是基于長輪詢的 PushConsumer 示例。
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;import java.time.Duration;public class ConsumerExample {public static void main(String[] args) throws ClientException, InterruptedException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration config = ClientConfiguration.newBuilder().setEndpoints("localhost:8081").build();// 訂閱主題和過濾表達式(支持SQL92語法)String topic = "normal-message-topic";FilterExpression filterExpression = new FilterExpression("TAG = 'order' AND age > 18", // 示例SQL過濾條件FilterExpressionType.SQL92);// 創建PushConsumer(基于長輪詢的"偽推"模式)PushConsumer consumer = provider.newPushConsumerBuilder().setClientConfiguration(config).setConsumerGroup("my-consumer-group") // 消費組決定消息負載方式.setSubscriptionExpressions(Map.of(topic, filterExpression)).setMaxPollInterval(Duration.ofSeconds(30)) // 長輪詢超時時間.setConsumptionThreadCount(10) // 消費線程數.setMessageListener(messageView -> {try {// 處理消息邏輯(業務代碼)System.out.println("接收到消息: " + messageView.getBodyAsString());System.out.println("消息屬性: " + messageView.getProperties());// 模擬業務處理耗時Thread.sleep(100);// 返回消費結果(成功/失敗)return ConsumeResult.SUCCESS;} catch (Exception e) {// 消費失敗時返回RETRY,消息將重試消費System.out.println("消息消費失敗: " + e.getMessage());return ConsumeResult.FAILURE;}}).build();// 保持主線程運行,避免消費者立即關閉System.out.println("消費者已啟動,按Ctrl+C退出...");Thread.sleep(Long.MAX_VALUE);}
}
關鍵依賴配置(Maven)
<dependencies><!-- RocketMQ 5.0 Java客戶端 --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.1.0</version></dependency><!-- gRPC依賴 --><dependency><groupId>io.grpc</groupId><artifactId>grpc-netty-shaded</artifactId><version>1.54.0</version></dependency><!-- 序列化依賴 --><dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>3.21.9</version></dependency>
</dependencies>
三、最佳實踐建議
-
連接配置:
- 生產環境建議使用域名而非 IP,支持動態擴容
- 開啟 TLS 加密(通過
ClientConfiguration.setSslTrustStorePath
)
-
消息大小:
- 單條消息建議不超過 1MB
- 批量消息總大小不超過 4MB(可通過
producer.setMaxMessageSize
調整)
-
異常處理:
- 生產者需捕獲
ClientException
并實現重試邏輯 - 消費者應避免長時間阻塞,建議使用異步處理
- 生產者需捕獲
-
性能調優:
- 生產者:調整
sendMsgTimeout
和maxAttempts
參數 - 消費者:根據業務吞吐量調整
consumptionThreadCount
- 生產者:調整
-
監控告警:
- 監控 Topic 的 TPS、RT、堆積量等指標
- 配置告警閾值(如單隊列堆積超過 10 萬條)
四、總結
RocketMQ支持多種消息類型以滿足不同業務需求:普通消息適用于高吞吐場景;順序消息保證消費順序;定時/延遲消息控制投遞時間;事務消息確保分布式事務一致性;批量消息提升吞吐量。每種類型都提供了對應的Java代碼示例,包括生產者配置、消息構建和發送邏輯。最佳實踐建議包括合理配置連接、控制消息大小、完善異常處理、性能調優和監控告警。通過解耦消息傳遞與業務邏輯,RocketMQ實現了高性能、高可靠的分布式通信能力。