日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Flink SQL Client读Kafka+流计算(DDL方式+代碼嵌入DDL/SQL方式)

發布時間:2023/12/31 数据库 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink SQL Client读Kafka+流计算(DDL方式+代碼嵌入DDL/SQL方式) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

###################################################################################################################

目的

本文是對參考文獻[1]在高版本上的的復現

###################################################################################################################

環境與配置

組件版本
Flink1.12
Hive3.1.2
mysql8.0.22-0ubuntu0.20.04.2
Zookeeper3.6.0
Hadoop3.1.2
Ubuntu20.04

?

###################################################################################################################

步驟

service firewalld stop(關閉防火墻)

啓動hadoop

離開安全模式

啓動zookeeper與kafka集羣

啟動flink集群

?該實驗不需要額外的.yaml文件的配置,采用的是DDL方式

操作命令備注
查看topic$KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181

如果想刪除topic,可以是:

?


?

my_topic 這個 topic發送 json 消息$KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic my_topic

這里可能碰到[2]中的報錯,注意檢查命令中端口與配置文件server.properties中的listeners的端口嚴格保持一致

[2]中的報錯還可能是某個節點的kafka掛掉導致的.

?

可能碰到[3]

注意關閉防火墻

?

?

使用kafka自帶消費端測試下消費$KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic my_topic

如果kafka自帶消費者測試有問題,那么就不用繼續往下面做了,

此時如果使用Flink SQL Client來消費也必然會出現問題

清除topic中所有數據[6]

(因為,萬一你輸錯了呢?對吧)

$KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic my_topic

需要$KAFKA/config/server.properties設置

delete.topic.enable=true

kafka生產端輸入的數據:

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662868", "item_id":"1784", "category_id": "54123654", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662854", "item_id":"1456", "category_id": "12345678", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662858", "item_id":"1457", "category_id": "12345679", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
?

###################################################################################################################################################

SQL Client+DDL方式-實驗結果

?DDL/SQL實驗效果
建立表(對接kafka)CREATE TABLE user_log1 (
? ? user_id VARCHAR,
? ? item_id VARCHAR,
? ? category_id VARCHAR,
? ? behavior VARCHAR,
? ? ts VARCHAR
) WITH (
? ? 'connector.type' = 'kafka',
? ? 'connector.version' = 'universal',
? ? 'connector.topic' = 'my_topic',
? ? 'connector.startup-mode' = 'earliest-offset',
? ? 'connector.properties.group.id' = 'testGroup',
? ? 'connector.properties.zookeeper.connect' = 'Desktop:2181,Laptop:2181,Laptop:2183',
? ? 'connector.properties.bootstrap.servers' = 'Desktop:9091',
? ? 'format.type' = 'json'
);
流計算select item_id,count(*) from user_log1 group by item_id;

###################################################################################################################################################

Maven工程中嵌入DDL方式

代碼方式參考[2],自己運行通過的代碼如下:

https://gitee.com/appleyuchi/Flink_Code/blob/master/FLINK讀寫各種數據源/Java/src/main/java/KafkaFlinkDDL.java

?

?

?

?

Reference:

[1]Flink通過SQLClinet創建kafka源表并進行實時計算

[2]Flink通過SQLClinet/Java代碼創建kafka源表,指定Offset消費,并進行實時計算,最后sink到mysql表中

總結

以上是生活随笔為你收集整理的Flink SQL Client读Kafka+流计算(DDL方式+代碼嵌入DDL/SQL方式)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。