日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

机器学习算法--ALS

發(fā)布時間:2024/7/5 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 机器学习算法--ALS 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

本文轉(zhuǎn)載自:endymecy|ALS

一、什么是ALS

ALS是交替最小二乘(alternating least squares)的簡稱。在機器學(xué)習(xí)中,ALS特指使用交替最小二乘求解的一個協(xié)同推薦算法。它通過觀察到的所有用戶給商品的打分,來推斷每個用戶的喜好并向用戶推薦適合的商品。舉個例子,我們看下面一個8*8的用戶打分矩陣。

????這個矩陣的每一行代表一個用戶(u1,u2,…,u8)、每一列代表一個商品(v1,v2,…,v8)、用戶的打分為1-9分。這個矩陣只顯示了觀察到的打分,我們需要推測沒有觀察到的打分。比如(u6,v5)打分多少?如果以數(shù)獨的方式來解決這個問題,可以得到唯一的結(jié)果。 因為數(shù)獨的規(guī)則很強,每添加一條規(guī)則,就讓整個系統(tǒng)的自由度下降一個量級。當我們滿足所有的規(guī)則時,整個系統(tǒng)的自由度就降為1了,也就得出了唯一的結(jié)果。對于上面的打分矩陣,如果我們不添加任何條件的話,也即打分之間是相互獨立的,我們就沒法得到(u6,v5)的打分。 所以在這個用戶打分矩陣的基礎(chǔ)上,我們需要提出一個限制其自由度的合理假設(shè),使得我們可以通過觀察已有打分來猜測未知打分。

????ALS的核心就是這樣一個假設(shè):打分矩陣是近似低秩的。換句話說,就是一個(m,n)的打分矩陣可以由分解的兩個小矩陣U(m,k)和V(k,n)的乘積來近似,即A=UVT,k<=m,n。這就是ALS的矩陣分解方法。這樣我們把系統(tǒng)的自由度從O(mn)降到了O((m+n)k)。

????那么ALS的低秩假設(shè)為什么是合理的呢?我們描述一個人的喜好經(jīng)常是在一個抽象的低維空間上進行的,并不需要一一列出他喜好的事物。例如,我喜好看偵探影片,可能代表我喜歡《神探夏洛特》、《神探狄仁杰》等。這些影片都符合我對自己喜好的描述,也就是說他們在這個抽象的低維空間的投影和我的喜好相似。 再抽象一些來描述這個問題,我們把某個人的喜好映射到了低維向量ui上,同時將某個影片的特征映射到了維度相同的向量vj上,那么這個人和這個影片的相似度就可以表述成這兩個向量之間的內(nèi)積uTivj 。 我們把打分理解成相似度,那么打分矩陣A就可以由用戶喜好矩陣和產(chǎn)品特征矩陣的乘積UVTUV^TUVT來近似了。

????低維空間的選取是一個問題。這個低維空間要能夠很好的區(qū)分事物,那么就需要一個明確的可量化目標,這就是重構(gòu)誤差。在ALS中我們使用F范數(shù)來量化重構(gòu)誤差,就是每個元素重構(gòu)誤差的平方和。這里存在一個問題,我們只觀察到部分打分,A中的大量未知元是我們想推斷的,所以這個重構(gòu)誤差是包含未知數(shù)的。 解決方案很簡單:只計算已知打分的重構(gòu)誤差。

后面的章節(jié)我們將從原理上講解spark中實現(xiàn)的ALS模型。

二、spark中ALS的實現(xiàn)原理

????Spark利用交換最小二乘解決矩陣分解問題分兩種情況:數(shù)據(jù)集是顯式反饋和數(shù)據(jù)集是隱式反饋。由于隱式反饋算法的原理是在顯示反饋算法原理的基礎(chǔ)上作的修改,所以我們在此只會具體講解數(shù)據(jù)集為隱式反饋的算法。 算法實現(xiàn)所依據(jù)的文獻見參考文獻【1】。

2.1 介紹

????從廣義上講,推薦系統(tǒng)基于兩種不同的策略:基于內(nèi)容的方法和基于協(xié)同過濾的方法。Spark中使用協(xié)同過濾的方式。協(xié)同過濾分析用戶以及用戶相關(guān)的產(chǎn)品的相關(guān)性,用以識別新的用戶-產(chǎn)品相關(guān)性。協(xié)同過濾系統(tǒng)需要的唯一信息是用戶過去的行為信息,比如對產(chǎn)品的評價信息。協(xié)同過濾是領(lǐng)域無關(guān)的,所以它可以方便解決基于內(nèi)容方法難以解決的許多問題。

