Hadoop Basics (16): A MapReduce Single-Table Join Program
"單表關(guān)聯(lián)"要求從給出的數(shù)據(jù)中尋找所關(guān)心的數(shù)據(jù),它是對(duì)原始數(shù)據(jù)所包含信息的挖掘
1 實(shí)例描述
Given a child-parent table, produce the corresponding grandchild-grandparent table.
Sample input (child-parent table):

child     parent
Tom       Lucy
Tom       Jack
Jone      Lucy
Jone      Jack
Lucy      Mary
Lucy      Ben
Jack      Alice
Jack      Jesse
Terry     Alice
Terry     Jesse
Philip    Terry
Philip    Alma
Mark      Terry
Mark      Alma
Expected output:
grandchild    grandparent
Tom           Alice
Tom           Jesse
Jone          Alice
Jone          Jesse
Tom           Mary
Tom           Ben
Jone          Mary
Jone          Ben
Philip        Alice
Philip        Jesse
Mark          Alice
Mark          Jesse
2 Analysis
This problem calls for a self-join: the parent column of the left table is joined to the child column of the right table, where the left and right tables are one and the same.
Dropping the two join columns from the join result leaves exactly the desired grandchild-grandparent table. To solve this with MapReduce, we must first work out how to express a self-join; second, how to set up the join column; and finally, how to assemble the result.
MapReduce's shuffle phase brings records with the same key together, so we can set the map output key to the join column: rows that share a value in that column are then grouped together automatically.
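To make this concrete, consider the person Jack in the sample input above. The mapper (shown in section 4) emits each input row twice, once keyed by the child and once keyed by the parent, so the reduce group for the key Jack contains both Jack's children and Jack's parents:

key Jack receives:  CHILD:Tom, CHILD:Jone        (from rows "Tom Jack" and "Jone Jack")
                    PARENT:Alice, PARENT:Jesse   (from rows "Jack Alice" and "Jack Jesse")
reduce emits:       Tom-Alice, Tom-Jesse, Jone-Alice, Jone-Jesse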
3 Implementation Steps
Map: for each child-parent record, emit two key-value pairs: one keyed by the child, whose value marks the parent side of the join, and one keyed by the parent, whose value marks the child side.
Shuffle: all values for the same person are grouped under one key; for a person P the group holds P's children (candidate grandchildren) and P's parents (candidate grandparents).
Reduce: for each key whose group contains both kinds of values, output the Cartesian product of the grandchild list and the grandparent list.
4 Key Code
package com.mk.mapreduce;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.util.LinkedList;
import java.util.List;

public class JoinSelf {

    public static class JoinSelfMapper extends Mapper<LongWritable, Text, Text, TableInfo> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            if (StringUtils.isBlank(value.toString())) {
                System.out.println("blank line");
                return;
            }
            // Split on whitespace, including the non-breaking space \u00A0,
            // which Java's \s alone does not match.
            String[] values = value.toString().split("[\\s\\u00A0]+");
            // Skip malformed lines and the "child parent" header row.
            if (values.length < 2 || values[0].equals("child")) {
                System.out.println("skipping short or header line: " + value.toString());
                return;
            }
            // Emit every record twice: keyed by the child (value marks the
            // parent side of the join) and keyed by the parent (value marks
            // the child side). This is what makes the self-join possible.
            context.write(new Text(values[0]), new TableInfo(TableInfo.PARENT, values[1]));
            context.write(new Text(values[1]), new TableInfo(TableInfo.CHILD, values[0]));
        }
    }

    public static class JoinSelfReducer extends Reducer<Text, TableInfo, Text, Text> {

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Write the header row once per reducer.
            context.write(new Text("grandchild"), new Text("grandparent"));
        }

        @Override
        protected void reduce(Text key, Iterable<TableInfo> values, Context context)
                throws IOException, InterruptedException {
            List<String> grandChildren = new LinkedList<>();
            List<String> grandParents = new LinkedList<>();
            for (TableInfo v : values) {
                if (v.getTable() == TableInfo.CHILD) {
                    grandChildren.add(v.value.toString());
                } else {
                    grandParents.add(v.value.toString());
                }
            }
            // A key contributes output only when it has both children and
            // parents; emit the full Cartesian product of the two lists.
            if (!grandChildren.isEmpty() && !grandParents.isEmpty()) {
                grandChildren.sort((a, b) -> a.compareTo(b));
                grandParents.sort((a, b) -> a.compareTo(b));
                for (String grandChild : grandChildren)
                    for (String grandParent : grandParents)
                        context.write(new Text(grandChild), new Text(grandParent));
            }
        }
    }

    public static class TableInfo implements WritableComparable<TableInfo> {

        public static final int PARENT = 1;
        public static final int CHILD = 2;

        private int table;
        // Stored as Text, not String: String is not a Hadoop Writable.
        private Text value;

        public TableInfo() {
        }

        public TableInfo(int table, String value) {
            this.table = table;
            this.value = new Text(value);
        }

        public int getTable() {
            return table;
        }

        public void setTable(int table) {
            this.table = table;
        }

        public void setValue(Text value) {
            this.value = value;
        }

        @Override
        public int compareTo(TableInfo o) {
            int c = table - o.table;
            if (c != 0)
                return c;
            return value.compareTo(o.value);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(table);
            this.value.write(out);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.table = in.readInt();
            // Hadoop deserializes into an instance created with the no-arg
            // constructor, so value may still be null here.
            if (this.value == null)
                this.value = new Text();
            this.value.readFields(in);
        }

        @Override
        public String toString() {
            return "TableInfo{table='" + table + "', value='" + value + "'}";
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        String uri = "hdfs://192.168.150.128:9000";
        String input = "/joinSelf/input";
        String output = "/joinSelf/output";

        Configuration conf = new Configuration();
        if (System.getProperty("os.name").toLowerCase().contains("win"))
            conf.set("mapreduce.app-submission.cross-platform", "true");

        // Delete any previous output directory so the job can be rerun.
        FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(output);
        fileSystem.delete(path, true);

        Job job = Job.getInstance(conf, "JoinSelf");
        job.setJar("./out/artifacts/hadoop_test_jar/hadoop-test.jar");
        job.setJarByClass(JoinSelf.class);
        job.setMapperClass(JoinSelfMapper.class);
        job.setReducerClass(JoinSelfReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableInfo.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPaths(job, uri + input);
        FileOutputFormat.setOutputPath(job, new Path(uri + output));
        boolean ret = job.waitForCompletion(true);
        System.out.println(job.getJobName() + "-----" + ret);
    }
}
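Before submitting the job from the IDE (the jar path passed to setJar is the local build artifact), the input file has to exist under /joinSelf/input on HDFS. A minimal sketch using the standard HDFS shell, assuming the sample input is saved locally as child-parent.txt (a hypothetical file name):

hadoop fs -mkdir -p /joinSelf/input
hadoop fs -put child-parent.txt /joinSelf/input

# After the job finishes, inspect the result:
hadoop fs -cat /joinSelf/output/part-r-00000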
Problems encountered while coding:
Writing a String to Hadoop: java.lang.String is not a Writable, so TableInfo stores its value in a Text field and serializes that instead.
Initializing the Text field when reading: Hadoop deserializes into an object created with the no-arg constructor, so readFields must allocate the Text before calling value.readFields(in).
Splitting strings containing the non-breaking space \u00A0 in the mapper: Java's \s does not match it (see the sketch after this list).
Pairing multiple grandchildren with multiple grandparents: the reducer has to emit the full Cartesian product of the two lists, not just a single pair.
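The whitespace problem is worth a closer look, since it bites whenever sample data is copy-pasted from a web page. Below is a minimal, self-contained sketch (the class name NbspSplitDemo is ours, not from the original program) showing the pitfall and the fix used in the mapper above:

public class NbspSplitDemo {
    public static void main(String[] args) {
        // Fields separated by a non-breaking space (U+00A0) instead of a normal space.
        String line = "Tom\u00A0Lucy";

        // Java's \s matches only [ \t\n\x0B\f\r], so the line is NOT split.
        System.out.println(line.split("\\s+").length);           // prints 1

        // Adding \u00A0 to the character class splits correctly.
        System.out.println(line.split("[\\s\\u00A0]+").length);  // prints 2
    }
}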