日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

hadoop join

發布時間:2023/12/10 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 hadoop join 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

在介紹這個實例之前,請各位參考:http://bjyjtdj.iteye.com/blog/1453410。

reduce side join是一種最簡單的join方式,其主要思想如下:
?在map階段,map函數同時讀取兩個文件File1和File2,為了區分兩種來源的key/value數據對,對每條數據打一個標簽(tag),比如:tag=0表示來自文件File1,tag=2表示來自文件File2。即:map階段的主要任務是對不同文件中的數據打標簽。在reduce階段,reduce函數獲取key相同的來自File1和File2文件的value list, 然后對于同一個key,對File1和File2中的數據進行join(笛卡爾乘積)。即:reduce階段進行實際的連接操作。在這個例子中我們假設有兩個數據文件如下:

user.csv文件:

"ID","NAME","SEX"
"1","user1","0"
"2","user2","0"
"3","user3","0"
"4","user4","1"
"5","user5","0"
"6","user6","0"
"7","user7","1"
"8","user8","0"
"9","user9","0"

order.csv文件:

"USER_ID","NAME"
"1","order1"
"2","order2"
"3","order3"
"4","order4"
"7","order7"
"8","order8"
"9","order9"


目前網上的例子大多是基于0.20以前版本的API寫的,所以咱們采用新的API來寫,具體代碼如下:

public class MyJoin {public static class MapClass extends Mapper<LongWritable, Text, Text, Text>{//最好在map方法外定義變量,以減少map計算時創建對象的個數private Text key = new Text();private Text value = new Text();private String[] keyValue = null;@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException{//采用的數據輸入格式是TextInputFormat,//文件被分為一系列以換行或者制表符結束的行,//key是每一行的位置(偏移量,LongWritable類型),//value是每一行的內容,Text類型,所有我們要把key從value中解析出來keyValue = value.toString().split(",", 2);this.key.set(keyValue[0]);this.value.set(keyValue[1]);context.write(this.key, this.value);}}public static class Reduce extends Reducer<Text, Text, Text, Text>{//最好在reduce方法外定義變量,以減少reduce計算時創建對象的個數private Text value = new Text();@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException{StringBuilder valueStr = new StringBuilder();//values中的每一個值是不同數據文件中的具有相同key的值//即是map中輸出的多個文件相同key的value值集合for(Text val : values){valueStr.append(val);valueStr.append(",");}this.value.set(valueStr.deleteCharAt(valueStr.length()-1).toString());context.write(key, this.value);}}public static void main(String[] args) throws Exception{Configuration conf = new Configuration();Job job = new Job(conf, "MyJoin");job.setJarByClass(MyJoin.class);job.setMapperClass(MapClass.class);job.setReducerClass(Reduce.class);//job.setCombinerClass(Reduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);//分別采用TextInputFormat和TextOutputFormat作為數據的輸入和輸出格式//如果不設置,這也是Hadoop默認的操作方式job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);} }

轉發:https://blog.csdn.net/huashetianzu/article/details/7819244

總結

以上是生活随笔為你收集整理的hadoop join的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。