速通FlinkCDC3.0

1.FlinkCDC概述

1.1FlinkCDC是什么?

        FlinkCDC(Flink Change Data Capture)是一个用于实时捕获数据库变更日志的工具,它可以将数据库的变更实时同步到ApacheFlink系统中。

1.2 FlinkCDC的三个版本?

        1.x 这个版本的FlinkCDC的提供了DataStream以及FlinkSQL的方式实现数据的动态获取

        存在的问题:

        就是在生产环境中,我们在同步环境的时候,万一前脚刚同步了数据,后脚就被修改了怎么办呢?这样我们不就是读到了,不正确的数据了。因此,在FlinkCDC1.x的版本中的解决方案是,在读表的过程中,锁住整张表,这时候不会有新的数据写入了。但是由此又带来了一个新的问题,生产环境中时时刻刻就是会有新的数据写入的,如果锁住整张表,就会对线上产生很多问题。所以迎来了2.x版本。

        2.x 这个版本提供了丰富的数据库对接以及增加全量的同步锁表的解决问题的解决方案。

        提供API或者FlinkSql去进行操作,打包代码上传到集群去进行操作,但是我们的本身任务并不复杂,就是一个导数据的任务,所以需要更简单的方法去实现数据导入的作用。

        3.x 这个版本提供了StreamingETL方式导入数据方案。 

        FlinkCDC在这个版本形成了自己的框架,可以像平时的那些框架文件如果spark,hadoop一样又bin,conf等文件夹,所以我们在使用FlinkCDC的时候就是可以直接在Conf中配置Resource(要导入的数据库)sink(目标文件),可以通过命令启动来进行同步。

顺带提一下两种CDC的同步方式:

        CDC一种是通过查询的方式和通过Binlog两种方式,简单说一下两种的不同

cdc对比
基于查询的CDC基于binlog的cdc
产品

Sqoop、DataX

Canal
执行模式BatchStreaming
是否可以检测到所有变化否(同步最终态)
延迟性延迟高(按天进行同步)低延迟
增加数据库压力

2.flinkCDC 同步mysql数据库数据到doris

        2.1 环境准备

           1)安装FlinkCDC

               flinkCDC下载地址 https://pan.baidu.com/s/1_BKPxommK5dsY3hD7rYVUA 提取码: pisv 

tar -zxvf flink-cdc-3.0.0-bin.tar.gz -C /opt/module/

        2)向FlinkCDC的目录下的lib目录下传入Mysql以及Doris 的依赖包

        doris的jar包
        https://pan.baidu.com/s/1pgtsYT9VyXD1U4RbjYA6rg 提取码: kx2q 
        mysql的jar包
        https://pan.baidu.com/s/1pxCy0-iSutqN9YjdzGAfZw 提取码: p65d 
        还需要一个mysql-connector的jar
        https://pan.baidu.com/s/1lzJuQRPL3KtDqDXaoGBSMQ 提取码: dwnw 

        为啥还需要已经有了mysql的jar包了还需要一个mysql-connector?

        是因为mysql的jar包依赖于mysql-connector,

        为啥不封装在一起?

        首先来说就是MySQL的jar相当于对数据库一个能力的封装底层可以用别的connector,也是为了解耦,还有一个原因就是两个包所用的协议不一样,上面的这个msyql的jar包是用的apache的协议。  

        为啥用了mysql的驱动包,不用doris的驱动包?

        因为doris兼容mysql的协议。

        2.2 同步变更

        编写 MySQL同步到doris的配置文件

        可以选择在FlinkCDC中创建一个单独的文件夹写配置文件,也可以写在conf的目录下。

vim mysql-to-doris.yaml

source:

 #数据源的数据库类型

  type: mysql

 #地址/主机名称

  hostname: hadoop103

 #端口号

  port: 3306

 #数据库用户名

  username: root

  #数据库密码

  password: "000000"

  #要同步的表名

  tables: test.\.*

  #ServerID 下面详细解释

  server-id: 5400-5404

 #时区

  server-time-zone: UTC+8

sink:

  #目标数据库类型

  type: doris

  #目标数据库物理存储主机名加端口号

  fenodes: hadoop102:7030

  #数据库用户名

  username: root

  #数据库密码

  password: "000000"

 #是否同步表的初始变化,就是类似新增字段之类的

  table.create.properties.light_schema_change: true

 #副本数

  table.create.properties.replication_num: 1

pipeline:

  #任务名称

  name: Sync MySQL Database to Doris

  #并行任务数量

  parallelism: 1

server-id 的作用:
MySQL复制标识:在MySQL主从复制中,每个从库必须有一个唯一的server-id来标识自己。同样,当你的CDC工具连接MySQL时,它实际上扮演了一个MySQL从库的角色,通过binlog来获取数据变更。

