文章目錄
- 什么是 RocketMQ,有哪些使用場景?
- RocketMQ 由哪些?色組成,每個?色作用和特點是什么?
- RocketMQ 中的 Topic 和 JMS 的 queue 有什么區別?
- RocketMQ 消費模式有幾種?
- RocketMQ 的 Consumer 是如何消費消息的?
- Broker 如何處理拉取請求的?
- RocketMQ 如何做負載均衡?
- 消息重復消費
- RocketMQ 如何保證消息不丟失
- 高吞吐量下如何優化生產者和消費者的性能?
- RocketMQ的集群架構是怎樣的
- RocketMQ 的 Broker 有哪幾種集群模式?
- RocketMQ消息積壓會發生什么問題?如何避免?
- RocketMQ 延遲消息是如何實現的?
- RocketMQ 如何處理大量的消息?有哪些優化措施?
- RocketMQ的消息存儲如何進行清理和歸檔?
- RocketMQ 的消息是如何進行存儲的?
- RocketMQ 的事務消息是如何實現的?有什么用途?
- RocketMQ 如何保證消息順序?
- RocketMQ 的廣播消息和集群消息有什么區別?
- RocketMQ 提供了哪些消息過濾的機制?
- RocketMQ 的 Producer 是如何發送消息的?
什么是 RocketMQ,有哪些使用場景?
RocketMQ是阿里開源的一款非常優秀的中間件產品,后捐贈給Apache基金會作為一款孵化技術,僅僅經歷了一年多的時間就成為Apache基金會的頂級項目。RocketMQ是一款分布式、隊列模型的消息中間件,支持分布式事務,天然的支持集群模型、負載均衡、水平擴展能力,億級別的消息堆積能力。
RocketMQ的使用場景包括:
- 異步通信:RocketMQ 可以在不同的應用程序之間進行異步通信,從而提高系統的可伸縮性和響應速度。同時減少多個模塊之間的依賴性,使整個系統更加靈活并易于維護。
- 應用解耦:通過RocketMQ作為中介,生產方與消費方通過消息進行交互,減少多個模塊之間的依賴性,使整個系統更加靈活并易于維護。
- 削峰填谷:RocketMQ 可以用于平滑處理流量峰值,將請求緩沖并逐漸處理,以防止系統過載。例如,在大型活動(如秒殺、搶紅包、企業開門紅等)中,通過RocketMQ的削峰填谷能力,平穩流量峰值,避免系統壓力過大。
- 保證消息順序:適用于需要保證多條消息處理順序的場景,例如證券交易過程、訂單創建、支付、退款等流程。
- 事件驅動架構:RocketMQ 適用于構建事件驅動的架構,以便快速響應事件和狀態變化。
- 日志收集:統一收集業務日志,供分析系統進行數據分析,消息隊列作為日志數據的中轉站。并且,RocketMQ 的流式計算框架非常適合與大數據框架集成,如 Apache Hadoop 和 Flink 等,用于構建實時數據流處理。
總之,RocketMQ是一個功能強大的消息中間件,適用于各種分布式應用程序和場景,特別是那些需要高性能、低延遲和可靠性的應用。
RocketMQ 由哪些?色組成,每個?色作用和特點是什么?
?色 | 作用 |
---|---|
Nameserver | 無狀態,動態列表;這是和 ZooKeeper 的重要區別之一。ZooKeeper 是有狀態的。 |
Producer | 消息生產者,負責發消息到 Broker。 |
Broker | 就是 MQ 本身,負責收發消息、持久化消息等。 |
Consumer | 消息消費者,負責從 Broker 上拉取消息進行消費,消費完進行 ack。 |
RocketMQ 中的 Topic 和 JMS 的 queue 有什么區別?
queue 就是來源于數據結構的 FIFO 隊列。而 Topic 是個抽象的概念,每個 Topic 底層對 應 N 個 queue,而數據也真實存在 queue 上的。
RocketMQ 消費模式有幾種?
消費模型由 Consumer 決定,消費維度為 Topic。
集群消費
- 一條消息只會被同 Group 中的一個 Consumer 消費
- 多個 Group 同時消費一個 Topic 時,每個 Group 都會有一個 Consumer 消費到數據。
廣播消費
- 消息將對一個 Consumer Group 下的各個 Consumer 實例都消費一遍。即即使這些Consumer 屬于同一個Consumer Group,消息也會被 Consumer Group 中的每個 Consumer 都消費一次。
RocketMQ 的 Consumer 是如何消費消息的?
RocketMQ的Consumer消費消息的方式有兩種:Push方式和Pull方式。
- 在 Push 推模式下,RocketMQ 的 Broker 會主動將消息推送給對應的 Consumer。而 Consumer 會注冊一個 MessageListener 回調函數,并在接收到消息后立即觸發回調函數。
- 在 Pull 拉模式下,Consumer 需要主動向 Broker 定期發送拉取消息的請求,自行完成處理消息,以及向 Broker 返回 ack 信息等步驟。
這兩種模式都有各自的優缺點,適合不同的業務場景。其中:
- Push 模式的優點是,實現比較簡單,客戶端只要注冊回調方法,專心處理業務就可以了。并且消息到達 Broker 后立即會被推送到 Consumer,消息處理比較及時。缺點也比較明顯,消費者需要與 Broker保持長時間的連接,對網絡要求比較高。
- Push 模式的有點是,Consumer 有更大的控制權,可以根據自身處理能力來調整拉取消息的頻率,避免消息積壓。并且拉模式只需要在拉取消息時與 Broker 保持短暫的網絡連接,比較適合那些網絡連接不是很穩定的環境。而對應的缺點則是,Pull 模式下,Consumer 需要自行調整消息拉取的頻率,處理消息就沒有 Push 模式那么及時。另外,Push 模式下,Consumer 的實現相對比較復雜,容易產生消息積壓。
不論是Push方式還是Pull方式,從Broker獲取消息后,Consumer都會進行消費處理。RocketMQ還提供了多種消費策略,如集群消費、廣播消費、并行消費和順序消費等,以滿足不同的業務需求。
Broker 如何處理拉取請求的?
Consumer 首次請求 Broker。
- Broker 中是否有符合條件的消息
- 有
- 響應 Consumer。
- 等待下次 Consumer 的請求。
- 沒有
- DefaultMessageStore#ReputMessageService#run 方法。 - PullRequestHoldService 來 Hold 連接,每個 5s 執行一次檢查pullRequestTable 有沒有消息,有的話立即推送。
- 每隔 1ms 檢查 commitLog 中是否有新消息,有的話寫入到pullRequestTable。
- 當有新消息的時候返回請求。
- 掛起 consumer 的請求,即不斷開連接,也不返回數據。
- 使用 consumer 的 offset。
RocketMQ 如何做負載均衡?
通過 Topic 在多 Broker 中分布式存儲實現。
producer 端
發送端指定 message queue 發送消息到相應的 broker,來達到寫入時的負載均衡
- 提升寫入吞吐量,當多個 producer 同時向一個 broker 寫入數據的時候,性能會下降 - 消息分布在多 broker中,為負載消費做準備
默認策略是隨機選擇:
- producer 維護一個 index
- 每次取節點會自增
- index 向所有 broker 個數取余
- 自帶容錯策略
其他實現:
- SelectMessageQueueByHash
- hash 的是傳入的 args
- SelectMessageQueueByRandom
- SelectMessageQueueByMachineRoom 沒有實現
也可以自定義實現 MessageQueueSelector 接口中的 select 方法
MESSAGEQUEUE SELECT(FINAL LIST<MESSAGEQUEUE> MQS, FINAL MESSAGE MSG, FINAL OBJECT ARG);
consumer 端
采用的是平均分配算法來進行負載均衡。
其他負載均衡算法
平均分配策略(默認)(AllocateMessageQueueAveragely) 、環形分配策略(AllocateMessageQueueAveragelyByCircle) 、手動配置分配策略(AllocateMessageQueueByConfig) 、機房分配策略 (AllocateMessageQueueByMachineRoom)、一致性哈希分配策略 (AllocateMessageQueueConsistentHash)、靠近機房策略 (AllocateMachineRoomNearby)
追問:當消費負載均衡 consumer 和 queue 不對等的時候會發生什么?
Consumer 和 queue 會優先平均分配,如果 Consumer 少于 queue 的個數,則會存在部分Consumer 消費多個queue 的情況,如果 Consumer 等于 queue 的個數,那就是一 個 Consumer 消費一個 queue,如果 Consumer個數大于 queue 的個數,那么會有部分 Consumer 空余出來,白白的浪費了。
消息重復消費
影響消息正常發送和消費的重要原因是網絡的不確定性。
引起重復消費的原因
- ACK
正常情況下在 consumer 真正消費完消息后應該發送 ack,通知 broker 該消息 已正常消費,從 queue 中剔除。
當 ack 因為網絡原因無法發送到 broker,broker 會認為詞條消息沒有被消費, 此后會開啟消息重投機制把消息再次投遞到 consumer- 消費模式
在 CLUSTERING模式下,消息在 broker 中會保證相同 group 的 consumer 消 費一次,但是針對不同 group 的consumer 會推送多次解決方案
- 數據庫表
處理消息前,使用消息主鍵在表中帶有約束的字段中 insert- Map
單機時可以使用 map ConcurrentHashMap -> putIfAbsent guava cache - Redis
分布式鎖搞起來。
RocketMQ 如何保證消息不丟失
首先在如下三個部分都可能會出現丟失消息的情況:
- Producer 端
- Broker 端
- Consumer 端
Producer 端如何保證消息不丟失
- 采取 send()同步發消息,發送結果是同步感知的。
- 發送失敗后可以重試,設置重試次數。默認 3 次。PRODUCER.SETRETRYTIMESWHENSENDFAILED(10);
集群部署,比如發送失敗了的原因可能是當前 Broker 宕機了,重試的時候會發送到其他 Broker 上。
Broker 端如何保證消息不丟失
- 修改刷盤策略為同步刷盤。默認情況下是異步刷盤的,將刷盤方式(FlushDiskType)配置為 Sync 同步刷盤,保證消息盡快寫入到磁盤中,防止 Broker 出現故障造成內存中沒有及時刷如到硬盤的那一部分消息丟失。
- 集群部署,主從模式,高可用。為了防止 Broker 端硬盤出現故障造成消息丟失,給 Broker 配置一個或多個 Slave 從節點,進行消息冗余。同時將消息同步方式配置為 Sync 同步同步。
Consumer 端如何保證消息不丟失
- 完全消費正常后在進行手動 ack 確認。
高吞吐量下如何優化生產者和消費者的性能?
開發
- 同一 group 下,多機部署,并行消費 - 單個 Consumer 提高消費線程個數 - 批量消費
- 消息批量拉取
- 業務邏輯批量處理
運維
- 網卡調優
- jvm 調優
- 多線程與 cpu 調優 - Page Cache
RocketMQ的集群架構是怎樣的
RocketMQ的集群架構包括四個主要角色:Name Server集群、Broker主從集群、Producer和Consumer客戶端。
- Name Server集群是RocketMQ的一種輕量級的服務節點,負責注冊和管理Broker的服務地址,提供服務的注冊和發現功能。每個Broker節點都要跟所有的Name Server節點建立長連接,定義注冊Topic路由信息和發送心跳。每個 NameServer 節點都會保存完整的 Broker 列表數據,并且 NameServer 個個節點之間不會同步數據。因此,NameServer 集群不會因為單個節點發生故障而停止服務。
- Broker 是 RocketMQ 的核心組件,負責存儲和傳輸消息。一個 RocketMQ 集群通常包含多個 Broker 實例,共同協作來提高 RocketMQ 的可用性和吞吐量。其中,Master Broker,主節點,負責處理客戶端的請求,并將消息存儲到磁盤上,然后將消息同步復制給所有的從節點。而從節點,是Master Broker 的消息備份。
- 客戶端包含 Producer 生產者和 Consumer 消費者。其中,Producer 負責將消息發送給 Broker。Producer 可以將消息發送到指定的 Topic,RocketMQ 會負責將消息存放到對應的 Broker 上。Consumer 可以訂閱一個或多個 Topic,并從對應的 Broker 上接收消息進行處理。RocketMQ 的客戶端提供了多種處理消息的方式,比如延遲消息、事務消息、集群消息、廣播消息等。
RocketMQ 的 Broker 有哪幾種集群模式?
RocketMQ的Broker有三種集群模式:
- 單Master模式:只有一個Master節點,其他都是Slave節點。Master節點負責響應客戶端的請求并存儲消息,Slave節點只同步Master節點的消息,也會響應部分客戶端的讀請求。這種模式的優點是簡單易部署,但是存在單點故障的問題,如果Master節點宕機,會導致整個服務不可用。
- Master-Slave模式(經典雙集群部署):一個Master節點對應多個Slave節點,Master和Slave都是獨立的NameServer。Master節點負責響應客戶端請求并存儲消息,Slave節點只同步Master節點的消息,也會響應部分客戶端的讀請求。這種模式的優點是高可用性,即使Master節點宕機,Slave節點可以自動升級為Master節點,繼續提供服務。但是,如果只有一個Master節點,存在單點故障的問題。
- Dledger模式(高可用集群部署):在Master-Slave模式的基礎上增加了Raft協議,實現了自動腦裂后的數據高可靠性。即使某個節點從網絡上掉下來或者宕機后,仍然能夠保證所有的消息不會丟失。這種模式的優點是高可用性和高可靠性,即使某個節點出現故障,也能保證服務的可用性。
總的來說,單Master模式適合測試和開發環境,Master-Slave模式適合生產環境,而Dledger模式適合需要高可靠性的生產環境。
RocketMQ消息積壓會發生什么問題?如何避免?
在RocketMQ中,如果未消費的消息過多,會給集群帶來非常多的問題:
- 消息堆積:消息在Broker端不斷堆積,可能會導致Broker的存儲壓力過大,影響整個系統的性能和穩定性。
- 死信隊列:RocketMQ中的Broker會在一定時間內無法被消費的消息轉換到死信隊列中。如果消息持續堆積,死信隊列的空間有限,一些消息可能會被丟棄,導致數據丟失。
- 消息延遲和積壓:如果消息持續堆積,可能會導致消息的延遲增大,進一步影響系統的響應速度和處理能力。
- 消息丟失:RocketMQ 會定期刪除過期的日志文件。在刪除時,RocketMQ并不會關注過期文件中的消息是否被消費者處理。這就會造成過期日志文件中未被 Consumer 消費的消息丟失。
為了避免這些問題,可以采取以下措施:
- 控制消息發送速度:Producer可以根據Consumer的處理能力,動態地控制消息的發送速度。例如,可以通過監控Consumer的處理情況,調整發送速度,避免消息堆積。
- 增加Consumer:可以增加更多的Consumer來提高消息的消費能力。通過增加Consumer的數量,可以并行處理消息,提高系統的吞吐量。
- 及時處理死信隊列:可以在Broker端配置死信隊列,對無法正常被消費的消息進行捕獲和處理。這樣即使消息被丟棄,也可以通過死信隊列進行恢復和處理。
- 合理配置Broker和Producer/Consumer的參數:可以通過調整Broker和Producer/Consumer的配置參數,如緩存大小、請求超時等,來優化系統的性能和穩定性。
綜合來說,合理的配置和監控是避免生產者速率超過消費者速率的關鍵。根據實際業務需求和資源配置,可以選擇適當的措施來優化消息的處理,以確保消息隊列系統的穩定和高性能。
RocketMQ 延遲消息是如何實現的?
RocketMQ的延遲消息實現是通過在消息發送時設置一個延遲級別,然后消息會被存儲到DelayMessageService中,等待達到指定的延遲時間后再被重新推送到Broker的commitLog服務中。
具體流程如下:
- Producer 將消息投遞到Broker的commitLog服務。
- commitLog服務判斷消息是否為延遲消息,如果是,則將實際的topic和queueId保存到消息的屬性中,并將topic設置成延遲topic(SCHEDULE_TOPIC_XXXX),queueId對應的延遲級別和消息投遞時間保存在tagCode中。
- 消息延遲服務(DelayMessageService)從SCHEDULE_TOPIC_XXXX主題循環拉取消息。
- DelayMessageService根據tagCode找到對應的延遲隊列,并按照延遲級別進行排序。
- 當達到指定的延遲時間后,DelayMessageService會將消息重新推送到commitLog服務。
- commitLog服務將消息推到Producer 指定的目標 Topic 中。
- Consumer從 目標 Topic 中拉取消息。
RocketMQ支持最多18個延遲級別,可以滿足不同延遲時間的需求。
另外,在新版本的 RocketMQ 中,使用時間輪機制,提供了指定任意時間的延遲消息功能。
RocketMQ 如何處理大量的消息?有哪些優化措施?
RocketMQ可以通過以下優化措施來處理大量的消息:
- 增加Broker數量:增加Broker數量可以使得消息在消費端的負載均衡更加靈活,同時可以提高系統的容錯性和可用性。
- 消息生產者的異步發送: 消息生產者可以使用異步發送方式,以降低發送消息的延遲,提高發送吞吐量。異步發送允許生產者在等待確認之前繼續發送更多的消息。
- 使用批量發送和批量消費: RocketMQ 支持批量發送消息和批量消費消息。批量操作可以減少網絡開銷和提高處理效率,特別是在高吞吐量的場景下。使用批量發送和批量消費可以顯著減少網絡負載和提高性能。
- 合理設置消費者數量:增加消費者數量可以顯著提升消費者處理消息的并發能力。但是,消費者數量不能超過對應 Topic 中的 MessageQueue 數量。另外,增加消費者的工作線程數,也能一定程度上提升消費者的處理性能。
- 優化JVM參數:通過調整JVM參數可以使得系統更加穩定,減少因為垃圾回收導致的性能下降。
- 使用硬件加速:可以通過使用SSD等更快的存儲設備來提高I/O性能,從而加快消息的處理速度。
總之,處理大量消息需要綜合考慮多個因素,包括集群配置、性能優化、順序消息處理、分區等。通過合理的配置和優化,可以實現高吞吐量和高性能的消息處理。同時,要根據業務需求和負載情況不斷進行性能監測和調整,以保持系統的穩定性和可伸縮性。
RocketMQ的消息存儲如何進行清理和歸檔?
RocketMQ 提供了消息存儲清理和歸檔的機制,以便管理消息存儲空間,刪除過期消息,并將歷史消息歸檔到其他存儲介質中。這些功能有助于維護消息隊列的性能和可用性。以下是關于 RocketMQ 消息存儲清理和歸檔的主要方面:
- 消息文件刪除策略: RocketMQ 支持多種消息文件刪除策略,可以在配置文件中進行設置。以下是一些常見的策略:
- 定時刪除策略: 您可以配置 RocketMQ 定期刪除過期的消息文件和索引文件。這樣,一旦消息文件中的消息過期,RocketMQ 將自動刪除它們。
- 空間滿策略: 如果存儲磁盤空間達到一定限制,RocketMQ 可以自動刪除最早的消息文件,以釋放磁盤空間。這個策略確保了存儲空間不會無限制地增長。
- 指定時間段刪除策略: 您可以配置 RocketMQ 只刪除特定時間段內的消息文件,以保留歷史消息。
- 消息歸檔: RocketMQ 允許您將歷史消息歸檔到其他存儲介質中,以減小消息服務器的存儲負擔。歸檔通常涉及將消息轉移到長期存儲(如云存儲或本地歸檔系統)中。歸檔可以手動觸發,也可以自動觸發,具體取決于您的需求。
- 歷史消息訪問: 盡管消息被歸檔,RocketMQ 仍然提供了訪問歷史消息的機制。通過合適的歸檔系統或者存儲介質,您可以檢索和訪問歷史消息,以滿足合規性要求或其他業務需求。
需要注意的是,清理和歸檔消息不是 RocketMQ 的核心功能,而是輔助功能。您需要根據自己的需求和業務場景來配置和管理消息的清理和歸檔策略。確保配置合理的清理策略以防止存儲空間耗盡,并根據業務需求進行消息的歸檔操作,以保留歷史消息數據。同時,歸檔后的消息可以根據需要進行合適的檢索和恢復,以滿足特定的數據需求。
RocketMQ 的消息是如何進行存儲的?
RocketMQ是采用分布式存儲的方式來存儲消息的。每個Broker的存儲結構主要包括:CommitLog、ConsumeQueue和IndexFile。
- CommitLog是消息存儲的物理文件,存儲了所有消息的主題、標簽、時間戳等基本信息和消息體。每個Broker上的CommitLog被當前機器上的所有ConsumeQueue共享。
- ConsumeQueue是消息的邏輯隊列,存儲了具有相同屬性(如Topic、隊列ID等)的消息。每個Broker上有多個ConsumeQueue,每個Topic的消息都對應一個ConsumeQueue。Consu meQueue采用順序寫入、隨機讀取的方式存儲消息,同時支持高效的預寫日志和刷盤策略。
- IndexFile是消息索引文件,存儲了消息在CommitLog中的偏移量和消息物理偏移量對應關系,采用Hash索引方式加速定位。
RocketMQ通過這種分布式存儲方式可以高效地存儲和訪問大量消息,同時也具有良好的可擴展性和可靠性。
RocketMQ 的事務消息是如何實現的?有什么用途?
RocketMQ的事務消息是通過兩階段提交(Two-phase Commit)協議實現的。具體實現步驟如下:
- 發送方將半事務消息發送至RocketMQ服務端,由于消息為半事務消息,在未收到生產者對該消息的二次確認前,此消息被標記成“暫不能投遞”狀態,不會被消費。
- 發送方開始執行本地事務邏輯。可能是一系列的數據庫更新、文件寫入等操作,他們要么全部成功,要么全部回滾。
- 發送方根據本地事務執行結果向服務端提交二次確認(Commit 或是 Rollback),服務端收到 Commit 狀態則將半事務消息標記為可投遞,訂閱方最終將收到該消息;服務端收到 Rollback 狀態則刪除半事務消息,訂閱方將不會接受該消息。
- 如果二次確認時,發送方的本地事務沒有執行完成,則可以向服務端返回 Unknown 狀態,服務端收到 Unknown 狀態則會等一段時間后,重新向發送放發起狀態確認。如果發送方多次返回 Unknown 狀態,服務端則會直接丟棄這一條消息。
RocketMQ 的事務消息機制是為了解決分布式事務問題而設計的,它適用于需要確保一系列操作的一致性的場景,如訂單支付、庫存扣減、資金結算等。主要的用途有:
- 分布式事務: 事務消息用于支持分布式系統中的分布式事務。它可以確保涉及多個操作的事務要么全部成功,要么全部失敗,以維護數據的一致性。典型的應用包括訂單支付、庫存扣減、資金結算等。
- 消息可靠性: 事務消息與普通消息一樣,具有消息的可靠性傳遞特性。一旦事務消息被成功發送到 RocketMQ 服務器,它將被存儲和傳遞,不會丟失。
- Exactly Once 語義: RocketMQ 事務消息提供了"Exactly Once"語義,確保消息要么完全提交,要么完全回滾,不會出現部分提交或回滾的情況。
使用事務消息,您可以更好地處理這些復雜的業務流程,確保數據的完整性和一致性。但請注意,事務消息也需要謹慎使用,因為它可能會引入一些復雜性,并影響系統的性能和可伸縮性。
RocketMQ 如何保證消息順序?
RocketMQ 提供了順序消息機制,用來保證一組消息的局部有序性,具體實現步驟如下:
- Producer 在發送消息時,通過設置一個 MessageQueueSelector 方法,將一組有順序的消息,依次發送到對應 Topic 下的同一個 MessageQueue 上。而 MessageQueue 是能夠保證 FIFO 先進先出的,這樣就可以保證一組有順序的消息在 Broker 上是有序的。
- Consumer 在配置 MessageListener 時,需要指定為 MessageListenerOrderly 實現類。這樣就能保證一組有序的消息可以按照發送時的順序進入 Consumer 進行處理,從而保證這一組消息的順序。
但是在使用順序消息時,有幾個需要注意的問題:
- RocketMQ 的順序消息機制只保證一組消息的局部有序性,而并不保證所有消息的全局有序性。應用需要自行定義消息中的一些標識符來確定消息的順序。
- 順序消息機制會給 MessageQueue 添加線程鎖,這會降低 Consumer 的消息吞吐量。
- 如果 Consumer 消費消息出現錯誤,會將整個 MessageQueue 阻塞,而無法單獨重試這一條消息。這樣非常容易導致 Topic產生大量的消息積壓,因此使用順序消息要盡量保證 Consumer 的消息處理正確性。
RocketMQ 的廣播消息和集群消息有什么區別?
廣播消息和集群消息是 RocketMQ 的兩種不同的消息消費模式。其中
- 廣播模式意味著一條消息會被發送到所有訂閱了這個主題 Topic 的消費者,而所有消費者都會收到相同的消息副本。
- 集群模式意味著一條消息只會分發給訂閱了這個主題 Topic 的同一個消費者組中的一個消費者處理。每個消費者組只會處理一次消息。
他們的區別主要提現在實現方式以及適用場景上。
- 集群模式在 Broker 端統一管理每個消費者組的消費進度,對消費進度的管理是嚴格的。這樣,每次消費者服務啟動后,都可以從上一次消費的進度開始開始進行消費。 而廣播模式是交由每個消費者自行管理消費進度,消費進度的管理是不嚴格的,容易產生丟失。當消費者服務啟動后,如果本地的消費進度丟失了,就只能消費到啟動之后的消息,而無法從上一次消費的進度開始消費。因此,廣播模式對于消息的連續性保證是不強的。
- 集群模式適用于大多數常規對消息安全敏感的業務場景,例如訂單處理、庫存管理等。多個消費者協同工作可以提高消息的處理能力并實現消息的負載均衡。而廣播模式適用于一些對消息安全不太敏感的特殊業務場景。例如日記記錄、時間通知等。這些場景下所有的消費者都需要處理相同的消息。
RocketMQ 提供了哪些消息過濾的機制?
RocketMQ消息過濾分為兩種:基于表達式的過濾和基于類模式的過濾。
基于表達式的過濾有兩種模式:TAG模式和SQL92模式。其中,RocketMQ 允許為每一條消息設置一個 Tag 標簽。
- TAG 模式下,Consumer 可以選擇訂閱特定的 TAG,對消息進行過濾。TAG模式根據消息的屬性進行過濾,適合于簡單的場景;SQL92模式可以支持更復雜的邏輯,可以使用SQL92的語法進行過濾。這種過濾方式由于是在 ConsumeQueue 文件中直接進行過濾,所以性能比較高。
- SQL92 模式下,Consumer 可以通過設定一個滿足 SQL92 標準的 SQL 語句,定制相對比較復雜的過濾邏輯,例如數值比較等。同時,也可以引用消息中添加的其他自定義屬性,定制多維度的過濾條件組合。
基于類模式的過濾是使用用戶自定義的過濾器類來實現消息過濾。消費者可以在消息監聽器中編寫自定義邏輯來實現更復雜的消息過濾機制。在這種模式下,消費者可以完全控制消息的過濾邏輯,適用于需要導讀定制和特殊處理的場景。但是同樣,對Consumer 的編碼能力要求更高。
RocketMQ 在進行消息過濾時,都會將消息過濾的邏輯上推到 Broker 端執行。這樣可以減少不必要的網絡數據傳遞。但是,同時也會給 Broker 增加業務復雜性。因此,客戶端需要根據不同的業務場景,選擇合適的過濾機制。
RocketMQ 的 Producer 是如何發送消息的?
RocketMQ的Producer有三種消息發送模式,RocketMQ允許在不同的場景下使用不同的消息發送模式,以滿足不同的業務需求。:
- 同步發送(Sync Send):這是默認的發送模式。在同步發送模式下,發送者發送一條消息后會等待 Broker 的響應,直到 Broker 確認收到消息并返回結果。如果發送失敗,將會拋出異常。這種模式下,Producer 可以確保消息成功發送到了 Broker,消息安全性更高。但是,由于需要等待 Broker 的響應,可能會引起較大的發送消息延遲。
- 異步發送(Async Send):這種模式下,Producer發送消息后不會等待Broker的響應,而是繼續執行后續操作。Producer可以注冊回調函數,在消息發送完成后Broker會異步調用回調函數。這種模式可以提高 Producer 的吞吐量和響應速度,但是,Producer 需要在回調函數中自行處理 Broker 的消息響應,對客戶端代碼要求較高。并且,在這種模式下,如果消息發送出現問題,Producer 只能通過回調函數處理,這樣 Producer 處理消息錯誤的時機是有延遲的。
- 單向發送(Oneway Send):這種模式下,Producer 發送消息后不關心消息是否被 Broker 成功接收和存儲,也不等待 Broker 的響應。這樣,Producer 發送消息的效率是最高的。但是,消息的安全性就無法保證。這種模式通常適用于那些強調消息吞吐量而不關心消息可靠性的場景,例如日志消息。