日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

基于debezium实时数据同步(Oracle为主)

發(fā)布時間:2024/3/13 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 基于debezium实时数据同步(Oracle为主) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

基于debezium實時數(shù)據(jù)同步

  • 全部需要下載的內(nèi)容鏈接
  • 1、下載zookeeper-3.4.10
  • 2、下載kafka_2.13-2.8.0
  • 3、下載Kafka Connector:建議使用1.6以上版本可以對ddl進行捕獲
  • 4、安裝debezium-connector-oracle
    • 4.1下載debezium-connector-oracle-1.6.0.Final-plugin.tar.gz并解壓,安裝在自己的服務器,我的安裝目錄是/home/debezium/
    • 4.2、將debezium-connector-oracle 目錄下得jar包都拷貝一份到${KAFKA_HOME}/libs中
    • 4.3、Oracle需要下載客戶端并把jar包復制到${KAFKA_HOME}/libs
  • 5、kafka環(huán)境修改,使用集群方式配置,但其實kafka非集群搭建
  • 6、啟動zookeeper、kafka,connect-distributed環(huán)境
    • 6.1.進入zookeeper目錄
    • 6.2.進入kafka目錄
    • 6.3.以環(huán)境配置方式啟動connect-distributed
  • 7、提交Oracle-connector,監(jiān)視Oracle數(shù)據(jù)庫
  • 8、查看創(chuàng)建的kafka connector列表
  • 9、查看創(chuàng)建的kafka connector狀態(tài)
  • 10、查看創(chuàng)建的kafka connector配置
  • 11、查看kafka中topic
  • 12、flinksqlclient創(chuàng)建表并測試
  • 附上:Oracle的歸檔開啟
    • Oracle 開啟歸檔日志
    • 創(chuàng)建 新得表空間與dbzuser,并賦予相應得權限
    • 暫時可以不用,官網(wǎng)有做要求,暫時沒明白有什么用
  • kafka查看topic和消息內(nèi)容命令
    • 1、查詢topic,進入kafka目錄:
    • 2、查詢topic內(nèi)容:

全部需要下載的內(nèi)容鏈接

https://download.csdn.net/download/u010978399/21733452

1、下載zookeeper-3.4.10

https://blog.csdn.net/She_lock/article/details/80435176?utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7Edefault-8.control&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7Edefault-8.control

2、下載kafka_2.13-2.8.0

kafka安裝參考:

https://blog.csdn.net/weixin_39984161/article/details/91971731?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522161959594516780262520102%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=161959594516780262520102&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduend~default-1-91971731.pc_search_result_before_js&utm_term=linux%E5%AE%89%E8%A3%85kafka

3、下載Kafka Connector:建議使用1.6以上版本可以對ddl進行捕獲

debezium-connector-mysql-1.6.0.Final-plugin.tar.gz

https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.6.0.Final/debezium-connector-mysql-1.6.0.Final-plugin.tar.gz

debezium-connector-postgres-1.6.0.Final-plugin.tar.gz

https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.6.0.Final/debezium-connector-postgres-1.6.0.Final-plugin.tar.gz

debezium-connector-oracle-1.6.0.Final-plugin.tar.gz

https://repo1.maven.org/maven2/io/debezium/debezium-connector-oracle/1.6.0.Final/debezium-connector-oracle-1.6.0.Final-plugin.tar.gz

4、安裝debezium-connector-oracle

4.1下載debezium-connector-oracle-1.6.0.Final-plugin.tar.gz并解壓,安裝在自己的服務器,我的安裝目錄是/home/debezium/

4.2、將debezium-connector-oracle 目錄下得jar包都拷貝一份到${KAFKA_HOME}/libs中

4.3、Oracle需要下載客戶端并把jar包復制到${KAFKA_HOME}/libs

客戶端下載地址:

https://download.oracle.com/otn_software/linux/instantclient/211000/instantclient-basic-linux.x64-21.1.0.0.0.zip

5、kafka環(huán)境修改,使用集群方式配置,但其實kafka非集群搭建

kafka安裝目錄:/home/kafka/kafka_2.13-2.8.0/

單機部署修改 [connect-standalone.properties]
集群部署修改 [connect-distributed.properties]

bootstrap.servers=192.168.1.121:9092 plugin.path=/home/debezium/debezium-connector-oraclegroup.id=amirdebezium // kafka connect內(nèi)部信息保存到kafka時消息的序列化方式 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter.schemas.enable=falseinternal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false// kafka connect內(nèi)部需要用到的三個topic config.storage.topic=amir-connect-configs offset.storage.topic=amir-connect-offsets status.storage.topic=amir-connect-statusesconfig.storage.replication.factor=1 offset.storage.replication.factor=1 status.storage.replication.factor=1offset.flush.interval.ms=10000 rest.advertised.host.name=192.168.1.121cleanup.policy=compact rest.host.name=192.168.1.121 rest.port=8085

6、啟動zookeeper、kafka,connect-distributed環(huán)境

6.1.進入zookeeper目錄

啟動zookeeper

sh zkServer.sh start

停止zookeeper

sh zkServer.sh stop

6.2.進入kafka目錄

啟動kafka

/home/kafka/kafka_2.13-2.8.0/bin/kafka-server-start.sh /home/kafka/kafka_2.13-2.8.0/config/server.properties &

關閉kafka

/home/kafka/kafka_2.13-2.8.0/bin/kafka-server-stop.sh /home/kafka/kafka_2.13-2.8.0/config/server.properties &

6.3.以環(huán)境配置方式啟動connect-distributed

加載環(huán)境

export KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/home/kafka/kafka_2.13-2.8.0/config/connect-log4j.properties

啟動

./bin/connect-distributed.sh /home/kafka/kafka_2.13-2.8.0/config/connect-distributed.properties &

末尾 一定要加上符號&是為了后臺運行,這樣就不會頁面一關,服務就沒有了

7、提交Oracle-connector,監(jiān)視Oracle數(shù)據(jù)庫

這個就是在liunx里面,命令直接貼進去

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://172.16.50.22:8085/connectors/ -d ' { "name": "debezium-oracle", "config": { "connector.class" : "io.debezium.connector.oracle.OracleConnector", "tasks.max" : "1", "database.server.name" : "XE", "database.hostname" : "172.16.50.239", "database.port" : "1521", "database.user" : "amir", "database.password" : "amir", "database.dbname" : "XE", "database.schema" : "MSCDW", "database.connection.adapter": "logminer", "database.tablename.case.insensitive": "true", "table.include.list" : "MSCDW.*", "snapshot.mode" : "initial", "schema.include.list" : "MSCDW", "database.history.kafka.bootstrap.servers" : "172.16.50.22:9092", "database.history.kafka.topic": "schema-changes.inventory" } }'

8、查看創(chuàng)建的kafka connector列表

鏈接:

172.16.50.22:8085/connectors

9、查看創(chuàng)建的kafka connector狀態(tài)

鏈接:

172.16.50.22:8085/connectors/debezium-oracle/status

這里的debezium-oracle是上一步查出來的名稱

10、查看創(chuàng)建的kafka connector配置

鏈接:

172.16.50.22:8085/connectors/debezium-oracle/config

11、查看kafka中topic

當環(huán)境搭建好之后,默認為每個表創(chuàng)建一個屬于自己的主題,如圖所示,小編這里使用的kafka Tool工具查看,注意這里的主題為XE.SCOTT.DEPT,而非XE.MSCDW.CONFIG,其實按照上述步驟應該是MSCDW,但因為在寫文檔的時候忘記放這塊的內(nèi)容,是后來才發(fā)現(xiàn)補的,補的時候配置是監(jiān)聽SCOTT庫的DDL,就懶的換了。

12、flinksqlclient創(chuàng)建表并測試

CREATE TABLE sinkMysqlConfigTable ( ID STRING, CRON STRING ) WITH ( ‘connector.type’ = ‘jdbc’, ‘connector.url’ = ‘jdbc:mysql://IP:3306/admin’, ‘connector.table’ = ‘config’, ‘connector.username’ = ‘root’, ‘connector.password’ = ‘dhcc@2020, ‘connector.write.flush.max-rows’ =1);CREATE TABLE createOracleConfigTable ( id STRING, cron STRING ) WITH ( ‘connector’ = ‘kafka’, ‘topic’ =XE.MSCDW.CONFIG, ‘properties.bootstrap.servers’ =172.16.50.22:9092, ‘debezium-json.schema-include’ =false, ‘properties.group.id’ = ‘a(chǎn)mirdebezium’, ‘scan.startup.mode’ = ‘earliest-offset’, ‘value.format’ = ‘debezium-json’ );

附上:Oracle的歸檔開啟

#按要求修改,不然會報錯

alter system set db_recovery_file_dest_size=5G;

Oracle 開啟歸檔日志

#開啟行模式

alter database add supplemental log data (all) columns;

創(chuàng)建 新得表空間與dbzuser,并賦予相應得權限

CREATE TABLESPACE LOGMINER_TBS DATAFILE '/home/oracle/app/oracle/oradata/amir/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED; CREATE USER dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS; GRANT CREATE SESSION TO dbzuser; GRANT SELECT ON V_$DATABASE TO dbzuser; GRANT FLASHBACK ANY TABLE TO dbzuser; GRANT SELECT ANY TABLE TO dbzuser; GRANT SELECT_CATALOG_ROLE TO dbzuser; GRANT EXECUTE_CATALOG_ROLE TO dbzuser; GRANT SELECT ANY TRANSACTION TO dbzuser; GRANT SELECT ANY DICTIONARY TO dbzuser;GRANT CREATE TABLE TO dbzuser; GRANT ALTER ANY TABLE TO dbzuser; GRANT LOCK ANY TABLE TO dbzuser; GRANT CREATE SEQUENCE TO dbzuser;GRANT EXECUTE ON DBMS_LOGMNR TO dbzuser; GRANT EXECUTE ON DBMS_LOGMNR_D TO dbzuser; GRANT SELECT ON V_$LOGMNR_LOGS to dbzuser; GRANT SELECT ON V_$LOGMNR_CONTENTS TO dbzuser; GRANT SELECT ON V_$LOGFILE TO dbzuser; GRANT SELECT ON V_$ARCHIVED_LOG TO dbzuser; GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO dbzuser;

暫時可以不用,官網(wǎng)有做要求,暫時沒明白有什么用

CREATE USER debezium IDENTIFIED BY dbz DEFAULT TABLESPACE USERS QUOTA UNLIMITED ON USERS; GRANT CONNECT TO debezium; GRANT CREATE SESSION TO debezium; GRANT CREATE TABLE TO debezium; GRANT CREATE SEQUENCE to debezium; ALTER USER debezium QUOTA 100M on users;

kafka查看topic和消息內(nèi)容命令

1、查詢topic,進入kafka目錄:

bin/kafka-topics.sh --list --zookeeper localhost:2181

2、查詢topic內(nèi)容:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicName --from-beginning

總結

以上是生活随笔為你收集整理的基于debezium实时数据同步(Oracle为主)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。