011 rocketmq過濾消息

文章目錄

  • 過濾消息
    • TAG模式過濾
      • FilterByTagProducer.java
      • FilterByTagConsumer.java
    • SQL表達式過濾
      • FilterBySQLProducer.java
      • FilterBySQLConsumer.java
    • 類過濾模式(基于4.2.0版本)

過濾消息

消息過濾包括基于表達式過濾與基于類模式兩種過濾模式。其中表達式過濾?分為TAG和SQL92模式

TAG模式過濾

發送消息時我們會為每?條消息設置TAG標簽,同??類中的消息放在?個主題TOPIC下,但是如果
進?分類我們則可以根據TAG進?分類,每?類消費者可能不是關系某個主題下的所有消息,我們就可
以通過TAG進?過濾,訂閱關注的某?類數據。

FilterByTagProducer.java

package com.example.rocketmq.demo.filter;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;//通過TAG 實現 過濾消息
public class FilterByTagProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer = newDefaultMQProducer("please_rename_unique_group_name");// Specify name server addresses.producer.setNamesrvAddr("localhost:9876");//Launch the instance.producer.start();String[] tags = {"TAGA","TAGB","TAGC"};for (int i = 0; i < 10; i++) {String tag =   tags[i%tags.length];//每個消息設置一個tag,tag 二級分類Message msg = new Message("TopicTest",tag,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}//Shut down once the producer instance is not longer in use.producer.shutdown();}
}

FilterByTagConsumer.java

package com.example.rocketmq.demo.filter;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class FilterByTagConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {// Instantiate with specified consumer group name.DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// Specify name server addresses.consumer.setNamesrvAddr("localhost:9876");//訂閱Topic,只訂閱標簽為A或B的消息consumer.subscribe("TopicTest", "TAGA || TAGB");// Register callback to execute on arrival of messages fetched from brokers.consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//Launch the consumer instance.consumer.start();System.out.printf("Consumer Started.%n");}
}

SQL表達式過濾

SQL92表達式消息過濾,是通過消息的屬性運?SQL過濾表達式進?條件匹配,消息發送時需要設置?戶的屬性putUserProperty?法設置屬性。
支持的語法:

  1. 數值?較, 如 > , >= , < , <= , BETWEEN , = ;
  2. 字符?較, 如 = , <> , IN ;
  3. IS NULL or IS NOT NULL ;
  4. 邏輯連接符 AND , OR , NOT ;

支持的類型:

  1. 數值型, 如123, 3.1415;
  2. 字符型, 如 ‘abc’, 必須?單引號;
  3. NULL , 特殊常數;
  4. 布爾值, TRUE or FALSE ;

FilterBySQLProducer.java

package com.example.rocketmq.demo.filter;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class FilterBySQLProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer = newDefaultMQProducer("please_rename_unique_group_name");// Specify name server addresses.producer.setNamesrvAddr("localhost:9876");producer.start();String[] tags = {"TagA","TagB","TagC","TagD"};for (int i = 0; i < 10; i++) {try {String tag = tags[i % tags.length];//構建消息Message msg = new Message("TopicTest" /* Topic */,tag /* Tag */,("RocketMQ消息測試,消息的TAG="+tag+  ", 屬性 age = " + i + ", == " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));//每個消息設置屬性為age,age值為0-9msg.putUserProperty("age", i+"");SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();
//                Thread.sleep(1000);}}//Shut down once the producer instance is not longer in use.producer.shutdown();}
}

FilterBySQLConsumer.java

package com.example.rocketmq.demo.filter;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;public class FilterBySQLConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {// Instantiate with specified consumer group name.DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// Specify name server addresses.consumer.setNamesrvAddr("localhost:9876");//訂閱Topicconsumer.subscribe("TopicTest", MessageSelector.bySql("age between 0 and 6"));consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer===啟動成功!");}
}

類過濾模式(基于4.2.0版本)

RocketMQ通過定義消息過濾類的接?實現消息過濾

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/72188.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/72188.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/72188.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【心得】一文梳理高頻面試題 HTTP 1.0/HTTP 1.1/HTTP 2.0/HTTP 3.0的區別并附加記憶方法

