Real-time MySQL data synchronization: Canal installation and deployment, Kafka installation, ZooKeeper installation, MySQL installation, Canal Server + Canal Client HA, Canal + MySQL + Kafka, and verification (learning notes)
Table of Contents
1.1. Server preparation
1.2. Set hostnames and configure hosts
1.3. Passwordless SSH
1.4. Set up NTP time sync
1.5. Disable the firewall
1.6. Disable SELinux
1.7. Install the JDK
1.8. Install ZooKeeper
1.9. Install Scala
2.1. Unpack
2.2. Configure environment variables
2.3. Edit the configuration files
2.4. Edit server.properties again
2.5. Create the log directory
2.6. Start and test the Kafka cluster
2.7. Sending and consuming topic data
2.8. Kafka cluster monitoring – KafkaOffsetMonitor (legacy)
2.9. Kafka cluster monitoring – KafkaCenter
2.9.1. Download
2.9.2. Initialize
2.9.3. Edit the application.properties file
2.9.4. Build and run
3.1. Uninstall the old MySQL
3.2. Create the canal account
3.3. Enable binlog writing
4.1. Machine preparation
4.2. Download Canal
4.3. Unpack
4.4. Edit the configuration file
4.5. Create the example topic
4.6. Start the Canal service
4.7. Verify functionality
4.8. Prepare database test data
4.9. ERROR c.a.otter.canal.server.netty.handler.SessionHandler - something goes wrong with channel:[id: 0x106d73f2, /192.168.106.1:1312 :> /192.168.106.103:11111], exception=java.nio.channels.ClosedChannelException
4.10. Data-monitoring microservice
5.1. Machine preparation
5.2. Download Canal
5.3. Unpack
5.4. Edit the configuration files
5.4.1. Edit canal.properties
5.4.2. Edit instance.properties
5.4.3. Configure the second Canal server
5.4.4. Start the ZooKeeper service
5.4.5. Start the Canal services (both at once)
5.4.6. Connect a client and consume data
6.1. Machine preparation
6.2. Download Canal
6.3. Unpack
6.4. Edit the configuration files
6.4.1. Edit instance.properties
6.4.2. Edit canal.properties
6.5. Start the services
6.5.1. Start the ZooKeeper service
6.5.2. Start the Kafka service
6.5.3. Open a Kafka consumer
6.5.4. Start the Canal service
6.5.5. Watch the Kafka consumer
1. Canal installation and deployment
1.1. Server preparation

| IP | Hostname | OS | Roles |
| 192.168.106.103 | node1 | CentOS Linux release 7.4.1708 (Core) | ZooKeeper, Kafka (master), Canal standalone |
| 192.168.106.104 | node2 | CentOS Linux release 7.4.1708 (Core) | ZooKeeper, Kafka (slave), Canal HA (master) |
| 192.168.106.105 | node3 | CentOS Linux release 7.4.1708 (Core) | ZooKeeper, Kafka (slave), Canal HA (slave) |
1.2.設(shè)置主機名并配置hosts
四臺機器分別執(zhí)行:vim /etc/hostname ,分別修改為:node1,node2,node3
然后配置hosts,具體內(nèi)容如下:
[root@node1 ~]# vim /etc/hosts192.168.106.103 node1 192.168.106.104 node2 192.168.106.105 node31.3.免密設(shè)置
四臺機器上分別執(zhí)行:
ssh-keygen -t rsa ssh-copy-id node1 ssh-copy-id node2 ssh-copy-id node31.4.設(shè)置ntp時間
See: https://blog.csdn.net/tototuzuoquan/article/details/108900206
1.5.關(guān)閉防火墻
systemctl status firewalld.service # 查看防火墻的狀態(tài) systemctl stop firewalld.service # 關(guān)閉防火墻 systemctl disable firewalld.service # 設(shè)置開機不啟動 systemctl is-enabled firewalld.service # 查看防火墻服務(wù)是否設(shè)置開機啟動1.6.關(guān)閉selinux
https://www.linuxidc.com/Linux/2016-11/137723.htm
1.7. Install the JDK
Unpack the JDK on each machine and configure the environment variables, for example:

export JAVA_HOME=/root/jdk1.8.0_161
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib/rt.jar
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin

Then run on each machine: source /etc/profile
1.8. Install ZooKeeper
See: https://blog.csdn.net/tototuzuoquan/article/details/54003140
The zoo.cfg contents are as follows:
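The file itself was not reproduced in these notes; a minimal zoo.cfg sketch consistent with the data directory and client port used elsewhere in this guide (the server list is an assumption based on the three-node cluster described above) would be:

```ini
# Hypothetical zoo.cfg sketch; dataDir matches the path used in section 2.3,
# and the server list assumes the node1-node3 cluster from section 1.1.
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/root/apache-zookeeper-3.6.2-bin/data
clientPort=2181
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
```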
Copy the ZooKeeper directory above to node1, node2, and node3.
Go into /root/apache-zookeeper-3.6.2-bin/data on node1, node2, and node3 and run:

echo 1 > myid   # on node1
echo 2 > myid   # on node2
echo 3 > myid   # on node3

Then on each of node1, node2, and node3 run:

# Start ZooKeeper
$ZOOKEEPER_HOME/bin/zkServer.sh start
# Check ZooKeeper's status
$ZOOKEEPER_HOME/bin/zkServer.sh status

1.9. Install Scala
Details omitted here.
Configure the environment variables:

export SCALA_HOME=/root/scala-2.12.12
export PATH=$PATH:$SCALA_HOME/bin

2. Install Kafka
2.1. Unpack
Unpack the Kafka tarball:

tar -zxvf kafka_2.12-2.6.0.tgz

Remove the tarball:

rm -rf kafka_2.12-2.6.0.tgz

2.2. Configure environment variables
The environment variables are as follows:

export SCALA_HOME=/root/scala-2.12.12
export PATH=$PATH:$SCALA_HOME/bin
export KAFKA_HOME=/root/kafka_2.12-2.6.0
export PATH=$PATH:$KAFKA_HOME/bin

Then run: source /etc/profile
2.3. Edit the configuration files

cd $KAFKA_HOME/config

1. Edit zookeeper.properties:

[root@node1 config]# vim zookeeper.properties
# ZooKeeper data directory; keep it consistent with the ZooKeeper configuration
dataDir=/root/apache-zookeeper-3.6.2-bin/data

2. Edit consumer.properties:

[root@node1 config]# vim consumer.properties
# ZooKeeper cluster connection string
zookeeper.connect=node1:2181,node2:2181,node3:2181

3. Edit producer.properties:

[root@node1 config]# vim producer.properties
# Kafka cluster address
bootstrap.servers=node1:9092,node2:9092,node3:9092

4. Edit server.properties:

[root@node1 config]# vim server.properties
# ZooKeeper cluster address
zookeeper.connect=node1:2181,node2:2181,node3:2181
# Log file directory (this path can be changed)
log.dirs=/tmp/kafka-logs

Sync the Kafka directory to the other nodes (run on node1):

[root@node1 ~]# scp -r kafka_2.12-2.6.0 root@node2:$PWD
[root@node1 ~]# scp -r kafka_2.12-2.6.0 root@node3:$PWD

2.4. Edit server.properties again
Edit server.properties on each node:

# on node1
broker.id=1
# on node2
broker.id=2
# on node3
broker.id=3

2.5. Create the log directory
Run on all three machines:

mkdir -p /tmp/kafka-logs   # /tmp/kafka-logs is the Kafka log directory configured above

2.6. Start and test the Kafka cluster
1. Start the ZooKeeper cluster (on all 3 nodes):

$ZOOKEEPER_HOME/bin/zkServer.sh start

2. Start the Kafka cluster:

# Start Kafka
cd $KAFKA_HOME
bin/kafka-server-start.sh -daemon config/server.properties

3. List the topics:

[root@node1 kafka_2.12-2.6.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --list

4. Create a topic:

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 2 --partitions 2

Then list the topics again:

[root@node1 kafka_2.12-2.6.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --list
test

5. Describe the topic:

[root@node1 kafka_2.12-2.6.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test

2.7. Sending and consuming topic data
1. Using the new API
Consume the topic on node2 and node3 with the bundled console scripts, and produce to it from node1.
Produce data on node1:

[root@node1 kafka_2.12-2.6.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Consume the topic on node3 (only new messages):

[root@node3 kafka_2.12-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

Consume the topic from the beginning:

[root@node2 kafka_2.12-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
adfasdasfd
输入测试3
shuru
输入测试2

To inspect consumption progress you must specify a consumer group. List the Kafka consumer groups with:

[root@node2 kafka_2.12-2.6.0]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
console-consumer-21382

Check how each partition of a topic is being consumed:

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --group xxx --describe
bin/kafka-consumer-groups.sh --describe --bootstrap-server 127.0.0.1:9092 --group xxx

Field descriptions:
GROUP: the consumer group
TOPIC: a topic the group has consumed or is consuming
PARTITION: the partition number
CURRENT-OFFSET: the last offset committed by the consumer group
LOG-END-OFFSET: the offset of the latest produced message
LAG: the difference between the produced offset and the consumed offset
CONSUMER-ID: the id of the group member currently consuming the topic-partition
HOST: the consumer's IP address
CLIENT-ID: the client id
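The LAG column is simply LOG-END-OFFSET minus CURRENT-OFFSET for each partition. A minimal Python sketch of that arithmetic (the offsets below are made-up sample values, not output from the cluster above):

```python
# Compute per-partition lag the same way kafka-consumer-groups.sh reports it:
# LAG = LOG-END-OFFSET - CURRENT-OFFSET.
# The numbers here are illustrative sample data, not real cluster output.
partitions = [
    {"topic": "test", "partition": 0, "current_offset": 120, "log_end_offset": 150},
    {"topic": "test", "partition": 1, "current_offset": 200, "log_end_offset": 200},
]

def lag(p):
    """Messages produced but not yet consumed for one topic-partition."""
    return p["log_end_offset"] - p["current_offset"]

for p in partitions:
    print(f'{p["topic"]}-{p["partition"]} lag={lag(p)}')
```

A lag of 0 means the group is fully caught up on that partition.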
2.8. Kafka cluster monitoring – KafkaOffsetMonitor (legacy)
KafkaOffsetMonitor is a tool for monitoring Kafka topics and consumer progress. It runs as a single jar, so it is easy to deploy, and since it only monitors, it is also safe to run.
Features:
1) Monitor the Kafka cluster state and list topics and consumer groups.
2) Visualize the relationship between topics and consumers.
3) Visualize consumer offsets, lag, and other metrics.
1. Download
Download: https://github.com/quantifind/KafkaOffsetMonitor (a patched fork can also be used)
KafkaOffsetMonitor can currently only monitor consumers that use the old API; the new API is not yet supported.
2. Script parameters
zk: ZooKeeper cluster address
port: port for the web UI
refresh: refresh interval
retain: data retention period (seconds, minutes, hours, or days)
3. Create a kafkamonitor.sh launch script

vi kafkamonitor.sh

4. Grant execute permission
Make the kafkamonitor.sh script executable.
5. Start the monitor script

bin/kafkamonitor.sh

6. Web UI
Open node1:8090.

2.9. Kafka cluster monitoring – KafkaCenter
GitHub: https://github.com/xaecbd/KafkaCenter (download the KafkaCenter package)
Gitee mirror: https://gitee.com/newegg/KafkaCenter
2.9.1. Download

git clone https://github.com/xaecbd/KafkaCenter.git

2.9.2. Initialize
Run KafkaCenter-master\KafkaCenter-Core\sql\table_script.sql against the database.
2.9.3. Edit the application.properties file
It is located at KafkaCenter/KafkaCenter-Core/src/main/resources/application.properties; mainly change the database password.
2.9.4. Build and run
Note: make sure the JDK you installed is JDK 8 or later.

$ git clone https://github.com/xaecbd/KafkaCenter.git   # already done above
$ cd KafkaCenter
$ mvn clean package -Dmaven.test.skip=true
$ cd KafkaCenter\KafkaCenter-Core\target
$ java -jar KafkaCenter-Core-2.3.0-SNAPSHOT.jar

3. Install MySQL
3.1. Uninstall the old MySQL
MySQL can be installed following https://blog.csdn.net/tototuzuoquan/article/details/104210148.
3.2. Create the canal account

mysql -uroot -p   # password: 123456
mysql> create user 'canal' identified by 'canal';
Query OK, 0 rows affected (0.00 sec)
mysql> grant all privileges on *.* to 'canal'@'%' identified by 'canal';
Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)
mysql>

3.3. Enable binlog writing
For a self-hosted MySQL, you first need to enable binlog writing and set binlog-format to ROW. The my.cnf configuration is as follows:
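The my.cnf settings were not captured in these notes; the standard settings from the Canal QuickStart documentation for enabling a ROW-format binlog are:

```ini
[mysqld]
log-bin=mysql-bin   # enable binlog
binlog-format=ROW   # ROW mode, required by Canal
server_id=1         # must not clash with Canal's slaveId or any other replica
```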
Restart MySQL:

[root@node1 etc]# systemctl restart mysqld

Then create the test database:

create database test default character set utf8;

4. Canal quick installation and deployment
Official site: https://github.com/alibaba/canal
4.1. Machine preparation
Canal server: node1
MySQL: node1
4.2. Download Canal
Download from https://github.com/alibaba/canal/releases; this guide uses canal.deployer-1.1.4.tar.gz.
4.3.解壓縮
mkdir -p /root/canal tar zxvf canal.deployer-1.1.4.tar.gz -C /root/canal解壓完成后,進入/root/canal,可以看到如下結(jié)構(gòu):
[root@node1 canal]# pwd /root/canal [root@node1 canal]# ls bin conf lib logs [root@node1 canal]#4.4.修改配置文件
[root@node1 canal]# cd conf/example/
[root@node1 example]# ls
instance.properties
[root@node1 example]# vim instance.properties

The relevant settings:

## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=1234
## position info: change this to your own database address
canal.instance.master.address=node1:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName=test   # if omitted, all databases are captured

4.5. Create the example topic

[root@node1 example]# cd $KAFKA_HOME
[root@node1 kafka_2.12-2.6.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic example --replication-factor 1 --partitions 1
Created topic example.

4.6. Start the Canal service
cd /root/canal
bin/startup.sh

Watch the Canal log:

[root@node1 canal]# cd /root/canal/logs/canal
[root@node1 canal]# tail -f canal.log
2020-12-18 22:50:52.994 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2020-12-18 22:50:53.083 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2020-12-18 22:50:53.112 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2020-12-18 22:50:53.192 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.106.103(192.168.106.103):11111]
2020-12-18 22:50:55.369 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

4.7. Verify functionality
Download the Canal source code (canal-canal-1.1.4.zip) and open it in IDEA.
Open the class com.alibaba.otter.canal.example.SimpleCanalClientPermanceTest and change the IP address to 192.168.106.103.
4.8. Prepare database test data
Import the stu table into the database on the MySQL node, then insert into, delete from, or update the stu table. The stu data is:

create table `stu` (
  `name` varchar (60),
  `speciality` varchar (60)
);
insert into `stu` (`name`, `speciality`) values('张三','美术');
insert into `stu` (`name`, `speciality`) values('张三','音乐');
insert into `stu` (`name`, `speciality`) values('李四','篮球');
insert into `stu` (`name`, `speciality`) values('小明','美术');
insert into `stu` (`name`, `speciality`) values('李四','美术');
insert into `stu` (`name`, `speciality`) values('小明','音乐');

After inserting, updating, or deleting rows, watch how the data changes. (You can also use the "data-monitoring microservice" below to observe the data.)
4.9.ERROR c.a.otter.canal.server.netty.handler.SessionHandler - something goes wrong with channel:[id: 0x106d73f2, /192.168.106.1:1312 :> /192.168.106.103:11111], exception=java.nio.channels.ClosedChannelException
You may hit an error like the one in this section's title during this process; for the fix, see https://blog.csdn.net/woainimax/article/details/105991825.
4.10. Data-monitoring microservice
When a user performs database operations, the binlog is captured by Canal and parsed into row data, which we can then process with our own logic.
Here we use an open-source project that integrates Spring Boot with Canal and is more convenient than the raw Canal client:
https://github.com/chenqian56131/spring-boot-starter-canal
Before use, install starter-canal into your local Maven repository.
We can follow its canal-test module for the implementation.
(1) Create a module changgou_canal and add the dependency to its pom (note: you can also work directly in the canal-test project and add the dependency below):

<dependency>
    <groupId>com.xpand</groupId>
    <artifactId>starter-canal</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

(2) Create the package com.changgou.canal and a bootstrap class in it:

@SpringBootApplication
@EnableCanalClient
public class CanalApplication {
    public static void main(String[] args) {
        SpringApplication.run(CanalApplication.class, args);
    }
}

(3) Add the application.properties configuration file:

# In canal-test this line starts out commented
canal.client.instances.example.host=192.168.106.103
# In canal-test this is 2181
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000
# canal.client.instances.example.zookeeperAddress=192.168.0.59:8080,192.168.0.59:8081
# canal.client.instances.example.clusterEnabled=true

(4) Create the package com.changgou.canal.listener and a listener class in it:

@CanalEventListener
public class BusinessListener {

    @ListenPoint(schema = "test", table = {"stu"})
    public void adUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        System.err.println("Change detected on test.stu");
        rowData.getBeforeColumnsList().forEach(
            (c) -> System.err.println("before: " + c.getName() + " :: " + c.getValue()));
        rowData.getAfterColumnsList().forEach(
            (c) -> System.err.println("after: " + c.getName() + " :: " + c.getValue()));
    }
}

Test: start the data-monitoring microservice, modify the stu table in the test database, and watch the console output.
5. Canal Server + Canal Client HA
High availability for both the Canal server and client relies on ZooKeeper; on startup, both the server and the client read their state from ZooKeeper. Canal stores the following structure in ZooKeeper:

/otter
└── canal
    └── destinations
        └── example            # canal instance name
            ├── 1001           # canal client info
            │   ├── cursor     # current mysql binlog consumption position
            │   ├── filter     # binlog filter condition
            │   └── running    # the currently running canal client
            ├── cluster        # canal server list
            │   └── ip:11111
            └── running        # the currently running canal server

On startup, every Canal server and client tries to grab the corresponding running node in ZooKeeper, which guarantees that only one server and one client are active at a time; HA failover for both is implemented by watching the running node.
5.1. Machine preparation
ZooKeeper cluster (3 nodes): node1, node2, node3
Canal server nodes (2): node2, node3
MySQL node: node1
5.2. Download Canal
Omitted here.
5.3.解壓縮
[root@node2 ~]# mkdir /root/canal-ha (node2,node3)上一樣。 [root@node2 ~]# tar -zxvf canal.deployer-1.1.4.tar.gz -C canal-ha/在node2上執(zhí)行:
[root@node2 ~]# scp -r canal-ha root@node3:$PWD[root@node2 canal-ha]# pwd /root/canal-ha [root@node2 canal-ha]# ls bin conf lib logs [root@node2 canal-ha]#5.4.修改配置文件
5.4.1. Edit canal.properties

[root@node2 conf]# pwd
/root/canal-ha/conf
[root@node2 conf]# vim canal.properties
# ZooKeeper cluster address
canal.zkServers = node1:2181,node2:2181,node3:2181
# global Spring component file
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

Notes:
About default-instance.xml: the store uses in-memory mode, while the position management that parser/sink depend on is persisted, currently by writing to ZooKeeper, so the state is shared across the cluster.
Characteristics: supports HA.
Use case: production, clustered deployment.
5.4.2. Edit instance.properties

# The slave id Canal uses when impersonating a MySQL slave; it must not clash with
# the MySQL server or any other slave (the two Canal servers must use different ids).
canal.instance.mysql.slaveId = 1234
# Address and port of the database to watch
canal.instance.master.address=node1:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

5.4.3. Configure the second Canal server
Same configuration as above.
Note: the instance directory name must be identical on both machines; HA mode manages instances by instance name, and both servers must use the default-instance.xml configuration.
5.4.4. Start the ZooKeeper service
Omitted here.
5.4.5. Start the Canal services (both at once)
Run the following on both nodes to start Canal:

bin/startup.sh

After startup, check logs/example/example.log: only one of the two machines will show a successful startup log.
On node2 you can see:
On node3 you can see:
[root@node3 logs]# pwd
/root/canal-ha/logs
[root@node3 logs]# ls
canal  example
[root@node3 logs]# cd example/
[root@node3 example]# ls
example.log
[root@node3 example]# tail -f example.log -n 500
2020-12-27 03:42:07.860 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-12-27 03:42:07.910 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2020-12-27 03:42:08.708 [main] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
2020-12-27 03:42:08.983 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-12-27 03:42:08.983 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2020-12-27 03:42:10.851 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2020-12-27 03:42:10.864 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2020-12-27 03:42:10.864 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2020-12-27 03:42:10.907 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2020-12-27 03:42:11.634 [destination = example , address = node1/192.168.106.103:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-12-27 03:42:11.636 [destination = example , address = node1/192.168.106.103:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2020-12-27 03:42:26.160 [destination = example , address = node1/192.168.106.103:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=3802,serverId=1,gtid=,timestamp=1608982052000] cost : 14301ms , the next step is binlog dump

Checking the node information in ZooKeeper also tells you which node is currently active:
[root@node2 bin]# pwd
/root/apache-zookeeper-3.6.2-bin/bin
[root@node2 bin]# ./zkCli.sh
[zk: localhost:2181(CONNECTED) 6] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.106.105:11111"}
[zk: localhost:2181(CONNECTED) 8] ls /otter/canal/destinations/example/cluster
[192.168.106.104:11111, 192.168.106.105:11111]

5.4.6. Connect a client and consume data
You can point the client directly at the ZooKeeper address and the instance name; the Canal client automatically reads the running node from ZooKeeper to find the currently active server and connects to it (here we again use the official example). The class to modify is com.alibaba.otter.canal.example.ClusterCanalClientTest:
package com.alibaba.otter.canal.example;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;

/**
 * Cluster-mode test example
 *
 * @version 1.0.4
 */
public class ClusterCanalClientTest extends AbstractCanalClientTest {

    public ClusterCanalClientTest(String destination){
        super(destination);
    }

    public static void main(String args[]) {
        String destination = "example";
        // Connect to fixed canal server addresses; failover is supported if one server crashes
        // CanalConnector connector = CanalConnectors.newClusterConnector(
        //     Arrays.asList(new InetSocketAddress(
        //         AddressUtils.getHostIp(),
        //         11111)),
        //     "stability_test", "", "");

        // Discover canal server addresses dynamically via zookeeper; failover is supported if one server crashes
        CanalConnector connector = CanalConnectors.newClusterConnector(
            "192.168.106.104:2181", destination, "canal", "canal");

        final ClusterCanalClientTest clientTest = new ClusterCanalClientTest(destination);
        clientTest.setConnector(connector);
        clientTest.start();
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                try {
                    logger.info("## stop the canal client");
                    clientTest.stop();
                } catch (Throwable e) {
                    logger.warn("##something goes wrong when stopping canal:", e);
                } finally {
                    logger.info("## canal client is down.");
                }
            }
        });
    }
}

The output is:
****************************************************
* Batch Id: [1] ,count : [2] , memsize : [81] , Time : 2020-12-27 04:26:06
* Start : [mysql-bin.000001:3853:1608982052000(2020-12-26 19:27:32)]
* End : [mysql-bin.000001:3903:1608982052000(2020-12-26 19:27:32)]
****************************************************
----------------> binlog[mysql-bin.000001:3853] , name[test,stu] , eventType : INSERT , executeTime : 1608982052000(2020-12-26 19:27:32) , gtid : () , delay : 32314134 ms
name : 小明    type=varchar(60)    update=true
speciality : 音乐    type=varchar(60)    update=true
----------------END ----> transaction id: 730
================> binlog[mysql-bin.000001:3903] , executeTime : 1608982052000(2020-12-26 19:27:32) , gtid : () , delay : 32314141ms

Once a client connects, the canal server records the currently working canal client in ZooKeeper, including the client's IP, port, and so on:

[zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"192.168.106.1:3222","clientId":1001}
[zk: localhost:2181(CONNECTED) 23] get /otter/canal/destinations/example/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"node1","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000001","position":4172,"serverId":1,"timestamp":1609019073000}}

6. MySQL + Canal + Kafka integration
Official docs: https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
6.1. Machine preparation
ZooKeeper cluster: node1, node2, node3
Kafka cluster: node1, node2, node3
MySQL node: node1
Canal server: node1
6.2. Download Canal
Download: https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
6.3. Unpack

[root@node1 ~]# pwd
/root
[root@node1 ~]# mkdir canal-kafka
[root@node1 ~]# tar -zxvf canal.deployer-1.1.4.tar.gz -C canal-kafka

After unpacking, go into /root/canal-kafka:

[root@node1 ~]# cd canal-kafka/
[root@node1 canal-kafka]# ls
bin  conf  lib  logs
[root@node1 canal-kafka]#

6.4. Edit the configuration files
6.4.1. Edit instance.properties

In /root/canal-kafka/conf/example/instance.properties:

# position info
canal.instance.master.address=192.168.106.103:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# mq config
canal.mq.topic=test

6.4.2. Edit canal.properties

In /root/canal-kafka/conf/canal.properties:

# tcp, kafka, RocketMQ
canal.serverMode = kafka
# ZooKeeper cluster address
canal.zkServers = node1:2181,node2:2181,node3:2181
# Kafka cluster address
canal.mq.servers = node1:9092,node2:9092,node3:9092

6.5. Start the services
6.5.1. Start the ZooKeeper service

source /etc/profile
$ZOOKEEPER_HOME/bin/zkServer.sh start

6.5.2. Start the Kafka service

# Start Kafka
cd $KAFKA_HOME
bin/kafka-server-start.sh -daemon config/server.properties

6.5.3. Open a Kafka consumer
List the topics, then start a console consumer:

cd $KAFKA_HOME
[root@node1 kafka_2.12-2.6.0]# bin/kafka-topics.sh --zookeeper node1:2181 --list
test
[root@node1 kafka_2.12-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test

6.5.4. Start the Canal service
[root@node1 ~]# cd canal-kafka/
[root@node1 canal-kafka]# bin/startup.sh
cd to /root/canal-kafka/bin for workaround relative path
LOG CONFIGURATION : /root/canal-kafka/bin/../conf/logback.xml
canal conf : /root/canal-kafka/bin/../conf/canal.properties
CLASSPATH : /root/canal-kafka/bin/../conf:/root/canal-kafka/bin/../lib/zookeeper-3.4.5.jar:/root/canal-kafka/bin/../lib/zkclient-0.10.jar:... (the remainder of the bundled jar list is omitted)
cd to /root/canal-kafka for continue
[root@node1 canal-kafka]#

6.5.5. Watch the Kafka consumer
The first time Canal starts, if the MySQL binlog already contains data, it is collected straight into the Kafka cluster and printed on the Kafka consumer console.
[root@node1 kafka_2.12-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test {"data":null,"database":"`kafka_center`","es":1609021630000,"id":1,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Dumping database structure for kafka_center\r\nCREATE DATABASE IF NOT EXISTS `kafka_center` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */","sqlType":null,"table":"","ts":1609079707068,"type":"QUERY"} {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Dumping structure for table kafka_center.alert_group\r\nCREATE TABLE IF NOT EXISTS `alert_group` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `cluster_id` int(11) NOT NULL,\r\n `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `consummer_group` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `consummer_api` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `threshold` int(11) DEFAULT NULL,\r\n `dispause` int(11) DEFAULT NULL,\r\n `mail_to` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `webhook` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `create_date` datetime DEFAULT NULL,\r\n `owner_id` int(11) DEFAULT NULL,\r\n `team_id` int(11) DEFAULT NULL,\r\n `disable_alerta` tinyint(1) DEFAULT 0,\r\n `enable` tinyint(1) NOT NULL DEFAULT 1,\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"alert_group","ts":1609079707071,"type":"CREATE"} {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.cluster_info\r\nCREATE TABLE IF NOT EXISTS `cluster_info` 
(\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `name` varchar(255) COLLATE utf8_bin NOT NULL,\r\n `zk_address` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `broker` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `create_time` datetime DEFAULT NULL,\r\n `comments` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `enable` int(11) DEFAULT NULL,\r\n `broker_size` int(4) DEFAULT 0,\r\n `kafka_version` varchar(10) COLLATE utf8_bin DEFAULT '',\r\n `location` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `graf_addr` varchar(255) COLLATE utf8_bin DEFAULT '',\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"cluster_info","ts":1609079707071,"type":"CREATE"} {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.ksql_info\r\nCREATE TABLE IF NOT EXISTS `ksql_info` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `cluster_id` int(11) DEFAULT NULL,\r\n `cluster_name` varchar(255) DEFAULT NULL,\r\n `ksql_url` varchar(255) DEFAULT NULL,\r\n `ksql_serverId` varchar(255) DEFAULT NULL,\r\n `version` varchar(255) DEFAULT NULL,\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8","sqlType":null,"table":"ksql_info","ts":1609079707071,"type":"CREATE"} {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.task_info\r\nCREATE TABLE IF NOT EXISTS `task_info` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `cluster_ids` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `location` varchar(20) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `topic_name` 
varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `partition` int(11) DEFAULT NULL,\r\n `replication` int(11) DEFAULT NULL,\r\n `message_rate` int(50) DEFAULT NULL,\r\n `ttl` int(11) DEFAULT NULL,\r\n `owner_id` int(11) DEFAULT NULL,\r\n `team_id` int(11) DEFAULT NULL,\r\n `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `create_time` datetime DEFAULT NULL,\r\n `approved` int(11) DEFAULT NULL,\r\n `approved_id` int(11) DEFAULT NULL,\r\n `approved_time` datetime DEFAULT NULL,\r\n `approval_opinions` varchar(1000) COLLATE utf8_bin DEFAULT '',\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"task_info","ts":1609079707071,"type":"CREATE"} {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.team_info\r\nCREATE TABLE IF NOT EXISTS `team_info` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `own` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `alarm_group` varchar(255) COLLATE utf8_bin DEFAULT NULL,\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"team_info","ts":1609079707071,"type":"CREATE"} {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.topic_collection\r\nCREATE TABLE IF NOT EXISTS `topic_collection` (\r\n `id` int(11) unsigned NOT NULL AUTO_INCREMENT,\r\n `cluster_id` int(11) NOT NULL,\r\n `user_id` int(11) NOT NULL,\r\n `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `type` varchar(255) COLLATE 
utf8_bin NOT NULL DEFAULT '',\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"topic_collection","ts":1609079707072,"type":"CREATE"} {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.topic_info\r\nCREATE TABLE IF NOT EXISTS `topic_info` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `cluster_id` int(11) NOT NULL,\r\n `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `partition` int(11) DEFAULT NULL,\r\n `replication` int(11) DEFAULT NULL,\r\n `ttl` bigint(11) DEFAULT NULL,\r\n `config` varchar(512) COLLATE utf8_bin DEFAULT NULL,\r\n `owner_id` int(11) DEFAULT NULL,\r\n `team_id` int(11) DEFAULT NULL,\r\n `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `create_time` datetime DEFAULT NULL,\r\n `file_size` bigint(20) NOT NULL DEFAULT -1,\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"topic_info","ts":1609079707072,"type":"CREATE"} {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.user_info\r\nCREATE TABLE IF NOT EXISTS `user_info` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `real_name` varchar(255) COLLATE utf8_bin DEFAULT '',\r\n `email` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `role` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '100',\r\n `create_time` datetime DEFAULT NULL,\r\n `password` varchar(255) COLLATE utf8_bin DEFAULT '',\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT 
CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"user_info","ts":1609079707072,"type":"CREATE"} {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.user_team\r\nCREATE TABLE IF NOT EXISTS `user_team` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `user_id` int(11) DEFAULT NULL,\r\n `team_id` int(11) DEFAULT NULL,\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"user_team","ts":1609079707072,"type":"CREATE"}
You can now delete, update, or insert data in MySQL, and the Kafka consumer console will consume the binlog events in real time.
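The messages above are all DDL events: `isDdl` is `true`, the raw statement is carried in `sql`, and `data` is `null`. DML events (INSERT/UPDATE/DELETE) instead carry row images in `data`. A minimal sketch of telling the two apart (the `classify` helper is hypothetical, not part of canal; the sample below is trimmed from the last DDL message above):

```python
import json

def classify(event):
    """Label a canal message: DDL statements carry isDdl=true plus the raw
    SQL text; DML events carry the changed rows in the "data" field."""
    return "DDL" if event.get("isDdl") else "DML"

# Trimmed sample based on the last CREATE message above (most fields omitted)
ddl_msg = json.loads('{"data":null,"database":"kafka_center","isDdl":true,'
                     '"table":"user_team","type":"CREATE"}')
print(classify(ddl_msg))  # DDL
```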
往stu表中插入數(shù)據(jù):
觀察日志,新增的內(nèi)容如下:
{"data":[{"name":"田七","speciality":"語文"}],"database":"test","es":1609080224000,"id":2,"isDdl":false,"mysqlType":{"name":"varchar(60)","speciality":"varchar(60)"},"old":null,"pkNames":null,"sql":"","sqlType":{"name":12,"speciality":12},"table":"stu","ts":1609080224938,"type":"INSERT"}
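A downstream consumer typically parses these fields out of the JSON. A minimal sketch using only the standard library, reproducing the INSERT event captured above (field meanings follow what the log shows: `data` holds the row images, `type` the event kind, `es`/`ts` the binlog and processing timestamps):

```python
import json

# The INSERT event captured above, reproduced verbatim (trailing fields kept)
raw = ('{"data":[{"name":"田七","speciality":"語文"}],"database":"test",'
       '"es":1609080224000,"id":2,"isDdl":false,'
       '"mysqlType":{"name":"varchar(60)","speciality":"varchar(60)"},'
       '"old":null,"pkNames":null,"sql":"",'
       '"sqlType":{"name":12,"speciality":12},'
       '"table":"stu","ts":1609080224938,"type":"INSERT"}')

event = json.loads(raw)
rows = event["data"]  # list of row images, one dict per affected row
print(event["type"], event["database"] + "." + event["table"], rows)
```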