日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪(fǎng)問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

基于Scala版本的TMDB大数据电影分析项目

發(fā)布時(shí)間:2023/12/31 编程问答 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 基于Scala版本的TMDB大数据电影分析项目 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

怒發(fā)沖冠為紅顏

????????基于kaggle的TMDB電影數(shù)據(jù)集的數(shù)據(jù)分析,該數(shù)據(jù)集包括了大約5000部電影的相關(guān)信息。先來(lái)看一下TMDB電影數(shù)據(jù)集的數(shù)據(jù)

?

????????該數(shù)據(jù)集其實(shí)是csv文件,里面記錄這美國(guó)這些年上映的電影,以及電影的種類(lèi),觀(guān)看人數(shù),主題,以及打分等詳細(xì)信息。

????????先來(lái)看一下各個(gè)字段的意義

????????不過(guò)需要注意的是,在csv文件里面并沒(méi)有表頭,也就是說(shuō)并沒(méi)有上面字段。所以在使用Spark SQL處理該數(shù)據(jù)集的時(shí)候,需要?jiǎng)?chuàng)建StructType將上面的字段保存起來(lái)。

使用DataFrame處理

????????通過(guò)Spark讀取csv文件就可以直接創(chuàng)建出來(lái)一個(gè)DataFrame對(duì)象,如果在本項(xiàng)目中直接讀取csv文件雖然不會(huì)報(bào)錯(cuò),但是切分字段的時(shí)候會(huì)出現(xiàn)錯(cuò)誤,原因很簡(jiǎn)單,因?yàn)樵趃enres字段里面有json數(shù)據(jù),不是json的鍋,是這個(gè)genres字段的鍋,先來(lái)看一下這個(gè)genres字段把

[{""id"": 28, ""name"": ""Action""}, {""id"": 12, ""name"": ""Adventure""}, {""id"": 14, ""name"": ""Fantasy""}, {""id"": 878, ""name"": ""Science Fiction""}]

????????大體上是這樣的,里面有多個(gè)json,而且每個(gè)json都用",",所以在split分割字段的時(shí)候貨到這這個(gè)genres字段也會(huì)根據(jù)","進(jìn)行分割。

????????為了避免這種情況,第一種方法依舊使用","分割,依靠正則表達(dá)式進(jìn)行分割

val value = sparkContext.textFile("date/tmdb_5000_movies.csv") val value1 = value.map(_.split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)")(1)) value1.collect().foreach(println(_))

來(lái)看一下genres字段是否被切分

?

????????顯而易見(jiàn),并沒(méi)有被切分。其實(shí)上面這個(gè)最后是個(gè)RDD類(lèi)型的,當(dāng)然也可以隱式轉(zhuǎn)換,toDF,轉(zhuǎn)換成dataFrame.

????????下面介紹第二種方法

????????通過(guò)StructType對(duì)象,首先將所有字段名放到一個(gè)字符串里面,對(duì)字符串進(jìn)行操作,對(duì)字符串進(jìn)行切分,切分出來(lái)的字符串依次為他們創(chuàng)建StructField對(duì)象(StructField對(duì)象包含了字段名和字段類(lèi)型)

????????spark讀取csv文件的時(shí)候?qū)sv進(jìn)行配置參數(shù),創(chuàng)建dataFrame

????????這個(gè)方法是主要介紹的,來(lái)看一下具體操作把

//將轉(zhuǎn)換操作構(gòu)造成一個(gè)方法def transform_demo() = { //創(chuàng)建Spark SQL環(huán)境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark Demo") val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() //創(chuàng)建字段名字符串 val scheamdString = "budget,genres,homepage,id,keywords,original_language,original_title, overview,"+"popularity,production_companies,production_countries, release_date,revenue,runtime,spoken_languages,status" + "tagline,title,vote_average,vote_count" //將上面的字符串拆分出來(lái) 并且map到每一個(gè)StructField對(duì)象里面 val fields = scheamdString.split(",").map(fieldname => StructField(fieldname, StringType, nullable = true))//創(chuàng)建StructType對(duì)象,該對(duì)象包含每個(gè)字段的名稱(chēng) val scheam = StructType(fields) //csv文件所在的路徑 val path = "date/tmdb_5000_movies.csv" //創(chuàng)建mdf對(duì)象 val mdf = sparkSession.read.format("com.databricks.spark.csv") .schema(scheam) //指定所謂的表頭.option("inferSchema",value = false) .option("header",value = true) //schema就是header .option("nullValue","\\N") .option("escape","\"") .option("quoteAll","true").option("seq",",").csv(path) mdf }

????????transform_Demo方法最后返回的是DataFrame對(duì)象。

????????來(lái)看一下dataFrame的schema

TMDB電影體裁的分布

????????什么是電影體裁的分布,說(shuō)白了就是計(jì)數(shù),所謂的WordCount。就是對(duì)genres字段進(jìn)行計(jì)數(shù),genres字段里面的數(shù)據(jù)

