FlinkCDC for mysql to Clickhouse

完整依赖

<dependencies><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.13.0</version></dependency><!--       <dependency>-->
<!--           <groupId>org.apache.flink</groupId>-->
<!--           <artifactId>flink-jdbc_2.12</artifactId>-->
<!--           <version>1.10.3</version>-->
<!--       </dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.13.0</version><type>test-jar</type></dependency><dependency><groupId>com.alibaba.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>1.4.0</version></dependency><dependency><groupId>com.aliyun</groupId><artifactId>flink-connector-clickhouse</artifactId><version>1.12.0</version></dependency><dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.2.6</version></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.6</version></dependency></dependencies>

Flink CDC

package name.lijiaqi.cdc;import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.google.gson.Gson;
import com.google.gson.internal.LinkedTreeMap;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.source.SourceRecord;import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.HashMap;public class MySqlBinlogSourceExample {public static void main(String[] args) throws Exception {SourceFunction<String> sourceFunction = MySQLSource.<String>builder().hostname("localhost").port(3306).databaseList("test").username("flinkcdc").password("dafei1288").deserializer(new JsonDebeziumDeserializationSchema()).build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 添加 sourceenv.addSource(sourceFunction)// 添加 sink.addSink(new ClickhouseSink());env.execute("mysql2clickhouse");}// 将cdc数据反序列化public static class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema {@Overridepublic void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {Gson jsstr = new Gson();HashMap<String, Object> hs = new HashMap<>();String topic = sourceRecord.topic();String[] split = topic.split("[.]");String database = split[1];String table = split[2];hs.put("database",database);hs.put("table",table);//获取操作类型Envelope.Operation operation = Envelope.operationFor(sourceRecord);//获取数据本身Struct struct = (Struct)sourceRecord.value();Struct after = struct.getStruct("after");if (after != null) {Schema schema = after.schema();HashMap<String, Object> afhs = new HashMap<>();for (Field field : schema.fields()) {afhs.put(field.name(), after.get(field.name()));}hs.put("data",afhs);}String type = operation.toString().toLowerCase();if ("create".equals(type)) {type = "insert";}hs.put("type",type);collector.collect(jsstr.toJson(hs));}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}}public static class ClickhouseSink extends RichSinkFunction<String>{Connection connection;PreparedStatement pstmt;private Connection getConnection() {Connection conn = null;try {Class.forName("ru.yandex.clickhouse.ClickHouseDriver");String url = "jdbc:clickhouse://localhost:8123/default";conn = DriverManager.getConnection(url,"default","dafei1288");} catch (Exception e) {e.printStackTrace();}return conn;}@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);connection = getConnection();String sql = "insert into sink_ch_test(id,name,description) values (?,?,?)";pstmt = connection.prepareStatement(sql);}// 每条记录插入时调用一次public void invoke(String value, Context context) throws Exception {//{"database":"test","data":{"name":"jacky","description":"fffff","id":8},"type":"insert","table":"test_cdc"}Gson t = new Gson();HashMap<String,Object> hs = t.fromJson(value,HashMap.class);String database = (String)hs.get("database");String table = (String)hs.get("table");String type = (String)hs.get("type");if("test".equals(database) && "test_cdc".equals(table)){if("insert".equals(type)){System.out.println("insert => "+value);LinkedTreeMap<String,Object> data = (LinkedTreeMap<String,Object>)hs.get("data");String name = (String)data.get("name");String description = (String)data.get("description");Double id = (Double)data.get("id");// 未前面的占位符赋值pstmt.setInt(1, id.intValue());pstmt.setString(2, name);pstmt.setString(3, description);pstmt.executeUpdate();}}}@Overridepublic void close() throws Exception {super.close();if(pstmt != null) {pstmt.close();}if(connection != null) {connection.close();}}}
}

Flink SQL CDC

package name.lijiaqi.cdc;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class MysqlToMysqlMain {public static void main(String[] args) throws Exception {EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);// 数据源表String sourceDDL ="CREATE TABLE mysql_binlog (\n" +" id INT NOT NULL,\n" +" name STRING,\n" +" description STRING\n" +") WITH (\n" +" 'connector' = 'mysql-cdc',\n" +" 'hostname' = 'localhost',\n" +" 'port' = '3306',\n" +" 'username' = 'flinkcdc',\n" +" 'password' = 'dafei1288',\n" +" 'database-name' = 'test',\n" +" 'table-name' = 'test_cdc'\n" +")";String url = "jdbc:mysql://127.0.0.1:3306/test";String userName = "root";String password = "dafei1288";String mysqlSinkTable = "test_cdc_sink";// 输出目标表String sinkDDL ="CREATE TABLE test_cdc_sink (\n" +" id INT NOT NULL,\n" +" name STRING,\n" +" description STRING,\n" +" PRIMARY KEY (id) NOT ENFORCED \n " +") WITH (\n" +" 'connector' = 'jdbc',\n" +" 'driver' = 'com.mysql.jdbc.Driver',\n" +" 'url' = '" + url + "',\n" +" 'username' = '" + userName + "',\n" +" 'password' = '" + password + "',\n" +" 'table-name' = '" + mysqlSinkTable + "'\n" +")";// 简单的聚合处理String transformSQL ="insert into test_cdc_sink select * from mysql_binlog";tableEnv.executeSql(sourceDDL);tableEnv.executeSql(sinkDDL);TableResult result = tableEnv.executeSql(transformSQL);// 等待flink-cdc完成快照result.print();env.execute("sync-flink-cdc");}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/113007.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

GRASP 、SOLID 与 GoF 设计模式

一、GRASP GRASP&#xff1a;通用职责分配软件设计模式(General Responsibility Assignment Software Patterns)&#xff0c;其主要思想是基于单一职责设计软件对象。 思考软件对象设计以及大型构件的流行方式是&#xff0c;考虑其职责、角色和协作。这是被称为职责驱动设计&a…

基于CNN实现谣言检测 - python 深度学习 机器学习 计算机竞赛

文章目录 1 前言1.1 背景 2 数据集3 实现过程4 CNN网络实现5 模型训练部分6 模型评估7 预测结果8 最后 1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 基于CNN实现谣言检测 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐&am…

高速DSP系统设计参考指南(七)电磁干扰基础

&#xff08;七&#xff09;电磁干扰基础 1.概述2.EMI概述3.数字信号4.电流环路5.电源6.传输线7.电源层和地层8. 减少电磁干扰经验法则9.总结 1.概述 高速DSP系统中的辐射是由通过印刷电路板走线传播的快速开关电流和电压引起的。随着DSP速度的提高&#xff0c;印刷电路板走线…

C++ 字符串编码转换封装函数,UTF-8编码与本地编码互转

简介 字符串编码转换封装函数&#xff0c;UTF-8编码与本地编码互转。 中文乱码的解决方法 有时候我们会遇到乱码的字符串&#xff0c;比如&#xff1a; 古文码 可能是用GBK方式读取UTF-8编码的中文导致的&#xff0c;用下面的Utf8ToLocal(string str)函数转换一下就可以了。…

017 基于Spring Boot的食堂管理系统

部分代码地址&#xff1a; https://github.com/XinChennn/xc017-stglxt 基于Spring Boot的食堂管理系统 项目介绍 本项目是基于Java的管理系统。采用前后端分离开发。前端基于bootstrap框架实现&#xff0c;后端使用Java语言开发&#xff0c;技术栈包括但不限于SpringBoot、…

Filter与Listener(过滤器与监听器)

1.Filter 1.过滤器概述 过滤器——Filter&#xff0c;它是JavaWeb三大组件之一。另外两个是Servlet和Listener 它可以对web应用中的所有资源进行拦截&#xff0c;并且在拦截之后进行一些特殊的操作 在程序中访问服务器资源时&#xff0c;当一个请求到来&#xff0c;服务器首…

unity操作_碰撞器 c#

碰撞器Collider 在场景中选择一个物体Cube 观察检查器Inspector 自带Cube会默认挂载盒子碰撞器Box Colilider 增加组件可以增加更多中碰撞器 Edit Collider 编辑碰撞器形状 Is Trigger选项 Is Trigger &#xff1a;是否是触发器&#xff0c;如果启用此属性 则该碰撞体将用于触…

Java面试题-Java核心基础-第七天(String)

目录 一、String、StringBuffer、StringBuilder的区别 二、String为什么是不可变的 三、字符串拼接用""还是用StringBuilder 四、String 中的equals和Object中的equals的区别 五、字符串常量池的作用了解吗&#xff1f; 六、String s1 new String("abc&qu…

数据挖掘(6)聚类分析

一、什么是聚类分析 1.1概述 无指导的&#xff0c;数据集中类别未知类的特征&#xff1a; 类不是事先给定的&#xff0c;而是根据数据的相似性、距离划分的聚类的数目和结构都没有事先假定。挖掘有价值的客户: 找到客户的黄金客户ATM的安装位置 1.2区别 二、距离和相似系数 …

状态空间方程的离散化

一、理论基础 1、系统离散化&#xff08;传递函数和状态空间方程&#xff09; 【离散系统】传递函数和状态空间方程离散化-CSDN博客 状态空间方程的离散化 - 知乎 (zhihu.com) 2、差分方程的建立与分析 【精选】数学建模之差分方程模型详解_左手の明天的博客-CSDN博客 【信…

UITesting 界面测试

1. 创建界面测试视图 UITestingBootcampView.swift import SwiftUI/// 界面测试 ViewModel class UITestingBootcampViewModel: ObservableObject{let placeholderText: String "Add name here..."Published var textFiledText: String ""Published var…

2023年中国一次性医用内窥镜市场发展现状分析:相关产品进入上市高峰期[图]

基于对减少交叉感染风险和维护成本的需求等因素&#xff0c;一种新兴的、耗材化的一次性内窥镜可以避免因重复使用产品而导致的感染问题和高额的清洗消毒费用&#xff0c;从而提高患者的安全性并帮助医疗机构节省运营成本。 一次性和可重复使用医用内窥镜特点对比 资料来源&am…

Android 指定有线网或Wifi进行网络请求

Android 指定有线网或Wifi进行网络请求 文章目录 Android 指定有线网或Wifi进行网络请求一、前言&#xff1a;二、指定网络通讯测试1、 窗口命令 ping -I 网络节点 IP2、Java 代码指定特定网络通讯 三、指定特定网络的demo app 开发1、效果图&#xff1a;2、实际测试结果说明&a…

记录nfc.listenNFCStatus多次刷卡后会重复调用接口

当你使用nfc.listenNFCStatus在UniApp中监听NFC&#xff08;Near Field Communication&#xff09;状态时&#xff0c;可能会出现多次刷卡后接口被重复调用的情况。这通常发生是因为在多次刷卡后&#xff0c;NFC状态变化多次触发了监听事件。要解决这个问题&#xff0c;你可以使…

Nginx负载均衡反向代理动静分离

文章目录 nginx负载均衡&反向代理&动静分离环境说明部署动静分离1.主机lnmp部署一个动态页面&#xff0c;在此以discuz论坛系统为例2.主机n1部署两个静态页面访问动、静态页面 配置负载均衡配置反向代理访问测试 nginx负载均衡&反向代理&动静分离 环境 主机名…

链块串的实现(无功能函数实现)

String.h #pragma once #include <stdlib.h> #include <assert.h> #include <string.h> #include <errno.h> #include <iostream> using namespace std; #define SIZE 5 #define EFF_SIZE (SIZE -1)//插入的有效字符 typedef char elemType;typ…

【LINUX】1-移植NXP提供的源码

一、在Linux中添加自己的开发板 defconfig配置文件&#xff1a;一个就是imx6ull_alientek_emmc_defconfig默认配置文件 # 复制一份NXP 官方的SDK cd arch/arm/configs cp imx_v7_mfg_defconfig imx_alientek_emmc_defconfig 设备树&#xff1a;imx6ull-alientek-emmc.d…

【Arduino32】PWM控制直流电机速度

硬件准备 震动传感器&#xff1a;1个 红黄绿LED灯&#xff1a;各一个 旋钮电位器&#xff1a;1个 直流电机&#xff1a;1个 1K电阻&#xff1a;1个 220欧电阻&#xff1a;3个 杜邦线&#xff1a;若干 硬件连线 软件程序 const int analogInPin A0;//PWM输入引脚 const…

C/C++基础

C 二进制 问题&#xff1a;二进制怎么表示整数、小数、正数、负数&#xff0c;如何存储&#xff1f;加减乘除怎么运算&#xff08;见文章《计算机加减乘除本质》&#xff09;&#xff1f; 变量 c定义一个变量的时候&#xff0c;需要事先定义变量大小和变量类型。 //有符号…

vue通知(滚动)

1. li宽度不顾定 <template><div id"app"><div id"box" mouseover"clearLeft" mouseleave"setLeft"><ul :style"{ transform: translateX( left px) }" ref"cmdlist"><li v-for&qu…