第五章-分布式并行编程框架MapReduce
第五章-分布式并行編程框架MapReduce
文章目錄
- 第五章-分布式并行編程框架MapReduce
- MapReduce概述
- 分布式并行編程
- MapReduce模型和函數(shù)
- MapReduce體系結(jié)構(gòu)
- MapReduce工作流程
- 工作流程概述
- 各個執(zhí)行階段
- shuffle過程
- 實例分析:WordCount
- MapReduce的具體應(yīng)用
- MapReduce編程實踐
MapReduce概述
分布式并行編程
過去很長一段時間,CPU的性能都遵循“摩爾定律”:【當(dāng)價格不變時,集成電路上可容納的元器件的數(shù)目,約每隔18個月便會增加一倍,性能也將提升一倍】。從2005年開始摩爾定律逐漸失效,需要處理的數(shù)據(jù)量快速增加,人們開始借助于分布式并行編程來提高程序性能。
分布式并行程序運行在大規(guī)模計算機集群上,可以并行執(zhí)行大規(guī)模數(shù)據(jù)處理任務(wù),從而獲得海量的計算能力。同時通過向集群中增加新的計算節(jié)點,就能很容易地實現(xiàn)集群計算能力的擴充。
谷歌公司最先提出了分布式并行編程模型 MapReduce,Hadoop MapReduce是它的開源實現(xiàn) 。谷歌的 MapReduce運行在分布式文件系統(tǒng) GFS上,Hadoop MapReduce運行在分布式文件系統(tǒng) HDFS上。相對而言,Hadoop MapReduce要比谷歌 MapReduce的使用門檻低很多,程序員即使沒有任何分布式編程開發(fā)經(jīng)驗,也可以很輕松地開發(fā)出分布式程序部署到計算機集群上。
| 傳統(tǒng)并行編程框架 | 通常采用共享式架構(gòu)(共享內(nèi)存、共享存儲),底層通常采用統(tǒng)一的存儲區(qū)域網(wǎng)絡(luò)SAN | 容錯性差,其中一個硬件發(fā)生故障容易導(dǎo)致整個集群不可工作 | 通常采用刀片服務(wù)器,高速網(wǎng)絡(luò)以及共享存儲區(qū)域網(wǎng)絡(luò) SAN,價格高,擴展性差 | 編程難度大,需要解決做什么和怎么做的問題,編程原理和多線程編程邏輯類似,需要借助互斥量、信號量、鎖等機制,實現(xiàn)不同任務(wù)之間的同步和通信 | 適用于實時、細(xì)粒度計算,尤其適用于計算密集型的應(yīng)用 |
| MapReduce | 采用典型的非共享式架構(gòu) | 容錯性好,在整個集群中每個節(jié)點都有自己的內(nèi)存和存儲,任何一個節(jié)點出現(xiàn)問題不會影響其他節(jié)點正常運行,同時系統(tǒng)中設(shè)計了冗余和容錯機制 | 整個集群可以隨意增加或減少相關(guān)的計算節(jié)點,普通PC機就可以實現(xiàn),價格低廉,擴展性好 | 編程簡單,只需要告訴系統(tǒng)要解決什么問題,系統(tǒng)自動實現(xiàn)分布式部署,屏蔽分布式同步、通信、負(fù)載均衡、失敗恢復(fù)等底層細(xì)節(jié) | 一般適用于非實時的批處理及數(shù)據(jù)密集型應(yīng)用 |
MapReduce模型和函數(shù)
MapReduce將復(fù)雜的、運行于大規(guī)模集群上的并行計算過程高度地抽象到了兩個函數(shù):Map和Reduce。
MapReduce采用“分而治之”策略,一個存儲在分布式文件系統(tǒng)中的大規(guī)模數(shù)據(jù)集,會被切分成許多獨立的分片(split),這些分片可以被多個 Map任務(wù)并行處理。MapReduce框架會為每個 Map任務(wù)輸入一個數(shù)據(jù)子集,Map任務(wù)生成的結(jié)果會繼續(xù)作為 Reduce任務(wù)的輸入,最終由 Reduce任務(wù)輸出最后結(jié)果,并寫入分布式文件系統(tǒng)。
這里要特別強調(diào)一下,適合用 MapReduce來處理的數(shù)據(jù)集需要滿足一個前提條件:待處理的數(shù)據(jù)集可以分解成許多個小的數(shù)據(jù)集,而且每一個小數(shù)據(jù)集都可以完全并行地進行處理。
MapReduce設(shè)計的一個理念就是“計算向數(shù)據(jù)靠攏”,而不是“數(shù)據(jù)向計算靠攏”,因為,移動數(shù)據(jù)需要大量的網(wǎng)絡(luò)傳輸開銷,在大規(guī)模數(shù)據(jù)環(huán)境下開銷更為驚人。所以,移動計算要比移動數(shù)據(jù)更加經(jīng)濟。
MapReduce框架采用了Master/Slave架構(gòu),包括一個 Master和若干個Slave。Master上運行JobTracker, Slave上運行TaskTracker。
Map函數(shù)和 Reduce函數(shù)都是以<key, value>作為輸入,按一定的映射規(guī)則轉(zhuǎn)換成另一個或一批<key, value>進行輸出。
| Map | <k1,v1> | List(<k2,v2>) | 將小數(shù)據(jù)集(split)進一步解析成一批<key,value>對,輸入 Map函數(shù)中進行處理。每一個輸入的<k1,v1>會輸出一批<k2,v2>,<k2,v2>是計算的中間結(jié)果 |
| Reduce | <k2,List(v2)> | <k3,v3> | 輸入的中間結(jié)果<k2,List(v2)>中的 List(v2)表示是一批屬于同一個 k2的 value |
- Map函數(shù)將輸入的元素轉(zhuǎn)換成<key,value>形式的鍵值對,鍵和值的類型也是任意的。
- Reduce函數(shù)將輸入的一系列具有相同鍵的鍵值對以某種方式組合起來,輸出處理后的鍵值對,輸出結(jié)果合并為一個文件。
MapReduce體系結(jié)構(gòu)
MapReduce體系結(jié)構(gòu)主要由四個部分組成,分別是: Client、JobTracker、TaskTracker以及 Task。
Client:
- 用戶編寫的 MapReduce程序通過 Client提交到 JobTracker端
- 用戶可通過 Client提供的一些接口查看作業(yè)運行狀態(tài)
JobTracker:
- 負(fù)責(zé)資源監(jiān)控和作業(yè)調(diào)度
- 監(jiān)控所有 TaskTracker與 Job的健康狀況,一旦發(fā)現(xiàn)失敗,就將 相應(yīng)的任務(wù)轉(zhuǎn)移到其他節(jié)點
- 會跟蹤任務(wù)的執(zhí)行進度、資源使用量等信息,并將這些信息告訴任務(wù)調(diào)度器(TaskScheduler),而調(diào)度器會在資源出現(xiàn)空閑時,選擇合適的任務(wù)去使用這些資源
- 調(diào)度器是一個可插拔的模塊,用戶可以根據(jù)自己的實際應(yīng)用要求設(shè)計調(diào)度器
TaskTracker:
- 接收 JobTracker 發(fā)送過來的命令并執(zhí)行相應(yīng)的操作(如啟動新任務(wù)、殺死任務(wù)等)
- TaskTracker 會周期性地通過“心跳”將本節(jié)點上資源的使用情況 和任務(wù)的運行進度匯報給 JobTracker
- TaskTracker 使用“slot”等量劃分本節(jié)點上的資源量(CPU、內(nèi) 存等)。一個 Task 獲取到一個 slot 后才有機會運行,而 Hadoop調(diào)度器的作用就是將各個 TaskTracker上的空閑 slot分配給 Task使用。 slot 分為 Map slot 和 Reduce slot 兩種,分別供 MapTask 和 Reduce Task 使用
Task:
- Task 分為Map Task 和Reduce Task 兩種,均由TaskTracker 啟動
MapReduce工作流程
工作流程概述
MapReduce的核心思想是“分而治之”。一個大的 MapReduce作業(yè),首先會被拆分成許多個 Map任務(wù)在多臺機器上并行處理,每個 Map任務(wù)通常運行在數(shù)據(jù)存儲的節(jié)點上。當(dāng) Map任務(wù)結(jié)束后,會生成以<key,value>形式表示的中間結(jié)果,這些中間結(jié)果會被分發(fā)到多個 Reduce任務(wù)在多臺機器上并行執(zhí)行,具有相同 key的<key,value>會被發(fā)送到同一個 Reduce任務(wù)那里。Reduce任務(wù)會被中間結(jié)果進行匯總計算得到最后結(jié)果,并輸出到分布式文件系統(tǒng)中。
- 不同的 Map任務(wù)之間不會進行通信,不同的 Reduce任務(wù)之間也不會發(fā)生任何信息交換
- 只有當(dāng) Map任務(wù)全部結(jié)束后,Reduce過程才能開始
- 用戶不能顯式地從一臺機器向另一臺機器發(fā)送消息
- 所有的數(shù)據(jù)交換都是通過 MapReduce框架自身去實現(xiàn)的
- Map任務(wù)的輸入文件、Reduce任務(wù)的處理結(jié)果都是保存在分布式文件系統(tǒng)中,而 Map任務(wù)處理的中間結(jié)果保存在本地磁盤中。
各個執(zhí)行階段
MapReduce算法的執(zhí)行過程:
shuffle過程
Shuffle:是指對 Map輸出的結(jié)果進行分區(qū)、排序、合并、歸并等處理并交給 Reduce的過程,分為 Map端的操作和 Reduce端的操作。
|
|
實例分析:WordCount
WordCount程序任務(wù)
| 輸入 | 一個包含大量單詞的文本文件 |
| 輸出 | 文件中每個單詞及其出現(xiàn)次數(shù)(頻數(shù)),并按照單詞 字母順序排序,每個單詞和其頻數(shù)占一行,單詞和頻 數(shù)之間有間隔 |
一個WordCount執(zhí)行過程的實例
MapReduce的具體應(yīng)用
MapReduce可以很好地應(yīng)用于各種計算問題:
- 關(guān)系代數(shù)運算(選擇、投影、并、交、差、連接)
- 分組與聚合運算
- 矩陣-向量乘法
- 矩陣乘法
在 MapReduce環(huán)境下執(zhí)行兩個關(guān)系的連接操作的方法如下:
假設(shè)關(guān)系 R(A,B),S(B,C)都存儲在一個文件中,為了連接這些關(guān)系,必須把來自每個關(guān)系的各個元組都和一個鍵關(guān)聯(lián),這個鍵就是屬性 B的值。可以使用 Map過程把來自 R的每個元組<a,b>轉(zhuǎn)換成一個鍵值對<b,<R,a>>,其中的鍵就是 b,值就是<R,a>。注意,這里把關(guān)系 R包含在值中,這樣做可以使得我們在 Reduce階段只把那些來自 R的元組和來自 S的元組進行匹配。
類似地,使用 Map過程把來自 S的每個元組<b,c>轉(zhuǎn)換成一個鍵值對<b,<S,c>>,鍵是 b,值是<S,c>。Reduce進程的任務(wù)就是,把來自關(guān)系 R和 S的具有共同屬性 B值的元組進行合并。這樣,所有具有特定 B值的元組必須被發(fā)送到同一個 Reduce進程。
MapReduce編程實踐
任務(wù)要求:用 MapReduce實現(xiàn)對輸入文件中的單詞做詞頻統(tǒng)計
實踐一共分為四步:
1.編寫 Map處理邏輯
public static class TokenizerMapperextends Mapper<Object, Text, Text, IntWritable> {private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}} }2.編寫 Reduce處理邏輯
public static class IntSumReducerextends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);} }3.編寫 Main函數(shù)
public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length < 2) {System.err.println("Usage: wordcount <in> [<in>...] <out>");System.exit(2);}Job job = Job.getInstance(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);for (int i = 0; i < otherArgs.length - 1; ++i) {FileInputFormat.addInputPath(job, new Path(otherArgs[i]));}FileOutputFormat.setOutputPath(job,new Path(otherArgs[otherArgs.length - 1]));System.exit(job.waitForCompletion(true) ? 0 : 1); }編譯打包代碼請參考另一篇博客 簡單的MapReduce實踐
完整代碼:
import java.io.IOException; import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser;public class WordCount {public static class TokenizerMapperextends Mapper<Object, Text, Text, IntWritable> {private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}}public static class IntSumReducerextends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length < 2) {System.err.println("Usage: wordcount <in> [<in>...] <out>");System.exit(2);}Job job = Job.getInstance(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);for (int i = 0; i < otherArgs.length - 1; ++i) {FileInputFormat.addInputPath(job, new Path(otherArgs[i]));}FileOutputFormat.setOutputPath(job,new Path(otherArgs[otherArgs.length - 1]));System.exit(job.waitForCompletion(true) ? 0 : 1);} }總結(jié)
以上是生活随笔為你收集整理的第五章-分布式并行编程框架MapReduce的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 简单的MongoDB实践
- 下一篇: 决策树分类实验