日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

flink checkpoint 恢复_Flink断点恢复机制

發布時間:2025/4/5 50 豆豆
生活随笔 收集整理的這篇文章主要介紹了 flink checkpoint 恢复_Flink断点恢复机制 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

作為流式計算,Flink通過checkpoint機制和kafka的可回溯性來保證作業在failover時不丟失狀態。

作為生產環境的flink,我們期待做到快速failover、彈性擴縮容和平滑遷移,盡量做到用戶無感知和變更方便,從而讓用戶將更多精力放在功能實現上。

本文先介紹checkpoint機制,接著介紹flink如何rescale,最后再介紹下代碼流程。

注:下文中統一以task表示算子的一個并發,比如并發度為3的map算子包含3個并行的task,下文中出現的subtask語義上等同于task。

Checkpoint機制

flink快照從源頭開始觸發(記錄消費的offset),通過barrier來標記本次快照,如圖1的offset。

barrier流過的地方都會將state保存到共享存儲中,如圖2中的sum。

當barrier流到Sink時,所有算子都完成快照,本次作業快照也就完成,如圖3(Sink算子無狀態)。

一旦作業失敗重啟,會將state都恢復到各個算子中,同時從記錄的offset開始消費,確保從上一次快照的地方恢復作業。我們可以看到,Task中的狀態為21(1+2+3+4+5+6),下一次累加的數據為7,保證了flink內部狀態的一致性。

以上為最簡單的情況,實際情況可能包含多個快照、多個算子、迭代等復雜場景。

當多個快照時,flink通過barrier將數據分段,每個barrier都標記著一個checkpointID,如下圖所示:

當一個task有多個輸入時,必須等待上游所有的barrier都到達后,才能做快照。如果一個上游的barrier已經達到,想要做到exactly-once,需要先把之后到達的數據緩存下來,等做完快照再處理。

具體的實現原理可以參考論文:State Management in Apache Flink

Rescale原理

當作業的并發度改變時,flink會重新分配狀態。這里采取的partition策略是固定總partition個數,當task并發改變時,重新計算并將partition分配到每個task上。除了這種partition策略,還存在根據partition大小自動合并和拆分的策略,比如Hbase所使用的。

在flink中,一個key group就是一個partition。之所以選擇以key group為基本單位來操作狀態,是為了減少磁盤訪問IO和隨機讀寫(如果以key為單位就會出現這種情況,比如恢復時每個task都需要讀取全部的state來決定每個key是否屬于自己)。

flink中有兩種狀態,包括operator state和key state。operator state是以task為單位的,一般采用list的形式存儲,當重新rescale時每個task可以選擇接受全部的operator state或者按照list平分。key state時以key為單位的,必須在keyby時候才能使用這種狀態。

下面以kafka offset作為operator state作為介紹。每個task都會以list的形式記錄自己負責的<partitionId, offset>,當做快照的時候,將狀態保存在共享存儲,所有task的list state會拼接成一個大的list。當重新rescale的時候,flink將list中的元素平分給每個task。(實際的flink kafka consumer是通過union方式獲取所有list,然后再選擇屬于自己的)

下面簡單介紹下key state的rescale。key group的個數等于作業的最大并發(一旦設置不可改變,即key group的個數必須大于等于task的并發度),每個key通過hash映射屬于其中一個key group。比如下圖,共有10個KG,KG-1包含1-11的key。

當作業rescale的時候,會將list形式的KG平分到每個task。

上圖中最下面給出了key->KG→task的映射過程:

  • 計算key的哈希值。
  • 根據哈希值和最大并發確定key所屬KG。
  • 根據key所屬KG來確定發到下游哪個并發的task。

對應KeyGroupRangeAssignment代碼如下:

public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {Preconditions.checkNotNull(key, "Assigned key must not be null!");return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));}public static int assignToKeyGroup(Object key, int maxParallelism) {Preconditions.checkNotNull(key, "Assigned key must not be null!");return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);}public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {return keyGroupId * parallelism / maxParallelism;}

上游的計算在KeyGroupStreamPartitioner類里,下游的計算在KeyGroupPartitioner類里。

代碼流程

這里的代碼以flink-1.10.0作為參考。

關鍵類

CheckpointCoordinator

這個是位于Master節點的快照控制中心,負責定期的觸發checkpoint和手動觸發savepoint,維護在做和已完成的快照。

StateAssignmentOperation

這個是位于Master節點的作業恢復時負責rescale的類,主要是根據新作業的并發重新分配狀態。針對operator state,主要采用broadcast的方式使得每個task都能接觸算子全部的狀態;針對key state,采用均分KG的方法來重新劃分state的歸屬。

TaskStateManagerImpl

具體Task的狀態管理中心,包括和JobMaster做checkpoint的交互,管理本地狀態。

StreamTaskStateInitializerImpl

具體task的狀態恢復,這里也是各個statebackend開始創建的地方。

RocksDBKeyedStateBackend

