Apache Flink 进阶入门(二):Time 深度解析
前言
Flink 的 API 大體上可以劃分為三個(gè)層次:處于最底層的 ProcessFunction、中間一層的 DataStream API 和最上層的 SQL/Table API,這三層中的每一層都非常依賴于時(shí)間屬性。時(shí)間屬性是流處理中最重要的一個(gè)方面,是流處理系統(tǒng)的基石之一,貫穿這三層 API。在 DataStream API 這一層中因?yàn)榉庋b方面的原因,我們能夠接觸到時(shí)間的地方不是很多,所以我們將重點(diǎn)放在底層的 ProcessFunction 和最上層的 SQL/Table API。
Flink 時(shí)間語(yǔ)義
在不同的應(yīng)用場(chǎng)景中時(shí)間語(yǔ)義是各不相同的,Flink 作為一個(gè)先進(jìn)的分布式流處理引擎,它本身支持不同的時(shí)間語(yǔ)義。其核心是 Processing Time 和 Event Time(Row Time),這兩類時(shí)間主要的不同點(diǎn)如下表所示:
Processing Time 是來(lái)模擬我們真實(shí)世界的時(shí)間,其實(shí)就算是處理數(shù)據(jù)的節(jié)點(diǎn)本地時(shí)間也不一定就是完完全全的我們真實(shí)世界的時(shí)間,所以說它是用來(lái)模擬真實(shí)世界的時(shí)間。而 Event Time 是數(shù)據(jù)世界的時(shí)間,就是我們要處理的數(shù)據(jù)流世界里面的時(shí)間。關(guān)于他們的獲取方式,Process Time 是通過直接去調(diào)用本地機(jī)器的時(shí)間,而 Event Time 則是根據(jù)每一條處理記錄所攜帶的時(shí)間戳來(lái)判定。
這兩種時(shí)間在 Flink 內(nèi)部的處理以及還是用戶的實(shí)際使用方面,難易程度都是不同的。相對(duì)而言的 Processing Time 處理起來(lái)更加的簡(jiǎn)單,而 Event Time 要更麻煩一些。而在使用 Processing Time 的時(shí)候,我們得到的處理結(jié)果(或者說流處理應(yīng)用的內(nèi)部狀態(tài))是不確定的。而因?yàn)樵?Flink 內(nèi)部對(duì) Event Time 做了各種保障,使用 Event Time 的情況下,無(wú)論重放數(shù)據(jù)多少次,都能得到一個(gè)相對(duì)確定可重現(xiàn)的結(jié)果。
因此在判斷應(yīng)該使用 Processing Time 還是 Event Time 的時(shí)候,可以遵循一個(gè)原則:當(dāng)你的應(yīng)用遇到某些問題要從上一個(gè) checkpoint 或者 savepoint 進(jìn)行重放,是不是希望結(jié)果完全相同。如果希望結(jié)果完全相同,就只能用 Event Time;如果接受結(jié)果不同,則可以用 Processing Time。Processing Time 的一個(gè)常見的用途是,我們要根據(jù)現(xiàn)實(shí)時(shí)間來(lái)統(tǒng)計(jì)整個(gè)系統(tǒng)的吞吐,比如要計(jì)算現(xiàn)實(shí)時(shí)間一個(gè)小時(shí)處理了多少條數(shù)據(jù),這種情況只能使用 Processing Time。
時(shí)間的特性
時(shí)間的一個(gè)重要特性是:時(shí)間只能遞增,不會(huì)來(lái)回穿越。?在使用時(shí)間的時(shí)候我們要充分利用這個(gè)特性。假設(shè)我們有這么一些記錄,然后我們來(lái)分別看一下 Processing Time 還有 Event Time 對(duì)于時(shí)間的處理。
- 對(duì)于 Processing Time,因?yàn)槲覀兪鞘褂玫氖潜镜毓?jié)點(diǎn)的時(shí)間(假設(shè)這個(gè)節(jié)點(diǎn)的時(shí)鐘同步?jīng)]有問題),我們每一次取到的 Processing Time 肯定都是遞增的,遞增就代表著有序,所以說我們相當(dāng)于拿到的是一個(gè)有序的數(shù)據(jù)流。
- 而在用 Event Time 的時(shí)候因?yàn)闀r(shí)間是綁定在每一條的記錄上的,由于網(wǎng)絡(luò)延遲、程序內(nèi)部邏輯、或者其他一些分布式系統(tǒng)的原因,數(shù)據(jù)的時(shí)間可能會(huì)存在一定程度的亂序,比如上圖的例子。在 Event Time 場(chǎng)景下,我們把每一個(gè)記錄所包含的時(shí)間稱作 Record Timestamp。如果 Record Timestamp 所得到的時(shí)間序列存在亂序,我們就需要去處理這種情況。
如果單條數(shù)據(jù)之間是亂序,我們就考慮對(duì)于整個(gè)序列進(jìn)行更大程度的離散化。簡(jiǎn)單地講,就是把數(shù)據(jù)按照一定的條數(shù)組成一些小批次,但這里的小批次并不是攢夠多少條就要去處理,而是為了對(duì)他們進(jìn)行時(shí)間上的劃分。經(jīng)過這種更高層次的離散化之后,我們會(huì)發(fā)現(xiàn)最右邊方框里的時(shí)間就是一定會(huì)小于中間方框里的時(shí)間,中間框里的時(shí)間也一定會(huì)小于最左邊方框里的時(shí)間。
這個(gè)時(shí)候我們?cè)谡麄€(gè)時(shí)間序列里插入一些類似于標(biāo)志位的一些特殊的處理數(shù)據(jù),這些特殊的處理數(shù)據(jù)叫做 watermark。一個(gè) watermark 本質(zhì)上就代表了這個(gè) watermark 所包含的 timestamp 數(shù)值,表示以后到來(lái)的數(shù)據(jù)已經(jīng)再也沒有小于或等于這個(gè)時(shí)間的了。
Timestamp 和 Watermark 行為概覽
接下來(lái)我們重點(diǎn)看一下 Event Time 里的 Record Timestamp(簡(jiǎn)寫成 timestamp)和 watermark 的一些基本信息。絕大多數(shù)的分布式流計(jì)算引擎對(duì)于數(shù)據(jù)都是進(jìn)行了 DAG 圖的抽象,它有自己的數(shù)據(jù)源,有處理算子,還有一些數(shù)據(jù)匯。數(shù)據(jù)在不同的邏輯算子之間進(jìn)行流動(dòng)。watermark 和 timestamp 有自己的生命周期,接下來(lái)我會(huì)從 watermark 和 timestamp 的產(chǎn)生、他們?cè)诓煌墓?jié)點(diǎn)之間的傳播、以及在每一個(gè)節(jié)點(diǎn)上的處理,這三個(gè)方面來(lái)展開介紹。
Timestamp 分配和 Watermark 生成
Flink 支持兩種 watermark 生成方式。第一種是在 SourceFunction 中產(chǎn)生,相當(dāng)于把整個(gè)的 timestamp 分配和 watermark 生成的邏輯放在流處理應(yīng)用的源頭。我們可以在 SourceFunction 里面通過這兩個(gè)方法產(chǎn)生 watermark:
- 通過 collectWithTimestamp 方法發(fā)送一條數(shù)據(jù),其中第一個(gè)參數(shù)就是我們要發(fā)送的數(shù)據(jù),第二個(gè)參數(shù)就是這個(gè)數(shù)據(jù)所對(duì)應(yīng)的時(shí)間戳;也可以調(diào)用 emitWatermark 方法去產(chǎn)生一條 watermark,表示接下來(lái)不會(huì)再有時(shí)間戳小于等于這個(gè)數(shù)值記錄。
- 另外,有時(shí)候我們不想在 SourceFunction 里生成 timestamp 或者 watermark,或者說使用的 SourceFunction 本身不支持,我們還可以在使用 DataStream API 的時(shí)候指定,調(diào)用的 DataStream.assignTimestampsAndWatermarks 這個(gè)方法,能夠接收不同的 timestamp 和 watermark 的生成器。
總體上而言生成器可以分為兩類:第一類是定期生成器;第二類是根據(jù)一些在流處理數(shù)據(jù)流中遇到的一些特殊記錄生成的。
兩者的區(qū)別主要有三個(gè)方面,首先定期生成是現(xiàn)實(shí)時(shí)間驅(qū)動(dòng)的,這里的“定期生成”主要是指 watermark(因?yàn)?timestamp 是每一條數(shù)據(jù)都需要有的),即定期會(huì)調(diào)用生成邏輯去產(chǎn)生一個(gè) watermark。而根據(jù)特殊記錄生成是數(shù)據(jù)驅(qū)動(dòng)的,即是否生成 watermark 不是由現(xiàn)實(shí)時(shí)間來(lái)決定,而是當(dāng)看到一些特殊的記錄就表示接下來(lái)可能不會(huì)有符合條件的數(shù)據(jù)再發(fā)過來(lái)了,這個(gè)時(shí)候相當(dāng)于每一次分配 Timestamp 之后都會(huì)調(diào)用用戶實(shí)現(xiàn)的 watermark 生成方法,用戶需要在生成方法中去實(shí)現(xiàn) watermark 的生成邏輯。
大家要注意的是就是我們?cè)诜峙?timestamp 和生成 watermark 的過程,雖然在 SourceFunction 和 DataStream 中都可以指定,但是還是建議生成的工作越靠近 DataSource 越好。這樣會(huì)方便讓程序邏輯里面更多的 operator 去判斷某些數(shù)據(jù)是否亂序。Flink 內(nèi)部提供了很好的機(jī)制去保證這些 timestamp 和 watermark 被正確地傳遞到下游的節(jié)點(diǎn)。
Watermark 傳播
具體的傳播策略基本上遵循這三點(diǎn)。
- 首先,watermark 會(huì)以廣播的形式在算子之間進(jìn)行傳播。比如說上游的算子,它連接了三個(gè)下游的任務(wù),它會(huì)把自己當(dāng)前的收到的 watermark 以廣播的形式傳到下游。
- 第二,如果在程序里面收到了一個(gè) Long.MAX_VALUE 這個(gè)數(shù)值的 watermark,就表示對(duì)應(yīng)的那一條流的一個(gè)部分不會(huì)再有數(shù)據(jù)發(fā)過來(lái)了,它相當(dāng)于就是一個(gè)終止的一個(gè)標(biāo)志。
- 第三,對(duì)于單流而言,這個(gè)策略比較好理解,而對(duì)于有多個(gè)輸入的算子,watermark 的計(jì)算就有講究了,一個(gè)原則是:單輸入取其大,多輸入取小。
舉個(gè)例子,假設(shè)這邊藍(lán)色的塊代表一個(gè)算子的一個(gè)任務(wù),然后它有三個(gè)輸入,分別是 W1、W2、W3,這三個(gè)輸入可以理解成任何的輸入,這三個(gè)輸入可能是屬于同一個(gè)流,也可能是屬于不同的流。然后在計(jì)算 watermark 的時(shí)候,對(duì)于單個(gè)輸入而言是取他們的最大值,因?yàn)槲覀兌贾?watermark 應(yīng)該遵循一個(gè)單調(diào)遞增的一個(gè)原則。對(duì)于多輸入,它要統(tǒng)計(jì)整個(gè)算子任務(wù)的 watermark 時(shí),就會(huì)取這三個(gè)計(jì)算出來(lái)的 watermark 的最小值。即一個(gè)多個(gè)輸入的任務(wù),它的 watermark 受制于最慢的那條輸入流。這一點(diǎn)類似于木桶效應(yīng),整個(gè)木桶中裝的水會(huì)就是受制于最矮的那塊板。
watermark 在傳播的時(shí)候有一個(gè)特點(diǎn)是,它的傳播是冪等的。多次收到相同的 watermark,甚至收到之前的 watermark 都不會(huì)對(duì)最后的數(shù)值產(chǎn)生影響,因?yàn)閷?duì)于單個(gè)輸入永遠(yuǎn)是取最大的,而對(duì)于整個(gè)任務(wù)永遠(yuǎn)是取一個(gè)最小的。
同時(shí)我們可以注意到這種設(shè)計(jì)其實(shí)有一個(gè)局限,具體體現(xiàn)在它沒有區(qū)分你這個(gè)輸入是一條流多個(gè) partition 還是來(lái)自于不同的邏輯上的流的 JOIN。對(duì)于同一個(gè)流的不同 partition,我們對(duì)他做這種強(qiáng)制的時(shí)鐘同步是沒有問題的,因?yàn)橐婚_始就是把一條流拆散成不同的部分,但每一個(gè)部分之間共享相同的時(shí)鐘。但是如果算子的任務(wù)是在做類似于 JOIN 操作,那么要求你兩個(gè)輸入的時(shí)鐘強(qiáng)制同步其實(shí)沒有什么道理的,因?yàn)橥耆锌赡苁前岩粭l離現(xiàn)在時(shí)間很近的數(shù)據(jù)流和一個(gè)離當(dāng)前時(shí)間很遠(yuǎn)的數(shù)據(jù)流進(jìn)行 JOIN,這個(gè)時(shí)候?qū)τ诳斓哪菞l流,因?yàn)樗嚷哪菞l流,所以說它可能就要在狀態(tài)中去緩存非常多的數(shù)據(jù),這對(duì)于整個(gè)集群來(lái)說是一個(gè)很大的性能開銷。
ProcessFunction
在正式介紹 watermark 的處理之前,先簡(jiǎn)單介紹 ProcessFunction,因?yàn)?watermark 在任務(wù)里的處理邏輯分為內(nèi)部邏輯和外部邏輯。外部邏輯其實(shí)就是通過 ProcessFunction 來(lái)體現(xiàn)的,如果你需要使用 Flink 提供的時(shí)間相關(guān)的 API 的話就只能寫在 ProcessFunction 里。
ProcessFunction 和時(shí)間相關(guān)的功能主要有三點(diǎn):
- 第一點(diǎn)就是根據(jù)你當(dāng)前系統(tǒng)使用的時(shí)間語(yǔ)義不同,你可以去獲取當(dāng)前你正在處理這條記錄的 Record Timestamp,或者當(dāng)前的 Processing Time。
- 第二點(diǎn)就是它可以獲取當(dāng)前算子的時(shí)間,可以把它理解成當(dāng)前的 watermark。
- 第三點(diǎn)就是為了在 ProcessFunction 中去實(shí)現(xiàn)一些相對(duì)復(fù)雜的功能,允許注冊(cè)一些 timer(定時(shí)器)。比如說在 watermark 達(dá)到某一個(gè)時(shí)間點(diǎn)的時(shí)候就觸發(fā)定時(shí)器,所有的這些回調(diào)邏輯也都是由用戶來(lái)提供,涉及到如下三個(gè)方法,registerEventTimeTimer、registerProcessingTimeTimer 和 onTimer。在 onTimer 方法中就需要去實(shí)現(xiàn)自己的回調(diào)邏輯,當(dāng)條件滿足時(shí)回調(diào)邏輯就會(huì)被觸發(fā)。
一個(gè)簡(jiǎn)單的應(yīng)用是,我們?cè)谧鲆恍r(shí)間相關(guān)的處理的時(shí)候,可能需要緩存一部分?jǐn)?shù)據(jù),但這些數(shù)據(jù)不能一直去緩存下去,所以需要有一些過期的機(jī)制,我們可以通過 timer 去設(shè)定這么一個(gè)時(shí)間,指定某一些數(shù)據(jù)可能在將來(lái)的某一個(gè)時(shí)間點(diǎn)過期,從而把它從狀態(tài)里刪除掉。所有的這些和時(shí)間相關(guān)的邏輯在 Flink 內(nèi)部都是由自己的 Time Service(時(shí)間服務(wù))完成的。
Watermark處理
一個(gè)算子的實(shí)例在收到 watermark 的時(shí)候,首先要更新當(dāng)前的算子時(shí)間,這樣的話在 ProcessFunction 里方法查詢這個(gè)算子時(shí)間的時(shí)候,就能獲取到最新的時(shí)間。第二步它會(huì)遍歷計(jì)時(shí)器隊(duì)列,這個(gè)計(jì)時(shí)器隊(duì)列就是我們剛剛說到的 timer,你可以同時(shí)注冊(cè)很多 timer,Flink 會(huì)把這些 Timer 按照觸發(fā)時(shí)間放到一個(gè)優(yōu)先隊(duì)列中。第三步 Flink 得到一個(gè)時(shí)間之后就會(huì)遍歷計(jì)時(shí)器的隊(duì)列,然后逐一觸發(fā)用戶的回調(diào)邏輯。 通過這種方式,Flink 的某一個(gè)任務(wù)就會(huì)將當(dāng)前的 watermark 發(fā)送到下游的其他任務(wù)實(shí)例上,從而完成整個(gè) watermark 的傳播,從而形成一個(gè)閉環(huán)。
Table API 中的時(shí)間
下面我們來(lái)看一看 Table/SQL API 中的時(shí)間。為了讓時(shí)間參與到 Table/SQL 這一層的運(yùn)算中,我們需要提前把時(shí)間屬性放到表的 schema 中,這樣的話我們才能夠在 SQL 語(yǔ)句或者 Table 的一些邏輯表達(dá)式里面去使用這些時(shí)間去完成需求。
Table 中指定時(shí)間列
其實(shí)之前社區(qū)就怎么在 Table/SQL 中去使用時(shí)間這個(gè)問題做過一定的討論,是把獲取當(dāng)前 Processing Time 的方法是作為一個(gè)特殊的 UDF,還是把這一個(gè)列物化到整個(gè)的 schema 里面,最終采用了后者。我們這里就分開來(lái)講一講 Processing Time 和 Event Time 在使用的時(shí)候怎么在 Table 中指定。
對(duì)于 Processing Time,我們知道要得到一個(gè) Table 對(duì)象(或者注冊(cè)一個(gè) Table)有兩種手段:
(1)可以從一個(gè) DataStream 轉(zhuǎn)化成一個(gè) Table;
(2)直接通過 TableSource 去生成這么一個(gè) Table;
對(duì)于第一種方法而言,我們只需要在你已有的這些列中(例子中 f1 和 f2 就是兩個(gè)已有的列),在最后用“列名.proctime”這種寫法就可以把最后的這一列注冊(cè)為一個(gè) Processing Time,以后在寫查詢的時(shí)候就可以去直接使用這一列。如果 Table 是通過 TableSource 生成的,就可以通過實(shí)現(xiàn)這一個(gè) DefinedRowtimeAttributes 接口,然后就會(huì)自動(dòng)根據(jù)你提供的邏輯去生成對(duì)應(yīng)的 Processing Time。
相對(duì)而言,在使用 Event Time 時(shí)則有一個(gè)限制,因?yàn)?Event Time 不像 Processing Time 那樣是隨拿隨用。如果你要從 DataStream 去轉(zhuǎn)化得到一個(gè) Table,必須要提前保證原始的 DataStream 里面已經(jīng)存在了 Record Timestamp 和 watermark。如果你想通過 TableSource 生成的,也一定要保證你要接入的一個(gè)數(shù)據(jù)里面存在一個(gè)類型為 long 或者 timestamp 的這么一個(gè)時(shí)間字段。
具體來(lái)說,如果你要從 DataStream 去注冊(cè)一個(gè)表,和 proctime 類似,你只需要加上“列名.rowtime”就可以。需要注意的是,如果你要用 Processing Time,必須保證你要新加的字段是整個(gè) schema 中的最后一個(gè)字段,而 Event Time 的時(shí)候你其實(shí)可以去替換某一個(gè)已有的列,然后 Flink 會(huì)自動(dòng)的把這一列轉(zhuǎn)化成需要的 rowtime 這個(gè)類型。 如果是通過 TableSource 生成的,只需要實(shí)現(xiàn) DefinedRowtimeAttributes 接口就可以了。需要說明的一點(diǎn)是,在 DataStream API 這一側(cè)其實(shí)不支持同時(shí)存在多個(gè) Event Time(rowtime),但是在 Table 這一層理論上可以同時(shí)存在多個(gè) rowtime。因?yàn)?DefinedRowtimeAttributes 接口的返回值是一個(gè)對(duì)于 rowtime 描述的 List,即其實(shí)可以同時(shí)存在多個(gè) rowtime 列,在將來(lái)可能會(huì)進(jìn)行一些其他的改進(jìn),或者基于去做一些相應(yīng)的優(yōu)化。
時(shí)間列和Table操作
指定完了時(shí)間列之后,當(dāng)我們要真正去查詢時(shí)就會(huì)涉及到一些具體的操作。這里我列舉的這些操作都是和時(shí)間列緊密相關(guān),或者說必須在這個(gè)時(shí)間列上才能進(jìn)行的。比如說“Over 窗口聚合”和“Group by 窗口聚合”這兩種窗口聚合,在寫 SQL 提供參數(shù)的時(shí)候只能允許你在這個(gè)時(shí)間列上進(jìn)行這種聚合。第三個(gè)就是時(shí)間窗口聚合,你在寫條件的時(shí)候只支持對(duì)應(yīng)的時(shí)間列。最后就是排序,我們知道在一個(gè)無(wú)盡的數(shù)據(jù)流上對(duì)數(shù)據(jù)做排序幾乎是不可能的事情,但因?yàn)檫@個(gè)數(shù)據(jù)本身到來(lái)的順序已經(jīng)是按照時(shí)間屬性來(lái)進(jìn)行排序,所以說我們?nèi)绻獙?duì)一個(gè) DataStream 轉(zhuǎn)化成 Table 進(jìn)行排序的話,你只能是按照時(shí)間列進(jìn)行排序,當(dāng)然同時(shí)你也可以指定一些其他的列,但是時(shí)間列這個(gè)是必須的,并且必須放在第一位。
為什么說這些操作只能在時(shí)間列上進(jìn)行?因?yàn)槲覀冇械臅r(shí)候可以把到來(lái)的數(shù)據(jù)流就看成是一張按照時(shí)間排列好的一張表,而我們?nèi)魏螌?duì)于表的操作,其實(shí)都是必須在對(duì)它進(jìn)行一次順序掃描的前提下完成的。因?yàn)榇蠹叶贾罃?shù)據(jù)流的特性之一就是一過性,某一條數(shù)據(jù)處理過去之后,將來(lái)其實(shí)不太好去訪問它。當(dāng)然因?yàn)?Flink 中內(nèi)部提供了一些狀態(tài)機(jī)制,我們可以在一定程度上去弱化這個(gè)特性,但是最終還是不能超越的限制狀態(tài)不能太大。所有這些操作為什么只能在時(shí)間列上進(jìn)行,因?yàn)檫@個(gè)時(shí)間列能夠保證我們內(nèi)部產(chǎn)生的狀態(tài)不會(huì)無(wú)限的增長(zhǎng)下去,這是一個(gè)最終的前提。
原文鏈接
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的Apache Flink 进阶入门(二):Time 深度解析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 神仙在双11晚上,都干了些啥?
- 下一篇: 云原生下日志方案的架构设计