kafka学习笔记(四、生产者、消费者(客户端)深入研究(三)——事务详解及代码实例)

在这里插入图片描述


1.事务简介

Kafka事务是Apache Kafka在流处理场景中实现Exactly-Once语义的核心机制。它允许生产者在跨多个分区和主题的操作中,以原子性(Atomicity)的方式提交回滚消息,确保数据处理的最终一致性。例如,在流处理中,消费者读取消息后处理并生成新消息,若处理失败,事务可确保原始消息的消费偏移与新消息的发送同时回滚,避免数据不一致。

事务的核心作用:
原子性: 跨分区的写操作要么全部成功,要么全部失败。
隔离性: 事务未提交时,消息对消费者不可见(通过isolation.level=read_committed配置实现)。
持久性: 事务状态持久化至内部Topic __transaction_state,支持故障恢复。

2.事务实现的核心原理

2.1.幂等性

幂等性是事务的基础,确保单分区内消息不重复。

简单的说就是对接口的多次调用所产生的结果和调用一次一致的。(生产者在进行重试的时候有可能会重复写入消息,kafka幂等性功能的使用就是为了避免这种情况的发生)

其实现依赖两个核心机制

  • Producer ID(PID):生产者初始化时由事务协调器(Transaction Coordinator)分配的唯一标识。
  • 序列号(Sequence Number):每个消息携带的递增序号,Broker通过检查PID和序列号判断是否重复。

每个新的生产者实例在初始化的时候都会被分配一个PID,这个PID对用户而言是完全透明的。对于每个PID,消息发送到的每一个分区都有对应的序列号,这些序列号从0开始单调递增。生产者没发送一条消息就会将 <PID,分区> 对应的序列号的值加1

broker端会在内存中为每一对 <PID,分区> 维护一个序列号。对于收到的额每一条消息,只有当他的序列号的值(SN_new)比broker端维护的对应序列号的值(SN_old)大1(即SN_new = SN_old + 1)时,broker才会接收它。

配置要求:

  • enable.idempotence=true
  • max.in.flight.requests.per.connection ≤5
  • acks=all
  • retries >0(需启用重试机制)。

局限性:幂等性仅保证单会话、单分区的Exactly-Once语义,无法跨会话或多分区。

2.2.事务机制

