Hadoop -- Reduce Join

Table of Contents
- Reduce Join
- Reduce Join Case Study
  - Requirements
  - Requirements Analysis
  - Run Results
  - Drawbacks
  - Source Code
Reduce Join
The Map side's main job: tag each key/value pair so that records from different tables (or files) can be told apart. Use the join field as the key, and the remaining fields plus the newly added source tag as the value, then emit.
The Reduce side's main job: by the time reduce() runs, grouping by the join-field key is already done. Within each group, we only need to separate the records that came from different files (tagged in the Map phase) and then merge them.
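For example, using the sample records that also appear in the TableReducer comments below, the Map phase emits the following tagged key/value pairs for join key pid = 01:

```
key   value (id, pid, amount, pname, flag)
01    (1001, 01, 1, "",     "order")
01    (1004, 01, 4, "",     "order")
01    ("",   01, 0, "小米", "pd")
```

After the shuffle, all three records arrive in a single reduce() call, which copies the product name 小米 into the two order records and emits them.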
Reduce Join Case Study

Requirements

Order table order (fields: id, pid, amount). Sample rows, as recoverable from the TableReducer comments below:

| id   | pid | amount |
|------|-----|--------|
| 1001 | 01  | 1      |
| 1004 | 01  | 4      |

Product table pd (fields: pid, pname):

| pid | pname |
|-----|-------|
| 01  | 小米  |

Merge the data from the two tables into the order table on the product id pid, so that each joined record has the form id, pname, amount.
Requirements Analysis

Use the join field (pid) as the Map output key, so that every record from either table that shares a join key, carrying a tag that records which file it came from, is sent to the same ReduceTask; the Reduce phase then stitches the matching records together.

Reduce-side table merge (prone to data skew).
Run Results

The job reads the local files order.txt and pd.txt (the sample rows listed under Requirements above) and writes the joined records to the output directory.
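For the sample rows above, the output contains one line per order record, in the tab-separated id / pname / amount format produced by TableBean.toString():

```
1001	小米	1
1004	小米	4
```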
Drawbacks

Drawback: with this approach the merge happens entirely in the Reduce phase, so the Reduce side carries almost all of the processing load while the Map nodes stay lightly loaded; resource utilization is poor, and the Reduce phase is highly prone to data skew, since a hot join key funnels a disproportionate share of records into one ReduceTask.

Solution: perform the merge on the Map side instead (a map-side join), as sketched below.
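A minimal sketch of that map-side alternative, assuming pd.txt is small enough to ship to every MapTask as a cache file (registered in the driver with job.addCacheFile(...)); the class name MapJoinMapper is illustrative, not part of the original post. Because the join finishes in the Mapper, the job needs no Reducer (job.setNumReduceTasks(0)).

```java
package com.xiaobai.mapreduce.reduceJoin;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private final Map<String, String> pdMap = new HashMap<>(); // pid -> pname
    private final Text outK = new Text();

    @Override
    protected void setup(Context context) throws IOException {
        // load the small pd table from the cache file into memory, once per MapTask
        URI cacheFile = context.getCacheFiles()[0];
        FileSystem fs = FileSystem.get(context.getConfiguration());
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(new Path(cacheFile)), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split("\t"); // pid \t pname
                pdMap.put(fields[0], fields[1]);
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // the regular input is the big order table: id \t pid \t amount
        String[] fields = value.toString().split("\t");
        String pname = pdMap.get(fields[1]); // the join happens here, in the Mapper
        outK.set(fields[0] + "\t" + pname + "\t" + fields[2]);
        context.write(outK, NullWritable.get());
    }
}
```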
Source Code

Tip: Hadoop's reduce-side iterator reuses objects. While iterating, value always points to the same object in memory (the reference never changes); what changes on each step is the data stored inside that object. Any record you want to keep across iterations must therefore be copied into a fresh object, as the sketch below illustrates.
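A minimal sketch (not part of the original post) of the pitfall and its fix, assuming the TableBean class defined below; collect() stands in for the body of a reduce() call:

```java
import org.apache.commons.beanutils.BeanUtils;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;

public class ObjectReuseDemo {

    static List<TableBean> collect(Iterable<TableBean> values) {
        List<TableBean> beans = new ArrayList<>();
        for (TableBean value : values) {
            // WRONG: beans.add(value) -- every element would reference the same
            // reused object, leaving N copies of whichever record came last.

            // RIGHT: copy the current field values into a fresh object first.
            TableBean copy = new TableBean();
            try {
                BeanUtils.copyProperties(copy, value);
            } catch (IllegalAccessException | InvocationTargetException e) {
                e.printStackTrace();
            }
            beans.add(copy);
        }
        return beans;
    }
}
```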
The TableBean class:
```java
package com.xiaobai.mapreduce.reduceJoin;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TableBean implements Writable {

    private String id;     // order id
    private String pid;    // product id
    private int amount;    // product quantity
    private String pname;  // product name
    private String flag;   // source-table tag: "order" or "pd"

    // no-arg constructor (required for Hadoop deserialization)
    public TableBean() {
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getPid() { return pid; }
    public void setPid(String pid) { this.pid = pid; }
    public int getAmount() { return amount; }
    public void setAmount(int amount) { this.amount = amount; }
    public String getPname() { return pname; }
    public void setPname(String pname) { this.pname = pname; }
    public String getFlag() { return flag; }
    public void setFlag(String flag) { this.flag = flag; }

    // serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(flag);
    }

    // deserialization: fields must be read in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readUTF();
        this.pid = in.readUTF();
        this.amount = in.readInt();
        this.pname = in.readUTF();
        this.flag = in.readUTF();
    }

    @Override
    public String toString() {
        // output format: id  pname  amount
        return id + "\t" + pname + "\t" + amount;
    }
}
```
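Since TableBean is also the reduce output key (paired with a NullWritable value), the tab-separated id / pname / amount format of toString() is exactly what TextOutputFormat writes to the result file.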
The TableMapper class:

```java
package com.xiaobai.mapreduce.reduceJoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

    private String fileName;
    private Text outK = new Text();
    private TableBean outV = new TableBean();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // get the file name (order vs. pd), once per split
        FileSplit split = (FileSplit) context.getInputSplit();
        fileName = split.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. read one line
        String line = value.toString();

        // 2. decide which file the line came from
        if (fileName.contains("order")) { // order table: id \t pid \t amount
            String[] split = line.split("\t");
            outK.set(split[1]);
            outV.setId(split[0]);
            outV.setPid(split[1]);
            outV.setAmount(Integer.parseInt(split[2])); // parse the String field into an int
            outV.setPname("");
            outV.setFlag("order");
        } else { // product table pd: pid \t pname
            String[] split = line.split("\t");
            outK.set(split[0]);
            outV.setId("");
            outV.setPid(split[0]);
            outV.setAmount(0);
            outV.setPname(split[1]);
            outV.setFlag("pd");
        }

        // emit: key = pid, value = tagged bean
        context.write(outK, outV);
    }
}
```
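Reusing the single outK/outV pair across map() calls is safe: context.write() serializes the key and value immediately, so mutating outV on the next call cannot affect records already written. This is the mirror image of the iterator object reuse described in the tip above.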
The TableReducer class:

```java
package com.xiaobai.mapreduce.reduceJoin;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context)
            throws IOException, InterruptedException {
        // example group for key (pid) = 01:
        //   01  1001  1  order
        //   01  1004  4  order
        //   01  小米     pd

        // one list for the order records, one bean for the (single) pd record
        ArrayList<TableBean> orderBeans = new ArrayList<>();
        TableBean pdBean = new TableBean();

        for (TableBean value : values) {
            if ("order".equals(value.getFlag())) { // record from the order table
                // Hadoop reuses the value object, so copy it into a fresh bean
                TableBean tmpTableBean = new TableBean();
                try {
                    BeanUtils.copyProperties(tmpTableBean, value);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
                orderBeans.add(tmpTableBean);
            } else { // record from the pd table
                try {
                    BeanUtils.copyProperties(pdBean, value);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }

        // fill in pname on every order record that shares this pid, then emit
        for (TableBean orderBean : orderBeans) {
            orderBean.setPname(pdBean.getPname());
            context.write(orderBean, NullWritable.get());
        }
    }
}
```
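Note that a single pdBean suffices only because each pid is assumed to appear exactly once in the pd table; if the product table could contain duplicate pids, only the group's last pd record would be kept.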
The TableDriver class:

```java
package com.xiaobai.mapreduce.reduceJoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TableDriver {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        // the input directory contains both order.txt and pd.txt
        FileInputFormat.setInputPaths(job, new Path("/Users/jane/Desktop/test/JoinTest"));
        FileOutputFormat.setOutputPath(job, new Path("/Users/jane/Desktop/hadoop/JoinTestOutput"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
```
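One practical caveat when re-running the job locally: FileOutputFormat refuses to start if the output directory (JoinTestOutput here) already exists, so delete it between runs.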