基于Spark的Als算法+自迭代+Spark2.0新写法
主要介紹了一下幾點(diǎn):?
1矩陣分解的幾種算法?
2spark使用矩陣分解的幾種方式,1ml 包中使用,2mllib包中的使用,其實(shí)有不調(diào)用包自己寫的案列(可以去看看哈,就在example目錄)?
3使用ALS做推薦的一個比較詳細(xì)的流程:1自迭代確定比較優(yōu)的參數(shù)是,2使用參數(shù)訓(xùn)練模型,3使用模型推薦topn的物品給用戶?
4講了怎么自迭代ALS算法參數(shù),感覺這個還重要點(diǎn)?
5提交spark的報了一個錯誤,已經(jīng)錯誤解決方式?
6好多細(xì)節(jié)都沒寫,感覺要寫的有好多,也不是很完善,時間不夠,只是提供了核心代碼和思路
一:Spark2.0新概率解釋(僅限本文使用)
1 SparkSession
SparkSession是spark2.0的全新切入點(diǎn),以前都是sparkcontext創(chuàng)建RDD的,StreamingContext,sqlContext,HiveContext。?
DataDrame提供的API慢慢的成為新的標(biāo)準(zhǔn)API,我們需要1個新的切入點(diǎn)來構(gòu)建他,這個就是SparkSession哈,以前我也沒見過.官網(wǎng)API介紹?
?
官網(wǎng)上說,這是用來構(gòu)建Dataset和DataFrame的API的切入點(diǎn)。在環(huán)境中,SparkSession已經(jīng)預(yù)先創(chuàng)建了,我們需要使用bulder方法得到已經(jīng)存在在SparkSession。使用方法如下:
- 1
- 2
- 3
- 4
- 5
- 6
二:ALS算法
1含義
在現(xiàn)實(shí)中用戶-物品-評分矩陣是及其大的,用戶消費(fèi)有限,對單個用戶來說,消費(fèi)的物品的非常有限的,產(chǎn)生的評分也是比較少的,這樣就造成了用戶-物品矩陣有大量的空值。?
假定用戶的興趣只受少數(shù)因素的影響,所以用戶-物品矩陣可以分解為用戶的特征向量矩陣和物品的特征向量矩陣(降維了)。用戶的特征向量距離表示用戶的興趣(U),物品的特征向量矩陣代表用戶的特點(diǎn)(V),合起來(內(nèi)積)表示用戶對物品的特點(diǎn)的興趣,也就是喜好程度。?
M=U*V
2協(xié)同過濾矩陣分解算法
2.1奇異值分解(SVD)
矩陣的奇異值分解是最簡單的一種矩陣分解算法,主要是在U*V中間加了個一個奇異值矩陣,公式如下:?
M=U*(奇異值矩陣)*(V的共軛)?
奇異值矩陣是對角矩陣,奇異值分解的缺點(diǎn)(沒試過不知道,書上說的),1不允許分解矩陣有null值,需要進(jìn)行填分,2如果填分,又有兩個問題:1增加數(shù)據(jù)量,增加算法復(fù)雜度,2簡單粗暴的填分方式會導(dǎo)致數(shù)據(jù)失真,如果將null值設(shè)置為0,那么會導(dǎo)致過度學(xué)習(xí)問題。?
奇異值分解方式,感覺用的不多,我自己接觸的話。
2.2正則化矩陣分解
加入正則化是為了解決稀疏矩陣可能過學(xué)習(xí)問題,評價矩陣分解是RMSE,通過最小化RMSE來學(xué)習(xí)用戶特征矩陣U和物品特征矩陣V,在RMSE函數(shù)中加入了正則化項(xiàng)減少過擬合,公式如下,公式都是書上寫的哈,這里截圖:?
K表示評分記錄(u用戶對I物品的評分),Ru,i表示用戶u對物品i的真實(shí)評分,誒夢達(dá)表示正則化系數(shù),誒夢達(dá)后面的表示防止過擬合的正則化項(xiàng)。?
加入正則化的含義可以理解為,修改rmse,不要其太大或者太小。?
假設(shè)用戶特征矩陣為Umt,物品評分矩陣為Vtn,其中t特征<
2.3帶偏置的矩陣分解(說的很有道理,但是比較難評估)
理論就不說了,舉個例子,u1對v1的評分為4表示u1對v1這個物品非常喜歡,u2對v1的評分為4表示u1對v1一般喜歡,對用用戶來說,即使他們對同一物品的評分相同,但是表示他們的喜好程度并不是一樣的。同理對于物品來說也是一樣。把這種獨(dú)立于用戶和獨(dú)立于物品的影響因素成為偏置,偏置一共有3個部分組成。?
1訓(xùn)練集中所有評分記錄的全局平均,表示訓(xùn)練集中總體評分情況,一般是一個常數(shù)。?
2用戶偏置bu,獨(dú)立于物品特征因素,表示用戶特定的打分習(xí)慣。?
3物品偏置bi,表示獨(dú)立于用戶特征因素,舉個列子,好片子一般總體評分偏高,爛片一般評分偏低,偏置就是表示這種特征。?
以上的所有偏置對用戶對物品喜好無關(guān),得到的預(yù)測評分公式如下:?
按照這種思路,其實(shí)還要很多其他優(yōu)化,比如加入時間因素,社會流行因素等。
Spark使用的是帶正則化矩陣分解,優(yōu)化函數(shù)的方式選用的是交叉最小二乘法ALS
三Spark代碼
spark代碼一半是官方列子修改過來的哈
1調(diào)用ml包
使用org.apache.spark.ml.recommendation.ALS來計算,并且使用了spark2.0的新特性SparkSession來實(shí)現(xiàn)推薦,具體代碼與注釋如下:
package org.wq.scala.ml import org.apache.spark.ml.evaluation.RegressionEvaluator import org.apache.spark.ml.recommendation.ALS import org.apache.spark.sql.SparkSession/*** Created by Administrator on 2016/10/24.*/ //這是spark新的Als算法的列子 object ALSRecommendNewTest {//定義個類,來保存一次評分哈case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)//把一行轉(zhuǎn)換成一個評分類def parseRating(str: String): Rating = {val fields = str.split("::")assert(fields.size == 4)Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)}def main(args:Array[String])={//SparkSession是spark2.0的全新切入點(diǎn),以前都是sparkcontext創(chuàng)建RDD的,StreamingContext,sqlContext,HiveContext。//DataDrame提供的API慢慢的成為新的標(biāo)準(zhǔn)API,我們需要1個新的切入點(diǎn)來構(gòu)建他,這個就是SparkSession哈//以前我也沒見過val spark = SparkSession.builder().config("spark.sql.warehouse.dir","E:/ideaWorkspace/ScalaSparkMl/spark-warehouse").master("local").appName("ALSExample").getOrCreate()import spark.implicits._//read方法返回的是一個DataFrameReader類,可以轉(zhuǎn)換為DataFrame//DataFrameReader類的textFile方法:加載文本數(shù)據(jù),返回為Dataset//使用一個函數(shù)parseRating處理一行數(shù)據(jù)val ratings = spark.read.textFile("data/mllib/sample_movielens_ratings.txt").map(parseRating).toDF()val Array(training,test)=ratings.randomSplit(Array(0.8, 0.2))// Build the recommendation model using ALS on the training data//使用訓(xùn)練數(shù)據(jù)訓(xùn)練模型//這里的ALS是import org.apache.spark.ml.recommendation.ALS,不是mllib中的哈//setMaxiter設(shè)置最大迭代次數(shù)//setRegParam設(shè)置正則化參數(shù),日lambda這個不是更明顯么//setUserCol設(shè)置用戶id列名//setItemCol設(shè)置物品列名//setRatingCol設(shè)置打分列名val als = new ALS()als.setRank(10).setMaxIter(5).setRegParam(0.01).setUserCol("userId").setItemCol("movieId").setRatingCol("rating")//fit給輸出的數(shù)據(jù),訓(xùn)練模型,fit返回的是ALSModel類val model = als.fit(training)//使用測試數(shù)據(jù)計算模型的誤差平方和//transform方法把數(shù)據(jù)dataset換成dataframe類型,預(yù)測數(shù)據(jù)val predictions = model.transform(test)//RegressionEvaluator這個類是用戶評估預(yù)測效果的,預(yù)測值與原始值//這個setLabelCol要和als設(shè)置的setRatingCol一致,不然會報錯哈//RegressionEvaluator的setPredictionCol必須是prediction因?yàn)?#xff0c;ALSModel的默認(rèn)predictionCol也是prediction//如果要修改的話必須把ALSModel和RegressionEvaluator一起修改//model.setPredictionCol("prediction")和evaluator.setPredictionCol("prediction")//setMetricName這個方法,評估方法的名字,一共有哪些呢?//rmse-平均誤差平方和開根號//mse-平均誤差平方和//mae-平均距離(絕對)//r2-沒用過不知道//這里建議就是用rmse就好了,其他的基本都沒用,當(dāng)然還是要看應(yīng)用場景,這里是預(yù)測分值就是用rmse。如果是預(yù)測距離什么的mae就不從,看場景哈val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("rating").setPredictionCol("prediction")val rmse = evaluator.evaluate(predictions)println("Root-mean-square error = "+rmse)//stop是停止底層的SparkContextspark.stop()} }- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
2調(diào)用mllib,實(shí)現(xiàn)
使用mllib中的ALS算法如下,如果是生產(chǎn),建議使用mllib中的
package org.wq.scala.mlimport org.apache.log4j.{Level, Logger} import org.apache.spark.examples.mllib.AbstractParamsimport scala.collection.mutable //處理輸入?yún)?shù)的庫 import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import scopt.OptionParser/*** Created by Administrator on 2016/10/24.*/ object ALSRecommendMllibTest {//參數(shù)含義//input表示數(shù)據(jù)路徑//kryo表示是否使用kryo序列化//numIterations迭代次數(shù)//lambda正則化參數(shù)//numUserBlocks用戶的分塊數(shù)//numProductBlocks物品的分塊數(shù)//implicitPrefs這個參數(shù)沒用過,但是通過后面的可以推斷出來了,是否開啟隱藏的分值參數(shù)閾值,預(yù)測在那個級別才建議推薦,這里是5分制度的,詳細(xì)看后面代碼case class Params(input: String = null,output:String=null,kryo: Boolean = false,numIterations: Int = 20,lambda: Double = 1.0,rank: Int = 10,numUserBlocks: Int = -1,numProductBlocks: Int = -1,implicitPrefs: Boolean = false) extends AbstractParams[Params]def main(args: Array[String]) {val defaultParams = Params()//規(guī)定參數(shù)的輸入方式 --rank 10 這種//我個人習(xí)慣為直接用空格分割(如果參數(shù)不對,給予提示),當(dāng)然下面這種更規(guī)范化和人性化,還有默認(rèn)參數(shù)的//以后再研究OptionParser用法,不過他這種參數(shù)用法挺好用的哈val parser = new OptionParser[Params]("Mllib 的ALS") {head("MovieLensALS: an example app for ALS on MovieLens data.")opt[Int]("rank").text(s"rank, default: ${defaultParams.rank}").action((x, c) => c.copy(rank = x))opt[Int]("numIterations").text(s"number of iterations, default: ${defaultParams.numIterations}").action((x, c) => c.copy(numIterations = x))opt[Double]("lambda").text(s"lambda (smoothing constant), default: ${defaultParams.lambda}").action((x, c) => c.copy(lambda = x))opt[Unit]("kryo").text("use Kryo serialization").action((_, c) => c.copy(kryo = true))opt[Int]("numUserBlocks").text(s"number of user blocks, default: ${defaultParams.numUserBlocks} (auto)").action((x, c) => c.copy(numUserBlocks = x))opt[Int]("numProductBlocks").text(s"number of product blocks, default: ${defaultParams.numProductBlocks} (auto)").action((x, c) => c.copy(numProductBlocks = x))opt[Unit]("implicitPrefs").text("use implicit preference").action((_, c) => c.copy(implicitPrefs = true))arg[String]("<input>").required().text("input paths to a MovieLens dataset of ratings").action((x, c) => c.copy(input = x))arg[String]("<output>").required().text("output Model Path").action((x, c) => c.copy(output = x))note("""|For example, the following command runs this app on a synthetic dataset:|| bin/spark-submit --class org.apache.spark.examples.mllib.MovieLensALS \| examples/target/scala-*/spark-examples-*.jar \| --rank 5 --numIterations 20 --lambda 1.0 --kryo \| data/mllib/sample_movielens_data.txt""".stripMargin)}//雖然是map但是只運(yùn)行1次哈,主要看run方法做了什么parser.parse(args, defaultParams).map { params =>run(params)} getOrElse {System.exit(1)}}def run(params: Params) {val conf = new SparkConf().setAppName(s"MovieLensALS with $params").setMaster("local").set("spark.sql.warehouse.dir","E:/ideaWorkspace/ScalaSparkMl/spark-warehouse")//如果參數(shù)設(shè)置了kryo序列化沒那么需要注冊序列化的類和配置序列化的緩存,模板照著寫就是了//使用序列化是為傳輸?shù)臅r候速度更快,我沒有使用這個,因?yàn)榉葱蛄性捯残枰欢ǖ臅r間,我是局域網(wǎng)搭建spark集群的(機(jī)子之間很快)。// 如果是在云搭建集群可以考慮使用if (params.kryo) {conf.registerKryoClasses(Array(classOf[mutable.BitSet], classOf[Rating])).set("spark.kryoserializer.buffer", "8m")}val sc = new SparkContext(conf)//設(shè)置log基本,生產(chǎn)也建議使用WARNLogger.getRootLogger.setLevel(Level.WARN)//得到因此的級別val implicitPrefs = params.implicitPrefs//讀取數(shù)據(jù),并通過是否設(shè)置了分值閾值來修正評分//官方推薦是,只有哦大于3級別的時候才值得推薦//且下面的代碼,implicitPrefs,直接就是默認(rèn)5 Must see,按道理會根據(jù)自己對分?jǐn)?shù)閾值的預(yù)估,rating減去相應(yīng)的值,比如fields(2).toDouble - 2.5//5 -> 2.5, 4 -> 1.5, 3 -> 0.5, 2 -> -0.5, 1 -> -1.5//現(xiàn)在是5分值的映射關(guān)系,如果是其他分值的映射關(guān)系有該怎么做?還不確定,個人建議別使用這個了。//經(jīng)過下面代碼推斷出,如果implicitPrefs=true或者flase,true的意思是,預(yù)測的分?jǐn)?shù)要大于2.5(自己設(shè)置),才能推薦給用戶,小了,沒有意義//它引入implicitPrefs的整體含義為,只有用戶對物品的滿意達(dá)到一定的值,才推薦,不然推薦不喜歡的沒有意思,所以在構(gòu)建樣本的時候,會減去相應(yīng)的值fields(2).toDouble - 2.5(自己設(shè)置)//這種理論是可以的,但是還有一個理論,不給用戶推薦比給用戶推薦錯了還要嚴(yán)重(有人提出過),不推薦產(chǎn)生的效果還要嚴(yán)重,還有反向推薦,//我把implicitPrefs叫做分值閾值val ratings = sc.textFile(params.input).map { line =>val fields = line.split("::")if (implicitPrefs) {/** MovieLens ratings are on a scale of 1-5:* 5: Must see* 4: Will enjoy* 3: It's okay* 2: Fairly bad* 1: Awful* So we should not recommend a movie if the predicted rating is less than 3.* To map ratings to confidence scores, we use* 5 -> 2.5, 4 -> 1.5, 3 -> 0.5, 2 -> -0.5, 1 -> -1.5. This mappings means unobserved* entries are generally between It's okay and Fairly bad.* The semantics of 0 in this expanded world of non-positive weights* are "the same as never having interacted at all".*/Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble - 2.5)} else {Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)}}.cache()//計算一共有多少樣本數(shù)val numRatings = ratings.count()//計算一共有多少用戶val numUsers = ratings.map(_.user).distinct().count()//計算應(yīng)該有多少物品val numMovies = ratings.map(_.product).distinct().count()println(s"Got $numRatings ratings from $numUsers users on $numMovies movies.")//按80%訓(xùn)練,20%驗(yàn)證分割樣本val splits = ratings.randomSplit(Array(0.8, 0.2))//把訓(xùn)練樣本緩存起來,加快運(yùn)算速度val training = splits(0).cache()//構(gòu)建測試樣,我先翻譯下他說的英文哈。//分值為0表示,我對物品的評分不知道,一個積極有意義的評分表示:有信心預(yù)測值為1//一個消極的評分表示:有信心預(yù)測值為0//在這個案列中,我們使用的加權(quán)的RMSE,這個權(quán)重為自信的絕對值(命中就為1,否則為0)//關(guān)于誤差,在預(yù)測和1,0之間是不一樣的,取決于r 是正,還是負(fù)//這里splits已經(jīng)減了分值閾值了,所以>0 =1 else 0的含義是,1表示分值是大于分值閾值的,這里是大于2.5,0表示小于2.5val test = if (params.implicitPrefs) {/** 0 means "don't know" and positive values mean "confident that the prediction should be 1".* Negative values means "confident that the prediction should be 0".* We have in this case used some kind of weighted RMSE. The weight is the absolute value of* the confidence. The error is the difference between prediction and either 1 or 0,* depending on whether r is positive or negative.*/splits(1).map(x => Rating(x.user, x.product, if (x.rating > 0) 1.0 else 0.0))} else {splits(1)}.cache()//訓(xùn)練樣本量和測試樣本量val numTraining = training.count()val numTest = test.count()println(s"Training: $numTraining, test: $numTest.")//這里應(yīng)為不適用ratings了,釋放掉它占的內(nèi)存ratings.unpersist(blocking = false)//setRank設(shè)置隨機(jī)因子,就是隱藏的屬性//setIterations設(shè)置最大迭代次數(shù)//setLambda設(shè)置正則化參數(shù)//setImplicitPrefs 是否開啟分值閾值//setUserBlocks設(shè)置用戶的塊數(shù)量,并行化計算,當(dāng)特別大的時候需要設(shè)置//setProductBlocks設(shè)置物品的塊數(shù)量val model = new ALS().setRank(params.rank).setIterations(params.numIterations).setLambda(params.lambda).setImplicitPrefs(params.implicitPrefs).setUserBlocks(params.numUserBlocks).setProductBlocks(params.numProductBlocks).run(training)//訓(xùn)練的樣本和測試的樣本的分值全部是減了2.5分的//測試樣本的分值如果大于0為1,else 0,表示分值大于2.5才預(yù)測為Ok//計算rmseval rmse = computeRmse(model, test, params.implicitPrefs)println(s"Test RMSE = $rmse.")//保存模型,模型保存路勁為model.save(sc,params.output)println("模型保存成功,保存路勁為:"+params.output)sc.stop()}/** Compute RMSE (Root Mean Squared Error). */def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], implicitPrefs: Boolean): Double = {//內(nèi)部方法含義如下// 如果已經(jīng)開啟了implicitPref那么,預(yù)測的分值大于0的為1,小于0的為0,沒有開啟的話,就是用原始分值//min(r,1.0)求預(yù)測分值和1.0那個小,求小值,然后max(x,0.0)求大值, 意思就是把預(yù)測分值大于0的為1,小于0 的為0//這樣構(gòu)建之后預(yù)測的預(yù)測值和測試樣本的樣本分值才一直,才能進(jìn)行加權(quán)rmse計算def mapPredictedRating(r: Double): Double = {if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r}//根據(jù)模型預(yù)測,用戶對物品的分值,predict的參數(shù)為RDD[(Int, Int)]val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))//mapPredictedRating把預(yù)測的分值映射為1或者0//join連接原始的分?jǐn)?shù),連接的key為x.user, x.product//values方法表示只保留預(yù)測值,真實(shí)值val predictionsAndRatings = predictions.map{ x =>((x.user, x.product), mapPredictedRating(x.rating))}.join(data.map(x => ((x.user, x.product), x.rating))).values//最后計算預(yù)測與真實(shí)值的平均誤差平方和//這是先每個的平方求出來,然后再求平均值,最后開方math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).mean())} }- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
- 150
- 151
- 152
- 153
- 154
- 155
- 156
- 157
- 158
- 159
- 160
- 161
- 162
- 163
- 164
- 165
- 166
- 167
- 168
- 169
- 170
- 171
- 172
- 173
- 174
- 175
- 176
- 177
- 178
- 179
- 180
- 181
- 182
- 183
- 184
- 185
- 186
- 187
- 188
- 189
- 190
- 191
- 192
- 193
- 194
- 195
- 196
- 197
- 198
- 199
- 200
- 201
- 202
- 203
- 204
- 205
- 206
- 207
- 208
- 209
- 210
- 211
- 212
- 213
- 214
- 215
- 216
- 217
- 218
- 219
- 220
- 221
- 222
- 223
- 224
- 225
- 226
- 227
- 228
- 229
- 230
- 231
- 232
- 233
- 234
- 235
- 236
- 237
- 238
- 239
- 240
- 241
- 242
- 243
3找到最優(yōu)(可能最優(yōu)哈)參數(shù)
package org.wq.scala.mlimport org.apache.log4j.{Level, Logger} import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}/*** Created by Administrator on 2016/10/24.*/ object ALSRecommendMllibBestParamTest {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("ALS_mllib_best_param").setMaster("local").set("spark.sql.warehouse.dir","E:/ideaWorkspace/ScalaSparkMl/spark-warehouse")val sc = new SparkContext(conf)//設(shè)置log基本,生產(chǎn)也建議使用WARNLogger.getRootLogger.setLevel(Level.WARN)//第一步構(gòu)建time,Ratingval movie = sc.textFile("data/mllib/sample_movielens_ratings.txt")val ratings = movie.map(line=>{val fields = line.split("::")val rating = Rating(fields(0).toInt,fields(1).toInt,fields(2).toDouble)val timestamp =fields(3).toLong%5(timestamp,rating)})//輸出數(shù)據(jù)的基本信息val numRatings = ratings.count()val numUser = ratings.map(_._2.user).distinct().count()val numItems = ratings.map(_._2.product).distinct().count()println("樣本基本信息為:")println("樣本數(shù):"+numRatings)println("用戶數(shù):"+numUser)println("物品數(shù):"+numItems)val sp = ratings.randomSplit(Array(0.6,0.2,0.2))//第二步驟//使用日期把數(shù)據(jù)分為訓(xùn)練集(timestamp<6),驗(yàn)證集(6<timestamp<8)和測試集(timestamp>8)/* val training = ratings.filter(x=>x._1<6).values.repartition(2).cache()val validation = ratings.filter(x=>x._1>6 && x._1<8).values.repartition(2).cache()val test=ratings.filter(x=>x._1>=8).values.cache()*///樣本時間參數(shù)都一樣,測試就使用隨機(jī)算了val training=sp(0).map(x=>Rating(x._2.user,x._2.product,x._2.rating)).repartition(2).cache()val validation=sp(1).map(x=>Rating(x._2.user,x._2.product,x._2.rating)).repartition(2).cache()val test=sp(1).map(x=>Rating(x._2.user,x._2.product,x._2.rating))val numTraining = training.count()val numValidation=validation.count()val numTest=test.count()println("驗(yàn)證樣本基本信息為:")println("訓(xùn)練樣本數(shù):"+numTraining)println("驗(yàn)證樣本數(shù):"+numValidation)println("測試樣本數(shù):"+numTest)//第三步//定義RMSE方法def computeRmse(model:MatrixFactorizationModel,data:RDD[Rating]):Double={val predictions:RDD[Rating]=model.predict(data.map(x=>(x.user,x.product)))val predictionAndRatings = predictions.map(x=>{((x.user,x.product),x.rating)}).join(data.map(x=>((x.user,x.product),x.rating))).valuesmath.sqrt(predictionAndRatings.map(x=>(x._1-x._2)*(x._1-x._2)).mean())}//第四步驟,使用不同的參數(shù)訓(xùn)練模型,并且選擇RMSE最小的模型,規(guī)定參數(shù)的范圍//隱藏因子數(shù):8或者12//正則化系數(shù),0.01或者0.1選擇,迭代次數(shù)為10或者20,訓(xùn)練8個模型val ranks = List(8,12)val lambdas = List(0.01,0.1)val numiters = List(10,20)var bestModel:Option[MatrixFactorizationModel]=Nonevar bestValidationRmse=Double.MaxValuevar bestRank=0var bestLamdba = -1.0var bestNumIter=1for(rank<-ranks;lambda<-lambdas;numiter<-numiters){println(rank+"-->"+lambda+"-->"+numiter)val model = ALS.train(training,rank,numiter,lambda)val valadationRmse=computeRmse(model,validation)if(valadationRmse<bestValidationRmse){bestModel=Some(model)bestValidationRmse=valadationRmsebestRank=rankbestLamdba=lambdabestNumIter=numiter}}val testRmse = computeRmse(bestModel.get,test)println("測試數(shù)據(jù)的rmse為:"+testRmse)println("范圍內(nèi)的最后模型參數(shù)為:")println("隱藏因子數(shù):"+bestRank)println("正則化參數(shù):"+bestLamdba)println("迭代次數(shù):"+bestNumIter) //步驟5可以對比使用協(xié)同過濾和不適用協(xié)同過濾(使用平均分來做預(yù)測結(jié)果)能提升多大的預(yù)測效果。//計算訓(xùn)練樣本和驗(yàn)證樣本的平均分?jǐn)?shù) val meanR = training.union(validation).map(x=>x.rating).mean()//這就是使用平均分做預(yù)測,test樣本的rmse val baseRmse=math.sqrt(test.map(x=>(meanR-x.rating)*(meanR-x.rating)).mean())val improvement =(baseRmse-testRmse)/baseRmse*100println("使用了ALS協(xié)同過濾算法比使用評價分作為預(yù)測的提升度為:"+improvement)} }- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
4使用ALS模型進(jìn)行預(yù)測
package org.wq.scala.mlimport org.apache.log4j.{Level, Logger} import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating} import org.apache.spark.{SparkConf, SparkContext}/*** Created by Administrator on 2016/10/25.*/ object ALSModelTopn {def main(args: Array[String]): Unit = {//給用戶推薦val conf = new SparkConf().setAppName("ALS_mllib_best_param").setMaster("local").set("spark.sql.warehouse.dir","E:/ideaWorkspace/ScalaSparkMl/spark-warehouse")val sc = new SparkContext(conf)Logger.getRootLogger.setLevel(Level.WARN)val movie = sc.textFile("data/mllib/sample_movielens_ratings.txt")val ratings = movie.map(line=>{val fields = line.split("::")val rating = Rating(fields(0).toInt,fields(1).toInt,fields(2).toDouble)val timestamp =fields(3).toLong%5(rating)})val model= MatrixFactorizationModel.load(sc,"data/mllib/t")//選擇一個用戶val user=5val myRating = ratings.filter(x=>x.user==5)//該用戶已經(jīng)消費(fèi)了的物品val myRateItem = myRating.map(x=>x.product).collect().toSet//給用戶5推薦前評分前10的物品val recommendations = model.recommendProducts(user,10)recommendations.map(x=>{println(x.user+"-->"+x.product+"-->"+x.rating)})} }- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
提交部署
1提交尋找最優(yōu)參數(shù)的jar
提交部署求最優(yōu)參數(shù)的那個jar,這就把最優(yōu)參數(shù)簡單的打印出來,如果要周期的自迭代更新參數(shù)的話,就寫在數(shù)據(jù)庫或者配置文件中,當(dāng)訓(xùn)練的時候,就從數(shù)據(jù)庫或者配置文件讀。?
首先需要把上面的第三個程序修改一下,修改如下,因?yàn)橐峤唤o集群嘛,所以不能指定master為local了,參數(shù)從命令行傳入。把jar上傳到master節(jié)點(diǎn)的目錄下,data需要上傳到所有的slaves.?
if(args.length!=1){?
println(“請輸入1個參數(shù) 購物籃數(shù)據(jù)路徑”)?
System.exit(0)?
}?
val conf = new SparkConf().setAppName(“ALS_mllib_best_param”)?
以后所有的提交都需要修改conf的,以后就不說了?
jar與數(shù)據(jù)目錄如下:?
?
數(shù)據(jù)長下面這個樣子,用戶id,物品id,評分,時間戳,用戶id和物品id必須是整型,如果你的不是,那么必須進(jìn)行一次映射:?
- 1
- 2
- 3
提交job?
spark-submit –class org.wq.scala.ml.ALSRecommendMllibBestParam –master spark://master:7077 –executor-memory 700m –num-executors 1 /home/jar/ALSRecommendMllibBestParam.jar /home/jar/data/sample_movielens_ratings.txt?
運(yùn)行結(jié)果如下:?
也給大家看下job運(yùn)行的過程?
http://192.168.247.132:4040/jobs/?
2把求得的最好參數(shù)帶入mllib寫的算法中,訓(xùn)練形成模型
提交Job?
spark-submit –class org.wq.scala.ml.ALSRecommendMllib –master spark://master:7077 –executor-memory 700m –num-executors 1 /home/jar/ALSRecommendMllib.jar –rank 8 –numIterations 10 –lambda 0.1 /home/jar/data/sample_movielens_ratings.txt /home/jar/model/AlsModel?
悲劇的報錯了?
這個錯誤很明顯是缺少包spark-examples_2.11-2.0.0.jar,這個包在example目錄下的。?
兩個種解決方法:?
1修改/etc/profile,把example/jars加入classpath.?
2把jar復(fù)制到目錄sparkhome/jars目錄下,因?yàn)閟parkhome/jars目錄下,因?yàn)閟park_home/jars這個目錄在環(huán)境變量中,這里采用第二種.
修改之后的運(yùn)行結(jié)果為:?
3調(diào)用模型,得出推薦
到這里模型就訓(xùn)練好了,這個模型可以定時訓(xùn)練,crontab就可以實(shí)現(xiàn),訓(xùn)練好的模型,使用用戶數(shù)據(jù)預(yù)測分?jǐn)?shù)。?
就不提交到集群運(yùn)行了,因?yàn)檫@是demo而已,真實(shí)應(yīng)該為提供接口,別人來調(diào)用?
總結(jié):?
1矩陣分解的幾種算法?
2spark使用矩陣分解的幾種方式,1ml 包中使用,2mllib包中的使用,其實(shí)有不調(diào)用包自己寫的案列(可以去看看哈,就在example目錄)?
3使用ALS做推薦的一個比較詳細(xì)的流程:1自迭代確定比較優(yōu)的參數(shù)是,2使用參數(shù)訓(xùn)練模型,3使用模型推薦topn的物品給用戶?
4講了怎么自迭代ALS算法參數(shù),感覺這個還重要點(diǎn)?
5提交spark的報了一個錯誤,已經(jīng)錯誤解決方式?
6好多細(xì)節(jié)都沒寫,感覺要寫的有好多,也不是很完善,時間不夠,只是提供了核心代碼和思路
疑問:在做的過程中,我發(fā)現(xiàn)spark的job查看,只有在job運(yùn)行的時候才可以查看,其他時候不行?
http://192.168.247.132:4040/jobs/?
這個應(yīng)該是可以隨時查看的,應(yīng)該是spark的日志和查看jobs的服務(wù)要一直開啟才行,希望對spark集群熟悉的人求解,跪謝
總結(jié)
以上是生活随笔為你收集整理的基于Spark的Als算法+自迭代+Spark2.0新写法的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RDD基本转换操作:zipWithInd
- 下一篇: 机器学习手动撸代码系列3-感知机