MapReduce排序-实现比较器和序列化代码
生活随笔
收集整理的這篇文章主要介紹了
MapReduce排序-实现比较器和序列化代码
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
要求:
第一列按照字典順序進行排列
第一列相同的時候, 第二列按照升序進行排列
解決思路:
將 Map 端輸出的 <key,value> 中的 key 和 value 組合成一個新的 key (newKey),value值不變
這里就變成 <(key,value),value> , 在針對 newKey 排序的時候, 如果 key 相同, 就再對value進行排序
自定義類型和比較器
public class PairWritable implements WritableComparable<PairWritable> {// 組合key,第一部分是我們第一列,第二部分是我們第二列private String first;private int second;public PairWritable() {}public PairWritable(String first, int second) {this.set(first, second);}/*** 方便設置字段*/public void set(String first, int second) {this.first = first;this.second = second;}/*** 反序列化*/@Overridepublic void readFields(DataInput input) throws IOException {this.first = input.readUTF();this.second = input.readInt();}/*** 序列化*/@Overridepublic void write(DataOutput output) throws IOException {output.writeUTF(first);output.writeInt(second);}/** 重寫比較器*/public int compareTo(PairWritable o) {//每次比較都是調用該方法的對象與傳遞的參數進行比較,說白了就是第一行與第二行比較完了之后的結果與第三行比較,//得出來的結果再去與第四行比較,依次類推System.out.println(o.toString());Step 2. MapperSystem.out.println(this.toString());int comp = this.first.compareTo(o.first);if (comp != 0) {return comp;} else { // 若第一個字段相等,則比較第二個字段return Integer.valueOf(this.second).compareTo(Integer.valueOf(o.getSecond()));}}public int getSecond() {return second;}public void setSecond(int second) {this.second = second;}public String getFirst() {return first;}public void setFirst(String first) {this.first = first;}@Overridepublic String toString() {return "PairWritable{" +"first='" + first + '\'' +", second=" + second +'}';}}Mapper
public class SortMapper extends Mapper<LongWritable,Text,PairWritable,IntWritable> {private PairWritable mapOutKey = new PairWritable();private IntWritable mapOutValue = new IntWritable();@Overridepublic void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String lineValue = value.toString();String[] strs = lineValue.split("\t");//設置組合key和value ==> <(key,value),value>mapOutKey.set(strs[0], Integer.valueOf(strs[1]));mapOutValue.set(Integer.valueOf(strs[1]));context.write(mapOutKey, mapOutValue);}}Reducer
public class SortReducer extends Reducer<PairWritable,IntWritable,Text,IntWritable> {private Text outPutKey = new Text();@Overridepublic void reduce(PairWritable key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {//迭代輸出for(IntWritable value : values) {outPutKey.set(key.getFirst());context.write(outPutKey, value);}}}Main 入口
public class SecondarySort extends Configured implements Tool {@Overridepublic int run(String[] args) throws Exception {Configuration conf = super.getConf();conf.set("mapreduce.framework.name","local");Job job = Job.getInstance(conf,SecondarySort.class.getSimpleName());job.setJarByClass(SecondarySort.class);job.setInputFormatClass(TextInputFormat.class);TextInputFormat.addInputPath(job,new Path("file:///L:\\大數據離線階段備課教案以及資料文檔——by老王\\4、大數據離線第四天\\排序\\input"));TextOutputFormat.setOutputPath(job,new Path("file:///L:\\大數據離線階段備課教案以及資料文檔——by老王\\4、大數據離線第四天\\排序\\output"));job.setMapperClass(SortMapper.class);job.setMapOutputKeyClass(PairWritable.class);job.setMapOutputValueClass(IntWritable.class);job.setReducerClass(SortReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);boolean b = job.waitForCompletion(true);return b?0:1;}public static void main(String[] args) throws Exception {Configuration entries = new Configuration();ToolRunner.run(entries,new SecondarySort(),args);}}?
總結
以上是生活随笔為你收集整理的MapReduce排序-实现比较器和序列化代码的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MapReduce排序-概述
- 下一篇: MapReduce-计数器