日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

第五章-分布式并行编程框架MapReduce

發(fā)布時間:2025/3/21 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 第五章-分布式并行编程框架MapReduce 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

第五章-分布式并行編程框架MapReduce

文章目錄

  • 第五章-分布式并行編程框架MapReduce
    • MapReduce概述
      • 分布式并行編程
      • MapReduce模型和函數(shù)
    • MapReduce體系結(jié)構(gòu)
    • MapReduce工作流程
      • 工作流程概述
      • 各個執(zhí)行階段
      • shuffle過程
    • 實例分析:WordCount
    • MapReduce的具體應(yīng)用
    • MapReduce編程實踐

MapReduce概述

分布式并行編程

過去很長一段時間,CPU的性能都遵循“摩爾定律”:【當(dāng)價格不變時,集成電路上可容納的元器件的數(shù)目,約每隔18個月便會增加一倍,性能也將提升一倍】。從2005年開始摩爾定律逐漸失效,需要處理的數(shù)據(jù)量快速增加,人們開始借助于分布式并行編程來提高程序性能。

分布式并行程序運行在大規(guī)模計算機集群上,可以并行執(zhí)行大規(guī)模數(shù)據(jù)處理任務(wù),從而獲得海量的計算能力。同時通過向集群中增加新的計算節(jié)點,就能很容易地實現(xiàn)集群計算能力的擴充。

谷歌公司最先提出了分布式并行編程模型 MapReduce,Hadoop MapReduce是它的開源實現(xiàn) 。谷歌的 MapReduce運行在分布式文件系統(tǒng) GFS上,Hadoop MapReduce運行在分布式文件系統(tǒng) HDFS上。相對而言,Hadoop MapReduce要比谷歌 MapReduce的使用門檻低很多,程序員即使沒有任何分布式編程開發(fā)經(jīng)驗,也可以很輕松地開發(fā)出分布式程序部署到計算機集群上。

集群的架構(gòu)容錯性硬件價格及擴展性編程和學(xué)習(xí)難度適用場景
傳統(tǒng)并行編程框架通常采用共享式架構(gòu)(共享內(nèi)存、共享存儲),底層通常采用統(tǒng)一的存儲區(qū)域網(wǎng)絡(luò)SAN容錯性差,其中一個硬件發(fā)生故障容易導(dǎo)致整個集群不可工作通常采用刀片服務(wù)器,高速網(wǎng)絡(luò)以及共享存儲區(qū)域網(wǎng)絡(luò) SAN,價格高,擴展性差編程難度大,需要解決做什么和怎么做的問題,編程原理和多線程編程邏輯類似,需要借助互斥量、信號量、鎖等機制,實現(xiàn)不同任務(wù)之間的同步和通信適用于實時、細(xì)粒度計算,尤其適用于計算密集型的應(yīng)用
MapReduce采用典型的非共享式架構(gòu)容錯性好,在整個集群中每個節(jié)點都有自己的內(nèi)存和存儲,任何一個節(jié)點出現(xiàn)問題不會影響其他節(jié)點正常運行,同時系統(tǒng)中設(shè)計了冗余和容錯機制整個集群可以隨意增加或減少相關(guān)的計算節(jié)點,普通PC機就可以實現(xiàn),價格低廉,擴展性好編程簡單,只需要告訴系統(tǒng)要解決什么問題,系統(tǒng)自動實現(xiàn)分布式部署,屏蔽分布式同步、通信、負(fù)載均衡、失敗恢復(fù)等底層細(xì)節(jié)一般適用于非實時的批處理及數(shù)據(jù)密集型應(yīng)用

MapReduce模型和函數(shù)

MapReduce將復(fù)雜的、運行于大規(guī)模集群上的并行計算過程高度地抽象到了兩個函數(shù):Map和Reduce。

