日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flink countWindow窗口

發(fā)布時間:2024/9/16 编程问答 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink countWindow窗口 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

窗口在處理數(shù)據(jù)前,會對數(shù)據(jù)做分流,有兩種控制流的方式,按照數(shù)據(jù)流劃分:Keyed和Non-Keyed Windows
Keyed Windows:就是有按照某個字段分組的數(shù)據(jù)流使用的窗口,可以理解為按照原始數(shù)據(jù)流中的某個key進行分類,擁有同一個key值的數(shù)據(jù)流將為進入同一個window,多個窗口并行的邏輯流。

stream.keyBy(...) // 是keyed類型數(shù)據(jù)集.window(...) // 指定窗口分配器類型[.trigger(...)] // 指定觸發(fā)器類型(可選)[.evictor(...)] // 指定evictor或不指定(可選)[.allowedLateness(...)] // 指定是否延遲處理數(shù)據(jù)(可選)[.sideOutputLateData(...)] // optional: "output tag" (else no side output for late data).reduce/aggregate/flod/apply() //指定窗口計算函數(shù).getSideOutput(...) //根據(jù)Tag輸出數(shù)據(jù)(可選)

Non-Keyed Windows:沒有進行按照某個字段分組的數(shù)據(jù)使用的窗口

stream.windowAll(...) <- required: "assigner"[.trigger(...)] <- optional: "trigger" (else default trigger)[.evictor(...)] <- optional: "evictor" (else no evictor)[.allowedLateness(...)] <- optional: "lateness" (else zero)[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data).reduce/aggregate/fold/apply() <- required: "function"[.getSideOutput(...)] <- optional: "output tag"

Keyed和Non-Keyed Windows的區(qū)別
在定義窗口之前,要指定的第一件事是流是否需要Keyed,使用keyBy(…)將無界流分成邏輯的keyed stream。 如果未調用keyBy(…),則表示流不是keyed stream。

  • 對于Keyed流:可以將傳入事件的任何屬性用作key。 擁有Keyed stream將允許窗口計算由多個任務并行執(zhí)行,因為每個邏輯Keyed流可以獨立于其余任務進行處理。 相同Key的所有元素將被發(fā)送到同一個任務。
  • 對于Non-Keyed流:原始流將不會被分成多個邏輯流,并且所有窗口邏輯將由單個Task執(zhí)行,即并行性為1。
  • 實戰(zhàn)

    目前原始數(shù)據(jù)內(nèi)容如下:

    1 100 2 200 1 101 2 201 3 300 4 401 3 302 1 103 5 501 6 602 6 601 4 402 6 600

    我們使用keyBy(0),根據(jù)第一個字段進行分組,然后使用countWindow(2)來進行觸發(fā),代碼如下:

    public class Test3 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dataStreamSource = env.readTextFile("E:/test/haha.txt");dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> map(String s) throws Exception {String[] split = s.split("\t");return Tuple2.of(split[0], split[1]);}}).keyBy(0).countWindow(2).apply(new WindowFunction<Tuple2<String, String>, Object, Tuple, GlobalWindow>() {@Overridepublic void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<String, String>> input, Collector<Object> out) throws Exception {Iterator<Tuple2<String, String>> iterator = input.iterator();while (iterator.hasNext()) {Tuple2<String, String> next = iterator.next();System.out.println("執(zhí)行操作:" + next.f0 + ", " + next.f1);out.collect( next.f1 + "======");}}}).print();env.execute("Test3");} }

    輸出結果如下:

    執(zhí)行操作:2, 200 執(zhí)行操作:2, 201 執(zhí)行操作:1, 100 執(zhí)行操作:1, 101 執(zhí)行操作:3, 300 執(zhí)行操作:3, 302 執(zhí)行操作:6, 602 執(zhí)行操作:6, 601 執(zhí)行操作:4, 401 執(zhí)行操作:4, 402

    原始數(shù)據(jù)有13條,而輸出結果只有10條,這是為什么?
    原因是我們使用的是countWindow(2),也就是當根據(jù)keyBy(0)分組之后,數(shù)據(jù)的數(shù)量達到2時進行輸出。
    而我們的數(shù)據(jù)中id為1的有3條,因此其中一條數(shù)據(jù)將不會被觸發(fā),id為6的有3條,其中一條數(shù)據(jù)沒有達到countWindow(2)也不會觸發(fā),id為5的有1條,沒有達到countWindow(2)也不會被觸發(fā),因此輸出結果少了3條數(shù)據(jù)。
    因此countWindow是根據(jù)分組之后的數(shù)據(jù)條數(shù)來確定是否執(zhí)行后面的運算。
    當把countWindow(2)改為countWindow(1)時,每一條數(shù)據(jù)都會被處理輸出。

    總結

    以上是生活随笔為你收集整理的Flink countWindow窗口的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。