【RocketMQ 存储】- 同步刷盘服务 GroupCommitService

文章目录

  • 1. 前言
  • 2. 参数
  • 3. 队列相关
  • 4. 核心逻辑 run
    • 4.1 waitForRunning
    • 4.2 doCommit
    • 4.3 flush
  • 5. 小结


本文章基于 RocketMQ 4.9.3

1. 前言

RocketMQ 存储部分系列文章:

  • 【RocketMQ 存储】- RocketMQ存储类 MappedFile
  • 【RocketMQ 存储】- 一文总结 RocketMQ 的存储结构-基础
  • 【RocketMQ 存储】- broker 端存储单条消息的逻辑
  • 【RocketMQ 存储】- broker 端存储批量消息的逻辑
  • 【RocketMQ 存储】- 同步刷盘和异步刷盘

上一篇文章中,我们解析了消息存储服务里面的同步刷盘和异步刷盘,同时也解析了 FlushDiskWatcher 同步刷盘监视器的逻辑,那这篇文章开始就要解析 GroupCommitService 服务了。


2. 参数

GroupCommitService 是 FlushCommitLogService 的子类,而 FlushCommitLogService 是 ServiceThread 的子类。那么就说明了 GroupCommitService 也是一个线程服务,核心逻辑就是里面的 run 方法。

abstract class FlushCommitLogService extends ServiceThread {protected static final int RETRY_TIMES_OVER = 10;
}

RETRY_TIMES_OVER 是 FlushCommitLogService 里面的参数,看参数名意思就是最大重试次数,因为 Commit 和 Flush 都不是一定能成功的,所以这里就是失败的重试次数。

/*** GroupCommit Service* 同步刷盘服务,也就是说消息需要存储完之后才返回结果*/
class GroupCommitService extends FlushCommitLogService {...
}

好了,上面就是这个类的继承关系,下面来看下这个类里面的属性。

// 添加消息的时候如果需要刷盘就会提交一个刷盘请求到 requestsWrite 写队列中
private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
// 读队列,和上面的写队列配合实现读写分离,putRequest 写入的刷盘请求会被交换到 requestsRead 里面,后续被 doCommit 调用
private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();// 同步锁
private final PutMessageSpinLock lock = new PutMessageSpinLock();
  • requestsWrite:写队列,添加消息的时候如果需要刷盘就会提交一个刷盘请求到 requestsWrite 写队列中。
  • requestsRead:读队列,和上面的写队列配合实现读写分离,putRequest 写入的刷盘请求会被交换到 requestsRead 里面,后续被 doCommit 调用。
  • lock:同步锁,交换读写队列和添加请求到 requestsWrite 里面的时候会调用。

RocketMQ 在同步刷盘的时候使用了读写队列来完成,那么为什么要用读写队列呢,这其实也是读写分离的一种思想。

当消息添加到 ByteBuffer 之后会提交一个刷盘请求到 requestsWrite 写队列中。GroupCommitRequest 每隔 10ms 执行一次刷盘。阻塞的时候首先会交换一下读写队列,那我们知道,GroupCommitRequest 是用来处理 requestsRead 里面的消息的,当处理完阻塞,这时候 requestsRead 已经清空了,接着就交换 requestsReadrequestsWrite 队列,然后让刷盘请求接着往 requestsWrite 里面写入,流程如下:

  1. GroupCommitService 服务启动。
  2. 调用 waitForRunning 方法阻塞等待,在等待期间请求往 requestsWrite 里面写入,阻塞完毕之后交换读写队列,也就是说相当于 requestsWrite 里面的请求移动到了 requestsRead 队列,而这时候 requestsWrite 又清空了(注意我这里说是请求移动,但是其实是直接交换指针,因为这样交换效率高)。
  3. 当阻塞结束,就调用 doCommit 方法处理 requestsRead 里面的请求,注意哈,当 GroupCommitService 调用 doCommit 方法的时候,其他刷盘请求继续添加到 requestsWrite 集合里面。
  4. 请求处理完后,继续调用 waitForRunning 阻塞,注意这时候刷盘请求还是继续添加到 requestsWrite 集合里面。
  5. 接着阻塞完成,交换读写队列,调用 doCommit 方法处理 requestsRead 里面的请求。
  6. 继续循环上面的过程 …

