日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Flink SQL解析复杂Join(转载+自己整理和补充)

發布時間:2023/12/31 数据库 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink SQL解析复杂Join(转载+自己整理和补充) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

?概述

$KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic zeppelin_01_test

本文重點復現[1]

目的是為了解析下面的這種比較復雜的JSON(這樣就可以省去寫復雜的UDF啦,嘻嘻)

{"afterColumns":{"created":"1589186680","extra":{"canGiving":false},"parameter":[1,2,3,4]},"beforeColumns":null,"tableVersion":{"binlogFile":null,"binlogPosition":0,"version":0},"touchTime":1589186680591 }

?

開發環境與準備工作

組件版本
Flink(HA)1.12
Zookeeper3.6.0
Hadoop3.1.2
Hbase(HA)2.2.4
Kafka(HA)2.5.0
Mysql8.0.22-0ubuntu0.20.04.2 (Ubuntu)

?

關閉防火墻,啟動上述所有集群

######################################################################################################

①$KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic json_parse

{"afterColumns":{"created":"1589186680","extra":{"canGiving":false},"parameter":[1,2,3,4] },"beforeColumns":null,"tableVersion":{"binlogFile":null, "binlogPosition":0, "version":0},"touchTime":1589186680591}

輸入后按下回車

$KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic json_parse

消費該topic,確保kafka已經包含了該數據

②啟動Flink SQL Client

輸入DDL,以下面為準:

https://gitee.com/appleyuchi/Flink_Code/blob/master/FLINK讀寫各種數據源/json_parse.sql

select * from t1;

Flink SQL Client查詢語句執行效果
select * from t1;
select afterColumns.extra.canGiving from t1;
select afterColumns.`parameter`[1] from t1;

############################################################################################################

總結

Json中的每個{}都需要用Row來包含

Json中的每個[]都需要用Arrary來包含

############################################################################################################

附錄

JSON數據類型和FLINK SQL數據類型的映射關系

JSON模式Flink SQL
ObjectROW
BooleanBOOLEAN
ArrayARRAY[_]
NumberDECIMAL
IntegerDECIMAL
StringSTRING
String 與 format: date-timeTIMESTAMP
String 與 format: dateDATE
String 與 format: timeTIME
String 與 encoding: base64ARRAY[TINYINT]
NullNULL (尚不支持)

?

可能用到的kafka操作

操作命令備注
查看topic$KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181

?


?

flink-test-0這個 topic發送 json 消息

$KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic flink-test-0

這里可能碰到[2]中的報錯,注意檢查命令中端口與配置文件server.properties中的listeners的端口嚴格保持一致

[2]中的報錯還可能是某個節點的kafka掛掉導致的.

?

可能碰到[3]

注意關閉防火墻

?

?

使用kafka自帶消費端測試下消費$KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic flink-test-0

如果kafka自帶消費者測試有問題,那么就不用繼續往下面做了,

此時如果使用Flink SQL Client來消費也必然會出現問題

清除topic中所有數據[6](因為,萬一你輸錯了呢?對吧)$KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic flink-test-0

需要$KAFKA/config/server.properties設置

delete.topic.enable=true

?

Reference:

[1]Flink Sql教程(6)

?

總結

以上是生活随笔為你收集整理的Flink SQL解析复杂Join(转载+自己整理和补充)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。