解决MQ消息丢失问题的5种方案

news/2025/10/28 15:29:13/文章来源:https://www.cnblogs.com/12lisu/p/19171902

前言

今天我们来聊聊一个让很多开发者头疼的话题——MQ消息丢失问题。

有些小伙伴在工作中,一提到消息队列就觉得很简单,但真正遇到线上消息丢失时,排查起来却让人抓狂。

其实,我在实际工作中,也遇到过MQ消息丢失的情况。

今天这篇文章,专门跟大家一起聊聊这个话题,希望对你会有所帮助。

一、消息丢失的三大环节

在深入解决方案之前,我们先搞清楚消息在哪几个环节可能丢失:

1. 生产者发送阶段

  • 网络抖动导致发送失败
  • 生产者宕机未发送
  • Broker处理失败未返回确认

2. Broker存储阶段

  • 内存消息未持久化,重启丢失
  • 磁盘故障导致数据丢失
  • 集群切换时消息丢失

3. 消费者处理阶段

  • 自动确认模式下处理异常
  • 消费者宕机处理中断
  • 手动确认但忘记确认

理解了问题根源,接下来我们看5种实用的解决方案。

二、方案一:生产者确认机制

核心原理

生产者发送消息后等待Broker确认,确保消息成功到达。

这是防止消息丢失的第一道防线。

关键实现

// RabbitMQ生产者确认配置
@Bean
public RabbitTemplate rabbitTemplate() {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {// 消息成功到达BrokermessageStatusService.markConfirmed(correlationData.getId());} else {// 发送失败,触发重试retryService.scheduleRetry(correlationData.getId());}});return template;
}// 可靠发送方法
public void sendReliable(String exchange, String routingKey, Object message) {String messageId = generateId();// 先落库保存发送状态messageStatusService.saveSendingStatus(messageId, message);// 发送持久化消息rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);msg.getMessageProperties().setMessageId(messageId);return msg;}, new CorrelationData(messageId));
}

适用场景

  • 对消息可靠性要求高的业务
  • 金融交易、订单处理等关键业务
  • 需要精确知道消息发送结果的场景

三、方案二:消息持久化机制

核心原理

将消息保存到磁盘,确保Broker重启后消息不丢失。

这是防止Broker端消息丢失的关键。

关键实现

// 持久化队列配置
@Bean
public Queue orderQueue() {return QueueBuilder.durable("order.queue")  // 队列持久化.deadLetterExchange("order.dlx")    // 死信交换机.build();
}// 发送持久化消息
public void sendPersistentMessage(Object message) {rabbitTemplate.convertAndSend("order.exchange", "order.create", message, msg -> {msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 消息持久化return msg;});
}// Kafka持久化配置
@Bean
public ProducerFactory<String, Object> producerFactory() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.ACKS_CONFIG, "all"); // 所有副本确认props.put(ProducerConfig.RETRIES_CONFIG, 3);   // 重试次数props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等性return new DefaultKafkaProducerFactory<>(props);
}

优缺点

优点:

  • 有效防止Broker重启导致的消息丢失
  • 配置简单,效果明显

缺点:

  • 磁盘IO影响性能
  • 需要足够的磁盘空间

四、方案三:消费者确认机制

核心原理

消费者处理完消息后手动向Broker发送确认,Broker收到确认后才删除消息。

这是保证消息不丢失的最后一道防线。

关键实现

// 手动确认消费者
@RabbitListener(queues = "order.queue")
public void handleMessage(Order order, Message message, Channel channel) {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 业务处理orderService.processOrder(order);// 手动确认channel.basicAck(deliveryTag, false);log.info("消息处理完成: {}", order.getOrderId());} catch (Exception e) {log.error("消息处理失败: {}", order.getOrderId(), e);// 处理失败,重新入队channel.basicNack(deliveryTag, false, true);}
}// 消费者容器配置
@Bean
public SimpleRabbitListenerContainerFactory containerFactory() {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动确认factory.setPrefetchCount(10); // 预取数量factory.setConcurrentConsumers(3); // 并发消费者return factory;
}

