【RocketMQ】RocketMq之IndexFile深入研究

一:RocketMq 整体文件存储介绍

存储⽂件主要分为三个部分:
  • CommitLog:存储消息的元数据。所有消息都会顺序存⼊到CommitLog⽂件当中。CommitLog由多个⽂件组成,每个⽂件固定⼤⼩1G。以第⼀条消 息的偏移量为⽂件名。
  • ConsumerQueue:存储消息在CommitLog的索引。⼀个MessageQueue⼀个⽂件,记录当前MessageQueue被哪些消费者组消费到了哪⼀条CommitLog。
  • IndexFile:为了消息查询提供了⼀种通过key或时间区间来查询消息的⽅法,这种通过IndexFile来查找消息的⽅法不影响发送与消费消息的主流程。

这篇文章主要介绍IndexFile的研究,以rocketmq5.3.0版本作为研究。

二:IndexFile的文件结构

文件整理格式,如下图2-1所示

                                                图2-1 IndexFile 文件结构图


IndexFile 文件格式

  • 文件名:以时间戳命名(例如 20240301120000000),表示该文件索引的消息的时间范围。

  • 文件大小:默认为 400MB,可通过 maxIndexSize 配置调整。

  • 存储路径:默认在 ~/store/index 目录下。

每个 IndexFile 文件由三部分组成:
1. 文件头部(Header)
2. 哈希槽(Hash Slot)区域
3. 索引条目(Index Entry)区域


1. 文件头部(Header)

字段名

长度(字节)

说明

beginTimestamp

8

索引文件覆盖的最小时间戳(消息存储时间)

endTimestamp

8

索引文件覆盖的最大时间戳(消息存储时间)

beginPhyOffset

8

索引文件对应的最小物理偏移量(CommitLog 中的起始位置)

endPhyOffset

8

索引文件对应的最大物理偏移量(CommitLog 中的结束位置)

hashSlotCount

4

哈希槽数量(固定为 5,000,000)

indexCount

4

当前已写入的索引条目数量


2. 哈希槽(Hash Slot)区域

  • 哈希槽数量:固定为 500 万个(5,000,000),每个哈希槽占 4 字节

  • 哈希函数:对消息的 Key(如 UNIQ_KEYKEYS)进行哈希计算,得到槽位索引:
    slotPos = abs(hash(key)) % 5000000

每个哈希槽存储的是 索引条目区域 的起始位置(索引条目链表的头节点)。


3. 索引条目(Index Entry)区域

每个索引条目占 20 字节,包含以下字段:

字段名

长度(字节)

说明

keyHash

4

消息 Key 的哈希值(用于快速比对)

phyOffset

8

消息在 CommitLog 中的物理偏移量

timeDiff

4

消息存储时间与文件头部 beginTimestamp 的时间差(秒级)

slotValue

4

下一个索引条目的位置(用于解决哈希冲突的链表结构)


 三:IndexFile 写入和查询流程

IndexFile 写入流程:

+---------------------+
| Producer 发送消息     |
+---------------------+|v
+---------------------+
| 提取消息的 Key        | --> 如 UNIQ_KEY 或 KEYS 属性
+---------------------+|v
+---------------------+
| 检查 IndexFile 容量   | --> 是否已满?(indexCount >= indexNum)
+---------------------+| 是v
+---------------------+
| 返回 false,写入失败   |
+---------------------+| 否v
+---------------------+
| 计算 Key 的哈希值     | --> `keyHash = indexKeyHashMethod(key)`
+---------------------+|v
+---------------------+
| 计算哈希槽位置         | --> `slotPos = keyHash % hashSlotNum`
+---------------------+|v
+---------------------+
| 计算哈希槽绝对位置      | --> `absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize`
+---------------------+|v
+---------------------+
| 读取哈希槽的当前值      | --> `slotValue = mappedByteBuffer.getInt(absSlotPos)`
+---------------------+|v
+---------------------+
| 校验 slotValue 有效性  | --> 是否无效?(slotValue <= invalidIndex || slotValue > indexCount)
+---------------------+| 是v
+---------------------+
| 将 slotValue 设为无效   | --> `slotValue = invalidIndex`
+---------------------+| 否v
+---------------------+
| 计算时间差 (timeDiff)  | --> `timeDiff = (storeTimestamp - beginTimestamp) / 1000`
+---------------------+|v
+---------------------+
| 处理 timeDiff 边界值   | --> 确保 `0 <= timeDiff <= Integer.MAX_VALUE`
+---------------------+|v
+---------------------+
| 计算索引条目绝对位置     | --> `absIndexPos = IndexHeader.INDEX_HEADER_SIZE + hashSlotNum * hashSlotSize + indexCount * indexSize`
+---------------------+|v
+---------------------+
| 写入索引条目内容        |
| - keyHash            |
| - phyOffset          |
| - timeDiff           |
| - slotValue (nextIndex)|
+---------------------+|v
+---------------------+
| 更新哈希槽指向新条目     | --> `mappedByteBuffer.putInt(absSlotPos, indexCount)`
+---------------------+|v
+---------------------+
| 更新 IndexFile 头部信息 |
| - 若 indexCount <= 1,更新 beginPhyOffset 和 beginTimestamp |
| - 若 slotValue 无效,增加 hashSlotCount |
| - 增加 indexCount      |
| - 更新 endPhyOffset 和 endTimestamp |
+---------------------+|v
+---------------------+
| 返回 true,写入成功     |
+---------------------+|v
+---------------------+
| IndexFile 是否已满?   | -- 是 --> 创建新 IndexFile
| (文件大小 ≥ 400MB)    |
+---------------------+  

源码入口:org.apache.rocketmq.store.index.IndexFile#putKey

IndexFile 查询流程:

+---------------------+
| Consumer 根据 Key 查询 |
+---------------------+|v
+---------------------+
| 计算 Key 的哈希值     | --> `keyHash = Math.abs(key.hashCode())`
+---------------------+|v
+---------------------+
| 计算哈希槽位置         | --> `slotPos = keyHash % 5,000,000`
+---------------------+|v
+---------------------+
| 读取哈希槽的链表头位置   | --> `slotValue = mappedByteBuffer.getInt(slotPos * 4)`
+---------------------+|v
+---------------------+  
| 遍历链表条目           |
| while (slotValue > 0)|
+---------------------+|v
+---------------------+
| 读取索引条目:         |
| - keyHashRead       |
| - phyOffset         |
| - timeDiff          |
| - nextIndex         |
+---------------------+|v
+---------------------+
| 检查时间范围是否匹配?   | --> `storeTime = beginTimestamp + timeDiff * 1000`
| (storeTime ∈ [begin, end]?)|
+---------------------+| 否|------------------> 跳过,继续下一个条目| 是v
+---------------------+
| 比对 keyHashRead 和 keyHash |
| (是否相等?)          |
+---------------------+| 否|------------------> 跳过,继续下一个条目| 是v
+---------------------+
| 从 CommitLog 读取实际 Key |
| (检查 Key 是否一致?)    |
+---------------------+| 否|------------------> 跳过,继续下一个条目| 是v
+---------------------+
| 返回 phyOffset       | --> 添加到结果列表
+---------------------+|v
+---------------------+
| slotValue = nextIndex| --> 继续遍历下一个条目
+---------------------+|v
+---------------------+
| 遍历结束,返回结果列表   |
+---------------------+

源码入口:org.apache.rocketmq.store.index.IndexService#queryOffset

四:IndexFile解决hash冲突问题思想

RocketMQ 的 IndexFile 通过 链地址法(Chaining) 解决哈希冲突问题,其核心思想是将哈希到同一槽位的多个索引条目组织成链表结构,并通过哈希槽(Hash Slot)与索引条目(Index Entry)的关联实现高效写入和查询。以下是具体实现思想及关键设计:


