日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

KMeans算法的Mapreduce实现

發(fā)布時間:2024/3/13 编程问答 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 KMeans算法的Mapreduce实现 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

?

Hive數(shù)據(jù)分析... 4

一、數(shù)據(jù)處理.... 4

1.1處理不符合規(guī)范的數(shù)據(jù)。... 4

1.2訪問時間分段。... 5

二、基本統(tǒng)計(jì)信息.... 6

三、數(shù)據(jù)屬性基礎(chǔ)分析.... 6

3.1用戶ID分析... 6

3.1.1UID的查詢次數(shù)。... 6

3.1.2UID頻度排名分析。... 7

3.2搜索關(guān)鍵詞分析... 8

3.2.1熱詞分析... 8

3.2.2使用幾個單詞還是一個句子作為關(guān)鍵詞。... 9

3.2.3使用文字描述還是域名一部分作為關(guān)鍵詞。... 10

3.3URL分析... 10

3.3.1熱門搜索分析。... 10

3.3.2URL流量分析。... 11

3.4 Rank分析... 12

3.5 Order分析... 13

3.6訪問時間分析... 14

四、數(shù)據(jù)深入特色分析.... 15

4.1.某一用戶分析... 15

4.1.1該UID背后是否是爬蟲程序?. 15

4.1.2該UID背后是瀏覽器代理程序嗎?... 16

4.2.某一網(wǎng)站分析... 18

4.2.1關(guān)鍵字分析... 18

4.2.2訪問量與時間... 19

KMeans算法的MapReduce實(shí)現(xiàn)... 20

一、制定距離衡量標(biāo)準(zhǔn).... 20

1.1曼哈頓距離衡量時間、Order、Rank之間的距離。... 20

1.2萊文斯坦距離衡量關(guān)鍵字之間的距離。... 21

1.3三種距離的計(jì)算。... 22

1.3.1兩個記錄之間的距離。... 22

1.3.2一條記錄與類簇中心點(diǎn)集合的距離。... 22

1.3.3新類簇中心點(diǎn)集合與舊類簇中心點(diǎn)之間的距離。... 22

二、設(shè)計(jì)定制的Writeable集合與實(shí)現(xiàn)功能函數(shù).... 23

2.1定制的Writable集合:dataCell類... 23

2.1.1構(gòu)造函數(shù):dataCell()、dataCell(String time, String uid, String keyword, int rank, int order, String url)。... 23

2.1.2序列化與反序列化:write(DataOutput out)、readFields(DataInput in)。... 23

2.1.3比較大小:compareTo(dataCell o) 23

2.2功能函數(shù)。... 24

2.2.1從一組元素中計(jì)算一個類簇中心:caculateCenter(List<List<Object>> A) 24

2.2.2從文件中獲取所有類簇中心集合:getCenters(String inputpath) 24

2.2.3從Hdfs獲取程序迭代類簇中心結(jié)果及分類結(jié)果到本地:getCenterResult( String localPath)、getClassfiyResult( String localPath) 24

2.2.4使用新類簇中心集合替換舊類簇中心集合:replaceOldCenter(String oldpath, String newpath) 25

2.2.5判斷新舊兩組類簇中心的距離是否已經(jīng)達(dá)到迭代停止條件:isFinished(String oldpath, String newpath, int max) 25

三、生成初始類簇中心點(diǎn).... 26

四、第一次MapReduce:迭代聚類中心點(diǎn).... 27

4.1MapReduce設(shè)計(jì)。... 27

4.2 Mapper實(shí)現(xiàn)。... 27

4.3 Reducer實(shí)現(xiàn)。... 28

4.4 JobDriver實(shí)現(xiàn)。... 29

五、第二次MapReduce:數(shù)據(jù)分類.... 29

5.1 MapReduce設(shè)計(jì)。... 29

5.2 Mapper實(shí)現(xiàn)。... 30

5.3 Recuce實(shí)現(xiàn)。... 30

5.4 JobDriver實(shí)現(xiàn)。... 31

六、衡量分類效果.... 31

七、運(yùn)行與分析.... 32

7.1?????? 一次完整的程序運(yùn)行... 32

7.1.1產(chǎn)生初始聚類中心。... 32

7.1.2迭代聚類中心。... 33

7.1.3數(shù)據(jù)分類。... 34

7.1.4衡量分類效果。... 34

7.2?????? 尋找最佳類簇個數(shù)

Mapreduce附錄.... 44

(1)??? dataCell類... 44

(2)??? Help類... 47

(3)??? HelpTest 55

(4)??? KMeansDriver 56

(5)??? KmeansMapperForCenter類... 59

(6) ?KMeansReducerForCenter類... 60

(7) ?KMeansMapperForClassify類... 61

(8)? KMeansReducerForClassify類??? 62

KMeans算法的MapReduce實(shí)現(xiàn)

在本文中我使用KMeans算法實(shí)現(xiàn)搜狗搜索數(shù)據(jù)集上的MapReduce程序。K-Means算法輸入聚類個數(shù)k,以及源數(shù)據(jù),并將源數(shù)據(jù)分為k類輸出。在分類后的數(shù)據(jù)中,同一聚類中的對象相似度較高;而不同聚類中的對象相似度較小。眾所周知,KMeans算法在初始中心點(diǎn)選取及聚類個數(shù)方面存在一定不足,在本文中我將在實(shí)現(xiàn)算法之余對于這兩點(diǎn)嘗試做出一些改進(jìn)。此外,想要順利的實(shí)現(xiàn)算法清晰的思路必不可少,在程序?qū)崿F(xiàn)方面,我將按照制定距離衡量標(biāo)準(zhǔn)、生成初始聚類中心、迭代聚類中心、數(shù)據(jù)分類、衡量分類效果的步驟進(jìn)行,最后還將對使用不同個數(shù)聚類中心,程序所展示出的效果進(jìn)行分析。

一、制定距離衡量標(biāo)準(zhǔn)

搜狗數(shù)據(jù)集每行一條記錄,每條記錄由六個屬性構(gòu)成:時間、用戶ID、搜索關(guān)鍵字、Order、Rank和URL。因?yàn)閿?shù)據(jù)集沒有分類標(biāo)志,所以不能使用有監(jiān)督算法對其進(jìn)行分類,只能使用無監(jiān)督算法。在六個屬性中,用戶ID是一串瀏覽器生成的字符,并不能衡量兩個ID之間的距離,所以這里我們不將其考慮到算法中;URL的命名規(guī)則很隨意,也很難衡量兩個URL之間的距離,則算法中也不考慮URL屬性。除此之外,我們將在算法中,依據(jù)時間、搜索關(guān)鍵字、OrderRank對數(shù)據(jù)之間的距離進(jìn)行衡量,并分類

時間、搜索關(guān)鍵字、Order、Rank這四個屬性擁有不同的特征,其中時間、Order、Rank是整數(shù),可以執(zhí)行數(shù)字運(yùn)算;而搜索關(guān)鍵字是字符串無法執(zhí)行數(shù)字運(yùn)算,從而這兩類屬性需要使用不同的方法衡量距離。這里我們使用曼哈頓距離衡量時間、Order、Rank之間的距離,使用萊文斯坦距離衡量搜索關(guān)鍵字之間的距離:

1.1曼哈頓距離衡量時間、Order、Rank之間的距離。

在數(shù)據(jù)集中,時間是連續(xù)變化的其范圍是:2011年12月30日至2011年12月31日,數(shù)據(jù)格式為“20111230000005”,其中第7,8位數(shù)字表示小時,為了不使計(jì)算過于麻煩,我們以小時(即時間屬性字符串的7和8位數(shù)字)作為該條數(shù)據(jù)時間屬性的值,每天有24個小時,這里我們對其進(jìn)行歸一化,設(shè)at為記錄A的時間屬性值,bt為記錄B的時間屬性值,則記錄A與記錄B之間時間屬性的距離如公式(1)所示:

Dt=abs(at-bt)/24??? ????????????????????????公式(1)

Order是該條記錄在網(wǎng)頁展示時的排序,這是較為重要的一個屬性。在數(shù)據(jù)集中Order值的范圍在1~40之間,設(shè)ao為記錄A的Order數(shù)值,bo為記錄B的Order數(shù)值,則記錄A與記錄B之間的Order屬性的的距離如公式(2)所示:

Do=abs(ao-bo) ???????????????????????????公式(2)

Rank記錄用戶點(diǎn)擊的次序,也是一個很重要的屬性。這里設(shè)ar為記錄A的Rank值,br為記錄B的Rank至,則記錄A與記錄B之間的Order屬性的距離如公式(3)所示:

