Flink的时间语义和Watermark
1 時間語義
?? 數據遲到的概念是:數據先產生,但是處理的時候滯后了
?? 在Flink的流式處理中,會涉及到時間的不同概念,如下圖所示:
?? Event Time:是事件創建的時間。它通常由事件中的時間戳描述,例如采集的日志數據中,每一條日志都會記錄自己的生成時間,Flink通過時間戳分配器訪問事件時間戳。
?? Ingestion Time:是數據進入Flink的時間。
?? Processing Time:是每一個執行基于時間操作的算子的本地系統時間,與機器相關,默認的時間屬性就是Processing Time。
?? 在Flink的流式處理中,絕大部分的業務都會使用eventTime,一般只在eventTime無法使用時,才會被迫使用ProcessingTime或者IngestionTime。引入EventTime的時間屬性如下:
val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer[MyEvent](topic, schema, props))stream.keyBy( _.getUser ).timeWindow(Time.hours(1)).reduce( (a, b) => a.add(b) ).addSink(...)?? 設置了EventTime后后面處理底層會判斷
?? 注意:設置了事件時間,但是并不知道事件時間,Event Time 的使用一定要指定數據源中的時間戳,通過assignTimestampsAndWatermarks指定,時間戳要是ms單位。
val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream: DataStream[MyEvent] = env.readFile(myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,FilePathFilter.createDefaultFilter())val withTimestampsAndWatermarks: DataStream[MyEvent] = stream.filter( _.severity == WARNING ).assignTimestampsAndWatermarks(<watermark strategy>)withTimestampsAndWatermarks.keyBy( _.getGroup ).timeWindow(Time.seconds(10)).reduce( (a, b) => a.add(b) ).addSink(...)?? 對于排序好的數據,不需要延遲觸發,可以只指定時間戳就行了
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream.assignTimestampsAndWatermarks(_.timestamp)?? 對于亂序數據調用 assignTimestampAndWatermarks 方法,傳入一個 BoundedOutOfOrdernessTimestampExtractor,就可以指定 watermark
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[WC](Time.milliseconds(1000)){override def extractTimestamp(element: WC): Long = {element.timestamp * 1000}}2 WaterMark
2.1 什么是WaterMark
?? 我們的數據從采集經過kafka,etl等操作要耗時的,再到流經source,到operator,中間是有一個過程和時間的,雖然大部分情況下,流到operator的數據都是按照事件產生的時間順序來的,但是也不排除由于網絡、分布式等原因,導致亂序的產生。
?? 遲到數據是因為有延遲,簡單的想法就多等一下。不要5秒的事件到了就關閉窗口,多等一會。我們要考慮的是當前事件的時間進展到底要按照什么時間算,也就是說假設現在5秒的窗口要關閉,設置延遲為2秒,那么5秒的數據來了就多等2秒,5秒的事件來了就相當于還沒有進展到5秒,是進展到了5-2=3秒,也就是時間才進展到3秒。按照這種多等2秒的方式的話要等到時間戳是7的數據來了之后7-2=5才關閉5秒的窗口。這就提出了Watermark
?? 亂序,其實就是指Flink接收到的事件的先后順序不是嚴格按照事件的Event Time順序排列的。一旦出現亂序,如果只根據eventTime決定window的運行,我們不能明確數據是否全部到位,但又不能無限期的等下去,此時必須要有個機制來保證一個特定的時間后,必須觸發window去進行計算了,這個特別的機制,就是Watermark。
?? Watermark可以從以下幾個方面理解:①Watermark是一種衡量Event Time進展的機制。②Watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用Watermark機制結合window來實現。③數據流中的Watermark用于表示timestamp小于Watermark的數據,都已經到達了,因此,window的執行也是由Watermark觸發的。④Watermark可以理解成一個延遲觸發機制,我們可以設置Watermark的延時時長t,每次系統會校驗已經到達的數據中最大的maxEventTime,然后認定eventTime小于maxEventTime - t的所有數據都已經到達,如果有窗口的停止時間等于maxEventTime – t,那么這個窗口被觸發執行
?? Watermark延遲時間的設置一般根據數據的亂序情況定義,通常設置成最大亂序程度
2.2 Watermark傳遞
?? 真正的Watermark其實就是一條特殊的記錄,可以認為是插入數據流里面的一個特殊數據,Watermark可以理解為是一個有時間戳的特殊數據結構,就和數據一樣一條一條來,后面處理數據如果是正常數據就正常處理,如果是Watermark就按照對于時間的操作該關閉窗口就關閉窗口。
?? Watermark必須單調遞增,既然表示當前事件時間的進展,時間只能朝前不停的推進,另外總和當前數據的時間戳相關,數據的時間戳就應該是當前的事件時間。
?? 當Flink接收到數據時,會按照一定的規則去生成Watermark。Watermark要求單調遞增的話就選取所有當前已經來的數據里面最大的時間戳作為當前的事件時間,要多等一會的話在當前最大的時間戳基礎上再減去一個延遲時間就可以了,即maxEventTime - 延遲時長。所以Watermark是基于數據攜帶的時間戳生成的,如果Watermark比當前未觸發的窗口的停止時間要晚,那么就會觸發相應窗口的執行,如果運行過程中無法獲取新的數據,那么沒有被觸發的窗口將永遠都不被觸發。
?? 有序流的Watermarker(最大延遲時間為0)如下圖所示:
?? 亂序流的Watermarker(最大延遲時間為4)如下圖所示:
?? 上圖中,采用周期性插入Watermark的生成策略,默認每200ms系統插入Watermark。我們設置的允許最大延遲到達時間為4s,當系統要插入第一個Watermark時查看此時數據中的最大事件時間為15,所以插入的Watermark是11s。過了200ms后到了第二次插入watermark的時候,此時數據中的最大事件時間為22,所以插入Watermark是18s。果我們的窗口1是1s-10s,窗口2是10s-20s,那么Watermarker為11到達之后需要觸發窗口1。一旦觸發以當前時刻為準在窗口范圍內的所有所有數據都會收入窗中。
2.3 Watermark的傳遞
?? Watermark的傳遞如上圖所示。
?? Flink 的傳遞策略基本上遵循三點:①watermark 會以廣播的形式在算子之間進行傳播。并行任務沒有數據交互不考慮,只要考慮上游有多少個任務給他發數據,下游要發送多少個數據到別的任務。②如果在程序里面收到了一個 Long.MAX_VALUE這個數值的 watermark,就表示對應的那一條流的一個部分不會再有數據發過來了,它相當于就是一個終止的標志。③單流輸入取其大,多流輸入取小。不同的上游任務發來的Watermark不一樣,不能按照上游所有的Watermark中最大的Watermark來判定當前的事件時間,而是應該按照最小的那個來判定,因為Watermark代表的數據是他之前的數據都到期了,如果只接收到一個分區的Watermark是29表示這個分區29之前數據已經到齊了,但是不能保證當前任務不在接收29之前的數據,因為之前別的Watermark可能還沒進展到29,所以應該按照最小的。
?? 底層實現:上游有2個分區就會對每一個分區都去創建一個分區的Watermark(PARTITION Watermark),分別是29,14所以當前任務的事件時間是14,那么下游的子任務廣播出去也是14,14之前的數據都到齊了。接下來一個分區來了一個新的Watermark是17,相當于這個分區的時間進展為17之前的都到齊,那么首先更新當前的Watermark,然后觀察現在所有分區的Watermark最小值是否改變,如果改變那么事件時間就朝前進展,事件時間更新就往下游廣播。
2.4 WaterMark使用
?? watermark對于有序數據,最常見的引用方式如下:
dataStream.assignTimestampsAndWatermarks(_.timestamp)?? 升序數據不用管Watermark,本身數據來就帶有時間戳
?? watermark對于亂序數據,最常見的引用方式如下:
dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Element](Time.seconds(allowDealy)) {override def extractTimestamp(element: Element): Long = element.ds})?? Watermark就是在assignTimestampsAndWatermarks里面定義出來的,BoundedOutOfOrdernessTimestampExtractor 是Flink內置提供的允許亂序最大延時的watermark生成方式,只需要重寫其extractTimestamp方法?,F在kafka源也支持直接生成Watermark,所以etl的時候可以把Watermark也產生。不過我們一般是在Flink把數據讀進來做了轉換之后馬上分配一個Watermark。Watermark要保證正確性,延遲時間一般定義成最大的亂序程度(從數據里面提煉出來的參數)。同個分區數據可能會亂序,Watermark不會亂序(單調遞增,取最大的時間戳減去延遲時間)
2.5 自定義WaterMark
?? watermark的生成策略有兩種:一種是AssignerWithPeriodicWatermarks周期性生成(隔一段時間系統自動插入),另外一種是AssignerWithPunctuatedWatermarks根據特定標記生成。這兩個接口都是Flink暴露了TimestampAssigner接口的子類型。實際生成中大量密集數據比較多,稀疏較少,所以一般使用周期性AssignerWithPeriodicWatermarks方式。
?? 周期性的生成watermark系統會周期性的將watermark插入到流中。默認周期是200毫秒??梢允褂肊xecutionConfig.setAutoWatermarkInterval(watermarkInterval)方法進行設置。每隔watermarkInterval,Flink會調用AssignerWithPeriodicWatermarks的getCurrentWatermark(watermarkInterval)方法。如果方法返回的watermark大于之前的watermark,新的watermark會被插入到流中。這個檢查保證了watermark是單調遞增的。如果方法返回的時間戳小于等于之前watermark,則不會產生新的watermark。
?? 自定義一個周期性的時間戳抽取:
class MyPeriodicAssigner extends AssignerWithPeriodicWatermarks[Element] {val bound: Long = 60 * 1000 // 延時為1分鐘var maxTs: Long = Long.MinValue // 觀察到的最大時間戳override def getCurrentWatermark: Watermark = {new Watermark(maxTs - bound)}override def extractTimestamp(r: Element, previousTS: Long) = {maxTs = maxTs.max(r.timestamp)r.timestamp} }?? 間斷式地生成watermark。和周期性生成的方式不同,這種方式不是固定時間的,而是可以根據需要對每條數據進行篩選和處理,自定義一個間斷式地生成watermar:
class MyPunctuatedAssigner extends AssignerWithPunctuatedWatermarks[Element] {val bound: Long = 60 * 1000override def checkAndGetNextWatermark(r: Element, extractedTS: Long): Watermark = {if (r.status == "sucess") {new Watermark(extractedTS - bound)} else {null}}override def extractTimestamp(r: Element, previousTS: Long): Long = {r.timestamp} }總結
以上是生活随笔為你收集整理的Flink的时间语义和Watermark的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Jupyter Notebook安装 n
- 下一篇: java 钩子线程_java-钩子线程