主页 > 手机  > 

011rocketmq过滤消息

011rocketmq过滤消息

文章目录:过滤消息;TAG模式过滤(FilterByTagProducer.java、FilterByTagConsumer.java);SQL表达式过滤(FilterBySQLProducer.java、FilterBySQLConsumer.java);类过滤模式(基于4.2.0版本)

过滤消息

消息过滤包括基于表达式过滤与基于类模式过滤两种模式。其中表达式过滤又分为TAG和SQL92两种模式。

TAG模式过滤

发送消息时我们会为每一条消息设置TAG标签。同一大类的消息放在同一个主题TOPIC下,如果还需要进一步分类,则可以根据TAG进行区分。每一类消费者可能并不关心某个主题下的所有消息,这时就可以通过TAG进行过滤,只订阅自己关注的那一类数据。

FilterByTagProducer.java package com.example.rocketmq.demo.filter; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq mon.message.Message; import org.apache.rocketmq.remoting mon.RemotingHelper; //通过TAG 实现 过滤消息 public class FilterByTagProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses. producer.setNamesrvAddr("localhost:9876"); //Launch the instance. producer.start(); String[] tags = {"TAGA","TAGB","TAGC"}; for (int i = 0; i < 10; i++) { String tag = tags[i%tags.length]; //每个消息设置一个tag,tag 二级分类 Message msg = new Message("TopicTest",tag, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } } FilterByTagConsumer.java package com.example.rocketmq.demo.filter; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq mon.message.MessageExt; import java.util.List; public class FilterByTagConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { // Instantiate with specified consumer group name. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); // Specify name server addresses. 
consumer.setNamesrvAddr("localhost:9876"); //订阅Topic,只订阅标签为A或B的消息 consumer.subscribe("TopicTest", "TAGA || TAGB"); // Register callback to execute on arrival of messages fetched from brokers. consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //Launch the consumer instance. consumer.start(); System.out.printf("Consumer Started.%n"); } } SQL表达式过滤

SQL92表达式消息过滤,是对消息的属性运行SQL过滤表达式进行条件匹配;发送消息时需要通过putUserProperty方法设置用户属性。支持的语法:

数值比较, 如 > , >= , < , <= , BETWEEN , = ;字符比较, 如 = , <> , IN ;IS NULL or IS NOT NULL ;逻辑连接符 AND , OR , NOT ;

支持的类型:

数值型, 如123, 3.1415;字符型, 如 ‘abc’, 必须⽤单引号;NULL , 特殊常数;布尔值, TRUE or FALSE ; FilterBySQLProducer.java package com.example.rocketmq.demo.filter; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq mon.message.Message; import org.apache.rocketmq.remoting mon.RemotingHelper; public class FilterBySQLProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses. producer.setNamesrvAddr("localhost:9876"); producer.start(); String[] tags = {"TagA","TagB","TagC","TagD"}; for (int i = 0; i < 10; i++) { try { String tag = tags[i % tags.length]; //构建消息 Message msg = new Message("TopicTest" /* Topic */, tag /* Tag */, ("RocketMQ消息测试,消息的TAG="+tag+ ", 属性 age = " + i + ", == " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); //每个消息设置属性为age,age值为0-9 msg.putUserProperty("age", i+""); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); // Thread.sleep(1000); } } //Shut down once the producer instance is not longer in use. 
producer.shutdown(); } } FilterBySQLConsumer.java package com.example.rocketmq.demo.filter; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq mon.message.MessageExt; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; public class FilterBySQLConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { // Instantiate with specified consumer group name. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); // Specify name server addresses. consumer.setNamesrvAddr("localhost:9876"); //订阅Topic consumer.subscribe("TopicTest", MessageSelector.bySql("age between 0 and 6")); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer===启动成功!"); } } 类过滤模式(基于4.2.0版本)

RocketMQ通过定义消息过滤类的接口实现消息过滤。

标签:

011rocketmq过滤消息由讯客互联手机栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“011rocketmq过滤消息”。