避免冲突:如果你有多个CDC工具或从库连接同一个MySQL主库,每个实例必须有不同的server-id,否则会导致冲突和数据不一致。

        这种配置方式通常在分布式或并行环境中使用,允许多个任务实例使用不同的server-id(在你的配置中parallelism: 4表示并行度为4,所以需要4个不同的server-id)。

        启动环境

        1)开启Flink集群

        首先要添加如下配置

vim conf/flink-conf.yaml

添加如下配置信息

execution.checkpointing.interval: 5000

#启动集群
bin/start-cluster.sh

        2)开启doris的FE

bin/start_fe.sh

        3)  开启Doris的BE

bin/start_be.sh

        4)启动FlinkCDC同步变更任务

        尚硅谷给的是这个命令,但是我用这个命令不行

flink-cdc-3.0.0]$ bin/flink-cdc.sh job/mysql-to-doris.yaml

        我用的这个可以

bin/flink-cdc.sh config/你的配置文件 --jar lib/mysql....

        然后刷新数据库观察结果

        以上情况适用于就是我们的主库mysql的数据库名.表名,在doris中的数据库名.表名是一样。如果doris中的表名不一样就用到下面的路由变更。

2.3路由变更

ource:

  type: mysql

  hostname: hadoop103

  port: 3306

  username: root

  password: "000000"

  tables: test_route.\.*

  server-id: 5400-5404

  server-time-zone: UTC+8

sink:

  type: doris

  fenodes: hadoop102:7030

  benodes: hadoop102:7040

  username: root

  password: "000000"

  table.create.properties.light_schema_change: true

  table.create.properties.replication_num: 1

#增加了路由规则

route:

  - source-table: test_route.t1

    sink-table: doris_test_route.doris_t1

  - source-table: test_route.t2

    sink-table: doris_test_route.doris_t1

  - source-table: test_route.t3

    sink-table: doris_test_route.doris_t3

pipeline:

  name: Sync MySQL Database to Doris

  parallelism: 1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/77677.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

B+树节点与插入操作

B树节点与插入操作 设计B树节点 在设计B树的数据结构时,我们首先需要定义节点的格式,这将帮助我们理解如何进行插入、删除以及分裂和合并操作。以下是对B树节点设计的详细说明。 节点格式概述 所有的B树节点大小相同,这是为了后续使用自由…

C# 检查字符串是否包含在另一个字符串中

string shopList "我是大浪,你的小狼"; this.ShopId"你的小狼"; bool existsShopId false; if (!string.IsNullOrEmpty(shopList)) {existsShopId shopList.Split(,).Any(part > part.Trim() this.ShopId); }检查 goodsIdSet 中的每个元素是否都在 …

珈和科技遥感赋能农业保险创新 入选省级卫星应用示范标杆

为促进空天信息与数字经济深度融合,拓展卫星数据应用场景价值,提升卫星数据应用效能和用户体验,加速卫星遥感技术向民生领域转化应用,近日,湖北省国防科工办组织开展了2024年湖北省卫星应用示范项目遴选工作。 经多渠…

深入理解 React 组件的生命周期:从创建到销毁的全过程

React 作为当今最流行的前端框架之一,其组件生命周期是每个 React 开发者必须掌握的核心概念。本文将全面剖析 React 组件的生命周期,包括类组件的各个生命周期方法和函数组件如何使用 Hooks 模拟生命周期行为,帮助开发者编写更高效、更健壮的…

缓存 --- Redis性能瓶颈和大Key问题

缓存 --- Redis性能瓶颈和大Key问题 内存瓶颈网络瓶颈CPU 瓶颈持久化瓶颈大key问题优化方案 Redis 是一个高性能的内存数据库,但在实际使用中,可能会在内存、网络、CPU、持久化、大键值对等方面遇到性能瓶颈。下面从这些方面详细分析 Redis 的性能瓶颈&a…

Python爬虫与代理IP:高效抓取数据的实战指南

目录 一、基础概念解析 1.1 爬虫的工作原理 1.2 代理IP的作用 二、环境搭建与工具选择 2.1 Python库准备 2.2 代理IP选择技巧 三、实战步骤分解 3.1 基础版:单线程免费代理 3.2 进阶版:多线程付费代理池 3.3 终极版:Scrapy框架自动…

Nginx HTTP 414 与“大面积”式洪水攻击联合防御实战

一、引言 在大规模分布式应用中,Nginx 常作为前端负载均衡和反向代理服务器。攻击者若结合超长 URI/头部攻击(触发 HTTP 414)与海量洪水攻击,可在网络层与应用层形成双重打击:一方面耗尽缓冲区和内存,另一…

【上位机——MFC】运行时类信息机制

运行时类信息机制的使用 类必须派生自CObject类内必须添加声明宏DECLARE_DYNAMIC(theClass)3.类外必须添加实现宏 IMPLEMENT_DYNAMIC(theClass,baseClass) 具备上述三个条件后&#xff0c;CObject::IsKindOf函数就可以正确判断对象是否属于某个类。 代码示例 #include <…