Dr=abs(ar-br)?? ??????????????????????????公式(3)

1.2萊文斯坦距離衡量關(guān)鍵字之間的距離。

本數(shù)據(jù)集中的記錄中的搜索關(guān)鍵字屬性是用戶在使用搜狗瀏覽器輸入的搜索內(nèi)容,因其是文本,不能使用簡單的算術(shù)運(yùn)算衡量其距離,所以這里選擇編輯距離——萊文斯坦距離衡量兩個關(guān)鍵字之間的距離。在信息論和計(jì)算機(jī)科學(xué)中,萊文斯坦距離是一種兩個字符串序列的距離度量。形式化地說,兩個單詞的萊文斯坦距離是一個單詞變成另一個單詞要求的最少單個字符編輯數(shù)量(如:刪除、插入和替換)。萊文斯坦距離也被稱做編輯距離,盡管它只是編輯距離的一種,與成對字符串比對緊密相關(guān)。其定義為,兩個字符串a(chǎn),b的萊文斯坦距離記為,其計(jì)算公式為公式(11):

? ?????公式(11)

這里,???????????è?¨?¤o?-??|??2a?b?é??o|???????ˉ????????????o1???|?????o0??¤o?§???°?è???·???a的前?i個字符和b的前?j?個字符之間的距離。

這里我們采用向量存儲的方式實(shí)現(xiàn)萊文斯坦距離的計(jì)算,使用函levenshteinTwoRows(String string1, int s_len, String string2, int t_len) 來實(shí)現(xiàn),該函數(shù)的執(zhí)行過程如流程圖1所示,具體實(shí)現(xiàn)代碼見代碼(1)。設(shè)ak為記錄A的關(guān)鍵字,設(shè)bk為記錄B的關(guān)鍵字,則記錄A與記錄B之間的Keyword屬性的距離如公式(4)所示:

Dk=levenshteinTwoRows(ak,len(ak),bk,len(bk)) ??????????公式(4)

1.3三種距離的計(jì)算。

綜上所述,數(shù)據(jù)集中任意兩條記錄:記錄A與記錄B之間的距離可以使用公式(5)來計(jì)算。結(jié)合程序需求,我們需要計(jì)算三種情況的距離:1??¤??aè?°???1é′?è·?|???2????è?°?????±??°??-???1é??è·?|???3新類簇中心點(diǎn)集合與舊類簇中心點(diǎn)之間的距離。

D=Dt+Do+Dr+Dk????????????????????????? 公式(5)

1.3.1兩個記錄之間的距離

該功能使用函數(shù)caculateDistance0List<Object> A,List<Object> B實(shí)現(xiàn),其實(shí)現(xiàn)邏輯為:程序使用公式(5)計(jì)算兩個參數(shù)的距離,并返回該距離。函數(shù)的實(shí)現(xiàn)代碼見附錄Help類,函數(shù)的測試函數(shù)為caculateDistance0Test(),代碼內(nèi)容見附錄HelpTest類。

1.3.2一條記錄與類簇中心點(diǎn)集合的距離

該功能使用函數(shù)caculateDistance1List<Object> A,List< List<Object>> B)實(shí)現(xiàn),其實(shí)現(xiàn)邏輯為:程序依次讀取B中的元素,并使用公式(5)計(jì)算該元素與A的距離,記錄每次的距離,最終返回最小距離所對應(yīng)的元素。函數(shù)的實(shí)現(xiàn)代碼見附錄Help類,函數(shù)的測試函數(shù)為caculateDistance1Test(),代碼內(nèi)容見附錄HelpTest類。

1.3.3新類簇中心點(diǎn)集合與舊類簇中心點(diǎn)之間的距離。

該功能使用函數(shù)caculateDistance2(List< List<Object>> A,List< List<Object>> B),其實(shí)現(xiàn)邏輯為:程序依次讀取A的第K個元素與B的第K個元素(其中K∈(0,len(A))),并使用公式(5)計(jì)算距離,將每次得到的距離累加得到D,返回D/len(A)。

二、設(shè)計(jì)定制的Writeable集合與實(shí)現(xiàn)功能函數(shù)

2.1定制的Writable集合:dataCell類

?????? Hadoop有一套非常有用的Writable實(shí)現(xiàn)可以滿足大部分需求,但是在本文的情況下,我們需要設(shè)計(jì)構(gòu)造一個新的實(shí)現(xiàn),從而完全控制二進(jìn)制的表示和排序順序,這將有助于后續(xù)的MapReduce算法實(shí)現(xiàn)。

?????? 我們使用類dataCell實(shí)現(xiàn)對于一條記錄的存儲與表示。每條記錄有六個字段,則dataCell需為這六個字段創(chuàng)建對應(yīng)的屬性,分別是: String time;String uid;String keyword;int rank;int order;String url。此外,我們?yōu)槠溥@些屬性提供getter和setter方法。為了讓dataCell類能夠用于MapReduce過程的數(shù)據(jù)傳輸中,我們需要讓dataCell類可序列化、可比較大小,這里我們通過讓類dataCell實(shí)現(xiàn)接口WritableComparable<dataCell>實(shí)現(xiàn)這些功能。dataCell的代碼實(shí)現(xiàn)見附錄dataCell類。

2.1.1構(gòu)造函數(shù):dataCell()、dataCell(String time, String uid, String keyword, int rank, int order, String url)。

在dataCell類中我們提供兩個構(gòu)造函數(shù),其中無參構(gòu)造函數(shù)用于反序列化時的反射;擁有六個參數(shù)的構(gòu)造函數(shù)用于實(shí)例化一個dataCell對象,函數(shù)體內(nèi)六個形參依次對類的六個屬性賦值。

2.1.2序列化與反序列化:write(DataOutput out)、readFields(DataInput in)。

本類中序列化與反序列化的功能通過實(shí)現(xiàn)函數(shù)write與readFields實(shí)現(xiàn)。write函數(shù)實(shí)現(xiàn)序列化,本函數(shù)將六個屬性依次寫入輸出流out,這里要注意的是寫出String類型的屬性時需要使用寫出UTF的形式。readFields函數(shù)實(shí)現(xiàn)反序列化,該函數(shù)對應(yīng)于write寫出屬性的格式與順序?qū)傩詮妮斎肓鱥n中讀取出來。

2.1.3比較大小:compareTo(dataCell o)

MapReduce的suffer過程中需要將輸出的鍵值對進(jìn)行排序,所以dataCell有必要實(shí)現(xiàn)比較大小的功能。這里我們將參數(shù)列表傳入的參數(shù)o與類屬性通過上文所提到函數(shù)caculateDistance0進(jìn)行比較(注:這里不取絕對值),若結(jié)果大于0,返回1;結(jié)果小于0,返回-1。

2.2功能函數(shù)。

?????? 為了讓MapReduce程序結(jié)構(gòu)更清晰,讓程序的可用性更高,這里我們將一些復(fù)雜的邏輯函提取出來放到Help類中,具體實(shí)現(xiàn)代碼見附錄Help類,對應(yīng)測試代碼見附錄HelpTest類。

2.2.1從一組元素中計(jì)算一個類簇中心:caculateCenter(List<List<Object>> A)

本函數(shù)適用于迭代類簇的Reduce程序中。函數(shù)接收一組記錄,首先遍歷記錄計(jì)算出這組記錄的平均值,然后再次遍歷記錄從記錄中找到與平均值距離最近的那條記錄,作為新的類簇中心返回。這里要注意的是:不能直接返回這組記錄的平均值作為新的類簇中心,否則會造成類簇中心集合元素缺失的問題

2.2.2從文件中獲取所有類簇中心集合:getCenters(String inputpath)

該函數(shù)的主要邏輯為從參數(shù)列表中獲的類簇中心集合的路徑,然后通過HDFS的API接口逐行讀取類簇中心文件,并將每行數(shù)據(jù)封裝成為一個List<Object>,最后返回類簇中心列表List< List<Object>>。

2.2.3從Hdfs獲取程序迭代類簇中心結(jié)果及分類結(jié)果到本地:getCenterResult( String localPath)、getClassfiyResult( String localPath)

MapReduce程序執(zhí)行完畢后會在輸出目錄下產(chǎn)生運(yùn)行結(jié)果,getCenterResult與getClassfiyResult分別將類簇中心結(jié)果與分類結(jié)果拷貝到本地。這兩個函數(shù)邏輯大致相同,使用HDFS的API接口從集群上取得對應(yīng)的文件,然后將該文件放入?yún)?shù)localPath路徑中。

2.2.4使用新類簇中心集合替換舊類簇中心集合:replaceOldCenter(String oldpath, String newpath)

