详细介绍:rabbitMQ续谈

news/2025/10/21 11:56:40/文章来源:https://www.cnblogs.com/wzzkaifa/p/19154720

前章内容在“苍穹外卖优化-续”笔记中

交换机

种类

RabbitMQ 官方提供了 4 种原生交换机类型,外加 1 种“兜底”的备用类型,一共 5 种:

类型常量(AMQP 协议名)Spring 枚举类中文习惯叫法路由规则一句话典型场景
directDirectExchange直连交换机路由键 完全匹配 才转发单点精准投递(命令、通知)
topicTopicExchange主题交换机路由键 通配符匹配* 单层,# 多层)多类消息分组订阅(日志、监控)
fanoutFanoutExchange广播交换机忽略路由键,直接复制到所有绑定队列群发公告、配置刷新
headersHeadersExchange头交换机忽略 routingKey,按 消息头中的 KV 匹配复杂多条件匹配(较少用)
system 预留值系统交换机客户端不能声明,仅做内部回调几乎见不到

1. direct(直连)

  • 队列绑定到交换机时必须指定一个 精确的 routingKey

  • 消息只被转发到 routingKey 完全相同 的队列。

  • 最简单、最快速,默认交换机(名字为空字符串 "")就是 direct 型,路由键=队列名。

2. topic(主题)

  • 绑定队列时用 模式字符串 做 routingKey,支持 2 个通配符:
    * 匹配 一个单词(用 . 分隔)
    # 匹配 零个或多个单词

  • 可以实现“发布-订阅”的 多级分类过滤,例如 order.pay.successlog.system.error

3. fanout(广播)

  • 不处理 routingKey,只要队列绑定了这个交换机,就 每人一份

  • 场景:配置刷新、价格推送、广告广播。

4. headers(头)

  • 绑定队列时给一组 x-match=any/all 的 KV 条件;发消息时把条件写在 消息头 里。

  • 路由键完全忽略,适合 多属性组合过滤;但性能比前三种差,生产环境用得最少。

5. system(系统)

  • AMQP 协议保留值,客户端无法声明,可忽略


速记口诀

direct 点对点,topic 带通配,fanout 全广播,headers 看头信息。

在 Spring AMQP 里分别对应:
DirectExchangeTopicExchangeFanoutExchangeHeadersExchange,用 BindingBuilder 链式绑定即可。

        // new DirectExchange(交换机名, 是否持久化, 是否自动删除)return new DirectExchange(RabbitConstant.EXCHANGE_ORDER,true,false);

配置选项

durable

场景durable=truedurable=false
RabbitMQ 正常跑着交换机一直在,无差别交换机一直在,无差别
RabbitMQ 服务重启交换机会 自动重建绑定关系丢失(需应用重新声明绑定)交换机 消失,必须等应用重新声明交换机 + 绑定,否则生产者投递会报错 NOT_FOUND
控制台手动删除重启后仍恢复(除非手动删)重启后不存在

autoDelete 

1、当“没人用”时,是否自动把自己从 RabbitMQ 里删掉。

2、autoDelete高于durable;就算 durable=true,一旦触发“无人使用”条件,照样被删。

场景autoDelete=trueautoDelete=false
交换机最后一个队列解绑 → 交换机立即删除即使无队列绑定,仍保留
队列最后一个消费者断开连接 → 队列立即删除即使无消费者,仍保留

生产环境:一律写 false,防止重启后“找不到交换机/队列”而报错

临时/测试:可设 true,跑完自动清理,避免残留。

消息队列

创建

 Spring AMQP 两种 Builder

