oracle和mysql数据实时同步_异构数据源的CDC实时同步系统——最终选型实战
引言:
《異構數據源的CDC實時同步系統》 系列第一篇 (已完成)
《零編碼打造異構數據實時同步系統——異構數據源CDC之2》 系列第二篇(已完成)
《零編碼打造異構數據實時同步系統——異構數據源CDC之3》 系列第三篇(已完成)
《異構數據源的CDC實時同步系統——最終選型實戰》 系列第四篇(已完成)
7.debezium
debezium是由redhat支持的開源分布式CDC系統,支持多端數據源,如mysql、mongodb、postgresql、oracle、sql server和Cassandra,社區非常活躍,很多的新功能和新數據源都在快速發展中,源碼地址:https://github.com/debezium/debezium
我們使用debezium主要是看中它支持多數據源,同時與kafka的整合,在CDC領域不能忽略的一個商用產品是kafka conflent,在它的產品中,連接源端的組件就是debezium,我們一度就想使用這個商用
組件,但是試用版本僅支持一個broker,無法在真正的的生產環境使用,它的優勢在于配置的可視化,后來我們使用kafka eagle來進行kafka的管理后,才徹底下定決心自己使用開源版本搞一套。我們最終采用的整體方案是debezium+kafka+kafka-connect-jdbc,管理端使用的kafka eagle.
關于confluent的資料,網上很多,我們在實際配置的過程中也參考了很多它的建議。
注意事項:
1)debezium需要設置的mysql權限:GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
2)采用阿里云的rds的mysql數據源非常坑,默認是不開啟SHOW DATABASES權限的,需要在debezium中單獨配置屬性database.history.store.only.monitored.tables.ddl:true
3)debezium配合kafaka啟動使用properties方式,也就是說第一個源需要配置為文本模式,后續可采用動態增加源的方式動態增加,但是文件模式需要為json
8.kafka-connect-jdbc
開源地址:https://github.com/confluentinc/kafka-connect-jdbc,它是confluent開源的兼容jdbc的數據庫同步數據的kafka connect連接器。
這個組件支持的目的端的源非常多,理論上有java客戶端的基本都支持,所以基本上可以涵蓋你能用到的絕大多數數據源,它的延遲非常好,比之前的bireme好太多了,畢竟是國外大廠支持的組件,是國內小公司開源組件所不能比擬的。
9.最終選型方案
上圖為我們最終確定的方案,在實際生產中,除了直接DB層級的數據實時同步外,我們還有一套pulsar的比較靈活的數據接口方案,不在此次討論范圍之內,也就是說我們最終實現了基于DB和業務層級的實時數據同步方案。
業界其他公司的CDC方案:
=======.實際生產配置過程:==========
1.kafka安裝配置,以standalone為例
需要單獨說明的是:因為gpdb6目前還不支持upsert模式,debezium的新增和更新均會導致一條新增加的完整數據到kafka,默認kafka按批提交的模式會造成gpdb6的主鍵沖突,需要修改模式為逐條應用,同時配合自己單獨寫的check程序進行offset錯誤的自動修正
#1)安裝kafka,注意2.30有個bugtar -zxvf kafka_2.12-2.4.0.tgzcd kafka_2.12-2.4.0Vim config/server.properties #單機版只需要消息存放路徑即可log.dirs=/opt/kafka_2.12-2.4.0/kafka-logs#增加可以刪除topicdelete.topic.enable=true#保留日志大小:1GB,不設置的話會日志撐爆log.retention.bytes=1073741824mkdir -p /opt/kafka_2.12-2.4.0/kafka-logs#修改kafka的connect-standalone.properties設置為逐條應用consumer.max.poll.records=1#2)修改內置zk的配置vim config/zookeeper.properties#制定zk元數據存放路徑dataDir=/opt/kafka_2.12-2.4.0/zdatamkdir -p /opt/kafka_2.12-2.4.0/zdata#3)啟動服務,先啟動zk再啟動kafkacd /opt/kafka_2.12-2.4.0/nohup bin/zookeeper-server-start.sh config/zookeeper.properties &nohup bin/kafka-server-start.sh config/server.properties —加守護進程啟動bin/zookeeper-server-start.sh -daemon config/zookeeper.propertiesbin/kafka-server-start.sh -daemon config/server.properties2.kafka基本命令
#4)查看服務是夠啟動 jps#5)創建一個測試用的topic:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test#查詢topic列表:bin/kafka-topics.sh --list --zookeeper localhost:2181#查看topic信息:bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test #刪除topic(只會刪除元數據):配置上面的delete.topic.enable=true后可生效bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test#手動刪除文件:bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic test./kafka-topics.sh --zookeeper 192.168.6.42:2181 --describe --topic itslawnode1./kafka-consumer-groups.sh --describe --group test-consumer-group --zookeeper localhost:2181 #查看offset信息bin/kafka-consumer-groups.sh --bootstrap-server 192.168.6.42:9092 --listbin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group 0#查看和刪除群組:bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --listbin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group connect-sink-judge-up#從開始的消費信息: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test#6)創建控制臺生產者生產數據bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test#7)新開一個進程創建消費者數據bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3.debezium安裝配置
#下載debezium-connector-mysql,將文件中的jar包copy到kafka的libs目錄cd /opt/kafka_2.12-2.4.0/tableconfig #tableconfig是新建目錄,存放配置文件#######第一個啟動的properties文件格式############name=authorization-mysql-connector-new-01connector.class=io.debezium.connector.mysql.MySqlConnectordatabase.hostname=mysql源IPdatabase.port=3306database.user=賬號database.password=密碼database.server.id=1database.server.name=debeziumdatabase.whitelist=platform_authorizationdatabase.serverTimezone=UTCtable.whitelist=platform_authorization.lawyer_authorization,platform_authorization.lawyer_authorization_recorddatabase.history.kafka.bootstrap.servers=localhost:9092database.history.kafka.topic=auth.platform_authorizationinclude.schema.changes=false#使用table名作為topic名字,因為machine.db.table默認topictransforms=routetransforms.route.type=org.apache.kafka.connect.transforms.RegexRoutertransforms.route.regex=([^.]+).([^.]+).([^.]+)transforms.route.replacement=$3#不進行初始化,只獲取當前的schema,初始化采用rds_dbsync比較方便,實際測試比init方式快幾十倍,因為此處是逐條應用的snapshot.mode=schema_only##########json格式的文件#########{"name":"hanukkah-mysql-connector","config": {"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"mysql主機名","database.port":"3306","database.user":"用戶名","database.password":"密碼","database.server.id":"1","database.server.name":"debezium","database.whitelist":"hanukkah","database.serverTimezone":"UTC","table.whitelist":"hanukkah.cooperation_lawyer","database.history.kafka.bootstrap.servers":"localhost:9092","database.history.kafka.topic":"mysql1.hanukkah","include.schema.changes":"false","transforms":"route","transforms.route.type":"org.apache.kafka.connect.transforms.RegexRouter","transforms.route.regex":"([^.]+).([^.]+).([^.]+)","transforms.route.replacement":"$3","snapshot.mode":"schema_only"}}4.sink配置
#首先下載kafka-connect-jdbc-5.3.1.jar并防止到kafka的libs目錄即可{ "name": "sink-cooperation_lawyer-ins", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "topics": "cooperation_lawyer", "connection.url": "jdbc:postgresql://目的IP:5432/目的DB?user=用戶&password=密碼&stringtype=unspecified¤tSchema=當前schema名", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true", "insert.mode": "insert", "delete.enabled": "true","table.name.format": "platform.cooperation_lawyer", "pk.fields": "id", "pk.mode": "record_key" }}需要額外說明的是:在目的是greenplum數倉環境下:
1)如果mysql源端字段類型是timestamp,則需要在gpdb端配置字段類型為timestamptz后無需額外配置sink項
2)如果mysql源端字段類型是datetime,則目的端字段類型需要配置為timestamp,同時需要sink文件中增補TimestampConverter配置項,有幾個datetime字段配置幾個配置項
3)如果mysql源端datetime配置了精度,需要debezium配置增加time.precision.mode=connect4) "auto.evolve": "true" 則源端表結構變更后會自動在目的端創建對應數據結構 "auto.create": "true" 則源端新增表后會自動同步到目的端
5.啟動服務
#啟動kafka,進程多了Kafka和QuorumPeerMainbin/zookeeper-server-start.sh -daemon config/zookeeper.propertiesbin/kafka-server-start.sh -daemon config/server.properties#啟動第一個sourcebin/connect-standalone.sh config/connect-standalone.properties tableconfig/paod-source.properties 1>connector-logs/connector.log 2>&1 增補其他sourcecurl -X POST -H "Content-Type:application/json" -d @tableconfig/authorization-source.json http://localhost:8083/connectors/#啟動sinkcurl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @tableconfig/paod-base-ins.properties?...#查看所有的connectors: curl -X GET http://127.0.0.1:8083/connectors6.使用eagle進行topic狀態查看和管理
關于kafka eagle的下載安裝,特別簡單,就不單獨說明了,這里僅貼出效果圖
從上圖非常清楚的能看到哪些topic是有問題的(紅色),絕大多數問題在于offset的錯誤導致的,在實際使用中我們通過一個簡單python守護進程的代碼進行了管理
import requestsimport loggingimport psycopg2import jsonimport reimport time# 獲取數倉連接def get_gp_conn(): conn = psycopg2.connect(host="192.168.2.175", port=5432, user="name", password="password",dbname='datawarehouse') return conn'''刪除目的端的主鍵ID'''def del_dup_id(tablefullname,dup_id): db = get_gp_conn() cursor = db.cursor() sql = "delete from "+ tablefullname +" where id='" + dup_id+"'" cursor.execute(sql) db.commit() cursor.close() db.close()'''重啟sink'''def restart_sink(sinkname,configname): '''delurl = 'http://127.0.0.1:8083/connectors/'+ sinkname del_res = requests.delete(delurl) print("del resp:",del_res) url = 'http://127.0.0.1:8083/connectors/' headers = 'Content-Type:application/json,Accept:application/json' datas = 'tableconfig/' + configname start_res = requests.post(url,data=datas,headers=headers) print("start resp:",start_res) #checkurl = 'http://127.0.0.1:8083/connectors/'+ sinkname +'/tasks/0/status' ''' url = 'http://127.0.0.1:8083/connectors/'+ sinkname +'/tasks/0/restart' requests.post(url)'''檢測任務狀態'''def check_sink_status(sinkname,tablefullname,configname): sinkurl = 'http://127.0.0.1:8083/connectors/'+ sinkname +'/tasks/0/status' print(sinkurl) resp = requests.get(sinkurl) status = json.loads(resp.text) state = status['state'] if state == 'FAILED': trace = status['trace'] pattern = re.compile(r'Key (id)=((.+)) already exists') search = re.search(pattern, trace) #print(search) if search: del_id = search.group(1) print('duplicate key is {}, now to del this record of target database'.format(del_id)) del_dup_id(tablefullname,del_id) restart_sink(sinkname,configname)'''獲取任務列表'''def get_sink_list(): conn = get_gp_conn() cur = conn.cursor() cur.execute("select taskname,tableschema,tablename,configname from platform.tasklist where tablename is not null") print("current time is:",time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))) rows = cur.fetchall() for row in rows: taskname = row[0] schema = row[1] tablename =row[2] configname = row[3] tablefullname = schema +'.'+tablename check_sink_status(taskname,tablefullname,configname) cur.close() conn.close()if __name__ == '__main__': get_sink_list()同時為了避免standalone進程的異常終止,我們用shell的守護進行進行了監控
#! /bin/bashfunction check(){ count=`ps -ef |grep $1 |grep -v "grep" |wc -l` #echo $count if [ 0 == $count ];then time=$(date "+%Y-%m-%d %H:%M:%S") echo "standalone restart at:${time}" cd /opt/kafka_2.12-2.4.0/ bin/connect-standalone.sh config/connect-standalone.properties tableconfig/paod-source.properties 1>>connector-logs/connector.log 2>&1 & sleep 60scurl -X POST -H "Content-Type:application/json" -d @tableconfig/platform-source.json http://localhost:8083/connectors/ ...... sleep 10s curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @tableconfig/paod-base-ins.properties ?..... fi } check ConnectStandalone另外,在實際運行過程中會出現offset錯誤的情況,極其特殊情況下使用上面的方法無法快速解決問題,建議使用kafkacat查看詳細信息,人為跳過offset,具體細節不再贅述。
如喜歡此專題,請關注并提問,技術人驅動自身的是永不停歇的渴望。
總結
以上是生活随笔為你收集整理的oracle和mysql数据实时同步_异构数据源的CDC实时同步系统——最终选型实战的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux unix域socket_So
- 下一篇: mysql or优化_MySQL 语句优