由于HDFS的API中并沒有提供集群中移動文件的方法,在這里我們通過首先將新類簇中心文件下載到本地文件,然后再舊類簇中心文件刪除,最后再將本地文件上傳到舊類簇中心文件中的方法實(shí)現(xiàn)該功能。參數(shù)oldpath為舊類簇中心文件的路徑,newpath的新類簇中心文件的路徑,該函數(shù)由isFinished函數(shù)調(diào)用。

2.2.5判斷新舊兩組類簇中心的距離是否已經(jīng)達(dá)到迭代停止條件:isFinished(String oldpath, String newpath, int max)

該函數(shù)首先使用函數(shù)getCenters()分別從參數(shù)oldpath和參數(shù)newpath所對應(yīng)的路徑中獲取舊類簇中心集合與新類簇中心集合,然后使用函數(shù)caculateDistance2()計(jì)算兩組類簇的距離,如果距離小于max,則滿足停止迭代條件,返回false;若距離大于max,則不滿足迭代條件,使用函數(shù)replaceOldCenter將舊類簇中心文件替換為新類簇中心文件,返回true。使用流程圖表示如圖1所示

圖1 isFinished函數(shù)流程圖

三、生成初始類簇中心點(diǎn)

初始聚類中心的選擇對于KMeans算法來說十分重要,初始類簇中心的好壞直接影響到聚類的效果。這里我使用“選擇批次距離盡可能遠(yuǎn)的K個點(diǎn)”的方法,具體操作步驟為,首先隨機(jī)選擇一個點(diǎn)作為作為初始類簇中心點(diǎn),然后選擇距離該店最遠(yuǎn)的那個點(diǎn)作為第二個初始聚類中心點(diǎn),然后再選擇距離前兩個點(diǎn)的最近距離最大的點(diǎn)作為第三個初始類簇的中心點(diǎn),以此類推,直至選擇出K個初始類簇中心點(diǎn)。

基于以上思想,在程序中實(shí)現(xiàn)該算法時,可以按照圖1中流程執(zhí)行。該算法使用函數(shù)ProdeceCenter(String inputpath,int k,int initRank,int initOrder)實(shí)現(xiàn),其中參數(shù)inputpath為源數(shù)據(jù)的路徑,k為要生成的初始類簇集合元素的個數(shù),initRank為隨機(jī)生成的初始類簇中心。函數(shù)的實(shí)現(xiàn)代碼見附錄Help類。

圖1 生成初始類簇中心點(diǎn)

四、第一次MapReduce:迭代聚類中心點(diǎn)

在KMean算法中,迭代聚類中心是使用初始類簇作為集合做初始分類,然后再每個分類中尋找中心點(diǎn)作為新的類簇中心點(diǎn),如此迭代,直到迭代次數(shù)足夠多或者新舊兩組類簇的類簇距離足夠小。下面,將按照MapReduce設(shè)計(jì)、Mapper實(shí)現(xiàn)、Reducer實(shí)現(xiàn)、JobDriver實(shí)現(xiàn)三部分進(jìn)行闡述。

4.1MapReduce設(shè)計(jì)。

該部分的MapReduce讀取源數(shù)據(jù),讀取初始類簇集合,產(chǎn)生聚類中心集合。Map部分逐行讀入搜狗搜索數(shù)據(jù),并找到類簇集合中距離該行數(shù)據(jù)最近的類簇,然后將最近的類簇的序號作為這一行數(shù)據(jù)的標(biāo)簽,最終將標(biāo)簽作為Key,改行數(shù)據(jù)作為Value作為數(shù)據(jù)寫出;Reduce部分負(fù)責(zé)接收Map產(chǎn)生的數(shù)據(jù),并在標(biāo)簽相同的數(shù)據(jù)中找到中心點(diǎn),將中心點(diǎn)作為新的類簇輸出;JobDriver部分負(fù)責(zé)一些配置工作,并負(fù)責(zé)計(jì)算新舊兩組類簇集合的距離、統(tǒng)計(jì)迭代的次數(shù),其中類簇集合的距離與迭代的次數(shù)均可以控制整個MapReduce過程的停止。其中,Map部分與Reduce部分的輸入輸出格式如表1所示。

表1 Map與Reduce的輸入輸出格式

?

輸入

輸出

Map

(字節(jié)偏移量,一行數(shù)據(jù)內(nèi)容)

(類簇中心標(biāo)志,一行數(shù)據(jù)內(nèi)容)

Reduce

(類簇中心標(biāo)志,多行數(shù)據(jù)內(nèi)容)

(NULLWriteable,新的類簇中心)

4.2 Mapper實(shí)現(xiàn)。

本文中我們使用類KmeansMapperForCenter實(shí)現(xiàn)迭代聚類中心的Mapper,該類的實(shí)現(xiàn)代碼見附錄KmeansMapperForCenter類。該類繼承Mapper<LongWritable,Text,IntWritable,Text>類,并實(shí)現(xiàn)了Mapper類的抽象方法map。在map函數(shù)中實(shí)現(xiàn)了Mapper部分的主要邏輯,其流程如圖1所示。

圖1 map函數(shù)流程圖

4.3 Reducer實(shí)現(xiàn)。

本文中我們使用類KMeansReducerForCenter實(shí)現(xiàn)迭代聚類中心的Reducer,該類的實(shí)現(xiàn)代碼見附錄KMeansReducerForCenter類。該類繼承Reducer<IntWritable, Text, NullWritable, Text>類,并實(shí)現(xiàn)了Reducer類的抽象方法reduce。在reduce函數(shù)中實(shí)現(xiàn)了Reducer部分的主要邏輯,其流程如圖2所示。

圖2 reduce函數(shù)流程圖

4.4 JobDriver實(shí)現(xiàn)。

?????? JobDriver部分驅(qū)動MapReduce的執(zhí)行,這里我們在類KMeansDriver中的getCenter()函數(shù)中實(shí)現(xiàn)該功能。getCenter()需要為MapReduce流程設(shè)置六個變量:輸入路徑、輸出路徑、舊類簇中心文件、新類簇中心內(nèi)文件、類簇個數(shù)、聚類停止條件,并且該函數(shù)還設(shè)置了Map過程使用類,Reduce過程使用類等。我們在這個函數(shù)中控制迭代類簇中心的迭代次數(shù),該函數(shù)的流程如圖1所示,實(shí)現(xiàn)代碼見附錄KMeansDriver類。

圖2 getCenter()函數(shù)流程圖

五、第二次MapReduce:數(shù)據(jù)分類

在KMeans算法中,數(shù)據(jù)分類一部分比較簡單,該部分為每一個源數(shù)據(jù)中的元素在類簇中心集合中尋找一個距離最近的類簇中心,并將該元素標(biāo)記為該類簇中心類即可。下面,將按照MapReduce設(shè)計(jì)、Mapper實(shí)現(xiàn)、Reducer實(shí)現(xiàn)、JobDriver實(shí)現(xiàn)三部分進(jìn)行闡述。

5.1 MapReduce設(shè)計(jì)。

該部分的MapReduce讀取源數(shù)據(jù),讀取初始類簇中心集合,給每個源數(shù)據(jù)中元素分類并輸出。Map部分負(fù)責(zé)逐行讀入搜狗搜索數(shù)據(jù),并找到類簇中心集合中距離該行數(shù)據(jù)最近的類簇中心,然后將最近的類簇中心的序號作為這一行數(shù)據(jù)的標(biāo)簽,最終將標(biāo)簽作為Key,改行數(shù)據(jù)作為Value作為數(shù)據(jù)寫出;Reduce部分負(fù)責(zé)將Map傳輸過來的數(shù)據(jù)逐行輸出到結(jié)果集中;JobDriver部分負(fù)責(zé)程序的配置工作,以及提交任務(wù)。其中,Map部分與Reduce部分的輸入輸出格式如表2所示。

表2 Map與Reduce的輸入輸出格式

?

輸入

輸出

Map

(字節(jié)偏移量,一行數(shù)據(jù)內(nèi)容)

(類簇中心標(biāo)志,一行數(shù)據(jù)內(nèi)容)

Reduce

(類簇中心標(biāo)志,多行數(shù)據(jù)內(nèi)容)

n(類簇中心標(biāo)志,一行數(shù)據(jù)內(nèi)容)

5.2 Mapper實(shí)現(xiàn)。

本文中我們使用類KMeansMapperForClassify實(shí)現(xiàn)迭代聚類中心的Mapper,該類的實(shí)現(xiàn)代碼見附錄KMeansMapperForClassify類。該類繼承Mapper<LongWritable,Text,IntWritable,dataCell>類,并實(shí)現(xiàn)了Mapper類的抽象方法map。在map函數(shù)中實(shí)現(xiàn)了Mapper部分的主要邏輯,其流程如圖1所示。

