日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

hadoop MapReduce实例解析

發布時間:2025/6/15 编程问答 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 hadoop MapReduce实例解析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1、MapReduce理論簡介

1.1 MapReduce編程模型

MapReduce采用"分而治之"的思想,把對大規模數據集的操作,分發給一個主節點管理下的各個分節點共同完成,然后通過整合各個節點的中間結果,得到最終結果。簡單地說,MapReduce就是"任務的分解與結果的匯總"。

在Hadoop中,用于執行MapReduce任務的機器角色有兩個:一個是JobTracker;另一個是TaskTracker,JobTracker是用于調度工作的,TaskTracker是用于執行工作的。一個Hadoop集群中只有一臺JobTracker。

在分布式計算中,MapReduce框架負責處理了并行編程中分布式存儲、工作調度、負載均衡、容錯均衡、容錯處理以及網絡通信等復雜問題,把處理過程高度抽象為兩個函數:map和reduce,map負責把任務分解成多個任務,reduce負責把分解后多任務處理的結果匯總起來。

需要注意的是,用MapReduce來處理的數據集(或任務)必須具備這樣的特點:待處理的數據集可以分解成許多小的數據集,而且每一個小數據集都可以完全并行地進行處理。

1.2 MapReduce處理過程

在Hadoop中,每個MapReduce任務都被初始化為一個Job,每個Job又可以分為兩種階段:map階段和reduce階段。這兩個階段分別用兩個函數表示,即map函數和reduce函數。map函數接收一個<key,value>形式的輸入,然后同樣產生一個<key,value>形式的中間輸出,Hadoop函數接收一個如<key,(list of values)>形式的輸入,然后對這個value集合進行處理,每個reduce產生0或1個輸出,reduce的輸出也是<key,value>形式的。

MapReduce處理大數據集的過程

2、運行WordCount程序

單詞計數是最簡單也是最能體現MapReduce思想的程序之一,可以稱為MapReduce版"Hello World",該程序的完整代碼可以在Hadoop安裝包的"src/examples"目錄下找到。單詞計數主要完成功能是:統計一系列文本文件中每個單詞出現的次數,如下圖所示。

2.1 準備工作

現在以"hadoop"普通用戶登錄"Master.Hadoop"服務器。

1)創建本地示例文件

首先在"/home/hadoop"目錄下創建文件夾"file"。

接著創建兩個文本文件file1.txt和file2.txt,使file1.txt內容為"Hello World",而file2.txt的內容為"Hello Hadoop"。

2)在HDFS上創建輸入文件夾

3)上傳本地file中文件到集群的input目錄下

2.2 運行例子

1)在集群上運行WordCount程序

備注: 以input作為輸入目錄,output目錄作為輸出目錄。

已經編譯好的WordCount的Jar在"/usr/hadoop"下面,就是"hadoop-examples-1.0.0.jar",所以在下面執行命令時記得把路徑寫全了,不然會提示找不到該Jar包。

2)MapReduce執行過程顯示信息

Hadoop命令會啟動一個JVM來運行這個MapReduce程序,并自動獲得Hadoop的配置,同時把類的路徑(及其依賴關系)加入到Hadoop的庫中。以上就是Hadoop Job的運行記錄,從這里可以看到,這個Job被賦予了一個ID號:job_201202292213_0002,而且得知輸入文件有兩個(Total input paths to process : 2),同時還可以了解map的輸入輸出記錄(record數及字節數),以及reduce輸入輸出記錄。比如說,在本例中,map的task數量是2個,reduce的task數量是一個。map的輸入record數是2個,輸出record數是4個等信息。

2.3 查看結果

1)查看HDFS上output目錄內容

從上圖中知道生成了三個文件,我們的結果在" part-r-00000 "中。

2)查看結果輸出文件內容

3、WordCount源碼分析

3.1 特別數據類型介紹

Hadoop提供了如下內容的數據類型,這些數據類型都實現了WritableComparable接口,以便用這些類型定義的數據可以被序列化進行網絡傳輸和文件存儲,以及進行大小比較。

BooleanWritable:標準布爾型數值

ByteWritable:單字節數值

DoubleWritable:雙字節數

FloatWritable:浮點數

IntWritable:整型數

LongWritable:長整型數

Text:使用UTF8格式存儲的文本

NullWritable:當<key,value>中的key或value為空時使用

3.2 舊的WordCount分析

1)源代碼程序

package org.apache.hadoop.examples; import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; public class WordCount {public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text();public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); output.collect(word, one); } } }public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } }public static void main(String[] args) throws Exception { JobConf conf = new JobConf(WordCount.class); conf.setJobName("wordcount");conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class);conf.setMapperClass(Map.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class);conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class);FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1]));JobClient.runJob(conf); } }

