oracle和mysql数据实时同步_异构数据源的CDC实时同步系统——最终选型实战

引言:

《异构数据源的CDC实时同步系统》 系列第一篇 (已完成)

《零编码打造异构数据实时同步系统——异构数据源CDC之2》 系列第二篇(已完成)

《零编码打造异构数据实时同步系统——异构数据源CDC之3》 系列第三篇(已完成)

《异构数据源的CDC实时同步系统——最终选型实战》 系列第四篇(已完成)

7.debezium

debezium是由redhat支持的开源分布式CDC系统,支持多端数据源,如mysql、mongodb、postgresql、oracle、sql server和Cassandra,社区非常活跃,很多的新功能和新数据源都在快速发展中,源码地址:https://github.com/debezium/debezium

e09d3f8be993a55e3748806aa5c50a31.png

我们使用debezium主要是看中它支持多数据源,同时与kafka的整合,在CDC领域不能忽略的一个商用产品是kafka conflent,在它的产品中,连接源端的组件就是debezium,我们一度就想使用这个商用

d5356678059a85c6ab4eac1fadc2a375.png

组件,但是试用版本仅支持一个broker,无法在真正的的生产环境使用,它的优势在于配置的可视化,后来我们使用kafka eagle来进行kafka的管理后,才彻底下定决心自己使用开源版本搞一套。我们最终采用的整体方案是debezium+kafka+kafka-connect-jdbc,管理端使用的kafka eagle.

关于confluent的资料,网上很多,我们在实际配置的过程中也参考了很多它的建议。

注意事项:

1)debezium需要设置的mysql权限:GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
2)采用阿里云的rds的mysql数据源非常坑,默认是不开启SHOW DATABASES权限的,需要在debezium中单独配置属性database.history.store.only.monitored.tables.ddl:true
3)debezium配合kafaka启动使用properties方式,也就是说第一个源需要配置为文本模式,后续可采用动态增加源的方式动态增加,但是文件模式需要为json

8.kafka-connect-jdbc

开源地址:https://github.com/confluentinc/kafka-connect-jdbc,它是confluent开源的兼容jdbc的数据库同步数据的kafka connect连接器。

b0273fb993d1b7c2b892f7a454b064fa.png

这个组件支持的目的端的源非常多,理论上有java客户端的基本都支持,所以基本上可以涵盖你能用到的绝大多数数据源,它的延迟非常好,比之前的bireme好太多了,毕竟是国外大厂支持的组件,是国内小公司开源组件所不能比拟的。

9.最终选型方案

a847eca9bd933a15046e70614de8a8c5.png

上图为我们最终确定的方案,在实际生产中,除了直接DB层级的数据实时同步外,我们还有一套pulsar的比较灵活的数据接口方案,不在此次讨论范围之内,也就是说我们最终实现了基于DB和业务层级的实时数据同步方案。

00698278032cd3465569e42622429e26.png

业界其他公司的CDC方案:

465bc91233ffc55abb0880e55972b1c8.png
16ca9789034863da0208e7ff7ece99b9.png
bd6e3923d2a176948cc917c33a7425a9.png

=======.实际生产配置过程:==========

1.kafka安装配置,以standalone为例

需要单独说明的是:因为gpdb6目前还不支持upsert模式,debezium的新增和更新均会导致一条新增加的完整数据到kafka,默认kafka按批提交的模式会造成gpdb6的主键冲突,需要修改模式为逐条应用,同时配合自己单独写的check程序进行offset错误的自动修正

#1)安装kafka,注意2.30有个bugtar -zxvf kafka_2.12-2.4.0.tgzcd kafka_2.12-2.4.0Vim config/server.properties    #单机版只需要消息存放路径即可log.dirs=/opt/kafka_2.12-2.4.0/kafka-logs#增加可以删除topicdelete.topic.enable=true#保留日志大小:1GB,不设置的话会日志撑爆log.retention.bytes=1073741824mkdir -p /opt/kafka_2.12-2.4.0/kafka-logs#修改kafka的connect-standalone.properties设置为逐条应用consumer.max.poll.records=1#2)修改内置zk的配置vim config/zookeeper.properties#制定zk元数据存放路径dataDir=/opt/kafka_2.12-2.4.0/zdatamkdir -p /opt/kafka_2.12-2.4.0/zdata#3)启动服务,先启动zk再启动kafkacd /opt/kafka_2.12-2.4.0/nohup bin/zookeeper-server-start.sh config/zookeeper.properties  &nohup bin/kafka-server-start.sh config/server.properties —加守护进程启动bin/zookeeper-server-start.sh -daemon config/zookeeper.propertiesbin/kafka-server-start.sh -daemon config/server.properties

