【RabbitMQ】消息可靠性保障

news/2025/9/28 18:28:02/文章来源:https://www.cnblogs.com/jixingsuiyuan/p/19117336

本章目标

  • 掌握生产者确认(Publisher Confirms)机制,确保消息到达Broker。

  • 深入理解消费者确认(Consumer Acknowledgments)的最佳实践。

  • 学习死信队列(Dead Letter Exchange, DLX)处理失败消息。

  • 实现完整的消息可靠性保障体系。


一、理论部分

1. 消息传递的生命周期与可靠性挑战

在分布式系统中,消息可能在任何环节丢失:

  1. 生产者 -> Broker:网络故障、Broker崩溃

  2. Broker内部:服务器宕机、队列未持久化

  3. Broker -> 消费者:消费者处理失败、连接中断

2. 生产者确认(Publisher Confirms)

这是RabbitMQ提供的一种生产者端的可靠性机制。当生产者启用确认模式后,Broker会异步通知生产者消息是否已经成功处理。

  • 事务(Transactions):AMQP协议支持事务,但性能较差(同步,吞吐量降低约200-300倍)。

  • 发布者确认(Publisher Confirms):性能更好的异步替代方案,是生产环境推荐的方式。

确认的两种结果:

  • ACK:消息已被Broker成功接收和处理(持久化到磁盘)。

  • NACK:消息未被Broker处理(通常由于内部错误)。

3. 消费者确认(Consumer Acknowledgments)

我们在前面的章节已经接触过,本章将深入探讨:

  • 自动确认(autoAck: true):消息一送达就确认,风险高。

  • 手动确认(autoAck: false):

    • BasicAck:成功处理,消息从队列删除。

    • BasicNack:处理失败,可以要求重新入队或丢弃。

    • BasicReject:同BasicNack,但不支持批量操作。

4. 死信队列(Dead Letter Exchange, DLX)

当消息遇到以下情况时,会成为"死信":

  1. 消息被消费者basic.rejectbasic.nackrequeue = false

  2. 消息因TTL(Time-To-Live)过期

  3. 队列达到最大长度限制

死信消息会被重新发布到配置的DLX,然后根据DLX的类型路由到死信队列。

5. 完整的可靠性保障体系

生产级应用需要多层次的保障:

  1. 生产者确认:确保消息到达Broker

  2. 消息持久化:队列持久化 + 消息持久化

  3. 消费者确认:确保消息被成功处理

  4. 死信队列:处理无法正常消费的消息

  5. 监控与告警:及时发现和处理问题


二、实操部分:构建完整的可靠消息系统

我们将构建一个包含完整可靠性保障的订单处理系统。

第1步:创建项目结构

  1. 创建新解决方案,包含以下项目:

    • ReliableProducer - 支持确认的生产者

    • ReliableConsumer - 支持手动确认和死信处理的消费者

    • DeadLetterProcessor - 死信消息处理器

  2. 为所有项目添加RabbitMQ.Client NuGet包。

第2步:实现可靠生产者(ReliableProducer.cs)

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() 
{ HostName = "localhost", UserName = "myuser", Password = "mypassword" 
};using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// 启用发布者确认模式
    channel.ConfirmSelect();// 声明持久化队列channel.QueueDeclare(queue: "reliable_orders",durable: true,exclusive: false,autoDelete: false,arguments: null);// 设置确认事件处理器channel.BasicAcks += (sender, ea) =>{Console.WriteLine($" [✓] Message {ea.DeliveryTag} confirmed by broker");};channel.BasicNacks += (sender, ea) =>{Console.WriteLine($" [✗] Message {ea.DeliveryTag} not confirmed by broker");// 在实际应用中,这里应该实现重试逻辑
    };for (int i = 1; i <= 10; i++){var message = $"Order #{i} - Product XYZ";var body = Encoding.UTF8.GetBytes(message);// 设置消息为持久化var properties = channel.CreateBasicProperties();properties.Persistent = true;properties.MessageId = Guid.NewGuid().ToString();// 发布消息channel.BasicPublish(exchange: "",routingKey: "reliable_orders",basicProperties: properties,body: body);Console.WriteLine($" [x] Sent {message}");// 等待确认(在实际应用中可能使用异步方式)if (channel.WaitForConfirms(TimeSpan.FromSeconds(5))){Console.WriteLine($" [✓] Message {i} confirmed");}else{Console.WriteLine($" [✗] Message {i} confirmation timeout");// 实现重试逻辑
        }Thread.Sleep(1000); // 模拟消息间隔
    }
}Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
View Code

第3步:配置死信交换机和队列

在实际应用中,我们通常在生产者和消费者中都声明所需的交换机和队列。这里我们在消费者中配置完整的死信机制。