圖1 map函數(shù)流程圖

5.3 Recuce實(shí)現(xiàn)。

本文中我們使用類KMeansReducerForCenter實(shí)現(xiàn)迭代聚類中心的Reducer,該類的實(shí)現(xiàn)代碼見附錄KMeansReducerForCenter類。該類繼承Reducer<IntWritable, Text, NullWritable, Text>類,并實(shí)現(xiàn)了Reducer類的抽象方法reduce。在reduce函數(shù)中實(shí)現(xiàn)了Reducer部分的主要邏輯,該部分比較簡單,直接將迭代器中的dataCell對象寫出到文件中即可。

5.4 JobDriver實(shí)現(xiàn)。

JobDriver部分驅(qū)動MapReduce的執(zhí)行,這里我們在類KMeansDriver中的forClssify()函數(shù)中實(shí)現(xiàn)該功能。forClssify()需要為MapReduce流程設(shè)置四個變量:輸入路徑、輸出路徑、類簇中心文件、類簇個數(shù),并且該函數(shù)還設(shè)置了Map過程使用類,Reduce過程使用類等。該函數(shù)過于簡單,只是對Job做了一些簡單的配置,在這里不予展示,實(shí)現(xiàn)代碼見附錄KMeansDriver類。

六、衡量分類效果

KMeans算法將數(shù)據(jù)分為幾類,如何度量分類效果是值得考慮的問題。聚類的任務(wù)是將目標(biāo)樣本分為若干簇,并且保證每個簇之間樣本盡可能接近,并且不同簇的樣本距離盡可能遠(yuǎn)。基于此,聚類的效果好壞又分為兩類指標(biāo)衡量,一類是外部聚類效果,一類是內(nèi)部聚類效果。這里我們僅使用內(nèi)部聚類效果來衡量聚類的效果,且由于作業(yè)時間太緊,我們僅僅衡量聚類的緊湊度一項(xiàng)指標(biāo)。

這里我們使用類簇中所有樣本到類簇中心距離的累加和作為衡量緊湊度的標(biāo)準(zhǔn),其中,數(shù)據(jù)集合相同的情況下,累加和越小,緊湊度越高;累加和越大,緊湊度越低。我們使用函數(shù)measureResult(String inputPath ,String centerPath,int k)衡量聚類效果,該函數(shù)有三個參數(shù):inputPath為分類后結(jié)果數(shù)據(jù)集的路徑,centerPath是迭代后類簇中心點(diǎn)的坐標(biāo),k是類簇的個數(shù)。程序調(diào)用getCenter()函數(shù)得到類簇中心點(diǎn)的集合,然后逐行讀取inputPath中的數(shù)據(jù)并計(jì)算其與對應(yīng)類簇中心的距離,并將距離累加,最終打印累加距離的值。該函數(shù)的實(shí)現(xiàn)代碼見附錄中Help類,其實(shí)現(xiàn)的流程如圖1所示。

圖1 measureResult函數(shù)流程圖

七、運(yùn)行與分析

由于電腦配置跟不上,而KMeans算法有需要較多的迭代次數(shù),所以這里我僅使用了10000條數(shù)據(jù)運(yùn)行程序。

7.1?? 一次完整的程序運(yùn)行

在執(zhí)行程序之前,首先要做一些配置:(1)創(chuàng)建迭代類簇中心點(diǎn)輸出文件夾,創(chuàng)建分類結(jié)果輸出文件夾;(2)將源數(shù)據(jù)提交到集群上;(3)將代碼打包上傳到Linux系統(tǒng)上。程序的運(yùn)行步驟為:產(chǎn)生初始聚類中心、迭代聚類中心、數(shù)據(jù)分類、衡量分類效果。

7.1.1產(chǎn)生初始聚類中心。

我們首先產(chǎn)生20個不重復(fù)的類簇中心,以time=00,Rank=1,Order=1,key=“火影忍者”為隨機(jī)初始類簇中心,運(yùn)行函數(shù)Help.ProdeceCenter(),可以得到初始類簇中心,這里僅展示前10個,如表1所示。

?

表1 初始類簇中心

時間

搜索關(guān)鍵詞

Rank

Order

20111230001328

火影忍者

2

2

20111230001600

蹲墻誘相公

10

10

20111230004356

家園守衛(wèi)戰(zhàn)羅德港防守攻略

1

1

20111230003246

Gay 性騷擾 圖

10

9

20111230002353

汕頭市金平區(qū)八年級第一學(xué)期數(shù)學(xué)試卷

3

1

20111230000219

人體藝術(shù)

9

9

20111230004156

廣州渥格服裝輔料有限公司

1

2

20111230001037

快播 中文字幕 主婦42

10

8

20111230003830

WWW、RRMMM、COM

1

9

20111230001230

海南師范大學(xué)美術(shù)系校園照片

10

1

7.1.2迭代聚類中心。

?????? 根據(jù)上文產(chǎn)生的初始類簇中心,我們選取前六個初始類簇中心點(diǎn)迭代聚類中心。本此迭代共計(jì)六輪,最終迭代后的類簇中心如表2所示,程序運(yùn)行截圖如圖1所示。

表2 迭代后的類簇中心

時間

搜索關(guān)鍵詞

Rank

Order

20111230000249

天與地

2

1

20111230004246

HTCG10手機(jī)系統(tǒng)自帶軟件怎么刪除?

1

2

20111230004356

家園守衛(wèi)戰(zhàn)羅德港防守攻略

1

1

20111230000158

北京市西城區(qū)2008英語抽樣測試答案

4

1

20111230002353

汕頭市金平區(qū)八年級第一學(xué)期數(shù)學(xué)試卷

3

1

20111230001418

環(huán)衛(wèi)工人業(yè)務(wù)知識競賽搶答題

4

2

圖1 程序運(yùn)行截圖

7.1.3數(shù)據(jù)分類。

?????? 根據(jù)以上迭代產(chǎn)生的類簇中心點(diǎn)集合,我們執(zhí)行數(shù)據(jù)分類操作,運(yùn)行函數(shù)forClssify(),可以對數(shù)據(jù)集進(jìn)行分類,部分分類結(jié)果如表1所示。從表1中我們可以看到分類效果還是不錯的。

表1 數(shù)據(jù)分類結(jié)果

類簇

時間

用戶ID

關(guān)鍵詞

Rank

Order

URL

0

20111230002225

5794763849288f418c58789492cd1f2e

左耳

2

1

http://www.tudou.com/programs/view/q96O7olHT-Q/

0

20111230000942

b54b6c1e8039276b87c8002be3e8583f

遵義宅 快遞電話

2

2

http://zhidao.baidu.com/question/235623645

0

20111230001418

1b4fc71d2a068a638e66db462a93f89f

最終幻想

2

1

http://www.163dyy.com/detail/1678.html

0

20111230003938

fa936e397a0994997f234681a65549b2

最新移動手機(jī)充值q幣

2

1

http://service.qq.com/info/25295.html

1

20111230003905

3c21686be709b847009680976d6a2b4c

百度一下

1

2

http://www.baidu.com/

1

20111230004234

6056710d9eafa569ddc800fe24643051

百度一下

1

2

http://www.baidu.com/

1

20111230000701

c71267c05b21e2a8f6a3e6b812fabc1f

百度ady

1

2

http://zhidao.baidu.com/question/188644177

7.1.4衡量分類效果。

經(jīng)過以上三個步驟,我們已經(jīng)基本完成了KMeans算法的基本過程,最后對算法的分類效果進(jìn)行衡量。運(yùn)行函數(shù)measureResult()可以得到類簇的累加距離,運(yùn)行結(jié)果為10528。該數(shù)據(jù)需要有多組分類數(shù)據(jù)時進(jìn)行比較才有意義,所以接下來我們尋找本數(shù)據(jù)集的最佳類簇個數(shù)。

7.2?? 尋找最佳類簇個數(shù)

我們使用產(chǎn)生聚類中心小節(jié)中產(chǎn)生的初始聚類中心,分別取前1個、前2個、前3個、前4個、前5個、前6個、前7個初始類簇中心對數(shù)據(jù)集進(jìn)行聚類,并最終使用函數(shù)measureResult()計(jì)算累加距離,結(jié)果如表1所示。將表中數(shù)據(jù)用折線圖表示如圖1所示,從圖中我們可以清楚看到,在類簇個數(shù)為3時,圖中曲線出現(xiàn)了很大的轉(zhuǎn)折:在類簇個數(shù)小于3時,每增加一個類簇,累加距離下降頻度很大;在類簇個數(shù)大于3時,每增加一個類簇,累加距離下降頻度較小。由此,當(dāng)類簇個數(shù)為3時,既可以保證較好的分類效果,又可以避免分類過于細(xì)致的麻煩。

