Flink的处理背压原理及问题-面试必备
點(diǎn)擊上方“朱小廝的博客”,選擇“設(shè)為星標(biāo)”
后臺(tái)回復(fù)"書",獲取
來(lái)源:r6a.cn/gtsJ
反壓機(jī)制(BackPressure)被廣泛應(yīng)用到實(shí)時(shí)流處理系統(tǒng)中,流處理系統(tǒng)需要能優(yōu)雅地處理反壓(backpressure)問題。反壓通常產(chǎn)生于這樣的場(chǎng)景:短時(shí)負(fù)載高峰導(dǎo)致系統(tǒng)接收數(shù)據(jù)的速率遠(yuǎn)高于它處理數(shù)據(jù)的速率。許多日常問題都會(huì)導(dǎo)致反壓,例如,垃圾回收停頓可能會(huì)導(dǎo)致流入的數(shù)據(jù)快速堆積,或者遇到大促或秒殺活動(dòng)導(dǎo)致流量陡增。反壓如果不能得到正確的處理,可能會(huì)導(dǎo)致資源耗盡甚至系統(tǒng)崩潰。反壓機(jī)制就是指系統(tǒng)能夠自己檢測(cè)到被阻塞的Operator,然后系統(tǒng)自適應(yīng)地降低源頭或者上游的發(fā)送速率。目前主流的流處理系統(tǒng) Apache Storm、JStorm、Spark Streaming、S4、Apache Flink、Twitter Heron都采用反壓機(jī)制解決這個(gè)問題,不過他們的實(shí)現(xiàn)各自不同。
不同的組件可以不同的速度執(zhí)行(并且每個(gè)組件中的處理速度隨時(shí)間改變)。例如,考慮一個(gè)工作流程,或由于數(shù)據(jù)傾斜或任務(wù)調(diào)度而導(dǎo)致數(shù)據(jù)被處理十分緩慢。在這種情況下,如果上游階段不減速,將導(dǎo)致緩沖區(qū)建立長(zhǎng)隊(duì)列,或?qū)е孪到y(tǒng)丟棄元組。如果元組在中途丟棄,那么效率可能會(huì)有損失,因?yàn)橐呀?jīng)為這些元組產(chǎn)生的計(jì)算被浪費(fèi)了。并且在一些流處理系統(tǒng)中比如Strom,會(huì)將這些丟失的元組重新發(fā)送,這樣會(huì)導(dǎo)致數(shù)據(jù)的一致性問題,并且還會(huì)導(dǎo)致某些Operator狀態(tài)疊加。進(jìn)而整個(gè)程序輸出結(jié)果不準(zhǔn)確。第二由于系統(tǒng)接收數(shù)據(jù)的速率是隨著時(shí)間改變的,短時(shí)負(fù)載高峰導(dǎo)致系統(tǒng)接收數(shù)據(jù)的速率遠(yuǎn)高于它處理數(shù)據(jù)的速率的情況,也會(huì)導(dǎo)致Tuple在中途丟失。所以實(shí)時(shí)流處理系統(tǒng)必須能夠解決發(fā)送速率遠(yuǎn)大于系統(tǒng)能處理速率這個(gè)問題,大多數(shù)實(shí)時(shí)流處理系統(tǒng)采用反壓(BackPressure)機(jī)制解決這個(gè)問題。下面我們就來(lái)介紹一下不同的實(shí)時(shí)流處理系統(tǒng)采用的反壓機(jī)制:
1.Strom 反壓機(jī)制
1.1 Storm 1.0 以前的反壓機(jī)制
對(duì)于開啟了acker機(jī)制的storm程序,可以通過設(shè)置conf.setMaxSpoutPending參數(shù)來(lái)實(shí)現(xiàn)反壓效果,如果下游組件(bolt)處理速度跟不上導(dǎo)致spout發(fā)送的tuple沒有及時(shí)確認(rèn)的數(shù)超過了參數(shù)設(shè)定的值,spout會(huì)停止發(fā)送數(shù)據(jù),這種方式的缺點(diǎn)是很難調(diào)優(yōu)conf.setMaxSpoutPending參數(shù)的設(shè)置以達(dá)到最好的反壓效果,設(shè)小了會(huì)導(dǎo)致吞吐上不去,設(shè)大了會(huì)導(dǎo)致worker OOM;有震蕩,數(shù)據(jù)流會(huì)處于一個(gè)顛簸狀態(tài),效果不如逐級(jí)反壓;另外對(duì)于關(guān)閉acker機(jī)制的程序無(wú)效;
1.2 Storm Automatic Backpressure
新的storm自動(dòng)反壓機(jī)制(Automatic Back Pressure)通過監(jiān)控bolt中的接收隊(duì)列的情況,當(dāng)超過高水位值時(shí)專門的線程會(huì)將反壓信息寫到 Zookeeper ,Zookeeper上的watch會(huì)通知該拓?fù)涞乃蠾orker都進(jìn)入反壓狀態(tài),最后Spout降低tuple發(fā)送的速度。
每個(gè)Executor都有一個(gè)接受隊(duì)列和發(fā)送隊(duì)列用來(lái)接收Tuple和發(fā)送Spout或者Bolt生成的Tuple元組。每個(gè)Worker進(jìn)程都有一個(gè)單的的接收線程監(jiān)聽接收端口。它從每個(gè)網(wǎng)絡(luò)上進(jìn)來(lái)的消息發(fā)送到Executor的接收隊(duì)列中。Executor接收隊(duì)列存放Worker或者Worker內(nèi)部其他Executor發(fā)過來(lái)的消息。Executor工作線程從接收隊(duì)列中拿出數(shù)據(jù),然后調(diào)用execute方法,發(fā)送Tuple到Executor的發(fā)送隊(duì)列。Executor的發(fā)送線程從發(fā)送隊(duì)列中獲取消息,按照消息目的地址選擇發(fā)送到Worker的傳輸隊(duì)列中或者其他Executor的接收隊(duì)列中。最后Worker的發(fā)送線程從傳輸隊(duì)列中讀取消息,然后將Tuple元組發(fā)送到網(wǎng)絡(luò)中。
1. 當(dāng)Worker進(jìn)程中的Executor線程發(fā)現(xiàn)自己的接收隊(duì)列滿了時(shí),也就是接收隊(duì)列達(dá)到high watermark的閾值后,因此它會(huì)發(fā)送通知消息到背壓線程。
2. 背壓線程將當(dāng)前worker進(jìn)程的信息注冊(cè)到Zookeeper的Znode節(jié)點(diǎn)中。具體路徑就是 /Backpressure/topo1/wk1下
3. Zookeepre的Znode Watcher監(jiān)視/Backpreesure/topo1下的節(jié)點(diǎn)目錄變化情況,如果發(fā)現(xiàn)目錄增加了znode節(jié)點(diǎn)說(shuō)明或者其他變化。這就說(shuō)明該Topo1需要反壓控制,然后它會(huì)通知Topo1所有的Worker進(jìn)入反壓狀態(tài)。
4.最終Spout降低tuple發(fā)送的速度。
2. JStorm 反壓機(jī)制
Jstorm做了兩級(jí)的反壓,第一級(jí)和Jstorm類似,通過執(zhí)行隊(duì)列來(lái)監(jiān)測(cè),但是不會(huì)通過ZK來(lái)協(xié)調(diào),而是通過Topology Master來(lái)協(xié)調(diào)。在隊(duì)列中會(huì)標(biāo)記high water mark和low water mark,當(dāng)執(zhí)行隊(duì)列超過high water mark時(shí),就認(rèn)為bolt來(lái)不及處理,則向TM發(fā)一條控制消息,上游開始減慢發(fā)送速率,直到下游低于low water mark時(shí)解除反壓。
此外,在Netty層也做了一級(jí)反壓,由于每個(gè)Worker Task都有自己的發(fā)送和接收的緩沖區(qū),可以對(duì)緩沖區(qū)設(shè)定限額、控制大小,如果spout數(shù)據(jù)量特別大,緩沖區(qū)填滿會(huì)導(dǎo)致下游bolt的接收緩沖區(qū)填滿,造成了反壓。
限流機(jī)制:jstorm的限流機(jī)制, 當(dāng)下游bolt發(fā)生阻塞時(shí), 并且阻塞task的比例超過某個(gè)比例時(shí)(現(xiàn)在默認(rèn)設(shè)置為0.1),觸發(fā)反壓
限流方式:計(jì)算阻塞Task的地方執(zhí)行線程執(zhí)行時(shí)間,Spout每發(fā)送一個(gè)tuple等待相應(yīng)時(shí)間,然后講這個(gè)時(shí)間發(fā)送給Spout, 于是, spout每發(fā)送一個(gè)tuple,就會(huì)等待這個(gè)執(zhí)行時(shí)間。
Task阻塞判斷方式:在jstorm 連續(xù)4次采樣周期中采樣,隊(duì)列情況,當(dāng)隊(duì)列超過80%(可以設(shè)置)時(shí),即可認(rèn)為該task處在阻塞狀態(tài)。
3. SparkStreaming 反壓機(jī)制
3.1 為什么引入反壓機(jī)制Backpressure
默認(rèn)情況下,Spark Streaming通過Receiver以生產(chǎn)者生產(chǎn)數(shù)據(jù)的速率接收數(shù)據(jù),計(jì)算過程中會(huì)出現(xiàn)batch processing time > batch interval的情況,其中batch processing time 為實(shí)際計(jì)算一個(gè)批次花費(fèi)時(shí)間, batch interval為Streaming應(yīng)用設(shè)置的批處理間隔。這意味著Spark Streaming的數(shù)據(jù)接收速率高于Spark從隊(duì)列中移除數(shù)據(jù)的速率,也就是數(shù)據(jù)處理能力低,在設(shè)置間隔內(nèi)不能完全處理當(dāng)前接收速率接收的數(shù)據(jù)。如果這種情況持續(xù)過長(zhǎng)的時(shí)間,會(huì)造成數(shù)據(jù)在內(nèi)存中堆積,導(dǎo)致Receiver所在Executor內(nèi)存溢出等問題(如果設(shè)置StorageLevel包含disk, 則內(nèi)存存放不下的數(shù)據(jù)會(huì)溢寫至disk, 加大延遲)。Spark 1.5以前版本,用戶如果要限制Receiver的數(shù)據(jù)接收速率,可以通過設(shè)置靜態(tài)配制參數(shù)“spark.streaming.receiver.maxRate”的值來(lái)實(shí)現(xiàn),此舉雖然可以通過限制接收速率,來(lái)適配當(dāng)前的處理能力,防止內(nèi)存溢出,但也會(huì)引入其它問題。比如:producer數(shù)據(jù)生產(chǎn)高于maxRate,當(dāng)前集群處理能力也高于maxRate,這就會(huì)造成資源利用率下降等問題。為了更好的協(xié)調(diào)數(shù)據(jù)接收速率與資源處理能力,Spark Streaming 從v1.5開始引入反壓機(jī)制(back-pressure),通過動(dòng)態(tài)控制數(shù)據(jù)接收速率來(lái)適配集群數(shù)據(jù)處理能力。
3.2 反壓機(jī)制Backpressure
Spark Streaming Backpressure: 根據(jù)JobScheduler反饋?zhàn)鳂I(yè)的執(zhí)行信息來(lái)動(dòng)態(tài)調(diào)整Receiver數(shù)據(jù)接收率。通過屬性“spark.streaming.backpressure.enabled”來(lái)控制是否啟用backpressure機(jī)制,默認(rèn)值false,即不啟用。
SparkStreaming 架構(gòu)圖如下所示:
SparkStreaming 反壓過程執(zhí)行如下圖所示:
在原架構(gòu)的基礎(chǔ)上加上一個(gè)新的組件RateController,這個(gè)組件負(fù)責(zé)監(jiān)聽“OnBatchCompleted”事件,然后從中抽取processingDelay 及schedulingDelay信息. Estimator依據(jù)這些信息估算出最大處理速度(rate),最后由基于Receiver的Input Stream將rate通過ReceiverTracker與ReceiverSupervisorImpl轉(zhuǎn)發(fā)給BlockGenerator(繼承自RateLimiter).
4. Heron 反壓機(jī)制
當(dāng)下游處理速度跟不上上游發(fā)送速度時(shí),一旦StreamManager 發(fā)現(xiàn)一個(gè)或多個(gè)Heron Instance 速度變慢,立刻對(duì)本地spout進(jìn)行降級(jí),降低本地Spout發(fā)送速度, 停止從這些spout讀取數(shù)據(jù)。并且受影響的StreamManager 會(huì)發(fā)送一個(gè)特殊的start backpressure message 給其他的StreamManager ,要求他們對(duì)spout進(jìn)行本地降級(jí)。當(dāng)其他StreamManager 接收到這個(gè)特殊消息時(shí),他們通過不讀取當(dāng)?shù)豐pout中的Tuple來(lái)進(jìn)行降級(jí)。一旦出問題的Heron Instance 恢復(fù)速度后,本地的SM 會(huì)發(fā)送stop backpressure message 解除降級(jí)。
很多Socket Channel與應(yīng)用程序級(jí)別的Buffer相關(guān)聯(lián),該緩沖區(qū)由high watermark 和low watermark組成。當(dāng)緩沖區(qū)大小達(dá)到high watermark時(shí)觸發(fā)反壓,并保持有效,直到緩沖區(qū)大小低于low watermark。此設(shè)計(jì)的基本原理是防止拓?fù)湓谶M(jìn)入和退出背壓緩解模式之間快速振蕩。
5. Flink 反壓機(jī)制
Flink 沒有使用任何復(fù)雜的機(jī)制來(lái)解決反壓?jiǎn)栴},因?yàn)楦静恍枰菢拥姆桨?#xff01;它利用自身作為純數(shù)據(jù)流引擎的優(yōu)勢(shì)來(lái)優(yōu)雅地響應(yīng)反壓?jiǎn)栴}。下面我們會(huì)深入分析 Flink 是如何在 Task 之間傳輸數(shù)據(jù)的,以及數(shù)據(jù)流如何實(shí)現(xiàn)自然降速的。
Flink 在運(yùn)行時(shí)主要由operators和streams兩大組件構(gòu)成。每個(gè) operator 會(huì)消費(fèi)中間態(tài)的流,并在流上進(jìn)行轉(zhuǎn)換,然后生成新的流。對(duì)于 Flink 的網(wǎng)絡(luò)機(jī)制一種形象的類比是,Flink 使用了高效有界的分布式阻塞隊(duì)列,就像 Java 通用的阻塞隊(duì)列(BlockingQueue)一樣。還記得經(jīng)典的線程間通信案例:生產(chǎn)者消費(fèi)者模型嗎?使用 BlockingQueue 的話,一個(gè)較慢的接受者會(huì)降低發(fā)送者的發(fā)送速率,因?yàn)橐坏╆?duì)列滿了(有界隊(duì)列)發(fā)送者會(huì)被阻塞。Flink 解決反壓的方案就是這種感覺。
在 Flink 中,這些分布式阻塞隊(duì)列就是這些邏輯流,而隊(duì)列容量是通過緩沖池來(lái)(LocalBufferPool)實(shí)現(xiàn)的。每個(gè)被生產(chǎn)和被消費(fèi)的流都會(huì)被分配一個(gè)緩沖池。緩沖池管理著一組緩沖(Buffer),緩沖在被消費(fèi)后可以被回收循環(huán)利用。這很好理解:你從池子中拿走一個(gè)緩沖,填上數(shù)據(jù),在數(shù)據(jù)消費(fèi)完之后,又把緩沖還給池子,之后你可以再次使用它。
5.1 Flink 網(wǎng)絡(luò)傳輸中的內(nèi)存管理
如下圖所示展示了 Flink 在網(wǎng)絡(luò)傳輸場(chǎng)景下的內(nèi)存管理。網(wǎng)絡(luò)上傳輸?shù)臄?shù)據(jù)會(huì)寫到 Task 的 InputGate(IG) 中,經(jīng)過 Task 的處理后,再由 Task 寫到 ResultPartition(RS) 中。每個(gè) Task 都包括了輸入和輸入,輸入和輸出的數(shù)據(jù)存在 Buffer 中(都是字節(jié)數(shù)據(jù))。Buffer 是 MemorySegment 的包裝類。
TaskManager(TM)在啟動(dòng)時(shí),會(huì)先初始化NetworkEnvironment對(duì)象,TM 中所有與網(wǎng)絡(luò)相關(guān)的東西都由該類來(lái)管理(如 Netty 連接),其中就包括NetworkBufferPool。根據(jù)配置,Flink 會(huì)在 NetworkBufferPool 中生成一定數(shù)量(默認(rèn)2048個(gè))的內(nèi)存塊 MemorySegment(關(guān)于 Flink 的內(nèi)存管理,后續(xù)文章會(huì)詳細(xì)談到),內(nèi)存塊的總數(shù)量就代表了網(wǎng)絡(luò)傳輸中所有可用的內(nèi)存。NetworkEnvironment 和 NetworkBufferPool 是 Task 之間共享的,每個(gè) TM 只會(huì)實(shí)例化一個(gè)。
Task 線程啟動(dòng)時(shí),會(huì)向 NetworkEnvironment 注冊(cè),NetworkEnvironment 會(huì)為 Task 的 InputGate(IG)和 ResultPartition(RP) 分別創(chuàng)建一個(gè) LocalBufferPool(緩沖池)并設(shè)置可申請(qǐng)的 MemorySegment(內(nèi)存塊)數(shù)量。IG 對(duì)應(yīng)的緩沖池初始的內(nèi)存塊數(shù)量與 IG 中 InputChannel 數(shù)量一致,RP 對(duì)應(yīng)的緩沖池初始的內(nèi)存塊數(shù)量與 RP 中的 ResultSubpartition 數(shù)量一致。不過,每當(dāng)創(chuàng)建或銷毀緩沖池時(shí),NetworkBufferPool 會(huì)計(jì)算剩余空閑的內(nèi)存塊數(shù)量,并平均分配給已創(chuàng)建的緩沖池。注意,這個(gè)過程只是指定了緩沖池所能使用的內(nèi)存塊數(shù)量,并沒有真正分配內(nèi)存塊,只有當(dāng)需要時(shí)才分配。為什么要?jiǎng)討B(tài)地為緩沖池?cái)U(kuò)容呢?因?yàn)閮?nèi)存越多,意味著系統(tǒng)可以更輕松地應(yīng)對(duì)瞬時(shí)壓力(如GC),不會(huì)頻繁地進(jìn)入反壓狀態(tài),所以我們要利用起那部分閑置的內(nèi)存塊。
在 Task 線程執(zhí)行過程中,當(dāng) Netty 接收端收到數(shù)據(jù)時(shí),為了將 Netty 中的數(shù)據(jù)拷貝到 Task 中,InputChannel(實(shí)際是 RemoteInputChannel)會(huì)向其對(duì)應(yīng)的緩沖池申請(qǐng)內(nèi)存塊(上圖中的①)。如果緩沖池中也沒有可用的內(nèi)存塊且已申請(qǐng)的數(shù)量還沒到池子上限,則會(huì)向 NetworkBufferPool 申請(qǐng)內(nèi)存塊(上圖中的②)并交給 InputChannel 填上數(shù)據(jù)(上圖中的③和④)。如果緩沖池已申請(qǐng)的數(shù)量達(dá)到上限了呢?或者 NetworkBufferPool 也沒有可用內(nèi)存塊了呢?這時(shí)候,Task 的 Netty Channel 會(huì)暫停讀取,上游的發(fā)送端會(huì)立即響應(yīng)停止發(fā)送,拓?fù)鋾?huì)進(jìn)入反壓狀態(tài)。當(dāng) Task 線程寫數(shù)據(jù)到 ResultPartition 時(shí),也會(huì)向緩沖池請(qǐng)求內(nèi)存塊,如果沒有可用內(nèi)存塊時(shí),會(huì)阻塞在請(qǐng)求內(nèi)存塊的地方,達(dá)到暫停寫入的目的。
當(dāng)一個(gè)內(nèi)存塊被消費(fèi)完成之后(在輸入端是指內(nèi)存塊中的字節(jié)被反序列化成對(duì)象了,在輸出端是指內(nèi)存塊中的字節(jié)寫入到 Netty Channel 了),會(huì)調(diào)用 Buffer.recycle() 方法,會(huì)將內(nèi)存塊還給 LocalBufferPool (上圖中的⑤)。如果LocalBufferPool中當(dāng)前申請(qǐng)的數(shù)量超過了池子容量(由于上文提到的動(dòng)態(tài)容量,由于新注冊(cè)的 Task 導(dǎo)致該池子容量變小),則LocalBufferPool會(huì)將該內(nèi)存塊回收給 NetworkBufferPool(上圖中的⑥)。如果沒超過池子容量,則會(huì)繼續(xù)留在池子中,減少反復(fù)申請(qǐng)的開銷。
5.2 Flink 反壓機(jī)制
下面這張圖簡(jiǎn)單展示了兩個(gè) Task 之間的數(shù)據(jù)傳輸以及 Flink 如何感知到反壓的:
記錄“A”進(jìn)入了 Flink 并且被 Task 1 處理。(這里省略了 Netty 接收、反序列化等過程)
記錄被序列化到 buffer 中。
該 buffer 被發(fā)送到 Task 2,然后 Task 2 從這個(gè) buffer 中讀出記錄。
不要忘了:記錄能被 Flink 處理的前提是,必須有空閑可用的 Buffer。
結(jié)合上面兩張圖看:Task 1 在輸出端有一個(gè)相關(guān)聯(lián)的 LocalBufferPool(稱緩沖池1),Task 2 在輸入端也有一個(gè)相關(guān)聯(lián)的 LocalBufferPool(稱緩沖池2)。如果緩沖池1中有空閑可用的 buffer 來(lái)序列化記錄 “A”,我們就序列化并發(fā)送該 buffer。
這里我們需要注意兩個(gè)場(chǎng)景:
本地傳輸:如果 Task 1 和 Task 2 運(yùn)行在同一個(gè) worker 節(jié)點(diǎn)(TaskManager),該 buffer 可以直接交給下一個(gè) Task。一旦 Task 2 消費(fèi)了該 buffer,則該 buffer 會(huì)被緩沖池1回收。如果 Task 2 的速度比 1 慢,那么 buffer 回收的速度就會(huì)趕不上 Task 1 取 buffer 的速度,導(dǎo)致緩沖池1無(wú)可用的 buffer,Task 1 等待在可用的 buffer 上。最終形成 Task 1 的降速。
遠(yuǎn)程傳輸:如果 Task 1 和 Task 2 運(yùn)行在不同的 worker 節(jié)點(diǎn)上,那么 buffer 會(huì)在發(fā)送到網(wǎng)絡(luò)(TCP Channel)后被回收。在接收端,會(huì)從 LocalBufferPool 中申請(qǐng) buffer,然后拷貝網(wǎng)絡(luò)中的數(shù)據(jù)到 buffer 中。如果沒有可用的 buffer,會(huì)停止從 TCP 連接中讀取數(shù)據(jù)。在輸出端,通過 Netty 的水位值機(jī)制來(lái)保證不往網(wǎng)絡(luò)中寫入太多數(shù)據(jù)(后面會(huì)說(shuō))。如果網(wǎng)絡(luò)中的數(shù)據(jù)(Netty輸出緩沖中的字節(jié)數(shù))超過了高水位值,我們會(huì)等到其降到低水位值以下才繼續(xù)寫入數(shù)據(jù)。這保證了網(wǎng)絡(luò)中不會(huì)有太多的數(shù)據(jù)。如果接收端停止消費(fèi)網(wǎng)絡(luò)中的數(shù)據(jù)(由于接收端緩沖池沒有可用 buffer),網(wǎng)絡(luò)中的緩沖數(shù)據(jù)就會(huì)堆積,那么發(fā)送端也會(huì)暫停發(fā)送。另外,這會(huì)使得發(fā)送端的緩沖池得不到回收,writer 阻塞在向 LocalBufferPool 請(qǐng)求 buffer,阻塞了 writer 往 ResultSubPartition 寫數(shù)據(jù)。
這種固定大小緩沖池就像阻塞隊(duì)列一樣,保證了 Flink 有一套健壯的反壓機(jī)制,使得 Task 生產(chǎn)數(shù)據(jù)的速度不會(huì)快于消費(fèi)的速度。我們上面描述的這個(gè)方案可以從兩個(gè) Task 之間的數(shù)據(jù)傳輸自然地?cái)U(kuò)展到更復(fù)雜的 pipeline 中,保證反壓機(jī)制可以擴(kuò)散到整個(gè) pipeline。
5.3 反壓實(shí)驗(yàn)
另外,官方博客中為了展示反壓的效果,給出了一個(gè)簡(jiǎn)單的實(shí)驗(yàn)。下面這張圖顯示了:隨著時(shí)間的改變,生產(chǎn)者(黃色線)和消費(fèi)者(綠色線)每5秒的平均吞吐與最大吞吐(在單一JVM中每秒達(dá)到8百萬(wàn)條記錄)的百分比。我們通過衡量task每5秒鐘處理的記錄數(shù)來(lái)衡量平均吞吐。該實(shí)驗(yàn)運(yùn)行在單 JVM 中,不過使用了完整的 Flink 功能棧。
首先,我們運(yùn)行生產(chǎn)task到它最大生產(chǎn)速度的60%(我們通過Thread.sleep()來(lái)模擬降速)。消費(fèi)者以同樣的速度處理數(shù)據(jù)。然后,我們將消費(fèi)task的速度降至其最高速度的30%。你就會(huì)看到背壓?jiǎn)栴}產(chǎn)生了,正如我們所見,生產(chǎn)者的速度也自然降至其最高速度的30%。接著,停止消費(fèi)task的人為降速,之后生產(chǎn)者和消費(fèi)者task都達(dá)到了其最大的吞吐。接下來(lái),我們?cè)俅螌⑾M(fèi)者的速度降至30%,pipeline給出了立即響應(yīng):生產(chǎn)者的速度也被自動(dòng)降至30%。最后,我們?cè)俅瓮V瓜匏?#xff0c;兩個(gè)task也再次恢復(fù)100%的速度。總而言之,我們可以看到:生產(chǎn)者和消費(fèi)者在 pipeline 中的處理都在跟隨彼此的吞吐而進(jìn)行適當(dāng)?shù)恼{(diào)整,這就是我們希望看到的反壓的效果。
5.4 Flink 反壓監(jiān)控
在 Storm/JStorm 中,只要監(jiān)控到隊(duì)列滿了,就可以記錄下拓?fù)溥M(jìn)入反壓了。但是 Flink 的反壓太過于天然了,導(dǎo)致我們無(wú)法簡(jiǎn)單地通過監(jiān)控隊(duì)列來(lái)監(jiān)控反壓狀態(tài)。Flink 在這里使用了一個(gè) trick 來(lái)實(shí)現(xiàn)對(duì)反壓的監(jiān)控。如果一個(gè) Task 因?yàn)榉磯憾邓倭?#xff0c;那么它會(huì)卡在向?LocalBufferPool?申請(qǐng)內(nèi)存塊上。那么這時(shí)候,該 Task 的 stack trace 就會(huì)長(zhǎng)下面這樣:
java.lang.Object.wait(Native Method) o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163) o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) <--- BLOCKING request [...]那么事情就簡(jiǎn)單了。通過不斷地采樣每個(gè) task 的 stack trace 就可以實(shí)現(xiàn)反壓監(jiān)控。
Flink 的實(shí)現(xiàn)中,只有當(dāng) Web 頁(yè)面切換到某個(gè) Job 的 Backpressure 頁(yè)面,才會(huì)對(duì)這個(gè) Job 觸發(fā)反壓檢測(cè),因?yàn)榉磯簷z測(cè)還是挺昂貴的。JobManager 會(huì)通過 Akka 給每個(gè) TaskManager 發(fā)送TriggerStackTraceSample消息。默認(rèn)情況下,TaskManager 會(huì)觸發(fā)100次 stack trace 采樣,每次間隔 50ms(也就是說(shuō)一次反壓檢測(cè)至少要等待5秒鐘)。并將這 100 次采樣的結(jié)果返回給 JobManager,由 JobManager 來(lái)計(jì)算反壓比率(反壓出現(xiàn)的次數(shù)/采樣的次數(shù)),最終展現(xiàn)在 UI 上。UI 刷新的默認(rèn)周期是一分鐘,目的是不對(duì) TaskManager 造成太大的負(fù)擔(dān)。
總結(jié)
Flink 不需要一種特殊的機(jī)制來(lái)處理反壓,因?yàn)?Flink 中的數(shù)據(jù)傳輸相當(dāng)于已經(jīng)提供了應(yīng)對(duì)反壓的機(jī)制。因此,Flink 所能獲得的最大吞吐量由其 pipeline 中最慢的組件決定。相對(duì)于 Storm/JStorm 的實(shí)現(xiàn),Flink 的實(shí)現(xiàn)更為簡(jiǎn)潔優(yōu)雅,源碼中也看不見與反壓相關(guān)的代碼,無(wú)需 Zookeeper/TopologyMaster 的參與也降低了系統(tǒng)的負(fù)載,也利于對(duì)反壓更迅速的響應(yīng)。
想知道更多?掃描下面的二維碼關(guān)注我后臺(tái)回復(fù)"技術(shù)",加入技術(shù)群【精彩推薦】超清晰的DNS入門指南
如何用ELK搭建TB級(jí)的日志系統(tǒng)
深度好文:Linux系統(tǒng)內(nèi)存知識(shí)
日志采集系統(tǒng)都用到哪些技術(shù)?
面試官:為什么HashMap的加載因子是0.75?
原創(chuàng)|OpenAPI標(biāo)準(zhǔn)規(guī)范
如此簡(jiǎn)單| ES最全詳細(xì)使用教程
ClickHouse到底是什么?為什么如此牛逼!
原來(lái)ElasticSearch還可以這么理解
點(diǎn)個(gè)贊+在看,少個(gè) bug?????
總結(jié)
以上是生活随笔為你收集整理的Flink的处理背压原理及问题-面试必备的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 睡前必读 | 如何系统性地学习分布式系统
- 下一篇: 面试官:GET和POST两种基本请求方法