发布/订阅(Publish/Subscribe)与交换机(Exchange)

 本章目标

  • 理解交换机(Exchange)在RabbitMQ中的核心作用。

  • 掌握发布/订阅模式(Publish/Subscribe)的实现。

  • 学习扇形交换机(Fanout Exchange)的使用。

  • 理解绑定(Binding)的概念。

  • 实现一个日志广播系统。


一、理论部分

1. 交换机(Exchange)简介

在前面的章节中,我们一直使用默认交换机(空字符串""),生产者直接将消息发送到队列。但RabbitMQ的真正强大之处在于它的交换机机制。

交换机是消息的入口点。生产者将消息发送到交换机,而不是直接发送到队列。交换机根据特定的规则和类型,决定将消息路由到哪些队列中。

2. 交换机类型

RabbitMQ提供了几种不同类型的交换机,每种都有不同的路由行为:

  • 扇形交换机(Fanout Exchange):将消息广播到所有绑定到它的队列中(忽略路由键)。本章重点。

  • 直连交换机(Direct Exchange):根据精确匹配的路由键将消息路由到队列(我们在第2章使用的默认交换机就是直连类型)。

  • 主题交换机(Topic Exchange):基于模式匹配的路由(使用通配符)。

  • 头交换机(Headers Exchange):基于消息头属性而不是路由键进行路由。

3. 发布/订阅模式(Publish/Subscribe)

发布/订阅模式的核心思想是:一条消息被分发给多个消费者。每个消费者都会收到相同的消息副本。这非常适合需要将同一信息通知给多个不同系统的场景,比如:

  • 日志广播系统

  • 新闻推送

  • 系统事件通知

  • 缓存更新通知

4. 绑定(Binding)

绑定是交换机和队列之间的连接。你可以理解为:"这个队列对这个交换机的消息感兴趣"。当我们创建绑定时,可以指定一个绑定键(Binding Key),交换机用它来决定哪些消息应该被路由到这个队列。

对于扇形交换机,绑定键会被忽略,所有绑定到该交换机的队列都会收到消息。


二、实操部分:构建日志广播系统

我们将创建一个日志系统,其中:

  • 一个生产者发送日志消息到扇形交换机。

  • 多个消费者分别将日志保存到文件、打印到控制台等,每个消费者都会收到所有的日志消息。

第1步:创建项目

  1. 创建一个新的解决方案。

  2. 添加一个控制台应用程序项目作为生产者:EmitLog

  3. 添加两个控制台应用程序项目作为消费者:ReceiveLogs(用于控制台输出)和SaveLogs(模拟保存到文件)。实际中可以创建更多消费者。

  4. 为所有项目添加RabbitMQ.Client NuGet包。

第2步:编写日志生产者(EmitLog.cs)

using System.Text;
using RabbitMQ.Client;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// 1. 声明一个扇形交换机(Fanout Exchange)// 参数说明:// exchange: "logs" - 交换机的名称// type: "fanout" - 交换机类型// durable: false - 是否持久化(服务器重启后是否存在)// autoDelete: false - 当所有队列都解绑后,是否自动删除交换机channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);// 2. 准备消息(从命令行参数获取或使用默认消息)var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);// 3. 发布消息到交换机(而不是队列!)// 关键变化:指定exchange参数为"logs",而不是空字符串// 对于fanout交换机,routingKey会被忽略,但通常我们还是提供一个有意义的键channel.BasicPublish(exchange: "logs",routingKey: "", // 对于fanout交换机,这个值被忽略basicProperties: null,body: body);Console.WriteLine($" [x] Sent to exchange 'logs': {message}");
}Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();static string GetMessage(string[] args)
{return args.Length > 0 ? string.Join(" ", args) : "info: Hello World!";
}

关键点:

  • 使用ExchangeDeclare方法声明一个名为logs的扇形交换机。

  • BasicPublish中指定exchange: "logs",而不是之前的空字符串。

  • 对于扇形交换机,routingKey参数被忽略,但我们仍然提供它。

