
Hadoop Introduction (16): A MapReduce Single-Table Join Program


"單表關(guān)聯(lián)"要求從給出的數(shù)據(jù)中尋找所關(guān)心的數(shù)據(jù),它是對(duì)原始數(shù)據(jù)所包含信息的挖掘

1 Problem Description

Given a child-parent table, produce the corresponding grandchild-grandparent table.
Sample input:

child parent
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
Terry Alice
Terry Jesse
Philip Terry
Philip Alma
Mark Terry
Mark Alma

Expected output:

grandchild grandparent
Tom Alice
Tom Jesse
Jone Alice
Jone Jesse
Tom Mary
Tom Ben
Jone Mary
Jone Ben
Philip Alice
Philip Jesse
Mark Alice
Mark Jesse


2 Problem Analysis

This example calls for a single-table self-join: the parent column of the left table is joined to the child column of the right table, and the left and right tables are the same table.
Removing the two join columns from the join result leaves exactly the desired grandchild-grandparent table. To solve this with MapReduce, we need to work out, first, how to implement a table's self-join; second, how to set up the join column; and finally, how to assemble the result.
The MapReduce shuffle groups records with the same key together, so if the map output key is set to the join column, all records sharing a value in that column are brought together automatically.
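
As a concrete illustration, here is a hand trace (worked by hand on the sample data, not program output) of the four input rows involving Lucy. Each row is emitted twice, once under each of the two names it contains:

row "Tom Lucy"  → (Tom, parent=Lucy) and (Lucy, child=Tom)
row "Jone Lucy" → (Jone, parent=Lucy) and (Lucy, child=Jone)
row "Lucy Mary" → (Lucy, parent=Mary) and (Mary, child=Lucy)
row "Lucy Ben"  → (Lucy, parent=Ben) and (Ben, child=Lucy)

After the shuffle, the key Lucy receives the value list [child=Tom, child=Jone, parent=Mary, parent=Ben]: Lucy's children and Lucy's parents meet in a single reduce call.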

  

3 Implementation Steps

• In the map phase, split each input record into child and parent. First emit parent as the key and child as the value; this output serves as the left table. Then, for the same child-parent pair, emit child as the key and parent as the value; this output serves as the right table.
• To tell the left and right tables apart, tag each output value with its table of origin, for example by prefixing the value string with the character 1 for the left table and 2 for the right table. (The code below uses a custom TableInfo Writable carrying an integer tag instead of a string prefix.)
• The reducer receives the joined result: each key's value list contains the grandchild-grandparent relations. Parse the list, putting the child values from the left table into one array and the parent values from the right table into another, then take the Cartesian product of the two arrays to obtain the final result; a hand trace for one key follows this list.
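
Continuing the hand trace from section 2 for the key Lucy: the values tagged as children give the grandchildren [Tom, Jone], the values tagged as parents give the grandparents [Mary, Ben], and the Cartesian product yields the four pairs Tom-Mary, Tom-Ben, Jone-Mary, and Jone-Ben, exactly the four Lucy-derived rows of the expected output.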

4 Key Code

package com.mk.mapreduce;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.util.LinkedList;
import java.util.List;

public class JoinSelf {

    public static class JoinSelfMapper extends Mapper<LongWritable, Text, Text, TableInfo> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            if (StringUtils.isBlank(value.toString())) {
                System.out.println("blank line");
                return;
            }
            String[] values = value.toString().split("\\s+");
            // Skip malformed rows and the "child parent" header row.
            if (values.length < 2 || values[0].equals("child")) {
                System.out.println("line too short: " + value.toString());
                return;
            }
            // Emit each pair twice: keyed by the child (carrying its parent)
            // and keyed by the parent (carrying its child), so both sides of
            // the self-join meet at the same reduce key.
            context.write(new Text(values[0]), new TableInfo(TableInfo.PARENT, values[1]));
            context.write(new Text(values[1]), new TableInfo(TableInfo.CHILD, values[0]));
        }
    }

    public static class JoinSelfReducer extends Reducer<Text, TableInfo, Text, Text> {

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Write the header row once, before any data.
            context.write(new Text("grandchild"), new Text("grandparent"));
        }

        @Override
        protected void reduce(Text key, Iterable<TableInfo> values, Context context)
                throws IOException, InterruptedException {
            List<String> grandChildren = new LinkedList<>();
            List<String> grandParents = new LinkedList<>();
            for (TableInfo v : values) {
                // Copy the string out: Hadoop reuses the value object across iterations.
                if (v.getTable() == TableInfo.CHILD) {
                    grandChildren.add(v.value.toString());
                } else {
                    grandParents.add(v.value.toString());
                }
            }
            // The key person bridges a grandchild-grandparent pair only if it
            // has both children and parents in the input.
            if (!grandChildren.isEmpty() && !grandParents.isEmpty()) {
                grandChildren.sort(String::compareTo);
                grandParents.sort(String::compareTo);
                // Cartesian product: every grandchild with every grandparent.
                for (String grandChild : grandChildren)
                    for (String grandParent : grandParents)
                        context.write(new Text(grandChild), new Text(grandParent));
            }
        }
    }

    // Map output value: a table tag (left/right) plus the joined name.
    public static class TableInfo implements WritableComparable<TableInfo> {

        public static final int PARENT = 1;
        public static final int CHILD = 2;

        private int table;
        private Text value;

        public TableInfo() {
        }

        public TableInfo(int table, String value) {
            this.table = table;
            this.value = new Text(value);
        }

        public int getTable() {
            return table;
        }

        public void setTable(int table) {
            this.table = table;
        }

        public void setValue(Text value) {
            this.value = value;
        }

        @Override
        public int compareTo(TableInfo o) {
            int c = table - o.table;
            if (c != 0)
                return c;
            return value.compareTo(o.value);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(table);
            this.value.write(out);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.table = in.readInt();
            // The no-arg constructor leaves value null, so allocate it before reading.
            if (this.value == null)
                this.value = new Text();
            this.value.readFields(in);
        }

        @Override
        public String toString() {
            return "TableInfo{table='" + table + "', value='" + value + "'}";
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String uri = "hdfs://192.168.150.128:9000";
        String input = "/joinSelf/input";
        String output = "/joinSelf/output";

        Configuration conf = new Configuration();
        if (System.getProperty("os.name").toLowerCase().contains("win"))
            conf.set("mapreduce.app-submission.cross-platform", "true");

        // Remove any previous output so the job can be rerun.
        FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(output);
        fileSystem.delete(path, true);

        Job job = new Job(conf, "JoinSelf");
        job.setJar("./out/artifacts/hadoop_test_jar/hadoop-test.jar");
        job.setJarByClass(JoinSelf.class);
        job.setMapperClass(JoinSelfMapper.class);
        job.setReducerClass(JoinSelfReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableInfo.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPaths(job, uri + input);
        FileOutputFormat.setOutputPath(job, new Path(uri + output));

        boolean ret = job.waitForCompletion(true);
        System.out.println(job.getJobName() + "-----" + ret);
    }
}
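
If the job finishes successfully (ret is true), the result lands under /joinSelf/output on HDFS and can be inspected with the standard HDFS shell, for example hdfs dfs -cat /joinSelf/output/part-r-00000; the part-r-00000 file name assumes the job's default single reduce task.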


Problems encountered while coding:

• Writing a plain String to Hadoop output (values must be wrapped in a Writable such as Text)

• Initializing the Text field when deserializing (readFields must allocate the field before reading into it)

• Splitting strings in the mapper when the whitespace is \u00A0 (Java's \s does not match the non-breaking space by default; see the sketch after this list)

• Handling multiple grandchildren together with multiple grandparents (hence the Cartesian product in the reducer)
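
For the \u00A0 problem, here is a minimal sketch of one possible fix (hypothetical; the code above splits on \\s+ only): list the non-breaking space explicitly in the split pattern, since \s covers only ASCII whitespace under Java's default regex semantics.

// U+00A0 (non-breaking space) is not matched by \s by default,
// so add it to the character class explicitly:
String[] values = value.toString().split("[\\s\\u00A0]+");
// Alternative: Pattern.compile("\\s+", Pattern.UNICODE_CHARACTER_CLASS)
// makes \s match all Unicode whitespace, including U+00A0.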

