日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 综合教程 >内容正文

综合教程

Hadoop基础-MapReduce的Combiner用法案例

發布時間:2024/1/3 综合教程 23 生活家
生活随笔 收集整理的這篇文章主要介紹了 Hadoop基础-MapReduce的Combiner用法案例 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

                      Hadoop基礎-MapReduce的Combiner用法案例

                                              作者:尹正杰

版權聲明:原創作品,謝絕轉載!否則將追究法律責任。

一.編寫年度最高氣溫統計

  如上圖說所示:有一個temp的文件,里面存放的是每年的數據,該數據全部是文本內容,大小2M左右,我已將他放在百度云(鏈接:https://pan.baidu.com/s/1CEcHAXlII2kKxbn1dmTPKA 密碼:jgp0),當你下載后,看到該文件的第15列到19列存放的是年份,而第87列到92列存放的是溫度,注意999是無效值,需要排除! 最終測試實驗結果如下:

  其實這個跟我上次寫的wordCount如出一轍,只需要稍微改動一下,就可以輕松實現這個統計結果,具體代碼如下:

 1 /*
 2 @author :yinzhengjie
 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
 4 EMAIL:y1053419035@qq.com
 5 */
 6 package cn.org.yinzhengjie.mapreduce.maxtemp;
 7 
 8 import org.apache.hadoop.io.IntWritable;
 9 import org.apache.hadoop.io.LongWritable;
10 import org.apache.hadoop.io.Text;
11 import org.apache.hadoop.mapreduce.Mapper;
12 
13 import java.io.IOException;
14 
15 /**
16  *      我們定義的map端類為MaxTempMapper,它需要繼承“org.apache.hadoop.mapreduce.Mapper.Mapper”,
17  * 該Mapper有四個參數,前兩個參數是指定map端輸入key和value的數據類型,而后兩個參數是指定map端輸出
18  * key和value的數據類型。
19  */
20 public class MaxTempMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
21 
22     /**
23      *
24      * @param key               : 表示輸入的key變量。
25      * @param value             : 表示輸入的value
26      * @param context           : 表示map端的上下文,它是負責將map端數據傳給reduce。
27      */
28     @Override
29     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
30         //得到一行數據
31         String line = value.toString();
32         //得到年份
33         String year = line.substring(15, 19);
34         //得到氣溫
35         int temp = Integer.parseInt(line.substring(87, 92));
36         //判斷temp不能為9999
37         if (temp != 9999){
38             //通過上線文將yaer和temp發給reduce端
39             context.write(new Text(year),new IntWritable(temp));
40         }
41     }
42 }

MaxTempMapper.java 文件內容

 1 /*
 2 @author :yinzhengjie
 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
 4 EMAIL:y1053419035@qq.com
 5 */
 6 package cn.org.yinzhengjie.mapreduce.maxtemp;
 7 
 8 import org.apache.hadoop.io.IntWritable;
 9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Reducer;
11 import java.io.IOException;
12 
13 /**
14  *      我們定義的reduce端類為MaxTempReducer,它需要繼承“org.apache.hadoop.mapreduce.Reducer.Reducer”,
15  * 該Reducer有四個參數,前兩個參數是指定map端輸入key和value的數據類型,而后兩個參數是指定map端輸出
16  * key和value的數據類型。
17  */
18 public class MaxTempReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
19     /**
20      *
21      * @param key               :  表示輸入的key變量。這里的key實際上就是mapper端傳過來的year。
22      * @param values            : 表示輸入的value,這個變量是可迭代的,因此傳遞的是多個值。這個value實際上就是傳過來的temp。
23      * @param context           : 表示reduce端的上下文,它是負責將reduce端數據傳給調用者(調用者可以傳遞的數據輸出到文件,也可以輸出到下一個MapReduce程序。
24      */
25     @Override
26     protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
27         //給max變量定義一個最小的int初始值方便用于比較
28         int max = Integer.MIN_VALUE;
29         //由于輸入端只有一個key,因此value的所有值都屬于這個key的,我們需要做的是對value進行遍歷并將所有數據進行相加操作,最終的結果就得到了同一個key的出現的次數。
30         for (IntWritable value : values){
31             //獲取到value的get方法獲取到value的值。然后和max進行比較,將較大的值重新賦值給max
32             max = Math.max(max,value.get());
33         }
34         //我們將key原封不動的返回,并將key的values的所有int類型的參數進行折疊,最終返回單詞書以及該單詞總共出現的次數。
35         context.write(key,new IntWritable(max));
36     }
37 }

