工作队列(Work Queues)与消息确认(Ack)

本章目标

  • 理解工作队列(竞争消费者模式)的概念和适用场景。

  • 掌握消息确认(Acknowledgment)机制,实现可靠的消息处理。

  • 学习消息持久化(Durability),防止服务器重启导致消息丢失。

  • 使用公平分发(Fair Dispatch)来优化多个消费者的工作效率。


一、理论部分

1. 工作队列(Work Queues / Task Queues)

在第2章的"Hello World"示例中,我们每发送一条消息,就会被一个消费者立即接收。但在实际应用中,我们往往需要处理一些耗时任务(如发送邮件、处理图片、生成报告等)。

工作队列(又称任务队列)的核心思想是避免立即执行资源密集型任务并等待其完成,而是将任务封装为消息并发送到队列中。在后台运行的多个工作进程(消费者)会从队列中取出消息并进行处理。

这种多个消费者从一个队列中获取消息的模式称为竞争消费者模式(Competing Consumers Pattern),它能很容易地实现并行处理,从而横向扩展系统。

2. 消息确认(Message Acknowledgment)

在默认的自动确认(autoAck: true)模式下,消息一旦被RabbitMQ传递给消费者,就会立即从队列中删除。这有一个严重的问题:如果消费者在处理消息过程中崩溃或断开连接,这条正在处理的消息就会永久丢失,而且无法被其他消费者重新处理。

为了解决这个问题,AMQP提供了消息确认机制:

  • 消费者在创建时设置 autoAck: false(手动确认模式)。

  • 当消费者成功处理完一条消息后,它会显式地向RabbitMQ发送一个确认(ACK)。

  • 只有在收到ACK后,RabbitMQ才会安全地从队列中删除该消息。

  • 如果消费者在处理过程中断开连接(没有发送ACK),RabbitMQ会认为该消息未被成功处理,并将其重新入队,然后传递给另一个消费者(如果存在)。

这种机制确保了即使消费者偶尔死亡,消息也不会丢失。

3. 消息持久化(Message Durability)

消息确认机制保护了消息在消费者处理时不丢失。但如果RabbitMQ服务器本身停止或崩溃了呢?默认情况下,RabbitMQ退出或崩溃时,它会忘记所有的队列和消息。

为了确保消息在服务器重启后仍然存在,我们需要做两件事:

  1. 将队列声明为持久的(Durable):这样队列本身会在服务器重启后继续存在。

  2. 将消息标记为持久的(Persistent):在发布消息时,设置 IBasicProperties.Persistent = true

注意:将消息标记为Persistent并不能完全保证消息永不丢失。虽然RabbitMQ会将消息保存到磁盘,但在它接收到消息和保存到磁盘之间仍然有一个很短的时间窗口。对于更强的保证,需要使用发布者确认(Publisher Confirms),这将在后续章节介绍。

4. 公平分发(Fair Dispatch)

默认情况下,RabbitMQ会使用轮询(Round-robin) 的方式将消息平均分发给所有消费者,而不考虑每个消费者当前未确认的消息数量。这可能导致一个问题:某些消息处理起来很耗时,而某些很快。如果一个繁忙的消费者前面堆积了很多未确认的消息,而空闲的消费者却得不到新任务,就会造成处理能力浪费。

为了解决这个问题,我们可以使用 basicQos 方法并设置 prefetchCount = 1。这告诉RabbitMQ不要一次向一个消费者发送超过一条消息。或者换句话说,在消费者处理并确认上一条消息之前,不要向其发送新消息。这样,RabbitMQ会将新消息分发给下一个空闲的消费者。


二、实操部分:构建可靠的工作队列

我们将创建一个任务发布者(NewTask)和多个工作者(Worker)。任务消息中的点号.数量代表其处理复杂度(每个点号耗时1秒)。

