Getting Started with Hadoop (15): A MapReduce Data Sorting Program


"數(shù)據(jù)排序"是許多實(shí)際任務(wù)執(zhí)行時(shí)要完成的第一項(xiàng)工作,比如學(xué)生成績(jī)?cè)u(píng)比、數(shù)據(jù)建立索引等。這個(gè)實(shí)例和數(shù)據(jù)去重類似,都是先對(duì)原始數(shù)據(jù)進(jìn)行初步處理,為進(jìn)一步的數(shù)據(jù)操作打好基礎(chǔ)

1 Example Description

Sort the data in the input files. Each line of an input file contains a single number, i.e. one data item. Each line of the output must contain two separated numbers: the first is the rank of the data item within the whole original data set, and the second is the item itself.
Sample input:
1) file1

2
32
654
32
15
756
65223

2) file2

5956
22
650
92

3) file3

26
54
6

Expected output:

1	2
2	6
3	15
4	22
5	26
6	32
7	32
8	54
9	92
10	650
11	654
12	756
13	5956
14	65223


2 Analysis

This example only requires sorting the input data.

Analysis:
MapReduce already sorts during the shuffle phase, and its default rule is to sort by key. If the key is an IntWritable (the wrapper for int), MapReduce sorts the keys numerically; if the key is a Text (the wrapper for String), MapReduce sorts them lexicographically.

So this program uses IntWritable keys: the map task parses each line it reads into an IntWritable and emits it as the key (the value is arbitrary). When reduce receives a <key, value-list> pair, it writes the input key out as the value, once for each element in value-list. The output key (count in the code below) is a global counter that tracks the rank of the current key. Note that this job configures no Combiner, i.e. the MapReduce run proceeds without one, mainly because map and reduce alone are enough to complete the task.
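As a quick illustration of the difference between the two key types, here is a standalone sketch (not part of the original program; the class name KeyOrderDemo is mine, and it only needs hadoop-common on the classpath):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class KeyOrderDemo {
    public static void main(String[] args) {
        // Numeric order: 9 sorts before 10, so compareTo returns a negative value.
        System.out.println(new IntWritable(9).compareTo(new IntWritable(10)));
        // Lexicographic order: "10" sorts before "9" because the byte '1' < '9',
        // so this also returns a negative value.
        System.out.println(new Text("10").compareTo(new Text("9")));
    }
}

This is why keeping the numbers as Text keys would order 10 before 9; converting them to IntWritable first gives the numeric order we want.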


3 Implementation Steps

  • In map, convert each input line into an IntWritable and emit it as the key (the value is arbitrary).
  • When reduce receives a <key, value-list> pair, write the input key out as the value, once for each element in value-list.
  • The output key is a global counter that tracks the rank of the current key.

4 Key Code

Ascending:

package com.mk.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;

public class Sort {

    // Mapper: parse each input line into an int and emit it as the key.
    // The shuffle phase then sorts the keys numerically.
    public static class SortMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            IntWritable v = new IntWritable(Integer.parseInt(value.toString().trim()));
            context.write(v, new IntWritable(1)); // the value is arbitrary; only the key matters
        }
    }

    // Reducer: keys arrive already sorted. Emit (rank, value) once per
    // occurrence, so duplicate values each get their own rank.
    public static class SortReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        private int count = 1; // global rank counter

        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable v : values) {
                context.write(new IntWritable(count++), key);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String uri = "hdfs://192.168.150.128:9000";
        String input = "/sort/input";
        String output = "/sort/output";

        Configuration conf = new Configuration();
        if (System.getProperty("os.name").toLowerCase().contains("win"))
            conf.set("mapreduce.app-submission.cross-platform", "true");

        // Delete any previous output directory so the job can be rerun.
        FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
        fileSystem.delete(new Path(output), true);

        Job job = Job.getInstance(conf, "Sort"); // preferred over the deprecated new Job(conf, name)
        job.setJar("./out/artifacts/hadoop_test_jar/hadoop-test.jar");
        job.setJarByClass(Sort.class);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPaths(job, uri + input);
        FileOutputFormat.setOutputPath(job, new Path(uri + output));

        boolean ret = job.waitForCompletion(true);
        System.out.println(job.getJobName() + "-----" + ret);
    }
}
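Two points about this job are worth noting. First, duplicates are handled naturally: the two 32s in the sample arrive at the reducer as one key with a two-element value-list, and the loop assigns them ranks 6 and 7. Second, the global count only yields a valid overall ranking because a single reducer sees every key; Hadoop defaults to one reduce task, and you can make that explicit with job.setNumReduceTasks(1). With several reducers, each would keep its own counter and produce ranks valid only within its own output file.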


Descending:

package com.mk.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;

public class Sort {

    // Mapper: identical to the ascending version.
    public static class SortMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            IntWritable v = new IntWritable(Integer.parseInt(value.toString().trim()));
            context.write(v, new IntWritable(1));
        }
    }

    // Reducer: identical to the ascending version; because the sort
    // comparator reverses the key order, ranks are assigned largest-first.
    public static class SortReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        private int count = 1;

        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable v : values) {
                context.write(new IntWritable(count++), key);
            }
        }
    }

    // Custom sort comparator: swapping the argument order (b2 before b1)
    // reverses the natural ascending order of serialized IntWritable keys.
    public static class SortComparator implements RawComparator<IntWritable> {

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return WritableComparator.compareBytes(b2, s2, l2, b1, s1, l1);
        }

        @Override
        public int compare(IntWritable o1, IntWritable o2) {
            return Integer.compare(o2.get(), o1.get()); // avoids the overflow risk of o2.get() - o1.get()
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String uri = "hdfs://192.168.150.128:9000";
        String input = "/sort/input";
        String output = "/sort/output";

        Configuration conf = new Configuration();
        if (System.getProperty("os.name").toLowerCase().contains("win"))
            conf.set("mapreduce.app-submission.cross-platform", "true");

        // Delete any previous output directory so the job can be rerun.
        FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
        fileSystem.delete(new Path(output), true);

        Job job = Job.getInstance(conf, "Sort"); // preferred over the deprecated new Job(conf, name)
        job.setJar("./out/artifacts/hadoop_test_jar/hadoop-test.jar");
        job.setJarByClass(Sort.class);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPaths(job, uri + input);
        FileOutputFormat.setOutputPath(job, new Path(uri + output));
        job.setSortComparatorClass(SortComparator.class);

        boolean ret = job.waitForCompletion(true);
        System.out.println(job.getJobName() + "-----" + ret);
    }
}
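A note on the comparator: comparing the serialized bytes works for this sample because IntWritable writes its value in big-endian order and all the inputs are non-negative; with negative numbers, the sign bit would make them compare as the largest values. A simpler, fully general alternative, shown below as my own sketch (the class name DescIntComparator is not from the original article), is to extend WritableComparator and negate the natural order:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class DescIntComparator extends WritableComparator {

    public DescIntComparator() {
        // true: deserialize the raw bytes into IntWritable instances, so the
        // object-level compare below is used instead of byte comparison.
        super(IntWritable.class, true);
    }

    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        return -super.compare(a, b); // negate the natural ascending order
    }
}

Registering it with job.setSortComparatorClass(DescIntComparator.class) would replace SortComparator above.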
