日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

flink 自定义 窗口_【Flink 精选】阐述 Watermark 机制,剖析 Watermark 的产生和传递流程...

發布時間:2024/10/8 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 flink 自定义 窗口_【Flink 精选】阐述 Watermark 机制,剖析 Watermark 的产生和传递流程... 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本文闡述 Flink 的事件時間和 Watermark 機制,剖析 Watermark 產生和傳遞的流程。


1 Event time 和 Watermark 的關系

1.1 Event time 和 Processing time介紹

Event time 事件時間和Processing time 處理時間主要區別是產生時間不同,前者是事件的實際發生時間,后者是機器的系統處理時間,如下圖所示。

① Event time 事件時間:事件在其設備上發生的時間。

Event time 是事件在進入 Flink 之前已經嵌入到記錄的時間,其大小取決于事件本身,與網絡延時、系統時區等因素無關。

② Processing time 處理時間:作業正在執行相應操作的機器系統時間。

Processing time 提供了最佳的性能和最低的延遲,但是不能提供確定性,即計算結果是不確定的。 例如,時間窗口為5min的求和統計,應用程序在 9:00 開始運行,則第一個時間窗口處理 [9:00, 9:05) 的事件,下一個窗口處理 [9:05, 9:10) 的事件,依此類推。通信延遲、作業故障重啟等問題,可能導致窗口的計算結果是不一樣的。如下圖所示,假設事件(事件時間, 數值) 遇到上述問題,場景一:事件 B 有網絡延遲落在[9:10, 9:15),場景二:作業故障重啟導致事件 A 和事件 B落在[9:10, 9:15)。

1.2 Event time 和 Watermark

問題:Flink 支持事件時間,如何測量事件時間的進度?例如,5min 的事件時間窗口,當事件時間超過 5min 時,需要通知 Flink 觸發窗口計算。解答:Watermark 機制。

Watermark 本質是時間戳,與業務數據一樣無差別地傳遞下去,目的是衡量事件時間的進度(通知 Flink 觸發事件時間相關的操作,例如窗口)。