面試時很容易遇到的一個問題—— HTTP 1.0/HTTP 1.1/HTTP 2.0/HTTP 3.0的區別&#xff0c;其實這四個版本的發展實際上是一環扣一環的&#xff0c;是逐步完善的&#xff0c;本文希望幫助讀者梳理清楚各個版本之間的區別&#xff0c;并且給出當前各個版本的應用情況&#xff0c;…

大模型部署與調優:從基礎到高效優化全解析

大模型部署與調優&#xff1a;從基礎到高效優化全解析 1. 引言 隨著深度學習的快速發展&#xff0c;大模型&#xff08;Large Models&#xff09; 在自然語言處理&#xff08;NLP&#xff09;、計算機視覺&#xff08;CV&#xff09;、推薦系統等領域的應用日益廣泛。然而&am…

小紅書app復制短鏈,分享鏈接轉直接可訪問鏈接

簡介&#xff1a;小紅書手機app分享的鏈接需要點擊才能獲取完成鏈接&#xff0c;本文教大家如何通過代碼的方式將xhs的短連接轉化為長鏈接。 1.正常我們分享的鏈接是這樣的&#xff1a; 44 小豬吃宵夜發布了一篇小紅書筆記&#xff0c;快來看吧&#xff01; &#x1f606; KeA…

DeepSeek 助力 Vue3 開發:打造絲滑的彈性布局(Flexbox)

前言&#xff1a;哈嘍&#xff0c;大家好&#xff0c;今天給大家分享一篇文章&#xff01;并提供具體代碼幫助大家深入理解&#xff0c;徹底掌握&#xff01;創作不易&#xff0c;如果能幫助到大家或者給大家一些靈感和啟發&#xff0c;歡迎收藏關注哦 &#x1f495; 目錄 Deep…

DeepSeek開源周Day5壓軸登場:3FS與Smallpond,能否終結AI數據瓶頸之爭?

2025年2月28日&#xff0c;DeepSeek開源周迎來了第五天&#xff0c;也是本次活動的收官之日。自2月24日啟動以來&#xff0c;DeepSeek團隊以每天一個開源項目的節奏&#xff0c;陸續向全球開發者展示了他們在人工智能基礎設施領域的最新成果。今天&#xff0c;他們發布了Fire-F…

SQL AnyWhere 的備份與恢復

目錄 一、備份 二、恢復 1、自動恢復 2、映像恢復 3、日志恢復-指定時間點 4、日志恢復-指定偏移 5、完整的恢復流程 6、恢復最佳實踐 三、其他操作 1、dbtran 2、SQL Shell 工具 數據庫的安裝與基本使用內容請參考博客: SAP SQLAnyWhere 17 的安裝與基本使用_sql…

入門基礎項目(SpringBoot+Vue)

文章目錄 1. css布局相關2. JS3. Vue 腳手架搭建4. ElementUI4.1 引入ElementUI4.2 首頁4.2.1 整體框架4.2.2 Aside-logo4.2.3 Aside-菜單4.2.4 Header-左側4.2.5 Header-右側4.2.6 iconfont 自定義圖標4.2.7 完整代碼 4.3 封裝前后端交互工具 axios4.3.1 安裝 axios4.3.2 /src…

unity學習61:UI布局layout

目錄 1 布局 layout 1.1 先準備測試UI,新增這樣一組 panel 和 image 1.2 新增 vertical layout 1.3 現在移動任意一個image 都會影響其他 1.3.1 對比 如果沒有這個&#xff0c;就會是覆蓋效果了 1.3.2 對比 如果沒有這個&#xff0c;就會是覆蓋效果了 1.4 總結&#xf…

翻譯: 深入分析LLMs like ChatGPT 一

大家好&#xff0c;我想做這個視頻已經有一段時間了。這是一個全面但面向普通觀眾的介紹&#xff0c;介紹像ChatGPT這樣的大型語言模型。我希望通過這個視頻讓大家對這種工具的工作原理有一些概念性的理解。 首先&#xff0c;我們來談談你在這個文本框里輸入內容并點擊回車后背…

Ubuntu 下 nginx-1.24.0 源碼分析 - ngx_conf_add_dump