当然上面是最简单的外层逻辑,但是实际的实现要比这复杂一点,比如当 GroupCommitService 处理 requestsRead 期间又有刷盘请求写入了,这时候还需要阻塞吗?显然是不需要的,所以这就需要考虑如何控制整个流程了。

好了,上面就是 GroupCommitService 里面的几个参数,那么带着疑问来看下同步服务里面的方法。


3. 队列相关

首先是几个跟队列相关的方法,包括往队列里面添加刷盘请求,交换读写队列。

/*** 交换读写队列*/@Overrideprotected void onWaitEnd() {this.swapRequests();}

onWaitEnd 方法的逻辑是交换读写队列,这个方法的本意其实是线程 wait 等待结束之后调用的,不同的实现类不同,对于 GroupCommitService 来说就是交换读写队列。

/*** 交换读写队列*/
private void swapRequests() {// 加锁lock.lock();try {LinkedList<GroupCommitRequest> tmp = this.requestsWrite;this.requestsWrite = this.requestsRead;this.requestsRead = tmp;} finally {// 解锁lock.unlock();}
}

这里面就是负责交换读写队列,交换之前需要先加锁。下面是往队列里面添加请求的方法。当请求添加到集合之后就唤醒同步刷盘线程来进行刷盘。

/*** 添加刷盘请求到 requestsWrite 中,GroupCommitRequest 说明是组提交,也就是刷盘是批量处理请求的* @param request*/
public synchronized void putRequest(final GroupCommitRequest request) {// 加锁lock.lock();try {// 把刷盘请求添加到 requestsWrite 中this.requestsWrite.add(request);} finally {// 解锁lock.unlock();}// 唤醒同步刷盘线程this.wakeup();
}

4. 核心逻辑 run

下面就俩看下最核心的逻辑,也就是 run 方法。我们就来看下这里面是怎么实现同步刷盘的。

/*** 同步刷盘逻辑,死循环执行刷盘*/
public void run() {CommitLog.log.info(this.getServiceName() + " service started");// 如果刷盘服务没有停止while (!this.isStopped()) {try {// 每 10ms 刷盘一次this.waitForRunning(10);// 在里面进行批量刷盘,因为刷盘请求是首先会放到 requestsRead 里面再由这个服务来处理的this.doCommit();} catch (Exception e) {CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);}}// Under normal circumstances shutdown, wait for the arrival of the// request, and then flush// 正常逻辑下,关闭的时候会等待 10ms,然后处理剩余的刷盘请求try {Thread.sleep(10);} catch (InterruptedException e) {CommitLog.log.warn(this.getServiceName() + " Exception, ", e);}synchronized (this) {// 交换读写队列,因为上面把 requestsRead 队列里面的刷盘请求处理完了// 所以这里交换读写队列,下面调用 doCommit() 处理 requestsWrite 队列里面的刷盘请求// 这样就能彻底处理完所有的刷盘请求了this.swapRequests();}this.doCommit();CommitLog.log.info(this.getServiceName() + " service end");
}

首先看下里面的 while 循环,GroupCommitService 同步刷盘服务只要没有调用 stop 是不会停下的。

里面的逻辑如下:

  1. 如果刷盘服务没有停止
    • 睡眠 10 ms,这里是最多睡眠 10ms,因为如果有请求添加到 requestsWrite 里面,服务就会提前被唤醒。
    • 调用 doCommit 进行批量刷盘
  2. 服务停止后
    • 睡眠 10ms
    • 交换读写队列,因为上面 doCommit 已经处理完 requestsRead 队列里面的请求了,所以这里交换队列,下面再调用 doCommit 来处理剩下的请求,当这部分请求处理完了,GroupCommitService 就结束运行了。

4.1 waitForRunning

waitForRunning 这个方法在父类 ServiceThread 中,主要用于阻塞等待 interval 时间。下面来看下里面的逻辑,还是一行一行代码看。

if (hasNotified.compareAndSet(true, false)) {// 如果是这种状态,就说明调用这个方法阻塞之前正在处理 requestsRead 队列里面的 请求// 这时候设置成了 false,表示阻塞等待,所以就交换下 requestsRead 和 requestsWrite 队列,能够让 requestsWrite 继续写入this.onWaitEnd();return;
}

