日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

使用secondary sort实现数据关联 完整示例代码

發布時間:2025/3/21 编程问答 17 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用secondary sort实现数据关联 完整示例代码 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

需要進行兩種數據的關聯, 費了好大勁, 最后才使用secondary sort解決

?

其中有幾個地方需要注意:

1 readFields 和 writeFields 的寫法需要完全一致, 否則寫入和讀取的數據就會錯亂。 (在此耽擱良久)

2 要override 成員函數hashCode(), 否則“相同”key的數據不一定在唯一的機器上

3 要定義groupbyComparator

?

在reduce操作之前,

------1 會對數據進行partition: RecordKey.hashCode, 該函數會被partitioner調用, 使得數據分配到不同的reduce機器上。 【因此對于我們的需求需要override】

------2 會使用groupbyComparator對數據進行分組:Comparator.compare()

------3 會排序: RecordKey.compareTo()

------4 要設置Comparator

?

?

ps: 自定義了partitioner,Job.setPartitionerClass(class), 但是沒起作用。 【0.19.1??? 0.19.2】, 就是在此耽擱良久, 后來才嘗試使用secondary sort

?

package com.qq.dma;public class testsecondarysort extends Configured implements Tool {public static class RecordKey implements WritableComparable<RecordKey>{String md5;String type;@Overridepublic void readFields(DataInput arg0) throws IOException {// TODO Auto-generated method stubString s = arg0.readLine();String t[] = s.split("/t");System.out.println("t.length: " + t.length + "<br>");System.out.println("content: " + s + "<br>");if(t.length == 2){md5 = t[0];type = t[1];}else{md5 = "0";type = "0";}}@Overridepublic void write(DataOutput arg0) throws IOException {// TODO Auto-generated method stubarg0.writeBytes(md5 + "/t" + type + "/n");}@Overridepublic int compareTo(RecordKey o) {System.out.println("recordkey compareto called <br>");// TODO Auto-generated method stubint r = md5.compareTo(o.md5);if(r != 0){if(r < 0)return -1;else if(r > 0)return 1;}else{ //按第2字段逆序r = type.compareTo(o.type);if(r < 0)return 1;else if(r > 0)return -1;}return 0;}public int hashCode(){return md5.hashCode();}}public class PkFkComparator extends WritableComparator {public PkFkComparator(){super(RecordKey.class);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {RecordKey key1 = (RecordKey)a;RecordKey key2 = (RecordKey)b;System.out.println("Comparator call compare");int r = key1.md5.compareTo(key2.md5);if(r == 0)return 0;else if(r > 0)return 1;else return -1;}}public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, RecordKey, Text> {StringBuffer newresult = new StringBuffer("");public void configure(JobConf job) {System.out.println("configure invoked/n");super.configure(job);{}}public void map(LongWritable arg0, Text arg1, OutputCollector<RecordKey, Text> arg2, Reporter arg3) throws IOException {newresult.setLength(0);String all[] = arg1.toString().split("/t");//if(all.length == 3){RecordKey k = new RecordKey();k.md5 = all[0];k.type = all[1];arg2.collect(k, new Text(all[2]));System.out.println("map: " + arg1.toString() + "<br>");}}}public static class Reduce extends MapReduceBase implements Reducer<RecordKey, Text, Text, Text> {private final String FieldSeperator = "/t";public static int reducetimes = 0;String md5 = "";//Overridepublic void close() {}//Overridepublic void configure(JobConf job) {super.configure(job);}public void reduce(RecordKey arg1, Iterator<Text> arg2, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { System.out.println( "the " + ++reducetimes + " times <br>"); while(arg2.hasNext()){output.collect(new Text(arg1.md5 + "/t" + arg1.type), arg2.next());}}}static int printUsage() {ToolRunner.printGenericCommandUsage(System.out);return -1;}public int run(String[] args) throws Exception {JobConf conf = new JobConf(getConf(), testsecondarysort.class);conf.setOutputKeyClass(Text.class);conf.setOutputValueClass(Text.class);conf.setMapOutputKeyClass(RecordKey.class);conf.setOutputValueGroupingComparator(PkFkComparator.class);conf.setOutputValueClass(Text.class);conf.setMapperClass(Map.class);conf.setReducerClass(Reduce.class);conf.setInputFormat(TextInputFormat.class);conf.setOutputFormat(TextOutputFormat.class);List<String> other_args = new ArrayList<String>();for (int i = 0; i < args.length; ++i) {try {if ("-m".equals(args[i])) {conf.setNumMapTasks(Integer.parseInt(args[++i]));} else if ("-r".equals(args[i])) {conf.setNumReduceTasks(Integer.parseInt(args[++i]));} else {other_args.add(args[i]);}} catch (NumberFormatException except) {return printUsage();} catch (ArrayIndexOutOfBoundsException except) {return printUsage();}}FileInputFormat.setInputPaths(conf, other_args.get(0));FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));JobClient.runJob(conf);return 0;}public static void main(String[] args) throws Exception {int res = ToolRunner.run(new Configuration(), new testsecondarysort(), args);System.exit(res);} }

總結

以上是生活随笔為你收集整理的使用secondary sort实现数据关联 完整示例代码的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 国产91精品欧美 | 欧美永久 | 亚洲熟女一区二区 | 亚洲国产aⅴ精品一区二区 日韩黄色在线视频 | 久久视频精品 | 麻豆传媒mv| 大黄网站在线观看 | 人成午夜 | 日韩一区欧美一区 | 欧美爽妇 | 在线免费看黄视频 | 99久久久精品 | 精品伊人| av国产精品 | 美女精品一区二区 | 日本福利视频导航 | 亚洲国产成人va在线观看天堂 | 久久久久久欧美 | 精品视频在线观看 | 特黄一区 | 在线观看黄色国产 | 国产成人av免费观看 | 国产毛片在线视频 | 久草超碰在线 | 色网站观看 | 欧美区国产区 | 免费av手机在线观看 | 中文字幕av在线免费观看 | 91色交| 懂色av蜜臀av粉嫩av分享吧 | 又黄又色又爽的视频 | 欧美 亚洲 | 亚洲无码精品国产 | 美女流白浆视频 | 无码av天堂一区二区三区 | 大尺度网站在线观看 | 最近中文字幕mv免费高清在线 | 国产一级免费看 | 精品久久久久久亚洲 | 欧美日韩中文一区 | 在线免费你懂的 | 国产性猛交xxxⅹ交酡全过程 | 在线观看www视频 | 日本捏奶吃奶的视频 | 国产精品美女在线 | 天堂网色 | 国产欧美日韩精品区一区二污污污 | av成人毛片 | 成人综合婷婷国产精品久久 | 美女扒开尿口让男人桶 | 成年人免费看的视频 | 银娇在线观看 | 中文字幕欧美人妻精品 | 亚洲丝袜中文字幕 | 国产不卡在线播放 | 国产污视频在线看 | 欧美国产一级片 | 福利在线国产 | 怡红院成人网 | 日本成人中文字幕 | 亚洲4438| 久久影视大全 | 欧美日韩国产一区二区在线观看 | 国产高清免费在线观看 | 97碰| 午夜伦理在线观看 | 黄色在线免费观看视频 | 亚洲av无码精品色午夜果冻不卡 | 日韩一二三四五区 | 99热这| 黄色茄子视频 | 亚洲一区二区三区香蕉 | 国产精品传媒在线 | 在线视频久久 | 91成人免费视频 | 欧美一线高本道 | 一级大片儿 | 久久av一区 | 娇小tube性极品娇小 | 四虎影院免费视频 | 国产一级av毛片 | 亚洲少妇一区二区 | 色婷婷精品久久二区二区密 | 日韩二级片 | 国产精品久久久久久久久久久久午夜片 | 九九热精品视频在线 | 射久久| 欧美性久久 | 免费观看一级一片 | 日韩福利视频一区 | 亚洲一二三四 | 综合久久久久久久久久久 | 尹人综合网 | 污污在线免费观看 | 男女污污视频在线观看 | 欧美a网 | 亚洲精品久久视频 | 欧美日韩在线视频一区二区三区 | 国产成人精品亚洲线观看 |