MaxTempReducer.java 文件內容

 1 /*
 2 @author :yinzhengjie
 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
 4 EMAIL:y1053419035@qq.com
 5 */
 6 package cn.org.yinzhengjie.mapreduce.maxtemp;
 7 
 8 import org.apache.hadoop.conf.Configuration;
 9 import org.apache.hadoop.fs.FileSystem;
10 import org.apache.hadoop.fs.Path;
11 import org.apache.hadoop.io.IntWritable;
12 import org.apache.hadoop.io.Text;
13 import org.apache.hadoop.mapreduce.Job;
14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
16 import java.io.IOException;
17 
18 public class MaxTempApp {
19 
20     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
21         //實例化一個Configuration,它會自動去加載本地的core-site.xml配置文件的fs.defaultFS屬性。(該文件放在項目的resources目錄即可。)
22         Configuration conf = new Configuration();
23         //將hdfs寫入的路徑定義在本地,需要修改默認為文件系統,這樣就可以覆蓋到之前在core-site.xml配置文件讀取到的數據。
24         conf.set("fs.defaultFS","file:///");
25         //創建一個任務對象job,別忘記把conf穿進去喲!
26         Job job = Job.getInstance(conf);
27         //給任務起個名字
28         job.setJobName("WordCount");
29         //指定main函數所在的類,也就是當前所在的類名
30         job.setJarByClass(MaxTempApp.class);
31         //指定map的類名,這里指定咱們自定義的map程序即可
32         job.setMapperClass(MaxTempMapper.class);
33         //指定reduce的類名,這里指定咱們自定義的reduce程序即可
34         job.setReducerClass(MaxTempReducer.class);
35         //設置輸出key的數據類型
36         job.setOutputKeyClass(Text.class);
37         //設置輸出value的數據類型
38         job.setOutputValueClass(IntWritable.class);
39         //設置輸入路徑,需要傳遞兩個參數,即任務對象(job)以及輸入路徑
40         FileInputFormat.addInputPath(job,new Path("D:\10.Java\IDE\yhinzhengjieData\MyHadoop\MapReduce\temp"));
41         //初始化HDFS文件系統,此時我們需要把讀取到的fs.defaultFS屬性傳給fs對象。我的目的是調用該對象的delete方法,刪除已經存在的文件夾
42         FileSystem fs = FileSystem.get(conf);
43         //通過fs的delete方法可以刪除文件,第一個參數指的是刪除文件對象,第二參數是指遞歸刪除,一般用作刪除目錄
44         Path outPath = new Path("D:\10.Java\IDE\yhinzhengjieData\MyHadoop\MapReduce\out");
45         if (fs.exists(outPath)){
46             fs.delete(outPath,true);
47         }
48         //設置輸出路徑,需要傳遞兩個參數,即任務對象(job)以及輸出路徑
49         FileOutputFormat.setOutputPath(job,outPath);
50         //等待任務執行結束,將里面的值設置為true。
51         job.waitForCompletion(true);
52     }
53 }

