日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

通过Spark Streaming的window操作实战模拟热点搜索词案例实战

發布時間:2024/3/12 windows 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 通过Spark Streaming的window操作实战模拟热点搜索词案例实战 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本博文主要內容包括:

1、在線熱點搜索詞實現解析
2、SparkStreaming 利用reduceByKeyAndWindow實現在線熱點搜索詞實戰

一:在線熱點搜索詞實現解析

背景描述:在社交網絡(例如微博),電子商務(例如京東),熱搜詞(例如百度)等人們核心關注的內容之一就是我所關注的內容中,大家正在最關注什么或者說當前的熱點是什么,這在市級企業級應用中是非常有價值,例如我們關心過去30分鐘大家正在熱搜什么,并且每5分鐘更新一次,這就使得熱點內容是動態更新的,當然更有價值。 Yahoo(是Hadoop的最大用戶)被收購,因為沒做到實時在線處理實現技術:Spark Streaming(在線批處理) 提供了滑動窗口的奇數來支撐實現上述業務背景,我外面您可以使用reduceByKeyAndWindow操作來做具體實現

我們知道在SparkStreaming中可以設置batchInterval,讓SparkStreaming每隔batchInterval時間提交一次Job,假設batchInterval設置為5秒,那如果需要對1分鐘內的數據做統計,該如何實現呢?SparkStreaming中提供了window的概念。我們看下圖:

官網給的例子每個2秒鐘更新過去3秒鐘的內容,3秒鐘算一下,5秒鐘算一下,3秒鐘是一個窗口。window可以包含多個batchInterval(例如5秒),但是必須為batchInterval的整數倍例如1分鐘。另外window可以移動,稱之為滑動時間間隔,它也是batchInterval的整數倍,例如10秒。一般情況滑動時間間隔小于window的時間長度,否則會丟失數據。

SparkStreaming提供了如下與window相關的方法:

二、SparkStreaming 實現在線熱點搜索詞實戰

1、經過分析我們采用reduceByKeyAndWindow的方法,reduceByKeyAndWindow方法分析如下:

從代碼上面來看, 入口為:

reduceByKeyAndWindow(_+_, _-_, Duration, Duration)

一步一步跟蹤進去, 可以看到實際的業務類是在ReducedWindowedDStream 這個類里面:
代碼理解就直接拿這個類來看了: 主要功能是在compute里面實現, 通過下面代碼回調mergeValues 來計算最后的返回值

val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K, Array[Iterable[V]])]] .mapValues(mergeValues)

先計算oldRDD 和newRDD

//currentWindow 就是以當前時間回退一個window的時間再向前一個batch 到當前時間的窗口 代碼里面有一個圖很有用:
我們要計算的new rdd就是15秒-25秒期間的值, oldRDD就是0秒到10秒的值, previous window的值是1秒 - 15秒的值

然后最終結果是 重復區間(previous window的值 - oldRDD的值) =》 也就是中間重復部分, 再加上newRDD的值, 這樣的話得到的結果就是10秒到25秒這個時間區間的值

// 0秒 10秒 15秒 25秒 // _____________________________ // | previous window _________|___________________ // |___________________| current window | --------------> Time // |_____________________________| // // |________ _________| |________ _________| // | | // V V // old RDDs new RDDs //

reduceByWindow(reduceFunc, windowDuration, slideDuration) 代碼:

可以看到他做了兩次reduce, 第一次對整個self做一次reduce, 然后截取時間區間, 對結果再做一次reduce。

第一點: 對整個self做reduce會比較慢, 因為self都是相對比較大的集合。
第二點:進行了兩次reduce ,源碼如下:

def reduceByWindow( reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration ): DStream[T] = ssc.withScope { this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc) }

如果我們看:
reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

實際上他是調用了效率非常高的reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, numPartitions, filterFunc) 方法 ==》 詳細計算過程參考之前的博文

