FlinkPipelineComposer 详解

FlinkPipelineComposer 详解

原文

背景

在flink-cdc 3.0中引入了pipeline机制,提供了除Datastream api/flink sql以外的一种方式定义flink 任务

通过提供一个yaml文件,描述source sink transform等主要信息

由FlinkPipelineComposer解析,自动调用DataStream api进行构建

官方样例

 source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*sink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""pipeline:name: Sync MySQL Database to Dorisparallelism: 2

目前可以通过source配置的源只有mysql 和 values

values是调试用的,所以可以说当前这个功能是专门为“mysql同步数据到各个sink”的场景使用的

目前可以使用的sink有

  1. doris
  2. elasticsearch
  3. kafka
  4. paimon
  5. starrocks
  6. values

FlinkPipelineComposer

我们以mysql -> values来观察 FlinkPipelineComposer 是如何通过读取yaml文件的定义来构建DataStream的

values会将mysql产生的cdc消息打印到stdout上

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: x.x.x.xport: 3306username: usernamepassword: passwordtables: test.t1server-id: 5400-5404server-time-zone: UTC+8sink:type: valuesname: values Sinkpipeline:name: Sync Mysql Database to Valuesparallelism: 2

首先来观察一下这个任务提交到flink集群后具体的链路构成

在这里插入图片描述

结合官方给出的架构

在这里插入图片描述

可以看出,“一个source,一个sink”的yaml定义,最终会生成5个operator

  1. Souce: Flink CDC Event Source: mysql
  2. SchemaOperator
  3. PrePartition

-------------- shuffle --------------

  1. PostPartion
  2. Sink Writer: values Sink

Souce: Flink CDC Event Source: mysql

负责

  1. 创建枚举器
  2. 创建reader
  3. 枚举split分发给reader
  4. reader读取数据生成事件

SchemaOperator

负责和JobMaster上的coodinator沟通,执行schema evolution 相关逻辑,见Flink CDC Schema Evolution 详解

PrePartition

负责

  1. 广播FlushEvent
  2. 广播SchemaChangeEvent
  3. shuffle普通消息到下游

PostPartion

Sink Writer: values Sink

写入下游,values sink当前到实现是打印到stdout

源码解析

接下来分析,FlinkPipelineComposer 读取 yaml 构造DataStream的细节

CliFrontend#main

CliFrontend.java:54

args

在这里插入图片描述

createExecutor 创建 executor CliFrontend.java:76

调用CliExecutor#run CliExecutor.java:70

看一下解析得到的pipelineDef
在这里插入图片描述

这里已经从yaml文件中解析出了source和sink的配置了

composer.compose 调用compose方法开始使用DataStream api进行构建

FlinkPipelineComposer.java:92 FlinkPipelineComposer#compose

声明了5个translator,其中第一个sourceTranslator会生成DataStream<Event> stream,而其他的translator基于这个stream作为input,调用transform方法,放入对应阶段的operator

DataSourceTranslator sourceTranslator = new DataSourceTranslator();
...
TransformTranslator transformTranslator = new TransformTranslator();
...
SchemaOperatorTranslator schemaOperatorTranslator =...
...
DataSinkTranslator sinkTranslator = new DataSinkTranslator();
...
PartitioningTranslator partitioningTranslator = new PartitioningTranslator();
...

translate的调用顺序如下

