日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Apache Flink fault tolerance源码剖析(六)

發(fā)布時(shí)間:2023/12/20 编程问答 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Apache Flink fault tolerance源码剖析(六) 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

上篇文章我們分析了基于檢查點(diǎn)的用戶狀態(tài)的保存機(jī)制——狀態(tài)終端。這篇文章我們來分析barrier(中文常譯為柵欄或者屏障,為了避免引入名稱爭議,此處仍用英文表示)。檢查點(diǎn)的barrier是提供exactly once一致性保證的主要保證機(jī)制。這篇文章我們會(huì)就此展開分析。

這篇文章我們側(cè)重于核心代碼分析,原理我們?cè)谶@個(gè)系列的第一篇文章《Flink數(shù)據(jù)流的Fault Tolerance機(jī)制》

一致性保證

Flink的一致性保證也依賴于檢查點(diǎn)機(jī)制。在利用檢查點(diǎn)進(jìn)行恢復(fù)時(shí),數(shù)據(jù)流會(huì)進(jìn)行重放(replay)。對(duì)于有狀態(tài)的operation以及function,Flink定義了檢查點(diǎn)支持的兩種模式(CheckpointingMode):

  • EXACTLY_ONCE
  • AT_LEAST_ONCE

在定義該枚舉時(shí),還對(duì)這兩個(gè)枚舉值進(jìn)行了詳細(xì)的解釋:

EXACTLY_ONCE

這種模式意味著系統(tǒng)將以如下語義對(duì)operator和udf(user defined function)進(jìn)行快照:在恢復(fù)時(shí),每條記錄將在operator狀態(tài)中只被重現(xiàn)/重放一次。

例如,如果有一個(gè)用戶在一個(gè)流中應(yīng)用統(tǒng)計(jì)元素個(gè)數(shù)的函數(shù),該統(tǒng)計(jì)結(jié)果將總是跟流中的元素的真實(shí)個(gè)數(shù)一致,不管是失敗還是恢復(fù)。

需要注意的是,這并不意味著每個(gè)數(shù)據(jù)流過streaming data flow僅僅一次。它表示的是在恢復(fù)進(jìn)行時(shí),operators/functions的狀態(tài)被恢復(fù)(通過檢查點(diǎn)關(guān)聯(lián)的狀態(tài)),使得被恢復(fù)的數(shù)據(jù)流在其狀態(tài)最后一次修改之后(最新的檢查點(diǎn))被恰好獲取一次。

并且,這里的EXACTLY_ONCE模式也并不保證Flink在跟外部系統(tǒng)交互時(shí)的行為也滿足EXACTLY_ONCE的一致性保證(Flink只保證自己的operator以及function的狀態(tài))。雖然,通常要求在兩個(gè)系統(tǒng)之間都達(dá)到一致性保證,但我們可以通過實(shí)現(xiàn)連接器來達(dá)到這樣的要求(比如Apache Kafka的offset可以實(shí)現(xiàn)這個(gè)需求)。

這種模式可以支撐高吞吐,取決于數(shù)據(jù)流圖以及操作,這種模式可能會(huì)增加記錄處理的延遲,因?yàn)閛perator需要對(duì)齊他們的輸入流,來保證創(chuàng)建一個(gè)一致的快照點(diǎn)。對(duì)于沒有進(jìn)行重新分區(qū)的簡單數(shù)據(jù)流,這些延遲的增加是可以忽略不計(jì)的,而對(duì)于進(jìn)行了重新分區(qū)的簡單數(shù)據(jù)流,延遲的平均值很小,但最慢的記錄通常有一個(gè)明顯的延遲。

AT_LEAST_ONCE

這個(gè)模式意味著系統(tǒng)將以一種更簡單地方式來對(duì)operator和udf的狀態(tài)進(jìn)行快照:在失敗后進(jìn)行恢復(fù)時(shí),在operator的狀態(tài)中,一些記錄可能會(huì)被重放多次。

例如,如果有一個(gè)用戶函數(shù)用來統(tǒng)計(jì)流中的元素個(gè)數(shù),在失敗后恢復(fù)時(shí),統(tǒng)計(jì)值將等于或者大于流中元素的真實(shí)值。

