日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop入门(十)Mapreduce高级shuffle之Sort和Group

發布時間:2023/12/3 编程问答 47 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop入门(十)Mapreduce高级shuffle之Sort和Group 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、排序分組概述

MapReduce中排序和分組在哪里被執行

第3步中需要對不同分區中的數據進行排序和分組,默認情況按照key進行排序和分組

?

二、排序

在Hadoop默認的排序算法中,只會針對key值進行排序

任務:
數據文件中,如果按照第一列升序排列,
當第一列相同時,第二列升序排列
如果當第一列相同時,求出第二列的最小值

自定義排序

1.封裝一個自定義類型作為key的新類型:將第一列與第二列都作為key

WritableComparable接口

定義:

public interface WritableComparable<T> extends Writable, Comparable<T> { }

自定義類型MyNewKey實現了WritableComparable的接口,該接口中有一個compareTo()方法,當對key進行比較時會調用該方法,而我們將其改為了我們自己定義的比較規則,從而實現我們想要的效果
?

private static class MyNewKey implements WritableComparable<MyNewKey> {long firstNum;long secondNum;public MyNewKey() {}public MyNewKey(long first, long second) {firstNum = first;secondNum = second;}@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(firstNum);out.writeLong(secondNum);}@Overridepublic void readFields(DataInput in) throws IOException {firstNum = in.readLong();secondNum = in.readLong();}/** 當key進行排序時會調用以下這個compreTo方法*/@Overridepublic int compareTo(MyNewKey anotherKey) {long min = firstNum - anotherKey.firstNum;if (min != 0) {// 說明第一列不相等,則返回兩數之間小的數return (int) min;} else {return (int) (secondNum - anotherKey.secondNum);}}}

2.改寫最初的MapReduce方法函數

public static class MyMapper extendsMapper<LongWritable, Text, MyNewKey, LongWritable> {protected void map(LongWritable key,Text value,Mapper<LongWritable, Text, MyNewKey, LongWritable>.Context context)throws java.io.IOException, InterruptedException {String[] spilted = value.toString().split("\t");long firstNum = Long.parseLong(spilted[0]);long secondNum = Long.parseLong(spilted[1]);// 使用新的類型作為key參與排序MyNewKey newKey = new MyNewKey(firstNum, secondNum);context.write(newKey, new LongWritable(secondNum));};} public static class MyReducer extendsReducer<MyNewKey, LongWritable, LongWritable, LongWritable> {protected void reduce(MyNewKey key,java.lang.Iterable<LongWritable> values,Reducer<MyNewKey, LongWritable, LongWritable, LongWritable>.Context context)throws java.io.IOException, InterruptedException {context.write(new LongWritable(key.firstNum), new LongWritable(key.secondNum));};}

?

三、分組

在Hadoop中的默認分組規則中,也是基于Key進行的,會將相同key的value放到一個集合中去

目的:求出第一列相同時第二列的最小值
上面的例子看分組,因為我們自定義了一個新的key,它是以兩列數據作為key的,因此這6行數據中每個key都不相同產生6組
它們是:1 1,2 1,2 2,3 1,3 2,3 3。

而實際上只可以分為3組,分別是1,2,3?,F在首先改寫一下reduce函數代碼

public static class MyReducer extendsReducer<MyNewKey, LongWritable, LongWritable, LongWritable> {protected void reduce(MyNewKey key,java.lang.Iterable<LongWritable> values,Reducer<MyNewKey, LongWritable, LongWritable, LongWritable>.Context context)throws java.io.IOException, InterruptedException {long min = Long.MAX_VALUE;for (LongWritable number : values) {long temp = number.get();if (temp < min) {min = temp;}}context.write(new LongWritable(key.firstNum), new LongWritable(min));};}

自定義分組

為了針對新的key類型作分組,我們也需要自定義一下分組規則:

private static class MyGroupingComparator implementsRawComparator<MyNewKey> {/** 基本分組規則:按第一列firstNum進行分組*/@Overridepublic int compare(MyNewKey key1, MyNewKey key2) {return (int) (key1.firstNum - key2.firstNum);}/** @param b1 表示第一個參與比較的字節數組*?* @param s1 表示第一個參與比較的字節數組的起始位置*?* @param l1 表示第一個參與比較的字節數組的偏移量*?* @param b2 表示第二個參與比較的字節數組*?* @param s2 表示第二個參與比較的字節數組的起始位置*?* @param l2 表示第二個參與比較的字節數組的偏移量*/@Overridepublic int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);}}

自定義了一個分組比較器MyGroupingComparator,該類實現了RawComparator接口,而RawComparator接口又實現了Comparator接口,這兩個接口的定義:

public interface RawComparator<T> extends Comparator<T> {public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2); } public interface Comparator<T> {int compare(T o1, T o2);boolean equals(Object obj); }

分組實現步驟:

1.MyGroupingComparator實現這兩個接口
RawComparator中的compare()方法是基于字節的比較,
Comparator中的compare()方法是基于對象的比較

由于在MyNewKey中有兩個long類型,每個long類型又占8個字節。這里因為比較的是第一列數字,所以讀取的偏移量為8字節。

2.添加對分組規則的設置:
  // 設置自定義分組規則
? ?job.setGroupingComparatorClass(MyGroupingComparator.class);

3. 運行結果:

?

?

總結

以上是生活随笔為你收集整理的Hadoop入门(十)Mapreduce高级shuffle之Sort和Group的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。