使用消息队列怎样防止消息重复?

大家好,我是君哥。

使用消息队列时,我们经常会遇到一个可能对业务产生影响的问题,消息重复。在订单、扣款、对账等对幂等有要求的场景,消息重复的问题必须解决。

那怎样应对重复消息呢?今天来聊一聊这个话题。

1.三个语义

正确使用消息队列,我们会考虑到消息防丢失、防重复,我们介绍 3 个语义:

  • At Least Once:在消息队列中,指消息不丢失,一条消息最少被消费一次,但是可能会有重复消费。

  • Exactly Once:在消息队列中,消息被精准消费一次,不丢失,也不会重复;

  • At Most Once:在消息队列中,消息不会被重复消费,但是可能会有消息丢失

不同的消息场景,需要的语义不同。比如 Exactly Once 最难实现,一般需要引入事务消息。

不同使用场景,对语义的要求也不一样。比如日志收集类的场景,At Most Once 就可以满足,而支付类的场景则要求 Exactly Once。

2.消息重复

什么情况下会导致消息重复呢?

生产者发送消息后,Broker 保存成功,但是没有成功给生产者返回 ACK,生产者以为消息发送失败,重试,再次给 Broker 发送。Broker 保存了重复消息,导致 Consumer 多次消费。

图片

消费者消费消息后,给 Broker 返回 ACK 失败,导致 Broker 没有修改偏移量,同一条消息再次发送给消费者,或者被消费者拉取到。

图片

3.生产者防重

有的消息中间件是支持生产者幂等的。比如 Kafka 从 0.11.0 版本开始引入了幂等 Producer,可以使用下面代码开启幂等 Producer:

Properties props = new Properties();
//省略其他代码
//配置幂等性
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); 
//创建生产者实例
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

Kafka 实现生产者幂等的原理是在生产者引入了 Producer ID(PID)和 Sequence Number 这两个参数。

  • PID:Producer 拥有的 ID,唯一标识一个 Producer。

  • Sequence Number:自增的数值,唯一标识同一个 Producer 发送到指定分区的消息 ID。

有了这两个参数,Broker 单分区就可以唯一标识一个生产者发送的唯一一条消息<PID,SequenceNumber>。Broker 收到消息时,如果检查到消息的<PID,SequenceNumber>已经存在,就不会再保留这条消息。

但幂等 Producer 只能在单分区下生效,多分区情况下是不生效的。因为多个分区之间并不能相互访问对方的<PID,SequenceNumber>。

图片

4.Broker 防重

Broker 如果可以防重,那对于生产者和消费者来说,节省了大量的工作。下面我们看下 Pulsar 是怎样防重的。

Broker 通过参数 BrokerDeduplicationEnabled 开启防重功能。对于 Producer 发送的重复消息,Broker 返回响应 -1:-1。

Producer 发送消息时,会带一个 sequenceId 字段,Broker 会按照 ProducerName 维度记录当前生产者最大的 sequenceId(highestSequenceId)。Broker 收到消息时,首先会判断消息中的 sequenceId 是否大于自己保存的当前生产者的 highestSequenceId,如果是则保存消息并更新 highestSequenceId,否则丢弃消息,并且给 Producer 返回 -1:-1。

下面是三个极端情况:

  1. Producer 断开连接:这种情况下,跟 Broker 重新建立连接后,本地保存的 sequenceId 还在,只要使用 sequenceId 递增后发送消息即可;

  2. Producer 宕机:Producer 重启后,缓存的 sequenceId 肯定不存在了,这时跟 Broker 重新建立连接后,Broker 会根据 ProducerName 找出 highestSequenceId 发给 Producer,Producer 使用这个 sequenceId 来发送消息;

  3. Producer 和 Broker 都宕机:Broker 重启后,可以从宕机前保存的快照中恢复各 Producer 对应的 highestSequenceId 发送给各 Producer。但这个 highestSequenceId 不一定准确,因为 Broker 宕机瞬间很有可能最新的 sequenceId 没有来得及保存快照。

需要注意的是,跟 Kafka 的幂等 Producer 类似,Pulsar 的 Broker 幂等也只能保证 Topic/Partition 级别。

5.消费者防重

从上面的分析可以看出,靠生产者防重和 Broker 防重,只能在 Topic/Partition 级别生效,这通常并不能满足我们的需求。而为了避免消费者重复消费对业务造成影响,消息防重还是必要的。这就要求我们做最后一道防线,在消费端进行防重或幂等处理。

