<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>
    <!--        <dependency>-->
    <!--            <groupId>org.apache.flink</groupId>-->
    <!--            <artifactId>flink-jdbc_2.12</artifactId>-->
    <!--            <version>1.10.3</version>-->
    <!--        </dependency>-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.13.0</version>
        <type>test-jar</type>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.4.0</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>flink-connector-clickhouse</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2.6</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.6</version>
    </dependency>
</dependencies>
package name.lijiaqi.cdc; import  com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema; 
import  com.google.gson.Gson; 
import  com.google.gson.internal.LinkedTreeMap; 
import  io.debezium.data.Envelope; 
import  org.apache.flink.api.common.typeinfo.BasicTypeInfo; 
import  org.apache.flink.api.common.typeinfo.TypeInformation; 
import  org.apache.flink.configuration.Configuration; 
import  org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import  org.apache.flink.streaming.api.functions.sink.RichSinkFunction; 
import  org.apache.flink.streaming.api.functions.source.SourceFunction; 
import  com.alibaba.ververica.cdc.connectors.mysql.MySQLSource; 
import  org.apache.flink.util.Collector; 
import  org.apache.kafka.connect.source.SourceRecord; import  org.apache.kafka.connect.data.Field; 
import  org.apache.kafka.connect.data.Schema; 
import  org.apache.kafka.connect.data.Struct; import  java.sql.Connection; 
import  java.sql.DriverManager; 
import  java.sql.PreparedStatement; 
import  java.util.HashMap; public class MySqlBinlogSourceExample { public static void main( String[ ]  args)  throws Exception { SourceFunction< String>  sourceFunction =  MySQLSource.< String> builder( ) .hostname( "localhost" ) .port( 3306 ) .databaseList( "test" ) .username( "flinkcdc" ) .password( "dafei1288" ) .deserializer( new JsonDebeziumDeserializationSchema( )) .build( ) ; StreamExecutionEnvironment env  =  StreamExecutionEnvironment.getExecutionEnvironment( ) ; // 添加 source env.addSource( sourceFunction) // 添加 sink.addSink( new ClickhouseSink( )) ; env.execute( "mysql2clickhouse" ) ; } // 将cdc数据反序列化public static class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema { @Overridepublic void deserialize( SourceRecord sourceRecord, Collector collector)  throws Exception { Gson jsstr =  new Gson( ) ; HashMap< String, Object>  hs =  new HashMap<> ( ) ; String topic =  sourceRecord.topic( ) ; String[ ]  split  =  topic.split( "[.]" ) ; String database =  split[ 1 ] ; String table =  split[ 2 ] ; hs.put( "database" ,database) ; hs.put( "table" ,table) ; //获取操作类型Envelope.Operation operation =  Envelope.operationFor( sourceRecord) ; //获取数据本身Struct struct =  ( Struct) sourceRecord.value( ) ; Struct after =  struct.getStruct( "after" ) ; if  ( after !=  null)  { Schema schema =  after.schema( ) ; HashMap< String, Object>  afhs =  new HashMap<> ( ) ; for  ( Field field :  schema.fields( ))  { afhs.put( field.name( ) , after.get( field.name( )) ) ; } hs.put( "data" ,afhs) ; } String type  =  operation.toString( ) .toLowerCase( ) ; if  ( "create" .equals( type))  { type  =  "insert" ; } hs.put( "type" ,type) ; collector.collect( jsstr.toJson( hs)) ; } @Overridepublic TypeInformation< String>  getProducedType ( )  { return  BasicTypeInfo.STRING_TYPE_INFO; } } public static class ClickhouseSink extends RichSinkFunction< String> { Connection connection; PreparedStatement pstmt; private Connection getConnection ( )  { Connection conn =  null; try 
{ Class.forName( "ru.yandex.clickhouse.ClickHouseDriver" ) ; String url =  "jdbc:clickhouse://localhost:8123/default" ; conn =  DriverManager.getConnection( url,"default" ,"dafei1288" ) ; }  catch ( Exception e)  { e.printStackTrace( ) ; } return  conn; } @Overridepublic void open( Configuration parameters)  throws Exception { super.open( parameters) ; connection =  getConnection( ) ; String sql =  "insert into sink_ch_test(id,name,description) values (?,?,?)" ; pstmt =  connection.prepareStatement( sql) ; } // 每条记录插入时调用一次public void invoke( String value, Context context)  throws Exception { //{ "database" : "test" ,"data" :{ "name" : "jacky" ,"description" : "fffff" ,"id" :8} ,"type" : "insert" ,"table" : "test_cdc" } Gson t =  new Gson( ) ; HashMap< String,Object>  hs =  t.fromJson( value,HashMap.class) ; String database =  ( String) hs.get( "database" ) ; String table =  ( String) hs.get( "table" ) ; String type  =  ( String) hs.get( "type" ) ; if( "test" .equals( database)  &&  "test_cdc" .equals( table)) { if( "insert" .equals( type)) { System.out.println( "insert => " +value) ; LinkedTreeMap< String,Object>  data =  ( LinkedTreeMap< String,Object> ) hs.get( "data" ) ; String name =  ( String) data.get( "name" ) ; String description =  ( String) data.get( "description" ) ; Double id  =  ( Double) data.get( "id" ) ; // 未前面的占位符赋值pstmt.setInt( 1 , id.intValue( )) ; pstmt.setString( 2 , name) ; pstmt.setString( 3 , description) ; pstmt.executeUpdate( ) ; } } } @Overridepublic void close( )  throws Exception { super.close( ) ; if( pstmt !=  null)  { pstmt.close( ) ; } if( connection !=  null)  { connection.close( ) ; } } } 
} 
package name.lijiaqi.cdc; import  org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import  org.apache.flink.table.api.EnvironmentSettings; 
import  org.apache.flink.table.api.SqlDialect; 
import  org.apache.flink.table.api.TableResult; 
import  org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class MysqlToMysqlMain { public static void main( String[ ]  args)  throws Exception { EnvironmentSettings fsSettings =  EnvironmentSettings.newInstance( ) .useBlinkPlanner( ) .inStreamingMode( ) .build( ) ; StreamExecutionEnvironment env  =  StreamExecutionEnvironment.getExecutionEnvironment( ) ; env.setParallelism( 1 ) ; StreamTableEnvironment tableEnv =  StreamTableEnvironment.create( env, fsSettings) ; tableEnv.getConfig( ) .setSqlDialect( SqlDialect.DEFAULT) ; // 数据源表String sourceDDL = "CREATE TABLE mysql_binlog (\n "  +" id INT NOT NULL,\n "  +" name STRING,\n "  +" description STRING\n "  +") WITH (\n "  +" 'connector' = 'mysql-cdc',\n "  +" 'hostname' = 'localhost',\n "  +" 'port' = '3306',\n "  +" 'username' = 'flinkcdc',\n "  +" 'password' = 'dafei1288',\n "  +" 'database-name' = 'test',\n "  +" 'table-name' = 'test_cdc'\n "  +")" ; String url =  "jdbc:mysql://127.0.0.1:3306/test" ; String userName =  "root" ; String password =  "dafei1288" ; String mysqlSinkTable =  "test_cdc_sink" ; // 输出目标表String sinkDDL = "CREATE TABLE test_cdc_sink (\n "  +" id INT NOT NULL,\n "  +" name STRING,\n "  +" description STRING,\n "  +" PRIMARY KEY (id) NOT ENFORCED \n  "  +") WITH (\n "  +" 'connector' = 'jdbc',\n "  +" 'driver' = 'com.mysql.jdbc.Driver',\n "  +" 'url' = '"  + url + "',\n "  +" 'username' = '"  + userName + "',\n "  +" 'password' = '"  + password + "',\n "  +" 'table-name' = '"  + mysqlSinkTable + "'\n "  +")" ; // 简单的聚合处理String transformSQL = "insert into test_cdc_sink select * from mysql_binlog" ; tableEnv.executeSql( sourceDDL) ; tableEnv.executeSql( sinkDDL) ; TableResult result =  tableEnv.executeSql( transformSQL) ; // 等待flink-cdc完成快照result.print( ) ; env.execute( "sync-flink-cdc" ) ; } }