
MapReduce in Action (5): Implementing a Join Query in Java


Requirement:

Use a MapReduce program to implement a SQL-style join query.

Order table (order):

id      date        pid     amount
1001    20150710    P0001   2
1002    20150710    P0001   3
1002    20150710    P0002   3
1003    20150710    P0003   4

Product table (product):

pid      pname     category_id   price
P0001    小米6     1000          2499
P0002    錘子T3    1001          2500
P0003    三星S8    1002          6999

Suppose the data volume is huge and both tables are stored as files in HDFS. We then need a MapReduce program to carry out the following SQL query:

select a.id, a.date, b.pname, b.category_id, b.price from t_order a join t_product b on a.pid = b.pid
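For reference, on the sample data above this query returns:

id      date        pname     category_id   price
1001    20150710    小米6     1000          2499
1002    20150710    小米6     1000          2499
1002    20150710    錘子T3    1001          2500
1003    20150710    三星S8    1002          6999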

Analysis:

Use the join key as the key of the map output. Records from both tables that satisfy the join condition, each tagged with the file it came from, are then routed to the same reduce task, and the reducer stitches them together there.

Implementation:

First, convert the table contents into the format we need.

order.txt:

1001,20150710,P0001,2

1002,20150710,P0001,3

1002,20150710,P0002,3

1003,20150710,P0003,4

product.txt:

P0001,小米6,1000,2499
P0002,錘子T3,1001,2500
P0003,三星S8,1002,6999

并且導(dǎo)入到HDFS的/join/srcdata目錄下面。

Because there are two kinds of input files, the map phase has to check the file name and process each kind differently. Likewise, the reduce phase has to tell apart the two kinds of records that arrive under the same key (pid); it does so by checking whether the id field is an empty string.

InfoBean.java:

package com.darrenchan.mr.bean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * id date pid amount pname category_id price
 *
 * @author chenchi
 */
public class InfoBean implements Writable {
    private String id;   // order id
    private String date;
    private String pid;  // product id
    private String amount;
    private String pname;
    private String category_id;
    private String price;

    public InfoBean() {
    }

    public InfoBean(String id, String date, String pid, String amount, String pname, String category_id, String price) {
        super();
        this.id = id;
        this.date = date;
        this.pid = pid;
        this.amount = amount;
        this.pname = pname;
        this.category_id = category_id;
        this.price = price;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getDate() { return date; }
    public void setDate(String date) { this.date = date; }
    public String getPid() { return pid; }
    public void setPid(String pid) { this.pid = pid; }
    public String getAmount() { return amount; }
    public void setAmount(String amount) { this.amount = amount; }
    public String getPname() { return pname; }
    public void setPname(String pname) { this.pname = pname; }
    public String getCategory_id() { return category_id; }
    public void setCategory_id(String category_id) { this.category_id = category_id; }
    public String getPrice() { return price; }
    public void setPrice(String price) { this.price = price; }

    @Override
    public String toString() {
        return "InfoBean [id=" + id + ", date=" + date + ", pid=" + pid + ", amount=" + amount
                + ", pname=" + pname + ", category_id=" + category_id + ", price=" + price + "]";
    }

    // Fields are read back in the same order they are written:
    // id date pid amount pname category_id price
    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readUTF();
        date = in.readUTF();
        pid = in.readUTF();
        amount = in.readUTF();
        pname = in.readUTF();
        category_id = in.readUTF();
        price = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(date);
        out.writeUTF(pid);
        out.writeUTF(amount);
        out.writeUTF(pname);
        out.writeUTF(category_id);
        out.writeUTF(price);
    }
}
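The Writable contract only requires that write() and readFields() agree on the field order. As a quick local sanity check (this helper class is not part of the original post; the name InfoBeanRoundTrip is made up for the sketch), the bean can be pushed through plain Java streams:

package com.darrenchan.mr.bean;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class InfoBeanRoundTrip {
    public static void main(String[] args) throws Exception {
        InfoBean original = new InfoBean("1001", "20150710", "P0001", "2", "", "", "");

        // Serialize with write()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize with readFields()
        InfoBean copy = new InfoBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // The two lines should print identically
        System.out.println(original);
        System.out.println(copy);
    }
}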

Join.java:

package com.darrenchan.mr.join;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.darrenchan.mr.bean.InfoBean;

public class Join {

    /**
     * Mapper class
     *
     * @author chenchi
     */
    public static class JoinMapper extends Mapper<LongWritable, Text, Text, InfoBean> {
        // Create these objects once up front and only change their values,
        // so the map method does not allocate a large number of InfoBean objects.
        InfoBean infoBean = new InfoBean();
        Text text = new Text(); // same reason as above

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // First, check the file name to see whether this record is order data or product data.
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            String name = inputSplit.getPath().getName(); // file name

            if (name.startsWith("order")) { // order data
                String line = value.toString();
                String[] fields = line.split(",");
                String id = fields[0];
                String date = fields[1];
                String pid = fields[2];
                String amount = fields[3];

                infoBean.setId(id);
                infoBean.setDate(date);
                infoBean.setPid(pid);
                infoBean.setAmount(amount);
                // For order data, set the three product fields to "".
                // They are not set to null because the bean must be serialized and deserialized.
                infoBean.setPname("");
                infoBean.setCategory_id("");
                infoBean.setPrice("");

                text.set(pid);
                context.write(text, infoBean);
            } else { // product data
                String line = value.toString();
                String[] fields = line.split(",");
                String pid = fields[0];
                String pname = fields[1];
                String category_id = fields[2];
                String price = fields[3];

                infoBean.setPname(pname);
                infoBean.setCategory_id(category_id);
                infoBean.setPrice(price);
                infoBean.setPid(pid);
                // For product data, set the three order fields to "".
                // They are not set to null because the bean must be serialized and deserialized.
                infoBean.setId("");
                infoBean.setDate("");
                infoBean.setAmount("");

                text.set(pid);
                context.write(text, infoBean);
            }
        }
    }

    public static class JoinReducer extends Reducer<Text, InfoBean, InfoBean, NullWritable> {
        // The order data may contain several records per pid;
        // the product data contains exactly one record per pid.
        @Override
        protected void reduce(Text key, Iterable<InfoBean> values, Context context)
                throws IOException, InterruptedException {
            List<InfoBean> list = new ArrayList<>(); // holds the (possibly many) order records
            InfoBean info = new InfoBean();          // holds the single product record

            for (InfoBean infoBean : values) {
                if (!"".equals(infoBean.getId())) { // order record
                    InfoBean infoBean2 = new InfoBean();
                    try {
                        BeanUtils.copyProperties(infoBean2, infoBean);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    list.add(infoBean2);
                } else { // product record
                    try {
                        BeanUtils.copyProperties(info, infoBean);
                    } catch (IllegalAccessException | InvocationTargetException e) {
                        e.printStackTrace();
                    }
                }
            }

            // Complete each order record with the product fields and emit it.
            for (InfoBean infoBean : list) {
                infoBean.setPname(info.getPname());
                infoBean.setCategory_id(info.getCategory_id());
                infoBean.setPrice(info.getPrice());
                context.write(infoBean, NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(Join.class);
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(InfoBean.class);
        job.setOutputKeyClass(InfoBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Note: in the reduce method, every element taken from Iterable<InfoBean> values must be copied into a new object rather than stored by reference, because Hadoop reuses one object across the whole iteration and only refills its fields as the iterator advances.
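To make the pitfall concrete, here is a minimal sketch (not from the original post) reusing the values and list variables of the JoinReducer above:

// WRONG: the iterator keeps handing back the same reused InfoBean instance,
// so every entry in the list ends up holding the values of the last record.
for (InfoBean infoBean : values) {
    list.add(infoBean);
}

// RIGHT: copy the current values into a fresh object before keeping it.
for (InfoBean infoBean : values) {
    InfoBean copy = new InfoBean();
    try {
        BeanUtils.copyProperties(copy, infoBean); // or copy field by field
    } catch (Exception e) {
        e.printStackTrace();
    }
    list.add(copy);
}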

Command to run the job (the jar name and output directory here are illustrative; the main class and input directory come from the code above): hadoop jar join.jar com.darrenchan.mr.join.Join /join/srcdata /join/output

Result of the run: (the original post shows a screenshot of the output here)

But this approach has a flaw. What is it?

Here the join itself happens in the reduce phase, so the reduce side bears most of the processing load while the map nodes are barely working; resource utilization is poor, and the reduce phase is highly prone to data skew. What is data skew? Suppose far more people in China buy the Xiaomi 6 (小米6) than the Samsung S8 (三星S8): during aggregation, the reduce task handling the Xiaomi 6 pid is under heavy load while the one handling the S8 pid has almost nothing to do, so the load is clearly unbalanced.

So how do we fix this? With a map-side join.

We move the join work into the map phase, and the reduce phase can be dropped entirely. Because the product table is usually small, we can load it into memory ahead of time and simply look products up inside the map method, using MapReduce's distributed cache to ship the file to every map task.

The code is as follows:

package com.darrenchan.mr.mapedjoin;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.darrenchan.mr.bean.InfoBean;

public class MapedJoin {

    public static class MapedJoinMapper extends Mapper<LongWritable, Text, InfoBean, NullWritable> {
        // A map that holds the whole product table, keyed by pid
        private Map<String, String> map = new HashMap<>();
        // Create the bean once up front and only change its values,
        // so the map method does not allocate a large number of InfoBean objects.
        InfoBean infoBean = new InfoBean();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // The cached file has been shipped to the task's working directory,
            // so it can be read as a local file.
            FileInputStream inputStream = new FileInputStream(new File("product.txt"));
            InputStreamReader isr = new InputStreamReader(inputStream);
            BufferedReader br = new BufferedReader(isr);
            String line = null;
            while ((line = br.readLine()) != null) {
                String[] fields = line.split(",");
                map.put(fields[0], line);
            }
            br.close();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Check the file type so the product data is not processed again here.
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            String name = inputSplit.getPath().getName();
            if (name.startsWith("order")) {
                String line = value.toString();
                String[] fields = line.split(",");
                String id = fields[0];
                String date = fields[1];
                String pid = fields[2];
                String amount = fields[3];

                infoBean.setId(id);
                infoBean.setDate(date);
                infoBean.setPid(pid);
                infoBean.setAmount(amount);

                // Look up the product record in the in-memory map and complete the join.
                String product = map.get(pid);
                String[] splits = product.split(",");
                String pname = splits[1];
                String category_id = splits[2];
                String price = splits[3];

                infoBean.setPname(pname);
                infoBean.setCategory_id(category_id);
                infoBean.setPrice(price);

                context.write(infoBean, NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(MapedJoin.class);
        job.setMapperClass(MapedJoinMapper.class);

        job.setMapOutputKeyClass(InfoBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // A map-side join needs no reduce phase, so set the number of reduce tasks to 0;
        // otherwise one reduce task is started by default even without a Reducer.
        job.setNumReduceTasks(0);

        // Other ways to ship files to the task nodes:
        // job.addArchiveToClassPath(archive);  // cache a jar on the task nodes' classpath
        // job.addFileToClassPath(file);        // cache an ordinary file on the task nodes' classpath
        // job.addCacheArchive(uri);            // cache an archive in the task nodes' working directory

        // Cache the product table file in the working directory of every map task
        // so it can be read locally.
        job.addCacheFile(new URI("/join/srcdata/product.txt"));

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
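The map-side join job is launched the same way; with an illustrative jar name and output directory: hadoop jar join.jar com.darrenchan.mr.mapedjoin.MapedJoin /join/srcdata /join/mapjoinoutput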

The result is the same as above.
