学习笔记Flink(四)—— Flink基础API及核心数据结构
一、Flink基礎API-Flink編程的基本概念
1.1、Flink程序
-
Flink 程序是實現了分布式集合轉換(例如過濾、映射、更新狀態、join、分組、定義窗口、聚合)的規范化程序。
-
集合初始創建自 source(例如讀取文件、kafka 主題,或本地內存中的集合)。
-
結果通過 sink 返回,例如,它可以將數據寫入(分布式)文件,或標準輸出(例如命令行終端)。
-
Flink 程序可以在多種環境中運行,獨立運行或嵌入到其他程序中。可以在本地 JVM 中執行,也可以在多臺機器的集群上執行
-
針對有界和無界兩種數據 source 類型,可以使用 DataSet API 來編寫批處理程序或使用 DataStream API 來編寫流處理程序。
-
對于流處理,使用 StreamingExecutionEnvironment 和 DataStream API。
-
對于批處理,將他們替換為 ExecutionEnvironment 和 DataSet API 即可,概念是完全相同的。
1.2、DataSet和DataStream
-
Flink 用特有的 DataSet 和 DataStream 類來表示程序中的數據。
-
可以將他們視為可能包含重復項的不可變數據集合。對于 DataSet,數據是有限的,而對于 DataStream,元素的數量可以是無限的。
-
這些集合與標準的 Java 集合有一些關鍵的區別。首先它們是不可變的,也就是說它們一旦被創你就不能添加或刪除元素了,同時也不能簡單地檢查它們內部的元素。
-
在 Flink 程序中,集合最初通過添加數據 source 來創建,通過使用諸如 map、filter 等 API 方法對數據 source 進行轉換從而派生新的集合。
1.3、Flink程序構成
1.3.1、獲取執行環境
- Scala DataSet API : org.apache.flink.api.scala ;
- Scala DataStream API : org.apache.flink.streaming.api.scala ;
- StreamExecutionEnvironment 是所有 Flink 程序的基礎,可以使用它的這些靜態方法獲取:
- 一般只需要使用getExecutionEnvironment(),它會根據上下文環境完成正確的工作。
- 例如,在IDE中執行程序或者作為標準的 Java 程序來執行,它會創建本機執行環境。
- 如果將程序封裝成 JAR 包,然后通過命令行調用,Flink 集群管理器會執行你的 main 方法并且 getExecutionEnvironment() 會返回在集群上執行程序的執行環境。
1.3.2、加載數據集
- 針對不同的數據 source,執行環境有若干不同的讀取文件的方法:你可以逐行讀取 CSV 文件,或者使用完全自定義的輸入格式。要將文本文件作為一系列行讀取,你可以使用:
1.3.3、編寫轉換操作
-
通過2.的操作會得到一個DataStream數據流,然后對其應用轉換操作就可以創建新的派生 DataStream。
-
通過調用 DataStream 的轉換函數來進行轉換。如下是一個映射轉換的實例:
-
通過把原始數據集合的每個字符串轉換為一個整數,從而創建出一個新的 DataStream。
1.3.4、指定計算結果存放位置& 觸發程序執行
- 一旦得到了包含最終結果的 DataStream,就可以通過創建 sink 將其寫入外部系統。
- 例如,下面是一些創建 sink 的示例:
- 當設定好整個程序以后只需要調用 StreamExecutionEnvironment 的 execute() 方法觸發程序執行。execute() 方法返回 JobExecutionResult,它包括執行耗時和一個累加器的結果。
- 如果不需要等待作業的結束,只是想要觸發程序執行,你可以調用 StreamExecutionEnvironment 的 executeAsync() 方法。這個方法將返回一個 JobClient 對象,通過 JobClient 能夠與程序對應的作業進行交互。
1.4、延遲計算
-
無論在本地還是集群執行,所有的 Flink 程序都是延遲執行的:
-
當程序的 main 方法被執行時,并不立即執行數據的加載和轉換,而是創建每個操作并將其加入到程序的執行計劃中。
-
當執行環境調用 execute() 方法顯式地觸發執行的時候才真正執行各個操作。
-
延遲計算允許你構建復雜的程序,Flink 將其作為整體計劃單元來執行。
1.5、指定鍵、值
- 一些轉換操作(join, coGroup, keyBy, groupBy)要求在元素集合上定義鍵。
- 另外一些轉換操作 (Reduce, GroupReduce, Aggregate, Windows)允許在應用這些轉換之前將數據按鍵分組。
- Flink 的數據模型不是基于鍵值對的。因此不需要將數據集類型物理地打包到鍵和值中。
- 鍵都是“虛擬的”:它們的功能是指導分組算子用哪些數據來分組。
1.6、為Tuple定義鍵
- 最簡單的方式是按照 Tuple 的一個或多個字段進行分組:
- 按照第一個、第二個字段組合來進行分組
- 使用字段表達式來定義鍵
1.7、指定轉換函數
- 匿名函數
- 富函數
二、Flink基礎API-支持的數據類型
- Java Tuple 和 Scala Case Class
Flink 將滿足如下條件的 Java 和 Scala 的類作為特殊的 POJO 數據類型處理
-
類必須是公有的。
-
它必須有一個公有的無參構造器(默認構造器)。
-
所有的字段要么是公有的要么必須可以通過 getter 和 setter 函數訪問。例如一個名為 foo 的字段,它的 getter 和 setter 方法必須命名為 getFoo() 和 setFoo()。
-
字段的類型必須被已注冊的序列化程序所支持。
-
基本數據類型
Flink 支持所有 Java 和 Scala 的基本數據類型如 Integer、 String、和 Double。 -
常規的類
Flink 支持大部分 Java 和 Scala 的類(API 和自定義)。 除了包含無法序列化的字段的類,如文件指針,I / O流或其他本地資源。 -
值類型
值類型手工描述其序列化和反序列化。它們不是通過通用序列化框架,而是通過實現 org.apache.flinktypes.Value 接口的 read 和 write 方法來為這些操作提供自定義編碼。當通用序列化效率非常低時,使用值類型是合理的。 -
Hadoop Writable
可以使用實現了 org.apache.hadoop.Writable 接口的類型。它們會使用 write() 和 readFields() 方法中定義的序列化邏輯。 -
特殊類型
可以使用特殊類型,包括 Scala 的 Either、Option 和 Try。 Java API 有對 Either 的自定義實現。 類似于 Scala 的 Either,它表示一個具有 Left 或 Right 兩種可能類型的值。 Either 可用于錯誤處理或需要輸出兩種不同類型記錄的算子。
三、DataStream API
3.1、數據源
一般情況下通過StreamExecutionEnvironment.addSource(sourceFunction) 就可以添加數據源。
文件類型數據源:
-
readTextFile(path) : 使用TextInputFormat 按行讀取文本,每行返回一個字符串。
-
readFile(fileInputFormat, path) : 通過自定義fileInputFormat 來讀取數據;
-
readFile(fileInputFormat , path , watchType , interval , pathFilter) : watchType : 新文件數據;
Socket 類型數據源:
- socketTextStream : 通過socket讀取數據,可以通過設置分隔符來區分每個數據。
Collection 類型數據源:
- fromCollection(Seq)
- fromCollection(Iterator)
- fromElements(elements:_*)
- fromParallelCollection(SplittableIterator)
- generateSequence(from, to)
3.2、轉換操作
- 值 -> 值
- (鍵,值) -> (鍵,值)
3.3、輸出源
- writeAsText() / TextOutputFormat : 按行寫入數據
- writeAsCsv() : 把元組數據通過都好進行分割寫入文件
- print() / printToErr() :把數據通過標準輸出或異常輸出進行打印
- writeUsingOutputFormat() / FileOutputFormat : 自定義 輸出文件類型;
- writeToSocket : 通過socket 進行數據輸出;
- addSink : 調用自定義輸出源
四、DataSet API
4.1、數據源
4.2、輸出源
五、Table API
Table API 分為Java, Scala, Python三種,Scala API需要導入
org.apache.flink.api.scala._ 和org.apache.flink.table.api.scala._,并且Scala中字段需要使用特殊字符(’)來進行表示。
下面的例子會對Order表進行掃描,過濾null值,把a列變小寫,然后針對每個小時對a進行分組,求b的平均值。
六、SQL API
啟動SQL client:
./bin/sql-client.sh embedded
測試1:
可以看到
設置table模式:
示例:
看到
設置changelog模式
示例:
看到
總結
以上是生活随笔為你收集整理的学习笔记Flink(四)—— Flink基础API及核心数据结构的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 学习笔记Flink(三)—— Flink
- 下一篇: 学习笔记Flink(五)—— Flink