第1步:创建项目
  1. 创建一个新的解决方案。

  2. 添加两个控制台应用程序项目:NewTask (生产者) 和 Worker (消费者)。

  3. 为两个项目添加 RabbitMQ.Client NuGet包。

第2步:编写可靠的任务生产者(NewTask.cs)
csharp
using System.Text;
using RabbitMQ.Client;// 示例: dotnet run "Message."
//         dotnet run "Message.."
//         dotnet run "Message..." ...
var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// 1. 声明一个持久化的队列channel.QueueDeclare(queue: "task_queue",durable: true,        // 队列持久化exclusive: false,autoDelete: false,arguments: null);// 2. 准备消息var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);// 3. 设置消息属性为持久化var properties = channel.CreateBasicProperties();properties.Persistent = true;// 4. 发布消息channel.BasicPublish(exchange: "",routingKey: "task_queue",basicProperties: properties, // 传入持久化属性body: body);Console.WriteLine($" [x] Sent {message}");
}Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();static string GetMessage(string[] args)
{return args.Length > 0 ? string.Join(" ", args) : "Hello World!";
}

关键更改:

  • QueueDeclare 中的 durable: true 确保队列在服务器重启后依然存在。

  • 创建了 IBasicProperties 对象并设置 Persistent = true,使消息本身也被标记为持久化。

第3步:编写可靠的工作者(Worker.cs)
csharp
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// 声明持久化队列(必须与生产者声明参数一致)channel.QueueDeclare(queue: "task_queue",durable: true,exclusive: false,autoDelete: false,arguments: null);// !!! 关键设置:公平分发 !!!// 告诉RabbitMQ,在当前工作者处理并确认上一条消息之前,不要向其发送新消息channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);Console.WriteLine(" [*] Waiting for messages.");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] Received {message}");// 模拟耗时任务,消息中的每个点号'.'代表1秒工作int dots = message.Split('.').Length - 1;Thread.Sleep(dots * 1000);Console.WriteLine(" [x] Done");// !!! 手动发送消息确认(ACK) !!!// 只有在任务处理完成后,才发送ACK,告知RabbitMQ可以安全删除消息channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);};// 启动消费者,设置 autoAck: false (手动确认模式)channel.BasicConsume(queue: "task_queue",autoAck: false, // 关闭自动确认!consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();
}

关键更改:

  • channel.BasicQos(0, 1, false): 设置公平分发,每个消费者一次只预取一条消息。

  • channel.BasicConsume(autoAck: false): 切换到手动确认模式。

  • 在 Received 事件处理程序的最后,调用 channel.BasicAck(...) 来显式确认消息处理完成。ea.DeliveryTag 是消息的唯一标识符。

  • 使用 Thread.Sleep 模拟耗时任务。

第4步:运行与演示
  1. 启动两个(或多个)工作者(Worker)
    打开两个终端窗口,分别运行 Worker 项目。

    bash
    cd Worker
    dotnet run

    两个窗口都会显示 [*] Waiting for messages.

  2. 发送任务
    运行 NewTask 项目来发送一些耗时不同的任务。

    bash
    cd NewTask
    dotnet run "First message."      # 耗时约1秒
    dotnet run "Second message.."     # 耗时约2秒
    dotnet run "Third message..."     # 耗时约3秒
    dotnet run "Fourth message...."   # 耗时约4秒
    dotnet run "Fifth message....."   # 耗时约5秒
  3. 观察现象

    • 你会看到任务被轮流分配给两个工作者(轮询分发)。

    • 但是,由于我们设置了 prefetchCount=1,当一个工作者正在处理一个长任务(例如5秒)时,RabbitMQ不会再给它发送新消息,而是会将新消息分发给另一个空闲的工作者。这就是公平分发的效果。

    • 查看管理后台(Queues),你会看到 "Unacked"(未确认)消息的数量。只有当工作者调用 BasicAck 后,这个消息才会消失。

  4. 演示消息确认的重要性

    • 让一个工作者正在处理一个长任务(比如5秒的任务)。

    • 在它处理过程中,强制关闭这个工作者的终端窗口(模拟消费者崩溃)。

    • 观察另一个工作者窗口和管理后台:刚才那条被中断处理的消息(状态为Unacked)会重新变为Ready,并被自动传递给另一个仍在运行的工作者进行处理。这样就保证了消息绝不会因为消费者崩溃而丢失。


