MapReduce Source Code Analysis (Part 1): The Preparation Phase
MapReduce Source Code Analysis

This post walks through the underlying source code starting from the WordCount program below, referred to as the WC class from here on.
```java
package com.henu;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author George
 * @description
 */
public class WC {

    public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        Text k1 = new Text();
        IntWritable v1 = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] strings = line.split("\\s+");
            for (String s : strings) {
                k1.set(s);
                context.write(k1, v1);
            }
        }
    }

    public static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        int count;
        IntWritable v2 = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            v2.set(count);
            context.write(key, v2);
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(WC.class);
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // set the partitioner for the map stage
        job.setPartitionerClass(MyPartitoner.class);
        job.setNumReduceTasks(1);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }

    private static class MyPartitoner extends Partitioner<Text, IntWritable> {
        @Override
        public int getPartition(Text text, IntWritable intWritable, int i) {
            String kStr = text.toString();
            return kStr.equalsIgnoreCase("hello") ? 0 : 1;
        }
    }
}
```

In the main method of the WC class, step into job's waitForCompletion() method.
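As an aside, the MyPartitoner routing rule from the listing can be checked in isolation. The sketch below strips the Hadoop types (Text, IntWritable) and keeps only the decision logic, so it runs without a Hadoop dependency; the class name PartitionSketch is made up for this example.

```java
// Standalone sketch of MyPartitoner's routing rule from the WC listing.
// Hadoop's Text key is replaced by a plain String; the comparison is verbatim.
public class PartitionSketch {
    // Mirrors MyPartitoner.getPartition: "hello" (case-insensitive) -> 0, else -> 1
    static int getPartition(String key) {
        return key.equalsIgnoreCase("hello") ? 0 : 1;
    }

    public static void main(String[] args) {
        System.out.println(getPartition("hello")); // 0
        System.out.println(getPartition("Hello")); // 0
        System.out.println(getPartition("world")); // 1
    }
}
```

Note that because the driver calls job.setNumReduceTasks(1), this partitioner is effectively bypassed: in the Hadoop sources I have seen, with a single reduce task MapTask installs a trivial partitioner that always returns partition 0. Setting the reduce count to 2 would make MyPartitoner actually route keys.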
waitForCompletion() calls submit().
Step into submit().
submit() calls submitJobInternal(), which submits the job to the cluster.
Step into the submitJobInternal() method.
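submitJobInternal() is where the client does its real work. The sketch below is not Hadoop's code; it is a simplified, runnable outline of the order of operations inside JobSubmitter.submitJobInternal, with every step stubbed out as a label (step names follow the real helper methods, but the class SubmitSketch is invented for illustration):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified outline of the client-side work in JobSubmitter.submitJobInternal.
// Each entry stands for a real step; this sketch only shows their order.
public class SubmitSketch {
    static List<String> submitJobInternal() {
        List<String> steps = new ArrayList<>();
        steps.add("checkSpecs");            // validate specs, e.g. output path must not exist
        steps.add("getStagingDir");         // staging area on HDFS for job resources
        steps.add("copyAndConfigureFiles"); // upload the job jar / files / archives
        steps.add("writeSplits");           // compute input splits -> number of map tasks
        steps.add("writeConf");             // serialize the Configuration as job.xml
        steps.add("submitJob");             // hand the job to the cluster for execution
        return steps;
    }

    public static void main(String[] args) {
        for (String s : submitJobInternal()) System.out.println(s);
    }
}
```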
Step into writeSplits(); writeSplits() calls writeNewSplits().
Step into the writeNewSplits() method.
Then follow it into the FileInputFormat class.
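Inside FileInputFormat.getSplits, the split size is computed as max(minSize, min(maxSize, blockSize)), and a split is cut only while the remaining bytes exceed the split size by more than 10% (SPLIT_SLOP = 1.1). A self-contained sketch of that sizing rule (exact code varies by Hadoop version; the class SplitSketch is made up for this example):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the split-sizing rule in FileInputFormat.getSplits:
// splitSize = max(minSize, min(maxSize, blockSize)); a full split is cut only
// while the remainder is more than SPLIT_SLOP (1.1x) of the split size.
public class SplitSketch {
    static final double SPLIT_SLOP = 1.1;

    static List<Long> getSplitLengths(long fileLen, long blockSize,
                                      long minSize, long maxSize) {
        long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
        List<Long> splits = new ArrayList<>();
        long remaining = fileLen;
        while (((double) remaining) / splitSize > SPLIT_SLOP) {
            splits.add(splitSize);
            remaining -= splitSize;
        }
        if (remaining != 0) splits.add(remaining); // tail split, may exceed splitSize
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // A 260 MB file with a 128 MB block yields two splits (128 MB + 132 MB),
        // because the 132 MB tail is within 1.1x of the split size.
        List<Long> s = getSplitLengths(260 * mb, 128 * mb, 1, Long.MAX_VALUE);
        System.out.println(s.size()); // 2
    }
}
```

Each split produced here becomes one map task, which is why tuning minSize/maxSize changes the number of mappers.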
Everything up to this point is preparation; at the end, the job is actually submitted.
In summary, the client does the following:
- completes the configuration
- checks the output path
- computes the input splits, which determine the number of map tasks
- uploads the job resources to HDFS
- submits the job
Then the ApplicationMaster requests resources from the ResourceManager based on the split list; the ResourceManager allocates containers, and the ApplicationMaster launches the containers and runs the MapReduce tasks inside them.
Diagram summary: (the original figure is not preserved)