MQ是什么
MQ全稱為Message Queue,即消息隊列 ,是一種提供消息隊列服務的中間件,也稱為消息中間件,是一套提供了消息生 產、存儲、消費全過程的軟件系統,遵循FIFO原則。
MQ的好處有哪些
- 異步解耦
最常見的一個場景是用戶注冊后,需要發送注冊郵件和短信通知,以告知用戶注冊成功。傳統的做法有以下兩種:
數據流動如下所述:
1.注冊頁面填寫賬號和密碼并提交注冊信息,這些注冊信息首先會被寫入注冊系統。
2.注冊信息寫入注冊系統成功后,再發送請求至郵件通知系統。郵件通知系統收到請求后向用戶發送郵件通知。
3.郵件通知系統接收注冊系統請求后再向下游的短信通知系統發送請求。短信通知系統收到請求后向用戶發送短信通知。
以上三個任務全部完成后,才返回注冊結果到客戶端,用戶才能使用賬號登錄。假設每個任務耗時分別為 50ms,則用戶需要在注冊頁面等待總共 150ms 才能登錄。
并行形式:
對于用戶來說,注冊功能實際只需要注冊系統存儲用戶的賬戶信息后,該用戶便可以登錄,后續的注冊短信和郵件不是即時需要關注的步驟。
對于注冊系統而言,發送注冊成功的短信和郵件通知并不一定要綁定在一起同步完成,所以實際當數據寫入注冊系統后,注冊系統就可以把其他的操作放入對應的 RocketMQ 中然后馬上返回用戶結果,由 RocketMQ 異步地進行這些操作。
- 削峰填谷
簡單來說就是當遇到秒殺等業務時,用戶訪問量大增,這時候可以使用MQ,將消息存入MQ當中這樣就可以減少秒殺等高訪問量場景下的造成的影響了 - 分布式定時/延時調度
RocketMQ 提供精確度到秒級的分布式定時消息能力(5.0架構后),可廣泛應用于訂單超時中心處理、分布式延時調度系統等場景。
使用 RocketMQ 定時消息有如下優勢:
-
定時精度高、開發門檻低:消息定時時間不存在階梯間隔,可以輕松實現任意精度事件觸發,無需業務去重。
-
高性能、可擴展:傳統的定時實現方案較為復雜,需要進行數據庫掃描,容易遇到性能瓶頸的問題,RocketMQ 可以基于定時消息特性完成事件驅動,實現百萬級消息 TPS 能力。
什么是RocketMQ
RocketMQ 是一個開源的分布式消息中間件,由阿里巴巴開發并貢獻給 Apache 軟件基金會。它主要用于高吞吐量、低延遲的消息傳遞需求。
RocketMQ 的優點和功能是比較多的,以下是 一些主要特點和功能:
-
高吞吐量和低延遲:RocketMQ 設計用于處理大量的消息,并提供低延遲的消息傳遞服務,適合需要高性能的場景。
-
分布式架構:RocketMQ 使用分布式架構來支持大規模的消息傳遞。它可以水平擴展,以處理更大的數據量和更高的并發需求。
-
消息可靠性:RocketMQ 支持消息持久化和多副本機制,確保在系統故障時不會丟失消息。這使得消息的可靠性和一致性得到了保障。
-
高可用性和容錯:RocketMQ 提供了高可用性的解決方案,包括多主多從等架構方案,確保系統的穩定性和連續性。
官網寫的很詳細,架構、基本概念(主題、隊列、生產者、消費者、NameServer、Beroker 等)、工作原理等。推薦大家學習一波:https://rocketmq.apache.org/zh/docs
RocketMQ架構
RocketMQ架構上主要分為四部分
1.1.Producer
消息發布的角色,支持分布式集群方式部署。Producer通過nameserver的負載均衡模塊選擇相應的Broker集群隊列進行消息投遞,投遞的過程支持快速失敗并且低延遲。
1.2.Consumer
消息消費的角色,支持分布式集群方式部署。支持以push推,pull拉兩種模式對消息進行消費。同時 也支持集群方式和廣播方式的消費,它提供實時消息訂閱機制,可以滿足大多數用戶的需求。
1.3.Broker
Broker主要負責消息的存儲、投遞和查詢以及服務高可用保證。
1.4.NameServer
NameServer是一個Broker與Topic路由的注冊中心支持Broker的動態注冊與發現主要包括兩個功能
-
Broker管理
NameServer接受Broker集群的注冊信息并且保存下來作為路由信息的基本數據。然后提供心跳檢測機制,檢查Broker是否還存活。 -
路由信息管理
每個NameServer將保存關于Broker集群的整個路由信息和用于客戶端查詢的隊列信息。然后Producer和Conumser通過NameServer就可以知道整個Broker集群的路由信息,從而進行消息的投遞和消費
Springboot整合
生產者
@Service
public class RocketMQProducer{@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Value("${rocketmq.producer.send-message-timeout}")private Integer messageTimeOut;/*** 發送普通消息* @return*/public SendResult sendMsg(String msgBody){SendResult result = rocketMQTemplate.syncSend("queue_test_topic", MessageBuilder.withPayload(msgBody).build());return result;}/*** 發送異步消息 在SendCallback中可處理相關成功失敗時的邏輯*/public void sendAsyncMsg(String msgBody){rocketMQTemplate.asyncSend("queue_test_topic",MessageBuilder.withPayload(msgBody).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 處理消息發送成功邏輯}@Overridepublic void onException(Throwable e) {// 處理消息發送異常邏輯}});}/*** 發送延時消息<br/>* 在start版本中 延時消息一共分為18個等級分別為:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h<br/>*/public void sendDelayMsg(String msgBody, Integer delayLevel){rocketMQTemplate.syncSend("queue_test_topic",MessageBuilder.withPayload(msgBody).build(),messageTimeOut,delayLevel);}/*** 發送帶tag的消息,直接在topic后面加上":tag"*/public void sendTagMsg(String msgBody){rocketMQTemplate.syncSend("queue_test_topic:tag1",MessageBuilder.withPayload(msgBody).build());}}
消費者
/*** rocketmq 消息監聽,@RocketMQMessageListener中的selectorExpression為tag,默認為**/
@Slf4j
@Component
@RocketMQMessageListener(topic = "queue_test_topic",selectorExpression="*",consumerGroup = "queue_group_test")
public class RocketMQMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {byte[] body = message.getBody();String msg = new String(body, CharsetUtil.UTF_8);log.info("接收到消息:{}", msg);}}
測試
@Controller
public class ProducerController {@Autowiredprivate RocketMQProducer rocketMQProducer;@RequestMapping("/send")@ResponseBodypublic SendResult send(String msg) {//formats: `topicName:tags`return rocketMQProducer.sendMsg(msg);}}