日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Flink进行Kafka事实表与Mysql维度表Join(纯DDL/DML方式)

發(fā)布時(shí)間:2023/12/31 数据库 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink进行Kafka事实表与Mysql维度表Join(纯DDL/DML方式) 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

概述:

對(duì)參考鏈接[1]進(jìn)行DDL上的復(fù)現(xiàn)。

一些基本的業(yè)務(wù)常識(shí)

?來源載體數(shù)據(jù)特點(diǎn)
維表Mysql/Csv/Hbase很少變化
事實(shí)表Kafka不停變化

?

開發(fā)環(huán)境與準(zhǔn)備工作

組件版本
Flink(HA)1.12
Zookeeper3.6.0
Hadoop3.1.2
Kafka(HA)2.5.0
Mysql8.0.22-0ubuntu0.20.04.2 (Ubuntu)

關(guān)閉防火墻:

service firewalld stop

然后啟動(dòng)上述所有集群

實(shí)驗(yàn)框架

數(shù)據(jù)集+完整實(shí)驗(yàn)步驟+DDL/SQL

①CSV數(shù)據(jù)集如下(供t1讀取)

?

把數(shù)據(jù)集放到HDFS上面去:

hdfs dfs -mkdir /test

hdfs dfs -put UserBehavior.csv /test

創(chuàng)建mysql表格

create database dijie_test;

use dijie_test;

-- Mysql 建表語句,注意這是在Mysql執(zhí)行的!不要在Zeppelin執(zhí)行CREATE TABLE `dim_behavior` (`id` int(10) NOT NULL AUTO_INCREMENT COMMENT '自增主鍵',`en_behavior` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT '英文 行為',`zh_behavior` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT '中文 行為',PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;-- 搞兩條數(shù)據(jù)INSERT INTO `dijie_test`.`dim_behavior`(`id`, `en_behavior`, `zh_behavior`) VALUES (1, 'buy', '購買');INSERT INTO `dijie_test`.`dim_behavior`(`id`, `en_behavior`, `zh_behavior`) VALUES (2, 'pv', '瀏覽');

這樣外部準(zhǔn)備工作(CSV和mysql)就算結(jié)束了。

②操作Flink SQL Client步驟

詳細(xì)操作步驟備注中間實(shí)驗(yàn)效果
創(chuàng)建t1(見下方gitee鏈接)定義source?
select * from t1;檢查flink sql client是否被順利讀取到?
創(chuàng)建t2(見下方gitee鏈接)定義sink?
insert into t2 select user_id,item_id,category_id,behavior,UNIX_TIMESTAMP() as ts from t1;csv寫入kafka

$KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic zeppelin_01_test

kafka的消費(fèi)端檢查下是否真的存入了這些數(shù)據(jù)?
創(chuàng)建t3(見下方gitee鏈接)建立事實(shí)表source?
select * from t3確保事實(shí)表格的每條數(shù)據(jù)有出現(xiàn)在kafka中?
創(chuàng)建dim_behavior(見下方gitee鏈接)建立維度表source?
進(jìn)行l(wèi)eft join查詢(見下方gitee鏈接)最終結(jié)果

https://gitee.com/appleyuchi/Flink_Code/blob/master/FLINK讀寫各種數(shù)據(jù)源/KafkaMysqlJoinDDL.sql

上述表格看著挺凌亂,時(shí)刻記住最上方的框圖,出現(xiàn)問題時(shí),要在本文開頭的框圖中定位是哪個(gè)環(huán)節(jié)出了問題,才好調(diào)試。

###################################################################################################################################################

附錄

可能用到的kafka操作

操作命令備注
查看topic$KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181

?


?

zeppelin_01_test這個(gè) topic發(fā)送 json 消息

?

$KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic zeppelin_01_test

這里可能碰到[2]中的報(bào)錯(cuò),注意檢查命令中端口與配置文件server.properties中的listeners的端口嚴(yán)格保持一致

[2]中的報(bào)錯(cuò)還可能是某個(gè)節(jié)點(diǎn)的kafka掛掉導(dǎo)致的.

?

可能碰到[3]

注意關(guān)閉防火墻

?

?

使用kafka自帶消費(fèi)端測(cè)試下消費(fèi)$KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic zeppelin_01_test

如果kafka自帶消費(fèi)者測(cè)試有問題,那么就不用繼續(xù)往下面做了,

此時(shí)如果使用Flink SQL Client來消費(fèi)也必然會(huì)出現(xiàn)問題

清除topic中所有數(shù)據(jù)[6](因?yàn)?萬一你輸錯(cuò)了呢?對(duì)吧)$KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic zeppelin_01_test

需要$KAFKA/config/server.properties設(shè)置

delete.topic.enable=true

?

#######################################################################################################################

?

Reference:

[1]https://blog.csdn.net/weixin_47482194/article/details/106672613

[2]Kafka->Flink->Hbase(純DDL/SQL形式)

總結(jié)

以上是生活随笔為你收集整理的Flink进行Kafka事实表与Mysql维度表Join(纯DDL/DML方式)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。