Flink SQL Client读Kafka+流计算(DDL方式+代碼嵌入DDL/SQL方式)
###################################################################################################################
目的
本文是對參考文獻[1]在高版本上的的復現
###################################################################################################################
環境與配置
| 組件 | 版本 |
| Flink | 1.12 |
| Hive | 3.1.2 |
| mysql | 8.0.22-0ubuntu0.20.04.2 |
| Zookeeper | 3.6.0 |
| Hadoop | 3.1.2 |
| Ubuntu | 20.04 |
?
###################################################################################################################
步驟
service firewalld stop(關閉防火墻)
啓動hadoop
離開安全模式
啓動zookeeper與kafka集羣
啟動flink集群
?該實驗不需要額外的.yaml文件的配置,采用的是DDL方式
| 操作 | 命令 | 備注 |
| 查看topic | $KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181 | 如果想刪除topic,可以是: ?
|
| 往my_topic 這個 topic發送 json 消息 | $KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic my_topic | 這里可能碰到[2]中的報錯,注意檢查命令中端口與配置文件server.properties中的listeners的端口嚴格保持一致 [2]中的報錯還可能是某個節點的kafka掛掉導致的. ? 可能碰到[3] 注意關閉防火墻 ? ? |
| 使用kafka自帶消費端測試下消費 | $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic my_topic | 如果kafka自帶消費者測試有問題,那么就不用繼續往下面做了, 此時如果使用Flink SQL Client來消費也必然會出現問題 |
| 清除topic中所有數據[6] (因為,萬一你輸錯了呢?對吧) | $KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic my_topic | 需要$KAFKA/config/server.properties設置 delete.topic.enable=true |
kafka生產端輸入的數據:
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662868", "item_id":"1784", "category_id": "54123654", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662854", "item_id":"1456", "category_id": "12345678", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662858", "item_id":"1457", "category_id": "12345679", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
?
###################################################################################################################################################
SQL Client+DDL方式-實驗結果
| ? | DDL/SQL | 實驗效果 |
| 建立表(對接kafka) | CREATE TABLE user_log1 ( ? ? user_id VARCHAR, ? ? item_id VARCHAR, ? ? category_id VARCHAR, ? ? behavior VARCHAR, ? ? ts VARCHAR ) WITH ( ? ? 'connector.type' = 'kafka', ? ? 'connector.version' = 'universal', ? ? 'connector.topic' = 'my_topic', ? ? 'connector.startup-mode' = 'earliest-offset', ? ? 'connector.properties.group.id' = 'testGroup', ? ? 'connector.properties.zookeeper.connect' = 'Desktop:2181,Laptop:2181,Laptop:2183', ? ? 'connector.properties.bootstrap.servers' = 'Desktop:9091', ? ? 'format.type' = 'json' ); | |
| 流計算 | select item_id,count(*) from user_log1 group by item_id; |
###################################################################################################################################################
Maven工程中嵌入DDL方式
代碼方式參考[2],自己運行通過的代碼如下:
https://gitee.com/appleyuchi/Flink_Code/blob/master/FLINK讀寫各種數據源/Java/src/main/java/KafkaFlinkDDL.java
?
?
?
?
Reference:
[1]Flink通過SQLClinet創建kafka源表并進行實時計算
[2]Flink通過SQLClinet/Java代碼創建kafka源表,指定Offset消費,并進行實時計算,最后sink到mysql表中
總結
以上是生活随笔為你收集整理的Flink SQL Client读Kafka+流计算(DDL方式+代碼嵌入DDL/SQL方式)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 切片经营时代已来!电信携手华为发布5G终
- 下一篇: Kafka2.5->Flink1.12-