
Using multiple MapReduce jobs together (an example found online; original post: http://www.cnblogs.com/yjmyzz/p/4540469.html)

Published: 2024/9/27

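In complex MapReduce processing, a complicated workflow often has to be broken down into several simple Jobs, where the output of one Job serves as the input of the next, so the Jobs depend on one another. Taking the average calculation from the previous post as an example, the work can be split into three steps:

1. Compute the Sum

2. Compute the Count

3. Compute the average

Each step is treated as one Job. Job3 must wait for Job1 and Job2 to finish and takes their output as its input. The Hadoop code below shows how to chain these three Jobs together.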
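The dependency between the three steps can also be illustrated without Hadoop. The following is a minimal sketch in plain Java, not the article's method: the class name `ThreeStepAverage`, its helper methods, and the sample lines are all hypothetical. It uses `CompletableFuture.thenCombine` so that the average step only starts once both the sum step and the count step have finished. One difference is deliberate: the sketch runs the first two steps concurrently, whereas the article's driver runs Job1 and Job2 one after the other.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ThreeStepAverage {

    // Step 1 (stands in for the Sum job): total length of all lines.
    public static long sum(List<String> lines) {
        long total = 0;
        for (String line : lines) {
            total += line.length();
        }
        return total;
    }

    // Step 2 (stands in for the Count job): number of lines.
    public static long count(List<String> lines) {
        return lines.size();
    }

    // Step 3 (stands in for the Average job): needs both earlier results.
    public static double average(long sum, long count) {
        return (double) sum / count;
    }

    // Runs steps 1 and 2 asynchronously, then step 3 once both are done.
    public static double run(List<String> lines) {
        CompletableFuture<Long> sumStep =
                CompletableFuture.supplyAsync(() -> sum(lines));
        CompletableFuture<Long> countStep =
                CompletableFuture.supplyAsync(() -> count(lines));
        // thenCombine invokes average(...) only after both futures complete.
        return sumStep.thenCombine(countStep, ThreeStepAverage::average).join();
    }

    public static void main(String[] args) {
        // Hypothetical input: line lengths 2, 4 and 3.
        List<String> lines = List.of("ab", "abcd", "abc");
        System.out.println(run(lines)); // prints 3.0
    }
}
```

As in the MapReduce version, "sum" here means the total number of characters across all lines, so the result is the average line length.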

Code:

package cn.toto.bigdata.mr.wc;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Avg2 {

    private static final Text TEXT_SUM = new Text("SUM");
    private static final Text TEXT_COUNT = new Text("COUNT");
    private static final Text TEXT_AVG = new Text("AVG");

    // Compute Sum: each mapper adds up the length of every line it reads
    // and emits a single partial total when it finishes.
    public static class SumMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        private long sum = 0;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            sum += value.toString().length();
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(TEXT_SUM, new LongWritable(sum));
        }
    }

    // Also used as the combiner, so it must emit the same key/value types it consumes.
    public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable v : values) {
                sum += v.get();
            }
            context.write(TEXT_SUM, new LongWritable(sum));
        }
    }

    // Compute Count: each mapper counts the lines it reads.
    public static class CountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        private long count = 0;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            count += 1;
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(TEXT_COUNT, new LongWritable(count));
        }
    }

    public static class CountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long count = 0;
            for (LongWritable v : values) {
                count += v.get();
            }
            context.write(TEXT_COUNT, new LongWritable(count));
        }
    }

    // Compute Avg: reads the "SUM\t<n>" and "COUNT\t<n>" lines written by job1 and job2.
    public static class AvgMapper extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
        private long count = 0;
        private long sum = 0;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] v = value.toString().split("\t");
            if (v[0].equals("COUNT")) {
                count = Long.parseLong(v[1]);
            } else if (v[0].equals("SUM")) {
                sum = Long.parseLong(v[1]);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // A mapper that saw only one of the two files emits 0 for the other field.
            context.write(new LongWritable(sum), new LongWritable(count));
        }
    }

    // Accumulates across all keys and writes one result in cleanup;
    // this relies on the job running with a single reducer (the default).
    public static class AvgReducer extends Reducer<LongWritable, LongWritable, Text, DoubleWritable> {
        private long sum = 0;
        private long count = 0;

        @Override
        protected void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            sum += key.get();
            for (LongWritable v : values) {
                count += v.get();
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Cast instead of the deprecated new Double(sum).
            context.write(TEXT_AVG, new DoubleWritable((double) sum / count));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        String inputPath = "E:/wordcount/input/a.txt";
        // Output of the Sum job; the directory is named "max" in the original post.
        String sumOutputPath = "E:/wordcount/output/max/";
        String countOutputPath = "E:/wordcount/output/count/";
        String avgOutputPath = "E:/wordcount/output/avg/";

        Job job1 = Job.getInstance(conf, "Sum");
        job1.setJarByClass(Avg2.class);
        job1.setMapperClass(SumMapper.class);
        job1.setCombinerClass(SumReducer.class);
        job1.setReducerClass(SumReducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job1, new Path(inputPath));
        FileOutputFormat.setOutputPath(job1, new Path(sumOutputPath));

        Job job2 = Job.getInstance(conf, "Count");
        job2.setJarByClass(Avg2.class);
        job2.setMapperClass(CountMapper.class);
        job2.setCombinerClass(CountReducer.class);
        job2.setReducerClass(CountReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job2, new Path(inputPath));
        FileOutputFormat.setOutputPath(job2, new Path(countOutputPath));

        Job job3 = Job.getInstance(conf, "Average");
        job3.setJarByClass(Avg2.class);
        job3.setMapperClass(AvgMapper.class);
        job3.setReducerClass(AvgReducer.class);
        job3.setMapOutputKeyClass(LongWritable.class);
        job3.setMapOutputValueClass(LongWritable.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(DoubleWritable.class);

        // Use the output of job1 and job2 as the input of job3.
        FileInputFormat.addInputPath(job3, new Path(sumOutputPath));
        FileInputFormat.addInputPath(job3, new Path(countOutputPath));
        FileOutputFormat.setOutputPath(job3, new Path(avgOutputPath));

        // Run job1, then job2, and wait for both to complete before starting job3.
        if (job1.waitForCompletion(true) && job2.waitForCompletion(true)) {
            System.exit(job3.waitForCompletion(true) ? 0 : 1);
        }
        // job1 or job2 failed, so report failure instead of exiting with 0.
        System.exit(1);
    }
}

Run preparation:

Prepare the data file:

E:/wordcount/input/a.txt

The data file contains the following:


After the run, E:\wordcount\output\count\part-r-00000 contains the following:


After the run, E:\wordcount\output\max\part-r-00000 contains the following:


The final average is written to E:\wordcount\output\avg\part-r-00000
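To make the hand-off between the jobs concrete: the first two jobs write text lines of the form `SUM<TAB>n` and `COUNT<TAB>n` (tab is the default key/value separator of Hadoop's text output), and the third job parses both. The sketch below mirrors that parsing in plain Java on in-memory lines. The class name `MergeIntermediateResults` and the numbers 45 and 9 are hypothetical, chosen only for illustration; they are not the results of the run described in this post.

```java
import java.util.List;

public class MergeIntermediateResults {

    // Parses "SUM\t<n>" and "COUNT\t<n>" lines, the same format that
    // AvgMapper reads, and returns sum / count.
    public static double averageFrom(List<String> outputLines) {
        long sum = 0;
        long count = 0;
        for (String line : outputLines) {
            String[] fields = line.split("\t");
            if (fields.length != 2) {
                continue; // skip lines that are not key<TAB>value
            }
            if (fields[0].equals("SUM")) {
                sum = Long.parseLong(fields[1]);
            } else if (fields[0].equals("COUNT")) {
                count = Long.parseLong(fields[1]);
            }
        }
        if (count == 0) {
            throw new IllegalArgumentException("missing COUNT record or count is zero");
        }
        return (double) sum / count;
    }

    public static void main(String[] args) {
        // Hypothetical intermediate results, one line per part file.
        List<String> lines = List.of("SUM\t45", "COUNT\t9");
        System.out.println(averageFrom(lines)); // prints 5.0
    }
}
```

Unlike the MapReduce code, this sketch rejects a zero count explicitly; in the job itself an empty input would produce a division of 0 by 0 in floating point rather than an exception.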


