Flink进行Kafka事实表与Mysql维度表Join(纯DDL/DML方式)
概述:
對(duì)參考鏈接[1]進(jìn)行DDL上的復(fù)現(xiàn)。
一些基本的業(yè)務(wù)常識(shí)
| ? | 來源載體 | 數(shù)據(jù)特點(diǎn) |
| 維表 | Mysql/Csv/Hbase | 很少變化 |
| 事實(shí)表 | Kafka | 不停變化 |
?
開發(fā)環(huán)境與準(zhǔn)備工作
| 組件 | 版本 |
| Flink(HA) | 1.12 |
| Zookeeper | 3.6.0 |
| Hadoop | 3.1.2 |
| Kafka(HA) | 2.5.0 |
| Mysql | 8.0.22-0ubuntu0.20.04.2 (Ubuntu) |
關(guān)閉防火墻:
service firewalld stop
然后啟動(dòng)上述所有集群
實(shí)驗(yàn)框架
數(shù)據(jù)集+完整實(shí)驗(yàn)步驟+DDL/SQL
①CSV數(shù)據(jù)集如下(供t1讀取)
?
把數(shù)據(jù)集放到HDFS上面去:
hdfs dfs -mkdir /test
hdfs dfs -put UserBehavior.csv /test
創(chuàng)建mysql表格
create database dijie_test;
use dijie_test;
-- Mysql 建表語句,注意這是在Mysql執(zhí)行的!不要在Zeppelin執(zhí)行CREATE TABLE `dim_behavior` (`id` int(10) NOT NULL AUTO_INCREMENT COMMENT '自增主鍵',`en_behavior` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT '英文 行為',`zh_behavior` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT '中文 行為',PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;-- 搞兩條數(shù)據(jù)INSERT INTO `dijie_test`.`dim_behavior`(`id`, `en_behavior`, `zh_behavior`) VALUES (1, 'buy', '購買');INSERT INTO `dijie_test`.`dim_behavior`(`id`, `en_behavior`, `zh_behavior`) VALUES (2, 'pv', '瀏覽');這樣外部準(zhǔn)備工作(CSV和mysql)就算結(jié)束了。
②操作Flink SQL Client步驟
| 詳細(xì)操作步驟 | 備注 | 中間實(shí)驗(yàn)效果 |
| 創(chuàng)建t1(見下方gitee鏈接) | 定義source | ? |
| select * from t1; | 檢查flink sql client是否被順利讀取到 | ? |
| 創(chuàng)建t2(見下方gitee鏈接) | 定義sink | ? |
| insert into t2 select user_id,item_id,category_id,behavior,UNIX_TIMESTAMP() as ts from t1; | csv寫入kafka | |
| $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic zeppelin_01_test | kafka的消費(fèi)端檢查下是否真的存入了這些數(shù)據(jù) | ? |
| 創(chuàng)建t3(見下方gitee鏈接) | 建立事實(shí)表source | ? |
| select * from t3 | 確保事實(shí)表格的每條數(shù)據(jù)有出現(xiàn)在kafka中 | ? |
| 創(chuàng)建dim_behavior(見下方gitee鏈接) | 建立維度表source | ? |
| 進(jìn)行l(wèi)eft join查詢(見下方gitee鏈接) | 最終結(jié)果 |
https://gitee.com/appleyuchi/Flink_Code/blob/master/FLINK讀寫各種數(shù)據(jù)源/KafkaMysqlJoinDDL.sql
上述表格看著挺凌亂,時(shí)刻記住最上方的框圖,出現(xiàn)問題時(shí),要在本文開頭的框圖中定位是哪個(gè)環(huán)節(jié)出了問題,才好調(diào)試。
###################################################################################################################################################
附錄
可能用到的kafka操作
| 操作 | 命令 | 備注 |
| 查看topic | $KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181 | 無 ?
|
| 往zeppelin_01_test這個(gè) topic發(fā)送 json 消息 | ? $KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic zeppelin_01_test | 這里可能碰到[2]中的報(bào)錯(cuò),注意檢查命令中端口與配置文件server.properties中的listeners的端口嚴(yán)格保持一致 [2]中的報(bào)錯(cuò)還可能是某個(gè)節(jié)點(diǎn)的kafka掛掉導(dǎo)致的. ? 可能碰到[3] 注意關(guān)閉防火墻 ? ? |
| 使用kafka自帶消費(fèi)端測(cè)試下消費(fèi) | $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic zeppelin_01_test | 如果kafka自帶消費(fèi)者測(cè)試有問題,那么就不用繼續(xù)往下面做了, 此時(shí)如果使用Flink SQL Client來消費(fèi)也必然會(huì)出現(xiàn)問題 |
| 清除topic中所有數(shù)據(jù)[6](因?yàn)?萬一你輸錯(cuò)了呢?對(duì)吧) | $KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic zeppelin_01_test | 需要$KAFKA/config/server.properties設(shè)置 delete.topic.enable=true |
?
#######################################################################################################################
?
Reference:
[1]https://blog.csdn.net/weixin_47482194/article/details/106672613
[2]Kafka->Flink->Hbase(純DDL/SQL形式)
總結(jié)
以上是生活随笔為你收集整理的Flink进行Kafka事实表与Mysql维度表Join(纯DDL/DML方式)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 关于宁波一些眼科流传的营养针
- 下一篇: 彻底理解IP地址分类与CIDR IP地址