【RabbitMQ】主题(Topics)与主题交换机(Topic Exchange)

news/2025/9/26 18:19:34/文章来源:https://www.cnblogs.com/jixingsuiyuan/p/19114022

本章目标

  • 理解主题交换机(Topic Exchange)的强大路由能力。

  • 掌握通配符*#的使用规则。

  • 学习基于模式匹配的复杂消息路由。

  • 实现一个支持多维度过滤的智能消息系统。


一、理论部分

1. 主题交换机(Topic Exchange)简介

主题交换机是RabbitMQ中最灵活也是最强大的交换机类型。它结合了扇形交换机的广播能力和直连交换机的精确匹配能力,同时引入了模式匹配的概念。

主题交换机的工作方式:

  • 消息仍然带有路由键(Routing Key),但路由键必须是由点号分隔的单词列表(如:usa.newseurope.weather.alert)。

  • 队列通过绑定键(Binding Key) 绑定到交换机,绑定键也使用相同的点号分隔格式。

  • 绑定键支持两种通配符进行模式匹配。

2. 通配符规则

主题交换机的强大之处在于绑定键支持通配符:

  • *(星号):匹配恰好一个单词

    • 示例:*.orange.* 可以匹配 quick.orange.rabbit,但不能匹配 quick.orange.fox.jumps

  • #(井号):匹配零个或多个单词

    • 示例:lazy.# 可以匹配 lazylazy.foxlazy.brown.foxlazy.pink.fox.jumps.over

3. 路由键格式最佳实践

路由键通常采用层次结构,便于模式匹配:

  • <facility>.<severity>auth.infokernel.error

  • <region>.<service>.<event>usa.payment.successeurope.order.cancelled

  • <category>.<subcategory>.<action>news.sports.updateweather.alert.severe

4. 使用场景

主题交换机适用于需要复杂、灵活的消息路由场景:

  • 新闻订阅系统:用户可以根据兴趣订阅特定主题(如sports.**.finance

  • 物联网设备监控:按设备类型、地理位置、告警级别路由消息

  • 微服务事件总线:基于事件类型和来源进行精细路由


二、实操部分:构建智能新闻分发系统

我们将构建一个新闻分发系统,其中:

  • 生产者发送带有分类路由键的新闻消息

  • 消费者可以根据兴趣订阅特定模式的新闻

第1步:创建项目

  1. 创建一个新的解决方案。

  2. 添加一个控制台应用程序项目作为生产者:EmitLogTopic

  3. 添加多个消费者项目:

    • ReceiveNewsAll - 接收所有新闻

    • ReceiveSportsNews - 接收所有体育新闻

    • ReceiveUSNews - 接收所有美国新闻

    • ReceiveCriticalAlerts - 接收所有紧急警报

    • ReceiveWeatherUpdates - 接收所有天气更新

  4. 为所有项目添加RabbitMQ.Client NuGet包。

第2步:编写新闻生产者(EmitLogTopic.cs)

using System.Text;
using RabbitMQ.Client;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// 声明主题交换机channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);// 路由键格式:<category>.<region>.<severity>// 示例:news.usa.info, sports.europe.alert, weather.asia.criticalvar routingKey = (args.Length > 0) ? args[0] : "anonymous.info";var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!";var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "topic_logs",routingKey: routingKey,basicProperties: null,body: body);Console.WriteLine($" [x] Sent '{routingKey}':'{message}'");
}Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

第3步:编写接收所有新闻的消费者(ReceiveNewsAll.cs)

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);var queueName = channel.QueueDeclare().QueueName;// 使用 # 匹配所有消息channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "#");Console.WriteLine($" [*] Waiting for ALL news. Queue: {queueName}");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] [ALL] Received '{ea.RoutingKey}':'{message}'");};channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();
}

第4步:编写接收体育新闻的消费者(ReceiveSportsNews.cs)

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);var queueName = channel.QueueDeclare().QueueName;// 匹配所有体育相关的新闻:sports.*.*channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "sports.#");Console.WriteLine($" [*] Waiting for SPORTS news. Queue: {queueName}");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] [SPORTS] Received '{ea.RoutingKey}':'{message}'");};channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();
}

第5步:编写接收美国新闻的消费者(ReceiveUSNews.cs)

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);var queueName = channel.QueueDeclare().QueueName;// 匹配所有美国相关的新闻:*.usa.*channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "*.usa.*");Console.WriteLine($" [*] Waiting for USA news. Queue: {queueName}");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] [USA] Received '{ea.RoutingKey}':'{message}'");};channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();
}

第6步:编写接收紧急警报的消费者(ReceiveCriticalAlerts.cs)

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);var queueName = channel.QueueDeclare().QueueName;// 匹配所有紧急级别的消息:*.*.criticalchannel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "*.*.critical");Console.WriteLine($" [*] Waiting for CRITICAL alerts. Queue: {queueName}");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] [CRITICAL] Received '{ea.RoutingKey}':'{message}'");Console.WriteLine("    -> Sending emergency notification!");};channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();
}

