
Hadoop: A Few Basic MapReduce2 Examples

Published: 2025/3/15

1) WordCount

There is not much to add about this one, since it is covered everywhere. A couple of articles online analyze WordCount in detail:

http://www.sxt.cn/u/235/blog/5809

http://www.cnblogs.com/zhanghuijunjava/archive/2013/04/27/3036549.html

Both are well written, and their diagrams are especially clear.

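2) Deduplication (Distinct)

This is similar to select distinct(x) from table in a database, and it is even simpler than WordCount. Suppose we want to deduplicate the contents of the following file, one number per line (note: the same file is also the input for the next few examples):

2
8
8
3
2
3
5
3
0
2
7

There is almost nothing to do: in the map phase, emit the value of each line as the key, then collect the keys again in the reduce phase.

Note: the code uses a self-written class, HDFSUtil, which can be found in the article "hadoop: hdfs API示例" (hadoop: HDFS API examples).

How it works: after the map phase and before reduce starts, the framework shuffles the map output, sorting it and grouping identical keys together. reduce is therefore called once per distinct key, and the duplicates disappear naturally. The combiner registered in main only performs the same merge locally on each map task's output, which reduces the amount of data transferred.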

```java
package yjmyzz.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import yjmyzz.util.HDFSUtil;

import java.io.IOException;

public class RemoveDup {

    // Emits every input line as a key; the value carries no information.
    public static class RemoveDupMapper
            extends Mapper<Object, Text, Text, NullWritable> {

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
            //System.out.println("map: key=" + key + ",value=" + value);
        }
    }

    // Called once per distinct key, so writing the key once drops the duplicates.
    public static class RemoveDupReducer
            extends Reducer<Text, NullWritable, Text, NullWritable> {

        @Override
        public void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
            //System.out.println("reduce: key=" + key);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: RemoveDup <in> [<in>...] <out>");
            System.exit(2);
        }

        // Delete the output directory (optional; avoids the "output directory
        // already exists" error on repeated runs)
        HDFSUtil.deleteFile(conf, otherArgs[otherArgs.length - 1]);

        Job job = Job.getInstance(conf, "RemoveDup");
        job.setJarByClass(RemoveDup.class);
        job.setMapperClass(RemoveDupMapper.class);
        job.setCombinerClass(RemoveDupReducer.class);
        job.setReducerClass(RemoveDupReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

Output:

0
2
3
5
7
8
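
To see why grouping by key is enough to remove duplicates, here is a minimal plain-Java sketch that imitates the map, shuffle and reduce steps in memory. It does not use the Hadoop API: the class name DistinctSketch and its method are illustrative assumptions, and a TreeMap stands in for the sorted shuffle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class DistinctSketch {

    // Imitates map -> shuffle -> reduce for the Distinct job, entirely in memory.
    static List<String> distinct(List<String> lines) {
        // "map" + "shuffle": every line becomes a key; equal keys land in one group.
        Map<String, Integer> groups = new TreeMap<>();
        for (String line : lines) {
            groups.merge(line, 1, Integer::sum);
        }
        // "reduce": invoked once per group; it emits the key and ignores the values.
        return new ArrayList<>(groups.keySet());
    }

    public static void main(String[] args) {
        List<String> input =
                Arrays.asList("2", "8", "8", "3", "2", "3", "5", "3", "0", "2", "7");
        System.out.println(distinct(input)); // prints [0, 2, 3, 5, 7, 8]
    }
}
```

The keys are compared as strings here, just as Text keys are compared byte-wise in the job, so multi-digit numbers would sort lexicographically rather than numerically.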


3) Record Count (Count)

This differs slightly from WordCount and gives the effect of Select Count(*) from tables. The code is also very simple: take WordCount and tweak it a little.

```java
package yjmyzz.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import yjmyzz.util.HDFSUtil;

import java.io.IOException;

public class RowCount {

    // Emits ("count", 1) for every input line, whatever the line contains.
    public static class RowCountMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private final static Text countKey = new Text("count");

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(countKey, one);
        }
    }

    // Sums the 1s (or the combiner's partial sums) for the single key "count".
    public static class RowCountReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: RowCount <in> [<in>...] <out>");
            System.exit(2);
        }
        // Delete the output directory (optional)
        HDFSUtil.deleteFile(conf, otherArgs[otherArgs.length - 1]);

        Job job = Job.getInstance(conf, "RowCount");
        job.setJarByClass(RowCount.class);
        job.setMapperClass(RowCountMapper.class);
        job.setCombinerClass(RowCountReducer.class);
        job.setReducerClass(RowCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

Output: count 11

Note: if you only want a single number in the output, without the "count" key, the job can be improved as follows:

```java
package yjmyzz.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import yjmyzz.util.HDFSUtil;

import java.io.IOException;

public class RowCount2 {

    // Counts lines locally and emits one partial count per map task, as the key.
    public static class RowCount2Mapper
            extends Mapper<LongWritable, Text, LongWritable, NullWritable> {

        public long count = 0;

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            count += 1;
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(count), NullWritable.get());
        }
    }

    // Adds up the partial counts and emits the total once, in cleanup.
    public static class RowCount2Reducer
            extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {

        public long count = 0;

        @Override
        public void reduce(LongWritable key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Map tasks that report the same partial count are grouped under one
            // key, so add the key once for every value in the group.
            for (NullWritable ignored : values) {
                count += key.get();
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(count), NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: RowCount2 <in> [<in>...] <out>");
            System.exit(2);
        }

        // Delete the output directory (optional; avoids the "output directory
        // already exists" error on repeated runs)
        HDFSUtil.deleteFile(conf, otherArgs[otherArgs.length - 1]);

        Job job = Job.getInstance(conf, "RowCount2");
        job.setJarByClass(RowCount2.class);
        job.setMapperClass(RowCount2Mapper.class);
        job.setCombinerClass(RowCount2Reducer.class);
        job.setReducerClass(RowCount2Reducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

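Now the output is just the single number 11.

Note: here context.write(xxx) has to go in the cleanup method. Both the Mapper and Reducer base classes provide this method, and the framework calls it once per task, after all of that task's map or reduce calls have finished. You can try moving context.write(xxx) into the map and reduce methods: the result then contains multiple rows instead of the single expected number. The result is also a single number only when the job runs with one reduce task, which is the default.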
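
The effect of cleanup can be pictured with a small plain-Java model of a task's life cycle, in which map runs once per record and cleanup runs once at the end of the task. This is a simplified sketch and not Hadoop code: LifecycleSketch, runTask and the emitInMap flag are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LifecycleSketch {

    // Simplified stand-in for one map task: map() per record, then cleanup() once.
    static List<Long> runTask(List<String> records, boolean emitInMap) {
        List<Long> output = new ArrayList<>();
        long count = 0;
        for (String record : records) {   // the framework's loop over input records
            count += 1;                   // body of map()
            if (emitInMap) {
                output.add(count);        // writing here yields one row per record
            }
        }
        if (!emitInMap) {
            output.add(count);            // body of cleanup(): one row per task
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("2", "8", "8");
        System.out.println(runTask(records, true));   // prints [1, 2, 3]
        System.out.println(runTask(records, false));  // prints [3]
    }
}
```

Writing from map produces a running value for every record, while writing from cleanup produces exactly one row per task, which is the behaviour the note describes.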


4) Maximum (Max)

```java
package yjmyzz.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import yjmyzz.util.HDFSUtil;

import java.io.IOException;

public class Max {

    // Tracks the largest value seen by this map task and emits it once, as the key.
    public static class MaxMapper
            extends Mapper<LongWritable, Text, LongWritable, NullWritable> {

        public long max = Long.MIN_VALUE;

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            max = Math.max(Long.parseLong(value.toString()), max);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(max), NullWritable.get());
        }
    }

    // Takes the maximum of the per-task maxima and emits it once, in cleanup.
    public static class MaxReducer
            extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {

        public long max = Long.MIN_VALUE;

        @Override
        public void reduce(LongWritable key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            max = Math.max(max, key.get());
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(max), NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: Max <in> [<in>...] <out>");
            System.exit(2);
        }

        // Delete the output directory (optional; avoids the "output directory
        // already exists" error on repeated runs)
        HDFSUtil.deleteFile(conf, otherArgs[otherArgs.length - 1]);

        Job job = Job.getInstance(conf, "Max");
        job.setJarByClass(Max.class);
        job.setMapperClass(MaxMapper.class);
        job.setCombinerClass(MaxReducer.class);
        job.setReducerClass(MaxReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

Output: 8

If you understood the RowCount2 code above, this one needs no further explanation.


5) Sum (Sum)

```java
package yjmyzz.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import yjmyzz.util.HDFSUtil;

import java.io.IOException;

public class Sum {

    // Sums the values seen by this map task and emits the partial sum once, as the key.
    public static class SumMapper
            extends Mapper<LongWritable, Text, LongWritable, NullWritable> {

        public long sum = 0;

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            sum += Long.parseLong(value.toString());
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(sum), NullWritable.get());
        }
    }

    // Adds up the partial sums and emits the total once, in cleanup.
    public static class SumReducer
            extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {

        public long sum = 0;

        @Override
        public void reduce(LongWritable key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Map tasks that report the same partial sum are grouped under one
            // key, so add the key once for every value in the group.
            for (NullWritable ignored : values) {
                sum += key.get();
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(sum), NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: Sum <in> [<in>...] <out>");
            System.exit(2);
        }

        // Delete the output directory (optional; avoids the "output directory
        // already exists" error on repeated runs)
        HDFSUtil.deleteFile(conf, otherArgs[otherArgs.length - 1]);

        Job job = Job.getInstance(conf, "Sum");
        job.setJarByClass(Sum.class);
        job.setMapperClass(SumMapper.class);
        job.setCombinerClass(SumReducer.class);
        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

Output: 43

Sum works on the same principle as Max, so it needs no further explanation; it again relies on the cleanup method.
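
One detail is worth keeping in mind whenever a partial result travels as the key: if two map tasks happen to produce the same partial sum, the shuffle hands them to reduce as a single key with two values. The plain-Java sketch below has no Hadoop dependency, and PartialSumSketch, its methods and the three partial sums are hypothetical. It contrasts adding the key once per group with adding it once per value.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PartialSumSketch {

    // Groups partial sums the way the shuffle would: key -> number of occurrences.
    static Map<Long, Integer> shuffle(List<Long> partialSums) {
        Map<Long, Integer> groups = new TreeMap<>();
        for (Long p : partialSums) {
            groups.merge(p, 1, Integer::sum);
        }
        return groups;
    }

    // Adds each key once per group; undercounts when partial sums collide.
    static long sumOncePerKey(List<Long> partialSums) {
        long sum = 0;
        for (Long key : shuffle(partialSums).keySet()) {
            sum += key;
        }
        return sum;
    }

    // Adds each key once per value in its group; matches the true total.
    static long sumOncePerValue(List<Long> partialSums) {
        long sum = 0;
        for (Map.Entry<Long, Integer> group : shuffle(partialSums).entrySet()) {
            sum += group.getKey() * group.getValue();
        }
        return sum;
    }

    public static void main(String[] args) {
        // Three hypothetical map tasks; two of them report the same partial sum.
        List<Long> partialSums = Arrays.asList(21L, 21L, 1L);
        System.out.println(sumOncePerKey(partialSums));   // prints 22
        System.out.println(sumOncePerValue(partialSums)); // prints 43
    }
}
```

Max is not affected by such collisions, because taking the maximum of a repeated value gives the same result, but counts, sums and averages are.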


6) Average (Avg)

```java
package yjmyzz.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import yjmyzz.util.HDFSUtil;

import java.io.IOException;

public class Average {

    // Emits one {sum, count} pair per map task: sum as the key, count as the value.
    public static class AvgMapper
            extends Mapper<LongWritable, Text, LongWritable, LongWritable> {

        public long sum = 0;
        public long count = 0;

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            sum += Long.parseLong(value.toString());
            count += 1;
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(sum), new LongWritable(count));
        }
    }

    // Merges {sum, count} pairs locally and re-emits a single {sum, count} pair.
    public static class AvgCombiner
            extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {

        public long sum = 0;
        public long count = 0;

        @Override
        public void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // Equal partial sums share one key, so add the key once per value.
            for (LongWritable v : values) {
                sum += key.get();
                count += v.get();
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(sum), new LongWritable(count));
        }
    }

    // Merges all {sum, count} pairs and emits sum / count once, in cleanup.
    public static class AvgReducer
            extends Reducer<LongWritable, LongWritable, DoubleWritable, NullWritable> {

        public long sum = 0;
        public long count = 0;

        @Override
        public void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // Equal partial sums share one key, so add the key once per value.
            for (LongWritable v : values) {
                sum += key.get();
                count += v.get();
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new DoubleWritable((double) sum / count), NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: Avg <in> [<in>...] <out>");
            System.exit(2);
        }

        // Delete the output directory (optional; avoids the "output directory
        // already exists" error on repeated runs)
        HDFSUtil.deleteFile(conf, otherArgs[otherArgs.length - 1]);

        Job job = Job.getInstance(conf, "Avg");
        job.setJarByClass(Average.class);
        job.setMapperClass(AvgMapper.class);
        job.setCombinerClass(AvgCombiner.class);
        job.setReducerClass(AvgReducer.class);

        // Note: the Mapper and the Reducer have different output key/value types,
        // so the map output types must be set separately.
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setOutputKeyClass(DoubleWritable.class);
        job.setOutputValueClass(NullWritable.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

Output: 3.909090909090909

This one is slightly more complex. As everyone knows, average = Sum / Count, so it is really just a combination of the earlier Count and Sum examples. The idea is to use sum as the key and count as the value in the intermediate output, which gives {sum, count} pairs, and then to compute sum/count in the final cleanup to get avg. One point needs attention: because the Mapper and the Reducer have different output {key, value} types, main sets the map output types (setMapOutputKeyClass and setMapOutputValueClass) separately from the job output types (setOutputKeyClass and setOutputValueClass). Without the two setMapOutput* calls, the types given to setOutputKeyClass and setOutputValueClass would by default apply to the output of the Mapper, the Combiner and the Reducer alike.
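
Carrying {sum, count} pairs through the job, rather than per-split averages, matters because the averages of unequal-sized splits cannot simply be averaged again. Here is a minimal plain-Java sketch of the arithmetic, assuming the 11 input numbers are divided into two hypothetical splits. The split boundary and the class name AverageSketch are illustrative assumptions, and no Hadoop API is involved.

```java
public class AverageSketch {

    // Partial result of one split: {sum, count}.
    static long[] partial(long[] split) {
        long sum = 0;
        for (long v : split) {
            sum += v;
        }
        return new long[] {sum, split.length};
    }

    // Correct merge: add up sums and counts, then divide once at the end.
    static double mergedAverage(long[][] splits) {
        long sum = 0;
        long count = 0;
        for (long[] split : splits) {
            long[] p = partial(split);
            sum += p[0];
            count += p[1];
        }
        return (double) sum / count;
    }

    // Naive merge: average the per-split averages; wrong for unequal split sizes.
    static double averageOfAverages(long[][] splits) {
        double total = 0;
        for (long[] split : splits) {
            long[] p = partial(split);
            total += (double) p[0] / p[1];
        }
        return total / splits.length;
    }

    public static void main(String[] args) {
        long[][] splits = {
            {2, 8, 8, 3, 2, 3, 5},   // hypothetical first split: sum 31, count 7
            {3, 0, 2, 7}             // hypothetical second split: sum 12, count 4
        };
        System.out.println(mergedAverage(splits));     // prints 3.909090909090909
        System.out.println(averageOfAverages(splits)); // a different, incorrect value
    }
}
```

Only the merged {sum, count} form reproduces the job's result of 43 / 11.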


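7) Improved WordCount (sorted by descending word frequency)

The WordCount example from the official site only counts how often each word occurs and does not sort the words by descending frequency. The code below adds that.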

```java
package yjmyzz.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import yjmyzz.util.HDFSUtil;

import java.io.IOException;
import java.util.Comparator;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class WordCount2 {

    // Splits each line into tokens and emits (word, 1).
    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    // Plain summing combiner. IntSumReducer must not double as the combiner:
    // it emits comma-joined word lists as keys, which the reducer would then
    // treat as words when more than one map task is involved.
    public static class IntSumCombiner
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private final IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        // A TreeMap holds the results. It sorts keys in ascending order by default,
        // so a Comparator is supplied to get descending order.
        private TreeMap<Integer, String> treeMap =
                new TreeMap<Integer, String>(new Comparator<Integer>() {
                    @Override
                    public int compare(Integer x, Integer y) {
                        return y.compareTo(x);
                    }
                });

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Store the reduced result in the TreeMap instead of writing it to the context
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            if (treeMap.containsKey(sum)) {
                String value = treeMap.get(sum) + "," + key.toString();
                treeMap.put(sum, value);
            } else {
                treeMap.put(sum, key.toString());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Write the TreeMap entries to the context as (words, count), highest count first
            for (Map.Entry<Integer, String> entry : treeMap.entrySet()) {
                context.write(new Text(entry.getValue()), new IntWritable(entry.getKey()));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount2 <in> [<in>...] <out>");
            System.exit(2);
        }
        // Delete the output directory
        HDFSUtil.deleteFile(conf, otherArgs[otherArgs.length - 1]);
        Job job = Job.getInstance(conf, "word count2");
        job.setJarByClass(WordCount2.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumCombiner.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

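How it works: cleanup is used once again, and to get the ordering the reducer keeps its results in a TreeMap, a data structure that sorts its keys.

To make the result easier to see, the first verse of the theme song from the movie Big Hero 6 is used as input: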

They say we are what we are But we do not have to be I am bad behavior but I do it in the best way I will be the watcher Of the eternal flame I will be the guard dog of all your fever dreams

After processing with the original WordCount, the result is:

But 1
I 4
Of 1
They 1
all 1
am 1
are 2
bad 1
be 3
behavior 1
best 1
but 1
do 2
dog 1
dreams 1
eternal 1
fever 1
flame 1
guard 1
have 1
in 1
it 1
not 1
of 1
say 1
the 4
to 1
watcher 1
way 1
we 3
what 1
will 2
your 1

The improved WordCount2 produces:

I,the 4
be,we 3
are,do,will 2
But,Of,They,all,am,bad,behavior,best,but,dog,dreams,eternal,fever,flame,guard,have,in,it,not,of,say,to,watcher,way,what,your 1
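
The TreeMap step can be tried in isolation. The sketch below is plain Java with no Hadoop dependency: it takes word counts that have already been summed and groups the words by count in descending order, much as the reducer's treeMap does. The class name FrequencySortSketch and the five sample counts are illustrative assumptions.

```java
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class FrequencySortSketch {

    // Groups words by count, highest count first; words with equal counts are comma-joined.
    static Map<Integer, String> byFrequency(Map<String, Integer> counts) {
        TreeMap<Integer, String> sorted = new TreeMap<>(Comparator.reverseOrder());
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            // merge() appends to the existing word list, or stores the first word for a count.
            sorted.merge(e.getValue(), e.getKey(), (a, b) -> a + "," + b);
        }
        return sorted;
    }

    public static void main(String[] args) {
        // Sample counts in key order, as a reducer would receive them.
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("I", 4);
        counts.put("are", 2);
        counts.put("be", 3);
        counts.put("the", 4);
        counts.put("we", 3);

        System.out.println(byFrequency(counts)); // prints {4=I,the, 3=be,we, 2=are}
    }
}
```

Because everything is held in one in-memory map, the approach suits vocabularies that fit in a single reducer's memory and jobs that run with one reduce task.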


Reposted from: https://www.cnblogs.com/yjmyzz/p/hadoop-mapreduce-2-sample.html
