我工作中用MQ的10种场景

news/2025/10/9 10:03:13/文章来源:https://www.cnblogs.com/12lisu/p/19130526

前言

最近有球友问我:MQ的使用场景有哪些?工作中一定要使用MQ吗?

记得刚工作那会儿,我总是想不明白:为什么明明直接调用接口就能完成的功能,非要引入MQ这么个"中间商"?

直到经历了系统崩溃、数据丢失、性能瓶颈等一系列问题后,我才真正理解了MQ的价值。

今天我想和大家分享我在实际工作中使用消息队列(MQ)的10种典型场景,希望对你会有所帮助。

一、为什么需要消息队列(MQ)?

在深入具体场景之前,我们先来思考一个基本问题:为什么要使用消息队列?

系统间的直接调用:
image

引入消息队列后:
image

接下来我们将通过10个具体场景,带大家来深入理解MQ的价值。

场景一:系统解耦

背景描述

在我早期参与的一个电商项目中,订单创建后需要通知多个系统:

// 早期的紧耦合设计
public class OrderService {private InventoryService inventoryService;private PointsService pointsService;private EmailService emailService;private AnalyticsService analyticsService;public void createOrder(Order order) {// 1. 保存订单orderDao.save(order);// 2. 调用库存服务inventoryService.updateInventory(order);// 3. 调用积分服务pointsService.addPoints(order.getUserId(), order.getAmount());// 4. 发送邮件通知emailService.sendOrderConfirmation(order);// 5. 记录分析数据analyticsService.trackOrderCreated(order);// 更多服务...}
}

这种架构存在严重问题:

  • 紧耦合:订单服务需要知道所有下游服务
  • 单点故障:任何一个下游服务挂掉都会导致订单创建失败
  • 性能瓶颈:同步调用导致响应时间慢

MQ解决方案

引入MQ后,架构变为:
image

代码实现

// 订单服务 - 生产者
@Service
public class OrderService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void createOrder(Order order) {// 1. 保存订单orderDao.save(order);// 2. 发送消息到MQrabbitTemplate.convertAndSend("order.exchange","order.created",new OrderCreatedEvent(order.getId(), order.getUserId(), order.getAmount()));}
}// 库存服务 - 消费者
@Component
@RabbitListener(queues = "inventory.queue")
public class InventoryConsumer {@Autowiredprivate InventoryService inventoryService;@RabbitHandlerpublic void handleOrderCreated(OrderCreatedEvent event) {inventoryService.updateInventory(event.getOrderId());}
}

技术要点

  1. 消息协议选择:根据业务需求选择RabbitMQ、Kafka或RocketMQ
  2. 消息格式:使用JSON或Protobuf等跨语言格式
  3. 错误处理:实现重试机制和死信队列

场景二:异步处理

背景描述

用户上传视频后需要执行转码、生成缩略图、内容审核等耗时操作,如果同步处理,用户需要等待很长时间。

MQ解决方案

// 视频服务 - 生产者
@Service
public class VideoService {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;public UploadResponse uploadVideo(MultipartFile file, String userId) {// 1. 保存原始视频String videoId = saveOriginalVideo(file);// 2. 发送处理消息kafkaTemplate.send("video-processing", new VideoProcessingEvent(videoId, userId));// 3. 立即返回响应return new UploadResponse(videoId, "upload_success");}
}// 视频处理服务 - 消费者
@Service
public class VideoProcessingConsumer {@KafkaListener(topics = "video-processing")public void processVideo(VideoProcessingEvent event) {// 异步执行耗时操作videoProcessor.transcode(event.getVideoId());videoProcessor.generateThumbnails(event.getVideoId());contentModerationService.checkContent(event.getVideoId());// 发送处理完成通知notificationService.notifyUser(event.getUserId(), event.getVideoId());}
}

架构优势