第4步:实现可靠消费者(ReliableConsumer.cs)

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() 
{ HostName = "localhost", UserName = "myuser", Password = "mypassword" 
};using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// 1. 声明死信交换机channel.ExchangeDeclare("dlx", ExchangeType.Direct, durable: true);// 2. 声明死信队列channel.QueueDeclare("dead_letter_queue", durable: true,exclusive: false, autoDelete: false, arguments: null);// 3. 绑定死信队列到死信交换机channel.QueueBind("dead_letter_queue", "dlx", "dead_letter");// 4. 声明主队列,并配置死信参数var arguments = new Dictionary<string, object>{{ "x-dead-letter-exchange", "dlx" },          // 指定死信交换机{ "x-dead-letter-routing-key", "dead_letter" } // 死信路由键
    };channel.QueueDeclare(queue: "reliable_orders",durable: true,exclusive: false,autoDelete: false,arguments: arguments);// 设置公平分发channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);Console.WriteLine(" [*] Waiting for orders. To exit press CTRL+C");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] Received {message}");try{// 模拟业务处理
            ProcessOrder(message, ea.DeliveryTag);// 处理成功,手动确认channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);Console.WriteLine($" [✓] Order processed successfully: {ea.DeliveryTag}");}catch (Exception ex){Console.WriteLine($" [✗] Failed to process order {ea.DeliveryTag}: {ex.Message}");// 处理失败,拒绝消息并不重新入队(发送到死信队列)
            channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: false);}};channel.BasicConsume(queue: "reliable_orders",autoAck: false,  // 手动确认模式
                         consumer: consumer);Console.ReadLine();
}void ProcessOrder(string message, ulong deliveryTag)
{// 模拟业务逻辑 - 随机失败以测试可靠性机制var random = new Random();// 模拟10%的失败率if (random.Next(0, 10) == 0){throw new Exception("Simulated processing failure");}// 模拟处理时间Thread.Sleep(2000);Console.WriteLine($"    Processing order {deliveryTag}: {message}");
}
View Code

第5步:实现死信处理器(DeadLetterProcessor.cs)

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() 
{ HostName = "localhost", UserName = "myuser", Password = "mypassword" 
};using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// 声明死信队列(确保存在)channel.QueueDeclare("dead_letter_queue", durable: true,exclusive: false, autoDelete: false, arguments: null);Console.WriteLine(" [*] Waiting for dead letters. To exit press CTRL+C");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);var originalQueue = ea.BasicProperties.Headers?["x-first-death-queue"]?.ToString();Console.WriteLine($" [DEAD LETTER] Received failed message:");Console.WriteLine($"    Original Queue: {originalQueue}");Console.WriteLine($"    Message: {message}");Console.WriteLine($"    Routing Key: {ea.RoutingKey}");Console.WriteLine($"    Delivery Tag: {ea.DeliveryTag}");// 在实际应用中,这里可以实现:// 1. 发送告警通知// 2. 记录到错误日志// 3. 人工干预// 4. 重试机制
        Console.WriteLine("    -> Sending alert to administrator...");Console.WriteLine("    -> Logging to error system...");// 确认死信消息channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: "dead_letter_queue",autoAck: false,consumer: consumer);Console.ReadLine();
}
View Code

第6步:高级特性 - 带重试机制的消费者

创建RetryConsumer.cs,实现更复杂的重试逻辑:

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() 
{ HostName = "localhost", UserName = "myuser", Password = "mypassword" 
};using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// 配置重试队列(带TTL)var retryArguments = new Dictionary<string, object>{{ "x-dead-letter-exchange", "" },{ "x-dead-letter-routing-key", "reliable_orders" },{ "x-message-ttl", 10000 } // 10秒后重试
    };channel.QueueDeclare("retry_queue", durable: true, exclusive: false, autoDelete: false, arguments: retryArguments);channel.QueueDeclare(queue: "reliable_orders", durable: true, exclusive: false, autoDelete: false, arguments: null);channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);Console.WriteLine(" [*] Waiting for messages with retry support.");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);// 检查重试次数var retryCount = GetRetryCount(ea.BasicProperties);Console.WriteLine($" [x] Received (attempt {retryCount + 1}): {message}");try{ProcessOrderWithRetry(message, retryCount);channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);Console.WriteLine($" [✓] Successfully processed");}catch (Exception ex){Console.WriteLine($" [✗] Processing failed: {ex.Message}");if (retryCount < 3) // 最多重试3次
            {Console.WriteLine($" [↻] Scheduling retry {retryCount + 1}");// 发布到重试队列var properties = channel.CreateBasicProperties();properties.Persistent = true;properties.Headers = new Dictionary<string, object>{{ "retry-count", retryCount + 1 }};channel.BasicPublish("", "retry_queue", properties, body);channel.BasicAck(ea.DeliveryTag, false); // 确认原消息
            }else{Console.WriteLine($" [✗] Max retries exceeded, sending to DLQ");channel.BasicNack(ea.DeliveryTag, false, false);}}};channel.BasicConsume("reliable_orders", false, consumer);Console.ReadLine();
}int GetRetryCount(IBasicProperties properties)
{if (properties.Headers?.ContainsKey("retry-count") == true){var retryCountBytes = (byte[])properties.Headers["retry-count"];return BitConverter.ToInt32(retryCountBytes, 0);}return 0;
}void ProcessOrderWithRetry(string message, int retryCount)
{var random = new Random();// 模拟处理,重试次数越多成功率越高(模拟系统恢复)var failureChance = Math.Max(10 - retryCount * 3, 1); // 降低失败率if (random.Next(0, failureChance) == 0){throw new Exception($"Simulated failure on attempt {retryCount + 1}");}Thread.Sleep(1000);Console.WriteLine($"    Processed successfully on attempt {retryCount + 1}");
}
View Code