????????對(duì)里面的name進(jìn)行wc。每個(gè)電影可能會(huì)屬于多個(gè)體裁,因此本題的意義就是求出來(lái)所有電影的體裁,關(guān)于愛(ài)情片的電影有多少部,關(guān)于動(dòng)作片的電影有多少部。

????????需要先解析json數(shù)據(jù),從每個(gè)電影中取出對(duì)應(yīng)的體裁,使用wc進(jìn)行計(jì)數(shù)統(tǒng)計(jì)。

1.實(shí)現(xiàn)一個(gè)CountByJson()函數(shù),該函數(shù)實(shí)現(xiàn)從json解析出來(lái)并進(jìn)行計(jì)數(shù)

def countByJson(field:String):org.apache.spark.rdd.RDD[(String,Int)] ={//對(duì)genres字段進(jìn)行解析 genres字段只有id,name兩個(gè)列名 //jsonSchema包含了列名val jsonSchema =ArrayType(new StructType() .add("id", IntegerType).add("name",StringType)) //mdf是上面創(chuàng)建出來(lái)的dataframe //對(duì)genres字段進(jìn)行查找篩選, //[json{}.json{},json{}] //from_json(數(shù)據(jù),列名) 使用explode炸裂之后就是數(shù)組 mdf.select(mdf.col(field)).filter(mdf.col(field).isNotNull)//炸裂之后,重命名為 genres .select(explode(from_json(mdf.col(field), jsonSchema)) .as(field)) //genres.concat("name") = => genres.name,直接訪(fǎng)問(wèn)name字段 .select(field.concat(".name")) //轉(zhuǎn)換成RDD .rdd //(體裁,1) .map(name=>(name.toString(),1)) //分區(qū)數(shù)為1 .repartition(1) //reducrByKey 根據(jù)key進(jìn)行計(jì)數(shù) .reduceByKey((x,y) => x + y) }

來(lái)看一下精簡(jiǎn)的流程圖,讓大家對(duì)上面的步驟有更加深刻的印象

?

????????雖然經(jīng)過(guò)上面的步驟,我們完成了對(duì)電影體裁的計(jì)數(shù),但是細(xì)心的同學(xué)應(yīng)該會(huì)發(fā)現(xiàn)里面的name被[]包著,為了更加讓我們能夠一目了然,我們對(duì)上面的結(jié)果進(jìn)行轉(zhuǎn)化內(nèi),將[]刪除。只留下(name,count)。

def countByGenres()={ val genresRDD = countByJson("genres") //將RDD的結(jié)果collect之后,對(duì)key進(jìn)行轉(zhuǎn)化內(nèi),去掉[] val jsonString =genresRDD.collect().toList.map { case(genre,count) => ((genre.replace("[","").replace("]","")),count) } //遍歷結(jié)果 jsonString.foreach(println(_)) }

?

????????可以看到,我們已經(jīng)去掉了中括號(hào)。現(xiàn)在展示結(jié)果我們能更加的一目了然了。

前100個(gè)常見(jiàn)關(guān)鍵詞

????????找出關(guān)鍵詞中出現(xiàn)頻率最高的前100個(gè),依舊是詞頻統(tǒng)計(jì)進(jìn)行WC。不過(guò)keywords單詞是json格式的,意味著我們依舊需要對(duì)json進(jìn)行解析。解析完成之后,進(jìn)行上面的wc操作。

????????因此只需要更改函數(shù)countByJson里面的參數(shù)就可以。

????????看一下keywords的格式

????????思路很簡(jiǎn)單,通過(guò)CountByJson之后,返回的是一個(gè)RDD。經(jīng)過(guò)reduceByKey之后處理的數(shù)據(jù)

????????(name,count)

????????做到這里,這道題目就已經(jīng)結(jié)束了,如果為了美觀(guān)也可以對(duì)(name,count)進(jìn)行轉(zhuǎn)化,和上面的一樣,去掉[]。

def countByKeywords() = { val keywordsRDD = countByJson("keywords").sortBy(_._2,false) val jsonString = keywordsRDD.take(100).toList.map{case(keywords,count) => { (keywords.replace("[","").replace("]",""),count) } } jsonString }

????????需要注意的是,reducrByKey處理后的數(shù)據(jù)并沒(méi)有進(jìn)行排序,經(jīng)過(guò)轉(zhuǎn)換算子sortBy根據(jù)(name,count),元組中的count進(jìn)行排序(從大到小)。

TMDB最常見(jiàn)的10種預(yù)算數(shù)

????????探究每個(gè)電影的預(yù)算數(shù),首先對(duì)預(yù)算字段進(jìn)行過(guò)濾,去除預(yù)算項(xiàng)目為0的電影項(xiàng)目,根據(jù)預(yù)算聚合并聚合,根據(jù)計(jì)數(shù)進(jìn)行排序。取前十個(gè)為最終的結(jié)果。

????????先來(lái)看一下budget字段的信息

?

????????這種數(shù)據(jù)最簡(jiǎn)單,雖然在csv里面是整型的,但是我們?cè)趧?chuàng)建StructType對(duì)象的時(shí)候,創(chuàng)建的是字符串類(lèi)型的。因此在使用DSL語(yǔ)言的時(shí)候需要先轉(zhuǎn)化列的數(shù)據(jù)類(lèi)型,分組聚合group + ????????count()+order by取前10.

