RabbitMQ重复消费如何解决

消息重复消费的原因

  1. 生产者重试:网络波动导致生产者未收到 Broker 确认,重复发送消息。
  2. 消费者失败:消费者处理消息后未发送 ACK,消息重新入队。
  3. 集群故障转移:主节点宕机,未确认消息被重新投递。

解决方案

1. 消费者幂等性设计

原理:确保同一消息多次处理的结果与一次处理相同。

实现方式
  • 数据库唯一约束
    利用业务字段(如订单号)的唯一性约束,避免重复插入数据。

    CREATE TABLE orders (id VARCHAR(64) PRIMARY KEY, -- 唯一订单号amount DECIMAL(10,2)
    );
    
    // Java 示例(使用 MyBatis)
    public void processOrder(Order order) {try {orderMapper.insert(order); // 唯一约束冲突时会抛出异常// 业务逻辑...} catch (DuplicateKeyException e) {// 已处理过该订单,直接跳过log.warn("订单已存在: {}", order.getId());}
    }
    
  • Redis 原子操作
    使用 Redis 记录已处理消息的 ID,通过 SETNX 命令实现原子性检查。

    // Java 示例(使用 Spring Data Redis)
    public boolean isMessageProcessed(String messageId) {Boolean result = redisTemplate.opsForValue().setIfAbsent("msg:" + messageId, "1", Duration.ofMinutes(30));return Boolean.TRUE.equals(result);
    }public void consumeMessage(Message message) {String messageId = message.getMessageId();if (!isMessageProcessed(messageId)) {// 已处理过,直接返回return;}// 业务逻辑...
    }
    

2. 消息全局唯一 ID

原理:为每条消息分配唯一 ID,消费者记录已处理 ID。

实现步骤
  1. 生产者端:发送消息时附加唯一 ID。

    // Java 示例(使用 RabbitTemplate)
    public void sendOrder(Order order) {String messageId = UUID.randomUUID().toString();Message message = MessageBuilder.withBody(order.toJson().getBytes()).setHeader("messageId", messageId).build();rabbitTemplate.send("order.exchange", "order.key", message);
    }
    
  2. 消费者端:处理前检查 ID 是否已存在。

    // Java 示例(使用 @RabbitListener)
    @RabbitListener(queues = "order.queue")
    public void handleOrder(Message message, Channel channel) throws IOException {String messageId = message.getMessageProperties().getHeader("messageId");if (redisTemplate.hasKey("processed:" + messageId)) {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);return;}// 业务逻辑...redisTemplate.opsForValue().set("processed:" + messageId, "1", Duration.ofDays(1));channel.basicAck(deliveryTag, false);
    }
    

3. 手动确认模式(Manual ACK)

原理:消费者处理完消息后手动发送 ACK,避免消息因异常重新入队。

配置与代码
  1. 配置手动 ACK(Spring Boot):

    spring:rabbitmq:listener:simple:acknowledge-mode: manual
    
  2. 消费者逻辑

    @RabbitListener(queues = "order.queue")
    public void handleOrder(Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {try {// 业务逻辑...channel.basicAck(deliveryTag, false); // 确认消息} catch (Exception e) {channel.basicNack(deliveryTag, false, true); // 重入队列}
    }
    

4. 消息去重表

原理:在数据库中维护一张去重表,记录已处理的消息 ID。

表结构
CREATE TABLE message_dedup (message_id VARCHAR(128) PRIMARY KEY,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
消费者逻辑
public void consumeMessage(Message message) {String messageId = extractMessageId(message);try {jdbcTemplate.update("INSERT INTO message_dedup (message_id) VALUES (?)", messageId);// 业务逻辑...} catch (DuplicateKeyException e) {// 消息已处理,直接ACKchannel.basicAck(deliveryTag, false);}
}

5. 消息过期与死信队列

原理:设置消息 TTL,超时未处理则转入死信队列,避免无限重试。

配置队列 TTL 和死信交换
// Java 配置示例
@Bean
public Queue orderQueue() {Map<String, Object> args = new HashMap<>();args.put("x-message-ttl", 60000); // 消息60秒过期args.put("x-dead-letter-exchange", "dlx.exchange");args.put("x-dead-letter-routing-key", "dlx.key");return new Queue("order.queue", true, false, false, args);
}@Bean
public DirectExchange dlxExchange() {return new DirectExchange("dlx.exchange");
}@Bean
public Queue dlxQueue() {return new Queue("dlx.queue");
}@Bean
public Binding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.key");
}

方案对比与选型

方案优点缺点适用场景
数据库唯一约束无需额外组件高并发下数据库压力大低频业务(如订单创建)
Redis 原子操作高性能需维护 Redis 高可用高频业务(如支付回调)
手动ACK避免消息丢失需处理ACK异常所有需要可靠消费的场景
消息去重表数据持久化增加数据库写入压力数据一致性要求高的场景
死信队列避免消息堆积需额外处理死信消息需要异常消息兜底的场景

总结

  • 幂等性设计是核心:无论消息重复多少次,业务结果保持一致。
  • 组合使用多种方案:例如“手动ACK + Redis去重”兼顾可靠性与性能。
  • 监控与告警:通过 RabbitMQ 管理界面监控消息积压情况,设置阈值告警。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/73096.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Node-RED基础1

目录 一、概述二、安装三、基操四、通讯五、数据六、节点七、 应用END 一、概述 Rode-Red是什么&#xff1f; 基于Node.js的物联网开发工具&#xff0c;做API、通讯&#xff1b;提供了一些基本的监控功能&#xff0c;可在编辑器界面中查看节点的运行状态、消息流量等信息。通…

java登神之阶之顺序表

一、了解List接口 在Java中&#xff0c;List接口是一个非常重要的集合框架接口&#xff0c;它继承自Collection接口&#xff08;Collection接口继承Iterable接口&#xff09;。List接口定义了一个有序集合&#xff0c;允许我们存储元素集合。并且可以根据元素的索引来访问集合中…

redux_旧版本

reduxjs/toolkit&#xff08;RTK&#xff09;是 Redux 官方团队推出的一个工具集&#xff0c;旨在简化 Redux 的使用和配置。它于 2019 年 10 月 正式发布&#xff0c;此文章记录一下redux的旧版本如何使用&#xff0c;以及引入等等。 文件目录如下&#xff1a; 步骤 安装依…

MySQL:SQL优化实际案例解析(持续更新)

文章目录 一、MySQL&#xff1a;SQL优化1、时间格式化问题&#xff08;字符串&#xff09;2、in/inner join的问题 一、MySQL&#xff1a;SQL优化 1、时间格式化问题&#xff08;字符串&#xff09; -- 优化前 SELECT * FROM test_table WHERE date_format( begin_time, %Y-%…

【含文档+PPT+源码】基于Python的美食数据的设计与实现

项目介绍 本课程演示的是一款基于Python的美食数据分析系统&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的 Java 学习者。 包含&#xff1a;项目源码、项目文档、数据库脚本、软件工具等所有资料 带你从零开始部署运行本套系统 该项目附带的源码…

vue调整表格样式之深度修改

举例&#xff1a; <div class"grid-item"><h3>日数据</h3><el-table :data"dailyData" v-loading"loading"><el-table-column label"销售姓名" align"center" prop"salesName" />…

【Go每日一练】统计字符出现的次数

&#x1f47b;创作者&#xff1a;丶重明 &#x1f47b;创作时间&#xff1a;2025年3月9日 &#x1f47b;擅长领域&#xff1a;运维 目录 1.&#x1f636;‍&#x1f32b;️题目&#xff1a;统计字符出现的次数2.&#x1f636;‍&#x1f32b;️代码中可用的资源3.&#x1f636;…

uniapp在APP平台(Android/iOS)选择非媒体文件

TOC 背景 在我们APP开发过程中&#xff0c;经常会有这样一个需求场景&#xff1a;从手机中选择文件然后进行上传&#xff0c;这些文件主要分为两类&#xff0c;媒体文件和非媒体文件。而媒体文件选择在APP平台我们可以使用uni.chooseImage和uni.chooseVideo这两个API来实现。…

【eNSP实战】配置交换机端口安全

拓扑图 目的&#xff1a;让交换机端口与主机mac绑定&#xff0c;防止私接主机。 主机PC配置不展示&#xff0c;按照图中配置即可。 开始配置之前&#xff0c;使用PC1 ping 一遍PC2、PC3、PC4、PC5&#xff0c;让交换机mac地址表刷新一下记录。 LSW1查看mac地址表 LSW1配置端…

卡尔曼滤波算法从理论到实践:在STM32中的嵌入式实现

摘要&#xff1a;卡尔曼滤波&#xff08;Kalman Filter&#xff09;是传感器数据融合领域的经典算法&#xff0c;在姿态解算、导航定位等嵌入式场景中广泛应用。本文将从公式推导、代码实现、参数调试三个维度深入解析卡尔曼滤波&#xff0c;并给出基于STM32硬件的完整工程案例…

Redis----大key、热key解决方案、脑裂问题

文章中相关知识点在往期已经更新过了&#xff0c;如果有友友不理解可翻看往期内容 出现脑裂问题怎么保证集群还是高可用的 什么是脑裂问题 脑裂说的就是当我们的主节点没有挂&#xff0c;但是因为网络延迟较大&#xff0c;然后和主节点相连的哨兵通信较差&#xff0c;之后主…

python总结(3)

创建自定义类 终于要创建自定义类了!下面是一个简单的示例: class Person:def set_name(self, name):self.name namedef get_name(self):return self.namedef greet(self):print("Hello, world! Im {}.".format(self.name))这个示例包含三个方法定义&#xff0c;它…

word毕业论文“et al.”替换为“等”——宏

Sub 中文参考文献改等()中文参考文献改等 宏Selection.Find.ClearFormattingSelection.Find.Replacement.ClearFormattingWith Selection.Find.Text "([一-龥], )et al.".Replacement.Text "\1等.".Forward True.Wrap wdFindContinue.Format False.Ma…

网络编程-----服务器(多路复用IO 和 TCP并发模型)

一、单循环服务器模型 1. 核心特征 while(1){newfd accept();recv();close(newfd);}2. 典型应用场景 HTTP短连接服务&#xff08;早期Apache&#xff09;CGI快速处理简单测试服务器 3. 综合代码 #include <stdio.h> #include <sys/types.h> /* See NO…

typora高亮方案+鼠标侧键一键改色

引言 在typora里面有一个自定义的高亮, <mark></mark>>但是单一颜色就太难看了, 我使用人工智能, 搜索全网艺术家, 汇集了几种好看的格式,并且方便大家侧键一键 调用, 是不是太方便啦 ! 示例 午夜模式 春意盎然 深海蓝调 石墨文档 秋日暖阳 蜜桃宣言 使用方法 …

swift -(5) 汇编分析结构体、类的内存布局

一、结构体 在 Swift 标准库中&#xff0c;绝大多数的公开类型都是结构体&#xff0c;而枚举和类只占很小一部分 比如Bool、 Int、 Double、 String、 Array、 Dictionary等常见类型都是结构体 ① struct Date { ② var year: Int ③ var month: Int ④ …

从零开始学机器学习——初探分类器

首先给大家介绍一个很好用的学习地址:https://cloudstudio.net/columns 在上一节中,我们已经完成了对所有经过均衡和清洗的菜品数据的准备工作。接下来,我们将考虑使用多种算法,也就是不同的分类器,来构建模型。我们将利用这个数据集和各种分类器,基于一组特定的配料,预…

黑色RGB是什么

>好的&#xff0c;用户现在问的是“黑色rgb是什么”。我需要先理解用户的问题。之前用户在使用MATLAB调整网格线颜色时&#xff0c;可能看到了默认颜色是黑色&#xff0c;或者之前我提到过默认颜色是[0.15 0.15 0.15]&#xff0c;而用户可能现在想知道黑色的RGB值具体是什么…

做到哪一步才算精通SQL

做到哪一步才算精通SQL-Structured Query Language 数据定义语言 DDL for StructCREATE&#xff1a;用来创建数据库、表、索引等对象ALTER&#xff1a;用来修改已存在的数据库对象DROP&#xff1a;用来删除整个数据库或者数据库中的表TRUNCATE&#xff1a;用来删除表中所有的行…

《深度解析DeepSeek-M8:量子经典融合,重塑计算能效格局》

在科技飞速发展的今天&#xff0c;量子计算与经典算法的融合成为了前沿领域的焦点。DeepSeek-M8的“量子神经网络混合架构”&#xff0c;宛如一把钥匙&#xff0c;开启了经典算法与量子计算协同推理的全新大门&#xff0c;为诸多复杂问题的解决提供了前所未有的思路。 量子计算…