转:Kafka事务使用和编程示例/实例

Kafka事务使用和编程示例/实例_JobShow裁员加班实况-微信小程序-CSDN博客一、概述​ Kafka事务特性是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务中,或者说是一个原子操作,生产消息和提交偏移量同时成功或者失败。注意:kafka事务和DB事务。在理解消息的事务时,一直处于一个错误理解是,把操作db的业务逻辑跟操作消息当成是一个事务,如下所示:void kakfa_in_tranction(){ // 1.kafa的操作:读取消息或生产消息 kafkaOperation(); // 2.db操作 dbOperation()https://blog.csdn.net/u010002184/article/details/113933973


一、概述

​ Kafka事务特性是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务中,或者说是一个原子操作,生产消息和提交偏移量同时成功或者失败。

  • 注意:kafka事务和DB事务。

    在理解消息的事务时,一直处于一个错误理解是,把操作db的业务逻辑跟操作消息当成是一个事务,如下所示:

    void  kakfa_in_tranction(){// 1.kafa的操作:读取消息或生产消息kafkaOperation();// 2.db操作dbOperation();
    }

    操作DB数据库的数据源是DB,消息数据源是kfaka,这是完全不同两个数据。一种数据源(如mysql,kafka)对应一个事务,所以它们是两个独立的事务。kafka事务指kafka一系列 生产、消费消息等操作组成一个原子操作,db事务是指操作数据库的一系列增删改操作组成一个原子操作。

二、事务的使用

Kafka中的事务特性主要用于以下两种场景:

  • 生产者发送多条消息可以封装在一个事务中,形成一个原子操作。多条消息要么都发送成功,要么都发送失败。

  • read-process-write模式:将消息消费和生产封装在一个事务中,形成一个原子操作。在一个**流式处理**的应用中,常常一个服务需要从上游接收消息,然后经过处理后送达到下游,这就对应着消息的消费和生成。

    当事务中仅仅存在Consumer消费消息的操作时,它和Consumer手动提交Offset并没有区别。因此单纯的消费消息并不是Kafka引入事务机制的原因,单纯的消费消息也没有必要存在于一个事务中。
    

三、事务相关的API

1, api

    /*** 初始化事务*/public void initTransactions();/*** 开启事务*/public void beginTransaction() throws ProducerFencedException ;/*** 在事务内提交已经消费的偏移量*/public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) ;/*** 提交事务*/public void commitTransaction() throws ProducerFencedException;/*** 丢弃事务*/public void abortTransaction() throws ProducerFencedException ;

2,事务配置

2,1 生产者

需要设置transactional.id属性。
设置了transactional.id属性后,enable.idempotence属性会自动设置为true。

2.2 消费者

 需要设置isolation.level = read_committed,这样Consumer只会读取已经提交了事务的消息。另外,需要设置enable.auto.commit = false来关闭自动提交Offset功能。

四、事务使用示例

1, 需求

在Kafka的topic:ods_user中有一些用户数据,数据格式如下:
 
姓名,性别,出生日期
张三,1,1980-10-09
李四,0,1985-11-01
 
我们需要编写程序,将用户的性别转换为男、女(1-男,0-女),转换后将数据写入到topic:dwd_user中。要求使用事务保障,要么消费了数据同时写入数据到 topic,提交offset。要么全部失败。

2,控制台模拟数据

# 创建名为ods_user和dwd_user的主题
bin/kafka-topics.sh --create --zookeeper node-1:2181 --topic ods_user --partitions 3 --replication-factor 2
bin/kafka-topics.sh --create --zookeeper node-1:2181 --topic dwd_user --partitions 3 --replication-factor 2
# 生产数据到 ods_user
bin/kafka-console-producer.sh --broker-list node-1:9092 --topic ods_user
# 从dwd_user消费数据
bin/kafka-console-consumer.sh --bootstrap-server node-1:9092 --topic dwd_user --from-beginning

3, 详细代码