表1 1~7個類簇的分類效果

類簇個數(shù)

距離

1

25805

2

17503

3

12348

4

11798

5

11003

6

10528

7

10288

?

圖1 1~7個類簇的分類效果

Mapreduce附錄

(1)?????? dataCell類

package myKMeans;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**

?* 自定義的可以作為MR傳輸對象的類

?* @author zheng

?*

?*/

public class dataCell implements WritableComparable<dataCell>{

?????? private String time;

?????? private String uid;

?????? private String keyword;

?????? private int rank;

?????? private int order;

?????? private String url;

/**************************get set 方法************************/

?????? public String getTime() {

????????????? return time;

?????? }

?????? public void setTime(String time) {

????????????? this.time = time;

?????? }

?????? public String getUid() {

????????????? return uid;

?????? }

?????? public void setUid(String uid) {

????????????? this.uid = uid;

?????? }

?????? public String getKeyword() {

????????????? return keyword;

?????? }

?????? public void setKeyword(String keyword) {

????????????? this.keyword = keyword;

?????? }

?????? public int getRank() {

????????????? return rank;

?????? }

?????? public void setRank(int rank) {

????????????? this.rank = rank;

?????? }

?????? public int getOrder() {

????????????? return order;

?????? }

?????? public void setOrder(int order) {

????????????? this.order = order;

?????? }

?????? public String getUrl() {

????????????? return url;

?????? }

?????? public void setUrl(String url) {

????????????? this.url = url;

?????? }

/*************************構(gòu)造函數(shù)*****************************/

?????? /**

?????? ?* 構(gòu)造函數(shù)

?????? ?* @param time

?????? ?* @param uid

?????? ?* @param keyword

?????? ?* @param rank

?????? ?* @param order

?????? ?* @param url

?????? ?*/

?????? public dataCell(String time, String uid, String keyword, int rank, int order, String url) {

????????????? super();

????????????? this.time = time;

????????????? this.uid = uid;

????????????? this.keyword = keyword;

????????????? this.rank = rank;

????????????? this.order = order;

????????????? this.url = url;

?????? }

?????? /**

?????? ?* 無參構(gòu)造函數(shù)

?????? ?* 空構(gòu)造函數(shù)用于反射 反序列化

?????? ?*/

?????? public dataCell() {

????????????? super();

?????? }

/**********************實(shí)現(xiàn)接口函數(shù)*****************************/

?????? /**

?????? ?*

?????? ?* 反序列化的方法,反序列化是,從流中讀取到各個字段的順序應(yīng)該與序列化時些出去的順序保持一致

?????? ?*/

?????? public void readFields(DataInput in) throws IOException {

????????????? // TODO Auto-generated method stub

????????????? time=in.readUTF();

????????????? uid=in.readUTF();

????????????? keyword=in.readUTF();

????????????? rank=in.readInt();

????????????? order=in.readInt();

????????????? url=in.readUTF();

?????? }

?????? /**

?????? ?* 序列化的方法

?????? ?*/

?????? public void write(DataOutput out) throws IOException {

????????????? // TODO Auto-generated method stub

????????????? out.writeUTF(time);

????????????? out.writeUTF(uid);

????????????? out.writeUTF(keyword);

????????????? out.writeInt(rank);

????????????? out.writeInt(order);

????????????? out.writeUTF(url);

?????? }

?????? /**

?????? ?* 比較排序

?????? ?*/

?????? public int compareTo(dataCell o) {

????????????? // TODO Auto-generated method stub

????????????? //正序排列

????????????? if(this.rank>o.rank){

???????????????????? return 1;

????????????? }

????????????? else if (this.order>o.order){

???????????????????? return 1;

????????????? }

????????????? else{

???????????????????? return -1;

????????????? }

?????? }

?????? /**

?????? ?* 字符串輸出時的方法

?????? ?*/

?????? public String toString(){

????????????? return time+"\t"+uid+"\t"+keyword+"\t"+rank+"\t"+order+"\t"+url;

?????? }

}

(2)?????? Help類

package myKMeans;

import java.io.File;

import java.io.FileOutputStream;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.BlockLocation;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FileStatus;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.junit.Before;

import org.junit.Test;

import java.io.IOException;

import java.util.ArrayList;

import java.util.LinkedList;

import java.util.List;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.util.LineReader;

import org.apache.xerces.util.URI;

