通过简单的Word Count讲解MapReduce原理以及Java实现
MapReduce原理:
??????MapReduce采用"分而治之"的思想,把對大規(guī)模數(shù)據(jù)集的操作,分發(fā)給一個主節(jié)點管理下的各個分節(jié)點共同完成,然后通過整合各個節(jié)點的中間結(jié)果,得到最終結(jié)果。簡單地說,MapReduce就是"任務(wù)的分解與結(jié)果的匯總"。
?
在Hadoop中,用于執(zhí)行MapReduce任務(wù)的機器角色有兩個:一個是JobTracker;另一個是TaskTracker,JobTracker是用于調(diào)度工作的,TaskTracker是用于執(zhí)行工作的。一個Hadoop集群中只有一臺JobTracker。
?
在分布式計算中,MapReduce框架負(fù)責(zé)處理了并行編程中分布式存儲、工作調(diào)度、負(fù)載均衡、容錯均衡、容錯處理以及網(wǎng)絡(luò)通信等復(fù)雜問題,把處理過程高度抽象為兩個函數(shù):map和reduce,map負(fù)責(zé)把任務(wù)分解成多個任務(wù),reduce負(fù)責(zé)把分解后多任務(wù)處理的結(jié)果匯總起來。
?
需要注意的是,用MapReduce來處理的數(shù)據(jù)集(或任務(wù))必須具備這樣的特點:待處理的數(shù)據(jù)集可以分解成許多小的數(shù)據(jù)集,而且每一個小數(shù)據(jù)集都可以完全并行地進行處理。
在Hadoop中,每個MapReduce任務(wù)都被初始化為一個Job,每個Job又可以分為兩種階段:map階段和reduce階段。這兩個階段分別用兩個函數(shù)表示,即map函數(shù)和reduce函數(shù)。map函數(shù)接收一個形式的輸入,然后同樣產(chǎn)生一個形式的中間輸出,Hadoop函數(shù)接收一個如形式的輸入,然后對這個value集合進行處理,每個reduce產(chǎn)生0或1個輸出,reduce的輸出也是形式的。
下面以一個最簡單的例子說明:
單詞計數(shù)是最簡單也是最能體現(xiàn)MapReduce思想的程序之一,可以稱為MapReduce版"Hello World",該程序的完整代碼可以在Hadoop安裝包的"src/examples"目錄下找到。單詞計數(shù)主要完成功能是:統(tǒng)計一系列文本文件中每個單詞出現(xiàn)的次數(shù),如下圖所示。
?
package org.apache.hadoop.examples;
?
import java.io.IOException;
?
import java.util.StringTokenizer;
?
import org.apache.hadoop.conf.Configuration;
?
import org.apache.hadoop.fs.Path;
?
import org.apache.hadoop.io.IntWritable;
?
import org.apache.hadoop.io.Text;
?
import org.apache.hadoop.mapreduce.Job;
?
import org.apache.hadoop.mapreduce.Mapper;
?
import org.apache.hadoop.mapreduce.Reducer;
?
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
?
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
?
import org.apache.hadoop.util.GenericOptionsParser;
?
public class WordCount {
?
public static class TokenizerMapper
?
extends Mapper?{
?
private final static IntWritable one = new IntWritable(1);
?
private Text word = new Text();
?
?
?
public void map(Object key, Text value, Context context)
?
throws IOException, InterruptedException {
?
StringTokenizer itr = new StringTokenizer(value.toString());
?
while (itr.hasMoreTokens()) {
?
word.set(itr.nextToken());
?
context.write(word, one);
?
}
?
}
?
}
?
public static class IntSumReducer
?
extends Reducer {
?
private IntWritable result = new IntWritable();
?
public void reduce(Text key, Iterable values,Context context)
?
?throws IOException, InterruptedException {
?
int sum = 0;
?
for (IntWritable val : values) {
?
sum += val.get();
?
}
?
result.set(sum);
?
context.write(key, result);
?
}
?
}
?
?
?
public static void main(String[] args) throws Exception {
?
Configuration conf = new Configuration();
?
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
?
if (otherArgs.length != 2) {
?
System.err.println("Usage: wordcount ");
?
System.exit(2);
?
}
?
Job job = new Job(conf, "word count");
?
job.setJarByClass(WordCount.class);
?
job.setMapperClass(TokenizerMapper.class);
?
job.setCombinerClass(IntSumReducer.class);
?
job.setReducerClass(IntSumReducer.class);
?
job.setOutputKeyClass(Text.class);
?
job.setOutputValueClass(IntWritable.class);
?
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
?
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
?
System.exit(job.waitForCompletion(true) ? 0 : 1);
?
}
?
}
?
Map過程需要繼承org.apache.hadoop.mapreduce包中Mapper類,并重寫其map方法。通過在map方法中添加兩句把key值和value值輸出到控制臺的代碼,可以發(fā)現(xiàn)map方法中value值存儲的是文本文件中的一行(以回車符為行結(jié)束標(biāo)記),而key值為該行的首字母相對于文本文件的首地址的偏移量。然后StringTokenizer類將每一行拆分成為一個個的單詞,并將作為map方法的結(jié)果輸出,其余的工作都交有MapReduce框架處理。
Reduce過程需要繼承org.apache.hadoop.mapreduce包中Reducer類,并重寫其reduce方法。Map過程輸出中key為單個單詞,而values是對應(yīng)單詞的計數(shù)值所組成的列表,Map的輸出就是Reduce的輸入,所以reduce方法只要遍歷values并求和,即可得到某個單詞的總次數(shù)。
在MapReduce中,由Job對象負(fù)責(zé)管理和運行一個計算任務(wù),并通過Job的一些方法對任務(wù)的參數(shù)進行相關(guān)的設(shè)置。此處設(shè)置了使用TokenizerMapper完成Map過程中的處理和使用IntSumReducer完成Combine和Reduce過程中的處理。還設(shè)置了Map過程和Reduce過程的輸出類型:key的類型為Text,value的類型為IntWritable。任務(wù)的輸出和輸入路徑則由命令行參數(shù)指定,并由FileInputFormat和FileOutputFormat分別設(shè)定。完成相應(yīng)任務(wù)的參數(shù)設(shè)定后,即可調(diào)用job.waitForCompletion()方法執(zhí)行任務(wù)。
?
Hadoop提供了如下內(nèi)容的數(shù)據(jù)類型,這些數(shù)據(jù)類型都實現(xiàn)了WritableComparable接口,以便用這些類型定義的數(shù)據(jù)可以被序列化進行網(wǎng)絡(luò)傳輸和文件存儲,以及進行大小比較。
?
?
?
????BooleanWritable:標(biāo)準(zhǔn)布爾型數(shù)值
?
????ByteWritable:單字節(jié)數(shù)值
?
????DoubleWritable:雙字節(jié)數(shù)
?
????FloatWritable:浮點數(shù)
?
????IntWritable:整型數(shù)
?
????LongWritable:長整型數(shù)
?
????Text:使用UTF8格式存儲的文本
?
????NullWritable:當(dāng)中的key或value為空時使用
總結(jié)
以上是生活随笔為你收集整理的通过简单的Word Count讲解MapReduce原理以及Java实现的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 《SAS编程与数据挖掘商业案例》学习笔记
- 下一篇: Java常用类集接口以及实现方式总结