关于flink两阶段提交高并发下程序卡住问题

先抛出代码

package com.dpf.flink;import com.dpf.flink.sink.MysqlSink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;public class MysqlTwoPhaseCommit {//topicprivate static final String topic_ExactlyOnce = "TwoPhaseCommit";public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度,为了方便测试,查看消息的顺序,这里设置为1,可以更改为多并行度env.setParallelism(1);//checkpoint的设置//每隔10s进行启动一个检查点【设置checkpoint的周期】env.enableCheckpointing(30000);//设置模式为:exactly_one,仅一次语义env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//确保检查点之间有1s的时间间隔【checkpoint最小间隔】env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);//检查点必须在10s之内完成,或者被丢弃【checkpoint超时时间】env.getCheckpointConfig().setCheckpointTimeout(10000);//同一时间只允许进行一次检查点env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//表示一旦Flink程序被cancel后,会保留checkpoint数据,以便根据实际需要恢复到指定的checkpointenv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//设置statebackend,将检查点保存在hdfs上面,默认保存在内存中。这里先保存到本地env.setStateBackend(new FsStateBackend("file:///Users/david.dong/tmp/flink/checkpoint"));//设置kafka消费参数Properties properties = new Properties();properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, topic_ExactlyOnce);/*SimpleStringSchema可以获取到kafka消息,JSONKeyValueDeserializationSchema可以获取都消息的key,value,metadata:topic,partition,offset等信息*/FlinkKafkaConsumer<String> kafkaConsumer011 = new FlinkKafkaConsumer<>(topic_ExactlyOnce,new SimpleStringSchema(),properties);//加入kafka数据源DataStreamSource<String> streamSource = env.addSource(kafkaConsumer011);SingleOutputStreamOperator<Tuple2<String, Integer>> tupleStream = streamSource.map(str -> Tuple2.of(str, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));tupleStream.print();//数据传输到下游tupleStream.addSink(new MysqlSink()).name("MySqlTwoPhaseCommitSink");//触发执行env.execute("StreamDemoKafka2Mysql");}
}
package com.dpf.flink.sink;import com.dpf.flink.utils.DBConnectUtil;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;public class MysqlSink extends TwoPhaseCommitSinkFunction<Tuple2<String,Integer>, Connection,Void> {private static final Logger log = LoggerFactory.getLogger(MysqlSink.class);public MysqlSink() {super(new KryoSerializer<>(Connection.class,new ExecutionConfig()), VoidSerializer.INSTANCE);}/*** 执行数据库入库操作  task初始化的时候调用* @param connection* @param tuple* @param context* @throws Exception*/@Overrideprotected void invoke(Connection connection, Tuple2<String, Integer> tuple, Context context) throws Exception {log.info("start invoke...");String value = tuple.f0;Integer total = tuple.f1;String sql = "update student set name = 'aaa' where id = 1";log.info("====执行SQL:{}===",sql);PreparedStatement ps = connection.prepareStatement(sql);ps.setString(1, value);ps.setInt(2, total);ps.setLong(3, System.currentTimeMillis());log.info("要插入的数据:{}----{}",value,total);if (ps != null) {String sqlStr = ps.toString().substring(ps.toString().indexOf(":")+2);log.error("执行的SQL语句:{}",sqlStr);}//执行insert语句ps.execute();}/*** 获取连接,开启手动提交事物(getConnection方法中)* @return* @throws Exception*/@Overrideprotected Connection beginTransaction() throws Exception {log.info("start beginTransaction.......");String url = "jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true";Connection connection = DBConnectUtil.getConnection(url, "root", "12345678");return connection;}/***预提交,这里预提交的逻辑在invoke方法中* @param connection* @throws Exception*/@Overrideprotected void preCommit(Connection connection) throws Exception {log.info("start preCommit...");}/*** 如果invoke方法执行正常,则提交事务* @param connection*/@Overrideprotected void commit(Connection connection) {log.info("start commit...");DBConnectUtil.commit(connection);}/*** 如果invoke执行异常则回滚事物,下一次的checkpoint操作也不会执行* @param connection*/@Overrideprotected void abort(Connection connection) {log.info("start abort rollback...");DBConnectUtil.rollback(connection);}
}
package com.dpf.flink.utils;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;/*** 数据库连接工具类*/
public class DBConnectUtil {private static final Logger log = LoggerFactory.getLogger(DBConnectUtil.class);/*** 获取连接** @param url* @param user* @param password* @return* @throws SQLException*/public static Connection getConnection(String url, String user, String password) throws SQLException {Connection conn = null;try {Class.forName("com.mysql.jdbc.Driver");} catch (ClassNotFoundException e) {log.error("获取mysql.jdbc.Driver失败");e.printStackTrace();}try {conn = DriverManager.getConnection(url, user, password);log.info("获取连接:{" + conn + "} 成功...");} catch (Exception e) {log.error("获取连接失败,url:" + url + ",user:" + user);}//设置手动提交conn.setAutoCommit(false);return conn;}/*** 提交事务*/public static void commit(Connection conn) {if (conn != null) {try {conn.commit();} catch (SQLException e) {log.error("提交事务失败,Connection:" + conn);e.printStackTrace();} finally {close(conn);}}}/*** 事务回滚** @param conn*/public static void rollback(Connection conn) {if (conn != null) {try {conn.rollback();} catch (SQLException e) {log.error("事务回滚失败,Connection:" + conn);e.printStackTrace();} finally {close(conn);}}}/*** 关闭连接** @param conn*/public static void close(Connection conn) {if (conn != null) {try {conn.close();} catch (SQLException e) {log.error("关闭连接失败,Connection:" + conn);e.printStackTrace();}}}
}

这部分代码网上抄的,但是大致不差

前提:

1.source消息密集,全据并行度设置1

2.sink就执行update操作,并且就update同一条数据,为了更好验证问题

结果:

这边我尝试了很多次,中间有时候能顺利执行,但是有时候程序在sink这里卡住了,过一段时间就报错socket interrupt异常。

我的分析:

1.首先设置ck的间隔是10秒一次,那么当ck barrier到达sink算子的时候,就会进行预提交,并且立刻开启一个新事物用来处理后续的消息。那么这里就会出现多事务同时存在的情况,比如预提交的事务A,和新开启的事务B

2.新事物B开启后立刻就可以继续处理后续到来的消息。

3.那么此时如果事务A预提交后,他需要等待来自JobManager的complete指令,到代码层面也就是调用notifyComplete方法来进行commit。那么加入在这个期间,就是还没有收到complete指令的时候,事务B已经执行到了  ps.execute();这里,此时事务B就会卡住,因为他们都是操作同一条数据,那么问题来了,此时程序已经卡住了,也就是线程卡住了,那么此时就算JobManager发送complete指令了,然后调用notifyComplete方法,但是,此时没有线程执行这个方法!(因为主线程卡在了(ps.execute();这里)所以整个程序就彻底卡住了。

然后flnik dag上看到的是sink红了,前面的节点都黑了,就是背压瞬间就很严重了

以上是我对flink两阶段提交存在的问题的分析,我的source消息大概100多万,我分析是这个原因,如果分析的不对,还请大佬帮我分析下我哪里理解不对?或者为什么会卡住?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/77800.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

html css js网页制作成品——HTML+CSS+js美甲店网页设计(5页)附源码

美甲店 目录 一、&#x1f468;‍&#x1f393;网站题目 二、✍️网站描述 三、&#x1f4da;网站介绍 四、&#x1f310;网站效果 五、&#x1fa93; 代码实现 &#x1f9f1;HTML 六、&#x1f947; 如何让学习不再盲目 七、&#x1f381;更多干货 一、&#x1f468;‍&a…

LeetCode[347]前K个高频元素

思路&#xff1a; 使用小顶堆&#xff0c;最小的元素都出去了&#xff0c;省的就是大&#xff0c;高频的元素了&#xff0c;所以要维护一个小顶堆&#xff0c;使用map存元素高频变化&#xff0c;map存堆里&#xff0c;然后输出堆的东西就行了 代码&#xff1a; class Solution…

2024年网站开发语言选择指南:PHP/Java/Node.js/Python如何选型?

2024年网站开发语言选择指南&#xff1a;PHP/Java/Node.js/Python如何选型&#xff1f; 一、8大主流Web开发语言技术对比 1. PHP开发&#xff1a;中小型网站的首选方案 最新版本&#xff1a;PHP 8.3&#xff08;2023年11月发布&#xff09;核心优势&#xff1a; 全球78%的网站…

从数据结构说起(一)

1 揭开数据结构神奇的面纱 1.1 初识数据结构 在C的标准库模板&#xff08;Standard Template Library,STL&#xff09;课程上&#xff0c;我初次结识了《数据结构》。C语言提供的标准库模板是面向对象程序设计与泛型程序设计思想相结合的典范。所谓的泛型编程就是编写不依赖于具…

JAVA--- 关键字static

之前我们学习了JAVA 面向对象的一些基本知识&#xff0c;今天来进阶一下&#xff01;&#xff01;&#xff01; static关键字 static表示静态&#xff0c;是JAVA中的一个修饰符&#xff0c;可以修饰成员方法&#xff0c;成员变量&#xff0c;可用于修饰类的成员&#xff08;变…

4.27比赛总结

文章目录 T1T2法一&#xff1a;倍增求 LCA法二&#xff1a;Dijkstra 求最短路法三&#xff1a;dfs 求深度 T3T4总结 T1 一道非常简单的题&#xff0c;结果我因为一句话没写挂了 80pts…… 题目中没写 a a a 数组要按照 b b b 数组的顺序&#xff0c;所以对于最大方案&#x…

数据一致性巡检总结:基于分桶采样的设计与实现

数据一致性巡检总结&#xff1a;基于分桶采样的设计与实现 背景 在分布式系统中&#xff0c;缓存&#xff08;如 Redis&#xff09;与数据库&#xff08;如 MySQL&#xff09;之间的数据一致性问题是一个常见的挑战。由于缓存的引入&#xff0c;数据在缓存和数据库之间可能存…

SpringBoot与Druid整合,实现主从数据库同步

通过引入主从数据库同步系统&#xff0c;可以显著提升平台的性能和稳定性&#xff0c;同时保证数据的一致性和安全性。Druid连接池也提供了强大的监控和安全防护功能&#xff0c;使得整个系统更加健壮和可靠。 我们为什么选择Druid&#xff1f; 高效的连接管理&#xff1a;Dru…

在Linux系统中安装MySQL,二进制包版

1、检查是否已安装数据库&#xff08;rpm软件包管理器&#xff09; rpm -qa | grep mysql rpm -qa | grep mariadb #centOS7自带mariadb与mysql数据库冲突2、删除已有数据库 rpm -e –nodeps 软件名称 3、官网下载MySQL包 4、上传 # 使用FinalShell或Xshell工具上传&#…

【含文档+PPT+源码】基于SpringBoot电脑DIY装机教程网站的设计与实现

项目介绍 本课程演示的是一款 基于SpringBoot电脑DIY装机教程网站的设计与实现&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的 Java 学习者。 1.包含&#xff1a;项目源码、项目文档、数据库脚本、软件工具等所有资料 2.带你从零开始部署运行本套…

Spring Boot 缓存机制:从原理到实践

文章目录 一、引言二、Spring Boot 缓存机制原理2.1 缓存抽象层2.2 缓存注解2.3 缓存管理器 三、入门使用3.1 引入依赖3.2 配置缓存3.3 启用缓存3.4 使用缓存注解3.5 实体类 四、踩坑记录4.1 缓存键生成问题4.2 缓存过期与更新问题4.3 事务与缓存的一致性问题 五、心得体会5.1 …

Spark读取Apollo配置

--conf spark.driver.extraJavaOptions-Dapp.idapollo的app.id -Denvfat -Dapollo.clusterfat -Dfat_metaapollo的meta地址 --conf spark.executor.extraJavaOptions-Dapp.idapollo的app.id -Denvfat -Dapollo.clusterfat -Dfat_metaapollo的meta地址 在spark的提交命令中&…

[逆向工程]如何理解小端序?逆向工程中的字节序陷阱与实战解析

[逆向工程]如何理解小端序&#xff1f;逆向工程中的字节序陷阱与实战解析 关键词&#xff1a;逆向工程、小端序、字节序、二进制分析、数据解析 引言&#xff1a;为什么字节序是逆向工程师的必修课&#xff1f; 在逆向工程中&#xff0c;分析二进制数据是最基础的任务之一。…

项目三 - 任务2:创建笔记本电脑类(一爹多叔)

在本次实战中&#xff0c;我们通过Java的单根继承和多接口实现特性&#xff0c;设计了一个笔记本电脑类。首先创建了Computer抽象类&#xff0c;提供计算的抽象方法&#xff0c;模拟电脑的基本功能。接着定义了NetCard和USB两个接口&#xff0c;分别包含连接网络和USB设备的抽象…

ElasticSearch深入解析(六):集群核心配置

1.开发模式和生产模式 Elasticsearch默认运行在开发模式下&#xff0c;此模式允许节点在配置存在错误时照常启动&#xff0c;仅将警告信息写入日志文件。而生产模式则更为严格&#xff0c;一旦检测到配置错误&#xff0c;节点将无法启动&#xff0c;这是一种保障系统稳定性的安…

【Prometheus-MySQL Exporter安装配置指南,开机自启】

目录 1. 创建 MySQL 监控用户2. 配置 MySQL 认证文件3. 安装 mysqld_exporter4. 配置 Systemd 服务5. 启动并验证服务6. 修改Prometheus配置常见错误排查错误现象排查步骤 6. 验证监控数据关键注意事项 7. Grafana看板 1. 创建 MySQL 监控用户 mysql -uroot -p123456 # 登录M…

redis未授权访问漏洞学习

一、Redis常见用途 1. Redis介绍 全称与起源: Redis全称Remote Dictionary Service(远程字典服务)&#xff0c;最初由antirez在2009年开发&#xff0c;用于解决网站访问记录统计的性能问题。发展历程: 从最初仅支持列表功能的内存数据库&#xff0c;经过十余年发展已支持多种…

4.27搭建用户界面

更新 router下面的index.js添加新的children 先区分一下views文件夹下的不同vue文件&#xff1a; Home.vue是绘制home页面的所有的表格。 Main.vue是架构头部和左侧目录的框架的。 研究一下这个routes对象&#xff0c;就可以发现重定向redirect的奥妙所在&#xff0c;我们先把…

【MySQL】(8) 联合查询

一、联合查询的作用 由于范式的规则&#xff0c;数据分到多个表中&#xff0c;想要查询完整的信息&#xff0c;就需要联合查询多张表。比如查询学生的学生信息和所在班级的信息&#xff0c;就需要联合查询学生表和班级表。 二、联合查询过程 案例&#xff1a;查询学生姓名为孙…

图漾官网Sample_V1版本C++语言完整参考例子---单相机版本

文章目录 1.参考例子 主要梳理了图漾官网Sample_V1版本的例子 1.参考例子 主要增加了从storage区域读取相机参数的设置&#xff0c;使用图漾PercipioViewer软件&#xff0c;如何将相机参数保存到srorage区&#xff0c;可参考链接&#xff1a;保存相机参数操作 保存参数设置 注…