使用ogg实现oracle到kafka的增量数据实时同步

Oracle Golden Gate软件是一种基于日志的结构化数据复制备份软件,它通过解析源数据库在线日志或归档日志获得数据的增量变化,再将这些变化应用到目标数据库,从而实现源数据库与目标数据库同步。

0、本篇中源端和目标端的一些配置信息:

-版本OGG版本id地址
源端Oracle11gR2Oracle GoldenGate 11.2.1.0.1 for Oracle on Linux x86-64Carlota3
目标端kafka_2.12-2.5.0Linux x86-64上的Oracle GoldenGate for Big Data 19.1.0.0.1Carlota2

源端和目标端的文件不一样,目标端需要下载Oracle GoldenGate for Big Data,源端需要下载Oracle GoldenGate for Oracle!

PS:源端是安装好了Oracle的机器,目标端是安装好了Kafka的机器,二者环境变量之前都配置好了。

1、源端OGG安装

  • 先建立ogg目录

    mkdir -p /opt/ogg
    
  • 解压zip文件

    unzip ogg112101_fbo_ggs_Linux_x64_ora11g_64bit.zip
    
  • 解压后得到一个tar包,再解压这个tar

    tar xf fbo_ggs_Linux_x64_ora11g_64bit.tar -C /opt/ogg
    
  • 使oracle用户有ogg的权限,后面有些需要在oracle用户下执行才能成功

    chown -R oracle:oinstall /data/ogg 
    
  • 配置OGG环境变量

    vim /etc/profile
    

    export OGG_HOME=/opt/ogg
    export LD_LIBRARY_PATH=ORACLEHOME/lib:/usr/libexportPATH=ORACLE_HOME/lib:/usr/lib export PATH=ORACLEHOME/lib:/usr/libexportPATH=OGG_HOME:$PATH

  • source /etc/profile
    

2、目标端OGG安装

  • 先建立ogg目录

    mkdir -p /data/ogg
    
  • 解压zip文件

    unzip OGG_BigData_Linux_x64_19.1.0.0.1.zip
    
  • 解压后得到一个tar包,再解压这个tar

    tar xf OGG_BigData_Linux_x64_19.1.0.0.1.tar
    
  • 使oracle用户有ogg的权限,后面有些需要在oracle用户下执行才能成功

    chown -R oracle:oinstall /data/ogg 
    
  • 配置OGG环境变量

    vim /etc/profile
    

    export OGG_HOME=/opt/ogg
    export LD_LIBRARY_PATH=JAVAHOME/jre/lib/amd64:JAVA_HOME/jre/lib/amd64:JAVAHOME/jre/lib/amd64:JAVA_HOME/jre/lib/amd64/server:JAVAHOME/jre/lib/amd64/libjsig.so:JAVA_HOME/jre/lib/amd64/libjsig.so:JAVAHOME/jre/lib/amd64/libjsig.so:JAVA_HOME/jre/lib/amd64/server/libjvm.so:OGGHOME/libexportPATH=OGG_HOME/lib export PATH=OGGHOME/libexportPATH=OGG_HOME:$PATH

  • source /etc/profile
    
  • ggsci
    
  • create subdirs
    

3、源端Oracle归档模式设置

  • 登陆Oracle用户

    su - oracle
    
  • 登陆Oracle

    sqlplus / as sysdba
    
  • 查看当前是否为归档模式(若为Disabled,则需手动打开)

    archive log list 
    

    Database log mode

    No Archive Mode Automatic archival

    Disabled Archive destination

    USE_DB_RECOVERY_FILE_DEST Oldest online log sequence 12

    Current log sequence 14

  • 立即关闭数据库

    shutdown immediate 
    
  • 启动实例并加载数据库,但不打开

    startup mount 
    
  • 更改数据库为归档模式

    alter database archivelog; 
    
  • 打开数据库

    alter database open;
    
  • 启用自动归档

    alter system archive log start; 
    
  • 再次查看当前是否为归档模式(看到为Enabled,则成功打开归档模式。)

    archive log list 
    

    Database log mode Archive Mode
    Automatic archival Enabled
    Archive destination USE_DB_RECOVERY_FILE_DEST
    Oldest online log sequence 12
    Next log sequence to archive 14
    Current log sequence 14

  • 查看辅助日志状态(若为NO,则需要通过命令修改)

    select force_logging, supplemental_log_data_min from v$database;
    

    FORCE_ SUPPLEMENTAL_LOG


    NO NO

  • alter database force logging;
    
  • alter database add supplemental log data;
    
  • 再次查看辅助日志状态(为YES即可)

    select force_logging, supplemental_log_data_min from v$database;
    

    FORCE_ SUPPLEMENTAL_LOG


    YES YES

