日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

2021年大数据Flink(三十二):​​​​​​​Table与SQL案例准备 API

發布時間:2023/11/28 生活经验 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Flink(三十二):​​​​​​​Table与SQL案例准备 API 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

API

獲取環境

創建表

查詢表

Table API

SQL

???????寫出表

???????與DataSet/DataStream集成

???????TableAPI

???????SQLAPI


API

獲取環境

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#create-a-tableenvironment

?

// **********************// FLINK STREAMING QUERY// **********************import?org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import?org.apache.flink.table.api.EnvironmentSettings;import?org.apache.flink.table.api.bridge.java.StreamTableEnvironment;EnvironmentSettings?fsSettings?=?EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();StreamExecutionEnvironment?fsEnv?=?StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment?fsTableEnv?=?StreamTableEnvironment.create(fsEnv,?fsSettings);// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);// ******************// FLINK BATCH QUERY// ******************import?org.apache.flink.api.java.ExecutionEnvironment;import?org.apache.flink.table.api.bridge.java.BatchTableEnvironment;ExecutionEnvironment?fbEnv?=?ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment?fbTableEnv?=?BatchTableEnvironment.create(fbEnv);// **********************// BLINK STREAMING QUERY// **********************import?org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import?org.apache.flink.table.api.EnvironmentSettings;import?org.apache.flink.table.api.bridge.java.StreamTableEnvironment;StreamExecutionEnvironment?bsEnv?=?StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings?bsSettings?=?EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment?bsTableEnv?=?StreamTableEnvironment.create(bsEnv,?bsSettings);// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);// ******************// BLINK BATCH QUERY// ******************import?org.apache.flink.table.api.EnvironmentSettings;import?org.apache.flink.table.api.TableEnvironment;EnvironmentSettings?bbSettings?=?EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();TableEnvironment?bbTableEnv?=?TableEnvironment.create(bbSettings);

?

???????創建表

// get a TableEnvironmentTableEnvironment?tableEnv?=?...;?// see "Create a TableEnvironment" section// table is the result of a simple projection queryTable?projTable?=?tableEnv.from("X").select(...);// register the Table projTable as table "projectedTable"tableEnv.createTemporaryView("projectedTable",?projTable);
tableEnvironment.connect(...).withFormat(...).withSchema(...).inAppendMode().createTemporaryTable("MyTable")

?

???????查詢表

Table API

// get a TableEnvironmentTableEnvironment?tableEnv?=?...;?// see "Create a TableEnvironment" section// register Orders table// scan registered Orders tableTable?orders?=?tableEnv.from("Orders");// compute revenue for all customers from FranceTable?revenue?=?orders.filter($("cCountry").isEqual("FRANCE")).groupBy($("cID"),?$("cName").select($("cID"),?$("cName"),?$("revenue").sum().as("revSum"));// emit or convert Table// execute query

SQL

// get a TableEnvironmentTableEnvironment?tableEnv?=?...;?// see "Create a TableEnvironment" section// register Orders table// compute revenue for all customers from FranceTable?revenue?=?tableEnv.sqlQuery("SELECT cID, cName, SUM(revenue) AS revSum "?+"FROM Orders "?+"WHERE cCountry = 'FRANCE' "?+"GROUP BY cID, cName");// emit or convert Table// execute query

// get a TableEnvironmentTableEnvironment?tableEnv?=?...;?// see "Create a TableEnvironment" section// register "Orders" table// register "RevenueFrance" output table// compute revenue for all customers from France and emit to "RevenueFrance"tableEnv.executeSql("INSERT INTO RevenueFrance "?+"SELECT cID, cName, SUM(revenue) AS revSum "?+"FROM Orders "?+"WHERE cCountry = 'FRANCE' "?+"GROUP BY cID, cName");

?

???????寫出表

