kafka学习笔记(四、生产者(客户端)深入研究(二)——消费者协调器与_consumer_offsets剖析)

在这里插入图片描述


1.消费者协调器和组协调器

如果消费者客户端中配置了多个分配策略,则多消费者的分区分配交由消费者协调器和组协调器来完成,他们之间使用一套组协调协议进行交互。

1.1.在均衡原理

将全部消费者分成多个子集,每个消费者组的子集在服务中对应一个GroupCoordinator对起进行管理,GroupCoordinator是kafka服务端中用于管理消费组的组件。而消费者客户端中的ConsumerCoordinator组件负责与GroupCoordinator进行交互。

GroupCoordinator+ConsumerCoordinator组重要的职责就是负责执行消费者在均衡操作。

1.1.1.触发在均衡情形

  • 有新的消费者加入消费组
  • 有消费者宏机下线
  • 有消费者主动退出消费组
  • 消费组对应的ConsumerCoordinator节点发生了变更
  • 消费组内所订阅的任意主题或主题的分区数发生变化
1.1.2.在均衡操作的主要内容
  1. 第一阶段: FINO_COORDINATOR
    消费者需要确认它所属的消费组对应的GroupCoordinator所在的broker,并创建与该broker相互通信的网络连接。如消费者已经保存了与消费者组对应的GroupCoordinator节点的信息,并且与他的网络连接是正常的,那么可进入第二阶段。否则就要想集群中的某个节点(负载最小的节点)发送FindCoordinatorRequest请求来查找对应的GroupCoordinator

    在这里插入图片描述


    • coordinator_key:在这里就是消费组的名称,即groupId
    • coordinator_type:置为0

    kafka在收到FindCoordinatorRequest请求后会根据coordinator_key查找对应的GroupCoordinator节点,如果找到对应的GroupCoordinator则会返回其相对应的node_id、host和port信息。

    具体查找GroupCoordinator的方式:

    • 先根据消费组groupId的哈希值计算_consumer_offfsets中的分区编号,具体算法:Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount

      groupMetadataTopicPartitionCount为主题_consumer_offsets的分区个数,可以通过broker端参数offsets.topic.num.partitions来配置,默认为50.

    • 然后根据_consumer_offsets对应的分区寻找此分区leader副本所在的broker节点,该borker节点即为这个groupId对应的GroupCoordinator节点

      消费者groupId最终的分区分配方案计组内消费者所提交的消费位移信息都会发送给此分区leader副本所在的broker节点,让此broker节点即扮演GroupCoordinator的角色,有扮演保存分区分配方案和组内消费者位移的角色,以此可省去很多不必要的中间轮转所带来的消耗。

  2. 第二阶段: JOIN_GROUP
    在成功找到消费组所对应的GroupCoordinator之后就进入加入消费组的阶段,在此阶段的消费者会向GroupCoordinator发送JoinGroupRequest请求并处理响应。
    选举消费组的leader:

    GroupCoordinator需要为消费组内的消费者选举出一个leader,选举算法分为两种情况。

    1. 消费组内无leader,则第一个加入的为leader
    2. 如果某一时刻leader消费者由于某些原因退出了消费组,则会重新选举一个新的leader,方法为随机选举

    选举分区分配策略:

    每个消费者都可以设置自己的分区分配策略,则组内最终选举的分配策略基本上可以看作被各个消费者支持的最多的策略,具体过程如下

    1. 收集各个消费者支持的所有分配策略组成候选集candidates
    2. 每个消费者从候选集candidates中找出第一个自身支持的策略,为每个策略投上一票
    3. 计算候选集中各个策略的选票数,选票最多的策略即为当前消费组的分配策略

    如有消费者不支持选出的分配策略,则抛出异常

  3. 第三阶段:SYNC_GROUP
    leader消费者根据在第二阶段中选举出来的分区分配策略来实施具体的分区分配。在此之后需将分配的方案同步给各个消费者,此时leader消费者并不是直接和其余的普通消费者同步分配方案,而是通过GroupCoordinator这个“中间人”来负责转发同步分配方案的。在第三阶段,也就是同步阶段,各个消费者会向GroupCoordinator发送SyncGroupRequest请求来同步分配方案。
    SyncGroupRequest具体结构:

    在这里插入图片描述

    group_assignment是一个数组类型,其中包含了各个消费者对应的具体分配方案:member_id表示消费者的唯一标识,member_assignment是与消费者对应的分配方案,还可以做更具体的划分。

    member_assignment结构图:

    在这里插入图片描述

    服务端在接受到消费者发送的SyncGroupRequest请求后会交给GroupCoordinator来负责具体的的逻辑处理。

    GroupCoordinator处理逻辑:

    1. SyncGroupRequest请求做合法性校验
    2. 提取leader消费者发送过来的分配方案并连同整个消费组的元数据信息一起存入Kafka的_consumer_offsets主题中
    3. 最后发送响应(SyncGroupResponse)给各个消费者以提供个各个消费者各自所属的分配方案
  4. 第四阶段:HEARTBEAT
    进入这个阶段后,消费组中的所有消费者就会处于正常工作状态。在正式消费之前,消费者还需要确定拉取信息的起始位置(上次提交的位置)。
    消费者通过向GroupCoordinator发送心跳来维持它们与消费组的从属关系,以及它们对分区的所有权关系

    心跳线程是一个独立的线程,可以在轮询消息的空挡发送心跳。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区中的消息。

    心跳间隔时间由参数heartbeaat.interval.ms指定,默认值为3000(3秒)。

    这个时间必须比session.timeout.ms参数设定的值要,一般设定不能超过其1/3,也可以设定的更小,以控制正常重新平衡的逾期时间。

    max.poll.interval.ms参数用来指定消费者组管理时poll()方法调用之间的最大延迟,也就是消费者在获取更多消息之前可以空闲的时间上限。

    如果此超时时间期满之前poll()没有调用,则消费者被认为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。

    LeaveGroupRequest请求可以主动退出消费组,如客户端调用unsubscrible()方法取消对某些主题的订阅。

