日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flink触发器Triggers

發(fā)布時間:2023/12/3 编程问答 56 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink触发器Triggers 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

觸發(fā)器(Triggers)

觸發(fā)器確定窗口(由窗口分配器形成)何時準(zhǔn)備好由窗口功能處理。每個WindowAssigner都帶有一個默認(rèn)觸發(fā)器。如果默認(rèn)觸發(fā)器不適合您的需求,則可以使用trigger(...)指定自定義觸發(fā)器。

trigger觸發(fā)器接口有五個方法允許trigger對不同的事件做出反應(yīng):

  • onElement()進(jìn)入窗口的每個元素都會調(diào)用該方法。

  • onEventTime()事件時間timer觸發(fā)的時候被調(diào)用。

  • onProcessingTime()處理時間timer觸發(fā)的時候會被調(diào)用。

  • onMerge()有狀態(tài)的觸發(fā)器相關(guān),并在它們相應(yīng)的窗口合并時合并兩個觸發(fā)器的狀態(tài),例如使用會話窗口。

  • clear()該方法主要是執(zhí)行窗口的刪除操作。

關(guān)于上述方法需要注意兩點:

1).前三方法決定著如何通過返回一個TriggerResult來操作輸入事件。

CONTINUE:什么都不做。

FIRE:觸發(fā)計算。

PURE:清除窗口的元素。

FIRE_AND_PURE:觸發(fā)計算和清除窗口元素。

2). 這些方法中的任何一個都可用于為將來的操作注冊處理或事件時間計時器

Fire和Purge

一旦觸發(fā)器確定窗口已準(zhǔn)備好進(jìn)行處理,它將觸發(fā),即返回FIRE或FIRE_AND_PURGE。這是窗口操作員發(fā)出當(dāng)前窗口結(jié)果的信號。給定一個帶有ProcessWindowFunction的窗口,所有元素都將傳遞給ProcessWindowFunction(可能在將它們傳遞給逐出者之后)。具有ReduceFunction,AggregateFunction或FoldFunction的Windows只會發(fā)出其急切的聚合結(jié)果。

當(dāng)觸發(fā)器觸發(fā)時,它可以是FIRE或FIRE_AND_PURGE。在FIRE保留窗口內(nèi)容的同時,FIRE_AND_PURGE刪除其內(nèi)容。默認(rèn)情況下,預(yù)實現(xiàn)的觸發(fā)器僅觸發(fā)FIRE,而不會清除窗口狀態(tài)。

注意??: 清除將僅刪除窗口的內(nèi)容,并將保留有關(guān)該窗口的任何潛在元信息以及任何觸發(fā)狀態(tài)。

默認(rèn)觸發(fā)器

WindowAssigner的默認(rèn)觸發(fā)器適用于許多用例。例如,所有事件時間窗口分配器都有一個EventTimeTrigger作為默認(rèn)觸發(fā)器。一旦WaterMark通過窗口的末端,該觸發(fā)器便會觸發(fā)。

注意??: GlobalWindow的默認(rèn)觸發(fā)器是NeverTrigger,它從不觸發(fā)。因此,在使用GlobalWindow時,您始終必須定義一個自定義觸發(fā)器。 通過使用trigger()指定觸發(fā)器,您將覆蓋WindowAssigner的默認(rèn)觸發(fā)器。例如,如果為TumblingEventTimeWindows指定CountTrigger, 則將不再基于時間進(jìn)度而是僅通過計數(shù)來獲取窗口觸發(fā)。現(xiàn)在,如果要基于時間和計數(shù)做出反應(yīng),則必須編寫自己的自定義觸發(fā)器。

內(nèi)置和自定義觸發(fā)器

Flink帶有一些內(nèi)置觸發(fā)器。

  • EventTimeTrigger基于事件時間和watermark機制來對窗口進(jìn)行觸發(fā)計算。

  • ProcessingTimeTrigger基于處理時間觸發(fā)。

  • CountTrigger窗口元素數(shù)超過預(yù)先給定的限制值的話會觸發(fā)計算。

  • PurgingTrigger作為其它trigger的參數(shù),將其轉(zhuǎn)化為一個purging觸發(fā)器。

如果需要實現(xiàn)自定義觸發(fā)器,則應(yīng)該實現(xiàn)Trigger類。請注意,API仍在不斷發(fā)展,并可能在Flink的未來版本中更改。

import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.TimeWindow;/*** <p/>* <li>title: DataStream 觸發(fā)器</li>* <li>@author: li.pan</li>* <li>Date: 2019/12/29 5:00 下午</li>* <li>Version: V1.0</li>* <li>Description: 自定義元素個數(shù)觸發(fā)器</li>*/ public class CustomProcessingTimeTrigger extends Trigger<Object, TimeWindow> {private static final long serialVersionUID = 1L;private CustomProcessingTimeTrigger() {}private static int flag = 0;@Overridepublic TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {ctx.registerProcessingTimeTimer(window.maxTimestamp());// CONTINUE是代表不做輸出,也即是,此時我們想要實現(xiàn)比如100條輸出一次,// 而不是窗口結(jié)束再輸出就可以在這里實現(xiàn)。if(flag > 9){flag = 0;return TriggerResult.FIRE;}else{flag++;}System.out.println("onElement : "+element);return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {return TriggerResult.FIRE_AND_PURGE;}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {return TriggerResult.FIRE_AND_PURGE;}@Overridepublic void clear(TimeWindow window, TriggerContext ctx) throws Exception {ctx.deleteProcessingTimeTimer(window.maxTimestamp());}@Overridepublic boolean canMerge() {return true;}@Overridepublic void onMerge(TimeWindow window, OnMergeContext ctx) {// only register a timer if the time is not yet past the end of the merged window// this is in line with the logic in onElement(). If the time is past the end of// the window onElement() will fire and setting a timer here would fire the window twice.long windowMaxTimestamp = window.maxTimestamp();if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {ctx.registerProcessingTimeTimer(windowMaxTimestamp);}}@Overridepublic String toString() {return "ProcessingTimeTrigger()";}/*** 創(chuàng)建一個自定義觸發(fā)器對象*/public static CustomProcessingTimeTrigger create() {return new CustomProcessingTimeTrigger();}}

?

?

總結(jié)

以上是生活随笔為你收集整理的Flink触发器Triggers的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。