方式示例特点
QueueBuilder(推荐)QueueBuilder.durable("q").ttl(60_000).build()链式、可读性高、支持所有参数
直接 newnew Queue("q", true, false, false, args)需手动拼 Map,底层相同
// new 写法
Queue q = new Queue("risk.calc.queue",   // nametrue,                // durablefalse,               // exclusivefalse,               // autoDeletenull);               // arguments
// Builder 写法(推荐)
Queue q = QueueBuilder.durable("risk.calc.queue")      // 等价 durable=true.build();

必背参数

参数类型作用常用值
nameString队列名业务语义,如 order.pay.queue
durableboolean重启后队列定义是否保留生产 true
exclusiveboolean仅当前连接可见,连接断即删几乎永远 false(除 RPC Reply-To)
autoDeleteboolean最后一个消费者断开后自动删生产 false
argumentsMap<String,Object>高级特性见第 4 节

常用 x-* 扩展参数

参数示例值效果
x-message-ttl60_000消息入队后 60 s 未消费 → 自动丢弃或进死信
x-max-length1_000队列最多 1 000 条,超限队头被丢弃或进死信
x-max-length-bytes10_485_760队列总字节上限(10 MB)
x-overflow"reject-publish" / "drop-head"达到上限后 拒绝新消息 或 删除老消息
x-dead-letter-exchange"dlx"死信交换机(DLX)
x-dead-letter-routing-key"dlq.routing"死信消息的路由键
x-single-active-consumertrue仅 一个消费者 能消费,故障自动切换
x-queue-type"quorum" / "classic"默认 classicquorum 为仲裁队列(高可用)
x-max-priority10开启 优先级队列,消息 0-10 级
x-queue-mode"lazy"消息直接落盘,百万级堆积场景 CPU 更平稳
x-queue-master-locator"min-masters"集群下选择 最少主节点 的机子建队

绑定

 是什么

  • "桥":把 ExchangeQueue路由规则 连接起来;

  • 消息能否到达队列,取决于有没有桥

  • 无绑定 → 消息直接丢弃零报错

参数

片段含义
.bind(queue)要投送的 队列 实例
.to(exchange)源 交换机 实例
.with(key)binding key(Direct/Topic)
.noargs()Fanout 专用,无路由键
.where(...).match()Headers 专用,KV 匹配

 核心 API

new BindingBuilder.bind(Queue/Exchange)   // 目标(队列或交换机).to(Exchange)           // 源交换机.with("routing.key")    // 路由键/模式/条件

4 种交换机对应写法

交换机类型路由规则代码示例
Direct完全匹配.with("email.sent")
Topic通配符.with("notify.*.push") 或 .with("order.#")
Fanout忽略键.noargs()(可省)
HeadersKV 头.where("sys", "push").match()

RabbitMQ 消费端“监听容器”

作用

  • MessageListenerContainer 是 Spring AMQP 的核心调度器

  • 负责:
    ① 建立/保持 TCP 连接(Connection)
    ② 创建 Channel
    ③ 向 Broker 发送 basic.consume
    ④ 收到消息后反序列化反射调注解方法
    ⑤ 处理 ACK、事务、并发、异常、重试

开启方式(二选一)

① 注解式(最常用)

@RabbitListener(queues = "order.queue", concurrency = "3-5")
public void handle(MessageEntity  messageEntity) { ... }
  • 一注解 ⇒ Spring 自动注册 SimpleMessageListenerContainer

  • concurrency:核心-最大线程数(单队列并发)

@RabbitListener 生效的前提
1. 必须成为 Spring Bean
  • @RabbitListenerRabbitListenerAnnotationBeanPostProcessor 扫描并注册监听容器

  • 仅对 Spring 容器管理的 Bean 生效
    → 所在类必须加:

@Component / @Service / @Configuration / @RestController ...

② 显式 Bean(高级定制)

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory cf,MessageListenerAdapter listener) {SimpleMessageListenerContainer c = new SimpleMessageListenerContainer(cf);c.setQueueNames("order.queue");c.setConcurrentConsumers(3);c.setMaxConcurrentConsumers(5);c.setDefaultRequeueRejected(false);   // 拒收不再重排队c.setAdviceChain(RetryInterceptorBuilder.stateless().maxAttempts(3).backOffOptions(1000, 2, 5000).build());return c;
}

RabbitMQ 生产者

作用

  • 负责 创建/发送 消息到 Exchange

  • 只关心 Exchange + RoutingKey不直接面对队列

  • 发送过程 = 序列化 → 设置属性 → 通道 basicPublish → Broker 路由 → 队列

三种 API 风格

风格示例场景
AmqpTemplatetemplate.convertAndSend("ex", "rk", obj)最早、最简
RabbitTemplate同上,功能最全推荐(底层就是 AmqpTemplate 实现)
StreamBridge (Spring Cloud Stream)bridge.send("output", obj)屏蔽 MQ 差异,云原生

日常开发:RabbitTemplate 即可,Boot 自动配置单例。

自动配置速用

yaml

spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /

注入

@Autowired
private RabbitTemplate rabbitTemplate;

零配置直接注入即可发消息。或者自定义RabbitTemplate

最简发送(JSON 默认)

