日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark的实战题目——寻找5亿次访问中,访问次数最多的人

發(fā)布時間:2024/2/28 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark的实战题目——寻找5亿次访问中,访问次数最多的人 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

場景描述:這是一個Spark的實戰(zhàn)題目,也是在面試中經(jīng)常出現(xiàn)的一類題目。

問題描述

對于一個大型網(wǎng)站,用戶訪問量嘗嘗高達數(shù)十億。對于數(shù)十億是一個什么樣的概念,我們這里可以簡單的計算一下。對于一個用戶,單次訪問,我們通常會記錄下哪些數(shù)據(jù)呢?

1、用戶的id
2、用戶訪問的時間
3、用戶逗留的時間
4、用戶執(zhí)行的操作
5、用戶的其余數(shù)據(jù)(比如IP等等)

我們單單從用戶id來說,比如10011802330414,這個ID,那么我們一個id差不多就是一個long類型,因為在大量數(shù)據(jù)存儲的時候,我們都是采用文本存儲。因此對于5億個用戶ID,完全存儲在磁盤當中,大概是5G的大小,對于這個大小,并不能算是大數(shù)據(jù)。但是對于一個案例來說,已經(jīng)非常足夠了。

我們會產(chǎn)生一個5億條ID的數(shù)據(jù)集,我們上面說到,這個數(shù)據(jù)集大小為5G(不壓縮的情況下),因此我不會在GitHub上上傳這樣一個數(shù)據(jù)集,但是我們提供一個方法,來生成一個5億條數(shù)據(jù)。

當然要解決這個問題,你可以依然在local模式下運行項目,但是你得有足夠的磁盤空間和內(nèi)存空間,大概8G磁盤空間(因為除了數(shù)據(jù)本身,spark運行過程還要產(chǎn)生一些臨時數(shù)據(jù)),5G內(nèi)存(要進行reduceByKey)。為了真正展示spark的特性,我們這個案例,將會運行在spark集群上。

關于如何搭建集群,我準備在后續(xù)的章節(jié)補上。但是在網(wǎng)上有大量的集群搭建教程,其中不乏一些詳細優(yōu)秀的教程。當然,這節(jié)我們不講如何搭建集群,但是我們?nèi)匀豢梢蚤_始我們的案例。

問題分析
那么現(xiàn)在我們擁有了一個5億條數(shù)據(jù)(實際上這個數(shù)據(jù)并不以文本存儲,而是在運行的時候生成),從五億條數(shù)據(jù)中,找出訪問次數(shù)最多的人,這看起來并不難。但實際上我們想要通過這個案例了解spark的真正優(yōu)勢。

5億條ID數(shù)據(jù),首先可以用map將其緩存到RDD中,然后對RDD進行reduceByKey,最后找出出現(xiàn)最多的ID。思路很簡單,因此代碼量也不會很多。

實現(xiàn)

scala實現(xiàn)
首先是ID生成方法:
RandomId.class

import org.apache.spark.{SparkConf, SparkContext}object ActiveVisitor {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("spark://master:7077").setAppName("ActiveVisitor")val sc = new SparkContext(conf)//生成一個0-9999的列表val list = 1 until 10000val id =new RandomId()//這里記錄最大的次數(shù)var max = 0//這里記錄最大次數(shù)的IDvar maxId = 0Lval lastNum = sc.parallelize(list)//第一步生成5億條數(shù)據(jù).flatMap(num => {//遍歷list列表//總共遍歷1萬次每次生成5萬個IDvar list2 = List(id.next())for (i <- 1 to 50000){list2 = id.next() :: list2}//這里記錄當前生成ID的百分比println(num/1000.0 +"%")//返回生成完成后的list//每次循環(huán)里面都包含5萬個IDlist2})//遍歷5億條數(shù)據(jù)//為每條數(shù)據(jù)出現(xiàn)標記1.map((_,1))//對標記后的數(shù)據(jù)進行處理//得到每個ID出現(xiàn)的次數(shù),即(ID,Count).reduceByKey(_+_)//遍歷處理后的數(shù)據(jù).foreach(x => {//將最大值存儲在max中if (x._2 > max){max = x._2maxId = x._1//若X比之前記錄的值大,則輸出該id和次數(shù)//最后一次輸出結果,則是出現(xiàn)次數(shù)最多的的ID和以及其出現(xiàn)的次數(shù)//當然出現(xiàn)次數(shù)最多的可能有多個ID//這里只輸出一個println(x)}})}}

然后是用它生成5億條數(shù)據(jù)

import org.apache.spark.{SparkConf, SparkContext}object ActiveVisitor {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("spark://master:7077").setAppName("ActiveVisitor")val sc = new SparkContext(conf)val list = 1 until 100000val id =new RandomId()var max = 0var maxId = 0Lval lastNum = sc.parallelize(list).flatMap(num => {var list2 = List(id.next())for (i <- 1 to 50000){list2 = id.next() :: list2}println(num +"%")list2}).map((_,1)).reduceByKey(_+_).foreach(x => {if (x._2 > max){max = x._2maxId = x._1println(x)}})} }

