引言:
《异构数据源的CDC实时同步系统》 系列第一篇 (已完成)
《零编码打造异构数据实时同步系统——异构数据源CDC之2》 系列第二篇(已完成)
《零编码打造异构数据实时同步系统——异构数据源CDC之3》 系列第三篇(已完成)
《异构数据源的CDC实时同步系统——最终选型实战》 系列第四篇(已完成)
7.debezium
debezium是由redhat支持的开源分布式CDC系统,支持多端数据源,如mysql、mongodb、postgresql、oracle、sql server和Cassandra,社区非常活跃,很多的新功能和新数据源都在快速发展中,源码地址:https://github.com/debezium/debezium

我们使用debezium主要是看中它支持多数据源,同时与kafka的整合,在CDC领域不能忽略的一个商用产品是kafka conflent,在它的产品中,连接源端的组件就是debezium,我们一度就想使用这个商用

组件,但是试用版本仅支持一个broker,无法在真正的的生产环境使用,它的优势在于配置的可视化,后来我们使用kafka eagle来进行kafka的管理后,才彻底下定决心自己使用开源版本搞一套。我们最终采用的整体方案是debezium+kafka+kafka-connect-jdbc,管理端使用的kafka eagle.
关于confluent的资料,网上很多,我们在实际配置的过程中也参考了很多它的建议。
注意事项:
1)debezium需要设置的mysql权限:GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
2)采用阿里云的rds的mysql数据源非常坑,默认是不开启SHOW DATABASES权限的,需要在debezium中单独配置属性database.history.store.only.monitored.tables.ddl:true
3)debezium配合kafaka启动使用properties方式,也就是说第一个源需要配置为文本模式,后续可采用动态增加源的方式动态增加,但是文件模式需要为json
8.kafka-connect-jdbc
开源地址:https://github.com/confluentinc/kafka-connect-jdbc,它是confluent开源的兼容jdbc的数据库同步数据的kafka connect连接器。

这个组件支持的目的端的源非常多,理论上有java客户端的基本都支持,所以基本上可以涵盖你能用到的绝大多数数据源,它的延迟非常好,比之前的bireme好太多了,毕竟是国外大厂支持的组件,是国内小公司开源组件所不能比拟的。
9.最终选型方案

上图为我们最终确定的方案,在实际生产中,除了直接DB层级的数据实时同步外,我们还有一套pulsar的比较灵活的数据接口方案,不在此次讨论范围之内,也就是说我们最终实现了基于DB和业务层级的实时数据同步方案。

业界其他公司的CDC方案:



