Hadoop Big Data: the MapReduce secondary sort mechanism
- Secondary sort mechanism: letting MapReduce sort records by value for us
Consider a scenario where, for each key, we need the entry with the maximum value.
Normally, the shuffle phase sorts only by key.
To sort by value, we must move the value into the key, where it combines with the original key to form a composite key. Composite keys then arrive at the reducer one at a time, so emitting only the maximum-value entry is awkward: the reducer would emit every one of them, unless we build our own cache, buffer all arriving composite keys, and keep only the first.
(Or use a visited flag? That does not work either, because a single reducer may receive composite keys derived from several different original keys, so one flag cannot tell them apart.)
This is where secondary sort comes in. The idea:
(1) a comparator that sorts the composite keys;
(2) a partitioner that spreads the load across parallel reduce tasks;
(3) a GroupingComparator that redefines how the value list is aggregated. This is the key step: it groups records that share the same original key but have different composite keys into one reduce group. The reducer then receives the whole group in a single reduce() call, and because the group is sorted, the composite key with the largest value is the first one in the group and becomes the key of the value-list iterator. Outputting just that key gives exactly what we need.
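To see the mechanism end to end before the Hadoop code, here is a plain-Java sketch, with no Hadoop involved (the `OrderLine` record and method names are illustrative, not from the original post). It mimics what the shuffle sort and the GroupingComparator do: sort composite keys by (itemid ascending, amount descending), then keep only the first entry of each itemid group.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class SecondarySortSketch {
    // Illustrative stand-in for the composite key: (itemid, amount).
    public record OrderLine(String itemid, double amount) {}

    public static List<OrderLine> maxPerItem(List<OrderLine> lines) {
        // Shuffle-sort analogue: itemid ascending, then amount descending.
        List<OrderLine> sorted = lines.stream()
                .sorted(Comparator.comparing(OrderLine::itemid)
                        .thenComparing(Comparator.comparingDouble(OrderLine::amount).reversed()))
                .collect(Collectors.toList());
        // GroupingComparator analogue: consecutive entries with the same
        // itemid form one group; the group's first (largest) entry wins.
        List<OrderLine> result = new ArrayList<>();
        String prevItem = null;
        for (OrderLine l : sorted) {
            if (!l.itemid().equals(prevItem)) {
                result.add(l);
                prevItem = l.itemid();
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<OrderLine> lines = List.of(
                new OrderLine("1001", 76.5),
                new OrderLine("1002", 120.0),
                new OrderLine("1001", 87.6),
                new OrderLine("1002", 33.3));
        System.out.println(maxPerItem(lines));
    }
}
```

In real MapReduce the sort and the grouping happen in the framework, of course; this sketch only shows why "first composite key of each group" is the answer.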
Example: for each item, output the order record with the largest amount.
(1) Define a GroupingComparator
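The original post omits this class's body, but the driver below registers an `ItemidGroupingComparator`, so a standard implementation would look like the following sketch: it compares only the itemid, making all composite keys of one item equal for grouping purposes, so they land in a single reduce group.

```java
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Groups OrderBeans by itemid only: beans with the same itemid are
 * treated as one reduce-side key, regardless of their amount.
 */
public class ItemidGroupingComparator extends WritableComparator {

    protected ItemidGroupingComparator() {
        // `true` tells the framework to instantiate OrderBean objects
        // so compare() receives deserialized beans.
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean abean = (OrderBean) a;
        OrderBean bbean = (OrderBean) b;
        // Compare itemid only, ignoring amount.
        return abean.getItemid().compareTo(bbean.getItemid());
    }
}
```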
(2) Define the order-info bean
```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/**
 * Order-info bean, implementing Hadoop's serialization mechanism.
 * @author zhangxueliang
 */
public class OrderBean implements WritableComparable<OrderBean> {

    private Text itemid;
    private DoubleWritable amount;

    public OrderBean() {
    }

    public OrderBean(Text itemid, DoubleWritable amount) {
        set(itemid, amount);
    }

    public void set(Text itemid, DoubleWritable amount) {
        this.itemid = itemid;
        this.amount = amount;
    }

    public Text getItemid() {
        return itemid;
    }

    public DoubleWritable getAmount() {
        return amount;
    }

    @Override
    public int compareTo(OrderBean o) {
        int cmp = this.itemid.compareTo(o.getItemid());
        if (cmp == 0) {
            // Same item: sort by amount descending, so the largest comes first.
            cmp = -this.amount.compareTo(o.getAmount());
        }
        return cmp;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(itemid.toString());
        out.writeDouble(amount.get());
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.itemid = new Text(in.readUTF());
        this.amount = new DoubleWritable(in.readDouble());
    }

    @Override
    public String toString() {
        return itemid.toString() + "\t" + amount.get();
    }
}
```
(3) Define a custom Partitioner, so that beans with the same itemid go to the same reduce task
```java
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable> {

    @Override
    public int getPartition(OrderBean key, NullWritable value, int numPartitions) {
        // Send beans with the same item_id to the same reduce task.
        // Masking with Integer.MAX_VALUE keeps the hash non-negative.
        return (key.getItemid().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```
(4) Define the MR driver
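As an aside, the `& Integer.MAX_VALUE` mask in the partitioner matters because `hashCode()` can be negative, and a negative remainder would be an invalid partition number. A plain-Java check (the demo class and the sample string are illustrative; `"polygenelubricants"` is a well-known string whose hash code is negative):

```java
public class PartitionMaskDemo {
    // Same formula as ItemIdPartitioner.getPartition uses.
    public static int partition(String id, int numPartitions) {
        return (id.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        String s = "polygenelubricants";
        System.out.println(s.hashCode());     // negative
        System.out.println(s.hashCode() % 3); // a plain modulo can go negative too
        System.out.println(partition(s, 3));  // masked result is always in [0, numPartitions)
    }
}
```

Clearing the sign bit with the mask is cheaper than `Math.abs`, which itself misbehaves on `Integer.MIN_VALUE`.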
```java
import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Uses the secondary sort mechanism to output, for each item,
 * the order record with the largest amount.
 * @author zhangxueliang
 */
public class SecondarySort {

    static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

        OrderBean bean = new OrderBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[1])));
            context.write(bean, NullWritable.get());
        }
    }

    static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

        // With the GroupingComparator set, the kv pairs received here look like:
        //   <1001 87.6>,null  <1001 76.5>,null  ...
        // The reduce() parameter `key` is the key of the FIRST kv in the group: <1001 87.6>.
        // To output the largest-amount order of each item, just output this key.
        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SecondarySort.class);

        job.setMapperClass(SecondarySortMapper.class);
        job.setReducerClass(SecondarySortReducer.class);

        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // GroupingComparator used for reduce-side grouping in the shuffle.
        job.setGroupingComparatorClass(ItemidGroupingComparator.class);
        // Partitioner used by the shuffle.
        job.setPartitionerClass(ItemIdPartitioner.class);

        job.setNumReduceTasks(3);

        job.waitForCompletion(true);
    }
}
```
Summary: move the value into a composite key, sort composite keys with a comparator, partition by the original key, and use a GroupingComparator to group composite keys with the same itemid, so that the first (largest) composite key of each group is the one the reducer outputs.