日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flink面试题

發(fā)布時(shí)間:2024/1/8 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink面试题 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一 基礎(chǔ)篇

Flink的執(zhí)行圖有哪幾種?分別有什么作用

Flink中的執(zhí)行圖一般是可以分為四類,按照生成順序分別為:StreamGraph-> JobGraph-> ExecutionGraph->物理執(zhí)行圖。

1)StreamGraph

顧名思義,這里代表的是我們編寫的流程序圖。通過Stream API生成,這是執(zhí)行圖的最原始拓?fù)鋽?shù)據(jù)結(jié)構(gòu)。

2)JobGraph

StreamGraph在Client中經(jīng)過算子chain鏈合并等優(yōu)化,轉(zhuǎn)換為JobGraph拓?fù)鋱D,隨后被提交到JobManager中。

3)ExecutionGraph

JobManager中將JobGraph進(jìn)一步轉(zhuǎn)換為ExecutionGraph,此時(shí)ExecutuonGraph根據(jù)算子配置的并行度轉(zhuǎn)變?yōu)椴⑿谢腉raph拓?fù)浣Y(jié)構(gòu)。

4)物理執(zhí)行圖

比較偏物理執(zhí)行概念,即JobManager進(jìn)行Job調(diào)度,TaskManager最終部署Task的圖結(jié)構(gòu)。


Flink的窗口機(jī)制

在流處理應(yīng)用中,數(shù)據(jù)是連續(xù)不斷的,因此我們不可能等到所有數(shù)據(jù)都到了才開始處理。當(dāng)然我們可以每來一個消息就處理一次,但是有時(shí)我們需要做一些聚合類的處理,例如:在過去的1分鐘內(nèi)有多少用戶點(diǎn)擊了我們的網(wǎng)頁。在這種情況下,我們必須定義一個窗口,用來收集最近一分鐘內(nèi)的數(shù)據(jù),并對這個窗口內(nèi)的數(shù)據(jù)進(jìn)行計(jì)算。

窗口可以是時(shí)間驅(qū)動的(Time Window,例如:每30秒鐘),也可以是數(shù)據(jù)驅(qū)動的(Count Window,例如:每一百個元素)。一種經(jīng)典的窗口分類可以分成:翻滾窗口(Tumbling Window,無重疊),滾動窗口(Sliding Window,有重疊),和會話窗口(Session Window,活動間隙)。

我們舉個具體的場景來形象地理解不同窗口的概念。假設(shè),淘寶網(wǎng)會記錄每個用戶每次購買的商品個數(shù),我們要做的是統(tǒng)計(jì)不同窗口中用戶購買商品的總數(shù)。下圖給出了幾種經(jīng)典的窗口切分概述圖:

上圖中,raw data stream 代表用戶的購買行為流,圈中的數(shù)字代表該用戶本次購買的商品個數(shù),事件是按時(shí)間分布的,所以可以看出事件之間是有time gap的。Flink 提供了上圖中所有的窗口類型,下面我們會逐一進(jìn)行介紹。

Time Window

就如名字所說的,Time Window 是根據(jù)時(shí)間對數(shù)據(jù)流進(jìn)行分組的。這里我們涉及到了流處理中的時(shí)間問題,時(shí)間問題和消息亂序問題是緊密關(guān)聯(lián)的,這是流處理中現(xiàn)存的難題之一,我們將在后續(xù)的 EventTime 和消息亂序處理 中對這部分問題進(jìn)行深入探討。這里我們只需要知道 Flink 提出了三種時(shí)間的概念,分別是event time(事件時(shí)間:事件發(fā)生時(shí)的時(shí)間),ingestion time(攝取時(shí)間:事件進(jìn)入流處理系統(tǒng)的時(shí)間),processing time(處理時(shí)間:消息被計(jì)算處理的時(shí)間)。Flink 中窗口機(jī)制和時(shí)間類型是完全解耦的,也就是說當(dāng)需要改變時(shí)間類型時(shí)不需要更改窗口邏輯相關(guān)的代碼。

  • Tumbling Time Window
    如上圖,我們需要統(tǒng)計(jì)每一分鐘中用戶購買的商品的總數(shù),需要將用戶的行為事件按每一分鐘進(jìn)行切分,這種切分被成為翻滾時(shí)間窗口(Tumbling Time Window)。翻滾窗口能將數(shù)據(jù)流切分成不重疊的窗口,每一個事件只能屬于一個窗口。通過使用 DataStream API,我們可以這樣實(shí)現(xiàn):

// Stream of (userId, buyCnt)val buyCnts: DataStream[(Int, Int)] = ...val tumblingCnts: DataStream[(Int, Int)] = buyCnts// key stream by userId.keyBy(0) // tumbling time window of 1 minute length.timeWindow(Time.minutes(1))// compute sum over buyCnt.sum(1)
  • Sliding Time Window
    但是對于某些應(yīng)用,它們需要的窗口是不間斷的,需要平滑地進(jìn)行窗口聚合。比如,我們可以每30秒計(jì)算一次最近一分鐘用戶購買的商品總數(shù)。這種窗口我們稱為滑動時(shí)間窗口(Sliding Time Window)。在滑窗中,一個元素可以對應(yīng)多個窗口。通過使用 DataStream API,我們可以這樣實(shí)現(xiàn):

val slidingCnts: DataStream[(Int, Int)] = buyCnts.keyBy(0) // sliding time window of 1 minute length and 30 secs trigger interval.timeWindow(Time.minutes(1), Time.seconds(30)).sum(1)

Count Window

Count Window 是根據(jù)元素個數(shù)對數(shù)據(jù)流進(jìn)行分組的。

  • Tumbling Count Window
    當(dāng)我們想要每100個用戶購買行為事件統(tǒng)計(jì)購買總數(shù),那么每當(dāng)窗口中填滿100個元素了,就會對窗口進(jìn)行計(jì)算,這種窗口我們稱之為翻滾計(jì)數(shù)窗口(Tumbling Count Window),上圖所示窗口大小為3個。通過使用 DataStream API,我們可以這樣實(shí)現(xiàn):

// Stream of (userId, buyCnts)val buyCnts: DataStream[(Int, Int)] = ...val tumblingCnts: DataStream[(Int, Int)] = buyCnts// key stream by sensorId.keyBy(0)// tumbling count window of 100 elements size.countWindow(100)// compute the buyCnt sum .sum(1)
  • Sliding Count Window
    當(dāng)然Count Window 也支持 Sliding Window,雖在上圖中未描述出來,但和Sliding Time Window含義是類似的,例如計(jì)算每10個元素計(jì)算一次最近100個元素的總和,代碼示例如下。

val slidingCnts: DataStream[(Int, Int)] = vehicleCnts.keyBy(0)// sliding count window of 100 elements size and 10 elements trigger interval.countWindow(100, 10).sum(1)

Session Window

在這種用戶交互事件流中,我們首先想到的是將事件聚合到會話窗口中(一段用戶持續(xù)活躍的周期),由非活躍的間隙分隔開。如上圖所示,就是需要計(jì)算每個用戶在活躍期間總共購買的商品數(shù)量,如果用戶30秒沒有活動則視為會話斷開(假設(shè)raw data stream是單個用戶的購買行為流)。Session Window 的示例代碼如下:

// Stream of (userId, buyCnts)val buyCnts: DataStream[(Int, Int)] = ...val sessionCnts: DataStream[(Int, Int)] = vehicleCnts.keyBy(0)// session window based on a 30 seconds session gap interval .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).sum(1)

一般而言,window 是在無限的流上定義了一個有限的元素集合。這個集合可以是基于時(shí)間的,元素個數(shù)的,時(shí)間和個數(shù)結(jié)合的,會話間隙的,或者是自定義的。Flink 的 DataStream API 提供了簡潔的算子來滿足常用的窗口操作,同時(shí)提供了通用的窗口機(jī)制來允許用戶自己定義窗口分配邏輯。下面我們會對 Flink 窗口相關(guān)的 API 進(jìn)行剖析。

Flink中的時(shí)間概念

