Flink Gauss CDC:深度剖析存量与增量同步的创新设计

目录

设计思路

1.为什么不直接用FlinkCDC要重写Flink Gauss CDC

2.存量同步的逻辑是什么

2.1、单主键的切片策略是什么

2.2、​​​​​复合主键作切片,怎么保证扫描到所有的数据

3、增量同步的逻辑是什么

4、存量同步结束之后如何无缝衔接增量同步 

5、下游数据如何落库

6、项目结构大概怎么样

总结


设计思路

1.为什么不直接用FlinkCDC要重写Flink Gauss CDC

GaussDB 是华为内部自研的一套数据库,提供了类似于PostgreSQL的逻辑复制插件。Gauss100 OLTP逻辑复制解析包含逻辑日志信息的REDO日志,只有当表逻辑复制开关和全局逻辑复制开关同时打开时,该表的数据才会被逻辑复制。变化的数据最终到kafka,假设对标USRSAMPLE.T1分别进行插入,更新,删除操作同步的消息格式如下:

[{"data": {"F2": "aaaaa","F1": 1},"dn": 0,"keys": null,"lsn": 295269,"msgTime": "2022-08-11 10:53:18.000999","opType": "I","scn": 598260140474369,"seq": 0,"table": "USRSAMPLE.T1","txTime": "2022-08-11 10:53:18.000307"},{"data": {"F2": "bbbb"},"dn": 0,"keys": {"F1": 1},"lsn": 295299,"msgTime": "2022-08-11 10:53:52.000061","opType": "U","scn": 598401572352001,"seq": 0,"table": "USRSAMPLE.T1","txTime": "2022-08-11 10:53:51.000234"},{"data": null,"dn": 0,"keys": {"F1": 1},"lsn": 295313,"msgTime": "2022-08-11 10:54:13.000824","opType": "D","scn": 598495963910145,"seq": 0,"table": "USRSAMPLE.T1","txTime": "2022-08-11 10:54:13.000210"}
]
  • long scn:System Change Number 值递增。
  • long lsn: Log Sequence Num递增序号,对应Log_group,用于表示原子操作的
  • 顺序。
  • short dn:数据库实例节点序列号,暂时没有用。
  • int seq:一个事务中的每条数据的序列号。
  • String txTime:事务在数据库中的提交时间。
  • String msgTime:json序列化一行数据的当前时间。
  • String opType:操作数据的类型(I:insert;D:delete;U:update)。
  • String table:表名,格式为(用户名.表名)。
  • HashMap<String, Object> data:增删改的数据列信息(列名和列值)。
  • HashMap<String, Object> keys:kafka.msg.version为0时,keys字段为null;
  • kafka.msg.version为1时,该字段填充delete和update的主键或者唯一索引数据列
  • 信息(列名和列值)。

逻辑复制的局限性

  1. 只能捕获开启逻辑复制之后的数据,即存量数据无法同步
  2. Flink cdc 数据格式要求有 before 和 after的数据,逻辑复制工具只有 after 数据
  3. 只捕获发生变化的字段里的数据到kafka,而不是这一行的所有字段的数据

2.存量同步的逻辑是什么

 把分片轮询均匀的分配给多个读取器,每个读取器(多个线程)可以从各自的队列中获取多个分片,从而保证并行读取分片数据后写入RowData传递给下游。

2.1、单主键的切片策略是什么

算出主键的min_value和max_value,然后根据当前算子并行度进行切片后放入

Queue<HybridSourceSplit> splits = new ConcurrentLinkedQueue<>();

需要考虑如何保证每个分片负载均衡
 

2.2、​​​​​复合主键作切片,怎么保证扫描到所有的数据

要保证复合主键的分片能覆盖所有数据,需要对多个主键列的分片进行笛卡尔积组合。这样可以得到一组互不重叠、能覆盖整个主键空间的分片。

假设有一个表,主键由 (pk1, pk2) 组成,我们分别对 pk1 和 pk2 进行分片:

  • pk1 的分片为:[(pk1_min, pk1_a), (pk1_a, pk1_b), (pk1_b, pk1_max)]
  • pk2 的分片为:[(pk2_min, pk2_x), (pk2_x, pk2_max)]

