2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数
生活随笔
收集整理的這篇文章主要介紹了
2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
目錄
SparkStreaming實戰案例四 窗口函數
需求
代碼實現
?
SparkStreaming實戰案例四 窗口函數
需求
使用窗口計算: 每隔5s(滑動間隔)計算最近10s(窗口長度)的數據!
回顧窗口:
窗口長度:要計算多久的數據
滑動間隔:每隔多久計算一次
窗口長度10s > 滑動間隔5s:每隔5s計算最近10s的數據--滑動窗口
窗口長度10s = 滑動間隔10s:每隔10s計算最近10s的數據--滾動窗口
窗口長度10s < 滑動間隔15s:每隔15s計算最近10s的數據--會丟失數據,開發不用
?
?
???????代碼實現
package cn.itcast.streamingimport org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}/*** 使用SparkStreaming接收Socket數據,node01:9999* 窗口長度:要計算多久的數據* 滑動間隔:每隔多久計算一次* 窗口長度10s > 滑動間隔5s:每隔5s計算最近10s的數據--滑動窗口* 窗口長度10s = 滑動間隔10s:每隔10s計算最近10s的數據--滾動窗口* 窗口長度10s < 滑動間隔15s:每隔15s計算最近10s的數據--會丟失數據,開發不用* 使用窗口計算: 每隔5s(滑動間隔)計算最近10s(窗口長度)的數據!*/
object SparkStreamingDemo04_Window {def main(args: Array[String]): Unit = {//1.創建環境val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)sc.setLogLevel("WARN")//batchDuration the time interval at which streaming data will be divided into batches//流數據將被劃分為批的時間間隔,就是每隔多久對流數據進行一次微批劃分!val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))// The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()//注意:因為涉及到歷史數據/歷史狀態,也就是需要將歷史數據/狀態和當前數據進行合并,作為新的Value!//那么新的Value要作為下一次的歷史數據/歷史狀態,那么應該搞一個地方存起來!//所以需要設置一個Checkpoint目錄!ssc.checkpoint("./ckp")//2.接收socket數據val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("node1",9999)//3.做WordCountval resultDS: DStream[(String, Int)] = linesDS.flatMap(_.split(" ")).map((_, 1))//windowDuration:窗口長度:就算最近多久的數據,必須都是微批間隔的整數倍//slideDuration :滑動間隔:就是每隔多久計算一次,,必須都是微批間隔的整數倍//使用窗口計算: 每隔5s(滑動間隔)計算最近10s(窗口長度)的數據!.reduceByKeyAndWindow((v1:Int, v2:Int)=>v1+v2, Seconds(10),Seconds(5))//總結:實際開發中需要學會的是如何設置windowDuration:窗口長度和slideDuration :滑動間隔//如進行如下需求://每隔30分鐘(slideDuration :滑動間隔),計算最近24小時(windowDuration:窗口長度)的各個廣告點擊量,應該進行如下設置://.reduceByKeyAndWindow((v1:Int, v2:Int)=>v1+v2, Minutes(24*60),Minutes(30))//每隔10分鐘(slideDuration :滑動間隔),更新最近1小時(windowDuration:窗口長度)熱搜排行榜//.reduceByKeyAndWindow((v1:Int, v2:Int)=>v1+v2, Minutes(60),Minutes(10))//4.輸出resultDS.print()//5.啟動并等待程序停止ssc.start()ssc.awaitTermination()ssc.stop(stopSparkContext = true, stopGracefully = true)}
}
?
總結
以上是生活随笔為你收集整理的2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Spark(三十七):S
- 下一篇: 2021年大数据Spark(四十一):S