public class TransUse {public static void main(String[] args) {Consumer<String, String> consumer = createConsumer();Producer<String, String> producer = createProduceer();// 初始化事务producer.initTransactions();while(true) {try {// 1. 开启事务producer.beginTransaction();// 2. 定义Map结构,用于保存分区对应的offsetMap<TopicPartition, OffsetAndMetadata> offsetCommits = new HashMap<>();// 2. 拉取消息ConsumerRecords<String, String> records = consumer.poll(2000);for (ConsumerRecord<String, String> record : records) {// 3. 保存偏移量offsetCommits.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1));// 4. 进行转换处理String[] fields = record.value().split(",");fields[1] = fields[1].equalsIgnoreCase("1") ? "男":"女";String message = fields[0] + "," + fields[1] + "," + fields[2];// 5. 生产消息到dwd_userproducer.send(new ProducerRecord<>("dwd_user", message));}// 6. 提交偏移量到事务producer.sendOffsetsToTransaction(offsetCommits, "ods_user");// 7. 提交事务producer.commitTransaction();} catch (Exception e) {// 8. 放弃事务producer.abortTransaction();}}}// 1. 创建消费者public static Consumer<String, String> createConsumer() {// 1. 创建Kafka消费者配置Properties props = new Properties();props.setProperty("bootstrap.servers", "node-1:9092");props.setProperty("group.id", "ods_user");props.put("isolation.level","read_committed");props.setProperty("enable.auto.commit", "false");props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 2. 创建Kafka消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 3. 订阅要消费的主题consumer.subscribe(Arrays.asList("ods_user"));return consumer;}// 2. 创建生产者public static Producer<String, String> createProduceer() {// 1. 创建生产者配置Properties props = new Properties();props.put("bootstrap.servers", "node-1:9092");props.put("transactional.id", "dwd_user");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 2. 创建生产者Producer<String, String> producer = new KafkaProducer<>(props);return producer;}}

4,异常模拟

// 3. 保存偏移量
offsetCommits.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1));
// 4. 进行转换处理
String[] fields = record.value().split(",");
fields[1] = fields[1].equalsIgnoreCase("1") ? "男":"女";
String message = fields[0] + "," + fields[1] + "," + fields[2];// 模拟异常
int i = 1/0;// 5. 生产消息到dwd_user
producer.send(new ProducerRecord<>("dwd_user", message));

 我们发现,可以消费到消息,但如果中间出现异常的话,offset是不会被提交的,除非消费、生产消息都成功,才会提交事务。

转自:

kafka事务使用和编程示例_青眼酷白龙的博客-CSDN博客_kafka事务使用

-------------------------------------------

一、事务场景

  1. 最简单的需求是producer发的多条消息组成一个事务这些消息需要对consumer同时可见或者同时不可见 。
  2. producer可能会给多个topic,多个partition发消息,这些消息也需要能放在一个事务里面,这就形成了一个典型的分布式事务。
  3. kafka的应用场景经常是应用先消费一个topic,然后做处理再发到另一个topic,这个consume-transform-produce过程需要放到一个事务里面,比如在消息处理或者发送的过程中如果失败了,消费位点也不能提交。
  4. producer或者producer所在的应用可能会挂掉,新的producer启动以后需要知道怎么处理之前未完成的事务 。
  5. 流式处理的拓扑可能会比较深,如果下游只有等上游消息事务提交以后才能读到,可能会导致rt非常长吞吐量也随之下降很多,所以需要实现read committed和read uncommitted两种事务隔离级别。

二、几个关键概念和推导

1.因为producer发送消息可能是分布式事务,所以引入了常用的2PC,所以有事务协调者(Transaction Coordinator)。Transaction Coordinator和之前为了解决脑裂和惊群问题引入的Group Coordinator在选举和failover上面类似。

2.事务管理中事务日志是必不可少的,kafka使用一个内部topic来保存事务日志,这个设计和之前使用内部topic保存位点的设计保持一致。事务日志是Transaction Coordinator管理的状态的持久化,因为不需要回溯事务的历史状态,所以事务日志只用保存最近的事务状态。
3.因为事务存在commit和abort两种操作,而客户端又有read committed和read uncommitted两种隔离级别,所以消息队列必须能标识事务状态,这个被称作Control Message。
4.producer挂掉重启或者漂移到其它机器需要能关联的之前的未完成事务所以需要有一个唯一标识符来进行关联,这个就是TransactionalId,一个producer挂了,另一个有相同TransactionalId的producer能够接着处理这个事务未完成的状态。注意不要把TransactionalId和数据库事务中常见的transaction id搞混了,kafka目前没有引入全局序,所以也没有transaction id,这个TransactionalId是用户提前配置的。
5. TransactionalId能关联producer,也需要避免两个使用相同TransactionalId的producer同时存在,所以引入了producer epoch来保证对应一个TransactionalId只有一个活跃的producer epoch

三、事务语义

2.1.  多分区原子写入

事务能够保证Kafka topic下每个分区的原子写入。事务中所有的消息都将被成功写入或者丢弃。例如,处理过程中发生了异常并导致事务终止,这种情况下,事务中的消息都不会被Consumer读取。现在我们来看下Kafka是如何实现原子的“读取-处理-写入”过程的。

