MapReduce运行机制
相關(guān)鏈接?
MapReduce中Shuffle機(jī)制詳解——Map端Shuffle鏈接?
MapReduce中Shuffle機(jī)制詳解——Reduce端Shuffle鏈接
MapReduce將作業(yè)job的整個(gè)運(yùn)行過(guò)程分為兩個(gè)階段:Map階段和Reduce階段。按照時(shí)間順序包括:輸入分片(input split)、map階段、combiner階段、shuffle階段和reduce階段。系統(tǒng)執(zhí)行排序、將map輸出作為輸入傳給reducer的過(guò)程稱(chēng)為shuffle(shuffle是MapReducer的心臟)
Map階段由一定數(shù)量的 Map Task組成?
1. 輸入數(shù)據(jù)格式解析: InputFormat?
2. 輸入數(shù)據(jù)處理: Mapper?
3. 本地合并: Combiner(local reduce)?
4. 數(shù)據(jù)分組: Partitioner
Reduce階段由一定數(shù)量的 Reduce Task組成?
1. 數(shù)據(jù)遠(yuǎn)程拷貝?
2. 數(shù)據(jù)按照key排序?
3. 數(shù)據(jù)處理: Reducer?
4. 數(shù)據(jù)輸出格式: OutputFormat
1、流程簡(jiǎn)介?
?
各個(gè)map函數(shù)對(duì)所劃分的數(shù)據(jù)并行處理,從不同的輸入數(shù)據(jù)產(chǎn)生不同的中間結(jié)果輸出?
各個(gè)reduce也各自并行計(jì)算,各自負(fù)責(zé)處理不同的中間結(jié)果數(shù)據(jù)集合進(jìn)行reduce處理之前,必須等到所有的map函數(shù)做完?
在進(jìn)入reduce前需要有一個(gè)同步障(barrier)?
這個(gè)階段也負(fù)責(zé)對(duì)map的中間結(jié)果數(shù)據(jù)進(jìn)行收集整理(aggregation & shuffle)處理,以便reduce更有效地計(jì)算最終結(jié)果, 最終匯總所有reduce的輸出結(jié)果即可獲得最終結(jié)果
第一步:假設(shè)一個(gè)文件有三行英文單詞作為 MapReduce 的Input(輸入)這里經(jīng)過(guò) Splitting 過(guò)程把文件分割為3塊。分割后的3塊數(shù)據(jù)就可以并行處理,每一塊交給一個(gè) map 線程處理。?
第二步:每個(gè) map 線程中,以每個(gè)單詞為key,以1作為詞頻數(shù)value,然后輸出。?
第三步:每個(gè) map 的輸出要經(jīng)過(guò) shuffling(混洗),將相同的單詞key放在一個(gè)桶里面,然后交給 reduce 處理。?
第四步:reduce 接受到 shuffling 后的數(shù)據(jù), 會(huì)將相同的單詞進(jìn)行合并,得到每個(gè)單詞的詞頻數(shù),最后將統(tǒng)計(jì)好的每個(gè)單詞的詞頻數(shù)作為輸出結(jié)果。
上述就是 MapReduce 的大致流程,前兩步可以看做 map 階段,后兩步可以看做 reduce 階段。
2、輸入分片與類(lèi)型?
MapReduce將待處理數(shù)據(jù)執(zhí)行邏輯切片(即按照一個(gè)特定切片大小,將待處理數(shù)據(jù)劃分成邏輯上的多個(gè)split),然后每一個(gè)split分配一個(gè)mapTask并行實(shí)例處理
輸入分片(split)與map對(duì)應(yīng),是每個(gè)map處理的唯一單位。每個(gè)分片包括多條記錄,每個(gè)記錄都有對(duì)應(yīng)鍵值對(duì)。?
輸入切片的接口:InputSplit接口(不需要開(kāi)發(fā)人員直接處理,由InputFormat創(chuàng)建)
輸入分片(input split):在進(jìn)行map計(jì)算之前,mapreduce會(huì)根據(jù)輸入文件計(jì)算輸入分片(input split),每個(gè)輸入分片(input split)針對(duì)一個(gè)map任務(wù),輸入分片(input split)存儲(chǔ)的并非數(shù)據(jù)本身,而是一個(gè)分片長(zhǎng)度和一個(gè)記錄數(shù)據(jù)的位置的數(shù)組,輸入分片(input split)往往和hdfs的block(塊)關(guān)系很密切,存儲(chǔ)位置供MapReduce使用以便將map任務(wù)盡量放在分片數(shù)據(jù)附近,而長(zhǎng)度用來(lái)排序分片,以便優(yōu)化處理最大的分片,從而最小化作業(yè)運(yùn)行時(shí)間。
假如我們?cè)O(shè)定hdfs的塊的大小是64mb,如果我們輸入有三個(gè)文件,大小分別是3mb、65mb和127mb,那么mapreduce會(huì)把3mb文件分為一個(gè)輸入分片(input split),65mb則是兩個(gè)輸入分片(input split)而127mb也是兩個(gè)輸入分片(input split),換句話說(shuō)我們?nèi)绻趍ap計(jì)算前做輸入分片調(diào)整,例如合并小文件,那么就會(huì)有5個(gè)map任務(wù)將執(zhí)行,而且每個(gè)map執(zhí)行的數(shù)據(jù)大小不均,這個(gè)也是mapreduce優(yōu)化計(jì)算的一個(gè)關(guān)鍵點(diǎn)。
輸入類(lèi)型
FileInputFormat類(lèi):?
FileInputFormat 是所有使用文件作為其數(shù)據(jù)源的 InputFormat 實(shí)現(xiàn)的基類(lèi)。它提供了兩個(gè)功能:一個(gè)定義哪些文件包含在一個(gè)作業(yè)的輸入中;一個(gè)為輸入文件生成分片的實(shí)現(xiàn)。把分片分割成記錄的作業(yè)由其子類(lèi)來(lái)完成。
TextlnputFormat類(lèi):?
TextInputFormat 是默認(rèn)的 InputFormat。每條記錄是一行輸入。鍵是 LongWritable 類(lèi)型,存儲(chǔ)該行在整個(gè)文件中的字節(jié)偏移量。值是這行的內(nèi)容,不包括任何行終止符(換行符和回車(chē)符),它是 Text 類(lèi)型的。但是輸入分片和HDFS塊之間可能不能很好的匹配,出現(xiàn)跨塊的情況
KeyValueTextlnputFormat類(lèi):?
TextInputFormat 的鍵,即每一行在文件中的字節(jié)偏移量,通常并不是特別有用。通常情況下,文件中的每一行是一個(gè)鍵/值對(duì),使用某個(gè)分界符進(jìn)行分隔,比如制表符。例如 以下由 Hadoop 默認(rèn) OutputFormat(即 TextOutputFormat)產(chǎn)生的輸出。如果要正確處理這類(lèi) 文件,KeyValueTextInputFormat 比較合適??梢酝ㄟ^(guò) key.value.separator.in.input.line 屬性來(lái)指定分隔符。它的默認(rèn)值是一個(gè)制表符。
NLineInputFormat類(lèi):?
與TextInputFormat一樣,鍵是文件中行的字節(jié)偏移量,值是行本身。主要是希望mapper收到固定行數(shù)的輸入。
MultipleInputs多種輸入:?
MultipleInputs類(lèi)處理多種格式的輸入,允許為每個(gè)輸入路徑指定InputFormat和Mapper。兩個(gè)mapper的輸出類(lèi)型是一樣的,所以reducer看到的是聚集后的map輸出,并不知道輸入是不同的mapper產(chǎn)生的。?
重載版本:addInputPath(),沒(méi)有mapper參數(shù),主要支持多種輸入格式只有一個(gè)mapper。?
3、Map與Reduce的個(gè)數(shù)
Map任務(wù)的個(gè)數(shù)
讀取數(shù)據(jù)產(chǎn)生多少個(gè)Mapper???
Mapper數(shù)據(jù)過(guò)大的話,會(huì)產(chǎn)生大量的小文件,過(guò)多的Mapper創(chuàng)建和初始化都會(huì)消耗大量的硬件資源?
Mapper數(shù)太小,并發(fā)度過(guò)小,Job執(zhí)行時(shí)間過(guò)長(zhǎng),無(wú)法充分利用分布式硬件資源
Mapper數(shù)量由什么決定???
(1)輸入文件數(shù)目(2)輸入文件的大小(3)配置參數(shù) 這三個(gè)因素決定的。?
輸入的目錄中文件的數(shù)量決定多少個(gè)map會(huì)被運(yùn)行起來(lái),應(yīng)用針對(duì)每一個(gè)分片運(yùn)行一個(gè)map,一般而言,對(duì)于每一個(gè)輸入的文件會(huì)有一個(gè)map split。如果輸入文件太大,超過(guò)了hdfs塊的大小(128M)那么對(duì)于同一個(gè)輸入文件我們會(huì)有多余2個(gè)的map運(yùn)行起來(lái)。
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
reduce任務(wù)的個(gè)數(shù)
Reduce任務(wù)是一個(gè)數(shù)據(jù)聚合的步驟,數(shù)量默認(rèn)為1。而使用過(guò)多的Reduce任務(wù)則意味著復(fù)雜的shuffle,并使輸出文件的數(shù)量激增。
一個(gè)job的ReduceTasks數(shù)量是通過(guò)mapreduce.job.reduces參數(shù)設(shè)置 也可以通過(guò)編程的方式,調(diào)用Job對(duì)象的setNumReduceTasks()方法來(lái)設(shè)置 一個(gè)節(jié)點(diǎn)Reduce任務(wù)數(shù)量上限由mapreduce.tasktracker.reduce.tasks.maximum設(shè)置(默認(rèn)2)。可以采用以下探試法來(lái)決定Reduce任務(wù)的合理數(shù)量: 1.每個(gè)reducer都可以在Map任務(wù)完成后立即執(zhí)行:0.95 * (節(jié)點(diǎn)數(shù)量 * mapreduce.tasktracker.reduce.tasks.maximum) 2.較快的節(jié)點(diǎn)在完成第一個(gè)Reduce任務(wù)后,馬上執(zhí)行第二個(gè):1.75 * (節(jié)點(diǎn)數(shù)量 * mapreduce.tasktracker.reduce.tasks.maximum)- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
4、小文件合并?
Mapper是基于文件自動(dòng)產(chǎn)生的,如何自己控制Mapper的個(gè)數(shù)?需要通過(guò)參數(shù)的控制來(lái)調(diào)節(jié)Mapper的個(gè)數(shù)。減少M(fèi)apper的個(gè)數(shù)就要合并小文件,這種小文件有可能是直接來(lái)自于數(shù)據(jù)源的小文件,也可能是Reduce產(chǎn)生的小文件。
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
5、Map階段?
Map階段是由一定數(shù)量的 Map Task組成。這些Map Task可以同時(shí)運(yùn)行,每個(gè)Map Task又是由以下三個(gè)部分組成。?
?
1. InputFormat輸入數(shù)據(jù)格式解析組件:?
因?yàn)椴煌臄?shù)據(jù)可能存儲(chǔ)的數(shù)據(jù)格式不一樣,這就需要有一個(gè)InputFormat組件來(lái)解析這些數(shù)據(jù)的存放格式,默認(rèn)情況下,它提供了一個(gè)TextInputFormat文本文件輸入格式來(lái)解釋數(shù)據(jù)格式。?
它會(huì)將文件的每一行解釋成(key,value),key代表每行偏移量,value代表每行數(shù)據(jù)內(nèi)容,通常情況我們不需要自定義InputFormat,因?yàn)镸apReduce提供了多種支持不同數(shù)據(jù)格式InputFormat的實(shí)現(xiàn)?
?
2. Mapper輸入數(shù)據(jù)處理:這個(gè)Mapper是必須要實(shí)現(xiàn)的,因?yàn)楦鶕?jù)不同的業(yè)務(wù)對(duì)數(shù)據(jù)有不同的處理?
?
3. Partitioner數(shù)據(jù)分組:?
Mapper數(shù)據(jù)處理之后輸出之前,輸出key會(huì)經(jīng)過(guò)Partitioner分組或者分桶選擇不同的reduce,默認(rèn)的情況下Partitioner會(huì)對(duì)map輸出的key進(jìn)行hash取模。?
比如有6個(gè)ReduceTask,它就是模6,如果key的hash值為0,就選擇第0個(gè)ReduceTask(為1,選Reduce Task1)。這樣不同的map對(duì)相同單詞key,它的hash值取模是一樣的,所以會(huì)交給同一個(gè)reduce來(lái)處理。
6、Reduce階段
Reduce Task要遠(yuǎn)程拷貝每個(gè)map處理的結(jié)果,從每個(gè)map中讀取一部分結(jié)果,每個(gè)Reduce Task拷貝哪些數(shù)據(jù),是由上面Partitioner決定的。
Reduce Task讀取完數(shù)據(jù)后,要按照key進(jìn)行排序,相同的key被分到一組,交給同一個(gè)Reduce Task處理
以WordCount為例,相同的單詞key分到一組,交個(gè)同一個(gè)Reducer處理,這樣就實(shí)現(xiàn)了對(duì)每個(gè)單詞的詞頻統(tǒng)計(jì)。
Reducer統(tǒng)計(jì)的結(jié)果將按照OutputFormat格式輸出(默認(rèn)情況下的輸出格式為T(mén)extOutputFormat)
總結(jié)
以上是生活随笔為你收集整理的MapReduce运行机制的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 微信小程序公众平台如何生成小程序码
- 下一篇: 将bgr彩色矩阵归一化到0-255之间