RocketMQ顺序消费机制

RocketMQ的顺序消费机制通过生产端和消费端的协同设计实现,其核心在于局部顺序性,即保证同一队列(MessageQueue)内的消息严格按发送顺序消费。以下是详细机制解析及关键源码实现:
在这里插入图片描述


一、顺序消费的核心机制

1. 生产端路由策略
  • Sharding Key路由:生产者通过MessageQueueSelector接口将同一业务标识(如订单ID)的消息路由到同一队列。例如,根据订单ID对队列数取模,确保同一订单的消息进入同一队列。
    // 示例:生产者选择队列
    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}
    }, orderId);
    

路由方法:
在这里插入图片描述
SelectMessageQueueByHash:按哈希选择消息队列。
SelectMessageQueueByRandom:随机选择消息队列。
SelectMessageQueueByMachineRoom:按照机房选择消息队列。

  • 同步发送:必须使用同步发送(send()方法),异步发送无法保证消息顺序。
2. 消费端锁机制
  • Broker端队列锁:消费者集群模式下,通过定时任务(默认每20秒)向Broker申请队列锁,只有获得锁的消费者实例才能拉取并消费该队列消息。锁的有效期默认60秒,避免宕机导致死锁。
  • 本地队列快照锁:消费者在消费时对ProcessQueue(队列快照)加内存锁(synchronized块),确保同一队列的消息仅由一个线程顺序处理。
3. 消费流程控制
  • 单线程顺序消费:每个队列对应一个消费线程,从ProcessQueue的红黑树(msgTreeMap)中按消息偏移量顺序取出消息,保证消费顺序与存储顺序一致。
  • 失败重试机制:消费失败时,若未达最大重试次数,消息会重新放回ProcessQueue等待下次消费;若超过次数则进入死信队列。

二、关键源码解析