=======.实际生产配置过程:==========
1.kafka安装配置,以standalone为例
需要单独说明的是:因为gpdb6目前还不支持upsert模式,debezium的新增和更新均会导致一条新增加的完整数据到kafka,默认kafka按批提交的模式会造成gpdb6的主键冲突,需要修改模式为逐条应用,同时配合自己单独写的check程序进行offset错误的自动修正
#1)安装kafka,注意2.30有个bugtar -zxvf kafka_2.12-2.4.0.tgzcd kafka_2.12-2.4.0Vim config/server.properties #单机版只需要消息存放路径即可log.dirs=/opt/kafka_2.12-2.4.0/kafka-logs#增加可以删除topicdelete.topic.enable=true#保留日志大小:1GB,不设置的话会日志撑爆log.retention.bytes=1073741824mkdir -p /opt/kafka_2.12-2.4.0/kafka-logs#修改kafka的connect-standalone.properties设置为逐条应用consumer.max.poll.records=1#2)修改内置zk的配置vim config/zookeeper.properties#制定zk元数据存放路径dataDir=/opt/kafka_2.12-2.4.0/zdatamkdir -p /opt/kafka_2.12-2.4.0/zdata#3)启动服务,先启动zk再启动kafkacd /opt/kafka_2.12-2.4.0/nohup bin/zookeeper-server-start.sh config/zookeeper.properties &nohup bin/kafka-server-start.sh config/server.properties —加守护进程启动bin/zookeeper-server-start.sh -daemon config/zookeeper.propertiesbin/kafka-server-start.sh -daemon config/server.properties
2.kafka基本命令
#4)查看服务是够启动 jps#5)创建一个测试用的topic:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test#查询topic列表:bin/kafka-topics.sh --list --zookeeper localhost:2181#查看topic信息:bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test #删除topic(只会删除元数据):配置上面的delete.topic.enable=true后可生效bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test#手动删除文件:bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic test./kafka-topics.sh --zookeeper 192.168.6.42:2181 --describe --topic itslawnode1./kafka-consumer-groups.sh --describe --group test-consumer-group --zookeeper localhost:2181 #查看offset信息bin/kafka-consumer-groups.sh --bootstrap-server 192.168.6.42:9092 --listbin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group 0#查看和删除群组:bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --listbin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group connect-sink-judge-up#从开始的消费信息: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test#6)创建控制台生产者生产数据bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test#7)新开一个进程创建消费者数据bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
3.debezium安装配置
#下载debezium-connector-mysql,将文件中的jar包copy到kafka的libs目录cd /opt/kafka_2.12-2.4.0/tableconfig #tableconfig是新建目录,存放配置文件#######第一个启动的properties文件格式############name=authorization-mysql-connector-new-01connector.class=io.debezium.connector.mysql.MySqlConnectordatabase.hostname=mysql源IPdatabase.port=3306database.user=账号database.password=密码database.server.id=1database.server.name=debeziumdatabase.whitelist=platform_authorizationdatabase.serverTimezone=UTCtable.whitelist=platform_authorization.lawyer_authorization,platform_authorization.lawyer_authorization_recorddatabase.history.kafka.bootstrap.servers=localhost:9092database.history.kafka.topic=auth.platform_authorizationinclude.schema.changes=false#使用table名作为topic名字,因为machine.db.table默认topictransforms=routetransforms.route.type=org.apache.kafka.connect.transforms.RegexRoutertransforms.route.regex=([^.]+).([^.]+).([^.]+)transforms.route.replacement=$3#不进行初始化,只获取当前的schema,初始化采用rds_dbsync比较方便,实际测试比init方式快几十倍,因为此处是逐条应用的snapshot.mode=schema_only##########json格式的文件#########{"name":"hanukkah-mysql-connector","config": {"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"mysql主机名","database.port":"3306","database.user":"用户名","database.password":"密码","database.server.id":"1","database.server.name":"debezium","database.whitelist":"hanukkah","database.serverTimezone":"UTC","table.whitelist":"hanukkah.cooperation_lawyer","database.history.kafka.bootstrap.servers":"localhost:9092","database.history.kafka.topic":"mysql1.hanukkah","include.schema.changes":"false","transforms":"route","transforms.route.type":"org.apache.kafka.connect.transforms.RegexRouter","transforms.route.regex":"([^.]+).([^.]+).([^.]+)","transforms.route.replacement":"$3","snapshot.mode":"schema_only"}}
4.sink配置
#首先下载kafka-connect-jdbc-5.3.1.jar并防止到kafka的libs目录即可{ "name": "sink-cooperation_lawyer-ins", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "topics": "cooperation_lawyer", "connection.url": "jdbc:postgresql://目的IP:5432/目的DB?user=用户&password=密码&stringtype=unspecified¤tSchema=当前schema名", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true", "insert.mode": "insert", "delete.enabled": "true","table.name.format": "platform.cooperation_lawyer", "pk.fields": "id", "pk.mode": "record_key" }}
需要额外说明的是:在目的是greenplum数仓环境下:
1)如果mysql源端字段类型是timestamp,则需要在gpdb端配置字段类型为timestamptz后无需额外配置sink项
2)如果mysql源端字段类型是datetime,则目的端字段类型需要配置为timestamp,同时需要sink文件中增补TimestampConverter配置项,有几个datetime字段配置几个配置项
3)如果mysql源端datetime配置了精度,需要debezium配置增加time.precision.mode=connect4) "auto.evolve": "true" 则源端表结构变更后会自动在目的端创建对应数据结构 "auto.create": "true" 则源端新增表后会自动同步到目的端
{ "name": "sink-pa_course-ins", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "topics": "pa_course", "connection.url": "jdbc:postgresql://目的IP:5432/目的DB?user=用户&password=密码&stringtype=unspecified¤tSchema=当前schema名", "transforms": "unwrap,TimestampConverter", "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope", "transforms.unwrap.drop.tombstones": "false", "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss", "transforms.TimestampConverter.field": "create_time", "transforms.TimestampConverter.target.type": "string", "auto.create": "true", "auto.evolve": "true", "insert.mode": "insert", "delete.enabled": "true", "pk.fields": "id", "pk.mode": "record_key" }}
5.启动服务
#启动kafka,进程多了Kafka和QuorumPeerMainbin/zookeeper-server-start.sh -daemon config/zookeeper.propertiesbin/kafka-server-start.sh -daemon config/server.properties#启动第一个sourcebin/connect-standalone.sh config/connect-standalone.properties tableconfig/paod-source.properties 1>connector-logs/connector.log 2>&1 增补其他sourcecurl -X POST -H "Content-Type:application/json" -d @tableconfig/authorization-source.json http://localhost:8083/connectors/#启动sinkcurl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @tableconfig/paod-base-ins.properties ...#查看所有的connectors: curl -X GET http://127.0.0.1:8083/connectors
6.使用eagle进行topic状态查看和管理
关于kafka eagle的下载安装,特别简单,就不单独说明了,这里仅贴出效果图


