日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Apache Flink 零基础入门(十八)Flink Table APISQL

發布時間:2024/9/16 数据库 46 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Apache Flink 零基础入门(十八)Flink Table APISQL 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

什么是Flink關系型API?

雖然Flink已經支持了DataSet和DataStream API,但是有沒有一種更好的方式去編程,而不用關心具體的API實現?不需要去了解Java和Scala的具體實現。

Flink provides three layered APIs. Each API offers a different trade-off between conciseness and expressiveness and targets different use cases.

Flink提供了三層API,每一層API提供了一個在簡潔性和表達力之間的權衡?。

最低層是一個有狀態的事件驅動。在這一層進行開發是非常麻煩的。

雖然很多功能基于DataSet和DataStreamAPI是可以完成的,需要熟悉這兩套API,而且必須要熟悉Java和Scala,這是有一定的難度的。一個框架如果在使用的過程中沒法使用SQL來處理,那么這個框架就有很大的限制。雖然對于開發人員無所謂,但是對于用戶來說卻不顯示。因此SQL是非常面向大眾語言。

好比MapReduce使用Hive SQL,Spark使用Spark SQL,Flink使用Flink SQL。

雖然Flink支持批處理/流處理,那么如何做到API層面的統一?

這樣Table和SQL應運而生。

這其實就是一個關系型API,操作起來如同操作Mysql一樣簡單。

Apache Flink features two relational APIs - the Table API and SQL - for unified stream and batch processing. The Table API is a language-integrated query API for Scala and Java that allows the composition of queries from relational operators such as selection, filter, and join in a very intuitive way.?

Apache Flink通過使用Table API和SQL 兩大特性,來統一批處理和流處理。?Table API是一個查詢API,集成了Scala和Java語言,并且允許使用select filter join等操作。

使用Table SQL API需要額外依賴

java:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>${flink.version}</version></dependency>

scala:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.11</artifactId><version>${flink.version}</version></dependency>

使用Table SQL API編程

首先導入上面的依賴,然后讀取sales.csv文件,文件內容如下:

transactionId,customerId,itemId,amountPaid 111,1,1,100.0 112,2,2,505.0 113,1,3,510.0 114,2,4,600.0 115,3,2,500.0 116,4,2,500.0 117,1,2,500.0 118,1,2,500.0 119,1,3,500.0 120,1,2,500.0 121,2,4,500.0 122,1,2,500.0 123,1,4,500.0 124,1,2,500.0

Scala

object TableSQLAPI {def main(args: Array[String]): Unit = {val bEnv = ExecutionEnvironment.getExecutionEnvironmentval bTableEnv = BatchTableEnvironment.create(bEnv)val filePath="E:/test/sales.csv"// 已經拿到DataSetval csv = bEnv.readCsvFile[SalesLog](filePath,ignoreFirstLine = true)// DataSet => Table}case class SalesLog(transactionId:String,customerId:String,itemId:String,amountPaid:Double) }

首先拿到DataSet,接下來將DataSet轉為Table,然后就可以執行SQL了

// DataSet => Tableval salesTable = bTableEnv.fromDataSet(csv)// 注冊成Table Table => tablebTableEnv.registerTable("sales", salesTable)// sqlval resultTable = bTableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId")bTableEnv.toDataSet[Row](resultTable).print()

輸出結果如下:

4,500.0 3,500.0 1,4110.0 2,1605.0

這種方式只需要使用SQL就可以實現之前寫mapreduce的功能。大大方便了開發過程。

Java

package com.vincent.course06;import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.BatchTableEnvironment; import org.apache.flink.types.Row;public class JavaTableSQLAPI {public static void main(String[] args) throws Exception {ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(bEnv);DataSource<Sales> salesDataSource = bEnv.readCsvFile("E:/test/sales.csv").ignoreFirstLine().pojoType(Sales.class, "transactionId", "customerId", "itemId", "amountPaid");Table sales = bTableEnv.fromDataSet(salesDataSource);bTableEnv.registerTable("sales", sales);Table resultTable = bTableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId");DataSet<Row> rowDataSet = bTableEnv.toDataSet(resultTable, Row.class);rowDataSet.print();}public static class Sales {public String transactionId;public String customerId;public String itemId;public Double amountPaid;@Overridepublic String toString() {return "Sales{" +"transactionId='" + transactionId + '\'' +", customerId='" + customerId + '\'' +", itemId='" + itemId + '\'' +", amountPaid=" + amountPaid +'}';}} }

?

總結

以上是生活随笔為你收集整理的Apache Flink 零基础入门(十八)Flink Table APISQL的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。