hasNotified 是一个标记,表示有没有被唤醒,初始化会设置成 false,当调用 wakeup 的时候会被修改成 true,表示被唤醒了。

/*** 唤醒线程去处理请求*/
public void wakeup() {// CAS 设置唤醒标记位为 trueif (hasNotified.compareAndSet(false, true)) {// countDown 可以里面让 waitPoint.await 返回waitPoint.countDown(); // notify}
}

再接着往下看,如果 hasNotified.compareAndSet(true, false) 修改失败了,说明服务线程状态是 false 没被唤醒过,就说明这个服务线程可能一直都在阻塞唤醒、阻塞唤醒 …

有可能服务线程并没有被调用过 wakeup 唤醒,对于同步刷盘来说,wakeup 其实就是当 requestsWrite 里面有数据了就可以调用这个方法来唤醒服务线程来处理请求。

// 这里就是 CAS 失败了,意思就是说服务线程状态是 false 没被唤醒,就说明这个服务线程可能一直都在阻塞唤醒、阻塞唤醒....
// 有可能服务线程并没有被调用过 wakeup 唤醒,对于同步刷盘来说,wakeup 其实就是当 requestsWrite 里面有数据了就可以调用这个方法唤醒刷盘服务刷盘
// 重置下计数
waitPoint.reset();

重置完计数之后,就开始阻塞等待了,当阻塞了 10ms 或者被 wakeup 唤醒了(有刷盘请求添加到 requestsWrite 了)或者被中断了,这种情况下都会记录标记位 hasNotified 位 false,然后调用 this.onWaitEnd() 交换读写队列,接着在外层调用 doCommit 来处理 requestsRead 的请求。

try {// 继续阻塞等待 interval(ms)waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {log.error("Interrupted", e);
} finally {// 设置唤醒标记为 falsehasNotified.set(false);// 这里不管是被中断还是被唤醒了都会交换下读写队列,因为这里阻塞的时候外层方法是不断往 requestsWrite 写入刷盘请求的// 所以这里需要交换 requestsWrite 和 requestsRead 队列,这样在比如同步刷盘的时候当调用 waitForRunning 阻塞了 10ms,// 后面调用 doCommit 的时候就可以处理 requestsRead 队列了。this.onWaitEnd();
}

好了,上面就是全部逻辑,下面是整体代码。

/*** 等待一定时间,时间为 interval(ms)* 这个方法同步异步都会被调用,其实 ServiceThread 就是同步线程和异步线程的父类,所以这里面的方法都是通用的方法* @param interval*/
protected void waitForRunning(long interval) {// 设置是否被唤醒,如果已经唤醒了,就改成 false,表示当前线程正在执行刷盘或者已经执行了刷盘操作,所以这时候处理的就是 requestsRead 队列if (hasNotified.compareAndSet(true, false)) {// 如果是这种状态,就说明调用这个方法阻塞之前正在处理 requestsRead 队列里面的 请求// 这时候设置成了 false,表示阻塞等待,所以就交换下 requestsRead 和 requestsWrite 队列,能够让 requestsWrite 继续写入this.onWaitEnd();return;}// 这里就是 CAS 失败了,意思就是说服务线程状态是 false 没被唤醒,就说明这个服务线程可能一直都在阻塞唤醒、阻塞唤醒....// 有可能服务线程并没有被调用过 wakeup 唤醒,对于同步刷盘来说,wakeup 其实就是当 requestsWrite 里面有数据了就可以调用这个方法唤醒刷盘服务刷盘// 重置下计数waitPoint.reset();try {// 继续阻塞等待 interval(ms)waitPoint.await(interval, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {log.error("Interrupted", e);} finally {// 设置唤醒标记为 falsehasNotified.set(false);// 这里不管是被中断还是被唤醒了都会交换下读写队列,因为这里阻塞的时候外层方法是不断往 requestsWrite 写入刷盘请求的// 所以这里需要交换 requestsWrite 和 requestsRead 队列,这样在比如同步刷盘的时候当调用 waitForRunning 阻塞了 10ms,// 后面调用 doCommit 的时候就可以处理 requestsRead 队列了。this.onWaitEnd();}
}

到这里我们再来回顾下这个方法的第一段,就是 if (hasNotified.compareAndSet(true, false)) 这里到底有什么用呢?