MaxTempApp.java 文件內容

  關于MapReduce處理的大致流程,我畫了一個草圖,如下:

  上述代碼實現過程很簡單,用到了一個Map程序和一個Reduce程序,那么問題來了,不用Reduce程序也能實現相同的效果嗎?只用一個Map程序就把這個這個事情搞定可以嗎?答案是肯定的,我們只需用Combiner就可以幫我們實現,那什么是Combiner呢?Combiner就相當于Map端的Reduce,用于減少網絡間分發,屬于預聚合階段。Combiner適用場景:不適用于平均值,適用于最大值,最小值等等。接下來我們一起來研究研究它。

二.Combiner

1>.Combiner適用場景

  簡單來說:Combiner相當于Map端的Reduce,用于減少網絡間分發,屬于預聚合階段,不適用于平均值,適用于最大值,最小值等等。具體用法我都不啰嗦了,一切盡在注釋中!

2>.只有一個Map的情況

  在上面的截圖中我已經簡單的分析了MapReduce的大致關系,其實實際生成環境中一個Map和一個Reduce的情況并不能代表所有,而是很多情況都是多個Map和多個Reduce,為了方便說明,我這里就簡單的畫一個Map和一個Reduce的情況,如果想要了解單個Reduce或者多個Reduce以及沒有Re

3>.實現代碼

  MaxTempMapper.java 文件內容如下:
 1 /*
 2 @author :yinzhengjie
 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
 4 EMAIL:y1053419035@qq.com
 5 */
 6 package cn.org.yinzhengjie.mapreduce.maxtemp;
 7 
 8 import org.apache.hadoop.io.IntWritable;
 9 import org.apache.hadoop.io.LongWritable;
10 import org.apache.hadoop.io.Text;
11 import org.apache.hadoop.mapreduce.Mapper;
12 
13 import java.io.IOException;
14 
15 /**
16  *      我們定義的map端類為MaxTempMapper,它需要繼承“org.apache.hadoop.mapreduce.Mapper.Mapper”,
17  * 該Mapper有四個參數,前兩個參數是指定map端輸入key和value的數據類型,而后兩個參數是指定map端輸出
18  * key和value的數據類型。
19  */
20 public class MaxTempMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
21 
22     /**
23      *
24      * @param key               : 表示輸入的key變量。
25      * @param value             : 表示輸入的value
26      * @param context           : 表示map端的上下文,它是負責將map端數據傳給reduce。
27      */
28     @Override
29     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
30         //得到一行數據
31         String line = value.toString();
32         //得到年份
33         String year = line.substring(15, 19);
34         //得到氣溫
35         int temp = Integer.parseInt(line.substring(87, 92));
36         //判斷temp不能為9999
37         if (temp != 9999){
38             //通過上線文將yaer和temp發給reduce端
39             context.write(new Text(year),new IntWritable(temp));
40         }
41     }
42 }
  MaxTempReducer.java文件內容如下:
 1 /*
 2 @author :yinzhengjie
 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
 4 EMAIL:y1053419035@qq.com
 5 */
 6 package cn.org.yinzhengjie.mapreduce.maxtemp;
 7 
 8 import org.apache.hadoop.io.IntWritable;
 9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Reducer;
