【RabbitMQ】保证消息不丢失

要确保 RabbitMQ 在消费者(Python 服务)重启或挂掉时消息不丢失,需结合 消息持久化确认机制(ACK)死信队列(DLX) 实现高可靠性:


1. 消息持久化(Durability)

确保消息和队列在 RabbitMQ 服务重启后仍存在:

Java 发布者(设置持久化)
// 创建持久化队列 + 持久化消息
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明持久化队列(durable=true)channel.queueDeclare("image_queue", true, false, false, null);// 发布持久化消息(deliveryMode=2)String message = "{\"task_id\": \"123\"}";channel.basicPublish("", "image_queue", new AMQP.BasicProperties.Builder().deliveryMode(2) // 持久化消息.build(),message.getBytes());System.out.println("消息已持久化发送");
}
Python 消费者(声明持久化队列)
channel.queue_declare(queue='image_queue', durable=True)  # durable=True

2. 手动确认(Manual ACK)

消费者处理完消息后显式发送 ACK,避免消息因崩溃丢失:

Python 消费者(关闭自动 ACK)
def callback(ch, method, properties, body):try:print(f"处理消息: {body.decode()}")# 业务逻辑...ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动确认except Exception as e:print(f"处理失败: {e}")ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)  # 重新入队channel.basic_consume(queue='image_queue', on_message_callback=callback, auto_ack=False)  # 关闭自动ACK

关键点

  • auto_ack=False:必须关闭自动确认。
  • 处理成功后调用 basic_ack,失败时 basic_nack 并重新入队。

3. 死信队列(DLX)

处理因消费者崩溃或消息超时未被确认的消息:

Java 发布者(声明死信交换机和队列)
// 定义死信交换机和队列
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx_exchange");  // 死信交换机
args.put("x-message-ttl", 60000);  // 消息存活时间(可选)channel.queueDeclare("image_queue", true, false, false, args);
channel.exchangeDeclare("dlx_exchange", "direct");
channel.queueDeclare("dlx_queue", true, false, false, null);
channel.queueBind("dlx_queue", "dlx_exchange", "");
Python 消费者(监听死信队列)
channel.queue_declare(queue='dlx_queue', durable=True)
channel.basic_consume(queue='dlx_queue', on_message_callback=handle_dlx_message, auto_ack=False)

作用

  • 消息超过 TTL 或消费者拒绝时,自动路由到死信队列。
  • 可对死信消息进行补偿处理(如重试或记录日志)。

4. 消费者高可用(HA)

确保消费者服务崩溃后能自动恢复:

方案 1:进程守护(如 systemd)
# /etc/systemd/system/python_consumer.service
[Unit]
Description=Python RabbitMQ Consumer
After=network.target[Service]
User=root
WorkingDirectory=/opt/app
ExecStart=/usr/bin/python3 /opt/app/consumer.py
Restart=always  # 崩溃后自动重启[Install]
WantedBy=multi-user.target
方案 2:容器化(Docker)
# Dockerfile
FROM python:3.9
COPY consumer.py /app/
CMD ["python", "/app/consumer.py"]
docker run -d --restart=unless-stopped my_consumer

5. 消息补偿机制(可选)

极端情况下(如 RabbitMQ 崩溃),可通过数据库记录消息状态:

Java 发布者(本地事务)
// 伪代码:本地事务
try {saveToDatabase(task);  // 1. 先存数据库sendToRabbitMQ(task); // 2. 再发消息commitTransaction();
} catch (Exception e) {rollbackTransaction();// 定时任务扫描数据库补偿发送
}

完整流程图

Java Publisher RabbitMQ Python Consumer DLX 发布持久化消息 (deliveryMode=2) 推送消息 (auto_ack=false) basic_ack() basic_nack(requeue=true) 重新投递 alt [处理成功] [处理失败或崩溃] 转入死信队列 触发补偿逻辑 alt [消息超时或多次失败] Java Publisher RabbitMQ Python Consumer DLX

最佳实践总结

措施实现方式作用
消息持久化deliveryMode=2 + queueDeclare(durable=true)防止 RabbitMQ 重启丢失消息
手动 ACKauto_ack=false + basic_ack()/basic_nack()确保消息处理完成才删除
死信队列(DLX)x-dead-letter-exchange + 独立死信消费者处理超时或失败消息
消费者高可用systemd Restart=alwaysdocker --restart=unless-stopped消费者崩溃后自动恢复
消息补偿数据库记录 + 定时任务扫描极端情况下的兜底措施

