Flink SQL解析复杂Join(转载+自己整理和补充)
?概述
$KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic zeppelin_01_test
本文重點復現[1]
目的是為了解析下面的這種比較復雜的JSON(這樣就可以省去寫復雜的UDF啦,嘻嘻)
{"afterColumns":{"created":"1589186680","extra":{"canGiving":false},"parameter":[1,2,3,4]},"beforeColumns":null,"tableVersion":{"binlogFile":null,"binlogPosition":0,"version":0},"touchTime":1589186680591 }?
開發環境與準備工作
| 組件 | 版本 |
| Flink(HA) | 1.12 |
| Zookeeper | 3.6.0 |
| Hadoop | 3.1.2 |
| Hbase(HA) | 2.2.4 |
| Kafka(HA) | 2.5.0 |
| Mysql | 8.0.22-0ubuntu0.20.04.2 (Ubuntu) |
?
關閉防火墻,啟動上述所有集群
######################################################################################################
①$KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic json_parse
{"afterColumns":{"created":"1589186680","extra":{"canGiving":false},"parameter":[1,2,3,4] },"beforeColumns":null,"tableVersion":{"binlogFile":null, "binlogPosition":0, "version":0},"touchTime":1589186680591}
輸入后按下回車
$KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic json_parse
消費該topic,確保kafka已經包含了該數據
②啟動Flink SQL Client
輸入DDL,以下面為準:
https://gitee.com/appleyuchi/Flink_Code/blob/master/FLINK讀寫各種數據源/json_parse.sql
select * from t1;
| Flink SQL Client查詢語句 | 執行效果 |
| select * from t1; | |
| select afterColumns.extra.canGiving from t1; | |
| select afterColumns.`parameter`[1] from t1; |
############################################################################################################
總結
Json中的每個{}都需要用Row來包含
Json中的每個[]都需要用Arrary來包含
############################################################################################################
附錄
JSON數據類型和FLINK SQL數據類型的映射關系
| Object | ROW |
| Boolean | BOOLEAN |
| Array | ARRAY[_] |
| Number | DECIMAL |
| Integer | DECIMAL |
| String | STRING |
| String 與 format: date-time | TIMESTAMP |
| String 與 format: date | DATE |
| String 與 format: time | TIME |
| String 與 encoding: base64 | ARRAY[TINYINT] |
| Null | NULL (尚不支持) |
?
可能用到的kafka操作
| 操作 | 命令 | 備注 |
| 查看topic | $KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181 | 無 ?
|
| 往flink-test-0這個 topic發送 json 消息 | $KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic flink-test-0 | 這里可能碰到[2]中的報錯,注意檢查命令中端口與配置文件server.properties中的listeners的端口嚴格保持一致 [2]中的報錯還可能是某個節點的kafka掛掉導致的. ? 可能碰到[3] 注意關閉防火墻 ? ? |
| 使用kafka自帶消費端測試下消費 | $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic flink-test-0 | 如果kafka自帶消費者測試有問題,那么就不用繼續往下面做了, 此時如果使用Flink SQL Client來消費也必然會出現問題 |
| 清除topic中所有數據[6](因為,萬一你輸錯了呢?對吧) | $KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic flink-test-0 | 需要$KAFKA/config/server.properties設置 delete.topic.enable=true |
?
Reference:
[1]Flink Sql教程(6)
?
總結
以上是生活随笔為你收集整理的Flink SQL解析复杂Join(转载+自己整理和补充)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何实现文件自动归类?
- 下一篇: Flink SQL Client进行Ka