日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop大数据——mapreduce的secondary排序机制

發(fā)布時間:2025/1/21 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop大数据——mapreduce的secondary排序机制 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
  • secondary排序機制

----就是讓mapreduce幫我們根據(jù)value排序
考慮一個場景,需要取按key分組的最大value條目:
通常,shuffle只是對key進行排序
如果需要對value排序,則需要將value放到key中,但是此時,value就和原來的key形成了一個組合key,從而到達reducer時,組合key是一個一個到達reducer,想在reducer中輸出最大value的那一個,不好辦,它會一個一個都輸出去,除非自己弄一個緩存,將到達的組合key全部緩存起來然后只取第一個
(或者弄一個訪問標識?但是同一個reducer可能會收到多個key的組合key,無法判斷訪問標識)
此時就可以用到secondary sort,其思路:
(1)要有對組合key排序的比較器
(2)要有partitioner進行分區(qū)負載并行reducer計算
(3)要有一個groupingcomparator來重定義valuelist聚合策略——這是關鍵,其原理就是將相同key而不同組合key的數(shù)據(jù)進行聚合,從而把他們聚合成一組,然后在reducer中可以一次收到這一組key的組合key,并且,value最大的也就是在這一組中的第一個組合key會被選為迭代器valuelist的key,從而可以直接輸出這個組合key,就實現(xiàn)了我們的需求
示例:輸出每個item的訂單金額最大的記錄
(1)定義一個GroupingComparator

/*** 用于控制shuffle過程中reduce端對kv對的聚合邏輯* @author zhangxueliang**/ public class ItemidGroupingComparator extends WritableComparator {protected ItemidGroupingComparator() {super(OrderBean.class, true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {OrderBean abean = (OrderBean) a;OrderBean bbean = (OrderBean) b;//將item_id相同的bean都視為相同,從而聚合為一組return abean.getItemid().compareTo(bbean.getItemid());} }

(2)定義訂單信息bean

/*** 訂單信息bean,實現(xiàn)hadoop的序列化機制* @author zhangxueliang**/ public class OrderBean implements WritableComparable<OrderBean>{private Text itemid;private DoubleWritable amount;public OrderBean() {}public OrderBean(Text itemid, DoubleWritable amount) {set(itemid, amount);}public void set(Text itemid, DoubleWritable amount) {this.itemid = itemid;this.amount = amount;}public Text getItemid() {return itemid;}public DoubleWritable getAmount() {return amount;}@Overridepublic int compareTo(OrderBean o) {int cmp = this.itemid.compareTo(o.getItemid());if (cmp == 0) {cmp = -this.amount.compareTo(o.getAmount());}return cmp;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(itemid.toString());out.writeDouble(amount.get());}@Overridepublic void readFields(DataInput in) throws IOException {String readUTF = in.readUTF();double readDouble = in.readDouble();this.itemid = new Text(readUTF);this.amount= new DoubleWritable(readDouble);}@Overridepublic String toString() {return itemid.toString() + "\t" + amount.get();} }

(3)自定義一個partitioner,以使相同id的bean發(fā)往相同reduce task

public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable>{@Overridepublic int getPartition(OrderBean key, NullWritable value, int numPartitions) {//指定item_id相同的bean發(fā)往相同的reducer taskreturn (key.getItemid().hashCode() & Integer.MAX_VALUE) % numPartitions;} }

(4)定義mr主體流程

/*** 利用secondarysort機制輸出每種item訂單金額最大的記錄* @author zhangxueliang**/ public class SecondarySort {static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{OrderBean bean = new OrderBean();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] fields = StringUtils.split(line, "\t");bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[1])));context.write(bean, NullWritable.get());}}static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{//在設置了groupingcomparator以后,這里收到的kv數(shù)據(jù) 就是: <1001 87.6>,null <1001 76.5>,null .... //此時,reduce方法中的參數(shù)key就是上述kv組中的第一個kv的key:<1001 87.6>//要輸出同一個item的所有訂單中最大金額的那一個,就只要輸出這個key@Overrideprotected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {context.write(key, NullWritable.get());}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(SecondarySort.class);job.setMapperClass(SecondarySortMapper.class);job.setReducerClass(SecondarySortReducer.class);job.setOutputKeyClass(OrderBean.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));//指定shuffle所使用的GroupingComparator類job.setGroupingComparatorClass(ItemidGroupingComparator.class);//指定shuffle所使用的partitioner類job.setPartitionerClass(ItemIdPartitioner.class);job.setNumReduceTasks(3);job.waitForCompletion(true);}}

總結

以上是生活随笔為你收集整理的Hadoop大数据——mapreduce的secondary排序机制的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。