详细介绍:深入理解Kafka事务

news/2025/9/22 21:18:40/文章来源:https://www.cnblogs.com/ljbguanli/p/19106060

详细介绍:深入理解Kafka事务

一 kafka事务介绍

1.1 Kafka事务的作用

  • Exactly-Once Semantics (EOS):在“消费 → 处理 → 生产”的流式链路里避免重复写重复读带来的副作用,确保“处理一次且仅一次”的可见效果。

  • 跨分区 / 跨 Topic 原子性:将一次处理内写入的多分区多主题消息,以及本次消费位点 offset 的提交,绑定在同一个事务里,要么都生效,要么都回滚。

1.2 相关术语

  • PID / Producer IDEpochSequence Number:幂等生产者元数据,避免重复写。

  • 事务协调器(Transaction Coordinator):位于 broker 侧的协调者,管理事务状态机与两阶段提交。

  • 控制批次(Control Batch / Control Records):日志里的特殊记录,用于标记事务,主要是 COMMIT / ABORT(注意:数据分区不写“BEGIN”标记)。

  • LSO(Last Stable Offset)HW(High Watermark):对 read_committed 消费者只暴露到 LSO,屏蔽未决事务。

  • __transaction_state:kafka内部主题,用于持久化事务状态机。

  • __consumer_offsets:kafka内部主题,存消费组位点;位点也可以被纳入事务。

  • 僵尸实例:一个旧的 Producer 实例(带着同样的 transactional.id)在崩溃或网络分区后挂掉了,但它可能在恢复后继续尝试往 Kafka 写数据,但是与此同时,已经有一个新的 Producer 实例已经起来并接管了同样的 transactional.id,我们把这个宕机后又恢复的producer叫做僵尸实例

1.3 消费者隔离级别

消费者的隔离级别有下面两种

  • read_uncommitted(默认):可读到未提交已提交数据。

  • read_committed:只读取已提交事务的数据(EOS 流水线应使用)。

假设想要配置消费者隔离级别为read_committed,可通过下面配置完成

props.put("isolation.level", "read_committed");

二、使用 Kafka 事务

2.1 生产者端配置

