Flink countWindow窗口
窗口在處理數(shù)據(jù)前,會對數(shù)據(jù)做分流,有兩種控制流的方式,按照數(shù)據(jù)流劃分:Keyed和Non-Keyed Windows
Keyed Windows:就是有按照某個字段分組的數(shù)據(jù)流使用的窗口,可以理解為按照原始數(shù)據(jù)流中的某個key進行分類,擁有同一個key值的數(shù)據(jù)流將為進入同一個window,多個窗口并行的邏輯流。
Non-Keyed Windows:沒有進行按照某個字段分組的數(shù)據(jù)使用的窗口
stream.windowAll(...) <- required: "assigner"[.trigger(...)] <- optional: "trigger" (else default trigger)[.evictor(...)] <- optional: "evictor" (else no evictor)[.allowedLateness(...)] <- optional: "lateness" (else zero)[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data).reduce/aggregate/fold/apply() <- required: "function"[.getSideOutput(...)] <- optional: "output tag"Keyed和Non-Keyed Windows的區(qū)別
在定義窗口之前,要指定的第一件事是流是否需要Keyed,使用keyBy(…)將無界流分成邏輯的keyed stream。 如果未調用keyBy(…),則表示流不是keyed stream。
實戰(zhàn)
目前原始數(shù)據(jù)內(nèi)容如下:
1 100 2 200 1 101 2 201 3 300 4 401 3 302 1 103 5 501 6 602 6 601 4 402 6 600我們使用keyBy(0),根據(jù)第一個字段進行分組,然后使用countWindow(2)來進行觸發(fā),代碼如下:
public class Test3 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dataStreamSource = env.readTextFile("E:/test/haha.txt");dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> map(String s) throws Exception {String[] split = s.split("\t");return Tuple2.of(split[0], split[1]);}}).keyBy(0).countWindow(2).apply(new WindowFunction<Tuple2<String, String>, Object, Tuple, GlobalWindow>() {@Overridepublic void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<String, String>> input, Collector<Object> out) throws Exception {Iterator<Tuple2<String, String>> iterator = input.iterator();while (iterator.hasNext()) {Tuple2<String, String> next = iterator.next();System.out.println("執(zhí)行操作:" + next.f0 + ", " + next.f1);out.collect( next.f1 + "======");}}}).print();env.execute("Test3");} }輸出結果如下:
執(zhí)行操作:2, 200 執(zhí)行操作:2, 201 執(zhí)行操作:1, 100 執(zhí)行操作:1, 101 執(zhí)行操作:3, 300 執(zhí)行操作:3, 302 執(zhí)行操作:6, 602 執(zhí)行操作:6, 601 執(zhí)行操作:4, 401 執(zhí)行操作:4, 402原始數(shù)據(jù)有13條,而輸出結果只有10條,這是為什么?
原因是我們使用的是countWindow(2),也就是當根據(jù)keyBy(0)分組之后,數(shù)據(jù)的數(shù)量達到2時進行輸出。
而我們的數(shù)據(jù)中id為1的有3條,因此其中一條數(shù)據(jù)將不會被觸發(fā),id為6的有3條,其中一條數(shù)據(jù)沒有達到countWindow(2)也不會觸發(fā),id為5的有1條,沒有達到countWindow(2)也不會被觸發(fā),因此輸出結果少了3條數(shù)據(jù)。
因此countWindow是根據(jù)分組之后的數(shù)據(jù)條數(shù)來確定是否執(zhí)行后面的運算。
當把countWindow(2)改為countWindow(1)時,每一條數(shù)據(jù)都會被處理輸出。
總結
以上是生活随笔為你收集整理的Flink countWindow窗口的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Flink的重启策略
- 下一篇: Hadoop报错AccessContro