????推薦系統(tǒng)依賴不同類型的輸入數(shù)據(jù),最方便的是高質(zhì)量的顯式反饋數(shù)據(jù),它們包含用戶對感興趣商品明確的評價。例如,Netflix收集的用戶對電影評價的星星等級數(shù)據(jù)。但是顯式反饋數(shù)據(jù)不一定總是找得到,因此推薦系統(tǒng)可以從更豐富的隱式反饋信息中推測用戶的偏好。 隱式反饋類型包括購買歷史、瀏覽歷史、搜索模式甚至鼠標動作。例如,購買同一個作者許多書的用戶可能喜歡這個作者。

????許多研究都集中在處理顯式反饋,然而在很多應(yīng)用場景下,應(yīng)用程序重點關(guān)注隱式反饋數(shù)據(jù)。因為可能用戶不愿意評價商品或者由于系統(tǒng)限制我們不能收集顯式反饋數(shù)據(jù)。在隱式模型中,一旦用戶允許收集可用的數(shù)據(jù),在客戶端并不需要額外的顯式數(shù)據(jù)。文獻中的系統(tǒng)避免主動地向用戶收集顯式反饋信息,所以系統(tǒng)僅僅依靠隱式信息。

了解隱式反饋的特點非常重要,因為這些特質(zhì)使我們避免了直接調(diào)用基于顯式反饋的算法。最主要的特點有如下幾種:

(1) 沒有負反饋。通過觀察用戶行為,我們可以推測那個商品他可能喜歡,然后購買,但是我們很難推測哪個商品用戶不喜歡。這在顯式反饋算法中并不存在,因為用戶明確告訴了我們哪些他喜歡哪些他不喜歡。

(2)隱式反饋是內(nèi)在的噪音。雖然我們拼命的追蹤用戶行為,但是我們僅僅只是猜測他們的偏好和真實動機。例如,我們可能知道一個人的購買行為,但是這并不能完全說明偏好和動機,因為這個商品可能作為禮物被購買而用戶并不喜歡它。

(3)顯示反饋的數(shù)值值表示偏好(preference),隱式回饋的數(shù)值值表示信任(confidence)?;陲@示反饋的系統(tǒng)用星星等級讓用戶表達他們的喜好程度,例如一顆星表示很不喜歡,五顆星表示非常喜歡。基于隱式反饋的數(shù)值值描述的是動作的頻率,例如用戶購買特定商品的次數(shù)。一個較大的值并不能表明更多的偏愛。但是這個值是有用的,它描述了在一個特定觀察中的信任度。 一個發(fā)生一次的事件可能對用戶偏愛沒有用,但是一個周期性事件更可能反映一個用戶的選擇。

(4)評價隱式反饋推薦系統(tǒng)需要合適的手段。

2.2 顯式反饋模型

????潛在因素模型由一個針對協(xié)同過濾的交替方法組成,它以一個更加全面的方式發(fā)現(xiàn)潛在特征來解釋觀察的ratings數(shù)據(jù)。我們關(guān)注的模型由奇異值分解(SVD)推演而來。一個典型的模型將每個用戶u(包含一個用戶-因素向量uiu_iui?)和每個商品v(包含一個商品-因素向量vjv_jvj?)聯(lián)系起來。 預(yù)測通過內(nèi)積rij=uiTvjr_{ij}=u^{T}_{i}v_jrij?=uiT?vj?來實現(xiàn)。另一個需要關(guān)注的地方是參數(shù)估計。許多當前的工作都應(yīng)用到了顯式反饋數(shù)據(jù)集中,這些模型僅僅基于觀察到的rating數(shù)據(jù)直接建模,同時通過一個適當?shù)恼齽t化來避免過擬合。公式如下:

????在公式(2.1)中,lambda是正則化的參數(shù)。正規(guī)化是為了防止過擬合的情況發(fā)生,具體參見文獻【3】。這樣,我們用最小化重構(gòu)誤差來解決協(xié)同推薦問題。我們也成功將推薦問題轉(zhuǎn)換為了最優(yōu)化問題。

2.3 隱式反饋模型

????在顯式反饋的基礎(chǔ)上,我們需要做一些改動得到我們的隱式反饋模型。首先,我們需要形式化由rijr_{ij}rij?變量衡量的信任度的概念。我們引入了一組二元變量pijp_{ij}pij? ,它表示用戶u對商品v的偏好。pijp_{ij}pij?的公式如下:

????換句話說,如果用戶購買了商品,我們認為用戶喜歡該商品,否則我們認為用戶不喜歡該商品。然而我們的信念(beliefs)與變化的信任(confidence)等級息息相關(guān)。首先,很自然的,pij的值為0和低信任有關(guān)。用戶對一個商品沒有得到一個正的偏好可能源于多方面的原因,并不一定是不喜歡該商品。例如,用戶可能并不知道該商品的存在。 另外,用戶購買一個商品也并不一定是用戶喜歡它。因此我們需要一個新的信任等級來顯示用戶偏愛某個商品。一般情況下,rijr_{ij}rij?越大,越能暗示用戶喜歡某個商品。因此,我們引入了一組變量cijc_{ij}cij?,它衡量了我們觀察到pijp_{ij}pij?的信任度。cijc_{ij}cij?一個合理的選擇如下所示:

????按照這種方式,我們存在最小限度的信任度,并且隨著我們觀察到的正偏向的證據(jù)越來越多,信任度也會越來越大。

????我們的目的是找到用戶向量ui以及商品向量vj來表明用戶偏好。這些向量分別是用戶因素(特征)向量和商品因素(特征)向量。本質(zhì)上,這些向量將用戶和商品映射到一個公用的隱式因素空間,從而使它們可以直接比較。這和用于顯式數(shù)據(jù)集的矩陣分解技術(shù)類似,但是包含兩點不一樣的地方: (1)我們需要考慮不同的信任度,(2)最優(yōu)化需要考慮所有可能的u,v對,而不僅僅是和觀察數(shù)據(jù)相關(guān)的u,v對。顯性反饋的矩陣分解優(yōu)化時,對于missing data(沒有評分),是不會當做訓(xùn)練數(shù)據(jù)輸入到模型的,優(yōu)化時針對已知評分數(shù)據(jù)優(yōu)化。而這里隱性反饋,是利用所有可能的u,i鍵值對,所以總的數(shù)據(jù)是m*n,其中m是用戶數(shù)量,n是物品數(shù)量。這里沒有所謂的missing data,因為假如u對i沒有任何動作,我們就認為偏好值為0,只不過置信度較低而已。因此,通過最小化下面的損失函數(shù)來計算相關(guān)因素(factors)。

2.4 求解最小化損失函數(shù)

????考慮到損失函數(shù)包含mn個元素,m是用戶的數(shù)量,n是商品的數(shù)量。一般情況下,mn可以到達幾百億。這么多的元素應(yīng)該避免使用隨機梯度下降法來求解,因此,spark選擇使用交替最優(yōu)化方式求解。

????公式(2.1)和公式(2.4)是非凸函數(shù),無法求解最優(yōu)解。但是,固定公式中的用戶-特征向量或者商品-特征向量,公式就會變成二次方程,可以求出全局的極小值。交替最小二乘的計算過程是:交替的重新計算用戶-特征向量和商品-特征向量,每一步都保證降低損失函數(shù)的值,直到找到極小值。 交替最小二乘法的處理過程如下所示:

三、ALS在spark中的實現(xiàn)

????在spark的源代碼中,ALS算法實現(xiàn)于org.apache.spark.ml.recommendation.ALS.scala文件中。我們以官方文檔中的例子為起點,來分析ALS算法的分布式實現(xiàn)。下面是官方的例子:

//處理訓(xùn)練數(shù)據(jù) val data = sc.textFile("data/mllib/als/test.data") val ratings = data.map(_.split(',') match { case Array(user, item, rate) =>Rating(user.toInt, item.toInt, rate.toDouble) }) // 使用ALS訓(xùn)練推薦模型 val rank = 10 val numIterations = 10 val model = ALS.train(ratings, rank, numIterations, 0.01)

從代碼中我們知道,訓(xùn)練模型用到了ALS.scala文件中的train方法,下面我們將詳細介紹train方法的實現(xiàn)。在此之前,我們先了解一下train方法的參數(shù)表示的含義。