事务通过事务协调器TransactionCoordinator, TC)和事务日志__transaction_state)扩展了幂等性,实现跨分区和会话的原子性。

  • 事务协调器(TransactionCoordinator

    角色: 管理事务生命周期,持久化事务状态至__transaction_state
    容错: TC故障时,新TC通过事务日志恢复状态。

  • 事务日志(__transaction_state

    存储内容: 事务ID、PID、涉及的分区列表、事务状态(如Ongoing、PrepareCommit)。
    分区策略: 按事务ID哈希分配到50个默认分区,确保负载均衡。

2.3.事务实现原理

事务实现的流程从事务的初始化到开启及发送消息再到提交或回滚事务主要分为以下五个阶段来讲解。

2.3.1.查找TransactionCoordinator

TransactionCoordinator负责分配PID和管理事务,所以生产者要做的第一件事就是找出对应的 TransactionCoordinator所在的broker节点。

步骤:

  1. 发送FindCoordinatorRequest请求,FindCoordinatorRequest中coordinator_type为1。
  2. kafka收到请求,根据coordinator_key(transactionalId)查找对应的TransactionCoordinator节点,找到返回对应的node_id、host和 port信息。

    通过transactionalId的哈希值分配到__transaction_state的特定分区编号。
    计算算法:
    Utils.abs(transactionalId.hashCode) % transactionTopicPartitionCount1

  3. 最后根据找到的分区寻找此分区leader副本所在的broker节点,该broker几点即为这个transactionalId对应的TransactionCoordinator节点。

2.3.2.获取PID

在找到TransactionCoordinator节点后,就需要为当前生产者分配一个PID了。生产者获取PID的操作是通过InitProducerIdRequest请求实现的。
在这里插入图片描述

  • transactional_id 表示事务transactionalId
  • transactional_timeout_ms 表示TransactionCoordinator等待事务状态更新的超时时间,通过客户端参数transaction.timeout.ms配置,默认为60s。

步骤:

  1. 生产者发送InitProducerIdRequest请求到TransactionCoordinator

    如果未开启事务特性只开启幂等特性,则InitProducerIdRequest请求可以发送给任意的broker。

  2. TransactionCoordinator将第一次收到包含transactionalIdInitProducerIdRequest请求的transactionalId和对应的PID以消息的形式保存到主题__transaction_state中(Conssume-transorm-produce流程图中2.1步骤)。
  3. 增加该PID对应的producer_epoch

    具有相同PIDproducer_epoch小于该producer_epoch的其他生产者新开启的事务会被拒绝。

  4. 恢复(commit)或中止(Abort)之前的生产者未完成的事务。
  5. 响应InitProducerIdRequest请求。

    返回类型为InitProducerIdResponse,其主要包含PIDproducer_epoch

2.3.3.开启事务

通过KafkaProducer的beginTransaction()方法开启事务。

调用了此方法后,生产者本地会标记已经开启了一个新的事物,只有在生产者发送第一条消息之后TransactionCoordinator才会认为该事物已经开启。

2.3.4.Conssume-transorm-produce

这个阶段是kafka事务中最复杂的一个阶段,其包含了整个事务的数据处理流程,涉及多种请求。
在这里插入图片描述
步骤:

  1. AddpartitonsToTxnRequest
    生产者给一个新的分区(TopicPartition)发送数据之前,就需要先向TransactionCoordinator发送AddpartitonsToTxnRequest
    这个请求会让TransactionCoordinator<transactionId, TopicPartiton>的对应关系存储在主题_transaction_state中(上图中的4.1步骤)以此对照关系后续为每个分区设置COMMITABORT标记。

    如果该分区是对应事务中的第一个分区,那么此时TransactionCoordinator还会启动对该事务的计时。

    在这里插入图片描述

  2. ProduceRequest
    生产者通过ProduceRequest发送消息(ProducerBatch)到用户自定义主题中(与发送普通消息相同,如流程图4.2)。
    与普通消息不同的是,ProducerBatch中包含实质的PIDproducer_epochsequence number

  3. AddOffsetsToTxnRequest
    此请求来自KafkaProducer中的sendOffsetsToTranscation()方法,此方法可以在一个事物批次里处理消息的消费和发送。此方法会向TransactionCoordinator节点发送AddOffsetsToTxnRequest请求,TransactionCoordinator收到后会通过groupId来推导出在_consumer_offsets中的分区,之后将这个分区保存在_consumer_offsets中(如流程图4.3)。

    方法有两次参数Map<TopicPartition, OffsetAndMetadata> offsetsgroupId

    在这里插入图片描述

  4. TxnOffsetCommitRequest
    此请求也来自sendOffsetsToTranscation(),在处理完AddOffsetsToTxnRequest之后,生产者还会发送TxnOffsetCommitRequest请求给GroupCoordinator,从而将本次事务中包含的消费位移信息offsets存储到主题_consumer_offsets中(如流程图4.4)。

2.3.5.提交或中止事务

数据写入成功就可以调用kafkaProducer中的commitTransaction()方法或abortTransaction()方法来结束当前事务。其步骤如下:

  1. EndTxnrequest
    此请求用来提交或中止事务。

    无论调用commitTransaction()方法或abortTransaction()方法,生产者都会向TransactionCoordinator发送EndTxnRequest请求。

    在这里插入图片描述

    TransactionCoordinator收到EndTxnRquest的执行流程:

    1. PREPARE_COMMITPREPARE_ABORT消息写入主题__transaction_state(流程图5.1)。
    2. 通过WriteTxnMarkersRequest请求将COMMITABORT信息写入用户所使用的普通主题__consumer_offsets(流程图5.2)。
    3. COMPLETE_COMMITCOMPLETE_ABORT信息写入内部主题__transaction_state(流程图5.3
  2. WriteTxnMarkersRequest
    此请求的由TransactionCoordinator发向事务中各个分区的leader节点,当节点收到请求后会在对应的分区中写入控制消息(ControlBatch)。

    控制消息用来标识事务的终结,和普通消息一样存储到日志文件中。

  3. 写入最终的COMPLETE_COMMITCOMPLETE_ABORT
    TransactionCoordinator将最终的COMPLETE_COMMITCOMPLETE_ABORT信息写入主题__transaction_state以表明当前事务已经结束,此时可以删除主题__transaction_state中所有关于该事务的消息。

由于主题__transaction_state采用的日志清理策略为日志压缩,所以这里的删除只需将相应的消息设置为墓碑消息2即可。


  1. transactionTopicPartitionCount为主题__transaction_state中分区个数,这个可以通过broker端参数transaction.state.log.num.partitions类配置,默认为50。 ↩︎

  2. 墓碑消息:不直接删除数据,而是通过在数据记录中插入一个特殊的标记(即墓碑消息),来指示这些数据已被删除或不再有效。墓碑消息本身不占用存储空间,它只是标记了数据的删除状态,实际的物理删除是由日志压缩(Compact)或日志删除(Delete)策略来完成的‌。 ↩︎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/79210.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Missashe计网复习笔记(随时更新)

Missashe计算机网络复习笔记 前言&#xff1a;这篇笔记用于博主对计网这门课所学进行记录和总结&#xff0c;也包括一些个人的理解。正在更新当中…… 第一章 计算机网络体系结构 考纲内容 (一) 计算机网络概述 计算机网络的概念、组成与功能;计算机网络的分类; 计算机网络…

PVP鼠标推荐(deepseek)

下面有不懂的自行百度查找&#x1f44d; ❤️ 以下是几款在 双击性能&#xff08;DBC&#xff09; 和 拖拽点击&#xff08;DC&#xff09; 方面表现优秀的游戏鼠标推荐&#xff0c;结合了硬件性能、微动寿命以及玩家口碑&#xff1a; 1. 罗技 G102/G203 Lightsync 特点&#…

ABP vNext + EF Core 实战性能调优指南

ABP vNext EF Core 实战性能调优指南 &#x1f680; 目标 本文面向中大型 ABP vNext 项目&#xff0c;围绕查询性能、事务隔离、批量操作、缓存与诊断&#xff0c;系统性地给出优化策略和最佳实践&#xff0c;帮助读者快速定位性能瓶颈并落地改进。 &#x1f4d1; 目录 ABP vN…

为啥大模型一般将kv进行缓存,而q不需要

1. 自回归生成的特点 大模型&#xff08;如 GPT 等&#xff09;在推理时通常采用自回归生成的方式&#xff1a; 模型逐个生成 token&#xff0c;每次生成一个新 token 时&#xff0c;需要重新计算注意力。在生成第 t 个 token 时&#xff0c;模型需要基于前 t-1 个已生成的 t…

3DGS-slam:splatam公式

配套讲解视频&#xff1a;https://www.bilibili.com/video/BV1ZgfBYdEpg/?spm_id_from333.1387.homepage.video_card.click&vd_sourced4c3e747c32049ddd90dcce17208f4e0 1、多维高斯分布公式: 对于多维&#xff08;多变量&#xff09;高斯分布&#xff0c;概率密度函数的…

从Dockerfile 构建docker镜像——保姆级教程

从Dockfile开始 dockerfile简介开始构建1、编辑dockerfile2、构建镜像3、拉取镜像4、推送到镜像仓库 镜像的优化1、优化的基本原则2、多阶段构建 dockerfile简介 开始构建 1、编辑dockerfile # 使用官方的 Python 3.8 镜像作为基础镜像 FROM python:3.8-slim# 设置工作目录 …

开元类双端互动组件部署实战全流程教程(第2部分:控制端协议拆解与机器人逻辑调试)

作者&#xff1a;那个写了个机器人结果自己被踢出房间的开发者 游戏逻辑房间结构参考界面 从这张图我们能看出&#xff0c;该组件按功能结构细分为多个房间&#xff0c;每个房间底注、准入标准不同&#xff0c;对应的控制模块也有层级区分。常规来说&#xff0c;一个“互动房间…

[特征工程]机器学习-part2

1 特征工程概念 特征工程:就是对特征进行相关的处理 一般使用pandas来进行数据清洗和数据处理、使用sklearn来进行特征工程 特征工程是将任意数据(如文本或图像)转换为可用于机器学习的数字特征,比如:字典特征提取(特征离散化)、文本特征提取、图像特征提取。 特征工程步骤…

[数据库之十一] 数据库索引之联合索引

执行数据库查询时&#xff0c;通常查询条件是多对个属性进行判断和约束&#xff0c;对于这种类型的查询&#xff0c;如果存在多个索引则使用多个索引&#xff0c;或者使用建立在多属性搜索码上的索引&#xff0c;这样能提高查询效率。 一、使用多个单码索引 假设数据表 instruc…

增强学习(Reinforcement Learning)简介

增强学习&#xff08;Reinforcement Learning&#xff09;简介 增强学习是机器学习的一种范式&#xff0c;其核心目标是让智能体&#xff08;Agent&#xff09;通过与环境的交互&#xff0c;基于试错机制和延迟奖励反馈&#xff0c;学习如何选择最优动作以最大化长期累积回报。…

PaddlePaddle 和PyTorch选择与对比互斥

你遇到的错误信息如下&#xff1a; RuntimeError: (PreconditionNotMet) Tensors dimension is out of bound.Tensors dimension must be equal or less than the size of its memory.But received Tensors dimension is 8, memorys size is 0.[Hint: Expected numel() * Size…

vison transformer vit 论文阅读

An Image is Worth 16x16 Words 20年的论文看成10年的哈斯我了 [2010.11929] 一张图像胜过 16x16 个单词&#xff1a;用于大规模图像识别的转换器 --- [2010.11929] An Image is Worth 16x16 Words: Transformers for Image Recognition at Scale 为什么transformer好训练&am…

依赖关系-根据依赖关系求候选码

关系模式R&#xff08;U, F&#xff09;, U{}&#xff0c;F是R的函数依赖集&#xff0c;可以将属性分为4类&#xff1a; L: 仅出现在依赖集F左侧的属性 R: 仅出现在依赖集F右侧的属性 LR: 在依赖集F左右侧都出现的属性 NLR: 在依赖集F左右侧都未出现的属性 结论1: 若X是L类…

SAP note 3565626 : Baltimore CyberTrust 根证书即将过期

SAP note 3565626 &#xff1a; Baltimore CyberTrust 根证书即将过期 20250512 2025年5月9日 症状 您已收到来⾃ SAP Integration Suite/Cloud Integration 服务的通知邮件&#xff0c; 建议 Baltimore CyberTrust 根证书将于 2025 年 5 ⽉ 12 ⽇ 过期&#xff0c;其中 Balt…

算法精讲:字母异位词分组问题剖析

算法精讲:字母异位词分组问题剖析 一、引言 在算法的学习与实践中,字符串相关的问题一直是重点和难点。今天我们要深入探讨的“字母异位词分组”问题,不仅考验对字符串操作的理解,还涉及到数据结构的巧妙运用。通过解决这个问题,我们能进一步提升算法思维和代码实现能力。…

【每日八股】复习 Redis Day7:应知应会的 33 条 Redis 基础八股文

应知应会的 33 条 Redis 基础八股文 今天对 Redis 八股文进行收官总结&#xff0c;共收录了 33 条基础八股文。 文章目录 应知应会的 33 条 Redis 基础八股文Redis 持久化简述 Redis 持久化的两种策略&#xff1f;AOF 的三种持久化策略&#xff1f;AOF 磁盘重写机制&#xf…

k8s之探针

探针介绍&#xff1a; 编排工具运行时&#xff0c;虽说pod挂掉会在控制器的调度下会重启&#xff0c;出现pod重启的时候&#xff0c;但是pod状态是running,无法真实的反应当时pod健康状态&#xff0c;我们可以通过Kubernetes的探针监控到pod的实时状态。 Kubernetes三种探针类…

记9(Torch

目录 1、Troch 1、Troch 函数说明举例torch.tensor()torch.arange()创建张量创建一个标量&#xff1a;torch.tensor(42)创建一个一维张量&#xff1a;torch.tensor([1, 2, 3])创建一个二维张量&#xff1a;torch.tensor([[1, 2], [3, 4]])生成一维等差张量&#xff1a;语法&am…

flask开启https服务支持

目录 一、背景 二、开启https支持 三、自签名 1、安装openssl 2、验证安装 3、自签名 四、编写代码 五、访问https接口 一、背景 最近在做自动化业务&#xff0c;需要兼容现在主流的框架开发的前端页面&#xff0c;于是到github找到了几个项目&#xff0c;clone下来项目并…

路由交换实验

案例一&#xff1a;实施和配置RIPV2 1.给AR1配置接口 查看R1接口配置情况 2.配置三台路由的RIP协议&#xff0c;版本为version2 &#xff0c;关闭自动汇总&#xff0c;通告所有的直连接口 案例二&#xff1a;配置多区域的OSPF协议 1.配置R1的接口IP地址参数 2.配置r2,r3的接口参…