Hadoop Big Data: MapReduce Join Algorithms
(1) Reduce side join

Example: an orders table and a product-info table, joined on the item id.

How it works: use the join column as the map output key, and tag each record with the name of the file it came from. All records from both tables that satisfy the join condition are then sent to the same reduce task, where the actual join is performed.
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class OrderJoin {

    static class OrderJoinMapper extends Mapper<LongWritable, Text, Text, OrderJoinBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\t");
            String itemid = fields[0];
            // Tag the record with its source file, so the reducer can tell
            // order rows and product rows apart
            String name = ((FileSplit) context.getInputSplit()).getPath().getName();
            OrderJoinBean bean = new OrderJoinBean();
            bean.set(null, null, null, null, null); // populate from fields[] and name
            context.write(new Text(itemid), bean);
        }
    }

    static class OrderJoinReducer extends Reducer<Text, OrderJoinBean, OrderJoinBean, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<OrderJoinBean> beans, Context context)
                throws IOException, InterruptedException {
            // Separate the beans into two groups by source file, then
            // cross-join the groups and emit the joined records
        }
    }
}
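The reducer body above is left empty; conceptually it separates the grouped values by source file and cross-joins the two groups. A minimal plain-Java sketch of that reduce step (the file names and the [sourceFile, payload] record layout are hypothetical stand-ins for OrderJoinBean):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReduceJoinSketch {
    // Simulates one reduce() call: all values share the same join key.
    // Each value is [sourceFile, payload]; "product.txt" marks the small side.
    static List<String> reduceJoin(String key, List<String[]> beans) {
        List<String[]> orders = new ArrayList<>();
        String[] product = null;
        for (String[] b : beans) {
            if (b[0].equals("product.txt")) {
                product = b;          // the single product record for this key
            } else {
                orders.add(b);        // order records for this key
            }
        }
        List<String> out = new ArrayList<>();
        for (String[] o : orders) {
            // Emit: joinKey \t orderPayload \t productName
            out.add(key + "\t" + o[1] + "\t" + (product == null ? "NULL" : product[1]));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> beans = Arrays.asList(
                new String[]{"order.txt", "amount=2"},
                new String[]{"product.txt", "apple"},
                new String[]{"order.txt", "amount=7"});
        System.out.println(reduceJoin("p1", beans));
    }
}
```

Note that every order record for a key must be buffered in memory before the product record is found, which is exactly why a hot key overwhelms a single reducer.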
Drawbacks: the reduce side bears almost all of the processing load while the map nodes stay lightly loaded, so resource utilization is poor; the approach is also prone to data skew.
Note: a reduce-side join can also be implemented with secondary-sort logic.
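The secondary-sort idea can be illustrated without Hadoop: tag each record with its source (0 for the small product side, 1 for orders) and sort by the composite key (joinKey, tag). The product record then always arrives first within each key group, so joined output can be streamed with no buffering. A sketch under those assumptions (the [joinKey, tag, payload] layout is hypothetical):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SecondarySortJoinSketch {
    // Each record is [joinKey, tag, payload]; tag "0" = product side, "1" = order side.
    static List<String> join(List<String[]> records) {
        // Secondary sort: primary order by join key, secondary by tag,
        // so the single product record precedes its orders in every group
        records.sort((a, b) -> {
            int c = a[0].compareTo(b[0]);
            return c != 0 ? c : a[1].compareTo(b[1]);
        });
        List<String> out = new ArrayList<>();
        String curKey = null;
        String productName = null;
        for (String[] r : records) {
            if (!r[0].equals(curKey)) {    // entering a new join-key group
                curKey = r[0];
                productName = null;
            }
            if (r[1].equals("0")) {
                productName = r[2];        // remember the small-side value
            } else {
                // Emit the joined order immediately; nothing is buffered
                out.add(r[0] + "\t" + r[2] + "\t" + productName);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> recs = new ArrayList<>(Arrays.asList(
                new String[]{"p1", "1", "amount=3"},
                new String[]{"p1", "0", "apple"},
                new String[]{"p2", "1", "amount=5"},
                new String[]{"p2", "0", "pear"}));
        System.out.println(join(recs));
    }
}
```

In real MapReduce the same ordering is achieved with a composite WritableComparable key plus a grouping comparator, not an in-memory sort.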
(2) Map side join

– How it works:
Suitable when one of the joined tables is small.
The small table can be shipped to every map node; each mapper then joins the big-table records it reads against the local copy and emits the final result directly.
This greatly increases the parallelism of the join and speeds up processing.
– Example: predefine the small table inside the mapper class and join against it.
– Practical solutions: load the database once at startup, or use the distributed cache.
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TestDistributedCache {

    static class TestDistributedCacheMapper extends Mapper<LongWritable, Text, Text, Text> {

        FileReader in = null;
        BufferedReader reader = null;
        HashMap<String, String> b_tab = new HashMap<String, String>();
        String localpath = null;

        // Runs once per map task: load the small table (b.txt, shipped via
        // the distributed cache) into an in-memory hash map
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Path[] files = context.getLocalCacheFiles();
            localpath = files[0].toString();
            // The cached file is localized into the task's working directory,
            // so it can be opened by its plain file name
            in = new FileReader("b.txt");
            reader = new BufferedReader(in);
            String line = null;
            while (null != (line = reader.readLine())) {
                String[] fields = line.split(",");
                b_tab.put(fields[0], fields[1]);
            }
            IOUtils.closeStream(reader);
            IOUtils.closeStream(in);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each big-table line: itemid \t amount
            String[] fields = value.toString().split("\t");
            String a_itemid = fields[0];
            String a_amount = fields[1];
            // The join is a simple local lookup against the in-memory table
            String b_name = b_tab.get(a_itemid);
            context.write(new Text(a_itemid),
                    new Text(a_amount + "\t" + ":" + localpath + "\t" + b_name));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(TestDistributedCache.class);
        job.setMapperClass(TestDistributedCacheMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class); // the mapper emits Text values
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Map-only job: the join happens entirely on the map side
        job.setNumReduceTasks(0);
        // Ship the small table to every map task via the distributed cache
        job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt"));
        job.waitForCompletion(true);
    }
}