flink 自定义 窗口_Flink入门实战 (下)
一、 時(shí)間語義與 Wartermark
1、 Flink 中的時(shí)間語義
在 Flink 的流式處理中,會(huì)涉及到時(shí)間的不同概念,如下圖所示:
Event Time:是事件創(chuàng)建的時(shí)間。它通常由事件中的時(shí)間戳描述,例如采集的
日志數(shù)據(jù)中,每一條日志都會(huì)記錄自己的生成時(shí)間,Flink 通過時(shí)間戳分配器訪問事
件時(shí)間戳。 Ingestion Time:是數(shù)據(jù)進(jìn)入 Flink 的時(shí)間。 Processing Time:是每一個(gè)執(zhí)行基于時(shí)間操作的算子的本地系統(tǒng)時(shí)間,與機(jī)器
相關(guān),默認(rèn)的時(shí)間屬性就是 Processing Time。
一個(gè)例子——電影《星球大戰(zhàn)》:
例如,一條日志進(jìn)入 Flink 的時(shí)間為 2017-11-12 10:00:00.123,到達(dá) Window 的
系統(tǒng)時(shí)間為 2017-11-12 10:00:01.234,日志的內(nèi)容如下:
2017對于業(yè)務(wù)來說,要統(tǒng)計(jì) 1min 內(nèi)的故障日志個(gè)數(shù),哪個(gè)時(shí)間是最有意義的?——
eventTime,因?yàn)槲覀円鶕?jù)日志的生成時(shí)間進(jìn)行統(tǒng)計(jì)。
2、 EventTime 的引入
在 Flink 的流式處理中,絕大部分的業(yè)務(wù)都會(huì)使用 eventTime,一般只在
eventTime 無法使用時(shí),才會(huì)被迫使用 ProcessingTime 或者 IngestionTime。
如果要使用 EventTime,那么需要引入 EventTime 的時(shí)間屬性,引入方式如下所
示:
val 3、Watermark
3.1、基本概念
我們知道,流處理從事件產(chǎn)生,到流經(jīng) source,再到 operator,中間是有一個(gè)過
程和時(shí)間的,雖然大部分情況下,流到 operator 的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順
序來的,但是也不排除由于網(wǎng)絡(luò)、分布式等原因,導(dǎo)致亂序的產(chǎn)生,所謂亂序,就
是指 Flink 接收到的事件的先后順序不是嚴(yán)格按照事件的 Event Time 順序排列的。
那么此時(shí)出現(xiàn)一個(gè)問題,一旦出現(xiàn)亂序,如果只根據(jù) eventTime 決定 window 的
運(yùn)行,我們不能明確數(shù)據(jù)是否全部到位,但又不能無限期的等下去,此時(shí)必須要有
個(gè)機(jī)制來保證一個(gè)特定的時(shí)間后,必須觸發(fā) window 去進(jìn)行計(jì)算了,這個(gè)特別的機(jī)
制,就是 Watermark。
- Watermark 是一種衡量 Event Time 進(jìn)展的機(jī)制。
- Watermark 是用于處理亂序事件的,而正確的處理亂序事件,通常用
Watermark 機(jī)制結(jié)合 window 來實(shí)現(xiàn)。
- 數(shù)據(jù)流中的 Watermark 用于表示 timestamp 小于 Watermark 的數(shù)據(jù),都已經(jīng)
到達(dá)了,因此,window 的執(zhí)行也是由 Watermark 觸發(fā)的。
- Watermark 可以理解成一個(gè)延遲觸發(fā)機(jī)制,我們可以設(shè)置 Watermark 的延時(shí)
時(shí)長 t,每次系統(tǒng)會(huì)校驗(yàn)已經(jīng)到達(dá)的數(shù)據(jù)中最大的 maxEventTime,然后認(rèn)定 eventTime
小于 maxEventTime - t 的所有數(shù)據(jù)都已經(jīng)到達(dá),如果有窗口的停止時(shí)間等于
maxEventTime – t,那么這個(gè)窗口被觸發(fā)執(zhí)行。
有序流的 Watermarker 如下圖所示:(Watermark 設(shè)置為 0)
亂序流的 Watermarker 如下圖所示:(Watermark 設(shè)置為 2)
當(dāng) Flink 接收到數(shù)據(jù)時(shí),會(huì)按照一定的規(guī)則去生成 Watermark,這條 Watermark
就等于當(dāng)前所有到達(dá)數(shù)據(jù)中的 maxEventTime - 延遲時(shí)長,也就是說,Watermark 是
由數(shù)據(jù)攜帶的,一旦數(shù)據(jù)攜帶的 Watermark 比當(dāng)前未觸發(fā)的窗口的停止時(shí)間要晚,
那么就會(huì)觸發(fā)相應(yīng)窗口的執(zhí)行。由于 Watermark 是由數(shù)據(jù)攜帶的,因此,如果運(yùn)行
過程中無法獲取新的數(shù)據(jù),那么沒有被觸發(fā)的窗口將永遠(yuǎn)都不被觸發(fā)。
上圖中,我們設(shè)置的允許最大延遲到達(dá)時(shí)間為 2s,所以時(shí)間戳為 7s 的事件對應(yīng)
的 Watermark 是 5s,時(shí)間戳為 12s 的事件的 Watermark 是 10s,如果我們的窗口 1
是 1s~5s,窗口 2 是 6s~10s,那么時(shí)間戳為 7s 的事件到達(dá)時(shí)的 Watermarker 恰好觸
發(fā)窗口 1,時(shí)間戳為 12s 的事件到達(dá)時(shí)的 Watermark 恰好觸發(fā)窗口 2。
Watermark 就是觸發(fā)前一窗口的“關(guān)窗時(shí)間”,一旦觸發(fā)關(guān)門那么以當(dāng)前時(shí)刻
為準(zhǔn)在窗口范圍內(nèi)的所有所有數(shù)據(jù)都會(huì)收入窗中。
只要沒有達(dá)到水位那么不管現(xiàn)實(shí)中的時(shí)間推進(jìn)了多久都不會(huì)觸發(fā)關(guān)窗。
3.2、Watermark 的引入
watermark 的引入很簡單,對于亂序數(shù)據(jù),最常見的引用方式如下:
dataStreamEvent Time 的使用一定要指定數(shù)據(jù)源中的時(shí)間戳。否則程序無法知道事件的事
件時(shí)間是什么(數(shù)據(jù)源里的數(shù)據(jù)沒有時(shí)間戳的話,就只能使用 Processing Time 了)。
我們看到上面的例子中創(chuàng)建了一個(gè)看起來有點(diǎn)復(fù)雜的類,這個(gè)類實(shí)現(xiàn)的其實(shí)就
是分配時(shí)間戳的接口。Flink 暴露了 TimestampAssigner 接口供我們實(shí)現(xiàn),使我們可
以自定義如何從事件數(shù)據(jù)中抽取時(shí)間戳。
val MyAssigner 有兩種類型
- AssignerWithPeriodicWatermarks
- AssignerWithPunctuatedWatermarks
以上兩個(gè)接口都繼承自 TimestampAssigner。
Assigner with periodic watermarks
周期性的生成 watermark:系統(tǒng)會(huì)周期性的將 watermark 插入到流中(水位線也
是一種特殊的事件!)。默認(rèn)周期是 200 毫秒。可以使用
ExecutionConfig.setAutoWatermarkInterval()方法進(jìn)行設(shè)置。
val 產(chǎn)生 watermark 的邏輯:每隔 5 秒鐘,Flink 會(huì)調(diào)用
AssignerWithPeriodicWatermarks 的 getCurrentWatermark()方法。如果方法返回一個(gè)
時(shí)間戳大于之前水位的時(shí)間戳,新的 watermark 會(huì)被插入到流中。這個(gè)檢查保證了
水位線是單調(diào)遞增的。如果方法返回的時(shí)間戳小于等于之前水位的時(shí)間戳,則不會(huì)
產(chǎn)生新的 watermark。
例子,自定義一個(gè)周期性的時(shí)間戳抽取:
class 一種簡單的特殊情況是,如果我們事先得知數(shù)據(jù)流的時(shí)間戳是單調(diào)遞增的,也
就是說沒有亂序,那我們可以使用 assignAscendingTimestamps,這個(gè)方法會(huì)直接使
用數(shù)據(jù)的時(shí)間戳生成 watermark。
val 而對于亂序數(shù)據(jù)流,如果我們能大致估算出數(shù)據(jù)流中的事件的最大延遲時(shí)間,
就可以使用如下代碼:
val Assigner with punctuated watermarks
間斷式地生成 watermark。和周期性生成的方式不同,這種方式不是固定時(shí)間的,
而是可以根據(jù)需要對每條數(shù)據(jù)進(jìn)行篩選和處理。直接上代碼來舉個(gè)例子,我們只給
sensor_1 的傳感器的數(shù)據(jù)流插入 watermark:
class 4、 EvnetTime 在 window 中的使用
案例一:Flink窗口操作之簡單測試
4.1、滾動(dòng)窗口(TumblingEventTimeWindows)
代碼具體實(shí)現(xiàn):
package 啟動(dòng)程序后,視頻演示:表示10秒之內(nèi)統(tǒng)計(jì)數(shù)據(jù)
Flink的滾動(dòng)窗口實(shí)現(xiàn)方式https://www.zhihu.com/video/1241477585970135040案例二:Flink窗口操作之事件時(shí)間測試
代碼具體實(shí)現(xiàn):
package 啟動(dòng)程序后,視頻演示:事件時(shí)間測試表示執(zhí)行多少個(gè)才能把窗口關(guān)閉,由于這里簡單測試沒遇到窗口關(guān)閉
事件時(shí)間測試 https://www.zhihu.com/video/1241492597937111040案例三:Flink窗口操作之Window起始點(diǎn)
視頻演示:
Window起始點(diǎn)https://www.zhihu.com/video/1241685277611372544二、ProcessFunction API(底層 API)
我們之前學(xué)習(xí)的轉(zhuǎn)換算子是無法訪問事件的時(shí)間戳信息和水位線信息的。而這
在一些應(yīng)用場景下,極為重要。例如 MapFunction 這樣的 map 轉(zhuǎn)換算子就無法訪問
時(shí)間戳或者當(dāng)前事件的事件時(shí)間。
基于此,DataStream API 提供了一系列的 Low-Level 轉(zhuǎn)換算子。可以訪問時(shí)間 戳、watermark 以及注冊定時(shí)事件。還可以輸出特定的一些事件,例如超時(shí)事件等。
Process Function 用來構(gòu)建事件驅(qū)動(dòng)的應(yīng)用以及實(shí)現(xiàn)自定義的業(yè)務(wù)邏輯(使用之前的
window 函數(shù)和轉(zhuǎn)換算子無法實(shí)現(xiàn))。例如,Flink SQL 就是使用 Process Function 實(shí)
現(xiàn)的。
Flink 提供了 8 個(gè) Process Function:
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- ProcessJoinFunction
- BroadcastProcessFunction
- KeyedBroadcastProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
1、KeyedProcessFunction
這里我們重點(diǎn)介紹 KeyedProcessFunction。
KeyedProcessFunction 用來操作 KeyedStream。KeyedProcessFunction 會(huì)處理流
的每一個(gè)元素,輸出為 0 個(gè)、1 個(gè)或者多個(gè)元素。所有的 Process Function 都繼承自
RichFunction 接口,所以都有 open()、close()和 getRuntimeContext()等方法。
而 KeyedProcessFunction[KEY, IN, OUT]還額外提供了兩個(gè)方法:
- processElement(v: IN, ctx: Context, out: Collector[OUT]),
流中的每一個(gè)元素 都會(huì)調(diào)用這個(gè)方法,調(diào)用結(jié)果將會(huì)放在 Collector 數(shù)據(jù)類型中輸出。Context 可以訪問元素的時(shí)間戳,元素的 key,以及 TimerService 時(shí)間服務(wù)。Context
還可以將結(jié)果輸出到別的流(side outputs)。
- onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])
是一個(gè)回 調(diào)函數(shù)。當(dāng)之前注冊的定時(shí)器觸發(fā)時(shí)調(diào)用。參數(shù) timestamp 為定時(shí)器所設(shè)定
的觸發(fā)的時(shí)間戳。Collector 為輸出結(jié)果的集合。OnTimerContext 和
processElement 的 Context 參數(shù)一樣,提供了上下文的一些信息,例如定時(shí)器
觸發(fā)的時(shí)間信息(事件時(shí)間或者處理時(shí)間)。
2、TimerService 和 定時(shí)器(Timers)
Context 和 OnTimerContext 所持有的 TimerService 對象擁有以下方法:
- currentProcessingTime(): Long 返回當(dāng)前處理時(shí)間
- ? currentWatermark(): Long 返回當(dāng)前 watermark 的時(shí)間戳
- ? registerProcessingTimeTimer(timestamp: Long): Unit 會(huì)注冊當(dāng)前 key 的
processing time 的定時(shí)器。當(dāng) processing time 到達(dá)定時(shí)時(shí)間時(shí),觸發(fā) timer。
- registerEventTimeTimer(timestamp: Long): Unit 會(huì)注冊當(dāng)前 key 的 event time
定時(shí)器。當(dāng)水位線大于等于定時(shí)器注冊的時(shí)間時(shí),觸發(fā)定時(shí)器執(zhí)行回調(diào)函數(shù)。
- deleteProcessingTimeTimer(timestamp: Long): Unit 刪除之前注冊處理時(shí)間定
時(shí)器。如果沒有這個(gè)時(shí)間戳的定時(shí)器,則不執(zhí)行。
- deleteEventTimeTimer(timestamp: Long): Unit 刪除之前注冊的事件時(shí)間定時(shí)
器,如果沒有此時(shí)間戳的定時(shí)器,則不執(zhí)行。
當(dāng)定時(shí)器 timer 觸發(fā)時(shí),會(huì)執(zhí)行回調(diào)函數(shù) onTimer()。注意定時(shí)器 timer 只能在
keyed streams 上面使用。
下面舉個(gè)例子說明 KeyedProcessFunction 如何操作 KeyedStream。
需求:監(jiān)控溫度傳感器的溫度值,如果溫度值在一秒鐘之內(nèi)(processing time)連
續(xù)上升,則報(bào)警。
val 看一下 TempIncreaseAlertFunction 如何實(shí)現(xiàn), 程序中使用了 ValueState 這樣一個(gè)
狀態(tài)變量。
具體代碼實(shí)現(xiàn):
package 啟動(dòng)程序,控制臺(tái)打印數(shù)據(jù)
3、側(cè)輸出流(SideOutput)
大部分的 DataStream API 的算子的輸出是單一輸出,也就是某種數(shù)據(jù)類型的流。
除了 split 算子,可以將一條流分成多條流,這些流的數(shù)據(jù)類型也都相同。process
function 的 side outputs 功能可以產(chǎn)生多條流,并且這些流的數(shù)據(jù)類型可以不一樣。
一個(gè) side output 可以定義為 OutputTag[X]對象,X 是輸出流的數(shù)據(jù)類型。process
function 可以通過 Context 對象發(fā)射一個(gè)事件到一個(gè)或者多個(gè) side outputs。
下面是一個(gè)示例程序:
val 接下來我們實(shí)現(xiàn) FreezingMonitor 函數(shù),用來監(jiān)控傳感器溫度值,將溫度值低于
32F 的溫度輸出到 side output。
具體代碼實(shí)現(xiàn):
package 啟動(dòng)程序,控制臺(tái)打印數(shù)據(jù)
冰點(diǎn)低溫輸出流https://www.zhihu.com/video/12417521135516467204、CoProcessFunction
對于兩條輸入流,DataStream API 提供了 CoProcessFunction 這樣的 low-level
操作。CoProcessFunction 提供了操作每一個(gè)輸入流的方法: processElement1()和
processElement2()。
類似于 ProcessFunction,這兩種方法都通過 Context 對象來調(diào)用。這個(gè) Context
對象可以訪問事件數(shù)據(jù),定時(shí)器時(shí)間戳,TimerService,以及 side outputs。
CoProcessFunction 也提供了 onTimer()回調(diào)函數(shù)。
總結(jié)
以上是生活随笔為你收集整理的flink 自定义 窗口_Flink入门实战 (下)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 求一个qq网名三字女生。
- 下一篇: python数据库框架_Python数据