Hadoop MapReduce
先看一段代碼:
package com.abc;import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser;public class TestWorldCount {static class Map extends Mapper<LongWritable, Text, Text, LongWritable> {private final static LongWritable one = new LongWritable(1);private Text word = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String line = value.toString();StringTokenizer tokenizer = new StringTokenizer(line);while (tokenizer.hasMoreTokens()) {word.set(tokenizer.nextToken());context.write(word, one);}}}static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {@Overrideprotected void reduce(Text key, Iterable<LongWritable> values,Context context) throws IOException, InterruptedException {Iterator<LongWritable> iter= values.iterator();Long sum = 0L;LongWritable res =new LongWritable();while(iter.hasNext()){sum +=iter.next().get();}res.set(sum);context.write(key, res);}}public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {Configuration conf = new Configuration();Job job = new Job(conf, "word count");job.setJarByClass(TestWorldCount.class);job.setMapperClass(Map.class);job.setCombinerClass(Reduce.class);job.setReducerClass(Reduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);String uri1 = "input0001";String uri3 = "output0001/wc";FileInputFormat.addInputPath(job, new Path(uri1));FileOutputFormat.setOutputPath(job, new Path(uri3));System.exit(job.waitForCompletion(true) ? 0 : 1);}}
這個是最最簡單的WorldCount的例子,在設置完一系列參數后,通過Job類來等待程序運行結束。下面是運行的基本流程:
1.Job類初始化JobClient實例,JobClient中生成JobTracker的RPC實例,這樣可以保持與JobTracker的通訊,JobTracker的地址和端口等都是外部配置的,通過Configuration對象讀取并且傳入。
2.JobClient提交作業。
3.JobClient生成作業目錄。
4.從本地拷貝MapReduce的作業jar文件(一般是自己寫的程序代碼jar)。
5.如果DistributedCache中有需要的數據,從DistributedCache中拷貝這部分數據。
6.根據InputFormat實例,實現輸入數據的split,在作業目錄上生成job.split和job.splitmetainfo文件。
7.將配置文件寫入到作業目錄的job.xml文件中。
8.JobClient和JobTracker通訊,提交作業。
9.JobTracker將job加入到job隊列中。
10.JobTracker的TaskScheduler對job隊列進行調度。
11.TaskTracker通過心跳和JobTracker保持聯系,JobTracker收到后根據心跳帶來的數據,判斷是否可以分配給TaskTracker Task,TaskScheduler會對Task進行分配。
12.TaskTracker啟動TaskRunner實例,在TaskRunner中啟動單獨的JVM進行Mapper運行。
13.Map端會從HDFS中讀取輸入數據,執行之后Map輸出數據先是在內存當中,當達到閥值后,split到硬盤上面,在此過程中如果有combiner的話要進行combiner,當然sort是肯定要進行的。
14.Map結束了,Reduce開始運行,從Map端拷貝數據,稱為shuffle階段,之后執行reduce輸出結果數據,之后進行commit的操作。
15.TaskTracker在收到commit請求后和JobTracker進行通訊,JobTracker做最后收尾工作。
16.JobTracker返回結果給JobClient,運行結束。
附上一張基本流程圖:
?
Map端機制
對于map端的輸入,需要做如下的事情:
1.反射構造InputFormat.
2.反射構造InputSplit.
3.創建RecordReader.
4.反射創建MapperRunner(新api形式下是反射創建org.apache.hadoop.mapreduce.Mapper.Context).
?
?
對Map端輸出,需要做如下的事情:
1.如果有Partitioner的話,反射構造Partitioner。
2.將key/value/Partitioner數據寫入到內存當中。
3.當內存當中的數據達到一定閥值了,需要spill到硬盤上面,在spill前,需要進行排序,如果有combiner的話需要進行combiner。
4.sort的規則是先進行Partitioner的排序,然后再進行key的字典排序,默認的是快速排序。
5.當生成多個spill文件時,需要進行歸并,最終歸并成一個大文件
關于排序:
1.在內存中進行排序,整個數據的內存不會進行移動,只是再加上一層索引的數據,排序只要調整索引數據就可以了
2.多個spill文件歸并到一個大文件時,是一個歸并排序的過程,每一個spill文件都是按分區和key排序好的,所以歸并完的文件也是按分區和key排序好的。
?
在進行歸并的時候,也不是一次性的把所有的spill文件歸并成一個大文件,而是部分spill文件歸并成中間文件,然后中間文件和剩下的spill文件再進行一次歸并,依次類推,這個的考慮還是因為一次歸并文件太多的話IO消耗太大了,如下圖:
?
Reduce端機制
1。ReduceTask有一個線程和TaskTracker聯系,之后TaskTracker和JobTracker聯系,獲取MapTask完成事件
2. ReduceTask會創建和MapTask數目相等的拷貝線程,用于拷貝MapTask的輸出數據,MapTask的數據一般都是非本地的
3. 當有新的MapTask完成事件時,拷貝線程就從指定的機器上面拷貝數據,是通過http的形式進行拷貝
4. 當數據拷貝的時候,分兩種情況,當數據量小的時候就會寫入內存當中,當數據量大的時候就會寫入硬盤當中,這些工作分別由兩個線程完成
5. 因為所有的數據都來自不同的機器,所以有多個文件,這些文件需要歸并成一個文件,在拷貝文件的時候就會進行歸并動作
6. 拷貝和歸并過程統稱為shuffle過程
Reduce端輸出需要做如下的事情:
1.構造RecordWriter,這個是根據客戶端設置的OutputFormat中getRecordWriter()方法得到?
2.通過OutputFormat和RecordWriter將結果輸出到臨時文件中
3.Rudece進行commit過程,和TaskTracker進行通信,TaskTracker和JobTracker進行通信,然后JobTracker返回commit的指令,Reduce進行
commit,將臨時結果文件重命名成最終的文件
4.commit成功后,kill掉其他的TaskAttempt
總結
以上是生活随笔為你收集整理的Hadoop MapReduce的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: HDFS的运行原理
- 下一篇: c++ 大数类 大数模板