011 rocketmq过滤消息

文章目录

  • 过滤消息
    • TAG模式过滤
      • FilterByTagProducer.java
      • FilterByTagConsumer.java
    • SQL表达式过滤
      • FilterBySQLProducer.java
      • FilterBySQLConsumer.java
    • 类过滤模式(基于4.2.0版本)

过滤消息

消息过滤包括基于表达式过滤与基于类模式两种过滤模式。其中表达式过滤⼜分为TAG和SQL92模式

TAG模式过滤

发送消息时我们会为每⼀条消息设置TAG标签,同⼀⼤类中的消息放在⼀个主题TOPIC下,但是如果
进⾏分类我们则可以根据TAG进⾏分类,每⼀类消费者可能不是关系某个主题下的所有消息,我们就可
以通过TAG进⾏过滤,订阅关注的某⼀类数据。

FilterByTagProducer.java

package com.example.rocketmq.demo.filter;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;//通过TAG 实现 过滤消息
public class FilterByTagProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer = newDefaultMQProducer("please_rename_unique_group_name");// Specify name server addresses.producer.setNamesrvAddr("localhost:9876");//Launch the instance.producer.start();String[] tags = {"TAGA","TAGB","TAGC"};for (int i = 0; i < 10; i++) {String tag =   tags[i%tags.length];//每个消息设置一个tag,tag 二级分类Message msg = new Message("TopicTest",tag,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}//Shut down once the producer instance is not longer in use.producer.shutdown();}
}

FilterByTagConsumer.java

package com.example.rocketmq.demo.filter;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class FilterByTagConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {// Instantiate with specified consumer group name.DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// Specify name server addresses.consumer.setNamesrvAddr("localhost:9876");//订阅Topic,只订阅标签为A或B的消息consumer.subscribe("TopicTest", "TAGA || TAGB");// Register callback to execute on arrival of messages fetched from brokers.consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//Launch the consumer instance.consumer.start();System.out.printf("Consumer Started.%n");}
}

SQL表达式过滤

SQL92表达式消息过滤,是通过消息的属性运⾏SQL过滤表达式进⾏条件匹配,消息发送时需要设置⽤户的属性putUserProperty⽅法设置属性。
支持的语法:

  1. 数值⽐较, 如 > , >= , < , <= , BETWEEN , = ;
  2. 字符⽐较, 如 = , <> , IN ;
  3. IS NULL or IS NOT NULL ;
  4. 逻辑连接符 AND , OR , NOT ;

支持的类型:

  1. 数值型, 如123, 3.1415;
  2. 字符型, 如 ‘abc’, 必须⽤单引号;
  3. NULL , 特殊常数;
  4. 布尔值, TRUE or FALSE ;

FilterBySQLProducer.java

package com.example.rocketmq.demo.filter;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class FilterBySQLProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer = newDefaultMQProducer("please_rename_unique_group_name");// Specify name server addresses.producer.setNamesrvAddr("localhost:9876");producer.start();String[] tags = {"TagA","TagB","TagC","TagD"};for (int i = 0; i < 10; i++) {try {String tag = tags[i % tags.length];//构建消息Message msg = new Message("TopicTest" /* Topic */,tag /* Tag */,("RocketMQ消息测试,消息的TAG="+tag+  ", 属性 age = " + i + ", == " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));//每个消息设置属性为age,age值为0-9msg.putUserProperty("age", i+"");SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();
//                Thread.sleep(1000);}}//Shut down once the producer instance is not longer in use.producer.shutdown();}
}

FilterBySQLConsumer.java

package com.example.rocketmq.demo.filter;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;public class FilterBySQLConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {// Instantiate with specified consumer group name.DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// Specify name server addresses.consumer.setNamesrvAddr("localhost:9876");//订阅Topicconsumer.subscribe("TopicTest", MessageSelector.bySql("age between 0 and 6"));consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer===启动成功!");}
}

类过滤模式(基于4.2.0版本)

RocketMQ通过定义消息过滤类的接⼝实现消息过滤

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/72188.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【心得】一文梳理高频面试题 HTTP 1.0/HTTP 1.1/HTTP 2.0/HTTP 3.0的区别并附加记忆方法

