OpenShift 4 - 使用 Debezium 捕获变化数据,实现MySQL到PostgreSQL数据库同步(附视频)
《OpenShift / RHEL / DevSecOps 匯總目錄》
說(shuō)明:本文已經(jīng)在OpenShift 4.10環(huán)境中驗(yàn)證
文章目錄
- 場(chǎng)景說(shuō)明
- 部署環(huán)境
- 安裝CDC源和目標(biāo)數(shù)據(jù)庫(kù)
- 安裝 MySQL
- 安裝 PostgreSQLSQL
- 安裝 AMQ Stream 環(huán)境
- 安裝 AMQ Stream Opeartor
- 創(chuàng)建 Kafka 實(shí)例
- 創(chuàng)建 KafkaConnect 用到的 Image
- 配置 KafkaConnect
- 配置 KafkaConnector
- MySqlConnector
- JdbcSinkConnector
- 環(huán)境檢查
- CDC 驗(yàn)證
- 數(shù)據(jù)同步
- 添加數(shù)據(jù)
- 更新數(shù)據(jù)
- 刪除數(shù)據(jù)
- 演示視頻
- 參考
場(chǎng)景說(shuō)明
本文使用 OpenShift 的 AMQ Steams(即企業(yè)版 Kafka)和 Redhat 主導(dǎo)的 CDC 開(kāi)源項(xiàng)目 Debezium 來(lái)實(shí)現(xiàn)從 MySQL 到 PostgreSQL 數(shù)據(jù)庫(kù)的數(shù)據(jù)同步。
上圖中的 Kafka Connector 提供了訪(fǎng)問(wèn)源或目標(biāo)的參數(shù), 而 Kafka Connect 為訪(fǎng)問(wèn)源或目標(biāo)的實(shí)際運(yùn)行環(huán)境,該環(huán)境運(yùn)行在相關(guān)容器中。
注意:本文操作需要用到 access.redhat.com 賬號(hào),另外還需有一個(gè)鏡像 Registry 服務(wù)的賬號(hào),本文使用的是 quay.io Registry 服務(wù)。
部署環(huán)境
首先創(chuàng)建一個(gè)項(xiàng)目
$ oc project db-cdc安裝CDC源和目標(biāo)數(shù)據(jù)庫(kù)
安裝 MySQL
安裝 PostgreSQLSQL
安裝 AMQ Stream 環(huán)境
安裝 AMQ Stream Opeartor
在 OpenShift 中使用默認(rèn)配置安裝 AMQ Stream Opeartor,步驟略。
創(chuàng)建 Kafka 實(shí)例
在安裝好的 AMQ Stream Opeartor 中根據(jù)以下配置創(chuàng)建 kafka 服務(wù)。
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata:name: my-cluster spec:kafka:config:offsets.topic.replication.factor: 3transaction.state.log.replication.factor: 3transaction.state.log.min.isr: 2default.replication.factor: 3min.insync.replicas: 2inter.broker.protocol.version: '3.1'storage:type: ephemerallisteners:- name: plainport: 9092type: internaltls: false- name: tlsport: 9093type: internaltls: trueversion: 3.1.0replicas: 3entityOperator:topicOperator: {}userOperator: {}zookeeper:storage:type: ephemeralreplicas: 3創(chuàng)建后會(huì)在 OpenShift 中看到部署的相關(guān)資源。
創(chuàng)建 KafkaConnect 用到的 Image
配置 KafkaConnect
在安裝好的 AMQ Stream Opeartor 中根據(jù)以下配置創(chuàng)建 KafkaConnect 對(duì)象,其中使用了前面生成的 “quay.io/dawnskyliu/connect-debezium:v1” 鏡像。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata:name: my-connect-clusterannotations:strimzi.io/use-connector-resources: "true" spec:version: 3.1.0replicas: 1image: 'quay.io/dawnskyliu/connect-debezium:v1'bootstrapServers: 'my-cluster-kafka-bootstrap:9093'tls:trustedCertificates:- secretName: my-cluster-cluster-ca-certcertificate: ca.crtconfig:group.id: connect-clusteroffset.storage.topic: connect-cluster-offsetsconfig.storage.topic: connect-cluster-configsstatus.storage.topic: connect-cluster-statusconfig.storage.replication.factor: 1offset.storage.replication.factor: 1status.storage.replication.factor: 1config.storage.min.insync.replicas: 1offset.storage.min.insync.replicas: 1status.storage.min.insync.replicas: 1配置 KafkaConnector
MySqlConnector
在安裝好的 AMQ Stream Opeartor 中根據(jù)以下配置創(chuàng)建 KafkaConnector 對(duì)象。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata:name: mysql-source-connectorlabels:strimzi.io/cluster: my-connect-cluster spec:class: io.debezium.connector.mysql.MySqlConnectortasksMax: 1config:"database.hostname": "mysql""database.ssl.mode": "disabled""database.allowPublicKeyRetrieval": "true""database.port": "3306""database.user": "debezium""database.password": "dbz""database.server.id": "1""database.server.name": "dbserver1""database.include": "inventory""database.history.kafka.bootstrap.servers": "my-cluster-kafka-bootstrap:9092""database.history.kafka.topic": "schema-changes.inventory""transforms": "route""transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter""transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)""transforms.route.replacement": "$3"JdbcSinkConnector
在安裝好的 AMQ Stream Opeartor 中根據(jù)以下配置創(chuàng)建 KafkaConnector 對(duì)象。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata:name: postgresql-sink-connectorlabels:strimzi.io/cluster: my-connect-cluster spec:class: io.confluent.connect.jdbc.JdbcSinkConnectortasksMax: 1config:"topics": "customers""connection.url": "jdbc:postgresql://postgresql:5432/inventory?user=postgresuser&password=postgrespw""transforms": "unwrap""transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState""transforms.unwrap.drop.tombstones": "false""auto.create": "true""insert.mode": "upsert""delete.enabled": "true""pk.fields": "id""pk.mode": "record_key"環(huán)境檢查
在 AMQ Streams 的 Operator 中確認(rèn) Kafka,Kafka Connect 和 Kafka Connector 的運(yùn)行狀態(tài)。
CDC 驗(yàn)證
數(shù)據(jù)同步
確認(rèn) customers 表和數(shù)據(jù)已經(jīng)從 MySQL 同步到 PostgreSQL 中。
$ POSTGRESQL_POD=$(oc get pod -l name=postgresql -o jsonpath={.items[0].metadata.name}) $ oc exec $POSTGRESQL_POD -it -- psql -U postgresuser inventory inventory=> select * from customers;last_name | id | first_name | email -----------+------+------------+-----------------------Thomas | 1001 | Sally | sally.thomas@acme.comBailey | 1002 | George | gbailey@foobar.comWalker | 1003 | Edward | ed@walker.comKretchmar | 1004 | Anne | annek@noanswer.org (4 rows)添加數(shù)據(jù)
在 MySQL 中執(zhí)行命令添加新數(shù)據(jù),然后在 PostgreSQL 確認(rèn)變化數(shù)據(jù)已同步。
mysql> INSERT INTO customers VALUES (default,"test1","test1","test1@acme.com");更新數(shù)據(jù)
在 MySQL 中執(zhí)行命令更新數(shù)據(jù),然后在 PostgreSQL 確認(rèn)變化數(shù)據(jù)已同步。
mysql> update customers set first_name='Test' where id = 1001;刪除數(shù)據(jù)
在 MySQL 中執(zhí)行命令刪除數(shù)據(jù),然后在 PostgreSQL 確認(rèn)變化數(shù)據(jù)已同步。
mysql> delete from customers where first_name='test1';演示視頻
視頻
參考
https://github.com/liuxiaoyu-git/debezium_openshift
https://debezium.io/documentation/reference/1.9/operations/openshift.html
https://github.com/debezium/debezium-examples/tree/main/openshift
https://aws.amazon.com/cn/blogs/china/debezium-deep-dive/
總結(jié)
以上是生活随笔為你收集整理的OpenShift 4 - 使用 Debezium 捕获变化数据,实现MySQL到PostgreSQL数据库同步(附视频)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Plot 绘制点图
- 下一篇: centos7安装es mysql_ce