我们知道当有请求添加,就会调用 putRequest 方法把请求添加到 requestsWrite 中,最后调用 wakeup 唤醒同步刷盘线程。然后把标记位设置为 true。但是有一个奇怪点不知道大家有没有发现,waitForRunning 方法中当服务被唤醒之后在 finally 块里面会设置 hasNotified.set(false)也就是说就算 wakeup 方法修改标记位为 true,在 finally 块里面又会修改成 false。那么这个 if 判断到底有什么用?

其实这个 wakeup 方法上面也说了,就是在请求添加到 requestsWrite 后就会唤醒,也就是说不管 GroupCommitService 是阻塞还是唤醒都可以添加请求,其实在 GroupCommitService 处理刷盘的时候如果有请求过来也会调用 wakeup 方法将 hasNotified 修改为 true。这时候当 GroupCommitService 运行完刷盘逻辑,再调用 waitForRunning 阻塞的时候,hasNotified.compareAndSet(true, false) 就会设置成功,这时候会直接交换队列退出,也就是说不需要再阻塞等待了

通过上面的判断就能够让 GroupCommitService 在有请求到来的时候无缝衔接处理请求,而不是每一次都固定睡眠 10ms。


4.2 doCommit

上面就是阻塞等待的源码,真正的刷盘核心逻辑是在 doCommit 中,doCommit 里面的逻辑不多,下面来看下。

/*** 同步刷盘*/
private void doCommit() {// 从读队列中获取刷盘请求if (!this.requestsRead.isEmpty()) {// ...} else {// Because of individual messages is set to not sync flush, it// will come to this process// 有些消息是同步刷盘不等待,就不需要走上面的流程去读取 requestsRead 处理刷盘请求,这类的也不会往 requestsWrite 里面设置刷盘请求CommitLog.this.mappedFileQueue.flush(0);}
}

我们先来看下整体逻辑,首先判断如果 requestsRead 不为空,那么就处理 requestsRead 里面的请求,否则调用 CommitLog.this.mappedFileQueue.flush(0) 直接刷盘。

那么什么情况下不为空呢?有两种情况:

  1. 长时间没有请求过来
  2. 添加消息的时候设置了不需要等待存储完成后才返回,也就是 messageExt.isWaitStoreMsgOK() 为 false。

好了,下面再来看下当 requestsRead 不为空的时候是怎么处理请求的,首先当然就是需要遍历所有请求了。

// 遍历所有刷盘请求
for (GroupCommitRequest req : this.requestsRead) {...
}

由于添加请求的时候会设置这个请求的 nextOffset,这个 nextOffset 其实就是 ByteBuffer 的写指针 + 消息长度,换句话说这个 nextOffset 就是当消息刷盘之后最少要到达的位置。所以在 for 循环里面会判断两个指针的位置大小比较。

boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

如果 flushWhere 这个指针 >= nextOffset,那就说明这个请求背后的消息早就刷盘了,根本不需要继续刷盘了,否则就开始刷盘。

for (int i = 0; i < 2 && !flushOK; i++) {CommitLog.this.mappedFileQueue.flush(0);// 刷完一次之后继续判断下flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
}

这里会刷盘两次,因为是用可能需要刷盘两次的,上面的 flushedWhere 是 MappedFile 列表的指针,不是单个 flushedWhere 的指针,虽然说消息是顺序存储(应该是这样把,看源码是,如果有错欢迎评论指出),但是遇到下面这种情况就得存到下一个文件了。
在这里插入图片描述

比如当需要添加消息到 MappedFile 的时候,MappedFile1 文件剩余的长度不足,这时候消息就会添加到 MappedFile2 的时候,但是 flushedWhere 还是指向原来的 wrotePosition 位置,所以这时候就需要刷盘两次,第一个刷盘 MappedFile1,第二次刷盘 MappedFile2。

在第一次刷盘的时候由于刷到文件尾部了,这时候会更新 flushWhere = fromOffset + wroteOffset,比如第一个文件 1024B,这时候 flushedWhere = 0 + 1024 = 1024,而文件名是从 0 开始,所以 File1 的名字就会是: [0, 1024),那么第二次刷盘通过 1024 就能找到 MappedFile2 进行刷盘。