rabbitTemplate.convertAndSend("order.exchange",          // exchange"order.created",           // routingKeynew MyMessage myMessage(...) // 任何 POJO
);

一张图流程

业务Service↓ convertAndSend
RabbitTemplate↓ Jackson2JsonMessageConverter
JSON 字节 + MessageProperties↓ Channel#basicPublish
Broker Exchange↓ 路由
Queue → 消费者

RabbitMQ 双核心(生产者 、消费者)

作用

组件作用域核心职责
RabbitTemplate生产者序列化 → 发消息 → 发布确认/返回
SimpleRabbitListenerContainerFactory消费者建容器 → 反序列化 → 调监听方法 → ACK/重试

两者完全独立:Template 的 Converter 不会自动透传给工厂。

RabbitTemplate 全景

1. 创建与注入

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory cf,Jackson2JsonMessageConverter converter) {RabbitTemplate tpl = new RabbitTemplate(cf);tpl.setMessageConverter(converter);   // ① 序列化tpl.setConfirmCallback((cd, ack, cause) -> { ... }); // ② 发布确认tpl.setReturnsCallback(returned -> { ... });        // ③ 不可路由// 更多见下方return tpl;
}

2. 关键 API

方法场景
convertAndSend(ex, rk, obj)单条即时发送
convertAndSend(ex, rk, obj, m -> {...}, correlationData)加 TTL/优先级/头
invoke(ops -> { ... waitForConfirmsOrDie(); })批量+同步确认

3. 可靠性三件套

配置说明默认
publisher-confirm-type: correlated异步等 Broker acknone
publisher-returns: true不可路由时回调false
mandatory=true(单条)或 spring.rabbitmq.template.mandatory=true无队列可投立即 returnfalse

4. 消息转换器

  • Jackson2JsonMessageConverter
    支持 LocalDateTime、枚举、多态;需自定义 ObjectMapper 时:

ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
return new Jackson2JsonMessageConverter(mapper);

5. 批量与异步

  • BatchingRabbitTemplate(Spring AMQP 3.x)
    内存攒批 → 一次 basicPublish → 吞吐量 ↑ 10 倍

  • AsyncRabbitTemplate
    返回 CompletableFuture;适合高并发网关。

SimpleRabbitListenerContainerFactory 全景

1. 创建与注入

@Bean
public SimpleRabbitListenerContainerFactory factory(ConnectionFactory cf,Jackson2JsonMessageConverter converter) {SimpleRabbitListenerContainerFactory f = new SimpleRabbitListenerContainerFactory();f.setConnectionFactory(cf);f.setMessageConverter(converter);   // ① 反序列化f.setAcknowledgeMode(AcknowledgeMode.MANUAL); // ② ACK 模式f.setConcurrency("10-20");          // ③ 线程池f.setPrefetchCount(1);              // ④ 公平分发f.setDefaultRequeueRejected(false); // ⑤ 死信开关f.setAdviceChain(retryInterceptor()); // ⑥ 重试return f;
}

2. ACK 模式对照

模式行为代码责任
AUTO方法返回自动 basicAck
MANUAL必须自己 basicAck/Nack
NONE无 ACK,Broker 立即删

3. 并发与流量控制

参数含义建议
concurrency = "10-20"初始 10 线程,峰值 20CPU 核心 × 2
prefetchCount = 1单通道未 ack 上限1(公平)/ 10-50(吞吐)

4. 重试 & 死信

