Apache RocketMQ 消息過濾的實現原理與騰訊云的使用實踐

導語

本文將系統闡述 Apache RocketMQ 消息過濾機制的技術架構與實踐要點。首先從業務應用場景切入,解析消息過濾的核心價值;接著介紹 Apache RocketMQ 支持的兩種消息過濾實現方式,幫助讀者建立基礎認知框架;隨后深入剖析 SQL 語法過濾與標簽(Tag)過濾的技術實現的核心原理以及規則限制;最后介紹騰訊云在消息過濾性能優化方面的具體實踐。

消息過濾的應用場景

消息過濾功能指消息生產者向 Topic 中發送消息時,設置消息屬性對消息進行分類,消費者訂閱 Topic 時,根據消息屬性設置過濾條件對消息進行過濾,只有符合過濾條件的消息才會被投遞到消費端進行消費。

消費者訂閱 Topic 時若未設置過濾條件,無論消息發送時是否有設置過濾屬性,Topic 中的所有消息都將被投遞到消費端進行消費。

消息過濾功能可以應用在如下場景:

用戶想使用一個 Topic,但是被多個 Group 訂閱,每個 Group 只想消費其中一部分消息。

默認情況下,消費組1和消費組2 會全量消費 Topic 里面的所有消息。但如果我們想選擇性的消費里面一些消息的時候,就可以使用消息過濾功能對消息進行區分過濾。

消息過濾原理介紹

目前消息過濾主要支持兩種過濾方式,分別是 SQL 過濾和 Tag 過濾。其核心邏輯都是在發送消息的時候,設置一些自定義字段,然后通過消費組訂閱的時候指定對應的過濾表達式,消息在服務端進行過濾后,才被消費組消費。

Tag 過濾

代碼示例

在介紹原理之前,我們先直觀的看一下用法,以 RocketMQ 4.x 的 SDK 為例:

// 創建消息生產者
DefaultMQProducer producer = ClientCreater.createProducer(GROUP_NAME);
// 創建消息實例,設置topic和消息內容,設置Tag1
Message msg = new Message(TOPIC_NAME, "Tag1", "Hello RocketMQ.".getBytes(StandardCharsets.UTF_8));
// 發送消息
SendResult sendResult = producer.send(msg, 3000);
System.out.println(sendResult + ":" + new String(msg.getBody()));
producer.shutdown();

在發送消息的時候,我們可以給消息指定 Tag。而在消費組這一側,我們可以訂閱不同的 Tag,例如使用星號(*)匹配全部 Tag。

// 創建消息消費者
DefaultMQPushConsumer pushConsumer = ClientCreater.createPushConsumer(GROUP_NAME);
// 訂閱topic 訂閱所有的TAG
pushConsumer.subscribe(TOPIC_NAME, "*");
//訂閱指定的TAG
pushConsumer.subscribe(TOPIC_NAME, "Tag1");
//訂閱多個TAG
pushConsumer.subscribe(TOPIC_NAME, "Tag1||Tag2");
// 省略其他代碼
核心原理

那么,這個功能在底層究竟是怎么實現的呢?我們都知道,RocketMQ 在存儲消息的時候,會先把消息寫入 CommitLog。CommitLog 可以看作是 RocketMQ 存儲消息的一個大日志,所有消息都會先寫到這里。

寫完 CommitLog 之后,RocketMQ 還會把消息的相關信息再寫入ConsumeQueue。ConsumeQueue 可以理解為是 CommitLog 的一個索引,它里面存儲的不是完整的消息內容,而是消息的一些關鍵信息,方便消費者快速找到和讀取消息。

具體來說,當 RocketMQ 寫入一條原始消息到 CommitLog 之后,它會提取這條消息的一些重要信息,比如這條消息在 CommitLog 里的物理偏移量(Offset)、消息的大小,還有這條消息 Tag 的哈希碼(Hashcode),然后把這些信息寫入到 ConsumeQueue 中。這樣一來,消費者就可以通過 ConsumeQueue 快速定位到 CommitLog 里對應的消息,而不需要每次都去遍歷整個 CommitLog,大大提高了消息消費的效率。