3)主方法 Main 分析

public static void main(String[] args) throws Exception { JobConf conf = new JobConf(WordCount.class); conf.setJobName("wordcount");conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class);conf.setMapperClass(Map.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class);conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class);FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1]));JobClient.runJob(conf); }

首先講解一下 Job 的 初始化過程 。 main 函數調用 Jobconf 類來對 MapReduce Job 進行初始化,然后調用 setJobName() 方法命名這個 Job 。對Job進行合理的命名有助于 更快 地找到Job,以便在JobTracker和Tasktracker的頁面中對其進行 監視 。

JobConf conf = new JobConf(WordCount. class ); conf.setJobName("wordcount" );

接著設置Job輸出結果<key,value>的中key和value數據類型,因為結果是<單詞,個數>,所以key設置為"Text"類型,相當于Java中String類型。Value設置為"IntWritable",相當于Java中的int類型。

conf.setOutputKeyClass(Text.class ); conf.setOutputValueClass(IntWritable.class );

然后設置Job處理的Map(拆分)、Combiner(中間結果合并)以及Reduce(合并)的相關處理類。這里用Reduce類來進行Map產生的中間結果合并,避免給網絡數據傳輸產生壓力。

conf.setMapperClass(Map.class ); conf.setCombinerClass(Reduce.class ); conf.setReducerClass(Reduce.class );

接著就是調用setInputPath()和setOutputPath()設置輸入輸出路徑。

conf.setInputFormat(TextInputFormat.class ); conf.setOutputFormat(TextOutputFormat.class );

(1)InputFormat和InputSplit

InputSplit是Hadoop定義的用來 傳送 給每個 單獨 的 map 的 數據 ,InputSplit 存儲 的并 非 數據本身 , 而是一個 分片長度 和一個 記錄數據位置 的 數組 。 生成InputSplit的方法 可以通過 InputFormat() 來 設置 。

當數據傳送給 map 時,map會將輸入 分片 傳送到 InputFormat ,InputFormat則 調用 方法 getRecordReader() 生成 RecordReader , RecordReader再通過 creatKey() 、 creatValue() 方法 創建 可供map處理的 <key,value> 對。簡而言之,InputFormat()方法是用來生成可供map處理的<key,value>對的。

Hadoop預定義了多種方法將不同類型的輸入數據轉化為map能夠處理的<key,value>對,它們都繼承自InputFormat,分別是:

InputFormat||---BaileyBorweinPlouffe.BbpInputFormat|---ComposableInputFormat|---CompositeInputFormat|---DBInputFormat|---DistSum.Machine.AbstractInputFormat|---FileInputFormat|---CombineFileInputFormat|---KeyValueTextInputFormat|---NLineInputFormat|---SequenceFileInputFormat|---TeraInputFormat|---TextInputFormat

其中 TextInputFormat 是Hadoop 默認 的輸入方法,在TextInputFormat中,每個文件(或其一部分)都會單獨地作為map的輸入,而這個是繼承自FileInputFormat的。之后,每行數據都會生成一條記錄,每條記錄則表示成<key,value>形式:

key值是每個數據的記錄在 數據分片 中 字節偏移量 ,數據類型是 LongWritable ;
value值是每行的內容,數據類型是 Text 。

(2)OutputFormat

每一種 輸 入 格式 都有一種 輸 出 格式 與其對應。默認的輸出格式是 TextOutputFormat ,這種輸出方式與輸入類似,會將每條記錄以一行的形式存入文本文件。不過,它的 鍵和值 可以是 任意形式 的,因為程序 內容 會調用 toString() 方法將鍵和值轉換為 String 類型再輸出。

3)Map類中map方法分析

public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text();public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); output.collect(word, one); } } }

Map類 繼承自 MapReduceBase ,并且它實現了 Mapper接口 ,此接口是一個 規范類型 ,它有4種形式的參數,分別用來指定map的 輸入 key值類型、 輸入 value值類型、 輸出 key值類型和 輸出 value值類型。在本例中,因為使用的是TextInputFormat,它的輸出key值是LongWritable類型,輸出value值是Text類型,所以map的輸入類型為<LongWritable,Text>。在本例中需要輸出<word,1>這樣的形式,因此輸出的key值類型是Text,輸出的value值類型是IntWritable。

實現此接口類還需要實現map方法,map方法會具體負責對輸入進行操作,在本例中,map方法對輸入的行以空格為單位進行切分,然后使用 OutputCollect 收集輸出的<word,1>。

4)Reduce類中reduce方法分析

public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } }

Reduce類 也是繼承自 MapReduceBase 的,需要實現Reducer接口。Reduce類以map的輸出作為輸入,因此Reduce的輸入類型是<Text,Intwritable>。而Reduce的輸出是 單詞 和 它的數目 ,因此,它的輸出類型是<Text,IntWritable>。Reduce類也要實現reduce方法,在此方法中,reduce函數將輸入的key值作為輸出的key值,然后將獲得多個value值加起來,作為輸出的值。

3.3 新的WordCount分析

1)源代碼程序

package org.apache.hadoop.examples; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }public static void main (String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

1)Map過程

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } }

Map過程需要繼承org.apache.hadoop.mapreduce包中 Mapper 類,并 重寫 其map方法。通過在map方法中添加兩句把key值和value值輸出到控制臺的代碼,可以發現map方法中value值存儲的是文本文件中的一行(以回車符為行結束標記),而key值為該行的首字母相對于文本文件的首地址的偏移量。然后StringTokenizer類將每一行拆分成為一個個的單詞,并將<word,1>作為map方法的結果輸出,其余的工作都交有 MapReduce框架 處理。

2)Reduce過程

public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }

Reduce過程需要繼承org.apache.hadoop.mapreduce包中 Reducer 類,并 重寫 其reduce方法。Map過程輸出<key,values>中key為單個單詞,而values是對應單詞的計數值所組成的列表,Map的輸出就是Reduce的輸入,所以reduce方法只要遍歷values并求和,即可得到某個單詞的總次數。

3)執行MapReduce任務

public static void main (String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }

在MapReduce中,由Job對象負責管理和運行一個計算任務,并通過Job的一些方法對任務的參數進行相關的設置。此處設置了使用TokenizerMapper完成Map過程中的處理和使用IntSumReducer完成Combine和Reduce過程中的處理。還設置了Map過程和Reduce過程的輸出類型:key的類型為Text,value的類型為IntWritable。任務的輸出和輸入 路徑 則由命令行參數指定,并由FileInputFormat和FileOutputFormat分別設定。完成相應任務的參數設定后,即可調用 job.waitForCompletion() 方法執行任務。

4、WordCount處理過程

本節將對WordCount進行更詳細的講解。詳細執行步驟如下:

1)將文件拆分成splits,由于測試用的文件較小,所以每個文件為一個split,并將文件按行分割形成<key,value>對,如圖4-1所示。這一步由MapReduce框架自動完成,其中偏移量(即key值)包括了回車所占的字符數(Windows和Linux環境會不同)。

圖4-1 分割過程

2)將分割好的<key,value>對交給用戶定義的map方法進行處理,生成新的<key,value>對,如圖4-2所示。

圖4-2 執行map方法

3)得到map方法輸出的<key,value>對后,Mapper會將它們按照key值進行排序,并執行Combine過程,將key至相同value值累加,得到Mapper的最終輸出結果。如圖4-3所示。

圖4-3 Map端排序及Combine過程

4)Reducer先對從Mapper接收的數據進行排序,再交由用戶自定義的reduce方法進行處理,得到新的<key,value>對,并作為WordCount的輸出結果,如圖4-4所示。

圖4-4 Reduce端排序及輸出結果

5、MapReduce新舊改變

Hadoop最新版本的MapReduce Release 0.20.0的API包括了一個全新的Mapreduce JAVA API,有時候也稱為上下文對象。

新的API類型上不兼容以前的API,所以,以前的應用程序需要重寫才能使新的API發揮其作用 。

新的API和舊的API之間有下面幾個明顯的區別。

新的API傾向于使用抽象類,而不是接口,因為這更容易擴展。例如,你可以添加一個方法(用默認的實現)到一個抽象類而不需修改類之前的實現方法。在新的API中,Mapper和Reducer是抽象類。

新的API是在org.apache.hadoop.mapreduce包(和子包)中的。之前版本的API則是放在org.apache.hadoop.mapred中的。

新的API廣泛使用context object(上下文對象),并允許用戶代碼與MapReduce系統進行通信。例如,MapContext基本上充當著JobConf的OutputCollector和Reporter的角色。

新的API同時支持"推"和"拉"式的迭代。在這兩個新老API中,鍵/值記錄對被推mapper中,但除此之外,新的API允許把記錄從map()方法中拉出,這也適用于reducer。"拉"式的一個有用的例子是分批處理記錄,而不是一個接一個。

新的API統一了配置。舊的API有一個特殊的JobConf對象用于作業配置,這是一個對于Hadoop通常的Configuration對象的擴展。在新的API中,這種區別沒有了,所以作業配置通過Configuration來完成。作業控制的執行由Job類來負責,而不是JobClient,它在新的API中已經蕩然無存。

總結

以上是生活随笔為你收集整理的hadoop MapReduce实例解析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。