面试时很容易遇到的一个问题—— HTTP 1.0/HTTP 1.1/HTTP 2.0/HTTP 3.0的区别&#xff0c;其实这四个版本的发展实际上是一环扣一环的&#xff0c;是逐步完善的&#xff0c;本文希望帮助读者梳理清楚各个版本之间的区别&#xff0c;并且给出当前各个版本的应用情况&#xff0c;…

大模型部署与调优:从基础到高效优化全解析

大模型部署与调优&#xff1a;从基础到高效优化全解析 1. 引言 随着深度学习的快速发展&#xff0c;大模型&#xff08;Large Models&#xff09; 在自然语言处理&#xff08;NLP&#xff09;、计算机视觉&#xff08;CV&#xff09;、推荐系统等领域的应用日益广泛。然而&am…

小红书app复制短链,分享链接转直接可访问链接

简介&#xff1a;小红书手机app分享的链接需要点击才能获取完成链接&#xff0c;本文教大家如何通过代码的方式将xhs的短连接转化为长链接。 1.正常我们分享的链接是这样的&#xff1a; 44 小猪吃宵夜发布了一篇小红书笔记&#xff0c;快来看吧&#xff01; &#x1f606; KeA…

DeepSeek 助力 Vue3 开发:打造丝滑的弹性布局(Flexbox)

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享一篇文章&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f495; 目录 Deep…

DeepSeek开源周Day5压轴登场:3FS与Smallpond,能否终结AI数据瓶颈之争?

2025年2月28日&#xff0c;DeepSeek开源周迎来了第五天&#xff0c;也是本次活动的收官之日。自2月24日启动以来&#xff0c;DeepSeek团队以每天一个开源项目的节奏&#xff0c;陆续向全球开发者展示了他们在人工智能基础设施领域的最新成果。今天&#xff0c;他们发布了Fire-F…

SQL AnyWhere 的备份与恢复

目录 一、备份 二、恢复 1、自动恢复 2、映像恢复 3、日志恢复-指定时间点 4、日志恢复-指定偏移 5、完整的恢复流程 6、恢复最佳实践 三、其他操作 1、dbtran 2、SQL Shell 工具 数据库的安装与基本使用内容请参考博客: SAP SQLAnyWhere 17 的安装与基本使用_sql…

入门基础项目(SpringBoot+Vue)

文章目录 1. css布局相关2. JS3. Vue 脚手架搭建4. ElementUI4.1 引入ElementUI4.2 首页4.2.1 整体框架4.2.2 Aside-logo4.2.3 Aside-菜单4.2.4 Header-左侧4.2.5 Header-右侧4.2.6 iconfont 自定义图标4.2.7 完整代码 4.3 封装前后端交互工具 axios4.3.1 安装 axios4.3.2 /src…

unity学习61:UI布局layout

目录 1 布局 layout 1.1 先准备测试UI,新增这样一组 panel 和 image 1.2 新增 vertical layout 1.3 现在移动任意一个image 都会影响其他 1.3.1 对比 如果没有这个&#xff0c;就会是覆盖效果了 1.3.2 对比 如果没有这个&#xff0c;就会是覆盖效果了 1.4 总结&#xf…

翻译: 深入分析LLMs like ChatGPT 一

大家好&#xff0c;我想做这个视频已经有一段时间了。这是一个全面但面向普通观众的介绍&#xff0c;介绍像ChatGPT这样的大型语言模型。我希望通过这个视频让大家对这种工具的工作原理有一些概念性的理解。 首先&#xff0c;我们来谈谈你在这个文本框里输入内容并点击回车后背…

Ubuntu 下 nginx-1.24.0 源码分析 - ngx_conf_add_dump

