原理:
- 同步原理:其实就是伪装成一个mysql 的从库会拉取主库的binlog日志读取数据,相当于mysql 的主从复制。然而flink的数据处理方式是流处理,实时收集清洗数据。
- 相关联的checkpoint,其实就是一个容错恢复快照,没执行后,会保存一个当前处理数据的offset,如果有job异常停止,或者checkpoint失败,那么下次checkpoint将从上次失败的地方继续处理数据。容错恢复的算法是异步屏障算法。
1.自定函数:
利用flink-table的TableFunction表函数
flink-core包的Tuple函数Api,实现代码:
public class ASI_UDTF extends TableFunction<Tuple1<String[]>> {public void eval(String str1) {if (Strings.isNullOrEmpty(str1)) {collect(null);} else {String[] split1 = str1.split(",");Tuple1<String[]> of1 = Tuple1.of(split1);collect(of1);}}
}
在flink控制台里把自定义函数的jar包生成一个对应的函数;
注意:
- 函数的入参判空!!!
- 用lateral table派生表关联时,也要注意如果是null值的情况,所以要外连接,例如:
LEFT JOIN lateral table (trans_to_array(gss.`goods_specification_values`)) as F(gss_array_values) ON TRUE
2.oracle-cdc同步到mysql-jdbc的场景:
oracle-cdc同步到mysql-jdbc,需要驱动jar,三个包:
flink-connector-jdbc-3.0.0-1.16.jar
flink-sql-connector-oracle-cdc-2.3.0.jar
mysql-connector-java-5.1.49.jar;
使用flinksql的方式:
oracle-cdc,create source table的参考格式
Flink SQL> CREATE TABLE products (ID INT NOT NULL,NAME STRING,DESCRIPTION STRING,WEIGHT DECIMAL(10, 3),PRIMARY KEY(id) NOT ENFORCED) WITH ('connector' = 'oracle-cdc','hostname' = 'localhost','port' = '1521','username' = 'flinkuser','password' = 'flinkpw','database-name' = 'XE','schema-name' = 'inventory','table-name' = 'products');
=======================================
mysql-jdbc,create sink table参考格式:
Flink SQL>CREATE TABLE vehicle_info (id BIGINT,company_name STRING,PRIMARY KEY (`id`) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://192.168.0.33:3306/mayi_user','table-name' = 'vehicle_info','username' = 'mayi_admin','password' = '1q2w3e4r'
);
相关使用链接:https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html