1. 哈希冲突的背景

  • 哈希冲突:不同 Key 经过哈希函数计算后可能得到相同的哈希值,导致被分配到同一个哈希槽。

  • 问题:若不处理冲突,后续 Key 的索引会覆盖已有数据,导致查询结果错误。


2. 解决冲突的核心思想:链地址法

RocketMQ 的 IndexFile 采用 单链表 结构管理同一哈希槽下的所有冲突条目,具体流程如下:

(1) 写入时的链表插入

  • 新条目插入链表头部
    当新 Key 的哈希值与某槽位已有条目冲突时,新条目会被插入链表头部,并更新哈希槽指针指向新条目。

    // 新条目的 nextIndex 指向原头节点
    this.mappedByteBuffer.putInt(absIndexPos + 16, slotValue);
    // 更新哈希槽指针为新条目位置
    this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
     
    • 优势:插入时间复杂度为 O(1),无需遍历链表。

(2) 查询时的链表遍历

  • 遍历链表比对 Key
    查询时,从哈希槽指向的链表头节点开始,依次遍历所有条目,通过两次比对(哈希值 + 实际 Key)过滤冲突。

    while (nextIndexToRead > 0) {// 1. 读取条目内容int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);// 2. 比对哈希值if (keyHashRead == keyHash) {// 3. 从 CommitLog 读取实际 Key 比对String keyStored = readKeyFromCommitLog(phyOffsetRead);if (key.equals(keyStored)) {phyOffsets.add(phyOffsetRead);}}// 4. 移动到下一个节点nextIndexToRead = prevIndexRead;
    }

3. 关键设计优化

(1) 哈希槽数量固定

  • 默认 500 万个哈希槽

    private static final int HASH_SLOT_NUM = 5000000; // 默认槽数
    • 目的:通过大量槽位减少哈希冲突的概率,使冲突链表尽可能短。

    • 权衡:槽数过多会占用更多内存,但查询效率更高。

(2) 时间范围过滤

  • 索引条目存储时间差(timeDiff)
    每个索引条目记录消息存储时间与 IndexFile 起始时间的差值(秒级),查询时快速过滤掉不满足时间范围的条目。

    long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff * 1000L;
    if (timeRead < begin || timeRead > end) {continue; // 跳过不符合时间条件的条目
    }
    • 优势:减少无效条目的遍历,提升查询性能。

(3) 文件滚动(Rolling)

  • 按时间或大小滚动
    IndexFile 文件默认大小上限为 400MB,或时间跨度超过阈值时,创建新文件。

    • 目的:避免单个文件过大导致链表过长,同时支持按时间范围快速定位文件。

4. 示例场景

写入冲突场景

  • Key1: Ea#20231001123456 → 哈希值 19583063 → 槽位 18332292

  • Key2: FB#20231001123456 → 哈希值 19583063 → 槽位 18332292(冲突)

  • 处理流程

    1. Key1 写入槽位 18332292,链表头指向 Key1。

    2. Key2 写入时,插入链表头部,槽位指针更新为 Key2,Key2 的 nextIndex 指向 Key1。

查询冲突场景

  • 查询 Key: Ea#20231001123456

    1. 哈希计算定位到槽位 18332292。

    2. 遍历链表:

      • 先读取 Key2(哈希值匹配但 Key 不匹配,跳过)。

      • 再读取 Key1(哈希值 + Key 均匹配,返回 phyOffset)。

hash冲突代码调试示例

 public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("producerGroup");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();Message msg = new Message("Ea", "TagA" , ("消息1").getBytes(RemotingHelper.DEFAULT_CHARSET));msg.setKeys("20231001123456");producer.sendOneway(msg);Message msg2 = new Message("FB", "TagA" , ("消息3").getBytes(RemotingHelper.DEFAULT_CHARSET));msg2.setKeys("20231001123456");producer.sendOneway(msg2);producer.shutdown();}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/70021.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

注解与反射基础

注解 概述 注解&#xff08;Annotation&#xff09;&#xff0c;从jdk5.0引入。 作用 不是程序本身&#xff0c;可以对程序作出解释&#xff08;这一点和注释没什么区别&#xff09;可以被其他程序读取 格式 注释是以“注释名”在代码中存在的&#xff0c;还可以添加一些…

