日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 运维知识 > 数据库 >内容正文

数据库

oracle和mysql数据实时同步_异构数据源的CDC实时同步系统——最终选型实战

發(fā)布時(shí)間:2023/12/2 数据库 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 oracle和mysql数据实时同步_异构数据源的CDC实时同步系统——最终选型实战 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

引言:

《異構(gòu)數(shù)據(jù)源的CDC實(shí)時(shí)同步系統(tǒng)》 系列第一篇 (已完成)

《零編碼打造異構(gòu)數(shù)據(jù)實(shí)時(shí)同步系統(tǒng)——異構(gòu)數(shù)據(jù)源CDC之2》 系列第二篇(已完成)

《零編碼打造異構(gòu)數(shù)據(jù)實(shí)時(shí)同步系統(tǒng)——異構(gòu)數(shù)據(jù)源CDC之3》 系列第三篇(已完成)

《異構(gòu)數(shù)據(jù)源的CDC實(shí)時(shí)同步系統(tǒng)——最終選型實(shí)戰(zhàn)》 系列第四篇(已完成)

7.debezium

debezium是由redhat支持的開源分布式CDC系統(tǒng),支持多端數(shù)據(jù)源,如mysql、mongodb、postgresql、oracle、sql server和Cassandra,社區(qū)非常活躍,很多的新功能和新數(shù)據(jù)源都在快速發(fā)展中,源碼地址:https://github.com/debezium/debezium

我們使用debezium主要是看中它支持多數(shù)據(jù)源,同時(shí)與kafka的整合,在CDC領(lǐng)域不能忽略的一個(gè)商用產(chǎn)品是kafka conflent,在它的產(chǎn)品中,連接源端的組件就是debezium,我們一度就想使用這個(gè)商用

組件,但是試用版本僅支持一個(gè)broker,無法在真正的的生產(chǎn)環(huán)境使用,它的優(yōu)勢(shì)在于配置的可視化,后來我們使用kafka eagle來進(jìn)行kafka的管理后,才徹底下定決心自己使用開源版本搞一套。我們最終采用的整體方案是debezium+kafka+kafka-connect-jdbc,管理端使用的kafka eagle.

關(guān)于confluent的資料,網(wǎng)上很多,我們?cè)趯?shí)際配置的過程中也參考了很多它的建議。

注意事項(xiàng):

1)debezium需要設(shè)置的mysql權(quán)限:GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
2)采用阿里云的rds的mysql數(shù)據(jù)源非常坑,默認(rèn)是不開啟SHOW DATABASES權(quán)限的,需要在debezium中單獨(dú)配置屬性database.history.store.only.monitored.tables.ddl:true
3)debezium配合kafaka啟動(dòng)使用properties方式,也就是說第一個(gè)源需要配置為文本模式,后續(xù)可采用動(dòng)態(tài)增加源的方式動(dòng)態(tài)增加,但是文件模式需要為json

8.kafka-connect-jdbc

開源地址:https://github.com/confluentinc/kafka-connect-jdbc,它是confluent開源的兼容jdbc的數(shù)據(jù)庫(kù)同步數(shù)據(jù)的kafka connect連接器。

這個(gè)組件支持的目的端的源非常多,理論上有java客戶端的基本都支持,所以基本上可以涵蓋你能用到的絕大多數(shù)數(shù)據(jù)源,它的延遲非常好,比之前的bireme好太多了,畢竟是國(guó)外大廠支持的組件,是國(guó)內(nèi)小公司開源組件所不能比擬的。

9.最終選型方案

上圖為我們最終確定的方案,在實(shí)際生產(chǎn)中,除了直接DB層級(jí)的數(shù)據(jù)實(shí)時(shí)同步外,我們還有一套pulsar的比較靈活的數(shù)據(jù)接口方案,不在此次討論范圍之內(nèi),也就是說我們最終實(shí)現(xiàn)了基于DB和業(yè)務(wù)層級(jí)的實(shí)時(shí)數(shù)據(jù)同步方案。

業(yè)界其他公司的CDC方案:

=======.實(shí)際生產(chǎn)配置過程:==========

1.kafka安裝配置,以standalone為例