public class Help {

?????? /**

?????? ?* 從hdfs文件中獲取中心點(diǎn),返回中心點(diǎn)列表的List

?????? ?* @param inputpath

?????? ?* @return

?????? ?*/

?????? public static ArrayList<ArrayList<Integer>> getCenters(String inputpath) {

????????????? // TODO Auto-generated method stub

????????????? ArrayList<ArrayList<Integer>> result = new ArrayList<ArrayList<Integer>>();?

??????? Configuration conf = new Configuration();

????????????? conf.set("fs.defaultFS", "hdfs://192.168.79.111:9000");

??????? try {?

??????????? FileSystem hdfs = FileSystem.get(conf);

??????????? Path in = new Path(inputpath);?

??????????? FSDataInputStream fsIn = hdfs.open(in);?

??????????? LineReader lineIn = new LineReader(fsIn, conf);?

??????????? Text line = new Text();?

??????????? while (lineIn.readLine(line) > 0){?

????? ??????????String record = line.toString();?

??????????????? /**

???????????????? *? 因?yàn)镠adoop輸出鍵值對時會在鍵跟值之間添加制表符, 所以用空格代替之。

???????????????? */?

???????????????

??????????????? String[] fields = record.split("\t");?

??????????????? List<Integer> tmplist = new ArrayList<Integer>();?

??????????????? for (int i = 0; i < fields.length; ++i){?

??????????????????? tmplist.add(Integer.parseInt(fields[i]));?

??????????????? }?

??????????????? result.add((ArrayList<Integer>) tmplist);?

??????????? }?

????? ??????fsIn.close();?

??????? } catch (IOException e){?

??????????? e.printStackTrace();?

??????? }?

??????? return result;? ???? ?

?????? }

?????? /**

?????? ?* 計(jì)算兩個點(diǎn)之間的距離,返回兩個點(diǎn)的距離

?????? ?* @param data

?????? ?* @param arrayList

?????? ?* @return

?????? ?*/

?????? public static int caculateDistance0(ArrayList<Integer> data, ArrayList<Integer> arrayList) {

????????????? // TODO Auto-generated method stub

????????????? //曼哈頓距離

????????????? int x1=data.get(0);

????????????? int y1=data.get(1);

????????????? int x2=arrayList.get(0);

????????????? int y2=arrayList.get(1);

????????????? int distance=Math.abs(x1-x2)+Math.abs(y1-y2);

????????????? return distance;

?????? }

?????? /**

?????? ?* 計(jì)算oldcenter隊(duì)列與newcenter隊(duì)列之間的距離,返回old隊(duì)列中中心點(diǎn)與new隊(duì)列中對應(yīng)中心點(diǎn)的距離之和

?????? ?* @param oldCenter

?????? ?* @param newCenter

?????? ?* @param k

?????? ?* @return

?????? ?*/

?????? public static int caculateDistance2(List<ArrayList<Integer>> oldCenter,

???????????????????? List<ArrayList<Integer>> newCenter) {

????????????? // TODO Auto-generated method stub

????????????? //曼哈頓距離

????????????? int distance=0;

????????????? //System.out.println(oldCenter.size());

????????????? //System.out.println(newCenter.size());

????????????? for(int i=0;i<oldCenter.size()&&i<newCenter.size();i++){

???????????????????? distance+=Math.abs(oldCenter.get(i).get(0)-newCenter.get(i).get(0))

???????????????????? +Math.abs(oldCenter.get(i).get(1)-newCenter.get(i).get(1));

????????????? }

????????????? return distance;

?????? }

?????? /**

?????? ?* 計(jì)算一個點(diǎn)與中心點(diǎn)隊(duì)列的距離,返回該點(diǎn)與隊(duì)列中所有中心的距離之和

?????? ?* @param node

?????? ?* @param centerList

?????? ?* @return

?????? ?*/

?????? public static int caculateDistance1( List<Integer> data,

???????????????????? List<ArrayList<Integer>> centerList) {

????????????? // TODO Auto-generated method stub

????????????? //曼哈頓距離

????????????? int distance=0;

????????????? for(int i=0;i<centerList.size();i++){

???????????????????? int temp=Integer.MIN_VALUE;

???????????????????? temp=Math.abs(data.get(0)-centerList.get(i).get(0))

?????????????????????????????????? +Math.abs(data.get(1)-centerList.get(i).get(1));

???????????????????? if(temp!=0){

??????????????????????????? distance+=temp;

???????????????????? }

???????????????????? else{

??????????????????????????? distance=Integer.MIN_VALUE;

??????????????????????????? return distance;

???????????????????? }

????????????? }

????????????? return distance;

?????? }

?????? /**

?????? ?* 計(jì)算中心點(diǎn)

?????? ?* 在一堆node里面找中間的那一個

?????? ?* @param helpList

?????? ?* @return

?????? ?*/

?????? public static Text caculateCenter(List<ArrayList<Integer>> helpList) {

????????????? // TODO Auto-generated method stub

????????????? float rankTotal=0.0f;

????????????? float orderTotal=0.0f;

????????????? int totalDistance=Integer.MAX_VALUE;

????????????? int rankRusult=Integer.MAX_VALUE;

????????????? int orderResult=Integer.MAX_VALUE;

????????????? int i=0;

????????????? for(ArrayList<Integer> list:helpList){

???????????????????? rankTotal+=list.get(0);

???????????????????? orderTotal+=list.get(1);

???????????????????? i++;

????????????? }

????????????? System.out.println("$$$$$$$$$$"+i);

????????????? int rank=0;

????????????? int order=0;

????????????? if(i!=0){

???????????????????? rank =Math.round(rankTotal/i);

???????????????????? order=Math.round(orderTotal/i);

???????????????????? for(ArrayList<Integer> list:helpList){

??????????????????????????? int temp=list.get(0)-rank+list.get(1)-order;

??????????????????????????? if(temp<totalDistance){

?????????????????????????????????? rankRusult=list.get(0);

?????????????????????????????????? orderResult=list.get(1);

?????????????????????????????????? totalDistance=temp;

??????????????????????????? }

???????????????????? }

????????????? }

????????????? //System.out.println(rank);

????????????? //System.out.println(order);

????????????? Text result=new Text(rankRusult+"\t"+orderResult);

????????????? System.out.println(rankRusult+"\t"+orderResult);

????????????? return result;

?????? }

?????? /**

?????? ?* 判斷當(dāng)前中心點(diǎn)是否已經(jīng)到達(dá)停止條件

?????? ?* @param oldpath

?????? ?* @param newpath

?????? ?* @param k

?????? ?* @param max

?????? ?* @return

?????? ?* @throws IOException

?????? ?*/

?????? public static boolean isFinished(String oldpath, String newpath, int max) throws IOException {

????????????? // TODO Auto-generated method stub

????????????? //<oldcenters> <newcenters> <k> <threshold>

????????????? //構(gòu)建oldcenters,newcenters數(shù)組

????????????? List<ArrayList<Integer>> oldcenters = Help.getCenters(oldpath);

??????? List<ArrayList<Integer>> newcenters = Help.getCenters(newpath);

??????? //計(jì)算距離

??????? int distance=Help.caculateDistance2(oldcenters, newcenters);

??????? System.out.println(distance);

??????? if (distance<max){

??????? ?????? //停止迭代

??????? ?????? System.out.println("false");

??????? ?????? return false;

??????? }

??????? else{

??????? ?????? //繼續(xù)迭代

??????? ?????? //使用新中心替換舊中心

??????? ?????? boolean flag=Help.replaceOldCenter(oldpath,newpath);

??????? ?????? System.out.println(flag);

??????? ?????? System.out.println("true");

??????? ?????? return true;

??????? }

?????? }

?????? /**

?????? ?* 使用新中心點(diǎn)替代舊的中心點(diǎn)

?????? ?* @param oldpath

?????? ?* @param newpath

?????? ?* @return

?????? ?* @throws IOException

?????? ?*/

?????? public static boolean replaceOldCenter(String oldpath, String newpath) throws IOException {

????????????? // TODO Auto-generated method stub

????????????? ArrayList<ArrayList<Integer>> result = new ArrayList<ArrayList<Integer>>();?

??????? Configuration conf = new Configuration();

????????????? conf.set("fs.defaultFS", "hdfs://192.168.79.111:9000");

??????? FileSystem fs = FileSystem.get(conf);

??????? Path newFile = new Path(newpath);

??????? Path oldFile=new Path(oldpath);

??????? //Path temp=new Path("/root/testForHelp1.txt");

??????? Path temp=new Path("C:\\Users\\zheng\\Desktop\\testForHelp1.txt");

??????? //"/root/testForHelp1.txt"

????????????? fs.copyToLocalFile(newFile, temp);

????????????? fs.copyFromLocalFile(temp, oldFile);

????????????? return true;???

?????? }

?????? public static boolean getClassfiyResult( String localPath) throws IOException {

????????????? // TODO Auto-generated method stub

????????????? ArrayList<ArrayList<Integer>> result = new ArrayList<ArrayList<Integer>>();?

??????? Configuration conf = new Configuration();

????????????? conf.set("fs.defaultFS", "hdfs://192.168.79.111:9000");

??????? FileSystem fs = FileSystem.get(conf);

??????? Path resultFile = new Path("/outForClassify/part-r-00000");

??????? Path localFile=new Path(localPath);

????????????? fs.copyToLocalFile(resultFile, localFile);

????????????? System.out.println("successful copy");

????????????? return true;???

?????? }

?????? public static boolean getCenterResult( String localPath) throws IOException {

????????????? // TODO Auto-generated method stub

????????????? ArrayList<ArrayList<Integer>> result = new ArrayList<ArrayList<Integer>>();?

??????? Configuration conf = new Configuration();

????????????? conf.set("fs.defaultFS", "hdfs://192.168.79.111:9000");

??????? FileSystem fs = FileSystem.get(conf);

??????? Path resultFile = new Path("/out/part-r-00000");

??????? Path localFile=new Path(localPath);

????????????? fs.copyToLocalFile(resultFile, localFile);

????????????? System.out.println("successful copy");

????????????? return true;???

?????? }

?????? /**

?????? ?* 產(chǎn)生中心點(diǎn)

?????? ?* @param inputpath? 元數(shù)據(jù)集

?????? ?* @param k 要產(chǎn)生幾個聚類中心

?????? ?* @param initRank? 初始的rank

?????? ?* @param initOrder? 初始的order

?????? ?*/

?????? public static? void ProdeceCenter(String inputpath,int k,int initRank,int initOrder){

?????? ??? Configuration conf = new Configuration();

????????????? conf.set("fs.defaultFS", "hdfs://192.168.79.111:9000");

????????????? //保存所有的center點(diǎn)的隊(duì)列

????????????? List<ArrayList<Integer>> centerList=new ArrayList<ArrayList<Integer>>();

????????????? //保存最開始的中心點(diǎn)

????????????? ArrayList<Integer> firstCenter=new ArrayList<Integer>();

????????????? firstCenter.add(initRank);

????????????? firstCenter.add(initOrder);

????????????? //將最開始的中心點(diǎn)加入隊(duì)列

????????????? centerList.add(firstCenter);

????????????? //保存臨時最大距離

????????????? int maxDistance=0;

????????????? //保存待選中心點(diǎn)

????????????? ArrayList<Integer> tmpCenter=new ArrayList<Integer>();

????????????? for(int i=0;i<k-1;i++){

???????????????????? try {?

??????????????????????????? //打開目標(biāo)數(shù)據(jù)文件

??????????????????????????? FileSystem hdfs = FileSystem.get(conf);

??????????????????????????? Path in = new Path(inputpath);?

??????????????????????????? FSDataInputStream fsIn = hdfs.open(in);?

??????????????????????????? LineReader lineIn = new LineReader(fsIn, conf);?

??????????????????????????? Text line = new Text();?

??????????????????????????? //對數(shù)據(jù)文件中的每一行都進(jìn)行處理

??????????? ?????? while (lineIn.readLine(line) > 0){

??????????? ????????????? //從取出的記錄中拿到Rank Order對,進(jìn)行比較

??????????? ????????????? String record = line.toString();???????????????

??????????? ????????????? String[] fields = record.split("\t");?

??????????? ????????????? ArrayList<Integer> data = new ArrayList<Integer>();?

??????????? ????????????? data.add(Integer.parseInt(fields[3]));

??????????? ????????????? data.add(Integer.parseInt(fields[4]));

??????????? ????????????? //比較list,將距離最遠(yuǎn)的放在tmpCenter里面

??????????? ????????????? int tmpDistance=Help.caculateDistance1(data, centerList);

??????????? ????????????? if(tmpDistance>maxDistance){

??????????? ???????????????????? boolean flag=true;

??????????? ???????????????????? for(ArrayList<Integer> c:centerList){

??????????? ???????????????????? if(Integer.parseInt(fields[3])==c.get(0)&&Integer.parseInt(fields[4])==c.get(1)){

??????????? ?????????????????????????????????? flag=false;

??????????? ??????????????????????????? }

??????????? ???????????????????? }

??????????? ???????????????????? if(flag){

??????????? ???????????????????? tmpCenter=data;

??????????????? ????????????? maxDistance=tmpDistance;

??????????? ???????????????????? }

??????????? ????????????? }

??????????? ?????? }

??????????? ?????? centerList.add(tmpCenter);

??????????? ?????? System.out.println(tmpCenter.get(0)+"? "+tmpCenter.get(1));

??????????? ?????? fsIn.close();?

???????????????????? } catch (IOException e){?

??????????????????????????? e.printStackTrace();?

???????????????????? }?

????????????? ?}

????????????? for(ArrayList<Integer> c:centerList){

???????????????????? System.out.print(c.get(0)+" "+c.get(1)+";");

????????????? }

?????? }

?????? /**

?????? ?* 衡量聚類的結(jié)果? 返回質(zhì)心距離的累加和,這里使用曼哈頓距離

?????? ?* @param inputPath? 聚類的結(jié)果及

?????? ?* @param centerPth? 聚類中心

?????? ?* @param k? 聚類中心的個數(shù)

?????? ?* @return

?????? ?*/

?????? public static int measureResult(String inputPath ,String centerPth,int k){

????????????? Configuration conf = new Configuration();

????????????? conf.set("fs.defaultFS", "hdfs://192.168.79.111:9000");

????????????? int [] distanceList=new int[k];

????????????? try {?

???????????????????? //打開目標(biāo)數(shù)據(jù)文件

???????????????????? FileSystem hdfs = FileSystem.get(conf);

???????????????????? Path in = new Path(inputPath);?

???????????????????? FSDataInputStream fsIn = hdfs.open(in);?

???????????????????? LineReader lineIn = new LineReader(fsIn, conf);?

???????????????????? Text line = new Text();?

???????????????????? //對數(shù)據(jù)文件中的每一行都進(jìn)行處理

???????????????????? ArrayList<ArrayList<Integer>> centers =Help.getCenters(centerPth);

??????? ?????? while (lineIn.readLine(line) > 0){

??????? ????????????? String record = line.toString();???????????????

??????? ????????????? String[] fields = record.split("\t");

??????? ????????????? int centerNum=Integer.parseInt(fields[0]);

??????? ????????????? int rank=Integer.parseInt(fields[4]);

??????? ????????????? int order=Integer.parseInt(fields[5]);

??????? ????????????? ArrayList<Integer> data = new ArrayList<Integer>();?

??????? ????????????? data.add(rank);

??????? ????????????? data.add(order);

???? distanceList[centerNum]+=Help.caculateDistance0(centers.get(centerNum), data);

??????? ?????? }

????????????? } catch (IOException e){?

????????????? e.printStackTrace();?

????????????? }

????????????? int distanceTotal=0;

?????? for(int i=0;i<k;i++){

????????????? //System.out.println(distanceList[i]);

????????????? distanceTotal+=distanceList[i];

?????? }

?????? System.out.println(distanceTotal);

?????? return distanceTotal;

?????? }

}