具體task通過rocksdb做key state的地方。使用這種backend,每個狀態是一個cf,主鍵的組織形式為<KG, key, namespace>。支持增量快照和全量快照。

HeapKeyedStateBackend

具體task通過rocksdb做key state的地方。使用這種backend,底層通過使用CopyOnWriteStateMap來存儲,主鍵的組織形式為<NS, K, SV>。相比rocksdb,內存的存取速度都非常快,但是狀態大小受制于內存。

如何制作快照

  • CheckpointCoordinator::triggerCheckpoint()。這個是checkpoint和savepoint共同的入口函數,checkpoint是通過定時調度來做的,savepoint則需要人工觸發。這里頭會做一些控制檢查,沒有問題的話就會向source task發送制作快照通知。
  • Execution::triggerCheckpointHelper()。通知source task對應的節點做快照。
  • TaskExecutor::triggerCheckpoint()。快照通知到TaskManager。
  • Task::triggerCheckpointBarrier()。進到具體task里。
  • StreamTask::performCheckpoint()。新版本采用了mailBox模型來解決持鎖競爭問題。這里會首先下發barrier,然后開始本地快照。
  • CheckpointingOperation::executeCheckpointing()。進行同步快照(checkpointStreamOperator方法)和異步快照(AsyncCheckpointRunnable類)。
  • StreamOperator::snapshotState() → AbstractStreamOperator::snapshotState()。同步快照的制作,主要保存KeyedStateRaw、OperatorStateRaw、OperatorStateManaged、KeyedStateManaged等狀態。
  • AsyncCheckpointRunnable::run()。異步快照的制作。
  • CheckpointResponder::acknowledgeCheckpoint()。快照做完后匯報給主節點。

具體的快照制作,取決于所選statebackend,這里不再詳述,可以參考RocksDBKeyedStateBackend和HeapKeyedStateBackend。

針對非source節點,需要上游的barrier對齊后才能觸發快照,這點跟source task略有不同,如下所示:

  • CheckpointedInputGate::pollNext()。從輸入里頭獲取barrier。
  • CheckpointBarrierAligner::processBarrier()。處理barrier,負責exactly-once快照的處理。另一個類似類CheckpointBarrierTracker則負責at-least-once快照的處理。
  • CheckpointBarrierAligner::notifyCheckpoint()。如果barrier都到齊了,那么開始制作快照。
  • StreamTask::triggerCheckpointOnBarrier()。進到task里,之后的流程就如上面所述了。

如何恢復快照

Master端的分配

主要的狀態分配邏輯都在類StateAssignmentOperation里。這里先明確幾個概念:

  • ExecutionJobVertex,表示一個邏輯上的執行節點,可能是好幾個operator通過chain連到一起的。
  • Execution,對應ExecutionJobVertex的一個并發執行,也是由好幾個operator通過chain連到一起的。
  • OperatorState,表示一個operator的所有并發的狀態。
  • OperatorID,一個operator的唯一標識。
  • KeyGroupRange,表示一個subtask所負責的KG范圍。
  • 狀態。包括ManagedOperatorStates、RawOperatorStates、ManagedKeyedState和RawKeyedState。
  • TaskStateSnapshot,一個Execution所有operator的狀態。

這里的處理邏輯是,對ExecutionJobVertex的所有operator做狀態分配,對operator的所有subtask做狀態分配。基本流程如下:

  • 檢查并發是否符合要求,主要是確保設置并發不要超過最大并發等。
  • 計算每個subtask負責的KeyGroupRange,下面根據這個標準來分配KG。
  • 重新分配operatorState,主要在reDistributePartitionableStates里實現,這里頭對unionState進行了合并,按照round-bin的方式來分配list的state。
  • 重新分配keyedState,主要在reDistributeKeyedStates里實現,這里頭會具體到subtask里,從之前的快照里找到所有屬于它的stateHandler。
  • 將分配好的狀態賦值給ExecutionJobVertices。這里會以Execution為基本單位,設置它的JobManagerTaskRestore(由多個operator的狀態組成)。

Task端的恢復

當狀態都分配好了之后,在Task端就可以進行狀態恢復了。大概流程如下:

  • TaskStateManagerImpl::prioritizedOperatorState() ,將對應operator的狀態(OperatorSubtaskState)拿出來,最后存到PrioritizedOperatorSubtaskState里。
  • StreamTaskStateInitializerImpl::streamOperatorStateContext,初始化keyedStatedBackend、operatorStateBackend、timeServiceManager等過程。
  • BackendRestorerProcedure::createAndRestore(),這個是在初始化keyedStatedBackend的時候調用的,將狀態保存到了keyedStatedBackend中。
  • RestoreOperation::restore(),針對歷史狀態進行恢復。這是個抽象函數,比如一個具體的實現是RocksDBFullRestoreOperation::restore()。
  • 之后根據具體的stateHandle進行恢復。

總結

以上是生活随笔為你收集整理的flink checkpoint 恢复_Flink断点恢复机制的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。