消费端做防重,就不再考虑消息中间件层面的配置(比如 sequenceId),而是从消息体进行下手。

生产者发送消息时,给消息体赋值一个全局唯一的 ID,消费者处理消息时,根据全局唯一 ID 做防重。

比如消费端的逻辑是保存一条订单消息,那把唯一 ID 保存到数据库并且加一个唯一索引,这样根据唯一索引就可以做消息去重。

不过使用唯一索引也有缺点:

  • 如果使用 MySQL 数据库,不能使用 Change Buffer;

  • 非插入的场景(比如更新库存)不能去重。

对于唯一索引的缺点,我们可以引入 Redis 对唯一 ID 做保存,利用 setNx 判断消息是否已经处理过。如下图:

图片

if (jedis.setnx(ID, "1") == 1) {//处理业务,返回 ACK
}else {//直接返回 返回 ACK
}

6.总结

使用消息队列,在一些场景下是需要防重的。主流消息队列提供了一些防重的能力,但并不是完全可靠的。在对重复消息敏感的场景下,最好是在消费端处理消息时,从业务层面进行消息防重。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/72374.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

005 公网访问 docker rocketmq

文章目录 创建自定义网络创建NameServer容器创建Broker容器正式开始启动 Nameserver 容器启动 Broker 容器并关联 Nameserverdocker exec -it rmqbroker vi /etc/rocketmq/broker.conf检查 namesrv 解析检查 Broker 注册状态Nameserver 日志Broker 日志检查容器日志手动指定 Br…

解决Docker Desktop启动后Docker Engine stopped问题

一、问题描述 当我们更新了Docker Desktop后,在重新打开就显示【Docker Engine stopped(Docker引擎已经停止)】,无法正常使用Docker,如下图所示: 二、问题分析 1、检查电脑主板的CPU是否开启虚拟化; 2、需检查Docker所需的功能是否开启; 3、检查WSL是否匹配; Docker的…

MongoDB—(一主、一从、一仲裁)副本集搭建

MongoDB集群介绍&#xff1a; MongoDB 副本集是由多个MongoDB实例组成的集群&#xff0c;其中包含一个主节点&#xff08;Primary&#xff09;和多个从节点&#xff08;Secondary&#xff09;&#xff0c;用于提供数据冗余和高可用性。以下是搭建 MongoDB 副本集的详细步骤&am…

C++STL---<limits>

C <limits> 头文件&#xff1a; <limits> 头文件是 C 标准库中用于获取各种数据类型的数值范围、精度等信息的工具。它通过模板类 std::numeric_limits 提供了对基本数据类型&#xff08;如 int、float、double 等&#xff09;的详细属性查询功能。通过 std::nume…

蓝桥杯自我复习打卡

总复习&#xff0c;打卡1. 一。排序 1。选段排序 太可恶了&#xff0c;直接全排输出&#xff0c;一个测试点都没过。 AC 首先&#xff0c;这个【l,r】区间一定要包含p,或者q&#xff0c;pq一个都不包含的&#xff0c;[l,r]区间无论怎么变&#xff0c;都对ans没有影响。 其次&…

Flutter_学习记录_实现列表上拉加载更多的功能

可以用ScrollController组件来实现这样列表上拉加载更多的功能: 1. 定义变量 在StatefulWidget 的组件内&#xff0c;添加三个属性&#xff1a; // 滚动视图的控制器final ScrollController _scrollController ScrollController();// 是否已显示了上拉加载中bool _isShowM…

【Linux】【网络】不同子网下的客户端和服务器通信其它方式

【Linux】【网络】不同子网下的客户端和服务器通信其它方式 那么&#xff0c;在 NAT 环境下&#xff0c;应该如何让内网设备做为服务器&#xff0c;使内网设备被外部连接&#xff1f; 1 多拨 部分运营商&#xff0c;支持在多个设备上&#xff0c;通过 PPPoE 登录同一个宽带账…

《Python百练成仙》31-40章(不定时更新)

第卅一章 函数结丹def开紫府 罗酆山的鬼门关吞吐着猩红的变量阴风&#xff0c;每个风眼都涌动着作用域混乱的灵力乱流。叶军手握薛香遗留的丹田玉简&#xff0c;玉简表面浮现出残缺的函数符文&#xff1a; def 凝聚金丹(灵气):道基 灵气 * 0.618print(金丹品质) # 作用域外变…

六十天前端强化训练之第一天到第七天——综合案例:响应式个人博客项目