需要單獨(dú)說明的是:因?yàn)間pdb6目前還不支持upsert模式,debezium的新增和更新均會(huì)導(dǎo)致一條新增加的完整數(shù)據(jù)到kafka,默認(rèn)kafka按批提交的模式會(huì)造成gpdb6的主鍵沖突,需要修改模式為逐條應(yīng)用,同時(shí)配合自己?jiǎn)为?dú)寫的check程序進(jìn)行offset錯(cuò)誤的自動(dòng)修正

#1)安裝kafka,注意2.30有個(gè)bugtar -zxvf kafka_2.12-2.4.0.tgzcd kafka_2.12-2.4.0Vim config/server.properties #單機(jī)版只需要消息存放路徑即可log.dirs=/opt/kafka_2.12-2.4.0/kafka-logs#增加可以刪除topicdelete.topic.enable=true#保留日志大小:1GB,不設(shè)置的話會(huì)日志撐爆log.retention.bytes=1073741824mkdir -p /opt/kafka_2.12-2.4.0/kafka-logs#修改kafka的connect-standalone.properties設(shè)置為逐條應(yīng)用consumer.max.poll.records=1#2)修改內(nèi)置zk的配置vim config/zookeeper.properties#制定zk元數(shù)據(jù)存放路徑dataDir=/opt/kafka_2.12-2.4.0/zdatamkdir -p /opt/kafka_2.12-2.4.0/zdata#3)啟動(dòng)服務(wù),先啟動(dòng)zk再啟動(dòng)kafkacd /opt/kafka_2.12-2.4.0/nohup bin/zookeeper-server-start.sh config/zookeeper.properties &nohup bin/kafka-server-start.sh config/server.properties —加守護(hù)進(jìn)程啟動(dòng)bin/zookeeper-server-start.sh -daemon config/zookeeper.propertiesbin/kafka-server-start.sh -daemon config/server.properties

2.kafka基本命令

#4)查看服務(wù)是夠啟動(dòng) jps#5)創(chuàng)建一個(gè)測(cè)試用的topic:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test#查詢topic列表:bin/kafka-topics.sh --list --zookeeper localhost:2181#查看topic信息:bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test #刪除topic(只會(huì)刪除元數(shù)據(jù)):配置上面的delete.topic.enable=true后可生效bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test#手動(dòng)刪除文件:bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic test./kafka-topics.sh --zookeeper 192.168.6.42:2181 --describe --topic itslawnode1./kafka-consumer-groups.sh --describe --group test-consumer-group --zookeeper localhost:2181 #查看offset信息bin/kafka-consumer-groups.sh --bootstrap-server 192.168.6.42:9092 --listbin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group 0#查看和刪除群組:bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --listbin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group connect-sink-judge-up#從開始的消費(fèi)信息: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test#6)創(chuàng)建控制臺(tái)生產(chǎn)者生產(chǎn)數(shù)據(jù)bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test#7)新開一個(gè)進(jìn)程創(chuàng)建消費(fèi)者數(shù)據(jù)bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

3.debezium安裝配置

#下載debezium-connector-mysql,將文件中的jar包c(diǎn)opy到kafka的libs目錄cd /opt/kafka_2.12-2.4.0/tableconfig #tableconfig是新建目錄,存放配置文件#######第一個(gè)啟動(dòng)的properties文件格式############name=authorization-mysql-connector-new-01connector.class=io.debezium.connector.mysql.MySqlConnectordatabase.hostname=mysql源IPdatabase.port=3306database.user=賬號(hào)database.password=密碼database.server.id=1database.server.name=debeziumdatabase.whitelist=platform_authorizationdatabase.serverTimezone=UTCtable.whitelist=platform_authorization.lawyer_authorization,platform_authorization.lawyer_authorization_recorddatabase.history.kafka.bootstrap.servers=localhost:9092database.history.kafka.topic=auth.platform_authorizationinclude.schema.changes=false#使用table名作為topic名字,因?yàn)閙achine.db.table默認(rèn)topictransforms=routetransforms.route.type=org.apache.kafka.connect.transforms.RegexRoutertransforms.route.regex=([^.]+).([^.]+).([^.]+)transforms.route.replacement=$3#不進(jìn)行初始化,只獲取當(dāng)前的schema,初始化采用rds_dbsync比較方便,實(shí)際測(cè)試比init方式快幾十倍,因?yàn)榇颂幨侵饤l應(yīng)用的snapshot.mode=schema_only##########json格式的文件#########{"name":"hanukkah-mysql-connector","config": {"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"mysql主機(jī)名","database.port":"3306","database.user":"用戶名","database.password":"密碼","database.server.id":"1","database.server.name":"debezium","database.whitelist":"hanukkah","database.serverTimezone":"UTC","table.whitelist":"hanukkah.cooperation_lawyer","database.history.kafka.bootstrap.servers":"localhost:9092","database.history.kafka.topic":"mysql1.hanukkah","include.schema.changes":"false","transforms":"route","transforms.route.type":"org.apache.kafka.connect.transforms.RegexRouter","transforms.route.regex":"([^.]+).([^.]+).([^.]+)","transforms.route.replacement":"$3","snapshot.mode":"schema_only"}}