說明: Watermark(T) 表示目前系統的時間事件是 T,即系統后續沒有 T'<T 的事件即 Event(T'<T)/*** 1.Watermark 是一個時間戳, 它表示小于該時間戳的事件都已經到達了。* 2.Watermark 一般情況在源位置產生(也可以在流圖中的其它節點產生), 通過流圖節點傳播。* 3.Watermark 也是 StreamElement, 和普通數據一起在算子之間傳遞。* 4.Watermark 可以觸發窗口計算, 時間戳為 Long.MAX_VALUE 表示算子后續沒有任何數據。*/ public final class Watermark extends StreamElement {// 省略.../*** The timestamp of the watermark in milliseconds.*/private final long timestamp;/*** Creates a new watermark with the given timestamp in milliseconds.*/public Watermark(long timestamp) {this.timestamp = timestamp;}/*** Returns the timestamp associated with this {@link Watermark} in milliseconds.*/public long getTimestamp() {return timestamp;}// 省略... }

如下圖所示,事件 Event 是按照事件時間 EventTime 順序上報的。

如下圖所示,事件 Event 是不按照事件時間 EventTime 亂序上報的。

2 Watermark 的產生

2.1 Watermark 類型

說明:flink-1.12 支持 WatermarkStrategy 和 WatermarkGenerator

flink 采用 WatermarkStrategy 設置自定義 Watermark 類型,WatermarkGenerator 是 Watermark 的基類。flink 實現了 Punctuated Watermarks 從事件獲取事件的時間戳、Periodic Watermarks 周期獲取事件的時間戳。

/*** The {@code WatermarkGenerator} generates watermarks either based on events or* periodically (in a fixed interval).** <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the* {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.*/ @Public public interface WatermarkGenerator<T> {/*** 從事件獲取事件的時間戳*/void onEvent(T event, long eventTimestamp, WatermarkOutput output);/*** 周期獲取事件的時間戳*/void onPeriodicEmit(WatermarkOutput output); }

使用 WatermarkStrategy 的樣例,如下代碼。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> input = env.fromElements("data");// 使用 WatermarkStrategy 設置 Watermark 類型input.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(10)));

2.2 Watermark 的產生

Watermark 是算子 TimestampsAndWatermarksOperator 產生的,WatermarkStrategy 相當于 UDFFunction(封裝于TimestampsAndWatermarksOperator 內部)。processElement 方法實現事件產生 Watermark,processWatermark 方法阻斷上游傳過來的 Watermark,onProcessingTime 方法實現周期產生 Watermark。

public class TimestampsAndWatermarksOperator<T>extends AbstractStreamOperator<T>implements OneInputStreamOperator<T, T>, ProcessingTimeCallback { // 省略...@Overridepublic void processElement(final StreamRecord<T> element) throws Exception {final T event = element.getValue();final long previousTimestamp = element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE;final long newTimestamp = timestampAssigner.extractTimestamp(event, previousTimestamp);element.setTimestamp(newTimestamp);output.collect(element);// 事件產生 WatermarkwatermarkGenerator.onEvent(event, newTimestamp, wmOutput);}// 阻斷上游傳過來的 watermark@Overridepublic void processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) throws Exception {// if we receive a Long.MAX_VALUE watermark we forward it since it is used// to signal the end of input and to not block watermark progress downstreamif (mark.getTimestamp() == Long.MAX_VALUE) {wmOutput.emitWatermark(Watermark.MAX_WATERMARK);}}@Overridepublic void onProcessingTime(long timestamp) throws Exception {// 采用定時器, 周期產生 WatermarkwatermarkGenerator.onPeriodicEmit(wmOutput);final long now = getProcessingTimeService().getCurrentProcessingTime();// 更新定時器getProcessingTimeService().registerTimer(now + watermarkInterval, this);} // 省略... }

(1)Watermark 周期產生

public class TimePeriodicWatermarkGenerator implements WatermarkGenerator<MyEvent> {private final long maxTimeLag = 5000; // 5 seconds@Overridepublic void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {// don't need to do anything because we work on processing time}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));} }結合算子 TimestampsAndWatermarksOperator 和 TimePeriodicWatermarkGenerator,分析 Watermark 的產生流程。如下圖所示,橫軸表示 processing time,圓形表示事件,圓形中的時間 t 表示事件時間,圓形落在橫軸表示事件在算子中的處理,其中 Watermark 的產生周期為 60s 和允許延遲時間為 10s。以第一個周期 [0,60) 為例,獲取事件中的最大事件時間 max,向下游發送 watermark(最大事件時間 - 允許延遲時間 - 1)。

(2)Watermark 事件產生

public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {@Overridepublic void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {if (event.hasWatermarkMarker()) {output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));}}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// don't need to do anything because we emit in reaction to events above} }

3 Watermark 的傳遞

Watermark 的傳遞方式是廣播,即廣播方式發送到下游。Watermark 與業務數據一樣,無差別地傳遞下去。

例子:多并發的場景下,Watermark 是 source task 產生,經過 keyby 分組后觸發窗口計算。 說明:① Watermark 要單調遞增。② 如果算子有多個上游(廣播)即輸入多個 Watermark(T),則該算子取最小 Watermark 即 min(Watermark(T1), Watermark(T2))。

從 WindowOperator 源碼分析窗口是如何傳遞 Watermark。 首先分析 WindowOperator 類圖,可知 WindowOperator 間接繼承AbstractStreamOperator,而 AbstractStreamOperator 實現了接口 Input 的 processWatermark 方法、接口 TwoInputStreamOperator 的 processWatermark1 方法 和 processWatermark2 方法。

接著分析一下 AbstractStreamOperator 實現的 processWatermark 、processWatermark1 和 processWatermark2。

// 省略 ....public void processWatermark(Watermark mark) throws Exception {if (timeServiceManager != null) {timeServiceManager.advanceWatermark(mark);}// 發送 watermarkoutput.emitWatermark(mark);}/*** 2個上游的watermark* 計算最小watermark, 并設置為當前算子的watermark*/public void processWatermark1(Watermark mark) throws Exception {input1Watermark = mark.getTimestamp();long newMin = Math.min(input1Watermark, input2Watermark);if (newMin > combinedWatermark) {combinedWatermark = newMin;processWatermark(new Watermark(combinedWatermark));}}/*** 2個上游的watermark* 計算最小watermark, 并設置為當前算子的watermark*/public void processWatermark2(Watermark mark) throws Exception {input2Watermark = mark.getTimestamp();long newMin = Math.min(input1Watermark, input2Watermark);if (newMin > combinedWatermark) {combinedWatermark = newMin;processWatermark(new Watermark(combinedWatermark));}} // 省略 ....

總結

以上是生活随笔為你收集整理的flink 自定义 窗口_【Flink 精选】阐述 Watermark 机制,剖析 Watermark 的产生和传递流程...的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 实拍女处破www免费看 | 看黄色一级大片 | 国产中文在线 | 亚洲欧美日韩精品久久 | 91极品蜜桃臀 | 亚州男人的天堂 | 天天操导航 | 黄色免费一级 | 福利小视频在线 | 麻豆视频播放 | 日本一区二区在线不卡 | 亚洲国产精品久久 | 亚洲国产精品久久久久久 | 91精品国产高清一区二区三蜜臀 | v8888av| 天堂网中文在线观看 | 97香蕉视频 | 国产亚洲午夜 | 午夜免费一区二区 | 人妖videosex高潮另类 | 国产第9页| 中出在线观看 | 久射网 | 欧美在线播放 | 亚洲伊人色 | 亚洲精品高潮 | 欧美丰满美乳xxⅹ高潮www | 少妇高潮惨叫久久久久久 | 色播放| 日韩欧美成人一区二区三区 | 1级黄色大片儿 | 国产一级大片 | 久久精品视频日本 | 性色一区 | 中文字幕不卡av | 夜夜天天 | 亚洲性生活片 | 无码免费一区二区三区 | 国产无毛片 | 欧美日韩在线观看视频 | 黄色网址多少 | 欧美三级小视频 | 青青草色视频 | 男同av在线观看一区二区三区 | 熟睡人妻被讨厌的公侵犯 | 国产激情四射 | 91av在线免费视频 | 尤物精品视频在线观看 | 无遮挡又爽又刺激的视频 | jizz91| 一区二区三区日韩 | 欧美成人免费在线 | 亚洲欧洲日本精品 | 精品人妻无码一区二区色欲产成人 | 自拍偷拍在线视频 | 久草热在线视频 | 国产人人看 | 精品国产视频在线 | 亚洲成人一区 | 校园春色亚洲 | 日韩av在线看免费观看 | 18禁裸乳无遮挡啪啪无码免费 | 国模精品一区二区三区 | 国产成人高清视频 | 成人h动漫在线 | 夜夜爽夜夜叫夜夜高潮漏水 | a级黄毛片 | 国产精品三区在线观看 | 久久精品视频国产 | 激情在线观看视频 | 亚洲精品中字 | 亚洲一区二区精品在线观看 | 国产精品久久AV无码 | 黄色网占| 国产经典三级在线 | 日韩欧美精品久久 | 久草成人在线 | 丰满人妻在公车被猛烈进入电影 | 国产精品高潮呻吟久久aⅴ码 | 国产精品sm调教免费专区 | 91九色中文| 久久99精品久久久久婷婷 | 影音先锋成人资源网 | 国产精品成人免费精品自在线观看 | 夜色一区二区三区 | 91在线免费观看网站 | 99精品久久久久 | 亚洲欧美中文日韩在线v日本 | 色播欧美| 精品视频久久久久久 | 午夜中出 | 日韩成人av在线 | 小黄网站在线观看 | 高清中文字幕在线a片 | 男人天堂成人网 | 麻豆91精品| 视频区小说区 | 成人在线影片 | 国产极品91|