導語
本文將系統闡述 Apache RocketMQ 消息過濾機制的技術架構與實踐要點。首先從業務應用場景切入,解析消息過濾的核心價值;接著介紹 Apache RocketMQ 支持的兩種消息過濾實現方式,幫助讀者建立基礎認知框架;隨后深入剖析 SQL 語法過濾與標簽(Tag)過濾的技術實現的核心原理以及規則限制;最后介紹騰訊云在消息過濾性能優化方面的具體實踐。
消息過濾的應用場景
消息過濾功能指消息生產者向 Topic 中發送消息時,設置消息屬性對消息進行分類,消費者訂閱 Topic 時,根據消息屬性設置過濾條件對消息進行過濾,只有符合過濾條件的消息才會被投遞到消費端進行消費。
消費者訂閱 Topic 時若未設置過濾條件,無論消息發送時是否有設置過濾屬性,Topic 中的所有消息都將被投遞到消費端進行消費。
消息過濾功能可以應用在如下場景:
用戶想使用一個 Topic,但是被多個 Group 訂閱,每個 Group 只想消費其中一部分消息。
默認情況下,消費組1和消費組2 會全量消費 Topic 里面的所有消息。但如果我們想選擇性的消費里面一些消息的時候,就可以使用消息過濾功能對消息進行區分過濾。
消息過濾原理介紹
目前消息過濾主要支持兩種過濾方式,分別是 SQL 過濾和 Tag 過濾。其核心邏輯都是在發送消息的時候,設置一些自定義字段,然后通過消費組訂閱的時候指定對應的過濾表達式,消息在服務端進行過濾后,才被消費組消費。
Tag 過濾
代碼示例
在介紹原理之前,我們先直觀的看一下用法,以 RocketMQ 4.x 的 SDK 為例:
// 創建消息生產者
DefaultMQProducer producer = ClientCreater.createProducer(GROUP_NAME);
// 創建消息實例,設置topic和消息內容,設置Tag1
Message msg = new Message(TOPIC_NAME, "Tag1", "Hello RocketMQ.".getBytes(StandardCharsets.UTF_8));
// 發送消息
SendResult sendResult = producer.send(msg, 3000);
System.out.println(sendResult + ":" + new String(msg.getBody()));
producer.shutdown();
在發送消息的時候,我們可以給消息指定 Tag。而在消費組這一側,我們可以訂閱不同的 Tag,例如使用星號(*)匹配全部 Tag。
// 創建消息消費者
DefaultMQPushConsumer pushConsumer = ClientCreater.createPushConsumer(GROUP_NAME);
// 訂閱topic 訂閱所有的TAG
pushConsumer.subscribe(TOPIC_NAME, "*");
//訂閱指定的TAG
pushConsumer.subscribe(TOPIC_NAME, "Tag1");
//訂閱多個TAG
pushConsumer.subscribe(TOPIC_NAME, "Tag1||Tag2");
// 省略其他代碼
核心原理
那么,這個功能在底層究竟是怎么實現的呢?我們都知道,RocketMQ 在存儲消息的時候,會先把消息寫入 CommitLog。CommitLog 可以看作是 RocketMQ 存儲消息的一個大日志,所有消息都會先寫到這里。
寫完 CommitLog 之后,RocketMQ 還會把消息的相關信息再寫入ConsumeQueue。ConsumeQueue 可以理解為是 CommitLog 的一個索引,它里面存儲的不是完整的消息內容,而是消息的一些關鍵信息,方便消費者快速找到和讀取消息。
具體來說,當 RocketMQ 寫入一條原始消息到 CommitLog 之后,它會提取這條消息的一些重要信息,比如這條消息在 CommitLog 里的物理偏移量(Offset)、消息的大小,還有這條消息 Tag 的哈希碼(Hashcode),然后把這些信息寫入到 ConsumeQueue 中。這樣一來,消費者就可以通過 ConsumeQueue 快速定位到 CommitLog 里對應的消息,而不需要每次都去遍歷整個 CommitLog,大大提高了消息消費的效率。
在發送消息的的時候,RocketMQ 會將消息 Tag 的 Hashcode 寫入到 ConsumeQueue 字段中。
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
//上面這個方法
public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) {if (Strings.isNullOrEmpty(tags)) {return 0; }return tags.hashCode();
}
當用戶在消費消息時,服務端會從 ConsumeQueue 中一條一條地檢查消息,并對這些消息進行過濾。過濾的時候,服務端會用 ConsumeQueue 中存儲的消息 Tag 的 Hashcode,和當前訂閱組所訂閱的 Tag 進行匹配。這個匹配的過程,是在 RocketMQ 的核心代碼 org.apache.rocketmq.store.DefaultMessageStore#getMessage 方法里實現的。簡單來說,就是服務端會根據訂閱組訂閱的 Tag,從 ConsumeQueue 中找出符合條件的消息,然后交給用戶去消費。
if (messageFilter != null && !messageFilter.isMatchedByConsumeQueue(cqUnit.getValidTagsCodeAsLong(), cqUnit.getCqExtUnit())) {//省略其他代碼continue;
}
匹配邏輯
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
if (null == tagsCode || null == subscriptionData) {return true;
}
if (subscriptionData.isClassFilterMode()) {return true;
}
return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)|| subscriptionData.getCodeSet().contains(tagsCode.intValue());
}
可以看到,只有當訂閱的 Tag 是 “*”(表示訂閱所有消息),或者消息的 Tag 和訂閱的 Tag 匹配上的時候,消費者才能消費到這條消息。這里您可能會有疑問,為啥要用 CodeSet 呢?其實是因為這里會把用戶訂閱組訂閱的 Tag 進行拆分,然后把這些拆分后的 Tag 放到 CodeSet 里。這樣在匹配的時候,就可以快速判斷消息的 Tag 是否在 CodeSet 中,從而決定這條消息能不能被消費。
org.apache.rocketmq.remoting.protocol.filter.FilterAPI#buildSubscriptionData(java.lang.String, java.lang.String)
String[] tags = subString.split("\\|\\|");
if (tags.length > 0) {Arrays.stream(tags).map(String::trim).filter(tag -> !tag.isEmpty()).forEach(tag -> {subscriptionData.getTagsSet().add(tag);subscriptionData.getCodeSet().add(tag.hashCode());});
} else {throw new Exception("subString split error");
}
然后,CodeSet 里面存的就是多個 Hashcode 。
這里您可能又會有疑問:要是在服務端通過哈希碼來匹配,那萬一兩個不同的 Tag 經過哈希運算之后,得到的哈希碼是一樣的,這樣不就匹配錯了嗎?沒錯,這種情況確實有可能發生。
為了避免這種問題,客戶端還會再做一層過濾,使用真正的 Tag 字符串再過濾一次,這樣就能保證最終消費到的消息一定是符合訂閱要求的,不會出現因為哈希沖突而導致的匹配錯誤。
org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#processPullResult
List<MessageExt> msgListFilterAgain = msgList;
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {msgListFilterAgain = new ArrayList<>(msgList.size());for (MessageExt msg : msgList) {if (msg.getTags() != null) {if (subscriptionData.getTagsSet().contains(msg.getTags())) {msgListFilterAgain.add(msg);}}}
}
綜上所述,我們就得到了這樣一個 Tag 過濾的工作流程。
規則限制
-
發送消息只能設置一個 Tag。
-
多個 Tag 之間為或的關系,不同 Tag 間使用兩個豎線(||)隔開。例如,Tag1||Tag2||Tag3,表示標簽為 Tag1 或 Tag2 或 Tag3 的消息都滿足匹配條件,都會被發送給消費者進行消費。
-
多個 Tag 的順序也要保持一致,否則會導致訂閱關系不一致,例如 Tag1||Tag2 和 Tag2||Tag1 就是不同的。
SQL 過濾
從上面可以看到,Tag 過濾比較簡單,通過在 ConsumeQueue 直接進行匹配,效率比較高,但是能支持的消息過濾比較簡單,如果想通過消息的某個擴展字段來進行匹配,做一些更復雜的邏輯,就需要使用 SQL 過濾了。
代碼示例
在發送方我們可以設置一下 putUserProperty,來擴展字段。
public static void main(String[] args) throws Exception {// 創建消息生產者DefaultMQProducer producer = ClientCreater.createProducer(GROUP_NAME);// 創建消息實例,設置topic和消息內容,設置自定義屬性Message msg = new Message(TOPIC_NAME, "Hello RocketMQ.".getBytes(StandardCharsets.UTF_8));msg.putUserProperty("key1","value1");// 發送消息SendResult sendResult = producer.send(msg, 3000);System.out.println(sendResult + ":" + new String(msg.getBody()));producer.shutdown();
}
當消費組去消費消息的時候,它可以用 SQL 表達式來精準地篩選消息。例如我們可以設定條件,像 “Key1 必須等于 Value1”,或者設置更復雜的條件,用 “AND” 和 “OR” 這些邏輯運算符把多個條件組合起來。SQL 過濾能按照設置的條件,精確地過濾出符合條件的消息。
public static void main(String[] args) throws Exception {// 創建消息消費者DefaultMQPushConsumer pushConsumer = ClientCreater.createPushConsumer(GROUP_NAME);//訂閱所有消息pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("True"));// 訂閱topic 訂閱單個key的sqlpushConsumer.subscribe(TOPIC_NAME,MessageSelector.bySql("key1 IS NOT NULL AND key1='value1'"));//訂閱多個屬性pushConsumer.subscribe(TOPIC_NAME,MessageSelector.bySql("key1 IS NOT NULL AND key2 IS NOT NULL AND key1='value1' AND key2='value2'"));// 省略其他代碼
}
核心原理
從前面的介紹可以知道,ConsumeQueue 里并沒有存儲用于 SQL 表達式匹配的相關信息。所以要是想根據 SQL 表達式來匹配消息,就只能把 CommitLog 里的消息讀取出來,然后進行運算處理。實現這部分功能的代碼,同樣是在 org.apache.rocketmq.store.DefaultMessageStore#getMessage 這個方法里。
if (messageFilter != null && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) { // 省略部分代碼continue;
}
//匹配邏輯。
if (tempProperties == null && msgBuffer != null) {tempProperties = MessageDecoder.decodeProperties(msgBuffer);
}
Object ret = null;
try {MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);ret = realFilterData.getCompiledExpression().evaluate(context);
} catch (Throwable e) {log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
}
可以看到,這里是把所有消息的屬性字段都放到 MessageEvaluationContext 里面,然后根據用戶寫的 SQL 表達式來對消息進行過濾。這里用到的 getCompiledExpression() 方法,其實就是通過 SqlFilter 把用戶寫的 SQL 表達式編譯成一個 BooleanExpression,最終計算結果就是匹配或者不匹配。
如果您想了解這個功能的完整 SQL 實現細節,可以去深入研究 Rocketmq-filter 模塊。
@Override
public Expression compile(final String expr) throws MQFilterException {return SelectorParser.parse(expr);
}
和 Tag 的過濾方式不太一樣,BooleanExpression 需要讀取真實的消息內容,并且是基于消息的實際字符串去做匹配。這種方式的好處是客戶端不需要額外做什么配置,但缺點是性能相對會差一些,因為每次都要讀取消息內容來匹配。
為了提升性能,社區想了個辦法,就是在 ConsumeQueue 里增加一個擴展字段。不過要使用這個功能,需要先打開 enableConsumeQueueExt 這個開關。打開之后,就可以利用布隆過濾器(Bloom Filter)來做優化了。
布隆過濾器的原理大概是這樣的:它會根據消息的屬性生成一個序列化的布隆過濾器數據。在過濾的時候,如果布隆過濾器判斷消息不符合條件,那這條消息肯定是不符合的,就可以直接過濾掉;如果布隆過濾器判斷消息符合條件,那還需要進一步做精確匹配。
綜上所述,我們就得到了這樣一個 SQL 過濾的工作流程。
規則限制
由于 SQL 屬性過濾是生產者定義消息屬性,消費者設置 SQL 過濾條件,計算之后,可能有不同的結果,因此服務端的處理方式如下:
-
異常情況處理:如果過濾條件的表達式計算拋異常,消息默認被過濾,不會被投遞給消費者。例如比較數字和非數字類型的值。
-
空值情況處理:如果過濾條件的表達式計算值為 null 或不是布爾類型(true 和 false),則消息默認被過濾,不會被投遞給消費者。例如發送消息時不存在某個屬性,在訂閱時過濾條件中直接使用該屬性,則過濾條件的表達式計算結果為 null。
-
類型不符處理:如果消息自定義屬性為浮點型,但過濾條件中使用整數進行判斷,則消息默認被過濾,不會被投遞給消費者。
雖然這種方式是靈活的,但是在消息頭中還是不建議設置太多的值,因為總的消息頭部屬性有大小限制(32k),內置的已經占用了不少。超長之后,可能導致消息發送或者消費異常。
兩種過濾方式的對比總結
過濾方式 | TAG過濾 | SQL過濾 |
---|---|---|
是否支持多個過濾條件 | 否 | 是 |
性能 | 高 | 中 |
處理方 | 客戶端+服務端 | 服務端 |
易用性 | 強 | 一般 |
使用建議
合理劃分主題和 Tag 標簽。
從消息的過濾機制和主題的原理機制可以看出,業務消息的拆分可以基于主題進行篩選,也可以基于主題內消息的 Tag 標簽及屬性進行篩選。關于拆分方式的選擇,應遵循以下原則:
-
消息類型是否一致:不同類型的消息,如順序消息和普通消息需要使用不同的主題進行拆分,無法通過 Tag 標簽進行分類。
-
業務域是否相同:不同業務域和部門的消息應該拆分不同的主題。例如物流消息和支付消息應該使用兩個不同的主題;同樣是一個主題內的物流消息,普通物流消息和加急物流消息則可以通過不同的 Tag 進行區分。
-
消息量級和重要性是否一致:如果消息的量級規模存在巨大差異,或者說消息的鏈路重要程度存在差異,則應該使用不同的主題進行隔離拆分。
騰訊云消息過濾軌跡展示
從上述消息過濾的原理介紹可以發現,如果消息被過濾掉了,用戶收不到這條消息,和消息本身沒有被消費的情況看起來是一樣的。這時候用戶可能會有一些疑惑:在 RocketMQ 的管理控制臺(dashboard)上,消息顯示是“已消費”狀態,可實際上自己并沒收到。
在騰訊云 TDMQ RocketMQ 版上,我們針對過濾條件的查詢進行了優化。通過這個優化,能夠區分展示消息過濾和真正被消費兩種情況的消息軌跡展示。這樣一來,用戶就能很直觀地看到消息到底是被過濾掉了,還是真正被消費了。