在發送消息的的時候,RocketMQ 會將消息 Tag 的 Hashcode 寫入到 ConsumeQueue 字段中。

msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
//上面這個方法
public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) {if (Strings.isNullOrEmpty(tags)) {return 0; }return tags.hashCode();
}

當用戶在消費消息時,服務端會從 ConsumeQueue 中一條一條地檢查消息,并對這些消息進行過濾。過濾的時候,服務端會用 ConsumeQueue 中存儲的消息 Tag 的 Hashcode,和當前訂閱組所訂閱的 Tag 進行匹配。這個匹配的過程,是在 RocketMQ 的核心代碼 org.apache.rocketmq.store.DefaultMessageStore#getMessage 方法里實現的。簡單來說,就是服務端會根據訂閱組訂閱的 Tag,從 ConsumeQueue 中找出符合條件的消息,然后交給用戶去消費。

if (messageFilter != null && !messageFilter.isMatchedByConsumeQueue(cqUnit.getValidTagsCodeAsLong(), cqUnit.getCqExtUnit())) {//省略其他代碼continue;
}

匹配邏輯

@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
if (null == tagsCode || null == subscriptionData) {return true;
}
if (subscriptionData.isClassFilterMode()) {return true;
}
return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)||     subscriptionData.getCodeSet().contains(tagsCode.intValue());
}

可以看到,只有當訂閱的 Tag 是 “*”(表示訂閱所有消息),或者消息的 Tag 和訂閱的 Tag 匹配上的時候,消費者才能消費到這條消息。這里您可能會有疑問,為啥要用 CodeSet 呢?其實是因為這里會把用戶訂閱組訂閱的 Tag 進行拆分,然后把這些拆分后的 Tag 放到 CodeSet 里。這樣在匹配的時候,就可以快速判斷消息的 Tag 是否在 CodeSet 中,從而決定這條消息能不能被消費。

org.apache.rocketmq.remoting.protocol.filter.FilterAPI#buildSubscriptionData(java.lang.String, java.lang.String)
String[] tags = subString.split("\\|\\|");
if (tags.length > 0) {Arrays.stream(tags).map(String::trim).filter(tag -> !tag.isEmpty()).forEach(tag -> {subscriptionData.getTagsSet().add(tag);subscriptionData.getCodeSet().add(tag.hashCode());});
} else {throw new Exception("subString split error");
}

然后,CodeSet 里面存的就是多個 Hashcode 。

這里您可能又會有疑問:要是在服務端通過哈希碼來匹配,那萬一兩個不同的 Tag 經過哈希運算之后,得到的哈希碼是一樣的,這樣不就匹配錯了嗎?沒錯,這種情況確實有可能發生。

為了避免這種問題,客戶端還會再做一層過濾,使用真正的 Tag 字符串再過濾一次,這樣就能保證最終消費到的消息一定是符合訂閱要求的,不會出現因為哈希沖突而導致的匹配錯誤。

org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#processPullResult
List<MessageExt> msgListFilterAgain = msgList;
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {msgListFilterAgain = new ArrayList<>(msgList.size());for (MessageExt msg : msgList) {if (msg.getTags() != null) {if (subscriptionData.getTagsSet().contains(msg.getTags())) {msgListFilterAgain.add(msg);}}}
}

綜上所述,我們就得到了這樣一個 Tag 過濾的工作流程。

規則限制
  • 發送消息只能設置一個 Tag。

  • 多個 Tag 之間為或的關系,不同 Tag 間使用兩個豎線(||)隔開。例如,Tag1||Tag2||Tag3,表示標簽為 Tag1 或 Tag2 或 Tag3 的消息都滿足匹配條件,都會被發送給消費者進行消費。

  • 多個 Tag 的順序也要保持一致,否則會導致訂閱關系不一致,例如 Tag1||Tag2 和 Tag2||Tag1 就是不同的。