這種模式對(duì)延遲產(chǎn)生的影響很小,通常應(yīng)用于接收低延遲并且容忍重復(fù)消息的場景。

barrier定義

checkpoint barriers用來在流拓?fù)渲袑?duì)齊檢查點(diǎn)。

單個(gè)數(shù)據(jù)流視角,barrier示意:

分布式多input channel視角,barrier示意圖:

該圖演示的是多barrier aligning(對(duì)齊),但只有EXACTLY_ONCE一致性時(shí)才會(huì)要求這一點(diǎn)

JobManager將指示source發(fā)射barriers。當(dāng)某個(gè)operator從其輸入中接收到一個(gè)CheckpointBarrier,它將會(huì)意識(shí)到當(dāng)前正處于前一個(gè)檢查點(diǎn)和后一個(gè)檢查點(diǎn)之間。一旦某operator從它的所有input channel中接收到checkpoint barrier。那么它將意識(shí)到該檢查點(diǎn)已經(jīng)完成了。它可以觸發(fā)operator特殊的檢查點(diǎn)行為并將該barrier廣播給下游的operator。

checkpoint barrier的ID是嚴(yán)格單調(diào)增長的。

CheckpointBarrier在Flink中被看做一個(gè)運(yùn)行時(shí)事件(繼承自RuntimeEvent類)以區(qū)分普通的數(shù)據(jù)流數(shù)據(jù)(buffer),Flink中的運(yùn)行時(shí)事件必須支持序列化并且可以在TaskManager之間互相通信。CheckpointBarrier只有兩個(gè)屬性:id以及timestamp。

barrier處理器

CheckpointBarrierHandler定義了響應(yīng)來自input channel中的barrier的處理機(jī)制,它是提供一致性保證的核心。

Flink給出了兩個(gè)實(shí)現(xiàn),分別是:元素阻塞緩存機(jī)制以及barrier跟蹤機(jī)制

兩個(gè)關(guān)鍵接口方法:

  • getNextNonBlocked :返回operator可能消費(fèi)的下一個(gè)BufferOrEvent。這個(gè)調(diào)用會(huì)導(dǎo)致阻塞直到獲取到下一個(gè)BufferOrEvent,如果流已經(jīng)完成,那么就返回null。
  • registerCheckpointEventHandler : 注冊(cè)一個(gè)事件回調(diào),用來在檢查點(diǎn)成功完成時(shí)執(zhí)行。

BarrierBuffer

BarrierBuffer用于提供EXACTLY_ONCE一致性保證,其行為是:它將以barrier阻塞輸入直到所有的輸入都接收到基于某個(gè)檢查點(diǎn)的barrier,也就是上面所說的對(duì)齊。

為了避免背壓輸入流(這可能導(dǎo)致分布式的死鎖),BarrierBuffer將從被阻塞的channel中持續(xù)地接收buffer并在內(nèi)部存儲(chǔ)它們,直到阻塞被解除。

getNextNonBlocked

getNextNonBlocked方法用于獲取待operator處理的下一條(非阻塞)的記錄。該方法以多種機(jī)制阻塞當(dāng)前調(diào)用上下文,直到獲取到下一個(gè)非阻塞的記錄。

這里理解這個(gè)非阻塞非常重要,兩種類型的記錄是所謂的非阻塞的記錄,一種是來自于上流未被標(biāo)記為blocked channel輸出的數(shù)據(jù)記錄;另一種是,從已被阻塞了的緩沖區(qū)隊(duì)列中激活了的緩沖區(qū)中提取出的數(shù)據(jù)記錄。

這里以多種機(jī)制相結(jié)合來造成對(duì)當(dāng)前調(diào)用的阻塞,直到獲取到滿足上面提及的非阻塞的記錄,多種機(jī)制分別是:

  • while(true)重復(fù)調(diào)用
  • inputGate.getNextBufferOrEvent方法本身的阻塞調(diào)用
  • 以及遞歸調(diào)用當(dāng)前方法

還需要理解這里的返回值BufferOrEvent,因?yàn)閎arrier混入在數(shù)據(jù)流中,所以獲取到的數(shù)據(jù)可能是正常的數(shù)據(jù)流Buffer,也可能是某種特殊的Event,比如這里的barrier

