通过Spark Streaming的window操作实战模拟热点搜索词案例实战
本博文主要內容包括:
1、在線熱點搜索詞實現解析
2、SparkStreaming 利用reduceByKeyAndWindow實現在線熱點搜索詞實戰
一:在線熱點搜索詞實現解析
背景描述:在社交網絡(例如微博),電子商務(例如京東),熱搜詞(例如百度)等人們核心關注的內容之一就是我所關注的內容中,大家正在最關注什么或者說當前的熱點是什么,這在市級企業級應用中是非常有價值,例如我們關心過去30分鐘大家正在熱搜什么,并且每5分鐘更新一次,這就使得熱點內容是動態更新的,當然更有價值。 Yahoo(是Hadoop的最大用戶)被收購,因為沒做到實時在線處理實現技術:Spark Streaming(在線批處理) 提供了滑動窗口的奇數來支撐實現上述業務背景,我外面您可以使用reduceByKeyAndWindow操作來做具體實現
我們知道在SparkStreaming中可以設置batchInterval,讓SparkStreaming每隔batchInterval時間提交一次Job,假設batchInterval設置為5秒,那如果需要對1分鐘內的數據做統計,該如何實現呢?SparkStreaming中提供了window的概念。我們看下圖:
官網給的例子每個2秒鐘更新過去3秒鐘的內容,3秒鐘算一下,5秒鐘算一下,3秒鐘是一個窗口。window可以包含多個batchInterval(例如5秒),但是必須為batchInterval的整數倍例如1分鐘。另外window可以移動,稱之為滑動時間間隔,它也是batchInterval的整數倍,例如10秒。一般情況滑動時間間隔小于window的時間長度,否則會丟失數據。
SparkStreaming提供了如下與window相關的方法:
二、SparkStreaming 實現在線熱點搜索詞實戰
1、經過分析我們采用reduceByKeyAndWindow的方法,reduceByKeyAndWindow方法分析如下:
從代碼上面來看, 入口為:
reduceByKeyAndWindow(_+_, _-_, Duration, Duration)一步一步跟蹤進去, 可以看到實際的業務類是在ReducedWindowedDStream 這個類里面:
代碼理解就直接拿這個類來看了: 主要功能是在compute里面實現, 通過下面代碼回調mergeValues 來計算最后的返回值
先計算oldRDD 和newRDD
//currentWindow 就是以當前時間回退一個window的時間再向前一個batch 到當前時間的窗口 代碼里面有一個圖很有用:
我們要計算的new rdd就是15秒-25秒期間的值, oldRDD就是0秒到10秒的值, previous window的值是1秒 - 15秒的值
然后最終結果是 重復區間(previous window的值 - oldRDD的值) =》 也就是中間重復部分, 再加上newRDD的值, 這樣的話得到的結果就是10秒到25秒這個時間區間的值
// 0秒 10秒 15秒 25秒 // _____________________________ // | previous window _________|___________________ // |___________________| current window | --------------> Time // |_____________________________| // // |________ _________| |________ _________| // | | // V V // old RDDs new RDDs //reduceByWindow(reduceFunc, windowDuration, slideDuration) 代碼:
可以看到他做了兩次reduce, 第一次對整個self做一次reduce, 然后截取時間區間, 對結果再做一次reduce。
第一點: 對整個self做reduce會比較慢, 因為self都是相對比較大的集合。
第二點:進行了兩次reduce ,源碼如下:
如果我們看:
reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)
實際上他是調用了效率非常高的reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, numPartitions, filterFunc) 方法 ==》 詳細計算過程參考之前的博文
這樣的話其實他只對newRDDs和oldRDDs做reduce, 由于這兩個RDDs都非常小, 可以想象效率是非常高的
def reduceByWindow( reduceFunc: (T, T) => T, invReduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration ): DStream[T] = ssc.withScope { this.map(x => (1, x)) .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1) .map(_._2) }如果看reduceByKeyAndWindow的話, 情況也是一樣, 一個是執行:
self.reduceByKey(reduceFunc, partitioner) .window(windowDuration, slideDuration) .reduceByKey(reduceFunc, partitioner)而另外一個確是在已有的window值基礎上做了簡單的加加減減
宗上, 從效率上面考慮, 我們應該盡量使用包含invReduceFunc的方法, 同樣情況下摒棄只有reduceFunc的方法
2、我們案例代碼如下:
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext}/*** Created by zpf on 2016/8/23.*/ /*** 使用Scala并發集群運行的Spark來實現在線熱搜詞** 背景描述:在社交網絡(例如微博),電子商務(例如京東),熱搜詞(例如百度)等人們核心關注的內容之一就是我所關注的內容中* 大家正在最關注什么或者說當前的熱點是什么,這在市級企業級應用中是非常有價值,例如我們關心過去30分鐘大家正在熱搜什么,并且* 每5分鐘更新一次,這就使得熱點內容是動態更新的,當然更有價值。* Yahoo(是Hadoop的最大用戶)被收購,因為沒做到實時在線處理* 實現技術:Spark Streaming(在線批處理) 提供了滑動窗口的奇數來支撐實現上述業務背景,我外面您可以使用reduceByKeyAndWindow操作來做具體實現**/ object OnlineHottestItems {def main(args: Array[String]){/*** 第1步:創建Spark的配置對象SparkConf,設置Spark程序的運行時的配置信息,* 例如說通過setMaster來設置程序要鏈接的Spark集群的Master的URL,如果設置* 為local,則代表Spark程序在本地運行,特別適合于機器配置條件非常差(例如* 只有1G的內存)的初學者 **/val conf = new SparkConf() //創建SparkConf對象conf.setAppName("OnlineHottestItems") //設置應用程序的名稱,在程序運行的監控界面可以看到名稱conf.setMaster("spark://Master:7077") //此時,程序在Spark集群/* * 此處設置 Batch Interval 實在spark Streaming 中生成基本Job的單位,窗口和滑動時間間隔 * 一定是該batch Interval的整數倍*/val ssc = new StreamingContext(conf, Seconds(5))val hottestStream = ssc.socketTextStream("Master", 9999)/** 用戶搜索的格式簡化為 name item,在這里我們由于要計算熱點內容,所以只需要提取item即可* 提取出的item通過map轉化為(item,1)形式* 每隔20秒更新過去60秒的內容窗口60秒,滑動20秒* */val searchPair = hottestStream.map(_.split(" ")(1)).map(item => (item , 1))val hottestDStream = searchPair.reduceByKeyAndWindow((v1:Int,v2:Int) => v1 + v2, Seconds(60) ,Seconds(20))hottestDStream.transform(hottestItemRDD => {val top3 = hottestItemRDD.map(pair => (pair._2,pair._1) ).sortByKey(false).map(pair => (pair._2,pair._1)).take(3)for(item <- top3){println(item)}hottestItemRDD}).print()ssc.start()ssc.awaitTermination()} }3、將程序打包運行到集群上觀察結果:
4、接下來我們使用reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks]) 這個函數,來實現增量的計算。
使用這個函數,必須進行Checkpoint。代碼如下
ssc.checkpoint("/user/checkpoints/") val ItemCount = ItemPairs.reduceByKeyAndWindow((v1:Int,v2:Int)=> v1+v2,(v1:Int,v2:Int)=> v1-v2,Seconds(60),Seconds(10))總結
以上是生活随笔為你收集整理的通过Spark Streaming的window操作实战模拟热点搜索词案例实战的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 盛世昊通:工信部第351批即将上市的汽车
- 下一篇: 计算机系统字体变大,手把手教你如何调整电