hadoop--Map Join
Contents
- Map Join
- Use Cases
- Advantages
- Approach: Use DistributedCache
- Map Join Example
- Requirement
- Requirement Analysis
- Source Code
Map Join
Use Cases
Map Join suits scenarios where one table is very small and the other is very large.
Advantages
Consider: joining many tables on the Reduce side easily causes data skew. What can be done?
Cache the small tables on the Map side and perform the join logic there. This adds work to the Map side but relieves the data pressure on the Reduce side, minimizing data skew as much as possible.
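Stripped of the Hadoop machinery, the technique is an ordinary in-memory hash join: load the small table into a `HashMap`, then stream the large table through it. A minimal sketch (the product names and order rows here are hypothetical sample values, not from the original case):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashJoinSketch {
    public static void main(String[] args) {
        // Small table (the pd.txt side): pid -> pname, held entirely in memory
        Map<String, String> pdMap = new HashMap<>();
        pdMap.put("01", "phone");
        pdMap.put("02", "tablet");

        // Large table rows (the order.txt side): orderId \t pid \t amount
        String[] orders = {"1001\t01\t2", "1002\t02\t3"};

        // Join each large-table row against the map -- no shuffle/reduce phase needed
        List<String> joined = new ArrayList<>();
        for (String order : orders) {
            String[] f = order.split("\t");
            joined.add(f[0] + "\t" + pdMap.get(f[1]) + "\t" + f[2]);
        }
        joined.forEach(System.out::println);
    }
}
```

In the MapReduce version below, the `HashMap` is built once per task in `setup()` from the distributed-cache file, and the loop body becomes the `map()` method.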
Approach: Use DistributedCache
Cache an ordinary file on the nodes where the tasks run:

```java
job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
```

When running on a cluster, use an HDFS path instead:

```java
job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));
```

Map Join Example
Requirement
Same as the Reduce Join example.
Requirement Analysis
Map Join applies when one of the tables being joined is small.
Map-side table merge analysis (DistributedCache)
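For reference, the code below assumes the tab-separated file layout of the companion Reduce Join example; the concrete values shown here are only illustrative, not the original dataset:

```
# pd.txt (small table, cached): pid \t pname
01	ProductA
02	ProductB

# order.txt (large table, streamed): orderId \t pid \t amount
1001	01	1
1002	02	2

# joined output: orderId \t pname \t amount
1001	ProductA	1
1002	ProductB	2
```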
Source Code
The MapJoinMapper class

```java
package com.xiaobai.mapreduce.mapjoin;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private HashMap<String, String> pdMap = new HashMap<>();
    private Text outK = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Open the cached file (pd.txt) and load its contents into the map
        URI[] cacheFiles = context.getCacheFiles();
        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream fis = fs.open(new Path(cacheFiles[0]));

        // Read the stream line by line
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            // Split: pid \t pname
            String[] fields = line.split("\t");
            pdMap.put(fields[0], fields[1]);
        }

        // Close the stream
        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Process one line of order.txt: orderId \t pid \t amount
        String line = value.toString();
        String[] fields = line.split("\t");

        // Look up the product name by pid
        String pname = pdMap.get(fields[1]);

        // Emit: orderId \t pname \t amount
        outK.set(fields[0] + "\t" + pname + "\t" + fields[2]);
        context.write(outK, NullWritable.get());
    }
}
```

The MapJoinDriver class
```java
package com.xiaobai.mapreduce.mapjoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class MapJoinDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 1. Get the job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Set the jar path
        job.setJarByClass(MapJoinDriver.class);

        // 3. Attach the mapper
        job.setMapperClass(MapJoinMapper.class);

        // 4. Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Load the cached small-table file
        job.addCacheFile(new URI("/Users/jane/Desktop/test/JoinTest/pd.txt"));

        // A map-side join needs no reduce phase, so set the number of reduce tasks to 0
        job.setNumReduceTasks(0);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("/Users/jane/Desktop/test/JoinTest"));
        FileOutputFormat.setOutputPath(job, new Path("/Users/jane/Desktop/hadoop/MapJoinTestOutput"));

        // 7. Submit
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
```