flink checkpoint 恢复_Flink断点恢复机制
作為流式計(jì)算,Flink通過checkpoint機(jī)制和kafka的可回溯性來保證作業(yè)在failover時(shí)不丟失狀態(tài)。
作為生產(chǎn)環(huán)境的flink,我們期待做到快速failover、彈性擴(kuò)縮容和平滑遷移,盡量做到用戶無感知和變更方便,從而讓用戶將更多精力放在功能實(shí)現(xiàn)上。
本文先介紹checkpoint機(jī)制,接著介紹flink如何rescale,最后再介紹下代碼流程。
注:下文中統(tǒng)一以task表示算子的一個(gè)并發(fā),比如并發(fā)度為3的map算子包含3個(gè)并行的task,下文中出現(xiàn)的subtask語義上等同于task。
Checkpoint機(jī)制
flink快照從源頭開始觸發(fā)(記錄消費(fèi)的offset),通過barrier來標(biāo)記本次快照,如圖1的offset。
barrier流過的地方都會(huì)將state保存到共享存儲(chǔ)中,如圖2中的sum。
當(dāng)barrier流到Sink時(shí),所有算子都完成快照,本次作業(yè)快照也就完成,如圖3(Sink算子無狀態(tài))。
一旦作業(yè)失敗重啟,會(huì)將state都恢復(fù)到各個(gè)算子中,同時(shí)從記錄的offset開始消費(fèi),確保從上一次快照的地方恢復(fù)作業(yè)。我們可以看到,Task中的狀態(tài)為21(1+2+3+4+5+6),下一次累加的數(shù)據(jù)為7,保證了flink內(nèi)部狀態(tài)的一致性。
以上為最簡(jiǎn)單的情況,實(shí)際情況可能包含多個(gè)快照、多個(gè)算子、迭代等復(fù)雜場(chǎng)景。
當(dāng)多個(gè)快照時(shí),flink通過barrier將數(shù)據(jù)分段,每個(gè)barrier都標(biāo)記著一個(gè)checkpointID,如下圖所示:
當(dāng)一個(gè)task有多個(gè)輸入時(shí),必須等待上游所有的barrier都到達(dá)后,才能做快照。如果一個(gè)上游的barrier已經(jīng)達(dá)到,想要做到exactly-once,需要先把之后到達(dá)的數(shù)據(jù)緩存下來,等做完快照再處理。
具體的實(shí)現(xiàn)原理可以參考論文:State Management in Apache Flink
Rescale原理
當(dāng)作業(yè)的并發(fā)度改變時(shí),flink會(huì)重新分配狀態(tài)。這里采取的partition策略是固定總partition個(gè)數(shù),當(dāng)task并發(fā)改變時(shí),重新計(jì)算并將partition分配到每個(gè)task上。除了這種partition策略,還存在根據(jù)partition大小自動(dòng)合并和拆分的策略,比如Hbase所使用的。
在flink中,一個(gè)key group就是一個(gè)partition。之所以選擇以key group為基本單位來操作狀態(tài),是為了減少磁盤訪問IO和隨機(jī)讀寫(如果以key為單位就會(huì)出現(xiàn)這種情況,比如恢復(fù)時(shí)每個(gè)task都需要讀取全部的state來決定每個(gè)key是否屬于自己)。
flink中有兩種狀態(tài),包括operator state和key state。operator state是以task為單位的,一般采用list的形式存儲(chǔ),當(dāng)重新rescale時(shí)每個(gè)task可以選擇接受全部的operator state或者按照list平分。key state時(shí)以key為單位的,必須在keyby時(shí)候才能使用這種狀態(tài)。
下面以kafka offset作為operator state作為介紹。每個(gè)task都會(huì)以list的形式記錄自己負(fù)責(zé)的<partitionId, offset>,當(dāng)做快照的時(shí)候,將狀態(tài)保存在共享存儲(chǔ),所有task的list state會(huì)拼接成一個(gè)大的list。當(dāng)重新rescale的時(shí)候,flink將list中的元素平分給每個(gè)task。(實(shí)際的flink kafka consumer是通過union方式獲取所有l(wèi)ist,然后再選擇屬于自己的)
下面簡(jiǎn)單介紹下key state的rescale。key group的個(gè)數(shù)等于作業(yè)的最大并發(fā)(一旦設(shè)置不可改變,即key group的個(gè)數(shù)必須大于等于task的并發(fā)度),每個(gè)key通過hash映射屬于其中一個(gè)key group。比如下圖,共有10個(gè)KG,KG-1包含1-11的key。
當(dāng)作業(yè)rescale的時(shí)候,會(huì)將list形式的KG平分到每個(gè)task。
上圖中最下面給出了key->KG→task的映射過程:
- 計(jì)算key的哈希值。
- 根據(jù)哈希值和最大并發(fā)確定key所屬KG。
- 根據(jù)key所屬KG來確定發(fā)到下游哪個(gè)并發(fā)的task。
對(duì)應(yīng)KeyGroupRangeAssignment代碼如下:
public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {Preconditions.checkNotNull(key, "Assigned key must not be null!");return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));}public static int assignToKeyGroup(Object key, int maxParallelism) {Preconditions.checkNotNull(key, "Assigned key must not be null!");return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);}public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {return keyGroupId * parallelism / maxParallelism;}上游的計(jì)算在KeyGroupStreamPartitioner類里,下游的計(jì)算在KeyGroupPartitioner類里。
代碼流程
這里的代碼以flink-1.10.0作為參考。
關(guān)鍵類
CheckpointCoordinator
這個(gè)是位于Master節(jié)點(diǎn)的快照控制中心,負(fù)責(zé)定期的觸發(fā)checkpoint和手動(dòng)觸發(fā)savepoint,維護(hù)在做和已完成的快照。
StateAssignmentOperation
這個(gè)是位于Master節(jié)點(diǎn)的作業(yè)恢復(fù)時(shí)負(fù)責(zé)rescale的類,主要是根據(jù)新作業(yè)的并發(fā)重新分配狀態(tài)。針對(duì)operator state,主要采用broadcast的方式使得每個(gè)task都能接觸算子全部的狀態(tài);針對(duì)key state,采用均分KG的方法來重新劃分state的歸屬。
TaskStateManagerImpl
具體Task的狀態(tài)管理中心,包括和JobMaster做checkpoint的交互,管理本地狀態(tài)。
StreamTaskStateInitializerImpl
具體task的狀態(tài)恢復(fù),這里也是各個(gè)statebackend開始創(chuàng)建的地方。
RocksDBKeyedStateBackend
具體task通過rocksdb做key state的地方。使用這種backend,每個(gè)狀態(tài)是一個(gè)cf,主鍵的組織形式為<KG, key, namespace>。支持增量快照和全量快照。
HeapKeyedStateBackend
具體task通過rocksdb做key state的地方。使用這種backend,底層通過使用CopyOnWriteStateMap來存儲(chǔ),主鍵的組織形式為<NS, K, SV>。相比rocksdb,內(nèi)存的存取速度都非常快,但是狀態(tài)大小受制于內(nèi)存。
如何制作快照
- CheckpointCoordinator::triggerCheckpoint()。這個(gè)是checkpoint和savepoint共同的入口函數(shù),checkpoint是通過定時(shí)調(diào)度來做的,savepoint則需要人工觸發(fā)。這里頭會(huì)做一些控制檢查,沒有問題的話就會(huì)向source task發(fā)送制作快照通知。
- Execution::triggerCheckpointHelper()。通知source task對(duì)應(yīng)的節(jié)點(diǎn)做快照。
- TaskExecutor::triggerCheckpoint()。快照通知到TaskManager。
- Task::triggerCheckpointBarrier()。進(jìn)到具體task里。
- StreamTask::performCheckpoint()。新版本采用了mailBox模型來解決持鎖競(jìng)爭(zhēng)問題。這里會(huì)首先下發(fā)barrier,然后開始本地快照。
- CheckpointingOperation::executeCheckpointing()。進(jìn)行同步快照(checkpointStreamOperator方法)和異步快照(AsyncCheckpointRunnable類)。
- StreamOperator::snapshotState() → AbstractStreamOperator::snapshotState()。同步快照的制作,主要保存KeyedStateRaw、OperatorStateRaw、OperatorStateManaged、KeyedStateManaged等狀態(tài)。
- AsyncCheckpointRunnable::run()。異步快照的制作。
- CheckpointResponder::acknowledgeCheckpoint()。快照做完后匯報(bào)給主節(jié)點(diǎn)。
具體的快照制作,取決于所選statebackend,這里不再詳述,可以參考RocksDBKeyedStateBackend和HeapKeyedStateBackend。
針對(duì)非source節(jié)點(diǎn),需要上游的barrier對(duì)齊后才能觸發(fā)快照,這點(diǎn)跟source task略有不同,如下所示:
- CheckpointedInputGate::pollNext()。從輸入里頭獲取barrier。
- CheckpointBarrierAligner::processBarrier()。處理barrier,負(fù)責(zé)exactly-once快照的處理。另一個(gè)類似類CheckpointBarrierTracker則負(fù)責(zé)at-least-once快照的處理。
- CheckpointBarrierAligner::notifyCheckpoint()。如果barrier都到齊了,那么開始制作快照。
- StreamTask::triggerCheckpointOnBarrier()。進(jìn)到task里,之后的流程就如上面所述了。
如何恢復(fù)快照
Master端的分配
主要的狀態(tài)分配邏輯都在類StateAssignmentOperation里。這里先明確幾個(gè)概念:
- ExecutionJobVertex,表示一個(gè)邏輯上的執(zhí)行節(jié)點(diǎn),可能是好幾個(gè)operator通過chain連到一起的。
- Execution,對(duì)應(yīng)ExecutionJobVertex的一個(gè)并發(fā)執(zhí)行,也是由好幾個(gè)operator通過chain連到一起的。
- OperatorState,表示一個(gè)operator的所有并發(fā)的狀態(tài)。
- OperatorID,一個(gè)operator的唯一標(biāo)識(shí)。
- KeyGroupRange,表示一個(gè)subtask所負(fù)責(zé)的KG范圍。
- 狀態(tài)。包括ManagedOperatorStates、RawOperatorStates、ManagedKeyedState和RawKeyedState。
- TaskStateSnapshot,一個(gè)Execution所有operator的狀態(tài)。
這里的處理邏輯是,對(duì)ExecutionJobVertex的所有operator做狀態(tài)分配,對(duì)operator的所有subtask做狀態(tài)分配。基本流程如下:
- 檢查并發(fā)是否符合要求,主要是確保設(shè)置并發(fā)不要超過最大并發(fā)等。
- 計(jì)算每個(gè)subtask負(fù)責(zé)的KeyGroupRange,下面根據(jù)這個(gè)標(biāo)準(zhǔn)來分配KG。
- 重新分配operatorState,主要在reDistributePartitionableStates里實(shí)現(xiàn),這里頭對(duì)unionState進(jìn)行了合并,按照round-bin的方式來分配list的state。
- 重新分配keyedState,主要在reDistributeKeyedStates里實(shí)現(xiàn),這里頭會(huì)具體到subtask里,從之前的快照里找到所有屬于它的stateHandler。
- 將分配好的狀態(tài)賦值給ExecutionJobVertices。這里會(huì)以Execution為基本單位,設(shè)置它的JobManagerTaskRestore(由多個(gè)operator的狀態(tài)組成)。
Task端的恢復(fù)
當(dāng)狀態(tài)都分配好了之后,在Task端就可以進(jìn)行狀態(tài)恢復(fù)了。大概流程如下:
- TaskStateManagerImpl::prioritizedOperatorState() ,將對(duì)應(yīng)operator的狀態(tài)(OperatorSubtaskState)拿出來,最后存到PrioritizedOperatorSubtaskState里。
- StreamTaskStateInitializerImpl::streamOperatorStateContext,初始化keyedStatedBackend、operatorStateBackend、timeServiceManager等過程。
- BackendRestorerProcedure::createAndRestore(),這個(gè)是在初始化keyedStatedBackend的時(shí)候調(diào)用的,將狀態(tài)保存到了keyedStatedBackend中。
- RestoreOperation::restore(),針對(duì)歷史狀態(tài)進(jìn)行恢復(fù)。這是個(gè)抽象函數(shù),比如一個(gè)具體的實(shí)現(xiàn)是RocksDBFullRestoreOperation::restore()。
- 之后根據(jù)具體的stateHandle進(jìn)行恢復(fù)。
總結(jié)
以上是生活随笔為你收集整理的flink checkpoint 恢复_Flink断点恢复机制的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 启动资金什么意思
- 下一篇: bootstrap 空行不显示横杠_电脑