欢迎来到编程星辰海的博客讲解 目录 前言回顾 HTML5与CSS3基础 一、知识讲解 1. 项目架构设计&#xff08;语义化HTML&#xff09; 2. 响应式布局系统&#xff08;Flex Grid&#xff09; 3. 样式优先级与组件化设计 4. 完整响应式工作流 二、核心代码示例 完整HTML结…

测试的BUG分析

在了解BUG之前,我们要先了解软件测试的生命周期,因为大多数BUG都是在软件测试的过程中被发现的 软件测试的生命周期 在了解 软件测试的生命周期 之前,我们要先了解 软件的生命周期 ,虽然他们之间只差了两个字,但是差距还是很大的 首先是 软件生命周期 ,这个是站在 软件 的角…

【洛谷贪心算法题】P1094纪念品分组

该题运用贪心算法&#xff0c;核心思想是在每次分组时&#xff0c;尽可能让价格较小和较大的纪念品组合在一起&#xff0c;以达到最少分组的目的。 【算法思路】 输入处理&#xff1a;首先读取纪念品的数量n和价格上限w&#xff0c;然后依次读取每件纪念品的价格&#xff0c;…

[STM32]从零开始的STM32 BSRR、BRR、ODR寄存器讲解

一、前言 学习STM32一阵子以后&#xff0c;相信大家对STM32 GPIO的控制也有一定的了解了。之前在STM32 LED的教程中也教了大家如何使用寄存器以及库函数控制STM32的引脚从而点亮一个LED&#xff0c;之前的寄存器只是作为一个引入&#xff0c;并没有深层次的讲解&#xff0c;在教…

SQL分组问题

下列为电商公司用户访问时间数据 统计某个用户连续的访问记录&#xff0c;如果时间间隔小于60s&#xff0c;就分为一组 id ts 1001 17523641234 1001 17523641256 1002 17523641278 1001 17523641334 1002 17523641434 1001 17523641534 1001 17523641544 1002 17523…

3月2日 C++日常习题测试一答案

C测试题答案与讲解 一、填空题答案及讲解 答案&#xff1a;const 讲解&#xff1a;在 C 中&#xff0c;const关键字用于定义常量&#xff0c;一旦定义&#xff0c;其值不能被修改。例如const int num 10;&#xff0c;这里的num就是一个常量。 答案&#xff1a;3 讲解&…

2W8000字 LLM架构文章阅读指北

❝ 大模型架构专栏已经更新了30多篇文章。完整的专栏内容欢迎订阅&#xff1a; LLM 架构专栏 1、LLM大模型架构专栏|| 从NLP基础谈起 2、 LLM大模型架构专栏|| 自然语言处理&#xff08;NLP&#xff09;之建模 3、 LLM大模型架构之词嵌入&#xff08;Part1&#xff09; 3、 LLM…

SP导入智能材质球

智能材质球路径 ...\Adobe Substance 3D Painter\resources\starter_assets\smart-materials 放入之后就会自动刷新

网络原理----TCP/IP(3)

核心机制七----延时应答 默认情况下&#xff0c;接收方都是在收到数据报的第一时间&#xff0c;就返回ack&#xff0c;但是可以通过延时返回ack的方式来提高效率&#xff0c;理论上不是100%提高效率&#xff0c;但还是有一定帮助的。 因为如果接收数据的主机⽴刻返回ACK应答,…

MacBook Pro使用FFmpeg捕获摄像头与麦克风推流音视频

FFmpeg查看macos系统音视频设备列表 ffmpeg -f avfoundation -list_devices true -i "" 使用摄像头及麦克风同时推送音频及视频流: ffmpeg -f avfoundation -pixel_format yuyv422 -framerate 30 -i "0:1" -c:v libx264 -preset ultrafast -b:v 1000k -…

部署Joplin私有云服务器postgres版-docker compose

我曾经使用过一段时间 Joplin&#xff0c;官方版本是收费的&#xff0c;而我更倾向于将数据掌握在自己手中。因此&#xff0c;在多次权衡后&#xff0c;我决定自己搭建 Joplin 服务器并进行尝试。 个人搭建的版本与数据库直连&#xff0c;下面是使用 Docker Compose 配置数据库…

SQL的select语句完整的执行顺序

SQL的SELECT语句的执行顺序可以用"做菜流程"来类比理解。虽然我们写SQL时按SELECT…FROM…WHERE…顺序写&#xff0c;但数据库执行顺序完全不同。以下是通俗易懂的讲解&#xff08;附流程图和示例&#xff09;&#xff1a; &#x1f527; 执行顺序流程图&#xff1a…