用Hadoop1.0.3实现KMeans算法
從理論上來(lái)講用MapReduce技術(shù)實(shí)現(xiàn)KMeans算法是很Natural的想法:在Mapper中逐個(gè)計(jì)算樣本點(diǎn)離哪個(gè)中心最近,然后Emit(樣本點(diǎn)所屬的簇編號(hào),樣本點(diǎn));在Reducer中屬于同一個(gè)質(zhì)心的樣本點(diǎn)在一個(gè)鏈表中,方便我們計(jì)算新的中心,然后Emit(質(zhì)心編號(hào),質(zhì)心)。但是技術(shù)上的事并沒(méi)有理論層面那么簡(jiǎn)單。
Mapper和Reducer都要用到K個(gè)中心(我習(xí)慣稱(chēng)之為質(zhì)心),Mapper要讀這些質(zhì)心,Reducer要寫(xiě)這些質(zhì)心。另外Mapper還要讀存儲(chǔ)樣本點(diǎn)的數(shù)據(jù)文件。我先后嘗試以下3種方法,只有第3種是可行的,如果你不想被我誤導(dǎo),請(qǐng)直接跳過(guò)前兩種。
一、用一個(gè)共享變量在存儲(chǔ)K個(gè)質(zhì)心
由于K很小,所以我們認(rèn)為用一個(gè)Vector<Sample>來(lái)存儲(chǔ)K個(gè)質(zhì)心是沒(méi)有問(wèn)題的。以下代碼是錯(cuò)誤的:
| class?MyJob extends Tool{ static?Vector<Sample> centers=new?Vector<Sample>(K); static?class?MyMapper extends Mapper{ //read centers } static?class?MyMapper extends Reducer{ //update centers } void?run(){ until ( convergence ){ map(); reduce(); } } |
發(fā)生這種錯(cuò)誤是因?yàn)閷?duì)hadoop執(zhí)行流程不清楚,對(duì)數(shù)據(jù)流不清楚。簡(jiǎn)單地說(shuō)Mapper和Reducer作為MyJob的內(nèi)部靜態(tài)類(lèi),它們應(yīng)該是獨(dú)立的--它們不應(yīng)該與MyJob有任何交互,因?yàn)镸apper和Reducer分別在Task Tracker的不同JVM中運(yùn)行,而MyJob以及MyJob的內(nèi)部其他類(lèi)都在客戶(hù)端上運(yùn)行,自然不能在不同的JVM中共享一個(gè)變量。
詳細(xì)的流程是這樣的:
首先在客戶(hù)端上,JVM加載MyJob時(shí)先初始化靜態(tài)變量,執(zhí)行static塊。然后提交作業(yè)到Job Tracker。
在Job Tracker上,分配Mapper和Reducer到不同的Task Tracker上。Mapper和Reducer線程獲得了MyJob類(lèi)靜態(tài)變量的初始拷貝(這份拷貝是指MyJob執(zhí)行完靜態(tài)塊之后靜態(tài)變量的模樣)。
在Task Tracker上,Mapper和Reducer分別地讀寫(xiě)MyJob的靜態(tài)變量的本地拷貝,但是并不影響原始的MyJob中的靜態(tài)變量的值。
二、用分布式緩存文件存儲(chǔ)K個(gè)質(zhì)心
既然不能通過(guò)共享外部類(lèi)變量的方式,那我們通過(guò)文件在map和reduce之間傳遞數(shù)據(jù)總可以吧,Mapper從文件中讀取質(zhì)心,Reducer把更新后的質(zhì)心再寫(xiě)入這個(gè)文件。這里的問(wèn)題是:如果確定要把質(zhì)心放在文件中,那Mapper就需要從2個(gè)文件中讀取數(shù)據(jù)--質(zhì)心文件和樣本數(shù)據(jù)文件。雖然有MutipleInputs可以指定map()的輸入文件有多個(gè),并可以為每個(gè)輸入文件分別指定解析方式,但是MutipleInputs不能保證每條記錄從不同文件中傳給map()的順序。在我們的KMeans中,我們希望質(zhì)心文件全部被讀入后再逐條讀入樣本數(shù)據(jù)。
于是乎就想到了DistributedCache,它主要用于Mapper和Reducer之間共享數(shù)據(jù)。DistributedCacheFile是緩存在本地文件,在Mapper和Reducer中都可使用本地Java I/O的方式讀取它。于是我又有了一個(gè)錯(cuò)誤的思路:
| class?MyMaper{ ????Vector<Sample> centers=new?Vector<Sample>(K); ????void?setup(){ ????????//讀取cacheFile,給centers賦值 ????} ????void?map(){ ????????//計(jì)算樣本離哪個(gè)質(zhì)心最近 ????} } class?MyReducer{ ????Vector<Sample> centers=new?Vector<Sample>(K); ????void?reduce(){ ????????//更新centers ????} ????void?cleanup(){ ????????//把centers寫(xiě)回cacheFile ????} } |
錯(cuò)因:DistributedCacheFile是只讀的,在任務(wù)運(yùn)行前,TaskTracker從JobTracker文件系統(tǒng)復(fù)制文件到本地磁盤(pán)作為緩存,這是單向的復(fù)制,是不能寫(xiě)回的。試想在分布式環(huán)境下,如果不同的mapper和reducer可以把緩存文件寫(xiě)回的話,那豈不又需要一套復(fù)雜的文件共享機(jī)制,嚴(yán)重地影響hadoop執(zhí)行效率。
三、用分布式緩存文件存儲(chǔ)樣本數(shù)據(jù)
其實(shí)DistributedCache還有一個(gè)特點(diǎn),它更適合于“大文件”(各節(jié)點(diǎn)內(nèi)存容不下)緩存在本地。僅存儲(chǔ)了K個(gè)質(zhì)心的文件顯然是小文件,與之相比樣本數(shù)據(jù)文件才是大文件。
此時(shí)我們需要2個(gè)質(zhì)心文件:一個(gè)存放上一次的質(zhì)心prevCenterFile,一個(gè)存放reducer更新后的質(zhì)心currCenterFile。Mapper從prevCenterFile中讀取質(zhì)心,Reducer把更新后有質(zhì)心寫(xiě)入currCenterFile。在Driver中讀入prevCenterFile和currCenterFile,比較前后兩次的質(zhì)心是否相同(或足夠地接近),如果相同則停止迭代,否則就用currCenterFile覆蓋prevCenterFile(使用fs.rename),進(jìn)入下一次的迭代。
這時(shí)候Mapper就是這樣的:
| class?MyMaper{ ????Vector<Sample> centers=new?Vector<Sample>(K); ????void?map(){ ????????//逐條讀取質(zhì)心,給centers賦值 ????} ????void?cleanup(){ ????????//逐行讀取cacheFile,計(jì)算每個(gè)樣本點(diǎn)離哪個(gè)質(zhì)心最近 ????????//然后Emit(樣本點(diǎn)所屬的簇編號(hào),樣本點(diǎn)) ????} } |
源代碼
試驗(yàn)數(shù)據(jù)是在Mahout項(xiàng)目中作為example提供的,600個(gè)樣本點(diǎn),每個(gè)樣本是一個(gè)60維的浮點(diǎn)向量。點(diǎn)擊下載
為樣本數(shù)據(jù)建立一個(gè)類(lèi)Sample.java。
+ View CodeKMeans.java
+ View Code注意在Driver中創(chuàng)建Job實(shí)例時(shí)一定要把Configuration類(lèi)型的參數(shù)傳遞進(jìn)去,否則在Mapper或Reducer中調(diào)用DistributedCache.getLocalCacheFiles(context.getConfiguration());返回值就為null。因?yàn)榭諛?gòu)造函數(shù)的Job采用的Configuration是從hadoop的配置文件中讀出來(lái)的(使用new Configuration()創(chuàng)建的Configuration就是從hadoop的配置文件中讀出來(lái)的),請(qǐng)注意在main()函數(shù)中有一句:DistributedCache.addCacheFile(dataFile.toUri(), conf);即此時(shí)的Configuration中多了一個(gè)DistributedCacheFile,所以你需要把這個(gè)Configuration傳遞給Job構(gòu)造函數(shù),如果傳遞默認(rèn)的Configuration,那在Job中當(dāng)然不知道DistributedCacheFile的存在了。
Further
方案三還是不如人意,質(zhì)心文件是很小的(因?yàn)橘|(zhì)心總共就沒(méi)幾個(gè)),用map()函數(shù)僅僅是來(lái)讀一個(gè)質(zhì)心文件根本就沒(méi)有發(fā)揮并行的作用,而且在map()中也沒(méi)有調(diào)用context.write(),所以Mapper中做的事情可以放在Reducer的setup()中來(lái)完成,這樣就不需要Mapper了,或者說(shuō)上面設(shè)計(jì)的就不是MapReduce程序,跟平常的單線程串行程序是一樣的。sigh
原文來(lái)自:博客園(華夏35度)http://www.cnblogs.com/zhangchaoyang 作者:Orisun總結(jié)
以上是生活随笔為你收集整理的用Hadoop1.0.3实现KMeans算法的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: hadoop下实现kmeans算法——一
- 下一篇: 数据分类:特征处理