flink如何设置以每天零点到第二天零点为区间的window进行计算
環(huán)境
flink1.11.2
JAVA
stream API
timewindow
背景
公司之前的指標(biāo)是以分鐘為單位的滾動(dòng)窗口進(jìn)行檢查,然后在查詢系統(tǒng)里查詢的時(shí)候,對(duì)該天所有的分鐘數(shù)據(jù)進(jìn)行聚合統(tǒng)計(jì)。
?當(dāng)前需要在flink中添加以天為單位的Job進(jìn)行額外指標(biāo)檢查。指標(biāo)出來之后和發(fā)現(xiàn)數(shù)據(jù)口徑不一致,flink中默認(rèn)是timeWindow按天進(jìn)行滾動(dòng)統(tǒng)計(jì)的數(shù)據(jù)是每天八點(diǎn)到第二天八點(diǎn)的數(shù)據(jù)。
導(dǎo)致統(tǒng)計(jì)指標(biāo)的含義對(duì)不上,沒有參考意義和進(jìn)行不同數(shù)據(jù)間的join。
解決方案
使用window配置自定義的窗口分隔TumblingEventTimeWindows對(duì)象(因?yàn)楝F(xiàn)在處理數(shù)據(jù)基本都使用的flink? eventTime作為數(shù)據(jù)時(shí)間進(jìn)行處理,所以例子中需要數(shù)據(jù)流的時(shí)間用的是eventtime, 使用processtime的話可以使用TumblingProcessTimeWindows處理,講道理應(yīng)該配置都一樣)
話不多說直接上代碼吧。
默認(rèn)情況8點(diǎn)->8點(diǎn)的時(shí)間統(tǒng)計(jì)的代碼:
// 原始數(shù)據(jù)流 DataStream<RawDataEvent> gyhUserRegisterStream = StreamTransformCommon.preprocessingLogData(rawDataStreamMap.get("gyhUserRegister"));// 進(jìn)行數(shù)據(jù)清洗統(tǒng)計(jì)的邏輯 DataStream<Object> targetData = rawWebLogData.filter(x -> x.userId != null).map(StreamTransformCommon::renameAppInfoName).filter(x -> x != null).keyBy("page").timeWindow(Time.days(1)) // 默認(rèn)情況下 以天為單位的滾動(dòng)窗口.aggregate(new StreamTransformCommon.CountAgg(), new StreamTransformCommon.WindowResultFuction());每天0點(diǎn)->0點(diǎn)的時(shí)間窗口統(tǒng)計(jì)代碼(實(shí)際上可以舉一反三搞出任意想要的時(shí)間的規(guī)則):
// 原始數(shù)據(jù)流 DataStream<RawDataEvent> gyhUserRegisterStream = StreamTransformCommon.preprocessingLogData(rawDataStreamMap.get("gyhUserRegister"));// 進(jìn)行數(shù)據(jù)清洗統(tǒng)計(jì)的邏輯 DataStream<Object> targetData = rawWebLogData.filter(x -> x.userId != null).map(StreamTransformCommon::renameAppInfoName).filter(x -> x != null).keyBy("page").window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(16))) // 改改參數(shù),就可以調(diào)整到自己想要的時(shí)間窗口統(tǒng)計(jì)規(guī)則.aggregate(new StreamTransformCommon.CountAgg(), new StreamTransformCommon.WindowResultFuction());結(jié)果
大家可以在操作windowFunction的時(shí)候打印一下apply方法參數(shù)中的TimeWindow對(duì)象的起止時(shí)間驗(yàn)證一下。我這邊屢試不爽,問題解決了記錄一下這個(gè)過程。
總結(jié)
以上是生活随笔為你收集整理的flink如何设置以每天零点到第二天零点为区间的window进行计算的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 最常用、最好用的vue服务端渲染框架
- 下一篇: pthread_cancel 退出线程引