日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop大数据——mapreduce的join算法

發布時間:2025/1/21 编程问答 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop大数据——mapreduce的join算法 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

(1)Reduce side join
示例:
訂單數據
商品信息
實現機制:
通過將關聯的條件作為map輸出的key,將兩表滿足join條件的數據并攜帶數據所來源的文件信息,發往同一個reduce task,在reduce中進行數據的串聯

public class OrderJoin {static class OrderJoinMapper extends Mapper<LongWritable, Text, Text, OrderJoinBean> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 拿到一行數據,并且要分辨出這行數據所屬的文件String line = value.toString();String[] fields = line.split("\t");// 拿到itemidString itemid = fields[0];// 獲取到這一行所在的文件名(通過inpusplit)String name = "你拿到的文件名";// 根據文件名,切分出各字段(如果是a,切分出兩個字段,如果是b,切分出3個字段)OrderJoinBean bean = new OrderJoinBean();bean.set(null, null, null, null, null);context.write(new Text(itemid), bean);}}static class OrderJoinReducer extends Reducer<Text, OrderJoinBean, OrderJoinBean, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<OrderJoinBean> beans, Context context) throws IOException, InterruptedException {//拿到的key是某一個itemid,比如1000//拿到的beans是來自于兩類文件的bean// {1000,amount} {1000,amount} {1000,amount} --- {1000,price,name}//將來自于b文件的bean里面的字段,跟來自于a的所有bean進行字段拼接并輸出}} }

缺點:reduce端的處理壓力太大,map節點的運算負載則很低,資源利用率不高
容易產生數據傾斜
注:也可利用二次排序的邏輯來實現reduce端join
(2)Map side join
–原理闡述
適用于關聯表中有小表的情形;
可以將小表分發到所有的map節點,這樣,map節點就可以在本地對自己所讀到的大表數據進行join并輸出最終結果
可以大大提高join操作的并發度,加快處理速度
–示例:先在mapper類中預先定義好小表,進行join
–引入實際場景中的解決方案:一次加載數據庫或者用distributedcache

public class TestDistributedCache {static class TestDistributedCacheMapper extends Mapper<LongWritable, Text, Text, Text>{FileReader in = null;BufferedReader reader = null;HashMap<String,String> b_tab = new HashMap<String, String>();String localpath =null;String uirpath = null;//是在map任務初始化的時候調用一次@Overrideprotected void setup(Context context) throws IOException, InterruptedException {//通過這幾句代碼可以獲取到cache file的本地絕對路徑,測試驗證用Path[] files = context.getLocalCacheFiles();localpath = files[0].toString();URI[] cacheFiles = context.getCacheFiles();//緩存文件的用法——直接用本地IO來讀取//這里讀的數據是map task所在機器本地工作目錄中的一個小文件in = new FileReader("b.txt");reader =new BufferedReader(in);String line =null;while(null!=(line=reader.readLine())){String[] fields = line.split(",");b_tab.put(fields[0],fields[1]);}IOUtils.closeStream(reader);IOUtils.closeStream(in);}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//這里讀的是這個map task所負責的那一個切片數據(在hdfs上)String[] fields = value.toString().split("\t");String a_itemid = fields[0];String a_amount = fields[1];String b_name = b_tab.get(a_itemid);// 輸出結果 1001 98.9 banancontext.write(new Text(a_itemid), new Text(a_amount + "\t" + ":" + localpath + "\t" +b_name ));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(TestDistributedCache.class);job.setMapperClass(TestDistributedCacheMapper.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);//這里是我們正常的需要處理的數據所在路徑FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));//不需要reducerjob.setNumReduceTasks(0);//分發一個文件到task進程的工作目錄job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt"));//分發一個歸檔文件到task進程的工作目錄 // job.addArchiveToClassPath(archive);//分發jar包到task節點的classpath下 // job.addFileToClassPath(jarfile);job.waitForCompletion(true);} }

總結

以上是生活随笔為你收集整理的Hadoop大数据——mapreduce的join算法的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。