Flink中的window知识体系与scala完整案例
[1]中得到大類,插圖來自[2]
| 窗口大類(官方) | 子分類 | 數據是否在窗口之間重疊 |
| Time Windows | Tumbling Windows | |
| Sliding Windows | ||
| Count Windows | 到達指定數量的單詞才進行統計 | |
?
| 多用戶窗口[4] | 窗口范圍 |
| sessionwindow | |
| globalwindow |
?
GlobalWindow的用法參考[7]
GlobalWindow和 WindowAll的區別是WindowAll 算子:并行度始終為1[8]
上述是來自官方文檔的分類,來看下[9]的分類
#################################################################################
缺少windowall的圖示
################################下面是亂入的小知識和本文無關#####################################
Physical Partitioning
1、Custom
2、Random
3、Rebalancing
4、Rescaling
5、BroadCasting
#################################################################################################
窗口還可以劃分為 Keyed Window與Non-Keyed Window
實際例子是?
參考[5]來使用,注意找下圖示
#################################Tumbling Window(滾動窗口,一個窗口參數)使用#####################
import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Timeobject WindowedStreamingWordCount {def main(args: Array[String]) {val env = StreamExecutionEnvironment.getExecutionEnvironment// create a stream using socketval socketStream = env.socketTextStream("localhost",9999)// implement word countval wordsStream = socketStream.flatMap(value => value.split("\\s+")).map(value => (value,1))val keyValuePair = wordsStream.keyBy(0).timeWindow(Time.seconds(15))val countStream = keyValuePair.sum(1)countStream.print()env.execute()}}該窗口只會計算從當前計時開始的15秒內的數據
#############################Sliding Window(滑動窗口,兩個窗口參數)使用#################################
package com.madhukaraphatak.flink.streaming.examplesimport org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Timeobject WindowExample {def main(args: Array[String]) {val env = StreamExecutionEnvironment.getExecutionEnvironmentval source = env.socketTextStream("localhost",9000)//word mapval values = source.flatMap(value => value.split("\\s+")).map(value => (value,1))val keyValue = values.keyBy(0)//tumbling window : Calculate wordcount for each 15 seconds// val tumblingWindow = keyValue.timeWindow(Time.seconds(15))// // sliding window : Calculate wordcount for last 5 secondsval slidingWindow = keyValue.timeWindow(Time.seconds(15),Time.seconds(5))//count window : Calculate for every 5 recordsval countWindow = keyValue.countWindow(5)//tumblingWindow.sum(1).name("tumblingwindow").print()slidingWindow.sum(1).name("slidingwindow").print()// countWindow.sum(1).name("count window").print()env.execute()}}需要定義兩個參數,分別是:
①窗口的大小
②在窗口中滑動的大小,但理論上講滑動的大小不能超過窗口大小
##############################countWindow的使用###################################################
package com.madhukaraphatak.flink.streaming.examplesimport org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Timeobject WindowExample {def main(args: Array[String]) {val senv = StreamExecutionEnvironment.getExecutionEnvironmentval source = senv.socketTextStream("localhost",9999)//word mapval values = source.flatMap(value => value.split("\\s+")).map(value => (value,1))val keyValue = values.keyBy(0)//tumbling window : Calculate wordcount for each 15 secondsval tumblingWindow = keyValue.timeWindow(Time.seconds(15))// sliding window : Calculate wordcount for last 5 secondsval slidingWindow = keyValue.timeWindow(Time.seconds(15),Time.seconds(5))//count window : Calculate for every 5 recordsval countWindow = keyValue.countWindow(5)//tumblingWindow.sum(1).name("tumblingwindow").print()//slidingWindow.sum(1).name("slidingwindow").print()countWindow.sum(1).name("count window").print()senv.execute()}}對應的start-scala-shell的remote模式(HA)中的代碼如下:
val source = senv.socketTextStream("localhost",9999)val values = source.flatMap(value => value.split("\\s+")).map(value => (value,1))val keyValue = values.keyBy(0)val countWindow = keyValue.countWindow(5)countWindow.sum(1).name("count window").print()senv.execute()nc ?-lk ?9999
結果可以在Desktop的$FLINK_HOME/log/flink-appleyuchi-taskexecutor-3-Desktop.out找到
如何體現countwindow的特性呢?
在nc -lk的終端輸入:
pku pku pku pku pku
tsinghua tsinghua tsinghua
定于自己的行為。 也就是說子類能夠根據需要實現父類的方法。
重寫方法不能拋出新的檢查異常或者比被重寫方法申明更加寬泛的異常。例如: 父類的一個方法申明了一個檢查異常 IOException,但是在重寫這個方法的時候不能拋出 Exception 異常,因為 Exception 是 IOException 的父類
tsinghua tsinghua
在結果終端輸入:
(Python3.6) appleyuchi@Desktop:log$ grep -r pku
flink-appleyuchi-taskexecutor-2-Desktop.out:2> (pku,5)
(Python3.6) appleyuchi@Desktop:log$ grep -r tsinghua
(Python3.6) appleyuchi@Desktop:log$ grep -r tsinghua
flink-appleyuchi-taskexecutor-2-Desktop.out:1> (tsinghua,5)
?
發現沒?如果tsinghua這個單詞數量不超過5個,你就別想在flink-appleyuchi-taskexecutor-2-Desktop.out這個文件中看到結果,這也體現了CountWindow的特性,如果不超過指定數據,Flink拒絕統計
#########################下面的是sliding-countwindow(滑動計數窗口)########################
代碼來自[6]
自己運行後的代碼在:
https://gitee.com/appleyuchi/Flink_Code
sliding-count同樣存在重疊(由窗口長度決定),但是這裏聚合數據的維度是接受的數據數量,而不是等待時間.
##############################################################################################
Reference:
[1]Introduction to Flink Streaming - Part 5 : Window API in Flink
[2]Tumbling Windows和Sliding Windows區別與聯系
[3]Introduction to Flink Streaming - Part 2 : Discretization of Stream using Window API
[4]Flink之窗口的使用
[5]Apache Flink:Keyed Window與Non-Keyed Window
[6]Flink countWindow 使用
[7]flink的datastream進行join操作沒有輸出結果一例
[8]Flink的Window與WindowAll 算子對比
[9]Flink Window介紹
總結
以上是生活随笔為你收集整理的Flink中的window知识体系与scala完整案例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 阿里巴巴怎么推广(1688阿里巴巴批发网
- 下一篇: win7+ubuntu20.04双系统+