本章总结

在这一章中,我们构建了一个可靠的工作队列系统,并深入学习了RabbitMQ的核心可靠性机制:

  1. 工作队列模式:使用多个消费者并行处理耗时任务。

  2. 消息确认(ACK):通过手动确认(autoAck: false)和 BasicAck,确保消息只有在被成功处理后才会被删除,防止消费者崩溃导致消息丢失。

  3. 消息与队列持久化:通过 durable: true 和 properties.Persistent = true,防止RabbitMQ服务器重启导致消息丢失。

  4. 公平分发(QoS):通过 BasicQos 和 prefetchCount: 1,优化任务分发,使空闲的消费者能优先获得新任务,提高整体处理效率。

现在,你已经能够构建一个健壮的、用于处理后台任务的分布式系统了。在下一章,我们将离开简单的队列模型,探索RabbitMQ更强大的功能——交换机(Exchange),学习如何实现发布/订阅模式,将一条消息投递给多个消费者。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/909135.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

React18新增的hook useId

useId 首先要理解SSR时,服务端和客户端的水合 服务端会生成一个HTML模版,和JS一起发给客户端,然后客户端的JS来“水合”HTML中的内容,转为可交互的组件。而官方文档中所说的“客户端组件被激活处理后的顺序可能与服…

十年架构演进史:从臃肿war包到云原生,我们终于解放了!

十年架构演进史:从臃肿war包到云原生,我们终于解放了!单体到微服务架构服务演化过程大家好,欢迎来到程序视点!我是你们的老朋友.安戈! 前言 各位技术人,不知道你们是否和我一样,每次打开一个老项目的代码仓库,…

week1作业

在 Java 中,我本学期主要遵循的编码规范如下: 类名我采用大驼峰命名法,每个单词的首字母大写,例如StudentInfo 、UserService。 方法名我采用小驼峰命名法,首单词首字母小写,其余单词首字母大写,如getStudentNa…

6-5 汇聚层

