日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

06 MapReduce工作机制

發(fā)布時間:2023/11/30 编程问答 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 06 MapReduce工作机制 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

MapReduce作業(yè)的執(zhí)行流程

1、提交作業(yè)

在提交JobConf對象之後,用戶程序調用JobClient的runJob方法

runJob方法會先行調用JobSubmissionProtocol接口所定義的submitJob方法,並將作業(yè)提交給JobTracker。

緊接著,runJob不斷循環(huán),並在循環(huán)中調用JobSumissionProtocol的getTaskCompletionEvents方法,獲取TaskCompletionEvent類的對象實例,了解作業(yè)的實時執(zhí)行情況。如果發(fā)現(xiàn)作業(yè)運行狀態(tài)有更新,就將狀態(tài)報告給JobTracker。

作業(yè)完成後,如果成功則顯示作業(yè)計數(shù)器,否則,將導致作業(yè)失敗的錯誤記錄到控制臺。


submitJob方法所做的工作:

1)從JobTracker處獲取當前作業(yè)ID號;

2)檢查作業(yè)的輸入輸出路徑;

3)計算作業(yè)的輸入劃分,並將劃分信息寫入Job.split文件,如果寫入失敗就會返回錯誤。split信息包括文件名,此split在文件中的起始位置,split的location信息(位於哪個節(jié)點)。

4)將運行作業(yè)所需要的資源——JAR文件、配置文件、輸入劃分等——複製到作業(yè)對應的HDFS上;

5)調用JobTracker對象的submitJob()方法真正提交作業(yè)。


2、初始化作業(yè)

在客戶端用戶作業(yè)調度JobTracker對象的submitJob()方法後,JobTracker會把此調用放入內部的TaskScheduler變量中,然後進行調度,默認的調度方法是JobQueueTaskScheduler,也就是FIFO調度方式。

當客戶作業(yè)被調度執(zhí)行時,JobTracker會創(chuàng)建一個代表這個作業(yè)的JobInProgress對象,並將任務和記錄信息封裝到這個對象中,以便跟蹤任務的狀態(tài)和進程。

接下來JobInProgress對象的initTasks函數(shù)會對任務進行初始化操作。

初始化所做的工作:

1)從HDFS中讀取作業(yè)對應的job.split,爲後面的Map任務的分配做好準備;

2)創(chuàng)建並初始化Map任務和Reduce任務。

initTasks先根據(jù)輸入數(shù)據(jù)劃分信息中的個數(shù)設定Map Task的個數(shù),然後爲每個Map Task生成一個TaskInProgress來處理input split,並將Map Task放入nonRunningMapCache,以便在JobTracker向TaskTracker分配Map Task的時候使用。

接下來根據(jù)JobConf中的mapred.reduce.tasks屬性利用setNumReduceTasks()方法來設置reduce tasks的個數(shù),然後採用類似Map Task的方式將Reduce Task放入nonRunningReduces中,以便向TaskTracker分配Reduce Task時使用。

3)創(chuàng)建兩個初始化Task,根據(jù)個數(shù)和輸入劃分已經(jīng)配置的信息,分別初始化Map和Reduce。


3、分配任務

TaskTracker和JobTracker之間的通信和任務的分配是通過心跳機制完成的。

TaskTracker作爲一個單獨的JVM執(zhí)行一個簡單的循環(huán),主要實現(xiàn)每隔一段時間向JobTracker發(fā)送心跳(Heartbeat):告訴JobTracker此TaskTracker是否存活,是否準備執(zhí)行新的任務。

JobTracker接收到心跳信息,如果有待分配任務,它就會爲TaskTracker分配一個任務,並將分配信息封裝在心跳通信的返回值中返回給TaskTracker。

TaskTracker從心跳方法的Response中得知此TaskTracker需要做的事情,如果是一個新的Task則將它加入本機的任務隊列中。


TaskTracker首先發(fā)送自己的狀態(tài)(主要是Map任務和Reduce任務的個數(shù)是否小於上限),並根據(jù)自身條件選擇是否向JobTracker請求新的Task,最後發(fā)送心跳。

JobTracker接收到TaskTracker的心跳後首先分析心跳信息,如果發(fā)現(xiàn)TaskTracker在請求一個Task,那麼任務調度器就會將任務和任務信息封裝起來返回給TaskTracker。


針對Map任務和Reduce任務,TaskTracker有固定數(shù)量的任務槽(Map任務和Reduce任務的個數(shù)都有上限)。

當TaskTracker從JobTracker返回的心跳信息中獲取新的任務信息時,它會將Map任務或者Reduce任務加入對應的任務槽中。

在JobTracker爲TaskTracker分配Map任務時,爲了減小網(wǎng)絡帶寬,會考慮將map任務數(shù)據(jù)本地化。它會根據(jù)TaskTracker的網(wǎng)絡位置,選取一個距離此TaskTracker map任務最近的輸入劃分文件分配給此TaskTracker。

