Hadoop Big Data: the MapReduce secondary sort mechanism
- Secondary sort mechanism: having MapReduce sort records by value for us
Consider a scenario in which we need the entry with the largest value in each key group.

Normally, the shuffle phase sorts by key only.

To sort by value, the value must be folded into the key, where it forms a composite key together with the original key. The composite keys then arrive at the reducer one by one, which makes it hard to output only the one with the largest value: the reducer would emit them all, one after another, unless we build our own cache, buffer every incoming composite key, and keep only the first.

(Or use a visited flag? But the same reducer may receive composite keys belonging to several different original keys, so a single flag cannot tell them apart.)
This is where secondary sort comes in. The idea:

(1) A comparator that sorts the composite keys.

(2) A partitioner that routes records with the same original key to the same reduce task, spreading the load across parallel reducers.

(3) A GroupingComparator that redefines how the value list is grouped. This is the key piece. Its principle: records that share the same original key but have different composite keys are gathered into one group, so the reducer receives the whole group in a single call. Because the group is sorted, the composite key with the largest value is the first one in the group, and it becomes the key passed to reduce() alongside the value-list iterator; emitting just that key therefore satisfies the requirement.
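Independent of Hadoop, the grouping idea above can be sketched in plain Java (the `Order` class and sample values here are illustrative stand-ins, not the tutorial's code): sort by the composite key (item ascending, amount descending), then keep the first record of each item group, which is exactly what the GroupingComparator accomplishes inside the shuffle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SecondarySortSketch {

    // Hypothetical stand-in for the composite key: item id + order amount.
    static final class Order {
        final String itemId;
        final double amount;
        Order(String itemId, double amount) {
            this.itemId = itemId;
            this.amount = amount;
        }
        @Override
        public String toString() {
            return itemId + "\t" + amount;
        }
    }

    // Returns the largest-amount order for each item.
    static List<Order> maxPerItem(List<Order> orders) {
        List<Order> sorted = new ArrayList<>(orders);
        // "Shuffle sort": item id ascending, amount descending,
        // mirroring OrderBean.compareTo below.
        sorted.sort(Comparator.comparing((Order o) -> o.itemId)
                .thenComparing(Comparator.comparingDouble((Order o) -> o.amount).reversed()));
        // "GroupingComparator": records with the same item id form one group;
        // the first record of each group is that group's maximum.
        List<Order> firstPerGroup = new ArrayList<>();
        String prevItem = null;
        for (Order o : sorted) {
            if (!o.itemId.equals(prevItem)) {
                firstPerGroup.add(o);
                prevItem = o.itemId;
            }
        }
        return firstPerGroup;
    }

    public static void main(String[] args) {
        List<Order> input = Arrays.asList(
                new Order("1001", 76.5),
                new Order("1001", 87.6),
                new Order("1002", 512.0));
        System.out.println(maxPerItem(input));
    }
}
```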
Example: output the record with the largest order amount for each item
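Assuming tab-separated input lines of the form `itemid<TAB>amount` (the sample values below are illustrative; only the 1001 amounts appear in the original post), the job should behave like this:

```
input:
1001	87.6
1001	76.5
1002	512.0

output:
1001	87.6
1002	512.0
```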
(1) Define a GroupingComparator
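The code for this class is not shown in the post; a sketch consistent with the `ItemidGroupingComparator` registered in the driver below, assuming the usual `WritableComparator` pattern, could look like:

```java
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Groups composite keys by itemid only, so that all OrderBeans with the
 * same itemid are delivered to a single reduce() call.
 */
public class ItemidGroupingComparator extends WritableComparator {

    protected ItemidGroupingComparator() {
        // true => instantiate OrderBean objects for comparison
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean abean = (OrderBean) a;
        OrderBean bbean = (OrderBean) b;
        // compare only the itemid; the amount is ignored for grouping
        return abean.getItemid().compareTo(bbean.getItemid());
    }
}
```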
(2) Define the order-info bean
```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/**
 * Order-info bean, implementing Hadoop's serialization mechanism.
 * @author zhangxueliang
 */
public class OrderBean implements WritableComparable<OrderBean> {

    private Text itemid;
    private DoubleWritable amount;

    public OrderBean() {
    }

    public OrderBean(Text itemid, DoubleWritable amount) {
        set(itemid, amount);
    }

    public void set(Text itemid, DoubleWritable amount) {
        this.itemid = itemid;
        this.amount = amount;
    }

    public Text getItemid() {
        return itemid;
    }

    public DoubleWritable getAmount() {
        return amount;
    }

    @Override
    public int compareTo(OrderBean o) {
        int cmp = this.itemid.compareTo(o.getItemid());
        if (cmp == 0) {
            // negate so that amounts sort in descending order
            cmp = -this.amount.compareTo(o.getAmount());
        }
        return cmp;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(itemid.toString());
        out.writeDouble(amount.get());
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        String readUTF = in.readUTF();
        double readDouble = in.readDouble();
        this.itemid = new Text(readUTF);
        this.amount = new DoubleWritable(readDouble);
    }

    @Override
    public String toString() {
        return itemid.toString() + "\t" + amount.get();
    }
}
```

(3) Define a custom partitioner so that beans with the same item id are sent to the same reduce task
```java
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable> {

    @Override
    public int getPartition(OrderBean key, NullWritable value, int numPartitions) {
        // route beans with the same item_id to the same reduce task;
        // masking with Integer.MAX_VALUE keeps the hash non-negative
        return (key.getItemid().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```

(4) Define the main MapReduce flow
```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Use the secondary sort mechanism to output the record with the
 * largest order amount for each item.
 * @author zhangxueliang
 */
public class SecondarySort {

    static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

        OrderBean bean = new OrderBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\t");
            bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[1])));
            context.write(bean, NullWritable.get());
        }
    }

    static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

        // With the GroupingComparator set, the k-v pairs received here look like:
        //   <1001 87.6>,null  <1001 76.5>,null  ...
        // The reduce() parameter `key` is the key of the first pair: <1001 87.6>.
        // To output the largest-amount order for each item, just emit that key.
        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SecondarySort.class);

        job.setMapperClass(SecondarySortMapper.class);
        job.setReducerClass(SecondarySortReducer.class);

        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // GroupingComparator class used by the shuffle
        job.setGroupingComparatorClass(ItemidGroupingComparator.class);
        // Partitioner class used by the shuffle
        job.setPartitionerClass(ItemIdPartitioner.class);

        job.setNumReduceTasks(3);

        job.waitForCompletion(true);
    }
}
```

Summary

By combining a composite key with a value-descending comparator, a partitioner keyed on the original field, and a GroupingComparator that groups by that field, the first record delivered to each reduce() call is the maximum-value record of its group, so the reducer only needs to emit its key.