1. 消费者启动与锁管理
  • 服务初始化:消费者启动时,若监听器为MessageListenerOrderly,则创建ConsumeMessageOrderlyService,并启动定时加锁任务。

    // DefaultMQPushConsumerImpl#start
    if (getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeMessageService = new ConsumeMessageOrderlyService(this, listener);consumeMessageService.start();
    }
    
  • 定时加锁ConsumeMessageOrderlyService启动后,定时调用RebalanceImpl.lockAll()向Broker申请锁,更新ProcessQueue的锁定状态。

      public synchronized void lockMQPeriodically() {if (!this.stopped) {this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();}}
    
      				for (MessageQueue mq : mqs) {ProcessQueue processQueue = this.processQueueTable.get(mq);if (processQueue != null) {if (lockOKMQSet.contains(mq)) {if (!processQueue.isLocked()) {log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);}// 更新`ProcessQueue`的锁定状态 trueprocessQueue.setLocked(true);processQueue.setLastLockTimestamp(System.currentTimeMillis());} else {// 更新`ProcessQueue`的锁定状态 falseprocessQueue.setLocked(false);log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq);}}}
    
2. 消息拉取与消费
  • 锁检查:拉取消息前检查ProcessQueue是否已锁定,未锁定则延迟拉取。
    // DefaultMQPushConsumerImpl#pullMessage
    if (processQueue.isLocked()) {// 计算消费偏移量并拉取消息
    } else {executePullRequestLater(pullRequest, 3000); // 延迟3秒重试
    }
    
  • 消费线程加锁:消费线程运行时获取队列内存锁,确保单线程处理。
    synchronized (messageQueueLock.fetchLockObject(messageQueue)) {List<MessageExt> msgs = processQueue.takeMessags(batchSize);// 执行消费逻辑
    }
    
3. Broker端锁管理
  • 锁存储:Broker通过RebalanceLockManager维护锁信息,记录消费者ClientID和最后更新时间,超时(默认60秒)则自动释放。
    class LockEntry {String clientId;long lastUpdateTimestamp;boolean isExpired() { /* 检查是否超时 */ }
    }
    
  • 锁竞争:消费者通过lockBatchMQ请求批量加锁,Broker返回成功锁定的队列列表。

三、适用场景与注意事项

  1. 适用场景

    • 分区顺序:如订单流程(创建、支付、完成),同一订单ID的消息需顺序处理。
    • 全局顺序Topic仅一个队列,性能较低,适用于强一致性场景(如证券交易)。
  2. 注意事项

    • 幂等性:因网络抖动或消费者重启可能导致短暂乱序,业务逻辑需支持幂等处理。
    • 队列数选择:分区数越多并发度越高,但需确保同一业务ID的路由一致性。

总结

RocketMQ的顺序消费通过生产端路由策略消费端锁机制Broker协同管理实现。其设计在保证局部顺序的同时兼顾性能,适用于多数业务场景。源码层面,ConsumeMessageOrderlyServiceRebalanceImpl是核心模块,通过定时加锁单线程消费队列快照管理确保顺序性。实际使用时需结合业务特点设计Sharding Key,并处理可能的异常情况。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/896962.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【JavaEE】-- 多线程(初阶)4

文章目录 8.多线程案例8.1 单例模式8.1.1 饿汉模式8.1.2 懒汉模式 8.2 阻塞队列8.2.1 什么是阻塞队列8.2.2 生产者消费者模型8.2.3 标准库中的阻塞队列8.2.4 阻塞队列的应用场景8.2.4.1 消息队列 8.2.5 异步操作8.2.5 自定义实现阻塞队列8.2.6 阻塞队列--生产者消费者模型 8.3 …

【C++设计模式】第四篇:建造者模式(Builder)

注意&#xff1a;复现代码时&#xff0c;确保 VS2022 使用 C17/20 标准以支持现代特性。 分步骤构造复杂对象&#xff0c;实现灵活装配 1. 模式定义与用途 核心目标&#xff1a;将复杂对象的构建过程分离&#xff0c;使得同样的构建步骤可以创建不同的表示形式。 常见场景&am…

vuex中的state是响应式的吗?

在 Vue.js 中&#xff0c;Vuex 的 state 是响应式的。这意味着当你更改 state 中的数据时&#xff0c;依赖于这些数据的 Vue 组件会自动更新。这是通过 Vue 的响应式系统实现的&#xff0c;该系统使用了 ES6 的 Proxy 对象来监听数据的变化。 当你在 Vuex 中定义了一个 state …

若依框架中的岗位与角色详解

若依框架中的岗位与角色详解 一、核心概念与定位 岗位&#xff08;Post&#xff09; 业务职能导向&#xff1a;岗位是用户在组织架构中的职务标识&#xff08;如“开发人员”“项目经理”&#xff09;&#xff0c;用于描述工作职责而非直接控制权限。岗位与部门关联&#xff…

SQL经典常用查询语句

1. 基础查询语句 1.1 查询表中所有数据 在SQL中&#xff0c;查询表中所有数据是最基本的操作之一。通过使用SELECT * FROM table_name;语句&#xff0c;可以获取指定表中的所有记录和列。例如&#xff0c;假设有一个名为employees的表&#xff0c;包含员工的基本信息&#xf…

EP 架构:未来主流方向还是特定场景最优解?

DeepSeek MoE架构采用跨节点专家并行&#xff08;EP&#xff09;架构&#xff0c;在提升推理系统性能方面展现出巨大潜力。这一架构在发展进程中也面临诸多挑战&#xff0c;其未来究竟是会成为行业的主流方向&#xff0c;还是仅适用于特定场景&#xff0c;成为特定领域的最优解…

[密码学实战]Java实现国密(SM2)密钥协商详解:原理、代码与实践

一、代码运行结果 二、国密算法与密钥协商背景 2.1 什么是国密算法&#xff1f; 国密算法是由中国国家密码管理局制定的商用密码标准&#xff0c;包括&#xff1a; SM2&#xff1a;椭圆曲线公钥密码算法&#xff08;非对称加密/签名/密钥协商&#xff09;SM3&#xff1a;密码…

动漫短剧开发公司,短剧小程序搭建快速上线

在当今快节奏的生活里&#xff0c;人们的娱乐方式愈发多元&#xff0c;而动漫短剧作为新兴娱乐形式&#xff0c;正以独特魅力迅速崛起&#xff0c;成为娱乐市场的耀眼新星。近年来&#xff0c;动漫短剧市场呈爆发式增长&#xff0c;吸引众多创作者与观众目光。 从市场规模来看…

第四十五:创建一个vue 的程序

html <div id"app">{{ msg }}<h2>{{ web.title }}</h2><h3>{{ web.url }}</h3> </div> js /*<div id"app"></div> 指定一个 id 为 app 的 div 元素{{ }} 插值表达式, 可以将 Vue 实例中定义的数据在视图…

docer swarm集群部署springboot项目

1.准备两台服务器&#xff0c;安装好docker、docker-compose 因为用到了docker仓库&#xff0c;安装harbor,可以从github下载离线安装包 2. 我这边用到了gitlab-ci,整体流程也都差不多 1&#xff09;打包mvn clean install 2&#xff09;打镜像 docker-compose -f docker-compo…

Python测试框架Pytest的参数化

上篇博文介绍过&#xff0c;Pytest是目前比较成熟功能齐全的测试框架&#xff0c;使用率肯定也不断攀升。 在实际工作中&#xff0c;许多测试用例都是类似的重复&#xff0c;一个个写最后代码会显得很冗余。这里&#xff0c;我们来了解一下pytest.mark.parametrize装饰器&…

开发博客系统

前言 准备工作 数据库表分为实体表和关系表 第一&#xff0c;建数据库表 然后导入前端页面 创建公共模块 就是统一返回值&#xff0c;异常那些东西 自己造一个自定义异常 普通类 mapper 获取全部博客 我们只需要返回id&#xff0c;title&#xff0c;content&#xff0c;us…

【Spring Boot 应用开发】-05 命令行参数

Spring Boot 常用命令行参数 Spring Boot 支持多种命令行参数&#xff0c;这些参数可以在启动应用时通过命令行直接传递。以下是一些常用的命令行参数及其详细说明&#xff1a; 1. 基本配置参数 --server.port端口号 指定应用程序运行的HTTP端口&#xff0c;默认为8080。 jav…

20250304学习记录

第一部分&#xff0c;先来了解一下各种论文期刊吧&#xff0c;毕竟也是这把岁数了&#xff0c;还什么都不懂呢 国际期刊&#xff1a; EI收集的主要有两种&#xff0c; JA&#xff1a;EI源刊 CA&#xff1a;EI会议 CPCI也叫 ISTP 常说的SCI分区是指&#xff0c;JCR的一区、…

2024 年 MySQL 8.0.40 安装配置、Workbench汉化教程最简易(保姆级)

首先到官网上下载安装包&#xff1a;http://www.mysql.com 点击下载&#xff0c;拉到最下面&#xff0c;点击社区版下载 windows用户点击下面适用于windows的安装程序 点击下载&#xff0c;网络条件好可以点第一个&#xff0c;怕下着下着断了点第二个离线下载 双击下载好的安装…

网络安全检查漏洞内容回复 网络安全的漏洞

网络安全的核心目标是保障业务系统的可持续性和数据的安全性&#xff0c;而这两点的主要威胁来自于蠕虫的暴发、黑客的攻击、拒绝服务攻击、木马。蠕虫、黑客攻击问题都和漏洞紧密联系在一起&#xff0c;一旦有重大安全漏洞出现&#xff0c;整个互联网就会面临一次重大挑战。虽…

汽车智能钥匙中PKE低频天线的作用

PKE&#xff08;Passive Keyless Entry&#xff09;即被动式无钥匙进入系统&#xff0c;汽车智能钥匙中PKE低频天线在现代汽车的智能功能和安全保障方面发挥着关键作用&#xff0c;以下是其具体作用&#xff1a; 信号交互与身份认证 低频信号接收&#xff1a;当车主靠近车辆时…

uiautomatorviewer定位元素报Unexpected ... UI hierarchy

发现问题 借鉴博客 Unexpected error while obtaining UI hierarchy android app UI自动化-元素定位辅助工具 Unexpected error while obtaining UI hierarchy&#xff1a;使用uiautomatorviewer定位元素报错 最近在做安卓自动化,安卓自动化主要工作之一就是获取UI树 app端获…

通俗的方式解释“零钱兑换”问题

“零钱兑换”是一道经典的算法题目&#xff0c;其主要问题是&#xff1a;给定不同面额的硬币和一个总金额&#xff0c;求出凑成总金额所需的最少硬币个数。如果没有任何一种硬币组合能组成总金额&#xff0c;返回-1。 解题思路 动态规划&#xff1a;使用动态规划是解决零钱兑…

GBT32960 协议编解码器的设计与实现

GBT32960 协议编解码器的设计与实现 引言 在车联网领域&#xff0c;GBT32960 是一个重要的国家标准协议&#xff0c;用于新能源汽车与监控平台之间的数据交互。本文将详细介绍如何使用 Rust 实现一个高效可靠的 GBT32960 协议编解码器。 整体架构 编解码器的核心由三个主要组…