最好的情況是,劃分文件就在TaskTracker本地。


4、執(zhí)行任務

TaskTracker申請到新的任務之後,就要在本地運行任務了。

運行任務的第一步是將任務本地化(將任務運行所必須的數(shù)據(jù)、配置信息、程序代碼從HDFS複製到TaskTracker本地)。

主要通過調用localizeJob()方法來完成的,工作如下:

1)將job.split複製到本地;

2)將job.jar複製到本地;

3)將job的配置信息寫入job.xml;

4)創(chuàng)建本地任務目錄,解壓job.jar;

5)調用launchTaskForJob()方法發(fā)佈任務。


任務本地化後,可以通過調用launchTaskForJob()真正啓動起來。

lauchTaskForJob()又會調用launchTask()方法啓動任務。

launchTask()方法首先爲任務創(chuàng)建本地目錄,然後啓動TaskRunner。

在啓動TaskRunner後,對於Map任務,會啓動MapTaskRunner;對於Reduce任務則啓動ReduceTaskRunner。


之後,TaskRunner又會啓動新的Java虛擬機來運行每個任務。

以Map任務爲例,任務執(zhí)行的簡單流程是:

1)配置任務執(zhí)行參數(shù)(獲取Java程序的執(zhí)行環(huán)境和配置參數(shù)等);

2)在Child臨時文件表中添加Map任務信息(運行Map和Reduce任務的主進程是Child類);

3)配置log文件夾,然後配置Map任務的通信和輸出參數(shù);

4)讀取input split,生成RecordReader讀取數(shù)據(jù);

5)爲Map任務生成MapRunnable,依次從RecordReader中接收數(shù)據(jù),並調用Mapper的Map函數(shù)進行處理;

6)最後將Map函數(shù)的輸出調用collect收集到MapOutputBuffer中。


5、更新任務執(zhí)行進度和狀態(tài)

MapReduce作業(yè)是一個長時間運行的批量作業(yè),有時候可能需要運行數(shù)小時。

總體來講,MapReduce作業(yè)的進度由下面幾項組成:

Mapper或Reducer讀入或寫出一條記錄,在報告中設置狀態(tài)描述,增加計數(shù)器,調用Reporter對象的progress()方法。


由MapReduce作業(yè)分割成的每個任務都有一組計數(shù)器,它們對任務執(zhí)行過程中的進度組成事件進行計數(shù)。

如果任務要報告進度,它便會設置一個標誌以表明狀態(tài)變化將會發(fā)送到TaskTracker上。

另一個監(jiān)聽線程檢查到這標誌後,會告知TaskTracker當前的任務狀態(tài)。


同時,TaskTracker在每隔5秒發(fā)送給JobTracker的心跳中封裝任務狀態(tài),報告自己的任務執(zhí)行狀態(tài)。


通過心跳通信機制,所有TaskTracker的統(tǒng)計信息都會匯總到JobTracker處。

JobTracker將這些信息合併產(chǎn)生一個全局作業(yè)進度統(tǒng)計信息。

最後,JobClient通過每秒查看JobTracker來接收作業(yè)進度的最新狀態(tài)。


6、完成作業(yè)

當JobTracker接收到最後一個任務的已完成通知後,便把作業(yè)狀態(tài)設置爲成功。

JobClient也將及時告知用戶作業(yè)已完成。

最後從runJob()方法處返回。

在返回後,JobTracker會清空作業(yè)的工作狀態(tài),並指示TaskTracker也清空作業(yè)的工作狀態(tài),比如刪除中間輸出等。



錯誤處理機制

Hadoop利用冗餘數(shù)據(jù)來解決硬件故障,以保證數(shù)據(jù)安全和任務執(zhí)行。


硬件故障:

JobTracker機器故障——HA

TaskTracker機器故障:重新執(zhí)行任務


任務失敗:

用戶代碼缺陷或者進程崩潰引起的任務失敗。


用戶代碼缺陷會導致它在執(zhí)行過程中拋出異常。此時JVM進程會自動退出,並向TaskTracker父進程發(fā)送錯誤消息,同時錯誤消息也會寫入log文件,最後TaskTracker將此次任務嘗試標記失敗。

對於進程崩潰引起的任務失敗i,TaskTracker的監(jiān)聽程序會發(fā)現(xiàn)進程退出,此時TaskTracker也會將此次任務嘗試標記爲失敗。

對於死循環(huán)程序或執(zhí)行時間太長的程序,由於TaskTracker沒有接收到進度更新,它也會將此次任務嘗試標記爲失敗,並殺死程序對應的進程。


在以上情況中,TaskTracker將任務嘗試標記爲失敗之後會將TaskTracker自身的任務計數(shù)器減1,以便向JobTracker申請新的任務。

TaskTracker也會通過心跳機制告訴JobTracker本地的一個任務嘗試失敗。