(3)?? HelpTest

package myKMeans;

import static org.junit.Assert.*;

import java.io.IOException;

import java.util.ArrayList;

import java.util.LinkedList;

import java.util.List;

import org.apache.hadoop.io.Text;

import org.junit.Test;

public class Helptest {

?????? @Test

?????? public void getCentersTest() {

????????????? String path="/testForHelp.txt" ;

????????????? ArrayList<ArrayList<Integer>> result =Help.getCenters(path);

????????????? for(ArrayList<Integer>? re :result){

???????????????????? System.out.println(re.get(0));

???????????????????? System.out.println(re.get(1));

????????????? }

?????? }

?????? @Test

?????? public void caculateDistanceTest(){

????????????? ArrayList<Integer> data=new ArrayList<Integer>();

????????????? data.add(1);

????????????? data.add(2);

????????????? ArrayList<Integer> arrayList=new ArrayList<Integer>();

????????????? arrayList.add(7);

????????????? arrayList.add(2);

????????????? int distance=Help.caculateDistance0(data, arrayList);

?????? System.out.println(distance);?????

?????? }

?????? @Test

?????? public void caculateCenterTest(){

????????????? List<ArrayList<Integer>> list=new LinkedList<ArrayList<Integer>>();

????????????? ArrayList<Integer> a=new ArrayList<Integer>();

????????????? a.add(1);

????????????? a.add(2);

????????????? ArrayList<Integer> b=new ArrayList<Integer>();

????????????? b.add(2);

????????????? b.add(3);

????????????? ArrayList<Integer> c=new ArrayList<Integer>();

????????????? c.add(3);

????????????? c.add(4);

????????????? list.add(a);

????????????? list.add(b);

????????????? list.add(c);

????????????? Text t=Help.caculateCenter(list);

????????????? System.out.println(t.getLength());

?????? }

?????? @Test

?????? public void replaceOldCenterTest() throws IOException{

????????????? String oldpath = "/testForHelp.txt";

????????????? String newpath = "/out/part-r-00000";

????????????? Help.replaceOldCenter(oldpath,newpath); }

?????? @Test

?????? public void caculateDistance1Test(){

????????????? List<Integer> data=new ArrayList<Integer>();

????????????? List<ArrayList<Integer>> centerList=new ArrayList<ArrayList<Integer>>();

????????????? data.add(10);

????????????? data.add(6);

????????????? ArrayList<Integer> data1=new ArrayList<Integer>();

????????????? data1.add(1);

????????????? data1.add(1);

????????????? ArrayList<Integer> data2=new ArrayList<Integer>();

????????????? data2.add(10);

????????????? data2.add(10);

????????????? centerList.add( data1);

????????????? centerList.add( data2);

????????????? int a=Help.caculateDistance1(data, centerList);

????????????? System.out.println(a);

?????? }

}

(4)?? KMeansDriver