注意事项

  • 确保业务处理完成后再确认
  • 合理设置预取数量,避免内存溢出
  • 处理异常时要正确使用NACK

五、方案四:事务消息机制

核心原理

通过事务保证本地业务操作和消息发送的原子性,要么都成功,要么都失败。

关键实现

// 本地事务表方案
@Transactional
public void createOrder(Order order) {// 1. 保存订单到数据库orderRepository.save(order);// 2. 保存消息到本地消息表LocalMessage localMessage = new LocalMessage();localMessage.setBusinessId(order.getOrderId());localMessage.setContent(JSON.toJSONString(order));localMessage.setStatus(MessageStatus.PENDING);localMessageRepository.save(localMessage);// 3. 事务提交,本地业务和消息存储保持一致性
}// 定时任务扫描并发送消息
@Scheduled(fixedDelay = 5000)
public void sendPendingMessages() {List<LocalMessage> pendingMessages = localMessageRepository.findByStatus(MessageStatus.PENDING);for (LocalMessage message : pendingMessages) {try {// 发送消息到MQrabbitTemplate.convertAndSend("order.exchange", "order.create", message.getContent());// 更新消息状态为已发送message.setStatus(MessageStatus.SENT);localMessageRepository.save(message);} catch (Exception e) {log.error("发送消息失败: {}", message.getId(), e);}}
}// RocketMQ事务消息
public void sendTransactionMessage(Order order) {TransactionMQProducer producer = new TransactionMQProducer("order_producer");// 发送事务消息Message msg = new Message("order_topic", "create", JSON.toJSONBytes(order));TransactionSendResult result = producer.sendMessageInTransaction(msg, null);if (result.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {log.info("事务消息提交成功");}
}

适用场景

  • 需要严格保证业务和消息一致性的场景
  • 分布式事务场景
  • 金融、电商等对数据一致性要求高的业务

六、方案五:消息重试与死信队列

核心原理

通过重试机制处理临时故障,通过死信队列处理最终无法消费的消息。

关键实现

// 重试队列配置
@Bean
public Queue orderQueue() {return QueueBuilder.durable("order.queue").withArgument("x-dead-letter-exchange", "order.dlx") // 死信交换机.withArgument("x-dead-letter-routing-key", "order.dead").withArgument("x-message-ttl", 60000) // 60秒后进入死信.build();
}// 死信队列配置
@Bean
public Queue orderDeadLetterQueue() {return QueueBuilder.durable("order.dead.queue").build();
}// 消费者重试逻辑
@RabbitListener(queues = "order.queue")
public void handleMessageWithRetry(Order order, Message message, Channel channel) {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {orderService.processOrder(order);channel.basicAck(deliveryTag, false);} catch (TemporaryException e) {// 临时异常,重新入队重试channel.basicNack(deliveryTag, false, true);} catch (PermanentException e) {// 永久异常,直接确认进入死信队列channel.basicAck(deliveryTag, false);log.error("消息进入死信队列: {}", order.getOrderId(), e);}
}// 死信队列消费者
@RabbitListener(queues = "order.dead.queue")
public void handleDeadLetterMessage(Order order) {log.warn("处理死信消息: {}", order.getOrderId());// 发送告警、记录日志、人工处理等alertService.sendAlert("死信消息告警", order.toString());
}

重试策略建议

  1. 指数退避:1s, 5s, 15s, 30s
  2. 最大重试次数:3-5次
  3. 死信处理:人工介入或特殊处理流程

七、方案对比与选型指南

为了帮助大家选择合适的方案,我整理了详细的对比表:

方案 可靠性 性能影响 复杂度 适用场景
生产者确认 所有需要可靠发送的场景
消息持久化 Broker重启保护
消费者确认 确保消息被成功处理
事务消息 最高 强一致性要求的业务
重试+死信 处理临时故障和最终死信

选型建议

初创项目/简单业务:

  • 生产者确认 + 消息持久化 + 消费者确认
  • 满足大部分场景,实现简单

电商/交易系统:

  • 生产者确认 + 事务消息 + 重试机制
  • 保证数据一致性,处理复杂业务

大数据/日志处理:

  • 消息持久化 + 消费者确认
  • 允许少量丢失,追求吞吐量

金融/支付系统:

  • 全方案组合使用
  • 最高可靠性要求

总结

消息丢失问题是消息队列使用中的常见挑战,通过今天介绍的5种方案,我们可以构建一个可靠的消息系统:

  1. 生产者确认机制 - 保证消息成功发送到Broker
  2. 消息持久化机制 - 防止Broker重启导致消息丢失
  3. 消费者确认机制 - 确保消息被成功处理
  4. 事务消息机制 - 保证业务和消息的一致性
  5. 重试与死信队列 - 处理异常情况和最终死信

有些小伙伴可能会问:"我需要全部使用这些方案吗?

"我的建议是:根据业务需求选择合适的组合

对于关键业务,建议至少使用前三种方案;对于普通业务,可以根据实际情况适当简化。

记住,没有完美的方案,只有最适合的方案。

最后说一句(求关注,别白嫖我)

如果这篇文章对您有所帮助,或者有所启发的话,帮忙关注一下我的同名公众号:苏三说技术,您的支持是我坚持写作最大的动力。

求一键三连:点赞、转发、在看。

关注公众号:【苏三说技术】,在公众号中回复:进大厂,可以免费获取我最近整理的10万字的面试宝典,好多小伙伴靠这个宝典拿到了多家大厂的offer。

更多精彩内容在我的技术网站:http://www.susan.net.cn

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/948847.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于同步压缩连续小波变换(SS-CWT)的微震图像去噪与起始检测

一、算法原理框架 1. 同步压缩连续小波变换(SS-CWT)核心优势:通过时频重分配提升非平稳信号的分辨率(时间分辨率达0.1ms,频率分辨率达0.1Hz)数学表达:其中\(a\)为尺度因子,\(b\)为平移因子,\(ψa,b\)为同步压…

error 找不到模块“../views/Login.vue”或其相应的类型声明

error 找不到模块“../views/Login.vue”或其相应的类型声明Posted on 2025-10-28 15:25 嗷呜~ 阅读(0) 评论(0) 收藏 举报import(../views/Login.vue), 提示找不到模块或其相应的类型声明; 技术栈:ts+vue3+vit…

免费开源AI零代码平台/无代码平台,敲敲云 v2.2.0 版本发布

免费开源AI零代码平台/无代码平台,敲敲云 v2.2.0 版本发布项目介绍 敲敲云是一款免费的企业级零代码平台,用户无需编码,即可拖拽快速搭建出复杂业务系统,5分钟可搭建一套CRM系统、进销存系统等! 她集成了零代码应…

java学习(自用)

java基础语法 ==> 面向对象 ==> API ==> 字符串 ==> 集合 ==> 进制 {二:只有01 十:0~9 八:0~7 代码中以0开头 十六:0~9 和 a~f 代码中以0x开头 } 逻辑运算符 与“&”同真为真 或“|”同假为假…

PPO

for batch_prompt in prompt_dataset:batch_response = active_model.generate(batch_prompt)batch_data = concat(batch_prompt, batch_response)batch_scores = reward_model(batch_data)batch_all_probs, batch_pro…

【SPIE出版|EI检索稳定】2025年机电一体化与轨道交通国际学术会议(MRT 2025)

由黄河科技学院主办,2025年机电一体化与轨道交通国际学术会议(MRT 2025)于2025年11月14-16日在河南郑州举办,这是一个集中探讨全球机电一体化与轨道交通领域创新和挑战的国际学术平台。旨在汇集全球领域内的学者、研…

脑电数据PCA处理及SVM分类

使用主成分分析(PCA)处理脑电数据(EEG)并利用支持向量机(SVM)进行分类。 %% 脑电数据PCA处理及SVM分类 clear; clc; close all;%% 1. 加载脑电数据(这里使用示例数据,实际应用中应替换为真实EEG数据) % 假设我们有一…

T671195 于凋亡季节中的我们

复合函数如果满足结合律考虑线段树维护分段函数。首先发现对于一个询问 \([l,r]\),我们如果直接去做是很难做的。但是我们发现这个位置它一定是满足结合律的,因为 \(L,R\) 不会发生变化。于是对于区间满足结合律的问…

2025年临沂营业执照注册推荐:华恒财税的专业选择

文章摘要 本文探讨2025年临沂营业执照注册的趋势,分析用户如何选择靠谱的服务商。重点推荐华恒财税的一站式专业服务,基于其财政局许可和协会成员资质,帮助创业者省心高效完成注册。内容涵盖行业洞察、选择标准和实…

2025 年盐城异常处理,盐城行业资质,盐城财务代账,盐城会计代账公司最新推荐,聚焦资质、案例、售后的五家公司深度解读

引言 为助力盐城企业精准选择优质会计代账服务机构,本次测评由盐城中小企业服务协会联合专业财税研究机构共同开展,覆盖盐城本地 30 余家主流代账公司。测评采用 “三维九项” 评估体系,从资质合规性(含工商核准资…

如何在Windows下开发输入法:Mini How to

如何在Windows下开发输入法:Mini How toRIME (中州韻輸入法引擎)是一個跨平臺的輸入法框架。基於這一框架,開發者们在Windows、macOS、Linux、Android等平臺上創造了不同的輸入法前端實現。 Weasel (小狼毫)是它的Wi…

2025 年 10 月盐城公司变更,盐城地址挂靠,盐城商标注册公司最新推荐,聚焦资质、案例、售后的五家公司深度解读

引言 随着 2025 年盐城企业经营活动的日益频繁,公司变更、地址挂靠、商标注册等服务需求持续攀升。为帮助企业精准选择可靠服务机构,本次测评由盐城中小企业服务协会联合专业财税咨询机构共同开展,采用多维度测评方…

第一天学习

markdown学习 标题 三级标题 四级标题 hello,world! hello,world! hello,world! hello,world! hello,world! hello,world! 引用我在学习引用分割线图片超连接 点击跳转至微博 列表A CA B C表格名字 性别 生日张三 男 1…

AI元人文:星火与土壤

AI元人文:星火与土壤 此刻,我坐在晨光与迷雾的交界处。 面前这片名为"AI元人文"的思想原野,我曾在此播种,在此守望。如今薄雾轻笼,我看不清远方的路径。这不是挫败,而是一种奇异的丰盈——就像大地在深…

5-4-其他查询 - 实践

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

K3s + Sysbox:让容器拥有“虚拟机的灵魂”

K3s 与 Sysbox 的结合,让容器突破应用边界,兼具虚拟机的自由与容器的轻盈,为云原生带来全新的系统级可能。K3s 二进制文件集成了运行生产级、符合 CNCF 标准的 Kubernetes 集群所需的全部组件,包括 containerd、ru…

题解:AT_abc200_e [ABC200E] Patisserie ABC 2

目前暂无修正。前言:终于轮到我复杂问题简单化啦哈哈哈。 为什么题解区一车容斥啊?复杂难推导且根本没必要。这里给出一个桶 + 前缀和的做法。与这篇题解类似,但是由于其并没有详细地写出过程,写得也较为简略,所以…

CF1996G Penacony

启动脑子题专场。 首先一个比较关键的地方在于,这个题是环,不是链。想链的情况下怎么做,那么所有路径都已经确定了,我们使用简单数据结构维护一下区间覆盖即可。 发现一个事情,就是最后最优的情况必定至少割掉一条…

远程命令执行漏洞、SSRF、XXE、tomcat弱口令漏洞

1、简述远程命令执行漏洞原理分别选择dvwa 低、中、高三种安全等级复现 原理:攻擊者能夠在遠端伺服器或系統上執行任意程式碼或系統指令,等同於取得該系統的控制權或執行權限。 [低]:2、简述文件包含漏洞原理并在dvwa…

Ollama API 交互

Ollama API 交互 Ollama 提供了基于 HTTP 的 API,允许开发者通过编程方式与模型进行交互。 本文将详细介绍 Ollama API 的详细使用方法,包括请求格式、响应格式以及示例代码。 1. 启动 Ollama 服务 在使用 API 之前,…