Flink在流處理程序支持不同的時(shí)間概念。分別為Event Time/Processing Time/Ingestion Time,也就是事件時(shí)間、處理時(shí)間、提取時(shí)間。

從時(shí)間序列角度來說,發(fā)生的先后順序是:

事件時(shí)間(Event Time)----> 提取時(shí)間(Ingestion Time)----> 處理時(shí)間(Processing Time)

復(fù)制

Event Time 是事件在現(xiàn)實(shí)世界中發(fā)生的時(shí)間,它通常由事件中的時(shí)間戳描述。

Ingestion Time 是數(shù)據(jù)進(jìn)入Apache Flink流處理系統(tǒng)的時(shí)間,也就是Flink讀取數(shù)據(jù)源時(shí)間。

Processing Time 是數(shù)據(jù)流入到具體某個算子 (消息被計(jì)算處理) 時(shí)候相應(yīng)的系統(tǒng)時(shí)間。也就是Flink程序處理該事件時(shí)當(dāng)前系統(tǒng)時(shí)間。

但是我們講解時(shí),會從后往前講解,把最重要的Event Time放在最后。

處理時(shí)間

是數(shù)據(jù)流入到具體某個算子時(shí)候相應(yīng)的系統(tǒng)時(shí)間。

這個系統(tǒng)時(shí)間指的是執(zhí)行相應(yīng)操作的機(jī)器的系統(tǒng)時(shí)間。當(dāng)一個流程序通過處理時(shí)間來運(yùn)行時(shí),所有基于時(shí)間的操作(如: 時(shí)間窗口)將使用各自操作所在的物理機(jī)的系統(tǒng)時(shí)間。

ProcessingTime 有最好的性能和最低的延遲。但在分布式計(jì)算環(huán)境或者異步環(huán)境中,ProcessingTime具有不確定性,相同數(shù)據(jù)流多次運(yùn)行有可能產(chǎn)生不同的計(jì)算結(jié)果。因?yàn)樗菀资艿綇挠涗浀竭_(dá)系統(tǒng)的速度(例如從消息隊(duì)列)到記錄在系統(tǒng)內(nèi)的operator之間流動的速度的影響(停電,調(diào)度或其他)。

提取時(shí)間

IngestionTime是數(shù)據(jù)進(jìn)入Apache Flink框架的時(shí)間,是在Source Operator中設(shè)置的。每個記錄將源的當(dāng)前時(shí)間作為時(shí)間戳,并且后續(xù)基于時(shí)間的操作(如時(shí)間窗口)引用該時(shí)間戳。

提取時(shí)間在概念上位于事件時(shí)間和處理時(shí)間之間。與處理時(shí)間相比,它稍早一些。IngestionTime與ProcessingTime相比可以提供更可預(yù)測的結(jié)果,因?yàn)镮ngestionTime的時(shí)間戳比較穩(wěn)定(在源處只記錄一次),所以同一數(shù)據(jù)在流經(jīng)不同窗口操作時(shí)將使用相同的時(shí)間戳,而對于ProcessingTime同一數(shù)據(jù)在流經(jīng)不同窗口算子會有不同的處理時(shí)間戳。

與事件時(shí)間相比,提取時(shí)間程序無法處理任何無序事件或后期數(shù)據(jù),但程序不必指定如何生成水位線。

在內(nèi)部,提取時(shí)間與事件時(shí)間非常相似,但具有自動時(shí)間戳分配和自動水位線生成功能。

事件時(shí)間

事件時(shí)間就是事件在真實(shí)世界的發(fā)生時(shí)間,即每個事件在產(chǎn)生它的設(shè)備上發(fā)生的時(shí)間(當(dāng)?shù)貢r(shí)間)。比如一個點(diǎn)擊事件的時(shí)間發(fā)生時(shí)間,是用戶點(diǎn)擊操作所在的手機(jī)或電腦的時(shí)間。

在進(jìn)入Apache Flink框架之前EventTime通常要嵌入到記錄中,并且EventTime也可以從記錄中提取出來。在實(shí)際的網(wǎng)上購物訂單等業(yè)務(wù)場景中,大多會使用EventTime來進(jìn)行數(shù)據(jù)計(jì)算。

Flink的watermark

Watermark是Apache Flink為了處理EventTime 窗口計(jì)算提出的一種機(jī)制,本質(zhì)上也是一種時(shí)間戳。watermark是用于處理亂序事件或延遲數(shù)據(jù)的,這通常用watermark機(jī)制結(jié)合window來實(shí)現(xiàn)(Watermarks用來觸發(fā)window窗口計(jì)算)。

比如對于late element,我們不能無限期的等下去,必須要有個機(jī)制來保證一個特定的時(shí)間后,必須觸發(fā)window去進(jìn)行計(jì)算了。這個特別的機(jī)制,就是watermark。 可以把Watermark看作是一種告訴Flink一個消息延遲多少的方式。定義了什么時(shí)候不再等待更早的數(shù)據(jù)。

1. 窗口觸發(fā)條件

上面談到了對數(shù)據(jù)亂序問題的處理機(jī)制是watermark+window,那么window什么時(shí)候該被觸發(fā)呢?

基于Event Time的事件處理,Flink默認(rèn)的事件觸發(fā)條件為:

對于out-of-order及正常的數(shù)據(jù)而言

watermark的時(shí)間戳 > = window endTime

在 [window_start_time,window_end_time] 中有數(shù)據(jù)存在。

對于late element太多的數(shù)據(jù)而言

Event Time > watermark的時(shí)間戳

WaterMark相當(dāng)于一個EndLine,一旦Watermarks大于了某個window的end_time,就意味著windows_end_time時(shí)間和WaterMark時(shí)間相同的窗口開始計(jì)算執(zhí)行了。

就是說,我們根據(jù)一定規(guī)則,計(jì)算出Watermarks,并且設(shè)置一些延遲,給遲到的數(shù)據(jù)一些機(jī)會,也就是說正常來講,對于遲到的數(shù)據(jù),我只等你一段時(shí)間,再不來就沒有機(jī)會了。

WaterMark時(shí)間可以用Flink系統(tǒng)現(xiàn)實(shí)時(shí)間,也可以用處理數(shù)據(jù)所攜帶的Event time。

使用Flink系統(tǒng)現(xiàn)實(shí)時(shí)間,在并行和多線程中需要注意的問題較少,因?yàn)槎际且袁F(xiàn)實(shí)時(shí)間為標(biāo)準(zhǔn)。

如果使用處理數(shù)據(jù)所攜帶的Event time作為WaterMark時(shí)間,需要注意兩點(diǎn):

因?yàn)閿?shù)據(jù)到達(dá)并不是循序的,注意保存一個當(dāng)前最大時(shí)間戳作為WaterMark時(shí)間

并行同步問題

2. WaterMark設(shè)定方法

標(biāo)點(diǎn)水位線(Punctuated Watermark)

標(biāo)點(diǎn)水位線(Punctuated Watermark)通過數(shù)據(jù)流中某些特殊標(biāo)記事件來觸發(fā)新水位線的生成。這種方式下窗口的觸發(fā)與時(shí)間無關(guān),而是決定于何時(shí)收到標(biāo)記事件。

在實(shí)際的生產(chǎn)中Punctuated方式在TPS很高的場景下會產(chǎn)生大量的Watermark在一定程度上對下游算子造成壓力,所以只有在實(shí)時(shí)性要求非常高的場景才會選擇Punctuated的方式進(jìn)行Watermark的生成。

定期水位線(Periodic Watermark)

周期性的(允許一定時(shí)間間隔或者達(dá)到一定的記錄條數(shù))產(chǎn)生一個Watermark。水位線提升的時(shí)間間隔是由用戶設(shè)置的,在兩次水位線提升時(shí)隔內(nèi)會有一部分消息流入,用戶可以根據(jù)這部分?jǐn)?shù)據(jù)來計(jì)算出新的水位線。

在實(shí)際的生產(chǎn)中Periodic的方式必須結(jié)合時(shí)間和積累條數(shù)兩個維度繼續(xù)周期性產(chǎn)生Watermark,否則在極端情況下會有很大的延時(shí)。

