消息队列 RocketMQ 消息重复消费问题(原因及解决)

目录

1.出现重复消费的原因

2.解决

2.1 数据库插入法

2.2 使用布隆过滤器

2.2.1 添加hutool的依赖

2.2.2 测试生产者

2.2.2 测试消费者


1.出现重复消费的原因

  1. BROADCASTING(广播) 模式下,所有注册的消费者都会消费,而这些消费者通常是集群部署的一个个微服务,这样就会多台机器重复消费,当然这个是根据需要来选择。
  2. CLUSTERING(负载均衡)模式下,如果一个 topic 被多个 consumerGroup 消费,也会重复消费。
  3. 即使是在 CLUSTERING 模式下,同一个 consumerGroup 下,一个队列只会分配给一个消费者,看起来好像是不会重复消费。但是,有个特殊情况:一个消费者新上线后,同组的所有消费者要重新负载均衡重平衡 reBalance(反之一个消费者掉线后,也一样)。一个队列所对应的新的消费者要获取之前消费的 offset(偏移量,也就是消息消费的点位),此时之前的消费者可能已经消费了一条消息,但是并没有把 offset 提交给 broker,那么新的消费者可能会重新消费一次。虽然 orderly 模式是前一个消费者先解锁,后一个消费者加锁再消费的模式,比起 concurrently 要严格了,但是加锁的线程和提交offset 的线程不是同一个,所以还是会出现极端情况下的重复消费。
  4. 还有在发送批量消息的时候,会被当做一条消息进行处理,那么如果批量消息中有一条业务处理成功,其他失败了,还是会被重新消费一次。

简单的说:

  1. Consumer 消费完消息并不是实时同步到 Broker 的,而是将 offset 先保存在本地map中,通过定时任务持久化上去。这就导致消息被消费了,但是此时消费者宕机了导致 offset 没提交,下次没提交 offset 的这部分消息会被再次消费
  2. 即使 offset 被提交到了 Broker,在还没来得及持久化的时候 Broker 宕机了,当重启的时候 Broker 会读取consumerOffset.json 中保存的 offset 信息,这就会导致没持久化 offset 的这部分消息会被再次消费

那么如果在CLUSTERING(负载均衡)模式下,并且在同一个消费者组中,不希望一条消息被重复消费,改怎么办呢?我们可以想到去重操作,找到消息唯一的标识,可以是 msgId 也可以是你自定义的唯一的 key,这样就可以去重了

2.解决

我们需要给我们的消费者实现 幂等 ,也就是对同一个消息的处理结果,执行多少次都不变。

幂等性:多次操作产生的影响均和第一次操作产生的影响相同

例如:判断 crud 的幂等性

a. 新增:普通的新增是非幂等,设置了唯一索引的新增是幂等操作

b. 修改:update goods set stock = 10 where id = 1 幂等

               update goods set stock = stock - 1 where id = 1 非幂等

c. 查询:幂等

d. 删除:幂等

那么如何给业务实现幂等呢?这个还是需要结合具体的业务的。你可以使用写入 Redis 来保证,因为Redis 的 key 和 value 就是天然支持幂等的。当然还有使用 数据库插入法 ,基于数据库的唯一键来保证重复数据不会被插入多条。

2.1 数据库插入法

发送方需要给消息带一个唯一标记(根据业务标识)

模拟业务 数据库的订单操作日志表结构(去重表)

给订单号添加唯一索引(订单号存的是 key)

模拟业务,生产者发送了重复的消息

@Test
public void repeatTest() throws Exception {
String key = UUID.randomUUID().toString();
Message<String> msg = MessageBuilder.withPayload("扣减库存 -1").setHeader(RocketMQHeaders.KEYS, key).build();
rocketMQTemplate.syncSend("repeatTopic", msg);
rocketMQTemplate.syncSend("repeatTopic", msg);
}

消费者

