Kafka 3.5 source analysis: how the broker serves a consumer fetch request (from KafkaApis down to the log segments)

Contents

I. The server-side entry point for consumer fetch requests
II. Iterating over the topic partitions in the request and reading each one
  1. Choosing a suitable replica to read the local log (follower fetching, i.e. read-write separation, is supported since 2.4)
III. Deciding whether the fetch comes from a partition follower or from a consumer client
  1. The read lock on leaderIsrUpdateLock must be taken before reading
  2. readFromLocalLog reads the local log
IV. Reading log data means reading segment files (ignoring the zero-copy path)
  1. Fetching the basic metadata of the local log (high watermark, offsets, ...)
  2. Iterating over segments until one of them returns data
V. Building the file log stream object FileRecords
  1. Creating the file stream FileLogInputStream from the position
  2. Wrapping the stream in a batch iterator, RecordBatchIterator
  3. DefaultRecordBatch.iterator materializes the records in memory

I. The server-side entry point for consumer fetch requests

The broker-side API that receives fetch requests is the handleFetchRequest method in KafkaApis.scala:

```scala
override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
  // ... omitted ...
  request.header.apiKey match {
    // consumer (and follower) fetch requests are dispatched here
    case ApiKeys.FETCH => handleFetchRequest(request)
    // ... omitted ...
  }
}

def handleFetchRequest(request: RequestChannel.Request): Unit = {
  // API version (versionId) and client id (clientId) from the request header
  val versionId = request.header.apiVersion
  val clientId = request.header.clientId
  // the fetch request body
  val fetchRequest = request.body[FetchRequest]
  // From version 13 on the request carries topic ids, so resolve them to names via the metadata cache;
  // for older versions use an empty mapping.
  val topicNames =
    if (fetchRequest.version() >= 13)
      metadataCache.topicIdsToNames()
    else
      Collections.emptyMap[Uuid, String]()

  // the partitions to fetch (fetchData) and the partitions to drop from the session (forgottenTopics)
  val fetchData = fetchRequest.fetchData(topicNames)
  val forgottenTopics = fetchRequest.forgottenTopics(topicNames)

  // Fetch context that manages this request: version, session metadata, whether it comes from a follower,
  // the fetch data, the forgotten topics and the topic-name mapping.
  val fetchContext = fetchManager.newContext(
    fetchRequest.version,
    fetchRequest.metadata,
    fetchRequest.isFromFollower,
    fetchData,
    forgottenTopics,
    topicNames)

  // erroneous collects partitions that fail validation; interesting collects the partitions we will actually read
  val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]()
  val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]()

  if (fetchRequest.isFromFollower) {
    // The follower must have ClusterAction on ClusterResource in order to fetch partition data.
    if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
      // classify each partition as erroneous or interesting
      fetchContext.foreachPartition { (topicIdPartition, data) =>
        if (topicIdPartition.topic == null)
          erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
        else if (!metadataCache.contains(topicIdPartition.topicPartition))
          erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
        else
          interesting += topicIdPartition -> data
      }
    } else {
      // authorization failed: every partition is answered with TOPIC_AUTHORIZATION_FAILED
      fetchContext.foreachPartition { (topicIdPartition, _) =>
        erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED)
      }
    }
  } else {
    // The request comes from an ordinary Kafka consumer.
    // Regular Kafka consumers need READ permission on each partition they are fetching.
    val partitionDatas = new mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]
    fetchContext.foreachPartition { (topicIdPartition, partitionData) =>
      if (topicIdPartition.topic == null)
        erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
      else
        partitionDatas += topicIdPartition -> partitionData
    }
    // check READ permission per partition and classify accordingly
    val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, partitionDatas)(_._1.topicPartition.topic)
    partitionDatas.foreach { case (topicIdPartition, data) =>
      if (!authorizedTopics.contains(topicIdPartition.topic))
        erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED)
      else if (!metadataCache.contains(topicIdPartition.topicPartition))
        erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
      else
        interesting += topicIdPartition -> data
    }
  }

  // ... omitted ...

  // If no partition passed validation or exists, build the response right away.
  if (interesting.isEmpty) {
    processResponseCallback(Seq.empty)
  } else {
    // for fetch from consumer, cap fetchMaxBytes to the maximum bytes that could be fetched without being throttled given
    // no bytes were recorded in the recent quota window; trying to fetch more bytes would result in a guaranteed
    // throttling potentially blocking consumer progress.
    // For follower fetches maxQuotaWindowBytes is Int.MaxValue; for consumers it comes from the client's fetch quota,
    // and is then combined with the request value and the broker config to produce fetchMaxBytes and fetchMinBytes.
    val maxQuotaWindowBytes = if (fetchRequest.isFromFollower)
      Int.MaxValue
    else
      quotas.fetch.getMaxValueInQuotaWindow(request.session, clientId).toInt

    // effective max/min bytes for this fetch
    val fetchMaxBytes = Math.min(Math.min(fetchRequest.maxBytes, config.fetchMaxBytes), maxQuotaWindowBytes)
    val fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes)

    val clientMetadata: Optional[ClientMetadata] = if (versionId >= 11) {
      // Fetch API version 11 and above added the preferred read-replica logic
      Optional.of(new DefaultClientMetadata(
        fetchRequest.rackId,
        clientId,
        request.context.clientAddress,
        request.context.principal,
        request.context.listenerName.value))
    } else {
      Optional.empty()
    }

    // bundle all the fetch parameters
    val params = new FetchParams(
      versionId,
      fetchRequest.replicaId,
      fetchRequest.replicaEpoch,
      fetchRequest.maxWait,
      fetchMinBytes,
      fetchMaxBytes,
      FetchIsolation.of(fetchRequest),
      clientMetadata)

    // call the replica manager to fetch messages from the local replica;
    // processResponseCallback turns the read results into the response
    replicaManager.fetchMessages(
      params = params,
      fetchInfos = interesting,
      quota = replicationQuota(fetchRequest),
      responseCallback = processResponseCallback,
    )
  }
}
```
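The request-level maxBytes, minBytes and maxWait values above originate from the consumer configuration. Below is a minimal client-side sketch of the knobs that end up in FetchParams; the bootstrap address, group id and topic name are placeholders, not values from this article.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FetchTuningExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "demo-group");                 // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Becomes FetchRequest.minBytes: the broker may hold the request until this many bytes are available.
        props.put("fetch.min.bytes", "1");
        // Becomes FetchRequest.maxWait: upper bound on how long the broker may hold the request.
        props.put("fetch.max.wait.ms", "500");
        // Becomes FetchRequest.maxBytes; the broker additionally caps it with config.fetchMaxBytes and the quota window.
        props.put("fetch.max.bytes", String.valueOf(50 * 1024 * 1024));

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            System.out.println("fetched " + records.count() + " records");
        }
    }
}
```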
replicaManager.fetchMessages is the method that finally obtains the log data:

```scala
/**
 * Fetch messages from a replica, and wait until enough data can be fetched and return;
 * the callback function will be triggered either when timeout or required fetch info is satisfied.
 * Consumers may fetch from any replica, but followers can only fetch from the leader.
 */
def fetchMessages(params: FetchParams,
                  fetchInfos: Seq[(TopicIdPartition, PartitionData)],
                  quota: ReplicaQuota,
                  responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit): Unit = {
  // check if this fetch request can be satisfied right away:
  // readFromLocalLog reads the local log and returns one LogReadResult per partition
  val logReadResults = readFromLocalLog(params, fetchInfos, quota, readFromPurgatory = false)

  var bytesReadable: Long = 0
  var errorReadingData = false
  var hasDivergingEpoch = false
  var hasPreferredReadReplica = false
  val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult]

  // Update the bookkeeping from the read results: readable bytes, read errors,
  // diverging epochs and preferred read replicas.
  logReadResults.foreach { case (topicIdPartition, logReadResult) =>
    brokerTopicStats.topicStats(topicIdPartition.topicPartition.topic).totalFetchRequestRate.mark()
    brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
    if (logReadResult.error != Errors.NONE)
      errorReadingData = true
    if (logReadResult.divergingEpoch.nonEmpty)
      hasDivergingEpoch = true
    if (logReadResult.preferredReadReplica.nonEmpty)
      hasPreferredReadReplica = true
    bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
    logReadResultMap.put(topicIdPartition, logReadResult)
  }

  // respond immediately if 1) fetch request does not want to wait
  //                        2) fetch request does not require any data
  //                        3) has enough data to respond
  //                        4) some error happens while reading data
  //                        5) we found a diverging epoch
  //                        6) has a preferred read replica
  if (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData ||
    hasDivergingEpoch || hasPreferredReadReplica) {
    val fetchPartitionData = logReadResults.map { case (tp, result) =>
      val isReassignmentFetch = params.isFromFollower && isAddingReplica(tp.topicPartition, params.replicaId)
      tp -> result.toFetchPartitionData(isReassignmentFetch)
    }
    responseCallback(fetchPartitionData)
  } else {
    // Otherwise build a DelayedFetch and park it in delayedFetchPurgatory,
    // so the request completes once enough data arrives or the wait times out.
    // construct the fetch results from the read results
    val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicIdPartition, FetchPartitionStatus)]
    fetchInfos.foreach { case (topicIdPartition, partitionData) =>
      logReadResultMap.get(topicIdPartition).foreach(logReadResult => {
        val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
        fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
      })
    }
    val delayedFetch = new DelayedFetch(
      params = params,
      fetchPartitionStatus = fetchPartitionStatus,
      replicaManager = this,
      quota = quota,
      responseCallback = responseCallback)

    // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
    val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }

    // try to complete the request immediately, otherwise put it into the purgatory;
    // this is because while the delayed fetch operation is being created, new requests
    // may arrive and hence make this operation completable.
    delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
  }
}
```
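tryCompleteElseWatch is the heart of the purgatory pattern: try to finish the operation now, otherwise park it under some keys and re-check whenever one of those keys is touched. The sketch below is a deliberately simplified toy illustration of that idea only; it is not Kafka's DelayedOperation/DelayedFetch implementation, and all names in it are made up (expiration handling is omitted).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BooleanSupplier;

/** Toy stand-in for the purgatory idea: complete now if possible, otherwise watch a key. */
class ToyPurgatory {
    private final Map<String, CopyOnWriteArrayList<ToyDelayedOp>> watchers = new ConcurrentHashMap<>();

    void tryCompleteElseWatch(ToyDelayedOp op, String key) {
        if (!op.tryComplete()) {
            watchers.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(op);
        }
    }

    /** Called when something changes for a key (e.g. a partition got more bytes appended). */
    void checkAndComplete(String key) {
        CopyOnWriteArrayList<ToyDelayedOp> ops = watchers.getOrDefault(key, new CopyOnWriteArrayList<>());
        ops.removeIf(ToyDelayedOp::tryComplete);
    }
}

class ToyDelayedOp {
    private final BooleanSupplier enoughData; // e.g. "accumulated bytes >= minBytes"
    private final Runnable onComplete;        // e.g. build and send the fetch response

    ToyDelayedOp(BooleanSupplier enoughData, Runnable onComplete) {
        this.enoughData = enoughData;
        this.onComplete = onComplete;
    }

    boolean tryComplete() {
        if (enoughData.getAsBoolean()) {
            onComplete.run();
            return true;
        }
        return false;
    }
}
```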
The actual log lookup happens in readFromLocalLog.

II. Iterating over the topic partitions in the request and reading each one

```scala
/**
 * Read from multiple topic partitions at the given offset up to maxSize bytes
 */
def readFromLocalLog(params: FetchParams,
                     readPartitionInfo: Seq[(TopicIdPartition, PartitionData)],
                     quota: ReplicaQuota,
                     readFromPurgatory: Boolean): Seq[(TopicIdPartition, LogReadResult)] = {
  val traceEnabled = isTraceEnabled

  def read(tp: TopicIdPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
    // fetch offset, requested max bytes and the follower's log start offset from the request
    val offset = fetchInfo.fetchOffset
    val partitionFetchSize = fetchInfo.maxBytes
    val followerLogStartOffset = fetchInfo.logStartOffset

    // the effective max bytes: the smaller of the partition's maxBytes and the remaining response limit
    val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
    try {
      if (traceEnabled)
        trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
          s"remaining response limit $limitBytes" +
          (if (minOneMessage) s", ignoring response/partition size limits" else ""))

      // resolve the Partition object for this topic partition
      val partition = getPartitionOrException(tp.topicPartition)
      val fetchTimeMs = time.milliseconds

      // Make sure the topic id in the fetch request/session matches the topic id in the log,
      // otherwise throw InconsistentTopicIdException.
      val topicId = if (tp.topicId == Uuid.ZERO_UUID) None else Some(tp.topicId)
      if (!hasConsistentTopicId(topicId, partition.topicId))
        throw new InconsistentTopicIdException("Topic ID in the fetch session did not match the topic ID in the log.")
      // If we are the leader, determine the preferred read-replica:
      // based on the client metadata, pick a suitable replica for this fetch.
      val preferredReadReplica = params.clientMetadata.asScala.flatMap(
        metadata => findPreferredReadReplica(partition, metadata, params.replicaId, fetchInfo.fetchOffset, fetchTimeMs))

      if (preferredReadReplica.isDefined) {
        // A preferred read replica on another broker was chosen: skip the local read and return a
        // LogReadResult that only carries the offsets plus the preferred replica id.
        replicaSelectorOpt.foreach { selector =>
          debug(s"Replica selector ${selector.getClass.getSimpleName} returned preferred replica " +
            s"${preferredReadReplica.get} for ${params.clientMetadata}")
        }
        // If a preferred read-replica is set, skip the read
        val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, fetchOnlyFromLeader = false)
        LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
          divergingEpoch = None,
          highWatermark = offsetSnapshot.highWatermark.messageOffset,
          leaderLogStartOffset = offsetSnapshot.logStartOffset,
          leaderLogEndOffset = offsetSnapshot.logEndOffset.messageOffset,
          followerLogStartOffset = followerLogStartOffset,
          fetchTimeMs = -1L,
          lastStableOffset = Some(offsetSnapshot.lastStableOffset.messageOffset),
          preferredReadReplica = preferredReadReplica,
          exception = None)
      } else {
        // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
        val readInfo: LogReadInfo = partition.fetchRecords(
          fetchParams = params,
          fetchPartitionData = fetchInfo,
          fetchTimeMs = fetchTimeMs,
          maxBytes = adjustedMaxBytes,
          minOneMessage = minOneMessage,
          updateFetchState = !readFromPurgatory)

        val fetchDataInfo = if (params.isFromFollower && shouldLeaderThrottle(quota, partition, params.replicaId)) {
          // If the partition is being throttled, simply return an empty set.
          new FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
        } else if (!params.hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
          // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
          // progress in such cases and don't need to report a RecordTooLargeException
          new FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
        } else {
          readInfo.fetchedData
        }

        // build the LogReadResult for this partition
        LogReadResult(info = fetchDataInfo,
          divergingEpoch = readInfo.divergingEpoch.asScala,
          highWatermark = readInfo.highWatermark,
          leaderLogStartOffset = readInfo.logStartOffset,
          leaderLogEndOffset = readInfo.logEndOffset,
          followerLogStartOffset = followerLogStartOffset,
          fetchTimeMs = fetchTimeMs,
          lastStableOffset = Some(readInfo.lastStableOffset),
          preferredReadReplica = preferredReadReplica,
          exception = None)
      }
    } catch {
      // ... omitted ...
    }
  }

  var limitBytes = params.maxBytes
  val result = new mutable.ArrayBuffer[(TopicIdPartition, LogReadResult)]
  var minOneMessage = !params.hardMaxBytesLimit
  readPartitionInfo.foreach { case (tp, fetchInfo) =>
    val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)
    // size in bytes of the batch that was just read
    val recordBatchSize = readResult.info.records.sizeInBytes
    // Once we read from a non-empty partition, we stop ignoring request and partition level size limits
    if (recordBatchSize > 0)
      minOneMessage = false
    limitBytes = math.max(0, limitBytes - recordBatchSize)
    result += (tp -> readResult)
  }
  result
}
```

The loop at the bottom iterates over the topic partitions and runs the inner read function for each of them (`val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)`); inside read, the data is actually fetched through partition.fetchRecords.

1. Choosing a suitable replica to read the local log (follower fetching is supported since 2.4)

In the read function above, the preferred read replica is resolved first:
```scala
val preferredReadReplica = params.clientMetadata.asScala.flatMap(
  metadata => findPreferredReadReplica(partition, metadata, params.replicaId, fetchInfo.fetchOffset, fetchTimeMs))

def findPreferredReadReplica(partition: Partition,
                             clientMetadata: ClientMetadata,
                             replicaId: Int,
                             fetchOffset: Long,
                             currentTimeMs: Long): Option[Int] = {
  // partition.leaderIdIfLocal returns Some(leader replica id) if this broker hosts the leader, otherwise None.
  partition.leaderIdIfLocal.flatMap { leaderReplicaId =>
    // Don't look up preferred for follower fetches via normal replication
    if (FetchRequest.isValidBrokerId(replicaId))
      None
    else {
      replicaSelectorOpt.flatMap { replicaSelector =>
        // end points of the partition's replicas, as seen through the client's listener
        val replicaEndpoints = metadataCache.getPartitionReplicaEndpoints(partition.topicPartition,
          new ListenerName(clientMetadata.listenerName))
        // candidate replicas for this fetch
        val replicaInfoSet = mutable.Set[ReplicaView]()

        // For every remote replica: take its state snapshot, and if the replica is in the ISR and its log
        // range contains fetchOffset, add it as a candidate.
        partition.remoteReplicas.foreach { replica =>
          val replicaState = replica.stateSnapshot
          if (partition.inSyncReplicaIds.contains(replica.brokerId) &&
              replicaState.logEndOffset >= fetchOffset &&
              replicaState.logStartOffset <= fetchOffset) {
            replicaInfoSet.add(new DefaultReplicaView(
              replicaEndpoints.getOrElse(replica.brokerId, Node.noNode()),
              replicaState.logEndOffset,
              currentTimeMs - replicaState.lastCaughtUpTimeMs))
          }
        }

        // the leader replica itself is always a candidate
        val leaderReplica = new DefaultReplicaView(
          replicaEndpoints.getOrElse(leaderReplicaId, Node.noNode()),
          partition.localLogOrException.logEndOffset,
          0L)
        replicaInfoSet.add(leaderReplica)

        // view of the partition: all candidate replicas plus the leader
        val partitionInfo = new DefaultPartitionView(replicaInfoSet.asJava, leaderReplica)

        // Let the configured ReplicaSelector pick a replica, then map the selection to a broker id.
        replicaSelector.select(partition.topicPartition, clientMetadata, partitionInfo).asScala.collect {
          // Even though the replica selector can return the leader, we don't want to send it out with the
          // FetchResponse, so we exclude it here
          case selected if !selected.endpoint.isEmpty && selected != leaderReplica => selected.endpoint.id
        }
      }
    }
  }
}
```

The call `replicaSelector.select(partition.topicPartition, clientMetadata, partitionInfo).asScala.collect` is what picks the replica to read from. By default the leader replica is used, but since version 2.4 a consumer may also read from a non-leader (follower) replica.

In the code, this is decided by the filter `case selected if !selected.endpoint.isEmpty && selected != leaderReplica => selected.endpoint.id`.

In the configuration: on the broker side you set replica.selector.class, whose default behaviour (LeaderSelector) means consumers fetch from the leader replica; changing it to RackAwareReplicaSelector makes consumers fetch from a replica on the matching rack id. You also set broker.rack to say which machine room (rack/availability zone) each broker sits in. On the consumer side you set client.rack; whichever broker's broker.rack matches it is the broker the consumer will fetch its data from.

Why was this read-write separation not supported before 2.4, and why was it added later?

Why it was not supported before: for Kafka, horizontally scaling a topic's partitions already absorbs growing message volume, and adding brokers also lowers system load, so there was little reason to take on the thankless complexity of read-write separation.

Why it is supported now: there is one scenario it fits well, namely cross-machine-room (cross-datacenter) deployments. When one datacenter has to consume data held in another, reading only from leader replicas forces the traffic across datacenters, and that bandwidth is expensive; reading from a local follower replica is the smarter choice.

So the goal of this feature is not to reduce broker load or spread out message processing, but to save network bandwidth.
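A configuration sketch of the follower-fetching setup described above; the rack names and addresses are placeholders. The broker side is plain server.properties entries (shown here as comments), and the consumer only needs client.rack.

```java
import java.util.Properties;

public class RackAwareConsumerConfig {
    public static Properties build() {
        // Broker side (server.properties), per broker:
        //   broker.rack=us-east-1a        <- which rack/AZ this broker lives in (placeholder value)
        //   replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
        // Without the rack-aware selector, consumers keep reading from the leader (the LeaderSelector behaviour).

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "rack-aware-group");           // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Sent as rackId in the FetchRequest; the broker's RackAwareReplicaSelector matches it against broker.rack
        // and may return a same-rack follower as the preferred read replica.
        props.put("client.rack", "us-east-1a");               // placeholder, should match a broker.rack value
        return props;
    }
}
```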
III. Deciding whether the fetch comes from a partition follower or from a consumer client

For how the follower sends these requests, see the earlier article in this series, "kafka 3.5 主题分区的Follower创建Fetcher线程从Leader拉取数据源码" (how a partition follower creates its fetcher thread and pulls data from the leader).

```scala
def fetchRecords(fetchParams: FetchParams,
                 fetchPartitionData: FetchRequest.PartitionData,
                 fetchTimeMs: Long,
                 maxBytes: Int,
                 minOneMessage: Boolean,
                 updateFetchState: Boolean): LogReadInfo = {

  def readFromLocalLog(log: UnifiedLog): LogReadInfo = {
    readRecords(
      log,
      fetchPartitionData.lastFetchedEpoch,
      fetchPartitionData.fetchOffset,
      fetchPartitionData.currentLeaderEpoch,
      maxBytes,
      fetchParams.isolation,
      minOneMessage)
  }

  // Is this fetch coming from a follower replica?
  if (fetchParams.isFromFollower) {
    // Check that the request is from a valid replica before doing the read
    val (replica, logReadInfo) = inReadLock(leaderIsrUpdateLock) {
      val localLog = localLogWithEpochOrThrow(
        fetchPartitionData.currentLeaderEpoch,
        fetchParams.fetchOnlyLeader)
      val replica = followerReplicaOrThrow(
        fetchParams.replicaId,
        fetchPartitionData)
      val logReadInfo = readFromLocalLog(localLog)
      (replica, logReadInfo)
    }

    if (updateFetchState && !logReadInfo.divergingEpoch.isPresent) {
      updateFollowerFetchState(
        replica,
        followerFetchOffsetMetadata = logReadInfo.fetchedData.fetchOffsetMetadata,
        followerStartOffset = fetchPartitionData.logStartOffset,
        followerFetchTimeMs = fetchTimeMs,
        leaderEndOffset = logReadInfo.logEndOffset,
        fetchParams.replicaEpoch)
    }

    logReadInfo
  } else {
    // The request comes from a consumer client.
    inReadLock(leaderIsrUpdateLock) {
      val localLog = localLogWithEpochOrThrow(
        fetchPartitionData.currentLeaderEpoch,
        fetchParams.fetchOnlyLeader)
      readFromLocalLog(localLog)
    }
  }
}
```

1. The read lock on leaderIsrUpdateLock must be taken before reading

In the logic above, both branches read under the same lock:

```scala
// follower request
val (replica, logReadInfo) = inReadLock(leaderIsrUpdateLock) { ... }
// consumer client request
inReadLock(leaderIsrUpdateLock) { ... }
```

2. readFromLocalLog reads the local log

```scala
def readFromLocalLog(log: UnifiedLog): LogReadInfo = {
  readRecords(
    log,
    fetchPartitionData.lastFetchedEpoch,
    fetchPartitionData.fetchOffset,
    fetchPartitionData.currentLeaderEpoch,
    maxBytes,
    fetchParams.isolation,
    minOneMessage)
}
```

IV. Reading log data means reading segment files (ignoring the zero-copy path)

1. Fetching the basic metadata of the local log (high watermark, offsets, ...)

```scala
private def readRecords(localLog: UnifiedLog,
                        lastFetchedEpoch: Optional[Integer],
                        fetchOffset: Long,
                        currentLeaderEpoch: Optional[Integer],
                        maxBytes: Int,
                        fetchIsolation: FetchIsolation,
                        minOneMessage: Boolean): LogReadInfo = {
  // snapshot of the local log's key offsets
  val initialHighWatermark = localLog.highWatermark        // high watermark
  val initialLogStartOffset = localLog.logStartOffset      // log start offset
  val initialLogEndOffset = localLog.logEndOffset          // log end offset
  val initialLastStableOffset = localLog.lastStableOffset  // last stable offset

  // ... omitted ...

  // read from the local log at the requested offset
  val fetchedData = localLog.read(
    fetchOffset,
    maxBytes,
    fetchIsolation,
    minOneMessage)

  // wrap the read data together with the offsets captured above
  new LogReadInfo(
    fetchedData,
    Optional.empty(),
    initialHighWatermark,
    initialLogStartOffset,
    initialLogEndOffset,
    initialLastStableOffset)
}

def read(startOffset: Long,
         maxLength: Int,
         isolation: FetchIsolation,
         minOneMessage: Boolean): FetchDataInfo = {
  checkLogStartOffset(startOffset)
  // The isolation level decides how far we may read:
  // LOG_END for followers, HIGH_WATERMARK for ordinary consumers, TXN_COMMITTED for read_committed consumers.
  val maxOffsetMetadata = isolation match {
    case FetchIsolation.LOG_END => localLog.logEndOffsetMetadata
    case FetchIsolation.HIGH_WATERMARK => fetchHighWatermarkMetadata
    case FetchIsolation.TXN_COMMITTED => fetchLastStableOffsetMetadata
  }
  localLog.read(startOffset, maxLength, minOneMessage, maxOffsetMetadata, isolation == FetchIsolation.TXN_COMMITTED)
}
```
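The FetchIsolation match above is driven by who is fetching and by the consumer's isolation.level: follower fetches read up to the log end offset, ordinary consumers up to the high watermark, and read_committed consumers only up to the last stable offset. A minimal consumer-side sketch (addresses and group id are placeholders):

```java
import java.util.Properties;

public class IsolationLevelExample {
    public static Properties readCommittedProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("group.id", "txn-readers");               // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // read_uncommitted (default): the broker serves data up to the high watermark (FetchIsolation.HIGH_WATERMARK).
        // read_committed: the broker serves data only up to the last stable offset (FetchIsolation.TXN_COMMITTED),
        // hiding records from open or aborted transactions.
        props.put("isolation.level", "read_committed");
        return props;
    }
}
```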
2. Iterating over segments until one of them returns data

```scala
/**
 * @param startOffset        the offset to start reading from
 * @param maxLength          the maximum number of bytes to read
 * @param minOneMessage      whether to return at least one message even if it exceeds maxLength
 * @param maxOffsetMetadata  the upper bound for this read
 * @param includeAbortedTxns whether aborted transactions should be included
 * @return a FetchDataInfo object
 */
def read(startOffset: Long,
         maxLength: Int,
         minOneMessage: Boolean,
         maxOffsetMetadata: LogOffsetMetadata,
         includeAbortedTxns: Boolean): FetchDataInfo = {
  maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
    trace(s"Reading maximum $maxLength bytes at offset $startOffset from log with " +
      s"total length ${segments.sizeInBytes} bytes")

    // the current end of the log (next offset metadata) and its message offset
    val endOffsetMetadata = nextOffsetMetadata
    val endOffset = endOffsetMetadata.messageOffset

    // locate the segment that may contain startOffset: the one with the greatest base offset <= startOffset
    var segmentOpt = segments.floorSegment(startOffset)

    // return error on attempt to read beyond the log end offset
    if (startOffset > endOffset || segmentOpt.isEmpty)
      throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
        s"but we only have log segments upto $endOffset.")

    if (startOffset == maxOffsetMetadata.messageOffset)
      // nothing to read: the start offset equals the upper bound
      emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
    else if (startOffset > maxOffsetMetadata.messageOffset)
      // start offset beyond the upper bound: return empty data, converting the start offset to offset metadata
      emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
    else {
      // Do the read on the segment whose base offset is below the target offset;
      // if that segment returns nothing, move on to the next one.
      var fetchDataInfo: FetchDataInfo = null
      // keep going while nothing has been read and a candidate segment still exists
      while (fetchDataInfo == null && segmentOpt.isDefined) {
        val segment = segmentOpt.get
        val baseOffset = segment.baseOffset

        val maxPosition =
          // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
          if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) maxOffsetMetadata.relativePositionInSegment
          else segment.size

        fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
        if (fetchDataInfo != null) {
          // optionally attach the aborted transactions overlapping this read
          if (includeAbortedTxns)
            fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
        } else {
          // nothing read from this segment: try the next segment above baseOffset
          segmentOpt = segments.higherSegment(baseOffset)
        }
      }

      if (fetchDataInfo != null) fetchDataInfo
      else {
        // We are beyond the end of the last segment with no data fetched: return an empty FetchDataInfo
        // carrying the next offset metadata and empty memory records (MemoryRecords.EMPTY).
        new FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
      }
    }
  }
}
```
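segments.floorSegment(startOffset) in the code above is essentially a floor lookup in a sorted map keyed by each segment's base offset (Kafka keeps its segments in a ConcurrentSkipListMap). A self-contained sketch of that lookup idea, with made-up segment base offsets:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

public class SegmentFloorLookup {
    public static void main(String[] args) {
        // base offset -> segment file name (made-up values for illustration)
        ConcurrentSkipListMap<Long, String> segments = new ConcurrentSkipListMap<>();
        segments.put(0L, "00000000000000000000.log");
        segments.put(5000L, "00000000000000005000.log");
        segments.put(12000L, "00000000000000012000.log");

        long startOffset = 7321L;
        // floorEntry: the segment with the greatest base offset <= startOffset,
        // i.e. the only segment that can contain this offset.
        Map.Entry<Long, String> floor = segments.floorEntry(startOffset);
        System.out.println("offset " + startOffset + " lives in " + floor.getValue());

        // If that segment yields no data, the read loop moves on with higherSegment(baseOffset),
        // which corresponds to higherEntry here.
        Map.Entry<Long, String> next = segments.higherEntry(floor.getKey());
        System.out.println("next segment: " + (next == null ? "none" : next.getValue()));
    }
}
```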
First the candidate segment is located with `var segmentOpt = segments.floorSegment(startOffset)`, then `fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)` reads the data out of that segment.

V. Building the file log stream object FileRecords

```scala
def read(startOffset: Long,
         maxSize: Int,
         maxPosition: Long = size,
         minOneMessage: Boolean = false): FetchDataInfo = {
  if (maxSize < 0)
    throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")

  // translate the logical start offset into a physical position in the segment file
  val startOffsetAndSize = translateOffset(startOffset)

  // if the start position is already off the end of the log, return null
  if (startOffsetAndSize == null)
    return null

  // offset metadata built from the start offset, the segment base offset and the physical start position
  val startPosition = startOffsetAndSize.position
  val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)

  val adjustedMaxSize =
    if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
    else maxSize

  // return a log segment but with zero size in the case below
  if (adjustedMaxSize == 0)
    return new FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)

  // calculate the length of the message set to read based on whether or not they gave us a maxOffset:
  // the smaller of (maxPosition - startPosition) and adjustedMaxSize
  val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)

  // The FetchDataInfo carries the offset metadata plus a slice of the log of the computed size;
  // log.slice(startPosition, fetchSize) is the actual log data.
  new FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
    adjustedMaxSize < startOffsetAndSize.size, Optional.empty())
}
```

log.slice fetches the file data:

```java
public FileRecords slice(int position, int size) throws IOException {
    int availableBytes = availableBytes(position, size);
    int startPosition = this.start + position;
    return new FileRecords(file, channel, startPosition, startPosition + availableBytes, true);
}
```

This creates a new file-data view; below is the FileRecords constructor:

```java
FileRecords(File file,
            FileChannel channel,
            int start,
            int end,
            boolean isSlice) throws IOException {
    this.file = file;
    this.channel = channel;
    this.start = start;
    this.end = end;
    this.isSlice = isSlice;
    this.size = new AtomicInteger();

    if (isSlice) {
        // don't check the file size if this is just a slice view: size is simply end - start
        size.set(end - start);
    } else {
        // Not a slice: the file size must be checked. Segment files larger than Integer.MAX_VALUE are rejected.
        if (channel.size() > Integer.MAX_VALUE)
            throw new KafkaException("The size of segment file (" + channel.size() +
                    ") is larger than the maximum allowed segment size of " + Integer.MAX_VALUE);
        // Otherwise size = min(file size, end) - start, and the channel position is moved to that limit,
        // i.e. to the last byte in the file.
        int limit = Math.min((int) channel.size(), end);
        size.set(limit - start);

        // if this is not a slice, update the file pointer to the end of the file
        // set the file position to the last byte in the file
        channel.position(limit);
    }

    batches = batchesFrom(start);
}
```
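A FileRecords slice is just a (start, end) window over the same FileChannel; nothing is copied until something actually reads from it. A rough standalone illustration of a positional, bounded read over such a file window follows; the file name is a placeholder, and this is plain NIO, not Kafka code.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class FileWindowRead {
    public static void main(String[] args) throws IOException {
        Path segment = Path.of("00000000000000000000.log"); // placeholder segment file
        try (FileChannel channel = FileChannel.open(segment, StandardOpenOption.READ)) {
            long start = Math.min(4096, channel.size());          // window start (like FileRecords.start)
            long end = Math.min(channel.size(), start + 1024);    // window end   (like FileRecords.end)
            ByteBuffer buffer = ByteBuffer.allocate((int) (end - start));
            // Positional read: does not move the channel's own position, which mirrors how a slice
            // reads "its" bytes without disturbing other readers of the same channel.
            int read = channel.read(buffer, start);
            System.out.println("read " + read + " bytes from window [" + start + ", " + end + ")");
        }
    }
}
```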
1. Creating the file stream FileLogInputStream from the position

```java
/**
 * Get an iterator over the record batches in the file, starting at a specific position. This is similar to
 * {@link #batches()} except that callers specify a particular position to start reading the batches from. This
 * method must be used with caution: the start position passed in must be a known start of a batch.
 * @param start The position to start record iteration from; must be a known position for start of a batch
 * @return An iterator over batches starting from {@code start}
 */
// returns an Iterable of batches read directly from this FileRecords
public Iterable<FileChannelRecordBatch> batchesFrom(final int start) {
    return () -> batchIterator(start);
}

private AbstractIterator<FileChannelRecordBatch> batchIterator(int start) {
    final int end;
    if (isSlice)
        end = this.end;
    else
        end = this.sizeInBytes();
    // wrap this FileRecords in a FileLogInputStream covering [start, end) ...
    FileLogInputStream inputStream = new FileLogInputStream(this, start, end);
    // ... and expose it through a RecordBatchIterator
    return new RecordBatchIterator<>(inputStream);
}
```

FileLogInputStream implements the nextBatch() method, which pulls the next record batch out of the underlying input stream:

```java
public class FileLogInputStream implements LogInputStream<FileLogInputStream.FileChannelRecordBatch> {

    /**
     * Create a new log input stream over the FileChannel
     * @param records Underlying FileRecords instance
     * @param start Position in the file channel to start from
     * @param end Position in the file channel not to read past
     */
    FileLogInputStream(FileRecords records,
                       int start,
                       int end) {
        this.fileRecords = records;
        this.position = start;
        this.end = end;
    }

    @Override
    public FileChannelRecordBatch nextBatch() throws IOException {
        // the underlying file channel
        FileChannel channel = fileRecords.channel();

        // stop if there is not even room for a batch header before the end
        if (position >= end - HEADER_SIZE_UP_TO_MAGIC)
            return null;

        // read the batch header into logHeaderBuffer
        logHeaderBuffer.rewind();
        Utils.readFullyOrFail(channel, logHeaderBuffer, position, "log header");

        // parse the base offset and the batch size out of the header
        logHeaderBuffer.rewind();
        long offset = logHeaderBuffer.getLong(OFFSET_OFFSET);
        int size = logHeaderBuffer.getInt(SIZE_OFFSET);

        // V0 has the smallest overhead, stricter checking is done later
        if (size < LegacyRecord.RECORD_OVERHEAD_V0)
            throw new CorruptRecordException(String.format("Found record size %d smaller than minimum record " +
                    "overhead (%d) in file %s.", size, LegacyRecord.RECORD_OVERHEAD_V0, fileRecords.file()));

        // stop if the full batch does not fit before the end
        if (position > end - LOG_OVERHEAD - size)
            return null;

        // the magic byte decides which batch implementation to build
        byte magic = logHeaderBuffer.get(MAGIC_OFFSET);
        final FileChannelRecordBatch batch;

        if (magic < RecordBatch.MAGIC_VALUE_V2)
            // legacy (pre-v2) record batch
            batch = new LegacyFileChannelRecordBatch(offset, magic, fileRecords, position, size);
        else
            // current (v2) record batch
            batch = new DefaultFileChannelRecordBatch(offset, magic, fileRecords, position, size);

        // advance the position so the next call returns the following batch
        position += batch.sizeInBytes();
        return batch;
    }
}
```

2. Wrapping the stream in a batch iterator, RecordBatchIterator

The batchIterator method above wraps the file stream in a RecordBatchIterator:

```java
class RecordBatchIterator<T extends RecordBatch> extends AbstractIterator<T> {

    private final LogInputStream<T> logInputStream;

    RecordBatchIterator(LogInputStream<T> logInputStream) {
        this.logInputStream = logInputStream;
    }

    @Override
    protected T makeNext() {
        try {
            T batch = logInputStream.nextBatch();
            if (batch == null)
                return allDone();
            return batch;
        } catch (EOFException e) {
            throw new CorruptRecordException("Unexpected EOF while attempting to read the next batch", e);
        } catch (IOException e) {
            throw new KafkaException(e);
        }
    }
}
```

The AbstractIterator base class:

```java
public abstract class AbstractIterator<T> implements Iterator<T> {

    private enum State {
        READY, NOT_READY, DONE, FAILED
    }

    private State state = State.NOT_READY;
    private T next;

    @Override
    public boolean hasNext() {
        switch (state) {
            case FAILED:
                throw new IllegalStateException("Iterator is in failed state");
            case DONE:
                return false;
            case READY:
                return true;
            default:
                return maybeComputeNext();
        }
    }

    @Override
    public T next() {
        if (!hasNext())
            throw new NoSuchElementException();
        state = State.NOT_READY;
        if (next == null)
            throw new IllegalStateException("Expected item but none found.");
        return next;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("Removal not supported");
    }

    public T peek() {
        if (!hasNext())
            throw new NoSuchElementException();
        return next;
    }

    protected T allDone() {
        state = State.DONE;
        return null;
    }

    protected abstract T makeNext();

    private Boolean maybeComputeNext() {
        state = State.FAILED;
        next = makeNext();
        if (state == State.DONE) {
            return false;
        } else {
            state = State.READY;
            return true;
        }
    }
}
```
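The makeNext()/allDone() contract is a small lazy-iterator pattern: a subclass computes one element at a time and signals exhaustion by calling allDone(). Below is a standalone toy version of the same pattern (not Kafka code, all names made up), yielding fixed-size "batch positions" much like batches are pulled one at a time from the file stream.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Toy version of the makeNext()/allDone() pattern used by Kafka's AbstractIterator. */
abstract class LazyIterator<T> implements Iterator<T> {
    private enum State { READY, NOT_READY, DONE }
    private State state = State.NOT_READY;
    private T next;

    /** Return the next element, or call allDone() and return null when exhausted. */
    protected abstract T makeNext();

    protected T allDone() { state = State.DONE; return null; }

    @Override public boolean hasNext() {
        switch (state) {
            case READY: return true;
            case DONE:  return false;
            default:
                next = makeNext();
                if (state == State.DONE) return false;
                state = State.READY;
                return true;
        }
    }

    @Override public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        state = State.NOT_READY;
        return next;
    }
}

/** Example subclass: yields batch start positions until the end of a pretend log. */
class BatchPositionIterator extends LazyIterator<Integer> {
    private int position = 0;
    private final int end;
    private final int batchSize;

    BatchPositionIterator(int end, int batchSize) { this.end = end; this.batchSize = batchSize; }

    @Override protected Integer makeNext() {
        if (position >= end) return allDone();
        int current = position;
        position += batchSize;   // advance, like position += batch.sizeInBytes() in FileLogInputStream
        return current;
    }
}
```

For example, `new BatchPositionIterator(10, 3)` yields 0, 3, 6, 9 and then reports exhaustion through hasNext().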
When RecordBatchIterator.makeNext() runs, it calls nextBatch() of the FileLogInputStream shown in section V.1. DefaultFileChannelRecordBatch is the default (v2) batch implementation:

```java
static class DefaultFileChannelRecordBatch extends FileLogInputStream.FileChannelRecordBatch {

    DefaultFileChannelRecordBatch(long offset,
                                  byte magic,
                                  FileRecords fileRecords,
                                  int position,
                                  int batchSize) {
        super(offset, magic, fileRecords, position, batchSize);
    }

    @Override
    protected RecordBatch toMemoryRecordBatch(ByteBuffer buffer) {
        return new DefaultRecordBatch(buffer);
    }

    @Override
    public long baseOffset() {
        return offset;
    }

    // ... omitted ...
}
```

3. DefaultRecordBatch.iterator materializes the records in memory

Now let's see where DefaultFileChannelRecordBatch.toMemoryRecordBatch gets called. DefaultRecordBatch exposes its records through the iterator method, which returns an Iterator<Record>:

```java
public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {

    @Override
    public Iterator<Record> iterator() {
        if (count() == 0)
            return Collections.emptyIterator();

        if (!isCompressed())
            return uncompressedIterator();

        // for a normal iterator, we cannot ensure that the underlying compression stream is closed,
        // so we decompress the full record set here. Use cases which call for a lower memory footprint
        // can use streamingIterator at the cost of additional complexity
        try (CloseableIterator<Record> iterator = compressedIterator(BufferSupplier.NO_CACHING, false)) {
            List<Record> records = new ArrayList<>(count());
            while (iterator.hasNext())
                records.add(iterator.next());
            return records.iterator();
        }
    }
}
```

DefaultFileChannelRecordBatch is a subclass of FileChannelRecordBatch, which represents a batch that is still backed by the FileChannel: while walking the log the batch does not have to be loaded into memory until it is actually needed. The important method is iterator():

```java
public abstract static class FileChannelRecordBatch extends AbstractRecordBatch {
    protected final long offset;
    protected final byte magic;
    protected final FileRecords fileRecords;
    protected final int position;
    protected final int batchSize;

    private RecordBatch fullBatch;
    private RecordBatch batchHeader;

    FileChannelRecordBatch(long offset,
                           byte magic,
                           FileRecords fileRecords,
                           int position,
                           int batchSize) {
        this.offset = offset;
        this.magic = magic;
        this.fileRecords = fileRecords;
        this.position = position;
        this.batchSize = batchSize;
    }

    // ... omitted ...

    @Override
    public Iterator<Record> iterator() {
        return loadFullBatch().iterator();
    }

    // ... omitted ...

    protected RecordBatch loadFullBatch() {
        if (fullBatch == null) {
            batchHeader = null;
            fullBatch = loadBatchWithSize(sizeInBytes(), "full record batch");
        }
        return fullBatch;
    }

    // Finally, toMemoryRecordBatch of the DefaultFileChannelRecordBatch type is called here
    // to build the batch data in memory.
    private RecordBatch loadBatchWithSize(int size, String description) {
        FileChannel channel = fileRecords.channel();
        try {
            ByteBuffer buffer = ByteBuffer.allocate(size);
            Utils.readFullyOrFail(channel, buffer, position, description);
            buffer.rewind();
            // build the batch in memory
            return toMemoryRecordBatch(buffer);
        } catch (IOException e) {
            throw new KafkaException("Failed to load record batch at position " + position + " from " + fileRecords, e);
        }
    }
}
```
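Putting the whole chain together, the same classes can be used directly to walk the batches and records of a segment file on disk. They live in the internal org.apache.kafka.common.record package of kafka-clients, so treat this as a sketch against an internal API that may change; the segment path is a placeholder.

```java
import java.io.File;
import java.io.IOException;
import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.Record;

public class DumpSegment {
    public static void main(String[] args) throws IOException {
        // Placeholder path: point it at a real partition segment, e.g. <log.dirs>/<topic>-<partition>/00000000000000000000.log
        try (FileRecords records = FileRecords.open(new File("00000000000000000000.log"))) {
            // batches() yields FileChannelRecordBatch views backed by the FileChannel (nothing is fully loaded yet)
            for (FileChannelRecordBatch batch : records.batches()) {
                System.out.printf("batch baseOffset=%d lastOffset=%d sizeInBytes=%d%n",
                        batch.baseOffset(), batch.lastOffset(), batch.sizeInBytes());
                // Iterating the records of a batch triggers loadFullBatch()/toMemoryRecordBatch(),
                // i.e. this is the point where the batch bytes are read into memory.
                for (Record record : batch) {
                    System.out.printf("  offset=%d keySize=%d valueSize=%d%n",
                            record.offset(), record.keySize(), record.valueSize());
                }
            }
        }
    }
}
```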
