MapReduce 应用:TF-IDF 分布式实现
概述
本文要說的 TF-IDF 分布式實現(xiàn),運用了很多之前 MapReduce 的核心知識點。算是 MapReduce 的一個小應(yīng)用吧。
版權(quán)說明
著作權(quán)歸作者所有。
商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請注明出處。
本文作者:Q-WHai
發(fā)表日期: 2016年6月24日
本文鏈接:https://qwhai.blog.csdn.net/article/details/51747801
來源:CSDN
更多內(nèi)容:分類 >> 大數(shù)據(jù)之 Hadoop
學(xué)前導(dǎo)讀
本文并不打算再啰里啰嗦地講解一大堆 TF-IDF 的概念,以及 TF-IDF 能夠做什么。如果你對此還不夠了解,可以轉(zhuǎn)到我的另一篇博客《 數(shù)據(jù)挖掘:基于TF-IDF算法的數(shù)據(jù)集選取優(yōu)化 》進(jìn)行學(xué)習(xí)。
由于本人的語言表達(dá)可能并不十分簡單明了,如果你閱讀本文的時候遇到一些難以理解的地方,可以點擊下面相關(guān)的鏈接進(jìn)行學(xué)習(xí)。這些都是本文的基礎(chǔ)和前提,當(dāng)然也可以提交評論與我進(jìn)行交流。
- 《 數(shù)據(jù)挖掘:基于TF-IDF算法的數(shù)據(jù)集選取優(yōu)化 》
- 《 從 WordCount 到 MapReduce 計算模型 》
- 《 MapReduce 進(jìn)階:多 MapReduc e的鏈?zhǔn)侥J?》
- 《 MapReduce 進(jìn)階:多路徑輸入輸出 》
- 《 MapReduce 進(jìn)階:Partitioner 組件 》
算法框架
首先我們來看一下,分布式的 TF-IDF 的算法框架圖:
在圖中,我們有三個大模塊,這三個大模塊正是 MapReduce 中的三個 Job。
在學(xué)習(xí) TF-IDF 的時候我們就知道了,TF-IDF 的計算可以分成三個部分進(jìn)行。第一個階段:計算各個文檔中每個單詞的 TF 值;第二階段:計算所有文檔中所有單詞的 IDF 值;第三個階段:計算各個文檔中各個單詞的 TF-IDF 值。在單機的環(huán)境下,很容易實現(xiàn)這些計算。可是,分布式環(huán)境下要怎么做呢?于是,根據(jù)這三個階段,我設(shè)計了上面的架構(gòu)圖。
TFMapReduceCore 類包含的是計算 TF 的核心類,IDFMapReduceCore 中則包含了 IDF 的核心類,IntegrateCore 中包含的是將 TF、IDF 的結(jié)果進(jìn)行整合,從而計算最終的 TF-IDF 結(jié)果。且這里還產(chǎn)生了兩個中間輸出目錄,而這兩個中間輸出目錄也正是第三個階段的輸入目錄,這一步中,需要用到 MapReduce 的多路徑輸入。上面也有專門的文章描述了這一塊。
代碼實現(xiàn)
TFMapReduceCore
這里我將與計算 TF 相關(guān)的代碼封裝在同一個 TFMapReduceCore 類中,其中的 TFMapper, TFReducer 等都是 TFMapReduceCore 類的一個子類。
TFMapper
public static class TFMapper extends Mapper<Object, Text, Text, Text> {private final Text one = new Text("1");private Text label = new Text();private int allWordCount = 0;private String fileName = "";@Overrideprotected void setup(Mapper<Object, Text, Text, Text>.Context context)throws IOException, InterruptedException {fileName = getInputSplitFileName(context.getInputSplit());}@Overrideprotected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)throws IOException, InterruptedException {StringTokenizer tokenizer = new StringTokenizer(value.toString());while (tokenizer.hasMoreTokens()) {allWordCount++;label.set(String.join(":", tokenizer.nextToken(), fileName));context.write(label, one);}}@Overrideprotected void cleanup(Mapper<Object, Text, Text, Text>.Context context)throws IOException, InterruptedException {context.write(new Text("!:" + fileName), new Text(String.valueOf(allWordCount)));}private String getInputSplitFileName(InputSplit inputSplit) {String fileFullName = ((FileSplit)inputSplit).getPath().toString();String[] nameSegments = fileFullName.split("/");return nameSegments[nameSegments.length - 1];} }因為我們輸入的源文件是用一個文件表示一個分類,如果你是以其他規(guī)則劃分,那么可以不必遵從本文的邏輯。上面我首先在 setup() 里獲取文件名,這樣做的目的在于不用在 map() 中重復(fù)獲取,從而提升程序的效率。并且在 cleanup() 里把文件名(也就是分類)信息寫入到 Mapper 的輸出路徑中。
大家可能注意到了這里我寫入文件名的時候,使用了一個技巧,使用“!”充當(dāng)了一個單詞。因為這個字符的 ASCII 碼比所有的字符的 ASCII 碼都要小,這樣做的目的是可以讓這條記錄在其他所有記錄之前被訪問( 這里所指的其他所有記錄指的是,同一個分類中的所有記錄。因為這里我們有對 Mapper 的輸出做 Partitioner 分區(qū) )。
TFCombiner & TFReducer
從上面的 Mapper 中可以看到 Mapper 輸出的 key 的格式為: : 。如此,只要去解析 key 中的 keyword 就可以了。而在 Mapper 的 cleanup() 方法中還寫入文件的信息。這樣一來,我們就可以使用這個 “!: allWordCount” 對每個文件進(jìn)行區(qū)分開來。區(qū)分的原理之前也說到過了,就是因為 “!” 的 ASCII 碼最小的原因。
public static class TFCombiner extends Reducer<Text, Text, Text, Text> {private int allWordCount = 0;@Overrideprotected void reduce(Text key, Iterable<Text> values,Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {if (values == null) {return;}if(key.toString().startsWith("!")) {allWordCount = Integer.parseInt(values.iterator().next().toString());return;}int sumCount = 0;for (Text value : values) {sumCount += Integer.parseInt(value.toString());}double tf = 1.0 * sumCount / allWordCount;context.write(key, new Text(String.valueOf(tf)));} }通過上面的 Combiner 的 reduce 操作之后,所有單詞的 TF 值都已經(jīng)計算完成。再通過一次 Reducer 操作就 ok 了。Reducer 的代碼如下:
public static class TFReducer extends Reducer<Text, Text, Text, Text> {@Overrideprotected void reduce(Text key, Iterable<Text> values,Reducer<Text, Text, Text, Text>.Context context)throws IOException, InterruptedException {if (values == null) {return;}for (Text value : values) {context.write(key, value);}} }TFPartitioner
在 Partitioner 分區(qū)這一塊,就簡單地以自定義的 Hash Partitioner 作為分區(qū)類。如果你有更加嚴(yán)格的要求,可以參考我之前的博客《MapReduce 進(jìn)階:Partitioner 組件》。
public static class TFPartitioner extends Partitioner<Text, Text> {@Overridepublic int getPartition(Text key, Text value, int numPartitions) {String fileName = key.toString().split(":")[1];return Math.abs((fileName.hashCode() * 127) % numPartitions);} }IDFMapReduceCore
這里我將與計算 IDF 相關(guān)的代碼封裝在同一個 IDFMapReduceCore 類中,其中的 IDFMapper, IDFReducer 都是 IDFMapReduceCore 類的一個子類。
IDFMapper
因為 IDF 的計算是針對所有文檔的,所以在 IDFMapper 中可以直接按照計算 WordCount 的邏輯來編寫就 ok 了。因為在計算 IDF 時,我們不需要關(guān)心某一個單詞的詞頻,所以這里統(tǒng)一的使用 1 填充 mapper 的輸出 value.
public static class IDFMapper extends Mapper<Object, Text, Text, Text> {private final Text one = new Text("1");private Text label = new Text();@Overrideprotected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)throws IOException, InterruptedException {StringTokenizer tokenizer = new StringTokenizer(value.toString());label.set(tokenizer.nextToken().split(":")[0]);context.write(label, one);} }IDFReducer
在前面我們已經(jīng)統(tǒng)計了某一個單詞在某一個文檔(分類)出現(xiàn)的標(biāo)志,也就是單詞 W 在文檔 D 中出現(xiàn)過了一次。這樣一來,我們就可以統(tǒng)計出單詞 W 在全部文檔中出現(xiàn)過多少次了。而這一思想,正是計算 WordCount 邏輯。所以代碼很好編寫。等等,我們還需要計算所有的文檔數(shù)。是的,在計算 IDF 的公式中,我們需要知道一共有多少個文檔。可是,在當(dāng)前的情況下我們無法獲得這個值,因為這是在 Reducer 中。雖然在 Reducer 里面無法計算文檔總數(shù),但是在 Reducer 外面卻可以。這個過程就是純粹的 Java 邏輯,很簡單,不多說了。
當(dāng)我們知道了訓(xùn)練文檔總數(shù),就可以通過 job 將信息傳遞給 Reducer。只是這里我們并不是調(diào)用 job.setNumReduceTasks(N),而是調(diào)用了 job.setProfileParams(msg) 方法。
IntegrateCore
這里我將與計算 TF-IDF 相關(guān)的代碼封裝在同一個 IntegrateCore 類中,其中的 IntegrateMapper, IntegrateReducer 都是 IntegrateCore 類的一個子類。在計算的最后一步中,沒有什么需要說明的地方。只是,前面計算 TF、IDF 產(chǎn)生的中間輸出文件的格式并不統(tǒng)一,所以這里需要對不同格式的文件內(nèi)容進(jìn)行不同的考慮。
IntegrateMapper
IntegrateReducer
public static class IntegrateReducer extends Reducer<Text, Text, Text, Text> {private double keywordIDF = 0.0d;private Text value = new Text();@Overrideprotected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)throws IOException, InterruptedException {if (values == null) {return;}if (key.toString().split(":")[1].startsWith("!")) {keywordIDF = Double.parseDouble(values.iterator().next().toString());return;}value.set(String.valueOf(Double.parseDouble(values.iterator().next().toString()) * keywordIDF));context.write(key, value);} }測試運行
數(shù)據(jù)源
android
android java activity maphadoop
map reduce ssh mapreduceios
ios iphone jobsjava
java code eclipse java mappython
python pycharm執(zhí)行命令
執(zhí)行此命令之前,請先將測試數(shù)據(jù)上傳到 HDFS 的 /input 目錄下。
$ hadoop jar temp/run.jar /input /output執(zhí)行結(jié)果
activity:android 0.0994850021680094 android:android 0.0994850021680094 code:java 0.07958800173440753 eclipse:java 0.07958800173440753 ios:ios 0.13264666955734586 iphone:ios 0.13264666955734586 java:android 0.0554621874040891 java:java 0.08873949984654256 jobs:ios 0.13264666955734586 map:android 0.024227503252014105 map:hadoop 0.024227503252014105 map:java 0.019382002601611284 mapreduce:hadoop 0.0994850021680094 pycharm:python 0.1989700043360188 python:python 0.1989700043360188 reduce:hadoop 0.0994850021680094 ssh:hadoop 0.0994850021680094看到這個結(jié)果你可能會認(rèn)為這個結(jié)果不一定可靠。如果你懷疑這些結(jié)果,你可以自己編寫一個單機版的 Java 程序進(jìn)行驗證。當(dāng)然,我已經(jīng)驗證過了。
Job
此處是瀏覽器登錄 Cluster Metrics 的信息展示。顯示的是程序在執(zhí)行完成之后的內(nèi)容,看到有三個 Job 參與了 TF-IDF 的計算。
GitHub download
- https://github.com/Hadoop-league/TF-IDF_MR
征集
如果你也需要使用ProcessOn這款在線繪圖工具,可以使用如下邀請鏈接進(jìn)行注冊:
https://www.processon.com/i/56205c2ee4b0f6ed10838a6d
總結(jié)
以上是生活随笔為你收集整理的MapReduce 应用:TF-IDF 分布式实现的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MapReduce 进阶:Partiti
- 下一篇: 决策树之 C4.5 算法