日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

4 Debezium抽取部署

發布時間:2024/3/13 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 4 Debezium抽取部署 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本文目標

debezium,簡稱dbz,偽裝為MySQL從庫,當主庫發生變化后,主庫會主動將變化的信息同步到dbz內,dbz將收到的信息轉為JSON推送到Kafka內。

安裝JDK11

yum -y install java-11-openjdk-devel

解壓部署

tar xfz debezium-server-dist-2.0.0.Final.tar.gz

修改配置文件

application.properties

[yinyx@localhost conf]$ cat application.properties quarkus.http.port=8999 rkus.log.level=INFO quarkus.log.console.json=falsedebezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector debezium.source.offset.storage.file.filename=data/offsets.dat debezium.source.offset.flush.interval.ms=0debezium.source.database.hostname=127.0.0.1 debezium.source.database.port=6306 debezium.source.database.user=test debezium.source.database.password=test debezium.source.database.server.id=2 debezium.source.database.include.list=testdebezium.source.topic.prefix=yyx debezium.source.key.converter.schemas.enable=false debezium.source.value.converter.schemas.enable=false debezium.source.schema.history.internal.kafka.bootstrap.servers=127.0.0.1:9092 debezium.source.schema.history.internal.kafka.topic=schemahistorydebezium.source.decimal.handling.mode=string debezium.source.lob.enabled=true debezium.source.database.history.skip.unparseable.ddl=true debezium.source.tombstones.on.delete=falsedebezium.sink.type=kafka debezium.sink.kafka.producer.bootstrap.servers=127.0.0.1:9092 debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializerdebezium.format.key.schemas.enable=false debezium.format.value.schemas.enable=false[yinyx@localhost conf]$

啟動

./run.sh

注意先啟動kafka

測試

檢查topic是否已經創建
[yinyx@localhost bin]$ ./kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092 __consumer_offsets schemahistory yinyx yyx yyx.test.t1
啟動kafka的消費

./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic yyx.test.t1

到MySQL更新t1表的數據

insert update delete 隨便整

查看kafka消費,應出現類似如下信息

{“before”:{“f1”:3,“f2”:“cc|33”,“f3”:1670056305000},“after”:{“f1”:3,“f2”:“cc|333”,“f3”:1670056305000},“source”:{“version”:“2.0.0.Final”,“connector”:“mysql”,“name”:“yyx”,“ts_ms”:1670030109000,“snapshot”:“false”,“db”:“test”,“sequence”:null,“table”:“t1”,“server_id”:1,“gtid”:“7bdc8394-71cf-11ed-b2d5-000c293c9462:25”,“file”:“mysql-bin.000002”,“pos”:7907,“row”:0,“thread”:39,“query”:null},“op”:“u”,“ts_ms”:1670031525419,“transaction”:null}

{“before”:{“f1”:2,“f2”:“bb|222”,“f3”:1670002422000},“after”:{“f1”:2,“f2”:“bb|222haha”,“f3”:1670002422000},“source”:{“version”:“2.0.0.Final”,“connector”:“mysql”,“name”:“yyx”,“ts_ms”:1670031487000,“snapshot”:“false”,“db”:“test”,“sequence”:null,“table”:“t1”,“server_id”:1,“gtid”:“7bdc8394-71cf-11ed-b2d5-000c293c9462:27”,“file”:“mysql-bin.000002”,“pos”:8561,“row”:0,“thread”:39,“query”:null},“op”:“u”,“ts_ms”:1670031525422,“transaction”:null}

總結

至此,MySQL的變化,會實時反應到Kafka的JSON數據里面,后續自己開發程序從Kafka接收處理即可。

總結

以上是生活随笔為你收集整理的4 Debezium抽取部署的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。