2._consumer_offsets剖析

_consumer_offsets是Kafka的内部主题,用于存储消费者组的偏移量(offset)信息,是Kafka实现消息可靠传递的关键组件。

一般情况下,当集群中第一次有消费者消费消息时会自动创建主题__consumer_offsets,其副本因子还受offsets.topic.num.partitions.factor参数的约束,此参数默认为3,分区数可以通过offsets.topic.num.partitons参数设置,默认为50。

  • 作用和背景
  • 偏移量管理:消费者需要记录消费进度(即分区中已处理的最新消息位置),__consumer_offsets负责持久化这些信息,确保消费者重启或故障后能恢复进度。
  • 替代ZooKeeper:早期Kafka版本将偏移量存储在ZooKeeper中,但随消费者规模增长,ZooKeeper的写入瓶颈显现。从Kafka 0.9版本起,偏移量迁移至__consumer_offsets,利用Kafka自身的高吞吐和分区能力提升扩展性。

2.1.OffsetCommitRequest

客户端提交消费位移是使用OffsetCommitRequest请求实现的,其UML图如下:

contains
contains (topics)
contains (partitions)
OffsetCommitRequest
+int16 apiKey = 0x0008
+int16 apiVersion
+int32 correlationId
+string clientId
+RequestBody requestBody
RequestBody
+string groupId
+int32 generationId
+string memberId
+int64 retentionTime
+List<TopicData> topics
TopicData
+string topic
+List<PartitionData> partitions
PartitionData
+int32 partition
+int64 offset
+int32 leaderEpoch
+string metadata
+int64 timestamp
  • 类关系:

    • OffsetCommitRequest 包含一个 RequestBody
    • RequestBody 包含多个 TopicData(通过 topics 字段)。
    • 每个 TopicData 包含多个 PartitionData(通过 partitions 字段)。
  • 字段类型:

    • 基本类型(如 int32, string)直接标注。
    • 集合类型用 ListT 表示(例如 ListTopicData)。
  • 关键字段:

    1. 公共头部字段
      API Key 固定为0x0008,标识这是一个OffsetCommit请求。
      API Version 决定请求的格式兼容性(不同版本的Kafka可能扩展字段)。
      Correlation ID 客户端生成的唯一ID,用于跟踪请求与响应的对应关系。
      Client ID 客户端的逻辑标识,用于服务端日志监控。
    2. 请求体字段
      group_id 消费者组唯一标识,对应group.id配置。
      generation_id 消费者组的“年代号”,在Rebalance操作后递增。用于防止已退组的消费者提交过期偏移量。
      member_id 消费者在组内的唯一ID,由Broker分配。
      retention_time 旧版本(如v0, v1)中用于指定偏移量保留时间,新版本中由Broker配置决定。
      topics 待提交偏移量的主题列表,每个主题包含多个分区的偏移量数据。
    3. PartitionData字段
      partition 目标分区编号。
      offset 消费者提交的当前消费进度(即下一条待处理消息的位置)。
      leader_epoch 用于处理副本故障恢复时的数据一致性(Kafka 0.11+引入)。
      metadata 可选的附加信息(如提交者的客户端版本)。
      timestamp: 旧版本中用于指定偏移量时间戳,新版本由Broker自动填充。