为了覆盖所有的数据,我们需要将两个主键的分片进行笛卡尔积组合,得到以下分片:

[(pk1_min, pk1_a), (pk2_min, pk2_x)]
[(pk1_min, pk1_a), (pk2_x, pk2_max)]
[(pk1_a, pk1_b),   (pk2_min, pk2_x)]
[(pk1_a, pk1_b),   (pk2_x, pk2_max)]
[(pk1_b, pk1_max), (pk2_min, pk2_x)]
[(pk1_b, pk1_max), (pk2_x, pk2_max)]

全量同步之后如何无缝衔接增量同步? 

3、增量同步的逻辑是什么

  1. 通过参考Kafka Upsert Connector的源码,重写 Kafka Connector,把ChangeEvent转为RowData,

  2. RowData是Flink内核传输数据的基本格式,其中RowKind有 INSERT,UPDATE_BEFORE, UPDATE_AFTER, DELETE 四种枚举格式和ChangeEvent 的operType相呼应
public class ChangeEvent {private long scn;private long lsn;private short dn;private int seq;private String txTime;private String msgTime;private String opType;private String table;private HashMap<String, Object> data;private HashMap<String, Object> keys;
}
@PublicEvolving
public interface RowData {int getArity();RowKind getRowKind();void setRowKind(RowKind kind);...
}

具体细节可以参考这篇博客:

Flink与Kafka集成:跨版本兼容性与性能优化实战_flink1.18.1u与kafka哪个版本-CSDN博客

4、存量同步结束之后如何无缝衔接增量同步 

  1. HybridTableSource 根据配置决定是使用【存量增量一体化读取】还是【Kafka 增量数据】。
  2. 启用【存量增量一体化读取】时,HybridParallelSource 被使用,其中包括 HybridSourceReader 和 HybridSourceEnumerator 的创建。
  3. 如果未启用并行读取,使用单线程的 ChangeEventToRowDataSourceFunction进行【Kafka 增量数据】。
  4. 在【存量增量一体化读取】中,HybridSourceEnumerator 负责分配分片,而 HybridSourceReader 负责读取数据。
  5. 存量数据读取通过 SnapshotReader 完成,增量数据读取通过 Kafka 完成,由静态锁保证多个HybridSourceReader实例最后只有一个启动 Kafka读取。

5、下游数据如何落库

  1. 为了提高算子间的数据传输效率,上游传递过来的数据最初是个二进制数据,BinaryRowData需要手动再转为GenericRowData
  2. 参考jdbc connector 把GenericRowData数据根据自定义的字段拼接sql用jdbc落库

拼接sql语句的方式,会带来新的问题:

  • 比如表有id,name,age三个字段,正常情况的Flink source -> sink时,是传递一整行的数据(id:1,name:张三,age:18),但是如果是手动拼接sql的方式,当遇到把age手动设置为null时,逻辑复制捕获的格式为(id:1,age:null),此时通过flink cdc的方式传递给sink时,数据为(id:1,name:null,age:null),导致sink端无法判断name和age是手动置为null的还是因为该字段未发生改变,导致的为null。
  • 此问题在不更改flink内核源码的情况下,通过多传递一个字段px_gauss_marks到sink,sink使用该字段识别字段是否为手动设置为null,从而正确的拼接sql
  • px_gauss_marks:如果kafka没传对应的字段,则设置为 0;如果kafka传对应的字段但是值不为null,则设置为 1; 如果kafka传对应的字段但是值为null,则设置为 2,最终拼接成一个包含012这样的字符串,下游根据这个字段做进一步解析
-- source
CREATE TABLE sourceTable (id BIGINT,name STRING,age INT,status BOOLEAN,px_gauss_marks STRING, -- 此字段传递的值用于sink端判断以上的字段是否为手动置为null的情况PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'gauss-cdc','topic' = '逻辑复制配置的topic','properties.bootstrap.servers' = '','properties.group.id' = '','table-name'='','url' = 'db url','username' = 'db username','password' = 'db password','table-name' = '','scan.startup.mode' = 'earliest-offset','enable-parallel-read' = 'true'
);-- sink
CREATE TABLE SinkTable (id BIGINT,name STRING,age INT,status BOOLEAN,px_gauss_marks STRING,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'gauss-jdbc','url' = '','username' = '','password' = '','table-name'='','sink.buffer-flush.max-rows'='1','sink.buffer-flush.interval' = '0','sink.max-retries' = '0'
);

