Flink 滚动窗口、滑动窗口详解
1 滾動窗口(Tumbling Windows)
滾動窗口有固定的大小,是一種對數(shù)據(jù)進行“均勻切片”的劃分方式。窗口之間沒有重疊,也不會有間隔,是“首尾相接”的狀態(tài)。如果我們把多個窗口的創(chuàng)建,看作一個窗口的運動,那就好像它在不停地向前“翻滾”一樣。這是最簡單的窗口形式,我們之前所舉的例子都是滾動窗口。也正是因為滾動窗口是“無縫銜接”,所以每個數(shù)據(jù)都會被分配到一個窗口,而且只會屬于一個窗口。
滾動窗口可以基于時間定義,也可以基于數(shù)據(jù)個數(shù)定義;需要的參數(shù)只有一個,就是窗口的大小(window size)。比如我們可以定義一個長度為 1 小時的滾動時間窗口,那么每個小時就會進行一次統(tǒng)計;或者定義一個長度為 10 的滾動計數(shù)窗口,就會每 10 個數(shù)進行一次統(tǒng)計。
2 滑動窗口(Sliding Windows)
與滾動窗口類似,滑動窗口的大小也是固定的。區(qū)別在于,窗口之間并不是首尾相接的,而是可以“錯開”一定的位置。如果看作一個窗口的運動,那么就像是向前小步“滑動”一樣。
既然是向前滑動,那么每一步滑多遠,就也是可以控制的。所以定義滑動窗口的參數(shù)有兩個:除去窗口大小(window size)之外,還有一個“滑動步長”(window slide),它其實就代
表了窗口計算的頻率。滑動的距離代表了下個窗口開始的時間間隔,而窗口大小是固定的,所以也就是兩個窗口結(jié)束時間的間隔;窗口在結(jié)束時間觸發(fā)計算輸出結(jié)果,那么滑動步長就代表
了計算頻率。例如,我們定義一個長度為 1 小時、滑動步長為 5 分鐘的滑動窗口,那么就會統(tǒng)計 1 小時內(nèi)的數(shù)據(jù),每 5 分鐘統(tǒng)計一次。同樣,滑動窗口可以基于時間定義,也可以基于數(shù)據(jù)
個數(shù)定義。
我們可以看到,當滑動步長小于窗口大小時,滑動窗口就會出現(xiàn)重疊,這時數(shù)據(jù)也可能會被同時分配到多個窗口中。而具體的個數(shù),就由窗口大小和滑動步長的比值(size/slide)來決
定。如圖 6-18 所示,滑動步長剛好是窗口大小的一半,那么每個數(shù)據(jù)都會被分配到 2 個窗口里。比如我們定義的窗口長度為 1 小時、滑動步長為 30 分鐘,那么對于 8 點 55 分的數(shù)據(jù),應該同時屬于[8 點, 9 點)和[8 點半, 9 點半)兩個窗口;而對于 8 點 10 分的數(shù)據(jù),則同時屬于[8點, 9 點)和[7 點半, 8 點半)兩個窗口。所以,滑動窗口其實是固定大小窗口的更廣義的一種形式;換句話說,滾動窗口也可以看作是一種特殊的滑動窗口——窗口大小等于滑動步長(size = slide)。當然,我們也可以定義滑動步長大于窗口大小,這樣的話就會出現(xiàn)窗口不重疊、但會有間隔的情況;這時有些數(shù)據(jù)不
屬于任何一個窗口,就會出現(xiàn)遺漏統(tǒng)計。所以一般情況下,我們會讓滑動步長小于窗口大小,并盡量設置為整數(shù)倍的關系。
在一些場景中,可能需要統(tǒng)計最近一段時間內(nèi)的指標,而結(jié)果的輸出頻率要求又很高,甚至要求實時更新,比如股票價格的 24 小時漲跌幅統(tǒng)計,或者基于一段時間內(nèi)行為檢測的異常報警。這時滑動窗口無疑就是很好的實現(xiàn)方式。
3 窗口API
3.1 按鍵分區(qū)窗口(Keyed Windows)
經(jīng)過按鍵分區(qū) keyBy 操作后,數(shù)據(jù)流會按照 key 被分為多條邏輯流(logical streams),這就是 KeyedStream。基于 KeyedStream 進行窗口操作時, 窗口計算會在多個并行子任務上同時
執(zhí)行。相同 key 的數(shù)據(jù)會被發(fā)送到同一個并行子任務,而窗口操作會基于每個 key 進行單獨的處理。所以可以認為,每個 key 上都定義了一組窗口,各自獨立地進行統(tǒng)計計算。
在代碼實現(xiàn)上,我們需要先對 DataStream 調(diào)用.keyBy()進行按鍵分區(qū),然后再調(diào)用.window()定義窗口。
stream.keyBy(...) .window(...)3.2 非按鍵分區(qū)(Non-Keyed Windows)
如果沒有進行 keyBy,那么原始的 DataStream 就不會分成多條邏輯流。這時窗口邏輯只能在一個任務(task)上執(zhí)行,就相當于并行度變成了 1。所以在實際應用中一般不推薦使用這種方式。
在代碼中,直接基于 DataStream 調(diào)用.windowAll()定義窗口。
stream.windowAll(...)3.3 代碼中窗口 API 的調(diào)用
窗口操作主要有兩個部分:窗口分配器(Window Assigners)和窗口函數(shù)(Window Functions)。
stream.keyBy(<key selector>) .window(<window assigner>) .aggregate(<window function>)其中.window()方法需要傳入一個窗口分配器,它指明了窗口的類型;而后面的.aggregate()方法傳入一個窗口函數(shù)作為參數(shù),它用來定義窗口具體的處理邏輯。窗口分配器有各種形式,而窗口函數(shù)的調(diào)用方法也不只.aggregate()一種,另外,在實際應用中,一般都需要并行執(zhí)行任務,非按鍵分區(qū)很少用到,所以我們之后都以按鍵分區(qū)窗口為例;如果想要實現(xiàn)非按鍵分區(qū)窗口,只要前面不做 keyBy,后面調(diào)用.window()時直接換成.windowAll()就可以了。
3.4 滾動處理時間窗口
窗口分配器由類 TumblingProcessingTimeWindows 提供,需要調(diào)用它的靜態(tài)方法.of()。
stream.keyBy(...).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(...)這里.of()方法需要傳入一個 Time 類型的參數(shù) size,表示滾動窗口的大小,我們這里創(chuàng)建了一個長度為 5 秒的滾動窗口。另外,.of()還有一個重載方法,可以傳入兩個 Time 類型的參數(shù):size 和 offset。第一個參數(shù)當然還是窗口大小,第二個參數(shù)則表示窗口起始點的偏移量,用這個偏移量可以處理時區(qū)。
例如:我們所在的時區(qū)是東八區(qū),也就是 UTC+8,跟 UTC 有 8小時的時差。我們定義 1 天滾動窗口時,如果用默認的起始點,那么得到就是倫敦時間每天 0點開啟窗口,這時是北京時間早上 8 點。那怎樣得到北京時間每天 0 點開啟的滾動窗口呢?只要設置-8 小時的偏移量就可以了。
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))3.5 滑動處理時間窗口
窗口分配器由類 SlidingProcessingTimeWindows 提供,同樣需要調(diào)用它的靜態(tài)方法.of()。
這里.of()方法需要傳入兩個 Time 類型的參數(shù):size 和 slide,前者表示滑動窗口的大小,后者表示滑動窗口的滑動步長。我們這里創(chuàng)建了一個長度為 10 秒、滑動步長為 5 秒的滑動窗口。
滑動窗口同樣可以追加第三個參數(shù),用于指定窗口起始點的偏移量,用法與滾動窗口完全一致。
4 窗口函數(shù)
定義了窗口分配器,我們只是知道了數(shù)據(jù)屬于哪個窗口,可以將數(shù)據(jù)收集起來了;至于收集起來到底要做什么,其實還完全沒有頭緒。所以在窗口分配器之后,必須再接上一個定義窗
口如何進行計算的操作,這就是所謂的“窗口函數(shù)”(window functions)。經(jīng)窗口分配器處理之后,數(shù)據(jù)可以分配到對應的窗口中,而數(shù)據(jù)流經(jīng)過轉(zhuǎn)換得到的數(shù)據(jù)類型是 WindowedStream。這個類型并不是 DataStream,所以并不能直接進行其他轉(zhuǎn)換,而必須進一步調(diào)用窗口函數(shù),對收集到的數(shù)據(jù)進行處理計算之后,才能最終再次得到 DataStream。
4.1 增量聚合函數(shù)(incremental aggregation functions)
4.1.1 歸約函數(shù)(ReduceFunction)
將窗口中收集到的數(shù)據(jù)兩兩進行歸約。當我們進行流處理時,就是要保存一個狀態(tài);每來一個新的數(shù)據(jù),就和之前的聚合狀態(tài)做歸約,這樣就實現(xiàn)了增量式的聚合。
統(tǒng)計每一小時用戶的訪問量:
package com.rosh.flink.pojo;import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;@AllArgsConstructor @NoArgsConstructor @Data public class UserPojo {private Integer userId;private String name;private String uri;private Long timestamp;} package com.rosh.flink.wartermark;import com.rosh.flink.pojo.UserPojo; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time;import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.util.ArrayList; import java.util.List; import java.util.Random;public class WindowTS {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<UserPojo> dataDS = env.fromCollection(getUserLists());//生成有序水位線SingleOutputStreamOperator<UserPojo> orderStreamDS = dataDS.assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() {@Overridepublic long extractTimestamp(UserPojo element, long recordTimestamp) {return element.getTimestamp();}}));//聚合SingleOutputStreamOperator<Tuple2<Integer, Long>> userDS = orderStreamDS.map(new MapFunction<UserPojo, Tuple2<Integer, Long>>() {@Overridepublic Tuple2<Integer, Long> map(UserPojo value) throws Exception {return Tuple2.of(value.getUserId(), 1L);}});//開窗統(tǒng)計每1小時用戶訪問了多少次SingleOutputStreamOperator<Tuple2<Integer, Long>> resultDS = userDS.keyBy(tuple -> tuple.f0).window(TumblingEventTimeWindows.of(Time.hours(1))).reduce(new ReduceFunction<Tuple2<Integer, Long>>() {@Overridepublic Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {value1.f1 = value1.f1 + value2.f1;return value1;}});resultDS.print();env.execute("WarterMarkTest");}private static List<UserPojo> getUserLists() throws NoSuchAlgorithmException {List<UserPojo> lists = new ArrayList<>();Random random = SecureRandom.getInstanceStrong();for (int i = 1; i <= 1000; i++) {String uri = "/goods/" + i;int userId = random.nextInt(10);//有序時間UserPojo userPojo = new UserPojo(userId, "name" + userId, uri, (long) (1000 * i));//無序時間lists.add(userPojo);}return lists;}}4.1.2 聚合函數(shù)(AggregateFunction)
ReduceFunction 可以解決大多數(shù)歸約聚合的問題,但是這個接口有一個限制,就是聚合狀態(tài)的類型、輸出結(jié)果的類型都必須和輸入數(shù)據(jù)類型一樣。這就迫使我們必須在聚合前,先將數(shù)
據(jù)轉(zhuǎn)換(map)成預期結(jié)果類型;而在有些情況下,還需要對狀態(tài)進行進一步處理才能得到輸出結(jié)果,這時它們的類型可能不同,使用 ReduceFunction 就會非常麻煩。
例如,如果我們希望計算一組數(shù)據(jù)的平均值,應該怎樣做聚合呢?很明顯,這時我們需要計算兩個狀態(tài)量:數(shù)據(jù)的總和(sum),以及數(shù)據(jù)的個數(shù)(count),而最終輸出結(jié)果是兩者的商(sum/count)。如果用 ReduceFunction,那么我們應該先把數(shù)據(jù)轉(zhuǎn)換成二元組(sum, count)的形式,然后進行歸約聚合,最后再將元組的兩個元素相除轉(zhuǎn)換得到最后的平均值。本來應該只是一個任務,可我們卻需要 map-reduce-map 三步操作,這顯然不夠高效。
于是自然可以想到,如果取消類型一致的限制,讓輸入數(shù)據(jù)、中間狀態(tài)、輸出結(jié)果三者類型都可以不同,不就可以一步直接搞定了嗎?Flink 的 Window API 中的 aggregate 就提供了這樣的操作。直接基于 WindowedStream 調(diào)用.aggregate()方法,就可以定義更加靈活的窗口聚合操作。這個方法需要傳入一個AggregateFunction 的實現(xiàn)類作為參數(shù)。AggregateFunction 在源碼中的定義如下:
/**** The type of the values that are aggregated (input values)* The type of the accumulator (intermediate aggregate state).* The type of the aggregated result**/ public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {/*** 創(chuàng)建一個累加器,這就是為聚合創(chuàng)建了一個初始狀態(tài),每個聚合任務只會調(diào)用一次。*/ ACC createAccumulator();/*** 將輸入的元素添加到累加器中。這就是基于聚合狀態(tài),對新來的數(shù)據(jù)進行進一步聚合的過程。方法傳入兩個參數(shù):當前新到的數(shù)據(jù) value,和當前的累加器accumulator;* 返回一個新的累加器值,也就是對聚合狀態(tài)進行更新。每條數(shù)據(jù)到來之后都會調(diào)用這個方法。*/ACC add(IN value, ACC accumulator);/*** 從累加器中提取聚合的輸出結(jié)果。也就是說,我們可以定義多個狀態(tài),然后再基于這些聚合的狀態(tài)計算出一個結(jié)果進行輸出。比如之前我們提到的計算平均* 值,就可以把 sum 和 count 作為狀態(tài)放入累加器,而在調(diào)用這個方法時相除得到最終結(jié)果。這個方法只在窗口要輸出結(jié)果時調(diào)用。*/OUT getResult(ACC accumulator);/*** 合并兩個累加器,并將合并后的狀態(tài)作為一個累加器返回。這個方法只在需要合并窗口的場景下才會被調(diào)用;最常見的合并窗口(Merging Window)的場景* 就是會話窗口(Session Windows)。*/ACC merge(ACC a, ACC b); }所以可以看到,AggregateFunction 的工作原理是:首先調(diào)用 createAccumulator()為任務初始化一個狀態(tài)(累加器);而后每來一個數(shù)據(jù)就調(diào)用一次 add()方法,對數(shù)據(jù)進行聚合,得到的
結(jié)果保存在狀態(tài)中;等到了窗口需要輸出時,再調(diào)用 getResult()方法得到計算結(jié)果。很明顯,與 ReduceFunction 相同,AggregateFunction 也是增量式的聚合;而由于輸入、中間狀態(tài)、輸
出的類型可以不同,使得應用更加靈活方便。
·統(tǒng)計人均訪問次數(shù):
package com.rosh.flink.wartermark;import com.rosh.flink.pojo.UserPojo; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time;import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.util.*;public class AggWindowTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<UserPojo> userDS = env.fromCollection(getUserLists()).assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() {@Overridepublic long extractTimestamp(UserPojo element, long recordTimestamp) {return element.getTimestamp();}}));//統(tǒng)計5秒內(nèi),人均訪問次數(shù)SingleOutputStreamOperator<Double> resultDS = userDS.keyBy(key -> true).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate(new PeopleHourAvgCount());resultDS.print("人均訪問次數(shù)為:");env.execute("AggWindowTest");}private static class PeopleHourAvgCount implements AggregateFunction<UserPojo, Tuple2<HashSet<Integer>, Long>, Double> {/*** 初始化累加器*/@Overridepublic Tuple2<HashSet<Integer>, Long> createAccumulator() {return Tuple2.of(new HashSet<>(), 0L);}/****/@Overridepublic Tuple2<HashSet<Integer>, Long> add(UserPojo value, Tuple2<HashSet<Integer>, Long> accumulator) {//distinct userIdaccumulator.f0.add(value.getUserId());//次數(shù)+1accumulator.f1 = accumulator.f1 + 1;//返回累加器return accumulator;}@Overridepublic Double getResult(Tuple2<HashSet<Integer>, Long> accumulator) {return accumulator.f1 * 1.0 / accumulator.f0.size();}@Overridepublic Tuple2<HashSet<Integer>, Long> merge(Tuple2<HashSet<Integer>, Long> a, Tuple2<HashSet<Integer>, Long> b) {return null;}}/*** 獲取隨機人數(shù)的1000次訪問*/private static List<UserPojo> getUserLists() throws NoSuchAlgorithmException {List<UserPojo> lists = new ArrayList<>();Random random = SecureRandom.getInstanceStrong();//獲取隨機人數(shù)int peopleCount = random.nextInt(20);System.out.println("隨機人數(shù)為:" + peopleCount);for (int i = 1; i <= 1000; i++) {String uri = "/goods/" + i;int userId = random.nextInt(peopleCount);//有序時間UserPojo userPojo = new UserPojo(userId, "name" + userId, uri, new Date().getTime());//無序時間lists.add(userPojo);}return lists;} }4.2 全窗口函數(shù)(full window functions)
窗口操作中的另一大類就是全窗口函數(shù)。與增量聚合函數(shù)不同,全窗口函數(shù)需要先收集窗口中的數(shù)據(jù),并在內(nèi)部緩存起來,等到窗口要輸出結(jié)果的時候再取出數(shù)據(jù)進行計算。
ProcessWindowFunction 是 Window API 中最底層的通用窗口函數(shù)接口。之所以說它“最底層”,是因為除了可以拿到窗口中的所有數(shù)據(jù)之外,ProcessWindowFunction 還可以獲取到一個“上下文對象”(Context)。這個上下文對象非常強大,不僅能夠獲取窗口信息,還可以訪問當前的時間和狀態(tài)信息。這里的時間就包括了處理時間(processing time)和事件時間水位線(event time watermark)。這就使得 ProcessWindowFunction 更加靈活、功能更加豐富。事實上,ProcessWindowFunction 是 Flink 底層 API——處理函數(shù)(process function)中的一員。
統(tǒng)計10秒訪問UV:
package com.rosh.flink.wartermark;import com.rosh.flink.pojo.UserPojo; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.sql.Timestamp; import java.util.*;public class ProcessWindowTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<UserPojo> userDS = env.fromCollection(getUserLists());//水位線SingleOutputStreamOperator<UserPojo> watermarks = userDS.assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() {@Overridepublic long extractTimestamp(UserPojo element, long recordTimestamp) {return element.getTimestamp();}}));//開窗10秒UV統(tǒng)計SingleOutputStreamOperator<String> resultDS = watermarks.keyBy(key -> true).window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new UserUVCount());resultDS.print("UV:");env.execute("ProcessWindowTest");}private static class UserUVCount extends ProcessWindowFunction<UserPojo, String, Boolean, TimeWindow> {@Overridepublic void process(Boolean aBoolean, ProcessWindowFunction<UserPojo, String, Boolean, TimeWindow>.Context context, Iterable<UserPojo> elements, Collector<String> out) throws Exception {//用戶集合HashSet<Integer> hashSet = new HashSet<>();for (UserPojo user : elements) {hashSet.add(user.getUserId());}//獲取時間信息long start = context.window().getStart();long end = context.window().getEnd();String rs = "窗口信息,startTime:" + new Timestamp(start) + ",endTime: " + new Timestamp(end) + ",用戶訪問的次數(shù)為:" + hashSet.size();out.collect(rs);}}private static List<UserPojo> getUserLists() throws NoSuchAlgorithmException {List<UserPojo> lists = new ArrayList<>();Random random = SecureRandom.getInstanceStrong();int userCount = random.nextInt(100);for (int i = 1; i <= 1000; i++) {String uri = "/goods/" + i;int userId = random.nextInt(userCount);//有序時間UserPojo userPojo = new UserPojo(userId, "name" + userId, uri, new Date().getTime());//無序時間lists.add(userPojo);}return lists;}}4.3 增量聚合和全窗口函數(shù)的結(jié)合使用
增量聚合函數(shù)處理計算會更高效。全窗口函數(shù)的優(yōu)勢在于提供了更多的信息,可以認為是更加“通用”的窗口操作。所以在實際應用中,我們往往希望兼具這兩者的優(yōu)點,把它們結(jié)合在一起使用。Flink 的Window API 就給我們實現(xiàn)了這樣的用法。
在調(diào)用 WindowedStream 的.reduce()和.aggregate()方法時,只是簡單地直接傳入了一個 ReduceFunction 或 AggregateFunction 進行增量聚合。除此之外,其實還可以傳入第二個參數(shù):一個全窗口函數(shù),可以是 WindowFunction 或者 ProcessWindowFunction。
// ReduceFunction 與 WindowFunction 結(jié)合 public <R> SingleOutputStreamOperator<R> reduce( ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) // ReduceFunction 與 ProcessWindowFunction 結(jié)合 public <R> SingleOutputStreamOperator<R> reduce( ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function)// AggregateFunction 與 WindowFunction 結(jié)合 public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> windowFunction)// AggregateFunction 與 ProcessWindowFunction 結(jié)合 public <ACC, V, R> SingleOutputStreamOperator<R> aggregate( AggregateFunction<T, ACC, V> aggFunction, ProcessWindowFunction<V, R, K, W> windowFunction)這樣調(diào)用的處理機制是:基于第一個參數(shù)(增量聚合函數(shù))來處理窗口數(shù)據(jù),每來一個數(shù)據(jù)就做一次聚合;等到窗口需要觸發(fā)計算時,則調(diào)用第二個參數(shù)(全窗口函數(shù))的處理邏輯輸
出結(jié)果。需要注意的是,這里的全窗口函數(shù)就不再緩存所有數(shù)據(jù)了,而是直接將增量聚合函數(shù)的結(jié)果拿來當作了 Iterable 類型的輸入。一般情況下,這時的可迭代集合中就只有一個元素了。
統(tǒng)計10秒的url瀏覽量:
package com.rosh.flink.wartermark;import com.alibaba.fastjson.JSONObject; import com.rosh.flink.pojo.UserPojo; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Random;public class UrlWindowTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//讀取數(shù)據(jù)源DataStreamSource<UserPojo> userDS = env.fromCollection(getUserLists());//水位線SingleOutputStreamOperator<UserPojo> waterDS = userDS.assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() {@Overridepublic long extractTimestamp(UserPojo element, long recordTimestamp) {return element.getTimestamp();}}));//url countSingleOutputStreamOperator<Tuple2<String, Long>> urlDS = waterDS.map(new MapFunction<UserPojo, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(UserPojo value) throws Exception {return Tuple2.of(value.getUri(), 1L);}});SingleOutputStreamOperator<JSONObject> resultDS = urlDS.keyBy(data -> data.f0).window(TumblingEventTimeWindows.of(Time.seconds(10))).reduce(new ReduceFunction<Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {value1.f1 = value1.f1 + value2.f1;return value1;}}, new WindowFunction<Tuple2<String, Long>, JSONObject, String, TimeWindow>() {@Overridepublic void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<JSONObject> out) throws Exception {Tuple2<String, Long> tuple2 = input.iterator().next();JSONObject jsonObject = new JSONObject();jsonObject.put("url", tuple2.f0);jsonObject.put("count", tuple2.f1);new Timestamp(window.getStart());jsonObject.put("startTime", new Timestamp(window.getStart()).toString());jsonObject.put("endTime", new Timestamp(window.getEnd()).toString());out.collect(jsonObject);}});resultDS.print();env.execute("UrlWindowTest");}private static List<UserPojo> getUserLists() throws NoSuchAlgorithmException {List<UserPojo> lists = new ArrayList<>();Random random = SecureRandom.getInstanceStrong();for (int i = 1; i <= 1000; i++) {//隨機生成userId、goodIdint userId = random.nextInt(100);int goodId = random.nextInt(50);String uri = "/goods/" + goodId;//有序時間UserPojo userPojo = new UserPojo(userId, "name" + userId, uri, new Date().getTime());//無序時間lists.add(userPojo);}return lists;}}總結(jié)
以上是生活随笔為你收集整理的Flink 滚动窗口、滑动窗口详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 航司企业级大数据规划(方法论) - 企业
- 下一篇: 测试十年的前辈工作心得与经验分享