Lison
<dreamlison@163.com>
, v1.0.0
, 2024.4.20
Spring Cloud Alibaba-07-RocketMQ消息驅動
文章目錄
- Spring Cloud Alibaba-07-RocketMQ消息驅動
- MQ簡介
- MQ的應用場景
- 常見的MQ產品
- RocketeMQ的架構及概念
- RocketMQ入門
- RocketMQ環境搭建
- SpringBoot 集成 RocketMQ
MQ簡介
消息隊列(Message Queue)簡稱 MQ,是一種跨進程的通信機制,通常用于應用程序間進行數據的異步傳輸,MQ 產品在架構中通常也被叫作“消息中間件”。它的最主要職責就是保證服務間進行可靠的數據傳輸,同時實現服務間的解耦。
MQ的應用場景
-
應用解耦
在電商平臺中,用戶下訂單需要調用訂單系統,此時訂單系統還需要調用庫存系統、支付系統、物流系統完成業務。此時會產生兩個問題:
1 如果庫存系統出現故障,會造成整個訂單系統崩潰。
2 如果需求修改,新增了一個X系統,此時必須修改訂單系統的代碼。
如果在系統中引入MQ,即訂單系統將消息先發送到MQ中,MQ再轉發到其他系統,則會解決以下問題:
1、由于訂單系統只發消息給MQ,不直接對接其他系統,如果庫存系統出現故障,不影響整個訂單。
2、如果需求修改,新增了一個X系統,此時無需修改訂單系統的代碼,只需修改MQ將消息發送給X系統即可。
-
異步提速
如果訂單系統同步訪問每個系統,則用戶下單等待時長如下:
如果引入MQ,則用戶等待時間如下:
-
消峰填谷
假設我們的系統每秒只能承載1000請求,如果請求瞬間增多到每秒 5000,則會造成系統崩潰。此時引入mq即可解決該問題
使用了MQ之后,限制消費消息的速度為1000,這樣一來,高峰期產生的數據勢必會被積壓在MQ中,高峰就被“削”掉了,但是因為消息積壓,在高峰期過后的一段時間內,消費消息的速度還是會維持 在1000,直到消費完積壓的消息,這就叫做“填谷”。
流量削峰也是消息隊列 MQ 的常用場景,一般在秒殺或團隊搶購(高并發)活動中使用廣泛。
在秒殺或團隊搶購活動中,由于用戶請求量較大,導致流量暴增,秒殺的應用在處理如此大量的訪問流 量后,下游的通知系統無法承載海量的調用量,甚至會導致系統崩潰等問題而發生漏通知的情況。為解 決這些問題,可在應用和下游通知系統之間加入消息隊列 MQ。秒殺處理流程如下所述:
- 用戶發起海量秒殺請求到秒殺業務處理系統。
- 秒殺處理系統按照秒殺處理邏輯將滿足秒殺條件的請求發送至消息隊列 MQ。
- 下游的通知系統訂閱消息隊列 MQ 的秒殺相關消息,再將秒殺成功的消息發送到相應用戶。
- 用戶收到秒殺成功的通知。
常見的MQ產品
目前業界有很多MQ產品,比較出名的有下面這些:
ZeroMQ
號稱最快的消息隊列系統,尤其針對大吞吐量的需求場景。擴展性好,開發比較靈活,采用C語言 實現,實際上只是一個socket庫的重新封裝,如果做為消息隊列使用,需要開發大量的代碼。 ZeroMQ僅提供非持久性的隊列,也就是說如果down機,數據將會丟失。
RabbitMQ
使用erlang語言開發,性能較好,適合于企業級的開發。但是不利于做二次開發和維護。 ActiveMQ
歷史悠久的Apache開源項目。已經在很多產品中得到應用,實現了JMS1.1規范,可以和spring- jms輕松融合,實現了多種協議,支持持久化到數據庫,對隊列數較多的情況支持不好。
RocketMQ
阿里巴巴的MQ中間件,由java語言開發,性能非常好,能夠撐住雙十一的大流量,而且使用起來 很簡單。
Kafka
Kafka是Apache下的一個子項目,是一個高性能跨語言分布式Publish/Subscribe消息隊列系統, 相對于ActiveMQ是一個非常輕量級的消息系統,除了性能非常好之外,還是一個工作良好的分布 式系統。
RocketeMQ的架構及概念
RocketMQ 有很多優秀的特性,在可用性方面,RocketMQ 強調集群無單點,任意一點高可用,客戶端具備負載均衡能力,可以輕松實現水平擴容;在性能方面,在天貓雙 11 大促背后的億級消息處理就是通過 RocketMQ 提供的保障;在 API 方面,提供了豐富的功能,可以實現異步消息、同步消息、順序消息、事務消息等豐富的功能,能滿足大多數應用場景;在可靠性方面,提供了消息持久化、失敗重試機制、消息查詢追溯的功能,進一步為可靠性提供保障。
了解 RocketMQ 的諸多特性后,咱們來理解 RocketMQ 幾個重要的概念:
- 消息模型(Message Model):RocketMQ主要由 Producer、Broker、Consumer 三部分組成,其中Producer 負責生產消息,Consumer 負責消費消息,Broker 負責存儲消息。Broker 在實際部署過程中對應一臺服務器,每個 Broker 可以存儲多個Topic的消息,每個Topic的消息也可以分片存儲于不同的 Broker。Message Queue 用于存儲消息的物理地址,每個Topic中的消息地址存儲于多個 Message Queue 中。ConsumerGroup 由多個Consumer 實例構成。
- 消息生產者(Producer):負責生產消息,一般由業務系統負責生產消息。一個消息生產者會把業務應用系統里產生的消息發送到broker服務器。RocketMQ提供多種發送方式,同步發送、異步發送、順序發送、單向發送。同步和異步方式均需要Broker返回確認信息,單向發送不需要。
- 消息消費者(Consumer):負責消費消息,一般是后臺系統負責異步消費。一個消息消費者會從Broker服務器拉取消息、并將其提供給應用程序。從用戶應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費。
- 生產者組(Producer Group):同一類Producer的集合,這類Producer發送同一類消息且發送邏輯一致。如果發送的是事物消息且原始生產者在發送之后崩潰,則Broker服務器會聯系同一生產者組的其他生產者實例以提交或回溯消費。
- 消費者組(Consumer Group):同一類Consumer的集合,這類Consumer通常消費同一類消息且消費邏輯一致。消費者組使得在消息消費方面,實現負載均衡和容錯的目標變得非常容易。要注意的是,消費者組的消費者實例必須訂閱完全相同的Topic。RocketMQ 支持兩種消息模式:集群消費(Clustering)和廣播消費(Broadcasting)。
- 代理服務器(Broker Server):消息中轉角色,負責存儲消息、轉發消息。代理服務器在RocketMQ系統中負責接收從生產者發送來的消息并存儲、同時為消費者的拉取請求作準備。代理服務器也存儲消息相關的元數據,包括消費者組、消費進度偏移和主題和隊列消息等。
- 名字服務(Name Server):名稱服務充當路由消息的提供者。生產者或消費者能夠通過名字服務查找各主題相應的Broker IP列表。多個Namesrv實例組成集群,但相互獨立,沒有信息交換。
- 主題(Topic):表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬于一個主題,是RocketMQ進行消息訂閱的基本單位。
- 標簽(Tag):為消息設置的標志,用于同一主題下區分不同類型的消息。來自同一業務單元的消息,可以根據不同業務目的在同一主題下設置不同標簽。標簽能夠有效地保持代碼的清晰度和連貫性,并優化RocketMQ提供的查詢系統。消費者可以根據Tag實現對不同子主題的不同消費邏輯,實現更好的擴展性。
- 消息(Message):消息系統所傳輸信息的物理載體,生產和消費數據的最小單位,每條消息必須屬于一個主題。RocketMQ中每個消息擁有唯一的Message ID,且可以攜帶具有業務標識的Key。系統提供了通過Message ID和Key查詢消息的功能。
- 拉取式消費(Pull Consumer):Consumer消費的一種類型,應用通常主動調用Consumer的拉消息方法從Broker服務器拉消息、主動權由應用控制。一旦獲取了批量消息,應用就會啟動消費過程。
- 推動式消費(Push Consumer):Consumer消費的一種類型,該模式下Broker收到數據后會主動推送給消費端,該消費模式一般實時性較高。
- 集群消費(Clustering):集群消費模式下,相同Consumer Group的每個Consumer實例平均分攤消息。
- 廣播消費(Broadcasting):廣播消費模式下,相同Consumer Group的每個Consumer實例都接收全量的消息。
- 普通順序消息(Normal Ordered Message):普通順序消費模式下,消費者通過同一個消費隊列收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。
- 嚴格順序消息(Strictly Ordered Message):嚴格順序消息模式下,消費者收到的所有消息均是有順序的。
官方文檔地址:https://rocketmq.apache.org/zh/docs/
RocketMQ入門
RocketMQ 是一款分布式消息隊列中間件,官方地址為http://rocketmq.apache.org/,目前最新版本為4.8.0。RocketMQ 最初設計是為了滿足阿里巴巴自身業務對異步消息傳遞的需要,在 3.X 版本后正式開源并捐獻給 Apache,目前已孵化為 Apache 頂級項目,同時也是國內使用最廣泛、使用人數最多的 MQ 產品之一
RocketMQ環境搭建
接下來我們先在linux平臺下安裝一個RocketMQ的服務,本文使用 docker-compose 安裝RocketMq
創建docker文件夾
mkdir rocketmq
cd rocketmq
mkdir data
cd data
mkdir -p brokerconf logs store
在rocketmq文件夾下創建docker-compose.yml文件
## rocketmq
version: '3.8'
services:rmqnamesrv1:image: apache/rocketmq:4.9.3container_name: rmqnamesrv1ports:- 9876:9876volumes:- /Users/lison/work/data/dockerData/rocketmq/rmqnamesrv1/data/logs:/opt/logs- /Users/lison/work/data/dockerData/rocketmq/rmqnamesrv1/data/store:/opt/storecommand: sh mqnamesrv networks:- nt_devrmqnamesrv2:image: apache/rocketmq:4.9.3container_name: rmqnamesrv2ports:- 9877:9876volumes:- /Users/lison/work/data/dockerData/rocketmq/rmqnamesrv2/data/logs:/opt/logs- /Users/lison/work/data/dockerData/rocketmq/rmqnamesrv2/data/store:/opt/storecommand: sh mqnamesrv networks:- nt_dev rmqbroker1:image: apache/rocketmq:4.9.3container_name: rmqbroker1ports:- 10911:10911volumes:- /Users/lison/work/data/dockerData/rocketmq/rmqbroker1/data/logs:/opt/logs- /Users/lison/work/data/dockerData/rocketmq/rmqbroker1/data/store:/opt/store- /Users/lison/work/data/dockerData/rocketmq/rmqbroker1/brokerconf/broker.conf:/etc/rocketmq/broker.confenvironment:NAMESRV_ADDR: "rmqnamesrv1:9876;rmqnamesrv2:9876"JAVA_OPTS: " -Duser.home=/opt"JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"command: sh mqbroker -c /etc/rocketmq/broker.conf autoCreateTopicEnable=true &depends_on:- rmqnamesrv1- rmqnamesrv2networks:- nt_dev rmqbroker2:image: apache/rocketmq:4.9.3container_name: rmqbroker2ports:- 10912:10911volumes:- /Users/lison/work/data/dockerData/rocketmq/rmqbroker2/data/logs:/opt/logs- /Users/lison/work/data/dockerData/rocketmq/rmqbroker2/data/store:/opt/store- /Users/lison/work/data/dockerData/rocketmq/rmqbroker2/brokerconf/broker.conf:/etc/rocketmq/broker.confenvironment:NAMESRV_ADDR: "rmqnamesrv1:9876;rmqnamesrv2:9876"JAVA_OPTS: " -Duser.home=/opt"JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"command: sh mqbroker -c /etc/rocketmq/broker.conf autoCreateTopicEnable=true &depends_on:- rmqnamesrv1- rmqnamesrv2networks:- nt_dev rmqbroker3:image: apache/rocketmq:4.9.3container_name: rmqbroker3ports:- 10913:10911volumes:- /Users/lison/work/data/dockerData/rocketmq/rmqbroker3/data/logs:/opt/logs- /Users/lison/work/data/dockerData/rocketmq/rmqbroker3/data/store:/opt/store- /Users/lison/work/data/dockerData/rocketmq/rmqbroker3/brokerconf/broker.conf:/etc/rocketmq/broker.confenvironment:NAMESRV_ADDR: "rmqnamesrv1:9876;rmqnamesrv2:9876"JAVA_OPTS: " -Duser.home=/opt"JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"command: sh mqbroker -c /etc/rocketmq/broker.conf autoCreateTopicEnable=true &depends_on:- rmqnamesrv1- rmqnamesrv2networks:- nt_devrmqconsole:image: styletang/rocketmq-console-ngcontainer_name: rmqconsoleports:- 8080:8080environment:JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv1:9876;rmqnamesrv2:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"depends_on:- rmqbroker1- rmqbroker2- rmqbroker3networks:- nt_devnetworks:nt_dev:external: truedriver: bridge
brokerconf下新建broker.conf文件并存儲
創建文件
/Users/lison/work/data/dockerData/rocketmq/rmqbroker1/brokerconf/broker.conf
brokerClusterName = DefaultCluster
#broker名稱
brokerName = rmqbroker1
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
/Users/lison/work/data/dockerData/rocketmq/rmqbroker2/brokerconf/broker.conf
brokerClusterName = DefaultCluster
#broker名稱
brokerName = rmqbroker2
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
/Users/lison/work/data/dockerData/rocketmq/rmqbroker3/brokerconf/broker.conf
brokerClusterName = DefaultCluster
#broker名稱
brokerName = rmqbroker3
brokerId = 2
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
保存上邊配置,執行docker-compose
docker-compose up -d
docker ps 查看是否都啟動了,如果都啟動了,在成功,如果有沒有啟動成功,則可以查看docker日志,一般都是,ip設置問題。
打開對應的對口之后可以通過瀏覽器控制臺進行查看
SpringBoot 集成 RocketMQ
1、使用Java代碼來演示消息的發送和接收,加入依賴
<!-- RocketMQ客戶端,版本與Broker保持一致 --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version></dependency><!-- 也可直接定義指定版本 <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.5.2</version></dependency>-->
2、配置應用 application.yml。
#rocketmq配置
rocketmq:enhance:# 啟動隔離,用于激活配置類EnvironmentIsolationConfig# 啟動后會自動在topic上拼接激活的配置文件,達到自動隔離的效果enabledIsolation: true# 隔離環境名稱,拼接到topic后,topic_dev,默認空字符串environment: devtopic: springboot-mqname-server: 127.0.0.1:9876# 生產者配置producer:# 發送同一類消息的設置為同一個group,保證唯一group: rocketmq-pro-group# 發送消息超時時間,默認3000sendMessageTimeout: 30000# 發送消息失敗重試次數,默認2retryTimesWhenSendFailed: 10# 異步消息重試此處,默認2retryTimesWhenSendAsyncFailed: 10# 消息最大長度 默認1024*4(4M)maxMessageSize: 4096# 是否在內部發送失敗時重試另一個broker,默認falseretryNextServer: false# 壓縮消息閾值,默認4k(1024 * 4)compressMessageBodyThreshold: 4096consumer:group: rocketmq-consumer-group
package com.ruipeng.service;
//發送短信的服務
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "rocketmq-consumer-group", topic = "springboot-mq")
public class SmsService implements RocketMQListener<Order> { @Override public void onMessage(Order order) {log.info("收到一個信息{},接下來發送短信", JSON.toJSONString(order)); }}
//測試
@RunWith(SpringRunner.class)
@SpringBootTest(classes = OrderApplication.class)
public class MessageTypeTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;//同步消息@Testpublic void testSyncSend() {
//參數一: topic, 如果想添加tag 可以使用"topic:tag"的寫法 //參數二: 消息內容SendResult sendResult =rocketMQTemplate.syncSend("test-topic-1", "這是一條同步消息");System.out.println(sendResult);}//異步消息@Testpublic void testAsyncSend() throws InterruptedException {public void testSyncSendMsg () {
//參數一: topic, 如果想添加tag 可以使用"topic:tag"的寫法
//參數二: 消息內容
//參數三: 回調函數, 處理返回結果 rocketMQTemplate.asyncSend("test-topic-1", "這是一條異步消息", new SendCallback() {@Overridepublic void onSuccess (SendResult sendResult){System.out.println(sendResult);}@Overridepublic void onException (Throwable throwable){System.out.println(throwable);}});
//讓線程不要終止 Thread.sleep(30000000); }}//單向消息@Testpublic void testOneWay() {rocketMQTemplate.sendOneWay("test-topic-1", "這是一條單向消息");}
}
三種發送方式的對比
發送方式 | 發送 TPS | 發送結果反饋 | 可靠性 |
---|---|---|---|
同步發送 | 快 | 有 | 不丟失 |
異步發送 | 快 | 有 | 不丟失 |
單向發送 | 最快 | 無 | 可能丟失 |
消費主義細節:
@RocketMQMessageListener(
consumerGroup = "shop",//消費者分組
topic = "order-topic",//要消費的主題
consumeMode = ConsumeMode.CONCURRENTLY, //消費模式:無序和有序 messageModel = MessageModel.CLUSTERING, //消息模式:廣播和集群,默認是集群
)
public class SmsService implements RocketMQListener<Order> {}
RocketMQ支持兩種消息模式:
- 廣播消費: 每個消費者實例都會收到消息,也就是一條消息可以被每個消費者實例處理;
- 集群消費: 一條消息只能被一個消費者實例消費