一、RocketMQ 概述
RocketMQ 是阿里巴巴開源的一款分布式消息中間件,后捐贈給 Apache 基金會成為頂級項目。它具有低延遲、高并發、高可用、高可靠等特點,廣泛應用于訂單交易、消息推送、流計算、日志收集等場景。
核心特點
分布式架構:支持集群部署,可水平擴展
高吞吐量:單機可支持10萬級TPS
低延遲:毫秒級消息投遞
高可用性:支持主從復制,自動故障轉移
消息可靠性:支持消息持久化,確保不丟失
豐富的消息模式:支持普通消息、順序消息、事務消息、定時消息等
二、核心概念
1. 基本組件
組件 | 說明 |
---|---|
NameServer | 輕量級注冊中心,負責Broker的注冊與發現 |
Broker | 消息存儲與轉發服務器,負責消息存儲、投遞和查詢 |
Producer | 消息生產者,負責發送消息 |
Consumer | 消息消費者,負責消費消息 |
Topic | 消息主題,用于消息分類 |
Message Queue | 消息隊列,Topic的分區單位 |
Tag | 消息標簽,用于消息二級分類 |
Group | 生產者組/消費者組,用于集群管理 |
一、阿里云rocketMQ
使用阿里云 ONS SDK
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</artifactId><version>2.0.5.Final</version> <!-- 推薦最新版本 -->
</dependency>
獲取阿里云 RocketMQ 配置
Endpoint:
http://{YourInstanceId}.mq-internet.aliyuncs.com:80
AccessKey:阿里云賬號的?
AccessKey ID
?和?AccessKey Secret
Topic:消息主題(需在阿里云控制臺創建)
Group ID:消費者組(需在控制臺創建)
1、發消息
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;import java.util.Properties;public class AliyunMQProducer {public static void main(String[] args) {// 1. 配置 ProducerProperties properties = new Properties();properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://YourInstanceId.mq-internet.aliyuncs.com:80");properties.put(PropertyKeyConst.AccessKey, "YourAccessKey");properties.put(PropertyKeyConst.SecretKey, "YourSecretKey");properties.put(PropertyKeyConst.GROUP_ID, "YourGroupId"); // Producer Group ID// 2. 創建 ProducerProducer producer = ONSFactory.createProducer(properties);producer.start();// 3. 創建消息Message msg = new Message("YourTopic", // Topic"YourTag", // Tag"Hello Aliyun RocketMQ!".getBytes() // Body);// 4. 發送消息producer.send(msg);System.out.println("消息發送成功!");// 5. 關閉 Producerproducer.shutdown();}
}
2、消費MQ
import com.aliyun.openservices.ons.api.*;
import java.util.Properties;public class AliyunMQConsumer {public static void main(String[] args) {// 1. 配置 ConsumerProperties properties = new Properties();properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://YourInstanceId.mq-internet.aliyuncs.com:80");properties.put(PropertyKeyConst.AccessKey, "YourAccessKey");properties.put(PropertyKeyConst.SecretKey, "YourSecretKey");properties.put(PropertyKeyConst.GROUP_ID, "YourGroupId"); // Consumer Group ID// 2. 創建 ConsumerConsumer consumer = ONSFactory.createConsumer(properties);// 3. 訂閱 Topic 和 Tag(* 表示所有 Tag)consumer.subscribe("YourTopic", "*", new MessageListener() {@Overridepublic Action consume(Message message, ConsumeContext context) {System.out.println("收到消息: " + new String(message.getBody()));return Action.CommitMessage; // 消費成功}});// 4. 啟動 Consumerconsumer.start();System.out.println("消費者已啟動,等待消息...");}
}
?
阿里云 ONS SDK 更穩定,推薦使用(比 Apache RocketMQ 客戶端更適配阿里云環境)。
Topic 和 Group ID 需先在阿里云控制臺創建,否則會報錯。
生產環境建議配置重試機制和日志監控,避免消息丟失。
消費模式:
集群消費(CLUSTERING):同 Group ID 的多個 Consumer 分攤消息(默認)。
廣播消費(BROADCASTING):同 Group ID 的每個 Consumer 都收到所有消息。
二、騰訊云RocketMQ
import java.io.UnsupportedEncodingException;
import java.util.List;import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import lombok.extern.slf4j.Slf4j;/*** 騰訊云rocketMQ服務類*/
@Slf4j
@Service
@Transactional(rollbackFor = Exception.class)
public class RocketTXMqService {@Value("${rocketmq.namespace:-1}")private String namespace;@Value("${rocketmq.producer.group:-1}")private String groupName;@Value("${rocketmq.producer.access-key:-1}")private String accessKey;@Value("${rocketmq.producer.secret-key:-1}")private String secretKey;@Value("${rocketmq.name-server:-1}")private String nameserver;// MQ生產者private DefaultMQProducer producer;// MQ實例化消費者pushprivate DefaultMQPushConsumer pushConsumer;// MQ實例化消費者pullprivate DefaultLitePullConsumer pullConsumer;/*** 創建生產者* * @return*/public DefaultMQProducer getProducer() {if (null == producer) {// 實例化消息生產者Producerproducer = new DefaultMQProducer(namespace, groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL權限);// 設置NameServer的地址producer.setNamesrvAddr(nameserver);try {// 啟動Producer實例producer.start();} catch (MQClientException e) {e.printStackTrace();}}return producer;}/*** 同步發送 發送消息*/public void syncSend(String topic, String tag, String data) {producer = getProducer();// 發送消息SendResult sendResult = null;try {// 創建消息實例,設置topic和消息內容Message msg = new Message(topic, tag, data.getBytes(RemotingHelper.DEFAULT_CHARSET));sendResult = producer.send(msg);log.info("埋點信息發送騰訊云MQ:" + data);log.info("發送騰訊云MQ接口返回狀態sendResult:" + sendResult);} catch (UnsupportedEncodingException e) {log.error("UnsupportedEncodingException:" + e.getMessage());} catch (MQClientException e) {log.error("MQClientException:" + e.getMessage());} catch (RemotingException e) {log.error("RemotingException:" + e.getMessage());} catch (MQBrokerException e) {log.error("MQBrokerException:" + e.getMessage());} catch (InterruptedException e) {log.error("InterruptedException:" + e.getMessage());}}/*** 創建push消費者* * @return*/public DefaultMQPushConsumer getPushConsumer() {if (null == pushConsumer) {// 實例化消費者pushConsumer = new DefaultMQPushConsumer(namespace, groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); // ACL權限// 設置NameServer的地址pushConsumer.setNamesrvAddr(nameserver);}return pushConsumer;}/*** 創建pull 消費者* * @return*/public DefaultLitePullConsumer getPullConsumer() {if (null == pullConsumer) {// 實例化消費者// 實例化消費者pullConsumer = new DefaultLitePullConsumer(namespace, groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)));// 設置NameServer的地址pullConsumer.setNamesrvAddr(nameserver);// 設置從第一個偏移量開始消費pullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);}return pullConsumer;}/*** push方式訂閱消費* * @param topicName*/public void pushConsumer(String topicName) {pushConsumer = this.getPushConsumer();if (null != pushConsumer) {try {pushConsumer.subscribe(topicName, "*");// 注冊回調實現類來處理從broker拉取回來的消息pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// 消息處理邏輯log.info("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 標記該消息已經被成功消費, 根據消費情況,返回處理狀態return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 啟動消費者實例pushConsumer.start();} catch (MQClientException e) {log.error("push MQClientException:" + e.getMessage());}}}/*** pull方式訂閱消費* * @param topicName*/public void pullConsumer(String topicName) {pullConsumer = this.getPullConsumer();if (null != pullConsumer) {try {// 訂閱topicpullConsumer.subscribe(topicName, "*");// 啟動消費者實例pullConsumer.start();} catch (MQClientException e) {log.error(" pull MQClientException:" + e.getMessage());}try {log.info("Consumer Started.%n");while (true) {// 拉取消息List<MessageExt> messageExts = pullConsumer.poll();log.info("%s%n", messageExts);}} finally {pullConsumer.shutdown();}}}}