flink的table/sql api的多种写法汇总
這個記載是為了方便轉化網絡中各種資料的寫法,
所以每個階段都收集了各種寫法.
并且用代碼進行了運行驗證.
?
DataStream<OrderStream> orderA = env.fromCollection(Arrays.asList(
? ? ? ? ? ? new OrderStream(1L, "beer", 3, Timestamp.valueOf("2017-09-16 10:30:00")),
? ? ? ? ? ? new OrderStream(3L, "rubber", 2,Timestamp.valueOf("2017-09-16 10:10:00")),
? ? ? ? ? ? new OrderStream(1L, "diaper", 4,Timestamp.valueOf("2017-09-16 10:20:00"))
? ? ));
?
| 步驟 | 寫法 |
| 初始化環境 | StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); |
| 建立表格 | 下面4種寫法大多數情況下完全等效(任選1種即可) ①tEnv.registerDataStream("orderA", orderA, "user,product,amount"); ? ②Deprecated寫法 tEnv.createTemporaryView("orderA", orderA, "user,product,amount"); ? ③ 最新寫法,②的替代寫法 tEnv.createTemporaryView("orderA", orderA, $("user,product,amount"));
Table tableA = tEnv.fromDataStream(orderA, "user,product,amount"); ? ⑤當后續有使用as函數的需要時,必須采用下面這種寫法,此時禁止采用③的寫法 tEnv.createTemporaryView("Orders", orderA, $("user"), $("product"), $("amount")); |
| 注冊UDF | 下面3種寫法完全等效(任選1種即可) ①tEnv.registerFunction("hashcode", new HashCode(10));
? ③tEnv.createTemporarySystemFunction("hashcode", new HashCode(10)); ? ②③在概念上肯定會有差別,但是一般應用中,效果沒區別. |
| SQL查詢與輸出 | 下面4種寫法完全等效(任選1種即可) ①需要注冊UDF 的情況下 Table result = tEnv.from("orderA").select("product,hashcode(product)"); ? ②需要注冊UDF 的情況下 Table result=tEnv.from("orderA").select($("product"),call("hashcode","product")); ? ③需要注冊UDF 的情況下 Table result=tEnv.sqlQuery("select product,hashcode(product)?from orderA"); ? ④需要注冊UDF 的情況下 tableA.select($("product"),call("hashcode","product")).execute().print();
? ⑤不注冊UDF情況下,直接使用UDF Table result=tEnv.from("orderA").select($("product"),call(HashCode.class,"product")); ? ? ? |
?
把上述步驟中,每個步驟任意抽取一個,拼接起來,
就能構成完整的Flink SQL程序.
?
Reference:
[1]Flink基礎(二十):Table API 和 Flink SQL(五)函數
總結
以上是生活随笔為你收集整理的flink的table/sql api的多种写法汇总的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Cannot resolve field
- 下一篇: createTemporaryView