第7步:运行与测试

  1. 启动所有服务

    # 终端1:启动死信处理器
    dotnet run --project DeadLetterProcessor# 终端2:启动主消费者
    dotnet run --project ReliableConsumer# 终端3:启动生产者
    dotnet run --project ReliableProducer
  2. 测试场景1:正常流程

    • 观察生产者确认日志

    • 观察消费者处理成功的日志

  3. 测试场景2:消费者处理失败

    • 在消费者处理时强制关闭消费者进程

    • 观察消息重新投递到其他消费者

    • 或者观察消息进入死信队列

  4. 测试场景3:死信处理

    • 让消费者处理失败,消息进入死信队列

    • 观察死信处理器的告警和日志记录

  5. 测试场景4:重试机制

    • 使用RetryConsumer测试重试逻辑

    • 观察消息在重试队列中的行为

第8步:监控与管理

在RabbitMQ管理界面(http://localhost:15672)监控:

  • 队列深度和消息状态

  • 确认率和投递率

  • 死信队列中的消息数量


本章总结

在这一章中,我们构建了一个完整的消息可靠性保障体系:

  1. 生产者确认:使用ConfirmSelect和确认事件确保消息到达Broker。

  2. 消息持久化:队列持久化 + 消息持久化,应对服务器重启。

  3. 消费者确认:手动确认模式,确保消息被成功处理。

  4. 死信队列:处理无法正常消费的消息,防止消息丢失。

  5. 重试机制:实现带延迟的重试逻辑,提高系统韧性。

  6. 监控告警:通过死信处理器实现错误通知。

这些机制组合使用,可以构建出生产级的可靠消息系统。在下一章,我们将学习如何将RabbitMQ与ASP.NET Core集成,构建现代化的微服务应用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/920974.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CPU 测试脚本

CPU 测试脚本Posted on 2025-09-28 18:23 大势趋007 阅读(0) 评论(1) 收藏 举报cpu 测试记录#!/bin/bash# 冒泡排序算法测试 - CPU负载测试 # 兼容旧版本Bashbubble_sort() {local array_name=$1eval "local…

Day23static详解

static修饰德成员变量,属于类的本身,被该类德所有实例共享,在类中可以通过类名直接访问,再导入包时打破必须通过类名访问静态成员的规则,将指定的静态成员直接引入当前类的作用域 package oop1.Demo7; //被fianl定…

11.prometheus监控之黑盒(blackbox)监控

一、黑盒监控"白盒监控"--需要把对应的Exporter程序安装到被监控的目标主机上,从而实现对主机各种资源及其状态的数据采集工作。但是由于某些情况下操作技术或其他原因,不是所有的Exporter都能部署到被监控…

网站访问量有什么用网站开发代码用什么软件

重点是要在程序管理窗口中“查看已安装的更新”打开当前系统中已安装更新列表&#xff0c;找到两个IE11的更新&#xff08;见下图“卸载文件“&#xff09;并卸载掉&#xff0c;这样windows功能中的ie11才会变成ie8. 打开控制面板 进入面板&#xff0c;点击程序&#xff0c;进…

自己做的网站怎么接入数据库嘉兴企业网站推广方法

配置IPsec on GRE Tunnel with IOS Firewall and NAT<?xml:namespace prefix o ns "urn:schemas-microsoft-com:office:office" />详细配置见附件

openssh升级

openssh升级## 功能```支持centos7.5升级openssh9.8``` ## 备份```/etc/pam.d/sshd/etc/ssh/sshd_config```## 物料```openssh.repoopenssh.tar.gz ```----```[openssh]name=openssh9.8baseurl=file:///openssh/gpgche…

实用指南:月匣 - 百度推出的AI情感陪伴与剧情互动应用

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

电子商务网站的建设与维护企业网站内容如何更新

个人主页&#xff1a;点我进入主页 专栏分类&#xff1a;C语言初阶 C语言程序设计————KTV C语言小游戏 C语言进阶 欢迎大家点赞&#xff0c;评论&#xff0c;收藏。 一起努力&#xff0c;一起奔赴大厂。 目录 1.字符指针 2.指针数组 3.数组指针 4.数组传…

做纸巾定制的网站广告设计宣传画册

一、 选题的依据及意义 随着信息化技术的发展&#xff0c;敬老院的信息化管理也迎来了机遇和挑战&#xff0c;我们积极应对这场战斗&#xff0c;丝毫不懈怠。因为&#xff0c;在未来的互联网是一个开放的环境&#xff0c;而传统的管理是一个独立的对象维护和医疗管理模式&…

宜昌的网站建设wordpress菜单出不来

在制作项目的时候遇到一个需求&#xff0c;点击一个按钮弹出一个input输入框&#xff0c;并让输入框获得焦点&#xff0c;项目中引用了element-ui 在网上查找了很多方法&#xff0c;但是在实际使用中发现了一个问题无论是使用$ref获取input元素然后使用focus方法还是使用饿了么…

Python虚拟环境及创建和使用虚拟环境(Python3)

一、什么是Python虚拟环境 简单说,虚拟环境是一个独立的 Python 运行环境,它与系统全局的 Python 环境完全隔离。每个虚拟环境可以有自己独立的 Python 解释器版本(如果需要)。 每个虚拟环境中安装的第三方库(如 …

团队协作必备:16款在线协同编辑文档方案对比

为解决企业在线协同编辑文档的选型难题,本文深度评测了含坚果云在内的16款主流工具。文章从协作办公、数据安全与功能特色等多维度进行全面对比分析,旨在为不同需求的企业提供实用选型指南,帮助团队找到最合适的协同…

石材企业网站源码自创图片软件

文章目录 了解操作系统定义目的操作系统体系结构功能特征操作系统的区别(64位与32位)操作系统的地址内存管理缓存 了解操作系统 定义 操作系统是控制管理计算机系统的硬软件,分配调度资源的系统软件 目的 方便性,有效性(提高系统资源的利用率,提高系统的吞吐量) 操作系统体…

变电站、开闭所、环网柜、配电站

1、概念变电站:“区域总电源”,把高压电(如 220kV)变成 10kV,是所有下游设备的电来源;开闭所:“10kV 中转站”,把变电站来的 10kV 电分给周边的环网柜和配电站;环网柜:“10kV 小节点”,给配电站分支供电,还…

IDEA大幅度提升编译速度配置 - 指南

IDEA大幅度提升编译速度配置 - 指南2025-09-28 18:15 tlnshuju 阅读(0) 评论(0) 收藏 举报pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !imp…

免费只做网站公司装修工程

《死锁实验报告》由会员分享&#xff0c;可在线阅读&#xff0c;更多相关《死锁实验报告(3页珍藏版)》请在人人文库网上搜索。1、操作系统实验二报告一实验名称&#xff1a;死锁的检测与解除二实验目的&#xff1a;观察死锁产生的条件&#xff0c;并使用适当的算法&#xff0c;…

网站建设入账哪个科目湛江人才网

事务原理 1 事务基础 1). 事务 事务 是一组操作的集合&#xff0c;它是一个不可分割的工作单位&#xff0c;事务会把所有的操作作为一个整体一起向系 统提交或撤销操作请求&#xff0c;即这些操作要么同时成功&#xff0c;要么同时失败。 2). 特性 原子性&#xff08;Atomi…

为AI注入灵魂:一种面向人机黑箱的元人文治理新范

为AI注入灵魂:一种面向人机黑箱的元人文治理新范式 在人工智能治理领域,我们正面临一个根本性的范式转移:挑战从纯粹的“技术黑箱”转向更为复杂的 “人机混合黑箱” 。当人类偏好、算法决策与社会环境相互缠绕,传…

2025年5款主流服务管理工具大盘点!总有一款最值得你选! - RAIN

2025年5款主流服务管理工具大盘点!总有一款最值得你选!一、开篇:数字化时代,服务管理已成企业 “刚需配置”​ 在当今数字化转型已成为企业核心战略的背景下,“服务能力”正逐渐演变为决定竞争力的关键因素。然而…

2025.9.28——1黄

普及/提高- P5194 [USACO05DEC] Scales S wpmx说自己没看清题卡了很久的简单dfs题,试着做了一下,难点在于倒着dfs,这是一个重要剪枝。