消息长度_填坑笔记:RocketMQ消息订阅失败问题?

前语:不要为了读文章而读文章,一定要带着问题来读文章,勤思考。

5b1f76d6429fca6df7474092347c3ef0.png

作者:kinnylee   来源:http://1t.click/g26

# 背景介绍

项目组使用阿里RocketMQ,对同一个消费组设置不同的tag订阅关系,出现消息丢失的问题,本文从rocketmq源码研究消息发布与订阅原理,并分析导致该问题的原因。

# 官方说明

  • 告诉使用者:同一个消费组,必须保持订阅关系一致

  • 为什么?它没有说!只能从源码找答案

    5a601bcc7dbb69a9fe0c86fdbd49c5f9.png

# 问题复现

  • 启动消费者1,消费组为group1,订阅topicA的消息,tag设置为tag1 || tag2

  • 启动消费者2,消费组也为group1,也订阅topicA的消息,但是tag设置为tag3

  • 启动生产者,生产者发送含有tag1,tag2,tag3的消息各10条

  • 消费者1没有收到任何消息,消费者2收到部分消息

# 结论

  • 同一个消费组中,设置不同tag时,后启动的消费者会覆盖先启动的消费者设置的tag

  • tag决定了消息过滤的条件,经过服务端和客户端两层过滤,最后只有后启动的消费者才能收到部分消息

# 原理说明

1、消息如何保存

CommitLog

  • 保存所有topic的原始消息

  • CommitLog分为多个文件,每个文件默认最大为1G

  • 每条记录包括:消息长度和消息文本(消息体,属性,uid等等)

  • 因每条消息长度不一致,每个commitLog的记录长度也不一致

ab0f95dadae9778f73502f6adce67d5e.png

ConsumerQueue

  • 保存某个Topic下某个Queue的索引信息

  • 每条记录包括:消息在commitLog中的offset,消息大小,消息tag的哈希值

  • 每条记录长度固定为20byte

  • producer发送消息后,先保存到commitLog,再异步建立该条消息对应的topic + queue对应的ConsumerQueue索引

  • 第三部分的Hash(tag)是服务端过滤消息的重要依据

dc242eb187c912692a1f566452056979.png

2、consumer如何订阅消息?

注册订阅信息

  • consumer订阅时,会将订阅信息注册到到服务端

  • 保存订阅信息的是Map类,key为topic,value主要是tag

  • subVersion取当前时间。

这里的key是topic,subVersion版本号,这两点很关键!后面有用到!

b7b7dd01e84346cd45f9f9f390aee6fd.png

拉取消息并过滤

  • 拉取消息时,首先从服务端获取订阅关系,得到tag的hash集合codeSet

  • 然后从ConsumerQueue获取一条记录,判断记录的hashCode是否在codeSet中,以达到消息过滤的目的,决定是否将该消息发送给consumer

  • 总之一句话:tag决定了消息是否发到客户端

3、消息过滤

服务端过滤

  • 过滤:tag的hash值过滤

  • 优点:

    • 减少不必要消息占用流量

  • 缺点:

    • Hash存在冲突,过滤不完全准确

ca15655467dd0d7172736efa72364640.png

客户端过滤

  • 服务端过滤存在不准确性,客户端再次精确过滤

  • 客户度过滤:tag的字符串值做对比。不相等的不返回给消费者

原因总结

  • 同一个consumer group的订阅关系,保存在RebalanceImpl类的Map中。key为topic

  • 不同的消费者启动后,依次注册订阅关系,因为tag不一样,导致Map中同一topic的tag被覆盖。比如:消费者1订阅tag1,消费者2订阅tag2。最后map中只保存tag2.

  • 过滤的核心是是tag,tag被更新,过滤条件被改变。服务端过滤后只返回tag2的消息

  • 客户端接收消息后,再次过滤。先启动的消费者1订阅tagA,但是服务端返回tag2,所以消费者1收不到任何消息。消费者2能收到一半的消息(集群模式,假设消息平均分配,另外一半分给tag2)

# 源码分析

1、订阅关系数据结构

3016b0c72b15688e7e8012794392865f.png

2、消费者1启动时注册的订阅关系

d79594d55b976f9435915c2c3fcbec31.png

3、消费者2后启动覆盖订阅关系

93cb0c2abfc9b243cd666f008f0c93c4.png

4、服务端过滤时取出ConsumerQueue的Hash(tag)

bf9d0918563f139712eca903b2968ae0.png

5、对比消息的Hash(tag)和之前保存的订阅关系

a11fb3933a03f3288b4c0b638d6c7da3.png

7、客户端过滤

01989a3c650b9fdc9f5c797690b4ccff.png

热文推荐

这份5G PPT这几天在我的朋友圈刷屏了。

作为一名Java程序员,你竟然不知道Intrumentation!

c97a46ad5adfd4dcfbc75ec8e44e641a.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/420287.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端学习(1340):mongoose验证规则