def train( ratings: RDD[Rating[ID]], //訓(xùn)練數(shù)據(jù)rank: Int = 10, //隱含特征數(shù)numUserBlocks: Int = 10, //分區(qū)數(shù)numItemBlocks: Int = 10,maxIter: Int = 10, //迭代次數(shù)regParam: Double = 1.0,implicitPrefs: Boolean = false,alpha: Double = 1.0,nonnegative: Boolean = false,intermediateRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK,finalRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK,checkpointInterval: Int = 10,seed: Long = 0L): MatrixFactorizationModel

以上定義中,ratings指用戶提供的訓(xùn)練數(shù)據(jù),它包括用戶id集、商品id集以及相應(yīng)的打分集。rank表示隱含因素的數(shù)量,也即特征的數(shù)量。numUserBlocks和numItemBlocks分別指用戶和商品的塊數(shù)量,即分區(qū)數(shù)量。maxIter表示迭代次數(shù)。regParam表示最小二乘法中l(wèi)ambda值的大小。 implicitPrefs表示我們的訓(xùn)練數(shù)據(jù)是否是隱式反饋數(shù)據(jù)。Nonnegative表示求解的最小二乘的值是否是非負,根據(jù)Nonnegative的值的不同,spark使用了不同的求解方法。

下面我們分步驟分析train方法的處理流程。
??
(1) 初始化ALSPartitioner和LocalIndexEncoder。

ALSPartitioner實現(xiàn)了基于hash的分區(qū),它根據(jù)用戶或者商品id的hash值來進行分區(qū)。LocalIndexEncoder對(blockid,localindex)即(分區(qū)id,分區(qū)內(nèi)索引)進行編碼,并將其轉(zhuǎn)換為一個整數(shù),這個整數(shù)在高位存分區(qū)ID,在低位存對應(yīng)分區(qū)的索引,在空間上盡量做到了不浪費。 同時也可以根據(jù)這個轉(zhuǎn)換的整數(shù)分別獲得blockid和localindex。這兩個對象在后續(xù)的代碼中會用到。

val userPart = new ALSPartitioner(numUserBlocks) val itemPart = new ALSPartitioner(numItemBlocks) val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions) val itemLocalIndexEncoder = new LocalIndexEncoder(itemPart.numPartitions)//ALSPartitioner即HashPartitioner class HashPartitioner(partitions: Int) extends Partitioner {def numPartitions: Int = partitionsdef getPartition(key: Any): Int = key match {case null => 0case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)}override def equals(other: Any): Boolean = other match {case h: HashPartitioner =>h.numPartitions == numPartitionscase _ =>false}override def hashCode: Int = numPartitions }//LocalIndexEncoder private[recommendation] class LocalIndexEncoder(numBlocks: Int) extends Serializable {private[this] final val numLocalIndexBits =math.min(java.lang.Integer.numberOfLeadingZeros(numBlocks - 1), 31)//左移(<<,相當于乘2),右移(>>,相當于除2)和無符號右移(>>>,無符號右移,忽略符號位,空位都以0補齊)private[this] final val localIndexMask = (1 << numLocalIndexBits) - 1//encodeIndex高位存分區(qū)ID,在低位存對應(yīng)分區(qū)的索引def encode(blockId: Int, localIndex: Int): Int = {(blockId << numLocalIndexBits) | localIndex}@inlinedef blockId(encoded: Int): Int = {encoded >>> numLocalIndexBits}@inlinedef localIndex(encoded: Int): Int = {encoded & localIndexMask}}

(2) 根據(jù)nonnegative參數(shù)選擇解決矩陣分解的方法。

如果需要解的值為非負,即nonnegative為true,那么用非負最小二乘(NNLS)來解,如果沒有這個限制,用喬里斯基(Cholesky)分解來解。

val solver = if (nonnegative) new NNLSSolver else new CholeskySolver

喬里斯基分解分解是把一個對稱正定的矩陣表示成一個上三角矩陣U的轉(zhuǎn)置和其本身的乘積的分解。在ml代碼中,直接調(diào)用netlib-java封裝的dppsv方法實現(xiàn)。

lapack.dppsv(“u”, k, 1, ne.ata, ne.atb, k, info)

可以深入dppsv代碼(Fortran代碼)了解更深的細節(jié)。我們分析的重點是非負正則化最小二乘的實現(xiàn),因為在某些情況下,方程組的解為負數(shù)是沒有意義的。雖然方程組可以得到精確解,但卻不能取負值解。在這種情況下,其非負最小二乘解比方程的精確解更有意義。NNLS在最優(yōu)化模塊會作詳細講解。

(3) 將ratings數(shù)據(jù)轉(zhuǎn)換為分區(qū)的格式。

將ratings數(shù)據(jù)轉(zhuǎn)換為分區(qū)的形式,即((用戶分區(qū)id,商品分區(qū)id),分區(qū)數(shù)據(jù)集blocks))的形式,并緩存到內(nèi)存中。其中分區(qū)id的計算是通過ALSPartitioner的getPartitions方法獲得的,分區(qū)數(shù)據(jù)集由RatingBlock組成, 它表示(用戶分區(qū)id,商品分區(qū)id )對所對應(yīng)的用戶id集,商品id集,以及打分集,即(用戶id集,商品id集,打分集)。

