Flink保姆级教程,超全五万字,学习与面试收藏这一篇就够了
本文目錄:
一、Flink簡介
二、Flink 部署及啟動
三、Flink 運行架構
四、Flink 算子大全
五、流處理中的 Time 與 Window
六、Flink 狀態管理
七、Flink 容錯
八、Flink SQL
九、Flink CEP
十、Flink CDC
十一、基于 Flink 構建全場景實時數倉
十二、Flink 大廠面試題
Flink 涉及的知識點如下圖所示,本文將逐一講解:
本文檔參考了?Flink 的官網及其他眾多資料整理而成,為了整潔的排版及舒適的閱讀,對于模糊不清晰的圖片及黑白圖片進行重新繪制成了高清彩圖。
本文超長,獲取本文完整PDF文檔,帶目錄超全總結,請掃碼關注公眾號【五分鐘學大數據】,后臺發送:flink pdf,即可下載帶目錄的完整版flink文檔:
正文開始:
一、Flink 簡介
1. Flink 發展
這幾年大數據的飛速發展,出現了很多熱門的開源社區,其中著名的有 Hadoop、Storm,以及后來的 Spark,他們都有著各自專注的應用場景。Spark 掀開了內存計算的先河,也以內存為賭注,贏得了內存計算的飛速發展。Spark 的火熱或多或少的掩蓋了其他分布式計算的系統身影。就像 Flink,也就在這個時候默默的發展著。
在國外一些社區,有很多人將大數據的計算引擎分成了 4 代,當然,也有很多人不會認同。我們先姑且這么認為和討論。
首先第一代的計算引擎,無疑就是 Hadoop 承載的 MapReduce。這里大家應該都不會對 MapReduce 陌生,它將計算分為兩個階段,分別為 Map 和 Reduce。對于上層應用來說,就不得不想方設法去拆分算法,甚至于不得不在上層應用實現多個 Job 的串聯,以完成一個完整的算法,例如迭代計算。
由于這樣的弊端,催生了支持 DAG 框架的產生。因此,支持 DAG 的框架被劃分為第二代計算引擎。如 Tez 以及更上層的 Oozie。這里我們不去細究各種 DAG 實現之間的區別,不過對于當時的 Tez 和 Oozie 來說,大多還是批處理的任務。
接下來就是以 Spark 為代表的第三代的計算引擎。第三代計算引擎的特點主要是 Job 內部的 DAG 支持(不跨越 Job),以及強調的實時計算。在這里,很多人也會認為第三代計算引擎也能夠很好的運行批處理的 Job。
隨著第三代計算引擎的出現,促進了上層應用快速發展,例如各種迭代計算的性能以及對流計算和 SQL 等的支持。Flink 的誕生就被歸在了第四代。這應該主要表現在 Flink 對流計算的支持,以及更一步的實時性上面。當然 Flink 也可以支持 Batch 的任務,以及 DAG 的運算。
總結:
第 1 代:Hadoop MapReduc 批處理 Mapper、Reducer 2;
第 2 代:DAG 框架(Oozie 、Tez),Tez + MapReduce 批處理 1 個 Tez = MR(1) + MR(2) + ... + MR(n) 相比 MR 效率有所提升;
第 3 代:Spark 批處理、流處理、SQL 高層 API 支持 自帶 DAG 內存迭代計算、性能較之前大幅提;
第 4 代:Flink 批處理、流處理、SQL 高層 API 支持 自帶 DAG 流式計算性能更高、可靠性更高。
2. 什么是 Flink
Flink 起源于 Stratosphere 項目,Stratosphere 是在 2010~2014 年由 3 所地處柏林的大學和歐洲的一些其他的大學共同進行的研究項目,2014 年 4 月 Stratosphere 的代碼被復制并捐贈給了 Apache 軟件基金會,參加這個孵化項目的初始成員是 Stratosphere 系統的核心開發人員,2014 年 12 月,Flink 一躍成為 Apache 軟件基金會的頂級項目。
在德語中,Flink 一詞表示快速和靈巧,項目采用一只松鼠的彩色圖案作為 logo,這不僅是因為松鼠具有快速和靈巧的特點,還因為柏林的松鼠有一種迷人的紅棕色,而 Flink 的松鼠 logo 擁有可愛的尾巴,尾巴的顏色與 Apache 軟件基金會的 logo 顏色相呼應,也就是說,這是一只 Apache 風格的松鼠。
Flink 主頁在其頂部展示了該項目的理念:“Apache Flink 是為分布式、高性能、隨時可用以及準確的流處理應用程序打造的開源流處理框架”。
Apache Flink 是一個框架和分布式處理引擎,用于對無界和有界數據流進行有狀態計算。Flink 被設計在所有常見的集群環境中運行,以內存執行速度和任意規模來執行計算。
3. Flink 流處理特性
-
支持高吞吐、低延遲、高性能的流處理
-
支持帶有事件時間的窗口(Window)操作
-
支持有狀態計算的 Exactly-once 語義
-
支持高度靈活的窗口(Window)操作,支持基于 time、count、session,以及 data-driven 的窗口操作
-
支持具有 Backpressure 功能的持續流模型
-
支持基于輕量級分布式快照(Snapshot)實現的容錯
-
一個運行時同時支持 Batch on Streaming 處理和 Streaming 處理
-
Flink 在 JVM 內部實現了自己的內存管理
-
支持迭代計算
-
支持程序自動優化:避免特定情況下 Shuffle、排序等昂貴操作,中間結果有必要進行緩存
4. Flink 基石
Flink 之所以能這么流行,離不開它最重要的四個基石:Checkpoint、State、Time、Window。
首先是 Checkpoint 機制,這是 Flink 最重要的一個特性。Flink 基于Chandy-Lamport算法實現了一個分布式的一致性的快照,從而提供了一致性的語義。Chandy-Lamport 算法實際上在 1985 年的時候已經被提出來,但并沒有被很廣泛的應用,而 Flink 則把這個算法發揚光大了。
Spark 最近在實現 Continue streaming,Continue streaming 的目的是為了降低它處理的延時,其也需要提供這種一致性的語義,最終采用 Chandy-Lamport 這個算法,說明 Chandy-Lamport 算法在業界得到了一定的肯定。
提供了一致性的語義之后,Flink 為了讓用戶在編程時能夠更輕松、更容易地去管理狀態,還提供了一套非常簡單明了的 State API,包括里面的有 ValueState、ListState、MapState,近期添加了 BroadcastState,使用 State API 能夠自動享受到這種一致性的語義。
除此之外,Flink 還實現了 Watermark 的機制,能夠支持基于事件的時間的處理,或者說基于系統時間的處理,能夠容忍數據的延時、容忍數據的遲到、容忍亂序的數據。
另外流計算中一般在對流數據進行操作之前都會先進行開窗,即基于一個什么樣的窗口上做這個計算。Flink 提供了開箱即用的各種窗口,比如滑動窗口、滾動窗口、會話窗口以及非常靈活的自定義的窗口。
5. 批處理與流處理
批處理的特點是有界、持久、大量,批處理非常適合需要訪問全套記錄才能完成的計算工作,一般用于離線統計。流處理的特點是無界、實時,流處理方式無需針對整個數據集執行操作,而是對通過系統傳輸的每個數據項執行操作,一般用于實時統計。
在 Spark 生態體系中,對于批處理和流處理采用了不同的技術框架,批處理由 SparkSQL 實現,流處理由 Spark Streaming 實現,這也是大部分框架采用的策略,使用獨立的處理器實現批處理和流處理,而 Flink 可以同時實現批處理和流處理。
Flink 是如何同時實現批處理與流處理的呢?答案是,Flink 將批處理(即處理有限的靜態數據)視作一種特殊的流處理。
Flink 的核心計算架構是下圖中的 Flink Runtime 執行引擎,它是一個分布式系統,能夠接受數據流程序并在一臺或多臺機器上以容錯方式執行。
Flink Runtime 執行引擎可以作為 YARN(Yet Another Resource Negotiator)的應用程序在集群上運行,也可以在 Mesos 集群上運行,還可以在單機上運行(這對于調試 Flink 應用程序來說非常有用)。
上圖為 Flink 技術棧的核心組成部分,值得一提的是,Flink 分別提供了面向流式處理的接口(DataStream API)和面向批處理的接口(DataSet API)。因此,Flink 既可以完成流處理,也可以完成批處理。Flink 支持的拓展庫涉及機器學習(FlinkML)、復雜事件處理(CEP)、以及圖計算(Gelly),還有分別針對流處理和批處理的 Table API。
能被 Flink Runtime 執行引擎接受的程序很強大,但是這樣的程序有著冗長的代碼,編寫起來也很費力,基于這個原因,Flink 提供了封裝在 Runtime 執行引擎之上的 API,以幫助用戶方便地生成流式計算程序。Flink 提供了用于流處理的 DataStream API 和用于批處理的 DataSet API。值得注意的是,盡管 Flink Runtime 執行引擎是基于流處理的,但是 DataSet API 先于 DataStream API 被開發出來,這是因為工業界對無限流處理的需求在 Flink 誕生之初并不大。
DataStream API 可以流暢地分析無限數據流,并且可以用 Java 或者 Scala 等來實現。開發人員需要基于一個叫 DataStream 的數據結構來開發,這個數據結構用于表示永不停止的分布式數據流。
Flink 的分布式特點體現在它能夠在成百上千臺機器上運行,它將大型的計算任務分成許多小的部分,每個機器執行一部分。Flink 能夠自動地確保發生機器故障或者其他錯誤時計算能夠持續進行,或者在修復 bug 或進行版本升級后有計劃地再執行一次。這種能力使得開發人員不需要擔心運行失敗。Flink 本質上使用容錯性數據流,這使得開發人員可以分析持續生成且永遠不結束的數據(即流處理)。
二、Flink 部署及啟動
Flink 支持多種安裝模式:
-
local(本地)——單機模式,一般不使用;
-
standalone——獨立模式,Flink 自帶集群,開發測試環境使用;
-
yarn——計算資源統一由 Hadoop YARN 管理,生產環境使用。
Flink 集群的安裝不屬于本文檔的范疇,如安裝 Flink,可自行搜索資料進行安裝。
本節重點在 Flink 的 Yarn 部署模式。
在一個企業中,為了最大化的利用集群資源,一般都會在一個集群中同時運行多種類型的 Workload,可以使用 YARN 來管理所有計算資源。
1. Flink 在 Yarn 上的部署架構
從圖中可以看出,Yarn 的客戶端需要獲取 hadoop 的配置信息,連接 Yarn 的 ResourceManager。所以要設置 YARN_CONF_DIR 或者 HADOOP_CONF_DIR 或者 HADOOP_CONF_PATH,只要設置了其中一個環境變量,就會被讀取。如果讀取上述的變量失敗了,那么將會選擇 hadoop_home 的環境變量,會嘗試加載$HADOOP_HOME/etc/hadoop 的配置文件。
-
當啟動一個 Flink Yarn 會話時,客戶端首先會檢查本次請求的資源(存儲、計算)是否足夠。資源足夠將會上傳包含 HDFS 及 Flink 的配置信息和 Flink 的 jar 包到 HDFS;
-
客戶端向 RM 發起請求;
-
RM 向 NM 發請求指令,創建 container,并從 HDFS 中下載 jar 以及配置文件;
-
啟動 ApplicationMaster 和 jobmanager,將 jobmanager 的地址信息寫到配置文件中,再發到 hdfs 上;
-
同時,AM 向 RM 發送心跳注冊自己,申請資源(cpu、內存);
-
創建 TaskManager 容器,從 HDFS 中下載 jar 包及配置文件并啟動;
-
各 task 任務通過 jobmanager 匯報自己的狀態和進度,AM 和 jobmanager 在一個容器上,AM 就能掌握各任務的運行狀態,從而可以在任務失敗時,重新啟動任務;
-
任務完成后,AM 向 RM 注銷并關閉自己;
2. 啟動集群
-
修改 hadoop 的配置參數:
vim etc/hadoop/yarn-site.xml
添加:
<property><name>yarn.nodemanager.vmem-check-enabled</name><value>false</value>
</property>
修改 Hadoop 的 yarn-site.xml,添加該配置表示內存超過分配值,是否將任務殺掉。
默認為 true。運行 Flink 程序,很容易內存超標,這個時候 yarn 會自動殺掉 job。
-
修改全局變量?
/etc/profile:
添加:export HADOOP_CONF_DIR=/export/servers/hadoop/etc/Hadoop
YARN_CONF_DIR 或者 HADOOP_CONF_DIR 必須將環境變量設置為讀取 YARN 和 HDFS 配置
-
啟動 HDFS、zookeeper(如果是外置 zookeeper)、YARN 集群;
-
使用 yarn-session 的模式提交作業。
Yarn Session 模式提交作業有兩種方式:yarn-session 和 yarn-cluster
3. 模式一: yarn-session
特點:
-
使用 Flink 中的 yarn-session(yarn 客戶端),會啟動兩個必要服務 JobManager 和 TaskManagers;
-
客戶端通過 yarn-session 提交作業;
-
yarn-session 會一直啟動,不停地接收客戶端提交的任務;
-
如果擁有有大量的小作業,適合使用這種方式。
在 flink 目錄啟動 yarn-session:
bin/yarn-session.sh -n 2 -tm 800 -jm 800 -s 1 -d
-n 表示申請 2 個容器
-s 表示每個容器啟動多少個 slot 離模式,表示以后臺程
-tm 表示每個 TaskManager 申請 800M 內存
-d 分序方式運行
使用 flink 提交任務:
bin/flink run examples/batch/WordCount.jar
如果程序運行完了,可以使用 yarn application -kill application_id 殺掉任務:
yarn application -kill application_1554377097889_0002
bin/yarn-session.sh -n 2 -tm 800 -s 1 -d 意思是:
同時向 Yarn 申請 3 個 container(即便只申請了兩個,因為 ApplicationMaster 和 Job Manager 有一個額外的容器。一旦將 Flink 部署到 YARN 群集中,它就會顯示 Job Manager 的連接詳細信息),其中 2 個 Container 啟動 TaskManager(-n 2),每個 TaskManager 擁有兩個 Task Slot(-s 1),并且向每個 TaskManager 的 Container 申請 800M 的內存,以及一個 ApplicationMaster(Job Manager)。
4. 模式二: yarn-cluster
特點:
-
直接提交任務給 YARN;
-
大作業,適合使用這種方式;
-
會自動關閉 session。
使用 flink 直接提交任務:
bin/flink run -m yarn-cluster -yn 2 -yjm 800 -ytm 800 /export/servers/flink-1.6.0/examples/batch/WordCount.jar
-yn 表示 TaskManager 的個數
注意:
-
在創建集群的時候,集群的配置參數就寫好了,但是往往因為業務需要,要更改一些配置參數,這個時候可以不必因為一個實例的提交而修改 conf/flink-conf.yaml;
可以通過:-D <arg> Dynamic properties來覆蓋原有的配置信息:比如:
-Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368
-
如果使用的是 flink on yarn 方式,想切換回 standalone 模式的話,需要刪除:
/tmp/.yarn-properties-root,因為默認查找當前 yarn 集群中已有的 yarn-session 信息中的 jobmanager。
三、Flink 運行架構
1. Flink 程序結構
Flink 程序的基本構建塊是流和轉換(請注意,Flink 的 DataSet API 中使用的 DataSet 也是內部流 )。從概念上講,流是(可能永無止境的)數據記錄流,而轉換是將一個或多個流作為一個或多個流的操作。輸入,并產生一個或多個輸出流。
Flink 應用程序結構就是如上圖所示:
Source: 數據源,Flink 在流處理和批處理上的 source 大概有 4 類:基于本地集合的 source、基于文件的 source、基于網絡套接字的 source、自定義的 source。自定義的 source 常見的有 Apache kafka、RabbitMQ 等,當然你也可以定義自己的 source。
Transformation:數據轉換的各種操作,有?Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select等,操作很多,可以將數據轉換計算成你想要的數據。
Sink:接收器,Flink 將轉換計算后的數據發送的地點 ,你可能需要存儲下來,Flink 常見的 Sink 大概有如下幾類:寫入文件、打印出來、寫入 socket 、自定義的 sink 。自定義的 sink 常見的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定義自己的 sink。
2. Flink 并行數據流
Flink 程序在執行的時候,會被映射成一個 Streaming Dataflow,一個 Streaming Dataflow 是由一組 Stream 和 Transformation Operator 組成的。在啟動時從一個或多個 Source Operator 開始,結束于一個或多個 Sink Operator。
Flink 程序本質上是并行的和分布式的,在執行過程中,一個流(stream)包含一個或多個流分區,而每一個 operator 包含一個或多個 operator 子任務。操作子任務間彼此獨立,在不同的線程中執行,甚至是在不同的機器或不同的容器上。operator 子任務的數量是這一特定 operator 的并行度。相同程序中的不同 operator 有不同級別的并行度。
一個 Stream 可以被分成多個 Stream 的分區,也就是 Stream Partition。一個 Operator 也可以被分為多個 Operator Subtask。如上圖中,Source 被分成 Source1 和 Source2,它們分別為 Source 的 Operator Subtask。每一個 Operator Subtask 都是在不同的線程當中獨立執行的。一個 Operator 的并行度,就等于 Operator Subtask 的個數。上圖 Source 的并行度為 2。而一個 Stream 的并行度就等于它生成的 Operator 的并行度。
數據在兩個 operator 之間傳遞的時候有兩種模式:
One to One 模式:兩個 operator 用此模式傳遞的時候,會保持數據的分區數和數據的排序;如上圖中的 Source1 到 Map1,它就保留的 Source 的分區特性,以及分區元素處理的有序性。
Redistributing (重新分配)模式:這種模式會改變數據的分區數;每個一個 operator subtask 會根據選擇 transformation 把數據發送到不同的目標 subtasks,比如 keyBy()會通過 hashcode 重新分區,broadcast()和 rebalance()方法會隨機重新分區;
3. Task 和 Operator chain
Flink的所有操作都稱之為Operator,客戶端在提交任務的時候會對Operator進行優化操作,能進行合并的Operator會被合并為一個Operator,合并后的Operator稱為Operator chain,實際上就是一個執行鏈,每個執行鏈會在TaskManager上一個獨立的線程中執行。
4. 任務調度與執行
-
當Flink執行executor會自動根據程序代碼生成DAG數據流圖;
-
ActorSystem創建Actor將數據流圖發送給JobManager中的Actor;
-
JobManager會不斷接收TaskManager的心跳消息,從而可以獲取到有效的TaskManager;
-
JobManager通過調度器在TaskManager中調度執行Task(在Flink中,最小的調度單元就是task,對應就是一個線程);
-
在程序運行過程中,task與task之間是可以進行數據傳輸的。
Job Client:
-
主要職責是提交任務, 提交后可以結束進程, 也可以等待結果返回;
-
Job Client 不是 Flink 程序執行的內部部分,但它是任務執行的起點;
-
Job Client 負責接受用戶的程序代碼,然后創建數據流,將數據流提交給 Job Manager 以便進一步執行。執行完成后,Job Client 將結果返回給用戶。
JobManager:
-
主要職責是調度工作并協調任務做檢查點;
-
集群中至少要有一個 master,master 負責調度 task,協調checkpoints 和容錯;
-
高可用設置的話可以有多個 master,但要保證一個是 leader, 其他是standby;
-
Job Manager 包含 Actor System、Scheduler、CheckPoint三個重要的組件;
-
JobManager從客戶端接收到任務以后, 首先生成優化過的執行計劃, 再調度到TaskManager中執行。
TaskManager:
-
主要職責是從JobManager處接收任務, 并部署和啟動任務, 接收上游的數據并處理;
-
Task Manager 是在 JVM 中的一個或多個線程中執行任務的工作節點;
-
TaskManager在創建之初就設置好了Slot, 每個Slot可以執行一個任務。
5. 任務槽和槽共享
每個TaskManager是一個JVM的進程, 可以在不同的線程中執行一個或多個子任務。為了控制一個worker能接收多少個task。worker通過task slot來進行控制(一個worker至少有一個task slot)。
1) 任務槽
每個task slot表示TaskManager擁有資源的一個固定大小的子集。
flink將進程的內存進行了劃分到多個slot中。
圖中有2個TaskManager,每個TaskManager有3個slot的,每個slot占有1/3的內存。
內存被劃分到不同的slot之后可以獲得如下好處:
-
TaskManager最多能同時并發執行的任務是可以控制的,那就是3個,因為不能超過slot的數量。
-
slot有獨占的內存空間,這樣在一個TaskManager中可以運行多個不同的作業,作業之間不受影響。
2) 槽共享
默認情況下,Flink允許子任務共享插槽,即使它們是不同任務的子任務,只要它們來自同一個作業。結果是一個槽可以保存作業的整個管道。允許插槽共享有兩個主要好處:
-
只需計算Job中最高并行度(parallelism)的task slot,只要這個滿足,其他的job也都能滿足。
-
資源分配更加公平,如果有比較空閑的slot可以將更多的任務分配給它。圖中若沒有任務槽共享,負載不高的Source/Map等subtask將會占據許多資源,而負載較高的窗口subtask則會缺乏資源。
-
有了任務槽共享,可以將基本并行度(base parallelism)從2提升到6.提高了分槽資源的利用率。同時它還可以保障TaskManager給subtask的分配的slot方案更加公平。
四、Flink 算子大全
Flink和Spark類似,也是一種一站式處理的框架;既可以進行批處理(DataSet),也可以進行實時處理(DataStream)。
所以下面將Flink的算子分為兩大類:一類是DataSet,一類是DataStream。
DataSet 批處理算子
一、Source算子
1. fromCollection
fromCollection:從本地集合讀取數據
例:
val?env?=?ExecutionEnvironment.getExecutionEnvironment
val?textDataSet:?DataSet[String]?=?env.fromCollection(List("1,張三",?"2,李四",?"3,王五",?"4,趙六")
)
2. readTextFile
readTextFile:從文件中讀取
val?textDataSet:?DataSet[String]??=?env.readTextFile("/data/a.txt")
3. readTextFile:遍歷目錄
readTextFile可以對一個文件目錄內的所有文件,包括所有子目錄中的所有文件的遍歷訪問方式
val?parameters?=?new?Configuration
//?recursive.file.enumeration?開啟遞歸
parameters.setBoolean("recursive.file.enumeration",?true)
val?file?=?env.readTextFile("/data").withParameters(parameters)
4. readTextFile:讀取壓縮文件
對于以下壓縮類型,不需要指定任何額外的inputformat方法,flink可以自動識別并且解壓。但是,壓縮文件可能不會并行讀取,可能是順序讀取的,這樣可能會影響作業的可伸縮性。
| 壓縮方法 | 文件擴展名 | 是否可并行讀取 |
|---|---|---|
| DEFLATE | .deflate | no |
| GZip | .gz .gzip | no |
| Bzip2 | .bz2 | no |
| XZ | .xz | no |
val?file?=?env.readTextFile("/data/file.gz")
二、Transform轉換算子
因為Transform算子基于Source算子操作,所以首先構建Flink執行環境及Source算子,后續Transform算子操作基于此:
val?env?=?ExecutionEnvironment.getExecutionEnvironment
val?textDataSet:?DataSet[String]?=?env.fromCollection(List("張三,1",?"李四,2",?"王五,3",?"張三,4")
)
1. map
將DataSet中的每一個元素轉換為另外一個元素
//?使用map將List轉換為一個Scala的樣例類case?class?User(name:?String,?id:?String)val?userDataSet:?DataSet[User]?=?textDataSet.map?{text?=>val?fieldArr?=?text.split(",")User(fieldArr(0),?fieldArr(1))
}
userDataSet.print()
2. flatMap
將DataSet中的每一個元素轉換為0...n個元素。
//?使用flatMap操作,將集合中的數據:
//?根據第一個元素,進行分組
//?根據第二個元素,進行聚合求值?val?result?=?textDataSet.flatMap(line?=>?line).groupBy(0)?//?根據第一個元素,進行分組.sum(1)?//?根據第二個元素,進行聚合求值result.print()
3. mapPartition
將一個分區中的元素轉換為另一個元素
//?使用mapPartition操作,將List轉換為一個scala的樣例類case?class?User(name:?String,?id:?String)val?result:?DataSet[User]?=?textDataSet.mapPartition(line?=>?{line.map(index?=>?User(index._1,?index._2))})result.print()
4. filter
過濾出來一些符合條件的元素,返回boolean值為true的元素
val?source:?DataSet[String]?=?env.fromElements("java",?"scala",?"java")
val?filter:DataSet[String]?=?source.filter(line?=>?line.contains("java"))//過濾出帶java的數據
filter.print()
5. reduce
可以對一個dataset或者一個group來進行聚合計算,最終聚合成一個元素
//?使用?fromElements?構建數據源
val?source?=?env.fromElements(("java",?1),?("scala",?1),?("java",?1))
//?使用map轉換成DataSet元組
val?mapData:?DataSet[(String,?Int)]?=?source.map(line?=>?line)
//?根據首個元素分組
val?groupData?=?mapData.groupBy(_._1)
//?使用reduce聚合
val?reduceData?=?groupData.reduce((x,?y)?=>?(x._1,?x._2?+?y._2))
//?打印測試
reduceData.print()
6. reduceGroup
將一個dataset或者一個group聚合成一個或多個元素。
reduceGroup是reduce的一種優化方案;
它會先分組reduce,然后在做整體的reduce;這樣做的好處就是可以減少網絡IO
//?使用?fromElements?構建數據源
val?source:?DataSet[(String,?Int)]?=?env.fromElements(("java",?1),?("scala",?1),?("java",?1))
//?根據首個元素分組
val?groupData?=?source.groupBy(_._1)
//?使用reduceGroup聚合
val?result:?DataSet[(String,?Int)]?=?groupData.reduceGroup?{(in:?Iterator[(String,?Int)],?out:?Collector[(String,?Int)])?=>val?tuple?=?in.reduce((x,?y)?=>?(x._1,?x._2?+?y._2))out.collect(tuple)}
//?打印測試
result.print()
7. minBy和maxBy
選擇具有最小值或最大值的元素
//?使用minBy操作,求List中每個人的最小值
//?List("張三,1",?"李四,2",?"王五,3",?"張三,4")case?class?User(name:?String,?id:?String)
//?將List轉換為一個scala的樣例類
val?text:?DataSet[User]?=?textDataSet.mapPartition(line?=>?{line.map(index?=>?User(index._1,?index._2))})val?result?=?text.groupBy(0)?//?按照姓名分組.minBy(1)???//?每個人的最小值
8. Aggregate
在數據集上進行聚合求最值(最大值、最小值)
val?data?=?new?mutable.MutableList[(Int,?String,?Double)]data.+=((1,?"yuwen",?89.0))data.+=((2,?"shuxue",?92.2))data.+=((3,?"yuwen",?89.99))
//?使用?fromElements?構建數據源
val?input:?DataSet[(Int,?String,?Double)]?=?env.fromCollection(data)
//?使用group執行分組操作
val?value?=?input.groupBy(1)//?使用aggregate求最大值元素.aggregate(Aggregations.MAX,?2)?
//?打印測試
value.print()???????
Aggregate只能作用于元組上
注意:
要使用aggregate,只能使用字段索引名或索引名稱來進行分組?groupBy(0)?,否則會報一下錯誤:
Exception in thread "main" java.lang.UnsupportedOperationException: Aggregate does not support grouping with KeySelector functions, yet.
9. distinct
去除重復的數據
//?數據源使用上一題的
//?使用distinct操作,根據科目去除集合中重復的元組數據val?value:?DataSet[(Int,?String,?Double)]?=?input.distinct(1)
value.print()
10. first
取前N個數
input.first(2)?//?取前兩個數
11. join
將兩個DataSet按照一定條件連接到一起,形成新的DataSet
// s1 和 s2 數據集格式如下:
//?DataSet[(Int,?String,String,?Double)]val?joinData?=?s1.join(s2)??//?s1數據集?join?s2數據集.where(0).equalTo(0)?{?????//?join的條件(s1,?s2)?=>?(s1._1,?s1._2,?s2._2,?s1._3)}
12. leftOuterJoin
左外連接,左邊的Dataset中的每一個元素,去連接右邊的元素
此外還有:
rightOuterJoin:右外連接,左邊的Dataset中的每一個元素,去連接左邊的元素
fullOuterJoin:全外連接,左右兩邊的元素,全部連接
下面以 leftOuterJoin 進行示例:
?val?data1?=?ListBuffer[Tuple2[Int,String]]()data1.append((1,"zhangsan"))data1.append((2,"lisi"))data1.append((3,"wangwu"))data1.append((4,"zhaoliu"))val?data2?=?ListBuffer[Tuple2[Int,String]]()data2.append((1,"beijing"))data2.append((2,"shanghai"))data2.append((4,"guangzhou"))val?text1?=?env.fromCollection(data1)
val?text2?=?env.fromCollection(data2)text1.leftOuterJoin(text2).where(0).equalTo(0).apply((first,second)=>{if(second==null){(first._1,first._2,"null")}else{(first._1,first._2,second._2)}}).print()
13. cross
交叉操作,通過形成這個數據集和其他數據集的笛卡爾積,創建一個新的數據集
和join類似,但是這種交叉操作會產生笛卡爾積,在數據比較大的時候,是非常消耗內存的操作
val?cross?=?input1.cross(input2){(input1?,?input2)?=>?(input1._1,input1._2,input1._3,input2._2)}cross.print()
14. union
聯合操作,創建包含來自該數據集和其他數據集的元素的新數據集,不會去重
val?unionData:?DataSet[String]?=?elements1.union(elements2).union(elements3)
//?去除重復數據
val?value?=?unionData.distinct(line?=>?line)
15. rebalance
Flink也有數據傾斜的時候,比如當前有數據量大概10億條數據需要處理,在處理過程中可能會發生如圖所示的狀況:
這個時候本來總體數據量只需要10分鐘解決的問題,出現了數據傾斜,機器1上的任務需要4個小時才能完成,那么其他3臺機器執行完畢也要等待機器1執行完畢后才算整體將任務完成;所以在實際的工作中,出現這種情況比較好的解決方案就是接下來要介紹的—rebalance(內部使用round robin方法將數據均勻打散。這對于數據傾斜時是很好的選擇。)
//?使用rebalance操作,避免數據傾斜
val?rebalance?=?filterData.rebalance()
16. partitionByHash
按照指定的key進行hash分區
val?data?=?new?mutable.MutableList[(Int,?Long,?String)]
data.+=((1,?1L,?"Hi"))
data.+=((2,?2L,?"Hello"))
data.+=((3,?2L,?"Hello?world"))val?collection?=?env.fromCollection(data)
val?unique?=?collection.partitionByHash(1).mapPartition{line?=>line.map(x?=>?(x._1?,?x._2?,?x._3))
}unique.writeAsText("hashPartition",?WriteMode.NO_OVERWRITE)
env.execute()
17. partitionByRange
根據指定的key對數據集進行范圍分區
val?data?=?new?mutable.MutableList[(Int,?Long,?String)]
data.+=((1,?1L,?"Hi"))
data.+=((2,?2L,?"Hello"))
data.+=((3,?2L,?"Hello?world"))
data.+=((4,?3L,?"Hello?world,?how?are?you?"))val?collection?=?env.fromCollection(data)
val?unique?=?collection.partitionByRange(x?=>?x._1).mapPartition(line?=>?line.map{x=>(x._1?,?x._2?,?x._3)
})
unique.writeAsText("rangePartition",?WriteMode.OVERWRITE)
env.execute()
18. sortPartition
根據指定的字段值進行分區的排序
val?data?=?new?mutable.MutableList[(Int,?Long,?String)]data.+=((1,?1L,?"Hi"))data.+=((2,?2L,?"Hello"))data.+=((3,?2L,?"Hello?world"))data.+=((4,?3L,?"Hello?world,?how?are?you?"))val?ds?=?env.fromCollection(data)val?result?=?ds.map?{?x?=>?x?}.setParallelism(2).sortPartition(1,?Order.DESCENDING)//第一個參數代表按照哪個字段進行分區.mapPartition(line?=>?line).collect()println(result)
三、Sink算子
1. collect
將數據輸出到本地集合
result.collect()
2. writeAsText
將數據輸出到文件
Flink支持多種存儲設備上的文件,包括本地文件,hdfs文件等
Flink支持多種文件的存儲格式,包括text文件,CSV文件等
//?將數據寫入本地文件
result.writeAsText("/data/a",?WriteMode.OVERWRITE)//?將數據寫入HDFS
result.writeAsText("hdfs://node01:9000/data/a",?WriteMode.OVERWRITE)
DataStream流處理算子
和DataSet一樣,DataStream也包括一系列的Transformation操作
一、Source算子
Flink可以使用 StreamExecutionEnvironment.addSource(source) 來為我們的程序添加數據來源。
Flink 已經提供了若干實現好了的 source functions,當然我們也可以通過實現 SourceFunction 來自定義非并行的source或者實現 ParallelSourceFunction 接口或者擴展 RichParallelSourceFunction 來自定義并行的 source。
Flink在流處理上的source和在批處理上的source基本一致。大致有4大類:
-
基于本地集合的source(Collection-based-source)
-
基于文件的source(File-based-source)- 讀取文本文件,即符合 TextInputFormat 規范的文件,并將其作為字符串返回
-
基于網絡套接字的source(Socket-based-source)- 從 socket 讀取。元素可以用分隔符切分。
-
自定義的source(Custom-source)
下面使用addSource將Kafka數據寫入Flink為例:
如果需要外部數據源對接,可使用addSource,如將Kafka數據寫入Flink, 先引入依賴:
<!--?https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11?-->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.10.0</version>
</dependency>
將Kafka數據寫入Flink:
val?properties?=?new?Properties()
properties.setProperty("bootstrap.servers",?"localhost:9092")
properties.setProperty("group.id",?"consumer-group")
properties.setProperty("key.deserializer",?"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer",?"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset",?"latest")val?source?=?env.addSource(new?FlinkKafkaConsumer011[String]("sensor",?new?SimpleStringSchema(),?properties))
基于網絡套接字的:
val?source?=?env.socketTextStream("IP",?PORT)
二、Transform轉換算子
1. map
將DataSet中的每一個元素轉換為另外一個元素
dataStream.map?{?x?=>?x?*?2?}
2. FlatMap
采用一個數據元并生成零個,一個或多個數據元。將句子分割為單詞的flatmap函數
dataStream.flatMap?{?str?=>?str.split("?")?}
3. Filter
計算每個數據元的布爾函數,并保存函數返回true的數據元。過濾掉零值的過濾器
dataStream.filter?{?_?!=?0?}
4.?KeyBy
邏輯上將流分區為不相交的分區。具有相同Keys的所有記錄都分配給同一分區。在內部,keyBy()是使用散列分區實現的。指定鍵有不同的方法。
此轉換返回KeyedStream,其中包括使用被Keys化狀態所需的KeyedStream。
dataStream.keyBy(0)?
5. Reduce
被Keys化數據流上的“滾動”Reduce。將當前數據元與最后一個Reduce的值組合并發出新值
keyedStream.reduce?{?_?+?_?}??
6.?Fold
具有初始值的被Keys化數據流上的“滾動”折疊。將當前數據元與最后折疊的值組合并發出新值
val?result:?DataStream[String]?=??keyedStream.fold("start")((str,?i)?=>?{?str?+?"-"?+?i?})?//?解釋:當上述代碼應用于序列(1,2,3,4,5)時,輸出結果“start-1”,“start-1-2”,“start-1-2-3”,...
7. Aggregations
在被Keys化數據流上滾動聚合。min和minBy之間的差異是min返回最小值,而minBy返回該字段中具有最小值的數據元(max和maxBy相同)。
keyedStream.sum(0);keyedStream.min(0);keyedStream.max(0);keyedStream.minBy(0);keyedStream.maxBy(0);
8.?Window
可以在已經分區的KeyedStream上定義Windows。Windows根據某些特征(例如,在最后5秒內到達的數據)對每個Keys中的數據進行分組。這里不再對窗口進行詳解,有關窗口的完整說明,請查看這篇文章:Flink 中極其重要的 Time 與 Window 詳細解析
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)));?
9.?WindowAll
Windows可以在常規DataStream上定義。Windows根據某些特征(例如,在最后5秒內到達的數據)對所有流事件進行分組。
注意:在許多情況下,這是非并行轉換。所有記錄將收集在windowAll 算子的一個任務中。
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
10. Window Apply
將一般函數應用于整個窗口。
注意:如果您正在使用windowAll轉換,則需要使用AllWindowFunction。
下面是一個手動求和窗口數據元的函數
windowedStream.apply?{?WindowFunction?}allWindowedStream.apply?{?AllWindowFunction?}
11. Window Reduce
將函數縮減函數應用于窗口并返回縮小的值
windowedStream.reduce?{?_?+?_?}
12. Window Fold
將函數折疊函數應用于窗口并返回折疊值
val?result:?DataStream[String]?=?windowedStream.fold("start",?(str,?i)?=>?{?str?+?"-"?+?i?})?//?上述代碼應用于序列(1,2,3,4,5)時,將序列折疊為字符串“start-1-2-3-4-5”
13.?Union
兩個或多個數據流的聯合,創建包含來自所有流的所有數據元的新流。注意:如果將數據流與自身聯合,則會在結果流中獲取兩次數據元
dataStream.union(otherStream1,?otherStream2,?...)
14.?Window Join
在給定Keys和公共窗口上連接兩個數據流
dataStream.join(otherStream).where(<key?selector>).equalTo(<key?selector>).window(TumblingEventTimeWindows.of(Time.seconds(3))).apply?(new?JoinFunction?()?{...})
15. Interval Join
在給定的時間間隔內使用公共Keys關聯兩個被Key化的數據流的兩個數據元e1和e2,以便e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound
am.intervalJoin(otherKeyedStream).between(Time.milliseconds(-2),?Time.milliseconds(2))?.upperBoundExclusive(true)?.lowerBoundExclusive(true)?.process(new?IntervalJoinFunction()?{...})
16. Window CoGroup
在給定Keys和公共窗口上對兩個數據流進行Cogroup
dataStream.coGroup(otherStream).where(0).equalTo(1).window(TumblingEventTimeWindows.of(Time.seconds(3))).apply?(new?CoGroupFunction?()?{...})
17.?Connect
“連接”兩個保存其類型的數據流。連接允許兩個流之間的共享狀態
DataStream<Integer>?someStream?=?...?DataStream<String>?otherStream?=?...?ConnectedStreams<Integer,?String>?connectedStreams?=?someStream.connect(otherStream)//?...?代表省略中間操作
18.?CoMap,CoFlatMap
類似于連接數據流上的map和flatMap
connectedStreams.map((_?:?Int)?=>?true,(_?:?String)?=>?false)connectedStreams.flatMap((_?:?Int)?=>?true,(_?:?String)?=>?false)
19.?Split
根據某些標準將流拆分為兩個或更多個流
val?split?=?someDataStream.split((num:?Int)?=>(num?%?2)?match?{case?0?=>?List("even")case?1?=>?List("odd")})??????
20.?Select
從拆分流中選擇一個或多個流
SplitStream<Integer>?split;DataStream<Integer>?even?=?split.select("even");DataStream<Integer>?odd?=?split.select("odd");DataStream<Integer>?all?=?split.select("even","odd")
三、Sink算子
支持將數據輸出到:
-
本地文件(參考批處理)
-
本地集合(參考批處理)
-
HDFS(參考批處理)
除此之外,還支持:
-
sink到kafka
-
sink到mysql
-
sink到redis
下面以sink到kafka為例:
val?sinkTopic?=?"test"//樣例類
case?class?Student(id:?Int,?name:?String,?addr:?String,?sex:?String)
val?mapper:?ObjectMapper?=?new?ObjectMapper()//將對象轉換成字符串
def?toJsonString(T:?Object):?String?=?{mapper.registerModule(DefaultScalaModule)mapper.writeValueAsString(T)
}def?main(args:?Array[String]):?Unit?=?{//1.創建流執行環境val?env?=?StreamExecutionEnvironment.getExecutionEnvironment//2.準備數據val?dataStream:?DataStream[Student]?=?env.fromElements(Student(8,?"xiaoming",?"beijing?biejing",?"female"))//將student轉換成字符串val?studentStream:?DataStream[String]?=?dataStream.map(student?=>toJsonString(student)?//?這里需要顯示SerializerFeature中的某一個,否則會報同時匹配兩個方法的錯誤)//studentStream.print()val?prop?=?new?Properties()prop.setProperty("bootstrap.servers",?"node01:9092")val?myProducer?=?new?FlinkKafkaProducer011[String](sinkTopic,?new?KeyedSerializationSchemaWrapper[String](new?SimpleStringSchema()),?prop)studentStream.addSink(myProducer)studentStream.print()env.execute("Flink?add?sink")
}
本文檔首發于公眾號【五分鐘學大數據】,更多大數據技術文檔可下方掃碼關注獲取:
五、流處理中的Time與Window
Flink 是流式的、實時的 計算引擎。
上面一句話就有兩個概念,一個是流式,一個是實時。
流式:就是數據源源不斷的流進來,也就是數據沒有邊界,但是我們計算的時候必須在一個有邊界的范圍內進行,所以這里面就有一個問題,邊界怎么確定?無非就兩種方式,根據時間段或者數據量進行確定,根據時間段就是每隔多長時間就劃分一個邊界,根據數據量就是每來多少條數據劃分一個邊界,Flink 中就是這么劃分邊界的,本文會詳細講解。
實時:就是數據發送過來之后立馬就進行相關的計算,然后將結果輸出。這里的計算有兩種:
-
一種是只有邊界內的數據進行計算,這種好理解,比如統計每個用戶最近五分鐘內瀏覽的新聞數量,就可以取最近五分鐘內的所有數據,然后根據每個用戶分組,統計新聞的總數。
-
另一種是邊界內數據與外部數據進行關聯計算,比如:統計最近五分鐘內瀏覽新聞的用戶都是來自哪些地區,這種就需要將五分鐘內瀏覽新聞的用戶信息與 hive 中的地區維表進行關聯,然后在進行相關計算。
本節所講的 Flink 內容就是圍繞以上概念進行詳細剖析的!
1. Time
在Flink中,如果以時間段劃分邊界的話,那么時間就是一個極其重要的字段。
Flink中的時間有三種類型,如下圖所示:
-
Event Time:是事件創建的時間。它通常由事件中的時間戳描述,例如采集的日志數據中,每一條日志都會記錄自己的生成時間,Flink通過時間戳分配器訪問事件時間戳。
-
Ingestion Time:是數據進入Flink的時間。
-
Processing Time:是每一個執行基于時間操作的算子的本地系統時間,與機器相關,默認的時間屬性就是Processing Time。
例如,一條日志進入Flink的時間為2021-01-22 10:00:00.123,到達Window的系統時間為2021-01-22 10:00:01.234,日志的內容如下:
2021-01-06 18:37:15.624 INFO Fail over to rm2
對于業務來說,要統計1min內的故障日志個數,哪個時間是最有意義的?—— eventTime,因為我們要根據日志的生成時間進行統計。
2. Window
Window,即窗口,我們前面一直提到的邊界就是這里的Window(窗口)。
官方解釋:流式計算是一種被設計用于處理無限數據集的數據處理引擎,而無限數據集是指一種不斷增長的本質上無限的數據集,而window是一種切割無限數據為有限塊進行處理的手段。
所以Window是無限數據流處理的核心,Window將一個無限的stream拆分成有限大小的”buckets”桶,我們可以在這些桶上做計算操作。
Window類型
本文剛開始提到,劃分窗口就兩種方式:
-
根據時間進行截取(time-driven-window),比如每1分鐘統計一次或每10分鐘統計一次。
-
根據數據進行截取(data-driven-window),比如每5個數據統計一次或每50個數據統計一次。
窗口類型
對于TimeWindow(根據時間劃分窗口), 可以根據窗口實現原理的不同分成三類:滾動窗口(Tumbling Window)、滑動窗口(Sliding Window)和會話窗口(Session Window)。
-
滾動窗口(Tumbling Windows)
將數據依據固定的窗口長度對數據進行切片。
特點:時間對齊,窗口長度固定,沒有重疊。
滾動窗口分配器將每個元素分配到一個指定窗口大小的窗口中,滾動窗口有一個固定的大小,并且不會出現重疊。
例如:如果你指定了一個5分鐘大小的滾動窗口,窗口的創建如下圖所示:
滾動窗口
適用場景:適合做BI統計等(做每個時間段的聚合計算)。
-
滑動窗口(Sliding Windows)
滑動窗口是固定窗口的更廣義的一種形式,滑動窗口由固定的窗口長度和滑動間隔組成。
特點:時間對齊,窗口長度固定,有重疊。
滑動窗口分配器將元素分配到固定長度的窗口中,與滾動窗口類似,窗口的大小由窗口大小參數來配置,另一個窗口滑動參數控制滑動窗口開始的頻率。因此,滑動窗口如果滑動參數小于窗口大小的話,窗口是可以重疊的,在這種情況下元素會被分配到多個窗口中。
例如,你有10分鐘的窗口和5分鐘的滑動,那么每個窗口中5分鐘的窗口里包含著上個10分鐘產生的數據,如下圖所示:
滑動窗口
適用場景:對最近一個時間段內的統計(求某接口最近5min的失敗率來決定是否要報警)。
-
會話窗口(Session Windows)
由一系列事件組合一個指定時間長度的timeout間隙組成,類似于web應用的session,也就是一段時間沒有接收到新數據就會生成新的窗口。
特點:時間無對齊。
session窗口分配器通過session活動來對元素進行分組,session窗口跟滾動窗口和滑動窗口相比,不會有重疊和固定的開始時間和結束時間的情況,相反,當它在一個固定的時間周期內不再收到元素,即非活動間隔產生,那個這個窗口就會關閉。一個session窗口通過一個session間隔來配置,這個session間隔定義了非活躍周期的長度,當這個非活躍周期產生,那么當前的session將關閉并且后續的元素將被分配到新的session窗口中去。
會話窗口
3. Window API
1) TimeWindow
TimeWindow是將指定時間范圍內的所有數據組成一個window,一次對一個window里面的所有數據進行計算(就是本文開頭說的對一個邊界內的數據進行計算)。
我們以?紅綠燈路口通過的汽車數量?為例子:
紅綠燈路口會有汽車通過,一共會有多少汽車通過,無法計算。因為車流源源不斷,計算沒有邊界。
所以我們統計每15秒鐘通過紅路燈的汽車數量,如第一個15秒為2輛,第二個15秒為3輛,第三個15秒為1輛 ...
-
tumbling-time-window (無重疊數據)
我們使用 Linux 中的 nc 命令模擬數據的發送方
1.開啟發送端口,端口號為9999
nc?-lk?99992.發送內容(key?代表不同的路口,value?代表每次通過的車輛)
一次發送一行,發送的時間間隔代表汽車經過的時間間隔
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4
Flink 進行采集數據并計算:
object?Window?{def?main(args:?Array[String]):?Unit?=?{//TODO?time-window//1.創建運行環境val?env?=?StreamExecutionEnvironment.getExecutionEnvironment//2.定義數據流來源val?text?=?env.socketTextStream("localhost",?9999)//3.轉換數據格式,text->CarWccase?class?CarWc(sensorId:?Int,?carCnt:?Int)val?ds1:?DataStream[CarWc]?=?text.map?{line?=>?{val?tokens?=?line.split(",")CarWc(tokens(0).trim.toInt,?tokens(1).trim.toInt)}}//4.執行統計操作,每個sensorId一個tumbling窗口,窗口的大小為5秒//也就是說,每5秒鐘統計一次,在這過去的5秒鐘內,各個路口通過紅綠燈汽車的數量。val?ds2:?DataStream[CarWc]?=?ds1.keyBy("sensorId").timeWindow(Time.seconds(5)).sum("carCnt")//5.顯示統計結果ds2.print()//6.觸發流計算env.execute(this.getClass.getName)}
}
我們發送的數據并沒有指定時間字段,所以Flink使用的是默認的 Processing Time,也就是Flink系統處理數據時的時間。
-
sliding-time-window (有重疊數據)
//1.創建運行環境
val?env?=?StreamExecutionEnvironment.getExecutionEnvironment//2.定義數據流來源
val?text?=?env.socketTextStream("localhost",?9999)//3.轉換數據格式,text->CarWc
case?class?CarWc(sensorId:?Int,?carCnt:?Int)val?ds1:?DataStream[CarWc]?=?text.map?{line?=>?{val?tokens?=?line.split(",")CarWc(tokens(0).trim.toInt,?tokens(1).trim.toInt)}
}
//4.執行統計操作,每個sensorId一個sliding窗口,窗口時間10秒,滑動時間5秒
//也就是說,每5秒鐘統計一次,在這過去的10秒鐘內,各個路口通過紅綠燈汽車的數量。
val?ds2:?DataStream[CarWc]?=?ds1.keyBy("sensorId").timeWindow(Time.seconds(10),?Time.seconds(5)).sum("carCnt")//5.顯示統計結果
ds2.print()//6.觸發流計算
env.execute(this.getClass.getName)
2) CountWindow
CountWindow根據窗口中相同key元素的數量來觸發執行,執行時只計算元素數量達到窗口大小的key對應的結果。
注意:CountWindow的window_size指的是相同Key的元素的個數,不是輸入的所有元素的總數。
-
tumbling-count-window (無重疊數據)
//1.創建運行環境
val?env?=?StreamExecutionEnvironment.getExecutionEnvironment//2.定義數據流來源
val?text?=?env.socketTextStream("localhost",?9999)//3.轉換數據格式,text->CarWc
case?class?CarWc(sensorId:?Int,?carCnt:?Int)val?ds1:?DataStream[CarWc]?=?text.map?{(f)?=>?{val?tokens?=?f.split(",")CarWc(tokens(0).trim.toInt,?tokens(1).trim.toInt)}
}
//4.執行統計操作,每個sensorId一個tumbling窗口,窗口的大小為5
//按照key進行收集,對應的key出現的次數達到5次作為一個結果
val?ds2:?DataStream[CarWc]?=?ds1.keyBy("sensorId").countWindow(5).sum("carCnt")//5.顯示統計結果
ds2.print()//6.觸發流計算
env.execute(this.getClass.getName)
-
sliding-count-window (有重疊數據)
同樣也是窗口長度和滑動窗口的操作:窗口長度是5,滑動長度是3
//1.創建運行環境
val?env?=?StreamExecutionEnvironment.getExecutionEnvironment//2.定義數據流來源
val?text?=?env.socketTextStream("localhost",?9999)//3.轉換數據格式,text->CarWc
case?class?CarWc(sensorId:?Int,?carCnt:?Int)val?ds1:?DataStream[CarWc]?=?text.map?{(f)?=>?{val?tokens?=?f.split(",")CarWc(tokens(0).trim.toInt,?tokens(1).trim.toInt)}
}
//4.執行統計操作,每個sensorId一個sliding窗口,窗口大小3條數據,窗口滑動為3條數據
//也就是說,每個路口分別統計,收到關于它的3條消息時統計在最近5條消息中,各自路口通過的汽車數量
val?ds2:?DataStream[CarWc]?=?ds1.keyBy("sensorId").countWindow(5,?3).sum("carCnt")//5.顯示統計結果
ds2.print()//6.觸發流計算
env.execute(this.getClass.getName)
-
Window 總結
-
flink支持兩種劃分窗口的方式(time和count)
-
如果根據時間劃分窗口,那么它就是一個time-window
-
如果根據數據劃分窗口,那么它就是一個count-window
-
-
flink支持窗口的兩個重要屬性(size和interval)
-
如果size=interval,那么就會形成tumbling-window(無重疊數據)
-
如果size>interval,那么就會形成sliding-window(有重疊數據)
-
如果size<interval,那么這種窗口將會丟失數據。比如每5秒鐘,統計過去3秒的通過路口汽車的數據,將會漏掉2秒鐘的數據。
-
-
通過組合可以得出四種基本窗口
-
time-tumbling-window 無重疊數據的時間窗口,設置方式舉例:timeWindow(Time.seconds(5))
-
time-sliding-window ?有重疊數據的時間窗口,設置方式舉例:timeWindow(Time.seconds(5), Time.seconds(3))
-
count-tumbling-window無重疊數據的數量窗口,設置方式舉例:countWindow(5)
-
count-sliding-window 有重疊數據的數量窗口,設置方式舉例:countWindow(5,3)
-
3) Window Reduce
WindowedStream → DataStream:給window賦一個reduce功能的函數,并返回一個聚合的結果。
import?org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import?org.apache.flink.api.scala._
import?org.apache.flink.streaming.api.windowing.time.Timeobject?StreamWindowReduce?{def?main(args:?Array[String]):?Unit?=?{//?獲取執行環境val?env?=?StreamExecutionEnvironment.getExecutionEnvironment//?創建SocketSourceval?stream?=?env.socketTextStream("node01",?9999)//?對stream進行處理并按key聚合val?streamKeyBy?=?stream.map(item?=>?(item,?1)).keyBy(0)//?引入時間窗口val?streamWindow?=?streamKeyBy.timeWindow(Time.seconds(5))//?執行聚合操作val?streamReduce?=?streamWindow.reduce((item1,?item2)?=>?(item1._1,?item1._2?+?item2._2))//?將聚合數據寫入文件streamReduce.print()//?執行程序env.execute("TumblingWindow")}
}
4) Window Apply
apply方法可以進行一些自定義處理,通過匿名內部類的方法來實現。當有一些復雜計算時使用。
用法
-
實現一個 WindowFunction 類
-
指定該類的泛型為 [輸入數據類型, 輸出數據類型, keyBy中使用分組字段的類型, 窗口類型]
示例:使用apply方法來實現單詞統計
步驟:
-
獲取流處理運行環境
-
構建socket流數據源,并指定IP地址和端口號
-
對接收到的數據轉換成單詞元組
-
使用 keyBy 進行分流(分組)
-
使用 timeWinodw 指定窗口的長度(每3秒計算一次)
-
實現一個WindowFunction匿名內部類
-
apply方法中實現聚合計算
-
使用Collector.collect收集數據
-
核心代碼如下:
????//1.?獲取流處理運行環境val?env?=?StreamExecutionEnvironment.getExecutionEnvironment//2.?構建socket流數據源,并指定IP地址和端口號val?textDataStream?=?env.socketTextStream("node01",?9999).flatMap(_.split("?"))//3.?對接收到的數據轉換成單詞元組val?wordDataStream?=?textDataStream.map(_->1)//4.?使用?keyBy?進行分流(分組)val?groupedDataStream:?KeyedStream[(String,?Int),?String]?=?wordDataStream.keyBy(_._1)//5.?使用?timeWinodw?指定窗口的長度(每3秒計算一次)val?windowDataStream:?WindowedStream[(String,?Int),?String,?TimeWindow]?=?groupedDataStream.timeWindow(Time.seconds(3))//6.?實現一個WindowFunction匿名內部類val?reduceDatStream:?DataStream[(String,?Int)]?=?windowDataStream.apply(new?RichWindowFunction[(String,?Int),?(String,?Int),?String,?TimeWindow]?{//在apply方法中實現數據的聚合override?def?apply(key:?String,?window:?TimeWindow,?input:?Iterable[(String,?Int)],?out:?Collector[(String,?Int)]):?Unit?=?{println("hello?world")val?tuple?=?input.reduce((t1,?t2)?=>?{(t1._1,?t1._2?+?t2._2)})//將要返回的數據收集起來,發送回去out.collect(tuple)}})reduceDatStream.print()env.execute()
5) Window Fold
WindowedStream → DataStream:給窗口賦一個fold功能的函數,并返回一個fold后的結果。
import?org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import?org.apache.flink.api.scala._
import?org.apache.flink.streaming.api.windowing.time.Timeobject?StreamWindowFold?{def?main(args:?Array[String]):?Unit?=?{//?獲取執行環境val?env?=?StreamExecutionEnvironment.getExecutionEnvironment//?創建SocketSourceval?stream?=?env.socketTextStream("node01",?9999,'\n',3)//?對stream進行處理并按key聚合val?streamKeyBy?=?stream.map(item?=>?(item,?1)).keyBy(0)//?引入滾動窗口val?streamWindow?=?streamKeyBy.timeWindow(Time.seconds(5))//?執行fold操作val?streamFold?=?streamWindow.fold(100){(begin,?item)?=>begin?+?item._2}//?將聚合數據寫入文件streamFold.print()//?執行程序env.execute("TumblingWindow")}
}
6) Aggregation on Window
WindowedStream → DataStream:對一個window內的所有元素做聚合操作。min和 minBy的區別是min返回的是最小值,而minBy返回的是包含最小值字段的元素(同樣的原理適用于 max 和 maxBy)。
import?org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import?org.apache.flink.streaming.api.windowing.time.Time
import?org.apache.flink.api.scala._object?StreamWindowAggregation?{def?main(args:?Array[String]):?Unit?=?{//?獲取執行環境val?env?=?StreamExecutionEnvironment.getExecutionEnvironment//?創建SocketSourceval?stream?=?env.socketTextStream("node01",?9999)//?對stream進行處理并按key聚合val?streamKeyBy?=?stream.map(item?=>?(item.split("?")(0),?item.split("?")(1))).keyBy(0)//?引入滾動窗口val?streamWindow?=?streamKeyBy.timeWindow(Time.seconds(5))//?執行聚合操作val?streamMax?=?streamWindow.max(1)//?將聚合數據寫入文件streamMax.print()//?執行程序env.execute("TumblingWindow")}
}
4. EventTime與Window
1) EventTime的引入
-
與現實世界中的時間是不一致的,在flink中被劃分為事件時間,提取時間,處理時間三種。
-
如果以EventTime為基準來定義時間窗口那將形成EventTimeWindow,要求消息本身就應該攜帶EventTime
-
如果以IngesingtTime為基準來定義時間窗口那將形成IngestingTimeWindow,以source的systemTime為準。
-
如果以ProcessingTime基準來定義時間窗口那將形成ProcessingTimeWindow,以operator的systemTime為準。
在Flink的流式處理中,絕大部分的業務都會使用eventTime,一般只在eventTime無法使用時,才會被迫使用ProcessingTime或者IngestionTime。
如果要使用EventTime,那么需要引入EventTime的時間屬性,引入方式如下所示:
val?env?=?StreamExecutionEnvironment.getExecutionEnvironment//?從調用時刻開始給env創建的每一個stream追加時間特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
2) Watermark
我們知道,流處理從事件產生,到流經 source,再到 operator,中間是有一個過程和時間的,雖然大部分情況下,流到 operator 的數據都是按照事件產生的時間順序來的,但是也不排除由于網絡、背壓等原因,導致亂序的產生,所謂亂序,就是指 Flink 接收到的事件的先后順序不是嚴格按照事件的 Event Time 順序排列的,所以 Flink 最初設計的時候,就考慮到了網絡延遲,網絡亂序等問題,所以提出了一個抽象概念:水印(WaterMark);
如上圖所示,就出現一個問題,一旦出現亂序,如果只根據 EventTime 決定 Window 的運行,我們不能明確數據是否全部到位,但又不能無限期的等下去,此時必須要有個機制來保證一個特定的時間后,必須觸發 Window 去進行計算了,這個特別的機制,就是 Watermark。
Watermark 是用于處理亂序事件的,而正確的處理亂序事件,通常用 Watermark 機制結合 Window 來實現。
數據流中的 Watermark 用于表示 timestamp 小于 Watermark 的數據,都已經到達了,因此,Window 的執行也是由 Watermark 觸發的。
Watermark 可以理解成一個延遲觸發機制,我們可以設置 Watermark 的延時時長 t,每次系統會校驗已經到達的數據中最大的 maxEventTime,然后認定 EventTime 小于 maxEventTime - t 的所有數據都已經到達,如果有窗口的停止時間等于 maxEventTime – t,那么這個窗口被觸發執行。
有序流的Watermarker如下圖所示:(Watermark設置為0)
有序數據的Watermark
亂序流的Watermarker如下圖所示:(Watermark設置為2)
無序數據的Watermark
當 Flink 接收到每一條數據時,都會產生一條 Watermark,這條 Watermark 就等于當前所有到達數據中的 maxEventTime - 延遲時長,也就是說,Watermark 是由數據攜帶的,一旦數據攜帶的 Watermark 比當前未觸發的窗口的停止時間要晚,那么就會觸發相應窗口的執行。由于 Watermark 是由數據攜帶的,因此,如果運行過程中無法獲取新的數據,那么沒有被觸發的窗口將永遠都不被觸發。
上圖中,我們設置的允許最大延遲到達時間為2s,所以時間戳為7s的事件對應的Watermark是5s,時間戳為12s的事件的Watermark是10s,如果我們的窗口1是1s~5s,窗口2是6s~10s,那么時間戳為7s的事件到達時的Watermarker恰好觸發窗口1,時間戳為12s的事件到達時的Watermark恰好觸發窗口2。
3) Flink對于遲到數據的處理
waterMark和Window機制解決了流式數據的亂序問題,對于因為延遲而順序有誤的數據,可以根據eventTime進行業務處理,于延遲的數據Flink也有自己的解決辦法,主要的辦法是給定一個允許延遲的時間,在該時間范圍內仍可以接受處理延遲數據。
設置允許延遲的時間是通過?allowedLateness(lateness: Time)?設置
保存延遲數據則是通過?sideOutputLateData(outputTag: OutputTag[T])?保存
獲取延遲數據是通過?DataStream.getSideOutput(tag: OutputTag[X])?獲取
具體的用法如下:
allowedLateness(lateness: Time)
def?allowedLateness(lateness:?Time):?WindowedStream[T,?K,?W]?=?{javaStream.allowedLateness(lateness)this
}
該方法傳入一個Time值,設置允許數據遲到的時間,這個時間和 WaterMark 中的時間概念不同。再來回顧一下:
WaterMark=數據的事件時間-允許亂序時間值
隨著新數據的到來,waterMark的值會更新為最新數據事件時間-允許亂序時間值,但是如果這時候
來了一條歷史數據,waterMark值則不會更新。總的來說,waterMark是為了能接收到盡可能多的亂序數據。
那這里的Time值,主要是為了等待遲到的數據,在一定時間范圍內,如果屬于該窗口的數據到來,仍會進行計算,后面會對計算方式仔細說明
注意:該方法只針對于基于event-time的窗口,如果是基于processing-time,并且指定了非零的time值則會拋出異常。
sideOutputLateData(outputTag: OutputTag[T])
def?sideOutputLateData(outputTag:?OutputTag[T]):?WindowedStream[T,?K,?W]?=?{javaStream.sideOutputLateData(outputTag)this
}
該方法是將遲來的數據保存至給定的outputTag參數,而OutputTag則是用來標記延遲數據的一個對象。
DataStream.getSideOutput(tag: OutputTag[X])
通過window等操作返回的DataStream調用該方法,傳入標記延遲數據的對象來獲取延遲的數據。
對延遲數據的理解
延遲數據是指:
在當前窗口【假設窗口范圍為10-15】已經計算之后,又來了一個屬于該窗口的數據【假設事件時間為13】,這時候仍會觸發 Window 操作,這種數據就稱為延遲數據。
那么問題來了,延遲時間怎么計算呢?
假設窗口范圍為10-15,延遲時間為2s,則只要 WaterMark<15+2,并且屬于該窗口,就能觸發 Window 操作。而如果來了一條數據使得 WaterMark>=15+2,10-15這個窗口就不能再觸發 Window 操作,即使新來的數據的 Event Time 屬于這個窗口時間內 。
4) Flink 關聯 Hive 分區表
Flink 1.12 支持了 Hive 最新的分區作為時態表的功能,可以通過 SQL 的方式直接關聯 Hive 分區表的最新分區,并且會自動監聽最新的 Hive 分區,當監控到新的分區后,會自動地做維表數據的全量替換。通過這種方式,用戶無需編寫 DataStream 程序即可完成?Kafka 流實時關聯最新的 Hive 分區實現數據打寬。
具體用法:
在 Sql Client 中注冊 HiveCatalog:
vim?conf/sql-client-defaults.yaml?
catalogs:?-?name:?hive_catalog?type:?hive?hive-conf-dir:?/disk0/soft/hive-conf/?#該目錄需要包hive-site.xml文件?
創建 Kafka 表
CREATE?TABLE?hive_catalog.flink_db.kfk_fact_bill_master_12?(??master?Row<reportDate?String,?groupID?int,?shopID?int,?shopName?String,?action?int,?orderStatus?int,?orderKey?String,?actionTime?bigint,?areaName?String,?paidAmount?double,?foodAmount?double,?startTime?String,?person?double,?orderSubType?int,?checkoutTime?String>,??
proctime?as?PROCTIME()??--?PROCTIME用來和Hive時態表關聯??
)?WITH?(??'connector'?=?'kafka',??'topic'?=?'topic_name',??'format'?=?'json',??'properties.bootstrap.servers'?=?'host:9092',??'properties.group.id'?=?'flinkTestGroup',??'scan.startup.mode'?=?'timestamp',??'scan.startup.timestamp-millis'?=?'1607844694000'??
);?
Flink 事實表與 Hive 最新分區數據關聯
dim_extend_shop_info 是 Hive 中已存在的表,所以我們用 table hint 動態地開啟維表參數。
CREATE?VIEW?IF?NOT?EXISTS?hive_catalog.flink_db.view_fact_bill_master?as??
SELECT?*?FROM??(select?t1.*,?t2.group_id,?t2.shop_id,?t2.group_name,?t2.shop_name,?t2.brand_id,???ROW_NUMBER()?OVER?(PARTITION?BY?groupID,?shopID,?orderKey?ORDER?BY?actionTime?desc)?rn??from?hive_catalog.flink_db.kfk_fact_bill_master_12?t1??JOIN?hive_catalog.flink_db.dim_extend_shop_info???/*+?OPTIONS('streaming-source.enable'='true',??'streaming-source.partition.include'?=?'latest',??'streaming-source.monitor-interval'?=?'1?h','streaming-source.partition-order'?=?'partition-name')?*/FOR?SYSTEM_TIME?AS?OF?t1.proctime?AS?t2?--時態表??ON?t1.groupID?=?t2.group_id?and?t1.shopID?=?t2.shop_id??where?groupID?in?(202042))?t??where?t.rn?=?1?
參數解釋:
-
streaming-source.enable?開啟流式讀取 Hive 數據。
-
streaming-source.partition.include?有以下兩個值:
-
latest 屬性: 只讀取最新分區數據。
-
all: 讀取全量分區數據 ,默認值為 all,表示讀所有分區,latest 只能用在 temporal join 中,用于讀取最新分區作為維表,不能直接讀取最新分區數據。
-
-
streaming-source.monitor-interval?監聽新分區生成的時間、不宜過短 、最短是1 個小時,因為目前的實現是每個 task 都會查詢 metastore,高頻的查可能會對metastore 產生過大的壓力。需要注意的是,1.12.1 放開了這個限制,但仍建議按照實際業務不要配個太短的 interval。
-
streaming-source.partition-order?分區策略,主要有以下 3 種,其中最為推薦的是?partition-name:
-
partition-name 使用默認分區名稱順序加載最新分區
-
create-time 使用分區文件創建時間順序
-
partition-time 使用分區時間順序
-
。。。。。。
因字數限制,不能上傳更多,獲取完整版pdf,請掃碼關注公眾號【五分鐘學大數據】,后臺發送:flink pdf,即可下載帶目錄的完整版flink文檔:?
?
總結
以上是生活随笔為你收集整理的Flink保姆级教程,超全五万字,学习与面试收藏这一篇就够了的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [AtCoder Grand Conte
- 下一篇: 企业邮箱购买价格