  1. 快速响应:用户上传后立即得到响应
  2. 弹性扩展:可以根据处理压力动态调整消费者数量
  3. 故障隔离:处理服务故障不会影响上传功能

场景三:流量削峰

背景描述

电商秒杀活动时,瞬时流量可能是平时的百倍以上,直接冲击数据库和服务。

MQ解决方案

image

代码实现

// 秒杀服务
@Service
public class SecKillService {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate RabbitTemplate rabbitTemplate;public SecKillResponse secKill(SecKillRequest request) {// 1. 校验用户资格if (!checkUserQualification(request.getUserId())) {return SecKillResponse.failed("用户无资格");}// 2. 预减库存(Redis原子操作)Long remaining = redisTemplate.opsForValue().decrement("sec_kill_stock:" + request.getItemId());if (remaining == null || remaining < 0) {// 库存不足,恢复库存redisTemplate.opsForValue().increment("sec_kill_stock:" + request.getItemId());return SecKillResponse.failed("库存不足");}// 3. 发送秒杀成功消息到MQrabbitTemplate.convertAndSend("sec_kill.exchange","sec_kill.success",new SecKillSuccessEvent(request.getUserId(), request.getItemId()));return SecKillResponse.success("秒杀成功");}
}// 订单处理消费者
@Component
@RabbitListener(queues = "sec_kill.order.queue")
public class SecKillOrderConsumer {@RabbitHandlerpublic void handleSecKillSuccess(SecKillSuccessEvent event) {// 异步创建订单orderService.createSecKillOrder(event.getUserId(), event.getItemId());}
}

技术要点

  1. 库存预扣:使用Redis原子操作避免超卖
  2. 队列缓冲:MQ缓冲请求,避免直接冲击数据库
  3. 限流控制:在网关层进行限流,拒绝过多请求

场景四:数据同步

背景描述

在微服务架构中,不同服务有自己的数据库,需要保证数据一致性。

MQ解决方案

// 用户服务 - 数据变更时发送消息
@Service
public class UserService {@Transactionalpublic User updateUser(User user) {// 1. 更新数据库userDao.update(user);// 2. 发送消息(在事务内)rocketMQTemplate.sendMessageInTransaction("user-update-topic",MessageBuilder.withPayload(new UserUpdateEvent(user.getId(), user.getStatus())).build(),null);return user;}
}// 其他服务 - 消费用户更新消息
@Service
@RocketMQMessageListener(topic = "user-update-topic", consumerGroup = "order-group")
public class UserUpdateConsumer implements RocketMQListener<UserUpdateEvent> {@Overridepublic void onMessage(UserUpdateEvent event) {// 更新本地用户信息缓存orderService.updateUserCache(event.getUserId(), event.getStatus());}
}

一致性保证

  1. 本地事务表:将消息和业务数据放在同一个数据库事务中
  2. 事务消息:使用RocketMQ的事务消息机制
  3. 幂等消费:消费者实现幂等性,避免重复处理

场景五:日志收集

背景描述

分布式系统中,日志分散在各个节点,需要集中收集和分析。

MQ解决方案

image

代码实现

// 日志收集组件
@Component
public class LogCollector {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void collectLog(String appId, String level, String message, Map<String, Object> context) {LogEvent logEvent = new LogEvent(appId, level, message, context, System.currentTimeMillis());// 发送到KafkakafkaTemplate.send("app-logs", appId, JsonUtils.toJson(logEvent));}
}// 日志消费者
@Service
public class LogConsumer {@KafkaListener(topics = "app-logs", groupId = "log-es")public void consumeLog(String message) {LogEvent logEvent = JsonUtils.fromJson(message, LogEvent.class);// 存储到ElasticsearchelasticsearchService.indexLog(logEvent);// 实时监控检查if ("ERROR".equals(logEvent.getLevel())) {alertService.checkAndAlert(logEvent);}}
}

技术优势

  1. 解耦:应用节点无需关心日志如何处理
  2. 缓冲:应对日志产生速率波动
  3. 多消费:同一份日志可以被多个消费者处理

场景六:消息广播

背景描述

系统配置更新后,需要通知所有服务节点更新本地配置。

MQ解决方案

// 配置服务 - 广播配置更新
@Service
public class ConfigService {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;public void updateConfig(String configKey, String configValue) {// 1. 更新配置存储configDao.updateConfig(configKey, configValue);// 2. 广播配置更新消息redisTemplate.convertAndSend("config-update-channel", new ConfigUpdateEvent(configKey, configValue));}
}// 服务节点 - 订阅配置更新
@Component
public class ConfigUpdateListener {@Autowiredprivate LocalConfigCache localConfigCache;@RedisListener(channel = "config-update-channel")public void handleConfigUpdate(ConfigUpdateEvent event) {// 更新本地配置缓存localConfigCache.updateConfig(event.getKey(), event.getValue());}
}

应用场景

