日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Kafka实现MySQL增量同步

發布時間:2023/12/31 数据库 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka实现MySQL增量同步 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目標

本文是對[1]的復現和整理

?

環境

組件版本
Zookeeper3.6.0
Kafka2.5.0
Mysql8.0.21-0ubuntu0.20.04.4

?

準備工作

分別新建兩個數據庫A和B,然后各自新建一個表格

mysql> create database A;
Query OK, 1 row affected (0.12 sec)

mysql> create database B;
Query OK, 1 row affected (0.08 sec)

mysql> use A;
Database changed
mysql> CREATE TABLE `person` (
? ? -> ? `pid` int(11) NOT NULL AUTO_INCREMENT,
? ? -> ? `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
? ? -> ? `age` int(11) DEFAULT NULL,
? ? -> ? PRIMARY KEY (`pid`)
? ? -> ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
Query OK, 0 rows affected, 3 warnings (0.82 sec)

mysql> use B;
Database changed
mysql> CREATE TABLE `kafkaperson` (
? ? -> ? `pid` int(11) NOT NULL AUTO_INCREMENT,
? ? -> ? `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
? ? -> ? `age` int(11) DEFAULT NULL,
? ? -> ? PRIMARY KEY (`pid`)
? ? -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
Query OK, 0 rows affected, 5 warnings (0.49 sec)
?

集群啟動

啟動Hadoop,Zookeeper與Kafka

測試

?

①生產者:

$KAFKA/bin/kafka-topics.sh --create --zookeeper Desktop:2181 --replication-factor 1 --partitions 1 --topic mysql-kafka-person


②消費者

$KAFKA/bin/connect-standalone.sh $KAFKA/config/connect-standalone.properties $KAFKA/config/quickstart-mysql.properties $KAFKA/config/quickstart-mysql-sink.properties

?

③往A表插入條數據

mysql> INSERT INTO person (pid,firstname,age) VALUES ( 1, 'zs',66);
Query OK, 1 row affected (0.07 sec)

mysql> select * from person;
+-----+-----------+------+
| pid | firstname | age ?|
+-----+-----------+------+
| ? 1 | zs ? ? ? ?| ? 66 |
+-----+-----------+------+
1 row in set (0.00 sec)

?

④mysql> use B;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> show tables;
+-------------+
| Tables_in_B |
+-------------+
| kafkaperson |
+-------------+
1 row in set (0.00 sec)

mysql> select * from kafkaperson
? ? -> ;
+-----+-----------+------+
| pid | firstname | age ?|
+-----+-----------+------+
| ? 1 | zs ? ? ? ?| ? 66 |
+-----+-----------+------+
1 row in set (0.00 sec)
?

可以看到mysql 表A的數據通過kafka順利傳達到了表B,而在我們的kafka終端也會看到相關信息:

?

附錄

?

quickstart-mysql.properties

name=mysql-a-source-person connector.class=io.confluent.connect.jdbc.JdbcSourceConnector tasks.max=1 connection.url=jdbc:mysql://Desktop:3306/A?user=appleyuchi&password=appleyuchi # incrementing 自增 mode=incrementing # 自增字段 pid incrementing.column.name=pid # 白名單表 person table.whitelist=person # topic前綴 mysql-kafka- topic.prefix=mysql-kafka-

?

quickstart-mysql-sink.properties

name=mysql-a-sink-person connector.class=io.confluent.connect.jdbc.JdbcSinkConnector tasks.max=1 #kafka的topic名稱 topics=mysql-kafka-person # 配置JDBC鏈接 connection.url=jdbc:mysql://Desktop:3306/B?user=appleyuchi&password=appleyuchi # 不自動創建表,如果為true,會自動創建表,表名為topic名稱 auto.create=false # upsert model更新和插入 insert.mode=upsert # 下面兩個參數配置了以pid為主鍵更新 pk.mode = record_value pk.fields = pid #表名為kafkatable table.name.format=kafkaperson

?

Reference:

[1]Kafka Connect 實現MySQL增量同步

[2]Kafka connect快速構建數據ETL通道

[3]Kafka Connect 日志配置

?

總結

以上是生活随笔為你收集整理的Kafka实现MySQL增量同步的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。