日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop入门(六)Mapreduce

發布時間:2023/12/3 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop入门(六)Mapreduce 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、Mapreduce概述

MapReduce是一個編程模型,用以進行大數據量的計算

?

二、Hadoop MapReduce

(1)MapReduce是什么

Hadoop MapReduce是一個軟件框架,基于該框架能夠容易地編寫應用程序,這些應用程序能夠運行在由上千個商用機器組成的大集群上,并以一種可靠的,具有容錯能力的方式并行地處理上TB級別的海量數據集

Mapreduce的特點:

  • 軟件框架
  • 并行處理
  • 可靠且容錯
  • 大規模集群
  • 海量數據集
    ?
  • (2)MapReduce做什么

    MapReduce的思想就是“分而治之”

    1)Mapper負責“分”
    把復雜的任務分解為若干個“簡單的任務”來處理。“簡單的任務”包含三層含義:

  • 數據或計算的規模相對原任務要大大縮小
  • 就近計算原則,任務會分配到存放著所需數據的節點上進行計算
  • 這些小任務可以并行計算彼此間幾乎沒有依賴關系
  • 2)Reducer負責對map階段的結果進行匯總
    至于需要多少個Reducer,可以根據具體問題,
    通過在mapred-site.xml配置文件里設置參數mapred.reduce.tasks的值,缺省值為1。
    ?

    三、MapReduce工作機制

    作業執行涉及4個獨立的實體

  • 客戶端,用來提交MapReduce作業
  • JobTracker,用來協調作業的運行
  • TaskTracker,用來處理作業劃分后的任務
  • HDFS,用來在其它實體間共享作業文件
  • mapreduce作業工作流程圖

    Mapreduce作業的4個對象

  • 客戶端(client):編寫mapreduce程序,配置作業,提交作業,這就是程序員完成的工作;
  • JobTracker:初始化作業,分配作業,與TaskTracker通信,協調整個作業的執行;
  • TaskTracker:保持與JobTracker的通信,在分配的數據片段上執行Map或Reduce任務,TaskTracker和JobTracker的不同有個很重要的方面,就是在執行任務時候TaskTracker可以有n多個,JobTracker則只會有一個(JobTracker只能有一個就和hdfs里namenode一樣存在單點故障,我會在后面的mapreduce的相關問題里講到這個問題的)
  • Hdfs:保存作業的數據、配置信息等等,最后的結果也是保存在hdfs上面
  • ?

    mapreduce運行步驟1

     首先是客戶端要編寫好mapreduce程序,配置好mapreduce的作業也就是job,
    接下來就是提交job了,提交job是提交到JobTracker上的,這個時候JobTracker就會構建這個job,具體就是分配一個新的job任務的ID值

    接下來它會做檢查操作,這個檢查就是確定輸出目錄是否存在,如果存在那么job就不能正常運行下去,JobTracker會拋出錯誤給客戶端,接下來還要檢查輸入目錄是否存在,如果不存在同樣拋出錯誤,如果存在JobTracker會根據輸入計算輸入分片(Input Split),如果分片計算不出來也會拋出錯誤,至于輸入分片我后面會做講解的,這些都做好了JobTracker就會配置Job需要的資源了。

    分配好資源后,JobTracker就會初始化作業,初始化主要做的是將Job放入一個內部的隊列,讓配置好的作業調度器能調度到這個作業,作業調度器會初始化這個job,初始化就是創建一個正在運行的job對象(封裝任務和記錄信息),以便JobTracker跟蹤job的狀態和進程。

    mapreduce運行步驟2

    初始化完畢后,作業調度器會獲取輸入分片信息(input split),每個分片創建一個map任務。

    接下來就是任務分配了,這個時候tasktracker會運行一個簡單的循環機制定期發送心跳給jobtracker,心跳間隔是5秒,程序員可以配置這個時間,心跳就是jobtracker和tasktracker溝通的橋梁,通過心跳,jobtracker可以監控tasktracker是否存活,也可以獲取tasktracker處理的狀態和問題,同時tasktracker也可以通過心跳里的返回值獲取jobtracker給它的操作指令。

    任務分配好后就是執行任務了。在任務執行時候jobtracker可以通過心跳機制監控tasktracker的狀態和進度,同時也能計算出整個job的狀態和進度,而tasktracker也可以本地監控自己的狀態和進度。當jobtracker獲得了最后一個完成指定任務的tasktracker操作成功的通知時候,jobtracker會把整個job狀態置為成功,然后當客戶端查詢job運行狀態時候(注意:這個是異步操作),客戶端會查到job完成的通知的。如果job中途失敗,mapreduce也會有相應機制處理,一般而言如果不是程序員程序本身有bug,mapreduce錯誤處理機制都能保證提交的job能正常完成。

    ?

    四、mapreduce運行機制

    在Hadoop中,一個MapReduce作業會把輸入的數據集切分為若干獨立的數據塊,由Map任務以完全并行的方式處理。框架會對Map的輸出先進行排序,然后把結果輸入給Reduce任務。

    作業的輸入和輸出都會被存儲在文件系統中,整個框架負責任務的調度和監控,以及重新執行已經關閉的任務。MapReduce框架和分布式文件系統是運行在一組相同的節點,計算節點和存儲節點都是在一起的

    (1) MapReduce的輸入輸出

    一個MapReduce作業的輸入和輸出類型: 會有三組<key,value>鍵值對類型的存在

    ?

    (2)Mapreduce作業的處理流程

    ?

    按照時間順序包括:
    輸入分片(input split)
    map階段
    shuffle階段:map shuffle(partition、sort/group、combiner、partition? merge)和reduce?shuffle(copy、merge(sort/group))
    reduce階段

    1)輸入分片(input split)

    在進行map計算之前,mapreduce會根據輸入文件計算輸入分片(input split),每個輸入分片(input split)針對一個map任務
    輸入分片(input split)存儲的并非數據本身,而是一個分片長度和一個記錄數據的位置的數組,輸入分片(input split)往往和hdfs的block(塊)關系很密切

    ?? ?假如我們設定hdfs的塊的大小是64mb,如果我們輸入有三個文件,大小分別是3mb、65mb和127mb,那么mapreduce會把3mb文件分為一個輸入分片(input split),65mb則是兩個輸入分片(input split)而127mb也是兩個輸入分片(input split)
    ? ? 即我們如果在map計算前做輸入分片調整,例如合并小文件,那么就會有5個map任務將執行,而且每個map執行的數據大小不均,這個也是mapreduce優化計算的一個關鍵點。

    2)map階段
    程序員編寫好的map函數了,因此map函數效率相對好控制,而且一般map操作都是本地化操作也就是在數據存儲節點上進行;

    3)combiner階段

    combiner階段是程序員可以選擇的,combiner其實也是一種reduce操作,因此我們看見WordCount類里是用reduce進行加載的。

    Combiner是一個本地化的reduce操作,它是map運算的后續操作,主要是在map計算出中間文件前做一個簡單的合并重復key值的操作,例如我們對文件里的單詞頻率做統計,map計算時候如果碰到一個hadoop的單詞就會記錄為1,但是這篇文章里hadoop可能會出現n多次,那么map輸出文件冗余就會很多,因此在reduce計算前對相同的key做一個合并操作,那么文件會變小,這樣就提高了寬帶的傳輸效率,畢竟hadoop計算力寬帶資源往往是計算的瓶頸也是最為寶貴的資源,但是combiner操作是有風險的,使用它的原則是combiner的輸入不會影響到reduce計算的最終輸入,
    例如:

    如果計算只是求總數,最大值,最小值可以使用combiner,但是做平均值計算使用combiner的話,最終的reduce計算結果就會出錯。

    4)shuffle階段

    將map的輸出作為reduce的輸入的過程就是shuffle了。

    5)reduce階段

    和map函數一樣也是程序員編寫的,最終結果是存儲在hdfs上的。

    ?

    五、Mapreduce框架的相關問題

    jobtracker的單點故障

    jobtracker和hdfs的namenode一樣也存在單點故障,單點故障一直是hadoop被人詬病的大問題,為什么hadoop的做的文件系統和mapreduce計算框架都是高容錯的,但是最重要的管理節點的故障機制卻如此不好,我認為主要是namenode和jobtracker在實際運行中都是在內存操作,而做到內存的容錯就比較復雜了,只有當內存數據被持久化后容錯才好做,namenode和jobtracker都可以備份自己持久化的文件,但是這個持久化都會有延遲,因此真的出故障,任然不能整體恢復,另外hadoop框架里包含zookeeper框架,zookeeper可以結合jobtracker,用幾臺機器同時部署jobtracker,保證一臺出故障,有一臺馬上能補充上,不過這種方式也沒法恢復正在跑的mapreduce任務。

    ?

    六、Mapreduce的單詞計數實例

    public class WordCount {public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {public void map(Object key, Text value, Context context) throws IOException, InterruptedException {}}public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {}}public static void main(String[] args) throws Exception {//…?} }

    (1)Map

    ?public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{??private final static IntWritable one = new IntWritable(1);private Text word = new Text();???????public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}}

    map的方法

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {…}
    這里有三個參數,前面兩個Object key, Text value就是輸入的key和value,第三個參數Context context這是可以記錄輸入的key和value

    例如:context.write(word, one);此外context還會記錄map運算的狀態。

    (2)reduce

    ?public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}

    reduce函數的方法

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {…}

      reduce函數的輸入也是一個key/value的形式,
    不過它的value是一個迭代器的形式Iterable<IntWritable> values,

    也就是說reduce的輸入是一個key對應一組的值的value,reduce也有context和map的context作用一致。

    (3)main函數

    public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}Job job = new Job(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);} Configuration conf = new Configuration();

    運行mapreduce程序前都要初始化Configuration,該類主要是讀取mapreduce系統配置信息,這些信息包括hdfs還有mapreduce,也就是安裝hadoop時候的配置文件例如:core-site.xml、hdfs-site.xml和mapred-site.xml等等文件里的信息,有些童鞋不理解為啥要這么做,這個是沒有深入思考mapreduce計算框架造成,我們程序員開發mapreduce時候只是在填空,在map函數和reduce函數里編寫實際進行的業務邏輯,其它的工作都是交給mapreduce框架自己操作的,但是至少我們要告訴它怎么操作啊,比如hdfs在哪里啊,mapreduce的jobstracker在哪里啊,而這些信息就在conf包下的配置文件里。

    ? ? String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}

    If的語句好理解,就是運行WordCount程序時候一定是兩個參數,如果不是就會報錯退出。至于第一句里的GenericOptionsParser類,它是用來解釋常用hadoop命令,并根據需要為Configuration對象設置相應的值,其實平時開發里我們不太常用它,而是讓類實現Tool接口,然后再main函數里使用ToolRunner運行程序,而ToolRunner內部會調用GenericOptionsParser。

    ? ? Job job = new Job(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);

    第一行就是在構建一個job,在mapreduce框架里一個mapreduce任務也叫mapreduce作業也叫做一個mapreduce的job,而具體的map和reduce運算就是task了,這里我們構建一個job,構建時候有兩個參數,一個是conf這個就不累述了,一個是這個job的名稱。
      
    第二行就是裝載程序員編寫好的計算程序,例如我們的程序類名就是WordCount了。這里我要做下糾正,雖然我們編寫mapreduce程序只需要實現map函數和reduce函數,但是實際開發我們要實現三個類,第三個類是為了配置mapreduce如何運行map和reduce函數,準確的說就是構建一個mapreduce能執行的job了,例如WordCount類。
      
    第三行和第五行就是裝載map函數和reduce函數實現類了,這里多了個第四行,這個是裝載Combiner類,這個我后面講mapreduce運行機制時候會講述,其實本例去掉第四行也沒有關系,但是使用了第四行理論上運行效率會更好。

    ? ? job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);

    這個是定義輸出的key/value的類型,也就是最終存儲在hdfs上結果文件的key/value的類型。?

    ? FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);

    第一行就是構建輸入的數據文件,
    第二行是構建輸出的數據文件,
    最后一行如果job運行成功了,我們的程序就會正常退出。FileInputFormat和FileOutputFormat可以設置輸入輸出文件路徑,
    mapreduce計算時候,輸入文件必須存在,要不直Mr任務直接退出。輸出一般是一個文件夾,而且該文件夾不能存在。

    ?

    ?

    總結

    以上是生活随笔為你收集整理的Hadoop入门(六)Mapreduce的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。