JobTracker接到任務失敗的通知後,通過重置任務狀態(tài),將其加入到調度隊列來重新分配該任務執(zhí)行。

JobTracker會嘗試避免將失敗的任務再次分配給運行失敗的TaskTracker。

如果此任務嘗試了4次(次數(shù)可以進行設置)仍沒有完成,則整個任務失敗。


Shuffle和排序

Shuffle過程的性能與整個MapReduce的性能直接相關。

Shuffle過程包含在Map和Reduce兩端中。

在Map端是對Map的結果進行partition, sort, spill,然後merge屬於同一個劃分的輸出並寫在磁盤上,同時按照不同的劃分將結果發(fā)送給對應的Reduce(Map輸出的劃分與Reduce的對應關係由JobTracker確定)。

Reduce端又會將各個Map送來的屬於同一個劃分的輸出進行合併(merge),然後對merge的結果進行排序,最後交給Reduce處理。


Map端

Map的輸出結果是由collector函數(shù)處理的。

當輸出內存緩衝區(qū)內容達到設定的閾值時,把緩衝取內容spill到磁盤中。

collector函數(shù)將緩衝區(qū)中的內容寫出時,會調用sortAndSpill函數(shù)。

sortAndSpill每被調用一次就會創(chuàng)建一個spill文件,然後按照key對需要寫出的數(shù)據(jù)進行排序,最後按照劃分的順序將所有需要寫出的結果寫入這個spill文件中。

如果用戶作業(yè)配置了combiner類,那麼在寫出過程中會先調用combineAndSpill()再寫出,對結果進一步合併,是爲了讓Map的輸出數(shù)據(jù)更加緊湊。

Map任務結束之後,用mergeParts()將所有的spill文件中的數(shù)據(jù)按照劃分重新組織,以便Reduce處理。


Reduce端

分成三個階段:複製Map輸出、排序合併、Reduce處理。

Reduce定期向JobTracker獲取Map的輸出位置。一旦拿到位置,Reduce任務就會從此輸出對應的TaskTracker上複製輸出到本地,而不是等到所有的Map任務結束。如果Map的輸出很小,則會被複製到執(zhí)行Reduce任務的TaskTracker節(jié)點的內存中,否則會放入磁盤中。

在Reduce複製Map的輸出結果的同時,Reduce任務就進入了合併(merge)階段。這一階段主要的任務是將從各個Map TaskTracker上複製的Map輸出文件進行整合,並維持數(shù)據(jù)原來的順序。

reduce端的最後階段就是對合併的文件進行reduce處理。


shuffle過程的優(yōu)化

在一個任務中,完成單位任務使用時間最多的一般都是IO操作。

在Map端,主要就是shuffle階段中緩衝區(qū)內容超過閾值後的寫出操作。可以增加io.sort.mb的值來減少寫出次數(shù)。

在Reduce端,在複製Map輸出的時候直接將複製的結果放在內存中同樣能夠提升性能,前提是留下的內存足夠Reduce任務執(zhí)行。所以在Reduce函數(shù)的內存需求很小的情況下,將mapred.inmem.merge.threshold設置爲0,將mapred.job.reduce.input.buffer.percent設置爲1.0或更低能夠讓IO操作更少,提升shuffle性能。


任務執(zhí)行細節(jié)


當JobTracker檢測到所有任務中存在運行時過於緩慢的任務時,就會啓動另一個相同的任務作爲備份。原始任務和備份任務中只要有一個完成,另一個就會被終止。

默認開啓,可通過

mapred.map.tasks.speculative.execution

mapred.reduce.tasks.speculative.execution

屬性值來爲Map和Reduce任務開啓或關閉這個功能。


任務JVM重用

當TaskTracker被分配一個任務時,就會在本地啓動一個新的JVM來運行這個任務。

對於有大量零碎輸入文件的Map任務而言,爲每一個Map任務啓動一個JVM是有改善空間的,即讓後續(xù)的任務重用此JVM,這樣就省下新任務啓動新的JVM的時間。

可通過mapred.job.reuse.jvm.num.tasks配置。默認1,不重用;設置爲大於1的數(shù)來啓動重用;設置爲-1表示共享此JVM的任務數(shù)目不受限制。


跳過壞記錄

當skipping模式啓動時,如果任務連續(xù)失敗兩次,他會將自己正在處理的記錄告訴TaskTracker,然後TaskTracker會重新運行該任務並在運行到先前任務報告的記錄時直接跳過。

skipping模式只能跳過一條錯誤記錄。

mapred.map.max.attemps和mapred.reduce.max.attemps兩個屬性可以增加跳過的錯誤記錄個數(shù)。

此模式默認關閉,可通過SkipBadRecord類單獨爲Map和Reduce任務啓動它。



總結

以上是生活随笔為你收集整理的06 MapReduce工作机制的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內容還不錯,歡迎將生活随笔推薦給好友。