
Real-time data synchronization with Debezium (Oracle-focused)

Published: 2024/3/13

Real-time data synchronization with Debezium

  • Download links for everything required
  • 1. Download zookeeper-3.4.10
  • 2. Download kafka_2.13-2.8.0
  • 3. Download the Kafka connector: version 1.6+ is recommended so that DDL can be captured
  • 4. Install debezium-connector-oracle
    • 4.1 Download debezium-connector-oracle-1.6.0.Final-plugin.tar.gz and extract it on your own server; my install directory is /home/debezium/
    • 4.2 Copy all jars under the debezium-connector-oracle directory into ${KAFKA_HOME}/libs
    • 4.3 Download the Oracle client and copy its jars into ${KAFKA_HOME}/libs
  • 5. Kafka configuration changes: cluster-style configuration, even though this Kafka is not actually a cluster
  • 6. Start ZooKeeper, Kafka, and the connect-distributed environment
    • 6.1 Enter the zookeeper directory
    • 6.2 Enter the kafka directory
    • 6.3 Start connect-distributed with the environment configuration
  • 7. Submit the Oracle connector to monitor the Oracle database
  • 8. List the registered Kafka connectors
  • 9. Check a Kafka connector's status
  • 10. View a Kafka connector's configuration
  • 11. View the topics in Kafka
  • 12. Create tables in the Flink SQL client and test
  • Appendix: enabling Oracle archive logging
    • Enable Oracle archive logging
    • Create a new tablespace and the dbzuser account, and grant the required privileges
    • Not needed for now; the official docs require it, but I have not yet understood what it is for
  • Commands for viewing Kafka topics and message contents
    • 1. List topics (from the kafka directory):
    • 2. Read a topic's contents:

Download links for everything required

https://download.csdn.net/download/u010978399/21733452

1. Download zookeeper-3.4.10

https://blog.csdn.net/She_lock/article/details/80435176?utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7Edefault-8.control&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7Edefault-8.control

2. Download kafka_2.13-2.8.0

Kafka installation reference:

https://blog.csdn.net/weixin_39984161/article/details/91971731?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522161959594516780262520102%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=161959594516780262520102&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduend~default-1-91971731.pc_search_result_before_js&utm_term=linux%E5%AE%89%E8%A3%85kafka

3. Download the Kafka connector: version 1.6+ is recommended so that DDL can be captured

debezium-connector-mysql-1.6.0.Final-plugin.tar.gz

https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.6.0.Final/debezium-connector-mysql-1.6.0.Final-plugin.tar.gz

debezium-connector-postgres-1.6.0.Final-plugin.tar.gz

https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.6.0.Final/debezium-connector-postgres-1.6.0.Final-plugin.tar.gz

debezium-connector-oracle-1.6.0.Final-plugin.tar.gz

https://repo1.maven.org/maven2/io/debezium/debezium-connector-oracle/1.6.0.Final/debezium-connector-oracle-1.6.0.Final-plugin.tar.gz

4. Install debezium-connector-oracle

4.1 Download debezium-connector-oracle-1.6.0.Final-plugin.tar.gz and extract it on your own server; my install directory is /home/debezium/

4.2 Copy all jars under the debezium-connector-oracle directory into ${KAFKA_HOME}/libs

4.3 Download the Oracle client and copy its jars into ${KAFKA_HOME}/libs

Client download link:

https://download.oracle.com/otn_software/linux/instantclient/211000/instantclient-basic-linux.x64-21.1.0.0.0.zip

5. Kafka configuration changes: cluster-style configuration, even though this Kafka is not actually a cluster

Kafka install directory: /home/kafka/kafka_2.13-2.8.0/

For standalone deployment, edit [connect-standalone.properties]
For distributed deployment, edit [connect-distributed.properties]

bootstrap.servers=192.168.1.121:9092
plugin.path=/home/debezium/debezium-connector-oracle
group.id=amirdebezium

# Serialization format for messages Kafka Connect writes to Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# The three internal topics Kafka Connect needs
config.storage.topic=amir-connect-configs
offset.storage.topic=amir-connect-offsets
status.storage.topic=amir-connect-statuses

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

offset.flush.interval.ms=10000
rest.advertised.host.name=192.168.1.121
cleanup.policy=compact
rest.host.name=192.168.1.121
rest.port=8085

6. Start ZooKeeper, Kafka, and the connect-distributed environment

6.1 Enter the zookeeper directory

Start ZooKeeper:

sh zkServer.sh start

Stop ZooKeeper:

sh zkServer.sh stop

6.2 Enter the kafka directory

Start Kafka:

/home/kafka/kafka_2.13-2.8.0/bin/kafka-server-start.sh /home/kafka/kafka_2.13-2.8.0/config/server.properties &

Stop Kafka (the stop script takes no config argument):

/home/kafka/kafka_2.13-2.8.0/bin/kafka-server-stop.sh

6.3 Start connect-distributed with the environment configuration

Load the environment:

export KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/home/kafka/kafka_2.13-2.8.0/config/connect-log4j.properties

Start:

./bin/connect-distributed.sh /home/kafka/kafka_2.13-2.8.0/config/connect-distributed.properties &

Be sure to append & at the end so the process runs in the background; otherwise the service dies as soon as the terminal session is closed.

7. Submit the Oracle connector to monitor the Oracle database

Run this command directly on the Linux machine:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://172.16.50.22:8085/connectors/ -d '{
  "name": "debezium-oracle",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.server.name": "XE",
    "database.hostname": "172.16.50.239",
    "database.port": "1521",
    "database.user": "amir",
    "database.password": "amir",
    "database.dbname": "XE",
    "database.schema": "MSCDW",
    "database.connection.adapter": "logminer",
    "database.tablename.case.insensitive": "true",
    "table.include.list": "MSCDW.*",
    "snapshot.mode": "initial",
    "schema.include.list": "MSCDW",
    "database.history.kafka.bootstrap.servers": "172.16.50.22:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}'
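The same registration can be scripted instead of pasted into curl. The sketch below (Python stdlib only; the host, port, and credentials are the example values from this walkthrough, not fixed names) builds the payload and posts it to the Kafka Connect REST API:

```python
import json
import urllib.request

# Connect REST endpoint from the steps above; adjust to your host/port.
CONNECT_URL = "http://172.16.50.22:8085/connectors/"

def oracle_connector_payload(name="debezium-oracle"):
    """Build the same registration body as the curl command above."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.oracle.OracleConnector",
            "tasks.max": "1",
            "database.server.name": "XE",
            "database.hostname": "172.16.50.239",
            "database.port": "1521",
            "database.user": "amir",
            "database.password": "amir",
            "database.dbname": "XE",
            "schema.include.list": "MSCDW",
            "table.include.list": "MSCDW.*",
            "snapshot.mode": "initial",
            "database.connection.adapter": "logminer",
            "database.history.kafka.bootstrap.servers": "172.16.50.22:9092",
            "database.history.kafka.topic": "schema-changes.inventory",
        },
    }

def register(payload):
    """POST the connector config to Kafka Connect; returns the HTTP status."""
    req = urllib.request.Request(
        CONNECT_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status
```

A successful registration returns HTTP 201; re-registering an existing name returns 409.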

8. List the registered Kafka connectors

URL:

172.16.50.22:8085/connectors

9. Check a Kafka connector's status

URL:

172.16.50.22:8085/connectors/debezium-oracle/status

Here debezium-oracle is the connector name returned by the previous step.
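For scripted health checks it is handy to parse the /status response. A minimal sketch, assuming a response shaped like the Kafka Connect REST API's status document (the sample below is hand-written, not captured from a live cluster):

```python
import json

def connector_is_healthy(status_json: str) -> bool:
    """True when the connector and every task report state RUNNING."""
    status = json.loads(status_json)
    if status["connector"]["state"] != "RUNNING":
        return False
    return all(task["state"] == "RUNNING" for task in status["tasks"])

# Sample shaped like a GET /connectors/debezium-oracle/status response
sample = json.dumps({
    "name": "debezium-oracle",
    "connector": {"state": "RUNNING", "worker_id": "172.16.50.22:8085"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "172.16.50.22:8085"}],
})
```

A connector can be RUNNING while one of its tasks has FAILED, which is why both levels are checked.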

10. View a Kafka connector's configuration

URL:

172.16.50.22:8085/connectors/debezium-oracle/config

11. View the topics in Kafka

Once the environment is up, a dedicated topic is created for each table by default, as shown in the screenshot (I browsed them with the Kafka Tool GUI). Note that the topic shown is XE.SCOTT.DEPT rather than XE.MSCDW.CONFIG. Following the steps above it should have been MSCDW, but I forgot to include this part when first writing the document; by the time I added it, my configuration was capturing the SCOTT schema's DDL, and I did not bother to switch back.
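Debezium derives these per-table topic names as serverName.schemaName.tableName, where serverName comes from the connector's database.server.name property. A one-line sketch of the convention:

```python
def change_topic(server_name: str, schema: str, table: str) -> str:
    """Debezium's default per-table topic name: serverName.schemaName.tableName."""
    return f"{server_name}.{schema}.{table}"
```

With database.server.name=XE from the connector config above, the MSCDW.CONFIG table maps to topic XE.MSCDW.CONFIG.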

12. Create tables in the Flink SQL client and test

CREATE TABLE sinkMysqlConfigTable (
  ID STRING,
  CRON STRING
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://IP:3306/admin',
  'connector.table' = 'config',
  'connector.username' = 'root',
  'connector.password' = 'dhcc@2020',
  'connector.write.flush.max-rows' = '1'
);

CREATE TABLE createOracleConfigTable (
  id STRING,
  cron STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'XE.MSCDW.CONFIG',
  'properties.bootstrap.servers' = '172.16.50.22:9092',
  'debezium-json.schema-include' = 'false',
  'properties.group.id' = 'amirdebezium',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);

Appendix: enabling Oracle archive logging

# Adjust as required, otherwise it will error out

alter system set db_recovery_file_dest_size=5G;

Enable Oracle archive logging

# Enable supplemental (row-level) logging for all columns

alter database add supplemental log data (all) columns;
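The heading mentions enabling archive logging, but the supplemental-logging statement above is a separate requirement; the database must also be in ARCHIVELOG mode for LogMiner to work. A typical sequence, run as SYSDBA (sketch only; check the mode first with ARCHIVE LOG LIST, since the restart is disruptive):

```sql
-- Put the database into ARCHIVELOG mode (requires a restart)
SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;
-- Verify the current mode
ARCHIVE LOG LIST;
```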

Create a new tablespace and the dbzuser account, and grant the required privileges

CREATE TABLESPACE LOGMINER_TBS DATAFILE '/home/oracle/app/oracle/oradata/amir/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO dbzuser;
GRANT SELECT ON V_$DATABASE TO dbzuser;
GRANT FLASHBACK ANY TABLE TO dbzuser;
GRANT SELECT ANY TABLE TO dbzuser;
GRANT SELECT_CATALOG_ROLE TO dbzuser;
GRANT EXECUTE_CATALOG_ROLE TO dbzuser;
GRANT SELECT ANY TRANSACTION TO dbzuser;
GRANT SELECT ANY DICTIONARY TO dbzuser;
GRANT CREATE TABLE TO dbzuser;
GRANT ALTER ANY TABLE TO dbzuser;
GRANT LOCK ANY TABLE TO dbzuser;
GRANT CREATE SEQUENCE TO dbzuser;
GRANT EXECUTE ON DBMS_LOGMNR TO dbzuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO dbzuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO dbzuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO dbzuser;
GRANT SELECT ON V_$LOGFILE TO dbzuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO dbzuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO dbzuser;

Not needed for now; the official documentation requires it, but I have not yet understood what it is for.

CREATE USER debezium IDENTIFIED BY dbz DEFAULT TABLESPACE USERS QUOTA UNLIMITED ON USERS;
GRANT CONNECT TO debezium;
GRANT CREATE SESSION TO debezium;
GRANT CREATE TABLE TO debezium;
GRANT CREATE SEQUENCE TO debezium;
ALTER USER debezium QUOTA 100M ON USERS;

Commands for viewing Kafka topics and message contents

1. List topics (from the kafka directory):

bin/kafka-topics.sh --list --zookeeper localhost:2181

2. Read a topic's contents:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicName --from-beginning
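The messages printed by the console consumer are Debezium change events in JSON. A small sketch of extracting the operation type and row image from one event (the sample is hand-written but follows the debezium-json envelope; with schemas.enable=false the payload wrapper may be absent, which the code handles):

```python
import json

def summarize_event(raw: str):
    """Return (op, after_image) from a Debezium change event;
    op is 'c' (insert), 'u' (update), 'd' (delete), or 'r' (snapshot read)."""
    event = json.loads(raw)
    payload = event.get("payload", event)  # envelope may omit the schema part
    return payload["op"], payload.get("after")

# Hand-written sample shaped like an event on the XE.MSCDW.CONFIG topic
sample = json.dumps({
    "payload": {
        "op": "c",
        "before": None,
        "after": {"ID": "1", "CRON": "0 0/5 * * * ?"},
        "source": {"schema": "MSCDW", "table": "CONFIG"},
    }
})
```

For delete events, "after" is null and the old row is carried in "before" instead.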
