初学Hadoop之图解MapReduce与WordCount示例分析
1、MapReduce整體流程
最簡(jiǎn)單的MapReduce應(yīng)用程序至少包含 3 個(gè)部分:一個(gè) Map?函數(shù)、一個(gè) Reduce 函數(shù)和一個(gè) main 函數(shù)。在運(yùn)行一個(gè)mapreduce計(jì)算任務(wù)時(shí)候,任務(wù)過(guò)程被分為兩個(gè)階段:map階段和reduce階段,每個(gè)階段都是用鍵值對(duì)(key/value)作為輸入(input)和輸出(output)。main 函數(shù)將作業(yè)控制和文件輸入/輸出結(jié)合起來(lái)。- 并行讀取文本中的內(nèi)容,然后進(jìn)行MapReduce操作。
- Map過(guò)程:并行讀取文本,對(duì)讀取的單詞進(jìn)行map操作,每個(gè)詞都以<key,value>形式生成。
我的理解:
一個(gè)有三行文本的文件進(jìn)行MapReduce操作。
讀取第一行Hello World Bye World ,分割單詞形成Map。
<Hello,1>?<World,1>?<Bye,1>?<World,1>
讀取第二行Hello Hadoop Bye Hadoop?,分割單詞形成Map。
<Hello,1>?<Hadoop,1>?<Bye,1>?<Hadoop,1>
讀取第三行Bye Hadoop Hello Hadoop,分割單詞形成Map。
<Bye,1>?<Hadoop,1>?<Hello,1>?<Hadoop,1>
- Reduce操作是對(duì)map的結(jié)果進(jìn)行排序,合并,最后得出詞頻。
我的理解:
經(jīng)過(guò)進(jìn)一步處理(combiner),將形成的Map根據(jù)相同的key組合成value數(shù)組。
<Bye,1,1,1>?<Hadoop,1,1,1,1>?<Hello,1,1,1>?<World,1,1>
循環(huán)執(zhí)行Reduce(K,V[]),分別統(tǒng)計(jì)每個(gè)單詞出現(xiàn)的次數(shù)。
<Bye,3>?<Hadoop,4>?<Hello,3>?<World,2>
2、WordCount源碼
package org.apache.hadoop.examples;import java.io.IOException; import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; /*** * 描述:WordCount explains by York* @author Hadoop Dev Group*/ publicclass WordCount {/*** 建立Mapper類TokenizerMapper繼承自泛型類Mapper* Mapper類:實(shí)現(xiàn)了Map功能基類* Mapper接口:* WritableComparable接口:實(shí)現(xiàn)WritableComparable的類可以相互比較。所有被用作key的類應(yīng)該實(shí)現(xiàn)此接口。* Reporter 則可用于報(bào)告整個(gè)應(yīng)用的運(yùn)行進(jìn)度,本例中未使用。 * */publicstaticclass TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{/*** IntWritable, Text 均是 Hadoop 中實(shí)現(xiàn)的用于封裝 Java 數(shù)據(jù)類型的類,這些類實(shí)現(xiàn)了WritableComparable接口,* 都能夠被串行化從而便于在分布式環(huán)境中進(jìn)行數(shù)據(jù)交換,你可以將它們分別視為int,String 的替代品。* 聲明one常量和word用于存放單詞的變量*/privatefinalstatic IntWritable one =new IntWritable(1);private Text word =new Text();/*** Mapper中的map方法:* void map(K1 key, V1 value, Context context)* 映射一個(gè)單個(gè)的輸入k/v對(duì)到一個(gè)中間的k/v對(duì)* 輸出對(duì)不需要和輸入對(duì)是相同的類型,輸入對(duì)可以映射到0個(gè)或多個(gè)輸出對(duì)。* Context:收集Mapper輸出的<k,v>對(duì)。* Context的write(k, v)方法:增加一個(gè)(k,v)對(duì)到context* 程序員主要編寫Map和Reduce函數(shù).這個(gè)Map函數(shù)使用StringTokenizer函數(shù)對(duì)字符串進(jìn)行分隔,通過(guò)write方法把單詞存入word中* write方法存入(單詞,1)這樣的二元組到context中*/ publicvoid map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr =new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}}publicstaticclass IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {private IntWritable result =new IntWritable();/*** Reducer類中的reduce方法:* void reduce(Text key, Iterable<IntWritable> values, Context context)* 中k/v來(lái)自于map函數(shù)中的context,可能經(jīng)過(guò)了進(jìn)一步處理(combiner),同樣通過(guò)context輸出 */publicvoid reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum =0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}publicstaticvoid main(String[] args) throws Exception {/*** Configuration:map/reduce的j配置類,向hadoop框架描述map-reduce執(zhí)行的工作*/Configuration conf =new Configuration();String[] otherArgs =new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length !=2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}Job job =new Job(conf, "word count"); //設(shè)置一個(gè)用戶定義的job名稱job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class); //為job設(shè)置Mapper類job.setCombinerClass(IntSumReducer.class); //為job設(shè)置Combiner類job.setReducerClass(IntSumReducer.class); //為job設(shè)置Reducer類job.setOutputKeyClass(Text.class); //為job的輸出數(shù)據(jù)設(shè)置Key類job.setOutputValueClass(IntWritable.class); //為job輸出設(shè)置value類FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //為job設(shè)置輸入路徑FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//為job設(shè)置輸出路徑System.exit(job.waitForCompletion(true) ?0 : 1); //運(yùn)行job } }
3、WordCount逐行解析
- 對(duì)于map函數(shù)的方法。
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {…} 這里有三個(gè)參數(shù),前面兩個(gè)Object key, Text value就是輸入的key和value,第三個(gè)參數(shù)Context context這是可以記錄輸入的key和value,例如:context.write(word, one);此外context還會(huì)記錄map運(yùn)算的狀態(tài)。
- 對(duì)于reduce函數(shù)的方法。
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {…} reduce函數(shù)的輸入也是一個(gè)key/value的形式,不過(guò)它的value是一個(gè)迭代器的形式Iterable<IntWritable> values,也就是說(shuō)reduce的輸入是一個(gè)key對(duì)應(yīng)一組的值的value,reduce也有context和map的context作用一致。
至于計(jì)算的邏輯則需要程序員編碼實(shí)現(xiàn)。
- 對(duì)于main函數(shù)的調(diào)用。
首先是:
Configuration conf = new Configuration();
運(yùn)行MapReduce程序前都要初始化Configuration,該類主要是讀取MapReduce系統(tǒng)配置信息,這些信息包括hdfs還有MapReduce,也就是安裝hadoop時(shí)候的配置文件例如:core-site.xml、hdfs-site.xml和mapred-site.xml等等文件里的信息,有些童鞋不理解為啥要這么做,這個(gè)是沒(méi)有深入思考MapReduce計(jì)算框架造成,我們程序員開(kāi)發(fā)MapReduce時(shí)候只是在填空,在map函數(shù)和reduce函數(shù)里編寫實(shí)際進(jìn)行的業(yè)務(wù)邏輯,其它的工作都是交給MapReduce框架自己操作的,但是至少我們要告訴它怎么操作啊,比如hdfs在哪里,MapReduce的jobstracker在哪里,而這些信息就在conf包下的配置文件里。
接下來(lái)的代碼是:
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);} If的語(yǔ)句好理解,就是運(yùn)行WordCount程序時(shí)候一定是兩個(gè)參數(shù),如果不是就會(huì)報(bào)錯(cuò)退出。至于第一句里的GenericOptionsParser類,它是用來(lái)解釋常用hadoop命令,并根據(jù)需要為Configuration對(duì)象設(shè)置相應(yīng)的值,其實(shí)平時(shí)開(kāi)發(fā)里我們不太常用它,而是讓類實(shí)現(xiàn)Tool接口,然后再main函數(shù)里使用ToolRunner運(yùn)行程序,而ToolRunner內(nèi)部會(huì)調(diào)用GenericOptionsParser。
接下來(lái)的代碼是:
Job job = new Job(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);
第一行就是在構(gòu)建一個(gè)job,在mapreduce框架里一個(gè)mapreduce任務(wù)也叫mapreduce作業(yè)也叫做一個(gè)mapreduce的job,而具體的map和reduce運(yùn)算就是task了,這里我們構(gòu)建一個(gè)job,構(gòu)建時(shí)候有兩個(gè)參數(shù),一個(gè)是conf這個(gè)就不累述了,一個(gè)是這個(gè)job的名稱。
第二行就是裝載程序員編寫好的計(jì)算程序,例如我們的程序類名就是WordCount了。這里我要做下糾正,雖然我們編寫mapreduce程序只需要實(shí)現(xiàn)map函數(shù)和reduce函數(shù),但是實(shí)際開(kāi)發(fā)我們要實(shí)現(xiàn)三個(gè)類,第三個(gè)類是為了配置mapreduce如何運(yùn)行map和reduce函數(shù),準(zhǔn)確的說(shuō)就是構(gòu)建一個(gè)mapreduce能執(zhí)行的job了,例如WordCount類。
第三行和第五行就是裝載map函數(shù)和reduce函數(shù)實(shí)現(xiàn)類了,這里多了個(gè)第四行,這個(gè)是裝載Combiner類,這個(gè)類和mapreduce運(yùn)行機(jī)制有關(guān),其實(shí)本例去掉第四行也沒(méi)有關(guān)系,但是使用了第四行理論上運(yùn)行效率會(huì)更好。
接下來(lái)的代碼:
job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);
這個(gè)是定義輸出的key/value的類型,也就是最終存儲(chǔ)在hdfs上結(jié)果文件的key/value的類型。
最后的代碼是:
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);
第一行就是構(gòu)建輸入的數(shù)據(jù)文件,第二行是構(gòu)建輸出的數(shù)據(jù)文件,最后一行如果job運(yùn)行成功了,我們的程序就會(huì)正常退出。
轉(zhuǎn)載于:https://www.cnblogs.com/hehaiyang/p/4484442.html
總結(jié)
以上是生活随笔為你收集整理的初学Hadoop之图解MapReduce与WordCount示例分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 焊工证多少钱啊?
- 下一篇: Singleton 单例模板