Hadoop入门(二十四)Mapreduce的求TopK程序
一、簡(jiǎn)介
求TopK是算法中最常使用到的,現(xiàn)在使用Mapreduce在海量數(shù)據(jù)中統(tǒng)計(jì)數(shù)據(jù)的求TopK。
?
二、例子
(1)實(shí)例描述
給出三個(gè)文件,每個(gè)文件中都存儲(chǔ)了若干個(gè)數(shù)值,求所有數(shù)值中的求Top 5。
樣例輸入: ???????????????????????????????????????????
1)file1: ?
2)file2: ?
3)file3: ?
?期望輸出:
?
(2)問題分析
實(shí)現(xiàn)統(tǒng)計(jì)海量數(shù)據(jù)的求TopK,不能將所有的數(shù)據(jù)加載到內(nèi)存,計(jì)算只能使用類似外部排序的方式,加載一部分?jǐn)?shù)據(jù)統(tǒng)計(jì)求TopK,接著加載另一部分進(jìn)行統(tǒng)計(jì)TopK。
(3)實(shí)現(xiàn)步驟
1)Map過程?
????首先使用默認(rèn)的TextInputFormat類對(duì)輸入文件進(jìn)行處理,得到文本中每行的偏移量及其內(nèi)容。顯然,Map過程首先必須分析輸入的<key,value>對(duì),得到數(shù)值,然后在mapper中統(tǒng)計(jì)單個(gè)分塊的求TopK。
2)Reduce過程?
????經(jīng)過map方法處理后,Reduce過程將獲取每個(gè)mapper的求TopK進(jìn)行統(tǒng)計(jì),分行統(tǒng)計(jì)出總的TopK。
?
(3)關(guān)鍵代碼
package com.mk.mapreduce;import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.Collections; import java.util.List;public class TopK {public static class TopKMapper extends Mapper<LongWritable, Text, IntWritable, NullWritable> {private List<Integer> top5 = new ArrayList<>(5);@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {if (StringUtils.isBlank(value.toString())) {System.out.println("空白行");return;}Integer v = Integer.valueOf(value.toString().trim());if(top5.size()<5){top5.add(v);}else{Integer min = Collections.min(top5);if (min < v) {top5.remove(min);top5.add(v);}}}@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {for (Integer v : top5)context.write(new IntWritable(v), NullWritable.get());}}public static class TopKReducer extends Reducer< IntWritable, NullWritable,IntWritable, NullWritable> {private List<Integer> top5 = new ArrayList<>(5);@Overrideprotected void reduce(IntWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {Integer v = key.get();if(top5.size()<5){top5.add(v);}else{Integer min = Collections.min(top5);if (min < v) {top5.remove(min);top5.add(v);}}}@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {top5.sort((a,b)->b-a);for (Integer v : top5)context.write(new IntWritable(v), NullWritable.get());}}public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {String uri = "hdfs://192.168.150.128:9000";String input = "/topk/input";String output = "/topk/output";Configuration conf = new Configuration();if (System.getProperty("os.name").toLowerCase().contains("win"))conf.set("mapreduce.app-submission.cross-platform", "true");FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);Path path = new Path(output);fileSystem.delete(path, true);Job job = new Job(conf, "TopK");job.setJar("./out/artifacts/hadoop_test_jar/hadoop-test.jar");job.setJarByClass(TopK.class);job.setMapperClass(TopKMapper.class);job.setReducerClass(TopKReducer.class);job.setMapOutputKeyClass(IntWritable.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.addInputPaths(job, uri + input);FileOutputFormat.setOutputPath(job, new Path(uri + output));boolean ret = job.waitForCompletion(true);System.out.println(job.getJobName() + "-----" + ret);} }?
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的Hadoop入门(二十四)Mapreduce的求TopK程序的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 看视频卡怎么办 解决方法大全
- 下一篇: Hadoop生态hive(一)介绍