前言
本篇文章比較簡單,分別介紹RocketMQ支持幾種過濾機制,其原理和使用。
RocketMQ 提供了多種消息過濾機制,幫根據業務需求高效篩選消息,可以減少不必要的消息傳輸和處理。以下是其核心過濾機制及使用場景:
1. Tag 標簽過濾
- 原理:
每個消息發送時可附加一個 Tag(字符串標簽),消費者訂閱時指定一個或多個 Tag,Broker 會過濾出匹配 Tag 的消息投遞給消費者。 - 使用方式:
- 生產者:發送消息時設置
setTags
。Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
- 消費者:訂閱時指定 Tag(支持
*
表示全部,||
表示或關系)。consumer.subscribe("TopicTest", "TagA || TagB");
- 生產者:發送消息時設置
- 特點:
- 高效:Broker 端過濾,性能損耗低。
- 簡單:僅支持精確匹配,適用于簡單分類場景(如訂單狀態分類)。
2. SQL92 屬性過濾
- 原理:
基于消息的 自定義屬性(Key-Value),通過 SQL 表達式進行復雜條件過濾(如數值比較、邏輯運算)。需開啟 Broker 的enablePropertyFilter=true
。 - 使用方式:
- 生產者:為消息添加自定義屬性。
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes()); msg.putUserProperty("a", "10"); msg.putUserProperty("b", "5");
- 消費者:訂閱時編寫 SQL 表達式。
consumer.subscribe("TopicTest", MessageSelector.bySql("a > 5 AND b = '5'"));
- 生產者:為消息添加自定義屬性。
- 特點:
- 靈活:支持復雜邏輯(如
>
,<
,BETWEEN
,IS NULL
等)。 - 性能損耗:相比 Tag 過濾略高,需評估表達式復雜度。
- 靈活:支持復雜邏輯(如
3. 類過濾(Class Filter)
- 原理:
允許用戶自定義 Java 類實現過濾邏輯,Broker 加載該類并調用其方法判斷消息是否投遞。適用于高度定制化的過濾需求。 - 使用方式:
- 實現接口:編寫類實現
org.apache.rocketmq.common.filter.MessageFilter
。public class CustomFilter implements MessageFilter {@Overridepublic boolean match(MessageExt msg, FilterContext context) {// 自定義過濾邏輯return msg.getUserProperty("region").equals("CN");} }
- 部署類:將編譯后的類文件上傳到 Broker 指定路徑(需配置
filterSupportRetry=true
)。 - 消費者訂閱:指定過濾類名。
consumer.subscribe("TopicTest", MessageSelector.byFilterClass("com.example.CustomFilter"));
- 實現接口:編寫類實現
- 特點:
- 高度靈活:可編寫任意復雜邏輯(如結合外部配置或數據庫)。
- 維護成本高:需管理類的版本和部署,適合有特殊需求的場景。
對比與選型建議
機制 | 性能 | 靈活性 | 適用場景 |
---|---|---|---|
Tag 過濾 | 高 | 低(精確匹配) | 簡單分類(如訂單狀態、日志類型) |
SQL92 | 中 | 高 | 復雜屬性條件(如價格范圍、地域) |
類過濾 | 低 | 極高(自定義) | 特殊邏輯(需動態規則或外部查詢) |
注意事項
- Broker 配置:SQL 和類過濾需 Broker 開啟支持(
enablePropertyFilter
或filterSupportRetry
)。 - 版本兼容性:SQL92 過濾需 RocketMQ 4.3.0+,類過濾需 4.6.0+。
- 生產環境慎用類過濾:頻繁更新過濾類可能導致服務中斷,建議優先使用 Tag 或 SQL 過濾。
通過合理選擇過濾機制,可以顯著提升消息系統的效率和可維護性。