第3步:编写第一个日志消费者(ReceiveLogs.cs)- 控制台输出

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// 1. 声明扇形交换机(必须与生产者使用相同的交换机名称和类型)channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);// 2. 声明一个临时队列// 关键点:我们不指定队列名称,让RabbitMQ生成一个随机名称// exclusive: true - 当连接关闭时,队列会被自动删除var queueName = channel.QueueDeclare().QueueName;// 3. 将队列绑定到交换机// 对于fanout交换机,bindingKey被忽略(这里用空字符串)
    channel.QueueBind(queue: queueName,exchange: "logs",routingKey: "");Console.WriteLine($" [*] Waiting for logs. Queue: {queueName}");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] [CONSOLE] {message}");};channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();
}

关键点:

  • 使用QueueDeclare()而不指定队列名,让RabbitMQ生成一个唯一的、随机的队列名。

  • 设置exclusive: true(默认值),这样当消费者断开连接时,队列会被自动删除。

  • 使用QueueBind将队列绑定到logs交换机。

第4步:编写第二个日志消费者(SaveLogs.cs)- 模拟保存到文件

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// 同样的步骤:声明交换机、创建临时队列、绑定channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);var queueName = channel.QueueDeclare().QueueName;channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");Console.WriteLine($" [*] Waiting for logs to save. Queue: {queueName}");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);// 模拟将日志保存到文件(这里只是打印,实际中可以写入文件)Console.WriteLine($" [x] [FILE] Saved log: {message} - {DateTime.Now:yyyy-MM-dd HH:mm:ss}");// 模拟文件写入的延迟Thread.Sleep(500);};channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();
}

这个消费者与第一个几乎相同,只是模拟了不同的处理逻辑(保存到文件)。

第5步:运行与演示

  1. 启动多个消费者
    打开三个终端窗口:

    • 窗口1:运行ReceiveLogs(控制台输出消费者)

    • 窗口2:运行SaveLogs(文件保存消费者)

    • 窗口3:运行ReceiveLogs(另一个控制台输出消费者)

    每个消费者启动时都会显示一个随机生成的队列名,如:

     
    [*] Waiting for logs. Queue: amq.gen-JzTY20BRgKO-HjmUJj0wLg
  2. 查看管理后台
    访问 http://localhost:15672,查看Exchanges标签页:

    • 你会看到新创建的logs交换机,类型为fanout

    • 点击logs交换机,在绑定(Bindings)部分,你会看到3个队列绑定到了这个交换机。

  3. 发送日志消息
    在另一个终端运行EmitLog项目:

    cd EmitLog
    dotnet run "User john.doe logged in successfully"
    dotnet run "Warning: Database connection pool 80% full"
    dotnet run "Error: Payment service timeout after 30 seconds"
  4. 观察现象

    • 所有三个消费者都会收到并处理每一条消息!

    • 消息被广播到了所有绑定到logs交换机的队列。

    • 每个消费者可以以不同的方式处理同一条消息。

  5. 动态测试

    • 在生产者运行期间,启动第四个消费者。

    • 你会发现新启动的消费者只能收到之后发送的消息,而无法收到之前已经发送的消息。这是因为扇形交换机只负责将当前和未来的消息广播给所有绑定的队列。


本章总结

在这一章中,我们深入学习了RabbitMQ的核心组件——交换机,并实现了强大的发布/订阅模式:

  1. 交换机(Exchange):理解了交换机作为消息入口点的作用,以及它与队列的关系。

  2. 扇形交换机(Fanout Exchange):掌握了最简单的交换机类型,它将消息无条件地广播到所有绑定的队列。

  3. 发布/订阅模式:实现了一条消息被多个消费者同时接收的场景。

  4. 临时队列:学习了如何创建匿名队列,用于短暂的发布/订阅场景。

  5. 绑定(Binding):理解了队列和交换机之间的连接关系。