DataStream<Event> stream =sourceTranslator.translate(...
stream =transformTranslator.translatePreTransform(...
stream =transformTranslator.translatePostTransform(...
stream =schemaOperatorTranslator.translate(...
stream =partitioningTranslator.translate(...
sinkTranslator.translate(pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate());return new FlinkPipelineExecution(env...)...

逐一说明

  1. sourceTranslator.translate 通过source名字获取sourceProvider,关联到stream中
  • sourceProvider.getSource ->
    • MysqlSource ->
      • createReader
      • createEnumerator
  1. stream = transformTranslator.translatePreTransform
if (transforms.isEmpty()) {return input;
}

由于有如上代码,我们的yaml中没有涉及,所以忽略这个transform

  1. stream = transformTranslator.translatePostTransform
  • 同上
  1. stream = schemaOperatorTranslator.translate
  • 插入一个schemaOperator节点,在收到schemaChangeEvent的时候
    1. 停住当前流
    2. 上报coodinator
    3. flush下游数据,让sink消耗完已有数据
    4. sink 通知coodinator flush完成
    5. coodinator调用sink注册的MetaApplier完成schema变更,变更完成后通知schemaOperator
    6. schemaOperator重新放通数据
  1. stream = partitioningTranslator.translate
  • 构建prePartition postPartition节点
  1. sinkTranslator.translate
  • 构建sink节点
  1. FlinkPipelineExecution 中的 execute 方法调用 env.executeAsync(jobName)

总结

flink-cdc 3.0 提供的pipeline模式,通过定义yaml,自动构建了一条cdc pipeline,避免手动调用datastream api,并且支持schema evolution

构建的主要逻辑集中在 FlinkPipelineComposer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/60363.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Zustand浅学习

道阻且长&#xff0c;行而不辍&#xff0c;未来可期 之前只是会使用zustand,也没仔细看过zustand的文档&#xff0c;前段时间一个合约朋友问我前端的zustand怎么用&#xff0c;啊&#xff0c;这&#xff0c;是那个笑起来明媚的不像话的帅哥问我问题诶&#xff0c;那我得认真一下…

海量数据迁移:Elasticsearch到OpenSearch的无缝迁移策略与实践

文章目录 一&#xff0e;迁移背景二&#xff0e;迁移分析三&#xff0e;方案制定3.1 使用工具迁移3.2 脚本迁移 四&#xff0e;方案建议 一&#xff0e;迁移背景 目前有两个es集群&#xff0c;版本为5.2.2和7.16.0&#xff0c;总数据量为700T。迁移过程需要不停服务迁移&#…

TypeScript:现代 JavaScript 的超级集

目录 为什么使用 TypeScript? TypeScript 的基本特性 TypeScript 的优势 TypeScript项目实战 简单的命令行任务管理系统 TypeScript 是由微软开发的一个开源编程语言,它是 JavaScript 的一个严格超集。TypeScript 的核心特性是静态类型检查,使得开发者可以在编写代码时…

‌MySQL 5.7和8.0版本在多个方面存在显著区别,主要包括性能优化、新特性引入以及安全性提升

性能优化‌ ‌编码器和解码器‌&#xff1a;MySQL 8.0引入了更快和更高效的编码器和解码器&#xff0c;支持压缩、加密、并发等方面的优化&#xff0c;而MySQL 5.7的编码器和解码器相对较慢。‌认证方式‌&#xff1a;MySQL 8.0默认使用caching_sha2_password作为登录认证插件&…

【贪心算法】贪心算法三

贪心算法三 1.买卖股票的最佳时机2.买卖股票的最佳时机 II3.K 次取反后最大化的数组和4.按身高排序5.优势洗牌&#xff08;田忌赛马&#xff09; 点赞&#x1f44d;&#x1f44d;收藏&#x1f31f;&#x1f31f;关注&#x1f496;&#x1f496; 你的支持是对我最大的鼓励&#…

QtLua

描述 QtLua 库旨在使用 Lua 脚本语言使 Qt4/Qt5 应用程序可编写脚本。它是 QtScript 模块的替代品。 QtLua 不会为 Qt 生成或使用生成的绑定代码。相反&#xff0c;它提供了有用的 C 包装器类&#xff0c;使 C 和 lua 对象都可以从 lua 和 C 访问。它利用 Qt 元对象系统将 QOb…

Devops业务价值流:敏捷测试最佳实践

在迭代增量开发模式下&#xff0c;我们强调按照用户故事的优先级进行软件小功能的频繁交付。由于迭代周期紧凑&#xff0c;测试与开发活动往往并行进行&#xff0c;测试时间相对有限。为确保在这种快节奏的开发环境中依然能够保持产品质量&#xff0c;我们特制定以下测试阶段的…

el-table 纵向垂直表头处理

项目中表格展示会遇到需要纵向垂直表头情况&#xff0c;下面&#xff0c;我们基于el-table组件来实现这种表格。 以下是这次需要用到的数据表格&#xff0c;已知左侧违章名称是固定的&#xff0c;而月份是不固定的&#xff0c;在后端返回数据格式已确定的情况下&#xff0c;需…

Rust 模板匹配——根据指定图片查找处于大图中的位置(支持GPU加速)

Rust 模板匹配——根据指定图片查找处于大图中的位置(支持GPU加速) 01 前言 在手搓RPA工具的时候,总会碰到不好定位的情况,那么,就需要根据小图来找到对应屏幕上的位置(以图识图),这个需求也比较简单。想到市面上也有不少RPA工具都有这个功能,那么人家有的,俺也可以…

Spring框架之策略模式 (Strategy Pattern)

策略模式&#xff08;Strategy Pattern&#xff09;详解 策略模式&#xff08;Strategy Pattern&#xff09;是一种行为型设计模式&#xff0c;用于定义一系列算法&#xff0c;并将每种算法封装到独立的策略类中&#xff0c;使它们可以相互替换&#xff0c;从而使算法的变化独…

HDFS和HBase跨集群数据迁移 源码

HDFS集群间数据迁移&#xff08;hadoop distcp&#xff09; hadoop distcp \ -pb \ hdfs://XX.14.36.205:8020/user/hive/warehouse/dp_fk_tmp.db/ph_cash_order \ hdfs://XX.18.32.21:8020/user/hive/warehouse/dp_fksx_mart.db/HBase集群间数据&#xff08;hbase ExportSnap…

Sql server查询数据库表的数量

SELECT count(*) FROM sys.objects WHERE typeU --统计表数量 SELECT NAME FROM sys.objects WHERE typeU --列出表名称 或者 SELECT COUNT(*) FROM SysObjects Where XTypeU --统计表数量 SELECT Name FROM SysObjects Where XTypeU --列出表名称 --判断字…

浅谈单片机的gcc优化级别__以双音频信号发生器为例

IDE&#xff1a; CLion HOST&#xff1a; Windows 11 MinGW&#xff1a;x86_64-14.2.0-release-posix-seh-ucrt-rt_v12-rev0 GCC&#xff1a; arm-gnu-toolchain-13.3.rel1-mingw-w64-i686-arm-none-eabi 一、简介 gcc有多种优化级别&#xff0c;一般不选择的情况下&#x…

Ceph MDS高可用架构探索:从零到一构建多主一备MDS服务

文章目录 Ceph实现MDS服务多主一备高可用架构当前 mds 服务器状态添加 MDS 服务器验证ceph集群当前状态当前的文件系统状态设置处于激活状态 mds 的数量MDS 高可用优化分发配置文件并重启 mds 服务 Ceph实现MDS服务多主一备高可用架构 Ceph 的元数据服务&#xff08;MDS&#…

PySpark 数据处理实战:从基础操作到案例分析

Spark 的介绍与搭建&#xff1a;从理论到实践_spark环境搭建-CSDN博客 Spark 的Standalone集群环境安装与测试-CSDN博客 PySpark 本地开发环境搭建与实践-CSDN博客 Spark 程序开发与提交&#xff1a;本地与集群模式全解析-CSDN博客 Spark on YARN&#xff1a;Spark集群模式…

鸿蒙 APP 发布上架

证书创建与打包: https://developer.huawei.com/consumer/cn/doc/app/agc-help-releaseharmony-0000001933963166 不同环境多渠道打包: //todo 备案相关 一、除了发布应用商店以外,还有3个渠道,都适合小规模内测。 【1】开放式测试:发给指定白名单用户 【2】发布企业内…

使用GPT-SoVITS训练语音模型

1.项目演示 阅读单句话 1725352713141 读古诗 1725353700203 2.项目环境 开发环境&#xff1a;linux 机器配置如下&#xff1a;实际使用率百分之二十几&#xff0c; 3.开发步骤 1.首先是准备数据集&#xff0c;要求是wav格式&#xff0c;一到两个小时即可&#xff0c; 2.…

Python学习从0到1 day27 Python 高阶技巧 ③ 设计模式 — 单例模式

此去经年&#xff0c;再难同游 —— 24.11.11 一、什么是设计模式 设计模式是一种编程套路&#xff0c;可以极大的方便程序的开发最常见、最经典的设计模式&#xff0c;就是我们所学习的面向对象了。 除了面向对象外,在编程中也有很多既定的套路可以方便开发,我们称之为设计模…

Go开发指南-Gin与Web开发

目录&#xff1a; (1)Go开发指南-Hello World (2)Go开发指南-Gin与Web开发 Gin 是一个用 Go 语言编写的轻量级、高性能的 Web 框架&#xff0c;主要用于构建 API 服务和微服务。由于其简洁的 API 设计和强大的路由功能&#xff0c;Gin 在 Go 社区中广受欢迎。 运行Web程序 创…

3.2 软件需求:面对过程分析模型

面对过程分析模型 1. 需求分析的模型概述1.1 面对过程分析模型-结构化分析方法1.2 结构化分析的过程 2. 功能模型&#xff1a;数据流图初步2.1 加工2.2 外部实体&#xff08;数据源点/终点&#xff09;2.3 数据流2.4 数据存储2.5 注意事项 3. 功能模型&#xff1a;数据流图进阶…