4.sink配置

#首先下載kafka-connect-jdbc-5.3.1.jar并防止到kafka的libs目錄即可{ "name": "sink-cooperation_lawyer-ins", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "topics": "cooperation_lawyer", "connection.url": "jdbc:postgresql://目的IP:5432/目的DB?user=用戶&password=密碼&stringtype=unspecified¤tSchema=當(dāng)前schema名", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true", "insert.mode": "insert", "delete.enabled": "true","table.name.format": "platform.cooperation_lawyer", "pk.fields": "id", "pk.mode": "record_key" }}

需要額外說明的是:在目的是greenplum數(shù)倉(cāng)環(huán)境下:
1)如果mysql源端字段類型是timestamp,則需要在gpdb端配置字段類型為timestamptz后無需額外配置sink項(xiàng)
2)如果mysql源端字段類型是datetime,則目的端字段類型需要配置為timestamp,同時(shí)需要sink文件中增補(bǔ)TimestampConverter配置項(xiàng),有幾個(gè)datetime字段配置幾個(gè)配置項(xiàng)
3)如果mysql源端datetime配置了精度,需要debezium配置增加time.precision.mode=connect
4) "auto.evolve": "true" 則源端表結(jié)構(gòu)變更后會(huì)自動(dòng)在目的端創(chuàng)建對(duì)應(yīng)數(shù)據(jù)結(jié)構(gòu) "auto.create": "true" 則源端新增表后會(huì)自動(dòng)同步到目的端

{ "name": "sink-pa_course-ins", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "topics": "pa_course", "connection.url": "jdbc:postgresql://目的IP:5432/目的DB?user=用戶&password=密碼&stringtype=unspecified¤tSchema=當(dāng)前schema名", "transforms": "unwrap,TimestampConverter", "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope", "transforms.unwrap.drop.tombstones": "false", "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss", "transforms.TimestampConverter.field": "create_time", "transforms.TimestampConverter.target.type": "string", "auto.create": "true", "auto.evolve": "true", "insert.mode": "insert", "delete.enabled": "true", "pk.fields": "id", "pk.mode": "record_key" }}

5.啟動(dòng)服務(wù)

#啟動(dòng)kafka,進(jìn)程多了Kafka和QuorumPeerMainbin/zookeeper-server-start.sh -daemon config/zookeeper.propertiesbin/kafka-server-start.sh -daemon config/server.properties#啟動(dòng)第一個(gè)sourcebin/connect-standalone.sh config/connect-standalone.properties tableconfig/paod-source.properties 1>connector-logs/connector.log 2>&1 增補(bǔ)其他sourcecurl -X POST -H "Content-Type:application/json" -d @tableconfig/authorization-source.json http://localhost:8083/connectors/#啟動(dòng)sinkcurl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @tableconfig/paod-base-ins.properties?...#查看所有的connectors: curl -X GET http://127.0.0.1:8083/connectors

6.使用eagle進(jìn)行topic狀態(tài)查看和管理

關(guān)于kafka eagle的下載安裝,特別簡(jiǎn)單,就不單獨(dú)說明了,這里僅貼出效果圖

