Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图
Flink核心篇,四大基石、容錯機制、廣播、反壓、序列化、內(nèi)存管理、資源管理
Flink基礎篇,基本概念、設計理念、架構(gòu)模型、編程模型、常用算子
Flink源碼篇,作業(yè)提交流程、作業(yè)調(diào)度流程、作業(yè)內(nèi)部轉(zhuǎn)換流程圖
1、Flink作業(yè)提交流程應該了解吧?
2、Flink作業(yè)提交分為幾種方式?
3、Flink JobGraph是在什么時候生成的?
4、那在 JobGraph 提交集群之前都經(jīng)歷哪些過程?
5、PipeExecutor,它有哪些實現(xiàn)類?
6、Local提交模式有啥特點,怎么實現(xiàn)的?
7、遠程提交模式都有哪些?
8、Standalone模式簡單介紹一下?
9、yarn-session模式特點?
10、yarn-perJob模式特點?
11、yarn-application模式特點?
12、yarn-session 提交流程詳細介紹一下?
13、yarn-perJob 提交流程詳細介紹一下?
14、流圖、作業(yè)圖、執(zhí)行圖三者區(qū)別?
15、流圖(StreamGraph)介紹一下?
16、作業(yè)圖(JobGraph)介紹一下?
17、執(zhí)行圖(ExecutionGraph)介紹一下?
18、Flink調(diào)度器的概念介紹一下?
19、Flink調(diào)度行為包含幾種?
20、Flink調(diào)度模式包含幾種?
21、Flink調(diào)度策略包含幾種?
22、Flink作業(yè)生命周期包含哪些狀態(tài)?
23、Task的作業(yè)生命周期包含哪些狀態(tài)?
24、Flink的任務調(diào)度流程講解一下?
25、Flink的任務槽是什么意思?
26、Flink 槽共享又是什么意思?
1、Flink作業(yè)提交流程應該了解吧?
Flink的提交流程:
在Flink Client中,通過反射啟動jar中的main函數(shù),生成Flink StreamGraph和JobGraph,將JobGraph提交給Flink集群
Flink集群收到JobGraph(JobManager收到)后,將JobGraph翻譯成ExecutionGraph,然后開始調(diào)度,啟動成功之后開始消費數(shù)據(jù)。
總結(jié)來說:Flink核心執(zhí)行流程,對用戶API的調(diào)用可以轉(zhuǎn)為 StreamGraph -> JobGraph -> ExecutionGraph。
2、Flink作業(yè)提交分為幾種方式?
Flink的作業(yè)提交分為兩種方式
Local 方式:即本地提交模式,直接在IDEA運行代碼。
遠程提交方式:分為Standalone方式、yarn方式、K8s方式
Yarn方式分為三種提交模式:Yarn-perJob模式、Yarn-Sessionmo模式、Yarn-Application模式
3、Flink JobGraph是在什么時候生成的?
StreamGraph、JobGraph全部是在Flink Client 客戶端生成的,即提交集群之前生成,原理圖如下:
4、那在 JobGraph 提交集群之前都經(jīng)歷哪些過程?
(1)用戶通過啟動Flink集群,使用命令行提交作業(yè),運行 flink run -c WordCount xxx.jar
(2)運行命令行后,會通過run腳本調(diào)用CliFrontend入口,CliFrontend會觸發(fā)用戶提交的jar文件中的main方法,然后交給PipelineExecuteor # execute方法,最終根據(jù)提交的模式選擇觸發(fā)一個具體的PipelineExecutor執(zhí)行。
(3)根據(jù)具體的PipelineExecutor執(zhí)行,將對用戶的代碼進行編譯生成streamGraph,經(jīng)過優(yōu)化后生成jobgraph。
5、PipeExecutor,它有哪些實現(xiàn)類?
PipeExecutor 在Flink中被叫做 流水線執(zhí)行器,它是一個接口,是Flink Client生成JobGraph之后,將作業(yè)提交給集群的重要環(huán)節(jié),前面說過,作業(yè)提交到集群有好幾種方式,最常用的是yarn方式,yarn方式包含3種提交模式,主要使用 session模式,perjob模式,Application模式,jobGraph是在集群中生成。
所以PipeExecutor的實現(xiàn)類如下圖所示:
除了上述框的兩種模式外,在IDEA環(huán)境中運行Flink MiniCluster 進行調(diào)試時,使用LocalExecutor。
6、Local提交模式有啥特點,怎么實現(xiàn)的?
Local是在本地IDEA環(huán)境中運行的提交方式。不上集群。主要用于調(diào)試,原理圖如下:
Flink程序由JobClient進行提交
JobClient將作業(yè)提交給JobManager
JobManager負責協(xié)調(diào)資源分配和作業(yè)執(zhí)行。資源分配完成后,任務將提交給相應的TaskManager
TaskManager啟動一個線程開始執(zhí)行,TaskManager會向JobManager報告狀態(tài)更改,如開始執(zhí)行,正在進行或者已完成。
作業(yè)執(zhí)行完成后,結(jié)果將發(fā)送回客戶端。
源碼分析:通過Flink1.12.2源碼進行分析的
(1)創(chuàng)建獲取對應的StreamExecutionEnvironment對象:LocalStreamEnvironment
調(diào)用StreamExecutionEnvironment對象的execute方法
(2)獲取streamGraph
(3)執(zhí)行具體的PipeLineExecutor -> 得到localExecutorFactory
(4)獲取JobGraph
根據(jù)localExecutorFactory的實現(xiàn)類LocalExecutor生成JobGraph
上面這部分全部是在Flink Client生成的,由于是使用Local模式提交。所有接下來將創(chuàng)建MiniCluster集群,由miniCluster.submitJob指定要提交的jobGraph
(5)實例化MiniCluster集群
(6)返回JobClient 客戶端
在上面執(zhí)行miniCluster.submitJob 將JobGraph提交到本地集群后,會返回一個JobClient客戶端,該JobClient包含了應用的一些詳細信息,包括JobID,應用的狀態(tài)等等。最后返回到代碼執(zhí)行的上一層,對應類為StreamExecutionEnvironment。
以上就是Local模式的源碼執(zhí)行過程。
7、遠程提交模式都有哪些?
遠程提交方式:分為Standalone方式、yarn方式、K8s方式
Standalone:包含session模式
Yarn方式分為三種提交模式:Yarn-perJob模式、Yarn-Sessionmo模式、Yarn-Application模式。
K8s方式:包含 session模式
8、Standalone模式簡單介紹一下?
Standalone 模式為Flink集群的單機版提交方式,只使用一個節(jié)點進行提交,常用Session模式。
作業(yè)提交原理圖如下:
提交命令如下:
bin/flink run org.apache.flink.WordCount xxx.jar
client客戶端提交任務給JobManager
JobManager負責申請任務運行所需要的資源并管理任務和資源,
JobManager分發(fā)任務給TaskManager執(zhí)行
TaskManager定期向JobManager匯報狀態(tài)
9、yarn-session模式特點?
提交命令:
./bin/flink run -t yarn-session \?
-Dyarn.application.id=application_XXXX_YY? xxx.jar
Yarn-Session模式:所有作業(yè)共享集群資源,隔離性差,JM負載瓶頸,main方法在客戶端執(zhí)行。適合執(zhí)行時間短,頻繁執(zhí)行的短任務,集群中的所有作業(yè) 只有一個JobManager,另外,Job被隨機分配給TaskManager
特點:
Session-Cluster模式需要先啟動集群,然后再提交作業(yè),接著會向yarn申請一塊空間后,資源永遠保持不變。如果資源滿了,下一個作業(yè)就無法提交,只能等到 yarn中的其中一個作業(yè)執(zhí)行完成后,釋放了資源,下個作業(yè)才會正常提交。所有作業(yè)共享Dispatcher和ResourceManager;共享資源;適合規(guī)模小執(zhí)行時間短的作業(yè)。
10、yarn-perJob模式特點?
提交命令:
./bin/flink run -t yarn-per-job --detached? xxx.jar
Yarn-Per-Job模式:每個作業(yè)單獨啟動集群,隔離性好,JM負載均衡,main方法在客戶端執(zhí)行。在per-job模式下,每個Job都有一個JobManager,每個TaskManager只有單個Job。
特點:
一個任務會對應一個Job,每提交一個作業(yè)會根據(jù)自身的情況,都會單獨向yarn申請資源,直到作業(yè)執(zhí)行完成,一個作業(yè)的失敗與否并不會影響下一個作業(yè)的正常提交和運行。獨享Dispatcher 和 ResourceManager,按需接受資源申請;適合規(guī)模大長時間運行的作業(yè)。
11、yarn-application模式特點?
提交命令:
./bin/flink run-application -t yarn-application xxx.jar
Yarn-Application模式:每個作業(yè)單獨啟動集群,隔離性好,JM負載均衡,main方法在JobManager上執(zhí)行。
特點:
在yarn-per-job 和 yarn-session模式下,客戶端都需要執(zhí)行以下三步,即:
獲取作業(yè)所需的依賴項;
通過執(zhí)行環(huán)境分析并取得邏輯計劃,即StreamGraph -> JobGraph;
將依賴項和JobGraph上傳到集群中。
只有在這些都完成之后,才會通過env.execute()方法 觸發(fā) Flink運行時真正地開始執(zhí)行作業(yè)。
如果所有用戶都在同一個客戶端上提交作業(yè),較大的依賴會消耗更多的帶寬,而較復雜的作業(yè)邏輯翻譯成JobGraph也需要吃掉更多的CPU和內(nèi)存,客戶端的資源反而會成為瓶頸。
為了解決它,社區(qū)在傳統(tǒng)部署模式的基礎上實現(xiàn)了 Application模式。原本需要客戶端做的三件事被轉(zhuǎn)移到了JobManager里,也就是說main()方法在集群中執(zhí)行(入口點位于 ApplicationClusterEntryPoint ),客戶端只需要負責發(fā)起部署請求了
綜上所述,Flink社區(qū)比較推薦使用?yarn-perjob?或者?yarn-application模式進行提交應用。
12、yarn-session 提交流程詳細介紹一下?
提交流程圖如下:
1、啟動集群
(1)Flink Client 向 Yarn ResourceManager提交任務信息。
Flink Client 將應用配置(Flink-conf.yaml、logback.xml、log4j.properties)和相關(guān)文件(Flink Jar、配置類文件、用戶Jar文件、JobGraph對象等)上傳至分布式存儲HDFS中。
Flink Client 向 Yarn ResourceManager提交任務信息
(2)Yarn 啟動 Flink集群,做2步操作:
通過Yarn Client 向Yarn ResourceManager提交Flink創(chuàng)建集群的申請,Yarn ResourceManager 分配 Container 資源,并通知對應的 NodeManager 上啟動一個ApplicationMaster(每提交一個flink job 就會啟動一個applicationMaster),ApplicationMaster會包含當前要啟動的 JobManager和 Flink自己內(nèi)部要使用的ResourceManager。
在 JobManager 進程中運行 YarnSessionClusterEntryPoint 作為集群啟動的入口。
初始化Dispatcher,Flink自己內(nèi)部要使用的ResourceManager,啟動相關(guān)RPC服務,等待Flink Client 通過Rest接口提交JobGraph。
2、作業(yè)提交
(3)Flink Client 通過Rest 向Dispatcher 提交編譯好的JobGraph。Dispatcher 是 Rest 接口,不負責實際的調(diào)度、指定工作。
(4)Dispatcher 收到 JobGraph 后,為作業(yè)創(chuàng)建一個JobMaster,將工作交給JobMaster,JobMaster負責作業(yè)調(diào)度,管理作業(yè)和Task的生命周期,構(gòu)建ExecutionGraph(JobGraph的并行化版本,調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu))
以上兩步執(zhí)行完后,作業(yè)進入調(diào)度執(zhí)行階段。
3、作業(yè)調(diào)度執(zhí)行
(5)JobMaster向ResourceManager申請資源,開始調(diào)度ExecutionGraph。
(6)ResourceManager將資源請求加入等待隊列,通過心跳向YarnResourceManager申請新的Container來啟動TaskManager進程。
(7)YarnResourceManager啟動,然后從HDFS加載Jar文件等所需相關(guān)資源,在容器中啟動TaskManager,TaskManager啟動TaskExecutor
(8)TaskManager啟動后,向ResourceManager 注冊,并把自己的Slot資源情況匯報給ResourceManager。
(9)ResourceManager從等待隊列取出Slot請求,向TaskManager確認資源可用情況,并告知TaskManager將Slot分配給哪個JobMaster。
(10)TaskManager向JobMaster回復自己的一個Slot屬于你這個任務,JobMaser會將Slot緩存到SlotPool。
(11)JobMaster調(diào)度Task到TaskMnager的Slot上執(zhí)行。
13、yarn-perJob 提交流程詳細介紹一下?
提交命令如下:
./bin/flink run -t yarn-per-job --detached? xxx.jar
提交流程圖如下所示:
1、啟動集群
(1)Flink Client 向 Yarn ResourceManager提交任務信息。
Flink Client 將應用配置(Flink-conf.yaml、logback.xml、log4j.properties)和相關(guān)文件(Flink Jar、配置類文件、用戶Jar文件、JobGraph對象等)上傳至分布式存儲HDFS中。
Flink Client 向 Yarn ResourceManager提交任務信息
(2)Yarn 啟動 Flink集群,做2步操作:
通過Yarn Client 向Yarn ResourceManager提交Flink創(chuàng)建集群的申請,Yarn ResourceManager 分配 Container 資源,并通知對應的 NodeManager 上啟動一個ApplicationMaster(每提交一個flink job 就會啟動一個applicationMaster),ApplicationMaster會包含當前要啟動的 JobManager和 Flink自己內(nèi)部要使用的ResourceManager。
在 JobManager 進程中運行 YarnJobClusterEntryPoint 作為集群啟動的入口。
初始化Dispatcher,Flink自己內(nèi)部要使用的ResourceManager,啟動相關(guān)RPC服務,等待Flink Client 通過Rest接口提交JobGraph。
2、作業(yè)提交
(3)ApplicationMaster啟動Dispatcher,Dispatcher啟動ResourceManager和JobMaster(該步和Session不同,JobMaster是由Dispatcher拉起,而不是Client傳過來的)。
(4)JobMaster負責作業(yè)調(diào)度,管理作業(yè)和Task的生命周期,構(gòu)建ExecutionGraph(JobGraph的并行化版本,調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu))
以上兩步執(zhí)行完后,作業(yè)進入調(diào)度執(zhí)行階段。
3、作業(yè)調(diào)度執(zhí)行
(5)JobMaster向ResourceManager申請資源,開始調(diào)度ExecutionGraph。
(6)ResourceManager將資源請求加入等待隊列,通過心跳向YarnResourceManager申請新的Container來啟動TaskManager進程。
(7)YarnResourceManager啟動,然后從HDFS加載Jar文件等所需相關(guān)資源,在容器中啟動TaskManager,TaskManager啟動TaskExecutor
(8)TaskManager啟動后,向ResourceManager 注冊,并把自己的Slot資源情況匯報給ResourceManager。
(9)ResourceManager從等待隊列取出Slot請求,向TaskManager確認資源可用情況,并告知TaskManager將Slot分配給哪個JobMaster。
(10)TaskManager向JobMaster回復自己的一個Slot屬于你這個任務,JobMaser會將Slot緩存到SlotPool。
(11)JobMaster調(diào)度Task到TaskMnager的Slot上執(zhí)行。
14、流圖、作業(yè)圖、執(zhí)行圖三者區(qū)別?
Flink內(nèi)部Graph總覽圖,由于現(xiàn)在Flink 實行流批一體代碼,Batch API基本廢棄,就不過多介紹在Flink DataStramAPI 中,Graph內(nèi)部轉(zhuǎn)換圖如下:
以WordCount為例,流圖、作業(yè)圖、執(zhí)行圖、物理執(zhí)行圖之間的Task調(diào)度如下:
對于Flink 流計算應用,運行用戶代碼時,首先調(diào)用DataStream API ,將用戶代碼轉(zhuǎn)換為 Transformation,然后經(jīng)過:StreamGraph->JobGraph->ExecutionGraph 3層轉(zhuǎn)換(這些都是Flink內(nèi)置的數(shù)據(jù)結(jié)構(gòu)),最后經(jīng)過Flink調(diào)度執(zhí)行,在Flink 集群中啟動計算任務,形成一個物理執(zhí)行圖。
15、流圖(StreamGraph)介紹一下?
流圖 StreamGraph
流圖StreamGraph 核心對象包括兩個:StreamNode 點 和 StreamEdge 邊
1)StreamNode 點
StreamNode 點 ,從 Transformation轉(zhuǎn)換而來,可以簡單理解為 StreamNode 表示一個算子,存在實體和虛擬,可以有多個輸入和輸出,實體StreamNode 最終變成物理算子,虛擬的附著在StreamEdge 邊 上。
2)StreamEdge 邊
StreamEdge 是 StreamGraph 的邊,用來連接兩個StreamNode 點,一個StreamEdge可以有多個出邊、入邊等信息。
16、作業(yè)圖(JobGraph)介紹一下?
作業(yè)圖 JobGraph
JobGraph是由StreamGraph優(yōu)化而來,是通過OperationChain 機制將算子合并起來,在執(zhí)行時,調(diào)度在同一個Task線程上,避免數(shù)據(jù)的跨線程,跨網(wǎng)絡傳遞。
作業(yè)圖JobGraph 核心對象包括三個:
JobVertex 點 、 JobEdge 邊、IntermediateDataSet 中間數(shù)據(jù)集
1)JobVertex 點
經(jīng)過算子融合優(yōu)化后符合條件的多個StreamNode 可能會融合在一起生成一個 JobVertex,即一個JobVertex 包含一個或多個算子, JobVertex 的輸入是 JobEdge. 輸出是 IntermediateDataSet
2)JobEdge 邊
JobEdge 表示 JobGraph 中的一 個數(shù)據(jù)流轉(zhuǎn)通道, 其上游數(shù)據(jù)源是 IntermediateDataSet ,下游消費者是 JobVertex 。即數(shù)據(jù)通過 JobEdge 由IntermediateDataSet 傳遞給目標 JobVertex
JobEdge 中的數(shù)據(jù)分發(fā)模式,會直接影響執(zhí)行時 Task 之間的數(shù)據(jù)連接關(guān)系,是點對點連接還是全連接。
3)IntermediateDataSet 中間數(shù)據(jù)集
中間數(shù)據(jù)集 IntermediateDataSet 是一種邏輯結(jié)構(gòu),用來表示 JobVertex 的輸出,即經(jīng)過算子處理產(chǎn)生的數(shù)據(jù)集。不同的執(zhí)行模式下,其對應的結(jié)果分區(qū)類型不同,決 定了在執(zhí)行時刻數(shù)據(jù)交換的模式。
17、執(zhí)行圖(ExecutionGraph)介紹一下?
執(zhí)行圖 ExecutionGraph
ExecutionGraph是調(diào)度Flink 作業(yè)執(zhí)行的核心數(shù)據(jù)結(jié)構(gòu),包含了作業(yè)中所有并行執(zhí)行的Task信息、Task之間的關(guān)聯(lián)關(guān)系、數(shù)據(jù)流轉(zhuǎn)關(guān)系。
StreamGraph 和JobGraph都在Flink Client生成,然后交給Flink集群。
JobGraph 到 ExecutionGraph 在JobMaster中完成,轉(zhuǎn)換過程中重要變化如下:
加入了并行度的概念,成為真正可調(diào)度的圖結(jié)構(gòu),生成了6個核心對象。
執(zhí)行圖 ExecutionGraph 核心對象包括6個:
ExecutionJobVertex、ExecutionVertex、IntermediateResult、IntermediateResultPartition、ExecutionEdge、Execution。
1)ExecutionJobVertex
該對象和 JobGraph 中的 JobVertex 一 一對應。該對象還包含一組 ExecutionVertex, 數(shù)量 與該 JobVertex 中所包含的StreamNode 的并行度一致,假設 StreamNode 的并行度為5 ,那么ExecutionJobVertex中也會包含 5個ExecutionVertex。
ExecutionJobVertex用來將一個JobVertex 封裝成 ExecutionJobVertex,并依次創(chuàng)建 ExecutionVertex、Execution、IntermediateResult 和 IntermediateResultPartition,用于豐富ExecutionGraph。
2)ExecutionVertex
ExecutionJobVertex會對作業(yè)進行并行化處理,構(gòu)造可以并行執(zhí)行的實例,每一個并行執(zhí)行的實例就是 ExecutionVertex。
3)IntermediateResult
IntermediateResult 又叫作中間結(jié)果集,該對象是個邏輯概念 表示 ExecutionJobVertex輸出,和 JobGrap 中的IntermediateDalaSet 一 一對應,同樣 一個ExecutionJobVertex 可以有多個中間結(jié)果,取決于當前 JobVertex 有幾個出邊(JobEdge)。
4)IntermediateResultPartition
IntermediateResultPartition 又叫作中間結(jié)果分區(qū)。表示一個 ExecutionVertex分區(qū)的輸出結(jié)果,與 Execution Edge 相關(guān)聯(lián)。
5)ExecutionEdge
表示ExecutionVertex 的輸入,連接到上游產(chǎn)生的IntermediateResultPartition,一個Execution對應唯一的一個IntermediateResultPartition 和一個ExecutionVertex,一個ExecutionVertex 可以有多個ExecutionEdge。
6)Execution
ExecutionVertex 相當于每個 Task 的模板,在真正執(zhí)行的時候,會將ExecutionVertex中的信息包裝為一個Execution,執(zhí)行一個ExecutionVertex的一次嘗試。
當發(fā)生故障或者數(shù)據(jù)需要重算的情況下 ExecutionVertex 可能會有多個 ExecutionAttemptID。一個 Execution 通過 ExecutionAttemptID 來唯一標識。
JM和TM之間關(guān)于 task 的部署和 task status 的更新都是通過 ExecutionAttemptID 來確定消息接受者
小結(jié),從這些基本概念中,也可以看出以下?點:
由于每個 JobVertex 可能有多個 IntermediateDataSet,所以每個ExecutionJobVertex 可能有多個 IntermediateResult,因此,每個ExecutionVertex 也可能會包含多個 IntermediateResultPartition;
ExecutionEdge 這里主要的作?是把 ExecutionVertex 和 IntermediateResultPartition 連接起來,表示它們之間的連接關(guān)系。
18、Flink調(diào)度器的概念介紹一下?
調(diào)度器是 Flink 作業(yè)執(zhí)行的核心組件,管理作業(yè)執(zhí)行的所有相關(guān)過程,包括JobGraph到ExecutionGraph的轉(zhuǎn)換、作業(yè)生命周期管理(作業(yè)的發(fā)布、取消、停止)、作業(yè)的Task生命周期管理(Task的發(fā)布、取消、停止)、資源申請與釋放、作業(yè)和Task的Faillover等。
調(diào)度器作用:
作業(yè)的生命周期管理,如作業(yè)的發(fā)布、掛起、取消
作業(yè)執(zhí)行資源的申請、分配、釋放
作業(yè)的狀態(tài)管理,作業(yè)發(fā)布過程中的狀態(tài)變化和作業(yè)異常時的FailOver等
作業(yè)的信息提供,對外提供作業(yè)的詳細信息
調(diào)度有幾個重要的組件:
調(diào)度器:SchedulerNG及其子類、實現(xiàn)類
調(diào)度策略:SchedulingStrategy及其實現(xiàn)類
調(diào)度模式:ScheduleMode包含流和批的調(diào)度,有各自不同的調(diào)度模式
19、Flink調(diào)度行為包含幾種?
調(diào)度行為包含四種:
SchedulerStrategy接口定義了調(diào)度行為,其中包含4種行為:
1)startScheduling:調(diào)度入口,觸發(fā)調(diào)度器的調(diào)度行為
(2)restartTasks:重啟執(zhí)行失敗的Task,一般是Task執(zhí)行異常導致的。
(3)onExecutionStateChange:當Execution狀態(tài)發(fā)生改變時。
(4)onPartitionConsumable:當IntermediateResultPartition中的數(shù)據(jù)可以消費時。
20、Flink調(diào)度模式包含幾種?
調(diào)度模式包含3種:Eager模式、分階段模式(Lazy_From_Source)、分階段Slot重用模式(Lazy_From_Sources_With_Batch_Slot_Request)。
1)Eager 調(diào)度
適用于流計算。一次性申請需要的所有資源,如果資源不足,則作業(yè)啟動失敗。
2)分階段調(diào)度
LAZY_FROM_SOURCES 適用于批處理。從 SourceTask 開始分階段調(diào)度,申請資源的時候,一次性申請本階段所需要的所有資源。
上游 Task 執(zhí)行完畢后開始調(diào)度執(zhí)行下游的 Task,讀取上游的數(shù)據(jù),執(zhí)行本階段的計算任務,執(zhí)行完畢之后,調(diào)度后一個階段的 Task,依次進行調(diào)度,直到作業(yè)完成。
3)分階段 Slot 重用調(diào)度
LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST 適用于批處理。與分階段調(diào)度基本一樣,區(qū)別在于該模式下使用批處理資源申請模式,可以在資源不足的情況下執(zhí)行作業(yè),但是需要確保在本階段的作業(yè)執(zhí)行中沒有 Shuffle 行為。
目前視線中的 Eager 模式和 LAZY_FROM_SOURCES 模式的資源申請邏輯一樣,LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST 是單獨的資源申請邏輯。
21、Flink調(diào)度策略包含幾種?
調(diào)度策略包含3種:
調(diào)度策略全部實現(xiàn)于調(diào)度器SchedulingStrategy,有三種實現(xiàn):
EagerSchedulingStrategy:適用于流計算,同時調(diào)度所有的 task
LazyFromSourcesSchedulingStrategy:適用于批處理,當輸入數(shù)據(jù)準備好時(上游處理完)進行 vertices 調(diào)度。
PipelinedRegionSchedulingStrategy:以流水線的局部為粒度進行調(diào)度
PipelinedRegionSchedulingStrategy 是 1.11 加入的,從 1.12 開始,將以 pipelined region為單位進行調(diào)度。
pipelined region 是一組流水線連接的任務。這意味著,對于包含多個 region的流作業(yè),在開始部署任務之前,它不再等待所有任務獲取 slot。取而代之的是,一旦任何region 獲得了足夠的任務 slot 就可以部署它。對于批處理作業(yè),將不會為任務分配 slot,也不會單獨部署任務。取而代之的是,一旦某個 region 獲得了足夠的 slot,則該任務將與所有其他任務一起部署在同一區(qū)域中
22、Flink作業(yè)生命周期包含哪些狀態(tài)?
在Flink集群中,JobMaster 負責作業(yè)的生命周期管理,具體的管理行為在調(diào)度器和ExecutionGraph中實現(xiàn)。
作業(yè)的完整生命周期狀態(tài)變換如下圖所示:
(1)作業(yè)首先處于創(chuàng)建狀態(tài)(created),然后切換到運行狀態(tài)(running),并且在完成所有工作后,它將切換到完成狀態(tài)(finished)。
(2)在失敗的情況下,作業(yè)首先切換到失敗狀態(tài)(failing),取消所有正在運行任務。
如果所有節(jié)點都已達到最終狀態(tài),并且作業(yè)不可重新啟動,則狀態(tài)將轉(zhuǎn)換為失敗(failed)。
(3)如果作業(yè)可以重新啟動,那么它將進入重新啟動狀態(tài)(restarting)。一旦完成重新啟動,它將變成創(chuàng)建狀態(tài)(created)。
(4)在用戶取消作業(yè)的情況下,將進入取消狀態(tài)(cancelling),會取消所有當前正在運行的任務。一旦所有運行的任務已經(jīng)達到最終狀態(tài),該作業(yè)將轉(zhuǎn)換到已取消狀態(tài)(canceled)。
完成狀態(tài)(finished),取消狀態(tài)(canceled),失敗狀態(tài)(failed)表示一個全局的終結(jié)狀態(tài),并且觸發(fā)清理工作,而暫停狀態(tài)(suspended)僅處于本地終止狀態(tài)。意味著作業(yè)的執(zhí)行在相應的 JobManager 上終止,但集群的另一個 JobManager 可以從持久的HA存儲中恢復這個作業(yè)并重新啟動。因此,處于暫停狀態(tài)的作業(yè)將不會被完全清理。
23、Task的作業(yè)生命周期包含哪些狀態(tài)?
TaskManager 負責Task 的生命周期管理,并將狀態(tài)的變化通知到JobMaster,在ExecutionGraph中跟蹤Execution的狀態(tài)變化,一個Execution對于一個Task。
Task的生命周期如下:共8種狀態(tài)。
在執(zhí)行 ExecutionGraph 期間,每個并行任務經(jīng)過多個階段,從創(chuàng)建(created)到完成(finished)或失敗(failed) ,上圖說明了它們之間的狀態(tài) 和 可能的轉(zhuǎn)換。任務可以執(zhí)行多次(例如故障恢復)。
每個 Execution 跟蹤一個 ExecutionVertex 的執(zhí)行,每個 ExecutionVertex 都有一個當前 Execution(current execution)和一個前驅(qū) Execution(prior execution)。
24、Flink的任務調(diào)度流程講解一下?
任務調(diào)度流程圖如下
1、當Flink執(zhí)行executor會自動根據(jù)程序代碼生成DAG數(shù)據(jù)流圖 ,即 Jobgraph;
2、ActorSystem創(chuàng)建Actor將數(shù)據(jù)流圖發(fā)送給JobManager中的Actor;
3、JobManager會不斷接收TaskManager的心跳消息,從而可以獲取到有效的TaskManager;
4、JobManager通過調(diào)度器在TaskManager中調(diào)度執(zhí)行Task(在Flink中,最小的調(diào)度單元就是task,對應就是一個線程);
5、在程序運行過程中,task與task之間是可以進行數(shù)據(jù)傳輸?shù)?/p>
Job Client
主要職責是提交任務,提交后可以結(jié)束進程,也可以等待結(jié)果返回;
Job Client 不是 Flink 程序執(zhí)行的內(nèi)部部分,但它是任務執(zhí)行的起點;
Job Client 負責接受用戶的程序代碼,然后創(chuàng)建數(shù)據(jù)流,將數(shù)據(jù)流提交給JobManager 以便進一步執(zhí)行。執(zhí)行完成后,Job Client 將結(jié)果返回給用戶。
JobManager
主要職責是調(diào)度工作并協(xié)調(diào)任務做檢查點;
集群中至少要有一個 master,master 負責調(diào)度 task,協(xié)調(diào)checkpoints 和 容錯;
高可用設置的話可以有多個 master,但要保證一個是 leader,其他是stand by;
JobManager 包含 ActorSystem、Scheduler、CheckPoint三個重要的組件 ;
JobManager從客戶端接收到任務以后,首先生成優(yōu)化過的執(zhí)行計劃,再調(diào)度到TaskManager中執(zhí)行。
TaskManager
主要職責是從JobManager處接收任務,并部署和啟動任務,接收上游的數(shù) 據(jù)并處理
TaskManager 是在 JVM 中的一個或多個線程中執(zhí)行任務的工作節(jié)點。
TaskManager在創(chuàng)建之初就設置好了Slot,每個Slot可以執(zhí)行一個任務。
25、Flink的任務槽是什么意思?
每個TaskManager是一個JVM的進程, 可以在不同的線程中執(zhí)行一個或多個子任務。為了控制一個worker能接收多少個task。worker通過task slot來進行控制(一個worker 至少有一個task slot)。
1、任務槽
每個task slot表示TaskManager擁有資源的一個固定大小的子集。
一般來說:我們分配槽的個數(shù)都是和CPU的核數(shù)相等,比如8核,那么就分配8個槽。
Flink將進程的內(nèi)存劃分到多個slot中。
圖中有2個TaskManager,每個TaskManager有3個slot,每個slot占有1/3的內(nèi)存。
內(nèi)存被劃分到不同的slot之后可以獲得如下好處:
TaskManager最多能同時并發(fā)執(zhí)行的任務是可以控制的,那就是3個,因為不能超過slot的數(shù)量。任務槽的作用就是分離任務的托管內(nèi)存,不會發(fā)生cpu隔離。
slot有獨占的內(nèi)存空間,這樣在一個TaskManager中可以運行多個不同的作業(yè),作業(yè)之間不受影響。
總結(jié):task slot的個數(shù)代表TaskManager可以并行執(zhí)行的task數(shù)。
26、Flink 槽共享又是什么意思?
2、槽共享
默認情況下,Flink允許子任務共享插槽,即使它們是不同任務的子任務,只要它們來自同一個作業(yè)。結(jié)果是一個槽可以保存作業(yè)的整個管道。允許插槽共享有兩個主要好處:
只需計算Job中最高并行度(parallelism)的task slot。只要這個滿足,其他的job也都能滿足。
資源分配更加公平。如果有比較空閑的slot可以將更多的任務分配給它。圖中若沒有任務槽共享,負載不高的Source/Map等subtask將會占據(jù)許多資源,而負載較高的窗口subtask則會缺乏資源。
有了任務槽共享,可以將基本并行度(base parallelism)從2提升到6。提高了分槽資源的利用率。同時它還可以保障TaskManager給subtask的分配的slot方案更加公平。
end
Flink 從入門到精通?系列文章基于 Apache Flink 的實時監(jiān)控告警系統(tǒng) 關(guān)于數(shù)據(jù)中臺的深度思考與總結(jié)(干干貨) 日志收集Agent,陰暗潮濕的地底世界公眾號(zhisheng)里回復?面經(jīng)、ClickHouse、ES、Flink、?Spring、Java、Kafka、監(jiān)控?等關(guān)鍵字可以查看更多關(guān)鍵字對應的文章。點個贊+在看,少個 bug?👇總結(jié)
以上是生活随笔為你收集整理的Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 怎样创建和提交谷歌站点地图?
- 下一篇: 大学毕业生如何成功应聘高薪IT职位 [转