MapReduce采用“分而治之”策略,一個存儲在分布式文件系統(tǒng)中的大規(guī)模數(shù)據(jù)集,會被切分成許多獨立的分片(split),這些分片可以被多個 Map任務(wù)并行處理。MapReduce框架會為每個 Map任務(wù)輸入一個數(shù)據(jù)子集,Map任務(wù)生成的結(jié)果會繼續(xù)作為 Reduce任務(wù)的輸入,最終由 Reduce任務(wù)輸出最后結(jié)果,并寫入分布式文件系統(tǒng)。

這里要特別強調(diào)一下,適合用 MapReduce來處理的數(shù)據(jù)集需要滿足一個前提條件:待處理的數(shù)據(jù)集可以分解成許多個小的數(shù)據(jù)集,而且每一個小數(shù)據(jù)集都可以完全并行地進行處理。

MapReduce設(shè)計的一個理念就是“計算向數(shù)據(jù)靠攏”,而不是“數(shù)據(jù)向計算靠攏”,因為,移動數(shù)據(jù)需要大量的網(wǎng)絡(luò)傳輸開銷,在大規(guī)模數(shù)據(jù)環(huán)境下開銷更為驚人。所以,移動計算要比移動數(shù)據(jù)更加經(jīng)濟。

MapReduce框架采用了Master/Slave架構(gòu),包括一個 Master和若干個Slave。Master上運行JobTracker, Slave上運行TaskTracker。

Map函數(shù)和 Reduce函數(shù)都是以<key, value>作為輸入,按一定的映射規(guī)則轉(zhuǎn)換成另一個或一批<key, value>進行輸出。

函數(shù)輸入輸出說明
Map<k1,v1>List(<k2,v2>)將小數(shù)據(jù)集(split)進一步解析成一批<key,value>對,輸入 Map函數(shù)中進行處理。每一個輸入的<k1,v1>會輸出一批<k2,v2>,<k2,v2>是計算的中間結(jié)果
Reduce<k2,List(v2)><k3,v3>輸入的中間結(jié)果<k2,List(v2)>中的 List(v2)表示是一批屬于同一個 k2的 value
  • Map函數(shù)將輸入的元素轉(zhuǎn)換成<key,value>形式的鍵值對,鍵和值的類型也是任意的。
  • Reduce函數(shù)將輸入的一系列具有相同鍵的鍵值對以某種方式組合起來,輸出處理后的鍵值對,輸出結(jié)果合并為一個文件。

MapReduce體系結(jié)構(gòu)

MapReduce體系結(jié)構(gòu)主要由四個部分組成,分別是: Client、JobTracker、TaskTracker以及 Task。

Client:

  • 用戶編寫的 MapReduce程序通過 Client提交到 JobTracker端
  • 用戶可通過 Client提供的一些接口查看作業(yè)運行狀態(tài)

JobTracker:

  • 負(fù)責(zé)資源監(jiān)控和作業(yè)調(diào)度
  • 監(jiān)控所有 TaskTracker與 Job的健康狀況,一旦發(fā)現(xiàn)失敗,就將 相應(yīng)的任務(wù)轉(zhuǎn)移到其他節(jié)點
  • 會跟蹤任務(wù)的執(zhí)行進度、資源使用量等信息,并將這些信息告訴任務(wù)調(diào)度器(TaskScheduler),而調(diào)度器會在資源出現(xiàn)空閑時,選擇合適的任務(wù)去使用這些資源
  • 調(diào)度器是一個可插拔的模塊,用戶可以根據(jù)自己的實際應(yīng)用要求設(shè)計調(diào)度器

TaskTracker:

  • 接收 JobTracker 發(fā)送過來的命令并執(zhí)行相應(yīng)的操作(如啟動新任務(wù)、殺死任務(wù)等)
  • TaskTracker 會周期性地通過“心跳”將本節(jié)點上資源的使用情況 和任務(wù)的運行進度匯報給 JobTracker
  • TaskTracker 使用“slot”等量劃分本節(jié)點上的資源量(CPU、內(nèi) 存等)。一個 Task 獲取到一個 slot 后才有機會運行,而 Hadoop調(diào)度器的作用就是將各個 TaskTracker上的空閑 slot分配給 Task使用。 slot 分為 Map slot 和 Reduce slot 兩種,分別供 MapTask 和 Reduce Task 使用