const mongoose require(mongoose); mongoose.connect(mongodb://localhost/playground, { useUnifiedTopology: true }).then(() > console.log(数据库连接成功)).catch(err > console.log(err, 数据库连接失败))//创建集合规则 const postSchema new mongoose.Schema…

深入分析C++引用

关于引用和指针的差别的文章非常多非常多,可是总是找不到他们的根本差别,偶然在codeproject上看到这篇文章,认为讲的挺好的, 所以翻译了下,希望对大家有帮助。 原文地址: http://www.codeproject.com/KB/cpp/Reference…

mysql8窗口函数

mysql window函数mysql 开窗函数demo&practice分组求和2. 和MySQL其它函数搭配mysql 开窗函数demo&practice 注意: mysql8 才有开窗函数, mysql5.7以下不要阅读。分组求和 create table sales (id int PRIMARY KEY auto_increment,city varchar(15),county varchar(1…

前端学习(1341):mongoose验证规则延伸

const mongoose require(mongoose); mongoose.connect(mongodb://localhost/playground, { useUnifiedTopology: true }).then(() > console.log(数据库连接成功)).catch(err > console.log(err, 数据库连接失败))//创建集合规则 const postSchema new mongoose.Schema…

艾创机器人_世界教育机器人大赛 2019赛季世界锦标赛落幕曲靖代表队获多个奖项...

来源:曲靖日报-曲靖新闻网本报讯12月14日,世界教育机器人大赛(WER)2019赛季世界锦标赛在上海隆重开赛。来自中国、美国、英国、日本、菲律宾、墨西哥等30多个国家和地区的1万余名选手在大赛中一决高低。刚刚结束的第六届曲靖市青少年机器人竞赛余温未散,从比赛中脱颖…

javascript 学习笔记

一: js数据类型 js有两类数据类型,1:原始类型;2对象类型。 原始类型包括5中:数字,字符串,布尔,nil,undifined。 nil和undefined分属不同的类型,而此两种类型比…

MySQL8权限,角色

角色创建角色给角色赋予权限删除权限给用户赋予角色激活角色撤销用户权限创建角色 mysql> create role boss; Query OK, 0 rows affected (0.01 sec)mysql> create role manager; Query OK, 0 rows affected (0.01 sec)给角色赋予权限 manager角色拥有查询sales表的权限…

前端学习(1342):mongoose验证规则拿到错误信息

const mongoose require(mongoose); mongoose.connect(mongodb://localhost/playground, { useUnifiedTopology: true }).then(() > console.log(数据库连接成功)).catch(err > console.log(err, 数据库连接失败))//创建集合规则 const postSchema new mongoose.Schema…

单片机人流统计装置的程序_单片机其实不难

对于大学读电子方面专业的同学们,肯定知道有这么一个神奇的元器件,它枯燥难懂,但也十分吸引人,它就是我们今天要讲的元器件--单片机单片机作为工业控制领域里面最核心的部件,它存在于每一台机器,小到扫地机…

数据库索引的作用和长处缺点

为什么要创建索引呢?这是由于,创建索引能够大大提高系统的性能。 第一,通过创建唯一性索引,能够保证数据库表中每一行数据的唯一性。 第二,能够大大加快 数据的检索速度,这也是创建索引的最基本的原因。 第…

MySQL索引篇

index存储引擎索引InnoDB中的索引MyISAM索引存储引擎 以前一直认为关系型数据库中的索引不重要,知道最近学了MySQL高级篇,才发现,对MySQL一知半解。都是听人泛泛而谈。 首先MySQL服务器是怎么存数据的,怎么取到的,内…

sublime加入input函数_【挑战自学Python编程】第八天:while循环以及input()函数

摘要01 while循环02 input函数03 终端04 使用while循环与input()函数01 while循环在正式讲Python中的while前,希望大家先关注单词一下while,翻译为中文意思是:当。(这里我们只需要这一种意思即可)下面我们开始看while循…

文件循环读取_一个案例轻松认识Python文件处理提取文件中的数字

1、文件打开使用 open() 函数打开文件。它需要两个参数,第一个参数是文件路径或文件名,第二个是文件的打开模式。模式通常是下面这样的:"r",以只读模式打开,你只能读取文件但不能编辑/删除文件的任何内容&qu…

B/S架构

程序架构 tomcat目录结构: bin:存放启动和关闭tomcat的脚本文件; conf:存放配置文件 lib:存放所需的jar文件 webapps:存放发布的web程序 work:存入tomcat工作时产生的文件 1.tomcat配置端口,当默认的8080端口被占用时修改8080端口的位置在tom…

前端学习(1344):用户的增删改查操作1

const http require(http); const mongoose require(mongoose);//数据库连接 mongoose.connect(mongodb://localhost/playground, { useUnifiedTopology: true }).then(() > console.log(数据库连接成功)).catch(() > console.log(数据库连接失败));const userSchema …

索引常用注意事项

索引1. 索引怎么建好?2. 索引容易失效的场景3. 连接查询索引优化4. order by,group by5. 覆盖索引6. 索引下推1. 索引怎么建好? 单表 主键必须唯一,且单调递增有唯一键的,尽量建立唯一键where条件用得比较多的字段查询…

EntityFramework的安装

关于EntityFramework在vs2012无法引用的问题 这段时间学习MVC,发现一个问题,我公司的电脑可以直接引用EntityFrameWork这个命名空间,但我家里面的电脑就不能直接引用,刚开始以为是我电脑配置问题,后重装电脑&#xff0…

毫米波雷达_最新的7个毫米波雷达应用案例

毫米波雷达传感器如何做到"全天候"?毫米波雷达使用的技术是毫米波(millimeterwave),通常缩写为MMW,波长为1~10毫米,频率为30~300GHz的电磁波。根据波的传播理论,频率越高,波长越短&am…

前端学习(1345):用户的增删改查操作2

//创建http连接 const http require(http); //创建服务器 const app http.createServer(); //第三方模块导入 const mongoose require(mongoose); //获取连接 const url require(url); //数据库连接地址 mongoose.connect(mongodb://localhost/playground, { useUnifiedTop…