日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

flink自定义trigger详解

發(fā)布時(shí)間:2023/12/31 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 flink自定义trigger详解 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

適用的場(chǎng)景解釋:

[1]中有句話是這樣的:

"其實(shí),我們要實(shí)現(xiàn)基于事件時(shí)間的窗口隨意輸出,比如1000個(gè)元素觸發(fā)一次輸出,那么我們就可以通過修改這個(gè)觸發(fā)器來實(shí)現(xiàn)。"

這句話的意思是,默認(rèn)的自帶的trigger一般是基于EventTime的。

那么這1000 個(gè)元素可能跨度是一小時(shí),也可能跨度是兩小時(shí),對(duì)吧

但是顯然默認(rèn)的Trigger只能是盯著EventTime(時(shí)間戳)來決定是否觸發(fā)計(jì)算,并不能根據(jù)元素個(gè)數(shù)進(jìn)行觸發(fā)。

也就是說,默認(rèn)的Trigger盯著的跨度是"時(shí)間差"。而不是"個(gè)數(shù)差"

講人話就是:

①例如Flink的Trigger默認(rèn)每隔一天輸出統(tǒng)計(jì)數(shù)據(jù),

②但是不支持默認(rèn)每隔一千個(gè)訂單輸出統(tǒng)計(jì)數(shù)據(jù)。

但是注意這里的一千個(gè)統(tǒng)計(jì)數(shù)據(jù)可能超過一天,甚至超過一周,耗時(shí)可能不固定。

因?yàn)槟阆氚?#xff0c;代碼都是要把邏輯寫死的對(duì)吧?

一千個(gè)訂單可能一開始耗時(shí)一周,后來耗時(shí)一個(gè)月。那程序要怎么根據(jù)變化的時(shí)間來鎖定一千個(gè)訂單觸發(fā)一次?

顯然做不到,這個(gè)時(shí)候我們就希望鎖定"個(gè)數(shù)間隔"、“個(gè)數(shù)差”,這個(gè)時(shí)候就需要自定義Trigger

?

官方文檔說明:

?

下面是官方文檔[4]中Triggers這一節(jié)的內(nèi)容概括

.

需要override的函數(shù)函數(shù)作用
onElement()數(shù)據(jù)(element)被加入window的時(shí)候會(huì)調(diào)用該函數(shù)
onEventTime()?

當(dāng)一個(gè)注冊(cè)的Event-Time定時(shí)器觸發(fā)

onProcessingTime()?當(dāng)一個(gè)注冊(cè)的Processing-Time定時(shí)器觸發(fā)
onMerge()

與有狀態(tài)觸發(fā)器(stateful triggers)和當(dāng)兩個(gè)窗口整合的時(shí)候整合(merge)狀態(tài)相關(guān)。

例如使用session windows

clear()window清理數(shù)據(jù)需要

?

前面三個(gè)用來設(shè)定調(diào)用事件(invocation event)以后如何操作,

所以這些"操作"必須是一個(gè)TriggerResult

也就是說,前三個(gè)函數(shù)返回的TriggerResult可以是下面幾種選擇:

返回的TriggerResult作用
CONTINUE什么都不做
FIRE觸發(fā)計(jì)算
PURGE刪除窗口中的所有數(shù)據(jù)
FIRE_AND_PURG觸發(fā)計(jì)算后刪除窗口中所有數(shù)據(jù)

然后是Fire and Purge這一節(jié)的內(nèi)容:

觸發(fā)計(jì)算時(shí),返回的一定是FIRE或者FIRE_AND_PURG(這個(gè)話僅僅是來自官方文檔的翻譯,其實(shí)Intellij提示的選項(xiàng)并不僅僅是上面幾個(gè))

?

?

具體示范代碼參考[5]即可

private static Logger LOG = LoggerFactory.getLogger(CountTriggerWithTimeout.class);/*** 窗口最大數(shù)據(jù)量*/private int maxCount;/*** event time / process time*/private TimeCharacteristic timeType;/*** 用于儲(chǔ)存窗口當(dāng)前數(shù)據(jù)量的狀態(tài)對(duì)象*/private ReducingStateDescriptor<Long> countStateDescriptor =new ReducingStateDescriptor("counter", new Sum(), LongSerializer.INSTANCE);public CountTriggerWithTimeout(int maxCount, TimeCharacteristic timeType) {this.maxCount = maxCount;this.timeType = timeType;}private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception {clear(window, ctx);return TriggerResult.FIRE_AND_PURGE;}@Overridepublic TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);countState.add(1L);if (countState.get() >= maxCount) {LOG.info("fire with count: " + countState.get());return fireAndPurge(window, ctx);}if (timestamp >= window.getEnd()) {LOG.info("fire with tiem: " + timestamp);return fireAndPurge(window, ctx);} else {return TriggerResult.CONTINUE;}}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {if (timeType != TimeCharacteristic.ProcessingTime) {return TriggerResult.CONTINUE;}if (time >= window.getEnd()) {return TriggerResult.CONTINUE;} else {LOG.info("fire with process tiem: " + time);return fireAndPurge(window, ctx);}}@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {if (timeType != TimeCharacteristic.EventTime) {return TriggerResult.CONTINUE;}if (time >= window.getEnd()) {return TriggerResult.CONTINUE;} else {LOG.info("fire with event tiem: " + time);return fireAndPurge(window, ctx);}}@Overridepublic void clear(TimeWindow window, TriggerContext ctx) throws Exception {ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);countState.clear();}/*** 計(jì)數(shù)方法*/class Sum implements ReduceFunction<Long> {@Overridepublic Long reduce(Long value1, Long value2) throws Exception {return value1 + value2;}} }

?

?

Reference:

[1]flink自定義trigger-實(shí)現(xiàn)窗口隨意輸出

[2]Flink 自定義Trigger

[3]Flink 自定義trigger

[4]flink官方文檔-窗口

[5]Flink 自定義觸發(fā)器

總結(jié)

以上是生活随笔為你收集整理的flink自定义trigger详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。