flink自定义trigger详解
適用的場(chǎng)景解釋:
[1]中有句話是這樣的:
"其實(shí),我們要實(shí)現(xiàn)基于事件時(shí)間的窗口隨意輸出,比如1000個(gè)元素觸發(fā)一次輸出,那么我們就可以通過修改這個(gè)觸發(fā)器來實(shí)現(xiàn)。"
這句話的意思是,默認(rèn)的自帶的trigger一般是基于EventTime的。
那么這1000 個(gè)元素可能跨度是一小時(shí),也可能跨度是兩小時(shí),對(duì)吧
但是顯然默認(rèn)的Trigger只能是盯著EventTime(時(shí)間戳)來決定是否觸發(fā)計(jì)算,并不能根據(jù)元素個(gè)數(shù)進(jìn)行觸發(fā)。
也就是說,默認(rèn)的Trigger盯著的跨度是"時(shí)間差"。而不是"個(gè)數(shù)差"
講人話就是:
①例如Flink的Trigger默認(rèn)每隔一天輸出統(tǒng)計(jì)數(shù)據(jù),
②但是不支持默認(rèn)每隔一千個(gè)訂單輸出統(tǒng)計(jì)數(shù)據(jù)。
但是注意這里的一千個(gè)統(tǒng)計(jì)數(shù)據(jù)可能超過一天,甚至超過一周,耗時(shí)可能不固定。
因?yàn)槟阆氚?#xff0c;代碼都是要把邏輯寫死的對(duì)吧?
一千個(gè)訂單可能一開始耗時(shí)一周,后來耗時(shí)一個(gè)月。那程序要怎么根據(jù)變化的時(shí)間來鎖定一千個(gè)訂單觸發(fā)一次?
顯然做不到,這個(gè)時(shí)候我們就希望鎖定"個(gè)數(shù)間隔"、“個(gè)數(shù)差”,這個(gè)時(shí)候就需要自定義Trigger
?
官方文檔說明:
?
下面是官方文檔[4]中Triggers這一節(jié)的內(nèi)容概括
.
| 需要override的函數(shù) | 函數(shù)作用 |
| onElement() | 數(shù)據(jù)(element)被加入window的時(shí)候會(huì)調(diào)用該函數(shù) |
| onEventTime()? | 當(dāng)一個(gè)注冊(cè)的Event-Time定時(shí)器觸發(fā) |
| onProcessingTime()? | 當(dāng)一個(gè)注冊(cè)的Processing-Time定時(shí)器觸發(fā) |
| onMerge() | 與有狀態(tài)觸發(fā)器(stateful triggers)和當(dāng)兩個(gè)窗口整合的時(shí)候整合(merge)狀態(tài)相關(guān)。 例如使用session windows |
| clear() | window清理數(shù)據(jù)需要 |
?
前面三個(gè)用來設(shè)定調(diào)用事件(invocation event)以后如何操作,
所以這些"操作"必須是一個(gè)TriggerResult
也就是說,前三個(gè)函數(shù)返回的TriggerResult可以是下面幾種選擇:
| 返回的TriggerResult | 作用 |
| CONTINUE | 什么都不做 |
| FIRE | 觸發(fā)計(jì)算 |
| PURGE | 刪除窗口中的所有數(shù)據(jù) |
| FIRE_AND_PURG | 觸發(fā)計(jì)算后刪除窗口中所有數(shù)據(jù) |
然后是Fire and Purge這一節(jié)的內(nèi)容:
觸發(fā)計(jì)算時(shí),返回的一定是FIRE或者FIRE_AND_PURG(這個(gè)話僅僅是來自官方文檔的翻譯,其實(shí)Intellij提示的選項(xiàng)并不僅僅是上面幾個(gè))
?
?
具體示范代碼參考[5]即可
private static Logger LOG = LoggerFactory.getLogger(CountTriggerWithTimeout.class);/*** 窗口最大數(shù)據(jù)量*/private int maxCount;/*** event time / process time*/private TimeCharacteristic timeType;/*** 用于儲(chǔ)存窗口當(dāng)前數(shù)據(jù)量的狀態(tài)對(duì)象*/private ReducingStateDescriptor<Long> countStateDescriptor =new ReducingStateDescriptor("counter", new Sum(), LongSerializer.INSTANCE);public CountTriggerWithTimeout(int maxCount, TimeCharacteristic timeType) {this.maxCount = maxCount;this.timeType = timeType;}private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception {clear(window, ctx);return TriggerResult.FIRE_AND_PURGE;}@Overridepublic TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);countState.add(1L);if (countState.get() >= maxCount) {LOG.info("fire with count: " + countState.get());return fireAndPurge(window, ctx);}if (timestamp >= window.getEnd()) {LOG.info("fire with tiem: " + timestamp);return fireAndPurge(window, ctx);} else {return TriggerResult.CONTINUE;}}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {if (timeType != TimeCharacteristic.ProcessingTime) {return TriggerResult.CONTINUE;}if (time >= window.getEnd()) {return TriggerResult.CONTINUE;} else {LOG.info("fire with process tiem: " + time);return fireAndPurge(window, ctx);}}@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {if (timeType != TimeCharacteristic.EventTime) {return TriggerResult.CONTINUE;}if (time >= window.getEnd()) {return TriggerResult.CONTINUE;} else {LOG.info("fire with event tiem: " + time);return fireAndPurge(window, ctx);}}@Overridepublic void clear(TimeWindow window, TriggerContext ctx) throws Exception {ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);countState.clear();}/*** 計(jì)數(shù)方法*/class Sum implements ReduceFunction<Long> {@Overridepublic Long reduce(Long value1, Long value2) throws Exception {return value1 + value2;}} }?
?
Reference:
[1]flink自定義trigger-實(shí)現(xiàn)窗口隨意輸出
[2]Flink 自定義Trigger
[3]Flink 自定義trigger
[4]flink官方文檔-窗口
[5]Flink 自定義觸發(fā)器
總結(jié)
以上是生活随笔為你收集整理的flink自定义trigger详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 家装全屋wifi方案对比与推荐 全屋wi
- 下一篇: intellij中java文件都是灰色