舉個例子,最簡單的水位線算法就是取目前為止最大的事件時(shí)間,然而這種方式比較暴力,對亂序事件的容忍程度比較低,容易出現(xiàn)大量遲到事件。

3. 遲到事件

雖說水位線表明著早于它的事件不應(yīng)該再出現(xiàn),但是上如上文所講,接收到水位線以前的的消息是不可避免的,這就是所謂的遲到事件。實(shí)際上遲到事件是亂序事件的特例,和一般亂序事件不同的是它們的亂序程度超出了水位線的預(yù)計(jì),導(dǎo)致窗口在它們到達(dá)之前已經(jīng)關(guān)閉。

遲到事件出現(xiàn)時(shí)窗口已經(jīng)關(guān)閉并產(chǎn)出了計(jì)算結(jié)果,因此處理的方法有3種:

重新激活已經(jīng)關(guān)閉的窗口并重新計(jì)算以修正結(jié)果。

將遲到事件收集起來另外處理。

將遲到事件視為錯誤消息并丟棄。

Flink 默認(rèn)的處理方式是第3種直接丟棄,其他兩種方式分別使用Side Output和Allowed Lateness。

Side Output機(jī)制可以將遲到事件單獨(dú)放入一個數(shù)據(jù)流分支,這會作為 window 計(jì)算結(jié)果的副產(chǎn)品,以便用戶獲取并對其進(jìn)行特殊處理。

Allowed Lateness機(jī)制允許用戶設(shè)置一個允許的最大遲到時(shí)長。Flink 會在窗口關(guān)閉后一直保存窗口的狀態(tài)直至超過允許遲到時(shí)長,這期間的遲到事件不會被丟棄,而是默認(rèn)會觸發(fā)窗口重新計(jì)算。因?yàn)楸4娲翱跔顟B(tài)需要額外內(nèi)存,并且如果窗口計(jì)算使用了 ProcessWindowFunction API 還可能使得每個遲到事件觸發(fā)一次窗口的全量計(jì)算,代價(jià)比較大,所以允許遲到時(shí)長不宜設(shè)得太長,遲到事件也不宜過多,否則應(yīng)該考慮降低水位線提高的速度或者調(diào)整算法。

這里總結(jié)機(jī)制為:

窗口window 的作用是為了周期性的獲取數(shù)據(jù)。

watermark的作用是防止數(shù)據(jù)出現(xiàn)亂序(經(jīng)常),事件時(shí)間內(nèi)獲取不到指定的全部數(shù)據(jù),而做的一種保險(xiǎn)方法。

allowLateNess是將窗口關(guān)閉時(shí)間再延遲一段時(shí)間。

sideOutPut是最后兜底操作,所有過期延遲數(shù)據(jù),指定窗口已經(jīng)徹底關(guān)閉了,就會把數(shù)據(jù)放到側(cè)輸出流。

4.例子

假如我們設(shè)置10s的時(shí)間窗口(window),那么0~10s,10~20s都是一個窗口,以0~10s為例,0為start-time,10為end-time。假如有4個數(shù)據(jù)的event-time分別是8(A),12.5(B),9(C),13.5(D),我們設(shè)置Watermarks為當(dāng)前所有到達(dá)數(shù)據(jù)event-time的最大值減去延遲值3.5秒

當(dāng)A到達(dá)的時(shí)候,Watermarks為max{8}-3.5=8-3.5 = 4.5 < 10,不會觸發(fā)計(jì)算

當(dāng)B到達(dá)的時(shí)候,Watermarks為max(12.5,8)-3.5=12.5-3.5 = 9 < 10,不會觸發(fā)計(jì)算

當(dāng)C到達(dá)的時(shí)候,Watermarks為max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不會觸發(fā)計(jì)算

當(dāng)D到達(dá)的時(shí)候,Watermarks為max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,觸發(fā)計(jì)算

觸發(fā)計(jì)算的時(shí)候,會將A,C(因?yàn)樗麄兌夹∮?0)都計(jì)算進(jìn)去,其中C是遲到的。 max這個很關(guān)鍵,就是當(dāng)前窗口內(nèi),所有事件的最大事件。 這里的延遲3.5s是我們假設(shè)一個數(shù)據(jù)到達(dá)的時(shí)候,比他早3.5s的數(shù)據(jù)肯定也都到達(dá)了,這個是需要根據(jù)經(jīng)驗(yàn)推算。假設(shè)加入D到達(dá)以后有到達(dá)了一個E,event-time=6,但是由于0~10的時(shí)間窗口已經(jīng)開始計(jì)算了,所以E就丟了。 從這里上面E的丟失說明,水位線也不是萬能的,但是如果根據(jù)我們自己的生產(chǎn)經(jīng)驗(yàn)+側(cè)道輸出等方案,可以做到數(shù)據(jù)不丟失。

Flink分布式快照原理是什么

可靠性是分布式系統(tǒng)實(shí)現(xiàn)必須考慮的因素之一。Flink基于Chandy-Lamport分布式快照算法實(shí)現(xiàn)了一套可靠的Checkpoint機(jī)制,可以保證集群中某些節(jié)點(diǎn)出現(xiàn)故障時(shí),能夠?qū)⒄麄€作業(yè)恢復(fù)到故障之前某個狀態(tài)。同時(shí),Checkpoint機(jī)制也是Flink實(shí)現(xiàn)Exactly-Once語義的基礎(chǔ)。

本文將介紹Flink的Checkpoint機(jī)制的原理,并從源碼層面了解Checkpoint機(jī)制是如何實(shí)現(xiàn)的(基于Flink 1.10)。

1. 為什么需要Checkpoint

Flink是有狀態(tài)的流計(jì)算處理引擎,每個算子Operator可能都需要記錄自己的運(yùn)行數(shù)據(jù),并在接收到新流入的元素后不斷更新自己的狀態(tài)數(shù)據(jù)。當(dāng)分布式系統(tǒng)引入狀態(tài)計(jì)算后,為了保證計(jì)算結(jié)果的正確性(特別是對于流處理系統(tǒng),不可能每次系統(tǒng)故障后都從頭開始計(jì)算),就必然要求系統(tǒng)具有容錯性。對于Flink來說,Flink作業(yè)運(yùn)行在多個節(jié)點(diǎn)上,當(dāng)出現(xiàn)節(jié)點(diǎn)宕機(jī)、網(wǎng)絡(luò)故障等問題,需要一個機(jī)制保證節(jié)點(diǎn)保存在本地的狀態(tài)不丟失。流處理中Exactly-Once語義的實(shí)現(xiàn)也要求作業(yè)從失敗恢復(fù)后的狀態(tài)要和失敗前的狀態(tài)一致。

那么怎么保證分布式環(huán)境下各節(jié)點(diǎn)狀態(tài)的容錯呢?通常這是通過定期對作業(yè)狀態(tài)和數(shù)據(jù)流進(jìn)行快照實(shí)現(xiàn)的,常見的檢查點(diǎn)算法有比如Sync-and-Stop(SNS)算法、Chandy-Lamport(CL)算法。

Flink的Checkpoint機(jī)制是基于Chandy-Lamport算法的思想改進(jìn)而來,引入了Checkpoint Barrier的概念,可以在不停止整個流處理系統(tǒng)的前提下,讓每個節(jié)點(diǎn)獨(dú)立建立檢查點(diǎn)保存自身快照,并最終達(dá)到整個作業(yè)全局快照的狀態(tài)。有了全局快照,當(dāng)我們遇到故障或者重啟的時(shí)候就可以直接從快照中恢復(fù),這就是Flink容錯的核心。

2. Checkpoint執(zhí)行流程