通过以上组合方案,可确保消息在消费者崩溃、重启或 RabbitMQ 异常时不丢失、不重复、可恢复

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/78026.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python基本语法(控制语句)

#控制语句 Python语言的控制语句和其他编程语言类似&#xff0c;常用的有if…else、while、for语句。 案例2一7控制语句 第1组代码&#xff0c;说明if-else语句&#xff1a; #1 print(\n1,if) x,y,z10,20,5 if x>y:print(x>y) else:print(x<y)输出结果: 1,if x<…

并发设计模式实战系列(10):Balking(犹豫模式)

&#x1f31f; 大家好&#xff0c;我是摘星&#xff01; &#x1f31f; 今天为大家带来的是并发设计模式实战系列&#xff0c;第10章Balking&#xff08;犹豫模式&#xff09;&#xff0c;废话不多说直接开始~ 目录 一、核心原理深度拆解 1. 状态守护机制 2. 与状态模式的…

【强化学习系列】贝尔曼方程

首先回顾状态价值函数和动作价值函数的定义&#xff1a; 状态价值函数 v π ( s ) v_\pi(s) vπ​(s)是从状态 s s s出发&#xff0c;直至一幕结束后获得的回报的期望值 动作价值函数 q π ( s , a ) q_\pi(s,a) qπ​(s,a)是从状态 s s s出发&#xff0c;采取动作 a a a后&…

donet使用指定版本sdk

ps:来自微软官方方案,实测可行,就是在项目任意目录下在新建 global.json,并配置sdk版本 SDK 使用最新安装的版本 SDK 命令包括 dotnet new 和 dotnet run。 .NET CLI 必须为每个 dotnet 命令选择一个 SDK 版本。 即使在以下情况下&#xff0c;它也会默认使用计算机上安装的最新…

x-cmd install | Orbiton:极简至上的终端文本编辑器与轻量级 IDE

目录 核心特点安装适用场景优势 厌倦了臃肿复杂的 IDE&#xff1f;渴望一个轻巧、快速、专注的编码环境&#xff1f;Orbiton&#xff0c;一款极简主义的终端文本编辑器与轻量级 IDE&#xff0c;将带给你前所未有的编码体验。 核心特点 极简主义&#xff0c;专注编码&#xff1…

大脑、机器人与贝叶斯信念及AI推理

在机器不再局限于重复性任务的世界里&#xff0c;机器人技术已经大胆地迈入了感知、学习和决策的领域。这篇文章探讨了智能机器人系统是如何构建的——从理解它们嘈杂的传感器和不确定的环境&#xff0c;到使它们能够做出明智的选择并随着时间的推移调整自己的行为。 AI推理 …

线上婚恋相亲小程序源码介绍

​基于ThinkPHP、FastAdmin和UniApp开发的线上婚恋相亲小程序源码&#xff0c;这款小程序源码采用了ThinkPHP作为后端框架&#xff0c;其强大的功能与良好的扩展性为程序的稳定运行提供了保障。 ​FastAdmin作为后台管理框架&#xff0c;使得管理员能够便捷地对用户信息、相亲…

长短期记忆(LSTM)简介

RNN 的主要限制在于它无法记住很长的序列&#xff0c;并且会陷入梯度消失的问题。 什么是梯度消失问题&#xff1f; 当添加更多具有某些激活函数的层时&#xff0c;神经网络中损失函数的梯度趋近于零&#xff0c;这使得网络难以训练。 长短期记忆&#xff08;LSTM&#xff09;…

JESD204B 探究

JESD204B协议是高速串行接口标准,主要用于ADC/DAC与逻辑器件(如FPGA)之间的数据传输。以下为综合解析: 一、协议概述 ‌核心作用‌ 通过高速SERDES技术实现数模转换器与逻辑器件间的高效数据传输,支持多通道同步和确定性延迟,适用于GB级吞吐量场景23。‌版本演进‌ JESD2…

Flutter PIP 插件 ---- 新增PipActivity,Android 11以下支持自动进入PIP Mode

接上文 Flutter PIP 插件 ---- Android 项目地址 PIP&#xff0c; pub.dev也已经同步发布 pip 0.0.3&#xff0c;你的加星和点赞&#xff0c;将是我继续改进最大的动力 开发文档 Add videos using picture-in-picture (PiP)介绍PIP功能从 Android 8.0 (API level 26) 引入&…

