Oracle GoldenGate is a log-based structured data replication tool. It captures incremental data changes by parsing the source database's online redo logs or archived logs, then applies those changes to the target database, keeping the source and target in sync.
0. Source and target configuration used in this post:
| - | Version | OGG Version | Host |
|---|---|---|---|
| Source | Oracle 11gR2 | Oracle GoldenGate 11.2.1.0.1 for Oracle on Linux x86-64 | Carlota3 |
| Target | kafka_2.12-2.5.0 | Oracle GoldenGate for Big Data 19.1.0.0.1 on Linux x86-64 | Carlota2 |
The source and target installers are different: the source needs Oracle GoldenGate for Oracle, while the target needs Oracle GoldenGate for Big Data!
PS: the source is the machine with Oracle already installed, and the target is the machine with Kafka already installed; the environment variables of both were configured beforehand.
1. Source OGG installation
- Create the ogg directory: mkdir -p /opt/ogg
- Unzip the installer: unzip ogg112101_fbo_ggs_Linux_x64_ora11g_64bit.zip
- Unzipping yields a tar archive; extract it: tar xf fbo_ggs_Linux_x64_ora11g_64bit.tar -C /opt/ogg
- Give the oracle user ownership of the ogg directory (some later steps only succeed when run as oracle): chown -R oracle:oinstall /opt/ogg
- Configure the OGG environment variables: vim /etc/profile, then add:
 export OGG_HOME=/opt/ogg
 export LD_LIBRARY_PATH=$ORACLE_HOME/lib:/usr/lib
 export PATH=$OGG_HOME:$PATH
-  source /etc/profile
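A quick sanity check is worthwhile before going further; a minimal sketch, assuming the profile has been sourced and the Oracle client libraries are installed:

```bash
# confirm the variable from /etc/profile is active in this shell
echo $OGG_HOME                 # expected: /opt/ogg
# missing Oracle libraries show up here before ggsci fails cryptically
ldd $OGG_HOME/ggsci | grep "not found" || echo "all libraries resolve"
# launch the GoldenGate CLI; its banner prints the installed version
cd $OGG_HOME && ./ggsci
```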
2. Target OGG installation
- Create the ogg directory: mkdir -p /data/ogg
- Unzip the installer: unzip OGG_BigData_Linux_x64_19.1.0.0.1.zip
- Unzipping yields a tar archive; extract it: tar xf OGG_BigData_Linux_x64_19.1.0.0.1.tar -C /data/ogg
- Give the oracle user ownership of the ogg directory (some later steps only succeed when run as oracle): chown -R oracle:oinstall /data/ogg
- Configure the OGG environment variables: vim /etc/profile, then add:
 export OGG_HOME=/data/ogg
 export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$OGG_HOME/lib
 export PATH=$OGG_HOME:$PATH
-  source /etc/profile
-  ggsci
-  create subdirs
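create subdirs lays down the standard OGG working directories under the OGG home; a quick listing (a minimal check, assuming the /data/ogg home above) confirms they were created:

```bash
# dirprm, dirdat, dirchk, dirrpt, dirdef and friends should all be present
ls -ld /data/ogg/dir*
```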
3. Source Oracle archive mode setup
- Switch to the oracle user: su - oracle
- Log in to Oracle: sqlplus / as sysdba
- Check whether the database is in archive log mode (if Automatic archival shows Disabled, it has to be enabled manually): archive log list
 Database log mode              No Archive Mode
 Automatic archival             Disabled
 Archive destination            USE_DB_RECOVERY_FILE_DEST
 Oldest online log sequence     12
 Current log sequence           14
- Shut down the database immediately: shutdown immediate
- Start the instance and mount the database without opening it: startup mount
- Switch the database to archive log mode: alter database archivelog;
- Open the database: alter database open;
- Enable automatic archiving: alter system archive log start;
- Check again whether archive log mode is on (Enabled means it was turned on successfully): archive log list
 Database log mode              Archive Mode
 Automatic archival             Enabled
 Archive destination            USE_DB_RECOVERY_FILE_DEST
 Oldest online log sequence     12
 Next log sequence to archive   14
 Current log sequence           14
- Check the supplemental logging status (if NO, enable it with the commands below): select force_logging, supplemental_log_data_min from v$database;
 FORCE_ SUPPLEMENTAL_LOG
 NO     NO
- Enable force logging: alter database force logging;
- Enable minimal supplemental logging: alter database add supplemental log data;
- Check the supplemental logging status again (YES means it is enabled): select force_logging, supplemental_log_data_min from v$database;
 FORCE_ SUPPLEMENTAL_LOG
 YES    YES
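The whole archive-mode and supplemental-logging change can also be applied in one scripted pass; a minimal sketch (run as the oracle user with ORACLE_SID set; it bounces the database, so pick a quiet window):

```bash
sqlplus / as sysdba <<'EOF'
-- restart into mount, switch to archivelog, reopen
shutdown immediate
startup mount
alter database archivelog;
alter database open;
-- force logging plus minimal supplemental logging, both needed for OGG capture
alter database force logging;
alter database add supplemental log data;
-- should now report YES / YES
select force_logging, supplemental_log_data_min from v$database;
exit
EOF
```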
4. Creating the replication user on the source Oracle
- As root, create the required directory and give oracle ownership:
 mkdir -p /data/oracle/oggdata/orcl
 chown -R oracle:oinstall /data/oracle/oggdata/orcl
- Run the following SQL:
 SQL> create tablespace oggtbs datafile '/data/oracle/oggdata/orcl/oggtbs01.dbf' size 1000M autoextend on;
 Tablespace created.
 SQL> create user ogg identified by ogg default tablespace oggtbs;
 User created.
 SQL> grant dba to ogg;
 Grant succeeded.
- Create a test user and table in Oracle:
 create user test_ogg identified by test_ogg default tablespace users;
 grant dba to test_ogg;
 conn test_ogg/test_ogg;
 create table test_ogg(id int, name varchar(20), primary key(id));
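A quick look at dba_users (a standard dictionary view) confirms both users landed on the intended tablespaces; a minimal check:

```bash
sqlplus -s / as sysdba <<'EOF'
-- confirm the replication and test users exist with the expected tablespaces
select username, default_tablespace from dba_users
 where username in ('OGG', 'TEST_OGG');
exit
EOF
```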
5. Source OGG configuration
-  ggsci
-  create subdirs
-  dblogin userid ogg password ogg
- edit param ./globals
 oggschema ogg
- Configure the manager (mgr): edit param mgr
 PORT 7809
 DYNAMICPORTLIST 7810-7909
 AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
 PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
- Register the table for replication, then verify:
 add trandata test_ogg.test_ogg
 info trandata test_ogg.test_ogg
- Configure the extract process (SETENV ORACLE_SID must match the actual Oracle SID): edit param extkafka
 extract extkafka
 dynamicresolution
 SETENV (ORACLE_SID = "orcl11g")
 SETENV (NLS_LANG = "american_america.AL32UTF8")
 userid ogg,password ogg
 exttrail /data/ogg/dirdat/to
 table test_ogg.test_ogg;
 Then register the extract and its trail:
 add extract extkafka,tranlog,begin now
 add exttrail /data/ogg/dirdat/to,extract extkafka
 If this reports ERROR: Could not create checkpoint file /opt/ogg/dirchk/EXTKAFKA.cpe (error 2, No such file or directory), run create subdirs and then add it again.
- Configure the pump process: edit param pukafka
 extract pukafka
 passthru
 dynamicresolution
 userid ogg,password ogg
 rmthost Carlota2 mgrport 7809
 rmttrail /data/ogg/dirdat/to
 table test_ogg.test_ogg;
 Then register the pump and point it at the local and remote trails:
 add extract pukafka,exttrailsource /data/ogg/dirdat/to
 add rmttrail /data/ogg/dirdat/to,extract pukafka
- Configure the defgen file (transfers between heterogeneous stores, e.g. Oracle to MySQL or to a Hadoop stack (HDFS, Hive, Kafka, etc.), need the table mapping defined explicitly): edit param test_ogg
 defsfile /data/ogg/dirdef/test_ogg.test_ogg
 userid ogg,password ogg
 table test_ogg.test_ogg;
- Back in the shell, run: ./defgen paramfile dirprm/test_ogg.prm
- Send the generated /data/ogg/dirdef/test_ogg.test_ogg to the dirdef directory under the target's ogg home: scp -r /data/ogg/dirdef/test_ogg.test_ogg root@Carlota2:/data/ogg/dirdef/
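Before switching to the target, it is worth confirming that both capture processes registered cleanly. A minimal GGSCI check driven from the shell (a sketch, using the process names configured above):

```bash
cd /opt/ogg
./ggsci <<'EOF'
info extract extkafka, detail
info extract pukafka, detail
info all
EOF
```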
6. Target OGG configuration
- Start the Kafka services:
 zkServer.sh start
 kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
- ggsci
- Configure the manager (mgr): edit param mgr
 PORT 7809
 DYNAMICPORTLIST 7810-7909
 AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
 PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
- Configure the checkpoint table: edit param ./GLOBALS
 CHECKPOINTTABLE test_ogg.checkpoint
- Configure the replicat process: edit param rekafka
 REPLICAT rekafka
 sourcedefs /data/ogg/dirdef/test_ogg.test_ogg
 TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
 REPORTCOUNT EVERY 1 MINUTES, RATE
 GROUPTRANSOPS 10000
 MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg;
- Configure kafka.props (the // comments below are explanations; strip them from the real file):
 cd /data/ogg/dirprm/
 vim kafka.props
 gg.handlerlist=kafkahandler // handler type
 gg.handler.kafkahandler.type=kafka
 gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties // Kafka producer config file
 gg.handler.kafkahandler.topicMappingTemplate=test_ogg // Kafka topic name; it does not need to be created manually
 gg.handler.kafkahandler.format=json // payload format; json, xml and others are supported
 gg.handler.kafkahandler.mode=op // transfer mode in OGG for Big Data: op sends one message per SQL operation, tx one per transaction
 gg.classpath=dirprm/:/usr/local/apps/kafka_2.12-2.5.0/libs/*:/data/ogg/:/data/ogg/lib/*
 vim custom_kafka_producer.properties
 bootstrap.servers=192.168.44.129:9092 // Kafka broker address
 acks=1
 compression.type=gzip // compression type
 reconnect.backoff.ms=1000 // reconnect delay
 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
 key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
 batch.size=102400
 linger.ms=10000
- Bind the trail file to the replicat process: add replicat rekafka exttrail /data/ogg/dirdat/to,checkpointtable test_ogg.checkpoint
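Before starting the replicat, it can save debugging time to confirm that the broker named in custom_kafka_producer.properties is reachable with the stock Kafka CLI tools. A minimal smoke test (a sketch; ogg_smoke_test is an arbitrary scratch topic and assumes the broker allows automatic topic creation):

```bash
# list topics through the broker OGG will publish to
kafka-topics.sh --list --bootstrap-server 192.168.44.129:9092
# round-trip one message through a scratch topic
echo "ping" | kafka-console-producer.sh --broker-list 192.168.44.129:9092 --topic ogg_smoke_test
kafka-console-consumer.sh --bootstrap-server 192.168.44.129:9092 --topic ogg_smoke_test --from-beginning --max-messages 1
```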
7. Testing
Start all processes from the OGG command line on both the source and the target with start [process name].
 The start order is: source mgr, then target mgr, then source extract, then source pump, then target replicat.
 In every case, cd into the ogg directory and run ggsci to enter the OGG command line first.
On the source, in order:
start mgr
start extkafka
start pukafka
On the target:
start mgr
start rekafka
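The same sequence can be scripted end to end from the source host; a minimal sketch (assumes passwordless ssh to Carlota2 as root and the OGG homes used above; the remote command sources /etc/profile so ggsci can find its libraries):

```bash
# 1) source manager
cd /opt/ogg && ./ggsci <<'EOF'
start mgr
EOF
# 2) target manager, which must be up before the pump connects
ssh root@Carlota2 'source /etc/profile && cd /data/ogg && ./ggsci <<GG
start mgr
GG'
# 3) source capture chain: extract first, then pump
cd /opt/ogg && ./ggsci <<'EOF'
start extkafka
start pukafka
EOF
# 4) target replicat
ssh root@Carlota2 'source /etc/profile && cd /data/ogg && ./ggsci <<GG
start rekafka
GG'
```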
Once everything is started, info all on each end should show all processes RUNNING:
GGSCI (Carlota2) 1> info all
 Program     Status      Group       Lag at Chkpt     Time Since Chkpt
 MANAGER     RUNNING
 REPLICAT    RUNNING     REKAFKA     00:00:00         00:00:08
GGSCI (Carlota3) 1> info all
 Program     Status      Group       Lag at Chkpt     Time Since Chkpt
 MANAGER     RUNNING
 EXTRACT     RUNNING     EXTKAFKA    00:00:00         00:00:00
 EXTRACT     RUNNING     PUKAFKA     00:00:00         00:00:10
Now run some SQL statements on the source:
conn test_ogg/test_ogg
insert into test_ogg values(1,'test');
commit;
update test_ogg set name='zhangsan' where id=1;
commit;
delete test_ogg where id=1;
commit;
Check the trail file on the source:
ls -l /data/ogg/dirdat/to*
Check the trail file on the target:
ls -l /data/ogg/dirdat/to*
Check whether Kafka automatically created the corresponding topic:
kafka-topics.sh --list --zookeeper localhost:2181
If test_ogg appears in the list, the topic side is fine.
 Then use a console consumer to check whether the changes were synchronized:
kafka-console-consumer.sh --bootstrap-server Carlota2:9092 --topic test_ogg --from-beginning
{"table":"TEST_OGG.TEST_OGG","op_type":"I","op_ts":"2020-07-31 13:42:33.072327","current_ts":"2020-07-31T13:42:38.928000","pos":"00000000000000001066","after":{"ID":1,"NAME":"test"}}
{"table":"TEST_OGG.TEST_OGG","op_type":"U","op_ts":"2020-07-31 13:42:46.005763","current_ts":"2020-07-31T13:42:52.201000","pos":"00000000000000001204","before":{},"after":{"ID":1,"NAME":"zhangsan"}}
{"table":"TEST_OGG.TEST_OGG","op_type":"D","op_ts":"2020-07-31 13:42:57.079268","current_ts":"2020-07-31T13:43:02.231000","pos":"00000000000000001347","before":{"ID":1}}