6、项目结构大概怎么样

所有实现DynamicTableSourceFactory接口的类都是程序的入口

总结

源码涉及商业秘密目前不便公开,但是这篇博客的设计思想借鉴了很多开源组件源码相信可以承前启后。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/893400.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

idea新增java快捷键代码片段

最近在写一些算法题&#xff0c;有很多的List<List这种编写&#xff0c;想着能否自定义一下快捷键 直接在写代码输入&#xff1a;lli&#xff0c;即可看见提示

修改docker共享内存shm-size

法1&#xff1a;在创建容器时增加共享内存大小 nvidia-docker run -it -p 10000:22 --name"zm" -v /home/zm:/data ufoym/deepo:all-cu101 /bin/bash --shm-size20G法2&#xff1a;修改正在运行的容器的共享内存设置 查看容器、共享内存 docker ps -a df -lh | gr…

深度学习-91-大语言模型LLM之基于langchain的模型IO的提示模板

文章目录 1 Model的输入输出2 提示模板2.1 提示模板的特点2.2 提示模板的类型3 使用提示模板3.1 设置环境变量3.2 PromptTemplate提示模板3.2.1 通过from_template方法3.2.2 直接生成提示模板3.2.3 使用提示模板3.2.4 复用提示模板3.3 ChatPromptTemplate聊天提示模板3.3.1 通过…

stm8s单片机(二)外部中断实验

中断优先级 stm8的中断优先级不是固定不变的&#xff0c;stm8的中断分为硬件优先级与软件优先级&#xff1b;当多个中断发生时&#xff0c;cpu会先响应软件优先级高的中断&#xff0c;若软件优先级相同会先响应硬件优先级高的&#xff1b; 其中软件优先级有四个 /*** brief …

【ubuntu 连接显示器无法显示】可以通过 ssh 连接 ubuntu 服务器正常使用,但服务器连接显示器没有输出

背景 ubuntu 服务器通过显示器进行关机&#xff0c;断开电源重新接上电源再重启时&#xff0c;服务器连接显示器不再有输出。CPU 为 AMD Ryzen 7 5800X 8-Core Processor&#xff0c;并没有显示&#xff0c;只能通过 NVDIA GPU 来显示。但是通过 nvidia-smi, nvitop 的输出, 以…

FineReport案列分析(一)