WebSocket——环境搭建与多环境配置

一、前言&#xff1a;为什么要使用多环境配置&#xff1f; 在开发过程中&#xff0c;我们通常会遇到多个不同的环境&#xff0c;比如开发环境&#xff08;Dev&#xff09;、测试环境&#xff08;Test&#xff09;、生产环境&#xff08;Prod&#xff09;等。每个环境的配置和需…

SliverAppBar的功能和用法

文章目录 1 概念介绍2 使用方法3 示例代码 我们在上一章回中介绍了SliverGrid组件相关的内容&#xff0c;本章回中将介绍SliverAppBar组件.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1 概念介绍 我们在本章回中介绍的SliverAppBar和普通的AppBar类似&#xff0c;它们的…

BFS(广度优先搜索)——搜索算法

BFS&#xff0c;也就是广度&#xff08;宽度&#xff09;优先搜索&#xff0c;二叉树的层序遍历就是一个BFS的过程。而前、中、后序遍历则是DFS&#xff08;深度优先搜索&#xff09;。从字面意思也很好理解&#xff0c;DFS就是一条路走到黑&#xff0c;BFS则是一层一层地展开。…

【Java基础-42.3】Java 基本数据类型与字符串之间的转换:深入理解数据类型的转换方法

在 Java 开发中&#xff0c;基本数据类型与字符串之间的转换是非常常见的操作。无论是从用户输入中读取数据&#xff0c;还是将数据输出到日志或界面&#xff0c;都需要进行数据类型与字符串之间的转换。本文将深入探讨 Java 中基本数据类型与字符串之间的转换方法&#xff0c;…

数据库 - Sqlserver - SQLEXPRESS、由Windows认证改为SQL Server Express认证进行连接 (sa登录)

本文讲SqlServer Express版本在登录的时候&#xff0c; 如何由Windows认证&#xff0c;修改为Sql Server Express认证。 目录 1&#xff0c;SqlServer Express的Windows认证 2&#xff0c;修改为混合认证 3&#xff0c;启用sa 用户 4&#xff0c;用sa 用户登录 下面是详细…

机器学习--学习计划

3周机器学习速成计划 基于「28原则」&#xff0c;聚焦机器学习20%的核心概念&#xff0c;覆盖80%的常见应用场景。计划分为 理论学习 项目实战&#xff0c;每周学习后通过5个递进项目巩固知识。 &#x1f4c5; 第1周&#xff1a;数据与监督学习基础 学习目标&#xff1a;掌握…

CNN的各种知识点(四): 非极大值抑制(Non-Maximum Suppression, NMS)

非极大值抑制&#xff08;Non-Maximum Suppression, NMS&#xff09; 1. 非极大值抑制&#xff08;Non-Maximum Suppression, NMS&#xff09;概念&#xff1a;算法步骤&#xff1a;具体例子&#xff1a;PyTorch实现&#xff1a; 总结&#xff1a; 1. 非极大值抑制&#xff08;…

GWO优化SVM回归预测matlab

灰狼优化算法&#xff08;Grey Wolf Optimizer&#xff0c;简称 GWO&#xff09;&#xff0c;是由澳大利亚格里菲斯大学的 Mirjalii 等人于 2014 年提出的群智能优化算法。该算法的设计灵感源自灰狼群体的捕食行为&#xff0c;核心思想是对灰狼社会的结构与行为模式进行模仿。 …

elasticsearch8.15 高可用集群搭建(含认证Kibana)

文章目录 1.资源配置2.系统参数优化3.JDK17安装4.下载&安装ES 8.155.生成ES的证书(用于ES节点之间进行安全数据传输)6.修改ES 相关配置文件7.创建es用户并启动8.配置ES的账号和密码(用于ES服务端和客户端)9.下载和安装Kibana10.编辑Kibana配置文件11.启动Kiabana12.访问Kia…

地址查询API接口:高效查询地址信息,提升数据处理效率

