日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

hadoop--Map Join

發布時間:2025/3/17 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 hadoop--Map Join 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

  • Map Join
    • 使用場景
    • 優點
    • 具體辦法:采用 DistributedCache
  • Map Join 案例
    • 需求
    • 需求分析
  • 源碼

Map Join

使用場景

Map Join 適用于一張表十分小、一張表很大的場景。

優點

思考:在 Reduce 端處理過多的表,非常容易產生數據傾斜。怎么辦?
在 Map 端緩存多張表,提前處理業務邏輯,這樣增加 Map 端業務,減少 Reduce 端數
據的壓力,盡可能的減少數據傾斜。

具體辦法:采用 DistributedCache

  • 在 Mapper 的 setup 階段,將文件讀取到緩存集合中;
  • 在 Driver 驅動類中加載緩存。
  • 緩存普通文件到 Task 運行節點:

    job.addCacheFile(new URI("file:///e:/cache/pd.txt"));

    如果是集群運行,需要設置 HDFS 路徑:

    job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));

    Map Join 案例

    需求

    :同Reduce Join案例;

    需求分析

    MapJoin 適用于關聯表中有小表的情形。

    Map端表合并案例分析(Distributedcache)

    源碼

    MapJoinMapper類

    package com.xiaobai.mapreduce.mapjoin;import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap;public class MapJoinMapper extends Mapper<LongWritable, Text,Text, NullWritable> {private HashMap<String, String> pdMap = new HashMap<>();private Text outK = new Text();@Overrideprotected void setup(Context context) throws IOException, InterruptedException {//獲取緩存的文件,并把文件內容封裝到集合 pd.txtURI[] cacheFiles = context.getCacheFiles();FileSystem fs = FileSystem.get(context.getConfiguration());FSDataInputStream fis = fs.open(new Path(cacheFiles[0]));//從流中讀取數據BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));String line;while(StringUtils.isNotEmpty(line = reader.readLine())){//切割String[] fields = line.split("\t");//賦值pdMap.put(fields[0],fields[1]);}//關流IOUtils.closeStream(reader);}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//處理order.txtString line = value.toString();String[] fields = line.split("\t");//獲取pidString pname = pdMap.get(fields[1]);//獲取訂單id和訂單數量//封裝outK.set(fields[0] + "\t" + pname + "\t" + fields[2]);context.write(outK,NullWritable.get());} }

    MapJoinDriver類

    package com.xiaobai.mapreduce.mapjoin;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException; import java.net.URI; import java.net.URISyntaxException;public class MapJoinDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {//1. 獲取job信息Configuration conf = new Configuration();Job job = Job.getInstance(conf);//2. 設置加載jar包路徑job.setJarByClass(MapJoinDriver.class);//3. 關聯mapperjob.setMapperClass(MapJoinMapper.class);//4.設置Map輸出kv類型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);//5.設置最終輸出kv類型job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);//加載緩存數據job.addCacheFile(new URI("/Users/jane/Desktop/test/JoinTest/pd.txt"));//Map端join的邏輯不需要reduce階段,設置reduceTask數量為0job.setNumReduceTasks(0);//6. 設置輸入輸出路徑FileInputFormat.setInputPaths(job,new Path("/Users/jane/Desktop/test/JoinTest"));FileOutputFormat.setOutputPath(job,new Path("/Users/jane/Desktop/hadoop/MapJoinTestOutput"));//7. 提交boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);} }

    總結

    以上是生活随笔為你收集整理的hadoop--Map Join的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。