现在你已经能够构建消息广播系统了。在下一章,我们将学习直连交换机(Direct Exchange),实现有选择性的消息路由——让消费者只接收它们感兴趣的消息(比如只处理error级别的日志)。这将使我们的消息系统更加精细和高效。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/910815.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

线性结构之链表

离散存储[链表]:定义:n个结点的离散分配彼此通过指针相连每个结点只有一个前续结点每个结点只有一个后续结点首结点没有前续结点尾结点没有后续结点专业术语:首结点:第一个有效结点,存放第一个有效数据尾结点:最…

高职教育双高建设网站佛山建设网站公司哪家好

1、基础用法就不再赘述了,重要的属性配置: Disable Catalog Update on Startup:禁用时在初始化Addressables的时候自动更新远程的catalog(启用后可以通过代码 Addressables.CheckForCatalogUpdates()更新) Use…

营销型网站软件建立网站需要多少钱就蓷y湖南岚鸿推荐

目录 一、为什么要有动态内存分配​ 二、C/C中程序内存区域划分​ 三、malloc和free​ 2.1、malloc 2.2、free​ 四、calloc和realloc​ 3.1、calloc​ 3.2、realloc​ 3.3realloc在调整内存空间的是存在两种情况: 3.4realloc有malloc的功能 五、常见的动…

简单的购物网站设计大型免费网页游戏排行榜