Barrier是Flink分布式快照的核心概念之一,稱之為屏障或者數(shù)據(jù)柵欄(可以理解為快照的分界線)。Barrier是一種特殊的內(nèi)部消息,在進(jìn)行Checkpoint的時(shí)候Flink會在數(shù)據(jù)流源頭處周期性地注入Barrier,這些Barrier會作為數(shù)據(jù)流的一部分,一起流向下游節(jié)點(diǎn)并且不影響正常的數(shù)據(jù)流。Barrier的作用是將無界數(shù)據(jù)流從時(shí)間上切分成多個窗口,每個窗口對應(yīng)一系列連續(xù)的快照中的一個,每個Barrier都帶有一個快照ID,一個Barrier生成之后,在這之前的數(shù)據(jù)都進(jìn)入此快照,在這之后的數(shù)據(jù)則進(jìn)入下一個快照。

如上圖,Barrier-n跟隨著數(shù)據(jù)流一起流動,當(dāng)算子從輸入流接收到Barrier-n后,就會停止接收數(shù)據(jù)并對當(dāng)前自身的狀態(tài)做一次快照,快照完成后再將Barrier-n以廣播的形式傳給下游節(jié)點(diǎn)。一旦作業(yè)的Sink算子接收到Barrier n后,會向JobMnager發(fā)送一個消息,確認(rèn)Barrier-n對應(yīng)的快照完成。當(dāng)作業(yè)中的所有Sink算子都確認(rèn)后,意味一次全局快照也就完成。

當(dāng)一個算子有多個上游節(jié)點(diǎn)時(shí),會接收到多個Barrier,這時(shí)候需要進(jìn)行Barrier Align對齊操作。

如上圖,一個算子有兩個輸入流,當(dāng)算子從一個上游數(shù)據(jù)流接收到一個Barrier-n后,它不會立即向下游廣播,而是先暫停對該數(shù)據(jù)流的處理,將到達(dá)的數(shù)據(jù)先緩存在Input Buffer中(因?yàn)檫@些數(shù)據(jù)屬于下一次快照而不是當(dāng)前快照,緩存數(shù)據(jù)可以不阻塞該數(shù)據(jù)流),直到從另外一個數(shù)據(jù)流中接收到Barrier-n,才會進(jìn)行快照處理并將Barrier-n向下游發(fā)送。從這個流程可以看出,如果開啟Barrier對齊后,算子由于需要等待所有輸入節(jié)點(diǎn)的Barrier到來出現(xiàn)暫停,對整體的性能也會有一定的影響。

綜上,Flink Checkpoint機(jī)制的核心思想實(shí)質(zhì)上是通過Barrier來標(biāo)記觸發(fā)快照的時(shí)間點(diǎn)和對應(yīng)需要進(jìn)行快照的數(shù)據(jù)集,將數(shù)據(jù)流處理和快照操作解耦開來,從而最大程度降低快照對系統(tǒng)性能的影響。

Flink的一致性和Checkpoint機(jī)制有緊密的關(guān)系:

當(dāng)不開啟Checkpoint時(shí),節(jié)點(diǎn)發(fā)生故障時(shí)可能會導(dǎo)致數(shù)據(jù)丟失,這就是At-Most-Once

當(dāng)開啟Checkpoint但不進(jìn)行Barrier對齊時(shí),對于有多個輸入流的節(jié)點(diǎn)如果發(fā)生故障,會導(dǎo)致有一部分?jǐn)?shù)據(jù)可能會被處理多次,這就是At-Least-Once

當(dāng)開啟Checkpoint并進(jìn)行Barrier對齊時(shí),可以保證每條數(shù)據(jù)在故障恢復(fù)時(shí)只會被重放一次,這就是Exactly-Once

3. Checkpoint相關(guān)配置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); env.getCheckpointConfig().setCheckpointTimeout(60000); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

默認(rèn)情況下,Checkpoint機(jī)制是關(guān)閉的,需要通過enableCheckpointing(interval)來開啟,并指定每interval毫秒進(jìn)行一次Checkpoint。

Checkpoint模式支持Exactly-Once和At-Least-Once,可以通過setCheckpointingMode來設(shè)置。

如果兩次Checkpoint的時(shí)間很短,會導(dǎo)致整個系統(tǒng)大部分資源都用于執(zhí)行Checkpoint,影響正常作業(yè)的執(zhí)行。可以通過setMinPauseBetweenCheckpoints來設(shè)置兩次Checkpoint之間的最小間隔。

setCheckpointTimeout可以給Checkpoint設(shè)置一個超時(shí)時(shí)間,當(dāng)一次Checkpoint超過一定時(shí)間沒有完成,直接終止掉。

默認(rèn)情況下,當(dāng)一個Checkpoint還在執(zhí)行時(shí),不會觸發(fā)另一個Checkpoint,通過setMaxConcurrentCheckpoints可以設(shè)置最大并發(fā)Checkpoint數(shù)量。

enableExternalizedCheckpoints可以設(shè)置當(dāng)用戶取消了作業(yè)后,是否保留遠(yuǎn)程存儲上的Checkpoint數(shù)據(jù),一般設(shè)置為RETAIN_ON_CANCELLATION。

保存多個Checkpoint

默認(rèn)情況下,如果設(shè)置了Checkpoint選項(xiàng),則Flink只保留最近成功生成的1個Checkpoint,而當(dāng)Flink程序失敗時(shí),可以從最近的這個Checkpoint來進(jìn)行恢復(fù)。但是,如果我們希望保留多個Checkpoint,并能夠根據(jù)實(shí)際需要選擇其中一個進(jìn)行恢復(fù),這樣會更加靈活,比如,我們發(fā)現(xiàn)最近4個小時(shí)數(shù)據(jù)記錄處理有問題,希望將整個狀態(tài)還原到4小時(shí)之前。

Flink可以支持保留多個Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的個數(shù):

state.checkpoints.num-retained: 20

保留了最近的20個Checkpoint。如果希望會退到某個Checkpoint點(diǎn),只需要指定對應(yīng)的某個Checkpoint路徑即可實(shí)現(xiàn)。

從Checkpoint進(jìn)行恢復(fù)

從指定的checkpoint處啟動,最近的一個/flink/checkpoints/workFlowCheckpoint/339439e2a3d89ead4d71ae3816615281/chk-1740584啟動,通常需要先停掉當(dāng)前運(yùn)行的flink-session,然后通過命令啟動:

../bin/flink run -p 10 -s /flink/checkpoints/workFlowCheckpoint/339439e2a3d89ead4d71ae3816615281/chk-1740584/_metadata -c com.code2144.helper_wink-1.0-SNAPSHOT.jar

可以把命令放到腳本里面,每次直接執(zhí)行checkpoint恢復(fù)腳本即可:

保存點(diǎn)機(jī)制 (Savepoints)

保存點(diǎn)機(jī)制 (Savepoints)是檢查點(diǎn)機(jī)制的一種特殊的實(shí)現(xiàn),它允許通過手工的方式來觸發(fā) Checkpoint,并將結(jié)果持久化存儲到指定路徑中,主要用于避免 Flink 集群在重啟或升級時(shí)導(dǎo)致狀態(tài)丟失。示例如下:

# 觸發(fā)指定id的作業(yè)的Savepoint,并將結(jié)果存儲到指定目錄下

bin/flink savepoint :jobId [:targetDirectory]

手動savepoint