【Java开发日记】6个Java 工具,轻松分析定位 JVM 问题 !

目录 使用 JDK 自带工具查看 JVM 情况 jps jinfo jvisualvm jcm 使用 JDK 自带工具查看 JVM 情况 JDK 自带了很多命令行甚至是图形界面工具&#xff0c;帮助查看 JVM 的一些信息。比如&#xff0c;在机器上运行 ls 命令&#xff0c;可以看到 JDK 8 提供了非常多的工具或程…

动态规划简单题2

leetcode91题&#xff08;解码方法&#xff09; 分析题目&#xff1a; 1.这是一种解码&#xff0c;就是给多个数字组成的字符串&#xff0c;把这些数字解码成字母&#xff0c;看看一共有多少种 2.如果一个数字前有前导0就不合法&#xff0c;比如06&#xff0c;这与6不同&…

(007)Excel 公式的使用

文章目录 逻辑运算公式的参数常用函数引用方式引用工作表和工作簿表格的引用修改公式的计算时机区域交叉引用 逻辑运算 公式的参数 单元格引用&#xff1a;SUM(A1:A24)。字面值&#xff1a;SQRT(121)。字面文本字符串&#xff1a;PROPER(“john.f.smith”)。表达式&#xff1a…

Unity 和 Unreal Engine(UE) 两大主流游戏引擎的核心使用方法

以下是 Unity 和 Unreal Engine&#xff08;UE&#xff09; 两大主流游戏引擎的核心使用方法和对比分析&#xff0c;帮助开发者快速上手并根据项目需求选择合适工具&#xff1a; 一、Unity 使用指南 1. 安装与配置 安装&#xff1a;从 Unity Hub 下载&#xff0c;选择长期支持…

猜数字游戏:从数学原理到交互体验的完整设计指南

目录 猜数字游戏&#xff1a;从数学原理到交互体验的完整设计指南引言第一章 游戏数学原理1.1 均匀分布与随机生成1.2 最优猜测策略 第二章 游戏系统设计2.1 核心架构2.2 动态难度系统 第三章 交互设计细节3.1 输入验证系统3.2 渐进式提示机制 第四章 进阶功能设计4.1 智能辅导…

2025工业大模型白皮书 | 蚂蚁工厂北京航空航天大学联合出品

由蚂蚁工厂与北京航空航天大学联合发布的《2025工业大模型白皮书》是一部针对工业领域大模型技术发展的前瞻性研究报告。该白皮书系统梳理了工业大模型的技术演进、核心应用场景、关键挑战及未来发展趋势&#xff0c;旨在为制造业数字化转型提供理论支撑和实践指南。作为产学研…

JavaWeb:后端web基础(TomcatServletHTTP)

一、今日内容 二、Tomcat 介绍与使用 介绍 基本使用 小结 配置 配置 查找进程 三、Servlet 什么是Servlet 快速入门 需求 步骤 1.新建工程-模块&#xff08;Maven&#xff09; 2.修改打包方式-war 3.编写代码 /*** 可以选择继承HttpServlet*/ WebServlet("/hello&q…

构建现代分布式云架构的三大支柱:服务化、Service Mesh 与 Serverless

目录 前言1. 服务化架构模式&#xff1a;构建可扩展的基础单元1.1 服务化的定义与演进1.2 在分布式云中的价值1.3 面临的挑战 2. Service Mesh 架构&#xff1a;服务通信的治理中枢2.1 什么是 Service Mesh&#xff1f;2.2 功能与优势2.3 在分布式云中的角色2.4 落地难点 3. Se…

嵌入式C语言的运算符与输入输出

目录 1. 运算符 1.1 位运算符 1.1.1 位运算 ~ 1.1.2 位逻辑与 & 1.1.3 位逻辑或 | 1.1.4 位逻辑异或 ^ 1.1.5 位移位运算 1.1.6 将无符号位的某位快速置 1 1.2 三目运算符 1.3 逗号运算符 1.4 运算符优先级 2. 输出 2.1 字符输出函数 2.2 格式输出函数 2.3 字符…

IPD研学:76页页基于IPD思想-华为需求管理培训方案【附全文阅读】

适应人群 本方案适用于企业中参与产品研发、市场、销售、项目管理等部门的人员,尤其是负责需求管理工作的相关从业者;致力于提升产品竞争力,对优化需求管理流程、提高产品开发质量感兴趣的企业管理者;以及希望了解行业前沿需求管理方法,寻求突破和创新的相关人士。…