父格sql的解析 SELECT XCPX,XCPDL ,XCPZL FROM (SELECT DISTINCT XCPX,XCPDL,XCPZL FROM XPCXP_YS WHERE BM IN(经销商业务部,兴客坊) UNION(SELECTDISTINCT A.CPX AS XCPX,A.CPDL AS XCPDL,A.CPZL AS XCPZLFROM SYS.ORDER_SALEZJK BJOIN KHFL_LS L1ON L1.KSBMB.CUSTCODELEFT…

社区版Dify实现文生视频 LLM+ComfyUI+混元视频

社区版Dify实现文生视频 LLMComfyUI混元视频 一、 社区版Dify实现私有化混元视频效果二、为什么社区版Dify可以在对话框实现文生视频&#xff1f;LLMComfyUI混元视频 实现流程图&#xff08;重点&#xff09;1. 文生视频模型支持ComfyUI2. ComfyUI可以轻松导出API实现封装3. Di…

Vue3 30天精进之旅:Day01 - 初识Vue.js的奇妙世界

引言 在前端开发领域&#xff0c;Vue.js是一款极具人气的JavaScript框架。它以其简单易用、灵活高效的特性&#xff0c;吸引了大量开发者。本文是“Vue 3之30天系列学习”的第一篇博客&#xff0c;旨在帮助大家快速了解Vue.js的基本概念和核心特性&#xff0c;为后续的深入学习…

helm推送到harbor私有库--http: server gave HTTP response to HTTPS client

harbor私有库访问的是http模式 harbor 2.8版本以上可以存储helm镜像 docker镜像推送的时候需要docker端配置insecure-registries 发现helm推送只能在harbor部署的本机使用localhost才能推送成功&#xff0c;即 helm push xxx.tgz oci://localhost:80/library 使用helm pus…

AI对齐与开源发展:多学科融合创新之路

标题&#xff1a;AI对齐与开源发展&#xff1a;多学科融合创新之路 文章信息摘要&#xff1a; 本文分析了AI对齐研究中多学科融合的重要性及开源AI领域的最新发展。文章指出&#xff0c;AI对齐问题需要计算机科学与社会选择理论等学科深度结合&#xff0c;特别是在RLHF领域的应…

transformers使用过程问题

transfomers新旧版本冲突&#xff0c;和accelerate、datasets、evaluate这些库直接也经常会发生冲突 我使用了下面的版本&#xff0c;暂时没有冲突&#xff0c;如果有冲突再更新 transformers4.41.2 datasets2.20.0 accelerate0.31.0 evaluate0.4.2pip install transformers安…

svn tag

一般发布版本前&#xff0c;需要在svn上打个tag。步骤如下&#xff1a; 1、空白处右击&#xff0c;选择TortoiseSVN->Branch/tag; 2、填写To path&#xff0c;即tag的路基以及tag命名&#xff08;一般用版本号来命名&#xff09;&#xff1b;填写tag信息&#xff1b;勾选cr…

python 基础类json和csv

一、json 1.将字典转换为json字符串 2.将json字符串转化为字典 3.将字典保存为json文件 4.将json文件读取出字典格式的数据 import json # 1.将字典转化成json字符串 dict1{"张三":"zhangsan","B":"b","C":"c&quo…

【JavaSE】(8) String 类

一、String 类常用方法 1、构造方法 常用的这4种构造方法&#xff1a;直接法&#xff0c;或者传参字符串字面量、字符数组、字节数组。 在 JDK1.8 中&#xff0c;String 类的字符串实际存储在 char 数组中&#xff1a; String 类也重写了 toString 方法&#xff0c;所以可以直…

【理解工具调用的流程,本质体现了大模型智能性】

1、工具调用 调用完结果看里面tool_calls 是否为空&#xff0c;不为空就调用工具函数处理&#xff0c; 如果为空就中断循环。大模型返回的message结果智能判断是否继续调用 输入输出如下&#xff1a; 请输入&#xff1a;深圳西安天气 ------------------------------------…

Excel 实现文本拼接方法

1. 使用 & 运算符 这是最常见和简单的拼接方法。你只需使用 & 来连接多个文本单元格或文本字符串。 示例公式&#xff1a; A1 & B1这个公式会将 A1 和 B1 单元格中的文本合并为一个字符串。 如果你希望在文本之间加入分隔符&#xff08;如空格、逗号等&#xf…

C# ASP.NET MVC项目内使用ApiController

1.在App_Start文件夹新建WebApiConfig.cs文件&#xff0c;建立webApi路由的注册方法。 using System.Web.Http;namespace PrivilegeManager {public class WebApiConfig{public static void Register(HttpConfiguration config){config.MapHttpAttributeRoutes();config.Route…

【Springboot相关知识】Springboot结合SpringSecurity实现身份认证以及接口鉴权

Springboot结合SpringSecurity实现身份认证以及接口鉴权 身份认证1. 添加依赖2. 配置JWT工具类3. 配置Spring Security4. 创建JWT请求过滤器5. 创建认证控制器6. 创建请求和响应对象7. 配置UserDetailsService8. 运行应用程序9. 测试总结 接口鉴权1. 启用方法级安全注解2. 定义…

Mac开启任何来源安装配置环境变量

目录 开启任何来源配置环境变量退出保存时如果没有权限修改文件权限拓展——.bash_profile和.zshrc 开启任何来源 sudo spctl --master-disable#打开软件时提示文件损坏 sudo xattr -r -d com.apple.quarantine 进入访达应用程序拖拽应用到终端配置环境变量 cd ~ vi ~/.bash…

使用 Logback 的最佳实践:`logback.xml` 与 `logback-spring.xml` 的区别与用法

在开发 Spring Boot 项目时&#xff0c;日志是调试和监控的重要工具。Spring Boot 默认支持 Logback 作为日志系统&#xff0c;并提供了 logback.xml 和 logback-spring.xml 两种配置方式。这篇文章将详细介绍这两者的区别、各自的优缺点以及最佳实践。 目录 一、什么是 Logbac…