4、源端oracle创建复制用户

  • root用户建立相关文件夹,并赋予权限

    mkdir -p /data/oracle/oggdata/orcl
    
    chown -R oracle:oinstall /data/oracle/oggdata/orcl
    
  • 执行下面sql

    SQL> create tablespace oggtbs datafile '/data/oracle/oggdata/orcl/oggtbs01.dbf' size 1000M autoextend on;Tablespace created.SQL>  create user ogg identified by ogg default tablespace oggtbs;User created.SQL> grant dba to ogg;Grant succeeded.
  • Oracle创建测试表

    create user test_ogg  identified by test_ogg default tablespace users;
    grant dba to test_ogg;
    conn test_ogg/test_ogg;
    create table test_ogg(id int ,name varchar(20),primary key(id));
    

5、OGG源端配置

  • ggsci
    
  • create subdirs
    
  • dblogin userid ogg password ogg
    
  • edit param ./globals
    

    oggschema ogg

  • 配置管理器mgr

    edit param mgr
    

    PORT 7809

    DYNAMICPORTLIST 7810-7909

    AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 *

    PURGEOLDEXTRACTS ./dirdat/,usecheckpoints, minkeepdays 3

  • 添加复制表

    add trandata test_ogg.test_ogginfo trandata test_ogg.test_ogg
    
  • 配置extract进程(ORACLE_SID与Orcale中的相同)

    edit param extkafka
    

    extract extkafka

    dynamicresolution

    SETENV (ORACLE_SID = “orcl11g”)

    SETENV (NLS_LANG = “american_america.AL32UTF8”)

    userid ogg,password ogg

    exttrail /da ta/ogg/dirdat/to

    table test_ogg.test_ogg;

    add extract extkafka,tranlog,begin now
    

    若报错

    ERROR: Could not create checkpoint file /opt/ogg/dirchk/EXTKAFKA.cpe (error 2, No such file or directory).

    执行下面的命令再重新添加即可。

    create subdirs
    
    add exttrail /data/ogg/dirdat/to,extract extkafka
    
  • 配置pump进程

    edit param pukafka
    

    extract pukafka

    passthru

    dynamicresolution

    userid ogg,password ogg

    rmthost Carlota2 mgrport 7809

    rmttrail /data/ogg/dirdat/to

    table test_ogg.test_ogg;

    add extract pukafka,exttrailsource /data/ogg/dirdat/to
    
    add rmttrail /data/ogg/dirdat/to,extract pukafka
    
  • 配置define文件(Oracle与MySQL,Hadoop集群(HDFS,Hive,kafka等)等之间数据传输可以定义为异构数据类型的传输,故需要定义表之间的关系映射,)

    edit param test_ogg
    

    defsfile /data/ogg/dirdef/test_ogg.test_ogg

    userid ogg,password ogg

    table test_ogg.test_ogg;

  • 返回终端执行

    ./defgen paramfile dirprm/test_ogg.prm
    
  • 将生成的/data/ogg/dirdef/test_ogg.test_ogg发送的目标端ogg目录下的dirdef里:

    scp -r /data/ogg/dirdef/test_ogg.test_ogg root@Carlota2:/opt/ogg/dirdef/ 
    