@Component
@RocketMQMessageListener(topic = "repeatTopic",consumerGroup = "repeat-consumer-group")
public class RepeatListener implements RocketMQListener<MessageExt> {@Autowiredprivate LogMapper logMapper;@Overridepublic void onMessage(MessageExt messageExt) {// 先拿keyString keys = messageExt.getKeys();// 插入数据库 因为key做了唯一索引OrderOperLog orderOperLog = new OrderOperLog();orderOperLog.setType(1l);orderOperLog.setOrderSn(keys);orderOperLog.setUserId("1003");int insert = logMapper.insert(orderOperLog);System.out.println(keys);System.out.println(new String(messageExt.getBody()));}
}

在消费第二条的时候抛出唯一索引重复 SQLIntegrityConstraintViolationException

数据库只插入一条这样的记录

优化,捕获到异常是 SQLIntegrityConstraintViolationException 时直接将消息签收了,不再进行业务处理,因为之前已经消费了一条同样的消息,这样便可以解决重复消费问题

2.2 使用布隆过滤器

  • 使用去重方案解决,例如将消息的唯一标识存起来,然后每次消费之前先判断是否存在这个唯一标识,如果存在则不消费,如果不存在则消费,并且消费以后将这个标记保存。
  • 想法很好,但是消息的体量是非常大的,可能在生产环境中会到达上千万甚至上亿条,那么我们该如何选择一个容器来保存所有消息的标识,并且又可以快速的判断是否存在呢?

我们可以选择布隆过滤器(BloomFilter)

介绍:

布隆过滤器(英语:Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。

布隆过滤器的原理是,当一个元素被加入集合时,通过K个散列函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。这就是布隆过滤器的基本思想。

2.2.1 添加hutool的依赖

<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.11</version>
</dependency>

2.2.2 测试生产者

public void testRepeatProducer() throws Exception {
// 创建默认的生产者
DefaultMQProducer producer = new DefaultMQProducer("test-group");
// 设置nameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动实例
producer.start();
// 我们可以使用自定义key当做唯一标识
String keyId = UUID.randomUUID().toString();
System.out.println(keyId);
Message msg = new Message("TopicTest", "tagA", keyId, "我是一个测试消息".getBytes());
SendResult send = producer.send(msg);
System.out.println(send);
// 关闭实例
producer.shutdown();
}

发送了两条相同的消息

55d397c9-814f-4931-b0fd-7e142c04759b
SendResult [sendStatus=SEND_OK, msgId=7F00000121C418B4AAC204A76B050000, offsetMsgId=C0A8588200002A9F000000000002C359, messageQueue=MessageQueue [topic=repeatTestTopic, brokerName=broker-a, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F00000121C418B4AAC204A76B050000, offsetMsgId=C0A8588200002A9F000000000002C43F, messageQueue=MessageQueue [topic=repeatTestTopic, brokerName=broker-a, queueId=2], queueOffset=0]

2.2.2 测试消费者

/*** 在boot项目中可以使用@Bean在整个容器中放置一个单利对象*/
public static BitMapBloomFilter bloomFilter = new BitMapBloomFilter(100); // m数组长度@Test
public void testRepeatConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group");
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setNamesrvAddr(MyConstant.NAME_SRV_ADDR);
consumer.subscribe("repeatTestTopic", "*");
// 注册一个消费监听 MessageListenerConcurrently是并发消费
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {// 拿到消息的keyMessageExt messageExt = msgs.get(0);String keys = messageExt.getKeys();// 判断是否存在布隆过滤器中if (bloomFilter.contains(keys)) {// 直接返回了 不往下处理业务return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}// 这个处理业务,然后放入过滤器中// do sth...bloomFilter.add(keys);System.out.println("keys:" + keys);System.out.println(new String(messageExt.getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});
consumer.start();
System.in.read();
}

业务只处理了一条

keys:55d397c9-814f-4931-b0fd-7e142c04759b
库存-1

延迟过了后 重复消息被签收

解决重复消费问题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/114314.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ubuntu20.04下安装nc

前言 nc在网络渗透测试中非常好用&#xff0c;这里的主要记一下Ubuntu20.04中nc的安装 编译安装 第一种方式是自己编译安装&#xff0c;先下载安装包 nc.zip wget http://sourceforge.net/projects/netcat/files/netcat/0.7.1/netcat-0.7.1.tar.gz/download -O netcat-0.7.…

当你学会这项python数据提取神器时,请做好升职准备!

一、什么是 jsonpath ● JsonPath 是一种信息抽取类库&#xff0c;是从 JSON 文档中抽取指定信息的工具&#xff0c;提供多种语言实现版本&#xff0c;包括&#xff1a;JavaScript、Python、PHP 和 Java。 二、特点 ● 只能提取 JSON 格式的数据 ● 提取后的数据类型与原数据…

2023秋招笔试算法Python3题解

诸神缄默不语-个人CSDN博文目录 签两方了&#xff0c;感觉秋招已经结束了&#xff0c;所以发布一下之前写的笔试编程题题解。 不全。可能有些题我会继续补。 不保证能过。 后续依然有可能继续刷算法题&#xff0c;但是就另外专门写博文来解析了。 打码是因为原则上其实是不让公…

国密https访问

前言 现在的SSL的加密算法实际上主要是国际算法&#xff0c;包括JDK&#xff0c;Go等语言也仅支持国际算法加密&#xff08;毕竟是国外开源项目&#xff09;&#xff0c;hash。随着国密算法的普及&#xff0c;比如openssl就支持国密了&#xff0c;还要新版本的Linux内核也开始…

【Python第三方包】实现自动化(pyautogui包)

文章目录 前言一、如何安装pyautogui二、pyautogui鼠标操作2.1 鼠标移动2.2 鼠标点击2.3 拖动鼠标三、键盘操作3.1 按下和释放按键3.2 键盘输入四、截图和图像识别4.1 截图4.2 图像识别总结前言 自动化是现代计算机编程和软件开发中的一个重要概念。通过自动化,我们可以节省时…

解决因d3dx9_30.dll丢失程序无法运行,电脑缺失d3dx9_30.dll报错解决方案

我们的生活和工作都离不开电脑。然而&#xff0c;电脑作为一种复杂的工具&#xff0c;也会出现各种各样的问题。其中&#xff0c;丢失d3dx9_30.dll文件是一个常见的问题。d3dx9_30.dll是DirectX的动态链接库文件&#xff0c;如果丢失或损坏&#xff0c;可能会导致许多软件和游戏…

[论文笔记]GPT-1

引言 今天带来论文Improving Language Understanding by Generative Pre-Training的笔记,它的中文题目为:通过生成式预训练改进语言理解。其实就是GPT的论文。 自然语言理解可以应用于大量NLP任务上,比如文本蕴含、问答、语义相似和文档分类。虽然无标签文本语料是丰富的,…

Js使用ffmpeg在视频中添加png或gif

Js使用ffmpeg在视频中添加png或gif ffmpeg 使用场景是需要在web端对视频进行编辑 添加图片和gif。 注意: 以下所有的使用案例均基于vue3 setup。 同时由于ffmpeg版本不同会导致使用的api不同&#xff0c;使用案例前需要注意ffmpeg版本问题。 如果使用的是0.12需要使用新的…

通讯录和内存动态管理

目录 (通讯录)动态增长版 实现效果 找单身狗 题目 源码 思路 三个内存函数的模拟实现 模拟实现strncpy 模拟实现strncat 模拟实现atoi (通讯录)动态增长版 该版本通讯录在原版的基础上增加了检查容量函数&#xff0c;实现了通讯录的动态…

Linux中的shell编程

shell编程 重定向 cat >temp 输入内容到temp文件中&#xff0c;如果存在temp则覆盖&#xff0c;没有则新建 cat >>temp 追加内容 cat temp1>>temp2 将temp1中的内容追加到temp 命令执行控制符号 ; 一个命令行执行多条语句 命令替换符 1.双引号&#…

插槽的基本使用和作用域插槽

1.编译作用域 父级模板里的所有内容都是在父级作用域中编译的&#xff1b;子模板里的所有内容都是在子作用域中编译的。 即父子组件只能使用各自作用域的数据 2.插槽的后备内容&#xff08;slot中默认配置内容&#xff09; 可以在slot中提前设置一段内容作为默认值&#xf…

数据结构--线性表回顾

目录 线性表 1.定义 2.线性表的基本操作 3.顺序表的定义 3.1顺序表的实现--静态分配 3.2顺序表的实现--动态分配 4顺序表的插入、删除 4.1插入操作的时间复杂度 4.2顺序表的删除操作-时间复杂度 5 顺序表的查找 5.1按位查找 5.2 动态分配的方式 5.3按位查找的时间…

Spark简介

文章目录 一、简介二、安装1、简介2、本地部署(Local模式)2.1 安装2.2 官方WordCount实例 3、Standlong模式3.1 简介2.2 安装集群2.3 官方测试案例 4、Yarn模式3.1 安装3.2 配置历史服务器3.3 配置查看历史日志 5、Mesos模式6、几种模式对比7、常用端口 三、Yarn模式详解1、简介…

3. 无重复字符的最长子串 --力扣 --JAVA

题目 给定一个字符串 s &#xff0c;请你找出其中不含有重复字符的 最长子串 的长度。 解题思路 遍历字符串&#xff0c;遇到重复字符计算进行下一个子串的统计&#xff1b;从发生重复的字符第一次出现的位置进行继续循环&#xff1b;输出最长子串的长度。 代码展示 class S…

sql语句数据库查询:如果当前元素已经使用过,下拉框不显示该元素该如何查询?

写宿舍管理系统&#xff0c;做到宿管和楼栋关系时&#xff0c;新增一个宿管&#xff0c;一个宿管管理一栋楼&#xff0c;如果当前楼栋已选择&#xff0c;那么就不能再选&#xff0c;如图所示&#xff1a; 最开始使用的是&#xff1a; SELECT DISTINCT b.building_num,b.TYPE,b…

项目二开笔记-萤火商城https://www.yiovo.com/doc

很久没写过php代码了&#xff0c;二开过程中笔记如下 注意事项 打开APP_DEBUG 关于建表 .在store进行开发&#xff0c;新建表的时候需要加上store_id字段 增加页面 前端页面 增加新的菜单&#xff0c;需要在router.config.js中增加对应的配置增加新的页面&#xff0c;需要…

【Python】图像和办公文档的处理

图像和办公文档处理 用程序来处理图像和办公文档经常出现在实际开发中&#xff0c;Python的标准库中虽然没有直接支持这些操作的模块&#xff0c;但我们可以通过Python生态圈中的第三方模块来完成这些操作。 操作图像 计算机图像相关知识 颜色。如果你有使用颜料画画的经历&…

【计算机毕设选题推荐】口腔助手小程序SpringBoot+Vue+小程序

前言&#xff1a;我是IT源码社&#xff0c;从事计算机开发行业数年&#xff0c;专注Java领域&#xff0c;专业提供程序设计开发、源码分享、技术指导讲解、定制和毕业设计服务 项目名 基于SpringBoot的口腔助手小程序 技术栈 SpringBootVue小程序MySQLMaven 文章目录 一、口腔…

《机器学习分类器 二》——朴素的贝叶斯算法,项目实践,算法实践。

1,朴素贝叶斯算法的介绍 1. 朴素贝叶斯算法定义 朴素贝叶斯算法是基于概率统计的分类方法。它的核心思想是利用贝叶斯定理来估计在给定特征的条件下某个类别的概率&#xff0c;然后选择具有最高概率的类别作为预测结果。在分类问题中&#xff0c;我们通常有一个数据集&#x…

nonaDlA 逻辑分析仪 使用记录

注意事项&#xff0c;很灵敏&#xff0c;不要用手碰&#xff0c;产生误触发 安装软件 github地址 官方提供的淘宝地址与使用说明 1.安装 1.安装程序 &#xff1a;下载githubDLA源码&#xff0c;打开 software\PulseView.exe安装 2.安装驱动&#xff1a;安装完第一步后&a…