从上图非常清楚的能看到哪些topic是有问题的(红色),绝大多数问题在于offset的错误导致的,在实际使用中我们通过一个简单python守护进程的代码进行了管理
import requestsimport loggingimport psycopg2import jsonimport reimport time# 获取数仓连接def get_gp_conn(): conn = psycopg2.connect(host="192.168.2.175", port=5432, user="name", password="password",dbname='datawarehouse') return conn'''删除目的端的主键ID'''def del_dup_id(tablefullname,dup_id): db = get_gp_conn() cursor = db.cursor() sql = "delete from "+ tablefullname +" where id='" + dup_id+"'" cursor.execute(sql) db.commit() cursor.close() db.close()'''重启sink'''def restart_sink(sinkname,configname): '''delurl = 'http://127.0.0.1:8083/connectors/'+ sinkname del_res = requests.delete(delurl) print("del resp:",del_res) url = 'http://127.0.0.1:8083/connectors/' headers = 'Content-Type:application/json,Accept:application/json' datas = 'tableconfig/' + configname start_res = requests.post(url,data=datas,headers=headers) print("start resp:",start_res) #checkurl = 'http://127.0.0.1:8083/connectors/'+ sinkname +'/tasks/0/status' ''' url = 'http://127.0.0.1:8083/connectors/'+ sinkname +'/tasks/0/restart' requests.post(url)'''检测任务状态'''def check_sink_status(sinkname,tablefullname,configname): sinkurl = 'http://127.0.0.1:8083/connectors/'+ sinkname +'/tasks/0/status' print(sinkurl) resp = requests.get(sinkurl) status = json.loads(resp.text) state = status['state'] if state == 'FAILED': trace = status['trace'] pattern = re.compile(r'Key (id)=((.+)) already exists') search = re.search(pattern, trace) #print(search) if search: del_id = search.group(1) print('duplicate key is {}, now to del this record of target database'.format(del_id)) del_dup_id(tablefullname,del_id) restart_sink(sinkname,configname)'''获取任务列表'''def get_sink_list(): conn = get_gp_conn() cur = conn.cursor() cur.execute("select taskname,tableschema,tablename,configname from platform.tasklist where tablename is not null") print("current time is:",time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))) rows = cur.fetchall() for row in rows: taskname = row[0] schema = row[1] tablename =row[2] configname = row[3] tablefullname = schema +'.'+tablename check_sink_status(taskname,tablefullname,configname) cur.close() conn.close()if __name__ == '__main__': get_sink_list()
同时为了避免standalone进程的异常终止,我们用shell的守护进行进行了监控
#! /bin/bashfunction check(){ count=`ps -ef |grep $1 |grep -v "grep" |wc -l` #echo $count if [ 0 == $count ];then time=$(date "+%Y-%m-%d %H:%M:%S") echo "standalone restart at:${time}" cd /opt/kafka_2.12-2.4.0/ bin/connect-standalone.sh config/connect-standalone.properties tableconfig/paod-source.properties 1>>connector-logs/connector.log 2>&1 & sleep 60scurl -X POST -H "Content-Type:application/json" -d @tableconfig/platform-source.json http://localhost:8083/connectors/ ...... sleep 10s curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @tableconfig/paod-base-ins.properties ..... fi } check ConnectStandalone
另外,在实际运行过程中会出现offset错误的情况,极其特殊情况下使用上面的方法无法快速解决问题,建议使用kafkacat查看详细信息,人为跳过offset,具体细节不再赘述。
如喜欢此专题,请关注并提问,技术人驱动自身的是永不停歇的渴望。