def countByBudget(order:String,ascending:Boolean):Array[Row]={ val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark Demo") val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() import sparkSession.implicits._ if(ascending){ //對(duì)budget字段進(jìn)行轉(zhuǎn)化類(lèi)型 //filter過(guò)濾//group by + count 分組聚合//orderby排序 transform_demo().withColumn("budget",col("budget") .cast("Integer")) .filter($"budget" > 0 ).groupBy("budget") .count().orderBy(order)take(10) } else{ transform_demo().withColumn("budget", col("budget") .cast("Integer")).filter($"budget" > 0).groupBy("budget").count() .orderBy(desc(order)).take(10) } }

TMDB中最常見(jiàn)的電影時(shí)長(zhǎng)(只展示電影數(shù)>100的時(shí)長(zhǎng))

????????顯而易見(jiàn),這些小實(shí)驗(yàn)都是wc,不過(guò)是在wc的基礎(chǔ)上增加了或多或少的數(shù)據(jù)操作。來(lái)看看這個(gè)電影時(shí)長(zhǎng)的操作吧。

????????電影數(shù)>100 runtime > 100

????????先看一下runtime的數(shù)據(jù)類(lèi)型

?

????????transform_demo生成的是DataFrame,我們可以直接在DataFrame上進(jìn)行操作,需要先轉(zhuǎn)換數(shù)據(jù)類(lèi)型,將runtime轉(zhuǎn)換成Integer整型然后根據(jù)下面的操作進(jìn)行

????????filter + group by + count +orderby

????????直接來(lái)看代碼吧

def distrbutionOfRuntime() = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark Demo") val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() import sparkSession.implicits._ transform_demo().withColumn("runtime",col("runtime").cast("Integer")) .groupBy("runtime").count().filter($"count" > 100).orderBy($"count".asc) .collect() }

生產(chǎn)電影最多的10大公司

????????對(duì)生產(chǎn)公司這個(gè)字段'product_companies'

????????不過(guò)需要注意的是這個(gè)字段依舊是包含著多個(gè)json,使用countByJson就可以實(shí)現(xiàn)對(duì)這種字段進(jìn)行json解析以及wc操作。經(jīng)過(guò)countByJson之后,就已經(jīng)是(name,count)這種形式,已經(jīng)完成了計(jì)數(shù)。

????????計(jì)數(shù)完成之后,(name,count)這個(gè)元組,根據(jù)元組第二個(gè)元素進(jìn)行SortBy排序,排序完成之后取前十個(gè)(name,count)就完成了目標(biāo)

def countByCompanies() = { val production_companiesRDD = countByJson("production_companies") .sortBy(_._2,false) val list = production_companiesRDD.take(10).toList.map {case (company, count) => {(company.replace("[", "").replace("]", ""), count) } } list }

TMDB的10大電影語(yǔ)言

????????該字段同樣也是json字段,使用countByJson方法進(jìn)行解析之后,進(jìn)行的操作差不多跟之前的操作千篇一律。

//TMDB中的10大電影語(yǔ)言 def countByLanguage() = { val countByLanguage = countByJson("spoken_languages").sortBy(_._2,false) val joinString = countByLanguage.take(10).toList.map {case (language, count) => (language.replace("[", "").replace("]", ""), count) } joinString }

總結(jié)

????????這是一個(gè)入門(mén)案例,雖然說(shuō)是入門(mén)案例,其實(shí)并不簡(jiǎn)單,對(duì)于還有json的字段,需要進(jìn)行處理,單獨(dú)split會(huì)報(bào)錯(cuò),需要split+正則,使用Spark SQL需要?jiǎng)?chuàng)建StructType對(duì)象,使用相關(guān)的json解析函數(shù)。總之呢剛開(kāi)始的時(shí)候可能會(huì)很難,但是做的多了發(fā)現(xiàn)并不是那么的難。

????????可能這個(gè)案例比較簡(jiǎn)單,之后會(huì)有更多的案例,不過(guò)到時(shí)候進(jìn)行的操作可能更難。差不多五月份左右會(huì)將數(shù)倉(cāng)項(xiàng)目梳理一遍。

????????這個(gè)項(xiàng)目很簡(jiǎn)單的,但是為了這個(gè)項(xiàng)目我花了有將近三天把,StructType不懂先去看了StructType,Spark SQL用到的相關(guān)函數(shù)不會(huì)又去看了看函數(shù)。

????????大家也要努力奧,好想好想她

總結(jié)

以上是生活随笔為你收集整理的基于Scala版本的TMDB大数据电影分析项目的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。