日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop MapReduce

發布時間:2025/6/15 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop MapReduce 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

  先看一段代碼:

  

package com.abc;import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser;public class TestWorldCount {static class Map extends Mapper<LongWritable, Text, Text, LongWritable> {private final static LongWritable one = new LongWritable(1);private Text word = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String line = value.toString();StringTokenizer tokenizer = new StringTokenizer(line);while (tokenizer.hasMoreTokens()) {word.set(tokenizer.nextToken());context.write(word, one);}}}static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {@Overrideprotected void reduce(Text key, Iterable<LongWritable> values,Context context) throws IOException, InterruptedException {Iterator<LongWritable> iter= values.iterator();Long sum = 0L;LongWritable res =new LongWritable();while(iter.hasNext()){sum +=iter.next().get();}res.set(sum);context.write(key, res);}}public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {Configuration conf = new Configuration();Job job = new Job(conf, "word count");job.setJarByClass(TestWorldCount.class);job.setMapperClass(Map.class);job.setCombinerClass(Reduce.class);job.setReducerClass(Reduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);String uri1 = "input0001";String uri3 = "output0001/wc";FileInputFormat.addInputPath(job, new Path(uri1));FileOutputFormat.setOutputPath(job, new Path(uri3));System.exit(job.waitForCompletion(true) ? 0 : 1);}}

這個是最最簡單的WorldCount的例子,在設置完一系列參數后,通過Job類來等待程序運行結束。下面是運行的基本流程:

1.Job類初始化JobClient實例,JobClient中生成JobTracker的RPC實例,這樣可以保持與JobTracker的通訊,JobTracker的地址和端口等都是外部配置的,通過Configuration對象讀取并且傳入。

2.JobClient提交作業。

3.JobClient生成作業目錄。

4.從本地拷貝MapReduce的作業jar文件(一般是自己寫的程序代碼jar)。

5.如果DistributedCache中有需要的數據,從DistributedCache中拷貝這部分數據。

6.根據InputFormat實例,實現輸入數據的split,在作業目錄上生成job.split和job.splitmetainfo文件。

7.將配置文件寫入到作業目錄的job.xml文件中。

8.JobClient和JobTracker通訊,提交作業。

9.JobTracker將job加入到job隊列中。

10.JobTracker的TaskScheduler對job隊列進行調度。

11.TaskTracker通過心跳和JobTracker保持聯系,JobTracker收到后根據心跳帶來的數據,判斷是否可以分配給TaskTracker Task,TaskScheduler會對Task進行分配。

12.TaskTracker啟動TaskRunner實例,在TaskRunner中啟動單獨的JVM進行Mapper運行。

13.Map端會從HDFS中讀取輸入數據,執行之后Map輸出數據先是在內存當中,當達到閥值后,split到硬盤上面,在此過程中如果有combiner的話要進行combiner,當然sort是肯定要進行的。

14.Map結束了,Reduce開始運行,從Map端拷貝數據,稱為shuffle階段,之后執行reduce輸出結果數據,之后進行commit的操作。

15.TaskTracker在收到commit請求后和JobTracker進行通訊,JobTracker做最后收尾工作。

16.JobTracker返回結果給JobClient,運行結束。

附上一張基本流程圖:

?

Map端機制

對于map端的輸入,需要做如下的事情:

1.反射構造InputFormat.

2.反射構造InputSplit.

3.創建RecordReader.

4.反射創建MapperRunner(新api形式下是反射創建org.apache.hadoop.mapreduce.Mapper.Context).

?

?

對Map端輸出,需要做如下的事情:
1.如果有Partitioner的話,反射構造Partitioner。
2.將key/value/Partitioner數據寫入到內存當中。
3.當內存當中的數據達到一定閥值了,需要spill到硬盤上面,在spill前,需要進行排序,如果有combiner的話需要進行combiner。
4.sort的規則是先進行Partitioner的排序,然后再進行key的字典排序,默認的是快速排序。
5.當生成多個spill文件時,需要進行歸并,最終歸并成一個大文件

關于排序:
1.在內存中進行排序,整個數據的內存不會進行移動,只是再加上一層索引的數據,排序只要調整索引數據就可以了
2.多個spill文件歸并到一個大文件時,是一個歸并排序的過程,每一個spill文件都是按分區和key排序好的,所以歸并完的文件也是按分區和key排序好的。

?

在進行歸并的時候,也不是一次性的把所有的spill文件歸并成一個大文件,而是部分spill文件歸并成中間文件,然后中間文件和剩下的spill文件再進行一次歸并,依次類推,這個的考慮還是因為一次歸并文件太多的話IO消耗太大了,如下圖:

?

Reduce端機制

1。ReduceTask有一個線程和TaskTracker聯系,之后TaskTracker和JobTracker聯系,獲取MapTask完成事件
2. ReduceTask會創建和MapTask數目相等的拷貝線程,用于拷貝MapTask的輸出數據,MapTask的數據一般都是非本地的
3. 當有新的MapTask完成事件時,拷貝線程就從指定的機器上面拷貝數據,是通過http的形式進行拷貝
4. 當數據拷貝的時候,分兩種情況,當數據量小的時候就會寫入內存當中,當數據量大的時候就會寫入硬盤當中,這些工作分別由兩個線程完成
5. 因為所有的數據都來自不同的機器,所以有多個文件,這些文件需要歸并成一個文件,在拷貝文件的時候就會進行歸并動作
6. 拷貝和歸并過程統稱為shuffle過程

Reduce端輸出需要做如下的事情:
1.構造RecordWriter,這個是根據客戶端設置的OutputFormat中getRecordWriter()方法得到?
2.通過OutputFormat和RecordWriter將結果輸出到臨時文件中
3.Rudece進行commit過程,和TaskTracker進行通信,TaskTracker和JobTracker進行通信,然后JobTracker返回commit的指令,Reduce進行
commit,將臨時結果文件重命名成最終的文件
4.commit成功后,kill掉其他的TaskAttempt


總結

以上是生活随笔為你收集整理的Hadoop MapReduce的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。