MapReduce程序之数据排序
[toc]
MapReduce程序之數據排序
需求
下面有三個文件:
yeyonghao@yeyonghaodeMacBook-Pro:~/data/input/sort$ cat file1.csv 2 32 654 32 15 756 65223 yeyonghao@yeyonghaodeMacBook-Pro:~/data/input/sort$ cat file2.csv 5956 22 650 92 yeyonghao@yeyonghaodeMacBook-Pro:~/data/input/sort$ cat file3.csv 26 54 6使用MapReduce對其進行排序并輸出。
分析思路
Map階段分析: /*** 數據在Map之后會做sort(從內存緩沖區到磁盤的時候會做sort),所以Map操作只需要把數據直接寫出即可,最后在本地做數據* 合并的時候也是會有排序的,詳細可以參考MapReduce的過程,但是需要注意的是,因為我們需要進行的是數字的排序,* 所以在Map輸出時,key的類型應該是Int類型才能按照數字的方式進行排序,如果是Text文本的話,那么是按照字典順序* 來進行排序的(也就是先比較字符串中的第一個字符,如果相同再比較第二個字符,以此類推),而不是按照數字進行排序*/Reduce階段分析: /*** 需要注意的是,排序與其它MapReduce程序有所不同,最后在驅動程序設置ReduceTask時,必須要設置為1* 這樣才能把數據都匯總到一起,另外一點,數據在shuffle到達reducer的時候,從內存緩沖區寫到磁盤時* 也會進行排序操作,所以即便是從不同節點上的Map上shuffle來的數據,到輸入到reducer時,數據也是有序的,* 所以Reducer需要做的是把數據直接寫到context中就可以了*/MapReduce程序
關于如何進行數據的排序,思路已經在代碼注釋中有說明,不過需要注意的是,這里使用了前面開發的Job工具類來開發驅動程序,程序代碼如下:
package com.uplooking.bigdata.mr.sort;import com.uplooking.bigdata.common.utils.MapReduceJobUtil; import com.uplooking.bigdata.mr.duplication.DuplicationJob; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import java.io.IOException;public class SortJob {/*** 驅動程序,使用工具類來生成job** @param args*/public static void main(String[] args) throws Exception {if (args == null || args.length < 2) {System.err.println("Parameter Errors! Usages:<inputpath> <outputpath>");System.exit(-1);}Job job = MapReduceJobUtil.buildJob(new Configuration(),SortJob.class,args[0],TextInputFormat.class,SortMapper.class,IntWritable.class,NullWritable.class,new Path(args[1]),TextOutputFormat.class,SortReducer.class,IntWritable.class,NullWritable.class);// ReduceTask必須設置為1job.setNumReduceTasks(1);job.waitForCompletion(true);}/*** 數據在Map之后會做sort(從內存緩沖區到磁盤的時候會做sort),所以Map操作只需要把數據直接寫出即可,最后在本地做數據* 合并的時候也是會有排序的,詳細可以參考MapReduce的過程,但是需要注意的是,因為我們需要進行的是數字的排序,* 所以在Map輸出時,key的類型應該是Int類型才能按照數字的方式進行排序,如果是Text文本的話,那么是按照字典順序* 來進行排序的(也就是先比較字符串中的第一個字符,如果相同再比較第二個字符,以此類推),而不是按照數字進行排序*/public static class SortMapper extends Mapper<LongWritable, Text, IntWritable, NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {// 先將value轉換為數字int num = Integer.valueOf(value.toString());// 直接寫出數據到context中context.write(new IntWritable(num), NullWritable.get());}}/*** 需要注意的是,排序與其它MapReduce程序有所不同,最后在驅動程序設置ReduceTask時,必須要設置為1* 這樣才能把數據都匯總到一起,另外一點,數據在shuffle到達reducer的時候,從內存緩沖區寫到磁盤時* 也會進行排序操作,所以即便是從不同節點上的Map上shuffle來的數據,到輸入到reducer時,數據也是有序的,* 所以Reducer需要做的是把數據直接寫到context中就可以了*/public static class SortReducer extends Reducer<IntWritable, NullWritable, IntWritable, NullWritable> {@Overrideprotected void reduce(IntWritable key, Iterable<NullWritable> values, Context context)throws IOException, InterruptedException {// 直接將數據寫入到context中context.write(key, NullWritable.get());}}/*** 仍然需要說明的是,因為reduce端在shuffle數據寫入到磁盤的時候已經完成了排序,* 而這個排序的操作不是在reducer的輸出中完成的,這也就意味著,reducer的輸出數據中的key數據類型,* 可以是IntWritable,顯然也可以設置為Text的,說明這個問題主要是要理清map-shuffle-reduce的過程*/ }測試
這里使用本地環境來運行MapReduce程序,輸入的參數如下:
/Users/yeyonghao/data/input/sort /Users/yeyonghao/data/output/mr/sort也可以將其打包成jar包,然后上傳到Hadoop環境中運行。
運行程序后,查看輸出結果如下:
yeyonghao@yeyonghaodeMacBook-Pro:~/data/output/mr/sort$ cat part-r-00000 2 6 15 22 26 32 54 92 650 654 756 5956 65223可以看到,我們的MapReduce已經完成了數據排序的操作。
注意事項
因為在map輸出后,相同的key會被shuffle到同一個reducer中,所以這個過程其實也完成了去重的操作,這也就意味著,按照上面的MapReduce程序的思路,重復的數據也會被刪除,那么如何解決這個問題呢?大家可以思考一下。
思路也比較簡單,可以這樣做,map輸出的時候,key還是原來的key,而value不再是NullWritalbe,而是跟key一樣的,這樣到了reducer的時候,如果有相同的數據,輸入的數據就類似于<32, [32, 32, 32]>,那么在reducer輸出數據的時候,就可以迭代[32, 32, 32]數據進行輸出,這樣就可以避免shuffle階段key去重所帶來去除了相同數字的問題。
注意,前面一些文章,很多操作多次提到是在shuffle,其實有些是不準備的,包括這里的。在Map端,其實key的去重是在merge on disk的過程中完成了。
總結
以上是生活随笔為你收集整理的MapReduce程序之数据排序的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JMockit 1.37 示例
- 下一篇: svn 修改文件的二进制或文本标记