6、OGG目标端配置

  • 开启kafka服务

    zkServer.sh startkafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties 
    
  • ggsic
    
  • 配置管理器mgr

    edit param mgr
    

    PORT 7809

    DYNAMICPORTLIST 7810-7909

    AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 *

    PURGEOLDEXTRACTS ./dirdat/,usecheckpoints, minkeepdays 3

  • 配置checkpoint

    edit  param  ./GLOBALS
    

    CHECKPOINTTABLE test_ogg.checkpoint

  • 配置replicate进程

    edit param rekafka
    

    REPLICAT rekafka

    sourcedefs /data/ogg/dirdef/test_ogg.test_ogg

    TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props

    REPORTCOUNT EVERY 1 MINUTES, RATE

    GROUPTRANSOPS 10000

    MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg;

  • 配置kafka.props(去掉注释)

    cd /opt/ogg/dirprm/
    vim kafka.props
    

    gg.handlerlist=kafkahandler //handler类型
    gg.handler.kafkahandler.type=kafka
    gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //kafka相关配置
    gg.handler.kafkahandler.topicMappingTemplate=test_ogg //kafka的topic名称,无需手动创建
    gg.handler.kafkahandler.format=json //传输文件的格式,支持json,xml等
    gg.handler.kafkahandler.mode=op //OGG for Big Data中传输模式,即op为一次SQL传输一次,tx为一次事务传输一次
    gg.classpath=dirprm/:/usr/local/apps/kafka_2.12-2.5.0/libs/:/opt/ogg/:/opt/ogg/lib/

    vim custom_kafka_producer.properties
    

    bootstrap.servers=192.168.44.129:9092 //kafkabroker的地址
    acks=1
    compression.type=gzip //压缩类型
    reconnect.backoff.ms=1000 //重连延时
    value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    batch.size=102400
    linger.ms=10000

  • 添加trail文件到replicate进程

    add replicat rekafka exttrail /data/ogg/dirdat/to,checkpointtable test_ogg.checkpoint
    

7、测试

在源端和目标端的OGG命令行下使用start [进程名]的形式启动所有进程。
启动顺序按照源mgr——目标mgr——源extract——源pump——目标replicate来完成。
全部需要在ogg目录下执行ggsci目录进入ogg命令行。
源端依次是

start mgr
start extkafka
start pukafka

目标端

start mgr
start rekafka

GGSCI (Carlota2) 1> info all

Program Status Group Lag at Chkpt Time Since Chkpt

MANAGER RUNNING

REPLICAT RUNNING REKAFKA 00:00:00 00:00:08

GGSCI (Carlota3) 1> info all

Program Status Group Lag at Chkpt Time Since Chkpt

MANAGER RUNNING

EXTRACT RUNNING EXTKAFKA 00:00:00 00:00:00

EXTRACT RUNNING PUKAFKA 00:00:00 00:00:10

现在源端执行sql语句

conn test_ogg/test_ogg
insert into test_ogg values(1,'test');
commit;
update test_ogg set name='zhangsan' where id=1;
commit;
delete test_ogg where id=1;
commit;

查看源端trail文件状态

ls -l /data/ogg/dirdat/to*

查看目标端trail文件状态

ls -l /data/ogg/dirdat/to*

查看kafka是否自动建立对应的主题

kafka-topics.sh --list --zookeeper localhost:2181

在列表中显示有test_ogg则表示没问题
通过消费者看是否有同步消息

kafka-console-consumer.sh --bootstrap-server Carlota2:9092 --topic test_ogg --from-beginning

{“table”:“TEST_OGG.TEST_OGG”,“op_type”:“I”,“op_ts”:“2020-07-31 13:42:33.072327”,“current_ts”:“2020-07-31T13:42:38.928000”,“pos”:“00000000000000001066”,“after”:{“ID”:1,“NAME”:“test”}}

{“table”:“TEST_OGG.TEST_OGG”,“op_type”:“U”,“op_ts”:“2020-07-31 13:42:46.005763”,“current_ts”:“2020-07-31T13:42:52.201000”,“pos”:“00000000000000001204”,“before”:{},“after”:{“ID”:1,“NAME”:“zhangsan”}}

