基于Spark的电影推荐系统(推荐系统~3)
第四部分-推薦系統-數據加工
- 本模塊基于第2節加載到 數據倉庫 里的數據做進一步的加工,加工后的數據主要用于 模型訓練 。
前置準備
本節我采用Spark on Yarn 來跑作業
拓展:Hadoop YARN中內存的設置
(1)yarn.scheduler.minimum-allocation-mb
單個任務可申請的最少物理內存量,默認是1024(MB),如果一個任務申請的物理內存量少于該值,則該對應的值改為這個數。
(2)yarn.scheduler.maximum-allocation-mb
單個任務可申請的最多物理內存量,默認是8192(MB)。
(3)yarn.nodemanager.resource.memory-mb
表示該節點上YARN可使用的物理內存總量,默認是8192(MB),注意,如果你的節點內存資源不夠8GB,則需要調減小這個值,而YARN不會智能的探測節點的物理內存總量。
$HADOOP_HOME/etc/hadoop
NM(yarn-site.xml)
說明幾點
開始項目Coding
步驟一: 在第二節中的項目中,新建machiniing包,再新建RatingDataApp
package com.csylh.recommend.datamachiningimport org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.storage.StorageLevel/*** Description:* 數據加工: ratings 評分數據[≈0.27億]切分成訓練集和測試集數據 7:3** 為模型訓練 準備的RDD數據集* ratings(userId int,movieId int,rating Double,timestamp int)* ==> timestamp 拆分* trainingData(userId int,movieId int,rating double) 前70% 訓練集* testData(userId int,movieId int,rating double) 后30% 測試集* @Author: 留歌36* @Date: 2019-07-17 14:55*/ object RatingDataApp{def main(args: Array[String]): Unit = {val spark = SparkSession.builder().config("spark.sql.shuffle.partitions","8").enableHiveSupport().getOrCreate()// ps:這里cache內存不夠,會報錯,要增加內存才行// spark sql 性能調優 https://spark.apache.org/docs/latest/sql-performance-tuning.htmlspark.catalog.cacheTable("ratings",StorageLevel.MEMORY_AND_DISK)// 取第一行.first() first :返回的是一個Row對象 第一列元素.getLong(0)val count = spark.sql("select count(1) from ratings").first().getLong(0)val percent = 0.7val trainingdatacount = (count * percent).toInt // 訓練數據集大小val testdatacount = (count * (1 - percent)).toInt // 測試數據集大小// 1.將數據按時間 升序排序,order by limit的時候,需要注意OOM(Out Of Memory)的問題: 全表掃描+limitval trainingDataAsc = spark.sql(s"select userId,movieId,rating from ratings order by timestamp asc")// 2.數據寫入到HDFS上 這一步可能出現OOMtrainingDataAsc.write.mode(SaveMode.Overwrite).parquet("/tmp/trainingDataAsc")// 3.將數據加載到數據倉庫中去spark.sql("drop table if exists trainingDataAsc")spark.sql("create table if not exists trainingDataAsc(userId int,movieId int,rating double) stored as parquet")spark.sql("load data inpath '/tmp/trainingDataAsc' overwrite into table trainingDataAsc")// 將數據按時間 降序排序val trainingDataDesc = spark.sql(s"select userId,movieId,rating from ratings order by timestamp desc")trainingDataDesc.write.mode(SaveMode.Overwrite).parquet("/tmp/trainingDataDesc")spark.sql("drop table if exists trainingDataDesc")spark.sql("create table if not exists trainingDataDesc(userId int,movieId int,rating double) stored as parquet")spark.sql("load data inpath '/tmp/trainingDataDesc' overwrite into table trainingDataDesc")/*** 1.獲取70% 升序排列數據進行訓練模型*/val trainingData = spark.sql(s"select * from trainingDataAsc limit $trainingdatacount")trainingData.write.mode(SaveMode.Overwrite).parquet("/tmp/trainingData")spark.sql("drop table if exists trainingData")spark.sql("create table if not exists trainingData(userId int,movieId int,rating double) stored as parquet")spark.sql("load data inpath '/tmp/trainingData' overwrite into table trainingData")/*** 2.獲取30% 降序排列數據進行測試模型*/val testData = spark.sql(s"select * from trainingDataDesc limit $testdatacount")testData.write.mode(SaveMode.Overwrite).parquet("/tmp/testData")spark.sql("drop table if exists testData")spark.sql("create table if not exists testData(userId int,movieId int,rating double) stored as parquet")spark.sql("load data inpath '/tmp/testData' overwrite into table testData")}}步驟二:將創建的項目進行打包上傳到服務器
mvn clean package -Dmaven.test.skip=true
步驟三:編寫shell 執行腳本
[root@hadoop001 ml]# vim machining.sh export HADOOP_CONF_DIR=/root/app/hadoop-2.6.0-cdh5.7.0/etc/hadoop$SPARK_HOME/bin/spark-submit \ --class com.csylh.recommend.datamachining.RatingDataApp \ --master yarn \ --name RatingDataApp \ --driver-memory 4g \ --executor-memory 2g \ /root/data/ml/movie-recommend-1.0.jar步驟四:執行 sh machining.sh 即可
sh machining.sh之前:
sh machining.sh之后:
http://hadoop001:8088/cluster/apps/RUNNING
這樣我們基于這份rating基礎數據加工就完成了。接來下就是訓練我們的模型做預測。
有任何問題,歡迎留言一起交流~~
更多文章:基于Spark的電影推薦系統:https://blog.csdn.net/liuge36/column/info/29285
總結
以上是生活随笔為你收集整理的基于Spark的电影推荐系统(推荐系统~3)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mysql开启慢查询方法(转)
- 下一篇: 豆瓣电影推荐系统(Ⅰ)ItemCF算法原