首先,我们来考虑一下原子“读取-处理-写入”周期是什么意思。简而言之,这意味着如果某个应用程序在某个topic tp0的偏移量X处读取到了消息A,并且在对消息A进行了一些处理(如B = F(A))之后将消息B写入topic tp1,则只有当消息A和B被认为被成功地消费并一起发布,或者完全不发布时,整个读取过程写入操作是原子的。

现在,只有当消息A的偏移量X被标记为消耗时,消息A才被认为是从topic tp0消耗的,消费到的数据偏移量(record offset)将被标记为提交偏移量(Committing offset)。在Kafka中,我们通过写入一个名为__consumer_offsets topic的内部Kafka topic来记录offset commit。消息仅在其offset被提交给__consumer_offsets topic时才被认为成功消费。

由于offset commit只是对Kafkatopic的另一次写入,并且由于消息仅在提交偏移量时被视为成功消费,所以跨多个主题和分区的原子写入也启用原子“读取-处理-写入”循环:提交偏移量X到offset topic和消息B到tp1的写入将是单个事务的一部分,所以整个步骤都是原子的。

kafka系列九、kafka事务原理、事务API和使用场景 - 小人物的奋斗 - 博客园

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/329822.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[初级]Java中的switch对整型、字符型、字符串的具体实现细节

转载自 [初级]Java中的switch对整型、字符型、字符串的具体实现细节Java 7中&#xff0c;switch的参数可以是String类型了&#xff0c;这对我们来说是一个很方便的改进。到目前为止switch支持这样几种数据类型&#xff1a;byteshort int char String 。但是&#xff0c;作为一个…

SpringBoot-Cache整合redis

前言 SpringBoot的众多Starter有两个很重要的缓存Starter&#xff0c;其中一个是我们经常用到的Redis&#xff08;spring-boot-starter-data-redis&#xff09;还有一个是 spring-boot-starter-cache。 今天主要是简单介绍一个如何整合这两个组件&#xff0c;达到相互合作的关系…

C#跨平台物联网通讯框架ServerSuperIO(SSIO)

一.SSIO的特点 轻型高性能通信框架&#xff0c;适用于多种应用场&#xff0c;轮询模式、自控模式、并发模式和单例模式。设备驱动、IO通道、控制模式场景协调统一。设备驱动内轩命令驱动器、命令缓存器、自定义参数和实时数据元素。框架平台支持按设备命令优先级别进行调度&…

spring boot 单元测试_spring-boot-plus1.2.0-RELEASE发布-快速打包-极速部署-在线演示

spring-boot-plusspring-boot-plus集成spring boot常用开发组件的后台快速开发脚手架Purpose每个人都可以独立、快速、高效地开发项目&#xff01;Everyone can develop projects independently, quickly and efficiently&#xff01;官网地址&#xff1a;springboot.plusGITHU…

在Java中如何高效的判断数组中是否包含某个元素

转载自 在Java中如何高效的判断数组中是否包含某个元素如何检查一个数组(无序)是否包含一个特定的值&#xff1f;这是一个在Java中经常用到的并且非常有用的操作。同时&#xff0c;这个问题在Stack Overflow中也是一个非常热门的问题。在投票比较高的几个答案中给出了几种不同的…

spring-kafka整合:DefaultKafkaProducerFactory默认kafka生产者工厂介绍

【README】 0&#xff0c;为啥要看 DefaultKafkaProducerFactory&#xff1f; 最近在基于 springboot 开发kafka模块&#xff0c;发现 kafakTemplate构造器传入了 DefaultKafkaProducerFactory实例&#xff0c; kafkaTemplate内部使用了 很多 DefaultKafkaProducerFactory的方…

【SpringSecurity】【JJWT】JJWT踩坑LocalDateTime

前言 最近自己又在开始闲搞&#xff0c;主要原因还是下山无望&#xff08;买显卡&#xff09;。只能晚上下班找点事情做啦~~ 环境 版本请根据实际情况参考JJWT官网选择使用&#xff0c;这里只说明一下问题大概思路&#xff01; <!-- 增加token生成依赖 --> <depen…

针对Linux ASP.NET MVC网站中 httpHandlers配置无效的解决方案

近期有Linux ASP.NET用户反映&#xff0c;在MVC网站的Web.config中添加 httpHandlers 配置用于处理自定义类型&#xff0c;但是在运行中并没有产生预期的效果&#xff0c;服务器返回了404&#xff08;找不到网页&#xff09;错误。经我亲自测试&#xff0c;在WebForm网站中&…

简单介绍Java中Comparable和Comparator

转载自 简单介绍Java中Comparable和ComparatorComparable 和 Comparator是Java核心API提供的两个接口&#xff0c;从它们的名字中&#xff0c;我们大致可以猜到它们用来做对象之间的比较的。但它们到底怎么用&#xff0c;它们之间有又哪些差别呢&#xff1f;下面有两个例子可以…