不同版本的OffsetCommitRequest可能有字段变化:

版本重要变化
v0基础版本,无leader_epoch字段
v1 新增retention_time字段
v2引入leader_epoch,删除timestamp字段
v3+优化字段编码,支持更严格的校验

2.2.请求流程

  1. 消费者提交偏移量
    消费者调用commitSync()commitAsync()时,构造OffsetCommitRequest,按上述格式编码为二进制数据,发送给Broker

  2. Broker处理逻辑
    Broker将偏移量写入内部主题__consumer_offsets,并根据请求中的generation_idmember_id验证提交合法性。

  3. 响应返回客户端
    Broker返回OffsetCommitResponse,包含每个分区的提交结果(成功或错误码)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/81342.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

快速将FastAPI接口转为模型上下文协议(MCP)!

fastapi_mcp 是一个用于将 FastAPI 端点暴露为模型上下文协议&#xff08;Model Context Protocol, MCP&#xff09;工具的库&#xff0c;并且支持认证功能。 环境macbook&#xff0c;python3.13 pip install fastapi uvicorn fastapi-mcp 代码 from fastapi import FastAPI, …

实验数据的转换

最近做实验需要把x轴y轴z轴的数据处理一下&#xff0c;总结一下解决的方法&#xff1a; 源文件为两个txt文档&#xff0c;分别为x轴和y轴&#xff0c;如下&#xff1a; 最终需要达到的效果是如下&#xff1a; 就是需要把各个矩阵的数据整理好放在同一个txt文档里。 步骤① …

第Y3周:yolov5s.yaml文件解读

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 本次任务&#xff1a;将yolov5s网络模型中的第4层的C3x2修改为C3x1&#xff0c;第6层的C3x3修改为C3x2。 首先输出原来的网络结构&#xff1a; from n pa…

Ansible安装配置

一、前提 服务器操作系统均为centos7.9 主机ipmaster(Ansible管理端)172.25.192.2node1172.25.192.10node2172.25.192.3 更新/etc/hosts文件 二、安装 master节点&#xff1a; 1. 安装epel源 yum install -y epel-release 2. 安装Ansible yum install -y ansible A…

MySQL中ROW_NUMBER() OVER的用法以及使用场景

使用语法 ROW_NUMBER() OVER ([PARTITION BY partition_column1, partition_column2, ...]ORDER BY sort_column1 [ASC|DESC], sort_column2 [ASC|DESC], ... )PARTITION BY&#xff1a;将数据按指定列分组&#xff0c;每组内单独生成行号。ORDER BY&#xff1a;决定组内行号的…

【人工智能】释放本地AI潜能:LM Studio用户脚本自动化DeepSeek的实战指南

《Python OpenCV从菜鸟到高手》带你进入图像处理与计算机视觉的大门! 解锁Python编程的无限可能:《奇妙的Python》带你漫游代码世界 随着大型语言模型(LLM)的快速发展,DeepSeek以其高效的性能和开源特性成为开发者关注的焦点。LM Studio作为一款强大的本地AI模型管理工具…

笔试强训:Day3

一、牛牛冲钻五&#xff08;模拟&#xff09; 登录—专业IT笔试面试备考平台_牛客网 #include<iostream> using namespace std; int main(){int t,n,k;string s;cin>>t;while(t--){cin>>n>>k>>s;int ret0;//统计加了多少星for(int i0;i<n;i)…

语音识别质量的跟踪

背景 这个项目是用来生成结构化的电子病历的。数据的来源是医生的录音。中间有一大堆的处理&#xff0c;语音识别&#xff0c;关键字匹配&#xff0c;结构化处理&#xff0c;病历编辑......。最多的时候给上百家医院服务。 语音识别质量的跟踪 一、0225医院的训练后的情况分…

人工智能搜索时代的SEO:关键趋势与优化策略

随着人工智能&#xff08;AI&#xff09;技术的飞速发展&#xff0c;搜索引擎的运作方式正在经历前所未有的变革。2025年&#xff0c;AI驱动的搜索&#xff08;如谷歌的AI概览、ChatGPT搜索和必应的AI增强功能&#xff09;不仅改变了用户获取信息的方式&#xff0c;还为SEO从业…

