Flink SQL Client实现CDC实验
概述
本文主要是對[7]中內(nèi)容的復(fù)現(xiàn)
環(huán)境
| 組件 | 版本 |
| Flink(HA) | 1.12 |
| Zookeeper | 3.6.0 |
| flink-sql-connector-mysql-cdc | 1.1.1 |
| Mysql | 8.0.22-0ubuntu0.20.04.2 |
?
實(shí)驗(yàn)流程圖
Mysql的同步配置
/etc/mysql/mysql.conf.d/mysqld.cnf中的[mysqld]下面添加:
# 前面還有其他配置 # 添加的部分 server-id = 12345 log-bin = mysql-bin # 必須為ROW binlog_format = ROW # 必須為FULL,MySQL-5.7后才有該參數(shù) binlog_row_image = FULL expire_logs_days = 10查看binlog默認(rèn)配置:
SHOW VARIABLES LIKE '%binlog%';
service mysql restart
創(chuàng)建用于同步的用戶,并給予權(quán)限(可供參考的文檔)
-- 設(shè)置擁有同步權(quán)限的用戶 CREATE USER 'appleyuchi' IDENTIFIED BY 'appleyuchi'; -- 賦予同步相關(guān)權(quán)限 GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'appleyuchi';創(chuàng)建用戶并賦予權(quán)限成功后,使用該用戶登錄MySQL,可以使用以下命令查看主從同步相關(guān)信息
SHOW MASTER STATUS; SHOW SLAVE STATUS; SHOW BINARY LOGS;?
詳細(xì)操作步驟
| 操作內(nèi)容 | 效果 |
| Mysql建立表格 | ? |
| Mysql插入數(shù)據(jù) | |
| Flink SQL Client建立Source | ? |
| Flink SQL Client建立Sink | ? |
| mysql_binlog的數(shù)據(jù)插入到tb_sink | |
| 查看當(dāng)前Flink集羣的taskmanager中的數(shù)據(jù) | |
| 更新Mysql以後,再次查看Flink集羣的taskmanager中的數(shù)據(jù) |
上述表格中各個(gè)步驟需要的SQL匯總?cè)缦?
https://gitee.com/appleyuchi/Flink_Code/blob/master/FLINK讀寫各種數(shù)據(jù)源/cdc.sql
?
這裏需要注意,如果本實(shí)驗(yàn)的source中的mysql的IP是一個(gè)外網(wǎng)IP,那麼需要確保mysql所在節(jié)點(diǎn)可以被外網(wǎng)訪問,
否則會(huì)無法順利提交任務(wù)到集羣.上述鏈接中使用的localhost,沒有該問題
?
實(shí)驗(yàn)結(jié)論
當(dāng)Mysql中修改數(shù)據(jù)以後,我們會(huì)發(fā)現(xiàn)Flink集羣任務(wù)中的Task Manager的Stdout也會(huì)立刻做出修改.
因此CDC的同步功能順利實(shí)現(xiàn)
CDC其實(shí)就是類似于一個(gè)金山同步盤一樣的功能,上游的數(shù)據(jù)改動(dòng)后,同步到下游.
?
依賴問題
https://maven.aliyun.com/mvn/search
搜索flink-sql-connector-mysql-cdc-1.1.1.jar
然后放到$FLINK_HOME/lib下面,然后同步到集群的其他節(jié)點(diǎn)
?
異常
Caused by: java.sql.SQLNonTransientConnectionException: Public Key Retrieval is not allowed
如果集羣兩個(gè)節(jié)點(diǎn)Desktop和Laptop
此時(shí)Laptop無法登錄Desktop的mysql就會(huì)導(dǎo)致Flink集羣發(fā)生這種異常報(bào)錯(cuò)
此時(shí)下述兩個(gè)解決辦法取其一即可:
①讓Desktop中的mysql支持外網(wǎng)訪問
②source中的mysql的IP從原本的Desktop改成localhost
?
Reference:
[1]Flink SQL Client + Mysql CDC 部署實(shí)踐(yaml格式)
[2]關(guān)于flink:Flink-SQL-Client-Mysql-CDC-部署實(shí)踐(yaml格式)
[3]Flink SQL CDC(涉及ElasticSearch的)
[4]Flink SQL CDC 上線!我們總結(jié)了 13 條生產(chǎn)實(shí)踐經(jīng)驗(yàn)(提到維度表Join和雙流Join不一樣)
[5]Flink1.11中的CDC Connectors操作實(shí)踐(非常詳細(xì))
[6]基于 Flink SQL CDC的實(shí)時(shí)數(shù)據(jù)同步方案(沒啥用,講了一系列的場景和架構(gòu))
[7]Flink示例——Flink-CDC(DDL嵌入到代碼中的形式)
[8]Mysql binlog詳解(詳細(xì)介紹)
總結(jié)
以上是生活随笔為你收集整理的Flink SQL Client实现CDC实验的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 页面自适应的几种方法
- 下一篇: Flink SQL Client方言切换