假设某主题只有1个分区,该分区有两个副本:Leader 副本在 Broker1 上,Follower 副本在 Broker2 上,其 Leader 副本写入数据和 Follower 副本同步数据的流程如下图:

1.leader 副本将数据写入本地磁盘  KafkaApis.handleProduceRequest(){      replicaManager.appendRecords(){            appendToLocalLog(){                Partition.appendRecordsToLeader(){                    Log.appendAsLeader(){                        Log.append(){                              //通过LogSegment.append()方法写入磁盘                              LogSegment.append()                        }                    }                }            }        }  }2.leader 副本更新LEO  KafkaApis.handleProduceRequest(){      replicaManager.appendRecords(){            appendToLocalLog(){                Partition.appendRecordsToLeader(){                    Log.appendAsLeader(){                        Log.append(){                              //更新Leader副本的LEO值                            updateLogEndOffset(appendInfo.lastOffset + 1)                        }                    }                }            }        }  }3.follower 副本同步数据,携带自身的LEO  AbstractFetchThread.doWork(){      maybeFetch(){            buildFetch(fetchStates){                  //这里的fetchState.fetchOffset 就是Follower副本的LEO值                builder.add(topicPartition, new FetchRequest.PartitionData(                fetchState.fetchOffset, logStartOffset, fetchSize, Optional.of(fetchState.currentLeaderEpoch)))            }        }  }4.leader 副本更新本地保存的Follower副本的LEO  ReplicaManager.fetchMessages(){      //获取读取结果      val logReadResults = readFromLog(){          if (isFromFollower) updateFollowerLogReadResults(replicaId, result){                  //TODO 更新leader保存的各个follower副本的LEO                  partition.updateReplicaLogReadResult(replica, readResult){                      //TODO 最终更新所有的replica的LEO的值                      replica.updateLogReadResult(logReadResult){                          //更新LEO对象                          logEndOffsetMetadata = logReadResult.info.fetchOffsetMetadata                    }                }            }      }  }5.leader 副本尝试更新ISR列表    ReplicaManager.fetchMessages(){      //获取读取结果      val logReadResults = readFromLog(){          if (isFromFollower) updateFollowerLogReadResults(replicaId, result){                  //TODO 尝试更新ISR列表                  val leaderHWIncremented = maybeExpandIsr(replicaId, logReadResult){                      //更新ISR列表                      updateIsr(newInSyncReplicas)                }            }      }  }6.leader 副本更新HW    ReplicaManager.fetchMessages(){      //获取读取结果      val logReadResults = readFromLog(){          if (isFromFollower) updateFollowerLogReadResults(replicaId, result){                  //TODO 尝试更新ISR列表及Leader副本的HW                  val leaderHWIncremented = maybeExpandIsr(replicaId, logReadResult){                      //TODO 尝试更新leader的HW                      maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs){                          //取ISR列表中副本的最小的LEO作为新的HW                          val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)                          //获取旧的HW                          val oldHighWatermark = leaderReplica.highWatermark                          //如果新的HW值大于旧的HW值,就更新                          if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||                            (oldHighWatermark.messageOffset == newHighWatermark.messageOffset &&                             oldHighWatermark.onOlderSegment(newHighWatermark))) {                            //更新 Leader 副本的 HW                            leaderReplica.highWatermark = newHighWatermark                        }                    }                }            }      }  }7.leader 副本给 follower副本 返回数据,携带leader 副本的 HW 值  ReplicaManager.fetchMessages(){      //获取读取结果      val logReadResults = readFromLog(){         readFromLocalLog(){           read(){             val readInfo = partition.readRecords(){                 //获取Leader Replica的高水位                 val initialHighWatermark = localReplica.highWatermark.messageOffset             }           }         }      }  }8.follower 副本写入数据,更新自身LEO、  ReplicaFetcherThread.processPartitionData(){    partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false){            doAppendRecordsToFollowerOrFutureReplica(){                Log.appendAsFollower(){                    Log.append(){                          //更新Follower副本的LEO值                          updateLogEndOffset(appendInfo.lastOffset + 1)                    }                }            }        }  }9.follower 副本更新本地的 HW 值  ReplicaFetcherThread.processPartitionData(){        //根据leader返回的HW,更新Follower本地的HW:取Follower本地LEO 和 Leader HW 的较小值        val followerHighWatermark = replica.logEndOffset.min(partitionData.highWatermark)        //TODO 更新Follower副本的 HW 对象        replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)  }- 对于HW,Leader 副本和 Follower 副本只保存自身的 
- 对于LEO,Follower 副本只保存自身的,但是 Leader 副本除了保存自身的外,还会保存所有 Follower 副本的 LEO 值 
- 无论是Leader副本所在节点,还是Follower副本所在节点,分区对应的Partition 对象都会保存所有的副本对象,但是只有本地副本对象有对应的日志文件 
整个数据写入及同步的过程分为九个步骤:
- leader 副本将数据写入本地磁盘
- leader 副本更新 LEO
- follower 副本发送同步数据请求,携带自身的 LEO
- leader 副本更新本地保存的其它副本的 LEO
- leader 副本尝试更新 ISR 列表
- leader 副本更新 HW
- leader 副本给 follower 副本返回数据,携带 leader 副本的 HW 值
- follower 副本接收响应并写入数据,更新自身 LEO
- follower 副本更新本地的 HW 值
下面具体分析这几个步骤。第一、二步在分析日志对象的写数据流程时已经详细介绍过,这里不再赘述(《深入理解Kafka服务端之日志对象的读写数据流程》)。 对于后面的几个步骤,由于发生在不同的节点上,并没有按照这个顺序进行分析,而是分成了
- Follower副本的相关操作:即 第三步、第八步、第九步
- Leader副本的相关操作:即 第四步、第五步、第六步、第七步
- 抽象类:AbstractFetcherThread
- 实现类:ReplicaFetcherThread
abstract class AbstractFetcherThread(name: String,//线程名称                                     clientId: String,//Cliend ID,用于日志输出                                     val sourceBroker: BrokerEndPoint,//数据源Broker地址                                     failedPartitions: FailedPartitions,//线程处理过程报错的分区集合                                     fetchBackOffMs: Int = 0,//拉取的重试间隔,默认是 Broker 端参数 replica.fetch.backoff.ms 值。                                     isInterruptible: Boolean = true)//是否允许线程中断  extends ShutdownableThread(name, isInterruptible) {  type FetchData = FetchResponse.PartitionData[Records]  type EpochData = OffsetsForLeaderEpochRequest.PartitionData  //泛型 PartitionFetchState:表征分区读取状态,包含已读取偏移量和对应的副本读取状态  //副本状态由 ReplicaState 接口定义,包含 读取中 和 截断中 两个  private val partitionStates = new PartitionStates[PartitionFetchState]  ...}其中,type 的用法是:给指定的类起一个别名,如:
type FetchData = FetchResponse.PartitionData[Records]后面就可以用 FetchData 来表示 FetchResponse.PartitionData[Records] 类;EpochData 同理。
FetchResponse.PartitionData:FetchResponse是封装的FETCH请求的响应类,PartitionData是一个嵌套类,表示响应中单个分区的拉取信息,包括对应Leader副本的高水位,分区日志的起始偏移量,拉取到的消息集合等。
public static final class PartitionData<T extends BaseRecords> {    public final Errors error;//错误码    public final long highWatermark;//从Leader返回的分区的高水位值    public final long lastStableOffset;// 最新LSO值    public final long logStartOffset;//日志起始偏移量    public final Optional preferredReadReplica;// 期望的Read Replica;KAFKA 2.4之后支持部分Follower副本可以对外提供读服务    public final List abortedTransactions;// 该分区对应的已终止事务列表    public final T records;//消息集合}public static class PartitionData {    public final Optional currentLeaderEpoch;    public final int leaderEpoch;}PartitionFetchState:样例类,用来表征分区的读取状态。包含已拉取的偏移量,当前leader的epoch,副本读取状态等
case class PartitionFetchState(fetchOffset: Long,//已拉取的偏移量                               currentLeaderEpoch: Int,//当前epoch                               delay: DelayedItem,                               state: ReplicaState//副本读取状态                              ) {  //表征分区的读取状态  //1.可拉取,表明副本获取线程当前能够读取数据。判断条件是:副本处于Fetching且未被推迟执行  def isReadyForFetch: Boolean = state == Fetching && !isDelayed  //2.截断中,表明分区副本正在执行截断操作(比如该副本刚刚成为 Follower 副本)。判断条件是:副本处于Truncating且未被推迟执行  def isTruncating: Boolean = state == Truncating && !isDelayed  //3.被推迟,表明副本获取线程获取数据时出现错误,需要等待一段时间后重试。判断条件是:存在未过期的延迟任务  def isDelayed: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) > 0}- isReadyForFetch:可拉取,表明副本获取线程当前能够读取数据。判断条件是:副本处于Fetching且未被推迟执行
- isTruncating:截断中,表明分区副本正在执行截断操作(比如该副本刚刚成为 Follower 副本)。判断条件是:副本处于Truncating且未被推迟执行
- isDelayed:被推迟,表明副本获取线程获取数据时出现错误,需要等待一段时间后重试。判断条件是:存在未过期的延迟任务
副本读取的状态
ReplicaState:特质,用来表征副本读取状态。
sealed trait ReplicaState//截断中case object Truncating extends ReplicaState//拉取中case object Fetching extends ReplicaState- Truncating:截断中
- Fetching:拉取中
- buildFetch:封装拉取数据的请求
- truncate:进行日志截断
- processPartitionData:处理返回的响应
- doWork:将上面定义的三个方法串联起来,形成一个闭环,并不断地重复执行。从而实现从Leader副本所在的节点同步消息
class ReplicaFetcherThread(name: String,                           fetcherId: Int,//Follower 拉取的线程 Id,也就是线程的编号。                           // 单台 Broker 上,允许存在多个 ReplicaFetcherThread 线程。                           // Broker 端参数 num.replica.fetchers,决定了 Kafka 到底创建多少个 Follower 拉取线程。                           sourceBroker: BrokerEndPoint,                           brokerConfig: KafkaConfig,//服务端配置类,用来获取配置信息                           failedPartitions: FailedPartitions,                           replicaMgr: ReplicaManager,//副本管理器。该线程类通过副本管理器来获取分区对象、副本对象以及它们下面的日志对象。                           metrics: Metrics,                           time: Time,                           quota: ReplicaQuota,//用做限流。作用是控制 Follower 副本拉取速度                           leaderEndpointBlockingSend: Option[BlockingSend] = None//用于实现同步发送请求的类。                           // 所谓的同步发送,是指该线程使用它给指定 Broker 发送请求,然后线程处于阻塞状态,直到接收到 Broker 返回的 Response。                          )extends AbstractFetcherThread(                              name = name,                                clientId = name,                                sourceBroker = sourceBroker,                                failedPartitions,                                fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,                                isInterruptible = false) {  //Follower副本所在Broker的Id  private val replicaId = brokerConfig.brokerId  //用于执行请求发送的类  private val leaderEndpoint = leaderEndpointBlockingSend.getOrElse(        new ReplicaFetcherBlockingSend(sourceBroker, brokerConfig, metrics, time, fetcherId,        s"broker-$replicaId-fetcher-$fetcherId", logContext))    //Follower发送的FETCH请求被处理返回前的最长等待时间,由参数:replica.fetch.wait.max.ms 配置,默认 500 毫秒    private val maxWait = brokerConfig.replicaFetchWaitMaxMs    //每个FETCH Response返回前必须要累积的最少字节数,由参数:replica.fetch.min.bytes 配置,默认 1 字节    private val minBytes = brokerConfig.replicaFetchMinBytes    //每个合法FETCH Response的最大字节数,由参数:replica.fetch.response.max.bytes 配置,默认 10 M    private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes    //单个分区能够获取到的最大字节数,由参数:replica.fetch.max.bytes 配置,默认 1 M    private val fetchSize = brokerConfig.replicaFetchMaxBytes    ...}buildFetch() 方法:为指定分区集合构建对应的FetchRequest.Builder 对象,而该对象是构建 FetchRequest 的核心组件。
这个方法中有一个重要的操作:
- 封装拉取请求时,携带了Follower副本的 LogStartOffset 和 LEO 值(对应同步数据的第三步) 
 override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]):             ResultWithPartitions[Option[FetchRequest.Builder]] = {    //定义一个保存出错分区的集合    val partitionsWithError = mutable.Set[TopicPartition]()    val builder = fetchSessionHandler.newBuilder()    // 遍历每个分区,将处于可获取状态的分区添加到builder后续统一处理    // 对于有错误的分区加入到出错分区集合    partitionMap.foreach { case (topicPartition, fetchState) =>      //如果分区的状态是可拉取的,且该分区未对follower限流      if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, topicPartition)) {        try {          //获取本地Follower副本保存的分区日志的logStartOffset          val logStartOffset = replicaMgr.localReplicaOrException(topicPartition).logStartOffset          /**将分区和对应的PartitionData添加到builder,注意这里的PartitionData对应的是拉取请求FetchRequest,里面封装了拉取请求的元数据信息,如:          * fetchOffset:拉取消息的起始偏移量,也就是Follower副本的LEO          * currentLeaderEpoch:Follower副本保存的leader epoch值          */          builder.add(topicPartition, new FetchRequest.PartitionData(            fetchState.fetchOffset, logStartOffset, fetchSize, Optional.of(fetchState.currentLeaderEpoch)))        } catch {          case _: KafkaStorageException =>      //如果有异常,将该分区添加到出错分区的集合            partitionsWithError += topicPartition        }      }    }    val fetchData = builder.build()    val fetchRequestOpt = if (fetchData.sessionPartitions.isEmpty && fetchData.toForget.isEmpty) {      None    } else {      //构造FETCH请求的Builder对象      val requestBuilder = FetchRequest.Builder        .forReplica(fetchRequestVersion, replicaId, maxWait, minBytes, fetchData.toSend)        .setMaxBytes(maxBytes)        .toForget(fetchData.toForget)        .metadata(fetchData.metadata)      Some(requestBuilder)    }    //构建返回结果,返回Builder对象以及出错分区列表    ResultWithPartitions(fetchRequestOpt, partitionsWithError)  }truncate() 方法:用于将指定分区的日志截断到指定的偏移量
override def truncate(tp: TopicPartition, offsetTruncationState: OffsetTruncationState): Unit = {    //根据分区获取本地副本    val replica = replicaMgr.localReplicaOrException(tp)    val partition = replicaMgr.getPartition(tp).get    //调用Partition.truecateTo方法进行日志截断    // offsetTruncationState.offset:要截断到的偏移量    partition.truncateTo(offsetTruncationState.offset, isFuture = false)    if (offsetTruncationState.offset < replica.highWatermark.messageOffset)      warn(s"Truncating $tp to offset ${offsetTruncationState.offset} below high watermark " +        s"${replica.highWatermark.messageOffset}")    if (offsetTruncationState.truncationCompleted)      replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, tp,        offsetTruncationState.offset)}这个方法内部依次调用了:Partition.truncateTo -> LogManager.truncateTo -> Log.truncateTo -> LogSegment.truncateTo 进行日志截断操作
processPartitionData方法:用于处理指定分区从Leader副本所在节点返回的响应,将获取的消息写入本地存储,并返回写入消息的元数据
这里有两个个重要的操作:
- 写入消息,更新 Follower 副本的 LEO(对应同步数据的第八步) 
- 更新 Follower 副本本地的 HW 值(对应同步数据的第九步) 
override def processPartitionData(topicPartition: TopicPartition,   // 拉取数据的分区                                    fetchOffset: Long,              // 拉取的消息集合的起始位移                                    partitionData: FetchData        // 读取到的分区消息数据                                   ): Option[LogAppendInfo] = {     // 返回值:写入已读取消息数据前的元数据    //从副本管理器获取副本对象Replica    val replica = replicaMgr.localReplicaOrException(topicPartition)    //从副本管理器获取指定主题分区对象    val partition = replicaMgr.getPartition(topicPartition).get    //将获取的消息封装成MemoryRecords    val records = toMemoryRecords(partitionData.records)    //判断获取的消息集合是否超限    maybeWarnIfOversizedRecords(records, topicPartition)    //如果获取消息的起始位移值不是本地日志LEO值则视为异常情况    if (fetchOffset != replica.logEndOffset)      throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(        topicPartition, fetchOffset, replica.logEndOffset))    if (isTraceEnabled)      trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d"        .format(replica.logEndOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))    //TODO 写入Follower副本本地日志,更新自身的LEO    val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)    if (isTraceEnabled)      trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"        .format(replica.logEndOffset, records.sizeInBytes, topicPartition))    //根据leader返回的HW,更新Follower本地的HW:取Follower本地LEO 和 Leader HW 的较小值    val followerHighWatermark = replica.logEndOffset.min(partitionData.highWatermark)    //获取从leader返回的LogStartOffset    val leaderLogStartOffset = partitionData.logStartOffset    //TODO 更新Follower副本的HW对象    replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)    //尝试更新Follower副本的LogStartOffset    replica.maybeIncrementLogStartOffset(leaderLogStartOffset)    if (isTraceEnabled)      trace(s"Follower set replica high watermark for partition $topicPartition to $followerHighWatermark")    // 副本消息拉取限流    if (quota.isThrottled(topicPartition))      quota.record(records.sizeInBytes)    replicaMgr.brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)    //返回写入消息的元数据    logAppendInfo  }override def doWork() {    //尝试日志截断    maybeTruncate()    //尝试拉取数据    maybeFetch()}这个方法很简单,只在内部调用了两个方法:
maybeTruncate():尝试进行日志截断
private def maybeTruncate(): Unit = {    // 将所有处于截断中状态的分区依据有无Leader Epoch值进行分组    val (partitionsWithEpochs, partitionsWithoutEpochs) = fetchTruncatingPartitions()    // 对于有Leader Epoch值的分区,将日志截断到Leader Epoch值对应的位移值处    if (partitionsWithEpochs.nonEmpty) {      truncateToEpochEndOffsets(partitionsWithEpochs)    }    // 对于没有Leader Epoch值的分区,将日志截断到高水位值处    if (partitionsWithoutEpochs.nonEmpty) {      truncateToHighWatermark(partitionsWithoutEpochs)    }}这里先看对于没有Leader Epoch的分区,将日志截断到高水位处:
private[server] def truncateToHighWatermark(partitions: Set[TopicPartition]): Unit = inLock(partitionMapLock) {    val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState]    // 遍历每个要执行截断操作的分区对象    for (tp       // 获取分区的分区读取状态      val partitionState = partitionStates.stateValue(tp)      if (partitionState != null) {        // 取出高水位值。        val highWatermark = partitionState.fetchOffset        //封装截断状态        val truncationState = OffsetTruncationState(highWatermark, truncationCompleted = true)        info(s"Truncating partition $tp to local high watermark $highWatermark")        // 执行截断到高水位值        if (doTruncate(tp, truncationState))          //保存分区和对应的截取状态          fetchOffsets.put(tp, truncationState)      }    }    // 更新这组分区的分区读取状态    updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)}private def maybeFetch(): Unit = {    //获取分区状态集合和对应的拉取请求的集合    val (fetchStates, fetchRequestOpt) = inLock(partitionMapLock) {      //获取要拉取消息的分区和分区对应状态的集合      val fetchStates = partitionStates.partitionStateMap.asScala      // TODO 第一步:为集合中的分区构造FetchRequest.builder对象,这里的返回结果有两个对象:      //fetchRequestOpt:要读取的分区核心信息 + FetchRequest.Builder 对象。      // 而这里的核心信息,就是指要读取哪个分区,从哪个位置开始读,最多读多少字节,等等。      //partitionsWithError:一组出错的分区      val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(fetchStates)      //TODO 第二步:处理出错的分区,处理方式主要是将这个分区加入到有序Map末尾,等待后续重试      handlePartitionsWithErrors(partitionsWithError, "maybeFetch")      // 如果当前没有可读取的分区,则等待fetchBackOffMs时间等候后续重试      if (fetchRequestOpt.isEmpty) {        trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")        partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)      }      (fetchStates, fetchRequestOpt)    }    //TODO 第三步:遍历FETCH请求,发送FETCH请求给Leader副本,并处理Response    fetchRequestOpt.foreach { fetchRequest =>      processFetchRequest(fetchStates, fetchRequest)    }}val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(fetchStates)b:处理出错的分区。处理方式主要是将这个分区加入到有序Map末尾,等待后续重试
handlePartitionsWithErrors(partitionsWithError, "maybeFetch")//将给定的分区从map头部移除,然后再加到尾部,以达到轮询的目的//这里的LinkedHashMap对于插入元素是有顺序的,加入插入顺序是abcde,先读取了a,// 为了保证公平性,会将a从集合中先移除,然后放到尾部,那么下次就从b开始读public void updateAndMoveToEnd(TopicPartition topicPartition, S state) {    map.remove(topicPartition);    map.put(topicPartition, state);    updateSize();}c:遍历并发送FETCH请求给Leader副本,然后处理Response
fetchRequestOpt.foreach { fetchRequest =>  processFetchRequest(fetchStates, fetchRequest)}private def processFetchRequest(fetchStates: Map[TopicPartition, PartitionFetchState],                                  fetchRequest: FetchRequest.Builder): Unit = {    //定义出错分区的集合    val partitionsWithError = mutable.Set[TopicPartition]()    //定义接收响应数据的集合    var responseData: Seq[(TopicPartition, FetchData)] = Seq.empty    try {      trace(s"Sending fetch request $fetchRequest")      //给Leader发送FETCH请求,获取响应数据      responseData = fetchFromLeader(fetchRequest)    } catch {      case t: Throwable =>        if (isRunning) {          warn(s"Error in response for fetch request $fetchRequest", t)          inLock(partitionMapLock) {            partitionsWithError ++= partitionStates.partitionSet.asScala            partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)          }        }    }    //更新请求发送速率指标    fetcherStats.requestRate.mark()    //如果接收到了响应    if (responseData.nonEmpty) {      inLock(partitionMapLock) {        //遍历响应结果中的分区和分区对应的数据        responseData.foreach { case (topicPartition, partitionData) =>          Option(partitionStates.stateValue(topicPartition)).foreach { currentFetchState =>            //获取分区对应的拉取状态            val fetchState = fetchStates(topicPartition)            // 处理Response的条件:            // 1. 获取的消息集合的起始偏移量和之前已保存的下一条待写入偏移量相等            // 2. 当前分区处于可获取状态            if (fetchState.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {              //获取请求中携带的Follower副本保存的 leader epoch 值              val requestEpoch = if (fetchState.currentLeaderEpoch >= 0)                Some(fetchState.currentLeaderEpoch)              else                None              partitionData.error match {                // 如果没有错误                case Errors.NONE =>                  try {                    // 交由子类完成Response的处理                    val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset,                      partitionData)                    logAppendInfoOpt.foreach { logAppendInfo =>                      val validBytes = logAppendInfo.validBytes                      val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset                      fetcherLagStats.getAndMaybePut(topicPartition).lag = Math.max(0L, partitionData.highWatermark - nextOffset)                      if (validBytes > 0 && partitionStates.contains(topicPartition)) {                        val newFetchState = PartitionFetchState(nextOffset, fetchState.currentLeaderEpoch,                          state = Fetching)                        // 将该分区放置在有序Map读取顺序的末尾,保证公平性                        partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)                        fetcherStats.byteRate.mark(validBytes)                      }                    }                  } catch {                    case ime: CorruptRecordException =>                      error(s"Found invalid messages during fetch for partition $topicPartition " +                        s"offset ${currentFetchState.fetchOffset}", ime)                      partitionsWithError += topicPartition                    case e: KafkaStorageException =>                      error(s"Error while processing data for partition $topicPartition " +                        s"at offset ${currentFetchState.fetchOffset}", e)                      markPartitionFailed(topicPartition)                    case t: Throwable =>                      error(s"Unexpected error occurred while processing data for partition $topicPartition " +                        s"at offset ${currentFetchState.fetchOffset}", t)                      markPartitionFailed(topicPartition)                  }                // 如果读取位移值越界,通常是因为Leader发生变更                case Errors.OFFSET_OUT_OF_RANGE =>                  //调整越界,主要办法是做截断                  if (handleOutOfRangeError(topicPartition, currentFetchState, requestEpoch))                    //如果依然不能成功,将该分区添加到出错分区集合                    partitionsWithError += topicPartition                //如果Follower本地保存的Leader Epoch值比Leader所在Broker上的Epoch值要新                case Errors.UNKNOWN_LEADER_EPOCH =>                  debug(s"Remote broker has a smaller leader epoch for partition $topicPartition than " +                    s"this replica's current leader epoch of ${fetchState.currentLeaderEpoch}.")                  // 加入到出错分区集合                  partitionsWithError += topicPartition                // 如果Follower本地保存的Leader Epoch值比Leader所在Broker上的Epoch值要旧                case Errors.FENCED_LEADER_EPOCH =>                  //将该分区标记为失效,从分区拉取状态集合中移除,并加入到失效分区集合                  if (onPartitionFenced(topicPartition, requestEpoch)) partitionsWithError += topicPartition                // 如果Leader发生变更                case Errors.NOT_LEADER_FOR_PARTITION =>                  debug(s"Remote broker is not the leader for partition $topicPartition, which could indicate " +                    "that the partition is being moved")                  // 加入到出错分区列表                  partitionsWithError += topicPartition                case _ =>                  error(s"Error for partition $topicPartition at offset ${currentFetchState.fetchOffset}",                    partitionData.error.exception)                  // 加入到出错分区集合                  partitionsWithError += topicPartition              }            }          }        }      }    }    // 处理出错分区集合,主要就是将该分区放到map数据结构的末尾    if (partitionsWithError.nonEmpty) {      handlePartitionsWithErrors(partitionsWithError, "processFetchRequest")    }  }内部调用了ReplicaManager.fetchMessages() 方法:
def handleFetchRequest(request: RequestChannel.Request) {  ...    //TODO 这里是处理Follower Replica 拉取消息请求的具体方法    replicaManager.fetchMessages(        fetchRequest.maxWait.toLong,        fetchRequest.replicaId,        fetchRequest.minBytes,        fetchRequest.maxBytes,        versionId <= 2,        interesting,        replicationQuota(fetchRequest),        processResponseCallback,        fetchRequest.isolationLevel)    ...}fetchMessages() 方法:
def fetchMessages(timeout: Long,                    replicaId: Int,                    fetchMinBytes: Int,                    fetchMaxBytes: Int,                    hardMaxBytesLimit: Boolean,                    fetchInfos: Seq[(TopicPartition, PartitionData)],                    quota: ReplicaQuota = UnboundedQuota,                    responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,                    isolationLevel: IsolationLevel) {    val isFromFollower = Request.isValidBrokerId(replicaId)    val fetchOnlyFromLeader = replicaId != Request.DebuggingConsumerId && replicaId != Request.FutureLocalReplicaId    val fetchIsolation = if (isFromFollower || replicaId == Request.FutureLocalReplicaId)      FetchLogEnd    else if (isolationLevel == IsolationLevel.READ_COMMITTED)      FetchTxnCommitted    else      FetchHighWatermark    //从本地磁盘读取数据    def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {      val result = readFromLocalLog(        replicaId = replicaId,        fetchOnlyFromLeader = fetchOnlyFromLeader,        fetchIsolation = fetchIsolation,        fetchMaxBytes = fetchMaxBytes,        hardMaxBytesLimit = hardMaxBytesLimit,        readPartitionInfo = fetchInfos,        quota = quota)      if (isFromFollower) updateFollowerLogReadResults(replicaId, result)      else result    }    //获取读取结果    val logReadResults = readFromLog()    var bytesReadable: Long = 0    var errorReadingData = false    val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]    logReadResults.foreach { case (topicPartition, logReadResult) =>      if (logReadResult.error != Errors.NONE)        errorReadingData = true      bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes      logReadResultMap.put(topicPartition, logReadResult)    }    if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {      val fetchPartitionData = logReadResults.map { case (tp, result) =>        tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,          result.lastStableOffset, result.info.abortedTransactions)      }      responseCallback(fetchPartitionData)    } else {      val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)]      fetchInfos.foreach { case (topicPartition, partitionData) =>        logReadResultMap.get(topicPartition).foreach(logReadResult => {          val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata          fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))        })      }      val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,        fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)      val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)      val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }      delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)    }  }def readFromLocalLog(replicaId: Int,                       fetchOnlyFromLeader: Boolean,                       fetchIsolation: FetchIsolation,                       fetchMaxBytes: Int,                       hardMaxBytesLimit: Boolean,                       readPartitionInfo: Seq[(TopicPartition, PartitionData)],                       quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] = {    def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {      //读取的起始偏移量      val offset = fetchInfo.fetchOffset      //读取的大小      val partitionFetchSize = fetchInfo.maxBytes      //follower Replica 的LogStartOffset      val followerLogStartOffset = fetchInfo.logStartOffset      brokerTopicStats.topicStats(tp.topic).totalFetchRequestRate.mark()      brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()      val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)      try {        trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +          s"remaining response limit $limitBytes" +          (if (minOneMessage) s", ignoring response/partition size limits" else ""))        val partition = getPartitionOrException(tp, expectLeader = fetchOnlyFromLeader)        val fetchTimeMs = time.milliseconds        //读取数据,获取读取结果,里面包含了读取到的消息,LEO,HW,LogStartOffset等信息        val readInfo = partition.readRecords(          //读取的起始偏移量          fetchOffset = fetchInfo.fetchOffset,          //Follower副本保存的Leader epoch          currentLeaderEpoch = fetchInfo.currentLeaderEpoch,          maxBytes = adjustedMaxBytes,          fetchIsolation = fetchIsolation,          fetchOnlyFromLeader = fetchOnlyFromLeader,          minOneMessage = minOneMessage)        //获取读到的数据        val fetchDataInfo = if (shouldLeaderThrottle(quota, tp, replicaId)) {          //如果分区被限流了,那么返回一个空集合          FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)        } else if (!hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {          //如果返回的消息集合不完整,也返回一个空集合          FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)        } else {          //正常返回          readInfo.fetchedData        }        //根据获取到的数据封装返回结果        LogReadResult(info = fetchDataInfo,                      highWatermark = readInfo.highWatermark,//Leader的HW                      leaderLogStartOffset = readInfo.logStartOffset,//Leader的LogStartOffset                      leaderLogEndOffset = readInfo.logEndOffset,//Leader的LEO                      followerLogStartOffset = followerLogStartOffset,//Follower的LogStartOffset                      fetchTimeMs = fetchTimeMs,                      readSize = adjustedMaxBytes,                      lastStableOffset = Some(readInfo.lastStableOffset),                      exception = None//异常信息                      )      } catch {        case e@ (_: UnknownTopicOrPartitionException |                 _: NotLeaderForPartitionException |                 _: UnknownLeaderEpochException |                 _: FencedLeaderEpochException |                 _: ReplicaNotAvailableException |                 _: KafkaStorageException |                 _: OffsetOutOfRangeException) =>          LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),                        highWatermark = -1L,                        leaderLogStartOffset = -1L,                        leaderLogEndOffset = -1L,                        followerLogStartOffset = -1L,                        fetchTimeMs = -1L,                        readSize = 0,                        lastStableOffset = None,                        exception = Some(e))        case e: Throwable =>          brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark()          brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark()          val fetchSource = Request.describeReplicaId(replicaId)          error(s"Error processing fetch with max size $adjustedMaxBytes from $fetchSource " +            s"on partition $tp: $fetchInfo", e)          LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),                        highWatermark = -1L,                        leaderLogStartOffset = -1L,                        leaderLogEndOffset = -1L,                        followerLogStartOffset = -1L,                        fetchTimeMs = -1L,                        readSize = 0,                        lastStableOffset = None,                        exception = Some(e))      }    }    //读取的最大字节    var limitBytes = fetchMaxBytes    //封装结果对象    val result = new mutable.ArrayBuffer[(TopicPartition, LogReadResult)]    //是否至少返回一条消息    var minOneMessage = !hardMaxBytesLimit    //遍历分区进行读取    readPartitionInfo.foreach { case (tp, fetchInfo) =>      //获取读取的结果      val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)      //获取每个分区读取的字节数      val recordBatchSize = readResult.info.records.sizeInBytes      if (recordBatchSize > 0)        minOneMessage = false      //更新还可以读取的字节数      limitBytes = math.max(0, limitBytes - recordBatchSize)      //将分区的读取结果保存到结果集合中      result += (tp -> readResult)    }    //返回结果集    result  }//获取Leader Replica的高水位val initialHighWatermark = localReplica.highWatermark.messageOffsetprivate def updateFollowerLogReadResults(replicaId: Int,                                           readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)] = {    debug(s"Recording follower broker $replicaId log end offsets: $readResults")    readResults.map { case (topicPartition, readResult) =>      var updatedReadResult = readResult      nonOfflinePartition(topicPartition) match {          //如果找到了对应的分区        case Some(partition) =>          //根据副本id获取Partition对象中保存的副本对象          //Partition.allReplicasMap结构中保存了当前分区的所有副本对象。其中,key是brokerid,value是对应的Replica对象          partition.getReplica(replicaId) match {              //如果获取到了Replica对象            case Some(replica) =>              //TODO 更新leader保存的各个follower副本的LEO              partition.updateReplicaLogReadResult(replica, readResult)            case None =>              warn(s"Leader $localBrokerId failed to record follower $replicaId's position " +                s"${readResult.info.fetchOffsetMetadata.messageOffset} since the replica is not recognized to be " +                s"one of the assigned replicas ${partition.assignedReplicas.map(_.brokerId).mkString(",")} " +                s"for partition $topicPartition. Empty records will be returned for this partition.")              updatedReadResult = readResult.withEmptyFetchInfo          }          //如果对应的分区没有被创建        case None =>          warn(s"While recording the replica LEO, the partition $topicPartition hasn't been created.")      }      topicPartition -> updatedReadResult    }  }Partition.updateReplicaLogReadResult() 方法:
def updateReplicaLogReadResult(replica: Replica, logReadResult: LogReadResult): Boolean = {    val replicaId = replica.brokerId    val oldLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L    //TODO 最终更新Leader副本保存的Follower副本的LEO的值    replica.updateLogReadResult(logReadResult)    val newLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L    val leaderLWIncremented = newLeaderLW > oldLeaderLW    //TODO 尝试更新ISR列表,在这个方法中会更新Leader副本对象的HW对象和分区对应的Log对象的HW值    val leaderHWIncremented = maybeExpandIsr(replicaId, logReadResult)    val result = leaderLWIncremented || leaderHWIncremented    if (result)      tryCompleteDelayedRequests()    debug(s"Recorded replica $replicaId log end offset (LEO) position ${logReadResult.info.fetchOffsetMetadata.messageOffset}.")    result  }def updateLogReadResult(logReadResult: LogReadResult) {    if (logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.leaderLogEndOffset)      _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, logReadResult.fetchTimeMs)    else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset)      _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, lastFetchTimeMs)    //更新Follower副本的日志起始偏移量,即 _logStartOffset 变量    logStartOffset = logReadResult.followerLogStartOffset    //更新Follower副本的LEO元数据对象,即 _logEndOffsetMetadata 变量    logEndOffsetMetadata = logReadResult.info.fetchOffsetMetadata    //最后一次拉取时Leader副本的LEO    lastFetchLeaderLogEndOffset = logReadResult.leaderLogEndOffset    lastFetchTimeMs = logReadResult.fetchTimeMs}def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult): Boolean = {    inWriteLock(leaderIsrUpdateLock) {      // 检查给定的副本对象是否需要添加到ISR列表      leaderReplicaIfLocal match {        case Some(leaderReplica) =>          //获取给定节点的Replica对象          val replica = getReplica(replicaId).get          //获取leader副本的HW值          val leaderHW = leaderReplica.highWatermark          //获取Follower副本的LEO          val fetchOffset = logReadResult.info.fetchOffsetMetadata.messageOffset          //判断是否需要更新ISR列表的条件:          //1.该节点不在ISR列表,且replica.logEndOffsetMetadata.offsetDiff(leaderHW)           //2.给定Follower副本的LEO大于等于leader副本的HW          //3.给定的Follower副本属于该分区          //4.leader epoch对应的起始偏移量存在且小于Follower副本的LEO          //满足这4个条件说明这个Follower副本已经和leader副本保持同步了,把这个Follower副本加入到ISR列表中          if (!inSyncReplicas.contains(replica) &&             assignedReplicas.map(_.brokerId).contains(replicaId) &&             replica.logEndOffsetMetadata.offsetDiff(leaderHW) >= 0 &&             leaderEpochStartOffsetOpt.exists(fetchOffset >= _)) {            //将该副本加入集合            val newInSyncReplicas = inSyncReplicas + replica            info(s"Expanding ISR from ${inSyncReplicas.map(_.brokerId).mkString(",")} " +              s"to ${newInSyncReplicas.map(_.brokerId).mkString(",")}")            // update ISR in ZK and cache            //更新ISR列表            updateIsr(newInSyncReplicas)            replicaManager.isrExpandRate.mark()          }          // check if the HW of the partition can now be incremented          // since the replica may already be in the ISR and its LEO has just incremented          //TODO 尝试更新leader副本的HW对象及分区对应的Log对象的HW值          maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs)        case None => false // nothing to do if no longer leader      }    }  }maybeIncrementLeaderHW() 方法:尝试更新 leader 副本的 HW 对象及分区对应的Log 对象的 HW 值(对应同步数据的第六步)
private def maybeIncrementLeaderHW(leaderReplica: Replica, curTime: Long = time.milliseconds): Boolean = {    val allLogEndOffsets = assignedReplicas.filter { replica =>      curTime - replica.lastCaughtUpTimeMs <= replicaLagTimeMaxMs || inSyncReplicas.contains(replica)    }.map(_.logEndOffsetMetadata)    //取ISR列表中副本的最小的LEO作为新的HW    val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)    //获取旧的HW    val oldHighWatermark = leaderReplica.highWatermark    //如果新的HW值大于旧的HW值,就更新    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||      (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {      //更新Replica的hightWatermark对象,以及对应Log对象的高水位值      leaderReplica.highWatermark = newHighWatermark      debug(s"High watermark updated to $newHighWatermark")      true    } else {      def logEndOffsetString(r: Replica) = s"replica ${r.brokerId}: ${r.logEndOffsetMetadata}"      debug(s"Skipping update high watermark since new hw $newHighWatermark is not larger than old hw $oldHighWatermark. " +        s"All current LEOs are ${assignedReplicas.map(logEndOffsetString)}")      false    }  }def onHighWatermarkIncremented(highWatermark: Long): Unit = {    lock synchronized {      //更新高水位值      replicaHighWatermark = Some(highWatermark)      producerStateManager.onHighWatermarkUpdated(highWatermark)      updateFirstUnstableOffset()    }  }def highWatermark_=(newHighWatermark: LogOffsetMetadata) {    //如果是本地副本    if (isLocal) {      if (newHighWatermark.messageOffset < 0)        throw new IllegalArgumentException("High watermark offset should be non-negative")      //高水位的元数据对象      highWatermarkMetadata = newHighWatermark      //更新Log对象保存的高水位值      log.foreach(_.onHighWatermarkIncremented(newHighWatermark.messageOffset))      trace(s"Setting high watermark for replica $brokerId partition $topicPartition to [$newHighWatermark]")    } else {      throw new KafkaException(s"Should not set high watermark on partition $topicPartition's non-local replica $brokerId")    }  }//更新Replica的hightWatermark对象,以及对应Log对象的高水位值leaderReplica.highWatermark = newHighWatermarkdef fetchMessages(){    ...    logReadResults.foreach { case (topicPartition, logReadResult) =>      //如果读取发生错误      if (logReadResult.error != Errors.NONE)        errorReadingData = true      bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes      //将读取结果放入集合      logReadResultMap.put(topicPartition, logReadResult)    }    ...}上面所说的合适的时机,分为 立即返回 和 延时返回,当满足下面四个条件之一时,便立即返回,否则会进行延时处理:
- 拉取等待的时间到了 
- 拉取请求中没有拉取分区的信息 
- 已经拉取到了足够多的数据 
- 拉取过程中发生错误 
Leader副本写入数据,Follower副本进行同步的过程分为9个步骤:
- leader 副本将数据写入本地磁盘
- leader 副本更新 LEO
- follower 副本发送同步数据请求,携带自身的 LEO
- leader 副本更新本地保存的其它副本的 LEO
- leader 副本尝试更新 ISR 列表
- leader 副本更新 HW
- leader 副本给 follower 副本返回数据,携带 leader 副本的 HW 值
- follower 副本接收响应并写入数据,更新自身 LEO
- follower 副本更新本地的 HW 值
关于 HW 和 LEO 的保存:
- 对于HW,Leader 副本和 Follower 副本只保存自身的 
- 对于LEO,Follower 副本只保存自身的,但是 Leader 副本除了保存自身的外,还会保存所有 Follower 副本的 LEO 值 
- 无论是Leader副本所在节点,还是Follower副本所在节点,分区对应的Partition 对象都会保存所有的副本对象,但是只有本地副本对象有对应的日志文件