生活随笔
收集整理的這篇文章主要介紹了
【原创】MapReduce编程系列之表连接
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
? ? ? ? 需要連接的表如下:其中左邊是child,右邊是parent,我們要做的是找出grandchild和grandparent的對應關系,為此需要進行表的連接。
Tom Lucy
Tom Jim
Lucy David
Lucy Lili
Jim Lilei
Jim SuSan
Lily Green
Lily Bians
Green Well
Green MillShell
Havid James
James LiT
Richard Cheng
Cheng LiHua
誠然,在寫MR程序的時候要結合MR數據處理的一些特性。例如如果我們用默認的TextInputFormat來處理傳入的文件數據,傳入的格式是key為行號,value為這一行的值(如上例中的第一行,key為0,value為[Tom,Lucy]),在shuffle過程中,我們的值如果有相同的key,會merge到一起(這一點很重要!)。我們利用shuffle階段的特性,merge到一組的數據夠成一組關系,然后我們在這組關系中想辦法區分晚輩和長輩,最后對merge里的value一一作處理,分離出grandchild和grandparent的關系。 例如,Tom Lucy傳入處理后我們將其反轉,成為Lucy Tom輸出。當然,輸出的時候,為了達到join的效果,我們要輸出兩份,因為join要兩個表,一個表為L1:child parent,一個表為L2:child parent,為了達到關聯的目的和利用shuffle階段的特性,我們需要將L1反轉,把parent放在前面,這樣L1表中的parent和L2表中的child如果字段是相同的那么在shuffle階段就能merge到一起。還有,為了區分merge到一起后如何區分child和parent,我們把L1表中反轉后的child(未來的 grandchild)字段后面加一個1,L2表中parent(未來的grandparent)字段后加2。
1 package com.test.join;
2
3 import java.io.IOException;
4 import java.util.ArrayList;
5 import java.util.Iterator;
6
7 import org.apache.hadoop.conf.Configuration;
8 import org.apache.hadoop.fs.Path;
9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Job;
11 import org.apache.hadoop.mapreduce.Mapper;
12 import org.apache.hadoop.mapreduce.Reducer;
13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
15
16 public class STJoin {
17
18 public static class STJoinMapper
extends Mapper<Object, Text, Text, Text>
{
19
20 @Override
21 protected void map(Object key, Text value, Context context)
22 throws IOException, InterruptedException {
23 // TODO Auto-generated method stub
24 String[] rela = value.toString().trim().split(" ",2
);
25 if (rela.length!=2
)
26 return ;
27 String child = rela[0
];
28 String parent = rela[1
];
29 context.write(
new Text(parent),
new Text((child+"1"
)));
30 context.write(
new Text(child),
new Text((parent+"2"
)));
31
32 }
33
34 }
35 public static class STJoinReducer
extends Reducer<Text, Text, Text, Text>
{
36
37 @Override
38 protected void reduce(Text arg0, Iterable<Text>
arg1,Context context)
39 throws IOException, InterruptedException {
40 // TODO Auto-generated method stub
41 ArrayList<String> grandParent =
new ArrayList<>
();
42 ArrayList<String> grandChild =
new ArrayList<>
();
43 Iterator<Text> iterator =
arg1.iterator();
44 while (iterator.hasNext()){
45 String text =
iterator.next().toString();
46 if (text.endsWith("1"
))
47 grandChild.add(text.substring(0, text.length()-1
));
48 if (text.endsWith("2"
))
49 grandParent.add(text.substring(0, text.length()-1
));
50 }
51
52 for (String grandparent:grandParent){
53 for (String grandchild:grandChild){
54 context.write(
new Text(grandchild),
new Text(grandparent));
55 }
56 }
57 }
58 }
59
60
61 public static void main(String args[])
throws IOException, ClassNotFoundException, InterruptedException {
62 Configuration conf =
new Configuration();
63 Job job =
new Job(conf,"STJoin"
);
64 job.setMapperClass(STJoinMapper.
class );
65 job.setReducerClass(STJoinReducer.
class );
66 job.setOutputKeyClass(Text.
class );
67 job.setOutputValueClass(Text.
class );
68 FileInputFormat.addInputPath(job,
new Path("hdfs://localhost:9000/user/hadoop/STJoin/joinFile"
));
69 FileOutputFormat.setOutputPath(job,
new Path("hdfs://localhost:9000/user/hadoop/STJoin/joinResult"
));
70
71 System.exit(job.waitForCompletion(
true )?0:1
);
72 }
73 }
?
Richard LiHua
Lily Well
Lily MillShell
Havid LiT
Tom Lilei
Tom SuSan
Tom Lili
Tom David ?以上代碼在hadoop1.0.3平臺實現
轉載于:https://www.cnblogs.com/gslyyq/p/mapreduce.html
總結
以上是生活随笔 為你收集整理的【原创】MapReduce编程系列之表连接 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。