{“table”:“TEST_OGG.TEST_OGG”,“op_type”:“D”,“op_ts”:“2020-07-31 13:42:57.079268”,“current_ts”:“2020-07-31T13:43:02.231000”,“pos”:“00000000000000001347”,“before”:{“ID”:1}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/535796.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

转载:35岁前成功的12条黄金法则

习惯的力量是惊人的。习惯能载着你走向成功,也能驮着你滑向失败。如何选择,完全取决于你自己。 1.习惯的力量:35岁以前养成好习惯 你想成功吗?那就及早培养有利于成功的好习惯。 习惯的力量是惊人的,35岁…

JDK源码解析之 Java.lang.Object

Object类是Java中其他所有类的祖先,没有Object类Java面向对象无从谈起。作为其他所有类的基类,Object具有哪些属性和行为,是Java语言设计背后的思维体现。 Object类位于java.lang包中,java.lang包包含着Java最基础和核心的类&…

将z-blog改成英文blog所遇到的问题

1.将z-blog中文章日期中的“年,月,日”改成英文 相关模板:b_article-multi.htmlb_article-single.html默认用的时间标签是<#article/posttime/longdate#> 即 "2007年1月13日" 这样的形式你可以换成 <#article/posttime/shortdate#>即 "2…

JDK源码解析之 Java.lang.String

String 类代表字符串。Java 程序中的所有字符串字面值&#xff08;如 “abc” &#xff09;都作为此类的实例实现。 字符串是常量&#xff1b;它们的值在创建之后不能更改。字符串缓冲区支持可变的字符串。因为 String 对象是不可变的&#xff0c;所以可以共享。 一、类定义 p…

看到一个blog的标语,有意思!

"上世纪80年代勇气&#xff0c;90年代靠关系&#xff0c;现在必须靠知识能力&#xff01;挣钱靠1、兴趣广泛&#xff1b; 2、感觉敏锐&#xff1b; 3、集中力强&#xff1b; 4、个性不脆弱&#xff08;坚韧性&#xff09;&#xff1b; 5、能在瞬间了解因果关系&#xff1b…

JDK源码解析之 Java.lang.AbstractStringBuilder

这个抽象类是StringBuilder和StringBuffer的直接父类&#xff0c;而且定义了很多方法&#xff0c;因此在学习这两个类之间建议先学习 AbstractStringBuilder抽象类 该类在源码中注释是以JDK1.5开始作为前两个类的父类存在的&#xff0c;可是直到JDK1.8的API中&#xff0c;关于S…

RHEL下安装配置基于2台服务器的MYSQL集群

一、介绍这篇文档旨在介绍如何在RHEL下安装配置基于2台服务器的MySQL集群。并且实现任意一台服务器出现问题或宕机时MySQL依然能够继续运行。 注意&#xff01;虽然这是基于2台服务器的MySQL集群&#xff0c;但也必须有额外的第三台服务器作为管理节点&#xff0c;但这台服务器…

JDK源码解析之 Java.lang.StringBuffer

StringBuffer类表示一个可变的字符序列。StringBuffer的API与StringBuilder互相兼容&#xff0c;但是StringBuffer是线程安全的。在可能的情况下&#xff0c;建议优先使用StringBuilder&#xff0c;因为在大多数实现中它比StringBuffer更快。 一、类定义 public final class S…

redo和undo

这是在网上看到的对redo和undo的探讨&#xff1a; 1. redo 记录所有做过的事情&#xff0c;用于恢复 undo 记录事务的前镜相&#xff0c;用于回滚2. redo&#xff0c;恢复数据库时&#xff0c;按照重做日志文件来恢复你之前的操作 undo&#xff0c;撤消你做过的操作&#xff0…

JDK源码解析之 Java.lang.StringBuilder

StringBuilder类表示一个可变的字符序列。StringBuilder的API与StringBuffer互相兼容&#xff0c;但是StringBuilder是非线程安全的&#xff0c;在大多数实现中它比StringBuffer更快。 一、类定义 public final class StringBufferextends AbstractStringBuilderimplements ja…

从映射观点看索引

信息检索主要有“检”与“索&#xff08;办手续&#xff09;”两个动作。在图书馆借书时&#xff0c;一般而言&#xff0c; 找书的时间比办理手续的时间长得多&#xff0c;因而缩短检查时间是提高效率的关键。数据库中检 索信息也与此类似。 在没有索引文件时&#xff0c;DBM…

JDK源码解析之 Java.lang.Boolean

Boolean 类是将 boolean 基本类型进行包装。类型为 Boolean 的对象包含一个单一属性 value&#xff0c;其类型为 boolean。 此外还提供了许多将 boolean 转换为 String、String 转换为 boolean&#xff0c;以及其他一些方法。 一、类定义 public final class Boolean implemen…

MYSQL的集群的安装与配置(mysql-5.1.21)

具体安装与配置&#xff1a;1&#xff09;准备工作&#xff1a;6台机器&#xff0c;IP地址分别为192.168.0.&#xff08;231-236&#xff09;MGM节点&#xff1a;192.168.0.231(232)SQL 节点&#xff1a;192.168.0.233-234NDBD 节点&#xff1a;192.168.0.235-236系统都是REDHA…

JDK源码解析之 Java.lang.Byte

byte&#xff0c;即字节&#xff0c;由8位的二进制组成。在Java中&#xff0c;byte类型的数据是8位带符号的二进制数,以二进制补码表示的整数 取值范围&#xff1a;默认值为0&#xff0c;最小值为-128&#xff08;-27&#xff09;;最大值是127&#xff08;27-1&#xff09; Byt…

在命令行模式下管理SELinux

作者&#xff1a; Oslad.com (原创&#xff01;转载请注明出处) 2006-07-14 在 GUI 图形界面模式下&#xff0c;要更改 SELinux 的策略使用方式&#xff0c;只需依次点击“应用程序”&#xff0c;“系统设置”&#xff0c;“安全级别”&#xff1b;然后在“安全级别配置”对…

JDK源码解析之 Java.lang.Double

Double类是原始类型double的包装类&#xff0c;它包含若干有效处理double值的方法&#xff0c;如将其转换为字符串表示形式&#xff0c;反之亦然。Double类的对象可以包含一个double值。 Double类包装原始类型的值 double中的对象。类型的对象 Double包含一个类型为的字段 doub…

网页搜索帮助-禁止搜索引擎收录的方法

什么是robots.txt文件?搜索引擎使用spider程序自动访问互联网上的网页并获取网页信息。spider在访问一个网站时&#xff0c;会首先会检查该网站的根域下是否有一个叫做robots.txt的纯文本文件。您可以在您的网站中创建一个纯文本文件robots.txt&#xff0c;在文件中声明该网站…

JDK源码解析之 Java.lang.Float

Float类是原始类型float的包装类&#xff0c;它包含若干有效处理浮点值的方法&#xff0c;如将其转换为字符串表示形式&#xff0c;反之亦然。Float类的一个对象可以包含一个浮点值 一、类定义 public final class Float extends Number implements Comparable<Float> {…

FTP两种工作模式:主动模式(Active FTP)和被动模式(Passive FTP)

在主动模式下&#xff0c;FTP客户端随机开启一个大于1024的端口N向服务器的21号端口发起连接&#xff0c;然后开放N1号端口进行监听&#xff0c;并向服务器发出PORT N 1命令。服务器接收到命令后&#xff0c;会用其本地的FTP数据端口&#xff08;通常是20&#xff09;来连接客户…

JDK源码解析之 java.lang.Integer

teger 基本数据类型int 的包装类 Integer 类型的对象包含一个 int 类型的字段 一、类定义 public final class Integer extends Number implements Comparable<Integer>{}类被声明为final的,表示不能被继承;继承了Number抽象类,可以用于数字类型的一系列转换;实现了Comp…