好了,上面就是刷盘逻辑,前一篇文章我们就说过了,生产者调用同步刷盘的时候有时候会阻塞等待刷盘结果,所以当消息刷盘结束需要对 request 设置返回结果,刷盘成功就返回 PUT_OK,刷盘失败就返回 FLUSH_DISK_TIMEOUT,这里的 wakeupCustomer 其实就是通过 future 去调用 complate 方法通知返回结果。

req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);public void wakeupCustomer(final PutMessageStatus putMessageStatus) {this.flushOKFuture.complete(putMessageStatus);
}

好了,上面就是 for 循环处理刷盘请求的逻辑,当处理完所有请求之后,需要修改当前的最小输盘时间戳,同时清空 requestsRead 队列。

// 获取存储时间戳
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {// 修改最新 commitlog 的刷盘时间撮CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
// 上面数据刷盘刷完了,这里就相当于清空队列
this.requestsRead = new LinkedList<>();

好了,上面就是全部的逻辑,下面给出整体的代码。

/*** 同步刷盘*/private void doCommit() {// 从读队列中获取刷盘请求if (!this.requestsRead.isEmpty()) {// 遍历所有刷盘请求for (GroupCommitRequest req : this.requestsRead) {// 这里的 flushedWhere 是整体已经刷盘的偏移量// req.getNextOffset() 是这条消息写入到 ByteBuffer 中最终的偏移量,意味者如果刷盘,那么最终刷盘完成后 flushedWhere >= nextOffset// 所以这里会判断如果已经大于等于了,就表明这部分数据早就刷盘成功了,也就没必要刷盘boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();for (int i = 0; i < 2 && !flushOK; i++) {// 这里会刷盘两次,因为一条消息有可能横跨了两个文件中// 比如:                File1                               File2//        [start1     flushedWhere   end1]     [start2           end2]// 上面例子中 flushWhere 是上一次刷盘位置,[start1, end1] 和 [start2, end2] 是文件范围,注意哈这里不是说这个范围是已写数据范围// 限制我们假设要添加一条消息,这条消息横跨了两个文件://        [start1     flushedWhere  end1]     [start2 msgStart msgEnd(nextOffset)      end2]// 这时候假设第一个文件剩下的字节放不下一条消息,那么这条消息就需要添加到第二个 MappedFile 上,但是 flushedWhere// 还是指向第一个 MappedFile,所以第一次刷盘刷的是 MappedFile1,而第二次刷盘才是真正刷消息到文件中。在第一次刷盘的时候// 刷到文件尾部了,这时候会更新 flushWhere = fromOffset + wroteOffset。// 比如第一个文件 1024B,这时候 flushedWhere = 0 + 1024 = 1024,而文件名是从 0 开始,所以 File1 的名字就会是: [0, 1024)// 也就是说第二次刷盘的时候通过 flushedWhere 是会找到 File1 的下一个文件的CommitLog.this.mappedFileQueue.flush(0);// 刷完一次之后继续判断下flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();}// 同步刷盘 DefaultMessageStore#putMessages#waitForPutResult 会阻塞等待刷盘完成返回结果// 这个方法就是通过 complete 来通知线程同步刷盘的返回结果// 流程是这样:// 1.线程 DefaultMessageStore#putMessages#waitForPutResult 把追加消息数据到文件中,并且把一个刷盘请求添加到 requestsWrite 中,接着阻塞等待// 2.同步刷盘线程处理里面的刷盘请求,并设置返回值,通过 complete 通知等待线程刷盘结果req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);}// 获取存储时间戳long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();if (storeTimestamp > 0) {// 修改最新 commitlog 的刷盘时间撮CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);}// 上面数据刷盘刷完了,这里就相当于清空队列this.requestsRead = new LinkedList<>();} else {// Because of individual messages is set to not sync flush, it// will come to this process// 有些消息是同步刷盘不等待,就不需要走上面的流程去读取 requestsRead 处理刷盘请求,这类的也不会往 requestsWrite 里面设置刷盘请求CommitLog.this.mappedFileQueue.flush(0);}}

4.3 flush

这个就是刷盘的逻辑 CommitLog.this.mappedFileQueue.flush(0),我们这里只看 MappedFileQueue 里面的 flush 方法。

/*** 刷盘逻辑* @param flushLeastPages* @return*/
public boolean flush(final int flushLeastPages) {boolean result = true;// 首先根据刷到哪个位置了,去找到对应的 mappedFile// 如果 flushedWhere = 0,就表示还没有开始写入数据,这时候返回第一个文件MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);if (mappedFile != null) {// 存储时间long tmpTimeStamp = mappedFile.getStoreTimestamp();// 刷盘,这里返回结果判断逻辑如下// 1.如果没有使用读写分离,就获取 wrotePosition 的位置,就是 MappedByteBuffer 的 position// 2.如果使用了读写分离,就获取 committedPosition 的位置,因为使用读写分离,那么数据需要先写入//   堆外缓存,再刷盘,所以 committedPosition 就是写入堆外缓存的位置int offset = mappedFile.flush(flushLeastPages);// 因为一个 MappedFile 文件会映射一个 ByteBuffer,所以上面的 offset 在 MappedByteBuffer 中的偏移量// 而下面这个全局偏移量就是: mappedFile 的起始偏移量(文件名) + offsetlong where = mappedFile.getFileFromOffset() + offset;// result 就表示 flushedWhere 是不是最新的位点result = where == this.flushedWhere;// 更新 flushedWherethis.flushedWhere = where;if (0 == flushLeastPages) {// 如果最少刷盘页数为 0,就是说只要有数据就更新,那么更新存储时间戳// 如果最小刷盘页不为 0,就不会刷新这个参数this.storeTimestamp = tmpTimeStamp;}}return result;
}

这个方法中首先通过 flushedWhere 找到对应的 MappedFile,然后调用 flush 方法进行刷盘,刷完把 flushedWhere 更新成 mappedFile.getFileFromOffset() + offset,这个 offset 其实就是 commit 指针的位置,正好 flush 之后就刷盘了,所以 flushedWhere 可以这么更新。

这个方法逻辑不复杂,我就不细说了,下面来看一下这里面的最后一个方法,就是如何根据 flushedWhere 找到所属的 MappedFile。

/*** 根据偏移量找属于哪一个 MappedFile,注意一下就是这个偏移量是整体偏移量,就是 MappedFileName + offset** @param offset 全局偏移量* @param returnFirstOnNotFound 如果找不到就返回第一个* @return Mapped file or null (when not found and returnFirstOnNotFound is <code>false</code>).*/
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {try {// 获取第一个 MappedFileMappedFile firstMappedFile = this.getFirstMappedFile();// 获取最后一个 MappedFileMappedFile lastMappedFile = this.getLastMappedFile();// 如果这两个都不为空if (firstMappedFile != null && lastMappedFile != null) {// 偏移量肯定要在第一个和最后一个文件范围内的// 因为 fromOffset 记录的是起始偏移量,所以最后一个文件的偏移量就要加上文件大小if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",offset,firstMappedFile.getFileFromOffset(),lastMappedFile.getFileFromOffset() + this.mappedFileSize,this.mappedFileSize,this.mappedFiles.size());} else {// 找到对应下标,其实就是(offset - firstMappedFile.getFileFromOffset())/ mappedFileSize,向下取整int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));MappedFile targetFile = null;try {// 获取对应下标的 MappedFiletargetFile = this.mappedFiles.get(index);} catch (Exception ignored) {}// 校验 offset 范围是否合法if (targetFile != null && offset >= targetFile.getFileFromOffset()&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {return targetFile;}// 如果上面这种没办法找到,就直接遍历全部 MappedFile,一个一个对比来找for (MappedFile tmpMappedFile : this.mappedFiles) {if (offset >= tmpMappedFile.getFileFromOffset()&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {// 找到就返回return tmpMappedFile;}}}if (returnFirstOnNotFound) {// 那如果最终都找不到,就看看要不要返回第一个 MappedFilereturn firstMappedFile;}}} catch (Exception e) {log.error("findMappedFileByOffset Exception", e);}return null;
}

简单总结下里面的流程:

  1. 首先获取第一个 MappedFile 和最后一个 MappedFile。
  2. 找到对应下标的 MappedFile,其实就是(offset - firstMappedFile.getFileFromOffset())/ mappedFileSize,向下取整。
  3. 如果找到的这个 MappedFile 是在范围内,直接返回结果,找到 MappedFile 了。
  4. 如果上面这种没办法找到 MappedFile,就直接遍历全部 MappedFile,一个一个对比来找。
  5. 那如果最终都找不到,就看看要不要返回第一个 MappedFile。

逻辑不复杂,这里就不详细解析了,看注释就可以。


5. 小结

本文解析了同步刷盘服务 GroupCommitService,下一篇文章就解析异步刷盘服务。





如有错误,欢迎指出!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/69300.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSS 相关知识

1、高度已知&#xff0c;三栏布局&#xff0c;左右宽度 200&#xff0c;中间自适应&#xff0c;如何实现&#xff1f; <body><div class"box"><div class"box1">高度已知</div><div class"box2">左右宽度 200&…

服务端与多客户端照片的传输,recv,send

一、照片传输 server.c /* * 文件名称&#xff1a;server.c * 创 建 者&#xff1a; * 创建日期&#xff1a;2025年02月07日 * 描 述&#xff1a; */ #include <stdio.h> #include <sys/types.h> /* See NOTES */ #include <sys/socket.h…

科学上网:原理、工具、配置与注意事项

由于各种原因,我们有时无法直接访问某些境外网站或服务。这时,“科学上网”就应运而生。本文将介绍科学上网的原理、常用工具、详细配置,以及相关注意事项。 一、 什么是科学上网? 科学上网,是指通过特定技术手段绕过网络审查或地理限制,访问通常无法直接访问的互联网资…

基于yolov11的阿尔兹海默症严重程度检测系统python源码+onnx模型+评估指标曲线+精美GUI界面

【算法介绍】 基于YOLOv11的阿尔兹海默症严重程度检测系统是一种创新的医疗辅助工具&#xff0c;旨在通过先进的计算机视觉技术提高阿尔兹海默症的早期诊断和病情监测效率。阿尔兹海默症是一种渐进性的神经退行性疾病&#xff0c;通常表现为认知障碍、记忆丧失和语言障碍等症状…

IDEA编写SpringBoot项目时使用Lombok报错“找不到符号”的原因和解决

目录 概述|背景 报错解析 解决方法 IDEA配置解决 Pom配置插件解决 概述|背景 报错发生背景&#xff1a;在SpringBoot项目中引入Lombok依赖并使用后出现"找不到符号"的问题。 本文讨论在上述背景下发生的报错原因和解决办法&#xff0c;如果仅为了解决BUG不论原…

对于 useMemo 的理解及解析

在 React 中&#xff0c;useMemo 是一个 Hook&#xff0c;用于优化性能。它通过缓存计算结果来避免在每次渲染时都进行昂贵的计算。当依赖项没有变化时&#xff0c;useMemo 会返回缓存的结果&#xff0c;而不是重新计算。 主要功能 缓存计算结果&#xff1a;useMemo 可以记住…

【异常解决】在idea中提示 hutool 提示 HttpResponse used withoud try-with-resources statement

博主介绍&#xff1a;✌全网粉丝22W&#xff0c;CSDN博客专家、Java领域优质创作者&#xff0c;掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域✌ 技术范围&#xff1a;SpringBoot、SpringCloud、Vue、SSM、HTML、Nodejs、Python、MySQL、PostgreSQL、大数据、物…

搜维尔科技:提供人形机器人传感器的应用案例分析

视觉传感器 • 家庭服务场景&#xff1a;在家庭清洁机器人中&#xff0c;视觉传感器可以识别家具、障碍物的位置和形状&#xff0c;规划清洁路径&#xff0c;避开桌椅、宠物玩具等。如小米扫地机器人&#xff0c;通过视觉传感器与算法结合&#xff0c;能构建房间地图&#xff…

虹科波形小课堂 | 三分钟掌握车辆相对压缩测试!不拆发动机、不测缸压就能判断故障缸!

不拆发动机、不测缸压&#xff0c;只测个电流也能知道哪个缸压缩有问题&#xff1f;没错&#xff01;做个相对压缩测试&#xff0c;测下起动电流就行&#xff0c;简单又实用&#xff01;今天&#xff0c;从原理到方法&#xff0c;几分钟教会你&#xff01; 我们都知道&#xf…

自然语言处理NLP_[1]-NLP入门

文章目录 1.自然语言处理入门1. 什么是自然语言处理2.自然语言处理的发展简史3 自然语言处理的应用场景1. **机器翻译**2. **文本分类**3. **情感分析**4. **问答系统**5. **文本生成**6. **信息抽取**7. **语音识别与合成**8. **文本摘要**9. **搜索引擎优化**10. **聊天机器人…

无限使用Cursor

原理&#xff1a;运行程序获得15天的免费试用期&#xff0c;重新运行程序重置试用期&#xff0c;实现无限使用。免费的pro账号&#xff0c;一个月有250的高级模型提问次数。 前提&#xff1a;已安装cursor cursor-vip工具&#xff1a;https://cursor.jeter.eu.org?p95d60efe…

LIMO:少即是多的推理

25年2月来自上海交大、SII 和 GAIR 的论文“LIMO: Less is More for Reasoning”。 一个挑战是在大语言模型&#xff08;LLM&#xff09;中的复杂推理。虽然传统观点认为复杂的推理任务需要大量的训练数据&#xff08;通常超过 100,000 个示例&#xff09;&#xff0c;但本文展…

一种基于Leaflet.Legend的图例动态更新方法

目录 前言 一、场景再现 1、需求描述 2、核心方法介绍 3、存在的问题 二、问题解决 1、重复解决办法 2、图例不展示解决办法 3、成果展示 三、总结 前言 在当今数字化时代&#xff0c;地理信息系统&#xff08;GIS&#xff09;技术已经广泛应用于各个领域&#xff0c;…

【AI时代】使用ollama私有化部署deepseek的过程及问题记录

文章目录 说明下载模型通过ollama下载通过modelscope下载 部署open-webui问题记录临时目录空间不足单机多卡部署后台启动 说明 对于DeepSeek的私有化部署&#xff0c;现在网上已经有很全面的资料了&#xff0c;本文主要记录部署以及部署过程中遇到的问题。目前对于这些问题&am…

使用 SDKMAN! 在 Mac(包括 ARM 架构的 M1/M2 芯片)上安装 Java 8

文章目录 1. 安装 SDKMAN!2. 查找可用的 Java 8 版本3. 安装 Java 84. 验证安装5. 切换 Java 版本&#xff08;可选&#xff09;6. 解决 ARM 架构兼容性问题总结 可以使用 SDKMAN! 在 Mac&#xff08;包括 ARM 架构的 M1/M2 芯片&#xff09;上安装 Java 8。SDKMAN! 是一个强大…

存储异常导致的Oracle重大生产故障

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; 作者&#xff1a;IT邦德 中国DBA联盟(ACDU)成员&#xff0c;10余年DBA工作经验 Oracle、PostgreSQL ACE CSDN博客专家及B站知名UP主&#xff0c;全网粉丝10万 擅长主流Oracle、MySQL、PG、高斯…

计算机视觉-拟合

一、拟合 拟合的作用主要是给物体有一个更好的描述 根据任务选择对应的方法&#xff08;最小二乘&#xff0c;全最小二乘&#xff0c;鲁棒最小二乘&#xff0c;RANSAC&#xff09; 边缘提取只能告诉边&#xff0c;但是给不出来数学描述&#xff08;应该告诉这个点线是谁的&a…

安全测试|用例设计基本步骤和指南

前言 安全测试用例设计是确保软件应用程序的安全性的一个重要环节。这涉及到识别潜在的安全漏洞和弱点&#xff0c;并设计相应的测试用例来验证这些漏洞是否存在。 以下是一些关于如何设计安全测试用例的基本步骤和指南&#xff1a; 一、需求分析&#xff1a; 1)首先&#x…

【自开发工具介绍】SQLSERVER的ImpDp和ExpDp工具演示05

SQLSERVER的ImpDp和ExpDp工具演示 1、表部分数据导出 (-query) ※「-query」和「-include_table」必须一起使用 「-query」后面字符串是sql文的where语句&#xff0c;但要注意要使用%&#xff0c;需要写%% 验证用&#xff1a;导出的表&#xff0c;导入到新的数据库 db的数…

13.1 深入理解 LangChain Chat Model 与 Prompt Template:重构智能翻译助手的核心

深入理解 LangChain Chat Model 与 Prompt Template:重构智能翻译助手的核心 关键词:LangChain Chat Model, Chat Prompt Template, 翻译系统架构设计, 大模型抽象层, 提示工程优化 1. 为什么需要 Chat Model 抽象层? 在传统翻译系统开发中,对接不同大模型面临三大痛点:…