地址查询各省市区API接口 地址查询是我们日常生活中经常遇到的一个需求&#xff0c;无论是在物流配送、地图导航还是社交网络等应用中&#xff0c;都需要通过地址来获取地理位置信息。为了满足这个需求&#xff0c;我们可以使用地址查询API接口来高效查询地址信息&#xff0c;提…

3、C#基于.net framework的应用开发实战编程 - 实现(三、三) - 编程手把手系列文章...

三、 实现&#xff1b; 三&#xff0e;三、编写应用程序&#xff1b; 此文主要是实现应用的主要编码工作。 1、 分层&#xff1b; 此例子主要分为UI、Helper、DAL等层。UI负责便签的界面显示&#xff1b;Helper主要是链接UI和数据库操作的中间层&#xff1b;DAL为对数据库的操…

leetcode解题思路分析(一百六十三)1409 - 1415 题

查询带键的排列 给定一个正整数数组 queries &#xff0c;其取值范围在 1 到 m 之间。 请你根据以下规则按顺序处理所有 queries[i]&#xff08;从 i0 到 iqueries.length-1&#xff09;&#xff1a; 首先&#xff0c;你有一个排列 P[1,2,3,…,m]。 对于当前的 i &#xff0c;找…

【自学笔记】GitHub的重点知识点-持续更新

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 GitHub使用指南详细知识点一、GitHub基础与账户管理1. GitHub简介2. 创建与管理GitHub账户3. 创建与配置仓库&#xff08;Repository&#xff09; 二、Git基础与Git…

vscode软件操作界面UI布局@各个功能区域划分及其名称称呼

文章目录 abstract检查用户界面的主要区域官方文档关于UI的介绍 abstract 检查 Visual Studio Code 用户界面 - Training | Microsoft Learn 本质上&#xff0c;Visual Studio Code 是一个代码编辑器&#xff0c;其用户界面和布局与许多其他代码编辑器相似。 界面左侧是用于访…

类和对象(下)——类型转化 static成员 内部类 匿名对象 拷贝对象优化

一、类型转换 1.1 类型转化特点 C支持内置类型隐式类型转换为类类型对象&#xff0c;需要有相关内置类型为参数的构造函数。构造函数前面加explicit就不再支持隐式类型转换。类类型的对象之间也可以隐式转换&#xff0c;需要相应的构造函数支持 内置类型转换为类类型对象&#…

基于场景图的零样本目标导航

参考论文&#xff1a;SG-Nav&#xff1a;Online 3D Scene Graph Prompting for LLM-based Zero-shot Object Navigation 0 前言 基于现成的视觉基础模型VFMs和大语言模型LLM构建了无需任何训练的零样本物体巡航框架SG-Nav。 通过VLMs将机器人对场景的观测构建为在线的3D场景图…

深入解析 clone():高效的进程与线程创建方法(中英双语)

深入解析 clone()&#xff1a;高效的进程与线程创建方法 1. 引言 在 Unix/Linux 系统中&#xff0c;传统的进程创建方式是 fork()&#xff0c;它会复制父进程的地址空间来创建子进程。然而&#xff0c;fork() 复制的资源往往会被 exec() 立即替换&#xff0c;这会导致额外的内…

开屏广告-跳过神器

给大家介绍一款超实用的软件——SKIP&#xff0c;它堪称李跳跳的最佳平替&#xff01;这款软件已经在Github开源免费&#xff0c;完全无需担心内置源问题&#xff0c;也无需导入任何规则。安装完成后&#xff0c;即可直接使用&#xff0c;非常便捷&#xff01; 首次打开软件时…

大模型本地化部署(Ollama + Open-WebUI)

文章目录 环境准备下载Ollama模型下载下载Open-WebUI 本地化部署的Web图形化界面本地模型联网查询安装 Docker安装 SearXNG本地模型联网查询 环境准备 下载Ollama 下载地址&#xff1a;Ollama网址 安装完成后&#xff0c;命令行里执行命令 ollama -v查看是否安装成功。安装成…