日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

Flink 必知必会经典课程四:Fault-tolerance in Flink

發(fā)布時(shí)間:2024/8/23 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink 必知必会经典课程四:Fault-tolerance in Flink 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

簡(jiǎn)介:?本文由 Apache Flink PMC , 阿里巴巴高級(jí)技術(shù)專家李鈺分享,主要從有狀態(tài)的流計(jì)算、全局一致性快照 、Flink的容錯(cuò)機(jī)制、Flink的狀態(tài)管理 四個(gè)方面介紹 Flink 的容錯(cuò)機(jī)制原理。

作者 | 李鈺

分享人:本文由 Apache Flink PMC , 阿里巴巴高級(jí)技術(shù)專家李鈺分享,主要介紹 Flink 的容錯(cuò)機(jī)制原理,內(nèi)容大綱如下:

  • 有狀態(tài)的流計(jì)算
  • 全局一致性快照
  • Flink的容錯(cuò)機(jī)制
  • Flink的狀態(tài)管理
  • 一、有狀態(tài)的流計(jì)算

    流計(jì)算

    流計(jì)算是指有一個(gè)數(shù)據(jù)源可以持續(xù)不斷地發(fā)送消息,同時(shí)有一個(gè)常駐程序運(yùn)行代碼,從數(shù)據(jù)源拿到一個(gè)消息后會(huì)進(jìn)行處理,然后把結(jié)果輸出到下游。

    分布式流計(jì)算

    分布式流計(jì)算是指把輸入流以某種方式進(jìn)行一個(gè)劃分,再使用多個(gè)分布式實(shí)例對(duì)流進(jìn)行處理。

    流計(jì)算中的狀態(tài)

    計(jì)算可以分成有狀態(tài)和無(wú)狀態(tài)兩種,無(wú)狀態(tài)的計(jì)算只需要處理單一事件,有狀態(tài)的計(jì)算需要記錄并處理多個(gè)事件。

    舉個(gè)簡(jiǎn)單的例子。例如一個(gè)事件由事件ID和事件值兩部分組成,如果處理邏輯是每拿到一個(gè)事件,都解析并輸出它的事件值,那么這就是一個(gè)無(wú)狀態(tài)的計(jì)算;相反,如果每拿到一個(gè)狀態(tài),解析它的值出來(lái)后,需要和前一個(gè)事件值進(jìn)行比較,比前一個(gè)事件值大的時(shí)候才把它進(jìn)行輸出,這就是一個(gè)有狀態(tài)的計(jì)算。

    流計(jì)算中的狀態(tài)有很多種。比如在去重的場(chǎng)景下,會(huì)記錄所有的主鍵;又或者在窗口計(jì)算里,已經(jīng)進(jìn)入窗口還沒(méi)觸發(fā)的數(shù)據(jù),這也是流計(jì)算的狀態(tài);在機(jī)器學(xué)習(xí)/深度學(xué)習(xí)場(chǎng)景里,訓(xùn)練的模型及參數(shù)數(shù)據(jù)都是流計(jì)算的狀態(tài)。

    二、全局一致性快照

    全局一致性快照是可以用來(lái)給分布式系統(tǒng)做備份和故障恢復(fù)的機(jī)制。

    全局快照

    什么是全局快照

    全局快照首先是一個(gè)分布式應(yīng)用,它有多個(gè)進(jìn)程分布在多個(gè)服務(wù)器上;其次,它在應(yīng)用內(nèi)部有自己的處理邏輯和狀態(tài);第三,應(yīng)用間是可以互相通信的;第四,在這種分布式的應(yīng)用,有內(nèi)部狀態(tài),硬件可以通信的情況下,某一時(shí)刻的全局狀態(tài),就叫做全局的快照。

    為什么需要全局快照

    • 第一,用它來(lái)做檢查點(diǎn),可以定期對(duì)全局狀態(tài)做備份,當(dāng)應(yīng)用程序故障時(shí),就可以拿來(lái)恢復(fù);
    • 第二,做死鎖檢測(cè),進(jìn)行快照后當(dāng)前的程序繼續(xù)運(yùn)行,然后可以對(duì)快照進(jìn)行分析,看應(yīng)用程序是不是存在死鎖狀態(tài),如果是就可以進(jìn)行相應(yīng)的處理。

    全局快照舉例

    下圖為分布式系統(tǒng)中全局快照的示例。

    P1和P2是兩個(gè)進(jìn)程,它們之間有消息發(fā)送的管道,分別是C12和C21。對(duì)于 P1進(jìn)程來(lái)說(shuō), C12是它發(fā)送消息的管道,稱作output channel; C21是它接收消息的管道,稱作 input channel。

    除了管道,每個(gè)進(jìn)程都有一個(gè)本地的狀態(tài)。比如說(shuō)P1和P2每個(gè)進(jìn)程的內(nèi)存里都有XYZ三個(gè)變量和相應(yīng)的值。那么 P1和P2進(jìn)程的本地狀態(tài)和它們之間發(fā)送消息的管道狀態(tài),就是一個(gè)初始的全局狀態(tài),也可稱為全局快照。

    假設(shè)P1給P2發(fā)了一條消息,讓P2把x的狀態(tài)值從4改為7,但是這個(gè)消息在管道中,還沒(méi)到達(dá)P2。這個(gè)狀態(tài)也是一個(gè)全局快照。

    再接下來(lái),P2收到了P1的消息,但是還沒(méi)有處理,這個(gè)狀態(tài)也是一個(gè)全局快照。

    最后接到消息的P2把本地的X的值從4改為7,這也是一個(gè)全局快照。

    所以當(dāng)有事件發(fā)生的時(shí)候,全局的狀態(tài)就會(huì)發(fā)生改變。事件包括進(jìn)程發(fā)送消息、進(jìn)程接收消息和進(jìn)程修改自己的狀態(tài)。

    2.全局一致性快照

    假如說(shuō)有兩個(gè)事件,a和b,在絕對(duì)時(shí)間下,如果a發(fā)生在b之前,且b被包含在快照當(dāng)中,那么則a也被包含在快照當(dāng)中。滿足這個(gè)條件的全局快照,就稱為全局一致性快照。

    2.1 全局一致性快照的實(shí)現(xiàn)方法

    時(shí)鐘同步并不能實(shí)現(xiàn)全局一致性快照;全局同步雖然可以實(shí)現(xiàn),但是它的缺點(diǎn)也非常明顯,它會(huì)讓所有應(yīng)用程序都停下來(lái),會(huì)影響全局的性能。

    3.異步全局一致性快照算法 – Chandy-Lamport

    異步全局一致性快照算法Chandy-Lamport可以在不影響應(yīng)用程序運(yùn)行的前提下,實(shí)現(xiàn)全局一致性快照。

    Chandy-Lamport的系統(tǒng)要求有以下幾點(diǎn):

    • 第一,不影響應(yīng)用運(yùn)行,也就是不影響收發(fā)消息,不需要停止應(yīng)用程序;
    • 第二,每個(gè)進(jìn)程都可以記錄本地狀態(tài);
    • 第三,可以分布式地對(duì)已記錄的狀態(tài)進(jìn)行收集;
    • 第四,任意進(jìn)程都可以發(fā)起快照

    同時(shí),Chandy-Lamport算法可以執(zhí)行還有一個(gè)前提條件:消息有序且不重復(fù),并且消息可靠性可保障。

    3.1 Chandy-Lamport算法流程

    Chandy-Lamport的算法流程主要分為三個(gè)部分:發(fā)起快照、分布式的執(zhí)行快照和終止快照。

    發(fā)起快照

    任意進(jìn)程都可以發(fā)起快照。如下圖所示,當(dāng)由P1發(fā)起快照的時(shí)候,第一步需要記錄本地的狀態(tài),也就是對(duì)本地進(jìn)行快照,然后立刻向它所有 output channel發(fā)送一個(gè)marker消息,這中間是沒(méi)有時(shí)間間隙的。marker消息是一個(gè)特殊的消息,它不同于應(yīng)用之間傳遞的消息。

    發(fā)出Marker消息后,P1就會(huì)開(kāi)始記錄所有input channel的消息,也就是圖示C21管道的消息。

    分布式的執(zhí)行快照

    如下圖,先假定當(dāng) Pi接收到來(lái)自Cki的marker消息,也就是Pk發(fā)給Pi的marker消息。可以分兩種情況來(lái)看:

    第一種情況:這個(gè)是Pi收到的第一個(gè)來(lái)自其它管道的marker消息,它會(huì)先記錄一下本地的狀態(tài),再把 C12管道記為空,也就是說(shuō)后續(xù)再?gòu)?P1發(fā)消息,就不包含在此次快照里了,與此同時(shí)立刻向它所有output channel發(fā)送marker消息。 最后開(kāi)始記錄來(lái)自除Cki之外的所有input channel的消息。

    上面提到Cki消息不包含在實(shí)時(shí)快照里,但是實(shí)時(shí)消息還是會(huì)發(fā)生,所以第二種情況是,如果此前Pi已經(jīng)接收過(guò)marker消息,它會(huì)停止記錄 Cki消息,同時(shí)會(huì)將此前記錄的所有Cki消息作為Cki在本次快照中的最終狀態(tài)來(lái)保存。

    終止快照

    終止快照的條件有兩個(gè):

    • 第一,所有進(jìn)程都已經(jīng)接收到marker消息,并記錄在本地快照;
    • 第二,所有進(jìn)程都從它的n-1個(gè)input channel里收到了marker 消息,并記錄了管道狀態(tài)。

    當(dāng)快照終止,快照收集器 (Central Server) 就開(kāi)始收集每一個(gè)部分的快照去形成全局一致性快照了。

    示例展示

    在下圖的例子里,一些狀態(tài)是在內(nèi)部發(fā)生的,比如A,它跟其它進(jìn)程沒(méi)有交互。內(nèi)部狀態(tài)就是 P1發(fā)給自己消息,可以將A認(rèn)為是C11=[A->]。

    Chandy-Lamport全局一致性快照的算法是怎么執(zhí)行的呢?

    假設(shè)從p1來(lái)發(fā)起快照,它發(fā)起快照時(shí),首先對(duì)本地的狀態(tài)進(jìn)行快照,稱之為S1,然后立刻向它所有的output channel,即P2和P3,分別發(fā)marker消息,然后再去記錄它所有input channel的消息,即來(lái)自P2和P3及自身的消息。

    圖例所示,縱軸是絕對(duì)時(shí)間,按照絕對(duì)時(shí)間來(lái)看,為什么P3和P2收到marker消息會(huì)有時(shí)間差呢?因?yàn)榧偃邕@是一個(gè)真實(shí)的物理環(huán)境里的分布式進(jìn)程,不同節(jié)點(diǎn)之間的網(wǎng)絡(luò)狀況是不一樣的,這種情況會(huì)導(dǎo)致消息送達(dá)時(shí)間存在差異。

    P3先收到marker消息,且是它接收到的第一個(gè)marker消息。接收到消息后,它首先會(huì)對(duì)本地狀態(tài)進(jìn)行快照,然后把 C13管道的標(biāo)記成 close,與此同時(shí)開(kāi)始向它所有的output channel發(fā)送 marker消息,最后它會(huì)把來(lái)自除了C13之外的所有input channel的消息開(kāi)始進(jìn)行記錄。

    接收到P3發(fā)出的marker信息的是P1,但這不是它接收的第一個(gè)marker,它會(huì)把來(lái)自C31 channel的管道立刻關(guān)閉,并且把當(dāng)前的記錄消息做這個(gè)channel的快照,后續(xù)再接收到來(lái)自P3的消息,就不會(huì)更新在此次的快照狀態(tài)里了。

    接下來(lái)P2接收到來(lái)自P3的消息,這是它接到的第一個(gè)marker消息。接收到消息后,它首先對(duì)本地狀態(tài)進(jìn)行快照,然后把 C32管道的標(biāo)記成 close,與此同時(shí)開(kāi)始向它所有的output channel發(fā)送 marker消息,最后它會(huì)把來(lái)自除了C32之外的所有input channel的消息開(kāi)始進(jìn)行記錄。

    再來(lái)看P2接收到來(lái)自P1的消息,這不是P2接收到的第一個(gè)marker消息,所以它會(huì)把所有的 input channel全部關(guān)閉,并且記錄channel的狀態(tài)。

    接下來(lái)看P1接收到來(lái)自P2的消息,這也不是它接收的第一個(gè)消息。那么它就會(huì)把所有的input channel關(guān)閉,并把記錄的消息作為狀態(tài)。那么這里面有兩個(gè)狀態(tài),一個(gè)是C11,即自己發(fā)給自己的消息;一個(gè)是C21,是P2里H發(fā)給P1D的。

    最后一個(gè)時(shí)間點(diǎn),P3接收到來(lái)自P2的消息,這也不是它收到的第一個(gè)消息,操作跟上面介紹的一樣。在這期間P3本地有一個(gè)事件J,它也會(huì)把J作為它的狀態(tài)。

    當(dāng)所有進(jìn)程都記錄了本地狀態(tài),而且每一個(gè)進(jìn)程的所有輸入管道都已經(jīng)關(guān)閉了,那么全局一致性快照就結(jié)束了,也就是對(duì)過(guò)去時(shí)間點(diǎn)的全局性的狀態(tài)記錄完成了。

    3.3 Chandy-Lamport與 Flink之間的關(guān)系

    Flink 是分布式系統(tǒng),所以 Flink 會(huì)采用全局一致性快照的方式形成檢查點(diǎn),來(lái)支持故障恢復(fù)。Flink的異步全局一致性快照算法跟Chandy-Lamport算法的區(qū)別主要有以下幾點(diǎn):

    • 第一,Chandy-Lamput支持強(qiáng)連通圖,而 Flink支持弱連通圖;
    • 第二,Flink采用的是裁剪的(Tailored)Chandy-Lamput異步快照算法;
    • 第三,Flink的異步快照算法在DAG場(chǎng)景下不需要存儲(chǔ)Channel state,從而極大減少快照的存儲(chǔ)空間。

    三、Flink的容錯(cuò)機(jī)制

    容錯(cuò),就是恢復(fù)到出錯(cuò)前的狀態(tài)。流計(jì)算容錯(cuò)一致性保證有三種,分別是:Exactly once,At least once,At most once。

    • Exactly once,是指每條event會(huì)且只會(huì)對(duì)state產(chǎn)生一次影響,這里的“一次”并非端到端的嚴(yán)格一次,而是指在 Flink內(nèi)部只處理一次,不包括source和sink的處理。
    • At least once,是指每條event會(huì)對(duì)state產(chǎn)生最少一次影響,也就是存在重復(fù)處理的可能。
    • At most once,是指每條event會(huì)對(duì)state產(chǎn)生最多一次影響,就是狀態(tài)可能會(huì)在出錯(cuò)時(shí)丟失。

    端到端的Exactly once

    Exactly once的意思是,作業(yè)結(jié)果總是正確的,但是很可能產(chǎn)出多次;所以它的要求是需要有可重放的source。
    端到端的Exactly once,是指作業(yè)結(jié)果正確且只會(huì)被產(chǎn)出一次,它的要求除了有可重放的source外,還要求有事務(wù)型的sink和可以接收冪等的產(chǎn)出結(jié)果。

    Flink的狀態(tài)容錯(cuò)

    很多場(chǎng)景都會(huì)要求在Exactly once的語(yǔ)義,即處理且僅處理一次。如何確保語(yǔ)義呢?

    簡(jiǎn)單場(chǎng)景的 Exactly Once 容錯(cuò)方法

    簡(jiǎn)單場(chǎng)景的做法如下圖,方法就是,記錄本地狀態(tài)并且把 source的offset,即 Event log的位置記錄下來(lái)就好了。



    分布式場(chǎng)景的狀態(tài)容錯(cuò)

    如果是分布式場(chǎng)景,我們需要在不中斷運(yùn)算的前提下對(duì)多個(gè)擁有本地狀態(tài)的算子產(chǎn)生全局一致性快照。Flink 分布式場(chǎng)景的作業(yè)拓?fù)浔容^特殊,它是有向無(wú)環(huán)并且是弱聯(lián)通圖,可以采用裁剪的Chandy-Lamport,也就是只記錄所有輸入的offset和各個(gè)算子狀態(tài),并依賴rewindable source(可回溯的source,即可以通過(guò)offset讀取比較早一點(diǎn)時(shí)間點(diǎn)),從而不需要存儲(chǔ)channel的狀態(tài),這在存在聚合 (aggregation)邏輯的情況下可以節(jié)省大量的存儲(chǔ)空間。

    最后做恢復(fù),恢復(fù)就是把數(shù)據(jù)源的位置重新設(shè)定,然后每一個(gè)算子都從檢查點(diǎn)恢復(fù)狀態(tài)。

    3.Flink 的分布式快照方法

    首先在源數(shù)據(jù)流里插入Checkpoint barrier,也就是上文提到的Chandy-Lamport算法里的marker message,不同的Checkpoint barrier會(huì)把流自然地切分多個(gè)段,每個(gè)段都包含了Checkpoint的數(shù)據(jù);

    Flink 里有一個(gè)全局的Coordinator,它不像Chandy-Lamport對(duì)任意一個(gè)進(jìn)程都可以發(fā)起快照,這個(gè)集中式的 Coordinator會(huì)把Checkpoint barrier注入到每個(gè)source里,然后啟動(dòng)快照。當(dāng)每個(gè)節(jié)點(diǎn)收到barrier后,因?yàn)?Flink 里面它不存儲(chǔ) Channel state,所以它只需存儲(chǔ)本地的狀態(tài)就好。

    在做完了Checkpoint 后,每個(gè)算子的每個(gè)并發(fā)都會(huì)向Coordinator發(fā)送一個(gè)確認(rèn)消息,當(dāng)所有任務(wù)的確認(rèn)消息都被Checkpoint Coordinator接收,快照就結(jié)束了。

    4.流程演示

    見(jiàn)下圖示,假設(shè)Checkpoint N 被注入到 source里,這時(shí)source會(huì)先把它正在處理分區(qū)的offset記錄下來(lái)。

    隨著時(shí)間的流逝,它會(huì)把Checkpoint barrier發(fā)送到兩個(gè)并發(fā)的下游,當(dāng)barrier分別到達(dá)兩個(gè)并發(fā),這兩個(gè)并發(fā)會(huì)分別把它們本地的狀態(tài)都記錄在Checkpoint 的里:

    最后barrier到達(dá)最終的subtask,快照就完成了。

    這是比較簡(jiǎn)單的場(chǎng)景演示,每個(gè)算子只有單流的輸入,再來(lái)看下圖比較復(fù)雜的場(chǎng)景,算子有多流輸入的情況。

    當(dāng)算子有多個(gè)輸入,需要把Barrier 對(duì)齊。怎么把Barrier對(duì)齊呢?如下圖所示,在左側(cè)原本的狀態(tài)下,當(dāng)其中一條barrier到達(dá),另一條barrier命令上有的barrier還在管道中沒(méi)有到達(dá),這時(shí)會(huì)在保證Exactly once的情況下,把先到達(dá)的流直接阻塞掉,然后等待另一條流的數(shù)據(jù)處理。等到另外一條流也到達(dá)了,會(huì)把之前的流unblock,同時(shí)把barrier發(fā)送到算子。

    在這個(gè)過(guò)程中,阻塞掉其中一條流的作用是,會(huì)讓它產(chǎn)生反壓。Barrier 對(duì)齊會(huì)導(dǎo)致反壓和暫停operator的數(shù)據(jù)處理。

    如果不在對(duì)齊過(guò)程中阻塞已收到barrier的數(shù)據(jù)管道,數(shù)據(jù)持續(xù)不斷流進(jìn)來(lái),那么屬于下個(gè)Checkpoint的數(shù)據(jù)被包含在當(dāng)前的Checkpoint里,如果一旦發(fā)生故障恢復(fù)后,由于source會(huì)被rewind,部分?jǐn)?shù)據(jù)會(huì)有重復(fù)處理,這就是at-least-once。 如果能接收at-least-once,那么可以選擇其他可以避免barrier對(duì)齊帶來(lái)的副作用。另外也可以通過(guò)異步快照來(lái)盡量減少任務(wù)停頓并支持多個(gè)Checkpoint同時(shí)進(jìn)行。

    5.快照觸發(fā)

    本地快照同步上傳到系統(tǒng)需要state Copy-on-write的機(jī)制。

    假如對(duì)元數(shù)據(jù)信息做了快照之后數(shù)據(jù)處理恢復(fù)了,在上傳數(shù)據(jù)的過(guò)程中如何保證恢復(fù)的應(yīng)用程序邏輯不會(huì)修改正在上傳的數(shù)據(jù)呢?實(shí)際上不同狀態(tài)存儲(chǔ)后端的處理是不一樣的,Heap backend會(huì)觸發(fā)數(shù)據(jù)的copy-on-write,而對(duì)于RocksDB backend來(lái)說(shuō)LSM的特性可以保證已經(jīng)快照的數(shù)據(jù)不會(huì)被修改。

    四、Flink 的狀態(tài)管理

    1.Flink 狀態(tài)管理

    首先需要去定義一個(gè)狀態(tài),在下圖的例子里,先定義一個(gè)Value state。

    在定義的狀態(tài)的時(shí)候,需要給出以下的幾個(gè)信息:

    • 狀態(tài)識(shí)別ID
    • 狀態(tài)數(shù)據(jù)類型
    • 本地狀態(tài)后端注冊(cè)狀態(tài)
    • 本地狀態(tài)后端讀寫狀態(tài)

    2.Flink 狀態(tài)后端

    又叫state backend,Flink狀態(tài)后端有兩種;

    • 第一種,JVM Heap,它里面的數(shù)據(jù)是以Java對(duì)象形式存在的,讀寫也是以對(duì)象形式去完成的,所以速度很快。但是也存在兩個(gè)弊端:第一個(gè)弊端,以對(duì)象方式存儲(chǔ)所需的空間是磁盤上序列化壓縮后的數(shù)據(jù)大小的很多倍,所以占用的內(nèi)存空間很大;第二個(gè)弊端,雖然讀寫不用做序列化,但是在形成snapshot時(shí)需要做序列化,所以它的異步snapshot過(guò)程會(huì)比較慢。

    • 第二種,RocksDB,這個(gè)類型在讀寫時(shí)就需要做序列化,所以它讀寫的速度比較慢。但是它有一個(gè)好處,基于LSM的數(shù)據(jù)結(jié)構(gòu)在快照之后會(huì)形成sst文件,它的異步checkpoint過(guò)程就是文件拷貝的過(guò)程,CPU消耗會(huì)比較低。

    原文鏈接

    本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。

    總結(jié)

    以上是生活随笔為你收集整理的Flink 必知必会经典课程四:Fault-tolerance in Flink的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。