spring-kafka整合:KafkaTemplate-kafka模板类介绍

【README】 1&#xff0c;本文主要关注 KafkaTemplate的重点方法&#xff0c;并非全部方法&#xff1b; 2&#xff0c;KafkaTemplate 底层依赖于 DefaultKafkaProducerFactory &#xff0c; 关于 DefaultKafkaProducerFactory 的介绍&#xff0c;refer2 spring-kafka整合:…

安卓 on a null object reference_详解Object.prototype.__proto__

Object.prototype 的 __proto__ 属性是一个访问器属性(一个getter函数和一个setter函数), 暴露了通过它访问的对象的内部[[Prototype]] (一个对象或 null)。使用__proto__是有争议的&#xff0c;也不鼓励使用它。因为它从来没有被包括在EcmaScript语言规范中&#xff0c;但是现…

【SpringBoot】服务器JVM远程调试

目的 当系统部署到测试环境服务器时&#xff0c;难免会遇到bug。这个时候如果能远程调试&#xff0c;那么能够大大提高我们的生产效率&#xff0c;快速完成服务调试&#xff0c;最快发布生产环境。&#xff08;领导好评不就到手了&#xff09; 准备 Idea&#xff08;Java最好…

图说:为什么Java中的字符串被定义为不可变的

转载自 图说&#xff1a;为什么Java中的字符串被定义为不可变的字符串&#xff0c;想必大家最熟悉不过了&#xff0c;通常我们在代码中有几种方式可以创建字符串&#xff0c;比如&#xff1a;String s "Hollis";这时&#xff0c;其实会在堆内存中创建一个字符串对象…

值得推荐的微软技术公众号推荐

为开阔技术人眼界&#xff0c;促进技术人职业成长。小二在此诚意推荐最值得关注的微软技术公众号。平时关注推送的文章或多或少要么学到知识技能&#xff0c;要么收到一些启发&#xff0c;有利于个人成长。这些公众号为笔者个人积累&#xff0c;不一定都合大家口味&#xff0c;…

springboot:BeanPostProcessor示例及分析

【README】 1&#xff0c;本文主要分析 BeanPostProcessor 的作用&#xff0c; 开发方式&#xff1b; 2&#xff0c;BeanPostProcessor 是bean后置处理器&#xff0c; 简而言之就是bean被创建好了&#xff0c;之后如果需要对其属性进行修改&#xff0c;则 需要使用 BeanPost…

实现了某一个接口的匿名类的例子_java中的内部类内部接口详解,一文搞定

简介一般来说&#xff0c;我们创建类和接口的时候都是一个类一个文件&#xff0c;一个接口一个文件&#xff0c;但有时候为了方便或者某些特殊的原因&#xff0c;java并不介意在一个文件中写多个类和多个接口&#xff0c;这就有了我们今天要讲的内部类和内部接口。内部类先讲内…

Java 8 日期和时间解读

转载自 Java 8 日期和时间解读现在&#xff0c;一些应用程序仍然在使用java.util.Date和java.util.Calendar API和它们的类库&#xff0c;来使我们在生活中更加轻松的处理日期和时间&#xff0c;比如&#xff1a;JodaTime。然而&#xff0c;Java 8 引进的新的类库来处理日期和时…

云计算产值将超3000亿美元 亚马逊微软谷歌居三甲

腾讯科技讯 3月27日消息&#xff0c;据外电报道&#xff0c;云计算曾经主要是无法承担建造和维护基础设施的初创公司的解决方案&#xff0c;但对于管理数字业务的大型企业而言&#xff0c;云计算正快速成为省钱的管理数字业务的方式。市场调研公司IDC在上月的一份调查数据显示&…

oracle中join另一个表后会查询不出一些数据_面试必备 | 8个Hive数据仓工具面试题锦集!...

是新朋友吗&#xff1f;记得先点蓝字关注我哦&#xff5e;今日课程菜单Java全栈开发 | Web前端H5大数据开发 | 数据分析人工智能Python | 人工智能物联网进入数据时代&#xff0c;大数据技术成为互联网发展的核心要素之一。与此同时大数据开发工程师的薪资也成为行业内高薪的代…

springboot-Initializer例子及分析

【README】 1&#xff0c;本文主要编写了 初始化器例子并分析了其调用路径&#xff1b; 2&#xff0c;初始化器的执行顺序 先于 后置处理器&#xff1b; 后置处理器&#xff0c;refer2 springboot&#xff1a;BeanPostProcessor示例及分析_PacosonSWJTU的博客-CSDN博客【RE…