SQL 過濾

從上面可以看到,Tag 過濾比較簡單,通過在 ConsumeQueue 直接進行匹配,效率比較高,但是能支持的消息過濾比較簡單,如果想通過消息的某個擴展字段來進行匹配,做一些更復雜的邏輯,就需要使用 SQL 過濾了。

代碼示例

在發送方我們可以設置一下 putUserProperty,來擴展字段。

public static void main(String[] args) throws Exception {// 創建消息生產者DefaultMQProducer producer = ClientCreater.createProducer(GROUP_NAME);// 創建消息實例,設置topic和消息內容,設置自定義屬性Message msg = new Message(TOPIC_NAME, "Hello RocketMQ.".getBytes(StandardCharsets.UTF_8));msg.putUserProperty("key1","value1");// 發送消息SendResult sendResult = producer.send(msg, 3000);System.out.println(sendResult + ":" + new String(msg.getBody()));producer.shutdown();
}

當消費組去消費消息的時候,它可以用 SQL 表達式來精準地篩選消息。例如我們可以設定條件,像 “Key1 必須等于 Value1”,或者設置更復雜的條件,用 “AND” 和 “OR” 這些邏輯運算符把多個條件組合起來。SQL 過濾能按照設置的條件,精確地過濾出符合條件的消息。

public static void main(String[] args) throws Exception {// 創建消息消費者DefaultMQPushConsumer pushConsumer = ClientCreater.createPushConsumer(GROUP_NAME);//訂閱所有消息pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("True"));// 訂閱topic 訂閱單個key的sqlpushConsumer.subscribe(TOPIC_NAME,MessageSelector.bySql("key1 IS NOT NULL AND key1='value1'"));//訂閱多個屬性pushConsumer.subscribe(TOPIC_NAME,MessageSelector.bySql("key1 IS NOT NULL AND key2 IS NOT NULL  AND key1='value1' AND key2='value2'"));// 省略其他代碼
}
核心原理

從前面的介紹可以知道,ConsumeQueue 里并沒有存儲用于 SQL 表達式匹配的相關信息。所以要是想根據 SQL 表達式來匹配消息,就只能把 CommitLog 里的消息讀取出來,然后進行運算處理。實現這部分功能的代碼,同樣是在 org.apache.rocketmq.store.DefaultMessageStore#getMessage 這個方法里。

if (messageFilter != null && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {                                              // 省略部分代碼continue;
}
//匹配邏輯。          
if (tempProperties == null && msgBuffer != null) {tempProperties = MessageDecoder.decodeProperties(msgBuffer);
}
Object ret = null;
try {MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);ret = realFilterData.getCompiledExpression().evaluate(context);
} catch (Throwable e) {log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
}

可以看到,這里是把所有消息的屬性字段都放到 MessageEvaluationContext 里面,然后根據用戶寫的 SQL 表達式來對消息進行過濾。這里用到的 getCompiledExpression() 方法,其實就是通過 SqlFilter 把用戶寫的 SQL 表達式編譯成一個 BooleanExpression,最終計算結果就是匹配或者不匹配。

如果您想了解這個功能的完整 SQL 實現細節,可以去深入研究 Rocketmq-filter 模塊。

@Override
public Expression compile(final String expr) throws MQFilterException {return SelectorParser.parse(expr);
}

和 Tag 的過濾方式不太一樣,BooleanExpression 需要讀取真實的消息內容,并且是基于消息的實際字符串去做匹配。這種方式的好處是客戶端不需要額外做什么配置,但缺點是性能相對會差一些,因為每次都要讀取消息內容來匹配。

為了提升性能,社區想了個辦法,就是在 ConsumeQueue 里增加一個擴展字段。不過要使用這個功能,需要先打開 enableConsumeQueueExt 這個開關。打開之后,就可以利用布隆過濾器(Bloom Filter)來做優化了。

