日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Flink进行Kafka事实表与Mysql维度表Join(纯DDL/DML方式)

發布時間:2023/12/31 数据库 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink进行Kafka事实表与Mysql维度表Join(纯DDL/DML方式) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

概述:

對參考鏈接[1]進行DDL上的復現。

一些基本的業務常識

?來源載體數據特點
維表Mysql/Csv/Hbase很少變化
事實表Kafka不停變化

?

開發環境與準備工作

組件版本
Flink(HA)1.12
Zookeeper3.6.0
Hadoop3.1.2
Kafka(HA)2.5.0
Mysql8.0.22-0ubuntu0.20.04.2 (Ubuntu)

關閉防火墻:

service firewalld stop

然后啟動上述所有集群

實驗框架

數據集+完整實驗步驟+DDL/SQL

①CSV數據集如下(供t1讀取)

?

把數據集放到HDFS上面去:

hdfs dfs -mkdir /test

hdfs dfs -put UserBehavior.csv /test

創建mysql表格

create database dijie_test;

use dijie_test;

-- Mysql 建表語句,注意這是在Mysql執行的!不要在Zeppelin執行CREATE TABLE `dim_behavior` (`id` int(10) NOT NULL AUTO_INCREMENT COMMENT '自增主鍵',`en_behavior` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT '英文 行為',`zh_behavior` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT '中文 行為',PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;-- 搞兩條數據INSERT INTO `dijie_test`.`dim_behavior`(`id`, `en_behavior`, `zh_behavior`) VALUES (1, 'buy', '購買');INSERT INTO `dijie_test`.`dim_behavior`(`id`, `en_behavior`, `zh_behavior`) VALUES (2, 'pv', '瀏覽');

這樣外部準備工作(CSV和mysql)就算結束了。

②操作Flink SQL Client步驟

詳細操作步驟備注中間實驗效果
創建t1(見下方gitee鏈接)定義source?
select * from t1;檢查flink sql client是否被順利讀取到?
創建t2(見下方gitee鏈接)定義sink?
insert into t2 select user_id,item_id,category_id,behavior,UNIX_TIMESTAMP() as ts from t1;csv寫入kafka

$KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic zeppelin_01_test

kafka的消費端檢查下是否真的存入了這些數據?
創建t3(見下方gitee鏈接)建立事實表source?
select * from t3確保事實表格的每條數據有出現在kafka中?
創建dim_behavior(見下方gitee鏈接)建立維度表source?
進行left join查詢(見下方gitee鏈接)最終結果

https://gitee.com/appleyuchi/Flink_Code/blob/master/FLINK讀寫各種數據源/KafkaMysqlJoinDDL.sql

上述表格看著挺凌亂,時刻記住最上方的框圖,出現問題時,要在本文開頭的框圖中定位是哪個環節出了問題,才好調試。

###################################################################################################################################################

附錄

可能用到的kafka操作

操作命令備注
查看topic$KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181

?


?

zeppelin_01_test這個 topic發送 json 消息

?

$KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic zeppelin_01_test

這里可能碰到[2]中的報錯,注意檢查命令中端口與配置文件server.properties中的listeners的端口嚴格保持一致

[2]中的報錯還可能是某個節點的kafka掛掉導致的.

?

可能碰到[3]

注意關閉防火墻

?

?

使用kafka自帶消費端測試下消費$KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic zeppelin_01_test

如果kafka自帶消費者測試有問題,那么就不用繼續往下面做了,

此時如果使用Flink SQL Client來消費也必然會出現問題

清除topic中所有數據[6](因為,萬一你輸錯了呢?對吧)$KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic zeppelin_01_test

需要$KAFKA/config/server.properties設置

delete.topic.enable=true

?

#######################################################################################################################

?

Reference:

[1]https://blog.csdn.net/weixin_47482194/article/details/106672613

[2]Kafka->Flink->Hbase(純DDL/SQL形式)

總結

以上是生活随笔為你收集整理的Flink进行Kafka事实表与Mysql维度表Join(纯DDL/DML方式)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。