spark之CF协同过滤
一)、協同過濾
1.1 概念
協同過濾是一種借助"集體計算"的途徑。它利用大量已有的用戶偏好來估計用戶對其未接觸過的物品的喜好程度。其內在思想是相似度的定義
1.2 分類
1.在基于用戶的方法的中,如果兩個用戶表現出相似的偏好(即對相同物品的偏好大體相同),那就認為他們的興趣類似。要對他們中的一個用戶推薦一個未知物品,
?? 便可選取若干與其類似的用戶并根據他們的喜好計算出對各個物品的綜合得分,再以得分來推薦物品。其整體的邏輯是,如果其他用戶也偏好某些物品,那這些物品很可能值得推薦。
? 2. 同樣也可以借助基于物品的方法來做推薦。這種方法通常根據現有用戶對物品的偏好或是評級情況,來計算物品之間的某種相似度。 這時,相似用戶評級相同的那些物品會被認為更相近。一旦有了物品之間的相似度,便可用用戶接觸過的物品來表示這個用戶,然后找出和這些已知物品相似的那些物品,并將這些物品推薦給用戶。同樣,與已有物品相似的物品被用來生成一個綜合得分,而該得分用于評估未知物品的相似度。
二)、矩陣分解
Spark推薦模型庫當前只包含基于矩陣分解(matrix factorization)的實現,由此我們也將重點關注這類模型。它們有吸引人的地方。首先,這些模型在協同過濾中的表現十分出色。而在Netflix Prize等知名比賽中的表現也很拔尖
1,顯式矩陣分解
?? 要找到和“用戶 物品”矩陣近似的k維(低階)矩陣,最終要求出如下兩個矩陣:一個用于表示用戶的U × k維矩陣,以及一個表示物品的I × k維矩陣。這兩個矩陣也稱作因子矩陣。它們的乘積便是原始評級矩陣的一個近似。值得注意的是,原始評級矩陣通常很稀疏,但因子矩陣卻是稠密的。
特點:因子分解類模型的好處在于,一旦建立了模型,對推薦的求解便相對容易。但也有弊端,即當用戶和物品的數量很多時,其對應的物品或是用戶的因子向量可能達到數以百萬計。這將在存儲和計算能力上帶來挑戰。另一個好處是,這類模型的表現通常都很出色。
2,隱式矩陣分解(關聯因子分確定,可能隨時會變化)
隱式模型仍然會創建一個用戶因子矩陣和一個物品因子矩陣。但是,模型所求解的是偏好矩陣而非評級矩陣的近似。類似地,此時用戶因子向量和物品因子向量的點積所得到的分數,也不再是一個對評級的估值,而是對某個用戶對某一物品偏好的估值(該值的取值雖并不嚴格地處于0到1之間,但十分趨近于這個區間)
3,最小二乘法(Alternating Least Squares ? ?ALS):解決矩陣分解的最優化方法
? ALS的實現原理是迭代式求解一系列最小二乘回歸問題。在每一次迭代時,固定用戶因子矩陣或是物品因子矩陣中的一個,然后用固定的這個矩陣以及評級數據來更新另一個矩陣。之后,被更新的矩陣被固定住,再更新另外一個矩陣。如此迭代,直到模型收斂(或是迭代了預設好的次數)。
三)、Spark下ALS算法的應用
1,數據來源電影集ml-100k
2,代碼實現?
??????spark中ALS是居于矩陣分解方法,既不是user_base也不是items_base,下面中的代碼注釋是有問題的,這里是轉載的? https://www.cnblogs.com/ksWorld/p/6808092.html?
?
package com.spark.milb.study
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.evaluation.{RankingMetrics, RegressionMetrics}
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.{SparkConf, SparkContext}
import org.jblas.DoubleMatrix
/**
* Created by hadoop on 17-5-3.
* 協同過濾(處理對象movie,使用算法ALS:最小二乘法(實現用戶推薦)
* 余弦相似度實現商品相似度推薦
*/
object cfTest {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
val conf=new SparkConf().setMaster("local").setAppName("AlsTest")
val sc=new SparkContext(conf)
CF(sc,"ml-100k/u.data")
}
def CF(sc:SparkContext,fileName:String): Unit ={
val movieFile=sc.textFile(fileName)
val RatingDatas=movieFile.map(_.split("\t").take(3))
//轉為Ratings數據
val ratings=RatingDatas.map(x =>Rating(x(0).toInt,x(1).toInt,x(2).toDouble))
//獲取用戶評價模型,設置k因子,和迭代次數,隱藏因子lambda,獲取模型
/*
* ? rank :對應ALS模型中的因子個數,也就是在低階近似矩陣中的隱含特征個數。因子個
數一般越多越好。但它也會直接影響模型訓練和保存時所需的內存開銷,尤其是在用戶
和物品很多的時候。因此實踐中該參數常作為訓練效果與系統開銷之間的調節參數。通
常,其合理取值為10到200。
iterations :對應運行時的迭代次數。ALS能確保每次迭代都能降低評級矩陣的重建誤
差,但一般經少數次迭代后ALS模型便已能收斂為一個比較合理的好模型。這樣,大部分
情況下都沒必要迭代太多次(10次左右一般就挺好)。
lambda :該參數控制模型的正則化過程,從而控制模型的過擬合情況。其值越高,正則
化越嚴厲。該參數的賦值與實際數據的大小、特征和稀疏程度有關。和其他的機器學習
模型一樣,正則參數應該通過用非樣本的測試數據進行交叉驗證來調整。
* */
val model=ALS.train(ratings,50,10,0.01)
//基于用戶相似度推薦
println("userNumber:"+model.userFeatures.count()+"\t"+"productNum:"+model.productFeatures.count())
//指定用戶及商品,輸出預測值
println(model.predict(789,123))
//為指定用戶推薦的前N商品
model.recommendProducts(789,11).foreach(println(_))
//為每個人推薦前十個商品
model.recommendProductsForUsers(10).take(1).foreach{
case(x,rating) =>println(rating(0))
}
//基于商品相似度(使用余弦相似度)進行推薦,獲取某個商品的特征值
val itemFactory=model.productFeatures.lookup(567).head
val itemVector=new DoubleMatrix(itemFactory)
//求余弦相似度
val sim=model.productFeatures.map{
case(id,factory)=>
val factorVector=new DoubleMatrix(factory)
val sim=cosineSimilarity(factorVector,itemVector)
(id,sim)
}
val sortedsim=sim.top(11)(Ordering.by[(Int,Double),Double]{
case(id,sim)=>sim
})
println(sortedsim.take(10).mkString("\n"))
//模型評估,通過均誤差
//實際用戶評估值
val actualRatings=ratings.map{
case Rating(user,item,rats) => ((user,item),rats)
}
val userItems=ratings.map{
case(Rating(user,item,rats)) => (user,item)
}
//模型的用戶對商品的預測值
val predictRatings=model.predict(userItems).map{
case(Rating(user,item,rats)) =>((user,item),rats)
}
//聯合獲取rate值
val rates=actualRatings.join(predictRatings).map{
case x =>(x._2._1,x._2._2)
}
//求均方差
val regressionMetrics=new RegressionMetrics(rates)
//越接近0越佳
println(regressionMetrics.meanSquaredError)
//全局平均準確率(MAP)
val itemFactors = model.productFeatures.map { case (id, factor)
=> factor }.collect()
val itemMatrix = new DoubleMatrix(itemFactors)
//分布式廣播商品的特征矩陣
val imBroadcast = sc.broadcast(itemMatrix)
//計算每一個用戶的推薦,在這個操作里,會對用戶因子矩陣和電影因子矩陣做乘積,其結果為一個表示各個電影預計評級的向量(長度為
//1682,即電影的總數目)
val allRecs = model.userFeatures.map{ case (userId, array) =>
val userVector = new DoubleMatrix(array)
val scores = imBroadcast.value.mmul(userVector)
val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
val recommendedIds = sortedWithId.map(_._2 + 1).toSeq //+1,矩陣從0開始
(userId, recommendedIds)
}
//實際評分
val userMovies = ratings.map{ case Rating(user, product, rating) =>
(user, product)}.groupBy(_._1)
val predictedAndTrueForRanking = allRecs.join(userMovies).map{ case
(userId, (predicted, actualWithIds)) =>
val actual = actualWithIds.map(_._2)
(predicted.toArray, actual.toArray)
}
//求MAP,越大越好吧
val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)
println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)
}
//余弦相似度計算
def cosineSimilarity(vec1:DoubleMatrix,vec2:DoubleMatrix):Double={
vec1.dot(vec2)/(vec1.norm2()*vec2.norm2())
}
}
?
以下轉載:http://blog.51cto.com/snglw/1662153
總結
??????這樣,一個簡單的基于模型的電影推薦應用就算OK了。
??實時推薦架構分析
????????上面,實現了簡單的推薦系統應用,但是,僅僅實現用戶的定向推薦,在實際應用中價值不是非常大,如果體現價值,最好能夠實現實時或者準實時推薦。
????????下面,簡單介紹下實時推薦的一個架構:
????????
????????
????????該架構圖取自淘寶Spark On Yarn的實時架構,這里,給出一些個人的觀點:
? ? ? ? 架構圖分為三層:離線、近線和在線。
????????????離線部分:主要實現模型的建立。原始數據通過ETL加工清洗,得到目標數據,目標業務數據結合合適的算法,學習訓練模型,得到最佳的模型。
????????????近線部分:主要使用HBase存儲用戶行為信息,模型混合系統綜合顯性反饋和隱性反饋的模型處理結果,將最終的結果推薦給用戶。
????????????在線部分:這里,主要有兩種反饋,顯性和隱性,個人理解,顯性反饋理解為用戶將商品加入購物車,用戶購買商品這些用戶行為;隱性反饋理解為用戶在某個商品上停留的時間,用戶點擊哪些商品這些用戶行為。這里,為了實現實時/準實時操作,使用到了Spark Streaming對數據進行實時處理。(有可能是Flume+Kafka+Spark Streaming架構)
總結
以上是生活随笔為你收集整理的spark之CF协同过滤的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何用java语言调用tensorflo
- 下一篇: eclipse+scala+java+m