Task:

  • Task 分為Map Task 和Reduce Task 兩種,均由TaskTracker 啟動

MapReduce工作流程

工作流程概述

MapReduce的核心思想是“分而治之”。一個大的 MapReduce作業(yè),首先會被拆分成許多個 Map任務(wù)在多臺機器上并行處理,每個 Map任務(wù)通常運行在數(shù)據(jù)存儲的節(jié)點上。當(dāng) Map任務(wù)結(jié)束后,會生成以<key,value>形式表示的中間結(jié)果,這些中間結(jié)果會被分發(fā)到多個 Reduce任務(wù)在多臺機器上并行執(zhí)行,具有相同 key的<key,value>會被發(fā)送到同一個 Reduce任務(wù)那里。Reduce任務(wù)會被中間結(jié)果進行匯總計算得到最后結(jié)果,并輸出到分布式文件系統(tǒng)中。

  • 不同的 Map任務(wù)之間不會進行通信,不同的 Reduce任務(wù)之間也不會發(fā)生任何信息交換
  • 只有當(dāng) Map任務(wù)全部結(jié)束后,Reduce過程才能開始
  • 用戶不能顯式地從一臺機器向另一臺機器發(fā)送消息
  • 所有的數(shù)據(jù)交換都是通過 MapReduce框架自身去實現(xiàn)的
  • Map任務(wù)的輸入文件、Reduce任務(wù)的處理結(jié)果都是保存在分布式文件系統(tǒng)中,而 Map任務(wù)處理的中間結(jié)果保存在本地磁盤中。

各個執(zhí)行階段