Properties props = new Properties();
// broker地址
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
// transactional.id 必须唯一且稳定(可复用)
props.put("transactional.id", "order-service-txn-1");
// 配了 transactional.id 会自动开启,但是最好还是显式配置
props.put("enable.idempotence", "true");
/**
然后通常由客户端自动/隐式设置为适配幂等语义:
要求acks=all、retries>0 max.in.flight.requests.per.connection producer = new KafkaProducer<>(props);
// 找到协调器、申请 PID/epoch、登记事务状态
producer.initTransactions();

2.2 事务性生产

// 开启事务
producer.beginTransaction();
// 发送消息
producer.send(new ProducerRecord<>("demo-topic", "key1", "message-1"));
producer.send(new ProducerRecord<>("demo-topic", "key2", "message-2"));
producer.send(new ProducerRecord<>("demo-topic2", "key3", "message-3"));
// 提交事务
producer.commitTransaction();

这样,对于配置了read_committed的消费者而言,要么这三个消息同时可见,要么同时不可见。

2.3 实践建议

  • 使用稳定且可复用的 transactional.id,这样服务重启后就可恢复事务上下文,还能对“僵尸实例”做围栏。

  • 事务应尽可能短小且频繁提交,避免长时间占用导致 LSO 卡住,增加读延迟。

  • 失败重试要以事务回滚为界,确保回滚后可安全重放。

  • EOS 只覆盖 Kafka 内部的原子性;涉及外部系统,则需要额外使用 Outbox/Saga 等模式。

三 kafka事务的实现

3.1 关键组件

  • 事务生产者:发数据、报告参与分区、发起事务结束(提交/回滚)。

  • 消费组协调器(Group Coordinator):当offset被纳入事务时,消费组协调器需要把最新offset发送到专门存储offset的内部主题__consumer_offsets中, 所以消费者协调器和__consumer_offsets里的对应分区也是事务参与者。

  • 事务协调器(Transaction Coordinator):负责给生产者分配 Producer ID/epoch(每个 transactional.id对应一个PID),维护事务状态机,持久化事务日志,并且当事务结束时(commit 或 abort),事务协调器 会把这个事务的结果(commit/abort 标记)广播到所有该事务涉及的分区)。

  • 数据分区所在的 Broker Leader:接受数据与控制批次写入,维护 High Watermark/Last Stable Offset与中止事务索引。

  • 消费者:根据隔离级别获取数据,包括read_committed 和read_uncommitted ,依赖隔离级别和 abortedTransactions 过滤。

3.2 事务实现流程

下图是kafka事务消息的总体流程图

3.2.1 幂等生产者

  • 协调器为每个 transactional.id 分配 PIDepoch

  • 生产者对每个分区维护单调递增的序列号;Broker 端以 (PID, epoch, seq) 去重,避免“重复写”。

  • 若同一 transactional.id 的新实例启动并 initTransactions(),协调器会提升 epoch 并围栏旧实例;旧实例写入不会成功并且得到 INVALID_PRODUCER_EPOCH/ProducerFencedException

3.2.2 事务状态机与内部日志

  • 事务协调器将每个事务的状态持久化到 __transaction_state
    EMPTY/ONGOING → PREPARE_COMMIT | PREPARE_ABORT → COMPLETE_COMMIT | COMPLETE_ABORT

  • 事务涉及到的分区集合(数据分区与 __consumer_offsets 的目标分区)由生产者在首次写入/首次提交位点时通过
    AddPartitionsToTxn / AddOffsetsToTxn 报告给协调器并持久化。

3.2.3  两阶段提交(2PC)

与传统数据库不同的是,数据分区里只写“结束标记”——COMMIT 或 ABORT 的控制批次;不写 BEGIN。BEGIN 只体现在协调器的内部状态机与日志。信息会包含自己所属的事务producer。

阶段 A:事务进行中(ONGOING)

  1. beginTransaction() 后,生产者向多个分区写入消息(每条携带 PID/epoch/seq)。

  2. 如首次写入某分区,生产者会先向协调器请求 AddPartitionsToTxn,协调器会记录“本事务涉及到这个分区”。

阶段 B:准备提交(PREPARE_COMMIT)/ 准备回滚(PREPARE_ABORT)

  1. 生产者调用 commitTransaction()(或 abortTransaction()),就会发送 EndTxn请求给协调器。

  2. 协调器把事务状态改为 PREPARE_COMMIT(或 PREPARE_ABORT)并写入kafka内内部主题 __transaction_state

  3. 扇出:协调器向所有涉及分区的 leader 发起WriteTxnMarkers请求。

阶段 C:各分区落盘控制记录 + 反馈

  1. 在收到事务协调器的WriteTxnMarkers请求后,各分区在自己的日志里追加一个“控制批次(Control Batch)”,类型为 COMMIT 或 ABORT。注意kafka没有“BEGIN”控制批次,BEGIN 信息由协调器掌

  2. 分区 leader 追加成功后应答协调器。

  3. 当所有目标分区都落成控制批次,协调器将事务状态置为 COMPLETE_COMMIT(或 COMPLETE_ABORT),并更新 __transaction_state

3.3 可见性控制

  • HW(High Watermark):副本多数派确认的最高位移。read_uncommitted 可读到 HW。

  • LSO(Last Stable Offset):保证其之前没有“未决事务”的最末位移
    read_committed,Broker 只返回 ≤ LSO 的数据,从源头屏蔽未提交事务。

  • 为何消费者还能拿到“已中止事务”的数据片段?
    为性能考虑,Broker 可能仍返回包含已中止事务记录的批次,但会携带一个
    abortedTransactions 列表(含 producerIdfirstOffset)。客户端在解码时跳过这些记录

  • 事务索引(.txnindex):每个日志段都有一个中止事务索引,Broker 用它在 Fetch 时快速收集 abortedTransactions 列表。

小结:在 read_committed 下,消费者不用“暂存不确定状态数据”去等控制标记;Broker 通过 LSO 保证不给你发“未决事务”的记录。客户端只需在已决事务里过滤 ABORT 记录(根据 abortedTransactions)。

3.4 消费-处理-生产 模式中消费offset与输出的原子绑定

sendOffsetsToTransaction(offsets, groupMetadata) 背后做了两件事,

AddOffsetsToTxn告诉事务协调器:这次事务会提交哪个消费组的位点

TxnOffsetCommit 把位点写入 __consumer_offsets 对应分区

在最终 COMMIT(或 ABORT)时,__consumer_offsets 分区也会收到相应的 COMMIT/ABORT 控制批次,从而与输出数据一并原子生效(或放弃)。

3.5 常见故障的处理

3.5.1 失败与恢复

  • 如果某些分区暂不可用,协调器会持续重试 WriteTxnMarkers最终一致的 2PC)。

  • 事务超时(由客户端 transaction.timeout.ms 申请,受 broker 上限约束)协调器主动 ABORT 并下发 ABORT 标记。

  • 协调器宕机可通过 __transaction_state 重放恢复事务状态并继续扇出事务标记。

  • 在事物未提交之前,配置了read_committed的消费者不会看到未决事务。

3.5.2 应对僵尸实例

  • Kafka 引入了 Producer Epoch,通过围栏机制来隔离僵尸实例。每个 Producer 在第一次用某个 transactional.id 初始化事务时,Kafka 的 Transaction Coordinator 会给它分配一个 producerIdproducerEpoch。当相同 transactional.id 的新实例启动时,Coordinator 会给它分配 更高的 epoch,并更新元数据。就这样,新实例可以用高 epoch 写数据,而旧实例(僵尸)带着低 epoch 再写数据时,Broker 会直接拒绝。

四 运维和调优要点

事务大小与超时

  • 客户端的 transaction.timeout.ms 受 Broker 端上限约束(如 transaction.max.timeout.ms)。

  • 事务过大或时间过长,会拖慢 LSO 前进,导致 read_committed 消费延迟升高

围栏与异常

  • ProducerFencedException / INVALID_PRODUCER_EPOCH:同一 transactional.id 新实例已接管;旧实例必须停止。

  • TransactionAbortedException:本事务已被中止;需要清理/重启事务。

副本与可靠性

  • 幂等/EOS 通常要求 acks=all 与合适的 min.insync.replicas。避免不干净选主导致重写。

重要监控指标

  • 生产端:transactional.commit.latency.avgtransactional.abort.raterecord-errors/retries

  • Broker:transaction-coordinator-metrics(扇出延迟、超时/中止率)、replica-fetcher-metrics

  • 消费端:records-lag-max(在 read_committed 下对 LSO 滞后敏感)。

主题压缩与控制记录

  • 控制批次(COMMIT/ABORT)是特殊记录,日志清理/压缩会保留其必要语义,确保历史可正确回放。

边界与限制

  • 事务只在同一 Kafka 集群内跨 Topic/分区原子;不跨外部系统

  • 超大事务(大量分区/消息)会放大标记扇出成本与恢复时间。

五 Kafka Streams 中的事务

  • processing.guarantee=exactly_once_v2/exactly_once:Streams 在内部为每个任务(Task)维护事务性生产者,把处理结果与位点绑定到同一事务中;重平衡时靠 epoch 围栏防止旧实例写入。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/910405.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

能碳园区 / 工厂系统 - 智慧园区

智慧能碳园区 / 工厂:能碳协同驱动的低碳高效运营新范式 在 “双碳” 目标与产业数字化深度融合的背景下,智慧能碳园区 / 工厂已成为企业绿色转型的核心载体 —— 它并非 “智慧能源” 与 “碳管理” 的简单叠加,而…

郑州网站推广营销wordpress 修改邮箱

我们做接口测试时候&#xff0c;会发现通常需要测试的接口类型有好几种&#xff0c;比较多的是http类型的接口&#xff0c;其他还有webservice接口&#xff0c;rpc接口等等&#xff0c;本次主要说下http类型的接口我们该怎么测试&#xff0c;还有该测试哪些东西。HTTP接口概念:…

上海工程建设执业资格注册中心网站网站建设排名北京

在计算机科学的世界里&#xff0c;排序是一项基本而重要的操作。无论是数据库管理、搜索引擎&#xff0c;还是日常编程&#xff0c;高效的排序算法都是提高性能的关键。本文将介绍四种基本的排序算法&#xff1a;冒泡排序、选择排序、插入排序和快速排序&#xff0c;并探讨它们…

有什么免费开发网站建设软件有哪些做外贸有哪些好的网站有哪些

java8根据某个id删选Java 8带有新的Optional类型&#xff0c;类似于其他语言中可用的类型。 这篇文章将介绍这种新类型的使用方式&#xff0c;即主要用途。 什么是可选类型&#xff1f; 可选的是新容器类型&#xff0c;如果有可用值&#xff0c;则该容器类型将包装单个值。 因…

汕头网站优化公司沈阳电商网站建设

本章主要讲解微信公众号自定义菜单、微信网页开发、模板消息推送等功能的实现&#xff1b;发福利了&#xff0c;下方关注公众号&#xff0c;就能免费获取项目源码1、自定义菜单开发前需要了解以下几点&#xff1a;1、微信公众号的自定义菜单最多包括3个一级菜单&#xff0c;每个…

深圳网站建设与设计制作笑话小网站模板html

它们都是对表达式的记法&#xff0c;因此也被称为前缀记法、中缀记法和后缀记法。它们之间的区别在于运算符相对与操作数的位置不同&#xff1a;前缀表达式的运算符位于与其相关的操作数之前&#xff1b;中缀和后缀同理。举例&#xff1a;(3 4) 5 - 6 就是中缀表达式- 3 4 …

dell公司网站设计特色石油 技术支持 东莞网站建设

上篇文章介绍了如何利用pyhook模块监听鼠标键盘的事件&#xff0c;接下来如果需要通过监听的事件内容&#xff0c;来做对应的操作&#xff0c;需要对event的内容进行相应的判断。 如果仅仅通过event.key来进行判断&#xff0c;可以写成str(evnet.key) F12等&#xff0c;如果需…

佛山网站开发公司秦皇岛微信公众号

2019双11&#xff0c;支付宝参战的第十一年。 与十一年前相比&#xff0c;双11的许多东西都改变了。比如金额——2684亿&#xff0c;差不多是十一年前的5000倍&#xff1b;比如流量——订单峰值54.4万笔/秒&#xff0c;曾经是想都不敢想的数字&#xff1b;再比如层出不穷的新技…

国外域名查询网站装修设计公司哪个好

前言 今天我们来做个小试验&#xff0c;用PHP和RabbitMQ实现消息队列功能。 前期准备&#xff0c;需要安装好docker、docker-compose的运行环境。 如何使用docker部署php服务_php如何使用docker发布-CSDN博客 一、安装RabbitMQ 1、创建相关目录&#xff0c;执行如下命令。…

代码随想录算法训练营第五天 |242.有效的字母异位词、349. 两个数组的交集、第202题. 快乐数、1. 两数之和

242.有效的字母异位词思路:想到的是分别遍历两个数组,然后用map统计值,一个数组遍历的时候对值++,一个--,如果value没有能消除完,那就不一样。 优化:其实一次遍历就可以做完。func isAnagram(s string, t string…

Python - GaussDB table sync to Hive

Python - GaussDB table sync to Hive import psycopg2 from datetime import date, datetimeSRC_TABLE = "aaa" TARGET_TABLE = "bbb"# ---------------------------- # Step 1: Connect to Gaus…

淄博网站开发恶意代码 wordpress

下载地址&#xff1a;网盘下载 基本介绍 编辑内容简介 到底是本什么书&#xff0c;拥有这样一份作序推荐人列表&#xff1a;阿里集团章文嵩博士|新浪TimYang|去哪网吴永强|丁香园冯大辉|蘑菇街岳旭强|途牛汤峥嵘|豆瓣洪强宁|某电商陈皓/林昊…… 这本书出自某电商技术部总监之手…

Photoshop 2025 v26.0(PS2025)下载安装教程(含一键安装包下载)

目录一、PS2025 软件介绍:学 Photoshop 2025 v26.0 下载安装教程前,先懂它为啥好用二、Photoshop 2025 v26.0 下载方法三、Photoshop 2025 v26.0 安装教程详细步骤步骤 1:解压安装包 ——Photoshop 2025 v26.0 下载…

装修设计公司网站腾讯企业邮箱登录入口免费版

文章目录[toc]第一章 面试流程1.1 面试官谈面试1.2 面试3种形式1.3 面试的3个环节第一章 面试流程 1.1 面试官谈面试 初级的程序员谈算法和数据结构&#xff0c;高级的程序员谈项目经验要对公司近况和项目情况了解不要紧张&#xff0c;不要马上上手写代码 1.2 面试3种形式 …

成都市金牛区建设和交通局网站手机软件下载大全

背景 最近遇到了一个问题&#xff0c;在使用rabbitmq的时候出现了丢消息、消息重复消费等一系列的问题&#xff0c;使用的是.net框架&#xff0c;背景是高并发压力下的mq消费&#xff0c;按理说即使队列中堆了几百条消息&#xff0c;我客户端可以同处理5个消息。 原因是多线程…

宁夏网站建设报价毕业设计代写网站

SNAP软件下载与安装 一、下载地址 首先进入网站 找到DOWNLOAD下载页&#xff0c; 安装完成后&#xff0c;界面如下 还需要再装一个Sen2cor下载好之后&#xff0c;解压到用户文件夹下 然后打开L2A_Process.bat文件 打开CMD&#xff0c;输入 cd C:\Users\lenovo\AppData\L…

网站一般怎么维护企业的网站建设费用

字节数组转为二进制数 c#低延迟系统需要高性能的消息处理和传递。 由于在大多数情况下&#xff0c;数据必须通过有线传输或序列化才能保持持久性&#xff0c;因此编码和解码消息已成为处理管道的重要组成部分。 高性能数据编码的最佳结果通常涉及应用程序数据细节的知识。 本文…

做外贸网站市场分析金华 网站建设

Bug(俗称"八阿哥") 是软件开发绕不过的一道坎&#xff0c;因此调试便成了每位程序员一项必备的核心技能。调试不仅有助于理解程序的运行流程&#xff0c;还能改进代码质量&#xff0c;最终提高开发者解决问题的能力以及交付软件的品质。本文旨在讨论 Java 调试关键技…

2345网站入口大数据网络营销

Python 语言的基本语法和编码规范 Python 编程教程教师 : 工作 :Python 语言的基本语法和编码标 准课程描述本章将介绍 Python 语言的基本语法和编码标准&#xff0c;重点介 绍 Python 语言的基本知识&#xff0c;如数据类型、运算符、常量、变量、表 达式和常用语句&#xff0…

上海网站建设-新闻动态金融培训网站源码

引言 曾想过轻松获取亚马逊上的商品图片用于项目或研究吗&#xff1f;是否曾面对网络速度慢或被网站反爬虫机制拦截而无法完成数据采集任务&#xff1f;如果是&#xff0c;那么本文将为您介绍如何用OkHttp和Kotlin构建一个高效的Amazon图片爬虫解决方案。 背景介绍 亚马逊&a…