Maven插件管理的基本原理

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;精通Java编…

卷积神经网络--手写数字识别

本文我们通过搭建卷积神经网络模型&#xff0c;实现手写数字识别。 pytorch中提供了手写数字的数据集 &#xff0c;我们可以直接从pytorch中下载 MNIST中包含70000张手写数字图像&#xff1a;60000张用于训练&#xff0c;10000张用于测试 图像是灰度的&#xff0c;28x28像素 …

大文件分片上传进阶版(新增md5校验、上传进度展示、并行控制,智能分片、加密上传、断点续传、自动重试),实现四位一体的网络感知型大文件传输系统‌

上篇文章我们总结了大文件分片上传的主要核心&#xff0c;但是我对md5校验和上传进度展示这块也比较感兴趣&#xff0c;所以在deepseek的帮助下&#xff0c;扩展了一下我们的代码&#xff0c;如果有任何问题和想法&#xff0c;非常欢迎大家在评论区与我交流&#xff0c;我需要学…

C# 点击导入,将需要的参数传递到弹窗的页面

点击导入按钮&#xff0c;获取本页面的datagridview标题的结构&#xff0c;并传递到导入界面。 新增一个datatable用于存储datagridview的caption和name&#xff0c;这里用的是devexpress组件中的gridview。 DataTable dt new DataTable(); DataColumn CAPTION …

android的 framework 是什么

Android的Framework&#xff08;框架&#xff09;是Android系统的核心组成部分&#xff0c;它为开发者提供了一系列的API&#xff08;应用程序编程接口&#xff09;&#xff0c;使得开发者能够方便地创建各种Android应用。以下是关于它的详细介绍&#xff1a; 位置与架构 在A…

【MySQL】表的约束(主键、唯一键、外键等约束类型详解)、表的设计

目录 1.数据库约束 1.1 约束类型 1.2 null约束 — not null 1.3 unique — 唯一约束 1.4 default — 设置默认值 1.5 primary key — 主键约束 自增主键 自增主键的局限性&#xff1a;经典面试问题&#xff08;进阶问题&#xff09; 1.6 foreign key — 外键约束 1.7…

数据结构-C语言版本(三)栈

数据结构中的栈&#xff1a;概念、操作与实战 第一部分 栈分类及常见形式 栈是一种遵循后进先出(LIFO, Last In First Out)原则的线性数据结构。栈主要有以下几种实现形式&#xff1a; 1. 数组实现的栈&#xff08;顺序栈&#xff09; #define MAX_SIZE 100typedef struct …

如何以特殊工艺攻克超薄电路板制造难题?

一、超薄PCB的行业定义与核心挑战 超薄PCB通常指厚度低于1.0毫米的电路板&#xff0c;而高端产品可进一步压缩至0.4毫米甚至0.2毫米以下。这类电路板因体积小、重量轻、热传导性能优异&#xff0c;被广泛应用于折叠屏手机、智能穿戴设备、医疗植入器械及新能源汽车等领域。然而…

AI 赋能 3D 创作!Tripo3D 全功能深度解析与实操教程

大家好&#xff0c;欢迎来到本期科技工具分享&#xff01; 今天要给大家带来一款革命性的 AI 3D 模型生成平台 ——Tripo3D。 无论你是游戏开发者、设计师&#xff0c;还是 3D 建模爱好者&#xff0c;只要想降低创作门槛、提升效率&#xff0c;这款工具都值得深入了解。 接下…

如何理解抽象且不易理解的华为云 API?

API的概念在华为云的使用中非常抽象&#xff0c;且不容易理解&#xff0c;用通俗的语言 形象的比喻来讲清楚——什么是华为云 API&#xff0c;怎么用&#xff0c;背后原理&#xff0c;以及主要元素有哪些&#xff0c;尽量让新手也能明白。 &#x1f9e0; 一句话先理解&#xf…

第 7 篇:总结与展望 - 时间序列学习的下一步

第 7 篇&#xff1a;总结与展望 - 时间序列学习的下一步 (图片来源: Guillaume Hankenne on Pexels) 恭喜你&#xff01;如果你一路跟随这个系列走到了这里&#xff0c;那么你已经成功地完成了时间序列分析的入门之旅。我们从零开始&#xff0c;一起探索了时间数据的基本概念、…

PPT无法编辑怎么办?原因及解决方法全解析

在日常办公中&#xff0c;我们经常会遇到需要编辑PPT的情况。然而&#xff0c;有时我们会发现PPT文件无法编辑&#xff0c;这可能由多种原因引起。今天我们来看看PPT无法编辑的几种常见原因&#xff0c;并提供实用的解决方法&#xff0c;帮助你轻松应对。 原因1&#xff1a;文…