處理5億條數(shù)據(jù)

import org.apache.spark.{SparkConf, SparkContext}object ActiveVisitor {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("spark://master:7077").setAppName("ActiveVisitor")val sc = new SparkContext(conf)//生成一個0-9999的列表val list = 1 until 10000val id =new RandomId()//這里記錄最大的次數(shù)var max = 0//這里記錄最大次數(shù)的IDvar maxId = 0Lval lastNum = sc.parallelize(list)//第一步生成5億條數(shù)據(jù).flatMap(num => {//遍歷list列表//總共遍歷1萬次每次生成5萬個IDvar list2 = List(id.next())for (i <- 1 to 50000){list2 = id.next() :: list2}//這里記錄當前生成ID的百分比println(num/1000.0 +"%")//返回生成完成后的list//每次循環(huán)里面都包含5萬個IDlist2})//遍歷5億條數(shù)據(jù)//為每條數(shù)據(jù)出現(xiàn)標記1.map((_,1))//對標記后的數(shù)據(jù)進行處理//得到每個ID出現(xiàn)的次數(shù),即(ID,Count).reduceByKey(_+_)//遍歷處理后的數(shù)據(jù).foreach(x => {//將最大值存儲在max中if (x._2 > max){max = x._2maxId = x._1//若X比之前記錄的值大,則輸出該id和次數(shù)//最后一次輸出結果,則是出現(xiàn)次數(shù)最多的的ID和以及其出現(xiàn)的次數(shù)//當然出現(xiàn)次數(shù)最多的可能有多個ID//這里只輸出一個println(x)}})} }

運行得到結果
將其提交到spark上運行,觀察日志

1% 5000% 2% 5001% 3% 5002% 4% 5003% 5% 5004% 6% 5005% 7% 5006% 8% 5007% 9% 5008% 10% 5009% 11% 5010% 12% 5011% 5012% 13% 5013% 14% 15% 5014%... ... ...

這里是輸出的部分日志,從日志中,我們顯然發(fā)現(xiàn),程序是并行的。我采用的集群由四個節(jié)點組成,每個節(jié)點提供5G的內(nèi)存空間,集群在不同節(jié)點中運行,有節(jié)點分配到的分區(qū)是從1開始,而有節(jié)點則是從5000開始,因此程序并沒有按照我們所想的從1%-9999%。好在未按照順序執(zhí)行,也并不影響最終結果,畢竟最終要進行一個reduceByKey,才是我們真正需要得到結果的地方。
再看日志另一部分:

5634% 5635% 5636% 5637% 5638% 5639% 5640% 5641% 5642% 5643% 5644% 5645% 2019-03-05 11:52:14 INFO ExternalSorter:54 - Thread 63 spilling in-memory map of 1007.3 MB to disk (2 times so far) 647% 648% 649% 650% 651% 652% 653% 654% 655% 656%

注意到這里,spilling in-memory map of 1007.3 MB to disk,spilling操作將map中的 1007.3 MB的數(shù)據(jù)溢寫到磁盤中。這是由于spark在處理的過程中,由于數(shù)據(jù)量過于龐大,因此將多的數(shù)據(jù)溢寫到磁盤,當再次用到時,會從磁盤讀取。對于實時性操作的程序來說,多次、大量讀寫磁盤是絕對不被允許的。但是在處理大數(shù)據(jù)中,溢寫到磁盤是非常常見的操作。

事實上,在完整的日志中,我們可以看到有相當一部分日志是在溢寫磁盤的時候生成的,大概49次(這是我操作過程中的總數(shù))
如圖:

總共出現(xiàn)49條溢寫操作的日志,每次大概是1G,這也印證了我們5億條數(shù)據(jù),占據(jù)空間5G的一個說法。事實上,我曾將這5億條數(shù)據(jù)存儲在磁盤中,的確其占據(jù)的空間是5G左右。

結果

最終,我們可以在日志中看到結果。

整個過程持續(xù)了將近47min,當然在龐大的集群中,時間能夠大大縮短,要知道,我們現(xiàn)在只采用了4個節(jié)點。

我們看到了次數(shù)2、4、6、8居然分別出現(xiàn)了兩次,這并不奇怪,因為集群并行運行,異步操作,出現(xiàn)重復結果十分正常,當然我們也可以用并發(fā)機制,去處理這個現(xiàn)象。這個在后續(xù)的案例中,我們會繼續(xù)優(yōu)化結果。

從結果上看,我們發(fā)現(xiàn)5億條數(shù)據(jù)中,出現(xiàn)最多的ID也僅僅出現(xiàn)了8次,這說明了在大量數(shù)據(jù)中,很多ID可能只出現(xiàn)了1次、2次。這也就是為什么最后我采用的是foreach方法去尋找最大值,而不采用如下的方法

import org.apache.spark.{SparkConf, SparkContext}object ActiveVisitor {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("spark://master:7077").setAppName("ActiveVisitor")val sc = new SparkContext(conf)//生成一個0-9999的列表val list = 1 until 10000val id =new RandomId()//這里記錄最大的次數(shù)var max = 0//這里記錄最大次數(shù)的IDvar maxId = 0Lval lastNum = sc.parallelize(list)//第一步生成5億條數(shù)據(jù).flatMap(num => {//遍歷list列表//總共遍歷1萬次每次生成5萬個IDvar list2 = List(id.next())for (i <- 1 to 50000){list2 = id.next() :: list2}//這里記錄當前生成ID的百分比println(num/1000.0 +"%")//返回生成完成后的list//每次循環(huán)里面都包含5萬個IDlist2})//遍歷5億條數(shù)據(jù)//為每條數(shù)據(jù)出現(xiàn)標記1.map((_,1))//對標記后的數(shù)據(jù)進行處理//得到每個ID出現(xiàn)的次數(shù),即(ID,Count).reduceByKey(_+_)//為數(shù)據(jù)進行排序//倒序.sortByKey(false)//次數(shù)最多的,在第一個,將其輸出println(lastNum.first())} }

這個方法中,我們對reduceByKey結果進行排序,輸出排序結果的第一個,即次數(shù)最大的ID。這樣做似乎更符合我們的要求。但是實際上,為了得到同樣的結果,這樣做,會消耗更多的資源。如我們所說,很多ID啟其實只出現(xiàn)了一次,兩次,排序的過程中,仍然要對其進行排序。要知道,由于很多ID只出現(xiàn)一次,排序的數(shù)據(jù)集大小很有可能是數(shù)億的條目。

根據(jù)我們對排序算法的了解,這樣一個龐大數(shù)據(jù)集進行排序,勢必要耗費大量資源。因此,我們能夠容忍輸出一些冗余信息,但不影響我們的得到正確結果。

至此,我們完成了5億數(shù)據(jù)中,找出最多出現(xiàn)次數(shù)的數(shù)據(jù)。如果感興趣,可以嘗試用這個方法解決50億條數(shù)據(jù),出現(xiàn)最多的數(shù)據(jù)條目。但是這樣做的話,你得準備好50G的空間。盡管用上述的程序,屬于閱后即焚,但是50億數(shù)據(jù)仍然會耗費大量的時間。

總結

以上是生活随笔為你收集整理的Spark的实战题目——寻找5亿次访问中,访问次数最多的人的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。