第7步:编写接收天气更新的消费者(ReceiveWeatherUpdates.cs)

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);var queueName = channel.QueueDeclare().QueueName;// 匹配所有天气相关的更新:weather.*// 一个队列可以绑定多个模式channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "weather.#");channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "*.alert"); // 也接收所有警报
Console.WriteLine($" [*] Waiting for WEATHER updates and ALERTS. Queue: {queueName}");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] [WEATHER/ALERT] Received '{ea.RoutingKey}':'{message}'");};channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();
}

第8步:运行与演示

  1. 启动所有消费者
    打开六个终端窗口,分别运行所有消费者程序。

  2. 发送各种类型的新闻消息

    cd EmitLogTopic# 发送体育新闻
    dotnet run "sports.usa.score" "Team USA wins gold medal"
    dotnet run "sports.europe.update" "Champions League finals scheduled"# 发送美国相关新闻
    dotnet run "news.usa.politics" "Election results announced"
    dotnet run "tech.usa.innovation" "Silicon Valley startup raises $10M"# 发送紧急警报
    dotnet run "weather.usa.critical" "Tornado warning for Midwest"
    dotnet run "safety.europe.critical" "Security alert: System maintenance"# 发送天气更新
    dotnet run "weather.asia.update" "Monsoon season begins"
    dotnet run "news.europe.alert" "Breaking: Major announcement"# 发送其他消息
    dotnet run "entertainment.hollywood.gossip" "Celebrity wedding announced"
  3. 观察路由结果并分析模式匹配

    消息路由键ALLSPORTSUSACRITICALWEATHER/ALERT
    sports.usa.score
    sports.europe.update
    news.usa.politics
    tech.usa.innovation
    weather.usa.critical
    safety.europe.critical ✅ (*.alert)
    weather.asia.update
    news.europe.alert ✅ (*.alert)
    entertainment.hollywood.gossip
  4. 测试复杂场景

    • 发送 weather.alert.severe.critical - 观察哪些消费者能收到

    • 发送 sports.alert - 测试多个模式的匹配

    • 在管理后台查看绑定关系,理解通配符的实际效果

第9步:通配符规则详解示例

为了更好理解通配符,让我们看一些匹配示例:

绑定键 *.orange.* 的匹配情况:

  • ✅ quick.orange.rabbit (匹配)

  • ✅ lazy.orange.elephant (匹配)

  • ❌ quick.orange.fox.lazy (不匹配 - 四个单词)

  • ❌ orange (不匹配 - 只有一个单词)

  • ❌ quick.brown.fox (不匹配 - 中间不是orange)

绑定键 lazy.# 的匹配情况:

  • ✅ lazy (匹配)

  • ✅ lazy.fox (匹配)

  • ✅ lazy.brown.fox (匹配)

  • ✅ lazy.pink.fox.jumps.over (匹配)

  • ❌ quick.lazy.fox (不匹配 - 第一个单词不是lazy)


本章总结

在这一章中,我们深入学习了RabbitMQ中最强大的主题交换机,掌握了基于模式匹配的复杂消息路由:

  1. 主题交换机(Topic Exchange):理解了基于通配符的模式匹配路由机制。

  2. 通配符规则:掌握了*(匹配一个单词)和#(匹配零个或多个单词)的使用方法。

  3. 路由键设计:学习了使用点号分隔的层次化路由键设计最佳实践。

  4. 复杂路由场景:实现了支持多维度过滤的智能新闻分发系统。

  5. 多重模式绑定:掌握了单个队列绑定多个模式的高级用法。

主题交换机提供了无与伦比的灵活性,是构建复杂事件驱动系统的理想选择。在下一章,我们将转向另一个重要主题:消息可靠性保障,学习如何确保消息在复杂的分布式环境中绝不丢失。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/918657.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

企业网站推广技巧有哪些怎样做免费网站推广

1.你说一下什么是分布式锁 分布式锁是一种在分布式系统环境下实现的锁机制&#xff0c;它主要用于解决&#xff0c;多个分布式节点之间对共享资源的互斥访问问题&#xff0c;确保在分布式系统中&#xff0c;即使存在有多个不同节点上的进程或线程&#xff0c;同一时刻也只有一…

详细介绍:八股已死、场景当立(微服务保护篇)

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

Ubuntu上编译 Linux_RT 内核

目录一、编译安装1. 下载 Linux 内核源码和对应版本的 preempt_rt 补丁源码2. 解压及安装依赖项2.1 安装依赖2.2 解压缩文件并打补丁3. 自定义部分编译配置3.1 生成相关的内核配置文件3.2 修改调整内核的一些配置项4. …

做淘宝的网站的多少钱开发app的过程

