flink mysql数据表同步API CDC

概述:

CDC简介 Change Data Capture

API CDC同步数据代码

package com.yclxiao.flinkcdcdemo.api;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.yclxiao.flinkcdcdemo.util.JdbcUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TimeZone;/*** league.oc_settle_profit -> cloud.dws_profit_record_hdj* API方式*/
public class Wfg2userApi {private static final Logger LOG = LoggerFactory.getLogger(Wfg2userApi.class);private static String MYSQL_HOST = "192.168.1.12";private static int MYSQL_PORT = 3306;private static String MYSQL_USER = "root";private static String MYSQL_PASSWD = "123456";private static String SYNC_DB = "zentao";private static List<String> SYNC_TABLES = Arrays.asList("zentao.zt_group");public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname(MYSQL_HOST).port(MYSQL_PORT).databaseList(SYNC_DB) // set captured database.tableList(String.join(",", SYNC_TABLES)) // set captured table.username(MYSQL_USER).password(MYSQL_PASSWD).deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);env.enableCheckpointing(5000);DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "CDC Source" + Wfg2userApi.class.getName());List<String> tableList = getTableList();for (String tbl : tableList) {SingleOutputStreamOperator<String> filterStream = filterTableData(cdcSource, tbl);SingleOutputStreamOperator<String> cleanStream = clean(filterStream);SingleOutputStreamOperator<String> logicStream = logic(cleanStream);logicStream.addSink(new CustomDealDataSink());}env.execute(Wfg2userApi.class.getName());}private static class CustomDealDataSink extends RichSinkFunction<String> {private transient Connection cloudConnection;private transient PreparedStatement cloudPreparedStatement;private String insertSql = "INSERT INTO `zentao_zt_group` (`id`, `project`, `vision`, `name`, `role`, `desc`, `acl`, `developer`) \n" +"      VALUES (?, ?, ?, ?, ?, ?, ?, ?)";private String deleteSql = "delete from zentao_zt_group where id = '%s'";@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// 在这里初始化 JDBC 连接cloudConnection = DriverManager.getConnection("jdbc:mysql://" + MYSQL_HOST + ":3306/wfg", "root", "123456");cloudPreparedStatement = cloudConnection.prepareStatement(insertSql);}@Overridepublic void invoke(String value, Context context) throws Exception {JSONObject dataJson = JSON.parseObject(value);Long id = dataJson.getLong("id");Integer project = dataJson.getInteger("project");String vision = dataJson.getString("vision");String name = dataJson.getString("name");String role = dataJson.getString("role");String desc = dataJson.getString("desc");String acl = dataJson.getString("acl");Integer developer = dataJson.getInteger("developer");cloudPreparedStatement.setLong(1, id);cloudPreparedStatement.setInt(2, project);cloudPreparedStatement.setString(3, vision);cloudPreparedStatement.setString(4, name);cloudPreparedStatement.setString(5, role);cloudPreparedStatement.setString(6, desc);cloudPreparedStatement.setString(7, acl);cloudPreparedStatement.setInt(8, developer);cloudPreparedStatement.execute(String.format(deleteSql, id));cloudPreparedStatement.execute();}@Overridepublic void close() throws Exception {super.close();// 在这里关闭 JDBC 连接cloudPreparedStatement.close();cloudConnection.close();}}/*** 处理逻辑:过滤掉部分数据** @param cleanStream* @return*/private static SingleOutputStreamOperator<String> logic(SingleOutputStreamOperator<String> cleanStream) {return cleanStream.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String data) throws Exception {try {
//                    JSONObject dataJson = JSON.parseObject(data);
//                    String id = dataJson.getString("id");
//                    Integer bizType = dataJson.getInteger("biz_type");
//                    if (StringUtils.isBlank(id) || bizType == null) {
//                        return false;
//                    }// 只处理上岗卡数据
//                    return bizType == 9;return true;} catch (Exception ex) {LOG.warn("filter other format binlog:{}", data);return false;}}});}/*** 清晰数据** @param source* @return*/private static SingleOutputStreamOperator<String> clean(SingleOutputStreamOperator<String> source) {return source.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String row, Collector<String> out) throws Exception {try {LOG.info("============================row:{}", row);JSONObject rowJson = JSON.parseObject(row);String op = rowJson.getString("op");//history,insert,updateif (Arrays.asList("r", "c", "u").contains(op)) {out.collect(rowJson.getJSONObject("after").toJSONString());} else {LOG.info("filter other op:{}", op);}} catch (Exception ex) {LOG.warn("filter other format binlog:{}", row);}}});}/*** 过滤数据** @param source* @param table* @return*/private static SingleOutputStreamOperator<String> filterTableData(DataStreamSource<String> source, String table) {return source.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String row) throws Exception {try {JSONObject rowJson = JSON.parseObject(row);JSONObject source = rowJson.getJSONObject("source");String tbl = source.getString("table");return table.equals(tbl);} catch (Exception ex) {ex.printStackTrace();return false;}}});}private static List<String> getTableList() {List<String> tables = new ArrayList<>();String sql = "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE TABLE_SCHEMA = '" + SYNC_DB + "'";List<JSONObject> tableList = JdbcUtil.executeQuery(MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWD, sql);for (JSONObject jsob : tableList) {String schemaName = jsob.getString("TABLE_SCHEMA");String tblName = jsob.getString("TABLE_NAME");String schemaTbl = schemaName + "." + tblName;if (SYNC_TABLES.contains(schemaTbl)) {tables.add(tblName);}}return tables;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/831898.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python学习笔记------json

json简介 JSON是一种轻量级的数据交互格式。可以按照JSON指定的格式去组织和封装数据 JSON本质上是一个带有特定格式的字符串 主要功能&#xff1a;json就是一种在各个编程语言中流通的数据格式&#xff0c;负责不同编程语言中的数据传递和交互 为了让不同的语言能够相互通…

《LTC与铁三角∶从线索到回款-人民邮电》关于铁三角不错的论述

《LTC与铁三角∶从线索到回款-人民邮电》一书中&#xff0c;关于铁三角不错的论述&#xff0c;收藏之&#xff1a;客户责任人的角色定义及核心价值 AR 的核心价值定位主要体现在三个方面&#xff1a;客户关系、 客户满意度、竞争对手 “ 压制 ” 。 维护客户关系&#x…

百川2模型解读

简介 Baichuan 2是多语言大模型&#xff0c;目前开源了70亿和130亿参数规模的模型。在公开基准如MMLU、CMMLU、GSM8K和HumanEval上的评测&#xff0c;Baichuan 2达到或超过了其他同类开源模型&#xff0c;并在医学和法律等垂直领域表现优异。此外&#xff0c;官方还发布所有预…

[数据结构]————排序总结——插入排序(直接排序和希尔排序)—选择排序(选择排序和堆排序)-交换排序(冒泡排序和快速排序)—归并排序(归并排序)

文章涉及具体代码gitee&#xff1a; 登录 - Gitee.com 目录 1.插入排序 1.直接插入排序 总结 2.希尔排序 总结 2.选择排序 1.选择排序 ​编辑 总结 2.堆排序 总结 3.交换排序 1.冒泡排序 总结 2.快速排序 总结 4.归并排序 总结 5.总的分析总结 1.插入排…

Unity---版本控制软件

13.3 版本控制——Git-1_哔哩哔哩_bilibili Git用的比较多 Git 常用Linux命令 pwd&#xff1a;显示当前所在路径 ls&#xff1a;显示当前路径下的所有文件 tab键自动补全 cd&#xff1a;切换路径 mkdir&#xff1a;在当前路径下创建一个文件夹 clear&#xff1a;清屏 vim…

Linux的socket详解

一、本机直接的进程通信方式 管道&#xff08;Pipes&#xff09;&#xff1a; 匿名管道&#xff08;Anonymous pipes&#xff09;&#xff1a;通常用于父子进程间的通信&#xff0c;它是单向的。命名管道&#xff08;Named pipes&#xff0c;也称FIFO&#xff09;&#xff1a;允…

微星主板安装双系统不能进入Ubuntu的解决办法

在微星主板的台式机上面依次安装了Windows11和Ubuntu22.04。在Ubuntu安装完成后重启&#xff0c;没有出现系统选择界面&#xff0c;直接进入了Windows11。怎么解决&#xff1f;方法如下&#xff1a; &#xff08;1&#xff09;正常安装Windows11 &#xff08;2&#xff09;安…

《自动机理论、语言和计算导论》阅读笔记:p352-P401

《自动机理论、语言和计算导论》学习第 12 天&#xff0c;p352-P401总结&#xff0c;总计 50 页。 一、技术总结 1.Turing Machine ™ 2.undecidability ​ a.Ld(the diagonalization language) 3.reduction p392, In general, if we have an algorithm to convert insta…

Git系列:config 配置

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…

Java中的枚举类型介绍

一、背景及定义 情景&#xff1a; 枚举是在JDK1.5以后引入的。 主要用途是&#xff1a; 将一组常量组织起来&#xff0c;在这之前表示一组常量通常使用定义常量的方式&#xff1a; 这种定义方式实际上并不好。 例如&#xff1a;如果碰巧有另一个变量也是1&#xff0c;那么…

笔记85:如何计算递归算法的“时间复杂度”和空间复杂度?

先上公式&#xff1a; 递归算法的时间复杂度 递归次数 x 每次递归消耗的时间颗粒数递归算法的空间复杂度 递归深度 x 每次递归消耗的内存空间大小 注意&#xff1a; 时间复杂度指的是在执行这一段程序的时候&#xff0c;所花费的全部的时间&#xff0c;即时间的总和而空间复…

以太网基础-IP、ICMP、ARP协议

一、IP协议 参考&#xff1a;rfc791.txt.pdf (rfc-editor.org) IP协议&#xff08;Internet Protocol&#xff09;是TCP/IP协议族中最核心的协议&#xff0c;提供不可靠的、无连接的、尽力而为的数据报传输服务。 IP报文数据头如下 Version&#xff1a;4bit&#xff0c;4表示…

网络模型与调试

网络模型 网络的体系结构 ● 网络采用分而治之的方法设计&#xff0c;将网络的功能划分为不同的模块&#xff0c;以分层的形式有机组合在一起。 ● 每层实现不同的功能&#xff0c;其内部实现方法对外部其他层次来说是透明的。每层向上层提供服务&#xff0c;同时使用下层提供…

Elasticsearch:如何使用 Java 对索引进行 ES|QL 的查询

在我之前的文章 “Elasticsearch&#xff1a;对 Java 对象的 ES|QL 查询”&#xff0c;我详细介绍了如何使用 Java 来对 ES|QL 进行查询。对于不是很熟悉 Elasticsearch 的开发者来说&#xff0c;那篇文章里的例子还是不能单独来进行运行。在今天的这篇文章中&#xff0c;我来详…

MySQL CRUD进阶

前言&#x1f440;~ 上一章我们介绍了CRUD的一些基础操作&#xff0c;关于如何在表里进行增加记录、查询记录、修改记录以及删除记录的一些基础操作&#xff0c;今天我们学习CRUD&#xff08;增删改查&#xff09;进阶操作 如果各位对文章的内容感兴趣的话&#xff0c;请点点小…

【网络编程下】五种网络IO模型

目录 前言 一.I/O基本概念 1.同步和异步 2.阻塞和非阻塞 二.五种网络I/O模型 1.阻塞I/O模型 2.非阻塞式I/O模型 ​编辑 3.多路复用 4.信号驱动式I/O模型 5. 异步I/O模型 三.五种I/O模型比较​编辑 六.I/O代码示例 1. 阻塞IO 2.非阻塞I/O 3.多路复用 (1)select …

Electron 对 SQLite 进行加密

上一篇讲了如何在 Electron使用 SQLite&#xff0c;如果 SQLite 中存有敏感数据&#xff0c;客户端采用明文存储风险很高&#xff0c;为了保护客户数据&#xff0c;就需要对数据进行加密&#xff0c;由于 electron 对代码并不加密&#xff0c;所以这里排除通过逆向工程进行数据…

想要快速接收的看过来:Cell旗下毕业神刊,中科院二区、平均审稿周期1个月,冲!

我是娜姐 迪娜学姐 &#xff0c;一个SCI医学期刊编辑&#xff0c;探索用AI工具提效论文写作和发表。 就是它了&#xff0c;Cell旗下全OA期刊iScience。影响因子5.8分&#xff0c;中科院二区&#xff0c;年发文量逐年上涨&#xff0c;2023年发文近3000篇&#xff0c;2024年第一季…

一周学会Django5 Python Web开发 - Django5 ModelForm表单定义与使用

锋哥原创的Python Web开发 Django5视频教程&#xff1a; 2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~共计51条视频&#xff0c;包括&#xff1a;2024版 Django5 Python we…

一种算法分类方式及其应用

在计算机科学领域&#xff0c;算法是解决问题的有效方法&#xff0c;而对算法进行分类有助于理解它们的特性、优劣以及在不同场景下的应用。常见的算法分类方法&#xff0c;包括按设计思想、问题类型、数据结构和应用领域等&#xff0c;每一类算法会对应有其典型和实际应用。 算…