Flink的Window
1 Window概述
? ? streaming流式計(jì)算是一種被設(shè)計(jì)用于處理無(wú)限數(shù)據(jù)集的數(shù)據(jù)處理引擎,而無(wú)限數(shù)據(jù)集是指一種不斷增長(zhǎng)的本質(zhì)上無(wú)限的數(shù)據(jù)集,而window是一種切割無(wú)限數(shù)據(jù)為有限塊進(jìn)行處理的手段。
? ? Window是無(wú)限數(shù)據(jù)流處理的核心,Window將一個(gè)無(wú)限的stream拆分成有限大小的”buckets”桶,我們可以在這些桶上做計(jì)算操作。
? ? 注意:window一般在keyBy(KeyedStram)后。如果實(shí)在DataStream后的話是windowAll(不建議使用,會(huì)將所有數(shù)據(jù)匯總到一個(gè)分區(qū)計(jì)算)
? ? window assigner確定了數(shù)據(jù)屬于哪個(gè)窗口丟到正確的桶里面,還沒(méi)有做計(jì)算。真正做計(jì)算是在window assigner后面的window function。下面兩步和起來(lái)才是一個(gè)完整的窗口操作
.window(<window assigner>).aggregate(new AverageAggregate)2 Window的類型
? ?Window可以分成兩類:TimeWindow(按照時(shí)間生成Window)和CountWindow(按照指定的數(shù)據(jù)條數(shù)生成一個(gè)Window,與時(shí)間無(wú)關(guān))。
2.1 TimeWindow
? ?對(duì)于TimeWindow,可以根據(jù)窗口實(shí)現(xiàn)原理的不同分成三類:滾動(dòng)窗口(Tumbling Window)、滑動(dòng)窗口(Sliding Window)和會(huì)話窗口(Session Window)。
2.1.1 Tumbling Window
? ?將數(shù)據(jù)依據(jù)固定的窗口長(zhǎng)度對(duì)數(shù)據(jù)進(jìn)行切片,滾動(dòng)窗口分配器將每個(gè)元素分配到一個(gè)指定窗口大小的窗口中,滾動(dòng)窗口有一個(gè)固定的大小,并且不會(huì)出現(xiàn)重疊。特點(diǎn):時(shí)間對(duì)齊,窗口長(zhǎng)度固定,沒(méi)有重疊。適合做BI統(tǒng)計(jì)等(做每個(gè)時(shí)間段的聚合計(jì)算)
2.1.2 Sliding Window
? ?滑動(dòng)窗口由固定的窗口長(zhǎng)度和滑動(dòng)間隔組成。滑動(dòng)窗口分配器將元素分配到固定長(zhǎng)度的窗口中,如果滑動(dòng)參數(shù)小于窗口大小的話,窗口是可以重疊的,在這種情況下元素會(huì)被分配到多個(gè)窗口中。適用場(chǎng)景:對(duì)最近一個(gè)時(shí)間段內(nèi)的統(tǒng)計(jì)(求某接口最近5min的失敗率來(lái)決定是否要報(bào)警)。特點(diǎn):時(shí)間對(duì)齊,窗口長(zhǎng)度固定,可以有重疊。
2.1.3 Session Window
? ?由一系列事件組合一個(gè)指定時(shí)間長(zhǎng)度的timeout間隙組成,類似于web應(yīng)用的session,也就是一段時(shí)間沒(méi)有接收到新數(shù)據(jù)就會(huì)生成新的窗口。不會(huì)有重疊和固定的開始時(shí)間和結(jié)束時(shí)間的情況,相反,當(dāng)它在一個(gè)固定的時(shí)間周期內(nèi)不再收到元素,即非活動(dòng)間隔產(chǎn)生,那個(gè)這個(gè)窗口就會(huì)關(guān)閉。特點(diǎn):時(shí)間無(wú)對(duì)齊。
3 WindowAPI
3.0 窗口分配器
??窗口分配器 即 window() 方法,我們可以用 .window() 來(lái)定義一個(gè)窗口,然后基于這個(gè) window 去做一些聚合或者其它處理操作。注意 window () 方法必須在 keyBy 之后才能用。Flink 提供了更加簡(jiǎn)單的 .timeWindow 和 .countWindow 方法,用于定義時(shí)間窗口和計(jì)數(shù)窗口。
??window() 方法接收的輸入?yún)?shù)是一個(gè) WindowAssigner(窗口分配器),WindowAssigner 負(fù)責(zé)將每條輸入的數(shù)據(jù)分發(fā)到正確的 window 中。Flink 提供了通用的 WindowAssigner:①滾動(dòng)時(shí)間窗口( .timeWindow(Time.seconds(5)))②滑動(dòng)時(shí)間窗口(.timeWindow(Time.seconds(15), Time.seconds(5)))③會(huì)話窗口( .window(EventTimeSessionWindows.withGap(Time.minutes(1)))④滾動(dòng)計(jì)數(shù)窗口(.countWindow(5))⑤滑動(dòng)計(jì)數(shù)窗口(.countWindow(20,10))
3.1 TimeWindow
? ?TimeWindow將指定時(shí)間范圍內(nèi)的所有數(shù)據(jù)組成一個(gè)window,一次對(duì)一個(gè)window里面的所有數(shù)據(jù)進(jìn)行計(jì)算。默認(rèn)的時(shí)間窗口根據(jù)Processing Time 進(jìn)行窗口的劃分,將Flink獲取到的數(shù)據(jù)根據(jù)進(jìn)入Flink的時(shí)間劃分到不同的窗口中。
? ?(1)滾動(dòng)窗口
? ?時(shí)間間隔參數(shù)可以通過(guò)Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等指定。
val timeWindowStream = dataStream.map(r => (r.id, r.temperature)).keyBy(_._1).timeWindow(Time.seconds(5)).reduce((r1, r2) => (r1._1, r1._2.max(r2._2)))? ?(2)滑動(dòng)窗口
? ?在傳參數(shù)時(shí)需要傳入兩個(gè)參數(shù),一個(gè)是window_size,一個(gè)是sliding_size。每5s就計(jì)算輸出結(jié)果一次,每一次計(jì)算的window范圍是1分鐘內(nèi)的所有元素如下:
val timeWindowStream: DataStream[(String, Double)] = dataStream.map(r => (r.id, r.temperature)).keyBy(_._1).timeWindow(Time.seconds(15), Time.seconds(5)).reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))? ?或者
val timeWindowStream: DataStream[(String, Double)] = dataStream.map(r => (r.id, r.temperature)).keyBy(_._1).window(SlidingEventTimeWindows.of(Time.minutes(1),Time.seconds(5)).reduce((r1, r2) => (r1._1, r1._2.max(r2._2)))3.2 CountWindow
? ?CountWindow根據(jù)窗口中相同key元素的數(shù)量來(lái)觸發(fā)執(zhí)行,執(zhí)行時(shí)只計(jì)算元素?cái)?shù)量達(dá)到窗口大小的key對(duì)應(yīng)的結(jié)果。
? ?注意:CountWindow的window_size指的是相同Key的元素的個(gè)數(shù),不是輸入的所有元素的總數(shù)。
? ?(1)滾動(dòng)窗口
? ?指定窗口大小即可,當(dāng)元素?cái)?shù)量達(dá)到窗口大小時(shí),就會(huì)觸發(fā)窗口的執(zhí)行。
val countWindowStream: DataStream[(String, Double)] = dataStream.map(r => (r.id, r.temperature)).keyBy(_._1).countWindow(5).reduce((r1, r2) => (r1._1, r1._2.max(r2._2)))? ?(2)滑動(dòng)窗口
? ?在傳參數(shù)時(shí)需要傳入兩個(gè)參數(shù),一個(gè)是window_size,一個(gè)是sliding_size。每當(dāng)某一個(gè)key的個(gè)數(shù)達(dá)到10的時(shí)候,觸發(fā)計(jì)算,計(jì)算最近該key最近20個(gè)元素的內(nèi)容如下
val keyedStream: KeyedStream[(String, Int), Tuple] = dataStream.map(r => (r.id, r.temperature)).keyBy(0).countWindow(20,10).sum(1)3.3 window function
? ?window function 定義了要對(duì)窗口中收集的數(shù)據(jù)做的計(jì)算操作,主要可以分為兩類:①增量聚合函數(shù):每條數(shù)據(jù)到來(lái)就進(jìn)行計(jì)算,保持一個(gè)簡(jiǎn)單的狀態(tài)。典型的增量聚合函數(shù)有ReduceFunction, AggregateFunction。②全窗口函數(shù):先把窗口所有數(shù)據(jù)收集起來(lái),等到計(jì)算的時(shí)候會(huì)遍歷所有數(shù)據(jù)。ProcessWindowFunction就是一個(gè)全窗口函數(shù)
? ?(1)ReduceFunction
? ?下面的示例的展示了如何將遞增的ReduceFunction與ProcessWindowFunction結(jié)合使用,以返回窗口中的最小事件以及窗口的開始時(shí)間。
val input: DataStream[SensorReading] = ...input.keyBy(<key selector>).timeWindow(<duration>).reduce((r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },( key: String,context: ProcessWindowFunction[_, _, _, TimeWindow]#Context,minReadings: Iterable[SensorReading],out: Collector[(Long, SensorReading)] ) =>{val min = minReadings.iterator.next()out.collect((context.window.getStart, min))})? ?(2)AggregateFunction
? ?下面的示例展示了如何將遞增的AggregateFunction與ProcessWindowFunction結(jié)合起來(lái)計(jì)算平均值,并同時(shí)發(fā)出鍵和窗口以及平均值。
val input: DataStream[(String, Long)] = ...input.keyBy(<key selector>).timeWindow(<duration>).aggregate(new AverageAggregate(), new MyProcessWindowFunction())// Function definitions/*** The accumulator is used to keep a running sum and a count. The [getResult] method* computes the average.*/ class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {override def createAccumulator() = (0L, 0L)override def add(value: (String, Long), accumulator: (Long, Long)) =(accumulator._1 + value._2, accumulator._2 + 1L)override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2override def merge(a: (Long, Long), b: (Long, Long)) =(a._1 + b._1, a._2 + b._2) }class MyProcessWindowFunction extends ProcessWindowFunction[Double, (String, Double), String, TimeWindow] {def process(key: String, context: Context, averages: Iterable[Double], out: Collector[(String, Double)]) = {val average = averages.iterator.next()out.collect((key, average))} }? ?(3)FoldFunction
? ?下面的示例顯示如何將遞增的FoldFunction與ProcessWindowFunction結(jié)合使用,以提取窗口中的事件數(shù)量,并返回窗口的鍵和結(jié)束時(shí)間。
val input: DataStream[SensorReading] = ...input.keyBy(<key selector>).timeWindow(<duration>).fold (("", 0L, 0),(acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },( key: String,window: TimeWindow,counts: Iterable[(String, Long, Int)],out: Collector[(String, Long, Int)] ) =>{val count = counts.iterator.next()out.collect((key, window.getEnd, count._3))})? ?(4)ProcessWindowFunction
? ?除了訪問(wèn)鍵控狀態(tài)(任何富函數(shù)都可以),ProcessWindowFunction還可以使用范圍限定在函數(shù)當(dāng)前處理的窗口的鍵控狀態(tài)。
val input: DataStream[(String, Long)] = ...input.keyBy(<key selector>).window(<window assigner>).apply(new MyWindowFunction())3.4 trigger
? ?觸發(fā)器trigger定義 window 什么時(shí)候關(guān)閉,觸發(fā)計(jì)算并輸出結(jié)果。觸發(fā)器確定窗口(由窗口分配者形成)何時(shí)準(zhǔn)備好由窗口函數(shù)處理。每個(gè)WindowAssigner都有一個(gè)默認(rèn)的觸發(fā)器。如果默認(rèn)觸發(fā)器不符合需要,可以自定義觸發(fā)器。
? ?觸發(fā)器接口有五個(gè)方法,允許觸發(fā)器對(duì)不同的事件作出反應(yīng):
? ?①onElement()方法對(duì)添加到窗口的每個(gè)元素調(diào)用。
? ?②當(dāng)一個(gè)已注冊(cè)的事件時(shí)間計(jì)時(shí)器觸發(fā)時(shí),將調(diào)用onEventTime()方法。
? ?③當(dāng)觸發(fā)注冊(cè)的處理時(shí)間計(jì)時(shí)器時(shí),將調(diào)用onProcessingTime()方法。
? ?④onMerge()方法與有狀態(tài)觸發(fā)器相關(guān),當(dāng)兩個(gè)觸發(fā)器對(duì)應(yīng)的窗口合并時(shí),將它們的狀態(tài)合并起來(lái),例如在使用會(huì)話窗口時(shí)。
? ?⑤clear()方法在刪除相應(yīng)窗口時(shí)執(zhí)行所需的任何操作。
? ?關(guān)于上述方法,有兩點(diǎn)需要注意:
? ?①前三個(gè)函數(shù)通過(guò)返回一個(gè)TriggerResult來(lái)決定如何處理它們的調(diào)用事件。動(dòng)作可以是下列動(dòng)作之一:CONTINUE:什么也不做;FIRE:觸發(fā)計(jì)算;PURGE:清除窗口中的元素;FIRE_AND_PURGE:觸發(fā)計(jì)算并隨后清除窗口中的元素。
? ?②這些方法中的任何一個(gè)都可以用于為將來(lái)的操作注冊(cè)處理或事件時(shí)間計(jì)時(shí)器。
3.5 evitor
? ?移除器evitor可以窗口觸發(fā)前或觸發(fā)后,定義移除某些數(shù)據(jù)的邏輯。一般和global window一起用,要自定義trigger和evitor,因?yàn)榘阉械臄?shù)據(jù)都存下來(lái)了,不用的數(shù)據(jù)丟棄。evitor接口有2個(gè)方法如下
/*** Optionally evicts elements. Called before windowing function.** @param elements The elements currently in the pane.* @param size The current number of elements in the pane.* @param window The {@link Window}* @param evictorContext The context for the Evictor*/ void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);/*** Optionally evicts elements. Called after windowing function.** @param elements The elements currently in the pane.* @param size The current number of elements in the pane.* @param window The {@link Window}* @param evictorContext The context for the Evictor*/ void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);??evictBefore()包含要在窗口函數(shù)之前應(yīng)用的驅(qū)逐邏輯,而evictAfter()包含要在窗口函數(shù)之后應(yīng)用的驅(qū)逐邏輯。
? ?Flink提供了三個(gè)預(yù)先實(shí)現(xiàn)的驅(qū)逐器:①CountEvictor:保持窗口中元素的用戶指定數(shù)量,并丟棄從窗口緩沖區(qū)開始的剩余元素。②DeltaEvictor:獲取delta函數(shù)和閾值,計(jì)算窗口緩沖區(qū)中最后一個(gè)元素與每個(gè)剩余元素之間的增量,并刪除增量大于或等于閾值的元素。③TimeEvictor:以毫秒為單位的時(shí)間間隔作為參數(shù),對(duì)于給定的窗口,它在元素中查找最大時(shí)間戳max_ts,并刪除所有時(shí)間戳小于max_ts - interval的元素。
3.6 allowedLateness
? ?允許處理遲到的數(shù)據(jù),分布式計(jì)算數(shù)據(jù)可能是亂序的,開了時(shí)間窗口之后,可能屬于他的數(shù)據(jù)姍姍來(lái)遲。假設(shè)正在是10點(diǎn)關(guān)閉窗口,允許1分鐘的遲到數(shù)據(jù),到10點(diǎn)不關(guān)但是要觸發(fā)一次計(jì)算輸出一個(gè)計(jì)算結(jié)果,后面一分鐘再來(lái)的數(shù)據(jù)可以在這個(gè)基礎(chǔ)上在做疊加觸發(fā)一次計(jì)算再輸出一個(gè)結(jié)果。也就是先輸出結(jié)果后面更新。
? ?注意:這些處理遲到數(shù)據(jù)的必須在數(shù)據(jù)自帶的時(shí)間處理才有意義
val input: DataStream[T] = ...input.keyBy(<key selector>).window(<window assigner>).allowedLateness(<time>).<windowed transformation>(<window function>)3.7 sideOutputLateData和getSideOutput
? ?sideOutputLateData將遲到的數(shù)據(jù)放入側(cè)輸出流,getSideOutput獲取側(cè)輸出流
val lateOutputTag = OutputTag[T]("late-data")val input: DataStream[T] = ...val result = input.keyBy(<key selector>).window(<window assigner>).allowedLateness(<time>).sideOutputLateData(lateOutputTag).<windowed transformation>(<window function>)val lateStream = result.getSideOutput(lateOutputTag)3.8 window API 總覽
Keyed Windowsstream.keyBy(...) <- keyed versus non-keyed windows.window(...) <- required: "assigner"[.trigger(...)] <- optional: "trigger" (else default trigger)[.evictor(...)] <- optional: "evictor" (else no evictor)[.allowedLateness(...)] <- optional: "lateness" (else zero)[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data).reduce/aggregate/fold/apply() <- required: "function"[.getSideOutput(...)] <- optional: "output tag" Non-Keyed Windowsstream.windowAll(...) <- required: "assigner"[.trigger(...)] <- optional: "trigger" (else default trigger)[.evictor(...)] <- optional: "evictor" (else no evictor)[.allowedLateness(...)] <- optional: "lateness" (else zero)[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data).reduce/aggregate/fold/apply() <- required: "function"[.getSideOutput(...)] <- optional: "output tag"總結(jié)
以上是生活随笔為你收集整理的Flink的Window的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: SQL行列问题
- 下一篇: led拼接屏报价_液晶拼接屏与led显示