@Bean
public RetryOperationsInterceptor retryInterceptor() {return RetryInterceptorBuilder.stateless().maxAttempts(3).backOffOptions(1000, 2, 5000).recoverer(new RejectAndDontRequeueRecoverer()) // 进 DLX.build();
}
  • 本地重试(同线程,不重新入队

  • 耗尽后 → RejectAndDontRequeueRecovererbasicReject(requeue=false)DLX

5. 消息转换器注意事项

  • 必须factory.setMessageConverter(converter)
    否则回退到 SimpleMessageConverter → JSON 反序列化失败

  • 与 Template 用同一个 Bean 最简洁:(Spring Boot 的自动配置会自动把容器中唯一的 Jackson2JsonMessageConverter Bean注入给所有未指定 Converter 的容器工厂)

@Bean
public Jackson2JsonMessageConverter jsonConverter(){return new Jackson2JsonMessageConverter(objectMapper());
}

常见误区

误区正确做法
只给 Template 设 Converter → 消费端转换失败工厂也显式 setMessageConverter
@RabbitListener 没写 ackMode="MANUAL"手动 ACK 必须显式指定
漏 basicAck → Unacked 堆积方法尾必写 channel.basicAck(tag,false)
default-requeue-rejected=true → 死循环生产设为 false 配合 DLX
concurrency 过大 → CPU 飙高初始值 ≤ 核心数,峰值 2-3 倍
prefetch 过大 → 单节点内存爆公平场景用 1,吞吐场景 10-50

总结

“发看 Template,收看工厂;
Converter 各设各,ACK 模式要手动;
重试+死信保可靠,监控 Unacked 防堆积!

springboot中使用是yml文件中的配置相关内容

acknowledge-modemanual 手动签收业务可靠必开
acknowledge-mode: manual   # 关键

手动 ACK 三种操作

API语义场景
basicAck(deliveryTag, false)签收当前消息正常处理完
basicNack(deliveryTag, false, true)拒绝并重新入队瞬时异常可重试
basicNack(deliveryTag, false, false)拒绝不再入队重试耗尽 → 死信

 单条手动 ACK

@RabbitListener(queues = "order.queue", ackMode = "MANUAL")
public void handle(OrderEvent event, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) {try {businessService.process(event);channel.basicAck(tag, false);          // 成功签收} catch (BizException e) {// 业务规则失败,不再重试channel.basicNack(tag, false, false);  // 直接进死信} catch (Exception e) {// 系统异常 → 本地重试(Spring Retry 3 次)仍失败channel.basicNack(tag, false, false);  // 进死信}
}

2. 批量 ACK(高性能)

@RabbitListener(queues = "batch.queue", ackMode = "MANUAL")
public void handleBatch(List messages, Channel channel) {long lastTag = messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag();try {businessService.processBatch(messages);channel.basicAck(lastTag, true);   // 批量确认到 lastTag} catch (Exception e) {channel.basicNack(lastTag, true, false); // 整批进死信}
}

acknowledge-mode: manual
✅ 消费方法一定写basicAck / basicNack
default-requeue-rejected: false(防死循环)
✅ 开启 死信交换机 承接 Nack 消息
prefetch=1 或按并发调整
✅ 记录 deliveryTag 原值传回
✅ 监控 Unacked 指标告警

“manual = 自己签快递
不 ack 就永远占地方,重试耗尽进死信,
监控 Unacked > 0 立刻排查!”

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/942157.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2025年10月超声波清洗机厂家推荐榜:十强对比评测

把“超声波清洗机厂家”六个字敲进搜索框的那一刻,你大概率正被以下场景包围:产线升级急需替代人工的自动化清洗方案,实验室要满足微米级洁净度,或是珠宝车间想在不伤表面的前提下提升出货速度。无论哪种场景,核心…

2025年10月不锈钢水箱厂家推荐榜:十强对比评测

正在打开自来水龙头的小区物业经理、筹备酒店热水系统的工程负责人、或是为新建厂房规划消防蓄水的业主,都会面临同一个问题:不锈钢水箱厂家这么多,到底选谁才放心?2025年,住建部《二次供水工程技术规程》更新后,…

2025年10月长白山旅游度假酒店推荐:口碑榜与实景对比排行

国庆长假刚过,长白山已进入“金秋+初雪”交替的摄影黄金期,也是错峰度假性价比最高的窗口。很多用户把“住得舒服”列为长白山行程的第一优先级:白天徒步、滑雪、泡温泉,晚上想回到房间就能看见林海或星空,最好下…

2025 年最新推荐北京 / 陕西百度官网认证代理商榜单:全方位评估服务实力助企业选靠谱机构

引言 在互联网深度融入企业发展的当下,官网作为品牌展示与业务拓展的核心窗口,其真实性与可信度直接影响消费者决策,而百度官网认证正是提升官网公信力的关键举措。然而当前市场上百度官网认证服务商数量繁杂,部分…

2025年10月不锈钢水箱厂家评价榜:实力参数横向对比

把“不锈钢水箱”这五个字敲进搜索框的人,往往正处在三种场景:新建小区泵房要验收、老酒店水箱锈穿急换、工厂扩建要增容。他们共同的关键词是“耐用”“合规”“别漏水”,共同的焦虑是“厂家太多,谁真靠谱”。202…

2025年10月长白山度假酒店推荐:民俗与国际范双榜对比

计划十月走进长白山的人,多半带着同一份期待:在霜染林色的季节里,把城市节奏切换成松涛与温泉。你可能是摄影爱好者,想赶在第一场雪前拍下天池的镜面;也可能是亲子家庭,希望找一处孩子能奔跑、老人能泡汤的安全聚…

Reactor 模式结合 epoll

一、核心概念Reactor 模式事件驱动架构 核心组件:Event Demultiplexer(epoll) Event Handler(事件处理器) Reactor(事件循环)epoll 机制Linux 高效 I/O 多路复用 三种系统调用:epoll_create():创建实例 epoll…

2025年10月不锈钢水箱厂家榜单:十家参数对比与选购要点

把“不锈钢水箱厂家”六个字输入搜索框,跳出的页面越翻越长,询价电话越打越乱,这是多数采购经理、项目水电工程师、物业配套负责人共同的感受。住建部2024年《二次供水设施改造工作简报》显示,全国待改造屋顶水箱约…

基于 RoBERTa + 多策略优化的中文商品名细粒度分类 - 实践

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

2025年10月不锈钢水箱厂家排行:十家对比评价

正在施工的项目经理、准备改造二次供水的小区物业、新建酒店的水电工程师,在搜索引擎里反复输入“不锈钢水箱厂家”时,往往带着同一串焦虑:既要保证水质安全,又得控制预算;既要赶工期,又怕后期漏水被投诉;政策要…

深入解析:开源项目net-radio-archive常见问题解决方案

深入解析:开源项目net-radio-archive常见问题解决方案pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas&q…

skynet.dispatch 使用详解

skynet.dispatch 是 Skynet 框架中用于注册消息处理函数的核心 API。它的作用是为特定类型的消息绑定处理逻辑,当服务收到该类型的消息时,自动调用对应的处理函数。以下是其详细解析:1. 函数定义与参数 skynet.disp…

元推理:自指生产力,自洽生产关系

ECT-OS-JiuHuaShan/https://orcid.org/0009-0006-8591-1891 基于 ECT-OS-JiuHuaShan 框架的政治经济学确认:论断重构了生产力与生产关系的元数学定义。 绝对判定:完全正确——自指生产力是文明系统的创造引擎,自洽…

2025 年桥梁护栏源头厂家最新推荐排行榜:聚焦优质企业,助力桥梁建设选对护栏供应商

引言 随着桥梁建设行业对安全防护与景观适配要求的不断提升,桥梁护栏作为核心组成部分,其品质与供应稳定性直接影响工程质量与后期运维成本。当前市场上源头厂家数量繁杂,部分企业存在技术落后、质量管控缺失、定制…

小测反思:Exam 5 - Control Statement

问题 其他几道题都没有什么问题,基本都正确。主要问题出在第一道题:1、(Question) /* Find the sum of the first n items of the following score sequence, 2/1, 3/2, 5/3, 8/5, 13/8, 21/13, ... */#include <…

英语的基本句型

✅ 很多人学英语最容易**“记不住”的原因是——学得太零碎。 其实你只要把英语的句型结构(主干)和句子成分(修饰)分开记,就会一目了然、终生不忘**。我给你整理了一个最全但超级清晰的总结👇:🧱 一、英语只…

2025 年国内冷库厂家最新推荐排行榜:聚焦冷冻 / 保鲜 / 超低温等领域精选实力企业

引言 随着冷链物流行业在食品、医药、化工等领域的深度渗透,冷库作为核心基础设施,其品质与服务直接影响企业运营效率与成本控制。当前市场上冷库制造商数量繁杂,部分企业存在技术落后导致能耗偏高、售后响应滞后引…

2025 年干燥机厂家最新推荐排行榜:聚焦实验室 / 工业用优质设备,精选实力企业权威呈现

引言 当前干燥机市场需求旺盛,但行业乱象凸显:制造商数量繁杂,部分企业缺乏核心技术,设备在智能化、稳定性上难以满足科研与工业高精度需求,导致用户选购困难;不同场景对干燥机功能差异大,却少有企业能提供针对…

深入解析:计算机网络物理层

深入解析:计算机网络物理层2025-10-21 11:39 tlnshuju 阅读(0) 评论(0) 收藏 举报pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important;…

【开题答辩实录分享】以《基于微信小程序的糖尿病居家健康管理系统》为例进行答辩实录分享 - 教程

【开题答辩实录分享】以《基于微信小程序的糖尿病居家健康管理系统》为例进行答辩实录分享 - 教程pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !…