追源索骥:透过源码看懂Flink核心框架的执行流程
https://www.cnblogs.com/bethunebtj/p/9168274.html
- 追源索驥:透過(guò)源碼看懂Flink核心框架的執(zhí)行流程
- 前言
- 1.從 Hello,World WordCount開(kāi)始
- 1.1 flink執(zhí)行環(huán)境
- 1.2 算子(Operator)的注冊(cè)(聲明)
- 1.3 程序的執(zhí)行
- 1.3.1 本地模式下的execute方法
- 1.3.2 遠(yuǎn)程模式(RemoteEnvironment)的execute方法
- 1.3.3 程序啟動(dòng)過(guò)程
- 2.理解flink的圖結(jié)構(gòu)
- 2.1 flink的三層圖結(jié)構(gòu)
- 2.2 StreamGraph的生成
- 2.2.1 StreamTransformation類代表了流的轉(zhuǎn)換
- 2.2.2 StreamGraph生成函數(shù)分析
- 2.2.3 WordCount函數(shù)的StreamGraph
- 2.3 JobGraph的生成
- 2.3.1 JobGraph生成源碼
- 2.3.2 operator chain的邏輯
- 2.3.3 JobGraph的提交
- 2.4 ExecutionGraph的生成
- 3. 任務(wù)的調(diào)度與執(zhí)行
- 3.1 計(jì)算資源的調(diào)度
- 3.2 JobManager執(zhí)行job
- 3.2.1 JobManager的組件
- 3.2.2 JobManager的啟動(dòng)過(guò)程
- 3.2.3 JobManager啟動(dòng)Task
- 3.3 TaskManager執(zhí)行task
- 3.3.1 TaskManager的基本組件
- 3.3.2 TaskManager執(zhí)行Task
- 3.3.2.1 生成Task對(duì)象
- 3.3.2.2 運(yùn)行Task對(duì)象
- 3.3.2.3 StreamTask的執(zhí)行邏輯
- 3.4 StreamTask與StreamOperator
- 4. StreamOperator的抽象與實(shí)現(xiàn)
- 4.1 數(shù)據(jù)源的邏輯——StreamSource與時(shí)間模型
- 4.2 從數(shù)據(jù)輸入到數(shù)據(jù)處理——OneInputStreamOperator & AbstractUdfStreamOperator
- 4.3 StreamSink
- 5. 為執(zhí)行保駕護(hù)航——Fault Tolerant與保證Exactly-Once語(yǔ)義
- 5.1 Fault Tolerant演進(jìn)之路
- 5.1.1 Storm的Record acknowledgement模式
- 5.1.2 Spark streaming的micro batch模式
- 5.1.3 Google Cloud Dataflow的事務(wù)式模型
- 5.1.4 Flink的分布式快照機(jī)制
- 5.2 checkpoint的生命周期
- 5.2.1 觸發(fā)checkpoint
- 5.2.2 Task層面checkpoint的準(zhǔn)備工作
- 5.2.3 操作符的狀態(tài)保存及barrier傳遞
- 5.3 承載checkpoint數(shù)據(jù)的抽象:State & StateBackend
- 5.1 Fault Tolerant演進(jìn)之路
- 6.數(shù)據(jù)流轉(zhuǎn)——Flink的數(shù)據(jù)抽象及數(shù)據(jù)交換過(guò)程
- 6.1 flink的數(shù)據(jù)抽象
- 6.1.1 MemorySegment
- 6.1.2 ByteBuffer與NetworkBufferPool
- 6.1.3 RecordWriter與Record
- 6.2 數(shù)據(jù)流轉(zhuǎn)過(guò)程
- 6.2.1 整體過(guò)程
- 6.2.2 數(shù)據(jù)跨task傳遞
- 6.3 Credit漫談
- 6.3.1 背壓?jiǎn)栴}
- 6.3.2 使用Credit實(shí)現(xiàn)ATM網(wǎng)絡(luò)流控
- 6.1 flink的數(shù)據(jù)抽象
- 7.其他核心概念
- 7.1 EventTime時(shí)間模型
- 7.2 FLIP-6 部署及處理模型演進(jìn)
- 7.2.1 現(xiàn)有模型不足
- 7.2.2 核心變更
- 7.2.3 Cluster Manager的架構(gòu)
- 7.2.4 組件設(shè)計(jì)及細(xì)節(jié)
- 8.后記
前言
Flink是大數(shù)據(jù)處理領(lǐng)域最近很火的一個(gè)開(kāi)源的分布式、高性能的流式處理框架,其對(duì)數(shù)據(jù)的處理可以達(dá)到毫秒級(jí)別。本文以一個(gè)來(lái)自官網(wǎng)的WordCount例子為引,全面闡述flink的核心架構(gòu)及執(zhí)行流程,希望讀者可以借此更加深入的理解Flink邏輯。
本文跳過(guò)了一些基本概念,如果對(duì)相關(guān)概念感到迷惑,請(qǐng)參考官網(wǎng)文檔。另外在本文寫作過(guò)程中,Flink正式發(fā)布了其1.5 RELEASE版本,在其發(fā)布之后完成的內(nèi)容將按照1.5的實(shí)現(xiàn)來(lái)組織。
1.從?Hello,World?WordCount開(kāi)始
首先,我們把WordCount的例子再放一遍:
public class SocketTextStreamWordCount {public static void main(String[] args) throws Exception {if (args.length != 2){System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");return;}String hostName = args[0];Integer port = Integer.parseInt(args[1]);// set up the execution environmentfinal StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// get input dataDataStream<String> text = env.socketTextStream(hostName, port);text.flatMap(new LineSplitter()).setParallelism(1)// group by the tuple field "0" and sum up tuple field "1".keyBy(0).sum(1).setParallelism(1).print();// execute programenv.execute("Java WordCount from SocketTextStream Example");}/*** Implements the string tokenizer that splits sentences into words as a user-defined* FlatMapFunction. The function takes a line (String) and splits it into* multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).*/public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) {// normalize and split the lineString[] tokens = value.toLowerCase().split("\\W+");// emit the pairsfor (String token : tokens) {if (token.length() > 0) {out.collect(new Tuple2<String, Integer>(token, 1));}}}}}首先從命令行中獲取socket對(duì)端的ip和端口,然后啟動(dòng)一個(gè)執(zhí)行環(huán)境,從socket中讀取數(shù)據(jù),split成單個(gè)單詞的流,并按單詞進(jìn)行總和的計(jì)數(shù),最后打印出來(lái)。這個(gè)例子相信接觸過(guò)大數(shù)據(jù)計(jì)算或者函數(shù)式編程的人都能看懂,就不過(guò)多解釋了。
1.1 flink執(zhí)行環(huán)境
程序的啟動(dòng),從這句開(kāi)始:final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()。?
這行代碼會(huì)返回一個(gè)可用的執(zhí)行環(huán)境。執(zhí)行環(huán)境是整個(gè)flink程序執(zhí)行的上下文,記錄了相關(guān)配置(如并行度等),并提供了一系列方法,如讀取輸入流的方法,以及真正開(kāi)始運(yùn)行整個(gè)代碼的execute方法等。對(duì)于分布式流處理程序來(lái)說(shuō),我們?cè)诖a中定義的flatMap,keyBy等等操作,事實(shí)上可以理解為一種聲明,告訴整個(gè)程序我們采用了什么樣的算子,而真正開(kāi)啟計(jì)算的代碼不在此處。由于我們是在本地運(yùn)行flink程序,因此這行代碼會(huì)返回一個(gè)LocalStreamEnvironment,最后我們要調(diào)用它的execute方法來(lái)開(kāi)啟真正的任務(wù)。我們先接著往下看。
1.2 算子(Operator)的注冊(cè)(聲明)
我們以flatMap為例,text.flatMap(new LineSplitter())這一句話跟蹤進(jìn)去是這樣的:
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),getType(), Utils.getCallLocationName(), true);return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));}?
里面完成了兩件事,一是用反射拿到了flatMap算子的輸出類型,二是生成了一個(gè)Operator。flink流式計(jì)算的核心概念,就是將數(shù)據(jù)從輸入流一個(gè)個(gè)傳遞給Operator進(jìn)行鏈?zhǔn)教幚?#xff0c;最后交給輸出流的過(guò)程。對(duì)數(shù)據(jù)的每一次處理在邏輯上成為一個(gè)operator,并且為了本地化處理的效率起見(jiàn),operator之間也可以串成一個(gè)chain一起處理(可以參考責(zé)任鏈模式幫助理解)。下面這張圖表明了flink是如何看待用戶的處理流程的:抽象化為一系列operator,以source開(kāi)始,以sink結(jié)尾,中間的operator做的操作叫做transform,并且可以把幾個(gè)操作串在一起執(zhí)行。?
?
我們也可以更改flink的設(shè)置,要求它不要對(duì)某個(gè)操作進(jìn)行chain處理,或者從某個(gè)操作開(kāi)啟一個(gè)新chain等。?
上面代碼中的最后一行transform方法的作用是返回一個(gè)SingleOutputStreamOperator,它繼承了Datastream類并且定義了一些輔助方法,方便對(duì)流的操作。在返回之前,transform方法還把它注冊(cè)到了執(zhí)行環(huán)境中(后面生成執(zhí)行圖的時(shí)候還會(huì)用到它)。其他的操作,包括keyBy,sum和print,都只是不同的算子,在這里出現(xiàn)都是一樣的效果,即生成一個(gè)operator并注冊(cè)給執(zhí)行環(huán)境用于生成DAG。
1.3 程序的執(zhí)行
程序執(zhí)行即env.execute("Java WordCount from SocketTextStream Example")這行代碼。
1.3.1 本地模式下的execute方法
這行代碼主要做了以下事情:
- 生成StreamGraph。代表程序的拓?fù)浣Y(jié)構(gòu),是從用戶代碼直接生成的圖。
- 生成JobGraph。這個(gè)圖是要交給flink去生成task的圖。
- 生成一系列配置
- 將JobGraph和配置交給flink集群去運(yùn)行。如果不是本地運(yùn)行的話,還會(huì)把jar文件通過(guò)網(wǎng)絡(luò)發(fā)給其他節(jié)點(diǎn)。
- 以本地模式運(yùn)行的話,可以看到啟動(dòng)過(guò)程,如啟動(dòng)性能度量、web模塊、JobManager、ResourceManager、taskManager等等
- 啟動(dòng)任務(wù)。值得一提的是在啟動(dòng)任務(wù)之前,先啟動(dòng)了一個(gè)用戶類加載器,這個(gè)類加載器可以用來(lái)做一些在運(yùn)行時(shí)動(dòng)態(tài)加載類的工作。
1.3.2 遠(yuǎn)程模式(RemoteEnvironment)的execute方法
遠(yuǎn)程模式的程序執(zhí)行更加有趣一點(diǎn)。第一步仍然是獲取StreamGraph,然后調(diào)用executeRemotely方法進(jìn)行遠(yuǎn)程執(zhí)行。?
該方法首先創(chuàng)建一個(gè)用戶代碼加載器
?
然后創(chuàng)建一系列配置,交給Client對(duì)象。Client這個(gè)詞有意思,看見(jiàn)它就知道這里絕對(duì)是跟遠(yuǎn)程集群打交道的客戶端。
ClusterClient client;try {client = new StandaloneClusterClient(configuration);client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());}}try {return client.run(streamGraph, jarFiles, globalClasspaths, usercodeClassLoader).getJobExecutionResult();}?
client的run方法首先生成一個(gè)JobGraph,然后將其傳遞給JobClient。關(guān)于Client、JobClient、JobManager到底誰(shuí)管誰(shuí),可以看這張圖:?
?
確切的說(shuō),JobClient負(fù)責(zé)以異步的方式和JobManager通信(Actor是scala的異步模塊),具體的通信任務(wù)由JobClientActor完成。相對(duì)應(yīng)的,JobManager的通信任務(wù)也由一個(gè)Actor完成。
?
可以看到,該方法阻塞在awaitJobResult方法上,并最終返回了一個(gè)JobListeningContext,透過(guò)這個(gè)Context可以得到程序運(yùn)行的狀態(tài)和結(jié)果。
1.3.3 程序啟動(dòng)過(guò)程
上面提到,整個(gè)程序真正意義上開(kāi)始執(zhí)行,是這里:
遠(yuǎn)程模式和本地模式有一點(diǎn)不同,我們先按本地模式來(lái)調(diào)試。?
我們跟進(jìn)源碼,(在本地調(diào)試模式下)會(huì)啟動(dòng)一個(gè)miniCluster,然后開(kāi)始執(zhí)行代碼:
?
這個(gè)方法里有一部分邏輯是與生成圖結(jié)構(gòu)相關(guān)的,我們放在第二章里講;現(xiàn)在我們先接著往里跟:
//MiniCluster.java public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {checkNotNull(job, "job is null");//在這里,最終把job提交給了jobMasterfinal CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose((JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));......}?
正如我在注釋里寫的,這一段代碼核心邏輯就是調(diào)用那個(gè)submitJob方法。那么我們?cè)俳又催@個(gè)方法:
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {final DispatcherGateway dispatcherGateway;try {dispatcherGateway = getDispatcherGateway();} catch (LeaderRetrievalException | InterruptedException e) {ExceptionUtils.checkInterrupted(e);return FutureUtils.completedExceptionally(e);}// we have to allow queued scheduling in Flip-6 mode because we need to request slots// from the ResourceManagerjobGraph.setAllowQueuedScheduling(true);final CompletableFuture<Void> jarUploadFuture = uploadAndSetJarFiles(dispatcherGateway, jobGraph);final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture.thenCompose(//在這里執(zhí)行了真正的submit操作(Void ack) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout));return acknowledgeCompletableFuture.thenApply((Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));}?
這里的Dispatcher是一個(gè)接收job,然后指派JobMaster去啟動(dòng)任務(wù)的類,我們可以看看它的類結(jié)構(gòu),有兩個(gè)實(shí)現(xiàn)。在本地環(huán)境下啟動(dòng)的是MiniDispatcher,在集群上提交任務(wù)時(shí),集群上啟動(dòng)的是StandaloneDispatcher。?
那么這個(gè)Dispatcher又做了什么呢?它啟動(dòng)了一個(gè)JobManagerRunner(這里我要吐槽Flink的命名,這個(gè)東西應(yīng)該叫做JobMasterRunner才對(duì),flink里的JobMaster和JobManager不是一個(gè)東西),委托JobManagerRunner去啟動(dòng)該Job的JobMaster。我們看一下對(duì)應(yīng)的代碼:
//jobManagerRunner.javaprivate void verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) throws Exception {......final CompletableFuture<Acknowledge> startFuture = jobMaster.start(new JobMasterId(leaderSessionId), rpcTimeout);......}?
然后,JobMaster經(jīng)過(guò)了一堆方法嵌套之后,執(zhí)行到了這里:
private void scheduleExecutionGraph() {checkState(jobStatusListener == null);// register self as job status change listenerjobStatusListener = new JobManagerJobStatusListener();executionGraph.registerJobStatusListener(jobStatusListener);try {//這里調(diào)用了ExecutionGraph的啟動(dòng)方法executionGraph.scheduleForExecution();}catch (Throwable t) {executionGraph.failGlobal(t);}}?
我們知道,flink的框架里有三層圖結(jié)構(gòu),其中ExecutionGraph就是真正被執(zhí)行的那一層,所以到這里為止,一個(gè)任務(wù)從提交到真正執(zhí)行的流程就走完了,我們?cè)倩仡櫼幌?#xff08;順便提一下遠(yuǎn)程提交時(shí)的流程區(qū)別):
- 客戶端代碼的execute方法執(zhí)行;
- 本地環(huán)境下,MiniCluster完成了大部分任務(wù),直接把任務(wù)委派給了MiniDispatcher;
- 遠(yuǎn)程環(huán)境下,啟動(dòng)了一個(gè)RestClusterClient,這個(gè)類會(huì)以HTTP Rest的方式把用戶代碼提交到集群上;
- 遠(yuǎn)程環(huán)境下,請(qǐng)求發(fā)到集群上之后,必然有個(gè)handler去處理,在這里是JobSubmitHandler。這個(gè)類接手了請(qǐng)求后,委派StandaloneDispatcher啟動(dòng)job,到這里之后,本地提交和遠(yuǎn)程提交的邏輯往后又統(tǒng)一了;
- Dispatcher接手job之后,會(huì)實(shí)例化一個(gè)JobManagerRunner,然后用這個(gè)runner啟動(dòng)job;
- JobManagerRunner接下來(lái)把job交給了JobMaster去處理;
- JobMaster使用ExecutionGraph的方法啟動(dòng)了整個(gè)執(zhí)行圖;整個(gè)任務(wù)就啟動(dòng)起來(lái)了。
至此,第一部分就講完了。
2.理解flink的圖結(jié)構(gòu)
第一部分講到,我們的主函數(shù)最后一項(xiàng)任務(wù)就是生成StreamGraph,然后生成JobGraph,然后以此開(kāi)始調(diào)度任務(wù)運(yùn)行,所以接下來(lái)我們從這里入手,繼續(xù)探索flink。
2.1 flink的三層圖結(jié)構(gòu)
事實(shí)上,flink總共提供了三種圖的抽象,我們前面已經(jīng)提到了StreamGraph和JobGraph,還有一種是ExecutionGraph,是用于調(diào)度的基本數(shù)據(jù)結(jié)構(gòu)。?
?
上面這張圖清晰的給出了flink各個(gè)圖的工作原理和轉(zhuǎn)換過(guò)程。其中最后一個(gè)物理執(zhí)行圖并非flink的數(shù)據(jù)結(jié)構(gòu),而是程序開(kāi)始執(zhí)行后,各個(gè)task分布在不同的節(jié)點(diǎn)上,所形成的物理上的關(guān)系表示。
- 從JobGraph的圖里可以看到,數(shù)據(jù)從上一個(gè)operator流到下一個(gè)operator的過(guò)程中,上游作為生產(chǎn)者提供了IntermediateDataSet,而下游作為消費(fèi)者需要JobEdge。事實(shí)上,JobEdge是一個(gè)通信管道,連接了上游生產(chǎn)的dataset和下游的JobVertex節(jié)點(diǎn)。
- 在JobGraph轉(zhuǎn)換到ExecutionGraph的過(guò)程中,主要發(fā)生了以下轉(zhuǎn)變:?
- 加入了并行度的概念,成為真正可調(diào)度的圖結(jié)構(gòu)
- 生成了與JobVertex對(duì)應(yīng)的ExecutionJobVertex,ExecutionVertex,與IntermediateDataSet對(duì)應(yīng)的IntermediateResult和IntermediateResultPartition等,并行將通過(guò)這些類實(shí)現(xiàn)
- ExecutionGraph已經(jīng)可以用于調(diào)度任務(wù)。我們可以看到,flink根據(jù)該圖生成了一一對(duì)應(yīng)的Task,每個(gè)task對(duì)應(yīng)一個(gè)ExecutionGraph的一個(gè)Execution。Task用InputGate、InputChannel和ResultPartition對(duì)應(yīng)了上面圖中的IntermediateResult和ExecutionEdge。
那么,flink抽象出這三層圖結(jié)構(gòu),四層執(zhí)行邏輯的意義是什么呢??
StreamGraph是對(duì)用戶邏輯的映射。JobGraph在此基礎(chǔ)上進(jìn)行了一些優(yōu)化,比如把一部分操作串成chain以提高效率。ExecutionGraph是為了調(diào)度存在的,加入了并行處理的概念。而在此基礎(chǔ)上真正執(zhí)行的是Task及其相關(guān)結(jié)構(gòu)。
2.2 StreamGraph的生成
在第一節(jié)的算子注冊(cè)部分,我們可以看到,flink把每一個(gè)算子transform成一個(gè)對(duì)流的轉(zhuǎn)換(比如上文中返回的SingleOutputStreamOperator是一個(gè)DataStream的子類),并且注冊(cè)到執(zhí)行環(huán)境中,用于生成StreamGraph。實(shí)際生成StreamGraph的入口是StreamGraphGenerator.generate(env, transformations)?其中的transformations是一個(gè)list,里面記錄的就是我們?cè)趖ransform方法中放進(jìn)來(lái)的算子。
2.2.1 StreamTransformation類代表了流的轉(zhuǎn)換
StreamTransformation代表了從一個(gè)或多個(gè)DataStream生成新DataStream的操作。順便,DataStream類在內(nèi)部組合了一個(gè)StreamTransformation類,實(shí)際的轉(zhuǎn)換操作均通過(guò)該類完成。?
?
我們可以看到,從source到各種map,union再到sink操作全部被映射成了StreamTransformation。?
其映射過(guò)程如下所示:?
以MapFunction為例:
- 首先,用戶代碼里定義的UDF會(huì)被當(dāng)作其基類對(duì)待,然后交給StreamMap這個(gè)operator做進(jìn)一步包裝。事實(shí)上,每一個(gè)Transformation都對(duì)應(yīng)了一個(gè)StreamOperator。
- 由于map這個(gè)操作只接受一個(gè)輸入,所以再被進(jìn)一步包裝為OneInputTransformation。
-
最后,將該transformation注冊(cè)到執(zhí)行環(huán)境中,當(dāng)執(zhí)行上文提到的generate方法時(shí),生成StreamGraph圖結(jié)構(gòu)。
另外,并不是每一個(gè) StreamTransformation 都會(huì)轉(zhuǎn)換成runtime層中的物理操作。有一些只是邏輯概念,比如union、split/select、partition等。如下圖所示的轉(zhuǎn)換樹(shù),在運(yùn)行時(shí)會(huì)優(yōu)化成下方的操作圖。?
2.2.2 StreamGraph生成函數(shù)分析
我們從StreamGraphGenerator.generate()方法往下看:
public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {return new StreamGraphGenerator(env).generateInternal(transformations);}//注意,StreamGraph的生成是從sink開(kāi)始的private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {for (StreamTransformation<?> transformation: transformations) {transform(transformation);}return streamGraph;}//這個(gè)方法的核心邏輯就是判斷傳入的steamOperator是哪種類型,并執(zhí)行相應(yīng)的操作,詳情見(jiàn)下面那一大堆if-elseprivate Collection<Integer> transform(StreamTransformation<?> transform) {if (alreadyTransformed.containsKey(transform)) {return alreadyTransformed.get(transform);}LOG.debug("Transforming " + transform);if (transform.getMaxParallelism() <= 0) {// if the max parallelism hasn't been set, then first use the job wide max parallelism// from theExecutionConfig.int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();if (globalMaxParallelismFromConfig > 0) {transform.setMaxParallelism(globalMaxParallelismFromConfig);}}// call at least once to trigger exceptions about MissingTypeInfotransform.getOutputType();Collection<Integer> transformedIds;//這里對(duì)操作符的類型進(jìn)行判斷,并以此調(diào)用相應(yīng)的處理邏輯.簡(jiǎn)而言之,處理的核心無(wú)非是遞歸的將該節(jié)點(diǎn)和節(jié)點(diǎn)的上游節(jié)點(diǎn)加入圖if (transform instanceof OneInputTransformation<?, ?>) {transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);} else if (transform instanceof SourceTransformation<?>) {transformedIds = transformSource((SourceTransformation<?>) transform);} else if (transform instanceof SinkTransformation<?>) {transformedIds = transformSink((SinkTransformation<?>) transform);} else if (transform instanceof UnionTransformation<?>) {transformedIds = transformUnion((UnionTransformation<?>) transform);} else if (transform instanceof SplitTransformation<?>) {transformedIds = transformSplit((SplitTransformation<?>) transform);} else if (transform instanceof SelectTransformation<?>) {transformedIds = transformSelect((SelectTransformation<?>) transform);} else if (transform instanceof FeedbackTransformation<?>) {transformedIds = transformFeedback((FeedbackTransformation<?>) transform);} else if (transform instanceof CoFeedbackTransformation<?>) {transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);} else if (transform instanceof PartitionTransformation<?>) {transformedIds = transformPartition((PartitionTransformation<?>) transform);} else if (transform instanceof SideOutputTransformation<?>) {transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);} else {throw new IllegalStateException("Unknown transformation: " + transform);}//注意這里和函數(shù)開(kāi)始時(shí)的方法相對(duì)應(yīng),在有向圖中要注意避免循環(huán)的產(chǎn)生// need this check because the iterate transformation adds itself before// transforming the feedback edgesif (!alreadyTransformed.containsKey(transform)) {alreadyTransformed.put(transform, transformedIds);}if (transform.getBufferTimeout() > 0) {streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());}if (transform.getUid() != null) {streamGraph.setTransformationUID(transform.getId(), transform.getUid());}if (transform.getUserProvidedNodeHash() != null) {streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());}if (transform.getMinResources() != null && transform.getPreferredResources() != null) {streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());}return transformedIds;}?
因?yàn)閙ap,filter等常用操作都是OneInputStreamOperator,我們就來(lái)看看transformOneInputTransform((OneInputTransformation<?, ?>) transform)方法。
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {Collection<Integer> inputIds = transform(transform.getInput());// 在遞歸處理節(jié)點(diǎn)過(guò)程中,某個(gè)節(jié)點(diǎn)可能已經(jīng)被其他子節(jié)點(diǎn)先處理過(guò)了,需要跳過(guò)if (alreadyTransformed.containsKey(transform)) {return alreadyTransformed.get(transform);}//這里是獲取slotSharingGroup。這個(gè)group用來(lái)定義當(dāng)前我們?cè)谔幚淼倪@個(gè)操作符可以跟什么操作符chain到一個(gè)slot里進(jìn)行操作//因?yàn)橛袝r(shí)候我們可能不滿意flink替我們做的chain聚合//一個(gè)slot就是一個(gè)執(zhí)行task的基本容器String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);//把該operator加入圖streamGraph.addOperator(transform.getId(),slotSharingGroup,transform.getOperator(),transform.getInputType(),transform.getOutputType(),transform.getName());//對(duì)于keyedStream,我們還要記錄它的keySelector方法//flink并不真正為每個(gè)keyedStream保存一個(gè)key,而是每次需要用到key的時(shí)候都使用keySelector方法進(jìn)行計(jì)算//因此,我們自定義的keySelector方法需要保證冪等性//到后面介紹keyGroup的時(shí)候我們還會(huì)再次提到這一點(diǎn)if (transform.getStateKeySelector() != null) {TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);}streamGraph.setParallelism(transform.getId(), transform.getParallelism());streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());//為當(dāng)前節(jié)點(diǎn)和它的依賴節(jié)點(diǎn)建立邊//這里可以看到之前提到的select union partition等邏輯節(jié)點(diǎn)被合并入edge的過(guò)程for (Integer inputId: inputIds) {streamGraph.addEdge(inputId, transform.getId(), 0);}return Collections.singleton(transform.getId());}public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {addEdgeInternal(upStreamVertexID,downStreamVertexID,typeNumber,null,new ArrayList<String>(),null);}//addEdge的實(shí)現(xiàn),會(huì)合并一些邏輯節(jié)點(diǎn)private void addEdgeInternal(Integer upStreamVertexID,Integer downStreamVertexID,int typeNumber,StreamPartitioner<?> partitioner,List<String> outputNames,OutputTag outputTag) {//如果輸入邊是側(cè)輸出節(jié)點(diǎn),則把side的輸入邊作為本節(jié)點(diǎn)的輸入邊,并遞歸調(diào)用if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {int virtualId = upStreamVertexID;upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;if (outputTag == null) {outputTag = virtualSideOutputNodes.get(virtualId).f1;}addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag);//如果輸入邊是select,則把select的輸入邊作為本節(jié)點(diǎn)的輸入邊} else if (virtualSelectNodes.containsKey(upStreamVertexID)) {int virtualId = upStreamVertexID;upStreamVertexID = virtualSelectNodes.get(virtualId).f0;if (outputNames.isEmpty()) {// selections that happen downstream override earlier selectionsoutputNames = virtualSelectNodes.get(virtualId).f1;}addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);//如果是partition節(jié)點(diǎn)} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {int virtualId = upStreamVertexID;upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;if (partitioner == null) {partitioner = virtualPartitionNodes.get(virtualId).f1;}addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);} else {//正常的edge處理邏輯StreamNode upstreamNode = getStreamNode(upStreamVertexID);StreamNode downstreamNode = getStreamNode(downStreamVertexID);// If no partitioner was specified and the parallelism of upstream and downstream// operator matches use forward partitioning, use rebalance otherwise.if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {partitioner = new ForwardPartitioner<Object>();} else if (partitioner == null) {partitioner = new RebalancePartitioner<Object>();}if (partitioner instanceof ForwardPartitioner) {if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {throw new UnsupportedOperationException("Forward partitioning does not allow " +"change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");}}StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag);getStreamNode(edge.getSourceId()).addOutEdge(edge);getStreamNode(edge.getTargetId()).addInEdge(edge);}}?
2.2.3 WordCount函數(shù)的StreamGraph
flink提供了一個(gè)StreamGraph可視化顯示工具,在這里?
我們可以把我們的程序的執(zhí)行計(jì)劃打印出來(lái)System.out.println(env.getExecutionPlan());?復(fù)制到這個(gè)網(wǎng)站上,點(diǎn)擊生成,如圖所示:?
?
可以看到,我們?cè)闯绦虮晦D(zhuǎn)化成了4個(gè)operator。?
另外,在operator之間的連線上也顯示出了flink添加的一些邏輯流程。由于我設(shè)定了每個(gè)操作符的并行度都是1,所以在每個(gè)操作符之間都是直接FORWARD,不存在shuffle的過(guò)程。
2.3 JobGraph的生成
flink會(huì)根據(jù)上一步生成的StreamGraph生成JobGraph,然后將JobGraph發(fā)送到server端進(jìn)行ExecutionGraph的解析。
2.3.1 JobGraph生成源碼
與StreamGraph類似,JobGraph的入口方法是StreamingJobGraphGenerator.createJobGraph()。我們直接來(lái)看源碼
private JobGraph createJobGraph() {// 設(shè)置啟動(dòng)模式為所有節(jié)點(diǎn)均在一開(kāi)始就啟動(dòng)jobGraph.setScheduleMode(ScheduleMode.EAGER);// 為每個(gè)節(jié)點(diǎn)生成hash idMap<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);// 為了保持兼容性創(chuàng)建的hashList<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());for (StreamGraphHasher hasher : legacyStreamGraphHashers) {legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));}Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();//生成jobvertex,串成chain等//這里的邏輯大致可以理解為,挨個(gè)遍歷節(jié)點(diǎn),如果該節(jié)點(diǎn)是一個(gè)chain的頭節(jié)點(diǎn),就生成一個(gè)JobVertex,如果不是頭節(jié)點(diǎn),就要把自身配置并入頭節(jié)點(diǎn),然后把頭節(jié)點(diǎn)和自己的出邊相連;對(duì)于不能chain的節(jié)點(diǎn),當(dāng)作只有頭節(jié)點(diǎn)處理即可setChaining(hashes, legacyHashes, chainedOperatorHashes);//設(shè)置輸入邊edgesetPhysicalEdges();//設(shè)置slot共享groupsetSlotSharing();//配置檢查點(diǎn)configureCheckpointing();// 如果有之前的緩存文件的配置的話,重新讀入for (Tuple2<String, DistributedCache.DistributedCacheEntry> e : streamGraph.getEnvironment().getCachedFiles()) {DistributedCache.writeFileInfoToConfig(e.f0, e.f1, jobGraph.getJobConfiguration());}// 傳遞執(zhí)行環(huán)境配置try {jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());}catch (IOException e) {throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +"This indicates that non-serializable types (like custom serializers) were registered");}return jobGraph;}?
2.3.2 operator chain的邏輯
為了更高效地分布式執(zhí)行,Flink會(huì)盡可能地將operator的subtask鏈接(chain)在一起形成task。每個(gè)task在一個(gè)線程中執(zhí)行。將operators鏈接成task是非常有效的優(yōu)化:它能減少線程之間的切換,減少消息的序列化/反序列化,減少數(shù)據(jù)在緩沖區(qū)的交換,減少了延遲的同時(shí)提高整體的吞吐量。
?
上圖中將KeyAggregation和Sink兩個(gè)operator進(jìn)行了合并,因?yàn)檫@兩個(gè)合并后并不會(huì)改變整體的拓?fù)浣Y(jié)構(gòu)。但是,并不是任意兩個(gè) operator 就能 chain 一起的,其條件還是很苛刻的:
- 上下游的并行度一致
- 下游節(jié)點(diǎn)的入度為1 (也就是說(shuō)下游節(jié)點(diǎn)沒(méi)有來(lái)自其他節(jié)點(diǎn)的輸入)
- 上下游節(jié)點(diǎn)都在同一個(gè) slot group 中(下面會(huì)解釋 slot group)
- 下游節(jié)點(diǎn)的 chain 策略為 ALWAYS(可以與上下游鏈接,map、flatmap、filter等默認(rèn)是ALWAYS)
- 上游節(jié)點(diǎn)的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接,不能與上游鏈接,Source默認(rèn)是HEAD)
- 兩個(gè)節(jié)點(diǎn)間數(shù)據(jù)分區(qū)方式是 forward(參考理解數(shù)據(jù)流的分區(qū))
- 用戶沒(méi)有禁用 chain
flink的chain邏輯是一種很常見(jiàn)的設(shè)計(jì),比如spring的interceptor也是類似的實(shí)現(xiàn)方式。通過(guò)把操作符串成一個(gè)大操作符,flink避免了把數(shù)據(jù)序列化后通過(guò)網(wǎng)絡(luò)發(fā)送給其他節(jié)點(diǎn)的開(kāi)銷,能夠大大增強(qiáng)效率。
2.3.3 JobGraph的提交
前面已經(jīng)提到,JobGraph的提交依賴于JobClient和JobManager之間的異步通信,如圖所示:?
?
在submitJobAndWait方法中,其首先會(huì)創(chuàng)建一個(gè)JobClientActor的ActorRef,然后向其發(fā)起一個(gè)SubmitJobAndWait消息,該消息將JobGraph的實(shí)例提交給JobClientActor。發(fā)起模式是ask,它表示需要一個(gè)應(yīng)答消息。
?
該SubmitJobAndWait消息被JobClientActor接收后,最終通過(guò)調(diào)用tryToSubmitJob方法觸發(fā)真正的提交動(dòng)作。當(dāng)JobManager的actor接收到來(lái)自client端的請(qǐng)求后,會(huì)執(zhí)行一個(gè)submitJob方法,主要做以下事情:
- 向BlobLibraryCacheManager注冊(cè)該Job;
- 構(gòu)建ExecutionGraph對(duì)象;
- 對(duì)JobGraph中的每個(gè)頂點(diǎn)進(jìn)行初始化;
- 將DAG拓?fù)渲袕膕ource開(kāi)始排序,排序后的頂點(diǎn)集合附加到Exec> - utionGraph對(duì)象;
- 獲取檢查點(diǎn)相關(guān)的配置,并將其設(shè)置到ExecutionGraph對(duì)象;
- 向ExecutionGraph注冊(cè)相關(guān)的listener;
- 執(zhí)行恢復(fù)操作或者將JobGraph信息寫入SubmittedJobGraphStore以在后續(xù)用于恢復(fù)目的;
- 響應(yīng)給客戶端JobSubmitSuccess消息;
- 對(duì)ExecutionGraph對(duì)象進(jìn)行調(diào)度執(zhí)行;
最后,JobManger會(huì)返回消息給JobClient,通知該任務(wù)是否提交成功。
2.4 ExecutionGraph的生成
與StreamGraph和JobGraph不同,ExecutionGraph并不是在我們的客戶端程序生成,而是在服務(wù)端(JobManager處)生成的,順便flink只維護(hù)一個(gè)JobManager。其入口代碼是ExecutionGraphBuilder.buildGraph(...)?
該方法長(zhǎng)200多行,其中一大半是checkpoiont的相關(guān)邏輯,我們暫且略過(guò),直接看核心方法executionGraph.attachJobGraph(sortedTopology)?
因?yàn)镋xecutionGraph事實(shí)上只是改動(dòng)了JobGraph的每個(gè)節(jié)點(diǎn),而沒(méi)有對(duì)整個(gè)拓?fù)浣Y(jié)構(gòu)進(jìn)行變動(dòng),所以代碼里只是挨個(gè)遍歷jobVertex并進(jìn)行處理:
?
至此,ExecutorGraph就創(chuàng)建完成了。
3. 任務(wù)的調(diào)度與執(zhí)行
關(guān)于flink的任務(wù)執(zhí)行架構(gòu),官網(wǎng)的這兩張圖就是最好的說(shuō)明:?
?
Flink 集群?jiǎn)?dòng)后,首先會(huì)啟動(dòng)一個(gè) JobManger 和多個(gè)的 TaskManager。用戶的代碼會(huì)由JobClient 提交給 JobManager,JobManager 再把來(lái)自不同用戶的任務(wù)發(fā)給 不同的TaskManager 去執(zhí)行,每個(gè)TaskManager管理著多個(gè)task,task是執(zhí)行計(jì)算的最小結(jié)構(gòu), TaskManager 將心跳和統(tǒng)計(jì)信息匯報(bào)給 JobManager。TaskManager 之間以流的形式進(jìn)行數(shù)據(jù)的傳輸。上述除了task外的三者均為獨(dú)立的 JVM 進(jìn)程。?
要注意的是,TaskManager和job并非一一對(duì)應(yīng)的關(guān)系。flink調(diào)度的最小單元是task而非TaskManager,也就是說(shuō),來(lái)自不同job的不同task可能運(yùn)行于同一個(gè)TaskManager的不同線程上。?
?
一個(gè)flink任務(wù)所有可能的狀態(tài)如上圖所示。圖上畫的很明白,就不再贅述了。
3.1 計(jì)算資源的調(diào)度
Task slot是一個(gè)TaskManager內(nèi)資源分配的最小載體,代表了一個(gè)固定大小的資源子集,每個(gè)TaskManager會(huì)將其所占有的資源平分給它的slot。?
通過(guò)調(diào)整 task slot 的數(shù)量,用戶可以定義task之間是如何相互隔離的。每個(gè) TaskManager 有一個(gè)slot,也就意味著每個(gè)task運(yùn)行在獨(dú)立的 JVM 中。每個(gè) TaskManager 有多個(gè)slot的話,也就是說(shuō)多個(gè)task運(yùn)行在同一個(gè)JVM中。?
而在同一個(gè)JVM進(jìn)程中的task,可以共享TCP連接(基于多路復(fù)用)和心跳消息,可以減少數(shù)據(jù)的網(wǎng)絡(luò)傳輸,也能共享一些數(shù)據(jù)結(jié)構(gòu),一定程度上減少了每個(gè)task的消耗。?
每個(gè)slot可以接受單個(gè)task,也可以接受多個(gè)連續(xù)task組成的pipeline,如下圖所示,FlatMap函數(shù)占用一個(gè)taskslot,而key Agg函數(shù)和sink函數(shù)共用一個(gè)taskslot:?
?
為了達(dá)到共用slot的目的,除了可以以chain的方式pipeline算子,我們還可以允許SlotSharingGroup,如下圖所示:?
?
我們可以把不能被chain成一條的兩個(gè)操作如flatmap和key&sink放在一個(gè)TaskSlot里執(zhí)行,這樣做可以獲得以下好處:
- 共用slot使得我們不再需要計(jì)算每個(gè)任務(wù)需要的總task數(shù)目,直接取最高算子的并行度即可
- 對(duì)計(jì)算資源的利用率更高。例如,通常的輕量級(jí)操作map和重量級(jí)操作Aggregate不再分別需要一個(gè)線程,而是可以在同一個(gè)線程內(nèi)執(zhí)行,而且對(duì)于slot有限的場(chǎng)景,我們可以增大每個(gè)task的并行度了。?
接下來(lái)我們還是用官網(wǎng)的圖來(lái)說(shuō)明flink是如何重用slot的:? - TaskManager1分配一個(gè)SharedSlot0
- 把source task放入一個(gè)SimpleSlot0,再把該slot放入SharedSlot0
- 把flatmap task放入一個(gè)SimpleSlot1,再把該slot放入SharedSlot0
- 因?yàn)槲覀兊膄latmap task并行度是2,因此不能再放入SharedSlot0,所以向TaskMange21申請(qǐng)了一個(gè)新的SharedSlot0
- 把第二個(gè)flatmap task放進(jìn)一個(gè)新的SimpleSlot,并放進(jìn)TaskManager2的SharedSlot0
- 開(kāi)始處理key&sink task,因?yàn)槠洳⑿卸纫彩?,所以先把第一個(gè)task放進(jìn)TaskManager1的SharedSlot
- 把第二個(gè)key&sink放進(jìn)TaskManager2的SharedSlot
3.2 JobManager執(zhí)行job
JobManager負(fù)責(zé)接收 flink 的作業(yè),調(diào)度 task,收集 job 的狀態(tài)、管理 TaskManagers。被實(shí)現(xiàn)為一個(gè) akka actor。
3.2.1 JobManager的組件
- BlobServer 是一個(gè)用來(lái)管理二進(jìn)制大文件的服務(wù),比如保存用戶上傳的jar文件,該服務(wù)會(huì)將其寫到磁盤上。還有一些相關(guān)的類,如BlobCache,用于TaskManager向JobManager下載用戶的jar文件
- InstanceManager 用來(lái)管理當(dāng)前存活的TaskManager的組件,記錄了TaskManager的心跳信息等
- CompletedCheckpointStore 用于保存已完成的checkpoint相關(guān)信息,持久化到內(nèi)存中或者zookeeper上
- MemoryArchivist 保存了已經(jīng)提交到flink的作業(yè)的相關(guān)信息,如JobGraph等
3.2.2 JobManager的啟動(dòng)過(guò)程
先列出JobManager啟動(dòng)的核心代碼
def runJobManager(configuration: Configuration,executionMode: JobManagerMode,listeningAddress: String,listeningPort: Int): Unit = {val numberProcessors = Hardware.getNumberCPUCores()val futureExecutor = Executors.newScheduledThreadPool(numberProcessors,new ExecutorThreadFactory("jobmanager-future"))val ioExecutor = Executors.newFixedThreadPool(numberProcessors,new ExecutorThreadFactory("jobmanager-io"))val timeout = AkkaUtils.getTimeout(configuration)// we have to first start the JobManager ActorSystem because this determines the port if 0// was chosen before. The method startActorSystem will update the configuration correspondingly.val jobManagerSystem = startActorSystem(configuration,listeningAddress,listeningPort)val highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(configuration,ioExecutor,AddressResolution.NO_ADDRESS_RESOLUTION)val metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration))metricRegistry.startQueryService(jobManagerSystem, null)val (_, _, webMonitorOption, _) = try {startJobManagerActors(jobManagerSystem,configuration,executionMode,listeningAddress,futureExecutor,ioExecutor,highAvailabilityServices,metricRegistry,classOf[JobManager],classOf[MemoryArchivist],Option(classOf[StandaloneResourceManager]))} catch {case t: Throwable =>futureExecutor.shutdownNow()ioExecutor.shutdownNow()throw t}// block until everything is shut downjobManagerSystem.awaitTermination()....... }?
- 配置Akka并生成ActorSystem,啟動(dòng)JobManager
- 啟動(dòng)HA和metric相關(guān)服務(wù)
- 在startJobManagerActors()方法中啟動(dòng)JobManagerActors,以及webserver,TaskManagerActor,ResourceManager等等
- 阻塞等待終止
- 集群通過(guò)LeaderService等選出JobManager的leader
3.2.3 JobManager啟動(dòng)Task
JobManager 是一個(gè)Actor,通過(guò)各種消息來(lái)完成核心邏輯:
override def handleMessage: Receive = {case GrantLeadership(newLeaderSessionID) =>log.info(s"JobManager $getAddress was granted leadership with leader session ID " +s"$newLeaderSessionID.")leaderSessionID = newLeaderSessionID.......?
有幾個(gè)比較重要的消息:
- GrantLeadership 獲得leader授權(quán),將自身被分發(fā)到的 session id 寫到 zookeeper,并恢復(fù)所有的 jobs
- RevokeLeadership 剝奪leader授權(quán),打斷清空所有的 job 信息,但是保留作業(yè)緩存,注銷所有的 TaskManagers
- RegisterTaskManagers 注冊(cè) TaskManager,如果之前已經(jīng)注冊(cè)過(guò),則只給對(duì)應(yīng)的 Instance 發(fā)送消息,否則啟動(dòng)注冊(cè)邏輯:在 InstanceManager 中注冊(cè)該 Instance 的信息,并停止 Instance BlobLibraryCacheManager 的端口【供下載 lib 包用】,同時(shí)使用 watch 監(jiān)聽(tīng) task manager 的存活
- SubmitJob 提交 jobGraph?
最后一項(xiàng)SubmintJob就是我們要關(guān)注的,從客戶端收到JobGraph,轉(zhuǎn)換為ExecutionGraph并執(zhí)行的過(guò)程。
?
首先做一些準(zhǔn)備工作,然后獲取一個(gè)ExecutionGraph,判斷是否是恢復(fù)的job,然后將job保存下來(lái),并且通知客戶端本地已經(jīng)提交成功了,最后如果確認(rèn)本JobManager是leader,則執(zhí)行executionGraph.scheduleForExecution()方法,這個(gè)方法經(jīng)過(guò)一系列調(diào)用,把每個(gè)ExecutionVertex傳遞給了Excution類的deploy方法:
public void deploy() throws JobException {......try {// good, we are allowed to deployif (!slot.setExecutedVertex(this)) {throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);}// race double check, did we fail/cancel and do we need to release the slot?if (this.state != DEPLOYING) {slot.releaseSlot();return;}if (LOG.isInfoEnabled()) {LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getTaskNameWithSubtaskIndex(),attemptNumber, getAssignedResourceLocation().getHostname()));}final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(attemptId,slot,taskState,attemptNumber);final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);......}catch (Throwable t) {markFailed(t);ExceptionUtils.rethrow(t);}}?
我們首先生成了一個(gè)TaskDeploymentDescriptor,然后交給了taskManagerGateway.submitTask()方法執(zhí)行。接下來(lái)的部分,就屬于TaskManager的范疇了。
3.3 TaskManager執(zhí)行task
3.3.1 TaskManager的基本組件
TaskManager是flink中資源管理的基本組件,是所有執(zhí)行任務(wù)的基本容器,提供了內(nèi)存管理、IO管理、通信管理等一系列功能,本節(jié)對(duì)各個(gè)模塊進(jìn)行簡(jiǎn)要介紹。?
1. MemoryManager flink并沒(méi)有把所有內(nèi)存的管理都委托給JVM,因?yàn)镴VM普遍存在著存儲(chǔ)對(duì)象密度低、大內(nèi)存時(shí)GC對(duì)系統(tǒng)影響大等問(wèn)題。所以flink自己抽象了一套內(nèi)存管理機(jī)制,將所有對(duì)象序列化后放在自己的MemorySegment上進(jìn)行管理。MemoryManger涉及內(nèi)容較多,將在后續(xù)章節(jié)進(jìn)行繼續(xù)剖析。?
2. IOManager flink通過(guò)IOManager管理磁盤IO的過(guò)程,提供了同步和異步兩種寫模式,又進(jìn)一步區(qū)分了block、buffer和bulk三種讀寫方式。?
IOManager提供了兩種方式枚舉磁盤文件,一種是直接遍歷文件夾下所有文件,另一種是計(jì)數(shù)器方式,對(duì)每個(gè)文件名以遞增順序訪問(wèn)。?
在底層,flink將文件IO抽象為FileIOChannle,封裝了底層實(shí)現(xiàn)。?
?
可以看到,flink在底層實(shí)際上都是以異步的方式進(jìn)行讀寫。?
3. NetworkEnvironment 是TaskManager的網(wǎng)絡(luò) IO 組件,包含了追蹤中間結(jié)果和數(shù)據(jù)交換的數(shù)據(jù)結(jié)構(gòu)。它的構(gòu)造器會(huì)統(tǒng)一將配置的內(nèi)存先分配出來(lái),抽象成 NetworkBufferPool 統(tǒng)一管理內(nèi)存的申請(qǐng)和釋放。意思是說(shuō),在輸入和輸出數(shù)據(jù)時(shí),不管是保留在本地內(nèi)存,等待chain在一起的下個(gè)操作符進(jìn)行處理,還是通過(guò)網(wǎng)絡(luò)把本操作符的計(jì)算結(jié)果發(fā)送出去,都被抽象成了NetworkBufferPool。后續(xù)我們還將對(duì)這個(gè)組件進(jìn)行詳細(xì)分析。
3.3.2 TaskManager執(zhí)行Task
對(duì)于TM來(lái)說(shuō),執(zhí)行task就是把收到的TaskDeploymentDescriptor對(duì)象轉(zhuǎn)換成一個(gè)task并執(zhí)行的過(guò)程。TaskDeploymentDescriptor這個(gè)類保存了task執(zhí)行所必須的所有內(nèi)容,例如序列化的算子,輸入的InputGate和輸出的ResultPartition的定義,該task要作為幾個(gè)subtask執(zhí)行等等。?
按照正常邏輯思維,很容易想到TM的submitTask方法的行為:首先是確認(rèn)資源,如尋找JobManager和Blob,而后建立連接,解序列化算子,收集task相關(guān)信息,接下來(lái)就是創(chuàng)建一個(gè)新的Task對(duì)象,這個(gè)task對(duì)象就是真正執(zhí)行任務(wù)的關(guān)鍵所在。
?
如果讀者是從頭開(kāi)始看這篇blog,里面有很多對(duì)象應(yīng)該已經(jīng)比較明確其作用了(除了那個(gè)brVarManager,這個(gè)是管理廣播變量的,廣播變量是一類會(huì)被分發(fā)到每個(gè)任務(wù)中的共享變量)。接下來(lái)的主要任務(wù),就是把這個(gè)task啟動(dòng)起來(lái),然后報(bào)告說(shuō)已經(jīng)啟動(dòng)task了:
// all good, we kick off the task, which performs its own initialization task.startTaskThread()sender ! decorateMessage(Acknowledge.get())?
3.3.2.1 生成Task對(duì)象
在執(zhí)行new Task()方法時(shí),第一步是把構(gòu)造函數(shù)里的這些變量賦值給當(dāng)前task的fields。?
接下來(lái)是初始化ResultPartition和InputGate。這兩個(gè)類描述了task的輸出數(shù)據(jù)和輸入數(shù)據(jù)。
?
最后,創(chuàng)建一個(gè)Thread對(duì)象,并把自己放進(jìn)該對(duì)象,這樣在執(zhí)行時(shí),自己就有了自身的線程的引用。
3.3.2.2 運(yùn)行Task對(duì)象
Task對(duì)象本身就是一個(gè)Runable,因此在其run方法里定義了運(yùn)行邏輯。?
第一步是切換Task的狀態(tài):
?
接下來(lái),就是導(dǎo)入用戶類加載器并加載用戶代碼。?
然后,是向網(wǎng)絡(luò)管理器注冊(cè)當(dāng)前任務(wù)(flink的各個(gè)算子在運(yùn)行時(shí)進(jìn)行數(shù)據(jù)交換需要依賴網(wǎng)絡(luò)管理器),分配一些緩存以保存數(shù)據(jù)?
然后,讀入指定的緩存文件。?
然后,再把task創(chuàng)建時(shí)傳入的那一大堆變量用于創(chuàng)建一個(gè)執(zhí)行環(huán)境Envrionment。?
再然后,對(duì)于那些并不是第一次執(zhí)行的task(比如失敗后重啟的)要恢復(fù)其狀態(tài)。?
接下來(lái)最重要的是
方法。為什么這么說(shuō)呢,因?yàn)檫@個(gè)方法就是用戶代碼所真正被執(zhí)行的入口。比如我們寫的什么new MapFunction()的邏輯,最終就是在這里被執(zhí)行的。這里說(shuō)一下這個(gè)invokable,這是一個(gè)抽象類,提供了可以被TaskManager執(zhí)行的對(duì)象的基本抽象。?
這個(gè)invokable是在解析JobGraph的時(shí)候生成相關(guān)信息的,并在此處形成真正可執(zhí)行的對(duì)象
?
?
上圖顯示了flink提供的可被執(zhí)行的Task類型。從名字上就可以看出各個(gè)task的作用,在此不再贅述。?
接下來(lái)就是invoke方法了,因?yàn)槲覀兊膚ordcount例子用了流式api,在此我們以StreamTask的invoke方法為例進(jìn)行說(shuō)明。
3.3.2.3 StreamTask的執(zhí)行邏輯
先上部分核心代碼:
public final void invoke() throws Exception {boolean disposed = false;try {// -------- Initialize ---------//先做一些賦值操作......// if the clock is not already set, then assign a default TimeServiceProvider//處理timerif (timerService == null) {ThreadFactory timerThreadFactory =new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName());timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);}//把之前JobGraph串起來(lái)的chain的信息形成實(shí)現(xiàn)operatorChain = new OperatorChain<>(this);headOperator = operatorChain.getHeadOperator();// task specific initialization//這個(gè)init操作的起名非常詭異,因?yàn)檫@里主要是處理算子采用了自定義的checkpoint檢查機(jī)制的情況,但是起了一個(gè)非常大眾臉的名字init();// save the work of reloading state, etc, if the task is already canceledif (canceled) {throw new CancelTaskException();}// -------- Invoke --------LOG.debug("Invoking {}", getName());// we need to make sure that any triggers scheduled in open() cannot be// executed before all operators are openedsynchronized (lock) {// both the following operations are protected by the lock// so that we avoid race conditions in the case that initializeState()// registers a timer, that fires before the open() is called.//初始化操作符狀態(tài),主要是一些state啥的initializeState();//對(duì)于富操作符,執(zhí)行其open操作openAllOperators();}// final check to exit early before starting to runf (canceled) {throw new CancelTaskException();}// let the task do its work//真正開(kāi)始執(zhí)行的代碼isRunning = true;run();?
StreamTask.invoke()方法里,第一個(gè)值得一說(shuō)的是TimerService。Flink在2015年決定向StreamTask類加入timer service的時(shí)候解釋到:
This integrates the timer as a service in StreamTask that StreamOperators can use by calling a method on the StreamingRuntimeContext. This also ensures that the timer callbacks can not be called concurrently with other methods on the StreamOperator. This behaviour is ensured by an ITCase.
第二個(gè)要注意的是chain操作。前面提到了,flink會(huì)出于優(yōu)化的角度,把一些算子chain成一個(gè)整體的算子作為一個(gè)task來(lái)執(zhí)行。比如wordcount例子中,Source和FlatMap算子就被chain在了一起。在進(jìn)行chain操作的時(shí)候,會(huì)設(shè)定頭節(jié)點(diǎn),并且指定輸出的RecordWriter。
接下來(lái)不出所料仍然是初始化,只不過(guò)初始化的對(duì)象變成了各個(gè)operator。如果是有checkpoint的,那就從state信息里恢復(fù),不然就作為全新的算子處理。從源碼中可以看到,flink針對(duì)keyed算子和普通算子做了不同的處理。keyed算子在初始化時(shí)需要計(jì)算出一個(gè)group區(qū)間,這個(gè)區(qū)間的值在整個(gè)生命周期里都不會(huì)再變化,后面key就會(huì)根據(jù)hash的不同結(jié)果,分配到特定的group中去計(jì)算。順便提一句,flink的keyed算子保存的是對(duì)每個(gè)數(shù)據(jù)的key的計(jì)算方法,而非真實(shí)的key,用戶需要自己保證對(duì)每一行數(shù)據(jù)提供的keySelector的冪等性。至于為什么要用KeyGroup的設(shè)計(jì),這就牽扯到擴(kuò)容的范疇了,將在后面的章節(jié)進(jìn)行講述。?
對(duì)于openAllOperators()方法,就是對(duì)各種RichOperator執(zhí)行其open方法,通常可用于在執(zhí)行計(jì)算之前加載資源。?
最后,run方法千呼萬(wàn)喚始出來(lái),該方法經(jīng)過(guò)一系列跳轉(zhuǎn),最終調(diào)用chain上的第一個(gè)算子的run方法。在wordcount的例子中,它最終調(diào)用了SocketTextStreamFunction的run,建立socket連接并讀入文本。
3.4 StreamTask與StreamOperator
前面提到,Task對(duì)象在執(zhí)行過(guò)程中,把執(zhí)行的任務(wù)交給了StreamTask這個(gè)類去執(zhí)行。在我們的wordcount例子中,實(shí)際初始化的是OneInputStreamTask的對(duì)象(參考上面的類圖)。那么這個(gè)對(duì)象是如何執(zhí)行用戶的代碼的呢?
protected void run() throws Exception {// cache processor reference on the stack, to make the code more JIT friendlyfinal StreamInputProcessor<IN> inputProcessor = this.inputProcessor;while (running && inputProcessor.processInput()) {// all the work happens in the "processInput" method}}?
它做的,就是把任務(wù)直接交給了InputProcessor去執(zhí)行processInput方法。這是一個(gè)StreamInputProcessor的實(shí)例,該processor的任務(wù)就是處理輸入的數(shù)據(jù),包括用戶數(shù)據(jù)、watermark和checkpoint數(shù)據(jù)等。我們先來(lái)看看這個(gè)processor是如何產(chǎn)生的:
public void init() throws Exception {StreamConfig configuration = getConfiguration();TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());int numberOfInputs = configuration.getNumberOfInputs();if (numberOfInputs > 0) {InputGate[] inputGates = getEnvironment().getAllInputGates();inputProcessor = new StreamInputProcessor<>(inputGates,inSerializer,this,configuration.getCheckpointMode(),getCheckpointLock(),getEnvironment().getIOManager(),getEnvironment().getTaskManagerInfo().getConfiguration(),getStreamStatusMaintainer(),this.headOperator);// make sure that stream tasks report their I/O statisticsinputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());}}?
這是OneInputStreamTask的init方法,從configs里面獲取StreamOperator信息,生成自己的inputProcessor。那么inputProcessor是如何處理數(shù)據(jù)的呢?我們接著跟進(jìn)源碼:
public boolean processInput() throws Exception {if (isFinished) {return false;}if (numRecordsIn == null) {numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();}//這個(gè)while是用來(lái)處理單個(gè)元素的(不要想當(dāng)然以為是循環(huán)處理元素的)while (true) {//注意 1在下面//2.接下來(lái),會(huì)利用這個(gè)反序列化器得到下一個(gè)數(shù)據(jù)記錄,并進(jìn)行解析(是用戶數(shù)據(jù)還是watermark等等),然后進(jìn)行對(duì)應(yīng)的操作if (currentRecordDeserializer != null) {DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);if (result.isBufferConsumed()) {currentRecordDeserializer.getCurrentBuffer().recycle();currentRecordDeserializer = null;}if (result.isFullRecord()) {StreamElement recordOrMark = deserializationDelegate.getInstance();//如果元素是watermark,就準(zhǔn)備更新當(dāng)前channel的watermark值(并不是簡(jiǎn)單賦值,因?yàn)橛衼y序存在),if (recordOrMark.isWatermark()) {// handle watermarkstatusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);continue;} else if (recordOrMark.isStreamStatus()) {//如果元素是status,就進(jìn)行相應(yīng)處理。可以看作是一個(gè)flag,標(biāo)志著當(dāng)前stream接下來(lái)即將沒(méi)有元素輸入(idle),或者當(dāng)前即將由空閑狀態(tài)轉(zhuǎn)為有元素狀態(tài)(active)。同時(shí),StreamStatus還對(duì)如何處理watermark有影響。通過(guò)發(fā)送status,上游的operator可以很方便的通知下游當(dāng)前的數(shù)據(jù)流的狀態(tài)。// handle stream statusstatusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);continue;} else if (recordOrMark.isLatencyMarker()) {//LatencyMarker是用來(lái)衡量代碼執(zhí)行時(shí)間的。在Source處創(chuàng)建,攜帶創(chuàng)建時(shí)的時(shí)間戳,流到Sink時(shí)就可以知道經(jīng)過(guò)了多長(zhǎng)時(shí)間// handle latency markersynchronized (lock) {streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());}continue;} else {//這里就是真正的,用戶的代碼即將被執(zhí)行的地方。從章節(jié)1到這里足足用了三萬(wàn)字,有點(diǎn)萬(wàn)里長(zhǎng)征的感覺(jué)// now we can do the actual processingStreamRecord<IN> record = recordOrMark.asRecord();synchronized (lock) {numRecordsIn.inc();streamOperator.setKeyContextElement1(record);streamOperator.processElement(record);}return true;}}}//1.程序首先獲取下一個(gè)buffer//這一段代碼是服務(wù)于flink的FaultTorrent機(jī)制的,后面我會(huì)講到,這里只需理解到它會(huì)嘗試獲取buffer,然后賦值給當(dāng)前的反序列化器final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();if (bufferOrEvent != null) {if (bufferOrEvent.isBuffer()) {currentChannel = bufferOrEvent.getChannelIndex();currentRecordDeserializer = recordDeserializers[currentChannel];currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());}else {// Event receivedfinal AbstractEvent event = bufferOrEvent.getEvent();if (event.getClass() != EndOfPartitionEvent.class) {throw new IOException("Unexpected event: " + event);}}}else {isFinished = true;if (!barrierHandler.isEmpty()) {throw new IllegalStateException("Trailing data in checkpoint barrier handler.");}return false;}}}?
到此為止,以上部分就是一個(gè)flink程序啟動(dòng)后,到執(zhí)行用戶代碼之前,flink框架所做的準(zhǔn)備工作。回顧一下:
- 啟動(dòng)一個(gè)環(huán)境
- 生成StreamGraph
- 注冊(cè)和選舉JobManager
- 在各節(jié)點(diǎn)生成TaskManager,并根據(jù)JobGraph生成對(duì)應(yīng)的Task
- 啟動(dòng)各個(gè)task,準(zhǔn)備執(zhí)行代碼
接下來(lái),我們挑幾個(gè)Operator看看flink是如何抽象這些算子的。
4. StreamOperator的抽象與實(shí)現(xiàn)
4.1 數(shù)據(jù)源的邏輯——StreamSource與時(shí)間模型
StreamSource抽象了一個(gè)數(shù)據(jù)源,并且指定了一些如何處理數(shù)據(jù)的模式。
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {......public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {run(lockingObject, streamStatusMaintainer, output);}public void run(final Object lockingObject,final StreamStatusMaintainer streamStatusMaintainer,final Output<StreamRecord<OUT>> collector) throws Exception {final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();LatencyMarksEmitter latencyEmitter = null;if (getExecutionConfig().isLatencyTrackingEnabled()) {latencyEmitter = new LatencyMarksEmitter<>(getProcessingTimeService(),collector,getExecutionConfig().getLatencyTrackingInterval(),getOperatorConfig().getVertexID(),getRuntimeContext().getIndexOfThisSubtask());}final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();this.ctx = StreamSourceContexts.getSourceContext(timeCharacteristic,getProcessingTimeService(),lockingObject,streamStatusMaintainer,collector,watermarkInterval,-1);try {userFunction.run(ctx);// if we get here, then the user function either exited after being done (finite source)// or the function was canceled or stopped. For the finite source case, we should emit// a final watermark that indicates that we reached the end of event-timeif (!isCanceledOrStopped()) {ctx.emitWatermark(Watermark.MAX_WATERMARK);}} finally {// make sure that the context is closed in any casectx.close();if (latencyEmitter != null) {latencyEmitter.close();}}}......private static class LatencyMarksEmitter<OUT> {private final ScheduledFuture<?> latencyMarkTimer;public LatencyMarksEmitter(final ProcessingTimeService processingTimeService,final Output<StreamRecord<OUT>> output,long latencyTrackingInterval,final int vertexID,final int subtaskIndex) {latencyMarkTimer = processingTimeService.scheduleAtFixedRate(new ProcessingTimeCallback() {@Overridepublic void onProcessingTime(long timestamp) throws Exception {try {// ProcessingTimeService callbacks are executed under the checkpointing lockoutput.emitLatencyMarker(new LatencyMarker(timestamp, vertexID, subtaskIndex));} catch (Throwable t) {// we catch the Throwables here so that we don't trigger the processing// timer services async exception handlerLOG.warn("Error while emitting latency marker.", t);}}},0L,latencyTrackingInterval);}public void close() {latencyMarkTimer.cancel(true);}} }?
在StreamSource生成上下文之后,接下來(lái)就是把上下文交給SourceFunction去執(zhí)行:
SourceFunction是對(duì)Function的一個(gè)抽象,就好像MapFunction,KeyByFunction一樣,用戶選擇實(shí)現(xiàn)這些函數(shù),然后flink框架就能利用這些函數(shù)進(jìn)行計(jì)算,完成用戶邏輯。?
我們的wordcount程序使用了flink提供的一個(gè)SocketTextStreamFunction。我們可以看一下它的實(shí)現(xiàn)邏輯,對(duì)source如何運(yùn)行有一個(gè)基本的認(rèn)識(shí):
?
整段代碼里,只有collect方法有些復(fù)雜度,后面我們?cè)谥v到flink的對(duì)象機(jī)制時(shí)會(huì)結(jié)合來(lái)講,此處知道collect方法會(huì)收集結(jié)果,然后發(fā)送給接收者即可。在我們的wordcount里,這個(gè)算子的接收者就是被chain在一起的flatmap算子,不記得這個(gè)示例程序的話,可以返回第一章去看一下。
4.2 從數(shù)據(jù)輸入到數(shù)據(jù)處理——OneInputStreamOperator & AbstractUdfStreamOperator
StreamSource是用來(lái)開(kāi)啟整個(gè)流的算子,而承接輸入數(shù)據(jù)并進(jìn)行處理的算子就是OneInputStreamOperator、TwoInputStreamOperator等。?
?
整個(gè)StreamOperator的繼承關(guān)系如上圖所示(圖很大,建議點(diǎn)開(kāi)放大看)。?
OneInputStreamOperator這個(gè)接口的邏輯很簡(jiǎn)單:
?
而實(shí)現(xiàn)了這個(gè)接口的StreamFlatMap算子也很簡(jiǎn)單,沒(méi)什么可說(shuō)的:
public class StreamFlatMap<IN, OUT>extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>implements OneInputStreamOperator<IN, OUT> {private static final long serialVersionUID = 1L;private transient TimestampedCollector<OUT> collector;public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {super(flatMapper);chainingStrategy = ChainingStrategy.ALWAYS;}@Overridepublic void open() throws Exception {super.open();collector = new TimestampedCollector<>(output);}@Overridepublic void processElement(StreamRecord<IN> element) throws Exception {collector.setTimestamp(element);userFunction.flatMap(element.getValue(), collector);} }?
從類圖里可以看到,flink為我們封裝了一個(gè)算子的基類AbstractUdfStreamOperator,提供了一些通用功能,比如把context賦給算子,保存快照等等,其中最為大家了解的應(yīng)該是這兩個(gè):
@Overridepublic void open() throws Exception {super.open();FunctionUtils.openFunction(userFunction, new Configuration());}@Overridepublic void close() throws Exception {super.close();functionsClosed = true;FunctionUtils.closeFunction(userFunction);}?
這兩個(gè)就是flink提供的Rich***Function系列算子的open和close方法被執(zhí)行的地方。
4.3 StreamSink
StreamSink著實(shí)沒(méi)什么可說(shuō)的,邏輯很簡(jiǎn)單,值得一提的只有兩個(gè)方法:
@Overridepublic void processElement(StreamRecord<IN> element) throws Exception {sinkContext.element = element;userFunction.invoke(element.getValue(), sinkContext);}@Overrideprotected void reportOrForwardLatencyMarker(LatencyMarker maker) {// all operators are tracking latenciesthis.latencyGauge.reportLatency(maker, true);// sinks don't forward latency markers}?
其中,processElement?是繼承自StreamOperator的方法。reportOrForwardLatencyMarker是用來(lái)計(jì)算延遲的,前面提到StreamSource會(huì)產(chǎn)生LateMarker,用于記錄數(shù)據(jù)計(jì)算時(shí)間,就是在這里完成了計(jì)算。
算子這部分邏輯相對(duì)簡(jiǎn)單清晰,就講這么多吧。
5. 為執(zhí)行保駕護(hù)航——Fault Tolerant與保證Exactly-Once語(yǔ)義
5.1 Fault Tolerant演進(jìn)之路
對(duì)于7×24小時(shí)不間斷運(yùn)行的流程序來(lái)說(shuō),要保證fault tolerant是很難的,這不像是離線任務(wù),如果失敗了只需要清空已有結(jié)果,重新跑一次就可以了。對(duì)于流任務(wù),如果要保證能夠重新處理已處理過(guò)的數(shù)據(jù),就要把數(shù)據(jù)保存下來(lái);而這就面臨著幾個(gè)問(wèn)題:比如一是保存多久的數(shù)據(jù)?二是重復(fù)計(jì)算的數(shù)據(jù)應(yīng)該怎么處理,怎么保證冪等性??
對(duì)于一個(gè)流系統(tǒng),我們有以下希望:
5.1.1 Storm的Record acknowledgement模式
storm的fault tolerant是這樣工作的:每一個(gè)被storm的operator處理的數(shù)據(jù)都會(huì)向其上一個(gè)operator發(fā)送一份應(yīng)答消息,通知其已被下游處理。storm的源operator保存了所有已發(fā)送的消息的每一個(gè)下游算子的應(yīng)答消息,當(dāng)它收到來(lái)自sink的應(yīng)答時(shí),它就知道該消息已經(jīng)被完整處理,可以移除了。?
如果沒(méi)有收到應(yīng)答,storm就會(huì)重發(fā)該消息。顯而易見(jiàn),這是一種at least once的邏輯。另外,這種方式面臨著嚴(yán)重的冪等性問(wèn)題,例如對(duì)一個(gè)count算子,如果count的下游算子出錯(cuò),source重發(fā)該消息,那么防止該消息被count兩遍的邏輯需要程序員自己去實(shí)現(xiàn)。最后,這樣一種處理方式非常低效,吞吐量很低。
5.1.2 Spark streaming的micro batch模式
前面提到,storm的實(shí)現(xiàn)方式就注定了與高吞吐量無(wú)緣。那么,為了提高吞吐量,把一批數(shù)據(jù)聚集在一起處理就是很自然的選擇。Spark Streaming的實(shí)現(xiàn)就是基于這樣的思路:?
我們可以在完全的連續(xù)計(jì)算與完全的分批計(jì)算中間取折中,通過(guò)控制每批計(jì)算數(shù)據(jù)的大小來(lái)控制延遲與吞吐量的制約,如果想要低延遲,就用小一點(diǎn)的batch,如果想要大吞吐量,就不得不忍受更高的延遲(更久的等待數(shù)據(jù)到來(lái)的時(shí)間和更多的計(jì)算),如下圖所示。?
?
以這樣的方式,可以在每個(gè)batch中做到exactly-once,但是這種方式也有其弊端:?
首先,batch的方式使得一些需要跨batch的操作變得非常困難,例如session window;用戶不得不自己想辦法去實(shí)現(xiàn)相關(guān)邏輯。?
其次,batch模式很難做好背壓。當(dāng)一個(gè)batch因?yàn)榉N種原因處理慢了,那么下一個(gè)batch要么不得不容納更多的新來(lái)數(shù)據(jù),要么不得不堆積更多的batch,整個(gè)任務(wù)可能會(huì)被拖垮,這是一個(gè)非常致命的問(wèn)題。?
最后,batch的方式基本意味著其延遲是有比較高的下限的,實(shí)時(shí)性上不好。
5.1.3 Google Cloud Dataflow的事務(wù)式模型
我們?cè)趥鹘y(tǒng)數(shù)據(jù)庫(kù),如mysql中使用binlog來(lái)完成事務(wù),這樣的思路也可以被用在實(shí)現(xiàn)exactly-once模型中。例如,我們可以log下每個(gè)數(shù)據(jù)元素每一次被處理時(shí)的結(jié)果和當(dāng)時(shí)所處的操作符的狀態(tài)。這樣,當(dāng)我們需要fault tolerant時(shí),我們只需要讀一下log就可以了。這種模式規(guī)避了storm和spark所面臨的問(wèn)題,并且能夠很好的實(shí)現(xiàn)exactly-once,唯一的弊端是:如何盡可能的減少log的成本?Flink給了我們答案。
5.1.4 Flink的分布式快照機(jī)制
實(shí)現(xiàn)exactly-once的關(guān)鍵是什么?是能夠準(zhǔn)確的知道和快速記錄下來(lái)當(dāng)前的operator的狀態(tài)、當(dāng)前正在處理的元素(以及正處在不同算子之間傳遞的元素)。如果上面這些可以做到,那么fault tolerant無(wú)非就是從持久化存儲(chǔ)中讀取上次記錄的這些元信息,并且恢復(fù)到程序中。那么Flink是如何實(shí)現(xiàn)的呢?
Flink的分布式快照的核心是其輕量級(jí)異步分布式快照機(jī)制。為了實(shí)現(xiàn)這一機(jī)制,flink引入了一個(gè)概念,叫做Barrier。Barrier是一種標(biāo)記,它被source產(chǎn)生并且插入到流數(shù)據(jù)中,被發(fā)送到下游節(jié)點(diǎn)。當(dāng)下游節(jié)點(diǎn)處理到該barrier標(biāo)志時(shí),這就意味著在該barrier插入到流數(shù)據(jù)時(shí),已經(jīng)進(jìn)入系統(tǒng)的數(shù)據(jù)在當(dāng)前節(jié)點(diǎn)已經(jīng)被處理完畢。?
如圖所示,每當(dāng)一個(gè)barrier流過(guò)一個(gè)算子節(jié)點(diǎn)時(shí),就說(shuō)明了在該算子上,可以觸發(fā)一次檢查點(diǎn),用以保存當(dāng)前節(jié)點(diǎn)的狀態(tài)和已經(jīng)處理過(guò)的數(shù)據(jù),這就是一份快照。(在這里可以聯(lián)想一下micro-batch,把barrier想象成分割每個(gè)batch的邏輯,會(huì)好理解一點(diǎn))這樣的方式下,記錄快照就像和前面提到的micro-batch一樣容易。
與此同時(shí),該算子會(huì)向下游發(fā)送該barrier。因?yàn)閿?shù)據(jù)在算子之間是按順序發(fā)送的,所以當(dāng)下游節(jié)點(diǎn)收到該barrier時(shí),也就意味著同樣的一批數(shù)據(jù)在下游節(jié)點(diǎn)上也處理完畢,可以進(jìn)行一次checkpoint,保存基于該節(jié)點(diǎn)的一份快照,快照完成后,會(huì)通知JobMananger自己完成了這個(gè)快照。這就是分布式快照的基本含義。
再看這張圖:?
?
有時(shí),有的算子的上游節(jié)點(diǎn)和下游節(jié)點(diǎn)都不止一個(gè),應(yīng)該怎么處理呢?如果有不止一個(gè)下游節(jié)點(diǎn),就向每個(gè)下游發(fā)送barrier。同理,如果有不止一個(gè)上游節(jié)點(diǎn),那么就要等到所有上游節(jié)點(diǎn)的同一批次的barrier到達(dá)之后,才能觸發(fā)checkpoint。因?yàn)槊總€(gè)節(jié)點(diǎn)運(yùn)算速度不同,所以有的上游節(jié)點(diǎn)可能已經(jīng)在發(fā)下個(gè)barrier周期的數(shù)據(jù)了,有的上游節(jié)點(diǎn)還沒(méi)發(fā)送本次的barrier,這時(shí)候,當(dāng)前算子就要緩存一下提前到來(lái)的數(shù)據(jù),等比較慢的上游節(jié)點(diǎn)發(fā)送barrier之后,才能處理下一批數(shù)據(jù)。
當(dāng)整個(gè)程序的最后一個(gè)算子sink都收到了這個(gè)barrier,也就意味著這個(gè)barrier和上個(gè)barrier之間所夾雜的這批元素已經(jīng)全部落袋為安。這時(shí),最后一個(gè)算子通知JobManager整個(gè)流程已經(jīng)完成,而JobManager隨后發(fā)出通知,要求所有算子刪除本次快照內(nèi)容,以完成清理。這整個(gè)部分,就是Flink的兩階段提交的checkpoint過(guò)程,如下面四幅圖所示:?
總之,通過(guò)這種方式,flink實(shí)現(xiàn)了我們前面提到的六項(xiàng)對(duì)流處理框架的要求:exactly-once、低延遲、高吞吐、易用的模型、方便的恢復(fù)機(jī)制。
最后,貼一個(gè)美團(tuán)做的flink與storm的性能對(duì)比:flink與storm的性能對(duì)比
5.2 checkpoint的生命周期
接下來(lái),我們結(jié)合源碼來(lái)看看flink的checkpoint到底是如何實(shí)現(xiàn)其生命周期的:
由于flink提供的SocketSource并不支持checkpoint,所以這里我以FlinkKafkaConsumer010作為sourceFunction。
5.2.1 觸發(fā)checkpoint
要完成一次checkpoint,第一步必然是發(fā)起checkpoint請(qǐng)求。那么,這個(gè)請(qǐng)求是哪里發(fā)出的,怎么發(fā)出的,又由誰(shuí)控制呢??
還記得如果我們要設(shè)置checkpoint的話,需要指定checkpoint間隔吧?既然是一個(gè)指定間隔觸發(fā)的功能,那應(yīng)該會(huì)有類似于Scheduler的東西存在,flink里,這個(gè)負(fù)責(zé)觸發(fā)checkpoint的類是CheckpointCoordinator。
flink在提交job時(shí),會(huì)啟動(dòng)這個(gè)類的startCheckpointScheduler方法,如下所示
public void startCheckpointScheduler() {synchronized (lock) {if (shutdown) {throw new IllegalArgumentException("Checkpoint coordinator is shut down");}// make sure all prior timers are cancelledstopCheckpointScheduler();periodicScheduling = true;currentPeriodicTrigger = timer.scheduleAtFixedRate(new ScheduledTrigger(), baseInterval, baseInterval, TimeUnit.MILLISECONDS);}}private final class ScheduledTrigger implements Runnable {@Overridepublic void run() {try {triggerCheckpoint(System.currentTimeMillis(), true);}catch (Exception e) {LOG.error("Exception while triggering checkpoint.", e);}}}?
啟動(dòng)之后,就會(huì)以設(shè)定好的頻率調(diào)用triggerCheckPoint()方法。這個(gè)方法太長(zhǎng),我大概說(shuō)一下都做了什么:
- 檢查符合觸發(fā)checkpoint的條件,例如如果禁止了周期性的checkpoint,尚未達(dá)到觸發(fā)checkpoint的最小間隔等等,就直接return
- 檢查是否所有需要checkpoint和需要響應(yīng)checkpoint的ACK(ack涉及到checkpoint的兩階段提交,后面會(huì)講)的task都處于running狀態(tài),否則return
- 如果都符合,那么執(zhí)行checkpointID = checkpointIdCounter.getAndIncrement();以生成一個(gè)新的id,然后生成一個(gè)PendingCheckpoint。PendingCheckpoint是一個(gè)啟動(dòng)了的checkpoint,但是還沒(méi)有被確認(rèn)。等到所有的task都確認(rèn)了本次checkpoint,那么這個(gè)checkpoint對(duì)象將轉(zhuǎn)化為一個(gè)CompletedCheckpoint。
- 定義一個(gè)超時(shí)callback,如果checkpoint執(zhí)行了很久還沒(méi)完成,就把它取消
- 觸發(fā)MasterHooks,用戶可以定義一些額外的操作,用以增強(qiáng)checkpoint的功能(如準(zhǔn)備和清理外部資源)
- 接下來(lái)是核心邏輯:
?
這里是調(diào)用了Execution的triggerCheckpoint方法,一個(gè)execution就是一個(gè)executionVertex的實(shí)際執(zhí)行者。我們看一下這個(gè)方法:
public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {final LogicalSlot slot = assignedResource;if (slot != null) {//TaskManagerGateway是用來(lái)跟taskManager進(jìn)行通信的組件final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);} else {LOG.debug("The execution has no slot assigned. This indicates that the execution is " +"no longer running.");}}?
再往下跟就進(jìn)入了Task類的范疇,我們將在下一小節(jié)進(jìn)行解讀。本小節(jié)主要講了CheckpointCoordinator類是如何觸發(fā)一次checkpoint,從其名字也可以看出來(lái)其功能:檢查點(diǎn)協(xié)調(diào)器。
5.2.2 Task層面checkpoint的準(zhǔn)備工作
先說(shuō)Task類中的部分,該類創(chuàng)建了一個(gè)CheckpointMetaData的對(duì)象,并且生成了一個(gè)Runable匿名類用于執(zhí)行checkpoint,然后以異步的方式觸發(fā)了該Runable:
public void triggerCheckpointBarrier(final long checkpointID,long checkpointTimestamp,final CheckpointOptions checkpointOptions) {......Runnable runnable = new Runnable() {@Overridepublic void run() {// set safety net from the task's context for checkpointing threadLOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);try {boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);if (!success) {checkpointResponder.declineCheckpoint(getJobID(), getExecutionId(), checkpointID,new CheckpointDeclineTaskNotReadyException(taskName));}}......}};executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));}}?
上面代碼里的invokable事實(shí)上就是我們的StreamTask了。Task類實(shí)際上是將checkpoint委托給了更具體的類去執(zhí)行,而StreamTask也將委托給更具體的類,直到業(yè)務(wù)代碼。?
StreamTask是這樣實(shí)現(xiàn)的:
- 如果task還在運(yùn)行,那就可以進(jìn)行checkpoint。方法是先向下游所有出口廣播一個(gè)Barrier,然后觸發(fā)本task的State保存。
- 如果task結(jié)束了,那我們就要通知下游取消本次checkpoint,方法是發(fā)送一個(gè)CancelCheckpointMarker,這是類似于Barrier的另一種消息。
- 注意,從這里開(kāi)始,整個(gè)執(zhí)行鏈路上開(kāi)始出現(xiàn)Barrier,可以和前面講Fault Tolerant原理的地方結(jié)合看一下。
?
完成broadcastCheckpointBarrier方法后,在checkpointState()方法中,StreamTask還做了很多別的工作:
public void executeCheckpointing() throws Exception {......try {//這里,就是調(diào)用StreamOperator進(jìn)行snapshotState的入口方法for (StreamOperator<?> op : allOperators) {checkpointStreamOperator(op);}// we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submitAsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(owner,operatorSnapshotsInProgress,checkpointMetaData,checkpointMetrics,startAsyncPartNano);owner.cancelables.registerCloseable(asyncCheckpointRunnable);//這里注冊(cè)了一個(gè)Runnable,在執(zhí)行完checkpoint之后向JobManager發(fā)出CompletedCheckPoint消息,這也是fault tolerant兩階段提交的一部分owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);......} }?
說(shuō)到checkpoint,我們印象里最直觀的感受肯定是我們的一些做聚合的操作符的狀態(tài)保存,比如sum的和以及count的值等等。這些內(nèi)容就是StreamOperator部分將要觸發(fā)保存的內(nèi)容。可以看到,除了我們直觀的這些操作符的狀態(tài)保存外,flink的checkpoint做了大量的其他工作。
接下來(lái),我們就把目光轉(zhuǎn)向操作符的checkpoint機(jī)制。
5.2.3 操作符的狀態(tài)保存及barrier傳遞
第四章時(shí),我們已經(jīng)了解了StreamOperator的類關(guān)系,這里,我們就直接接著上一節(jié)的checkpointStreamOperator(op)方法往下講。?
順便,前面也提到了,在進(jìn)行checkpoint之前,operator初始化時(shí),會(huì)執(zhí)行一個(gè)initializeState方法,在該方法中,如果task是從失敗中恢復(fù)的話,其保存的state也會(huì)被restore進(jìn)來(lái)。
傳遞barrier是在進(jìn)行本operator的statesnapshot之前完成的,我們先來(lái)看看其邏輯,其實(shí)和傳遞一條數(shù)據(jù)是類似的,就是生成一個(gè)CheckpointBarrier對(duì)象,然后向每個(gè)streamOutput寫進(jìn)去:
public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {try {CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);for (RecordWriterOutput<?> streamOutput : streamOutputs) {streamOutput.broadcastEvent(barrier);}}catch (InterruptedException e) {throw new IOException("Interrupted while broadcasting checkpoint barrier");}}?
下游的operator接收到本barrier,就會(huì)觸發(fā)其自身的checkpoint。
StreamTask在執(zhí)行完broadcastCheckpointBarrier之后,?
我們當(dāng)前的wordcount程序里有兩個(gè)operator chain,分別是:
- kafka source -> flatmap
- keyed aggregation -> sink
我們就按這個(gè)順序來(lái)捋一下checkpoint的過(guò)程。
1.kafka source的checkpoint過(guò)程
public final void snapshotState(FunctionSnapshotContext context) throws Exception {if (!running) {LOG.debug("snapshotState() called on closed source");} else {unionOffsetStates.clear();final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;if (fetcher == null) {// the fetcher has not yet been initialized, which means we need to return the// originally restored offsets or the assigned partitionsfor (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call can happen// on this function at a time: either snapshotState() or notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);}} else {HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call can happen// on this function at a time: either snapshotState() or notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);}for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {unionOffsetStates.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));}}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// truncate the map of pending offsets to commit, to prevent infinite growthwhile (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {pendingOffsetsToCommit.remove(0);}}}}?
kafka的snapshot邏輯就是記錄一下當(dāng)前消費(fèi)的offsets,然后做成tuple(partitiion,offset)放進(jìn)一個(gè)StateBackend里。StateBackend是flink抽象出來(lái)的一個(gè)用于保存狀態(tài)的接口。
2.FlatMap算子的checkpoint過(guò)程?
沒(méi)什么可說(shuō)的,就是調(diào)用了snapshotState()方法而已。
3.本operator chain的state保存過(guò)程?
細(xì)心的同學(xué)應(yīng)該注意到了,各個(gè)算子的snapshot方法只把自己的狀態(tài)保存到了StateBackend里,沒(méi)有寫入的持久化操作。這部分操作被放到了AbstractStreamOperator中,由flink統(tǒng)一負(fù)責(zé)持久化。其實(shí)不需要看源碼我們也能想出來(lái),持久化無(wú)非就是把這些數(shù)據(jù)用一個(gè)流寫到磁盤或者別的地方,接下來(lái)我們來(lái)看看是不是這樣:
?
那么這個(gè)operatorStateBackend是怎么保存狀態(tài)的呢?- 首先把各個(gè)算子的state做了一份深拷貝;
- 然后以異步的方式執(zhí)行了一個(gè)內(nèi)部類的runnable,該內(nèi)部類的run方法實(shí)現(xiàn)了一個(gè)模版方法,首先打開(kāi)stream,然后寫入數(shù)據(jù),然后再關(guān)閉stream。
我們來(lái)看看這個(gè)寫入數(shù)據(jù)的方法:
public SnapshotResult<OperatorStateHandle> performOperation() throws Exception {long asyncStartTime = System.currentTimeMillis();CheckpointStreamFactory.CheckpointStateOutputStream localOut = this.out;// get the registered operator state infos ...List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> operatorMetaInfoSnapshots =new ArrayList<>(registeredOperatorStatesDeepCopies.size());for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStatesDeepCopies.entrySet()) {operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());}// ... write them all in the checkpoint stream ...DataOutputView dov = new DataOutputViewStreamWrapper(localOut);OperatorBackendSerializationProxy backendSerializationProxy =new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);backendSerializationProxy.write(dov);......}?
注釋寫的很清楚,我就不多說(shuō)了。
4.后繼operatorChain的checkpoint過(guò)程?
前面說(shuō)到,在flink的流中,barrier流過(guò)時(shí)會(huì)觸發(fā)checkpoint。在上面第1步中,上游節(jié)點(diǎn)已經(jīng)發(fā)出了Barrier,所以在我們的keyed aggregation -> sink 這個(gè)operatorchain中,我們將首先捕獲這個(gè)barrier。
捕獲barrier的過(guò)程其實(shí)就是處理input數(shù)據(jù)的過(guò)程,對(duì)應(yīng)著StreamInputProcessor.processInput()方法,該方法我們?cè)诘谒恼乱呀?jīng)講過(guò),這里我們簡(jiǎn)單回顧一下:
//每個(gè)元素都會(huì)觸發(fā)這一段邏輯,如果下一個(gè)數(shù)據(jù)是buffer,則從外圍的while循環(huán)里進(jìn)入處理用戶數(shù)據(jù)的邏輯;這個(gè)方法里默默的處理了barrier的邏輯final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();if (bufferOrEvent != null) {if (bufferOrEvent.isBuffer()) {currentChannel = bufferOrEvent.getChannelIndex();currentRecordDeserializer = recordDeserializers[currentChannel];currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());}else {// Event receivedfinal AbstractEvent event = bufferOrEvent.getEvent();if (event.getClass() != EndOfPartitionEvent.class) {throw new IOException("Unexpected event: " + event);}}}?
處理barrier的過(guò)程在這段代碼里沒(méi)有體現(xiàn),因?yàn)楸话诹薵etNextNonBlocked()方法中,我們看下這個(gè)方法的核心邏輯:
//BarrierBuffer.getNextNonBlocked方法else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {if (!endOfStream) {// process barriers only if there is a chance of the checkpoint completingprocessBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());}}else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());}?
先提一嘴,大家還記得之前的部分也提到過(guò)CheckpointMarker吧,這里正好也對(duì)上了。
處理barrier也是個(gè)麻煩事,大家回想一下5.1節(jié)提到的屏障的原理圖,一個(gè)opertor必須收到從每個(gè)inputchannel發(fā)過(guò)來(lái)的同一序號(hào)的barrier之后才能發(fā)起本節(jié)點(diǎn)的checkpoint,如果有的channel的數(shù)據(jù)處理的快了,那該barrier后的數(shù)據(jù)還需要緩存起來(lái),如果有的inputchannel被關(guān)閉了,那它就不會(huì)再發(fā)送barrier過(guò)來(lái)了:
private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {final long barrierId = receivedBarrier.getId();// fast path for single channel casesif (totalNumberOfInputChannels == 1) {if (barrierId > currentCheckpointId) {// new checkpointcurrentCheckpointId = barrierId;notifyCheckpoint(receivedBarrier);}return;}// -- general code path for multiple input channels --if (numBarriersReceived > 0) {// this is only true if some alignment is already progress and was not canceledif (barrierId == currentCheckpointId) {// regular caseonBarrier(channelIndex);}else if (barrierId > currentCheckpointId) {// we did not complete the current checkpoint, another started beforeLOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +"Skipping current checkpoint.", barrierId, currentCheckpointId);// let the task know we are not completing thisnotifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));// abort the current checkpointreleaseBlocksAndResetBarriers();// begin a the new checkpointbeginNewAlignment(barrierId, channelIndex);}else {// ignore trailing barrier from an earlier checkpoint (obsolete now)return;}}else if (barrierId > currentCheckpointId) {// first barrier of a new checkpointbeginNewAlignment(barrierId, channelIndex);}else {// either the current checkpoint was canceled (numBarriers == 0) or// this barrier is from an old subsumed checkpointreturn;}// check if we have all barriers - since canceled checkpoints always have zero barriers// this can only happen on a non canceled checkpointif (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {// actually trigger checkpointif (LOG.isDebugEnabled()) {LOG.debug("Received all barriers, triggering checkpoint {} at {}",receivedBarrier.getId(), receivedBarrier.getTimestamp());}releaseBlocksAndResetBarriers();notifyCheckpoint(receivedBarrier);}}?
總之,當(dāng)收到全部的barrier之后,就會(huì)觸發(fā)notifyCheckpoint(),該方法又會(huì)調(diào)用StreamTask的triggerCheckpoint,和之前的operator是一樣的。
如果還有后續(xù)的operator的話,就是完全相同的循環(huán),不再贅述。
5.報(bào)告完成checkpoint事件?
當(dāng)一個(gè)operator保存完checkpoint數(shù)據(jù)后,就會(huì)啟動(dòng)一個(gè)異步對(duì)象AsyncCheckpointRunnable,用以報(bào)告該檢查點(diǎn)已完成,其具體邏輯在reportCompletedSnapshotStates中。這個(gè)方法把任務(wù)又最終委托給了RpcCheckpointResponder這個(gè)類:
?
從這個(gè)類也可以看出來(lái),它的邏輯是通過(guò)rpc的方式遠(yuǎn)程調(diào)JobManager的相關(guān)方法完成報(bào)告事件,底層也是通過(guò)akka實(shí)現(xiàn)的。?
那么,誰(shuí)響應(yīng)了這個(gè)rpc調(diào)用呢?是該任務(wù)的JobMaster。
?
JobMaster反手就是一巴掌就把任務(wù)又rpc給了CheckpointCoordinator.receiveAcknowledgeMessage()方法。
之前提到,coordinator在觸發(fā)checkpoint時(shí),生成了一個(gè)PendingCheckpoint,保存了所有operator的id。
當(dāng)PendingCheckpoint收到一個(gè)operator的完成checkpoint的消息時(shí),它就把這個(gè)operator從未完成checkpoint的節(jié)點(diǎn)集合移動(dòng)到已完成的集合。當(dāng)所有的operator都報(bào)告完成了checkpoint時(shí),CheckpointCoordinator會(huì)觸發(fā)completePendingCheckpoint()方法,該方法做了以下事情:
- 把pendinCgCheckpoint轉(zhuǎn)換為CompletedCheckpoint
- 把CompletedCheckpoint加入已完成的檢查點(diǎn)集合,并從未完成檢查點(diǎn)集合刪除該檢查點(diǎn)
- 再度向各個(gè)operator發(fā)出rpc,通知該檢查點(diǎn)已完成
本文里,收到這個(gè)遠(yuǎn)程調(diào)用的就是那兩個(gè)operator chain,我們來(lái)看看其邏輯:
public void notifyCheckpointComplete(long checkpointId) throws Exception {synchronized (lock) {if (isRunning) {LOG.debug("Notification of complete checkpoint for task {}", getName());for (StreamOperator<?> operator : operatorChain.getAllOperators()) {if (operator != null) {operator.notifyCheckpointComplete(checkpointId);}}}else {LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());}}}?
再接下來(lái)無(wú)非就是層層通知對(duì)應(yīng)的算子做出響應(yīng)罷了。至此,flink的兩階段提交的checkpoint邏輯全部完成。
5.3 承載checkpoint數(shù)據(jù)的抽象:State & StateBackend
State是快照數(shù)據(jù)的載體,StateBackend是快照如何被保存的抽象。
State分為 KeyedState和OperatorState,從名字就可以看出來(lái)分別對(duì)應(yīng)著keyedStream和其他的oeprator。從State由誰(shuí)管理上,也可以區(qū)分為raw state和Managed state。Flink管理的就是Managed state,用戶自己管理的就是raw state。Managed State又分為ValueState、ListState、ReducingState、AggregatingState、FoldingState、MapState這么幾種,看名字知用途。
StateBackend目前提供了三個(gè)backend,MemoryStateBackend,FsStateBackend,RocksDBStateBackend,都是看名字知用途系列。
State接口、StateBackend接口及其實(shí)現(xiàn)都比較簡(jiǎn)單,代碼就不貼了, 尤其State本質(zhì)上就是一層容器封裝。
貼個(gè)別人寫的狀態(tài)管理的文章吧:詳解Flink中的狀態(tài)管理
6.數(shù)據(jù)流轉(zhuǎn)——Flink的數(shù)據(jù)抽象及數(shù)據(jù)交換過(guò)程
本章打算講一下flink底層是如何定義和在操作符之間傳遞數(shù)據(jù)的。
6.1 flink的數(shù)據(jù)抽象
6.1.1 MemorySegment
Flink作為一個(gè)高效的流框架,為了避免JVM的固有缺陷(java對(duì)象存儲(chǔ)密度低,FGC影響吞吐和響應(yīng)等),必然走上自主管理內(nèi)存的道路。
這個(gè)MemorySegment就是Flink的內(nèi)存抽象。默認(rèn)情況下,一個(gè)MemorySegment可以被看做是一個(gè)32kb大的內(nèi)存塊的抽象。這塊內(nèi)存既可以是JVM里的一個(gè)byte[],也可以是堆外內(nèi)存(DirectByteBuffer)。
如果說(shuō)byte[]數(shù)組和direct memory是最底層的存儲(chǔ),那么memorysegment就是在其上覆蓋的一層統(tǒng)一抽象。它定義了一系列抽象方法,用于控制和底層內(nèi)存的交互,如:
public abstract class MemorySegment {public abstract byte get(int index);public abstract void put(int index, byte b);public int size() ;public abstract ByteBuffer wrap(int offset, int length);...... }?
我們可以看到,它在提供了諸多直接操作內(nèi)存的方法外,還提供了一個(gè)wrap()方法,將自己包裝成一個(gè)ByteBuffer,我們待會(huì)兒講這個(gè)ByteBuffer。
Flink為MemorySegment提供了兩個(gè)實(shí)現(xiàn)類:HeapMemorySegment和HybridMemorySegment。他們的區(qū)別在于前者只能分配堆內(nèi)存,而后者能用來(lái)分配堆內(nèi)和堆外內(nèi)存。事實(shí)上,Flink框架里,只使用了后者。這是為什么呢?
如果HybridMemorySegment只能用于分配堆外內(nèi)存的話,似乎更合常理。但是在JVM的世界中,如果一個(gè)方法是一個(gè)虛方法,那么每次調(diào)用時(shí),JVM都要花時(shí)間去確定調(diào)用的到底是哪個(gè)子類實(shí)現(xiàn)的該虛方法(方法重寫機(jī)制,不明白的去看JVM的invokeVirtual指令),也就意味著每次都要去翻方法表;而如果該方法雖然是個(gè)虛方法,但實(shí)際上整個(gè)JVM里只有一個(gè)實(shí)現(xiàn)(就是說(shuō)只加載了一個(gè)子類進(jìn)來(lái)),那么JVM會(huì)很聰明的把它去虛化處理,這樣就不用每次調(diào)用方法時(shí)去找方法表了,能夠大大提升性能。但是只分配堆內(nèi)或者堆外內(nèi)存不能滿足我們的需要,所以就出現(xiàn)了HybridMemorySegment同時(shí)可以分配兩種內(nèi)存的設(shè)計(jì)。
我們可以看看HybridMemorySegment的構(gòu)造代碼:
HybridMemorySegment(ByteBuffer buffer, Object owner) {super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);this.offHeapBuffer = buffer;}HybridMemorySegment(byte[] buffer, Object owner) {super(buffer, owner);this.offHeapBuffer = null;}?
其中,第一個(gè)構(gòu)造函數(shù)的checkBufferAndGetAddress()方法能夠得到direct buffer的內(nèi)存地址,因此可以操作堆外內(nèi)存。
6.1.2 ByteBuffer與NetworkBufferPool
在MemorySegment這個(gè)抽象之上,Flink在數(shù)據(jù)從operator內(nèi)的數(shù)據(jù)對(duì)象在向TaskManager上轉(zhuǎn)移,預(yù)備被發(fā)給下個(gè)節(jié)點(diǎn)的過(guò)程中,使用的抽象或者說(shuō)內(nèi)存對(duì)象是Buffer。
注意,這個(gè)Buffer是個(gè)flink接口,不是java.nio提供的那個(gè)Buffer抽象類。Flink在這一層面同時(shí)使用了這兩個(gè)同名概念,用來(lái)存儲(chǔ)對(duì)象,直接看代碼時(shí)到處都是各種xxxBuffer很容易混淆:
- java提供的那個(gè)Buffer抽象類在這一層主要用于構(gòu)建HeapByteBuffer,這個(gè)主要是當(dāng)數(shù)據(jù)從jvm里的一個(gè)對(duì)象被序列化成字節(jié)數(shù)組時(shí)用的;
- Flink的這個(gè)Buffer接口主要是一種flink層面用于傳輸數(shù)據(jù)和事件的統(tǒng)一抽象,其實(shí)現(xiàn)類是NetworkBuffer,是對(duì)MemorySegment的包裝。Flink在各個(gè)TaskManager之間傳遞數(shù)據(jù)時(shí),使用的是這一層的抽象。
因?yàn)锽uffer的底層是MemorySegment,這可能不是JVM所管理的,所以為了知道什么時(shí)候一個(gè)Buffer用完了可以回收,Flink引入了引用計(jì)數(shù)的概念,當(dāng)確認(rèn)這個(gè)buffer沒(méi)有人引用,就可以回收這一片MemorySegment用于別的地方了(JVM的垃圾回收為啥不用引用計(jì)數(shù)?讀者思考一下):
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {private volatile int refCnt = 1;...... }?
為了方便管理NetworkBuffer,Flink提供了BufferPoolFactory,并且提供了唯一實(shí)現(xiàn)NetworkBufferPool,這是個(gè)工廠模式的應(yīng)用。
NetworkBufferPool在每個(gè)TaskManager上只有一個(gè),負(fù)責(zé)所有子task的內(nèi)存管理。其實(shí)例化時(shí)就會(huì)嘗試獲取所有可由它管理的內(nèi)存(對(duì)于堆內(nèi)存來(lái)說(shuō),直接獲取所有內(nèi)存并放入老年代,并令用戶對(duì)象只在新生代存活,可以極大程度的減少Full GC),我們看看其構(gòu)造方法:
public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {......try {this.availableMemorySegments = new ArrayBlockingQueue<>(numberOfSegmentsToAllocate);}catch (OutOfMemoryError err) {throw new OutOfMemoryError("Could not allocate buffer queue of length "+ numberOfSegmentsToAllocate + " - " + err.getMessage());}try {for (int i = 0; i < numberOfSegmentsToAllocate; i++) {ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null));}}......long allocatedMb = (sizeInLong * availableMemorySegments.size()) >> 20;LOG.info("Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).",allocatedMb, availableMemorySegments.size(), segmentSize);}?
由于NetworkBufferPool只是個(gè)工廠,實(shí)際的內(nèi)存池是LocalBufferPool。每個(gè)TaskManager都只有一個(gè)NetworkBufferPool工廠,但是上面運(yùn)行的每個(gè)task都要有一個(gè)和其他task隔離的LocalBufferPool池,這從邏輯上很好理解。另外,NetworkBufferPool會(huì)計(jì)算自己所擁有的所有內(nèi)存分片數(shù),在分配新的內(nèi)存池時(shí)對(duì)每個(gè)內(nèi)存池應(yīng)該占有的內(nèi)存分片數(shù)重分配,步驟是:
- 首先,從整個(gè)工廠管理的內(nèi)存片中拿出所有的內(nèi)存池所需要的最少Buffer數(shù)目總和
- 如果正好分配完,就結(jié)束
- 其次,把所有的剩下的沒(méi)分配的內(nèi)存片,按照每個(gè)LocalBufferPool內(nèi)存池的剩余想要容量大小進(jìn)行按比例分配
- 剩余想要容量大小是這么個(gè)東西:如果該內(nèi)存池至少需要3個(gè)buffer,最大需要10個(gè)buffer,那么它的剩余想要容量就是7
實(shí)現(xiàn)代碼如下:
private void redistributeBuffers() throws IOException {assert Thread.holdsLock(factoryLock);// All buffers, which are not among the required onesfinal int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;if (numAvailableMemorySegment == 0) {// in this case, we need to redistribute buffers so that every pool gets its minimumfor (LocalBufferPool bufferPool : allBufferPools) {bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());}return;}long totalCapacity = 0; // long to avoid int overflowfor (LocalBufferPool bufferPool : allBufferPools) {int excessMax = bufferPool.getMaxNumberOfMemorySegments() -bufferPool.getNumberOfRequiredMemorySegments();totalCapacity += Math.min(numAvailableMemorySegment, excessMax);}// no capacity to receive additional buffers?if (totalCapacity == 0) {return; // necessary to avoid div by zero when nothing to re-distribute}final int memorySegmentsToDistribute = MathUtils.checkedDownCast(Math.min(numAvailableMemorySegment, totalCapacity));long totalPartsUsed = 0; // of totalCapacityint numDistributedMemorySegment = 0;for (LocalBufferPool bufferPool : allBufferPools) {int excessMax = bufferPool.getMaxNumberOfMemorySegments() -bufferPool.getNumberOfRequiredMemorySegments();// shortcutif (excessMax == 0) {continue;}totalPartsUsed += Math.min(numAvailableMemorySegment, excessMax);final int mySize = MathUtils.checkedDownCast(memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment);numDistributedMemorySegment += mySize;bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + mySize);}assert (totalPartsUsed == totalCapacity);assert (numDistributedMemorySegment == memorySegmentsToDistribute);}?
接下來(lái)說(shuō)說(shuō)這個(gè)LocalBufferPool內(nèi)存池。?
LocalBufferPool的邏輯想想無(wú)非是增刪改查,值得說(shuō)的是其fields:
?
承接NetworkBufferPool的重分配方法,我們來(lái)看看LocalBufferPool的setNumBuffers()方法,代碼很短,邏輯也相當(dāng)簡(jiǎn)單,就不展開(kāi)說(shuō)了:
public void setNumBuffers(int numBuffers) throws IOException {synchronized (availableMemorySegments) {checkArgument(numBuffers >= numberOfRequiredMemorySegments,"Buffer pool needs at least %s buffers, but tried to set to %s",numberOfRequiredMemorySegments, numBuffers);if (numBuffers > maxNumberOfMemorySegments) {currentPoolSize = maxNumberOfMemorySegments;} else {currentPoolSize = numBuffers;}returnExcessMemorySegments();// If there is a registered owner and we have still requested more buffers than our// size, trigger a recycle via the owner.if (owner != null && numberOfRequestedMemorySegments > currentPoolSize) {owner.releaseMemory(numberOfRequestedMemorySegments - numBuffers);}}}?
6.1.3 RecordWriter與Record
我們接著往高層抽象走,剛剛提到了最底層內(nèi)存抽象是MemorySegment,用于數(shù)據(jù)傳輸?shù)氖荁uffer,那么,承上啟下對(duì)接從Java對(duì)象轉(zhuǎn)為Buffer的中間對(duì)象是什么呢?是StreamRecord。
從StreamRecord<T>這個(gè)類名字就可以看出來(lái),這個(gè)類就是個(gè)wrap,里面保存了原始的Java對(duì)象。另外,StreamRecord還保存了一個(gè)timestamp。
那么這個(gè)對(duì)象是怎么變成LocalBufferPool內(nèi)存池里的一個(gè)大號(hào)字節(jié)數(shù)組的呢?借助了StreamWriter這個(gè)類。
我們直接來(lái)看把數(shù)據(jù)序列化交出去的方法:
private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {RecordSerializer<T> serializer = serializers[targetChannel];SerializationResult result = serializer.addRecord(record);while (result.isFullBuffer()) {if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {// If this was a full record, we are done. Not breaking// out of the loop at this point will lead to another// buffer request before breaking out (that would not be// a problem per se, but it can lead to stalls in the// pipeline).if (result.isFullRecord()) {break;}}BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);}checkState(!serializer.hasSerializedData(), "All data should be written at once");if (flushAlways) {targetPartition.flush(targetChannel);}}?
先說(shuō)最后一行,如果配置為flushAlways,那么會(huì)立刻把元素發(fā)送出去,但是這樣吞吐量會(huì)下降;Flink的默認(rèn)設(shè)置其實(shí)也不是一個(gè)元素一個(gè)元素的發(fā)送,是單獨(dú)起了一個(gè)線程,每隔固定時(shí)間flush一次所有channel,較真起來(lái)也算是mini batch了。
再說(shuō)序列化那一句:SerializationResult result = serializer.addRecord(record);。在這行代碼中,Flink把對(duì)象調(diào)用該對(duì)象所屬的序列化器序列化為字節(jié)數(shù)組。
6.2 數(shù)據(jù)流轉(zhuǎn)過(guò)程
上一節(jié)講了各層數(shù)據(jù)的抽象,這一節(jié)講講數(shù)據(jù)在各個(gè)task之間exchange的過(guò)程。
6.2.1 整體過(guò)程
看這張圖:?
6.2.2 數(shù)據(jù)跨task傳遞
本節(jié)講一下算子之間具體的數(shù)據(jù)傳輸過(guò)程。也先上一張圖:?
?
數(shù)據(jù)在task之間傳遞有如下幾步:
數(shù)據(jù)在不同機(jī)器的算子之間傳遞的步驟就是以上這些。
了解了步驟之后,再來(lái)看一下部分關(guān)鍵代碼:?
首先是把數(shù)據(jù)交給recordwriter。
?
然后recordwriter把數(shù)據(jù)發(fā)送到對(duì)應(yīng)的通道。
//RecordWriter.javapublic void emit(T record) throws IOException, InterruptedException {//channelselector登場(chǎng)了for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {sendToTarget(record, targetChannel);}}private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {//選擇序列化器并序列化數(shù)據(jù)RecordSerializer<T> serializer = serializers[targetChannel];SerializationResult result = serializer.addRecord(record);while (result.isFullBuffer()) {if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {// If this was a full record, we are done. Not breaking// out of the loop at this point will lead to another// buffer request before breaking out (that would not be// a problem per se, but it can lead to stalls in the// pipeline).if (result.isFullRecord()) {break;}}BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);//寫入channelresult = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);}checkState(!serializer.hasSerializedData(), "All data should be written at once");if (flushAlways) {targetPartition.flush(targetChannel);}}?
接下來(lái)是把數(shù)據(jù)推給底層設(shè)施(netty)的過(guò)程:
//ResultPartition.java@Overridepublic void flushAll() {for (ResultSubpartition subpartition : subpartitions) {subpartition.flush();}}//PartitionRequestQueue.javavoid notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {//這里交給了netty server線程去推ctx.executor().execute(new Runnable() {@Overridepublic void run() {ctx.pipeline().fireUserEventTriggered(reader);}});}?
netty相關(guān)的部分:
//AbstractChannelHandlerContext.javapublic ChannelHandlerContext fireUserEventTriggered(final Object event) {if (event == null) {throw new NullPointerException("event");} else {final AbstractChannelHandlerContext next = this.findContextInbound();EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeUserEventTriggered(event);} else {executor.execute(new OneTimeTask() {public void run() {next.invokeUserEventTriggered(event);}});}return this;}}?
最后真實(shí)的寫入:
//PartittionRequesetQueue.javaprivate void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {return;}// Queue an available reader for consumption. If the queue is empty,// we try trigger the actual write. Otherwise this will be handled by// the writeAndFlushNextMessageIfPossible calls.boolean triggerWrite = availableReaders.isEmpty();registerAvailableReader(reader);if (triggerWrite) {writeAndFlushNextMessageIfPossible(ctx.channel());}}private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {......next = reader.getNextBuffer();if (next == null) {if (!reader.isReleased()) {continue;}markAsReleased(reader.getReceiverId());Throwable cause = reader.getFailureCause();if (cause != null) {ErrorResponse msg = new ErrorResponse(new ProducerFailedException(cause),reader.getReceiverId());ctx.writeAndFlush(msg);}} else {// This channel was now removed from the available reader queue.// We re-add it into the queue if it is still availableif (next.moreAvailable()) {registerAvailableReader(reader);}BufferResponse msg = new BufferResponse(next.buffer(),reader.getSequenceNumber(),reader.getReceiverId(),next.buffersInBacklog());if (isEndOfPartitionEvent(next.buffer())) {reader.notifySubpartitionConsumed();reader.releaseAllResources();markAsReleased(reader.getReceiverId());}// Write and flush and wait until this is done before// trying to continue with the next buffer.channel.writeAndFlush(msg).addListener(writeListener);return;}......}?
上面這段代碼里第二個(gè)方法中調(diào)用的writeAndFlush(msg)就是真正往netty的nio通道里寫入的地方了。在這里,寫入的是一個(gè)RemoteInputChannel,對(duì)應(yīng)的就是下游節(jié)點(diǎn)的InputGate的channels。
有寫就有讀,nio通道的另一端需要讀入buffer,代碼如下:
//CreditBasedPartitionRequestClientHandler.javaprivate void decodeMsg(Object msg) throws Throwable {final Class<?> msgClazz = msg.getClass();// ---- Buffer --------------------------------------------------------if (msgClazz == NettyMessage.BufferResponse.class) {NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);if (inputChannel == null) {bufferOrEvent.releaseBuffer();cancelRequestFor(bufferOrEvent.receiverId);return;}decodeBufferOrEvent(inputChannel, bufferOrEvent);} ......}?
插一句,Flink其實(shí)做阻塞和獲取數(shù)據(jù)的方式非常自然,利用了生產(chǎn)者和消費(fèi)者模型,當(dāng)獲取不到數(shù)據(jù)時(shí),消費(fèi)者自然阻塞;當(dāng)數(shù)據(jù)被加入隊(duì)列,消費(fèi)者被notify。Flink的背壓機(jī)制也是借此實(shí)現(xiàn)。
然后在這里又反序列化成StreamRecord:
//StreamElementSerializer.javapublic StreamElement deserialize(DataInputView source) throws IOException {int tag = source.readByte();if (tag == TAG_REC_WITH_TIMESTAMP) {long timestamp = source.readLong();return new StreamRecord<T>(typeSerializer.deserialize(source), timestamp);}else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {return new StreamRecord<T>(typeSerializer.deserialize(source));}else if (tag == TAG_WATERMARK) {return new Watermark(source.readLong());}else if (tag == TAG_STREAM_STATUS) {return new StreamStatus(source.readInt());}else if (tag == TAG_LATENCY_MARKER) {return new LatencyMarker(source.readLong(), new OperatorID(source.readLong(), source.readLong()), source.readInt());}else {throw new IOException("Corrupt stream, found tag: " + tag);}}?
然后再次在StreamInputProcessor.processInput()循環(huán)中得到處理。
至此,數(shù)據(jù)在跨jvm的節(jié)點(diǎn)之間的流轉(zhuǎn)過(guò)程就講完了。
6.3 Credit漫談
在看上一部分的代碼時(shí),有一個(gè)小細(xì)節(jié)不知道讀者有沒(méi)有注意到,我們的數(shù)據(jù)發(fā)送端的代碼叫做PartittionRequesetQueue.java,而我們的接收端卻起了一個(gè)完全不相干的名字:CreditBasedPartitionRequestClientHandler.java。為什么前面加了CreditBased的前綴呢?
6.3.1 背壓?jiǎn)栴}
在流模型中,我們期待數(shù)據(jù)是像水流一樣平滑的流過(guò)我們的引擎,但現(xiàn)實(shí)生活不會(huì)這么美好。數(shù)據(jù)的上游可能因?yàn)楦鞣N原因數(shù)據(jù)量暴增,遠(yuǎn)遠(yuǎn)超出了下游的瞬時(shí)處理能力(回憶一下98年大洪水),導(dǎo)致系統(tǒng)崩潰。?
那么框架應(yīng)該怎么應(yīng)對(duì)呢?和人類處理自然災(zāi)害的方式類似,我們修建了三峽大壩,當(dāng)洪水來(lái)臨時(shí)把大量的水囤積在大壩里;對(duì)于Flink來(lái)說(shuō),就是在數(shù)據(jù)的接收端和發(fā)送端放置了緩存池,用以緩沖數(shù)據(jù),并且設(shè)置閘門阻止數(shù)據(jù)向下流。
那么Flink又是如何處理背壓的呢?答案也是靠這些緩沖池。?
?
這張圖說(shuō)明了Flink在生產(chǎn)和消費(fèi)數(shù)據(jù)時(shí)的大致情況。ResultPartition和InputGate在輸出和輸入數(shù)據(jù)時(shí),都要向NetworkBufferPool申請(qǐng)一塊MemorySegment作為緩存池。?
接下來(lái)的情況和生產(chǎn)者消費(fèi)者很類似。當(dāng)數(shù)據(jù)發(fā)送太多,下游處理不過(guò)來(lái)了,那么首先InputChannel會(huì)被填滿,然后是InputChannel能申請(qǐng)到的內(nèi)存達(dá)到最大,于是下游停止讀取數(shù)據(jù),上游負(fù)責(zé)發(fā)送數(shù)據(jù)的nettyServer會(huì)得到響應(yīng),停止從ResultSubPartition讀取緩存,那么ResultPartition很快也將存滿數(shù)據(jù)不能被消費(fèi),從而生產(chǎn)數(shù)據(jù)的邏輯被阻塞在獲取新buffer上,非常自然地形成背壓的效果。
Flink自己做了個(gè)試驗(yàn)用以說(shuō)明這個(gè)機(jī)制的效果:?
?
我們首先設(shè)置生產(chǎn)者的發(fā)送速度為60%,然后下游的算子以同樣的速度處理數(shù)據(jù)。然后我們將下游算子的處理速度降低到30%,可以看到上游的生產(chǎn)者的數(shù)據(jù)產(chǎn)生曲線幾乎與消費(fèi)者同步下滑。而后當(dāng)我們解除限速,整個(gè)流的速度立刻提高到了100%。
6.3.2 使用Credit實(shí)現(xiàn)ATM網(wǎng)絡(luò)流控
上文已經(jīng)提到,對(duì)于流量控制,一個(gè)樸素的思路就是在長(zhǎng)江上建三峽鏈路上建立一個(gè)攔截的dam,如下圖所示:?
?
基于Credit的流控就是這樣一種建立在信用(消費(fèi)數(shù)據(jù)的能力)上的,面向每個(gè)虛鏈路(而非端到端的)流模型,如下圖所示:?
?
首先,下游會(huì)向上游發(fā)送一條credit message,用以通知其目前的信用(可聯(lián)想信用卡的可用額度),然后上游會(huì)根據(jù)這個(gè)信用消息來(lái)決定向下游發(fā)送多少數(shù)據(jù)。當(dāng)上游把數(shù)據(jù)發(fā)送給下游時(shí),它就從下游的信用卡上劃走相應(yīng)的額度(credit balance):?
?
下游總共獲得的credit數(shù)目是Buf_Alloc,已經(jīng)消費(fèi)的數(shù)據(jù)是Fwd_Cnt,上游發(fā)送出來(lái)的數(shù)據(jù)是Tx_Cnt,那么剩下的那部分就是Crd_Bal:?
Crd_Bal = Buf_Alloc - ( Tx_Cnt - Fwd_Cnt )?
上面這個(gè)式子應(yīng)該很好理解。
可以看到,Credit Based Flow Control的關(guān)鍵是buffer分配。這種分配可以在數(shù)據(jù)的發(fā)送端完成,也可以在接收端完成。對(duì)于下游可能有多個(gè)上游節(jié)點(diǎn)的情況(比如Flink),使用接收端的credit分配更加合理:?
?
上圖中,接收者可以觀察到每個(gè)上游連接的帶寬情況,而上游的節(jié)點(diǎn)Snd1卻不可能輕易知道發(fā)往同一個(gè)下游節(jié)點(diǎn)的其他Snd2的帶寬情況,從而如果在上游控制流量將會(huì)很困難,而在下游控制流量將會(huì)很方便。
因此,這就是為何Flink在接收端有一個(gè)基于Credit的Client,而不是在發(fā)送端有一個(gè)CreditServer的原因。
最后,再講一下Credit的面向虛鏈路的流設(shè)計(jì)和端到端的流設(shè)計(jì)的區(qū)別:?
?
如上圖所示,a是面向連接的流設(shè)計(jì),b是端到端的流設(shè)計(jì)。其中,a的設(shè)計(jì)使得當(dāng)下游節(jié)點(diǎn)3因某些情況必須緩存數(shù)據(jù)暫緩處理時(shí),每個(gè)上游節(jié)點(diǎn)(1和2)都可以利用其緩存保存數(shù)據(jù);而端到端的設(shè)計(jì)b里,只有節(jié)點(diǎn)3的緩存才可以用于保存數(shù)據(jù)(讀者可以從如何實(shí)現(xiàn)上想想為什么)。
對(duì)流控制感興趣的讀者,可以看這篇文章:Traffic Management For High-Speed Networks。
7.其他核心概念
截至第六章,和執(zhí)行過(guò)程相關(guān)的部分就全部講完,告一段落了。第七章主要講一點(diǎn)雜七雜八的內(nèi)容,有時(shí)間就不定期更新。
7.1 EventTime時(shí)間模型
flink有三種時(shí)間模型:ProcessingTime,EventTime和IngestionTime。?
關(guān)于時(shí)間模型看這張圖:?
?
從這張圖里可以很清楚的看到三種Time模型的區(qū)別。
- EventTime是數(shù)據(jù)被生產(chǎn)出來(lái)的時(shí)間,可以是比如傳感器發(fā)出信號(hào)的時(shí)間等(此時(shí)數(shù)據(jù)還沒(méi)有被傳輸給flink)。
- IngestionTime是數(shù)據(jù)進(jìn)入flink的時(shí)間,也就是從Source進(jìn)入flink流的時(shí)間(此時(shí)數(shù)據(jù)剛剛被傳給flink)
- ProcessingTime是針對(duì)當(dāng)前算子的系統(tǒng)時(shí)間,是指該數(shù)據(jù)已經(jīng)進(jìn)入某個(gè)operator時(shí),operator所在系統(tǒng)的當(dāng)前時(shí)間
例如,我在寫這段話的時(shí)間是2018年5月13日03點(diǎn)47分,但是我引用的這張EventTime的圖片,是2015年畫出來(lái)的,那么這張圖的EventTime是2015年,而ProcessingTime是現(xiàn)在。?
Flink官網(wǎng)對(duì)于時(shí)間戳的解釋非常詳細(xì):點(diǎn)我?
Flink對(duì)于EventTime模型的實(shí)現(xiàn),依賴的是一種叫做watermark的對(duì)象。watermark是攜帶有時(shí)間戳的一個(gè)對(duì)象,會(huì)按照程序的要求被插入到數(shù)據(jù)流中,用以標(biāo)志某個(gè)事件在該時(shí)間發(fā)生了。?
我再做一點(diǎn)簡(jiǎn)短的說(shuō)明,還是以官網(wǎng)的圖為例:?
?
對(duì)于有序到來(lái)的數(shù)據(jù),假設(shè)我們?cè)趖imestamp為11的元素后加入一個(gè)watermark,時(shí)間記錄為11,則下個(gè)元素收到該watermark時(shí),認(rèn)為所有早于11的元素均已到達(dá)。這是非常理想的情況。?
?
而在現(xiàn)實(shí)生活中,經(jīng)常會(huì)遇到亂序的數(shù)據(jù)。這時(shí),我們雖然在timestamp為7的元素后就收到了11,但是我們一直等到了收到元素12之后,才插入了watermark為11的元素。與上面的圖相比,如果我們?nèi)匀辉?1后就插入11的watermark,那么元素9就會(huì)被丟棄,造成數(shù)據(jù)丟失。而我們?cè)?2之后插入watermark11,就保證了9仍然會(huì)被下一個(gè)operator處理。當(dāng)然,我們不可能無(wú)限制的永遠(yuǎn)等待遲到元素,所以要在哪個(gè)元素后插入11需要根據(jù)實(shí)際場(chǎng)景權(quán)衡。
對(duì)于來(lái)自多個(gè)數(shù)據(jù)源的watermark,可以看這張圖:?
?
可以看到,當(dāng)一個(gè)operator收到多個(gè)watermark時(shí),它遵循最小原則(或者說(shuō)最早),即算子的當(dāng)前watermark是流經(jīng)該算子的最小watermark,以容許來(lái)自不同的source的亂序數(shù)據(jù)到來(lái)。?
關(guān)于事件時(shí)間模型,更多內(nèi)容可以參考Stream 101?和谷歌的這篇論文:Dataflow Model paper
7.2 FLIP-6 部署及處理模型演進(jìn)
就在老白寫這篇blog的時(shí)候,Flink發(fā)布了其1.5 RELEASE版本,號(hào)稱實(shí)現(xiàn)了其部署及處理模型(也就是FLIP-6),所以打算簡(jiǎn)略地說(shuō)一下FLIP-6的主要內(nèi)容。
7.2.1 現(xiàn)有模型不足
1.5之前的Flink模型有很多不足,包括:
- 只能靜態(tài)分配計(jì)算資源
- 在YARN上所有的資源分配都是一碗水端平的
- 與Docker/k8s的集成非常之蠢,頗有脫褲子放屁的神韻
- JobManager沒(méi)有任務(wù)調(diào)度邏輯
- 任務(wù)在YARN上執(zhí)行結(jié)束后web dashboard就不可用
- 集群的session模式和per job模式混淆難以理解
就我個(gè)人而言,我覺(jué)得Flink有一個(gè)這里完全沒(méi)提到的不足才是最應(yīng)該修改的:針對(duì)任務(wù)的完全的資源隔離。尤其是如果用Standalone集群,一個(gè)用戶的task跑掛了TaskManager,然后拖垮了整個(gè)集群的情況簡(jiǎn)直不要太多。
7.2.2 核心變更
Single Job JobManager?
最重要的變更是一個(gè)JobManager只處理一個(gè)job。當(dāng)我們生成JobGraph時(shí)就順便起一個(gè)JobManager,這顯然更加自然。
ResourceManager?
其職責(zé)包括獲取新的TM和slot,通知失敗,釋放資源以及緩存TM以用于重用等。重要的是,這個(gè)組件要能做到掛掉時(shí)不要搞垮正在運(yùn)行的好好的任務(wù)。其職責(zé)和與JobManager、TaskManager的交互圖如下:?
TaskManager?
TM要與上面的兩個(gè)組件交互。與JobManager交互時(shí),要能提供slot,要能與所有給出slot的JM交互。丟失與JM的連接時(shí)要能試圖把本TM上的slot的情況通告給新JM,如果這一步失敗,就要能重新分配slot。?
與ResourceManager交互時(shí),要通知RM自己的資源和當(dāng)前的Job分配情況,能按照RM的要求分配資源或者關(guān)閉自身。
JobManager Slot Pool?
這個(gè)pool要持有所有分配給當(dāng)前job的slot資源,并且能在RM掛掉的情況下管理當(dāng)前已經(jīng)持有的slot。
Dispatcher?
需要一個(gè)Job的分發(fā)器的主要原因是在有的集群環(huán)境下我們可能需要一個(gè)統(tǒng)一的提交和監(jiān)控點(diǎn),以及替代之前的Standalone模式下的JobManager。將來(lái)對(duì)分發(fā)器的期望可能包括權(quán)限控制等。?
7.2.3 Cluster Manager的架構(gòu)
YARN?
新的基于YARN的架構(gòu)主要包括不再需要先在容器里啟動(dòng)集群,然后提交任務(wù);用戶代碼不再使用動(dòng)態(tài)ClassLoader加載;不用的資源可以釋放;可以按需分配不同大小的容器等。其執(zhí)行過(guò)程如下:?
無(wú)Dispatcher時(shí)?
?
有Dispatcher時(shí)?
Mesos?
與基于YARN的模式很像,但是只有帶Dispatcher模式,因?yàn)橹挥羞@樣才能在Mesos集群里跑其RM。?
?
Mesos的Fault Tolerance是類似這樣的:?
?
必須用類似Marathon之類的技術(shù)保證Dispatcher的HA。
Standalone?
其實(shí)沒(méi)啥可說(shuō)的,把以前的JobManager的職責(zé)換成現(xiàn)在的Dispatcher就行了。?
?
將來(lái)可能會(huì)實(shí)現(xiàn)一個(gè)類似于輕量級(jí)Yarn的模式。
Docker/k8s?
用戶定義好容器,至少有一個(gè)是job specific的(不然怎么啟動(dòng)任務(wù));還有用于啟動(dòng)TM的,可以不是job specific的。啟動(dòng)過(guò)程如下?
7.2.4 組件設(shè)計(jì)及細(xì)節(jié)
分配slot相關(guān)細(xì)節(jié)?
從新的TM取slot過(guò)程:?
從Cached TM取slot過(guò)程:?
失敗處理
TM失敗?
TM失敗時(shí),RM要能檢測(cè)到失敗,更新自己的狀態(tài),發(fā)送消息給JM,重啟一份TM;JM要能檢測(cè)到失敗,從狀態(tài)移除失效slot,標(biāo)記該TM的task為失敗,并在沒(méi)有足夠slot繼續(xù)任務(wù)時(shí)調(diào)整規(guī)模;TM自身則要能從Checkpoint恢復(fù)
RM失敗?
此時(shí)TM要能檢測(cè)到失敗,并準(zhǔn)備向新的RM注冊(cè)自身,并且向新的RM傳遞自身的資源情況;JM要能檢測(cè)到失敗并且等待新的RM可用,重新請(qǐng)求需要的資源;丟失的數(shù)據(jù)要能從Container、TM等處恢復(fù)。
JM失敗?
TM釋放所有task,向新JM注冊(cè)資源,并且如果不成功,就向RM報(bào)告這些資源可用于重分配;RM坐等;JM丟失的數(shù)據(jù)從持久化存儲(chǔ)中獲得,已完成的checkpoints從HA恢復(fù),從最近的checkpoint重啟task,并申請(qǐng)資源。
JM & RM 失敗?
TM將在一段時(shí)間內(nèi)試圖把資源交給新上任的JM,如果失敗,則把資源交給新的RM
TM & RM失敗?
JM如果正在申請(qǐng)資源,則要等到新的RM啟動(dòng)后才能獲得;JM可能需要調(diào)整其規(guī)模,因?yàn)閾p失了TM的slot。
8.后記
Flink是當(dāng)前流處理領(lǐng)域的優(yōu)秀框架,其設(shè)計(jì)思想和代碼實(shí)現(xiàn)都蘊(yùn)含著許多人的智慧結(jié)晶。這篇解讀花了很多時(shí)間,篇幅也寫了很長(zhǎng),也仍然不能能覆蓋Flink的方方面面,也肯定有很多錯(cuò)誤之處,歡迎大家批評(píng)指正!Flink生態(tài)里中文資料確實(shí)不多,對(duì)Flink源碼有興趣的讀者,可以參考VinoYang的專欄,繼續(xù)學(xué)習(xí)之旅。
本文至此結(jié)束。
轉(zhuǎn)載于:https://www.cnblogs.com/davidwang456/articles/10647291.html
總結(jié)
以上是生活随笔為你收集整理的追源索骥:透过源码看懂Flink核心框架的执行流程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Flink JAR包上传和运行逻辑
- 下一篇: Spring Cloud Stream