Secondary sort means first sorting by the first field, and then sorting rows that share the same first field by the second field, taking care not to disturb the result of the first sort.
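To make that ordering concrete before diving into Hadoop, here is a minimal plain-JDK sketch (not part of the Hadoop example) that sorts rows by the first field and breaks ties with the second field:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class TwoFieldSort {
    public static void main(String[] args) {
        List<int[]> rows = new ArrayList<>(Arrays.asList(
                new int[]{50, 54}, new int[]{20, 21},
                new int[]{50, 51}, new int[]{20, 53}));
        // Primary order: first field; ties fall back to the second field,
        // so the primary ordering is never disturbed.
        rows.sort(Comparator.<int[]>comparingInt(r -> r[0])
                            .thenComparingInt(r -> r[1]));
        for (int[] r : rows) {
            System.out.println(r[0] + " " + r[1]); // 20 21, 20 53, 50 51, 50 54
        }
    }
}
```

In MapReduce the same effect has to be assembled from a composite key, a partitioner, a sort comparator, and a grouping comparator, as explained below.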
This article focuses on how to implement a secondary sort in a single MapReduce job. Hadoop ships with a SecondarySort example program, but it can only sort integers, so we improve it to sort arbitrary strings. Both programs are explained in detail below.
The map and reduce classes defined in Hadoop's bundled example are shown below; the key point is how their input and output types are declared (Java generics):

    public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>
    public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable>
In the map phase, the InputFormat set via job.setInputFormatClass splits the input data set into small splits, and that InputFormat also provides a RecordReader implementation. This example uses TextInputFormat, whose RecordReader emits each line's byte offset as the key and the line's text as the value; this is why the custom Map's input type is <LongWritable, Text>. The framework then feeds each <LongWritable, Text> pair to the custom Map's map method. Note that the map output must match the types declared in the custom Map, <IntPair, IntWritable>, so the phase ultimately yields a list of <IntPair, IntWritable> pairs. At the end of the map phase, this list is partitioned by the class set via job.setPartitionerClass, and each partition maps to one reducer. Within each partition, records are sorted by the key comparator class set via job.setSortComparatorClass. As you can see, this is already a secondary sort in itself. If no key comparator class is set, the key's own compareTo implementation is used instead. The first example relies on IntPair's compareTo method, whereas the next example defines a dedicated key comparator class.
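To illustrate just the partition step, here is a stand-alone sketch of the same hash-style formula used by the example's FirstPartitioner (the reducer count of 4 is an arbitrary assumption for the demo):

```java
public class PartitionDemo {
    // Mirrors FirstPartitioner: route a record by its first field only,
    // so all records sharing a first field reach the same reducer.
    static int getPartition(int first, int numPartitions) {
        return Math.abs(first * 127) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 4; // assumed reducer count for the demo
        for (int first : new int[]{20, 50, 60, 70}) {
            System.out.println(first + " -> partition "
                    + getPartition(first, numPartitions));
        }
    }
}
```

Because the second field never enters the formula, records that differ only in their second field always land in the same partition, which is what makes the per-partition key sort a usable secondary sort.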
In the reduce phase, once a reducer has received all the map output routed to it, it again sorts all key/value pairs with the key comparator class set via job.setSortComparatorClass. It then builds one value iterator per key, and this is where grouping comes in, using the grouping comparator class set via job.setGroupingComparatorClass: whenever this comparator considers two keys equal, they belong to the same group, their values are placed in a single value iterator, and that iterator's key is the first key of the group. Finally, the Reducer's reduce method is called once for each (key, value iterator) pair. As before, the input and output types must match those declared in the custom Reducer.
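The grouping step can be simulated outside Hadoop: given records already sorted by the full composite key, grouping on the first field effectively collects consecutive records into one value iterator per group. A plain-JDK sketch:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupingDemo {
    public static void main(String[] args) {
        // Records as they arrive at the reducer: already sorted by (first, second).
        int[][] sorted = {{20, 21}, {20, 53}, {50, 51}, {50, 54}};
        // Group records whose first field matches; each group's value list
        // then holds the second fields, already in sorted order.
        Map<Integer, List<Integer>> groups = new LinkedHashMap<>();
        for (int[] r : sorted) {
            groups.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
        }
        System.out.println(groups); // {20=[21, 53], 50=[51, 54]}
    }
}
```

Each group then corresponds to one reduce() call, keyed by the first key of the group.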
2 Hadoop's bundled example, explained: secondary sort on two integers

2.1 The test data is as follows:
20 21
50 51
50 52
50 53
50 54
60 51
60 53
60 52
60 56
60 57
70 58
60 61
70 54
70 55
70 56
70 57
70 58
1 2
3 4
5 6
7 82
203 21
50 512
50 522
50 53
530 54
40 511
20 53
20 522
60 56
60 57
740 58
63 61
730 54
71 55
71 56
73 57
74 58
12 211
31 42
50 62
7 8
2.2 The code is as follows:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import service.plugin.EJob;

public class SecondarySort {

    /**
     * IntPair: the composite key, implementing WritableComparable. It holds
     * the first- and second-column values and defines how two keys compare.
     */
    public static class IntPair implements WritableComparable<IntPair> {
        int first;
        int second;

        /** Set the left and right values. */
        public void set(int left, int right) {
            first = left;
            second = right;
        }

        public int getFirst() {
            return first;
        }

        public int getSecond() {
            return second;
        }

        @Override
        // Deserialization: rebuild the IntPair from the binary stream
        public void readFields(DataInput in) throws IOException {
            first = in.readInt();
            second = in.readInt();
        }

        @Override
        // Serialization: write the IntPair to the binary stream
        public void write(DataOutput out) throws IOException {
            out.writeInt(first);
            out.writeInt(second);
        }

        @Override
        // Key comparison: by first, then by second
        public int compareTo(IntPair o) {
            if (first != o.first) {
                return first < o.first ? -1 : 1;
            } else if (second != o.second) {
                return second < o.second ? -1 : 1;
            } else {
                return 0;
            }
        }

        // Two methods a newly defined key class should normally override;
        // the job appears to work without hashCode() here because a custom
        // partitioner is configured.
        // @Override
        // The hashCode() method is used by the HashPartitioner (the default
        // partitioner in MapReduce)
        // public int hashCode() {
        //     return first * 157 + second;
        // }

        @Override
        public boolean equals(Object right) {
            if (right == null)
                return false;
            if (this == right)
                return true;
            if (right instanceof IntPair) {
                IntPair r = (IntPair) right;
                return r.first == first && r.second == second;
            } else {
                return false;
            }
        }
    }

    /**
     * Partitioner class: the partition is determined by first alone.
     */
    public static class FirstPartitioner extends Partitioner<IntPair, IntWritable> {
        @Override
        public int getPartition(IntPair key, IntWritable value, int numPartitions) {
            System.out.println("FirstPartitioner-----------------------------------------------");
            System.out.println("Math.abs(key.getFirst() * 127) % numPartitions: "
                    + Math.abs(key.getFirst() * 127) % numPartitions);
            return Math.abs(key.getFirst() * 127) % numPartitions;
        }
    }

    /**
     * Grouping comparator class: keys with the same first belong to one group.
     */
    /*
     * // Option 1: implement the RawComparator interface
     * public static class GroupingComparator implements RawComparator<IntPair> {
     *     @Override
     *     public int compare(IntPair o1, IntPair o2) {
     *         int l = o1.getFirst();
     *         int r = o2.getFirst();
     *         return l == r ? 0 : (l < r ? -1 : 1);
     *     }
     *
     *     @Override
     *     // Compare byte by byte until a differing byte decides the order
     *     // of the two byte streams.
     *     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
     *         return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8,
     *                 b2, s2, Integer.SIZE / 8);
     *     }
     * }
     */
    // Option 2: extend WritableComparator
    public static class GroupingComparator extends WritableComparator {
        protected GroupingComparator() {
            super(IntPair.class, true);
            System.out.println("GroupingComparator---------------------------------");
        }

        @Override
        // Compare two WritableComparables by the first field only.
        public int compare(WritableComparable w1, WritableComparable w2) {
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            int l = ip1.getFirst();
            int r = ip2.getFirst();
            return l == r ? 0 : (l < r ? -1 : 1);
        }
    }

    /**
     * Map: splits each input line, storing the first column in left and the
     * second in right. At the end of the map phase, the partitioner set by
     * job.setPartitionerClass splits the output into partitions, one per
     * reducer, and within each partition the key comparator set by
     * job.setSortComparatorClass sorts the records -- in itself already a
     * secondary sort.
     */
    public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable> {
        private final IntPair intkey = new IntPair();
        private final IntWritable intvalue = new IntWritable();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // StringTokenizer splits each input line on whitespace
            // (\t, \n, \r, \f and spaces), down to individual tokens.
            StringTokenizer tokenizer = new StringTokenizer(line);
            int left = 0;
            int right = 0;
            if (tokenizer.hasMoreTokens()) {
                left = Integer.parseInt(tokenizer.nextToken());
                System.out.println("left: " + left);
                if (tokenizer.hasMoreTokens())
                    right = Integer.parseInt(tokenizer.nextToken());
                intkey.set(left, right);
                intvalue.set(right);
                context.write(intkey, intvalue);
            }
        }
    }

    // Custom reduce
    public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable> {
        private final Text left = new Text();
        private static final Text SEPARATOR =
                new Text("------------------------------------------------");

        public void reduce(IntPair key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(SEPARATOR, null);
            System.out.println("------------------------------------------------");
            left.set(Integer.toString(key.getFirst()));
            for (IntWritable val : values) {
                System.out.println("reduce: left " + left + " , val " + val);
                context.write(left, val);
            }
        }
    }

    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        // Read the Hadoop configuration
        File jarFile = EJob.createTempJar("bin");
        ClassLoader classLoader = EJob.getClassLoader();
        Thread.currentThread().setContextClassLoader(classLoader);
        Configuration conf = new Configuration(true);
        String[] otherArgs = new String[2];
        otherArgs[0] = "hdfs://192.168.1.100:9000/test_in/secondary_sort_data.txt";
        String time = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
        otherArgs[1] = "hdfs://192.168.1.100:9000/test_out/mr-" + time;

        Job job = new Job(conf, "secondarysort");
        job.setJarByClass(SecondarySort.class);
        ((JobConf) job.getConfiguration()).setJar(jarFile.toString());
        job.setMapperClass(Map.class);
        // No Combiner: the Combiner's output type <Text, IntWritable> would
        // not match the Reduce input type <IntPair, IntWritable>.
        // job.setCombinerClass(Reduce.class);
        // Partitioner class
        job.setPartitionerClass(FirstPartitioner.class);
        // Grouping comparator class
        job.setGroupingComparatorClass(GroupingComparator.class);
        // Reducer type
        job.setReducerClass(Reduce.class);
        // Map output key type
        job.setMapOutputKeyClass(IntPair.class);
        // Map output value type
        job.setMapOutputValueClass(IntWritable.class);
        // Reduce output key type: Text, because TextOutputFormat is used
        job.setOutputKeyClass(Text.class);
        // Reduce output value type
        job.setOutputValueClass(IntWritable.class);
        // Split the input into splits and provide a RecordReader implementation.
        job.setInputFormatClass(TextInputFormat.class);
        // Provide a RecordWriter implementation for the output.
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        // Submit the job
        if (job.waitForCompletion(false)) {
            System.out.println("job ok !");
        } else {
            System.out.println("job error !");
        }
    }
}

2.3 The execution results are as follows:
------------------------------------------------
1	2
------------------------------------------------
3	4
------------------------------------------------
5	6
------------------------------------------------
7	8
7	82
------------------------------------------------
12	211
------------------------------------------------
20	21
20	53
20	522
------------------------------------------------
31	42
------------------------------------------------
40	511
------------------------------------------------
50	51
50	52
50	53
50	53
50	54
50	62
50	512
50	522
------------------------------------------------
60	51
60	52
60	53
60	56
60	56
60	57
60	57
60	61
------------------------------------------------
63	61
------------------------------------------------
70	54
70	55
70	56
70	57
70	58
70	58
------------------------------------------------
71	55
71	56
------------------------------------------------
73	57
------------------------------------------------
74	58
------------------------------------------------
203	21
------------------------------------------------
530	54
------------------------------------------------
730	54
------------------------------------------------
740	58
3 The improved secondary sort (able to sort strings)
3.1 The test data is as follows:
import java
import java
import java
import java
?????????? ?
import1 org
import org1
import1 org
import2 org2
import org
import2 org1
import1 org
import1 org
import org2
import2 org3
??????????? org
import org
import1 org
importin org
import org
hello time
3.2 The code is as follows:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import service.plugin.EJob;

public class SecondarySortString {

    // The custom key class must implement WritableComparable
    public static class IntPair implements WritableComparable<IntPair> {
        String first;
        String second;

        /** Set the left and right values. */
        public void set(String left, String right) {
            first = left;
            second = right;
        }

        public String getFirst() {
            return first;
        }

        public String getSecond() {
            return second;
        }

        // Deserialization: rebuild the IntPair from the binary stream
        public void readFields(DataInput in) throws IOException {
            first = in.readUTF();
            second = in.readUTF();
        }

        // Serialization: write the IntPair to the binary stream
        public void write(DataOutput out) throws IOException {
            out.writeUTF(first);
            out.writeUTF(second);
        }

        // compareTo orders the composite key; this is the default behavior,
        // and the secondary sort within each group implicitly calls it.
        public int compareTo(IntPair o) {
            if (!first.equals(o.first)) {
                return first.compareTo(o.first);
            } else if (!second.equals(o.second)) {
                return second.compareTo(o.second);
            } else {
                return 0;
            }
        }

        // Two methods a newly defined key class should override.
        // The hashCode() method is used by the HashPartitioner (the default
        // partitioner in MapReduce).
        public int hashCode() {
            return first.hashCode() * 157 + second.hashCode();
        }

        public boolean equals(Object right) {
            if (right == null)
                return false;
            if (this == right)
                return true;
            if (right instanceof IntPair) {
                IntPair r = (IntPair) right;
                return r.first.equals(first) && r.second.equals(second);
            } else {
                return false;
            }
        }
    }

    /**
     * Partitioner class: the partition is determined by first alone.
     */
    public static class FirstPartitioner extends Partitioner<IntPair, Text> {
        public int getPartition(IntPair key, Text value, int numPartitions) {
            return Math.abs(key.getFirst().hashCode() * 127) % numPartitions;
        }
    }

    /**
     * Grouping comparator class: keys with the same first belong to one group.
     */
    /*
     * // Option 1: implement the RawComparator interface
     * public static class GroupingComparator implements RawComparator<IntPair> {
     *     public int compare(IntPair o1, IntPair o2) {
     *         int l = o1.getFirst();
     *         int r = o2.getFirst();
     *         return l == r ? 0 : (l < r ? -1 : 1);
     *     }
     *
     *     // Compare byte by byte until a differing byte decides the order
     *     // of the two byte streams.
     *     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
     *         return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8,
     *                 b2, s2, Integer.SIZE / 8);
     *     }
     * }
     */
    // Option 2: extend WritableComparator
    public static class GroupingComparator extends WritableComparator {
        protected GroupingComparator() {
            super(IntPair.class, true);
        }

        // Compare two WritableComparables: group composite keys by the
        // first (natural) key.
        public int compare(WritableComparable w1, WritableComparable w2) {
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            String l = ip1.getFirst();
            String r = ip2.getFirst();
            return l.compareTo(r);
        }
    }

    // Custom map
    public static class Map extends Mapper<LongWritable, Text, IntPair, Text> {
        private final IntPair keyPair = new IntPair();
        String[] lineArr = null;

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            if (line.isEmpty()) {
                return;
            }
            lineArr = line.split(" ", -1);
            keyPair.set(lineArr[0], lineArr[1]);
            context.write(keyPair, value);
        }
    }

    // Custom reduce
    public static class Reduce extends Reducer<IntPair, Text, Text, Text> {
        private static final Text SEPARATOR =
                new Text("------------------------------------------------");

        public void reduce(IntPair key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            context.write(SEPARATOR, null);
            for (Text val : values) {
                context.write(null, val);
            }
        }
    }

    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        File jarFile = EJob.createTempJar("bin");
        ClassLoader classLoader = EJob.getClassLoader();
        Thread.currentThread().setContextClassLoader(classLoader);
        Configuration conf = new Configuration(true);
        String[] otherArgs = new String[2];
        otherArgs[0] = "hdfs://192.168.1.100:9000/data/test_in/secondary_sort_data_string.txt";
        String time = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
        otherArgs[1] = "hdfs://192.168.1.100:9000/data/test_out/mr-" + time;

        // Instantiate a job
        Job job = new Job(conf, "secondarysort");
        job.setJarByClass(SecondarySortString.class);
        ((JobConf) job.getConfiguration()).setJar(jarFile.toString());
        // Mapper type
        job.setMapperClass(Map.class);
        // No Combiner: its output type would not match the reduce input type.
        // job.setCombinerClass(Reduce.class);
        // Reducer type
        job.setReducerClass(Reduce.class);
        // Partitioner class
        job.setPartitionerClass(FirstPartitioner.class);
        // Grouping comparator class
        job.setGroupingComparatorClass(GroupingComparator.class);
        // Map output key type
        job.setMapOutputKeyClass(IntPair.class);
        // Map output value type
        job.setMapOutputValueClass(Text.class);
        // Reduce output key type: Text, because TextOutputFormat is used
        job.setOutputKeyClass(Text.class);
        // Reduce output value type
        job.setOutputValueClass(Text.class);
        // Split the input into splits and provide a RecordReader implementation.
        job.setInputFormatClass(TextInputFormat.class);
        // Provide a RecordWriter implementation for the output.
        job.setOutputFormatClass(TextOutputFormat.class);
        // Input HDFS path
        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
        // Output HDFS path
        // FileSystem.get(conf).delete(new Path(args[1]), true);
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        // Submit the job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
3.3 The execution results are as follows:
------------------------------------------------
??????????? org
?????????? ?
------------------------------------------------
hello time
------------------------------------------------
import java
import java
import java
import java
import org
import org
import org
import org1
import org2
------------------------------------------------
import1 org
import1 org
import1 org
import1 org
import1 org
------------------------------------------------
import2 org1
import2 org2
import2 org3
------------------------------------------------
importin org
Source: https://www.cnblogs.com/minkaihui/p/4125672.html
Reposted from: https://www.cnblogs.com/likanmama/p/7804949.html