我們基于Hadoop 1.2.1源碼分析MapReduce V1的處理流程。MapReduce V1實現中,主要存在3個主要的分布式進程(角色):JobClient、JobTracker和TaskTracker,我們主要是以這三個角色的實際處理活動為主線,并結合源碼,分析實際處理流程。上一篇我們分析了Job提交過程中JobClient端的處理流程(詳見文章?MapReduce V1:Job提交流程之JobClient端分析),這里我們繼續詳細分析Job提交在JobTracker端的具體流程。通過閱讀源碼可以發現,這部分的處理邏輯還是有點復雜,經過梳理,更加細化清晰的流程,如下圖所示:
上圖中主要分為兩大部分:一部分是JobClient基于RPC調用提交Job到JobTracker后,在JobTracker端觸發TaskScheduler所注冊的一系列Listener進行Job信息初始化;另一部分是JobTracker端監聽Job隊列的線程,監聽到Job狀態發生變更觸發一系列Listener更新狀態。我們從這兩個方面展開分析:
JobTracker接收Job提交
JobTracker接收到JobClient提交的Job,在JobTracker端具體執行流程,描述如下:
JobClient基于JobSubmissionProtocol協議遠程調用JobTracker的submitJob方法提交JobJobTracker接收提交的Job,創建一個JobInProgress對象,將其放入內部維護的Map<JobID, JobInProgress> jobs隊列中觸發JobQueueJobInProgressListener執行JobQueueJobInProgressListener的jobAdded方法,創建JobSchedulingInfo對象,并放入到JobQueueJobInProgressListener內部維護的Map<JobSchedulingInfo, JobInProgress> jobQueue隊列中觸發EagerTaskInitializationListener執行EagerTaskInitializationListener的jobAdded方法,將JobInProgress對象加入到List<JobInProgress> jobInitQueue隊列中在JobTracker端使用TaskScheduler進行Job/Task的調度,可以通過mapred.jobtracker.taskScheduler配置所使用的TaskScheduler實現類,默認使用的實現類JobQueueTaskScheduler,如下所示:
| 2 | Class<??extends?TaskScheduler> schedulerClass |
| 3 | ??= conf.getClass("mapred.jobtracker.taskScheduler", JobQueueTaskScheduler.class, TaskScheduler.class); |
| 4 | taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf); |
如果想要使用其他的TaskScheduler實現,可以在mapred-site.xml中配置mapred.jobtracker.taskScheduler的屬性值,覆蓋默認的調度策略即可。
在JobQueueTaskScheduler實現類中,注冊了2個JobInProgressListener,JobInProgressListener是用來監聽由JobClient端提交后在JobTracker端Job(在JobTracker端維護的JobInProgress)生命周期變化,并觸發相應事件(jobAdded/jobUpdated/jobRemoved)的,如下所示:
| 01 | protected?JobQueueJobInProgressListener jobQueueJobInProgressListener; |
| 02 | protected?EagerTaskInitializationListener eagerTaskInitializationListener; |
| 03 | private?float?padFraction; |
| 05 | public?JobQueueTaskScheduler() { |
| 06 | ??this.jobQueueJobInProgressListener =?new?JobQueueJobInProgressListener(); |
| 10 | public?synchronized?void?start()?throws?IOException { |
| 12 | ??taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);?// taskTrackerManager是JobTracker的引用 |
| 13 | ??eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager); |
| 14 | ??eagerTaskInitializationListener.start(); |
| 15 | ??taskTrackerManager.addJobInProgressListener(eagerTaskInitializationListener); |
JobTracker維護一個List<JobInProgressListener> jobInProgressListeners隊列,在TaskScheduler(默認JobQueueTaskScheduler )啟動的時候向JobTracker注冊。在JobClient提交Job后,在JobTracker段創建一個對應的JobInProgress對象,并將其放入到jobs隊列后,觸發這一組JobInProgressListener的jobAdded方法。
JobTracker管理Job提交
JobTracker接收到提交的Job后,需要對提交的Job進行初始化操作,具體流程如下所示:
EagerTaskInitializationListener.JobInitManager線程監控EagerTaskInitializationListener內部的List<JobInProgress> jobInitQueue隊列加載一個EagerTaskInitializationListener.InitJob線程去初始化Job在EagerTaskInitializationListener.InitJob線程中,調用JobTracker的initJob方法初始化Job調用JobInProgress的initTasks方法初始化該Job對應的Tasks從HDFS讀取該Job對應的splits信息,創建MapTask和ReduceTask(在JobTracker端維護的Task實際上是TaskInProgress)Job狀態變更,觸發JobQueueJobInProgressListener如果Job優先級(Priority)/開始時間發生變更,則對Map<JobSchedulingInfo, JobInProgress> jobQueue隊列進行重新排序;如果Job完成,則將Job從jobQueue隊列中移除Job狀態變更,觸發EagerTaskInitializationListener如果Job優先級(Priority)/開始時間發生變更,則對List<JobInProgress> jobInitQueue隊列進行重新排序下面,我們分析的Job初始化,以及Task初始化,都是在JobTracker端執行的工作,主要是為了管理Job和Task的運行,創建了對應的數據結構,Job對應JobInProgress,Task對應TaskInProgress。我們分析說明如下:
JobTracker接收到JobClient提交的Job,在放到JobTracker的Map<JobID, JobInProgress> jobs隊列后,觸發2個JobInProgressListener執行jobAdded方法,首先會放到EagerTaskInitializationListener的List<JobInProgress> jobInitQueue隊列中。在EagerTaskInitializationListener內部,有一個內部線程類JobInitManager在監控jobInitQueue隊列,如果有新的JobInProgress對象加入到隊列,則取出并啟動一個新的初始化線程InitJob去初始化該Job,代碼如下所示:
| 01 | class?JobInitManager?implements?Runnable { |
| 04 | ????JobInProgress job =?null; |
| 07 | ????????synchronized?(jobInitQueue) { |
| 08 | ??????????while?(jobInitQueue.isEmpty()) { |
| 09 | ????????????jobInitQueue.wait(); |
| 11 | ??????????job = jobInitQueue.remove(0);?// 取出JobInProgress |
| 13 | ????????threadPool.execute(new?InitJob(job));?// 創建一個InitJob線程去初始化該JobInProgress |
| 14 | ??????}?catch?(InterruptedException t) { |
| 15 | ????????LOG.info("JobInitManagerThread interrupted."); |
| 19 | ????LOG.info("Shutting down thread pool"); |
| 20 | ????threadPool.shutdownNow(); |
然后,在InitJob線程中,調用JobTracker的initJob方法初始化Job,如下所示:
| 01 | class?InitJob?implements?Runnable { |
| 03 | ??private?JobInProgress job; |
| 05 | ??public?InitJob(JobInProgress job) { |
| 10 | ????ttm.initJob(job);?// TaskTrackerManager ttm,調用JobTracker的initJob方法初始化 |
JobTracker中的initJob方法的主要邏輯,如下所示:
| 01 | JobStatus prevStatus = (JobStatus)job.getStatus().clone(); |
| 02 | LOG.info("Initializing "?+ job.getJobID()); |
| 03 | job.initTasks();?// 調用JobInProgress的initTasks方法初始化Task |
| 04 | // Inform the listeners if the job state has changed |
| 05 | // Note : that the job will be in PREP state. |
| 06 | JobStatus newStatus = (JobStatus)job.getStatus().clone(); |
| 07 | if?(prevStatus.getRunState() != newStatus.getRunState()) { |
| 08 | ??JobStatusChangeEvent event = |
| 09 | ????new?JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, |
| 11 | ??synchronized?(JobTracker.this) { |
| 12 | ????updateJobInProgressListeners(event);?// 更新Job相關隊列的狀態 |
實際上,在JobTracker中的initJob方法中最核心的邏輯,就是初始化組成該Job的MapTask和ReduceTask,它們在JobTracker端都抽象為TaskInProgress。
在JobClient提交Job的過程中,已經將該Job所對應的資源復制到HDFS,在JobTracker端需要讀取這些信息來創建MapTask和ReduceTask。我們回顧一下:默認情況下,split和對應的元數據存儲路徑分別為/tmp/hadoop/mapred/staging/${user}/.staging/${jobid}/job.split和/tmp/hadoop/mapred/staging/${user}/.staging/${jobid}/job.splitmetainfo,在創建MapTask和ReduceTask只需要split的元數據信息即可,我們看一下job.splitmetainfo文件存儲的數據格式如下圖所示:
上圖中,META_SPLIT_FILE_HEADER的值為META-SPL,版本version的值為1,numSplits的值根據實際Job輸入split大小計算的到,SplitMetaInfo包括的信息為split所存放的節點位置個數、所有的節點位置信息、split在文件中的起始偏移量、split數據的長度。有了這些描述信息,JobTracker就可以知道一個Job需要創建幾個MapTask,實現代碼如下所示:
| 1 | ????TaskSplitMetaInfo[] splits = createSplits(jobId); |
| 3 | ????numMapTasks = splits.length; |
| 5 | ????maps =?new?TaskInProgress[numMapTasks];?// MapTask在JobTracker的表示為TaskInProgress |
| 6 | ????for(int?i=0; i < numMapTasks; ++i) { |
| 7 | ??????inputLength += splits[i].getInputDataLength(); |
| 8 | ??????maps[i] =?new?TaskInProgress(jobId, jobFile, splits[i], jobtracker, conf,?this, i, numSlotsPerMap); |
而ReduceTask的個數,根據用戶在配置Job時指定的Reduce的個數,創建ReduceTask的代碼,如下所示:
| 4 | this.reduces =?new?TaskInProgress[numReduceTasks]; |
| 5 | for?(int?i =?0; i < numReduceTasks; i++) { |
| 6 | ??reduces[i] =?new?TaskInProgress(jobId, jobFile, numMapTasks, i, jobtracker, conf,this, numSlotsPerReduce); |
| 7 | ??nonRunningReduces.add(reduces[i]); |
除了創建MapTask和ReduceTask之外,還會創建setup和cleanup task,每個Job的MapTask和ReduceTask各對應一個,即共計2個setup task和2個cleanup task。setup task用來初始化MapTask/ReduceTask,而cleanup task用來清理MapTask/ReduceTask。創建setup和cleanup task,代碼如下所示:
| 01 | // create cleanup two cleanup tips, one map and one reduce. |
| 02 | cleanup =?new?TaskInProgress[2];?// cleanup task,map對應一個,reduce對應一個 |
| 04 | // cleanup map tip. This map doesn't use any splits. Just assign an empty split. |
| 05 | TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT; |
| 06 | cleanup[0] =?new?TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf,?this, numMapTasks,?1); |
| 07 | cleanup[0].setJobCleanupTask(); |
| 10 | cleanup[1] =?new?TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks, jobtracker, conf,?this,?1); |
| 11 | cleanup[1].setJobCleanupTask(); |
| 13 | // create two setup tips, one map and one reduce. |
| 14 | setup =?new?TaskInProgress[2];?// setup task,map對應一個,reduce對應一個 |
| 16 | // setup map tip. This map doesn't use any split. Just assign an empty split. |
| 17 | setup[0] =?new?TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf,?this, numMapTasks +?1,?1); |
| 18 | setup[0].setJobSetupTask(); |
| 21 | setup[1] =?new?TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks +?1, jobtracker, conf,?this,?1); |
| 22 | setup[1].setJobSetupTask(); |
一個Job在JobInProgress中進行初始化Task,這里初始化Task使得該Job滿足被調度的要求,比如,知道一個Job有哪些Task組成,每個Task對應哪個split等等。在初始化完成后,置一個Task初始化完成標志,如下所示:
| 01 | synchronized(jobInitKillStatus){ |
| 02 | ??jobInitKillStatus.initDone =?true; |
| 04 | ??// set this before the throw to make sure cleanup works properly |
| 05 | ??tasksInited =?true;?// 置Task初始化完成標志 |
| 07 | ??if(jobInitKillStatus.killed) { |
| 08 | ????throw?new?KillInterruptedException("Job "?+ jobId +?" killed in init"); |
在置tasksInited = true;后,該JobInProgress就可以被TaskScheduler進行調度了,調度時,是以Task(MapTask/ReduceTask)為單位分派給TaskTracker。而對于哪些TaskTracker可以運行Task,需要通過TaskTracker向JobTracker周期性發送的心跳得到TaskTracker的健康狀況信息、節點資源信息等來確定,是否該TaskTracker可以運行一個Job的一個或多個Task。
總結
以上是生活随笔為你收集整理的MapReduce V1:Job提交流程之JobTracker端分析的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。