使用secondary sort实现数据关联 完整示例代码
需要進行兩種數據的關聯, 費了好大勁, 最后才使用secondary sort解決
?
其中有幾個地方需要注意:
1 readFields 和 write 的寫法需要完全一致 (字段的讀取順序和格式必須與寫入時一一對應), 否則寫入和讀取的數據就會錯亂。 (在此耽擱良久)
2 要override 成員函數hashCode(), 使其只依賴 md5。默認的 HashPartitioner 按 key.hashCode() 分區, 否則 md5 相同但 type 不同的 key 不一定會被分到同一個 reducer 上
3 要定義分組比較器 (grouping comparator, 本例中為 PkFkComparator, 只比較 md5), 使 md5 相同的記錄進入同一次 reduce 調用
?
在reduce操作之前,
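------1 會對數據進行partition: 默認的 HashPartitioner 調用 RecordKey.hashCode(), 把數據分配到不同的reduce機器上。 【因此對于我們的需求需要override, 使其只依賴 md5】
------2 會排序: RecordKey.compareTo() (先按 md5 升序, 再按 type 降序)
------3 會使用分組比較器對已排好序的數據進行分組: PkFkComparator.compare() (只比較 md5)
------4 要在 JobConf 中設置分組比較器: conf.setOutputValueGroupingComparator(PkFkComparator.class)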
?
?
ps: 曾自定義 partitioner 並通過 setPartitionerClass(...) 設置, 但是沒起作用 (Hadoop 版本為 0.19.1 / 0.19.2), 就是在此耽擱良久, 後來才改用 secondary sort
?
package com.qq.dma;public class testsecondarysort extends Configured implements Tool {public static class RecordKey implements WritableComparable<RecordKey>{String md5;String type;@Overridepublic void readFields(DataInput arg0) throws IOException {// TODO Auto-generated method stubString s = arg0.readLine();String t[] = s.split("/t");System.out.println("t.length: " + t.length + "<br>");System.out.println("content: " + s + "<br>");if(t.length == 2){md5 = t[0];type = t[1];}else{md5 = "0";type = "0";}}@Overridepublic void write(DataOutput arg0) throws IOException {// TODO Auto-generated method stubarg0.writeBytes(md5 + "/t" + type + "/n");}@Overridepublic int compareTo(RecordKey o) {System.out.println("recordkey compareto called <br>");// TODO Auto-generated method stubint r = md5.compareTo(o.md5);if(r != 0){if(r < 0)return -1;else if(r > 0)return 1;}else{ //按第2字段逆序r = type.compareTo(o.type);if(r < 0)return 1;else if(r > 0)return -1;}return 0;}public int hashCode(){return md5.hashCode();}}public class PkFkComparator extends WritableComparator {public PkFkComparator(){super(RecordKey.class);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {RecordKey key1 = (RecordKey)a;RecordKey key2 = (RecordKey)b;System.out.println("Comparator call compare");int r = key1.md5.compareTo(key2.md5);if(r == 0)return 0;else if(r > 0)return 1;else return -1;}}public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, RecordKey, Text> {StringBuffer newresult = new StringBuffer("");public void configure(JobConf job) {System.out.println("configure invoked/n");super.configure(job);{}}public void map(LongWritable arg0, Text arg1, OutputCollector<RecordKey, Text> arg2, Reporter arg3) throws IOException {newresult.setLength(0);String all[] = arg1.toString().split("/t");//if(all.length == 3){RecordKey k = new RecordKey();k.md5 = all[0];k.type = all[1];arg2.collect(k, new Text(all[2]));System.out.println("map: " + arg1.toString() + "<br>");}}}public 
static class Reduce extends MapReduceBase implements Reducer<RecordKey, Text, Text, Text> {private final String FieldSeperator = "/t";public static int reducetimes = 0;String md5 = "";//Overridepublic void close() {}//Overridepublic void configure(JobConf job) {super.configure(job);}public void reduce(RecordKey arg1, Iterator<Text> arg2, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { System.out.println( "the " + ++reducetimes + " times <br>"); while(arg2.hasNext()){output.collect(new Text(arg1.md5 + "/t" + arg1.type), arg2.next());}}}static int printUsage() {ToolRunner.printGenericCommandUsage(System.out);return -1;}public int run(String[] args) throws Exception {JobConf conf = new JobConf(getConf(), testsecondarysort.class);conf.setOutputKeyClass(Text.class);conf.setOutputValueClass(Text.class);conf.setMapOutputKeyClass(RecordKey.class);conf.setOutputValueGroupingComparator(PkFkComparator.class);conf.setOutputValueClass(Text.class);conf.setMapperClass(Map.class);conf.setReducerClass(Reduce.class);conf.setInputFormat(TextInputFormat.class);conf.setOutputFormat(TextOutputFormat.class);List<String> other_args = new ArrayList<String>();for (int i = 0; i < args.length; ++i) {try {if ("-m".equals(args[i])) {conf.setNumMapTasks(Integer.parseInt(args[++i]));} else if ("-r".equals(args[i])) {conf.setNumReduceTasks(Integer.parseInt(args[++i]));} else {other_args.add(args[i]);}} catch (NumberFormatException except) {return printUsage();} catch (ArrayIndexOutOfBoundsException except) {return printUsage();}}FileInputFormat.setInputPaths(conf, other_args.get(0));FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));JobClient.runJob(conf);return 0;}public static void main(String[] args) throws Exception {int res = ToolRunner.run(new Configuration(), new testsecondarysort(), args);System.exit(res);} }
總結
以上是生活随笔為你收集整理的使用secondary sort实现数据关联 完整示例代码的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 该死的java String
- 下一篇: oracle 中表示字符串使用单引号