  1. 功能开关:动态开启或关闭功能
  2. 参数调整:调整超时时间、限流阈值等
  3. 黑白名单:更新黑白名单配置

场景七:顺序消息

背景描述

在某些业务场景中,消息的处理顺序很重要,如订单状态变更。

MQ解决方案

// 订单状态变更服务
@Service
public class OrderStateService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void changeOrderState(String orderId, String oldState, String newState) {OrderStateEvent event = new OrderStateEvent(orderId, oldState, newState);// 发送顺序消息,使用orderId作为sharding keyrocketMQTemplate.syncSendOrderly("order-state-topic", event, orderId  // 保证同一订单的消息按顺序处理);}
}// 订单状态消费者
@Service
@RocketMQMessageListener(topic = "order-state-topic",consumerGroup = "order-state-group",consumeMode = ConsumeMode.ORDERLY  // 顺序消费
)
public class OrderStateConsumer implements RocketMQListener<OrderStateEvent> {@Overridepublic void onMessage(OrderStateEvent event) {// 按顺序处理订单状态变更orderService.processStateChange(event);}
}

顺序保证机制

  1. 分区顺序:同一分区内的消息保证顺序
  2. 顺序投递:MQ保证消息按发送顺序投递
  3. 顺序处理:消费者顺序处理消息

场景八:延迟消息

背景描述

需要实现定时任务,如订单超时未支付自动取消。

MQ解决方案

// 订单服务 - 发送延迟消息
@Service
public class OrderService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void createOrder(Order order) {// 保存订单orderDao.save(order);// 发送延迟消息,30分钟后检查支付状态rabbitTemplate.convertAndSend("order.delay.exchange","order.create",new OrderCreateEvent(order.getId()),message -> {message.getMessageProperties().setDelay(30 * 60 * 1000); // 30分钟return message;});}
}// 订单超时检查消费者
@Component
@RabbitListener(queues = "order.delay.queue")
public class OrderTimeoutConsumer {@RabbitHandlerpublic void checkOrderPayment(OrderCreateEvent event) {Order order = orderDao.findById(event.getOrderId());if ("UNPAID".equals(order.getStatus())) {// 超时未支付,取消订单orderService.cancelOrder(order.getId(), "超时未支付");}}
}

替代方案对比

方案 优点 缺点
数据库轮询 实现简单 实时性差,数据库压力大
延时队列 实时性好 实现复杂,消息堆积问题
定时任务 可控性强 分布式协调复杂

场景九:消息重试

背景描述

处理消息时可能遇到临时故障,需要重试机制保证最终处理成功。

MQ解决方案

// 消息消费者 with 重试机制
@Service
@Slf4j
public class RetryableConsumer {@Autowiredprivate RabbitTemplate rabbitTemplate;@RabbitListener(queues = "business.queue")public void processMessage(Message message, Channel channel) {try {// 业务处理businessService.process(message);// 确认消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (TemporaryException e) {// 临时异常,重试log.warn("处理失败,准备重试", e);// 拒绝消息,requeue=truechannel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true  // 重新入队);} catch (PermanentException e) {// 永久异常,进入死信队列log.error("处理失败,进入死信队列", e);channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false  // 不重新入队);}}
}

重试策略