Node.js心得笔记

npm init 可用npm 来调试node项目 浏览器中的顶级对象时window <ref *1> Object [global] { global: [Circular *1], clearImmediate: [Function: clearImmediate], setImmediate: [Function: setImmediate] { [Symbol(nodejs.util.promisify.custom)]: [Getter] }, cl…

计算机网络01-网站数据传输过程

局域网&#xff1a; 覆盖范围小&#xff0c;自己花钱买设备&#xff0c;宽带固定&#xff0c;自己维护&#xff0c;&#xff0c;一般长度不超过100米&#xff0c;&#xff0c;&#xff0c;带宽也比较固定&#xff0c;&#xff0c;&#xff0c;10M&#xff0c;&#xff0c;&…

Mysql常用函数解析

字符串函数 CONCAT(str1, str2, …) 将多个字符串连接成一个字符串。 SELECT CONCAT(Hello, , World); -- 输出: Hello World​​SUBSTRING(str, start, length) 截取字符串的子串&#xff08;起始位置从1开始&#xff09;。 SELECT SUBSTRING(MySQL, 3, 2); -- 输出: SQ…

SpringMVC 前后端数据交互 中文乱码

ajax 前台传入数据&#xff0c;但是后台接收到的数据中文乱码 首先我们分析一下原因&#xff1a;我们调用接口的时候传入的中文&#xff0c;是没有乱码的 此时我们看一下Java后台接口对应的编码&#xff1a; 默认情况&#xff1a;Servlet容器&#xff08;如Tomcat&#xff09;默…

loads、dumps、jsonpath使用场景

在处理JSON数据时&#xff0c;loads、dumps 和 jsonpath 是三个非常有用的工具或概念。它们各自在不同的场景下发挥作用&#xff0c;让我们一一来看&#xff1a; 1. loads loads 函数是 Python 中 json 模块的一部分&#xff0c;用于将 JSON 格式的字符串解析成 Python 的数据…

Java学习手册:Spring 事务管理

一、事务管理的概念 事务是一组操作的集合&#xff0c;这些操作要么全部成功&#xff0c;要么全部失败。事务管理的目的是保证数据的一致性和完整性。在数据库操作中&#xff0c;事务管理尤为重要&#xff0c;例如银行转账、订单支付等场景都需要事务管理来确保数据的正确性。…

echarts自定义图表--柱状图-横向

区别于纵向表格 xAxis和yAxis对调 要将label全部固定到最右侧&#xff1a; 隐藏一个柱形 为每个label设置固定的偏移距离 offset: [300 - 80, 0] 在data中加入label的配置 根据现在的值生成距离右侧的偏移 更新方法 chart.setOption({series: [{},{data: data.map(v > ({v…

【CV数据集】Visdrone2019无人机目标检测数据集(YOLO、VOC、COCO格式)

visdrone2019的Task1是非常通用的目标检测数据集&#xff0c;也是许多人做目标检测论文和项目必然会用到的数据集&#xff0c;我将该数据集进行了处理&#xff0c;将其YOLO、VOC和COCO格式都整理好&#xff0c;通过下载我整理好的数据集和相关文件&#xff0c;可以直接在自己的…

常见电源的解释说明

英文缩写 BJT&#xff08;bipolar junction transistor&#xff09;双极型结晶体管FET&#xff08;field-effect transistor&#xff09;场效应管TTL&#xff08;Transistor-Transistor Logic&#xff09;三极管CMOS&#xff08;Complementary Metal Oxide Semiconductor&…

【2025年五一数学建模竞赛】A题 解题思路与模型代码

2025年五一数学建模竞赛 A题 问题一&#xff1a;推测支路 1 和支路 2 的车流量 1.1 问题描述 根据提供的主路历史数据以及已知的支路车流量变化趋势&#xff08;支路1呈线性增长&#xff0c;支路2先线性增长后线性减少&#xff09;&#xff0c;推测这两个支路在特定时间段&a…

d202551

目录 一、175. 组合两个表 - 力扣&#xff08;LeetCode&#xff09; 二、511. 游戏玩法分析 I - 力扣&#xff08;LeetCode&#xff09; 三、1204. 最后一个能进入巴士的人 - 力扣&#xff08;LeetCode&#xff09; 一、175. 组合两个表 - 力扣&#xff08;LeetCode&#xf…