val blockRatings = partitionRatings(ratings, userPart, itemPart).persist(intermediateRDDStorageLevel)//以下是partitionRatings的實現(xiàn)//默認是10*10val numPartitions = srcPart.numPartitions * dstPart.numPartitionsratings.mapPartitions { iter =>val builders = Array.fill(numPartitions)(new RatingBlockBuilder[ID])iter.flatMap { r =>val srcBlockId = srcPart.getPartition(r.user)val dstBlockId = dstPart.getPartition(r.item)//當前builder的索引位置val idx = srcBlockId + srcPart.numPartitions * dstBlockIdval builder = builders(idx)builder.add(r)//如果某個builder的數(shù)量大于2048,那么構(gòu)建一個分區(qū)if (builder.size >= 2048) { // 2048 * (3 * 4) = 24kbuilders(idx) = new RatingBlockBuilder//單元素集合Iterator.single(((srcBlockId, dstBlockId), builder.build()))} else {Iterator.empty}} ++ {builders.view.zipWithIndex.filter(_._1.size > 0).map { case (block, idx) =>//用戶分區(qū)idval srcBlockId = idx % srcPart.numPartitions//商品分區(qū)idval dstBlockId = idx / srcPart.numPartitions((srcBlockId, dstBlockId), block.build())}}}.groupByKey().mapValues { blocks =>val builder = new RatingBlockBuilder[ID]blocks.foreach(builder.merge)builder.build()}.setName("ratingBlocks")}

(4)獲取inblocks和outblocks數(shù)據(jù)。

獲取inblocks和outblocks數(shù)據(jù)是數(shù)據(jù)處理的重點。我們知道,通信復(fù)雜度是分布式實現(xiàn)一個算法時要重點考慮的問題,不同的實現(xiàn)可能會對性能產(chǎn)生很大的影響。我們假設(shè)最壞的情況:即求解商品需要的所有用戶特征都需要從其它節(jié)點獲得。 如下圖3.1所示,求解v1需要獲得u1,u2,求解v2需要獲得u1,u2,u3等,在這種假設(shè)下,每步迭代所需的交換數(shù)據(jù)量是O(m*rank),其中m表示所有觀察到的打分集大小,rank表示特征數(shù)量。

這里寫圖片描述

從圖3.1中,我們知道,如果計算v1和v2是在同一個分區(qū)上進行的,那么我們只需要把u1和u2一次發(fā)給這個分區(qū)就好了,而不需要將u2分別發(fā)給v1,v2,這樣就省掉了不必要的數(shù)據(jù)傳輸。

圖3.2描述了如何在分區(qū)的情況下通過U來求解V,注意節(jié)點之間的數(shù)據(jù)交換量減少了。使用這種分區(qū)結(jié)構(gòu),我們需要在原始打分數(shù)據(jù)的基礎(chǔ)上額外保存一些信息。

這里寫圖片描述

在Q1中,我們需要知道和v1相關(guān)聯(lián)的用戶向量及其對應(yīng)的打分,從而構(gòu)建最小二乘問題并求解。這部分數(shù)據(jù)不僅包含原始打分數(shù)據(jù),還包含從每個用戶分區(qū)收到的向量排序信息,在代碼里稱作InBlock。在P1中,我們要知道把u1,u2 發(fā)給Q1。我們可以查看和u1相關(guān)聯(lián)的所有產(chǎn)品來確定需要把u1發(fā)給誰,但每次迭代都掃一遍數(shù)據(jù)很不劃算,所以在spark的實現(xiàn)中只計算一次這個信息,然后把結(jié)果通過RDD緩存起來重復(fù)使用。這部分數(shù)據(jù)我們在代碼里稱作OutBlock。 所以從U求解V,我們需要通過用戶的OutBlock信息把用戶向量發(fā)給商品分區(qū),然后通過商品的InBlock信息構(gòu)建最小二乘問題并求解。從V求解U,我們需要商品的OutBlock信息和用戶的InBlock信息。所有的InBlock和OutBlock信息在迭代過程中都通過RDD緩存。打分數(shù)據(jù)在用戶的InBlock和商品的InBlock各存了一份,但分區(qū)方式不同。這么做可以避免在迭代過程中原始數(shù)據(jù)的交換。

下面介紹獲取InBlock和OutBlock的方法。下面的代碼用來分別獲取用戶和商品的InBlock和OutBlock。