ngx_conf_add_dump 定義在src\core\ngx_conf_file.c static ngx_int_t ngx_conf_add_dump(ngx_conf_t *cf, ngx_str_t *filename) {off_t size;u_char *p;uint32_t hash;ngx_buf_t *buf;ngx_str_node_t *sn;ngx_conf_dump_t *cd;has…

Oracle 導出所有表索引的創建語句

在Oracle數據庫中&#xff0c;導出所有表的索引創建語句通常涉及到使用數據字典視圖來查詢索引的定義&#xff0c;然后生成對應的SQL語句。你可以通過查詢DBA_INDEXES或USER_INDEXES視圖&#xff08;取決于你的權限和需求&#xff09;來獲取這些信息。 使用DBA_INDEXES視圖 如…

快速搭建多語言網站的 FastAdmin 實踐

快速搭建多語言網站的 FastAdmin 實踐 引言 在全球化的背景下&#xff0c;越來越多的網站需要支持多種語言&#xff0c;以便滿足不同用戶的需求。FastAdmin 是一個基于 ThinkPHP 的快速后臺開發框架&#xff0c;提供了豐富的功能和靈活的擴展性&#xff0c;非常適合用于快速搭…

Python 實戰:構建分布式文件存儲系統全解析

Python 實戰&#xff1a;構建分布式文件存儲系統全解析 在當今數據爆炸的時代&#xff0c;分布式文件存儲系統憑借其高可擴展性、高可靠性等優勢&#xff0c;成為了數據存儲領域的熱門選擇。本文將詳細介紹如何使用 Python 構建一個簡單的分布式文件存儲系統。從系統架構設計&…

【綜合項目】api系統——基于Node.js、express、mysql等技術

目錄 0 前言 1 初始化 2 注冊登錄 2.1 注冊 2.1.1 功能&#xff1a;密碼加密&#xff08;2.3.3&#xff09; 2.1.1.1 操作 2.1.1.2 bcryptjs詳解 2.1.2 插入新用戶&#xff08;2.3.4&#xff09; 2.1.3 優化&#xff1a;表單數據驗證&#xff08;2.5&#xff09; …

tableau之標靶圖、甘特圖和瀑布圖

一、標靶圖 概念 標靶圖&#xff08;Bullet Chart&#xff09;是一種用于顯示數據與目標之間關系的可視化圖表&#xff0c;常用于業務和管理報告中。其設計旨在用來比較實際值與目標值&#xff0c;同時展示額外的上下文信息&#xff08;如趨勢&#xff09;。 作用 可視化目標…

Linux下的網絡通信編程

在不同主機之間&#xff0c;進行進程間的通信。 1解決主機之間硬件的互通 2.解決主機之間軟件的互通. 3.IP地址&#xff1a;來區分不同的主機&#xff08;軟件地址&#xff09; 4.MAC地址&#xff1a;硬件地址 5.端口號&#xff1a;區分同一主機上的不同應用進程 網絡協議…

網絡七層模型—OSI參考模型詳解

網絡七層模型&#xff1a;OSI參考模型詳解 引言 在網絡通信的世界中&#xff0c;OSI&#xff08;Open Systems Interconnection&#xff09;參考模型是一個基礎且核心的概念。它由國際標準化組織&#xff08;ISO&#xff09;于1984年提出&#xff0c;旨在為不同廠商的設備和應…

530 Login fail. A secure connection is requiered(such as ssl)-java發送QQ郵箱(簡單配置)

由于cs的csdN許多文章關于這方面的都是vip文章&#xff0c;而本文是免費的&#xff0c;希望廣大網友覺得有幫助的可以多點贊和關注&#xff01; QQ郵箱授權碼到這里去開啟 授權碼是16位的字母&#xff0c;填入下面的mail.setting里面的pass里面 # 郵件服務器的SMTP地址 host…

Sqlserver安全篇之_TLS的證書概念

證書的理解 參考Sqlserver的官方文檔https://learn.microsoft.com/zh-cn/sql/database-engine/configure-windows/certificate-overview?viewsql-server-ver16 TLS(Transport Layer Security)傳輸層安全和SSL(Secure Sockets Layer)安全套接字層協議位于應用程序協議層和TCP/…

【SQL】掌握SQL查詢技巧:數據分組與排序

目錄 1. GROUP BY 1.1 定義與用途1.2 示例說明1.3 注意事項1.4 可視化示例 2. ORDER BY 2.1 定義與用途2.2 升序說明&#xff08;默認&#xff09;2.3 降序排序2.4 多列排序2.5 可視化示例 3. GROUP BY 與 ORDER BY 的結合使用4. 可視化示例總結 在數據庫管理中&#xff0c;S…