RabbitMQ 消息消费模式深度解析

本文深入探讨 RabbitMQ 中 Exchange、Queue、Routing Key 的协作机制,以及不同场景下的消息消费策略。

一、核心概念回顾

RabbitMQ 消息流转的核心链路:

1.1 Exchange 类型

类型特点使用场景
direct精确匹配 routing key点对点消息,精确路由
topic通配符匹配 routing key(*#灵活路由,多级分类
fanout忽略 routing key,广播到所有绑定队列广播通知,事件发布
headers基于消息头属性匹配复杂路由条件

1.2 关键关系图


二、一个 Exchange 对应多个 Routing Key

2.1 设计模式

一个 Exchange 可以通过不同的 Routing Key 路由到不同的队列,这是推荐的最佳实践

mq: order: delete: exchange: order.delete.exchange routingKey: deleteAll: order.routing.delete.all # 删除全部 deletePart: order.routing.delete.partial # 部分删除

2.2 架构示意

2.3 优势

  • 逻辑聚合:同一业务域的消息统一管理

  • 灵活路由:消费者按需订阅

  • 易于扩展:新增类型只需添加 routing key

  • 资源节约:减少 Exchange 数量


三、消息竞争 vs 消息广播

这是理解 RabbitMQ 消费逻辑的核心问题

3.1 场景一:消息竞争(Work Queue 模式)

多个消费者绑定同一个 Queue→ 消息只会被其中一个消费者处理

⚠️ 消息只会被 A 或 B 其中一个消费(轮询分发)

适用场景

  • 任务分发、负载均衡

  • 耗时任务的并行处理

  • 同一服务的多实例部署

代码示例

// 多个消费者订阅同一队列 - 竞争消费 @Service public class OrderProcessorA { @RabbitListener(queues = "order.process.queue") public void handleOrder(String message) { // 处理订单 - 与其他消费者竞争 log.info("实例A处理消息: {}", message); } } ​ @Service public class OrderProcessorB { @RabbitListener(queues = "order.process.queue") // 同一队列 public void handleOrder(String message) { // 处理订单 - 与其他消费者竞争 log.info("实例B处理消息: {}", message); } }

3.2 场景二:消息广播(Pub/Sub 模式)

每个消费者绑定独立的 Queue→ 所有消费者都能收到消息

✅ 两个服务都能收到完整的消息副本!

适用场景

  • 事件通知(用户登录、订单创建)

  • 数据同步(多系统数据一致性)

  • 日志收集(多个系统记录同一事件)


四、实现广播的三种方式

4.1 方式一:Topic Exchange + 独立队列

每个服务创建自己的队列,使用相同的 routing key 绑定。

@Configuration public class MqConfig { @Value("${mq.user.login.exchange}") private String loginExchange; @PostConstruct public void initMq() { Channel channel = connection.createChannel(); // 声明 topic 类型 exchange channel.exchangeDeclare(loginExchange, "topic", true); // 服务A 绑定自己的队列 channel.queueDeclare("service-a.login.queue", true, false, false, null); channel.queueBind("service-a.login.queue", loginExchange, "user.login.event"); // 服务B 绑定自己的队列(相同 routing key) channel.queueDeclare("service-b.login.queue", true, false, false, null); channel.queueBind("service-b.login.queue", loginExchange, "user.login.event"); } }

4.2 方式二:Fanout Exchange(推荐用于纯广播)

Fanout 类型忽略 routing key,消息直接广播到所有绑定的队列。

@PostConstruct public void initBroadcastMq() { Channel channel = connection.createChannel(); // 声明 fanout 类型 - 广播模式 channel.exchangeDeclare("user.logout.fanout", "fanout", true); // 各服务绑定自己的队列(routing key 为空) channel.queueDeclare("order-service.logout.queue", true, false, false, null); channel.queueBind("order-service.logout.queue", "user.logout.fanout", ""); channel.queueDeclare("cart-service.logout.queue", true, false, false, null); channel.queueBind("cart-service.logout.queue", "user.logout.fanout", ""); channel.queueDeclare("session-service.logout.queue", true, false, false, null); channel.queueBind("session-service.logout.queue", "user.logout.fanout", ""); }

4.3 方式三:临时队列(适合临时消费者)

使用自动生成的队列名,服务停止后队列自动删除。

// 自动生成唯一队列名 String queueName = channel.queueDeclare().getQueue(); // 如: amq.gen-JzTY20BRgKO-HjmUJj0wLg ​ // 绑定到 exchange channel.queueBind(queueName, "event.exchange", "order.created"); ​ // 服务停止后,队列自动删除

五、配置示例与最佳实践

5.1 完整配置示例

mq: # 基础配置 host: 127.0.0.1 username: admin password: admin123 # 业务 Exchange 配置 order: # 订单处理 - 竞争消费模式 process: exchange: order.process.exchange queue: order.process.queue # 多实例共享队列 routingKey: order.routing.process # 订单删除 - 多类型路由 delete: exchange: order.delete.exchange routingKey: deleteAll: order.routing.delete.all deletePart: order.routing.delete.partial # 用户事件 - 广播模式 user: logout: exchange: user.logout.fanout # fanout 类型 queue: ${spring.application.name}.logout.queue # 每个服务独立队列 login: exchange: user.login.fanout queue: ${spring.application.name}.login.queue

5.2 Exchange 初始化代码

@Service public class RabbitMqInitializer { private static final Logger log = LoggerFactory.getLogger(RabbitMqInitializer.class); @Resource private ConnectionFactory connectionFactory; @Value("${mq.order.process.exchange}") private String processExchange; @Value("${mq.order.process.queue}") private String processQueue; @Value("${mq.order.process.routingKey}") private String processRoutingKey; @Value("${mq.user.logout.exchange}") private String logoutExchange; @Value("${mq.user.logout.queue}") private String logoutQueue; @PostConstruct public void initializeQueues() { try { Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // ========== 订单处理 - Topic Exchange ========== channel.exchangeDeclare(processExchange, "topic", true, false, null); channel.queueDeclare(processQueue, true, false, false, null); channel.queueBind(processQueue, processExchange, processRoutingKey); log.info("订单处理MQ初始化完成 - Exchange: {}, Queue: {}", processExchange, processQueue); // ========== 用户登出 - Fanout Exchange (广播) ========== channel.exchangeDeclare(logoutExchange, "fanout", true, false, null); channel.queueDeclare(logoutQueue, true, false, false, null); channel.queueBind(logoutQueue, logoutExchange, ""); // fanout 不需要 routing key log.info("用户登出MQ初始化完成 - Exchange: {}, Queue: {}", logoutExchange, logoutQueue); channel.close(); } catch (Exception e) { log.error("MQ初始化失败", e); } } }

5.3 消息生产者

@Service public class MessageProducer { private static final Logger log = LoggerFactory.getLogger(MessageProducer.class); @Resource private RabbitMqInitializer rabbitMqInitializer; @Value("${mq.order.delete.exchange}") private String deleteExchange; @Value("${mq.order.delete.routingKey.deleteAll}") private String deleteAllRoutingKey; @Value("${mq.order.delete.routingKey.deletePart}") private String deletePartRoutingKey; /** * 发送删除全部订单消息 */ public void publishDeleteAll(String message) { log.info("发送删除全部消息: {}", message); publishMessage(message, deleteExchange, deleteAllRoutingKey); } /** * 发送部分删除消息 */ public void publishDeletePart(String message) { log.info("发送部分删除消息: {}", message); publishMessage(message, deleteExchange, deletePartRoutingKey); } private void publishMessage(String message, String exchange, String routingKey) { try { Channel channel = getChannel(); AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .deliveryMode(2) // 持久化 .build(); channel.basicPublish(exchange, routingKey, props, message.getBytes()); log.info("消息发送成功 - Exchange: {}, RoutingKey: {}", exchange, routingKey); } catch (Exception e) { log.error("消息发送失败: {}", message, e); } } }

六、常见问题与解决方案

Q1: 如何确保消息不丢失?

// 1. Exchange 持久化 channel.exchangeDeclare(exchange, "topic", true); // durable=true ​ // 2. Queue 持久化 channel.queueDeclare(queue, true, false, false, null); // durable=true ​ // 3. 消息持久化 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .deliveryMode(2) // 2=持久化 .build(); ​ // 4. 开启发布确认 channel.confirmSelect(); channel.basicPublish(exchange, routingKey, props, message.getBytes()); if (!channel.waitForConfirms()) { log.error("消息发布确认失败"); }

Q2: 如何控制消费速率?

// 设置 prefetch count,限制未确认消息数量 channel.basicQos(1); // 每次只预取1条消息 ​ // 手动确认 channel.basicConsume(queue, false, (consumerTag, delivery) -> { try { // 处理消息 processMessage(delivery.getBody()); // 手动确认 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } catch (Exception e) { // 拒绝并重新入队 channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true); } }, consumerTag -> {});

Q3: 同一服务多实例如何避免重复消费?

:让多实例共享同一个 Queue 名称即可,RabbitMQ 会自动轮询分发。

# 所有实例使用相同的队列名 mq: order: process: queue: order.process.queue # 固定队列名,不要用 ${instance.id}

七、完整流程图

7.1 消息发布流程

7.2 竞争消费 vs 广播消费对比


八、总结

消费模式Queue 策略Exchange 类型适用场景
竞争消费多消费者共享同一 Queue任意任务分发、负载均衡
广播消费每个消费者独立 Queuefanout / topic事件通知、数据同步
选择性消费独立 Queue + 不同 routing keytopic / direct按类型订阅

核心原则

消息是发送到Queue的,不是直接发送到消费者的。 同一个 Queue 的消息只会被消费一次,不同 Queue 可以收到相同消息的副本。

理解这一点,就能灵活设计各种消息消费场景!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1146219.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Web的模拟混频电路在线仿真操作指南

用浏览器就能玩转射频电路:在线仿真混频器的实战教学 你有没有过这样的经历?想验证一个简单的模拟混频电路,却要花半天时间安装LTspice、配置模型路径、翻找元件库;或者在课堂上讲调幅信号生成时,学生一脸茫然&#x…

SiFive平台移植RISC-V裸机程序从零实现指南

从零开始在 SiFive 平台运行 RISC-V 裸机程序:不只是“点灯”,而是真正理解底层启动机制你有没有试过,在一块全新的开发板上连一个 LED 都点不亮?不是代码写错了,也不是接线问题——而是程序根本没跑起来。这种情况在裸…

S8050三极管驱动LED灯时饱和状态判定:核心要点解析

S8050驱动LED为何总发热?一文讲透三极管饱和导通的设计精髓你有没有遇到过这种情况:用S8050三极管控制一个LED,结果灯不亮、亮度低,或者三极管发烫得厉害?明明电路看起来没问题——电源接了,电阻也加了&…

超详细版:Multisim搭建单级放大电路全过程

从零开始:用Multisim搭建一个真正能“放大”的单级共射极电路 你有没有试过在仿真软件里搭了一个放大电路,输入信号也加了,电源也接了——可示波器上出来的波形要么是条直线,要么就是削顶的正弦波?别急,这几…

方达炬〖发明信用种品〗:应用数据贷款

方达炬〖发明信用种品〗: 应用数据贷款

无源蜂鸣器驱动电路设计核心要点解析

无源蜂鸣器驱动电路设计:从原理到实战的完整指南在嵌入式系统开发中,声音提示早已不是“锦上添花”,而是人机交互的关键一环。无论是洗衣机完成洗涤时的一声“嘀”,还是智能门锁识别失败的连续警示音,背后都离不开一个…

模拟与数字混合电路板PCB设计的分区策略解析

混合信号PCB设计:如何让模拟与数字“和平共处”?在现代电子系统中,把高精度模拟电路和高速数字逻辑塞进同一块PCB,早已不是什么新鲜事。无论是工业传感器前端、医疗心电采集设备,还是5G通信模块,几乎都能看…

SiFive平台下RISC-V用户模式与特权模式切换详解

深入SiFive平台:RISC-V用户态与特权态切换的底层逻辑与实战解析你有没有遇到过这样的情况?在SiFive开发板上跑一个裸机程序,突然ecall指令一执行就卡死;或者写了个简单的系统调用,结果返回后程序“飞了”——PC指针指向…

强电弱电混合布局:电路板PCB设计避坑指南

强电弱电混合布局:PCB设计中的“安静”之道在工业控制柜里,一块小小的电路板可能同时承载着驱动几十安培电流的电机控制器,以及采集微伏级传感器信号的精密模拟前端。这种场景早已不是特例——强电与弱电共存于同一块PCB上,已经成…

驱动程序安装方式对比:图形化vs命令行通俗解释

驱动安装的两种“语言”:图形界面 vs 命令行,你该用哪一种?你有没有遇到过这种情况——新买了一台打印机,插上电脑却提示“未识别设备”,于是你打开厂商官网,下载了一个.exe文件,双击运行&#…

8位加法器Verilog实现通俗解释

从全加器到8位加法器:用Verilog亲手搭建一个“二进制计算器”你有没有想过,计算机是怎么做加法的?不是打开计算器点两下那种——而是从最底层的晶体管开始,靠0和1自己算出来的那种。今天我们就来干一件“硬核”的事:用…

字符设备驱动内存管理最佳实践解析

字符设备驱动内存管理:从踩坑到精通的实战指南你有没有遇到过这样的情况?驱动写得好好的,一跑起来却莫名其妙地宕机;或者系统用着用着内存越来越少,最后直接 OOM(Out of Memory)崩溃。更离谱的是…

Multisim14自定义虚拟仪器创建:从零开始教程

从零打造专属测量工具:Multisim14自定义虚拟仪器实战指南你有没有遇到过这样的情况?在做电路仿真时,标准示波器只能看波形、万用表只能测直流——但你想分析谐波畸变率、想自动识别元件类型、甚至希望一键生成Bode图。这时候,Mult…

多路选择器电路分析:数字电路实验一文说清

多路选择器电路分析:从实验到实战的深度拆解 你有没有遇到过这样的情况——在数字电路实验课上,老师让你用几片74系列芯片搭一个“数据开关”,结果接线一通乱,拨码开关一动,LED却怎么都不按预期亮?或者&…

ES索引分片策略设计:超详细版架构实践指南

Elasticsearch索引分片设计实战:从原理到高可用架构的深度拆解你有没有遇到过这样的场景?刚上线的ES集群查询飞快,但几个月后,随着数据不断写入,搜索延迟飙升、节点频繁GC、甚至部分分片无法分配。排查一圈下来&#x…

蜂鸣器报警模块快速理解:核心要点与基础测试演示

蜂鸣器报警模块实战指南:从原理到代码,轻松实现嵌入式音频反馈 你有没有遇到过这样的场景?设备出错了,但没有任何提示;或者程序跑起来了,却不知道是否正常启动。这时候,如果能“嘀”一声&#x…

HBuilderX安装与uni-app环境部署:新手手把手指导

从零开始搭建uni-app开发环境:HBuilderX安装与项目实战指南 你是不是也遇到过这样的困扰?想做一个小程序,又要兼容App,结果发现iOS、Android、微信、支付宝各搞一套代码,开发效率低得让人崩溃。别急,今天我…

HBuilderX中HTML5开发环境搭建:实战案例演示

用 HBuilderX 快速搭建 HTML5 开发环境:从零开始做一个个人主页你有没有过这样的经历?想快速写个网页原型,结果光是配置开发环境就花了一小时——装编辑器、配 Live Server、调路径、清缓存……明明只是想写几行代码,却被各种工具…

基于USB转串口驱动的PLC通信方案:系统学习教程

如何用USB转串口稳定连接PLC?从芯片到代码的工业通信实战指南 在工厂自动化现场,你是否遇到过这样的场景:手里的新工控机连个RS-232接口都没有,而产线上的西门子S7-200或三菱FX系列PLC却只支持串口通信?面对这种“新电…

为什么在抖音娱乐直播行业,公认“最好的工会”是史莱克学院

一、行业共识:顶级流水与长期稳居头部的实力背书在抖音娱乐直播行业,史莱克学院长期被视为标杆级头部公会。 曾位列抖音娱乐公会流水全国第一 规模庞大、体系成熟,而非“昙花一现型”工会 在主播、运营、业内从业者中口碑高度一致&#xfffd…