分析一下getNextNonBlocked方法的實(shí)現(xiàn)

public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {while (true) {// process buffered BufferOrEvents before grabbing new ones//獲得下一個(gè)待緩存的buffer或者barrier事件BufferOrEvent next;//如果當(dāng)前的緩沖區(qū)為null,則從輸入端獲得if (currentBuffered == null) {next = inputGate.getNextBufferOrEvent();}//如果緩沖區(qū)不為空,則從緩沖區(qū)中獲得數(shù)據(jù)else {next = currentBuffered.getNext();//如果獲得的數(shù)據(jù)為null,則表示緩沖區(qū)中已經(jīng)沒有更多地?cái)?shù)據(jù)了if (next == null) {//清空當(dāng)前緩沖區(qū),獲取已經(jīng)新的緩沖區(qū)并打開它completeBufferedSequence();//遞歸調(diào)用,處理下一條數(shù)據(jù)return getNextNonBlocked();}}//獲取到一條記錄,不為nullif (next != null) {//如果獲取到得記錄所在的channel已經(jīng)處于阻塞狀態(tài),則該記錄會(huì)被加入緩沖區(qū)if (isBlocked(next.getChannelIndex())) {// if the channel is blocked we, we just store the BufferOrEventbufferSpiller.add(next);}//如果該記錄是一個(gè)正常的記錄,而不是一個(gè)barrier(事件),則直接返回else if (next.isBuffer()) {return next;}//如果是一個(gè)barrierelse if (next.getEvent().getClass() == CheckpointBarrier.class) {//并且當(dāng)前流還未處于結(jié)束狀態(tài),則處理該barrierif (!endOfStream) {// process barriers only if there is a chance of the checkpoint completingprocessBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex());}}else {//如果它是一個(gè)事件,表示當(dāng)前已到達(dá)分區(qū)末尾if (next.getEvent().getClass() == EndOfPartitionEvent.class) {//以關(guān)閉的channel計(jì)數(shù)器加一numClosedChannels++;// no chance to complete this checkpoint//此時(shí)已經(jīng)沒有機(jī)會(huì)完成該檢查點(diǎn),則解除阻塞releaseBlocks();}//返回該事件return next;}}//next 為null 同時(shí)流結(jié)束標(biāo)識(shí)為falseelse if (!endOfStream) {// end of stream. we feed the data that is still buffered//置流結(jié)束標(biāo)識(shí)為trueendOfStream = true;//解除阻塞,這種情況下我們會(huì)看到,緩沖區(qū)的數(shù)據(jù)會(huì)被加入隊(duì)列,并等待處理releaseBlocks();//繼續(xù)獲取下一個(gè)待處理的記錄return getNextNonBlocked();}else {return null;}}}

processBarrier

該方法用于處理barrier,也是分析的重點(diǎn)。

//獲取接收到得barrier的ID //接收到的barrier數(shù)目 > 0 ,說明當(dāng)前正在處理某個(gè)檢查點(diǎn)的過程中 if numBarriersReceived > 0 //當(dāng)前檢查點(diǎn)的某個(gè)后續(xù)的barrierIdif barrierId == currentCheckpointId //處理barrieronBarrier(channelIndex);//barrierId > 當(dāng)前檢查點(diǎn)Idelse if barrierId > currentCheckpointId //當(dāng)前的檢查點(diǎn)已經(jīng)沒有機(jī)會(huì)完成了,則解除阻塞releaseBlocks(); //跳過當(dāng)前檢查點(diǎn),直接進(jìn)入該barrier對(duì)應(yīng)的檢查點(diǎn)currentCheckpointId = barrierId; //處理barrier onBarrier(channelIndex);else//忽略終止的檢查點(diǎn)的barrier,barrierId < currentCheckpointIdreturn; //接收到的barrier數(shù)目等于0且barrierId > currentCheckpointId else if (barrierId > currentCheckpointId) //說明這是一個(gè)新檢查點(diǎn)的初始barriercurrentCheckpointId = barrierId;onBarrier(channelIndex); //忽略之前(跳過的)檢查點(diǎn)的未處理的barrier else return;

另一段處理接收到所有barrier的邏輯:

//接收到barriers的數(shù)目 + 關(guān)閉的channel的數(shù)目 = 輸入channel的總數(shù)目 if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels)//觸發(fā)檢查點(diǎn)處理器回調(diào)事件checkpointHandler.onEvent(receivedBarrier);releaseBlocks(); //解除阻塞

onBarrier

將barrier關(guān)聯(lián)的channel標(biāo)識(shí)為阻塞狀態(tài)同時(shí)將barrier計(jì)數(shù)器加一。代碼:

private void onBarrier(int channelIndex) throws IOException {if (!blockedChannels[channelIndex]) {blockedChannels[channelIndex] = true;numBarriersReceived++;if (LOG.isDebugEnabled()) {LOG.debug("Received barrier from channel " + channelIndex);}}else {throw new IOException("Stream corrupt: Repeated barrier for same checkpoint and input stream");}}

releaseBlocks

解除所有channel的阻塞,并確保剛剛寫入的數(shù)據(jù)(buffer)被消費(fèi)。

首先是重置狀態(tài)標(biāo)識(shí):

for (int i = 0; i < blockedChannels.length; i++) {////將所有channel的阻塞標(biāo)識(shí)置為falseblockedChannels[i] = false; } ////將接收到的barrier累加值重置為0 numBarriersReceived = 0;

接下來,

//如果當(dāng)前的緩沖區(qū)中的數(shù)據(jù)為空 if (currentBuffered == null) {// common case: no more buffered data//初始化新的緩沖區(qū)讀寫器currentBuffered = bufferSpiller.rollOver();//打開緩沖區(qū)讀寫器if (currentBuffered != null) {currentBuffered.open();} } else {// uncommon case: buffered data pending// push back the pending data, if we have any// since we did not fully drain the previous sequence, we need to allocate a new buffer for this one//緩沖區(qū)中還有數(shù)據(jù),則初始化一塊新的存儲(chǔ)空間來存儲(chǔ)新的緩沖數(shù)據(jù)BufferSpiller.SpilledBufferOrEventSequence bufferedNow = bufferSpiller.rollOverWithNewBuffer();if (bufferedNow != null) {//打開新的緩沖區(qū)讀寫器bufferedNow.open();//將當(dāng)前沒有處理完的數(shù)據(jù)加入隊(duì)列中queuedBuffered.addFirst(currentBuffered);//將新開辟的緩沖區(qū)讀寫器置為新的當(dāng)前緩沖區(qū)currentBuffered = bufferedNow;} }

BarrierTracker

BarrierTracker會(huì)對(duì)各個(gè)input channel接收到的檢查點(diǎn)的barrier進(jìn)行跟蹤。一旦它觀察到某個(gè)檢查點(diǎn)的所有barrier都已經(jīng)到達(dá),它將會(huì)通知監(jiān)聽器檢查點(diǎn)已完成,以觸發(fā)相應(yīng)地回調(diào)處理。

不像BarrierBuffer,BarrierTracker不阻塞已經(jīng)發(fā)送了barrier的input channel,所以它不能提供exactly-once的一致性保證。但是它可以提供at least once的一致性保證。

這里不阻塞input channel,也就說明不采用對(duì)齊機(jī)制,因此本檢查點(diǎn)的數(shù)據(jù)會(huì)及時(shí)被處理,并且因此下一個(gè)檢查點(diǎn)的數(shù)據(jù)可能會(huì)在該檢查點(diǎn)還沒有完成時(shí)就已經(jīng)到來。所以,在恢復(fù)時(shí)只能提供AT_LEAST_ONCE保證。

getNextNonBlocked

還是來重點(diǎn)觀察getNextNonBlocked方法:

public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {while (true) {//從輸入中獲得數(shù)據(jù),該操作將導(dǎo)致阻塞,直到獲得一條記錄BufferOrEvent next = inputGate.getNextBufferOrEvent();//null表示沒有數(shù)據(jù)了if (next == null) {return null;}//這是跟BarrierBuffer的關(guān)鍵差別,只要它不是一個(gè)barrier,就直接返回//不管BufferOrEvent對(duì)應(yīng)的channel是否已處于阻塞狀態(tài),這里不存在緩存數(shù)據(jù)的做法,直接返回else if (next.isBuffer() || next.getEvent().getClass() != CheckpointBarrier.class) {return next;}else {//如果是barrier,則進(jìn)入barrier的處理邏輯processBarrier((CheckpointBarrier) next.getEvent());}}}

processBarrier

處理barrier依賴于一個(gè)內(nèi)部數(shù)據(jù)結(jié)構(gòu)CheckpointBarrierCount,該類用來對(duì)某個(gè)檢查點(diǎn)的barrier做統(tǒng)計(jì)。

private void processBarrier(CheckpointBarrier receivedBarrier) {// fast path for single channel trackers//首先判斷特殊情況:當(dāng)前operator是否只有一個(gè)input channel//如果是,那么就省略了統(tǒng)計(jì)的步驟,直接觸發(fā)barrier handler回調(diào)if (totalNumberOfInputChannels == 1) {if (checkpointHandler != null) {checkpointHandler.onEvent(receivedBarrier);}return;}// general path for multiple input channels//判斷通常狀態(tài):當(dāng)前operator存在多個(gè)input channelfinal long barrierId = receivedBarrier.getId();// find the checkpoint barrier in the queue of bending barriers//所有未完成的檢查點(diǎn)都存儲(chǔ)在一個(gè)隊(duì)列里,需要找到當(dāng)前barrier對(duì)應(yīng)的檢查點(diǎn)CheckpointBarrierCount cbc = null;int pos = 0; //對(duì)應(yīng)的檢查點(diǎn)在隊(duì)列中對(duì)應(yīng)的位置for (CheckpointBarrierCount next : pendingCheckpoints) {//如果找到則跳出循環(huán)if (next.checkpointId == barrierId) {cbc = next;break;}//沒找到位置加一pos++;}//最終找到了對(duì)應(yīng)的未完成的檢查點(diǎn)if (cbc != null) {// add one to the count to that barrier and check for completion//將barrier計(jì)數(shù)器加一int numBarriersNew = cbc.incrementBarrierCount();//如果barrier計(jì)數(shù)器等于input channel的總數(shù)if (numBarriersNew == totalNumberOfInputChannels) {// checkpoint can be triggered// first, remove this checkpoint and all all prior pending// checkpoints (which are now subsumed)//移除pos之前的所有檢查點(diǎn)(檢查點(diǎn)在隊(duì)列中得先后順序跟檢查點(diǎn)的時(shí)序是一致的)for (int i = 0; i <= pos; i++) {pendingCheckpoints.pollFirst();}// notify the listener//觸發(fā)檢查點(diǎn)處理器事件if (checkpointHandler != null) {checkpointHandler.onEvent(receivedBarrier);}}}//如果沒有找到對(duì)應(yīng)的檢查點(diǎn),則說明該barrier有可能是新檢查點(diǎn)的第一個(gè)barrierelse {// first barrier for that checkpoint ID// add it only if it is newer than the latest checkpoint.// if it is not newer than the latest checkpoint ID, then there cannot be a// successful checkpoint for that ID anyways//如果是比當(dāng)前最新的檢查點(diǎn)編號(hào)還大,則說明是新檢查點(diǎn)if (barrierId > latestPendingCheckpointID) {latestPendingCheckpointID = barrierId;//添加進(jìn)隊(duì)列到末尾pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));// make sure we do not track too many checkpoints//如果超出閾值,則移除最老的檢查點(diǎn)if (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {pendingCheckpoints.pollFirst();}}}}

小結(jié)

本篇文章剖析了Flink在fault tolerance時(shí)采用checkpoint barrier來實(shí)現(xiàn)多種一致性保證機(jī)制的核心代碼進(jìn)行了分析。


微信掃碼關(guān)注公眾號(hào):Apache_Flink


QQ掃碼關(guān)注QQ群:Apache Flink學(xué)習(xí)交流群(123414680)

總結(jié)

以上是生活随笔為你收集整理的Apache Flink fault tolerance源码剖析(六)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。