日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

flink的table/sql api的多种写法汇总

發布時間:2023/12/31 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 flink的table/sql api的多种写法汇总 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

這個記載是為了方便轉化網絡中各種資料的寫法,

所以每個階段都收集了各種寫法.

并且用代碼進行了運行驗證.

?

DataStream<OrderStream> orderA = env.fromCollection(Arrays.asList(
? ? ? ? ? ? new OrderStream(1L, "beer", 3, Timestamp.valueOf("2017-09-16 10:30:00")),
? ? ? ? ? ? new OrderStream(3L, "rubber", 2,Timestamp.valueOf("2017-09-16 10:10:00")),
? ? ? ? ? ? new OrderStream(1L, "diaper", 4,Timestamp.valueOf("2017-09-16 10:20:00"))
? ? ));

?

步驟寫法
初始化環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
建立表格

下面4種寫法大多數情況下完全等效(任選1種即可)

①tEnv.registerDataStream("orderA", orderA, "user,product,amount");

?

Deprecated寫法

tEnv.createTemporaryView("orderA", orderA, "user,product,amount");

?

最新寫法,②的替代寫法

tEnv.createTemporaryView("orderA", orderA, $("user,product,amount"));


Table tableA = tEnv.fromDataStream(orderA, "user,product,amount");
tEnv.registerTable("orderA", tableA);

?

⑤當后續有使用as函數的需要時,必須采用下面這種寫法,此時禁止采用③的寫法

tEnv.createTemporaryView("Orders", orderA, $("user"), $("product"), $("amount"));

注冊UDF

下面3種寫法完全等效(任選1種即可)

①tEnv.registerFunction("hashcode", new HashCode(10));


②tEnv.createTemporaryFunction("hashcode", new HashCode(10));

?

③tEnv.createTemporarySystemFunction("hashcode", new HashCode(10));

?

②③在概念上肯定會有差別,但是一般應用中,效果沒區別.

SQL查詢與輸出

下面4種寫法完全等效(任選1種即可)

①需要注冊UDF 的情況下

Table result = tEnv.from("orderA").select("product,hashcode(product)");
tEnv.toAppendStream(result, Row.class).print();
env.execute();

?

②需要注冊UDF 的情況下

Table result=tEnv.from("orderA").select($("product"),call("hashcode","product"));
tEnv.toAppendStream(result, Row.class).print();
env.execute();

?

③需要注冊UDF 的情況下

Table result=tEnv.sqlQuery("select product,hashcode(product)?from orderA");
tEnv.toAppendStream(result,Row.class).print();
env.execute();

?

④需要注冊UDF 的情況下

tableA.select($("product"),call("hashcode","product")).execute().print();


?

?

不注冊UDF情況下,直接使用UDF

Table result=tEnv.from("orderA").select($("product"),call(HashCode.class,"product"));
tEnv.toAppendStream(result, Row.class).print();
env.execute();

?

?

?

?

把上述步驟中,每個步驟任意抽取一個,拼接起來,

就能構成完整的Flink SQL程序.

?

Reference:

[1]Flink基礎(二十):Table API 和 Flink SQL(五)函數

總結

以上是生活随笔為你收集整理的flink的table/sql api的多种写法汇总的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。