Table of Contents
- Message Filtering
- TAG-Based Filtering
- FilterByTagProducer.java
- FilterByTagConsumer.java
- SQL Expression Filtering
- FilterBySQLProducer.java
- FilterBySQLConsumer.java
- Class-Based Filtering (based on version 4.2.0)
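Message Filtering
Message filtering comes in two modes: expression-based filtering and class-based filtering. Expression-based filtering is further divided into TAG mode and SQL92 mode.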
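TAG-Based Filtering
When sending, we set a TAG on each message. Messages of the same broad category go under one TOPIC, and a TAG can then classify them further within that topic. A given class of consumer may not care about every message under a topic, so it can filter by TAG and subscribe only to the kind of data it is interested in.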
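To make the selection rule concrete, here is a minimal plain-Java sketch of how a subscription expression such as "TAGA || TAGB" picks messages. It is not RocketMQ code and needs no broker: the class TagFilterSketch and its methods are hypothetical names, and treating null, an empty string, or "*" as "match everything" is an assumption of this sketch. The round-robin tag assignment mirrors the FilterByTagProducer example in this article.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative only: models how a "TAGA || TAGB" subscription selects messages.
// This is not RocketMQ's implementation; class and method names are hypothetical.
class TagFilterSketch {

    // Split a subscription expression such as "TAGA || TAGB" into its tags.
    static Set<String> parseTags(String expression) {
        Set<String> tags = new HashSet<>();
        for (String part : expression.split("\\|\\|")) {
            String tag = part.trim();
            if (!tag.isEmpty()) {
                tags.add(tag);
            }
        }
        return tags;
    }

    // A message passes if the subscription matches everything (assumed here
    // for null, empty, or "*") or explicitly names the message's tag.
    static boolean matches(String expression, String messageTag) {
        if (expression == null || expression.trim().isEmpty()
                || "*".equals(expression.trim())) {
            return true;
        }
        return parseTags(expression).contains(messageTag);
    }

    // Indices (0..count-1) of the messages a subscriber would receive when
    // tags are assigned round-robin from the given array.
    static List<Integer> delivered(String expression, String[] tags, int count) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            if (matches(expression, tags[i % tags.length])) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String[] tags = {"TAGA", "TAGB", "TAGC"};
        // Prints [0, 1, 3, 4, 6, 7, 9]: every third message (TAGC) is skipped.
        System.out.println(delivered("TAGA || TAGB", tags, 10));
    }
}
```

Under these assumptions, a consumer subscribed to "TAGA || TAGB" would see seven of the ten messages; the three tagged TAGC are filtered out.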
FilterByTagProducer.java
package com.example.rocketmq.demo.filter;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

// Message filtering by TAG
public class FilterByTagProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        // Launch the instance.
        producer.start();
        String[] tags = {"TAGA", "TAGB", "TAGC"};
        for (int i = 0; i < 10; i++) {
            String tag = tags[i % tags.length];
            // Each message carries one tag; the tag is a second-level category under the topic.
            Message msg = new Message("TopicTest", tag,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        // Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}
FilterByTagConsumer.java
package com.example.rocketmq.demo.filter;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class FilterByTagConsumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        // Specify name server addresses.
        consumer.setNamesrvAddr("localhost:9876");
        // Subscribe to the topic, receiving only messages tagged TAGA or TAGB.
        consumer.subscribe("TopicTest", "TAGA || TAGB");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n",
                        Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Launch the consumer instance.
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
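SQL Expression Filtering
SQL92 expression filtering matches messages by evaluating an SQL filter expression against their properties. The producer must set these user properties on each message with the putUserProperty method when sending.
Supported syntax:
- Numeric comparison, such as >, >=, <, <=, BETWEEN, =;
- Character comparison, such as =, <>, IN;
- IS NULL or IS NOT NULL;
- Logical connectives AND, OR, NOT;
Supported types:
- Numeric, such as 123, 3.1415;
- Character, such as 'abc'; must be enclosed in single quotes;
- NULL, a special constant;
- Boolean, TRUE or FALSE;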
FilterBySQLProducer.java
package com.example.rocketmq.demo.filter;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class FilterBySQLProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        String[] tags = {"TagA", "TagB", "TagC", "TagD"};
        for (int i = 0; i < 10; i++) {
            try {
                String tag = tags[i % tags.length];
                // Build the message.
                Message msg = new Message("TopicTest" /* Topic */, tag /* Tag */,
                        ("RocketMQ message test, TAG=" + tag + ", property age = " + i + ", == " + i)
                                .getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Set the user property "age" on each message; its value runs from 0 to 9.
                msg.putUserProperty("age", String.valueOf(i));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                // Thread.sleep(1000);
            }
        }
        // Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}
FilterBySQLConsumer.java
package com.example.rocketmq.demo.filter;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class FilterBySQLConsumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        // Specify name server addresses.
        consumer.setNamesrvAddr("localhost:9876");
        // Subscribe to the topic, selecting messages whose "age" property is between 0 and 6.
        consumer.subscribe("TopicTest", MessageSelector.bySql("age between 0 and 6"));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n",
                        Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer started successfully!");
    }
}
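As a rough check on what this subscription should select, the following plain-Java sketch reproduces the effect of "age between 0 and 6" on the ten messages sent by FilterBySQLProducer. It is a stand-in, not RocketMQ's evaluator: SqlFilterSketch, between, and selectedAges are hypothetical names, and treating a missing or non-numeric property as a non-match is an assumption of the sketch. Note also that in the 4.x brokers the author's examples target, SQL92 subscriptions typically require enablePropertyFilter=true in the broker configuration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Illustrative only: a plain-Java stand-in for the selection made by
// "age between 0 and 6". Not RocketMQ code; all names are hypothetical.
class SqlFilterSketch {

    // Builds a predicate equivalent to "<property> BETWEEN low AND high".
    // BETWEEN is inclusive on both ends; a missing or non-numeric property
    // is treated as a non-match in this sketch.
    static Predicate<Map<String, String>> between(String property, long low, long high) {
        return props -> {
            String raw = props.get(property);
            if (raw == null) {
                return false;
            }
            try {
                long value = Long.parseLong(raw.trim());
                return value >= low && value <= high;
            } catch (NumberFormatException e) {
                return false;
            }
        };
    }

    // Ages of the messages that pass the filter when each message carries
    // age = 0..count-1 as a string user property.
    static List<Integer> selectedAges(Predicate<Map<String, String>> filter, int count) {
        List<Integer> selected = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Map<String, String> props = new HashMap<>();
            props.put("age", String.valueOf(i));
            if (filter.test(props)) {
                selected.add(i);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // Prints [0, 1, 2, 3, 4, 5, 6]: both bounds of BETWEEN are included.
        System.out.println(selectedAges(between("age", 0, 6), 10));
    }
}
```

Under these assumptions the consumer would receive the seven messages with age 0 through 6, and the messages with age 7, 8, and 9 would be filtered out.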
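Class-Based Filtering (based on version 4.2.0)
RocketMQ implements this mode by defining an interface for message filter classes; a class that implements the interface decides which messages are delivered.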
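The idea of a filter class can be sketched in plain Java as follows. Everything here is hypothetical: the nested FilterSketch interface is a stand-in written for this illustration and is not RocketMQ's own filter interface, whose package, name, and method signature should be taken from the 4.2.0 sources. The sketch only shows the shape of the approach: filtering logic lives in ordinary code rather than in a tag list or SQL expression.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: models "filtering by a user-written class".
// None of these types belong to RocketMQ; all names are hypothetical.
class ClassFilterSketch {

    // Stand-in for a message filter interface.
    interface FilterSketch {
        // Return true to deliver the message, false to drop it.
        boolean match(Map<String, String> userProperties);
    }

    // Example filter class: keep messages whose "age" property is even.
    static class EvenAgeFilter implements FilterSketch {
        @Override
        public boolean match(Map<String, String> userProperties) {
            String raw = userProperties.get("age");
            if (raw == null) {
                return false;
            }
            try {
                return Integer.parseInt(raw.trim()) % 2 == 0;
            } catch (NumberFormatException e) {
                return false;
            }
        }
    }

    // Apply the filter to messages carrying age = 0..count-1 and return
    // the ages of the messages that would be delivered.
    static List<Integer> run(FilterSketch filter, int count) {
        List<Integer> deliveredAges = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Map<String, String> props = new HashMap<>();
            props.put("age", String.valueOf(i));
            if (filter.match(props)) {
                deliveredAges.add(i);
            }
        }
        return deliveredAges;
    }

    public static void main(String[] args) {
        // Prints [0, 2, 4, 6, 8].
        System.out.println(run(new EvenAgeFilter(), 10));
    }
}
```

Compared with TAG or SQL92 expressions, a filter class can express arbitrary logic, at the cost of having to ship and run user code wherever the filtering takes place.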