2.kafka基本命令

#4)查看服务是够启动    jps#5)创建一个测试用的topic:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test#查询topic列表:bin/kafka-topics.sh --list --zookeeper localhost:2181#查看topic信息:bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test                          #删除topic(只会删除元数据):配置上面的delete.topic.enable=true后可生效bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test#手动删除文件:bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic test./kafka-topics.sh --zookeeper 192.168.6.42:2181 --describe --topic itslawnode1./kafka-consumer-groups.sh  --describe --group test-consumer-group --zookeeper localhost:2181 #查看offset信息bin/kafka-consumer-groups.sh --bootstrap-server 192.168.6.42:9092 --listbin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group 0#查看和删除群组:bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --listbin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group connect-sink-judge-up#从开始的消费信息: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test#6)创建控制台生产者生产数据bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test#7)新开一个进程创建消费者数据bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

3.debezium安装配置

#下载debezium-connector-mysql,将文件中的jar包copy到kafka的libs目录cd /opt/kafka_2.12-2.4.0/tableconfig  #tableconfig是新建目录,存放配置文件#######第一个启动的properties文件格式############name=authorization-mysql-connector-new-01connector.class=io.debezium.connector.mysql.MySqlConnectordatabase.hostname=mysql源IPdatabase.port=3306database.user=账号database.password=密码database.server.id=1database.server.name=debeziumdatabase.whitelist=platform_authorizationdatabase.serverTimezone=UTCtable.whitelist=platform_authorization.lawyer_authorization,platform_authorization.lawyer_authorization_recorddatabase.history.kafka.bootstrap.servers=localhost:9092database.history.kafka.topic=auth.platform_authorizationinclude.schema.changes=false#使用table名作为topic名字,因为machine.db.table默认topictransforms=routetransforms.route.type=org.apache.kafka.connect.transforms.RegexRoutertransforms.route.regex=([^.]+).([^.]+).([^.]+)transforms.route.replacement=$3#不进行初始化,只获取当前的schema,初始化采用rds_dbsync比较方便,实际测试比init方式快几十倍,因为此处是逐条应用的snapshot.mode=schema_only##########json格式的文件#########{"name":"hanukkah-mysql-connector","config": {"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"mysql主机名","database.port":"3306","database.user":"用户名","database.password":"密码","database.server.id":"1","database.server.name":"debezium","database.whitelist":"hanukkah","database.serverTimezone":"UTC","table.whitelist":"hanukkah.cooperation_lawyer","database.history.kafka.bootstrap.servers":"localhost:9092","database.history.kafka.topic":"mysql1.hanukkah","include.schema.changes":"false","transforms":"route","transforms.route.type":"org.apache.kafka.connect.transforms.RegexRouter","transforms.route.regex":"([^.]+).([^.]+).([^.]+)","transforms.route.replacement":"$3","snapshot.mode":"schema_only"}}                                            

4.sink配置

#首先下载kafka-connect-jdbc-5.3.1.jar并防止到kafka的libs目录即可{    "name": "sink-cooperation_lawyer-ins",    "config": {        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",        "tasks.max": "1",        "topics": "cooperation_lawyer",        "connection.url": "jdbc:postgresql://目的IP:5432/目的DB?user=用户&password=密码&stringtype=unspecified¤tSchema=当前schema名",        "transforms": "unwrap",        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",        "transforms.unwrap.drop.tombstones": "false",        "auto.create": "true",        "insert.mode": "insert",        "delete.enabled": "true","table.name.format": "platform.cooperation_lawyer",        "pk.fields": "id",        "pk.mode": "record_key"    }}

