Flink 容错机制:Checkpoints、Savepoints
文章目錄
- Checkpoints(檢查點)
- 恢復流程
- 生成策略
- Savepoints(保存點)
Checkpoints(檢查點)
Flink中基于異步輕量級的分布式快照技術(shù)提供了Checkpoints容錯機制,Checkpoints可以將同一時間點作業(yè)/算子的狀態(tài)數(shù)據(jù)全局統(tǒng)一快照處理,包括前面提到的算子狀態(tài)和鍵值分區(qū)狀態(tài)。當發(fā)生了故障后,Flink會將所有任務的狀態(tài)恢復至最后一次Checkpoint中的狀態(tài),并從那里重新開始執(zhí)行。
那么Checkpoints的生成策略是什么樣的呢?它會在什么時候進行快照的生成呢?
其實就是在所有任務都處理完同一個輸入數(shù)據(jù)流的時候,這時就會對當前全部任務的狀態(tài)進行一個拷貝,生成Checkpoints。
為了方便理解,這里先簡單的用一個樸素算法來解釋這一生成過程(Flink的Checkpoints算法實際要更加復雜,在下面會詳細講解)
恢復流程
為了方便進行實例的講解,假設當前有一個Source任務,負責從一個遞增的數(shù)字流(1、2、3、4……)中讀取數(shù)據(jù),讀取到的數(shù)據(jù)會分為奇數(shù)流和偶數(shù)流,求和算子的兩個任務會分別對它們進行求和。在當前任務中,數(shù)據(jù)源算子的任務會將輸入流的當前偏移量存為狀態(tài),求和算子的任務會將當前和存為狀態(tài)。
某流式應用的一致性檢查點如上圖,在當前生成的Checkpoints中保存的輸入偏移為5,偶數(shù)求和為6,奇數(shù)求和為9。
故障:任務sum_odd失敗假設在下一輪計算中,任務sum_odd計算出現(xiàn)了問題,任務sum_odd的時候產(chǎn)生了問題,導致結(jié)果出現(xiàn)錯誤。由于出現(xiàn)問題,為了防止從頭開始重復計算,此時會通過Checkpoints來進行快照的恢復。
Checkpoints恢復應用需要以下三個步驟
- 第一步我們需要先重啟整個應用,恢復到最原始的狀態(tài)。
- 緊接著從檢查點的快照信息中讀取出輸入源的偏移量以及算子計算的結(jié)果,進行狀態(tài)的恢復
- 狀態(tài)恢復完成后,繼續(xù)Checkpoints恢復的位置開始繼續(xù)處理。
從檢查點恢復后,它的內(nèi)部狀態(tài)會和生成檢查點的時候完全一致,并且會緊接著重新處理那些從之前檢查點完成開始,到發(fā)生系統(tǒng)故障之間已經(jīng)處理過的數(shù)據(jù)。雖然這意味著Flink會重復處理部分消息,但上述機制仍然可以實現(xiàn)精確一次的狀態(tài)一致性,因為所有的算子都會恢復到那些數(shù)據(jù)處理之前的時間點。
但這個機制仍然面臨一些問題,因為Checkpoints和恢復機制僅能重置應用內(nèi)部的狀態(tài),而應用所使用的Sink可能在恢復期間將結(jié)果向下游系統(tǒng)(如事件日志系統(tǒng)、文件系統(tǒng)或數(shù)據(jù)庫)重復發(fā)送多次。為了解決這個問題,對于某些存儲系統(tǒng),Flink提供的Sink函數(shù)支持精確一次輸出(在檢查點完成后才會把寫出的記錄正式提交)。另一種方法則是適用于大多數(shù)存儲系統(tǒng)的冪等更新。
生成策略
Flink中的Checkpoints是基于Chandy-Lamport分布式快照算法實現(xiàn)的,該算法不會暫停整個應用,而是會將生成Checkpoints的過程和處理過程分離,這樣在部分任務持久化狀態(tài)的過程中,其他任務還可以繼續(xù)執(zhí)行。
在介紹生成策略之前,首先需要介紹一下**Checkpoints barrier(屏障)**這一種特殊記錄。
barrir劃分Checkpoints如上圖,與水位線相同,Flink會在Source中間隔性地生成barrier,通過barrier把一條流上的數(shù)據(jù)劃分到不同的Checkpoints中,在barrier之前到來的數(shù)據(jù)導致的狀態(tài)更改,都會被包含在當前所屬的Checkpoints中;而基于barrier之后的數(shù)據(jù)導致的所有更改,就會被包含在之后的Checkpoints中。
擁有兩個有狀態(tài)的Source,兩個有狀態(tài)的任務,以及兩個無狀態(tài)Sink的流式應用- 假設當前有兩個Source任務,各自消費一個遞增的數(shù)字流(1、2、3、4……),讀取到的數(shù)據(jù)會分為奇數(shù)流和偶數(shù)流,求和算子的兩個任務會分別對它們進行求和,并將結(jié)果值更新至下游Sink。
- 此時JobManager向每一個Source任務發(fā)送一個新的Checkpoints編號,以此啟動Checkpoints生成流程。
- 在Source任務收到消息后,會暫停發(fā)出記錄,緊接著利用狀態(tài)后端生成本地狀態(tài)的Checkpoints,并把barrier連同編號廣播給所有傳出的數(shù)據(jù)流分區(qū)。
- 狀態(tài)后端在狀態(tài)存入Checkpoints后通知Source任務,并向JobManager發(fā)送確認消息。
- 在所有barrier發(fā)出后,Source將恢復正常工作。
- Source任務會廣播barrier至所有與之相連的任務,確保這些任務能從它們的每個輸入都收到一個barrier
- 在等待過程中,對于barrier未到達的分區(qū),數(shù)據(jù)會繼續(xù)正常處理。而barrier已經(jīng)到達的分區(qū),它們新到來的記錄會被緩沖起來,不能處理。這個等待所有barrier到來的過程被稱為barrier對齊
- 任務中收齊全部輸入分區(qū)發(fā)送的barrier后,就會通知狀態(tài)后端開始生成Checkpoints,同時繼續(xù)把Checkpoints barrier廣播轉(zhuǎn)發(fā)到下游相連的任務。
- 任務在發(fā)出所有的Checkpoints barrier后就會開始處理緩沖的記錄。等到所有緩沖記錄處理完后,任務就會繼續(xù)處理Source。
- Sink任務在收到分隔符后會依次進行barrier對齊,然后將自身狀態(tài)寫入Checkpoints,最終向JobManager發(fā)送確認信息。
- JobManager在接收到所有任務返回的Checkpoints確認信息后,就說明此次Checkpoints生成結(jié)束。
Savepoints(保存點)
- 由于Cheakpoints是周期性自動生成的,但有些時候我們需要手動的去進行鏡像保存功能,于是Flink同時還為我們提供了Savepoints來完成這個功能,Savepoints不僅可以做到故障恢復,還可以用于手動備份、版本遷移、暫停或重啟應用等。
- Savepoints是Checkpoints的一種特殊實現(xiàn),底層也是使用Checkpoint機制,因此Savepoints可以認為是具有一些額外元數(shù)據(jù)的Checkpoints。
- Savepoints的生成和清理都無法由Flink自動進行,因此都需要用戶自己來顯式觸發(fā)。
總結(jié)
以上是生活随笔為你收集整理的Flink 容错机制:Checkpoints、Savepoints的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Flink 状态管理:算子状态、键值分区
- 下一篇: Flink 状态一致性:端到端状态一致性