ngx_conf_add_dump 定义在src\core\ngx_conf_file.c static ngx_int_t ngx_conf_add_dump(ngx_conf_t *cf, ngx_str_t *filename) {off_t size;u_char *p;uint32_t hash;ngx_buf_t *buf;ngx_str_node_t *sn;ngx_conf_dump_t *cd;has…

Oracle 导出所有表索引的创建语句

在Oracle数据库中&#xff0c;导出所有表的索引创建语句通常涉及到使用数据字典视图来查询索引的定义&#xff0c;然后生成对应的SQL语句。你可以通过查询DBA_INDEXES或USER_INDEXES视图&#xff08;取决于你的权限和需求&#xff09;来获取这些信息。 使用DBA_INDEXES视图 如…

快速搭建多语言网站的 FastAdmin 实践

快速搭建多语言网站的 FastAdmin 实践 引言 在全球化的背景下&#xff0c;越来越多的网站需要支持多种语言&#xff0c;以便满足不同用户的需求。FastAdmin 是一个基于 ThinkPHP 的快速后台开发框架&#xff0c;提供了丰富的功能和灵活的扩展性&#xff0c;非常适合用于快速搭…

Python 实战:构建分布式文件存储系统全解析

Python 实战&#xff1a;构建分布式文件存储系统全解析 在当今数据爆炸的时代&#xff0c;分布式文件存储系统凭借其高可扩展性、高可靠性等优势&#xff0c;成为了数据存储领域的热门选择。本文将详细介绍如何使用 Python 构建一个简单的分布式文件存储系统。从系统架构设计&…

【综合项目】api系统——基于Node.js、express、mysql等技术

目录 0 前言 1 初始化 2 注册登录 2.1 注册 2.1.1 功能&#xff1a;密码加密&#xff08;2.3.3&#xff09; 2.1.1.1 操作 2.1.1.2 bcryptjs详解 2.1.2 插入新用户&#xff08;2.3.4&#xff09; 2.1.3 优化&#xff1a;表单数据验证&#xff08;2.5&#xff09; …

tableau之标靶图、甘特图和瀑布图

一、标靶图 概念 标靶图&#xff08;Bullet Chart&#xff09;是一种用于显示数据与目标之间关系的可视化图表&#xff0c;常用于业务和管理报告中。其设计旨在用来比较实际值与目标值&#xff0c;同时展示额外的上下文信息&#xff08;如趋势&#xff09;。 作用 可视化目标…

Linux下的网络通信编程

在不同主机之间&#xff0c;进行进程间的通信。 1解决主机之间硬件的互通 2.解决主机之间软件的互通. 3.IP地址&#xff1a;来区分不同的主机&#xff08;软件地址&#xff09; 4.MAC地址&#xff1a;硬件地址 5.端口号&#xff1a;区分同一主机上的不同应用进程 网络协议…

网络七层模型—OSI参考模型详解

网络七层模型&#xff1a;OSI参考模型详解 引言 在网络通信的世界中&#xff0c;OSI&#xff08;Open Systems Interconnection&#xff09;参考模型是一个基础且核心的概念。它由国际标准化组织&#xff08;ISO&#xff09;于1984年提出&#xff0c;旨在为不同厂商的设备和应…

530 Login fail. A secure connection is requiered(such as ssl)-java发送QQ邮箱(简单配置)

由于cs的csdN许多文章关于这方面的都是vip文章&#xff0c;而本文是免费的&#xff0c;希望广大网友觉得有帮助的可以多点赞和关注&#xff01; QQ邮箱授权码到这里去开启 授权码是16位的字母&#xff0c;填入下面的mail.setting里面的pass里面 # 邮件服务器的SMTP地址 host…

Sqlserver安全篇之_TLS的证书概念

证书的理解 参考Sqlserver的官方文档https://learn.microsoft.com/zh-cn/sql/database-engine/configure-windows/certificate-overview?viewsql-server-ver16 TLS(Transport Layer Security)传输层安全和SSL(Secure Sockets Layer)安全套接字层协议位于应用程序协议层和TCP/…

【SQL】掌握SQL查询技巧:数据分组与排序

目录 1. GROUP BY 1.1 定义与用途1.2 示例说明1.3 注意事项1.4 可视化示例 2. ORDER BY 2.1 定义与用途2.2 升序说明&#xff08;默认&#xff09;2.3 降序排序2.4 多列排序2.5 可视化示例 3. GROUP BY 与 ORDER BY 的结合使用4. 可视化示例总结 在数据库管理中&#xff0c;S…