val (userInBlocks, userOutBlocks) =makeBlocks("user", blockRatings,userPart, itemPart,intermediateRDDStorageLevel) //交換userBlockId和itemBlockId以及其對應(yīng)的數(shù)據(jù) val swappedBlockRatings = blockRatings.map {case ((userBlockId, itemBlockId), RatingBlock(userIds, itemIds, localRatings)) =>((itemBlockId, userBlockId), RatingBlock(itemIds, userIds, localRatings)) } val (itemInBlocks, itemOutBlocks) =makeBlocks("item", swappedBlockRatings,itemPart, userPart,intermediateRDDStorageLevel)

我們會以求商品的InBlock以及用戶的OutBlock為例來分析makeBlocks方法。因為在第(5)步中構(gòu)建最小二乘的講解中,我們會用到這兩部分數(shù)據(jù)。

下面的代碼用來求商品的InBlock信息。

val inBlocks = ratingBlocks.map {case ((srcBlockId, dstBlockId), RatingBlock(srcIds, dstIds, ratings)) =>val start = System.nanoTime()val dstIdSet = new OpenHashSet[ID](1 << 20)//將用戶id保存到hashset中,用來去重dstIds.foreach(dstIdSet.add)val sortedDstIds = new Array[ID](dstIdSet.size)var i = 0var pos = dstIdSet.nextPos(0)while (pos != -1) {sortedDstIds(i) = dstIdSet.getValue(pos)pos = dstIdSet.nextPos(pos + 1)i += 1}//對用戶id進行排序Sorting.quickSort(sortedDstIds)val dstIdToLocalIndex = new OpenHashMap[ID, Int](sortedDstIds.length)i = 0while (i < sortedDstIds.length) {dstIdToLocalIndex.update(sortedDstIds(i), i)i += 1}//求取塊內(nèi),用戶id的本地位置val dstLocalIndices = dstIds.map(dstIdToLocalIndex.apply)//返回數(shù)據(jù)集(srcBlockId, (dstBlockId, srcIds, dstLocalIndices, ratings)) }.groupByKey(new ALSPartitioner(srcPart.numPartitions)).mapValues { iter =>val builder =new UncompressedInBlockBuilder[ID](new LocalIndexEncoder(dstPart.numPartitions))iter.foreach { case (dstBlockId, srcIds, dstLocalIndices, ratings) =>builder.add(dstBlockId, srcIds, dstLocalIndices, ratings)}//構(gòu)建非壓縮塊,并壓縮為InBlockbuilder.build().compress()}.setName(prefix + "InBlocks").persist(storageLevel)

這段代碼首先對ratingBlocks數(shù)據(jù)集作map操作,將ratingBlocks轉(zhuǎn)換成(商品分區(qū)id,(用戶分區(qū)id,商品集合,用戶id在分區(qū)中相對應(yīng)的位置,打分)這樣的集合形式。然后對這個數(shù)據(jù)集作groupByKey操作,以商品分區(qū)id為key值,處理key對應(yīng)的值,將數(shù)據(jù)集轉(zhuǎn)換成(商品分區(qū)id,InBlocks)的形式。 這里值得我們?nèi)シ治龅氖禽斎雺K(InBlock)的結(jié)構(gòu)。為簡單起見,我們用圖3.2為例來說明輸入塊的結(jié)構(gòu)。

以Q1為例,我們需要知道關(guān)于v1和v2的所有打分:(v1, u1, r11),(v2, u1, r12), (v1, u2, r21), (v2, u2, r22), (v2, u3, r32),把這些項以Tuple的形式存儲會存在問題,第一,Tuple有額外開銷,每個Tuple實例都需要一個指針,而每個Tuple所存的數(shù)據(jù)不過是兩個ID和一個打分; 第二,存儲大量的Tuple會降低垃圾回收的效率。所以spark實現(xiàn)中,是使用三個數(shù)組來存儲打分的,如([v1, v2, v1, v2, v2], [u1, u1, u2, u2, u3], [r11, r12, r21, r22, r32])。這樣不僅大幅減少了實例數(shù)量,還有效地利用了連續(xù)內(nèi)存。

但是,光這么做并不夠,spark代碼實現(xiàn)中,并沒有存儲用戶的真實id,而是存儲的使用LocalIndexEncoder生成的編碼,這樣節(jié)省了空間,格式為UncompressedInBlock:(商品id集,用戶id集對應(yīng)的編碼集,打分集), 如,([v1, v2, v1, v2, v2], [ui1, ui1, ui2, ui2, ui3], [r11, r12, r21, r22, r32])。這種結(jié)構(gòu)仍舊有壓縮的空間,spark調(diào)用compress方法將商品id進行排序(排序有兩個好處,除了壓縮以外,后文構(gòu)建最小二乘也會因此受益), 并且轉(zhuǎn)換為(不重復(fù)的有序的商品id集,商品位置偏移集,用戶id集對應(yīng)的編碼集,打分集)的形式,以獲得更優(yōu)的存儲效率(代碼中就是將矩陣的coo格式轉(zhuǎn)換為csc格式,你可以更進一步了解矩陣存儲,以獲得更多信息)。 以這樣的格式修改([v1, v2, v1, v2, v2], [ui1, ui1, ui2, ui2, ui3], [r11, r12, r21, r22, r32]),得到的結(jié)果是([v1, v2], [0, 2, 5], [ui1, ui2, ui1, ui2, ui3], [r11, r21, r12, r22, r32])。其中[0, 2]指v1對應(yīng)的打分的區(qū)間是[0, 2],[2, 5]指v2對應(yīng)的打分的區(qū)間是[2, 5]。

Compress方法利用spark內(nèi)置的Timsort算法將UncompressedInBlock進行排序并轉(zhuǎn)換為InBlock。代碼如下所示:

def compress(): InBlock[ID] = {val sz = length//Timsort排序sort()val uniqueSrcIdsBuilder = mutable.ArrayBuilder.make[ID]val dstCountsBuilder = mutable.ArrayBuilder.make[Int]var preSrcId = srcIds(0)uniqueSrcIdsBuilder += preSrcIdvar curCount = 1var i = 1var j = 0while (i < sz) {val srcId = srcIds(i)if (srcId != preSrcId) {uniqueSrcIdsBuilder += srcIddstCountsBuilder += curCountpreSrcId = srcIdj += 1curCount = 0}curCount += 1i += 1}dstCountsBuilder += curCountval uniqueSrcIds = uniqueSrcIdsBuilder.result()val numUniqueSrdIds = uniqueSrcIds.lengthval dstCounts = dstCountsBuilder.result()val dstPtrs = new Array[Int](numUniqueSrdIds + 1)var sum = 0i = 0//計算偏移量while (i < numUniqueSrdIds) {sum += dstCounts(i)i += 1dstPtrs(i) = sum}InBlock(uniqueSrcIds, dstPtrs, dstEncodedIndices, ratings) } private def sort(): Unit = {val sz = lengthval sortId = Utils.random.nextInt()val sorter = new Sorter(new UncompressedInBlockSort[ID])sorter.sort(this, 0, length, Ordering[KeyWrapper[ID]])}

下面的代碼用來求用戶的OutBlock信息。

val outBlocks = inBlocks.mapValues { case InBlock(srcIds, dstPtrs, dstEncodedIndices, _) =>val encoder = new LocalIndexEncoder(dstPart.numPartitions)val activeIds = Array.fill(dstPart.numPartitions)(mutable.ArrayBuilder.make[Int])var i = 0val seen = new Array[Boolean](dstPart.numPartitions)while (i < srcIds.length) {var j = dstPtrs(i)ju.Arrays.fill(seen, false)while (j < dstPtrs(i + 1)) {val dstBlockId = encoder.blockId(dstEncodedIndices(j))if (!seen(dstBlockId)) {activeIds(dstBlockId) += i seen(dstBlockId) = true}j += 1}i += 1}activeIds.map { x =>x.result()} }.setName(prefix + "OutBlocks").persist(storageLevel)

這段代碼中,inBlocks表示用戶的輸入分區(qū)塊,格式為(用戶分區(qū)id,(不重復(fù)的用戶id集,用戶位置偏移集,商品id集對應(yīng)的編碼集,打分集))。 activeIds表示商品分區(qū)中涉及的用戶id集,也即上文所說的需要發(fā)送給確定的商品分區(qū)的用戶信息。activeIds是一個二維數(shù)組,第一維表示分區(qū),第二維表示用戶id集。用戶OutBlocks的最終格式是(用戶分區(qū)id,OutBlocks)。

通過用戶的OutBlock把用戶信息發(fā)給商品分區(qū),然后結(jié)合商品的InBlock信息構(gòu)建最小二乘問題,我們就可以借此解得商品的極小解。反之,通過商品OutBlock把商品信息發(fā)送給用戶分區(qū),然后結(jié)合用戶的InBlock信息構(gòu)建最小二乘問題,我們就可以解得用戶解。 第(6)步會詳細介紹如何構(gòu)建最小二乘。

(5)初始化用戶特征矩陣和商品特征矩陣。

交換最小二乘算法是分別固定用戶特征矩陣和商品特征矩陣來交替計算下一次迭代的商品特征矩陣和用戶特征矩陣。通過下面的代碼初始化第一次迭代的特征矩陣。

var userFactors = initialize(userInBlocks, rank, seedGen.nextLong()) var itemFactors = initialize(itemInBlocks, rank, seedGen.nextLong())

初始化后的userFactors的格式是(用戶分區(qū)id,用戶特征矩陣factors),其中factors是一個二維數(shù)組,第一維的長度是用戶數(shù),第二維的長度是rank數(shù)。初始化的值是異或隨機數(shù)的F范式。itemFactors的初始化與此類似。

(6)利用inblock和outblock信息構(gòu)建最小二乘。
??構(gòu)建最小二乘的方法是在computeFactors方法中實現(xiàn)的。我們以商品inblock信息結(jié)合用戶outblock信息構(gòu)建最小二乘為例來說明這個過程。代碼首先用用戶outblock與userFactor進行join操作,然后以商品分區(qū)id為key進行分組。 每一個商品分區(qū)包含一組所需的用戶分區(qū)及其對應(yīng)的用戶factor信息,格式即(用戶分區(qū)id集,用戶分區(qū)對應(yīng)的factor集)。緊接著,用商品inblock信息與merged進行join操作,得到商品分區(qū)所需要的所有信息,即(商品inblock,(用戶分區(qū)id集,用戶分區(qū)對應(yīng)的factor集))。 有了這些信息,構(gòu)建最小二乘的數(shù)據(jù)就齊全了。詳細代碼如下:

val srcOut = srcOutBlocks.join(srcFactorBlocks).flatMap {case (srcBlockId, (srcOutBlock, srcFactors)) =>srcOutBlock.view.zipWithIndex.map { case (activeIndices, dstBlockId) =>(dstBlockId, (srcBlockId, activeIndices.map(idx => srcFactors(idx))))} } val merged = srcOut.groupByKey(new ALSPartitioner(dstInBlocks.partitions.length)) dstInBlocks.join(merged)

我們知道求解商品值時,我們需要通過所有和商品關(guān)聯(lián)的用戶向量信息來構(gòu)建最小二乘問題。這里有兩個選擇,第一是掃一遍InBlock信息,同時對所有的產(chǎn)品構(gòu)建對應(yīng)的最小二乘問題; 第二是對于每一個產(chǎn)品,掃描InBlock信息,構(gòu)建并求解其對應(yīng)的最小二乘問題。第一種方式復(fù)雜度較高,具體的復(fù)雜度計算在此不作推導(dǎo)。spark選取第二種方法求解最小二乘問題,同時也做了一些優(yōu)化。 做優(yōu)化的原因是二種方法針對每個商品,都會掃描一遍InBlock信息,這會浪費較多時間,為此,將InBlock按照商品id進行排序(前文已經(jīng)提到過),我們通過一次掃描就可以創(chuàng)建所有的最小二乘問題并求解。 構(gòu)建代碼如下所示:

while (j < dstIds.length) {ls.reset()var i = srcPtrs(j)var numExplicits = 0while (i < srcPtrs(j + 1)) {val encoded = srcEncodedIndices(i)val blockId = srcEncoder.blockId(encoded)val localIndex = srcEncoder.localIndex(encoded)val srcFactor = sortedSrcFactors(blockId)(localIndex)val rating = ratings(i)ls.add(srcFactor, rating)numExplicits += 1i += 1}dstFactors(j) = solver.solve(ls, numExplicits * regParam)j += 1 }

到了這一步,構(gòu)建顯式反饋算法的最小二乘就結(jié)束了。隱式反饋算法的實現(xiàn)與此類似,不同的地方是它將YtY這個值預(yù)先計算了(可以參考文獻【1】了解更多信息),而不用在每次迭代中都計算一遍。代碼如下:

//在循環(huán)之外計算 val YtY = if (implicitPrefs) Some(computeYtY(srcFactorBlocks, rank)) else None//在每個循環(huán)內(nèi) if (implicitPrefs) {ls.merge(YtY.get) } if (implicitPrefs) {// Extension to the original paper to handle b < 0. confidence is a function of |b|// instead so that it is never negative. c1 is confidence - 1.0.val c1 = alpha * math.abs(rating)// For rating <= 0, the corresponding preference is 0. So the term below is only added// for rating > 0. Because YtY is already added, we need to adjust the scaling here.if (rating > 0) {numExplicits += 1ls.add(srcFactor, (c1 + 1.0) / c1, c1)} }

后面的問題就如何求解最小二乘了。我們會在最優(yōu)化章節(jié)介紹spark版本的NNLS。

本文轉(zhuǎn)載自:endymecy|ALS

總結(jié)

以上是生活随笔為你收集整理的机器学习算法--ALS的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。