從上圖非常清楚的能看到哪些topic是有問題的(紅色),絕大多數(shù)問題在于offset的錯(cuò)誤導(dǎo)致的,在實(shí)際使用中我們通過一個(gè)簡(jiǎn)單python守護(hù)進(jìn)程的代碼進(jìn)行了管理

import requestsimport loggingimport psycopg2import jsonimport reimport time# 獲取數(shù)倉(cāng)連接def get_gp_conn(): conn = psycopg2.connect(host="192.168.2.175", port=5432, user="name", password="password",dbname='datawarehouse') return conn'''刪除目的端的主鍵ID'''def del_dup_id(tablefullname,dup_id): db = get_gp_conn() cursor = db.cursor() sql = "delete from "+ tablefullname +" where id='" + dup_id+"'" cursor.execute(sql) db.commit() cursor.close() db.close()'''重啟sink'''def restart_sink(sinkname,configname): '''delurl = 'http://127.0.0.1:8083/connectors/'+ sinkname del_res = requests.delete(delurl) print("del resp:",del_res) url = 'http://127.0.0.1:8083/connectors/' headers = 'Content-Type:application/json,Accept:application/json' datas = 'tableconfig/' + configname start_res = requests.post(url,data=datas,headers=headers) print("start resp:",start_res) #checkurl = 'http://127.0.0.1:8083/connectors/'+ sinkname +'/tasks/0/status' ''' url = 'http://127.0.0.1:8083/connectors/'+ sinkname +'/tasks/0/restart' requests.post(url)'''檢測(cè)任務(wù)狀態(tài)'''def check_sink_status(sinkname,tablefullname,configname): sinkurl = 'http://127.0.0.1:8083/connectors/'+ sinkname +'/tasks/0/status' print(sinkurl) resp = requests.get(sinkurl) status = json.loads(resp.text) state = status['state'] if state == 'FAILED': trace = status['trace'] pattern = re.compile(r'Key (id)=((.+)) already exists') search = re.search(pattern, trace) #print(search) if search: del_id = search.group(1) print('duplicate key is {}, now to del this record of target database'.format(del_id)) del_dup_id(tablefullname,del_id) restart_sink(sinkname,configname)'''獲取任務(wù)列表'''def get_sink_list(): conn = get_gp_conn() cur = conn.cursor() cur.execute("select taskname,tableschema,tablename,configname from platform.tasklist where tablename is not null") print("current time is:",time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))) rows = cur.fetchall() for row in rows: taskname = row[0] schema = row[1] tablename =row[2] configname = row[3] tablefullname = schema +'.'+tablename check_sink_status(taskname,tablefullname,configname) cur.close() conn.close()if __name__ == '__main__': get_sink_list()

同時(shí)為了避免standalone進(jìn)程的異常終止,我們用shell的守護(hù)進(jìn)行進(jìn)行了監(jiān)控

#! /bin/bashfunction check(){ count=`ps -ef |grep $1 |grep -v "grep" |wc -l` #echo $count if [ 0 == $count ];then time=$(date "+%Y-%m-%d %H:%M:%S") echo "standalone restart at:${time}" cd /opt/kafka_2.12-2.4.0/ bin/connect-standalone.sh config/connect-standalone.properties tableconfig/paod-source.properties 1>>connector-logs/connector.log 2>&1 & sleep 60scurl -X POST -H "Content-Type:application/json" -d @tableconfig/platform-source.json http://localhost:8083/connectors/ ...... sleep 10s curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @tableconfig/paod-base-ins.properties ?..... fi } check ConnectStandalone

另外,在實(shí)際運(yùn)行過程中會(huì)出現(xiàn)offset錯(cuò)誤的情況,極其特殊情況下使用上面的方法無法快速解決問題,建議使用kafkacat查看詳細(xì)信息,人為跳過offset,具體細(xì)節(jié)不再贅述。

如喜歡此專題,請(qǐng)關(guān)注并提問,技術(shù)人驅(qū)動(dòng)自身的是永不停歇的渴望。

總結(jié)

以上是生活随笔為你收集整理的oracle和mysql数据实时同步_异构数据源的CDC实时同步系统——最终选型实战的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。