2021 Big Data Flink (Part 32): Table and SQL Case Preparation: API
Contents
API
Getting the Environment
Creating Tables
Querying Tables
Table API
SQL
Writing Out Tables
Integration with DataSet/DataStream
Table API
SQL API
API
Getting the Environment
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#create-a-tableenvironment
// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);

// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;

ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
Creating Tables
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// table is the result of a simple projection query
Table projTable = tableEnv.from("X").select(...);

// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);
// alternatively, register a table backed by an external system via a connector descriptor
tableEnv
    .connect(...)
    .withFormat(...)
    .withSchema(...)
    .inAppendMode()
    .createTemporaryTable("MyTable");
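A view registered with `createTemporaryView` stores only the query that defines it. According to the Flink 1.12 "Concepts & Common API" page, that query is inlined into every query that references the view, so the view's result is neither materialized nor shared. The toy sketch below models this in plain Java with a `Supplier`; it uses no Flink classes, and `ViewCatalogSketch`, `createTemporaryView` and `from` here are hypothetical names chosen only to mirror the Flink calls above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Toy model (NOT Flink code): a catalog mapping view names to the deferred
// query that defines them, so a view is re-evaluated every time it is read.
public class ViewCatalogSketch {

    // Hypothetical catalog: view name -> query producing the rows
    private final Map<String, Supplier<List<String>>> views = new HashMap<>();

    // Mirrors the idea of tableEnv.createTemporaryView(name, table):
    // only the definition is stored, nothing is computed yet.
    public void createTemporaryView(String name, Supplier<List<String>> definition) {
        views.put(name, definition);
    }

    // Reading a view evaluates (inlines) its definition at that moment.
    public List<String> from(String name) {
        Supplier<List<String>> definition = views.get(name);
        if (definition == null) {
            throw new IllegalArgumentException("Unknown view: " + name);
        }
        return definition.get();
    }

    public static void main(String[] args) {
        List<String> x = new ArrayList<>(List.of("a", "bb"));
        ViewCatalogSketch catalog = new ViewCatalogSketch();
        // "projectedTable" is an upper-casing projection over source "X"
        catalog.createTemporaryView("projectedTable",
                () -> x.stream().map(String::toUpperCase).collect(Collectors.toList()));
        System.out.println(catalog.from("projectedTable")); // [A, BB]
        x.add("ccc");
        System.out.println(catalog.from("projectedTable")); // [A, BB, CCC]
    }
}
```

The second read reflects the new source row because nothing was cached at registration time, which is the behaviour the sketch is meant to illustrate.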
Querying Tables
Table API
import static org.apache.flink.table.api.Expressions.$;

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register Orders table

// scan registered Orders table
Table orders = tableEnv.from("Orders");
// compute revenue for all customers from France
Table revenue = orders
    .filter($("cCountry").isEqual("FRANCE"))
    .groupBy($("cID"), $("cName"))
    .select($("cID"), $("cName"), $("revenue").sum().as("revSum"));

// emit or convert Table
// execute query
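To make the semantics of this chain concrete, here is a plain-Java sketch that computes the same result over an in-memory list with `java.util.stream`; it does not use Flink. The `Order` and `Key` records and the column types (`long` ids and revenue) are assumptions, since the snippet above never shows the schema of `Orders`. Records require Java 16+.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java emulation (NOT Flink) of:
//   orders.filter(cCountry == "FRANCE")
//         .groupBy(cID, cName)
//         .select(cID, cName, SUM(revenue) AS revSum)
public class RevenueQuerySketch {

    // Hypothetical row type standing in for the "Orders" table
    public record Order(long cID, String cName, String cCountry, long revenue) {}

    // Grouping key, mirroring groupBy($("cID"), $("cName"))
    public record Key(long cID, String cName) {}

    public static Map<Key, Long> revenueByFrenchCustomer(List<Order> orders) {
        return orders.stream()
                // filter($("cCountry").isEqual("FRANCE"))
                .filter(o -> o.cCountry().equals("FRANCE"))
                // groupBy(cID, cName) + $("revenue").sum()
                .collect(Collectors.groupingBy(
                        o -> new Key(o.cID(), o.cName()),
                        Collectors.summingLong(Order::revenue)));
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
                new Order(1, "Alice", "FRANCE", 10),
                new Order(1, "Alice", "FRANCE", 5),
                new Order(2, "Bob", "GERMANY", 7),
                new Order(3, "Chloe", "FRANCE", 3));
        revenueByFrenchCustomer(orders)
                .forEach((k, v) -> System.out.println(k.cID() + "," + k.cName() + "," + v));
    }
}
```

This version runs once over a finite list; in Flink's streaming mode the same query keeps running and updates `revSum` as new orders arrive.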
SQL
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register Orders table

// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// emit or convert Table
// execute query
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register "Orders" table
// register "RevenueFrance" output table

// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.executeSql(
    "INSERT INTO RevenueFrance " +
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );
Writing Out Tables
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// create an output Table
final Schema schema = new Schema()
    .field("a", DataTypes.INT())
    .field("b", DataTypes.STRING())
    .field("c", DataTypes.BIGINT());

tableEnv.connect(new FileSystem().path("/path/to/file"))
    .withFormat(new Csv().fieldDelimiter('|').deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("CsvSinkTable");

// compute a result Table using Table API operators and/or SQL queries
Table result = ...

// emit the result Table to the registered TableSink
result.executeInsert("CsvSinkTable");
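With `fieldDelimiter('|')`, each result row becomes one text line whose fields are separated by `|` instead of commas. The sketch below shows that line layout for the `(a INT, b STRING, c BIGINT)` schema in plain Java. It is not Flink's CSV serializer and deliberately skips quoting and escaping, which a real CSV format needs for values that contain the delimiter. `PipeCsvSketch` and `CsvRow` are hypothetical names (records require Java 16+).

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java sketch (NOT Flink's CSV format) of the line layout produced
// for the "CsvSinkTable" schema with fieldDelimiter('|').
public class PipeCsvSketch {

    // Hypothetical row type matching the sink schema (a INT, b STRING, c BIGINT)
    public record CsvRow(int a, String b, long c) {}

    // Joins the fields in schema order with the given delimiter.
    // Simplification: no quoting or escaping, unlike a real CSV writer.
    public static String formatRow(CsvRow r, char delimiter) {
        return String.join(String.valueOf(delimiter),
                String.valueOf(r.a()), r.b(), String.valueOf(r.c()));
    }

    // Formats every row with '|' as configured in the example above
    public static List<String> formatAll(List<CsvRow> rows) {
        return rows.stream()
                .map(r -> formatRow(r, '|'))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<CsvRow> result = List.of(new CsvRow(1, "hello", 100L), new CsvRow(2, "flink", 200L));
        formatAll(result).forEach(System.out::println);
        // prints:
        // 1|hello|100
        // 2|flink|200
    }
}
```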
Integration with DataSet/DataStream
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#integration-with-datastream-and-dataset-api
- Create a View from a DataStream or DataSet
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

DataStream<Tuple2<Long, String>> stream = ...

// register the DataStream as View "myTable" with fields "f0", "f1"
tableEnv.createTemporaryView("myTable", stream);

// register the DataStream as View "myTable2" with fields "myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, $("myLong"), $("myString"));
- Convert a DataStream or DataSet into a Table
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

DataStream<Tuple2<Long, String>> stream = ...

// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);

// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));
- Convert a Table into a DataStream or DataSet
Convert a Table into a DataStream
Append Mode: This mode can only be used if the dynamic Table is only modified by INSERT changes, i.e., it is append-only and previously emitted results are never updated.
Retract Mode: This mode can always be used. It encodes INSERT and DELETE changes with a boolean flag.
// get StreamTableEnvironment.
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// Table with two fields (String name, Integer age)
Table table = ...

// convert the Table into an append DataStream of Row by specifying the class
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);

// convert the Table into an append DataStream of Tuple2<String, Integer>
//   via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple =
  tableEnv.toAppendStream(table, tupleType);

// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
//   The boolean field indicates the type of the change.
//   True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream =
  tableEnv.toRetractStream(table, Row.class);
Convert a Table into a DataSet
// get BatchTableEnvironment
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

// Table with two fields (String name, Integer age)
Table table = ...

// convert the Table into a DataSet of Row by specifying a class
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);

// convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple =
  tableEnv.toDataSet(table, tupleType);
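The difference between append mode and retract mode is easiest to see on an updating query. The plain-Java sketch below (no Flink dependency) mimics how a grouped count such as `SELECT name, COUNT(*) ... GROUP BY name` is encoded as a retract stream: whenever a count changes, the previously emitted row is withdrawn with flag `false` before the new row is emitted with flag `true`. `RetractStreamSketch` and `Change` are hypothetical names; `Change` stands in for `Tuple2<Boolean, Row>` (records require Java 16+).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java emulation (NOT Flink) of the retract encoding of
//   SELECT name, COUNT(*) FROM input GROUP BY name
// Each message is (flag, row): true = INSERT, false = DELETE.
public class RetractStreamSketch {

    // One change message, standing in for Tuple2<Boolean, Row>
    public record Change(boolean isInsert, String name, long count) {
        @Override
        public String toString() {
            return "(" + isInsert + "," + name + "," + count + ")";
        }
    }

    public static List<Change> retractCount(List<String> names) {
        Map<String, Long> counts = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (String name : names) {
            Long old = counts.get(name);
            if (old != null) {
                // withdraw the previously emitted result first
                out.add(new Change(false, name, old));
            }
            long updated = (old == null) ? 1 : old + 1;
            counts.put(name, updated);
            out.add(new Change(true, name, updated));
        }
        return out;
    }

    // Append mode is only valid if no earlier result is ever withdrawn.
    public static boolean isAppendOnly(List<Change> changes) {
        return changes.stream().allMatch(Change::isInsert);
    }

    public static void main(String[] args) {
        List<Change> changes = retractCount(List.of("Alice", "Bob", "Alice"));
        System.out.println(changes);
        // [(true,Alice,1), (true,Bob,1), (false,Alice,1), (true,Alice,2)]
        System.out.println(isAppendOnly(changes)); // false
    }
}
```

Because this output contains a `false` (DELETE) message, the table it models is not append-only, so `toAppendStream` would not be applicable; `toRetractStream` can represent it.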
Table API
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html
SQL API
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/