這樣的話其實他只對newRDDs和oldRDDs做reduce, 由于這兩個RDDs都非常小, 可以想象效率是非常高的

def reduceByWindow( reduceFunc: (T, T) => T, invReduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration ): DStream[T] = ssc.withScope { this.map(x => (1, x)) .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1) .map(_._2) }

如果看reduceByKeyAndWindow的話, 情況也是一樣, 一個是執行:

self.reduceByKey(reduceFunc, partitioner) .window(windowDuration, slideDuration) .reduceByKey(reduceFunc, partitioner)

而另外一個確是在已有的window值基礎上做了簡單的加加減減

宗上, 從效率上面考慮, 我們應該盡量使用包含invReduceFunc的方法, 同樣情況下摒棄只有reduceFunc的方法

2、我們案例代碼如下:

import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext}/*** Created by zpf on 2016/8/23.*/ /*** 使用Scala并發集群運行的Spark來實現在線熱搜詞** 背景描述:在社交網絡(例如微博),電子商務(例如京東),熱搜詞(例如百度)等人們核心關注的內容之一就是我所關注的內容中* 大家正在最關注什么或者說當前的熱點是什么,這在市級企業級應用中是非常有價值,例如我們關心過去30分鐘大家正在熱搜什么,并且* 每5分鐘更新一次,這就使得熱點內容是動態更新的,當然更有價值。* Yahoo(是Hadoop的最大用戶)被收購,因為沒做到實時在線處理* 實現技術:Spark Streaming(在線批處理) 提供了滑動窗口的奇數來支撐實現上述業務背景,我外面您可以使用reduceByKeyAndWindow操作來做具體實現**/ object OnlineHottestItems {def main(args: Array[String]){/*** 第1步:創建Spark的配置對象SparkConf,設置Spark程序的運行時的配置信息,* 例如說通過setMaster來設置程序要鏈接的Spark集群的Master的URL,如果設置* 為local,則代表Spark程序在本地運行,特別適合于機器配置條件非常差(例如* 只有1G的內存)的初學者 **/val conf = new SparkConf() //創建SparkConf對象conf.setAppName("OnlineHottestItems") //設置應用程序的名稱,在程序運行的監控界面可以看到名稱conf.setMaster("spark://Master:7077") //此時,程序在Spark集群/* * 此處設置 Batch Interval 實在spark Streaming 中生成基本Job的單位,窗口和滑動時間間隔 * 一定是該batch Interval的整數倍*/val ssc = new StreamingContext(conf, Seconds(5))val hottestStream = ssc.socketTextStream("Master", 9999)/** 用戶搜索的格式簡化為 name item,在這里我們由于要計算熱點內容,所以只需要提取item即可* 提取出的item通過map轉化為(item,1)形式* 每隔20秒更新過去60秒的內容窗口60秒,滑動20秒* */val searchPair = hottestStream.map(_.split(" ")(1)).map(item => (item , 1))val hottestDStream = searchPair.reduceByKeyAndWindow((v1:Int,v2:Int) => v1 + v2, Seconds(60) ,Seconds(20))hottestDStream.transform(hottestItemRDD => {val top3 = hottestItemRDD.map(pair => (pair._2,pair._1) ).sortByKey(false).map(pair => (pair._2,pair._1)).take(3)for(item <- top3){println(item)}hottestItemRDD}).print()ssc.start()ssc.awaitTermination()} }

3、將程序打包運行到集群上觀察結果:

4、接下來我們使用reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks]) 這個函數,來實現增量的計算。

使用這個函數,必須進行Checkpoint。代碼如下

ssc.checkpoint("/user/checkpoints/") val ItemCount = ItemPairs.reduceByKeyAndWindow((v1:Int,v2:Int)=> v1+v2,(v1:Int,v2:Int)=> v1-v2,Seconds(60),Seconds(10))

總結

以上是生活随笔為你收集整理的通过Spark Streaming的window操作实战模拟热点搜索词案例实战的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。