日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

MapReduce实现join操作

發布時間:2024/9/30 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MapReduce实现join操作 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
前陣子把MapReduce實現join操作的算法設想清楚了,但一直沒有在代碼層面落地。今天終于費了些功夫把整個流程走了一遭,期間經歷了諸多麻煩并最終得以將其一一搞定,再次深切體會到,什么叫從計算模型到算法實現還有很多路要走。

數據準備

首先是準備好數據。這個倒已經是一個熟練的過程,所要做的是把示例數據準備好,記住路徑和字段分隔符。 準備好下面兩張表: (1)m_ys_lab_jointest_a(以下簡稱表A) 建表語句為: [sql]?view plaincopyprint?
  • create?table?if?not?exists?m_ys_lab_jointest_a?(??
  • ?????id?bigint,??
  • ?????name?string??
  • )??
  • row?format?delimited??
  • fields?terminated?by?'9'??
  • lines?terminated?by?'10'??
  • stored?as?textfile;??
  • 數據:
    id???? name
    1???? 北京
    2???? 天津
    3???? 河北
    4???? 山西
    5???? 內蒙古
    6???? 遼寧
    7???? 吉林
    8???? 黑龍江

    (2)m_ys_lab_jointest_b(以下簡稱表B) 建表語句為: [sql]?view plaincopyprint?
  • create?table?if?not?exists?m_ys_lab_jointest_b?(??
  • ?????id?bigint,??
  • ?????statyear?bigint,??
  • ?????num?bigint??
  • )??
  • row?format?delimited??
  • fields?terminated?by?'9'??
  • lines?terminated?by?'10'??
  • stored?as?textfile;??
  • 數據:
    id ? ? statyear???? num
    1???? 2010???? 1962
    1???? 2011???? 2019
    2???? 2010???? 1299
    2???? 2011???? 1355
    4???? 2010???? 3574
    4???? 2011???? 3593
    9???? 2010???? 2303
    9???? 2011???? 2347

    我們的目的是,以id為key做join操作,得到以下表: m_ys_lab_jointest_ab
    id???? name ? ?statyear?????num
    1?????? 北京??? 2011??? 2019
    1?????? 北京??? 2010??? 1962
    2?????? 天津??? 2011??? 1355
    2?????? 天津??? 2010??? 1299
    4?????? 山西??? 2011??? 3593
    4?????? 山西??? 2010??? 3574

    計算模型

    整個計算過程是: (1)在map階段,把所有記錄標記成<key, value>的形式,其中key是id,value則根據來源不同取不同的形式:來源于表A的記錄,value的值為"a#"+name;來源于表B的記錄,value的值為"b#"+score。 (2)在reduce階段,先把每個key下的value列表拆分為分別來自表A和表B的兩部分,分別放入兩個向量中。然后遍歷兩個向量做笛卡爾積,形成一條條最終結果。 如下圖所示:

    代碼

    代碼如下: [java]?view plaincopyprint?
  • import?java.io.IOException;??
  • import?java.util.HashMap;??
  • import?java.util.Iterator;??
  • import?java.util.Vector;??
  • ??
  • import?org.apache.hadoop.io.LongWritable;??
  • import?org.apache.hadoop.io.Text;??
  • import?org.apache.hadoop.io.Writable;??
  • import?org.apache.hadoop.mapred.FileSplit;??
  • import?org.apache.hadoop.mapred.JobConf;??
  • import?org.apache.hadoop.mapred.MapReduceBase;??
  • import?org.apache.hadoop.mapred.Mapper;??
  • import?org.apache.hadoop.mapred.OutputCollector;??
  • import?org.apache.hadoop.mapred.RecordWriter;??
  • import?org.apache.hadoop.mapred.Reducer;??
  • import?org.apache.hadoop.mapred.Reporter;??
  • ??
  • /**?
  • ?*?MapReduce實現Join操作?
  • ?*/??
  • public?class?MapRedJoin?{??
  • ????public?static?final?String?DELIMITER?=?"\u0009";?//?字段分隔符??
  • ??????
  • ????//?map過程??
  • ????public?static?class?MapClass?extends?MapReduceBase?implements??
  • ????????????Mapper<LongWritable,?Text,?Text,?Text>?{??
  • ??????????????????????????
  • ????????public?void?configure(JobConf?job)?{??
  • ????????????super.configure(job);??
  • ????????}??
  • ??????????
  • ????????public?void?map(LongWritable?key,?Text?value,?OutputCollector<Text,?Text>?output,??
  • ????????????????Reporter?reporter)?throws?IOException,?ClassCastException?{??
  • ????????????//?獲取輸入文件的全路徑和名稱??
  • ????????????String?filePath?=?((FileSplit)reporter.getInputSplit()).getPath().toString();??
  • ????????????//?獲取記錄字符串??
  • ????????????String?line?=?value.toString();??
  • ????????????//?拋棄空記錄??
  • ????????????if?(line?==?null?||?line.equals(""))?return;???
  • ??????????????
  • ????????????//?處理來自表A的記錄??
  • ????????????if?(filePath.contains("m_ys_lab_jointest_a"))?{??
  • ????????????????String[]?values?=?line.split(DELIMITER);?//?按分隔符分割出字段??
  • ????????????????if?(values.length?<?2)?return;??
  • ??????????????????
  • ????????????????String?id?=?values[0];?//?id??
  • ????????????????String?name?=?values[1];?//?name??
  • ??????????????????
  • ????????????????output.collect(new?Text(id),?new?Text("a#"+name));??
  • ????????????}??
  • ????????????//?處理來自表B的記錄??
  • ????????????else?if?(filePath.contains("m_ys_lab_jointest_b"))?{??
  • ????????????????String[]?values?=?line.split(DELIMITER);?//?按分隔符分割出字段??
  • ????????????????if?(values.length?<?3)?return;??
  • ??????????????????
  • ????????????????String?id?=?values[0];?//?id??
  • ????????????????String?statyear?=?values[1];?//?statyear??
  • ????????????????String?num?=?values[2];?//num??
  • ??????????????????
  • ????????????????output.collect(new?Text(id),?new?Text("b#"+statyear+DELIMITER+num));??
  • ????????????}??
  • ????????}??
  • ????}??
  • ??????
  • ????//?reduce過程??
  • ????public?static?class?Reduce?extends?MapReduceBase??
  • ????????????implements?Reducer<Text,?Text,?Text,?Text>?{??
  • ????????public?void?reduce(Text?key,?Iterator<Text>?values,??
  • ????????????????OutputCollector<Text,?Text>?output,?Reporter?reporter)??
  • ????????????????throws?IOException?{??
  • ??????????????????????
  • ????????????Vector<String>?vecA?=?new?Vector<String>();?//?存放來自表A的值??
  • ????????????Vector<String>?vecB?=?new?Vector<String>();?//?存放來自表B的值??
  • ??????????????
  • ????????????while?(values.hasNext())?{??
  • ????????????????String?value?=?values.next().toString();??
  • ????????????????if?(value.startsWith("a#"))?{??
  • ????????????????????vecA.add(value.substring(2));??
  • ????????????????}?else?if?(value.startsWith("b#"))?{??
  • ????????????????????vecB.add(value.substring(2));??
  • ????????????????}??
  • ????????????}??
  • ??????????????
  • ????????????int?sizeA?=?vecA.size();??
  • ????????????int?sizeB?=?vecB.size();??
  • ??????????????
  • ????????????//?遍歷兩個向量??
  • ????????????int?i,?j;??
  • ????????????for?(i?=?0;?i?<?sizeA;?i?++)?{??
  • ????????????????for?(j?=?0;?j?<?sizeB;?j?++)?{??
  • ????????????????????output.collect(key,?new?Text(vecA.get(i)?+?DELIMITER?+vecB.get(j)));??
  • ????????????????}??
  • ????????????}?????
  • ????????}??
  • ????}??
  • ??????
  • ????protected?void?configJob(JobConf?conf)?{??
  • ????????conf.setMapOutputKeyClass(Text.class);??
  • ????????conf.setMapOutputValueClass(Text.class);??
  • ????????conf.setOutputKeyClass(Text.class);??
  • ????????conf.setOutputValueClass(Text.class);??
  • ????????conf.setOutputFormat(ReportOutFormat.class);??
  • ????}??
  • }??
  • 技術細節

    下面說一下其中的若干技術細節: (1)由于輸入數據涉及兩張表,我們需要判斷當前處理的記錄是來自表A還是來自表B。Reporter類getInputSplit()方法可以獲取輸入數據的路徑,具體代碼如下: String?filePath?= ((FileSplit)reporter.getInputSplit()).getPath().toString(); (2)map的輸出的結果,同id的所有記錄(不管來自表A還是表B)都在同一個key下保存在同一個列表中,在reduce階段需要將其拆開,保存為相當于笛卡爾積的m x n條記錄。由于事先不知道m、n是多少,這里使用了兩個向量(可增長數組)來分別保存來自表A和表B的記錄,再用一個兩層嵌套循環組織出我們需要的最終結果。 (3)在MapReduce中可以使用System.out.println()方法輸出,以方便調試。不過System.out.println()的內容不會在終端顯示,而是輸出到了stdout和stderr這兩個文件中,這兩個文件位于logs/userlogs/attempt_xxx目錄下。可以通過web端的歷史job查看中的“Analyse This Job”來查看stdout和stderr的內容。

    總結

    以上是生活随笔為你收集整理的MapReduce实现join操作的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。