RocketMQ源码 Broker-ConsumerFilterManager 消费者数据过滤管理组件源码分析

前言

ConsumerFilterManager 继承了ConfigManager配置管理组件,拥有将内存数据持久化到磁盘文件consumerFilter.json的能力。它主要负责,对在消费者拉取消息时,进行消息数据过滤,且只针对使用表达式过滤的消费者有效。


源码版本:4.9.3

源码架构图

核心数据结构

可以看到内存中维护了 topic -> consumer group -> ConsumerFilterData 映射关系的数据结构。

/*** Consumer filter data manager.Just manage the consumers use expression filter.* 消费者过滤数据管理组件。只管理使用表达式过滤的消费者。*/
public class ConsumerFilterManager extends ConfigManager {// 核心数据结构:topic -> consumer group -> ConsumerFilterDataprivate ConcurrentMap<String/*Topic*/, FilterDataMapByTopic>filterDataByTopic = new ConcurrentHashMap<String/*Topic*/, FilterDataMapByTopic>(256);private transient BrokerController brokerController;// 布隆过滤器private transient BloomFilter bloomFilter;
}

深入看下 FilterDataMapByTopic 类,是上面数据结构的一个子集,维护了 消费组 -> 消费组过滤数据映射关系。

    public static class FilterDataMapByTopic {// 核心数据结构:consumer group -> ConsumerFilterDataprivate ConcurrentMap<String/*consumer group*/, ConsumerFilterData>groupFilterData = new ConcurrentHashMap<String, ConsumerFilterData>();private String topic;}

在深入一步,看下 ConsumerFilterData 的数据结构,包含了全部与消费者过滤有关的关键信息。

/*** Filter data of consumer.*/
public class ConsumerFilterData {// 消费组private String consumerGroup;// 主题private String topic;// 过滤器表达式private String expression;// 过滤器类型private String expressionType;// 过滤器编译后的表达式private transient Expression compiledExpression;// 过滤器创建时间private long bornTime;// 过滤器过期时间private long deadTime = 0;// 过滤器版本private long version;// 布隆过滤器数据private BloomFilterData bloomFilterData;// 客户端版本private long clientVersion;
}

核心数据行为

从下面代码可以看到,ConsumerFilterManager的行为主要是注册订阅、取消订阅、清理过期订阅、序列化、反序列化等维护内存元数据的行为。过滤行为不在这个组件里体现,在其他调用方法中会有具体使用方式。

/*** Consumer filter data manager.Just manage the consumers use expression filter.* 消费者过滤数据管理组件。只管理使用表达式过滤的消费者。*/
public class ConsumerFilterManager extends ConfigManager {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);private static final long MS_24_HOUR = 24 * 3600 * 1000;// 核心数据结构:topic -> consumer group -> ConsumerFilterDataprivate ConcurrentMap<String/*Topic*/, FilterDataMapByTopic>filterDataByTopic = new ConcurrentHashMap<String/*Topic*/, FilterDataMapByTopic>(256);private transient BrokerController brokerController;// 布隆过滤器private transient BloomFilter bloomFilter;public ConsumerFilterManager() {// just for testthis.bloomFilter = BloomFilter.createByFn(20, 64);}public ConsumerFilterManager(BrokerController brokerController) {this.brokerController = brokerController;this.bloomFilter = BloomFilter.createByFn(brokerController.getBrokerConfig().getMaxErrorRateOfBloomFilter(),brokerController.getBrokerConfig().getExpectConsumerNumUseFilter());// then set bit map length of store config.brokerController.getMessageStoreConfig().setBitMapLengthConsumeQueueExt(this.bloomFilter.getM());}/*** Build consumer filter data.Be care, bloom filter data is not included.** @return maybe null*/public static ConsumerFilterData build(final String topic, final String consumerGroup,final String expression, final String type,final long clientVersion) {if (ExpressionType.isTagType(type)) {return null;}ConsumerFilterData consumerFilterData = new ConsumerFilterData();consumerFilterData.setTopic(topic);consumerFilterData.setConsumerGroup(consumerGroup);consumerFilterData.setBornTime(System.currentTimeMillis());consumerFilterData.setDeadTime(0);consumerFilterData.setExpression(expression);consumerFilterData.setExpressionType(type);consumerFilterData.setClientVersion(clientVersion);try {consumerFilterData.setCompiledExpression(FilterFactory.INSTANCE.get(type).compile(expression));} catch (Throwable e) {log.error("parse error: expr={}, topic={}, group={}, error={}", expression, topic, consumerGroup, e.getMessage());return null;}return consumerFilterData;}/*** 在指定消费组注册消费者过滤数据* @param consumerGroup* @param subList*/public void register(final String consumerGroup, final Collection<SubscriptionData> subList) {for (SubscriptionData subscriptionData : subList) {register(subscriptionData.getTopic(),consumerGroup,subscriptionData.getSubString(),subscriptionData.getExpressionType(),subscriptionData.getSubVersion());}// make illegal topic dead.Collection<ConsumerFilterData> groupFilterData = getByGroup(consumerGroup);Iterator<ConsumerFilterData> iterator = groupFilterData.iterator();while (iterator.hasNext()) {ConsumerFilterData filterData = iterator.next();boolean exist = false;for (SubscriptionData subscriptionData : subList) {if (subscriptionData.getTopic().equals(filterData.getTopic())) {exist = true;break;}}if (!exist && !filterData.isDead()) {filterData.setDeadTime(System.currentTimeMillis());log.info("Consumer filter changed: {}, make illegal topic dead:{}", consumerGroup, filterData);}}}public boolean register(final String topic, final String consumerGroup, final String expression,final String type, final long clientVersion) {// 不支持tag类型if (ExpressionType.isTagType(type)) {return false;}if (expression == null || expression.length() == 0) {return false;}// 获取topic对应的消费者过滤数据FilterDataMapByTopic filterDataMapByTopic = this.filterDataByTopic.get(topic);if (filterDataMapByTopic == null) {FilterDataMapByTopic temp = new FilterDataMapByTopic(topic);FilterDataMapByTopic prev = this.filterDataByTopic.putIfAbsent(topic, temp);filterDataMapByTopic = prev != null ? prev : temp;}// 创建布隆过滤器数据BloomFilterData bloomFilterData = bloomFilter.generate(consumerGroup + "#" + topic);// 注册过滤数据到topicreturn filterDataMapByTopic.register(consumerGroup, expression, type, bloomFilterData, clientVersion);}// 取消注册消费者过滤数据public void unRegister(final String consumerGroup) {for (Entry<String, FilterDataMapByTopic> entry : filterDataByTopic.entrySet()) {entry.getValue().unRegister(consumerGroup);}}public ConsumerFilterData get(final String topic, final String consumerGroup) {if (!this.filterDataByTopic.containsKey(topic)) {return null;}if (this.filterDataByTopic.get(topic).getGroupFilterData().isEmpty()) {return null;}return this.filterDataByTopic.get(topic).getGroupFilterData().get(consumerGroup);}// 获取消费组下所有过滤数据public Collection<ConsumerFilterData> getByGroup(final String consumerGroup) {Collection<ConsumerFilterData> ret = new HashSet<ConsumerFilterData>();Iterator<FilterDataMapByTopic> topicIterator = this.filterDataByTopic.values().iterator();while (topicIterator.hasNext()) {FilterDataMapByTopic filterDataMapByTopic = topicIterator.next();Iterator<ConsumerFilterData> filterDataIterator = filterDataMapByTopic.getGroupFilterData().values().iterator();while (filterDataIterator.hasNext()) {ConsumerFilterData filterData = filterDataIterator.next();if (filterData.getConsumerGroup().equals(consumerGroup)) {ret.add(filterData);}}}return ret;}// 获取topic下所有过滤数据public final Collection<ConsumerFilterData> get(final String topic) {if (!this.filterDataByTopic.containsKey(topic)) {return null;}if (this.filterDataByTopic.get(topic).getGroupFilterData().isEmpty()) {return null;}return this.filterDataByTopic.get(topic).getGroupFilterData().values();}public BloomFilter getBloomFilter() {return bloomFilter;}@Overridepublic String encode() {return encode(false);}@Overridepublic String configFilePath() {if (this.brokerController != null) {// 配置存储路径 config/consumerFilter.jsonreturn BrokerPathConfigHelper.getConsumerFilterPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());}return BrokerPathConfigHelper.getConsumerFilterPath("./unit_test");}// 将json字符串反序列化为ConsumerFilterManager对象@Overridepublic void decode(final String jsonString) {ConsumerFilterManager load = RemotingSerializable.fromJson(jsonString, ConsumerFilterManager.class);if (load != null && load.filterDataByTopic != null) {boolean bloomChanged = false;for (Entry<String, FilterDataMapByTopic> entry : load.filterDataByTopic.entrySet()) {FilterDataMapByTopic dataMapByTopic = entry.getValue();if (dataMapByTopic == null) {continue;}for (Entry<String, ConsumerFilterData> groupEntry : dataMapByTopic.getGroupFilterData().entrySet()) {ConsumerFilterData filterData = groupEntry.getValue();if (filterData == null) {continue;}try {filterData.setCompiledExpression(FilterFactory.INSTANCE.get(filterData.getExpressionType()).compile(filterData.getExpression()));} catch (Exception e) {log.error("load filter data error, " + filterData, e);}// check whether bloom filter is changed// if changed, ignore the bit map calculated before.if (!this.bloomFilter.isValid(filterData.getBloomFilterData())) {bloomChanged = true;log.info("Bloom filter is changed!So ignore all filter data persisted! {}, {}", this.bloomFilter, filterData.getBloomFilterData());break;}log.info("load exist consumer filter data: {}", filterData);if (filterData.getDeadTime() == 0) {// we think all consumers are dead when loadlong deadTime = System.currentTimeMillis() - 30 * 1000;filterData.setDeadTime(deadTime <= filterData.getBornTime() ? filterData.getBornTime() : deadTime);}}}if (!bloomChanged) {this.filterDataByTopic = load.filterDataByTopic;}}}// 将ConsumerFilterManager对象序列化为json字符串@Overridepublic String encode(final boolean prettyFormat) {// clean{clean();}return RemotingSerializable.toJson(this, prettyFormat);}// 清理过期的过滤数据public void clean() {Iterator<Map.Entry<String, FilterDataMapByTopic>> topicIterator = this.filterDataByTopic.entrySet().iterator();while (topicIterator.hasNext()) {Map.Entry<String, FilterDataMapByTopic> filterDataMapByTopic = topicIterator.next();Iterator<Map.Entry<String, ConsumerFilterData>> filterDataIterator= filterDataMapByTopic.getValue().getGroupFilterData().entrySet().iterator();while (filterDataIterator.hasNext()) {Map.Entry<String, ConsumerFilterData> filterDataByGroup = filterDataIterator.next();ConsumerFilterData filterData = filterDataByGroup.getValue();if (filterData.howLongAfterDeath() >= (this.brokerController == null ? MS_24_HOUR : this.brokerController.getBrokerConfig().getFilterDataCleanTimeSpan())) {log.info("Remove filter consumer {}, died too long!", filterDataByGroup.getValue());filterDataIterator.remove();}}if (filterDataMapByTopic.getValue().getGroupFilterData().isEmpty()) {log.info("Topic has no consumer, remove it! {}", filterDataMapByTopic.getKey());topicIterator.remove();}}}public ConcurrentMap<String, FilterDataMapByTopic> getFilterDataByTopic() {return filterDataByTopic;}public void setFilterDataByTopic(final ConcurrentHashMap<String, FilterDataMapByTopic> filterDataByTopic) {this.filterDataByTopic = filterDataByTopic;}public static class FilterDataMapByTopic {// 核心数据结构:consumer group -> ConsumerFilterDataprivate ConcurrentMap<String/*consumer group*/, ConsumerFilterData>groupFilterData = new ConcurrentHashMap<String, ConsumerFilterData>();private String topic;public FilterDataMapByTopic() {}public FilterDataMapByTopic(String topic) {this.topic = topic;}// 取消注册某个消费组的过滤器public void unRegister(String consumerGroup) {if (!this.groupFilterData.containsKey(consumerGroup)) {return;}ConsumerFilterData data = this.groupFilterData.get(consumerGroup);if (data == null || data.isDead()) {return;}long now = System.currentTimeMillis();log.info("Unregister consumer filter: {}, deadTime: {}", data, now);data.setDeadTime(now);}public boolean register(String consumerGroup, String expression, String type, BloomFilterData bloomFilterData,long clientVersion) {ConsumerFilterData old = this.groupFilterData.get(consumerGroup);if (old == null) {// 构建过滤器数据ConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion);if (consumerFilterData == null) {return false;}// 设置布隆过滤器consumerFilterData.setBloomFilterData(bloomFilterData);// 放入内存数据结构old = this.groupFilterData.putIfAbsent(consumerGroup, consumerFilterData);if (old == null) {log.info("New consumer filter registered: {}", consumerFilterData);return true;} else {if (clientVersion <= old.getClientVersion()) {if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {log.warn("Ignore consumer({} : {}) filter(concurrent), because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}",consumerGroup, topic,clientVersion, old.getClientVersion(),old.getExpressionType(), old.getExpression(),type, expression);}if (clientVersion == old.getClientVersion() && old.isDead()) {reAlive(old);return true;}return false;} else {this.groupFilterData.put(consumerGroup, consumerFilterData);log.info("New consumer filter registered(concurrent): {}, old: {}", consumerFilterData, old);return true;}}} else {// 当前版本号小于旧的版本号if (clientVersion <= old.getClientVersion()) {if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {log.info("Ignore consumer({}:{}) filter, because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}",consumerGroup, topic,clientVersion, old.getClientVersion(),old.getExpressionType(), old.getExpression(),type, expression);}if (clientVersion == old.getClientVersion() && old.isDead()) {reAlive(old);return true;}return false;}// 新版本号大于旧的版本号boolean change = !old.getExpression().equals(expression) || !old.getExpressionType().equals(type);if (old.getBloomFilterData() == null && bloomFilterData != null) {change = true;}if (old.getBloomFilterData() != null && !old.getBloomFilterData().equals(bloomFilterData)) {change = true;}// if subscribe data is changed, or consumer is died too long.if (change) {// 构建过滤器数据ConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion);if (consumerFilterData == null) {// new expression compile error, remove old, let client report error.this.groupFilterData.remove(consumerGroup);return false;}consumerFilterData.setBloomFilterData(bloomFilterData);// 设置过滤器数据this.groupFilterData.put(consumerGroup, consumerFilterData);log.info("Consumer filter info change, old: {}, new: {}, change: {}",old, consumerFilterData, change);return true;} else {// 版本号一致,更新过滤器数据old.setClientVersion(clientVersion);if (old.isDead()) {reAlive(old);}return true;}}}protected void reAlive(ConsumerFilterData filterData) {long oldDeadTime = filterData.getDeadTime();filterData.setDeadTime(0);log.info("Re alive consumer filter: {}, oldDeadTime: {}", filterData, oldDeadTime);}public final ConsumerFilterData get(String consumerGroup) {return this.groupFilterData.get(consumerGroup);}public final ConcurrentMap<String, ConsumerFilterData> getGroupFilterData() {return this.groupFilterData;}public void setGroupFilterData(final ConcurrentHashMap<String, ConsumerFilterData> groupFilterData) {this.groupFilterData = groupFilterData;}public String getTopic() {return topic;}public void setTopic(final String topic) {this.topic = topic;}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/221624.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SPWM技术

一、基于数字正弦序列的数字SPWM 图1图中T0为正弦调制波周期&#xff1b;Ts为系统采样周期(即载波信号周期或逆变开关器件的开关频率周期)&#xff1b;N T0/Ts为载波比&#xff1b;Vdc为SPWM脉冲幅值&#xff1b;M≤1为脉宽调制比&#xff0c;其决定了信号最大脉冲宽度为MTs≤T…

信号继电器 DX-31B DC220V 电压型 带板前接线底座

系列型号 DX-31B信号继电器DX-31BJ信号继电器 DX-32A信号继电器DX-32AJ信号继电器 DX-32B信号继电器DX-32BJ信号继电器 DX-31A信号继电器DX-33/1信号继电器 DX-33/2信号继电器DX-33/3信号继电器 DX-33/4信号继电器DX-33/5信号继电器 一. 继电器用途 DX-30系列信号继电器…

Leetcode—1502.判断能否形成等差数列【简单】

2023每日刷题&#xff08;五十九&#xff09; Leetcode—1502.判断能否形成等差数列 实现代码 class Solution { public:bool canMakeArithmeticProgression(vector<int>& arr) {sort(arr.begin(), arr.end());int diff abs(arr[1] - arr[0]);for(int i 1; i <…

Linux汇编语言编程-汇编语言

术语 Figure 3-13. 8086 Computer (Partial Model) reg 代表寄存器。 它可以是表 3.13 中列出的任何寄存器。 imm 代表立即数【immediate】&#xff08;可以理解为字面量&#xff0c;常量&#xff09;。 术语“立即数【immediate】”用于指代直接由十进制或十六进制表示形式给…

基于SSM的药房药品采购集中管理系统的设计与实现论文

摘 要 互联网发展至今&#xff0c;无论是其理论还是技术都已经成熟&#xff0c;而且它广泛参与在社会中的方方面面。它让信息都可以通过网络传播&#xff0c;搭配信息管理工具可以很好地为人们提供服务。针对药房药品采购信息管理混乱&#xff0c;出错率高&#xff0c;信息安全…

DevEco Studio 项目启动工程和Device Manage

DevEco Studio 项目启动工程和Device Manage 鸿蒙&#xff08;HarmonyOS&#xff09; 一、操作环境 操作系统: Windows 10 专业版 IDE:DevEco Studio 3.1 SDK:HarmonyOS 3.1 二、创建虚拟机&#xff08;Device Manage&#xff09; 鸿蒙IDE创建虚拟设备入口有2个地方&…

c⽂件操作

1.什么是⽂件&#xff1f; 磁盘上的⽂件是⽂件。但是在程序设计中&#xff0c;我们⼀般谈的⽂件有两种&#xff1a;程序⽂件、数据⽂件&#xff08;从⽂件功能的⻆度来分类的&#xff09;。 1.程序⽂件 程序⽂件包括源程序⽂件&#xff08;后缀为.c&#xff09;,⽬标⽂件&#…

操作系统笔记——概论、进程、线程(王道408)

文章目录 前言计算机系统概述OS的基本概念OS的发展历程OS的运行机制OS体系结构OS引导虚拟机 进程和线程进程和线程基础进程进程状态进程控制进程通信线程线程实现 CPU调度调度的层次进程调度细节调度算法评价指标批处理调度算法交互式调度方法 同步与互斥基本概念互斥互斥软件实…

shell实战-批量修改主机密码

1.编写执行脚本 vim host-pass.sh #!/bin/bash#配置旧的密码文件 cat >old_pass.txt <<EOF 10.36.192.182 root 123 22 10.36.192.184 root 123 22 EOF[ -f /etc/init.d/functions ] && . /etc/init.d/functions OLD_INFOold_pass.txt NEW_INFOnew_pass.txt…

设计模式(2)--对象创建(2)--生成器

1. 意图 将一个复杂对象的构建与它的表示分离&#xff0c;使得同样的构建过程可以创建不同的表示。 2. 四种角色 指挥(Director)、抽象生成器(Builder)、具体生成器(Concrete Builder)、产品(Product) 3. 优点 3.1 可以改变一个产品的内部表示(通过定义新的生成器)。 3.2 将构…

云计算与AI融合:Amazon Connect开创客户服务智能时代

授权说明&#xff1a;本篇文章授权活动官方亚马逊云科技文章转发、改写权&#xff0c;包括不限于在 亚马逊云科技开发者社区, 知乎&#xff0c;自媒体平台&#xff0c;第三方开发者媒体等亚马逊云科技官方渠道 在亚马逊云科技 re:Invent 2023 大会上&#xff0c;Amazon Connect…

C/C++常见面试题(二)

接前面C/C常见面试题&#xff08;一&#xff09;&#xff0c;继续巩固 目录 1 sizeof和strlen的区别 2 宏定义的陷阱 3 不使用sizeof计算出类型或者变量所占的内存的字节数 4 给定一个数判断是否其是2的N次幂 5 C/C打印所在文件、行号、函数、日期&#xff0c;时间、遵循的…

力扣131. 分割回文串(java 回溯法)

Problem: 131. 分割回文串 文章目录 题目描述思路解题方法复杂度Code 题目描述 思路 题目要求我们给出所有的回文子字符串&#xff0c;而涉及到穷举我们可以利用回溯来解决&#xff0c;另外我们也可以发现问题中涉及到元素存在重复但不可复用的特性&#xff0c;因此我们可以类…

18 5G - NR物理层解决方案支持6G非地面网络中的高移动性

文章目录 非地面网络场景链路仿真参数实验仿真结果 非地面网络场景 链路仿真参数 实验仿真结果 Figure 5 && Figure 6&#xff1a;不同信噪比下的BER和吞吐量 变量 SISO 2x2MIMO 2x4MIMO 2x8MIMOReyleigh衰落、Rician衰落、多径TDL-A(NLOS) 、TDL-E(LOS)(a)QPSK (b)16…

【知识积累】深度度量学习综述

原文指路&#xff1a;https://hav4ik.github.io/articles/deep-metric-learning-survey Problem Setting of Supervised Metric Learning 深度度量学习是一组旨在衡量数据样本之间相似性的技术。 Contrastive Approaches 对比方法的主要思想是设计一个损失函数&#xff0c;直…

Leetcode 51 N 皇后

题意理解&#xff1a; N皇后问题指的是在一个nn的棋盘上&#xff0c;防止皇后棋子&#xff0c;每行、每列、每45斜角只能有一个皇后存在。 这是一道困难的题&#xff1a;困难在于&#xff1a; 如何处理棋盘&#xff0c;如何表示棋子。 将期盼看作是2维数组&#xff0c;一行一行…

关东升老师极简系列丛书(由清华大学出版社出版)

极简系列丛书&#xff0c;编程学习新体验 在这个科技日新月异的时代&#xff0c;编程已经成为了一种必备技能。但是面对各种复杂的编程语言&#xff0c;你是否也曾感到过迷茫和困惑&#xff1f;由清华大学出版社出版的“极简系列丛书”就是为了帮助你解决这个问题。 这套丛书…

解决nuxt3引入图片报错:ReferenceError: require is not defined

现象&#xff1a; 原因&#xff1a;在nuxt3中不支持require的方式引入图片/文件等静态资源。 解决办法&#xff1a; 1. 直接在img标签中的src属性里写明图片的路径&#xff0c;但是此时src前面不能有冒号做动态绑定&#xff01;&#xff1a; src"/assets/images/loading…

【为什么POI的SXSSFWorkbook占用内存更小?】

&#x1f513;为什么POI的SXSSFWorkbook占用内存更小&#xff1f; &#x1f3c6;POI的SXSSFWorkbook&#x1f3c6;POI的SXSSFWorkbook占用内存&#x1f3c6;扩展配置行缓存限制 &#x1f3c6;POI的SXSSFWorkbook SXSSFWorkbook类是Apache POI库的一部分&#xff0c;它是一个流…

【论文阅读】LoRA: Low-Rank Adaptation of Large Language Models

code&#xff1a;GitHub - microsoft/LoRA: Code for loralib, an implementation of "LoRA: Low-Rank Adaptation of Large Language Models" 做法&#xff1a; 把预训练LLMs里面的参数权重给冻结&#xff1b;向transformer架构中的每一层&#xff0c;注入可训练的…