MapReduce算法的執(zhí)行過程:

  • 使用 InputFormat模塊做 Map前的預(yù)處理,然后將輸入文件切分為邏輯上的多個 InputSplit,每個 InputSplit并沒有對文件進行實際切割,只是記錄了要處理的位置和長度。
  • 通過 RecordReader(RR)根據(jù) InputSplit中的信息來處理 InputSplit中的具體記錄,加載數(shù)據(jù)并轉(zhuǎn)換為適合 Map任務(wù)讀取的鍵值對,輸入給 Map任務(wù)。
  • Map任務(wù)根據(jù)用戶自定義的映射規(guī)則,輸出一系列<key,value>作為中間結(jié)果。
  • 為了讓 Reduce可以并行處理 Map的結(jié)果,需要對 Map的輸出進行一定的分區(qū)(Partition)、排序(Sort)、合并(Combine)、歸并(Merge)等操作,得到<key,value-list>形式的中間結(jié)果,再交給對應(yīng)的 Reduce進行處理。這個過程將無序的<key,value>處理成了有序的<key,value-list>,成為 shuffle
  • Reduce以一系列的<key,value-list>作為輸入,執(zhí)行用戶定義的邏輯,輸出結(jié)果給 OutputFormat模塊。
  • OutputFormat模塊驗證輸出目錄是否已經(jīng)存在、輸出結(jié)果類型是否符合配置文件中的配置類型。如果都滿足,輸出 Reduce的結(jié)果到分布式文件系統(tǒng)。
  • shuffle過程

    Shuffle:是指對 Map輸出的結(jié)果進行分區(qū)、排序、合并、歸并等處理并交給 Reduce的過程,分為 Map端的操作和 Reduce端的操作。

    Map端的 Shuffle過程Reduce端的 Shuffle過程
    • 輸入數(shù)據(jù)和執(zhí)行Map任務(wù)
    • 寫入緩存
    • 溢寫 (Spill)
    • 文件歸并 (merge)
    • “領(lǐng)取” (Fetch) 數(shù)據(jù)
    • 歸并數(shù)據(jù)

    實例分析:WordCount

    WordCount程序任務(wù)

    WordCount說明
    輸入一個包含大量單詞的文本文件
    輸出文件中每個單詞及其出現(xiàn)次數(shù)(頻數(shù)),并按照單詞 字母順序排序,每個單詞和其頻數(shù)占一行,單詞和頻 數(shù)之間有間隔

    一個WordCount執(zhí)行過程的實例

    Map過程示意圖用戶沒有定義Combiner時的Reduce過程示意圖用戶有定義Combiner時的Reduce過程示意圖

    MapReduce的具體應(yīng)用

    MapReduce可以很好地應(yīng)用于各種計算問題:

    • 關(guān)系代數(shù)運算(選擇、投影、并、交、差、連接)
    • 分組與聚合運算
    • 矩陣-向量乘法
    • 矩陣乘法

    在 MapReduce環(huán)境下執(zhí)行兩個關(guān)系的連接操作的方法如下:

    假設(shè)關(guān)系 R(A,B),S(B,C)都存儲在一個文件中,為了連接這些關(guān)系,必須把來自每個關(guān)系的各個元組都和一個鍵關(guān)聯(lián),這個鍵就是屬性 B的值。可以使用 Map過程把來自 R的每個元組<a,b>轉(zhuǎn)換成一個鍵值對<b,<R,a>>,其中的鍵就是 b,值就是<R,a>。注意,這里把關(guān)系 R包含在值中,這樣做可以使得我們在 Reduce階段只把那些來自 R的元組和來自 S的元組進行匹配。

    類似地,使用 Map過程把來自 S的每個元組<b,c>轉(zhuǎn)換成一個鍵值對<b,<S,c>>,鍵是 b,值是<S,c>。Reduce進程的任務(wù)就是,把來自關(guān)系 R和 S的具有共同屬性 B值的元組進行合并。這樣,所有具有特定 B值的元組必須被發(fā)送到同一個 Reduce進程。

    MapReduce編程實踐

    任務(wù)要求:用 MapReduce實現(xiàn)對輸入文件中的單詞做詞頻統(tǒng)計

    實踐一共分為四步:

  • 編寫 Map處理邏輯
  • 編寫 Reduce處理邏輯
  • 編寫 Main函數(shù)
  • 編譯打包代碼
  • 1.編寫 Map處理邏輯

    public static class TokenizerMapperextends Mapper<Object, Text, Text, IntWritable> {private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}} }

    2.編寫 Reduce處理邏輯

    public static class IntSumReducerextends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);} }

    3.編寫 Main函數(shù)

    public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length < 2) {System.err.println("Usage: wordcount <in> [<in>...] <out>");System.exit(2);}Job job = Job.getInstance(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);for (int i = 0; i < otherArgs.length - 1; ++i) {FileInputFormat.addInputPath(job, new Path(otherArgs[i]));}FileOutputFormat.setOutputPath(job,new Path(otherArgs[otherArgs.length - 1]));System.exit(job.waitForCompletion(true) ? 0 : 1); }

    編譯打包代碼請參考另一篇博客 簡單的MapReduce實踐

    完整代碼:

    import java.io.IOException; import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser;public class WordCount {public static class TokenizerMapperextends Mapper<Object, Text, Text, IntWritable> {private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}}public static class IntSumReducerextends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length < 2) {System.err.println("Usage: wordcount <in> [<in>...] <out>");System.exit(2);}Job job = Job.getInstance(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);for (int i = 0; i < otherArgs.length - 1; ++i) {FileInputFormat.addInputPath(job, new Path(otherArgs[i]));}FileOutputFormat.setOutputPath(job,new Path(otherArgs[otherArgs.length - 1]));System.exit(job.waitForCompletion(true) ? 0 : 1);} }

    總結(jié)

    以上是生活随笔為你收集整理的第五章-分布式并行编程框架MapReduce的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。