日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

学习笔记Flink(四)—— Flink基础API及核心数据结构

發布時間:2025/3/21 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 学习笔记Flink(四)—— Flink基础API及核心数据结构 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、Flink基礎API-Flink編程的基本概念

1.1、Flink程序

  • Flink 程序是實現了分布式集合轉換(例如過濾、映射、更新狀態、join、分組、定義窗口、聚合)的規范化程序。

  • 集合初始創建自 source(例如讀取文件、kafka 主題,或本地內存中的集合)。

  • 結果通過 sink 返回,例如,它可以將數據寫入(分布式)文件,或標準輸出(例如命令行終端)。

  • Flink 程序可以在多種環境中運行,獨立運行或嵌入到其他程序中。可以在本地 JVM 中執行,也可以在多臺機器的集群上執行

  • 針對有界和無界兩種數據 source 類型,可以使用 DataSet API 來編寫批處理程序或使用 DataStream API 來編寫流處理程序。

  • 對于流處理,使用 StreamingExecutionEnvironment 和 DataStream API。

  • 對于批處理,將他們替換為 ExecutionEnvironment 和 DataSet API 即可,概念是完全相同的。


1.2、DataSet和DataStream

  • Flink 用特有的 DataSet 和 DataStream 類來表示程序中的數據。

  • 可以將他們視為可能包含重復項的不可變數據集合。對于 DataSet,數據是有限的,而對于 DataStream,元素的數量可以是無限的。

  • 這些集合與標準的 Java 集合有一些關鍵的區別。首先它們是不可變的,也就是說它們一旦被創你就不能添加或刪除元素了,同時也不能簡單地檢查它們內部的元素。

  • 在 Flink 程序中,集合最初通過添加數據 source 來創建,通過使用諸如 map、filter 等 API 方法對數據 source 進行轉換從而派生新的集合。


1.3、Flink程序構成

  • 獲取執行環境
  • 加載/創建初始數據;
  • 編寫對數據的轉換操作;
  • 指定計算結果存放的位置;
  • 觸發程序執行;
  • 1.3.1、獲取執行環境

    • Scala DataSet API : org.apache.flink.api.scala ;
    • Scala DataStream API : org.apache.flink.streaming.api.scala ;
    • StreamExecutionEnvironment 是所有 Flink 程序的基礎,可以使用它的這些靜態方法獲取:
    • 一般只需要使用getExecutionEnvironment(),它會根據上下文環境完成正確的工作。
    • 例如,在IDE中執行程序或者作為標準的 Java 程序來執行,它會創建本機執行環境。
    • 如果將程序封裝成 JAR 包,然后通過命令行調用,Flink 集群管理器會執行你的 main 方法并且 getExecutionEnvironment() 會返回在集群上執行程序的執行環境。

    1.3.2、加載數據集

    • 針對不同的數據 source,執行環境有若干不同的讀取文件的方法:你可以逐行讀取 CSV 文件,或者使用完全自定義的輸入格式。要將文本文件作為一系列行讀取,你可以使用:

    1.3.3、編寫轉換操作

    • 通過2.的操作會得到一個DataStream數據流,然后對其應用轉換操作就可以創建新的派生 DataStream。

    • 通過調用 DataStream 的轉換函數來進行轉換。如下是一個映射轉換的實例:

    • 通過把原始數據集合的每個字符串轉換為一個整數,從而創建出一個新的 DataStream。


    1.3.4、指定計算結果存放位置& 觸發程序執行

    • 一旦得到了包含最終結果的 DataStream,就可以通過創建 sink 將其寫入外部系統。
    • 例如,下面是一些創建 sink 的示例:
    • 當設定好整個程序以后只需要調用 StreamExecutionEnvironment 的 execute() 方法觸發程序執行。execute() 方法返回 JobExecutionResult,它包括執行耗時和一個累加器的結果。
    • 如果不需要等待作業的結束,只是想要觸發程序執行,你可以調用 StreamExecutionEnvironment 的 executeAsync() 方法。這個方法將返回一個 JobClient 對象,通過 JobClient 能夠與程序對應的作業進行交互。

    1.4、延遲計算

    • 無論在本地還是集群執行,所有的 Flink 程序都是延遲執行的:

    • 當程序的 main 方法被執行時,并不立即執行數據的加載和轉換,而是創建每個操作并將其加入到程序的執行計劃中。

    • 當執行環境調用 execute() 方法顯式地觸發執行的時候才真正執行各個操作。

    • 延遲計算允許你構建復雜的程序,Flink 將其作為整體計劃單元來執行。


    1.5、指定鍵、值

    • 一些轉換操作(join, coGroup, keyBy, groupBy)要求在元素集合上定義鍵。
    • 另外一些轉換操作 (Reduce, GroupReduce, Aggregate, Windows)允許在應用這些轉換之前將數據按鍵分組。
    • Flink 的數據模型不是基于鍵值對的。因此不需要將數據集類型物理地打包到鍵和值中。
    • 鍵都是“虛擬的”:它們的功能是指導分組算子用哪些數據來分組。

    1.6、為Tuple定義鍵

    • 最簡單的方式是按照 Tuple 的一個或多個字段進行分組:
    • 按照第一個、第二個字段組合來進行分組
    • 使用字段表達式來定義鍵

    1.7、指定轉換函數

    • 匿名函數
    • 富函數

    二、Flink基礎API-支持的數據類型

    • Java Tuple 和 Scala Case Class

    Flink 將滿足如下條件的 Java 和 Scala 的類作為特殊的 POJO 數據類型處理

    • 類必須是公有的。

    • 它必須有一個公有的無參構造器(默認構造器)。

    • 所有的字段要么是公有的要么必須可以通過 getter 和 setter 函數訪問。例如一個名為 foo 的字段,它的 getter 和 setter 方法必須命名為 getFoo() 和 setFoo()。

    • 字段的類型必須被已注冊的序列化程序所支持。

    • 基本數據類型
      Flink 支持所有 Java 和 Scala 的基本數據類型如 Integer、 String、和 Double。

    • 常規的類
      Flink 支持大部分 Java 和 Scala 的類(API 和自定義)。 除了包含無法序列化的字段的類,如文件指針,I / O流或其他本地資源。

    • 值類型
      值類型手工描述其序列化和反序列化。它們不是通過通用序列化框架,而是通過實現 org.apache.flinktypes.Value 接口的 read 和 write 方法來為這些操作提供自定義編碼。當通用序列化效率非常低時,使用值類型是合理的。

    • Hadoop Writable
      可以使用實現了 org.apache.hadoop.Writable 接口的類型。它們會使用 write() 和 readFields() 方法中定義的序列化邏輯。

    • 特殊類型
      可以使用特殊類型,包括 Scala 的 Either、Option 和 Try。 Java API 有對 Either 的自定義實現。 類似于 Scala 的 Either,它表示一個具有 Left 或 Right 兩種可能類型的值。 Either 可用于錯誤處理或需要輸出兩種不同類型記錄的算子。


    三、DataStream API

    3.1、數據源

    一般情況下通過StreamExecutionEnvironment.addSource(sourceFunction) 就可以添加數據源。

    文件類型數據源

    • readTextFile(path) : 使用TextInputFormat 按行讀取文本,每行返回一個字符串。

    • readFile(fileInputFormat, path) : 通過自定義fileInputFormat 來讀取數據;

    • readFile(fileInputFormat , path , watchType , interval , pathFilter) : watchType : 新文件數據;

    Socket 類型數據源

    • socketTextStream : 通過socket讀取數據,可以通過設置分隔符來區分每個數據。

    Collection 類型數據源

    • fromCollection(Seq)
    • fromCollection(Iterator)
    • fromElements(elements:_*)
    • fromParallelCollection(SplittableIterator)
    • generateSequence(from, to)

    3.2、轉換操作

    • 值 -> 值
    • (鍵,值) -> (鍵,值)

    3.3、輸出源

    • writeAsText() / TextOutputFormat : 按行寫入數據
    • writeAsCsv() : 把元組數據通過都好進行分割寫入文件
    • print() / printToErr() :把數據通過標準輸出或異常輸出進行打印
    • writeUsingOutputFormat() / FileOutputFormat : 自定義 輸出文件類型;
    • writeToSocket : 通過socket 進行數據輸出;
    • addSink : 調用自定義輸出源

    四、DataSet API

    4.1、數據源


    4.2、輸出源


    五、Table API

    Table API 分為Java, Scala, Python三種,Scala API需要導入
    org.apache.flink.api.scala._ 和org.apache.flink.table.api.scala._,并且Scala中字段需要使用特殊字符(’)來進行表示。

    下面的例子會對Order表進行掃描,過濾null值,把a列變小寫,然后針對每個小時對a進行分組,求b的平均值。


    六、SQL API

    啟動SQL client:

    ./bin/sql-client.sh embedded


    測試1

    可以看到

    設置table模式

    SET execution.result-mode=table;

    示例:

    看到

    設置changelog模式

    SET execution.result-mode=changelog;

    示例:

    看到

    總結

    以上是生活随笔為你收集整理的学习笔记Flink(四)—— Flink基础API及核心数据结构的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。