深度解析RocketMq源码-IndexFile

1.绪论

在工作中,我们经常需要根据msgKey查询到某条日志。但是,通过前面对commitLog分析,producer将消息推送到broker过后,其实broker是直接消息到达broker的先后顺序写入到commitLog中的。我们如果想根据msgKey检索一条消息无疑大海捞针,所以们需要像数集一样建立一个目录,我们其实可以想到的是构建一个Map,key存储msgKey,value存储msg在commitLog中的物理偏移量。而这个目录其实就是indexFile。

2.indexFile的组成和原理

indexFile主要由两部分组成,分别是indexFile文件头和index的文件内容。

2.1 indexFile文件头 - IndexHeader 

indexHeader占据40个字节,其中最重要的是他记录了整个索引文件的最开始插入的索引的时间和最后一条数据插入的时间,主要是为了支持根据时间进行范围搜索。以及第一条和最后一条日志的索引位置。还有一个就是已经插入了多少条索引IndexCount。

public class IndexHeader {
//index文件头占4个字节public static final int INDEX_HEADER_SIZE = 40;private static int beginTimestampIndex = 0;private static int endTimestampIndex = 8;private static int beginPhyoffsetIndex = 16;private static int endPhyoffsetIndex = 24;private static int hashSlotcountIndex = 32;private static int indexCountIndex = 36;private final ByteBuffer byteBuffer;//开始的时间戳private final AtomicLong beginTimestamp = new AtomicLong(0);//结束时间戳private final AtomicLong endTimestamp = new AtomicLong(0);//开始的物理偏移量private final AtomicLong beginPhyOffset = new AtomicLong(0);//结束的物理偏移量private final AtomicLong endPhyOffset = new AtomicLong(0);//hash槽的数量private final AtomicInteger hashSlotCount = new AtomicInteger(0);//index的数量private final AtomicInteger indexCount = new AtomicInteger(1);
}

2.2 indexFile的组成

idnexFile的内容包括:

1. 40个字节的indexFile头

2. 4* 500w个字节hash槽,每个槽记录的其实是:根据key取hash值%槽数在当前hash槽的索引的序号(也即当前有多少条索引)

3. 20*2000w个自己的索引数,每条索引20个字节,包含4个字节索引key的hash值+8个字节的物理偏移量+4个字节的当前索引的插入时间距离该索引文件第一条索引的插入时间的差值+4个字节的上一个在当前hash槽的索引的序号。

我们可以画图来描述一下:

e5b98bd4baf04c1c9393127f0a93ebca.png

可以看出idnexFile是采用链地址法解决hash冲突的,每个索引存储有上一条拥有相同hash值索引的index值,相当于通过链表将这些hash冲突的索引串起来。

public class IndexFile {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);//一个hash槽的大小为4个字节private static int hashSlotSize = 4;//一条索引的大小为20字节private static int indexSize = 20;private static int invalidIndex = 0;//hash槽的数量private final int hashSlotNum;//index的总数量private final int indexNum;//index也是存储在mappedFile中的private final MappedFile mappedFile;private final MappedByteBuffer mappedByteBuffer;//index文件的头private final IndexHeader indexHeader;public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,final long endPhyOffset, final long endTimestamp) throws IOException {//文件总大小 = 头部所占40个字节 + hash槽数量(默认为500w) * 4个字节 + index数量 * 20个字节int fileTotalSize =IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);//新建mappedFilethis.mappedFile = new MappedFile(fileName, fileTotalSize);//获取到与文件建立映射关系的bufferthis.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();//hash槽数量this.hashSlotNum = hashSlotNum;//索引文件的数量this.indexNum = indexNum;ByteBuffer byteBuffer = this.mappedByteBuffer.slice();//index文件的头部this.indexHeader = new IndexHeader(byteBuffer);if (endPhyOffset > 0) {//够级整个索引文件的开始的物理偏移量和结束的偏移量this.indexHeader.setBeginPhyOffset(endPhyOffset);this.indexHeader.setEndPhyOffset(endPhyOffset);}if (endTimestamp > 0) {//够级整个索引文件的开始时间戳和结束时间戳this.indexHeader.setBeginTimestamp(endTimestamp);this.indexHeader.setEndTimestamp(endTimestamp);}}
}

3.向indexFile插入一条索引数据