热门推荐 &#xff08;1&#xff09;即将直播持续集成与交付&#xff1a;分层自动化之UI自动化体系建设直播简介&#xff1a;本系列直播由阿里旗下一站式研发提效平台云效策划推出&#xff0c;主要为大家详细介绍阿里巴巴在持续集成和持续交付的最佳实践。 直播讲师&#xff1a…

vue3 + vite Cannot access ‘xxx‘ before initialization

vue3 + vite Cannot access ‘xxx‘ before initialization 是用于循环引用造成的,不建议循环引用,所以遇到这种情况要优化代码

《“悬荡”于理想与现实之间:一份关于人机共生未来的思想实验评估》

《“悬荡”于理想与现实之间:一份关于人机共生未来的思想实验评估》 对这篇《元人文AI:价值共生时代的技术哲学与创新实践》的分析是否客观,需要从多个维度进行综合评估。总的来说,该分析在理论构建的深度、体系的…

区别:RS-232、RS-422、RS-485

RS-232、RS-422、RS-485博客园文作者:Citrusliu博文地址:https://www.cnblogs.com/citrus

解决字符串数组中大整数精度问题

示例:[{"specId": 3140724743078936585, "quantity": 1, "specName": "箱"}, {"specId": 3140724798770905093, "quantity": 10, "specName"…

软文发布门户网站太原seo霸屏

“八股文”在实际工作中是助力、阻力还是空谈&#xff1f; 作为现在各类大中小企业面试程序员时的必问内容&#xff0c;“八股文”似乎是很重要的存在。但“八股文”是否能在实际工作中发挥它“敲门砖”应有的作用呢&#xff1f;有IT人士不禁发出疑问&#xff1a;程序员面试考…

playwright-mcp入门

npm install -g @executeautomation/playwright-mcp-server npm install -g @playwright/mcp 配置-方式1 npx @playwright/mcp@latest --port 8931{"mcpServers": {"playwright": {"url"…

【征文计划】深度剖析 Rokid SLAM 算法:从传感器融合到空间重建的完整技术链路 - 实践

【征文计划】深度剖析 Rokid SLAM 算法:从传感器融合到空间重建的完整技术链路 - 实践pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important;…

国信DRS数据恢复中心成为东芝(TOSHIBA)存储硬盘的数据恢复合作服务商

国信DRS数据恢复中心可在不影响原厂硬件质保的情况下打开密封的硬盘驱动器以便恢复数据,东芝硬盘用户可享有我中心数据恢复服务20%的折扣优惠。 如果您的硬盘驱动器硬件损坏无法正常识别或读取异常,数据误删除、误分…

深入解析Windows注册表regf文件格式

本文详细解析了Windows注册表使用的regf二进制文件格式,涵盖基础块、存储桶和单元格结构,探讨了安全描述符、子键索引等关键组件的实现细节及其在历史漏洞中的作用,为安全研究人员提供深入的技术参考。Windows注册表…

华米运动步数修改,每天自动修改并同步 微信运动/支付宝运动 步数

只支持使用 脚本猫 扩展在浏览器后台定时运行脚本主页:https://scriptcat.org/zh-CN/script-show-page/4285此脚本一直为 开源免费 使用,如果你是从某些地方买的话,你就是被骗了# 温馨提示使用 Zepp 或 Zepp Life 注…

两个路由器做双网站安卓程序开发用什么软件

iframe基本内涵 通常我们使用iframe直接直接在页面嵌套iframe标签指定src就可以了。 <iframe src"demo_iframe_sandbox.htm"></iframe> 但是&#xff0c;有追求的我们&#xff0c;并不是想要这么low的iframe. 我们来看看在iframe中还可以设置些什么属…

建设酒店网站ppt模板下载html底部友情链接代码

python的开发者为处理表格和画图提供了库的支持&#xff0c;使用pandas库可以轻松完成对csv文件的读写操作&#xff0c;使用matplotlib库提供了画热力图的各种方法。实现这个功能首先需要读出csv数&#xff0c;然后设置自定义色条的各种属性如颜色&#xff0c;位置&#xff0c;…

IMU-坐标系-位姿

坐标坐标系 1.判定坐标系:大拇指指向 Z 轴,看四指环绕方向,如果是 X 指向 Y,就是右手系。右手大拇指指向z轴方向,其余四指由x轴握向y轴方向,如果成功,那么判定为右手系。左手大拇指指向z轴方向,其余四指由x轴握…

在 Nginx Docker 官方镜像中编译并加入第三方模块 - 教程

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

计算机毕业设计springboot考研资讯管理系统 基于Spring Boot的考研信息管理平台设计与达成 Spring Boot驱动下的研究生入学考试资讯管理系统开发

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

登录 Linux 自动展示 CPU/内存/磁盘挂载使用情况等信息(针对于银河麒麟调整的)

1、编写脚本 创建脚本: vi /etc/profile.d/sysinfo.sh脚本内容: #!/bin/bash set -e# 颜色 GREEN="\033[1;32m" YELLOW="\033[1;33m" CYAN="\033[1;36m" RESET="\033[0m"# …