MapReduce过程详解及其性能优化
http://blog.csdn.net/aijiudu/article/details/72353510
廢話不說直接來一張圖如下:
?
從JVM的角度看Map和Reduce
Map階段包括:
第一讀數(shù)據(jù):從HDFS讀取數(shù)據(jù)
?
1、問題:讀取數(shù)據(jù)產(chǎn)生多少個Mapper??
? ? Mapper數(shù)據(jù)過大的話,會產(chǎn)生大量的小文件,由于Mapper是基于虛擬機的,過多的Mapper創(chuàng)建和初始化及關(guān)閉虛擬機都會消耗大量的硬件資源;
? ??Mapper數(shù)太小,并發(fā)度過小,Job執(zhí)行時間過長,無法充分利用分布式硬件資源;
2、Mapper數(shù)量由什么決定???
??(1)輸入文件數(shù)目
??(2)輸入文件的大小
??(3)配置參數(shù)
這三個因素決定的。
涉及參數(shù):
? ? mapreduce.input.fileinputformat.split.minsize //啟動map最小的split size大小,默認(rèn)0
? ? mapreduce.input.fileinputformat.split.maxsize //啟動map最大的split size大小,默認(rèn)256M
? ? dfs.block.size//block塊大小,默認(rèn)64M
? ? 計算公式:splitSize =? Math.max(minSize, Math.min(maxSize, blockSize));
? ? 例如默認(rèn)情況下:例如一個文件800M,Block大小是128M,那么Mapper數(shù)目就是7個。6個Mapper處理的數(shù)據(jù)是128M,1個Mapper處理的數(shù)據(jù)是32M;
再例如一個目錄下有三個文件大小分別為:5M10M 150M 這個時候其實會產(chǎn)生四個Mapper處理的數(shù)據(jù)分別是5M,10M,128M,22M。
Mapper是基于文件自動產(chǎn)生的,如果想要自己控制Mapper的個數(shù)???
就如上面,5M,10M的數(shù)據(jù)很快處理完了,128M要很長時間;這個就需要通過參數(shù)的控制來調(diào)節(jié)Mapper的個數(shù)。
減少Mapper的個數(shù)的話,就要合并小文件,這種小文件有可能是直接來自于數(shù)據(jù)源的小文件,也可能是Reduce產(chǎn)生的小文件。
設(shè)置合并器:(set都是在hive腳本,也可以配置Hadoop)
? ??設(shè)置合并器本身:
? ??set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
? ??set hive.merge.mapFiles=true;
? ??set hive.merge.mapredFiles=true;
? ??set hive.merge.size.per.task=256000000;//每個Mapper要處理的數(shù)據(jù),就把上面的5M10M……合并成為一個
一般還要配合一個參數(shù):
? ??set mapred.max.split.size=256000000 // mapred切分的大小
? ??set mapred.min.split.size.per.node=128000000//低于128M就算小文件,數(shù)據(jù)在一個節(jié)點會合并,在多個不同的節(jié)點會把數(shù)據(jù)抓過來進(jìn)行合并。
Hadoop中的參數(shù):可以通過控制文件的數(shù)量控制mapper數(shù)量
? ??mapreduce.input.fileinputformat.split.minsize(default:0),小于這個值會合并
? ??mapreduce.input.fileinputformat.split.maxsize?大于這個值會切分
第二處理數(shù)據(jù):
Partition說明
對于map輸出的每一個鍵值對,系統(tǒng)都會給定一個partition,partition值默認(rèn)是通過計算key的hash值后對Reduce task的數(shù)量取模獲得。如果一個鍵值對的partition值為1,意味著這個鍵值對會交給第一個Reducer處理。
自定義partitioner的情況:
??? 1、我們知道每一個Reduce的輸出都是有序的,但是將所有Reduce的輸出合并到一起卻并非是全局有序的,如果要做到全局有序,我們該怎么做呢?最簡單的方式,只設(shè)置一個Reduce task,但是這樣完全發(fā)揮不出集群的優(yōu)勢,而且能應(yīng)對的數(shù)據(jù)量也很受限。最佳的方式是自己定義一個Partitioner,用輸入數(shù)據(jù)的最大值除以系統(tǒng)Reduce task數(shù)量的商作為分割邊界,也就是說分割數(shù)據(jù)的邊界為此商的1倍、2倍至numPartitions-1倍,這樣就能保證執(zhí)行partition后的數(shù)據(jù)是整體有序的。
???2、解決數(shù)據(jù)傾斜:另一種需要我們自己定義一個Partitioner的情況是各個Reduce task處理的鍵值對數(shù)量極不平衡。對于某些數(shù)據(jù)集,由于很多不同的key的hash值都一樣,導(dǎo)致這些鍵值對都被分給同一個Reducer處理,而其他的Reducer處理的鍵值對很少,從而拖延整個任務(wù)的進(jìn)度。當(dāng)然,編寫自己的Partitioner必須要保證具有相同key值的鍵值對分發(fā)到同一個Reducer。
???3、自定義的Key包含了好幾個字段,比如自定義key是一個對象,包括type1,type2,type3,只需要根據(jù)type1去分發(fā)數(shù)據(jù),其他字段用作二次排序。
環(huán)形緩沖區(qū)
? ??Map的輸出結(jié)果是由collector處理的,每個Map任務(wù)不斷地將鍵值對輸出到在內(nèi)存中構(gòu)造的一個環(huán)形數(shù)據(jù)結(jié)構(gòu)中。使用環(huán)形數(shù)據(jù)結(jié)構(gòu)是為了更有效地使用內(nèi)存空間,在內(nèi)存中放置盡可能多的數(shù)據(jù)。
? ??這個數(shù)據(jù)結(jié)構(gòu)其實就是個字節(jié)數(shù)組,叫Kvbuffer,名如其義,但是這里面不光放置了數(shù)據(jù),還放置了一些索引數(shù)據(jù),給放置索引數(shù)據(jù)的區(qū)域起了一個Kvmeta的別名,在Kvbuffer的一塊區(qū)域上穿了一個IntBuffer(字節(jié)序采用的是平臺自身的字節(jié)序)的馬甲。數(shù)據(jù)區(qū)域和索引數(shù)據(jù)區(qū)域在Kvbuffer中是相鄰不重疊的兩個區(qū)域,用一個分界點來劃分兩者,分界點不是亙古不變的,而是每次Spill之后都會更新一次。初始的分界點是0,數(shù)據(jù)的存儲方向是向上增長,索引數(shù)據(jù)的存儲方向是向下增長Kvbuffer的存放指針bufindex是一直悶著頭地向上增長,比如bufindex初始值為0,一個Int型的key寫完之后,bufindex增長為4,一個Int型的value寫完之后,bufindex增長為8。
? ??索引是對在kvbuffer中的鍵值對的索引,是個四元組,包括:value的起始位置、key的起始位置、partition值、value的長度,占用四個Int長度,Kvmeta的存放指針Kvindex每次都是向下跳四個“格子”,然后再向上一個格子一個格子地填充四元組的數(shù)據(jù)。比如Kvindex初始位置是-4,當(dāng)?shù)谝粋€鍵值對寫完之后,(Kvindex+0)的位置存放value的起始位置、(Kvindex+1)的位置存放key的起始位置、(Kvindex+2)的位置存放partition的值、(Kvindex+3)的位置存放value的長度,然后Kvindex跳到-8位置,等第二個鍵值對和索引寫完之后,Kvindex跳到-12位置。
第三寫數(shù)據(jù)到磁盤
? ??Mapper中的Kvbuffer的大小默認(rèn)100M,可以通過mapreduce.task.io.sort.mb(default:100)參數(shù)來調(diào)整。可以根據(jù)不同的硬件尤其是內(nèi)存的大小來調(diào)整,調(diào)大的話,會減少磁盤spill的次數(shù)此時如果內(nèi)存足夠的話,一般都會顯著提升性能。spill一般會在Buffer空間大小的80%開始進(jìn)行spill(因為spill的時候還有可能別的線程在往里寫數(shù)據(jù),因為還預(yù)留空間,有可能有正在寫到Buffer中的數(shù)據(jù)),可以通過mapreduce.map.sort.spill.percent(default:0.80)進(jìn)行調(diào)整,Map Task在計算的時候會不斷產(chǎn)生很多spill文件,在Map Task結(jié)束前會對這些spill文件進(jìn)行合并,這個過程就是merge的過程。mapreduce.task.io.sort.factor(default:10),代表進(jìn)行merge的時候最多能同時merge多少spill,如果有100個spill個文件,此時就無法一次完成整個merge的過程,這個時候需要調(diào)大mapreduce.task.io.sort.factor(default:10)來減少merge的次數(shù),從而減少磁盤的操作;
Spill這個重要的過程是由Spill線程承擔(dān),Spill線程從Map任務(wù)接到“命令”之后就開始正式干活,干的活叫SortAndSpill,原來不僅僅是Spill,在Spill之前還有個頗具爭議性的Sort。
? ??Combiner存在的時候,此時會根據(jù)Combiner定義的函數(shù)對map的結(jié)果進(jìn)行合并,什么時候進(jìn)行Combiner操作呢???和Map在一個JVM中,是由min.num.spill.for.combine的參數(shù)決定的,默認(rèn)是3,也就是說spill的文件數(shù)在默認(rèn)情況下由三個的時候就要進(jìn)行combine操作,最終減少磁盤數(shù)據(jù);
減少磁盤IO和網(wǎng)絡(luò)IO還可以進(jìn)行:壓縮,對spill,merge文件都可以進(jìn)行壓縮。中間結(jié)果非常的大,IO成為瓶頸的時候壓縮就非常有用,可以通過mapreduce.map.output.compress(default:false)設(shè)置為true進(jìn)行壓縮,數(shù)據(jù)會被壓縮寫入磁盤,讀數(shù)據(jù)讀的是壓縮數(shù)據(jù)需要解壓,在實際經(jīng)驗中Hive在Hadoop的運行的瓶頸一般都是IO而不是CPU,壓縮一般可以10倍的減少IO操作,壓縮的方式Gzip,Lzo,BZip2,Lzma等,其中Lzo是一種比較平衡選擇,mapreduce.map.output.compress.codec(default:org.apache.hadoop.io.compress.DefaultCodec)參數(shù)設(shè)置。但這個過程會消耗CPU,適合IO瓶頸比較大。
Shuffle和Reduce階段包括:
一、Copy
? ??1、由于job的每一個map都會根據(jù)reduce(n)數(shù)將數(shù)據(jù)分成map 輸出結(jié)果分成n個partition,所以map的中間結(jié)果中是有可能包含每一個reduce需要處理的部分?jǐn)?shù)據(jù)的。所以,為了優(yōu)化reduce的執(zhí)行時間,hadoop中是等job的第一個map結(jié)束后,所有的reduce就開始嘗試從完成的map中下載該reduce對應(yīng)的partition部分?jǐn)?shù)據(jù),因此map和reduce是交叉進(jìn)行的,其實就是shuffle。Reduce任務(wù)通過HTTP向各個Map任務(wù)拖取(下載)它所需要的數(shù)據(jù)(網(wǎng)絡(luò)傳輸),Reducer是如何知道要去哪些機器取數(shù)據(jù)呢?一旦map任務(wù)完成之后,就會通過常規(guī)心跳通知應(yīng)用程序的Application Master。reduce的一個線程會周期性地向master詢問,直到提取完所有數(shù)據(jù)(如何知道提取完?)數(shù)據(jù)被reduce提走之后,map機器不會立刻刪除數(shù)據(jù),這是為了預(yù)防reduce任務(wù)失敗需要重做。因此map輸出數(shù)據(jù)是在整個作業(yè)完成之后才被刪除掉的。
? ??2、reduce進(jìn)程啟動數(shù)據(jù)copy線程(Fetcher),通過HTTP方式請求maptask所在的TaskTracker獲取maptask的輸出文件。由于map通常有許多個,所以對一個reduce來說,下載也可以是并行的從多個map下載,那到底同時到多少個Mapper下載數(shù)據(jù)??這個并行度是可以通過mapreduce.reduce.shuffle.parallelcopies(default5)調(diào)整。默認(rèn)情況下,每個Reducer只會有5個map端并行的下載線程在從map下數(shù)據(jù),如果一個時間段內(nèi)job完成的map有100個或者更多,那么reduce也最多只能同時下載5個map的數(shù)據(jù),所以這個參數(shù)比較適合map很多并且完成的比較快的job的情況下調(diào)大,有利于reduce更快的獲取屬于自己部分的數(shù)據(jù)。?在Reducer內(nèi)存和網(wǎng)絡(luò)都比較好的情況下,可以調(diào)大該參數(shù);
? ??3、reduce的每一個下載線程在下載某個map數(shù)據(jù)的時候,有可能因為那個map中間結(jié)果所在機器發(fā)生錯誤,或者中間結(jié)果的文件丟失,或者網(wǎng)絡(luò)瞬斷等等情況,這樣reduce的下載就有可能失敗,所以reduce的下載線程并不會無休止的等待下去,當(dāng)一定時間后下載仍然失敗,那么下載線程就會放棄這次下載,并在隨后嘗試從另外的地方下載(因為這段時間map可能重跑)。reduce下載線程的這個最大的下載時間段是可以通過mapreduce.reduce.shuffle.read.timeout(default180000秒)調(diào)整的。如果集群環(huán)境的網(wǎng)絡(luò)本身是瓶頸,那么用戶可以通過調(diào)大這個參數(shù)來避免reduce下載線程被誤判為失敗的情況。一般情況下都會調(diào)大這個參數(shù),這是企業(yè)級最佳實戰(zhàn)。
二、MergeSort
?????1、這里的merge和map端的merge動作類似,只是數(shù)組中存放的是不同map端copy來的數(shù)值。Copy過來的數(shù)據(jù)會先放入內(nèi)存緩沖區(qū)中,然后當(dāng)使用內(nèi)存達(dá)到一定量的時候才spill磁盤。這里的緩沖區(qū)大小要比map端的更為靈活,它基于JVM的heap size設(shè)置。這個內(nèi)存大小的控制就不像map一樣可以通過io.sort.mb來設(shè)定了,而是通過另外一個參數(shù)?mapreduce.reduce.shuffle.input.buffer.percent(default 0.7f?源碼里面寫死了)?來設(shè)置,這個參數(shù)其實是一個百分比,意思是說,shuffile在reduce內(nèi)存中的數(shù)據(jù)最多使用內(nèi)存量為:0.7 × maxHeap of reduce task。JVM的heapsize的70%。內(nèi)存到磁盤merge的啟動門限可以通過mapreduce.reduce.shuffle.merge.percent(default0.66)配置。也就是說,如果該reduce task的最大heap使用量(通常通過mapreduce.admin.reduce.child.java.opts來設(shè)置,比如設(shè)置為-Xmx1024m)的一定比例用來緩存數(shù)據(jù)。默認(rèn)情況下,reduce會使用其heapsize的70%來在內(nèi)存中緩存數(shù)據(jù)。假設(shè)?mapreduce.reduce.shuffle.input.buffer.percent?為0.7,reducetask的max heapsize為1G,那么用來做下載數(shù)據(jù)緩存的內(nèi)存就為大概700MB左右。這700M的內(nèi)存,跟map端一樣,也不是要等到全部寫滿才會往磁盤刷的,而是當(dāng)這700M中被使用到了一定的限度(通常是一個百分比),就會開始往磁盤刷(刷磁盤前會先做sortMerge)。這個限度閾值也是可以通過參數(shù)?mapreduce.reduce.shuffle.merge.percent(default0.66)來設(shè)定。與map 端類似,這也是溢寫的過程,這個過程中如果你設(shè)置有Combiner,也是會啟用的,然后在磁盤中生成了眾多的溢寫文件。這種merge方式一直在運行,直到?jīng)]有map端的數(shù)據(jù)時才結(jié)束,然后啟動磁盤到磁盤的merge方式生成最終的那個文件。
????這里需要強調(diào)的是,merge有三種形式:1)內(nèi)存到內(nèi)存(memToMemMerger)2)內(nèi)存中Merge(inMemoryMerger)3)磁盤上的Merge(onDiskMerger)具體包括兩個:(一)Copy過程中磁盤合并(二)磁盤到磁盤。
??? (1)內(nèi)存到內(nèi)存Merge(memToMemMerger)??? Hadoop定義了一種MemToMem合并,這種合并將內(nèi)存中的map輸出合并,然后再寫入內(nèi)存。這種合并默認(rèn)關(guān)閉,可以通過mapreduce.reduce.merge.memtomem.enabled(default:false)
打開,當(dāng)map輸出文件達(dá)到mapreduce.reduce.merge.memtomem.threshold時,觸發(fā)這種合并。
????(2)內(nèi)存中Merge(inMemoryMerger):當(dāng)緩沖中數(shù)據(jù)達(dá)到配置的閾值時,這些數(shù)據(jù)在內(nèi)存中被合并、寫入機器磁盤。閾值有2種配置方式:
??????? 配置內(nèi)存比例:前面提到reduceJVM堆內(nèi)存的一部分用于存放來自map任務(wù)的輸入,在這基礎(chǔ)之上配置一個開始合并數(shù)據(jù)的比例。假設(shè)用于存放map輸出的內(nèi)存為500M,mapreduce.reduce.shuffle.merge.percent配置為0.66,則當(dāng)內(nèi)存中的數(shù)據(jù)達(dá)到330M的時候,會觸發(fā)合并寫入。
???配置map輸出數(shù)量: 通過mapreduce.reduce.merge.inmem.threshold配置。在合并的過程中,會對被合并的文件做全局的排序。如果作業(yè)配置了Combiner,則會運行combine函數(shù),減少寫入磁盤的數(shù)據(jù)量。
? ??(3)磁盤上的Merge(onDiskMerger):
??????????? (3.1)Copy過程中磁盤Merge:在copy過來的數(shù)據(jù)不斷寫入磁盤的過程中,一個后臺線程會把這些文件合并為更大的、有序的文件。如果map的輸出結(jié)果進(jìn)行了壓縮,則在合并過程中,需要在內(nèi)存中解壓后才能給進(jìn)行合并。這里的合并只是為了減少最終合并的工作量,也就是在map輸出還在拷貝時,就開始進(jìn)行一部分合并工作。合并的過程一樣會進(jìn)行全局排序。
??????????? (3.2)最終磁盤中Merge:當(dāng)所有map輸出都拷貝完畢之后,所有數(shù)據(jù)被最后合并成一個整體有序的文件,作為reduce任務(wù)的輸入。這個合并過程是一輪一輪進(jìn)行的,最后一輪的合并結(jié)果直接推送給reduce作為輸入,節(jié)省了磁盤操作的一個來回。最后(所以map輸出都拷貝到reduce之后)進(jìn)行合并的map輸出可能來自合并后寫入磁盤的文件,也可能來及內(nèi)存緩沖,在最后寫入內(nèi)存的map輸出可能沒有達(dá)到閾值觸發(fā)合并,所以還留在內(nèi)存中。
???每一輪合并不一定合并平均數(shù)量的文件數(shù),指導(dǎo)原則是使用整個合并過程中寫入磁盤的數(shù)據(jù)量最小,為了達(dá)到這個目的,則需要最終的一輪合并中合并盡可能多的數(shù)據(jù),因為最后一輪的數(shù)據(jù)直接作為reduce的輸入,無需寫入磁盤再讀出。因此我們讓最終的一輪合并的文件數(shù)達(dá)到最大,即合并因子的值,通過mapreduce.task.io.sort.factor(default:10)來配置。
如上圖:Reduce階段中一個Reduce過程 可能的合并方式為:假設(shè)現(xiàn)在有20個map輸出文件,合并因子配置為5,則需要4輪的合并。最終的一輪確保合并5個文件,其中包括2個來自前2輪的合并結(jié)果,因此原始的20個中,再留出3個給最終一輪。
三、Reduce函數(shù)調(diào)用(用戶自定義業(yè)務(wù)邏輯)
??? 1、當(dāng)reduce將所有的map上對應(yīng)自己partition的數(shù)據(jù)下載完成后,就會開始真正的reduce計算階段。reducetask真正進(jìn)入reduce函數(shù)的計算階段,由于reduce計算時肯定也是需要消耗內(nèi)存的,而在讀取reduce需要的數(shù)據(jù)時,同樣是需要內(nèi)存作為buffer,這個參數(shù)是控制,reducer需要多少的內(nèi)存百分比來作為reduce讀已經(jīng)sort好的數(shù)據(jù)的buffer大小??默認(rèn)用多大內(nèi)存呢??默認(rèn)情況下為0,也就是說,默認(rèn)情況下,reduce是全部從磁盤開始讀處理數(shù)據(jù)。可以用mapreduce.reduce.input.buffer.percent(default 0.0)(源代碼MergeManagerImpl.java:674行)來設(shè)置reduce的緩存。如果這個參數(shù)大于0,那么就會有一定量的數(shù)據(jù)被緩存在內(nèi)存并輸送給reduce,當(dāng)reduce計算邏輯消耗內(nèi)存很小時,可以分一部分內(nèi)存用來緩存數(shù)據(jù),可以提升計算的速度。所以默認(rèn)情況下都是從磁盤讀取數(shù)據(jù),如果內(nèi)存足夠大的話,務(wù)必設(shè)置該參數(shù)讓reduce直接從緩存讀數(shù)據(jù),這樣做就有點Spark Cache的感覺;
??? 2、Reduce在這個階段,框架為已分組的輸入數(shù)據(jù)中的每個 <key, (list of values)>對調(diào)用一次?reduce(WritableComparable,Iterator, OutputCollector, Reporter)方法。Reduce任務(wù)的輸出通常是通過調(diào)用 OutputCollector.collect(WritableComparable,Writable)寫入文件系統(tǒng)的。Reducer的輸出是沒有排序的。
性能調(diào)優(yōu)
? ??如果能夠根據(jù)情況對shuffle過程進(jìn)行調(diào)優(yōu),對于提供MapReduce性能很有幫助。相關(guān)的參數(shù)配置列在后面的表格中。
一個通用的原則是給shuffle過程分配盡可能大的內(nèi)存,當(dāng)然你需要確保map和reduce有足夠的內(nèi)存來運行業(yè)務(wù)邏輯。因此在實現(xiàn)Mapper和Reducer時,應(yīng)該盡量減少內(nèi)存的使用,例如避免在Map中不斷地疊加。
運行map和reduce任務(wù)的JVM,內(nèi)存通過mapred.child.java.opts屬性來設(shè)置,盡可能設(shè)大內(nèi)存。容器的內(nèi)存大小通過mapreduce.map.memory.mb和mapreduce.reduce.memory.mb來設(shè)置,默認(rèn)都是1024M。
map優(yōu)化
? ??在map端,避免寫入多個spill文件可能達(dá)到最好的性能,一個spill文件是最好的。通過估計map的輸出大小,設(shè)置合理的mapreduce.task.io.sort.*屬性,使得spill文件數(shù)量最小。例如盡可能調(diào)大mapreduce.task.io.sort.mb。
map端相關(guān)的屬性如下表:
reduce優(yōu)化
? ??在reduce端,如果能夠讓所有數(shù)據(jù)都保存在內(nèi)存中,可以達(dá)到最佳的性能。通常情況下,內(nèi)存都保留給reduce函數(shù),但是如果reduce函數(shù)對內(nèi)存需求不是很高,將mapreduce.reduce.merge.inmem.threshold(觸發(fā)合并的map輸出文件數(shù))設(shè)為0,mapreduce.reduce.input.buffer.percent(用于保存map輸出文件的堆內(nèi)存比例)設(shè)為1.0,可以達(dá)到很好的性能提升。在2008年的TB級別數(shù)據(jù)排序性能測試中,Hadoop就是通過將reduce的中間數(shù)據(jù)都保存在內(nèi)存中勝利的。
reduce端相關(guān)屬性:
通用優(yōu)化
Hadoop默認(rèn)使用4KB作為緩沖,這個算是很小的,可以通過io.file.buffer.size來調(diào)高緩沖池大小。
轉(zhuǎn)載于:https://www.cnblogs.com/felixzh/p/8604188.html
總結(jié)
以上是生活随笔為你收集整理的MapReduce过程详解及其性能优化的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python201811210作业4
- 下一篇: ACM数论-素数