Complete dependencies
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>
    <!--
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_2.12</artifactId>
        <version>1.10.3</version>
    </dependency>
    -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.13.0</version>
        <type>test-jar</type>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.4.0</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>flink-connector-clickhouse</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2.6</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.6</version>
    </dependency>
</dependencies>
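Both jobs below assume a MySQL source table test.test_cdc; the DataStream job writes to a ClickHouse table sink_ch_test, and the SQL job writes to a MySQL table test_cdc_sink. A minimal setup sketch follows; the column types, the ClickHouse MergeTree engine, and the privilege grant are assumptions inferred from the code, not part of the original article:

-- MySQL: the CDC user referenced in the jobs needs replication privileges
-- (the standard Debezium requirement; adjust the host pattern as needed)
CREATE USER 'flinkcdc'@'%' IDENTIFIED BY 'dafei1288';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'flinkcdc'@'%';

-- MySQL: source table captured from the binlog
CREATE TABLE test.test_cdc (
  id INT NOT NULL PRIMARY KEY,
  name VARCHAR(255),
  description VARCHAR(255)
);

-- MySQL: sink table for the Flink SQL CDC job
CREATE TABLE test.test_cdc_sink LIKE test.test_cdc;

-- ClickHouse: sink table for the DataStream job
CREATE TABLE default.sink_ch_test (
  id Int32,
  name String,
  description String
) ENGINE = MergeTree() ORDER BY id;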
Flink CDC
package name.lijiaqi.cdc;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.google.gson.Gson;
import com.google.gson.internal.LinkedTreeMap;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.HashMap;

public class MySqlBinlogSourceExample {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("test")
                .username("flinkcdc")
                .password("dafei1288")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // add the source
        env.addSource(sourceFunction)
                // add the sink
                .addSink(new ClickhouseSink());

        env.execute("mysql2clickhouse");
    }

    // deserialize CDC records into JSON strings
    public static class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
            Gson jsstr = new Gson();
            HashMap<String, Object> hs = new HashMap<>();

            String topic = sourceRecord.topic();
            String[] split = topic.split("[.]");
            String database = split[1];
            String table = split[2];
            hs.put("database", database);
            hs.put("table", table);

            // extract the operation type
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            // extract the payload itself
            Struct struct = (Struct) sourceRecord.value();
            Struct after = struct.getStruct("after");
            if (after != null) {
                Schema schema = after.schema();
                HashMap<String, Object> afhs = new HashMap<>();
                for (Field field : schema.fields()) {
                    afhs.put(field.name(), after.get(field.name()));
                }
                hs.put("data", afhs);
            }

            String type = operation.toString().toLowerCase();
            if ("create".equals(type)) {
                type = "insert";
            }
            hs.put("type", type);

            collector.collect(jsstr.toJson(hs));
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }

    public static class ClickhouseSink extends RichSinkFunction<String> {
        Connection connection;
        PreparedStatement pstmt;

        private Connection getConnection() {
            Connection conn = null;
            try {
                Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
                String url = "jdbc:clickhouse://localhost:8123/default";
                conn = DriverManager.getConnection(url, "default", "dafei1288");
            } catch (Exception e) {
                e.printStackTrace();
            }
            return conn;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            connection = getConnection();
            String sql = "insert into sink_ch_test(id,name,description) values (?,?,?)";
            pstmt = connection.prepareStatement(sql);
        }

        // called once per incoming record
        @Override
        public void invoke(String value, Context context) throws Exception {
            // {"database":"test","data":{"name":"jacky","description":"fffff","id":8},"type":"insert","table":"test_cdc"}
            Gson t = new Gson();
            HashMap<String, Object> hs = t.fromJson(value, HashMap.class);
            String database = (String) hs.get("database");
            String table = (String) hs.get("table");
            String type = (String) hs.get("type");

            if ("test".equals(database) && "test_cdc".equals(table)) {
                if ("insert".equals(type)) {
                    System.out.println("insert => " + value);
                    LinkedTreeMap<String, Object> data = (LinkedTreeMap<String, Object>) hs.get("data");
                    String name = (String) data.get("name");
                    String description = (String) data.get("description");
                    // Gson parses JSON numbers into Double by default
                    Double id = (Double) data.get("id");
                    // bind values to the placeholders and write the row
                    pstmt.setInt(1, id.intValue());
                    pstmt.setString(2, name);
                    pstmt.setString(3, description);
                    pstmt.executeUpdate();
                }
            }
        }

        @Override
        public void close() throws Exception {
            super.close();
            if (pstmt != null) {
                pstmt.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}
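To verify the pipeline end to end, write a row into the watched MySQL table and confirm it lands in ClickHouse; the sample values mirror the JSON shown in the invoke() comment above:

-- MySQL: this insert is captured from the binlog
INSERT INTO test.test_cdc (id, name, description) VALUES (8, 'jacky', 'fffff');

-- ClickHouse: the row should appear shortly afterwards
SELECT * FROM sink_ch_test;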
Flink SQL CDC
package name.lijiaqi.cdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlToMysqlMain {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // source table
        String sourceDDL =
                "CREATE TABLE mysql_binlog (\n" +
                " id INT NOT NULL,\n" +
                " name STRING,\n" +
                " description STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'localhost',\n" +
                " 'port' = '3306',\n" +
                " 'username' = 'flinkcdc',\n" +
                " 'password' = 'dafei1288',\n" +
                " 'database-name' = 'test',\n" +
                " 'table-name' = 'test_cdc'\n" +
                ")";

        String url = "jdbc:mysql://127.0.0.1:3306/test";
        String userName = "root";
        String password = "dafei1288";
        String mysqlSinkTable = "test_cdc_sink";

        // sink table
        String sinkDDL =
                "CREATE TABLE test_cdc_sink (\n" +
                " id INT NOT NULL,\n" +
                " name STRING,\n" +
                " description STRING,\n" +
                " PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'jdbc',\n" +
                " 'driver' = 'com.mysql.jdbc.Driver',\n" +
                " 'url' = '" + url + "',\n" +
                " 'username' = '" + userName + "',\n" +
                " 'password' = '" + password + "',\n" +
                " 'table-name' = '" + mysqlSinkTable + "'\n" +
                ")";

        // a simple pass-through: sync every source row into the sink
        String transformSQL = "insert into test_cdc_sink select * from mysql_binlog";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        TableResult result = tableEnv.executeSql(transformSQL);

        // executeSql has already submitted the INSERT job, so no separate
        // env.execute() is needed (no DataStream operators were defined,
        // so calling it would fail); wait for the flink-cdc snapshot to run
        result.print();
    }
}
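Once the INSERT job is running, changes to test.test_cdc are mirrored into test_cdc_sink. Because the sink declares PRIMARY KEY (id) NOT ENFORCED, the JDBC connector writes in upsert mode, so updates and deletes on the source propagate as well. A quick check, with illustrative values:

-- MySQL: exercise the sync
INSERT INTO test.test_cdc (id, name, description) VALUES (9, 'flink', 'sql cdc');
UPDATE test.test_cdc SET description = 'updated' WHERE id = 9;

-- the sink should converge to the same contents
SELECT * FROM test.test_cdc_sink;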