/app/local/flink-1.6.2/bin/flink savepoint 0409251eaff826ef2dd775b6a2d5e219 [hdfs://bigdata/path]

成功觸發(fā)savepoint通常會提示:Savepoint completed. Path: hdfs://path...:

手動取消任務(wù)

與checkpoint異常停止或者手動Kill掉不一樣,對于savepoint通常是我們想要手動停止任務(wù),然后更新代碼,可以使用flink cancel ...命令:

/app/local/flink-1.6.2/bin/flink cancel 0409251eaff826ef2dd775b6a2d5e219

從指定savepoint啟動job

bin/flink run -p 8 -s hdfs:///flink/savepoints/savepoint-567452-9e3587e55980 -c com.code2144.helper_workflow.HelperWorkFlowStreaming jars/BSS-ONSS-Flink-1.0-SNAPSHOT.jar

Flink的內(nèi)存管理是如何做的

在介紹內(nèi)存管理之前,先介紹一下JVM中的堆內(nèi)存和堆外內(nèi)存。

通常來說。JVM堆空間概念,簡單描述就是在程序中,關(guān)于對象實(shí)例|數(shù)組的創(chuàng)建、使用和釋放的內(nèi)存,都會在JVM中的一塊被稱作為"JVM堆"內(nèi)存區(qū)域內(nèi)進(jìn)行管理分配。

Flink程序在創(chuàng)建對象后,JVM會在堆內(nèi)內(nèi)存中分配一定大小的空間,創(chuàng)建Class對象并返回對象引用,Flink保存對象引用,同時(shí)記錄占用的內(nèi)存信息。

而堆外內(nèi)存如果你有過Java相關(guān)編程經(jīng)歷的話,相信對堆外內(nèi)存的使用并不陌生。其底層調(diào)用基于C的JDK Unsafe類方法,通過指針直接進(jìn)行內(nèi)存的操作,包括內(nèi)存空間的申請、使用、刪除釋放等。

介紹完了堆內(nèi)內(nèi)存和堆外內(nèi)存的概念,下面我們來看下Flink的內(nèi)存管理。

1)JobManager內(nèi)存管理

JobManager進(jìn)程總內(nèi)存包括JVM堆內(nèi)內(nèi)存、JVM堆外內(nèi)存以及JVM MetaData內(nèi)存,其中涉及的內(nèi)存配置參數(shù)為:

# JobManager總進(jìn)程內(nèi)存 jobmanager.memory.process.size:# 作業(yè)管理器的 JVM 堆內(nèi)存大小 jobmanager.memory.heap.size:#作業(yè)管理器的堆外內(nèi)存大小。此選項(xiàng)涵蓋所有堆外內(nèi)存使用。 jobmanager.memory.off-heap.size: 復(fù)制代碼

2)TaskManager內(nèi)存管理

TaskManager內(nèi)存同樣包含JVM堆內(nèi)內(nèi)存、JVM堆外內(nèi)存以及JVM MetaData內(nèi)存三大塊。其中JVM堆內(nèi)內(nèi)存又包含F(xiàn)ramework Heap和Task Heap,即框架堆內(nèi)存和任務(wù)Task堆內(nèi)存。

JVM堆外內(nèi)存包含Memory memory托管內(nèi)存,主要用于保存排序、結(jié)果緩存、狀態(tài)后端數(shù)據(jù)等。另一塊為Direct Memory直接內(nèi)存,包含如下:

  • Framework Off-Heap Memory:Flink框架的堆外內(nèi)存,即Flink中TaskManager的自身內(nèi)存,和slot無關(guān)。

  • Task Off-Heap:Task的堆外內(nèi)存

  • Network Memory:網(wǎng)絡(luò)內(nèi)存

其中涉及的內(nèi)存配置參數(shù)為:

// tm的框架堆內(nèi)內(nèi)存 taskmanager.memory.framework.heap.size=// tm的任務(wù)堆內(nèi)內(nèi)存 taskmanager.memory.task.heap.size// Flink管理的原生托管內(nèi)存 taskmanager.memory.managed.size= taskmanager.memory.managed.fraction=// Flink 框架堆外內(nèi)存 taskmanager.memory.framework.off-heap.size=// Task 堆外內(nèi)存 taskmanager.memory.task.off-heap.size=// 網(wǎng)絡(luò)數(shù)據(jù)交換所使用的堆外內(nèi)存大小 taskmanager.memory.network.min: 64mb taskmanager.memory.network.max: 1gb taskmanager.memory.network.fraction: 0.1復(fù)制代碼

Flink/Spark/Hive SQL的執(zhí)行原理

這里我把三個組件SQL執(zhí)行原理放到了一起,通過對比加深一下印象。

1)Hive SQL的執(zhí)行原理

Hive SQL是Hive提供的SQL查詢引擎,底層由MapReduce實(shí)現(xiàn)。Hive根據(jù)輸入的SQL語句執(zhí)行詞法分析、語法樹構(gòu)建、編譯、邏輯計(jì)劃、優(yōu)化邏輯計(jì)劃以及物理計(jì)劃等過程,轉(zhuǎn)化為Map Task和Reduce Task最終交由Mapreduce引擎執(zhí)行。

  • 執(zhí)行引擎。具有mapreduce的一切特性,適合大批量數(shù)據(jù)離線處理,相較于Spark而言,速度較慢且IO操作頻繁

  • 有完整的hql語法,支持基本sql語法、函數(shù)和udf

  • 對表數(shù)據(jù)存儲格式有要求,不同存儲、壓縮格式性能不同

2)Spark SQL的執(zhí)行原理

Spark SQL底層基于Spark引擎,使用Antlr解析語法,編譯生成邏輯計(jì)劃和物理計(jì)劃,過程和Hive SQL執(zhí)行過程類似,只不過Spark SQL產(chǎn)生的物理計(jì)劃為Spark程序。

  • 輸入編寫的Spark SQL

  • SqlParser分析器。進(jìn)行語法檢查、詞義分析,生成未綁定的Logical Plan邏輯計(jì)劃(未綁定查詢數(shù)據(jù)的元數(shù)據(jù)信息,比如查詢什么文件,查詢那些列等)

  • Analyzer解析器。查詢元數(shù)據(jù)信息并綁定,生成完整的邏輯計(jì)劃。此時(shí)可以知道具體的數(shù)據(jù)位置和對象,Logical Plan 形如from table -> filter column -> select 形式的樹結(jié)構(gòu)

  • Optimizer優(yōu)化器。選擇最好的一個Logical Plan,并優(yōu)化其中的不合理的地方。常見的例如謂詞下推、剪枝、合并等優(yōu)化操作

  • Planner使用Planing Strategies將邏輯計(jì)劃轉(zhuǎn)化為物理計(jì)劃,并根據(jù)最佳策略選擇出的物理計(jì)劃作為最終的執(zhí)行計(jì)劃

  • 調(diào)用Spark Plan Execution執(zhí)行引擎執(zhí)行Spark RDD任務(wù)

3)Flink SQL的執(zhí)行原理

一條SQL從提交到Calcite解析,優(yōu)化,到最后的Flink執(zhí)行,一般分以下過程:

1. Sql Parser: 將sql語句通過java cc解析成AST(語法樹),在calcite中用SqlNode表示AST;

2. Sql Validator: 結(jié)合數(shù)字字典(catalog)去驗(yàn)證sql語法;

3. 生成Logical Plan: 將sqlNode表示的AST轉(zhuǎn)換成LogicalPlan, 用relNode表示;

4. 生成 optimized LogicalPlan: 先基于calcite rules 去優(yōu)化logical Plan,基于flink定制的一些優(yōu)化rules去優(yōu)化logical Plan;

5. 生成Flink PhysicalPlan: 這里也是基于flink里頭的rules將,將optimized LogicalPlan轉(zhuǎn)成成Flink的物理執(zhí)行計(jì)劃;

6. 將物理執(zhí)行計(jì)劃轉(zhuǎn)成Flink ExecutionPlan: 就是調(diào)用相應(yīng)的tanslateToPlan方法轉(zhuǎn)換和利用CodeGen元編程成Flink的各種算子。

Table API 來提交任務(wù)的話,基本流程和運(yùn)行SQL類似,稍微不同的是:table api parser: flink會把table api表達(dá)的計(jì)算邏輯也表示成一顆樹,用treeNode去表式; 在這棵樹上的每個節(jié)點(diǎn)的計(jì)算邏輯用Expression來表示。

簡單說一下SQL優(yōu)化:RBO(基于規(guī)則)

RBO主要是開發(fā)人員在使用SQL的過程中,有些發(fā)現(xiàn)有些通用的規(guī)則,可以顯著提高SQL執(zhí)行的效率,比如最經(jīng)典的filter下推:

將Filter下推到Join之前執(zhí)行,這樣做的好處是減少了Join的數(shù)量,同時(shí)降低了CPU,內(nèi)存,網(wǎng)絡(luò)等方面的開銷,提高效率。

