日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

使用ogg实现oracle到kafka的增量数据实时同步

發(fā)布時間:2025/3/8 编程问答 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用ogg实现oracle到kafka的增量数据实时同步 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Oracle Golden Gate軟件是一種基于日志的結(jié)構(gòu)化數(shù)據(jù)復(fù)制備份軟件,它通過解析源數(shù)據(jù)庫在線日志或歸檔日志獲得數(shù)據(jù)的增量變化,再將這些變化應(yīng)用到目標(biāo)數(shù)據(jù)庫,從而實現(xiàn)源數(shù)據(jù)庫與目標(biāo)數(shù)據(jù)庫同步。

0、本篇中源端和目標(biāo)端的一些配置信息:

-版本OGG版本id地址
源端Oracle11gR2Oracle GoldenGate 11.2.1.0.1 for Oracle on Linux x86-64Carlota3
目標(biāo)端kafka_2.12-2.5.0Linux x86-64上的Oracle GoldenGate for Big Data 19.1.0.0.1Carlota2

源端和目標(biāo)端的文件不一樣,目標(biāo)端需要下載Oracle GoldenGate for Big Data,源端需要下載Oracle GoldenGate for Oracle!

PS:源端是安裝好了Oracle的機器,目標(biāo)端是安裝好了Kafka的機器,二者環(huán)境變量之前都配置好了。

1、源端OGG安裝

  • 先建立ogg目錄

    mkdir -p /opt/ogg
  • 解壓zip文件

    unzip ogg112101_fbo_ggs_Linux_x64_ora11g_64bit.zip
  • 解壓后得到一個tar包,再解壓這個tar

    tar xf fbo_ggs_Linux_x64_ora11g_64bit.tar -C /opt/ogg
  • 使oracle用戶有ogg的權(quán)限,后面有些需要在oracle用戶下執(zhí)行才能成功

    chown -R oracle:oinstall /data/ogg
  • 配置OGG環(huán)境變量

    vim /etc/profile

    export OGG_HOME=/opt/ogg
    export LD_LIBRARY_PATH=ORACLEHOME/lib:/usr/libexportPATH=ORACLE_HOME/lib:/usr/lib export PATH=ORACLEH?OME/lib:/usr/libexportPATH=OGG_HOME:$PATH

  • source /etc/profile

2、目標(biāo)端OGG安裝

  • 先建立ogg目錄

    mkdir -p /data/ogg
  • 解壓zip文件

    unzip OGG_BigData_Linux_x64_19.1.0.0.1.zip
  • 解壓后得到一個tar包,再解壓這個tar

    tar xf OGG_BigData_Linux_x64_19.1.0.0.1.tar
  • 使oracle用戶有ogg的權(quán)限,后面有些需要在oracle用戶下執(zhí)行才能成功

    chown -R oracle:oinstall /data/ogg
  • 配置OGG環(huán)境變量

    vim /etc/profile

    export OGG_HOME=/opt/ogg
    export LD_LIBRARY_PATH=JAVAHOME/jre/lib/amd64:JAVA_HOME/jre/lib/amd64:JAVAH?OME/jre/lib/amd64:JAVA_HOME/jre/lib/amd64/server:JAVAHOME/jre/lib/amd64/libjsig.so:JAVA_HOME/jre/lib/amd64/libjsig.so:JAVAH?OME/jre/lib/amd64/libjsig.so:JAVA_HOME/jre/lib/amd64/server/libjvm.so:OGGHOME/libexportPATH=OGG_HOME/lib export PATH=OGGH?OME/libexportPATH=OGG_HOME:$PATH

  • source /etc/profile
  • ggsci
  • create subdirs