本章主要介绍汇聚层相关原理和实现1.最大汇聚层和平均汇聚层 import torch from torch import nn from d2l import torch as d2ldef pool2d(X, pool_size, mode=max):p_h, p_w = pool_sizeY = torch.zeros((X.shape[0]…

从IpadOS 26 Beta版切换成IpadOS 26 正式版

设置 - 通用 - 软件更新 - Beta版更新点击上图中的Beta版更新,进入如下图所示菜单,选择关闭返回至软件更新页面,点击IpadOS 26进行安装

2025.9.21总结

今天继续梳理已学习过的知识和学习路线。 对于已有技术,web开发,安卓开发而言。光凭借这些技术对于找到一份工作而言还是比较困难的。 而大三暑假的时候最长差不多能放出去半年的时间,也就是需要在寒假把简历打磨好…

6-4 多输入多输出通道

本章主要介绍通道多输入和多输出1.多输入通道 import torch from d2l import torch as d2l def corr2d_multi_in(X, K):# 先遍历 X 和 K 的第0个维度(通道维度),再把他们加在一起# 把所有配对得到的互相关结果逐元素…

6-6 卷积神经网络LeNet

本章主要介绍卷积神经网络LeNet的实现1.LeNet import torch from torch import nn from d2l import torch as d2lnet = nn.Sequential(nn.Conv2d(1, 6, kernel_size=5, padding=2), nn.Sigmoid(),nn.AvgPool2d(kernel_…

5-5读写文件

本章主要介绍将训练后的数据保存到文件中1.加载和保存张量 import torch from torch import nn from torch.nn import functional as Fx = torch.arange(4) # 把 Python 对象 x 打包成字节流,原封不动地写进文件 x-fi…

6-2图像卷积

本章主要介绍二维卷积和图像卷积的计算1.二维卷积计算 import torch from torch import nn from d2l import torch as d2l定义二维卷积函数 def corr2d(X, K):计算二维互相关运算h, w = K.shapeY = torch.zeros((X.sha…

二叉树的高度和判断平衡二叉树

LCR 176. 判断是否为平衡二叉树 利用递归得出结果,平衡二叉树成立的条件:左子树和右子树之差的绝对值小于等于 1,也就是当左子树高度 - 右子树高度的差值等于 0或者等于1的时候该平衡二叉树成立。 那么我们可以利用…

20250921 之所思 - 人生如梦

20250921 之所思一大早就收到老板要求每天晚上十点开会的信息,顿时心情很糟,因为晚上十点开会,开完就已经接近十二点,很害怕自己会彻夜难眠,然后起床就一直在想这件事,顿时整个早上的心情都受到了影响。还为这个…

UE5 Cook数据结构

UE Cook 数据结构 本篇讲非 MPCook 的数据结构1. FPackageDatas核心类 FPackageDatas 是管理 CookOnTheFlyServer 里的所有 PackageData 的列表的类。PackageDatas 是一个关联数组,存储 COTFS 需要的 package(如 coo…

通过微信对客服系统客户进行消息提醒,比如客户快过期了,访客发来的消息也是通过模板消息通知给客服

vx: llike620我的客服系统已经通过自己开发的形式实现了对接 客户服务到期提醒​​和​​客服消息通知​​——正是模板消息功能的典型和优秀应用案例。 作为开发者,您肯定关心如何将现有系统做得更健壮、更高效。以…

WPF治具软件模板分享 - Dragonet

目录WPF治具软件模板分享程序功能介绍功能实现导航功能程序配置日志功能界面介绍 WPF治具软件模板分享 运行环境:VS2022 .NET 8.0 完整项目:Gitee仓库 项目重命名方法参考:网页概要:针对治具单机软件制作了一个设…

基于WOA鲸鱼优化的XGBoost序列预测算法matlab仿真

1.算法运行效果图预览 (完整程序运行后无水印)2.算法运行软件版本 matlab2024b3.部分核心程序 (完整版代码包含详细中文注释和操作步骤视频)%最大迭代次数 paramters.maxiter = 50; paramters.tr…

软件工程第二次作业——个人项目

这个作业属于哪个课程 https://edu.cnblogs.com/campus/gdgy/Class12Grade23ComputerScience这个作业要求在哪里 https://edu.cnblogs.com/campus/gdgy/Class12Grade23ComputerScience/homework/13468这个作业的目标 &…

微信扫码二维码,关注绑定公众号提醒,利用微信公众号的模板消息进行消息通知的推送

gofly.v1kf.com vx: llike620我的客服系统已经通过自己开发的形式实现了对接希望通过微信扫码关注公众号,并利用模板消息功能实现消息推送。这是一个非常实用的需求,尤其在服务通知和用户互动方面能极大提升体验。其…

Arch下实现人脸识别登录:howdy的配置与使用

安装Howdy 查阅Arch Linux中文Wiki的教程[1]可知:howdy包已无法在最新的Arch Linux上正常使用,推荐安装howdy-git包那么,就 yay -S howdy-git查看摄像头路径 如果电脑只有一个摄像头的话,一般而言,摄像头的路径是…

fedora无法看视频?编解码器详细安装教程【转发】

fedora无法看视频?编解码器详细安装教程【转发】原文:https://zhuanlan.zhihu.com/p/26494803528 启用rpm fusion 包 free包: sudo dnf install https://download1.rpmfusion.org/free/fedora/rpmfusion-free-relea…