11 import java.io.IOException;
12 
13 /**
14  *      我們定義的reduce端類為MaxTempReducer,它需要繼承“org.apache.hadoop.mapreduce.Reducer.Reducer”,
15  * 該Reducer有四個參數,前兩個參數是指定map端輸入key和value的數據類型,而后兩個參數是指定map端輸出
16  * key和value的數據類型。
17  */
18 public class MaxTempReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
19     /**
20      *
21      * @param key               :  表示輸入的key變量。這里的key實際上就是mapper端傳過來的year。
22      * @param values            : 表示輸入的value,這個變量是可迭代的,因此傳遞的是多個值。這個value實際上就是傳過來的temp。
23      * @param context           : 表示reduce端的上下文,它是負責將reduce端數據傳給調用者(調用者可以傳遞的數據輸出到文件,也可以輸出到下一個MapReduce程序。
24      */
25     @Override
26     protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
27         //給max變量定義一個最小的int初始值方便用于比較
28         int max = Integer.MIN_VALUE;
29         //由于輸入端只有一個key,因此value的所有值都屬于這個key的,我們需要做的是對value進行遍歷并將所有數據進行相加操作,最終的結果就得到了同一個key的出現的次數。
30         for (IntWritable value : values){
31             //獲取到value的get方法獲取到value的值。然后和max進行比較,將較大的值重新賦值給max
32             max = Math.max(max,value.get());
33         }
34         //我們將key原封不動的返回,并將key的values的所有int類型的參數進行折疊,最終返回單詞書以及該單詞總共出現的次數。
35         context.write(key,new IntWritable(max));
36     }
37 }
  MaxTempApp.java文件內容如下:
 1 /*
 2 @author :yinzhengjie
 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
 4 EMAIL:y1053419035@qq.com
 5 */
 6 package cn.org.yinzhengjie.mapreduce.maxtemp;
 7 
 8 import org.apache.hadoop.conf.Configuration;
 9 import org.apache.hadoop.fs.FileSystem;
10 import org.apache.hadoop.fs.Path;
11 import org.apache.hadoop.io.IntWritable;
12 import org.apache.hadoop.io.Text;
13 import org.apache.hadoop.mapreduce.Job;
14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
16 import java.io.IOException;
17 
18 public class MaxTempApp {
19 
20     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
21         //實例化一個Configuration,它會自動去加載本地的core-site.xml配置文件的fs.defaultFS屬性。(該文件放在項目的resources目錄即可。)
22         Configuration conf = new Configuration();
23         //將hdfs寫入的路徑定義在本地,需要修改默認為文件系統,這樣就可以覆蓋到之前在core-site.xml配置文件讀取到的數據。
24         conf.set("fs.defaultFS","file:///");
25         //創建一個任務對象job,別忘記把conf穿進去喲!
26         Job job = Job.getInstance(conf);
27         //給任務起個名字
28         job.setJobName("WordCount");
29         //指定main函數所在的類,也就是當前所在的類名
30         job.setJarByClass(MaxTempApp.class);
31         //指定map的類名,這里指定咱們自定義的map程序即可
32         job.setMapperClass(MaxTempMapper.class);
33         //指定Combiner的類名,這里指定咱們自定義的reduce程序即可,注意,咱們這里沒有設置Reduce程序,只是用了Map和Combiner。
34         job.setCombinerClass(MaxTempReducer.class);
35         //設置輸出key的數據類型
36         job.setOutputKeyClass(Text.class);
37         //設置輸出value的數據類型
38         job.setOutputValueClass(IntWritable.class);
39         //設置輸入路徑,需要傳遞兩個參數,即任務對象(job)以及輸入路徑
40         FileInputFormat.addInputPath(job,new Path("D:\10.Java\IDE\yhinzhengjieData\MyHadoop\MapReduce\temp"));
41         //初始化HDFS文件系統,此時我們需要把讀取到的fs.defaultFS屬性傳給fs對象。我的目的是調用該對象的delete方法,刪除已經存在的文件夾
42         FileSystem fs = FileSystem.get(conf);
43         //通過fs的delete方法可以刪除文件,第一個參數指的是刪除文件對象,第二參數是指遞歸刪除,一般用作刪除目錄
44         Path outPath = new Path("D:\10.Java\IDE\yhinzhengjieData\MyHadoop\MapReduce\out");
45         if (fs.exists(outPath)){
46             fs.delete(outPath,true);
47         }
48         //設置輸出路徑,需要傳遞兩個參數,即任務對象(job)以及輸出路徑
49         FileOutputFormat.setOutputPath(job,outPath);
50         //等待任務執行結束,將里面的值設置為true。
51         job.waitForCompletion(true);
52     }
53 }

  

總結

以上是生活随笔為你收集整理的Hadoop基础-MapReduce的Combiner用法案例的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。