3、源端Oracle歸檔模式設(shè)置

  • 登陸Oracle用戶

    su - oracle
  • 登陸Oracle

    sqlplus / as sysdba
  • 查看當(dāng)前是否為歸檔模式(若為Disabled,則需手動打開)

    archive log list

    Database log mode

    No Archive Mode Automatic archival

    Disabled Archive destination

    USE_DB_RECOVERY_FILE_DEST Oldest online log sequence 12

    Current log sequence 14

  • 立即關(guān)閉數(shù)據(jù)庫

    shutdown immediate
  • 啟動實例并加載數(shù)據(jù)庫,但不打開

    startup mount
  • 更改數(shù)據(jù)庫為歸檔模式

    alter database archivelog;
  • 打開數(shù)據(jù)庫

    alter database open;
  • 啟用自動歸檔

    alter system archive log start;
  • 再次查看當(dāng)前是否為歸檔模式(看到為Enabled,則成功打開歸檔模式。)

    archive log list

    Database log mode Archive Mode
    Automatic archival Enabled
    Archive destination USE_DB_RECOVERY_FILE_DEST
    Oldest online log sequence 12
    Next log sequence to archive 14
    Current log sequence 14

  • 查看輔助日志狀態(tài)(若為NO,則需要通過命令修改)

    select force_logging, supplemental_log_data_min from v$database;

    FORCE_ SUPPLEMENTAL_LOG


    NO NO

  • alter database force logging;
  • alter database add supplemental log data;
  • 再次查看輔助日志狀態(tài)(為YES即可)

    select force_logging, supplemental_log_data_min from v$database;

    FORCE_ SUPPLEMENTAL_LOG


    YES YES

4、源端oracle創(chuàng)建復(fù)制用戶

  • root用戶建立相關(guān)文件夾,并賦予權(quán)限

    mkdir -p /data/oracle/oggdata/orcl chown -R oracle:oinstall /data/oracle/oggdata/orcl
  • 執(zhí)行下面sql

    SQL> create tablespace oggtbs datafile '/data/oracle/oggdata/orcl/oggtbs01.dbf' size 1000M autoextend on;Tablespace created.SQL> create user ogg identified by ogg default tablespace oggtbs;User created.SQL> grant dba to ogg;Grant succeeded.
  • Oracle創(chuàng)建測試表

    create user test_ogg identified by test_ogg default tablespace users; grant dba to test_ogg; conn test_ogg/test_ogg; create table test_ogg(id int ,name varchar(20),primary key(id));

5、OGG源端配置

  • ggsci
  • create subdirs
  • dblogin userid ogg password ogg
  • edit param ./globals

    oggschema ogg

  • 配置管理器mgr

    edit param mgr

    PORT 7809

    DYNAMICPORTLIST 7810-7909

    AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 *

    PURGEOLDEXTRACTS ./dirdat/,usecheckpoints, minkeepdays 3

  • 添加復(fù)制表

    add trandata test_ogg.test_ogginfo trandata test_ogg.test_ogg
  • 配置extract進程(ORACLE_SID與Orcale中的相同)

    edit param extkafka

    extract extkafka

    dynamicresolution

    SETENV (ORACLE_SID = “orcl11g”)

    SETENV (NLS_LANG = “american_america.AL32UTF8”)

    userid ogg,password ogg

    exttrail /da ta/ogg/dirdat/to

    table test_ogg.test_ogg;

    add extract extkafka,tranlog,begin now

    若報錯

    ERROR: Could not create checkpoint file /opt/ogg/dirchk/EXTKAFKA.cpe (error 2, No such file or directory).

    執(zhí)行下面的命令再重新添加即可。

    create subdirs add exttrail /data/ogg/dirdat/to,extract extkafka
  • 配置pump進程

    edit param pukafka

    extract pukafka

    passthru

    dynamicresolution

    userid ogg,password ogg

    rmthost Carlota2 mgrport 7809

    rmttrail /data/ogg/dirdat/to

    table test_ogg.test_ogg;

    add extract pukafka,exttrailsource /data/ogg/dirdat/to add rmttrail /data/ogg/dirdat/to,extract pukafka
  • 配置define文件(Oracle與MySQL,Hadoop集群(HDFS,Hive,kafka等)等之間數(shù)據(jù)傳輸可以定義為異構(gòu)數(shù)據(jù)類型的傳輸,故需要定義表之間的關(guān)系映射,)

    edit param test_ogg

    defsfile /data/ogg/dirdef/test_ogg.test_ogg

    userid ogg,password ogg

    table test_ogg.test_ogg;

  • 返回終端執(zhí)行

    ./defgen paramfile dirprm/test_ogg.prm
  • 將生成的/data/ogg/dirdef/test_ogg.test_ogg發(fā)送的目標(biāo)端ogg目錄下的dirdef里:

    scp -r /data/ogg/dirdef/test_ogg.test_ogg root@Carlota2:/opt/ogg/dirdef/