  1. 立即重试:临时故障立即重试
  2. 延迟重试:逐步增加重试间隔
  3. 死信队列:最终无法处理的消息进入死信队列

场景十:事务消息

背景描述

分布式系统中,需要保证多个服务的数据一致性。

MQ解决方案

// 事务消息生产者
@Service
public class TransactionalMessageService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Transactionalpublic void createOrderWithTransaction(Order order) {// 1. 保存订单(数据库事务)orderDao.save(order);// 2. 发送事务消息TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("order-tx-topic",MessageBuilder.withPayload(new OrderCreatedEvent(order.getId())).build(),order  // 事务参数);if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {throw new RuntimeException("事务消息发送失败");}}
}// 事务消息监听器
@Component
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {@Autowiredprivate OrderDao orderDao;@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {// 检查本地事务状态Order order = (Order) arg;Order existOrder = orderDao.findById(order.getId());if (existOrder != null && "CREATED".equals(existOrder.getStatus())) {return RocketMQLocalTransactionState.COMMIT_MESSAGE;} else {return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;}} catch (Exception e) {return RocketMQLocalTransactionState.UNKNOWN;}}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {// 回查本地事务状态String orderId = (String) msg.getHeaders().get("order_id");Order order = orderDao.findById(orderId);if (order != null && "CREATED".equals(order.getStatus())) {return RocketMQLocalTransactionState.COMMIT_MESSAGE;} else {return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;}}
}

事务消息流程

image

总结

通过以上10个场景,我们可以总结出MQ使用的核心原则:

适用场景

  1. 异步处理:提升系统响应速度
  2. 系统解耦:降低系统间依赖
  3. 流量削峰:应对突发流量
  4. 数据同步:保证最终一致性
  5. 分布式事务:解决数据一致性问题

技术选型建议

场景 推荐MQ 原因
高吞吐 Kafka 高吞吐量,持久化存储
事务消息 RocketMQ 完整的事务消息机制
复杂路由 RabbitMQ 灵活的路由配置
延迟消息 RabbitMQ 原生支持延迟队列

最佳实践

  1. 消息幂等性:消费者必须实现幂等处理
  2. 死信队列:处理失败的消息要有兜底方案
  3. 监控告警:完善的消息堆积监控和告警
  4. 性能优化:根据业务特点调整MQ参数

最后说一句(求关注,别白嫖我)

如果这篇文章对您有所帮助,或者有所启发的话,帮忙关注一下我的同名公众号:苏三说技术,您的支持是我坚持写作最大的动力。

求一键三连:点赞、转发、在看。

关注公众号:【苏三说技术】,在公众号中回复:进大厂,可以免费获取我最近整理的10万字的面试宝典,好多小伙伴靠这个宝典拿到了多家大厂的offer。

本文收录于我的技术网站:http://www.susan.net.cn

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/932391.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

提升网站的访问速度优化关键词的方法有哪些

链接&#xff1a;C 设计模式 链接&#xff1a;C 设计模式 - 工厂方法 链接&#xff1a;C 设计模式 - 抽象工厂 链接&#xff1a;C 设计模式 - 建造者模式 原型模式&#xff08;Prototype Pattern&#xff09;是一种创建型设计模式&#xff0c;它允许一个对象通过复制现有对象来…

提示缺少xxx.dll文件,DLL修复工具 全系列完整版 (支持32位/64位winxp/win7/win10/win11系统)

下载地址获取https://weibo.com/ttarticle/p/show?id=2309405219774505812150 image 使用说明 image 自动修复文件 image 手动输入需要安装的DLL文件修复后-文件会下载到以下目录:如果是32位的就放在C:\Windows\Syst…

推广网站建设上海公司注册虚拟地址

大家好&#xff01;我是sum墨&#xff0c;一个一线的底层码农&#xff0c;平时喜欢研究和思考一些技术相关的问题并整理成文&#xff0c;限于本人水平&#xff0c;如果文章和代码有表述不当之处&#xff0c;还请不吝赐教。 以下是正文&#xff01; 一、写文背景 我们在写后端…

中国建设银行支付网站广州有哪些网站建设

目录 1. LSTM的背景人工神经网络的进化循环神经网络&#xff08;RNN&#xff09;的局限性LSTM的提出背景 2. LSTM的基础理论2.1 LSTM的数学原理遗忘门&#xff08;Forget Gate&#xff09;输入门&#xff08;Input Gate&#xff09;记忆单元&#xff08;Cell State&#xff09;…

2025 年电线电缆厂家最新推荐实力厂家榜单:聚焦优质企业,助力精准选购

在当前工业建设、民生工程及新兴产业快速发展的背景下,电线电缆作为关键基础配套产品,其质量与性能直接影响项目安全与效率。然而,市场上企业数量繁杂,部分产品存在质量不达标、技术落后等问题,导致采购方难以精准…

基于MATLAB的火灾检测GUI系统设计与实现

一、系统架构设计 1. 技术架构 graph TD A[摄像头/视频输入] --> B(图像预处理) B --> C{特征提取} C --> D[火焰检测模块] C --> E[烟雾检测模块] D --> F(GUI显示) E --> F F --> G[报警模块] …

博客园登录bug

今天登录博客园遇到的情况 1、人机验证初次生效,之后尝试新密码后只能弹出提示并不能验证 2、短信登录根本收不到验证码

从零开始:C#实现计算表达式解析与求值——以后缀表达式为例

从零开始:C#实现计算表达式解析与求值——以后缀表达式为例当提到表达式解析技术时,很多人第一反应可能是复杂且精细的递归下降方法。这种方法主要用于构建抽象语法树(AST),虽然功能强大,能够处理复杂的语法结构,…

基于Java+Springboot+Vue开发的鲜花商城管理系统源码+运行步骤

项目简介该项目是基于Java+Springboot+Vue开发的鲜花商城管理系统(前后端分离),这是一项为大学生课程设计作业而开发的项目。该系统旨在帮助大学生学习并掌握Java编程技能,同时锻炼他们的项目设计与开发能力。通过…

tp3企业网站开发百度云平面设计网站源码

转载地址&#xff1a;https://www.cnblogs.com/qingchunjun/p/4208159.html

简单网站建设规划方案新媒体网站建设十大的经典成功案例

目录 AIoT的协同计算 一、背景与意义 二、原理与方法 三、关键技术与挑战 四、应用与实践 AIoT的协同计算 一、背景与意义 随着物联网和人工智能技术的快速发展,智能物联网(AIoT)成为了一个新兴的前沿领域。在这个领域中,多移动终端设备如智能手机、可穿戴设备、无人…

78建筑网站品牌策划书范文案例

我们都知道&#xff0c;当 Oracle 数据库出现性能故障后&#xff0c;一般会在线上实时诊断数据库性能问题&#xff0c;特别是资源突然打高的场景&#xff0c;这个时候用到ASH的数据&#xff0c;就能很大程度上准确定位问题所在。 Oracle ASH 在 Oracle 数据库中&#xff0c;实…

新开传奇网站韩版小橘子被做h网站

我使用的是airbnb规则进行代码eslint。 会出现保存后&#xff0c;数组或者对象最后一个元素尾随逗号。 此时需要在.eslintrc.js中加入下面代码即可 "comma-dangle": ["error", {"arrays": "never","objects": "never…

第五届无线通信、网络与物联网国际学术会议

第五届无线通信、网络与物联网国际学术会议 2025 5th International Conference on Wireless Communication, Networking and Internet of Things (WCNIoT 2025) 第五届无线通信、网络与物联网国际学术会议(WCNIoT 2…

php自适应网站开发wordpress空间满

知乎传送门&#xff1a;https://www.zhihu.com/question/29355920 为什么图片反复压缩后会普遍会变绿而不是其他颜色&#xff1f;这是大神做的模拟迭代压缩的测试&#xff1a;https://m13253.github.io/JPEGreen/。排名第一的回答已经很仔细了&#xff0c;关于图像压缩不是很懂…

2025 年低温冷水机厂家联系方式推荐:广东弘星制冷专注工业温控,提供定制设备与管家式服务

当前工业领域对温度控制的精准度、稳定性及节能性要求日益提升,新能源、医药化工、半导体等关键行业,更是将可靠的温控系统视为生产流程高效运转的核心保障。然而,市场上部分温控设备存在选型适配性差、能耗过高、故…

git信息提交错误,进行修改

git信息提交错误,进行修改安装(ubuntu) apt update apt install git-filter-repo使用,下面的操作,会把 .git/config 的内容进行修改,所以先做备份 cp .git/config ~/git_config# 邮箱 git filter-repo --email-call…

白城学习做网站的学校传扬互动网站建设公司

1.数据关联性删除判断 示例&#xff1a;比如后台发布了一个待抢购的订单&#xff0c;app已经把此单抢购&#xff0c;因为后台没有及时刷新状态&#xff0c;所有如果要删除或下架此笔订单&#xff0c;必须先要验证此订单的状态是否为已经抢购&#xff1b; 2.数据重复录入问题 示…

武功县住房和城乡建设局官网站ico 众筹网站开发

当使用 keep-alive 缓存组件时&#xff0c;组件中的定时器可能会在组件被缓存后继续运行&#xff0c;从而干扰其他组件的逻辑。为了避免这种情况&#xff0c;可以通过以下方法解决&#xff1a; 1. 在组件的 deactivated 钩子中清理定时器 keep-alive 为缓存的组件提供了 acti…

建一个网站大概需要多少钱找网页设计师

软考程序员考试&#xff08;初级资格&#xff09;主要考察计算机基础理论、编程能力及软件开发相关知识。以下是核心知识点总结及备考建议&#xff1a; 一、计算机基础 数制与编码 二进制、八进制、十进制、十六进制转换原码、反码、补码表示&#xff08;整数与浮点数&#xf…