SQL優(yōu)化的發(fā)展,則可以分為兩個階段,即RBO(基于規(guī)則),和CBO(基于代價(jià))

RBO和CBO的區(qū)別大概在于: RBO只為應(yīng)用提供的rule,而CBO會根據(jù)給出的Cost信息,智能應(yīng)用rule,求出一個Cost最低的執(zhí)行計(jì)劃。需要糾正很多人誤區(qū)的一點(diǎn)是,CBO其實(shí)也是基于rule的,接觸到RBO和CBO這兩個概念的時(shí)候,很容易將他們對立起來。但實(shí)際上CBO,可以理解為就是加上Cost的RBO。

Flink SQL 引擎的工作流總結(jié)如圖所示。

從圖中可以看出,一段查詢 SQL / 使用TableAPI 編寫的程序(以下簡稱 TableAPI 代碼)從輸入到編譯為可執(zhí)行的 JobGraph 主要經(jīng)歷如下幾個階段

  • 將 SQL文本 / TableAPI 代碼轉(zhuǎn)化為邏輯執(zhí)行計(jì)劃(Logical Plan)

  • Logical Plan 通過優(yōu)化器優(yōu)化為物理執(zhí)行計(jì)劃(Physical Plan)

  • 通過代碼生成技術(shù)生成 Transformations 后進(jìn)一步編譯為可執(zhí)行的 JobGraph 提交運(yùn)行

  • Flink的背壓,怎么解決

    Flink背壓是生產(chǎn)應(yīng)用中常見的情況,當(dāng)程序存在數(shù)據(jù)傾斜、內(nèi)存不足狀況經(jīng)常會發(fā)生背壓,我將從如下幾個方面去分析。

    1)Flink背壓表現(xiàn)

    • 1)運(yùn)行開始時(shí)正常,后面出現(xiàn)大量Task任務(wù)等待

    • 2)少量Task任務(wù)開始報(bào)checkpoint超時(shí)問題

    • 3)大量Kafka數(shù)據(jù)堆積,無法消費(fèi)

    • 4)Flink UI的BackPressure頁面出現(xiàn)紅色High標(biāo)識

    2) 反壓一般有哪些情況

    一般可以細(xì)分兩種情況:

    • 當(dāng)前Task任務(wù)處理速度慢,比如task任務(wù)中調(diào)用算法處理等復(fù)雜邏輯,導(dǎo)致上游申請不到足夠內(nèi)存。

    • 下游Task任務(wù)處理速度慢,比如多次collect()輸出到下游,導(dǎo)致當(dāng)前節(jié)點(diǎn)無法申請足夠的內(nèi)存。

    3) 頻繁反壓的影響是什么

    頻繁反壓會導(dǎo)致流處理作業(yè)數(shù)據(jù)延遲增加,同時(shí)還會影響到Checkpoint。

    Checkpoint時(shí)需要進(jìn)行Barrier對齊,此時(shí)若某個Task出現(xiàn)反壓,Barrier流動速度會下降,導(dǎo)致Checkpoint變慢甚至超時(shí),任務(wù)整體也變慢。

    長期或頻繁出現(xiàn)反壓才需要處理,如果由于網(wǎng)絡(luò)波動或者GC出現(xiàn)的偶爾反壓可以不必處理。

    4)Flink的反壓機(jī)制

    背壓時(shí)一般下游速度慢于上游速度,數(shù)據(jù)久積成疾,需要做限流。但是無法提前預(yù)估下游實(shí)際速度,且存在網(wǎng)絡(luò)波動情況。

    需要保持上下游動態(tài)反饋,如果下游速度慢,則上游限速;否則上游提速。實(shí)現(xiàn)動態(tài)自動反壓的效果。

    下面看下Flink內(nèi)部是怎么實(shí)現(xiàn)反壓機(jī)制的。

    • 1)每個TaskManager維護(hù)共享Network BufferPool(Task共享內(nèi)存池),初始化時(shí)向Off-heap Memory中申請內(nèi)存。

    • 2)每個Task創(chuàng)建自身的Local BufferPool(Task本地內(nèi)存池),并和Network BufferPool交換內(nèi)存。

    • 3)上游Record Writer向 Local BufferPool申請buffer(內(nèi)存)寫數(shù)據(jù)。如果Local BufferPool沒有足夠內(nèi)存則向Network BufferPool申請,使用完之后將申請的內(nèi)存返回Pool。

    • 4)Netty Buffer拷貝buffer并經(jīng)過Socket Buffer發(fā)送到網(wǎng)絡(luò),后續(xù)下游端按照相似機(jī)制處理。

    • 5)當(dāng)下游申請buffer失敗時(shí),表示當(dāng)前節(jié)點(diǎn)內(nèi)存不夠,則逐層發(fā)送反壓信號給上游,上游慢慢停止數(shù)據(jù)發(fā)送,直到下游再次恢復(fù)。

    5)反壓如何處理

    • 查看Flink UI界面,定位哪些Task出現(xiàn)反壓問題

    • 查看代碼和數(shù)據(jù),檢查是否出現(xiàn)數(shù)據(jù)傾斜

    • 如果發(fā)生數(shù)據(jù)傾斜,進(jìn)行預(yù)聚合key或拆分?jǐn)?shù)據(jù)

    • 加大執(zhí)行內(nèi)存,調(diào)整并發(fā)度和分區(qū)數(shù)

    • 其他方式。。。

    由于篇幅有限,更多Flink反壓內(nèi)容請查看我的相關(guān)文章:萬字趣解Flink背壓

    Flink的exactly-once怎么保障

    精準(zhǔn)一次消費(fèi)需要整個系統(tǒng)各環(huán)節(jié)均保持強(qiáng)一致性,包括可靠的數(shù)據(jù)源端(數(shù)據(jù)可重復(fù)讀取、不丟失) 、可靠的消費(fèi)端(Flink)、可靠的輸出端(冪等性、事務(wù))。

    Flink保持精準(zhǔn)一次消費(fèi)主要依靠checkpoint一致性快照和二階段提交機(jī)制。

    1)數(shù)據(jù)源端

    Flink內(nèi)置FlinkKafkaConsumer類,不依賴于 kafka 內(nèi)置的消費(fèi)組offset管理,在內(nèi)部自行記錄并維護(hù) kafka consumer 的offset。

    (1)管理offset(手動提交)并保存到checkpoint中
    (2)FlinkKafkaConsumer API內(nèi)部集成Flink的Checkpoint機(jī)制,自動實(shí)現(xiàn)精確一次的處理語義。

    從源碼中看到stateBackend中把offset state恢復(fù)到restoredState,然后從fetcher拉取最新的offset數(shù)據(jù),隨后將offset存入到stateBackend中;最后更新xcheckpoint。

    2)Flink消費(fèi)端

    Flink內(nèi)部采用一致性快照機(jī)制來保障Exactly-Once的一致性語義。

    通過間隔時(shí)間自動執(zhí)行一致性檢查點(diǎn)(Checkpoints)程序,b并異步插入barrier檢查點(diǎn)分界線。整個流程所有的operator均會進(jìn)行barrier對齊->數(shù)據(jù)完成確認(rèn)->checkpoints狀態(tài)保存,從而保證數(shù)據(jù)被精確一次處理。

    3)輸出端

    Flink內(nèi)置二階段事務(wù)提交機(jī)制和目標(biāo)源支持冪等寫入。

    冪等寫入就是多次寫入會產(chǎn)生相同的結(jié)果,結(jié)果具有不可變性。在Flink中saveAsTextFile算子就是一種比較典型的冪等寫入。

    二階段提交則對于每個checkpoint創(chuàng)建事務(wù),先預(yù)提交數(shù)據(jù)到sink中,然后等所有的checkpoint全部完成后再真正提交請求到sink, 并把狀態(tài)改為已確認(rèn),從而保證數(shù)據(jù)僅被處理一次。

    為checkpoint創(chuàng)建事務(wù),等到所有的checkpoint全部真正的完成后,才把計(jì)算結(jié)果寫入到sink中。

    Flink怎么處理遲到數(shù)據(jù)

  • Flink內(nèi)置watermark機(jī)制,可在一定程度上允許數(shù)據(jù)延遲

  • 程序可在watermark的基礎(chǔ)上再配置最大延遲時(shí)間

  • 開啟側(cè)輸出流,將延遲的數(shù)據(jù)輸出到側(cè)輸出流

  • 程序內(nèi)部控制,延遲過高的數(shù)據(jù)單獨(dú)進(jìn)行后續(xù)處理

  • Flink的雙流JOIN

    Flink雙流JOIN主要分為兩大類。一類是基于原生State的Connect算子操作,另一類是基于窗口的JOIN操作。其中基于窗口的JOIN可細(xì)分為window join和interval join兩種。

    實(shí)現(xiàn)原理:底層原理依賴Flink的State狀態(tài)存儲,通過將數(shù)據(jù)存儲到State中進(jìn)行關(guān)聯(lián)join, 最終輸出結(jié)果。

    1)基于Window Join的雙流JOIN實(shí)現(xiàn)機(jī)制

    通俗理解,將兩條實(shí)時(shí)流中元素分配到同一個時(shí)間窗口中完成Join。兩條實(shí)時(shí)流數(shù)據(jù)緩存在Window State中,當(dāng)窗口觸發(fā)計(jì)算時(shí)執(zhí)行join操作。

    • join算子操作

    兩條流數(shù)據(jù)按照關(guān)聯(lián)主鍵在(滾動、滑動、會話)窗口內(nèi)進(jìn)行inner join, 底層基于State存儲,并支持處理時(shí)間和事件時(shí)間兩種時(shí)間特征,看下源碼:

    windows窗口、state存儲和雙層for循環(huán)執(zhí)行join()實(shí)現(xiàn)雙流JOIN操作,但是此時(shí)僅支持inner join類型。

    • coGroup算子操作

    coGroup算子也是基于window窗口機(jī)制,不過coGroup算子比Join算子更加靈活,可以按照用戶指定的邏輯匹配左流或右流數(shù)據(jù)并輸出,達(dá)到left join和right join的目的。

    orderDetailStream.coGroup(orderStream).where(r -> r.getOrderId()).equalTo(r -> r.getOrderId()).window(TumblingProcessingTimeWindows.of(Time.seconds(60))).apply(new CoGroupFunction<OrderDetail, Order, Tuple2<String, Long>>() {@Overridepublic void coGroup(Iterable<OrderDetail> orderDetailRecords, Iterable<Order> orderRecords, Collector<Tuple2<String, Long>> collector) {for (OrderDetail orderDetaill : orderDetailRecords) {boolean flag = false;for (Order orderRecord : orderRecords) {// 右流中有對應(yīng)的記錄collector.collect(new Tuple2<>(orderDetailRecords.getGoods_name(), orderDetailRecords.getGoods_price()));flag = true;}if (!flag) {// 右流中沒有對應(yīng)的記錄collector.collect(new Tuple2<>(orderDetailRecords.getGoods_name(), null));}}}}).print(); 復(fù)制代碼

    2)基于Interval Join的雙流JOIN實(shí)現(xiàn)機(jī)制

    Interval Join根據(jù)右流相對左流偏移的時(shí)間區(qū)間(interval)作為關(guān)聯(lián)窗口,在偏移區(qū)間窗口中完成join操作。

    滿足數(shù)據(jù)流stream2在數(shù)據(jù)流stream1的 interval(low, high)偏移區(qū)間內(nèi)關(guān)聯(lián)join。interval越大,關(guān)聯(lián)上的數(shù)據(jù)就越多,超出interval的數(shù)據(jù)不再關(guān)聯(lián)。

    實(shí)現(xiàn)原理:interval join也是利用Flink的state存儲數(shù)據(jù),不過此時(shí)存在state失效機(jī)制ttl,觸發(fā)數(shù)據(jù)清理操作。 val env = ... // kafka 訂單流val orderStream = ... // kafka 訂單明細(xì)流val orderDetailStream = ...orderStream.keyBy(_.1)// 調(diào)用intervalJoin關(guān)聯(lián).intervalJoin(orderDetailStream._2)// 設(shè)定時(shí)間上限和下限.between(Time.milliseconds(-30), Time.milliseconds(30)) .process(new ProcessWindowFunction())class ProcessWindowFunction extends ProcessJoinFunction...{override def processElement(...) {collector.collect((r1, r2) => r1 + " : " + r2)} } 復(fù)制代碼

    訂單流在流入程序后,等候(low,high)時(shí)間間隔內(nèi)的訂單明細(xì)流數(shù)據(jù)進(jìn)行join, 否則繼續(xù)處理下一個流。interval join目前也僅支持inner join。

    3)基于Connect的雙流JOIN實(shí)現(xiàn)機(jī)制

    對兩個DataStream執(zhí)行connect操作,將其轉(zhuǎn)化為ConnectedStreams, 生成的Streams可以調(diào)用不同方法在兩個實(shí)時(shí)流上執(zhí)行,且雙流之間可以共享狀態(tài)。

    兩個數(shù)據(jù)流被connect之后,只是被放在了同一個流中,內(nèi)部依然保持各自的數(shù)據(jù)和形式,兩個流相互獨(dú)立。

    [DataStream1, DataStream2] -> ConnectedStreams[1,2]

    我們可以在Connect算子底層的ConnectedStreams中編寫代碼,自行實(shí)現(xiàn)雙流JOIN的邏輯處理。

    • 1)調(diào)用connect算子,根據(jù)orderid進(jìn)行分組,并使用process算子分別對兩條流進(jìn)行處理。

    orderStream.connect(orderDetailStream).keyBy("orderId", "orderId").process(new orderProcessFunc()); 復(fù)制代碼
    • 2)process方法內(nèi)部進(jìn)行狀態(tài)編程, 初始化訂單、訂單明細(xì)和定時(shí)器的ValueState狀態(tài)。

    private ValueState<OrderEvent> orderState; private ValueState<TxEvent> orderDetailState; private ValueState<Long> timeState;// 初始化狀態(tài)Value orderState = getRuntimeContext().getState(new ValueStateDescriptor<Order>("order-state",Order.class)); ···· 復(fù)制代碼
    • 3)為每個進(jìn)入的數(shù)據(jù)流保存state狀態(tài)并創(chuàng)建定時(shí)器。在時(shí)間窗口內(nèi)另一個流達(dá)到時(shí)進(jìn)行join并輸出,完成后刪除定時(shí)器。

    @Override public void processElement1(Order value, Context ctx, Collector<Tuple2<Order, OrderDetail>> out){if (orderDetailState.value() == null){//明細(xì)數(shù)據(jù)未到,先把訂單數(shù)據(jù)放入狀態(tài)orderState.update(value);//建立定時(shí)器,60秒后觸發(fā)Long ts = (value.getEventTime()+10)*1000L;ctx.timerService().registerEventTimeTimer(ts);timeState.update(ts);}else{//明細(xì)數(shù)據(jù)已到,直接輸出到主流out.collect(new Tuple2<>(value,orderDetailState.value()));//刪除定時(shí)器ctx.timerService().deleteEventTimeTimer(timeState.value());//清空狀態(tài),注意清空的是支付狀態(tài)orderDetailState.clear();timeState.clear();} } ... @Override public void processElement2(){... } 復(fù)制代碼
    • 4)未及時(shí)達(dá)到的數(shù)據(jù)流觸發(fā)定時(shí)器輸出到側(cè)輸出流,左流先到而右流未到,則輸出左流,反之輸出右連流。

    @Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<Order, OrderDetail>> out) {// 實(shí)現(xiàn)左連接if (orderState.value() != null){ctx.output(new OutputTag<String>("left-jo in") {}, orderState.value().getTxId());// 實(shí)現(xiàn)右連接}else{ctx.output(new OutputTag<String>("left-jo in") {}, orderDetailState.value().getTxId());}orderState.clear();orderDetailState.clear();timeState.clear(); } 復(fù)制代碼

    4)Flink雙流JOIN問題處理總結(jié)

    • 1)為什么我的雙流join時(shí)間到了卻不觸發(fā),一直沒有輸出

    檢查一下watermark的設(shè)置是否合理,數(shù)據(jù)時(shí)間是否遠(yuǎn)遠(yuǎn)大于watermark和窗口時(shí)間,導(dǎo)致窗口數(shù)據(jù)經(jīng)常為空
    • 2)state數(shù)據(jù)保存多久,會內(nèi)存爆炸嗎

    state自帶有ttl機(jī)制,可以設(shè)置ttl過期策略,觸發(fā)Flink清理過期state數(shù)據(jù)。建議程序中的state數(shù)據(jù)結(jié)構(gòu)用完后手動clear掉。
    • 3)我的雙流join傾斜怎么辦

    join傾斜三板斧: 過濾異常key、拆分表減少數(shù)據(jù)、打散key分布。當(dāng)然可以的話我建議加內(nèi)存!加內(nèi)存!加內(nèi)存!!
    • 4)想實(shí)現(xiàn)多流join怎么辦

    目前無法一次實(shí)現(xiàn),可以考慮先union然后再二次處理;或者先進(jìn)行connnect操作再進(jìn)行join操作,僅建議~
    • 5)join過程延遲、沒關(guān)聯(lián)上的數(shù)據(jù)會丟失嗎

    這個一般來說不會,join過程可以使用側(cè)輸出流存儲延遲流;如果出現(xiàn)節(jié)點(diǎn)網(wǎng)絡(luò)等異常,Flink checkpoint也可以保證數(shù)據(jù)不丟失。

    Flink數(shù)據(jù)傾斜怎么處理

    數(shù)據(jù)傾斜一般都是數(shù)據(jù)Key分配不均,比如某一類型key數(shù)量過多,導(dǎo)致shuffle過程分到某節(jié)點(diǎn)數(shù)據(jù)量過大,內(nèi)存無法支撐。

    1)數(shù)據(jù)傾斜可能的情況

    那我們怎么發(fā)現(xiàn)數(shù)據(jù)傾斜了呢?一般是監(jiān)控某任務(wù)Job執(zhí)行情況,可以去Yarn UI或者Flink UI觀察,一般會出現(xiàn)如下狀況:

    • 發(fā)現(xiàn)某subTask執(zhí)行時(shí)間過慢

    • 傳輸數(shù)據(jù)量和其他task相差過大

    • BackPressure頁面出現(xiàn)反壓問題(紅色High標(biāo)識)

    結(jié)合以上的排查定位到具體的task中執(zhí)行的算子,一般常見于Keyed類型算子:比如groupBy()、rebance()等產(chǎn)生shuffle過程的操作。

    2)數(shù)據(jù)傾斜的處理方法

    • 數(shù)據(jù)拆分。如果能定位的數(shù)據(jù)傾斜的key,總結(jié)其規(guī)律特征。比如發(fā)現(xiàn)包含某字符,則可以在代碼中把該部分?jǐn)?shù)據(jù)key拆分出來,單獨(dú)處理后拼接。

    • key二次聚合。兩次聚合,第一次將key加前綴聚合,分散單點(diǎn)壓力;隨后去除前綴后再次聚合,得到最終結(jié)果。

    • 調(diào)整參數(shù)。加大TaskManager內(nèi)存、keyby均衡等參數(shù),一般效果不是很好。

    • 自定義分區(qū)或聚合邏輯。繼承分區(qū)劃分、聚合計(jì)算接口,根據(jù)數(shù)據(jù)特征和自定義邏輯,調(diào)整數(shù)據(jù)分區(qū)并均勻打散數(shù)據(jù)key。

    Flink數(shù)據(jù)重復(fù)怎么辦

    一般來說Flink可以開啟exactly-once機(jī)制,可保證精準(zhǔn)一次消費(fèi)。但是如果存在數(shù)據(jù)處理過程異常導(dǎo)致數(shù)據(jù)重復(fù),可以借助一些工具或者程序來處理。

    建議數(shù)據(jù)量不大的話可以使用flink自身的state或者借助bitmap結(jié)構(gòu);稍微大點(diǎn)可以用布隆過濾器或hyperlog工具;其次使用外部介質(zhì)(redis或hbase)設(shè)計(jì)好key就行自動去重,只不過會增加處理過程。

    總結(jié)一下Flink的去重方式:

    • 內(nèi)存去重。采用Hashset等數(shù)據(jù)結(jié)構(gòu),讀取數(shù)據(jù)中類似主鍵等唯一性標(biāo)識字段,在內(nèi)存中存儲并進(jìn)行去重判斷。

    • 使用Redis Key去重。借助Redis的Hset等特殊數(shù)據(jù)類型,自動完成Key去重。

    • DataFrame/SQL場景,使用group by、over()、window開窗等SQL函數(shù)去重

    • 利用groupByKey等聚合算子去重

    Flink實(shí)時(shí)數(shù)倉架構(gòu),為什么這么設(shè)計(jì)

    實(shí)時(shí)數(shù)倉數(shù)據(jù)規(guī)整為層級存儲,每層獨(dú)立加工。整體遵循由下向上建設(shè)思想,最大化數(shù)據(jù)賦能。

    1)數(shù)倉分層設(shè)計(jì)

    • 數(shù)據(jù)源: 分為日志數(shù)據(jù)和業(yè)務(wù)數(shù)據(jù)兩大類,包括結(jié)構(gòu)化和非結(jié)構(gòu)化數(shù)據(jù)。

    • 數(shù)倉類型:根據(jù)及時(shí)性分為離線數(shù)倉和實(shí)時(shí)數(shù)倉

    • 技術(shù)棧:

    • 采集(Sqoop、Flume、CDC)

    • 存儲(Hive、Hbase、Mysql、Kafka、數(shù)據(jù)湖)

    • 加工(Hive、Spark、Flink)

    • OLAP查詢(Kylin、Clickhous、ES、Dorisdb)等。

    2)數(shù)倉架構(gòu)設(shè)計(jì)

    整體采用Lambda架構(gòu)。保留實(shí)時(shí)、離線兩條處理流程,即最終會同時(shí)構(gòu)建實(shí)時(shí)數(shù)倉和離線數(shù)倉。

    1. 技術(shù)實(shí)現(xiàn)

    • 使用Flink和Kafka、Hive為主要技術(shù)棧

    • 實(shí)時(shí)技術(shù)流程。通過實(shí)時(shí)采集程序同步數(shù)據(jù)到Kafka消息隊(duì)列

    • Flink實(shí)時(shí)讀取Kafka數(shù)據(jù),回寫到kafka ods貼源層topic

    • Flink實(shí)時(shí)讀取Kafka的ods層數(shù)據(jù),進(jìn)行實(shí)時(shí)清洗和加工,結(jié)果寫入到kafka dwd明細(xì)層topic

    • 同樣的步驟,Flink讀取dwd層數(shù)據(jù)寫入到kafka dws匯總層topic

    • 離線技術(shù)流程和前面章節(jié)一致

    • 實(shí)時(shí)olap引擎查詢分析、報(bào)表展示

    2. 優(yōu)缺點(diǎn)

    • 兩套技術(shù)流程,全面保障實(shí)時(shí)性和歷史數(shù)據(jù)完整性

    • 同時(shí)維護(hù)兩套技術(shù)架構(gòu),維護(hù)成本高,技術(shù)難度大

    • 相同數(shù)據(jù)源處理兩次且存儲兩次,產(chǎn)生大量數(shù)據(jù)冗余和操作重復(fù)

    • 容易產(chǎn)生數(shù)據(jù)不一致問題

    3)數(shù)據(jù)流程設(shè)計(jì)

    整體從上而下,數(shù)據(jù)經(jīng)過采集 -> 數(shù)倉明細(xì)加工、匯總 -> 應(yīng)用步驟,提供實(shí)時(shí)數(shù)倉服務(wù)。

    總結(jié)

    以上是生活随笔為你收集整理的Flink面试题的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。