6、OGG目標(biāo)端配置

  • 開啟kafka服務(wù)

    zkServer.sh startkafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
  • ggsic
  • 配置管理器mgr

    edit param mgr

    PORT 7809

    DYNAMICPORTLIST 7810-7909

    AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 *

    PURGEOLDEXTRACTS ./dirdat/,usecheckpoints, minkeepdays 3

  • 配置checkpoint

    edit param ./GLOBALS

    CHECKPOINTTABLE test_ogg.checkpoint

  • 配置replicate進程

    edit param rekafka

    REPLICAT rekafka

    sourcedefs /data/ogg/dirdef/test_ogg.test_ogg

    TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props

    REPORTCOUNT EVERY 1 MINUTES, RATE

    GROUPTRANSOPS 10000

    MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg;

  • 配置kafka.props(去掉注釋)

    cd /opt/ogg/dirprm/ vim kafka.props

    gg.handlerlist=kafkahandler //handler類型
    gg.handler.kafkahandler.type=kafka
    gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //kafka相關(guān)配置
    gg.handler.kafkahandler.topicMappingTemplate=test_ogg //kafka的topic名稱,無需手動創(chuàng)建
    gg.handler.kafkahandler.format=json //傳輸文件的格式,支持json,xml等
    gg.handler.kafkahandler.mode=op //OGG for Big Data中傳輸模式,即op為一次SQL傳輸一次,tx為一次事務(wù)傳輸一次
    gg.classpath=dirprm/:/usr/local/apps/kafka_2.12-2.5.0/libs/:/opt/ogg/:/opt/ogg/lib/

    vim custom_kafka_producer.properties

    bootstrap.servers=192.168.44.129:9092 //kafkabroker的地址
    acks=1
    compression.type=gzip //壓縮類型
    reconnect.backoff.ms=1000 //重連延時
    value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    batch.size=102400
    linger.ms=10000

  • 添加trail文件到replicate進程

    add replicat rekafka exttrail /data/ogg/dirdat/to,checkpointtable test_ogg.checkpoint

7、測試

在源端和目標(biāo)端的OGG命令行下使用start [進程名]的形式啟動所有進程。
啟動順序按照源mgr——目標(biāo)mgr——源extract——源pump——目標(biāo)replicate來完成。
全部需要在ogg目錄下執(zhí)行g(shù)gsci目錄進入ogg命令行。
源端依次是

start mgr start extkafka start pukafka

目標(biāo)端

start mgr start rekafka

GGSCI (Carlota2) 1> info all

Program Status Group Lag at Chkpt Time Since Chkpt

MANAGER RUNNING

REPLICAT RUNNING REKAFKA 00:00:00 00:00:08

GGSCI (Carlota3) 1> info all

Program Status Group Lag at Chkpt Time Since Chkpt

MANAGER RUNNING

EXTRACT RUNNING EXTKAFKA 00:00:00 00:00:00

EXTRACT RUNNING PUKAFKA 00:00:00 00:00:10

現(xiàn)在源端執(zhí)行sql語句

conn test_ogg/test_ogg insert into test_ogg values(1,'test'); commit; update test_ogg set name='zhangsan' where id=1; commit; delete test_ogg where id=1; commit;

查看源端trail文件狀態(tài)

ls -l /data/ogg/dirdat/to*

查看目標(biāo)端trail文件狀態(tài)

ls -l /data/ogg/dirdat/to*

查看kafka是否自動建立對應(yīng)的主題

kafka-topics.sh --list --zookeeper localhost:2181

在列表中顯示有test_ogg則表示沒問題
通過消費者看是否有同步消息

kafka-console-consumer.sh --bootstrap-server Carlota2:9092 --topic test_ogg --from-beginning

{“table”:“TEST_OGG.TEST_OGG”,“op_type”:“I”,“op_ts”:“2020-07-31 13:42:33.072327”,“current_ts”:“2020-07-31T13:42:38.928000”,“pos”:“00000000000000001066”,“after”:{“ID”:1,“NAME”:“test”}}

{“table”:“TEST_OGG.TEST_OGG”,“op_type”:“U”,“op_ts”:“2020-07-31 13:42:46.005763”,“current_ts”:“2020-07-31T13:42:52.201000”,“pos”:“00000000000000001204”,“before”:{},“after”:{“ID”:1,“NAME”:“zhangsan”}}

{“table”:“TEST_OGG.TEST_OGG”,“op_type”:“D”,“op_ts”:“2020-07-31 13:42:57.079268”,“current_ts”:“2020-07-31T13:43:02.231000”,“pos”:“00000000000000001347”,“before”:{“ID”:1}}

總結(jié)

以上是生活随笔為你收集整理的使用ogg实现oracle到kafka的增量数据实时同步的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。