package myKMeans;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class KMeansDriver {

?????? public final static int K=5;

?????? public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException{

????????????? ?//Help.ProdeceCenter("/in/ssaa", 20, 1, 1);

????????????? ?//getCenter();

????????????? ?forClssify();

//Help.getClassfiyResult("C:\\Users\\zheng\\Desktop\\mapreduce\\5M\\"+K+"\\result\\result.txt");

????????????? ?//Help.getCenterResult("C:\\Users\\zheng\\Desktop\\mapreduce\\5M\\"+K+"\\result\\center.txt");

????????????? ?//Help.measureResult("/outForClassify/part-r-00000", "/out/part-r-00000", K);

?? }?

?????? public? static void getCenter() throws IOException, ClassNotFoundException, InterruptedException{

????????????? int repeated=0;

????????????? ?String[] otherArgs=new String[]{"/in","/out","/oldCenterSet","/out/part-r-00000",K+"","1"

????????????? };

????????????? do{

???????????????????? Configuration conf = new Configuration();?

???????????????????? conf.set("fs.defaultFS", "hdfs://192.168.79.111:9000");

?????????? //String[] otherArgs? = new GenericOptionsParser(conf, args).getRemainingArgs();?

?????????? if (otherArgs.length != 6){?

?????????????? System.err.println("Usage: <in> <out> <oldcenters> <newcenters> <k> <threshold>");?

??? ???????????System.exit(2);?

?????????? }?

????????

?????????? conf.set("centerpath", otherArgs[2]);?

?????????? conf.set("kpath", otherArgs[4]);?

?????????? Job job = new Job(conf, "KMeansCluster");??

?????????? job.setJarByClass(KMeansDriver.class);?

?????????? Path in = new Path(otherArgs[0]);?

?????????? Path out = new Path(otherArgs[1]);?

?????????? FileInputFormat.addInputPath(job, in);?

?????????? FileSystem fs = FileSystem.get(conf);?

?????????? if (fs.exists(out)){

?????????????? fs.delete(out, true);?

?????????? }?

?????????? FileOutputFormat.setOutputPath(job, out);?

?????????? job.setMapperClass(KmeansMapperForCenter.class);?

?????????? job.setReducerClass(KMeansReducerForCenter.class);

?????????? job.setOutputKeyClass(IntWritable.class);?

?????????? job.setOutputValueClass(Text.class);?

?????????? job.waitForCompletion(true);

?????????? ++repeated;?

?????????? System.out.println("We have repeated " + repeated + " times.");?

??????? } while (repeated < 9

?????? ???????? ?&& (Help.isFinished(otherArgs[2], otherArgs[3], Integer.parseInt(otherArgs[5]))));?

//&& (Help.isFinished(args[2], args[3], Integer.parseInt(args[4]), Float.parseFloat(args[5])) == false)

?????

?????? }

?????? public? static void forClssify() throws IOException, ClassNotFoundException, InterruptedException{

????????????? ?String[] otherArgs=new String[]{"/in","/outForClassify","/oldCenterSet","/out/part-r-00000",K+"","2"

????????????? };

???????????????????? Configuration conf = new Configuration();?

???????????????????? conf.set("fs.defaultFS", "hdfs://192.168.79.111:9000");

???????? ?//String[] otherArgs? = new GenericOptionsParser(conf, args).getRemainingArgs();?

????????? if (otherArgs.length != 6){?

????????????? System.err.println("Usage: <in> <out> <oldcenters> <newcenters> <k> <threshold>");?

????????????? System.exit(2);?

????????? }?

????????? conf.set("centerpath", otherArgs[2]);?

????????? conf.set("kpath", otherArgs[4]);?

????????? Job job = new Job(conf, "KMeansCluster");?

????????? job.setJarByClass(KMeansDriver.class);?

????????? Path in = new Path(otherArgs[0]);?

????????? Path out = new Path(otherArgs[1]);?

????????? FileInputFormat.addInputPath(job, in);

????????? FileSystem fs = FileSystem.get(conf);?

????????? if (fs.exists(out)){?

????????????? fs.delete(out, true);?

????????? }?

????????? FileOutputFormat.setOutputPath(job, out);?

????????? job.setMapperClass(KMeansMapperForClassify.class);/

????????? job.setReducerClass(KMeansReducerForClassify.class);

????????? job.setOutputKeyClass(IntWritable.class);

????????? job.setOutputValueClass(dataCell.class);

????????? job.setMapOutputKeyClass(IntWritable.class);

????????? job.setMapOutputValueClass(dataCell.class);

????????? job.waitForCompletion(true);

?????? }

}

(5)?? ?KmeansMapperForCenter

package myKMeans;

import org.apache.hadoop.io.IntWritable;?

import org.apache.hadoop.io.LongWritable;?

import org.apache.hadoop.io.Text;?

import org.apache.hadoop.mapreduce.Mapper;?

?

import java.io.IOException;?

import java.util.ArrayList;?

import java.util.List;?

/***

?* 未獲得中心點(diǎn)的mapper?

?* @author zheng

?*

?*/

public class KmeansMapperForCenter extends Mapper<LongWritable,Text,IntWritable,Text>{

?????? public void map(LongWritable key,Text value,Context context)

???????????????????? throws IOException,InterruptedException{

????????????? String line =value.toString();

????????????? String[] fields=line.split("\t");

????????????? int rank=Integer.parseInt(fields[3]);

????????????? int order=Integer.parseInt(fields[4]);

????????????? ArrayList<Integer> data =new ArrayList<Integer>();

????????????? data.add(rank);

????????????? data.add(order);

?????????????

????????????? //獲取中心點(diǎn)列表

????????????? List<ArrayList<Integer>> centers = Help.getCenters(context.getConfiguration().get("centerpath"));

????????????? //有幾個聚類中心

????????????? int k = Integer.parseInt(context.getConfiguration().get("kpath"));?

????????????? //當(dāng)前數(shù)據(jù)與中心點(diǎn)的最小距離

????????????? int minDist = Integer.MAX_VALUE;?

????????????? //中心點(diǎn)索引

????????????? int centerIndex = k;

????????????? //計(jì)算樣本點(diǎn)到各個中心的距離,并把樣本聚類到距離最近的中心點(diǎn)所屬的類

????????????? for(int i=0;i<k;i++){

???????????????????? int currentDist=0;

???????????????????? currentDist=Help.caculateDistance0(data,centers.get(i));

???????????????????? if(minDist>currentDist){

??????????????????????????? minDist=currentDist;

??????????????????????????? centerIndex=i;

???????????????????? }

????????????? }

????????????? Text centerdata=new Text(rank+"\t"+order);

????????????? context.write(new IntWritable(centerIndex), centerdata);

?????? }

}

(6) KMeansReducerForCenter類

package myKMeans;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;?

import org.apache.hadoop.mapreduce.Reducer;?

import java.io.IOException;?

import java.util.ArrayList;

import java.util.LinkedList;

import java.util.List;?

/**

?* 為獲得中心點(diǎn)的reducer

?* @author zheng

?*

?*/

public class KMeansReducerForCenter extends Reducer<IntWritable, Text, NullWritable, Text> {

?????? public void reduce(IntWritable key,Iterable<Text> value,Context context)

???????????????????? throws IOException,InterruptedException {

????????????? System.out.println("#######################");

????????????? List<ArrayList<Integer>>? helpList=new LinkedList<ArrayList<Integer>> ();

????????????? String tempResult="";

????????????? for(Text val:value){

???????????????????? String line =val.toString();

???????????????????? String[] fields=line.split("\t");

???????????????????? ArrayList<Integer> tempList=new ArrayList<Integer>();

???????????????????? for( String f:fields){

??????????????????????????? tempList.add(Integer.parseInt(f));

???????????????????? }

???????????????????? helpList.add(tempList);

????????????? }

????????????? //計(jì)算新的聚類中心

????????????? ?Text result= Help.caculateCenter(helpList);

????????????? ?context.write(NullWritable.get(), result);

?????? }

}

(7) KMeansMapperForClassify類

package myKMeans;

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Mapper.Context;

/**

?* 為數(shù)據(jù)集分類的mapper

?* @author zheng

?*

?*/

public class KMeansMapperForClassify extends

Mapper<LongWritable,Text,IntWritable,dataCell>{

?????? public void map(LongWritable key,Text value,Context context)

???????????????????? throws IOException,InterruptedException{

????????????? String line =value.toString();

????????????? String[] fields=line.split("\t");

????????????? dataCell cell=new dataCell(fields[0],fields[1],fields[2]

?????????????????????????? ,Integer.parseInt(fields[3]),Integer.parseInt(fields[4]),fields[5]);

????????????? int rank=cell.getRank();

????????????? int order=cell.getOrder();

????????????? ArrayList<Integer> data =new ArrayList<Integer>();

????????????? data.add(rank);

????????????? data.add(order);

?????? ?????? //獲取中心點(diǎn)列表

????????????? List<ArrayList<Integer>> centers = Help.getCenters(context.getConfiguration().get("centerpath"));

????????????? //有幾個聚類中心

????????????? int k = Integer.parseInt(context.getConfiguration().get("kpath"));?

????????????? //當(dāng)前數(shù)據(jù)與中心點(diǎn)的最小距離

????????????? int minDist = Integer.MAX_VALUE;?

????????????? //中心點(diǎn)索引

????????????? int centerIndex = k;

????????????? //計(jì)算樣本點(diǎn)到各個中心的距離,并把樣本聚類到距離最近的中心點(diǎn)所屬的類

????????????? for(int i=0;i<k;i++){

???????????????????? int currentDist=0;

???????????????????? currentDist=Help.caculateDistance0(data,centers.get(i));

???????????????????? if(minDist>currentDist){

??????????????????????????? minDist=currentDist;

??????????????????????????? centerIndex=i;

???????????????????? }

????????????? }

????????????? Text centerdata=new Text(rank+"\t"+order);

????????????? context.write(new IntWritable(centerIndex), cell);

?????? }

}

(8)? KMeansReducerForClassify類

package myKMeans;

import java.io.IOException;

import java.util.ArrayList;

import java.util.LinkedList;

import java.util.List;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.mapreduce.Mapper.Context;

import org.apache.hadoop.mapreduce.Reducer;

/**

?* 為數(shù)據(jù)集分類的Reducer

?* @author zheng

?*

?*/

public class KMeansReducerForClassify extends Reducer<IntWritable, dataCell, IntWritable, dataCell>{

?????? public void reduce(IntWritable key,Iterable<dataCell> value,Context context)

???????????????????? throws IOException,InterruptedException {

????????????? System.out.println("#######################");

????????????? for(dataCell val:value){

???????????????????? context.write(key, val);

?????? ?????? }

?????? }

}

?

?

總結(jié)

以上是生活随笔為你收集整理的KMeans算法的Mapreduce实现的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。