Hadoop入门(六)Mapreduce
一、Mapreduce概述
MapReduce是一個編程模型,用以進行大數據量的計算
?
二、Hadoop MapReduce
(1)MapReduce是什么
Hadoop MapReduce是一個軟件框架,基于該框架能夠容易地編寫應用程序,這些應用程序能夠運行在由上千個商用機器組成的大集群上,并以一種可靠的,具有容錯能力的方式并行地處理上TB級別的海量數據集
Mapreduce的特點:
?
(2)MapReduce做什么
MapReduce的思想就是“分而治之”
1)Mapper負責“分”
把復雜的任務分解為若干個“簡單的任務”來處理。“簡單的任務”包含三層含義:
2)Reducer負責對map階段的結果進行匯總
至于需要多少個Reducer,可以根據具體問題,
通過在mapred-site.xml配置文件里設置參數mapred.reduce.tasks的值,缺省值為1。
?
三、MapReduce工作機制
作業執行涉及4個獨立的實體
mapreduce作業工作流程圖
Mapreduce作業的4個對象
?
mapreduce運行步驟1
首先是客戶端要編寫好mapreduce程序,配置好mapreduce的作業也就是job,
接下來就是提交job了,提交job是提交到JobTracker上的,這個時候JobTracker就會構建這個job,具體就是分配一個新的job任務的ID值
接下來它會做檢查操作,這個檢查就是確定輸出目錄是否存在,如果存在那么job就不能正常運行下去,JobTracker會拋出錯誤給客戶端,接下來還要檢查輸入目錄是否存在,如果不存在同樣拋出錯誤,如果存在JobTracker會根據輸入計算輸入分片(Input Split),如果分片計算不出來也會拋出錯誤,至于輸入分片我后面會做講解的,這些都做好了JobTracker就會配置Job需要的資源了。
分配好資源后,JobTracker就會初始化作業,初始化主要做的是將Job放入一個內部的隊列,讓配置好的作業調度器能調度到這個作業,作業調度器會初始化這個job,初始化就是創建一個正在運行的job對象(封裝任務和記錄信息),以便JobTracker跟蹤job的狀態和進程。
mapreduce運行步驟2
初始化完畢后,作業調度器會獲取輸入分片信息(input split),每個分片創建一個map任務。
接下來就是任務分配了,這個時候tasktracker會運行一個簡單的循環機制定期發送心跳給jobtracker,心跳間隔是5秒,程序員可以配置這個時間,心跳就是jobtracker和tasktracker溝通的橋梁,通過心跳,jobtracker可以監控tasktracker是否存活,也可以獲取tasktracker處理的狀態和問題,同時tasktracker也可以通過心跳里的返回值獲取jobtracker給它的操作指令。
任務分配好后就是執行任務了。在任務執行時候jobtracker可以通過心跳機制監控tasktracker的狀態和進度,同時也能計算出整個job的狀態和進度,而tasktracker也可以本地監控自己的狀態和進度。當jobtracker獲得了最后一個完成指定任務的tasktracker操作成功的通知時候,jobtracker會把整個job狀態置為成功,然后當客戶端查詢job運行狀態時候(注意:這個是異步操作),客戶端會查到job完成的通知的。如果job中途失敗,mapreduce也會有相應機制處理,一般而言如果不是程序員程序本身有bug,mapreduce錯誤處理機制都能保證提交的job能正常完成。
?
四、mapreduce運行機制
在Hadoop中,一個MapReduce作業會把輸入的數據集切分為若干獨立的數據塊,由Map任務以完全并行的方式處理。框架會對Map的輸出先進行排序,然后把結果輸入給Reduce任務。
作業的輸入和輸出都會被存儲在文件系統中,整個框架負責任務的調度和監控,以及重新執行已經關閉的任務。MapReduce框架和分布式文件系統是運行在一組相同的節點,計算節點和存儲節點都是在一起的
(1) MapReduce的輸入輸出
一個MapReduce作業的輸入和輸出類型: 會有三組<key,value>鍵值對類型的存在
?
(2)Mapreduce作業的處理流程
?
按照時間順序包括:
輸入分片(input split)
map階段
shuffle階段:map shuffle(partition、sort/group、combiner、partition? merge)和reduce?shuffle(copy、merge(sort/group))
reduce階段
1)輸入分片(input split)
在進行map計算之前,mapreduce會根據輸入文件計算輸入分片(input split),每個輸入分片(input split)針對一個map任務
輸入分片(input split)存儲的并非數據本身,而是一個分片長度和一個記錄數據的位置的數組,輸入分片(input split)往往和hdfs的block(塊)關系很密切
?? ?假如我們設定hdfs的塊的大小是64mb,如果我們輸入有三個文件,大小分別是3mb、65mb和127mb,那么mapreduce會把3mb文件分為一個輸入分片(input split),65mb則是兩個輸入分片(input split)而127mb也是兩個輸入分片(input split)
? ? 即我們如果在map計算前做輸入分片調整,例如合并小文件,那么就會有5個map任務將執行,而且每個map執行的數據大小不均,這個也是mapreduce優化計算的一個關鍵點。
2)map階段
程序員編寫好的map函數了,因此map函數效率相對好控制,而且一般map操作都是本地化操作也就是在數據存儲節點上進行;
3)combiner階段
combiner階段是程序員可以選擇的,combiner其實也是一種reduce操作,因此我們看見WordCount類里是用reduce進行加載的。
Combiner是一個本地化的reduce操作,它是map運算的后續操作,主要是在map計算出中間文件前做一個簡單的合并重復key值的操作,例如我們對文件里的單詞頻率做統計,map計算時候如果碰到一個hadoop的單詞就會記錄為1,但是這篇文章里hadoop可能會出現n多次,那么map輸出文件冗余就會很多,因此在reduce計算前對相同的key做一個合并操作,那么文件會變小,這樣就提高了寬帶的傳輸效率,畢竟hadoop計算力寬帶資源往往是計算的瓶頸也是最為寶貴的資源,但是combiner操作是有風險的,使用它的原則是combiner的輸入不會影響到reduce計算的最終輸入,
例如:
如果計算只是求總數,最大值,最小值可以使用combiner,但是做平均值計算使用combiner的話,最終的reduce計算結果就會出錯。
4)shuffle階段
將map的輸出作為reduce的輸入的過程就是shuffle了。
5)reduce階段
和map函數一樣也是程序員編寫的,最終結果是存儲在hdfs上的。
?
五、Mapreduce框架的相關問題
jobtracker的單點故障
jobtracker和hdfs的namenode一樣也存在單點故障,單點故障一直是hadoop被人詬病的大問題,為什么hadoop的做的文件系統和mapreduce計算框架都是高容錯的,但是最重要的管理節點的故障機制卻如此不好,我認為主要是namenode和jobtracker在實際運行中都是在內存操作,而做到內存的容錯就比較復雜了,只有當內存數據被持久化后容錯才好做,namenode和jobtracker都可以備份自己持久化的文件,但是這個持久化都會有延遲,因此真的出故障,任然不能整體恢復,另外hadoop框架里包含zookeeper框架,zookeeper可以結合jobtracker,用幾臺機器同時部署jobtracker,保證一臺出故障,有一臺馬上能補充上,不過這種方式也沒法恢復正在跑的mapreduce任務。
?
六、Mapreduce的單詞計數實例
public class WordCount {public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {public void map(Object key, Text value, Context context) throws IOException, InterruptedException {}}public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {}}public static void main(String[] args) throws Exception {//…?} }(1)Map
?public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{??private final static IntWritable one = new IntWritable(1);private Text word = new Text();???????public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}}map的方法
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {…}
這里有三個參數,前面兩個Object key, Text value就是輸入的key和value,第三個參數Context context這是可以記錄輸入的key和value
例如:context.write(word, one);此外context還會記錄map運算的狀態。
(2)reduce
?public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}reduce函數的方法
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {…}
reduce函數的輸入也是一個key/value的形式,
不過它的value是一個迭代器的形式Iterable<IntWritable> values,
也就是說reduce的輸入是一個key對應一組的值的value,reduce也有context和map的context作用一致。
(3)main函數
public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}Job job = new Job(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);} Configuration conf = new Configuration();運行mapreduce程序前都要初始化Configuration,該類主要是讀取mapreduce系統配置信息,這些信息包括hdfs還有mapreduce,也就是安裝hadoop時候的配置文件例如:core-site.xml、hdfs-site.xml和mapred-site.xml等等文件里的信息,有些童鞋不理解為啥要這么做,這個是沒有深入思考mapreduce計算框架造成,我們程序員開發mapreduce時候只是在填空,在map函數和reduce函數里編寫實際進行的業務邏輯,其它的工作都是交給mapreduce框架自己操作的,但是至少我們要告訴它怎么操作啊,比如hdfs在哪里啊,mapreduce的jobstracker在哪里啊,而這些信息就在conf包下的配置文件里。
? ? String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}If的語句好理解,就是運行WordCount程序時候一定是兩個參數,如果不是就會報錯退出。至于第一句里的GenericOptionsParser類,它是用來解釋常用hadoop命令,并根據需要為Configuration對象設置相應的值,其實平時開發里我們不太常用它,而是讓類實現Tool接口,然后再main函數里使用ToolRunner運行程序,而ToolRunner內部會調用GenericOptionsParser。
? ? Job job = new Job(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);第一行就是在構建一個job,在mapreduce框架里一個mapreduce任務也叫mapreduce作業也叫做一個mapreduce的job,而具體的map和reduce運算就是task了,這里我們構建一個job,構建時候有兩個參數,一個是conf這個就不累述了,一個是這個job的名稱。
第二行就是裝載程序員編寫好的計算程序,例如我們的程序類名就是WordCount了。這里我要做下糾正,雖然我們編寫mapreduce程序只需要實現map函數和reduce函數,但是實際開發我們要實現三個類,第三個類是為了配置mapreduce如何運行map和reduce函數,準確的說就是構建一個mapreduce能執行的job了,例如WordCount類。
第三行和第五行就是裝載map函數和reduce函數實現類了,這里多了個第四行,這個是裝載Combiner類,這個我后面講mapreduce運行機制時候會講述,其實本例去掉第四行也沒有關系,但是使用了第四行理論上運行效率會更好。
這個是定義輸出的key/value的類型,也就是最終存儲在hdfs上結果文件的key/value的類型。?
? FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);第一行就是構建輸入的數據文件,
第二行是構建輸出的數據文件,
最后一行如果job運行成功了,我們的程序就會正常退出。FileInputFormat和FileOutputFormat可以設置輸入輸出文件路徑,
mapreduce計算時候,輸入文件必須存在,要不直Mr任務直接退出。輸出一般是一個文件夾,而且該文件夾不能存在。
?
?
總結
以上是生活随笔為你收集整理的Hadoop入门(六)Mapreduce的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 怎么选择进口电脑椅怎么选择进口电脑椅子
- 下一篇: Hadoop入门(十二)Intellij