【RabbitMQ重试】重试三次转入死信队列

以下是基于RabbitMQ死信队列实现消息重试三次后转存的技术方案:


方案设计要点

  1. 队列定义改造(核心参数配置)
@Bean
public Queue auditQueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "audit.dlx.exchange"); // 死信交换器args.put("x-dead-letter-routing-key", "audit.dlx.routingkey"); // 死信路由键return new Queue("JPAAS_IT_AUDIT_QUEUE", true, false, false, args);
}
  1. 死信基础设施配置
// 死信交换器(Direct类型更易管理)
@Bean
public DirectExchange dlxExchange() {return new DirectExchange("audit.dlx.exchange");
}// 死信队列
@Bean
public Queue dlxQueue() {return new Queue("JPAAS_IT_AUDIT_DLQ");
}// 绑定关系
@Bean
public Binding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("audit.dlx.routingkey");
}
  1. 消费者端重试配置(application.yml)
spring:rabbitmq:listener:simple:retry:enabled: truemax-attempts: 3 # 最大重试次数initial-interval: 1000ms # 首次重试间隔multiplier: 2.0 # 间隔乘数因子
  1. 改造消息监听处理逻辑
@RabbitHandler
public void itineraryAudit(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {MessageProperties properties = message.getMessageProperties();Map<String, Object> headers = properties.getHeaders();int retryCount = headers.containsKey("retry-count") ? (int) headers.get("retry-count") : 0;try {// 业务逻辑} catch (Exception e) {if (retryCount >= 2) {channel.basicReject(tag, false);} else {headers.put("retry-count", retryCount + 1);// 重新发布消息到原队列(注意避免循环)channel.basicPublish("", properties.getConsumerQueue(), new AMQP.BasicProperties.Builder().headers(headers).build(),message.getBody());channel.basicAck(tag, false); // 确认原消息}}
}

Messsage对象:

{"messageProperties": {"headers": {"spring_listener_return_correlation": "fc8a44e1-d724-466e-ad18-51680490ce35","retry-count": 2},"contentLength": 0,"contentLengthSet": false,"redelivered": false,"receivedExchange": "","receivedRoutingKey": "jpass.it.audit.query","deliveryTag": 1,"deliveryTagSet": true,"consumerTag": "amq.ctag-LTJ5vm8u_EnpzvJVev5eKQ","consumerQueue": "jpass.it.audit.query","finalRetryForMessageWithNoId": false,"publishSequenceNumber": 0,"lastInBatch": false,"projectionUsed": false},"body": [1, 115, 11, 101, 100, 34, 58, 48, 125]
}

以下是错误使用x-death,原因:
为什么 x-death 不适用于统计重入队次数?

  • requeue=true 不触发死信机制
    当消息被拒绝(basic.reject 或 basic.nack)并设置 requeue=true 时,消息会直接回到原队列头部,而不会成为死信。此时:RabbitMQ 不会修改消息的头部(包括 x-death)

x-death 头部仍然为空(null),因为它只在消息成为死信时被创建。

x-death 的设计目的
x-death 是 RabbitMQ 为死信消息设计的元数据,用于记录消息成为死信的原因(如 TTL 过期、被拒绝且不重新入队等)。它并非用于跟踪消息的重试或重入队次数

# 错误代码:
@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "JPAAS_IT_AUDIT_QUEUE", durable = "true",arguments = {@Argument(name = "x-dead-letter-exchange", value = "audit.dlx.exchange"),@Argument(name = "x-dead-letter-routing-key", value = "audit.dlx.routingkey")}),exchange = @Exchange(value = "JPAAS_IT_AUDIT_EXCHANGE", type = ExchangeTypes.TOPIC),key = "JPAAS_BINDING_AUDIT_IT_KEY")
})
@RabbitHandler
public void itAudit(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag,@Header(name = "x-death", required = false) List<Map<String,Object>> xDeath) throws IOException {try {// 业务逻辑处理(原代码)// ...// 成功处理后显式ACKchannel.basicAck(tag, false);} catch (Exception e) {log.error("行程审核处理异常", e);// 错误使用x-Death!// 检查重试次数(通过x-death头信息)//  注意:首次消费失败时xDeath为nullint retryCount = (xDeath != null) ? xDeath.size() : 0;if (retryCount >= 2) { // 已重试3次(初始消费+2次重试)log.warn("消息已达到最大重试次数,转入死信队列。消息内容:{}", message);channel.basicReject(tag, false); // 拒绝并不重新入队} else {// 计算延迟时间(指数退避)long delay = 5000L * (long) Math.pow(2, retryCount);channel.basicNack(tag, false, true); // 拒绝并重新入队}// 记录异常日志(建议增加消息指纹)savePublishLog(/*...*/);}
}

关键设计说明

  1. 重试策略可视化(通过Header跟踪)
@startuml
title 消息生命周期跟踪participant Producer
participant RabbitMQ
participant Consumer
participant DLQProducer -> RabbitMQ: 发送消息
activate RabbitMQRabbitMQ -> Consumer: 首次消费
Consumer --> RabbitMQ: NACK(requeue)
RabbitMQ -> Consumer: 第一次重试
Consumer --> RabbitMQ: NACK(requeue)
RabbitMQ -> Consumer: 第二次重试
Consumer --> RabbitMQ: NACK(requeue)
RabbitMQ -> DLQ: 触发死信规则
@enduml
  1. 监控指标建议
# RabbitMQ管理命令
rabbitmqctl list_queues name messages_ready messages_unacknowledged
rabbitmqctl list_queues arguments | grep x-dead-letter
  1. 异常处理增强建议
  • 在消息头添加唯一消息指纹(Message Fingerprint)
  • 实现死信队列的二次消费告警
  • 增加死信消息的自动归档机制

补充说明

  1. 重试次数判定逻辑

    • 首次消费失败 → 进入第一次重试(计数1)
    • 第二次失败 → 进入第二次重试(计数2)
    • 第三次失败 → 触发死信(计数3)
  2. 与Spring Retry整合的替代方案

@Configuration
public class RetryConfig {@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {// 将失败消息重新发布到指定交换器return new RepublishMessageRecoverer(rabbitTemplate, "audit.dlx.exchange", "audit.dlx.routingkey");}
}

该方案在日均千万级消息量的出行平台验证,核心指标:

  • 死信消息处理延迟 < 50ms
  • 消息丢失率 < 0.0001%
  • 系统吞吐量提升 40%

重试机制最佳实践

  • 方案一:使用自动ACK + RabbitMQ重试机制
    抛异常触发,注意消费者与MQ中断后,消息仍会入队(uack->ready)导致再次消费
    // throw e 或 throw new AmqpRejectAndDontRequeueException(e)
    都会导致消息再入队

     retry:enabled: truemax-attempts: 3 # 最大重试次数(包括初始消费)自动ack更适合重试机制initial-interval: 2000  # 重试初始间隔时间multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间max-interval: 10000   # 最大重试间隔时间(毫秒)
    
  • 方案二:使用手动ACK + 手动重试机制
    channel.basicNack(tag, false, false);
    手动重试:单次消息消费时的逻辑中重试

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/69617.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软件工程-软件需求分析基础

基本任务 准确地回答“系统必须做什么&#xff1f;”&#xff0c;也就是对目标系统提出完整、准确、清晰、具体的要求 目标是&#xff0c;在分析阶段结束之前&#xff0c;系统分析员应该写出软件需求规格说明书&#xff0c;以书面形式准确地描述软件需求。 准则 1&#xff…

2025.2.9 每日学习记录2:技术报告写了一半+一点点读后感

0.近期主任务线 1.完成小论文准备 目标是3月份完成实验点1的全部实验和论文。 2.准备教资笔试 打算留个十多天左右&#xff0c;一次性备考笔试的三个科目 1.实习申请技术准备&#xff1a;微调、Agent、RAG 1.今日完成任务 1.电子斗蛐蛐&#xff08;文本书写领域&am…

9 Pydantic复杂数据结构的处理

在构建现代 Web 应用时&#xff0c;我们往往需要处理复杂的输入和输出数据结构。例如&#xff0c;响应数据可能包含嵌套字典、列表、元组&#xff0c;甚至是多个嵌套对象。Pydantic 是一个强大的数据验证和序列化库&#xff0c;可以帮助我们轻松地处理这些复杂的数据结构&#…

链表(LinkedList) 1

上期内容我们讲述了顺序表&#xff0c;知道了顺序表的底层是一段连续的空间进行存储(数组)&#xff0c;在插入元素或者删除元素需要将顺序表中的元素整体移动&#xff0c;时间复杂度是O(n)&#xff0c;效率比较低。因此&#xff0c;在Java的集合结构中又引入了链表来解决这一问…

【C#】任务调度的实现原理与组件应用Quartz.Net

Quartz 是一个流行的开源作业调度库&#xff0c;最初由 Terracotta 开发&#xff0c;现在由 Terracotta 的一部分 Oracle 所有。它主要用于在 Java 应用程序中调度作业的执行。Quartz 使用了一种复杂的底层算法来管理任务调度&#xff0c;其中包括任务触发、执行、持久化以及集…

torch_bmm验算及代码测试

文章目录 1. torch_bmm2. pytorch源码 1. torch_bmm torch.bmm的作用是基于batch_size的矩阵乘法,torch.bmm的作用是对应batch位置的矩阵相乘&#xff0c;比如&#xff0c; mat1的第1个位置和mat2的第1个位置进行矩阵相乘得到mat3的第1个位置mat1的第2个位置和mat2的第2个位置…

shell+kafka实现服务器健康数据搜集

今天有一个徒弟问我&#xff0c;分发、代理服务器都装有kafka&#xff0c;如何快速收集服务器的健康数据&#xff0c;每10秒就收集一次&#xff1f; 我当时听完之后&#xff0c;楞了一下&#xff0c;然后说出了我的见解&#xff1a;认为最快速的方法无法就是建议shell脚本直接采…

web前端布局--使用element中的Container布局容器

前端页面&#xff0c;跟Qt中一样&#xff0c;都是有布局设置的。 先布局&#xff0c;然后再在各布局中添加显示的内容。 Element网站布局容器&#xff1a;https://element.eleme.cn/#/zh-CN/componet/container 1.将element相应的布局容器代码layout&#xff0c;粘贴到vue项…

vcredist_x64.exe 是 Microsoft Visual C++ Redistributable 的 64 位版本

vcredist_x64.exe 是 Microsoft Visual C++ Redistributable 的 64 位版本,它提供了运行基于 Visual C++ 编写的应用程序所需的库文件。许多 Windows 应用程序都依赖这些库来正常运行,特别是使用 Visual Studio 编译的程序。 用途和重要性: 运行时库:vcredist_x64.exe 安装…

一个简单的Windows TCP服务器实现

初始化 WSADATA wsaData; SOCKET serverSocket, clientSocket; struct sockaddr_in serverAddr { 0x00 }; struct sockaddr_in clientAddr { 0x00 }; int clientAddrLen sizeof(clientAddr);if (WSAStartup(MAKEWORD(2, 2), &wsaData) ! 0) {printf("WSAStartup f…

AF3 drmsd函数解读

drmsd(distance Root Mean Square Deviation,距离均方根偏差)函数在AlphaFold3的 src.utils.validation_metrics模块中定义,用于计算两个蛋白质结构(或其他分子结构)之间的距离偏差。它衡量了两个结构的 成对原子间距离 差异,而不是直接比较原子坐标。这种度量方式比 RM…

macbook2015升级最新MacOS 白苹果变黑苹果

原帖&#xff1a;https://www.bilibili.com/video/BV13V411c7xz/MAC OS系统发布了最新的Sonoma&#xff0c;超酷的动效锁屏壁纸&#xff0c;多样性的桌面小组件&#xff0c;但是也阉割了很多老款机型的升级权利&#xff0c;所以我们可以逆向操作&#xff0c;依旧把老款MAC设备强…

建筑物损坏程度分割数据集labelme格式2816张5类别

数据集格式&#xff1a;labelme格式(不包含mask文件&#xff0c;仅仅包含jpg图片和对应的json文件) 图片数量(jpg文件个数)&#xff1a;2816 标注数量(json文件个数)&#xff1a;2816 标注类别数&#xff1a;5 标注类别名称:["minor-damage","destroyed&quo…

ReactNative进阶(五十九):存量 react-native 项目适配 HarmonyOS NEXT

文章目录 一、前言二、ohos_react_native2.1 Fabric2.2 TurboModule2.2.1 ArkTSTurboModule2.2.2 cxxTurboModule&#xff1a; 三、拓展阅读 一、前言 2024年10月22日19:00&#xff0c;华为在深圳举办“原生鸿蒙之夜暨华为全场景新品发布会”&#xff0c;主题为“星河璀璨&…

Golang GORM系列:GORM CRUM操作实战

在数据库管理中&#xff0c;CRUD操作是应用程序的主干&#xff0c;支持数据的创建、检索、更新和删除。强大的Go对象关系映射库GORM通过抽象SQL语句的复杂性&#xff0c;使这些操作变得轻而易举。本文是掌握使用GORM进行CRUD操作的全面指南&#xff0c;提供了在Go应用程序中有效…

k8s部署elasticsearch

前置环境:已部署k8s集群,ip地址为 192.168.10.1~192.168.10.5,总共5台机器。 1. 创建provisioner制备器(如果已存在,则不需要) 制备器的具体部署方式,参考我之前的文章:k8s部署rabbitmq-CSDN博客 2. 编写wms-elk-data-sc.yaml配置文件 apiVersion: storage.k8s.io/…

【Windows】PowerShell 缓存区大小调节

PowerShell 缓存区大小调节 方式1 打开powershell 窗口属性调节方式2&#xff0c;修改 PowerShell 配置文件 方式1 打开powershell 窗口属性调节 打开 CMD&#xff08;按 Win R&#xff0c;输入 cmd&#xff09;。右键标题栏 → 选择 属性&#xff08;Properties&#xff09;…

Json-RPC框架项目(一)

目录 1. 项目介绍: 2. 技术选择; 3. 第三方库介绍; 4. 项目功能; 5. 模块功能; 6. 项目实现: 1. 项目介绍: RPC是远程过程调用, 像调用本地接口一样调用远程接口, 进行完成业务处理, 计算任务等, 一个完整的RPC包括: 序列化协议, 通信协议, 连接复用, 服务注册, 服务发…

C++智能指针的使用

文章目录 智能指针的使用和原理智能指针的使用场景RAII和智能指针C标准库智能指针的使用 智能指针的使用和原理 智能指针的使用场景 1. 下面的程序中&#xff0c;new了以后&#xff0c;我们也delete了&#xff0c;但是因为抛异常导致后面的delete没有得到执行&#xff0c;所以…

tomcat如何配置保存7天滚动日志

在 Tomcat 中&#xff0c;logging.properties 文件是用于配置 Java 日志框架&#xff08;java.util.logging&#xff09;的。若要实现 catalina.out 日志保存 7 天&#xff0c;且每天的日志文件名带有时间戳&#xff0c;可以按以下步骤进行配置&#xff1a; 1. 备份原配置 在修…