布隆過濾器的原理大概是這樣的:它會根據消息的屬性生成一個序列化的布隆過濾器數據。在過濾的時候,如果布隆過濾器判斷消息不符合條件,那這條消息肯定是不符合的,就可以直接過濾掉;如果布隆過濾器判斷消息符合條件,那還需要進一步做精確匹配。

綜上所述,我們就得到了這樣一個 SQL 過濾的工作流程。

規則限制

由于 SQL 屬性過濾是生產者定義消息屬性,消費者設置 SQL 過濾條件,計算之后,可能有不同的結果,因此服務端的處理方式如下:

  • 異常情況處理:如果過濾條件的表達式計算拋異常,消息默認被過濾,不會被投遞給消費者。例如比較數字和非數字類型的值。

  • 空值情況處理:如果過濾條件的表達式計算值為 null 或不是布爾類型(true 和 false),則消息默認被過濾,不會被投遞給消費者。例如發送消息時不存在某個屬性,在訂閱時過濾條件中直接使用該屬性,則過濾條件的表達式計算結果為 null。

  • 類型不符處理:如果消息自定義屬性為浮點型,但過濾條件中使用整數進行判斷,則消息默認被過濾,不會被投遞給消費者。

雖然這種方式是靈活的,但是在消息頭中還是不建議設置太多的值,因為總的消息頭部屬性有大小限制(32k),內置的已經占用了不少。超長之后,可能導致消息發送或者消費異常。

兩種過濾方式的對比總結

過濾方式TAG過濾SQL過濾
是否支持多個過濾條件
性能
處理方客戶端+服務端服務端
易用性一般

使用建議

合理劃分主題和 Tag 標簽。

從消息的過濾機制和主題的原理機制可以看出,業務消息的拆分可以基于主題進行篩選,也可以基于主題內消息的 Tag 標簽及屬性進行篩選。關于拆分方式的選擇,應遵循以下原則:

  • 消息類型是否一致:不同類型的消息,如順序消息和普通消息需要使用不同的主題進行拆分,無法通過 Tag 標簽進行分類。

  • 業務域是否相同:不同業務域和部門的消息應該拆分不同的主題。例如物流消息和支付消息應該使用兩個不同的主題;同樣是一個主題內的物流消息,普通物流消息和加急物流消息則可以通過不同的 Tag 進行區分。

  • 消息量級和重要性是否一致:如果消息的量級規模存在巨大差異,或者說消息的鏈路重要程度存在差異,則應該使用不同的主題進行隔離拆分。

騰訊云消息過濾軌跡展示

從上述消息過濾的原理介紹可以發現,如果消息被過濾掉了,用戶收不到這條消息,和消息本身沒有被消費的情況看起來是一樣的。這時候用戶可能會有一些疑惑:在 RocketMQ 的管理控制臺(dashboard)上,消息顯示是“已消費”狀態,可實際上自己并沒收到。

在騰訊云 TDMQ RocketMQ 版上,我們針對過濾條件的查詢進行了優化。通過這個優化,能夠區分展示消息過濾和真正被消費兩種情況的消息軌跡展示。這樣一來,用戶就能很直觀地看到消息到底是被過濾掉了,還是真正被消費了。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/85475.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/85475.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/85475.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

安卓JetPack篇——LifeCycle原理

LifeCycle 一、什么是Lifecycle 具備宿主生命周期感知能力的組件。它能持有組件&#xff08;如Activity或Fragment&#xff09;生命周期狀態的信息&#xff0c;并且允許其他觀察者監聽宿主的狀態。 二、基本原理 1、安卓10以下版本 隱形的Fragment注入在LifecycleOwner&am…

CSS 圓角邊框屬性(`border-radius`)筆記

一、作用&#xff1a; 用于設置元素四個角的圓角效果&#xff0c;讓元素不再死板&#xff0c;更加柔和。 二、基本語法&#xff1a; border-radius: 圓角大小; 單位&#xff1a;px&#xff08;像素&#xff09;或 %&#xff08;百分比&#xff09; 示例&#xff1a; div { ?…

