鲁斌 42450745 网站建设你需要网站建设
news/
2025/9/22 23:31:14/
文章来源:
鲁斌 42450745 网站建设,你需要网站建设,长安网站建设哪家好,织梦wap手机网站模板文章目录 场景分析方法的幂等分布式锁Redis实现分布式锁抢锁的设计思路 分布式锁案例 直击面试rocketmq什么时候重复消费消息丢失的问题消息在哪里丢失发送端确保发送成功并且配合失败的业务处理消费端确保消息不丢失rocketmq 主从同步刷盘 场景分析 分布式系统架构中,队列是分… 文章目录 场景分析方法的幂等分布式锁Redis实现分布式锁抢锁的设计思路 分布式锁案例 直击面试rocketmq什么时候重复消费消息丢失的问题消息在哪里丢失发送端确保发送成功并且配合失败的业务处理消费端确保消息不丢失rocketmq 主从同步刷盘 场景分析 分布式系统架构中,队列是分布式的,生产端是分布式集群,消费端也是分布式集群.
相当于有多个消费端同时监听队列,同时减库存,写入订单.
面试题:如何处理消息重复消费的问题. 重复消费大部分场景,需要解决的.
引入2个概念来解决: 幂等的业务方法,和消息的分布式锁.
方法的幂等
结论: 一个方法的一次业务逻辑调用和N次调用的结果是一致的,我们称这种方法就是幂等.
一旦重复消费,一定要把消费的业务逻辑方法(orderAdd)设计成幂等的.
幂等的方法 GET方法: 查询方法,天生幂等.DELETE方法: 删除方法,天生幂等.PUT方法: 修改,并不是天生幂等,需要设计 减少库存: update stock_tbl set stockstock-#{stock} where id#{id}(不是幂等)select * from stock_log where order_id#{orderId}(查询日志,判断是否已经见过库存了),没有数据 update stock_tbl set stockstock-#{stock} where id#{id},insert into stock_log (字段) values (订单id,商品减库存信息) (这样设计就幂等了,依然有问题) POST方法: 新增,并不是天生幂等,需要设计 新增订单: insert into order_tbl (order_id,order_item_id,count,user_id) values (各种属性);如果使用唯一属性校验,作用在order_id order_sn(编号).同一张订单,这个字段值是相同(幂等满足,没做幂等不满足) 当前orderAdd方法设计幂等的解决思路(之一) 使用订单id或者订单编号,**userId商品id(**这个只满足当前我们的案例特点,不满足实际场景.)查询订单,如果已经存在了,库存不减少,订单不增了,购物车不用删除了
Override
public void orderAdd(OrderAddDTO orderAddDTO) {//幂等设计思路: 利用userId和commodityCode 查询,如果已经存在了订单,方法直接执行结束//如果结果不存在,减库存,生单,删除购物车int countorderMapper.selectExists(orderAddDTO);if (count0){log.debug(订单已经新增了);return;}StockReduceCountDTO countDTOnew StockReduceCountDTO();countDTO.setCommodityCode(orderAddDTO.getCommodityCode());countDTO.setReduceCount(orderAddDTO.getCount());// 利用Dubbo调用stock模块减少库存的业务逻辑层方法实现功能stockService.reduceCommodityCount(countDTO);// 2.从购物车中删除用户选中的商品(调用Cart模块删除购物车中商品的方法)// 利用dubbo调用cart模块删除购物车中商品的方法实现功能Order ordernew Order();BeanUtils.copyProperties(orderAddDTO,order);// 下面执行新增 假设insert是幂等的.orderMapper.insertOrder(order);log.info(新增订单信息为:{},order);cartService.cartDelete(orderAddDTO);
}Select(select count(id) from
order_tbl where user_id#{userId} and commodity_code#{commodityCode})
int selectExists(OrderAddDTO orderAddDTO);分布式锁
当前分布式消费架构 即使,将方法设计成幂等,这个架构中,消息重复消费
,满足线程安全问题的所有因素
并发/多线程写操作共享数据
只要解决其中一点,线程安全问题就消失了.
并发多线程–串行
写操作– 避免写(不能满足当前案例,必须写)
共享数据–个体数据(不能满足,重复消费,重复订单是前提)
分布式线程安全问题的解决方案—分布式锁
错误思路: 引入synchronized同步锁,不能解决分布式场景下,多个进程的并发线程安全问题.
概念: 分布式场景下,多进程,多线程并发的抢锁机制. 抢到资源锁,执行业务逻辑,抢不到等待或者放弃执行.能够避免对同一个资源出现并发多线程操作的解决方案.
和synchronized的区别在于 synchronizeds本地锁.管理一个进程中的多线程,分布式锁是管理多个进程中的多线程.
分布式锁当前落地方案: redis setnx命令
Redis实现分布式锁
抢锁的设计思路 目标: 多线程执行业务之前,先判断执行权限,抢锁,抢到锁的才能执行业务,抢不到的不执行.(当前案例中,抢锁,然后执行的业务逻辑是:orderAdd) 抢锁如何执行?: setnx key “” key值如何设计?: 需要结合业务,设计key值(redis中最主要的功能,都关系到key值的设计),抢锁的逻辑中,满足是业务数据,满足重复消费的重复数据.就可以实现这个key值的设计. 消息Id是重复的. 当前业务流程设计缺陷: 如果有一个消费者抢到锁了,执行了业务方法.执行完成后,没有释放锁的机制.如果引入等待重抢的机制,由于抢到锁的没有释放,会导致死锁. 释放锁的逻辑引入 上述整改的流程中避免了死锁问题,但是存在删除失败导致死锁的问题. 所以,要保证del释放没有成功,在redis也一定不会长期保存. 兜底的解决死锁问题.基本不会出现死锁了. 为了解决误删除的问题,抢锁的时候setnx key value值设计成一个随机数. 随机数两个消费,多个消费者生成相同的可能性极低.
分布式锁案例
Component
RocketMQMessageListener(topic business-order-topic,consumerGroup ${rocketmq.consumer.group},selectorExpression orderAdd)
Slf4j
public class OrderAddConsumerListener implements RocketMQListenerMessageExt {Autowiredprivate IOrderService orderService;Autowiredprivate StringRedisTemplate redisTemplate;Overridepublic void onMessage(MessageExt msg) {//拿到底层消息对象的bodybyte[] body msg.getBody();//尝试先解析成stringString orderJsonnew String(body, StandardCharsets.UTF_8);System.out.println(orderJson);OrderAddDTO orderAddDTOJSON.toJavaObject(JSON.parseObject(orderJson),OrderAddDTO.class);System.out.println(orderAddDTO);//1.生成锁的key值,生成当前这把锁的随机数//准备锁keyString msgKeyLockmsg:order:add:msg.getMsgId();//准备随机数 4 6 8位String randCodenew Random().nextInt(9000)1000;ValueOperationsString, String stringOps redisTemplate.opsForValue();try{//补充消息消费的抢锁机制//2.抢锁 setnx msgKeyLock randCode expire 10sBoolean tryLockSuccess stringOps.setIfAbsent(msgKeyLock, randCode, 10, TimeUnit.SECONDS);//3.判断 抢锁成功还是失败if(!tryLockSuccess){//3.2 失败了 可以等待5秒重新抢锁,也可以直接结束//尝试这里使用while编写等待5秒重新抢的逻辑log.info(有别人抢锁了,msgKey:{},value:{},msgKeyLock,randCode);return;}//3.1 成功了 执行orderAddorderService.orderAdd(orderAddDTO);}catch (CoolSharkServiceException e){//业务异常,说明订单新增业务性失败,比如库存没了log.error(库存减少失败,库存触底了:{},异常信息:{},orderAddDTO,e.getMessage());}finally {//释放锁 读以下锁的value值,等于当前生成value才释放String s stringOps.get(msgKeyLock);if (s!null s.equals(randCode)){//del msgKeyLockredisTemplate.delete(msgKeyLock);}}}
}直击面试
rocketmq什么时候重复消费
在broker做扩展的时候,消息队列的消息,做扩展的时候,原本存储在原队列的消息,会进行rebalance重平衡.消费开始阶段 消费者consumer1 所在group1 绑定队列,push消费模式,使得消费者接受到了queue1 queue2的6条消息.消费过程,成功执行,即将返回确认. 总结:消费者并发消费的逻辑,同一组消费者绑定分布式队列,推送批量的消息在某个消费者还没有来得及消费,或者没来得及返回确认给rocketmq,队列发生了扩容缩容rocketmq会对队列中所有的消息做rebalance重平衡(消息重新分配给不同队列),消费者绑定也充平衡导致已经推送的但是未返回确认的消息,被发送给不同消费者多次.
消息丢失的问题
rocketmq kafka rabbitmq activemq都是队列.只要谈到其中一个.
重复消费的问题(方法必须设计成幂等,一旦设计成幂等,可能造成线程安全隐患,所以引入分布式锁)消息丢失如何处理.
面试题:消息丢失如何处理.
消息在哪里丢失
发送没成功,没有解决不成功的业务逻辑rocketmq保存的时候,断电,宕机,丢失消息(运行的时候,消息存储在内存)消费端丢失消息(没有成功处理消息,就直接返回success,并不是所有的消费逻辑都是先消费,再确认的,如果关注的是消费速度,不关注成功或者是否丢失,就可以这样处理)
发送端确保发送成功并且配合失败的业务处理
同步发送,接收发送结果,SEND_OK才结束.
客户端代码底层都有默认重试(retry 3 times).发送重试都失败了.
处理发送失败的逻辑.
发送到备用/失败的队列记录日志,将消息来源,目标和消息内容,详细记录,等待监控系统,维护人员来直接处理
消费端确保消息不丢失
一定是先消息费,在确认,消费失败,返回失败(rocketmq消费点位保持原有位置不变,同一个消费者组,会重新拿到消息)
rocketmq 主从同步刷盘 同步刷盘(消息数据可靠性保证): 如果持久化内存消息数据到磁盘失败,发送结果没有成功.
异步刷盘: 只要内存接收到了生产端的消息数据,数据是否持久化到磁盘,都会给生产端发送成功接收信息.
主从的双机热备: broker可以配置主从,考虑数据可靠性,和性能,一般主master做同步刷盘,slave做异步刷盘.(都同步刷盘,100%保证消息只要到达rocketmq就不会丢失,但是性能不能保证.)
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/910746.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!