SH文件介绍 介绍SH文件示例执行SH文件具体用法 介绍 SH文件通常指的是 Shell 脚本文件,文件后缀名为.sh,其中包含一系列要由操作系统的命令解释器执行的命令。 在 Unix 和类 Unix 操作系统中,通常使用 Bourne Shell(sh&#xff…

博客网站建设源码上海互联网推广找哪家

打卡记录 同积元组&#xff08;哈希表 排列组合&#xff09; 链接 思路&#xff1a;用哈希表将数组中出现的两不同数乘积依次记录&#xff0c;将出现两次以上的乘积组通过排列组合计算总情况个数。 class Solution { public:int tupleSameProduct(vector<int>& num…

分类信息网站如何优化wordpress会员通知插件

一、Array方法 方法参数返回值描述以下Api会修改原属组pushitem1, item2, ...数组新长度向数组的末尾添加一个或多个元素&#xff0c;并返回新的长度pop删除的元素删除数组的最后一个元素&#xff0c;并返回删除的元素unshiftitem1,item2, ...数组新长度向数组的开头添加一个或…

展示型网站一样做seo优化网站维护合同范本

原网址&#xff1a;http://pichcar.iteye.com/blog/676292 URL中的特殊字符 有些符号在URL中是不能直接传递的&#xff0c;如果要在URL中传递这些特殊符号&#xff0c;那么就要使用他们的编码了。编码的格式为&#xff1a;%加字符的ASCII码&#xff0c;即一个百分号%&#xff0…

网站建设和管理是教什么科目成都房地产政策

服务器数据恢复环境&#xff1a; 某单位一台DS5300存储&#xff0c;1个主机4个扩展柜&#xff0c;组建了2组RAID5&#xff08;一组27块硬盘&#xff0c;一组23块盘&#xff09;。27块盘的那组RAID5阵列存放Oracle数据库文件&#xff0c;存储系统一共分了11个卷。 服务器故障&a…

可以做手机网页的网站个人怎么进行网站建设

一、点查看自定义快捷键可以定义一些快速启动方式 然后用不习惯的快捷键也能在这里改 二、android studio 快捷键导出备份 导入方法&#xff1a; android studio &#xff0d;>file->import setting ->选择jar包即可 导出studio的设置方法&#xff1a; android …

AI 编程“效率幻觉”:为何你感觉快了,项目却慢了?

AI 编码工具普及,但为何开发者感觉很快,实际项目却变慢了?本文深入剖析 AI 编程“感知差距”背后的根源,探讨如何通过结构化输入,真正释放 AI 潜能。一、AI 编程的“速度与激情”背后 2025 年,如果你问一个开发者…

lc1033-移动石子直到连续

难度:中等(伪境)题目描述数轴上有三块石子,最左边的石子可以向右移,但不能越过最右边的石子 不能放在有石子的地方最右侧的石子同理 每次只能移动一块石子,在上面规则下不限制距离 问从初始位置到三块石子相邻,…

广东平台网站建设平台小学校园网站建设方案

我的世界换肤教程&#xff0c;本篇教程记录如何使用MCSkin 3D软件、皮肤使用、皮肤预览图制作等相关教程。感兴趣的小伙伴们可以来看看这篇我的世界皮肤教程。一.MCSkin 3D1.4软件介绍及教程&#xff1a;1. 视图-3D设置-屏蔽部位半透明显示在绘制其他部位时&#xff0c;可以用半…

苏州建站公司兴田德润简介呢wordpress 采集发布

大家根据电脑系统的位数&#xff0c;选择 32 位的 VM 配置文件或者 64 位的 VM 配置文件32 位操作系统内存不会超过 4G&#xff0c;所以没有多大空间可以调整&#xff0c;建议不用调整了64 位操作系统中 8G 内存以下的机子或是静态页面开发者是无需修改的。64 位操作系统且内存…

淄博网站制作服务网站名称是什么

人民邮电出版社图灵公司介绍&#xff08;来自http://www.turingbook.com/&#xff09; 北京图灵文化发展有限公司成立于2005年6月&#xff0c;由人民邮电出版社投资控股&#xff0c;以策划出版高质量的科技书籍为核心业务&#xff0c;主要出版领域包括计算机、电子电气、数学统…

滕州网站优化宁波创建网站

一、存储引擎概念介绍 MySQL中的数据用各种不同的技术存储在文件中&#xff0c;每一种技术都使用不同的存储机制、索引技巧、锁定水平并最终提供不同的功能和能力&#xff0c;这些不同的技术以及配套的功能在MySQL中称为存储引擎 存储引擎是MySQL将数据存储在文件系统中的存储…

手机网站加速器w7自己做网站

可以以电脑浏览器的手机模式打开&#xff0c;也可以在手机浏览器中直接打开 游戏运用了Canvas的drawImage&#xff0c;translate&#xff0c;rotate&#xff0c;save&#xff0c;restore&#xff0c;fillRect等API。 采用中介者模式&#xff0c;Game类统领全局&#xff0c;负责…

网站研发进度表下载零遁nas做网站

一、环境准备&#xff1a; 1、安装appium 2、xcode (appium 版本&#xff1a;12.1.0 xcode版本&#xff1a;12.5 可正常运行&#xff0c;ps:appium 版本&#xff1a;12.1.0 xcode版本&#xff1a;13.0 一直报奇奇怪怪的错误&#xff09; 3、依赖工具包安装 brew install…

网站建设mvc三层框架图网站模板下载破解版

Transformer模式是Java&#xff08;以及可能仅具有使用场所差异和不变参数类型的其他OO语言&#xff09;的设计模式&#xff0c;可帮助子类型层次结构内的对象将自己流畅地转换为任何类型的对象。 语境 我一直在关注与Jim Laskey发行的JDK-8203703相关的OpenJDK线程&#xff…

专业做网站制作paypal网站做外贸

我发现org.apache.commons.pool非常有用且健壮&#xff0c;但没有充分记录。 因此&#xff0c;我将在这里帮助您解释如何使用Apache KeyedObjectPool 。 什么是KeyedObjectPool &#xff1f; 它是一个映射&#xff0c;其中包含多种类型的实例池。 可以使用任意键访问每种类型。…

大连网站推广爱得科技公司建网站怎么建

Python数据库编程实战&#xff1a;sqlite3模块详解 在Python中&#xff0c;数据库编程是一项重要且实用的技能。通过数据库&#xff0c;我们可以高效地存储、检索和管理大量数据。Python提供了多种数据库编程接口&#xff0c;其中sqlite3模块是一个轻量级的关系型数据库引擎&a…