python自助棋牌室管理系統

目錄 技術棧介紹具體實現截圖系統設計研究方法&#xff1a;設計步驟設計流程核心代碼部分展示研究方法詳細視頻演示試驗方案論文大綱源碼獲取/詳細視頻演示 技術棧介紹 Django-SpringBoot-php-Node.js-flask 本課題的研究方法和研究步驟基本合理&#xff0c;難度適中&#xf…

計算機——硬盤分區和格式化

硬盤驅動器 硬盤驅動器&#xff08;HDD&#xff09;是一種成熟、經濟的大容量存儲解決方案。它的核心優勢在于每GB成本低和超大容量。然而&#xff0c;其機械結構帶來的速度瓶頸、噪音、功耗和對物理沖擊的敏感性是其主要的缺點。隨著 SSD 價格的持續下降和性能的絕對領先&…

從IEC到UL:技術主權競爭下的斷路器合規性戰略

1 國際標準體系割裂的現狀 在全球低壓電器領域&#xff0c;國際標準體系呈現出日益明顯的割裂態勢。當前主要存在四大標準體系&#xff1a;國際通用的??IEC標準體系??、歐洲采用的??EN標準體系??、北美實施的??UL與CSA標準體系??&#xff0c;以及具有地域特色的?…

第十六屆藍橋杯_省賽B組(D).產值調整

題目如下 這道題看似很簡單&#xff0c;其實還是得觀察一下&#xff0c;要不然就會… 話不多說回到題目&#xff0c;這個題的坑就在于當A,B,C三個產值相同的時候&#xff0c;再怎么變還是之前的產值&#xff0c;或者也可以通過另外一種方法理解&#xff1a; 通過一個案例來舉…

設計模式 | 單例模式——餓漢模式 懶漢模式

單例模式 文章目錄 單例模式一、餓漢模式&#xff08;Eager Initialization&#xff09;1. 定義2. 特點3. 餓漢單例模式&#xff08;定義時-類外初始化&#xff09;4. 實現細節 二、懶漢模式&#xff08;Lazy Initialization&#xff09;1. 定義2. 特點3. 懶漢單例模式&#xf…

dify本地部署及添加ollama模型(ubuntu24.04)

說明&#xff1a;ubuntu是虛擬機的&#xff0c;用的橋接模式&#xff0c;與本地同局域網不同ip地址。 參考VM虛擬機網絡配置&#xff08;ubuntu24橋接模式&#xff09;&#xff1a;配置靜態IP前提&#xff1a;需要有docker及docker-compose環境 參考ubuntu24安裝docker及docker…

Python爬蟲實戰:研究multiprocessing相關技術