// get a TableEnvironmentTableEnvironment?tableEnv?=?...;?// see "Create a TableEnvironment" section// create an output Tablefinal?Schema?schema?=?new?Schema().field("a",?DataTypes.INT()).field("b",?DataTypes.STRING()).field("c",?DataTypes.BIGINT());tableEnv.connect(new?FileSystem().path("/path/to/file")).withFormat(new?Csv().fieldDelimiter('|').deriveSchema()).withSchema(schema).createTemporaryTable("CsvSinkTable");// compute a result Table using Table API operators and/or SQL queriesTable?result?=?...// emit the result Table to the registered TableSinkresult.executeInsert("CsvSinkTable");

?

???????與DataSet/DataStream集成

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#integration-with-datastream-and-dataset-api

?

?

  • Create a View from a DataStream or DataSet

// get StreamTableEnvironment// registration of a DataSet in a BatchTableEnvironment is equivalentStreamTableEnvironment?tableEnv?=?...;?// see "Create a TableEnvironment" sectionDataStream<Tuple2<Long,?String>>?stream?=?...// register the DataStream as View "myTable" with fields "f0", "f1"tableEnv.createTemporaryView("myTable",?stream);// register the DataStream as View "myTable2" with fields "myLong", "myString"tableEnv.createTemporaryView("myTable2",?stream,?$("myLong"),?$("myString"));
  • Convert a DataStream or DataSet into a Table
    
    // get StreamTableEnvironment// registration of a DataSet in a BatchTableEnvironment is equivalentStreamTableEnvironment?tableEnv?=?...;?// see "Create a TableEnvironment" sectionDataStream<Tuple2<Long,?String>>?stream?=?...// Convert the DataStream into a Table with default fields "f0", "f1"Table?table1?=?tableEnv.fromDataStream(stream);// Convert the DataStream into a Table with fields "myLong", "myString"Table?table2?=?tableEnv.fromDataStream(stream,?$("myLong"),?$("myString"));

    ?

  • Convert a Table into a DataStream or DataSet

Convert a Table into a DataStream

Append Mode: This mode can only be used if the dynamic Table is only modified by INSERT changes, i.e, it is append-only and previously emitted results are never updated.

追加模式:只有當動態表僅通過插入更改進行修改時,才能使用此模式,即,它是僅追加模式,并且以前發出的結果從不更新。

Retract Mode: This mode can always be used. It encodes INSERT and DELETE changes with a boolean flag.

撤回模式:此模式始終可用。它使用布爾標志對插入和刪除更改進行編碼。

// get StreamTableEnvironment.StreamTableEnvironment?tableEnv?=?...;?// see "Create a TableEnvironment" section// Table with two fields (String name, Integer age)Table?table?=?...// convert the Table into an append DataStream of Row by specifying the classDataStream<Row>?dsRow?=?tableEnv.toAppendStream(table,?Row.class);// convert the Table into an append DataStream of Tuple2<String, Integer>// ??via a TypeInformationTupleTypeInfo<Tuple2<String,?Integer>>?tupleType?=?new?TupleTypeInfo<>(Types.STRING(),Types.INT());DataStream<Tuple2<String,?Integer>>?dsTuple?=?tableEnv.toAppendStream(table,?tupleType);// convert the Table into a retract DataStream of Row.// ??A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.// ??The boolean field indicates the type of the change.// ??True is INSERT, false is DELETE.DataStream<Tuple2<Boolean,?Row>>?retractStream?=?tableEnv.toRetractStream(table,?Row.class);

Convert a Table into a DataSet

// get BatchTableEnvironmentBatchTableEnvironment?tableEnv?=?BatchTableEnvironment.create(env);// Table with two fields (String name, Integer age)Table?table?=?...// convert the Table into a DataSet of Row by specifying a classDataSet<Row>?dsRow?=?tableEnv.toDataSet(table,?Row.class);// convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformationTupleTypeInfo<Tuple2<String,?Integer>>?tupleType?=?new?TupleTypeInfo<>(Types.STRING(),Types.INT());DataSet<Tuple2<String,?Integer>>?dsTuple?=?tableEnv.toDataSet(table,?tupleType);

?

???????TableAPI

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html

?

???????SQLAPI

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/

總結

以上是生活随笔為你收集整理的2021年大数据Flink(三十二):​​​​​​​Table与SQL案例准备 API的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。