需要额外说明的是:在目的是greenplum数仓环境下:
1)如果mysql源端字段类型是timestamp,则需要在gpdb端配置字段类型为timestamptz后无需额外配置sink项
2)如果mysql源端字段类型是datetime,则目的端字段类型需要配置为timestamp,同时需要sink文件中增补TimestampConverter配置项,有几个datetime字段配置几个配置项
3)如果mysql源端datetime配置了精度,需要debezium配置增加time.precision.mode=connect
4) "auto.evolve": "true" 则源端表结构变更后会自动在目的端创建对应数据结构 "auto.create": "true" 则源端新增表后会自动同步到目的端

{    "name": "sink-pa_course-ins",    "config": {        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",        "tasks.max": "1",        "topics": "pa_course",        "connection.url": "jdbc:postgresql://目的IP:5432/目的DB?user=用户&password=密码&stringtype=unspecified¤tSchema=当前schema名",        "transforms": "unwrap,TimestampConverter",        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",        "transforms.unwrap.drop.tombstones": "false",        "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",        "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",        "transforms.TimestampConverter.field": "create_time",        "transforms.TimestampConverter.target.type": "string",        "auto.create": "true",        "auto.evolve": "true",        "insert.mode": "insert",        "delete.enabled": "true",        "pk.fields": "id",        "pk.mode": "record_key"    }}

5.启动服务

#启动kafka,进程多了Kafka和QuorumPeerMainbin/zookeeper-server-start.sh -daemon config/zookeeper.propertiesbin/kafka-server-start.sh -daemon config/server.properties#启动第一个sourcebin/connect-standalone.sh config/connect-standalone.properties tableconfig/paod-source.properties 1>connector-logs/connector.log 2>&1 增补其他sourcecurl -X POST -H  "Content-Type:application/json" -d @tableconfig/authorization-source.json http://localhost:8083/connectors/#启动sinkcurl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @tableconfig/paod-base-ins.properties ...#查看所有的connectors:     curl -X GET  http://127.0.0.1:8083/connectors 

6.使用eagle进行topic状态查看和管理

关于kafka eagle的下载安装,特别简单,就不单独说明了,这里仅贴出效果图

c80ea4eb175bc95aaa42c7acb8915f11.png
129f3fad2e25ab31e26341c064aed2b7.png

从上图非常清楚的能看到哪些topic是有问题的(红色),绝大多数问题在于offset的错误导致的,在实际使用中我们通过一个简单python守护进程的代码进行了管理

import requestsimport loggingimport psycopg2import jsonimport reimport time# 获取数仓连接def get_gp_conn():    conn = psycopg2.connect(host="192.168.2.175", port=5432, user="name", password="password",dbname='datawarehouse')    return conn'''删除目的端的主键ID'''def del_dup_id(tablefullname,dup_id):    db = get_gp_conn()    cursor = db.cursor()    sql = "delete from "+ tablefullname +" where id='" + dup_id+"'"    cursor.execute(sql)    db.commit()    cursor.close()    db.close()'''重启sink'''def restart_sink(sinkname,configname):    '''delurl = 'http://127.0.0.1:8083/connectors/'+ sinkname    del_res = requests.delete(delurl)    print("del resp:",del_res)    url = 'http://127.0.0.1:8083/connectors/'    headers = 'Content-Type:application/json,Accept:application/json'    datas = 'tableconfig/' + configname    start_res = requests.post(url,data=datas,headers=headers)    print("start resp:",start_res)    #checkurl = 'http://127.0.0.1:8083/connectors/'+ sinkname +'/tasks/0/status'    '''    url = 'http://127.0.0.1:8083/connectors/'+ sinkname +'/tasks/0/restart'    requests.post(url)'''检测任务状态'''def check_sink_status(sinkname,tablefullname,configname):    sinkurl = 'http://127.0.0.1:8083/connectors/'+ sinkname +'/tasks/0/status'    print(sinkurl)    resp = requests.get(sinkurl)    status = json.loads(resp.text)    state = status['state']    if state == 'FAILED':        trace = status['trace']        pattern = re.compile(r'Key (id)=((.+)) already exists')        search = re.search(pattern, trace)        #print(search)        if search:            del_id = search.group(1)            print('duplicate key is {}, now to del this record of target database'.format(del_id))            del_dup_id(tablefullname,del_id)            restart_sink(sinkname,configname)'''获取任务列表'''def get_sink_list():    conn = get_gp_conn()    cur = conn.cursor()    cur.execute("select taskname,tableschema,tablename,configname from platform.tasklist where tablename is not null")    print("current time is:",time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time())))    rows = cur.fetchall()    for row in rows:        taskname = row[0]        schema = row[1]        tablename =row[2]        configname = row[3]        tablefullname = schema +'.'+tablename        check_sink_status(taskname,tablefullname,configname)    cur.close()    conn.close()if __name__ == '__main__':    get_sink_list()

同时为了避免standalone进程的异常终止,我们用shell的守护进行进行了监控

#! /bin/bashfunction check(){      count=`ps -ef |grep $1 |grep -v "grep" |wc -l`      #echo $count      if [ 0 == $count ];then        time=$(date "+%Y-%m-%d %H:%M:%S")        echo "standalone restart at:${time}"        cd /opt/kafka_2.12-2.4.0/        bin/connect-standalone.sh config/connect-standalone.properties tableconfig/paod-source.properties 1>>connector-logs/connector.log 2>&1 &        sleep 60scurl -X POST -H  "Content-Type:application/json" -d @tableconfig/platform-source.json http://localhost:8083/connectors/    ......        sleep 10s        curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @tableconfig/paod-base-ins.properties     .....      fi    }    check ConnectStandalone

另外,在实际运行过程中会出现offset错误的情况,极其特殊情况下使用上面的方法无法快速解决问题,建议使用kafkacat查看详细信息,人为跳过offset,具体细节不再赘述。

如喜欢此专题,请关注并提问,技术人驱动自身的是永不停歇的渴望。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/352455.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用DynamoDBMapper插入DynamoDB项目

在上一篇文章中,我们使用DynamoDBMapper来将DynamoDB表映射到Java对象。 插入时,我们的动作几乎相同,但是方式更方便。 为了插入项目,您要做的就是使用对象映射器持久化对象 在我们的例子中,我们将创建一个执行简单插…

在与 SQL Server 建立连接时出现与网络相关的或特定于实例的错误。未找到或无法访问服务器,sqlserver

今早开机发现,打开SQL Server 2008 的 SQL Server Management Studio,输入sa的密码发现,无法登陆数据库?提示以下错误: “在与 SQL Server 建立连接时出现与网络相关的或特定于实例的错误。未找到或无法访问服务器。请…

matlab 与数据库操作

第一部分:读取数据其相关信息查看 1. 配置数据源 (1).“控制面板”----“管理工具”——“数据源(ODBC)”——“添加”——“创建数据源名称和选择数据库文件”。具体操作见数据源配置: http://blog.c…

PHP05

php05 1、音乐案例删除部分 1)通过执行某些PHP代码获取到指定的数据,填充到html的指定位置 accept属性也可以直接写扩展名,多个扩展名间用英文的逗号分隔 accept".lrc" 允许选中多个文件,文件域属性:multiple 直接写该属…

算术运算导致溢出_CPU的控制器与运算器

计算机基础方面的知识,对于一些非科班出身的同学来讲,一直是他们心中的痛,而对于科班出身的同学,很多同学在工作之后,也意识到自身所学知识的不足与欠缺,想回头补补基础知识。关于计算机基础的课程很多&…

Windows下,Netbeans使用C++的配置方法

如果尚未安装 NetBeans IDE,请转至http://www.netbeans.org/community/releases/,然后下载包含最近的 C/C 支持的 IDE 版本。 如果已安装 NetBeans IDE 但未包含 C/C 支持,请完成以下步骤为 IDE 添加 C/C 支持。 从主菜单中选择“工具”>“…

为了简单起见,Arquillian Chameleon

使用Arquillian时,您需要做的一件事就是定义要在哪个容器下执行所有测试。 这是通过在适配器的类路径中添加依赖项并取决于所使用的模式(嵌入式,托管或远程)来下载的来完成的。 手动安装应用程序服务器。 例如,当在嵌…

matlab 读取文件夹底下所有txt文件

读取matlab 文件夹底下所有的txt 文件包括子文件夹底下的文件。 dirsdir(H:\DCE); for i1:length(dirs); if (dirs(i).isdir && ~strcmp(dirs(i).name,.) && ~strcmp(dirs(i).name,..) ) str_path[H:\DCE\, dirs(i).name]; filedir([str_path,\*.txt]);…

BZOJ4856 JSOI2016 病毒感染 区间DP

传送门 原Word文档 题意:太长不给 这种题目一看就是区间DP 设$f_i$表示治愈了前$i$个村子的时候最少死了多少村民,又设前缀和为$sum_i$,通过枚举折返时最后经过的村子$j$,并且提前计算$i1$到$N$中死的村民数量,可以得到…

go get 的不再src目录中_Go 每日一库之 sqlc:根据 sql 生成代码

简介在 Go 语言中编写数据库操作代码真的非常痛苦!database/sql标准库提供的都是比较底层的接口。我们需要编写大量重复的代码。大量的模板代码不仅写起来烦,而且还容易出错。有时候字段类型修改了一下,可能就需要改动很多地方;添…

查看linux服务器的系统信息

查看linux服务器的系统信息新接手了几台linux的服务器,第一步当然是要了解这些服务器的软硬件配置.现在就写出我这次用的一些命令.首先当然要取得机器的IP,用户名和密码(呵呵,不知道就找原来的管理员要哈)登陆之后,首先看到的就是机器的名称,一般提示符就有了,如[rootlocalhost…

Windows下更改MySQL数据库的存储位置

一.同一主机下位置的转移 在mysql安装完成后,要修改数据库存储的位置,比如从安装目录下的C:\Program Files\MySQL\MySQL Server 5.0\Data文件夹转移到D:\mySQLData文件夹。 1、在D:\下新建mySQLData文件夹 2、停止MySQL服务,将C:\Program Fil…

apk去除签名验证工具安卓版_App 签名过期或泄露怎么办?别担心,Google 已经给出解决方案!...

一、序在将 App 发布到市场之前,很重要的一个步骤就是为 APK 进行签名,大部分时候,这个操作隐藏在了打包的流程中,而不被我们注意到。签名的作用,除了证明 App 的所有权之外,还可以帮助 Android 市场和设备…

初入WEBOTS半个月来的一点总结

对于我这一个从十一假期结束后才刚刚接触webots的小白来说,webots简直就是一个噩梦。在这里记录一下噩梦的发展历程。 在最初安装webots时就遇到了麻烦。安装7.0.3总是出问题。程序找不到接入点。替换dll文件也没有用更新运行库也不行。最好索性重新买固态硬盘做了个…

skype 回放设备出现问题

最近用skype拨打电话换台电脑会出现“回放设备出现问题”,拨通号码后会自动跳掉无法拨通 一直更新声音驱动器,结果发现跟声卡驱动半毛钱关系都没有,是要插入耳麦。

sonar java_修复Sonar中常见的Java安全代码冲突

sonar java本文旨在向您展示如何快速修复最常见的Java安全代码冲突。 它假定您熟悉代码规则和违规的概念以及Sonar如何对其进行报告。 但是,如果您以前从未听过这些术语,则可以阅读Sonar Concepts或即将出版的有关Sonar的书 ,以获取更详细的解…

linux服务器性能查看

################### cpu性能查看 ############################################################1、查看物理cpu个数:cat /proc/cpuinfo |grep "physical id"|sort|uniq|wc -l2、查看每个物理cpu中的core个数:cat /proc/cpuinfo |grep "…

android sdk 安装_Appium+python自动化1-环境安装(上)

前言appium可以说是做app自动化测试最火的一个框架,它的主要优势是支持android和ios,另外脚本语言也是支持java和Python。小编擅长Python,所以接下来的教程是appiumpython的实例。学习appium最大的难处在于环境的安装,80%的人死于…

Mean

题目描述 NiroBC 是猫咪学堂一年级的新生,开学第一天,学堂组织了一场迎新会,在 迎新会上,猫咪们会互相赠送礼物。 一年级的新生共有 N 只猫咪,编号为 1 . . . N(包括 NiroBC 自己),其…

如何开启mysql计划事件

首先在sql中查询计划事件的状态:SHOW VARIABLES LIKE event_scheduler如果返回的是off表示当前是关闭状态,如果是on当前已经开启了计划任务。在mysql程序的目录下找到my.ini文件,添加一个项:event_scheduler 1保存后重启mysql服务…