一、引言 1.1 研究背景與意義 隨著互聯網信息的爆炸式增長,網絡爬蟲已成為獲取海量數據的重要工具。傳統的單線程爬蟲在面對大規模數據采集任務時效率低下,無法充分利用現代計算機多核 CPU 的優勢。多線程爬蟲雖然在一定程度上提高了效率,但受限于 Python 的全局解釋器鎖(…

6.18 redis面試題 日志 緩存淘汰過期刪除 集群

Redis有哪2種持久化方式&#xff1f;分別的優缺點是什么&#xff1f; Redis 的重寫 AOF 過程是由后臺子進程 bgrewriteaof 來完成的。 過期刪除策略和內存淘汰策略有什么區別&#xff1f; 內存淘汰策略是在內存滿了的時候&#xff0c;redis 會觸發內存淘汰策略&#xff0c;來淘…

什么時候會發生內存泄漏?

1. 內存泄漏是什么&#xff1f; 定義&#xff1a;內存泄漏是指程序中的對象已經不再需要&#xff0c;但由于被其他對象錯誤引用&#xff0c;導致垃圾回收器&#xff08;GC&#xff09;無法回收它&#xff0c;從而長期占用內存空間的現象。 2. 內存泄漏的危害 問題具體表現內存…

用RSA算法模擬類的適配器模式

“RAS算法”這個術語本身并不常見或標準&#xff0c;它可能指向兩個主要領域的不同概念&#xff0c;具體取決于上下文&#xff1a; 更可能是拼寫錯誤&#xff1a;指 RSA 算法&#xff08;密碼學&#xff09; 這是最常見的情況。 “RAS” 極有可能是 “RSA” 的拼寫錯誤。RSA 算…

CARSIM-與C#自動化測試方案

using System; using System.Runtime.InteropServices; using System.Collections.Generic;namespace CarSimAutomation {/// <summary>/// CarSim COM 自動化測試接口/// 封裝所有 CarSim COM 功能用于自動化測試/// </summary>[ComVisible(true)][ClassInterface…

企微CRM系統中的任務分配與效率提升技巧

在數字化管理時代&#xff0c;企業微信(企微)與CRM系統的深度融合&#xff0c;為企業提供了更高效的客戶管理與團隊協作方案。企微CRM軟件不僅整合了客戶溝通、銷售跟進、數據分析等功能&#xff0c;還能通過智能任務分配優化團隊效率。本文將深入探討企微CRM管理系統的任務分配…

day66—BFS—最短的橋(LeetCode-934)

題目描述 給你一個大小為 n x n 的二元矩陣 grid &#xff0c;其中 1 表示陸地&#xff0c;0 表示水域。 島 是由四面相連的 1 形成的一個最大組&#xff0c;即不會與非組內的任何其他 1 相連。grid 中 恰好存在兩座島 。 你可以將任意數量的 0 變為 1 &#xff0c;以使兩座…

FramePack 安裝指南(中文)

FramePack 安裝指南&#xff08;中文&#xff09; -Windows FramePack 是最前沿的 AI 視頻生成框架&#xff0c;以極小的硬件需求顛覆視頻創作&#xff01;它能在僅 6GB 筆記本 GPU 內存上&#xff0c;驅動 13B 模型以 30 FPS 生成超長 120 秒視頻&#xff0c;幾乎無內容限制&…

Redis Sentinel 非集群模式高可用部署指南

1. Sentinel 在非集群模式的定位 一句話&#xff1a;在單主多從架構中&#xff0c;用 Sentinel 替你盯哨——探測故障、選舉新主、通知客戶端。 核心四職能&#xff1a; 職能作用點Monitoring定時 PING 主從&#xff0c;自身也互相探測Notification通過日志/PubSub/外部調用報…

2025Java面試八股文

文章目錄 Java基礎JVM多線程SpringSpring Boot數據庫與SQL分布式系統其他 Java基礎 自動裝箱與拆箱&#xff1a;Java中基礎數據類型與包裝類之間的轉換。例如&#xff0c;Integer x 1; 是裝箱&#xff0c;int y x; 是拆箱。Object類常用方法&#xff1a;如clone()、getClass…

寶塔安裝nginx-rtmp,音視頻直播

前置&#xff1a;需要自己開發音視頻直播&#xff0c; 注意不是實時音視頻&#xff0c;不是一對一視頻聊天&#xff0c;不是視頻會議 方案有 srs &#xff0c;nginx-rtmp&#xff0c;live555&#xff0c;node-media-server&#xff0c;EasyDarwin等 今天是說 nginx-rtmp 怎么…

基于微信小程序和深度學習的寵物照片拍攝指導平臺的設計與實現

文章目錄 摘要前言緒論1. 課題背景2. 國內外現狀與趨勢2.1 國內研究現狀2.2 國外研究現狀2.3 發展趨勢3. 課題內容相關技術與方法介紹1. 微信小程序開發技術2. 深度學習模型選型2.1 MobileNetV22.2 ResNet-503. 系統架構設計4. 關鍵技術實現4.1 實時拍攝指導4.2 多模態建議生成…