主要的步骤如下:

1.获取msgKey的hash值;

2.通过hash值对总的hash槽数取模得到对应第几个槽;

3.40个字节的index头大小+第几个槽*4个字节寻得hash槽的物理地址;

4.40个字节的索引头大小+hash槽总数*4个字节+现在存储了多少条索引*20个字节得到最新一条数据写入的物理偏移量;

5.分别写入索引内容:hash值,commitLog的物理偏移量,距离第一条索引的时间戳+上一条指向同一个hash槽的索引的序号(也即当前hash槽中存储的值);

6.将最新一条的索引序号写入到hash槽中。

    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {if (this.indexHeader.getIndexCount() < this.indexNum) {//1.获取msgKey的hash值int keyHash = indexKeyHashMethod(key);//2.通过hash值对总的hash槽数取模得到对应第几个槽int slotPos = keyHash % this.hashSlotNum;//3.40个字节的index头大小+第几个槽*4个字节寻得hash槽的物理地址int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;try {//获取到上一个hash槽的所指向的索引序号int slotValue = this.mappedByteBuffer.getInt(absSlotPos);if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {slotValue = invalidIndex;}//获取当前索引与第一条索引的差值long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();timeDiff = timeDiff / 1000;if (this.indexHeader.getBeginTimestamp() <= 0) {timeDiff = 0;} else if (timeDiff > Integer.MAX_VALUE) {timeDiff = Integer.MAX_VALUE;} else if (timeDiff < 0) {timeDiff = 0;}//40个字节的索引头大小+hash槽总数*4个字节+现在存储了多少条索引*20个字节得到最新一条数据写入的物理偏移量int absIndexPos =IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize+ this.indexHeader.getIndexCount() * indexSize;//分别写入索引内容:hash值,commitLog的物理偏移量,距离第一条索引的时间戳+上一条指向同一个hash槽的索引的序号this.mappedByteBuffer.putInt(absIndexPos, keyHash);this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);//将最新一条的索引序号写入到hash槽中this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());//更新idnex中的最后一条索引的时间戳和物理偏移量if (this.indexHeader.getIndexCount() <= 1) {this.indexHeader.setBeginPhyOffset(phyOffset);this.indexHeader.setBeginTimestamp(storeTimestamp);}if (invalidIndex == slotValue) {this.indexHeader.incHashSlotCount();}   //增加indexheader索引序号this.indexHeader.incIndexCount();this.indexHeader.setEndPhyOffset(phyOffset);this.indexHeader.setEndTimestamp(storeTimestamp);return true;} catch (Exception e) {log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);}} else {log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()+ "; index max num = " + this.indexNum);}return false;}

4.从indexFile中读取一条索引数据

1.获取索引key的hash值;

2.hash值对槽总数取模获得第几个槽;

3.40个字节的index头大小+第几个槽*4个字节寻得hash槽的物理地址;

4.从槽中读取到该槽所指向的最新的一条索引序号;

5.40个字节的索引头大小+hash槽总数*4个字节+hash槽中存储的索引序号*20个字节得到最新一条数据写入的物理偏移量;

6.如果hash值相等,并且时间匹配,证明匹配到数据,跳出循环;

7.如果不匹配,便根据链表寻找到拥有相同hash值并且时间匹配的日志;

    public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,final long begin, final long end) {if (this.mappedFile.hold()) {//获取索引key的hash值int keyHash = indexKeyHashMethod(key);//hash值对槽总数取模获得第几个槽int slotPos = keyHash % this.hashSlotNum;//40个字节的index头大小+第几个槽*4个字节寻得hash槽的物理地址int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;try {//从槽中读取到该槽所指向的最新的一条索引序号int slotValue = this.mappedByteBuffer.getInt(absSlotPos);if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()|| this.indexHeader.getIndexCount() <= 1) {} else {for (int nextIndexToRead = slotValue; ; ) {if (phyOffsets.size() >= maxNum) {break;}// 40个字节的索引头大小+hash槽总数*4个字节+hash槽中存储的索引序号*20个字节得到最新一条数据写入的物理偏移量int absIndexPos =IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize+ nextIndexToRead * indexSize;//获取索引的hash值int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);//获取到该索引的物理偏移量long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);//获取到时间戳差值long timeDiff = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);//获取到拥有相同槽数的上一条索引序号int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);if (timeDiff < 0) {break;}timeDiff *= 1000L;long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;boolean timeMatched = (timeRead >= begin) && (timeRead <= end);//如果hash值相等,并且时间匹配,证明匹配到数据,跳出循环if (keyHash == keyHashRead && timeMatched) {phyOffsets.add(phyOffsetRead);}//如果上一条索引非法,证明已经到达链表头部,跳出循环,证明该条索引就是需要寻找的索引if (prevIndexRead <= invalidIndex|| prevIndexRead > this.indexHeader.getIndexCount()|| prevIndexRead == nextIndexToRead || timeRead < begin) {break;}nextIndexToRead = prevIndexRead;}}} catch (Exception e) {log.error("selectPhyOffset exception ", e);} finally {this.mappedFile.release();}}}

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/36691.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Zookeeper:基于Zookeeper的分布式锁

一、Zookeeper分布式锁原理 二、Zookeeper JavaAPI操作 1、Curator介绍 Curator是Apache Zookeeper的Java客户端。常见的Zookeeper Java API&#xff1a; 原生Java API。ZkClient。Curator。 Curator项目目标是简化Zookeeper客户端的使用。Curator最初是Netfix研发的&#xf…

LIMS系统选型时应该避免哪些误区呢

LIMS实验室管理系统在选型、实施及使用过程中&#xff0c;确实存在一些常见的误区。以下是对这些误区的详细解析和归纳&#xff1a; 一、误区 1、只关注功能而忽视用户需求 在LIMS系统的选型过程中&#xff0c;实验室可能过于关注系统的功能和技术特性&#xff0c;而忽视了实…

42.option方法给服务端和客户端配置参数

客户端是Bootstrap.option方法配置参数。 服务端有两个: 1.ServerBootstrap.option方法,给ServerSocketChannel配置参数的。 2.ServerBootstrap.childOption方法,给SocketChannel配置参数的。 package com.xkj.client;import com.xkj.message.*; import com.xkj.protoco…

K近邻回归原理详解及Python代码示例

K近邻回归原理详解 K近邻回归&#xff08;K-Nearest Neighbors Regression, KNN&#xff09;是一种基于实例的学习算法&#xff0c;用于解决回归问题。它通过找到输入数据点在特征空间中最相似的K个邻居&#xff08;即最近的K个数据点&#xff09;&#xff0c;并使用这些邻居的…

C++11的可变参数模板

可变参数模板 什么是可变参数模板的可变参数展开参数包emplace系列函数引例emplace系列函数 什么是可变参数 printf和scanf中就涉及可变参数 这里三个点就代表可变参数&#xff0c;意思就是不管你传多少个参数&#xff0c;都可以接收 printf("%d",x); printf("…

狼牙山短视频:成都柏煜文化传媒有限公司

狼牙山短视频&#xff1a;记录自然与历史的交融 随着短视频的兴起&#xff0c;我们得以在短短几分钟内&#xff0c;跨越千山万水&#xff0c;领略世界各地的风情。成都柏煜文化传媒有限公司 而今天&#xff0c;我想带大家走进一个独特的地方——狼牙山&#xff0c;通过一系列短…

centos挂载新的磁盘

如果是vmware的话&#xff0c;在管理界面&#xff0c;为虚拟机创建一个新的磁盘&#xff0c;然后需要给这个磁盘分区 https://juejin.cn/post/6987200157733371935 1、执行如下命令&#xff0c;查询磁盘分区的UUID。 blkid 磁盘分区 以查询磁盘分区“/dev/vdb1”的UUID为例&am…

Spring Boot中的异常处理策略

Spring Boot中的异常处理策略 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01;今天我们将探讨在Spring Boot应用程序中如何有效地处理异常&#xff0c;保证系统的…

Transformer教程之Transformer的历史背景

在现代人工智能领域&#xff0c;Transformer模型已经成为一种不可或缺的技术&#xff0c;它在自然语言处理&#xff08;NLP&#xff09;和计算机视觉等多个领域取得了巨大的成功。本文将带你回顾Transformer的历史背景&#xff0c;了解它是如何从最初的构想到今天的广泛应用的。…

英国Essay写作攻略怎么才算详细?

写Essay是所有英国留学生都要面对的&#xff0c;很多刚到英国的留学生为了拿到一个完美的成绩单&#xff0c;都会选择找人Essay写作&#xff0c;从而拿到高分。可是你有没有想过&#xff0c;为什么Essay写作能拿高分&#xff0c;而自己写的Essay一直在及格线边上徘徊&#xff0…

Element-UI表单验证 二选一、三选一、多选一验证

Element-UI表单验证二选一验证 在表单提交过程中很多时候要用到几个表单项二选一验证或多选一验证&#xff0c;比如联系方式中的手机号和固定电话只需要填写一项就可通过验证&#xff0c;针对这样情况可以使用Element-UI的自定义验证实现&#xff0c;具体实现方法如下。 HTML…

Web渗透:文件包含漏洞

Ⅱ.远程文件包含 远程文件包含漏洞&#xff08;Remote File Inclusion, RFI&#xff09;是一种Web应用程序漏洞&#xff0c;允许攻击者通过URL从远程服务器包含并执行文件&#xff1b;RFI漏洞通常出现在动态包含文件的功能中&#xff0c;且用户输入未经适当验证和过滤。接着我…

520. 检测大写字母 Easy

我们定义&#xff0c;在以下情况时&#xff0c;单词的大写用法是正确的&#xff1a; 全部字母都是大写&#xff0c;比如 "USA" 。 单词中所有字母都不是大写&#xff0c;比如 "leetcode" 。 如果单词不只含有一个字母&#xff0c;只有首字母大写&#xff0…

生产者发送数据,kafka服务器接收数据异常的问题记录

现象&#xff1a; 某个客户要求审计日志用kafka的方式传输给他们&#xff0c;使用了第三方的librdkafka库来开发。 往客户提供的kafka服务器上的一个topic发送数据&#xff0c;这个topic有三个分区&#xff0c;客户反馈接收到的数据和发送端发送的实际数量对不上&#xff0c;他…

代码随想录第五十二天打卡

647. 回文子串 动态规划解决的经典题目&#xff0c;如果没接触过的话&#xff0c;别硬想 直接看题解。 代码随想录 class Solution { public:int countSubstrings(string s) {vector<vector<bool>>dp(s.size(),vector<bool>(s.size(),false));int res0;for …

使用VMware创建Ubuntu 24.04【一】

相关链接下载地址 VMware https://www.vmware.com/content/vmware/vmware-published-sites/cn/products/workstation-pro/workstation-pro-evaluation.html.html.html Ubuntu 24.04 LTS https://cn.ubuntu.com/download/desktop 虚拟机创建 1、打开VNware软件&#xff0c;点…

5.9k!一款清新好用的后台管理系统!【送源码】

今天给大家分享的开源项目是一个优雅清新后台管理系统——Soybean Admin。 简介 官方是这样介绍这个项目的&#xff1a; Soybean Admin 使用的是Vue3作为前端框架&#xff0c;TypeScript作为开发语言&#xff0c;同时还整合了NaiveUI组件库&#xff0c;使得系统具有高可用性和…

Vue 3 的 <script setup> 语法糖中的e

在 Vue 3 的 <script setup> 语法糖中&#xff0c;可以通过直接在模板的事件监听器中访问事件对象&#xff08;通常命名为 e 或 event&#xff09;来传递它到方法。 以下是一个简单的例子&#xff0c;展示了如何在 Vue 3 的 <script setup> 中获取到 mousemove 事…

Windows 环境下 MySQL Server 清空 Log 文件命令

MySQL Community Server for Windows 下载&#xff1a; MySQL :: Download MySQL Community Server 清空 Log 文件步骤命令&#xff08;MySQL Server 8.0 为例&#xff09;&#xff1a; 1.杀掉 MySQL Workbench 进程 2.删除路径文件 C:\ProgramData\MySQL\MySQL Server 8.0\Dat…

基于YOLOv5+pyqt5的口罩佩戴检测系统(PyQT页面+YOLOv5模型+数据集)

简介 在各种工作环境和公共场所,确保人们正确佩戴口罩对个人防护和公共卫生至关重要,尤其是在医疗设施、制造业车间和拥挤的公共交通中。为了满足这一需求,我们开发了一种基于YOLOv5目标检测模型的口罩佩戴检测系统。本项目不仅实现了高精度的口罩佩戴检测,还设计了一个可…