日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

MapReduce V1:Job提交流程之JobTracker端分析

發布時間:2024/1/17 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MapReduce V1:Job提交流程之JobTracker端分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
我們基于Hadoop 1.2.1源碼分析MapReduce V1的處理流程。MapReduce V1實現中,主要存在3個主要的分布式進程(角色):JobClient、JobTracker和TaskTracker,我們主要是以這三個角色的實際處理活動為主線,并結合源碼,分析實際處理流程。

上一篇我們分析了Job提交過程中JobClient端的處理流程(詳見文章?MapReduce V1:Job提交流程之JobClient端分析),這里我們繼續詳細分析Job提交在JobTracker端的具體流程。通過閱讀源碼可以發現,這部分的處理邏輯還是有點復雜,經過梳理,更加細化清晰的流程,如下圖所示:

上圖中主要分為兩大部分:一部分是JobClient基于RPC調用提交Job到JobTracker后,在JobTracker端觸發TaskScheduler所注冊的一系列Listener進行Job信息初始化;另一部分是JobTracker端監聽Job隊列的線程,監聽到Job狀態發生變更觸發一系列Listener更新狀態。我們從這兩個方面展開分析:

JobTracker接收Job提交

JobTracker接收到JobClient提交的Job,在JobTracker端具體執行流程,描述如下:

  • JobClient基于JobSubmissionProtocol協議遠程調用JobTracker的submitJob方法提交Job
  • JobTracker接收提交的Job,創建一個JobInProgress對象,將其放入內部維護的Map<JobID, JobInProgress> jobs隊列中
  • 觸發JobQueueJobInProgressListener
  • 執行JobQueueJobInProgressListener的jobAdded方法,創建JobSchedulingInfo對象,并放入到JobQueueJobInProgressListener內部維護的Map<JobSchedulingInfo, JobInProgress> jobQueue隊列中
  • 觸發EagerTaskInitializationListener
  • 執行EagerTaskInitializationListener的jobAdded方法,將JobInProgress對象加入到List<JobInProgress> jobInitQueue隊列中
  • 在JobTracker端使用TaskScheduler進行Job/Task的調度,可以通過mapred.jobtracker.taskScheduler配置所使用的TaskScheduler實現類,默認使用的實現類JobQueueTaskScheduler,如下所示:

    1// Create the scheduler
    2 Class<??extends?TaskScheduler> schedulerClass
    3 ??= conf.getClass("mapred.jobtracker.taskScheduler", JobQueueTaskScheduler.class, TaskScheduler.class);
    4taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);

    如果想要使用其他的TaskScheduler實現,可以在mapred-site.xml中配置mapred.jobtracker.taskScheduler的屬性值,覆蓋默認的調度策略即可。
    在JobQueueTaskScheduler實現類中,注冊了2個JobInProgressListener,JobInProgressListener是用來監聽由JobClient端提交后在JobTracker端Job(在JobTracker端維護的JobInProgress)生命周期變化,并觸發相應事件(jobAdded/jobUpdated/jobRemoved)的,如下所示:

    01 protected?JobQueueJobInProgressListener jobQueueJobInProgressListener;
    02 protected?EagerTaskInitializationListener eagerTaskInitializationListener;
    03 private?float?padFraction;
    04?
    05 public?JobQueueTaskScheduler() {
    06 ??this.jobQueueJobInProgressListener =?new?JobQueueJobInProgressListener();
    07}
    08?
    09@Override
    10 public?synchronized?void?start()?throws?IOException {
    11 ??super.start();
    12 ??taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);?// taskTrackerManager是JobTracker的引用
    13 ??eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
    14 ??eagerTaskInitializationListener.start();
    15 ??taskTrackerManager.addJobInProgressListener(eagerTaskInitializationListener);
    16}

    JobTracker維護一個List<JobInProgressListener> jobInProgressListeners隊列,在TaskScheduler(默認JobQueueTaskScheduler )啟動的時候向JobTracker注冊。在JobClient提交Job后,在JobTracker段創建一個對應的JobInProgress對象,并將其放入到jobs隊列后,觸發這一組JobInProgressListener的jobAdded方法。

    JobTracker管理Job提交

    JobTracker接收到提交的Job后,需要對提交的Job進行初始化操作,具體流程如下所示:

  • EagerTaskInitializationListener.JobInitManager線程監控EagerTaskInitializationListener內部的List<JobInProgress> jobInitQueue隊列
  • 加載一個EagerTaskInitializationListener.InitJob線程去初始化Job
  • 在EagerTaskInitializationListener.InitJob線程中,調用JobTracker的initJob方法初始化Job
  • 調用JobInProgress的initTasks方法初始化該Job對應的Tasks
  • 從HDFS讀取該Job對應的splits信息,創建MapTask和ReduceTask(在JobTracker端維護的Task實際上是TaskInProgress)
  • Job狀態變更,觸發JobQueueJobInProgressListener
  • 如果Job優先級(Priority)/開始時間發生變更,則對Map<JobSchedulingInfo, JobInProgress> jobQueue隊列進行重新排序;如果Job完成,則將Job從jobQueue隊列中移除
  • Job狀態變更,觸發EagerTaskInitializationListener
  • 如果Job優先級(Priority)/開始時間發生變更,則對List<JobInProgress> jobInitQueue隊列進行重新排序
  • 下面,我們分析的Job初始化,以及Task初始化,都是在JobTracker端執行的工作,主要是為了管理Job和Task的運行,創建了對應的數據結構,Job對應JobInProgress,Task對應TaskInProgress。我們分析說明如下:

    • Job初始化

    JobTracker接收到JobClient提交的Job,在放到JobTracker的Map<JobID, JobInProgress> jobs隊列后,觸發2個JobInProgressListener執行jobAdded方法,首先會放到EagerTaskInitializationListener的List<JobInProgress> jobInitQueue隊列中。在EagerTaskInitializationListener內部,有一個內部線程類JobInitManager在監控jobInitQueue隊列,如果有新的JobInProgress對象加入到隊列,則取出并啟動一個新的初始化線程InitJob去初始化該Job,代碼如下所示:

    01 class?JobInitManager?implements?Runnable {
    02?
    03 ??public?void?run() {
    04 ????JobInProgress job =?null;
    05 ????while?(true) {
    06 ??????try?{
    07 ????????synchronized?(jobInitQueue) {
    08 ??????????while?(jobInitQueue.isEmpty()) {
    09 ????????????jobInitQueue.wait();
    10 ??????????}
    11 ??????????job = jobInitQueue.remove(0);?// 取出JobInProgress
    12 ????????}
    13 ????????threadPool.execute(new?InitJob(job));?// 創建一個InitJob線程去初始化該JobInProgress
    14 ??????}?catch?(InterruptedException t) {
    15 ????????LOG.info("JobInitManagerThread interrupted.");
    16 ????????break;
    17 ??????}
    18 ????}
    19 ????LOG.info("Shutting down thread pool");
    20 ????threadPool.shutdownNow();
    21 ??}
    22}

    然后,在InitJob線程中,調用JobTracker的initJob方法初始化Job,如下所示:

    01 class?InitJob?implements?Runnable {
    02?
    03 ??private?JobInProgress job;
    04 ??
    05 ??public?InitJob(JobInProgress job) {
    06 ????this.job = job;
    07 ??}
    08 ??
    09 ??public?void?run() {
    10 ????ttm.initJob(job);?// TaskTrackerManager ttm,調用JobTracker的initJob方法初始化
    11 ??}
    12}

    JobTracker中的initJob方法的主要邏輯,如下所示:

    01JobStatus prevStatus = (JobStatus)job.getStatus().clone();
    02 LOG.info("Initializing "?+ job.getJobID());
    03 job.initTasks();?// 調用JobInProgress的initTasks方法初始化Task
    04// Inform the listeners if the job state has changed
    05// Note : that the job will be in PREP state.
    06JobStatus newStatus = (JobStatus)job.getStatus().clone();
    07 if?(prevStatus.getRunState() != newStatus.getRunState()) {
    08 ??JobStatusChangeEvent event =
    09 ????new?JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,
    10 ????????newStatus);
    11 ??synchronized?(JobTracker.this) {
    12 ????updateJobInProgressListeners(event);?// 更新Job相關隊列的狀態
    13 ??}
    14}

    實際上,在JobTracker中的initJob方法中最核心的邏輯,就是初始化組成該Job的MapTask和ReduceTask,它們在JobTracker端都抽象為TaskInProgress。

    • 初始化Task

    在JobClient提交Job的過程中,已經將該Job所對應的資源復制到HDFS,在JobTracker端需要讀取這些信息來創建MapTask和ReduceTask。我們回顧一下:默認情況下,split和對應的元數據存儲路徑分別為/tmp/hadoop/mapred/staging/${user}/.staging/${jobid}/job.split和/tmp/hadoop/mapred/staging/${user}/.staging/${jobid}/job.splitmetainfo,在創建MapTask和ReduceTask只需要split的元數據信息即可,我們看一下job.splitmetainfo文件存儲的數據格式如下圖所示:

    上圖中,META_SPLIT_FILE_HEADER的值為META-SPL,版本version的值為1,numSplits的值根據實際Job輸入split大小計算的到,SplitMetaInfo包括的信息為split所存放的節點位置個數、所有的節點位置信息、split在文件中的起始偏移量、split數據的長度。有了這些描述信息,JobTracker就可以知道一個Job需要創建幾個MapTask,實現代碼如下所示:

    1 ????TaskSplitMetaInfo[] splits = createSplits(jobId);
    2...
    3 ????numMapTasks = splits.length;
    4...
    5 ????maps =?new?TaskInProgress[numMapTasks];?// MapTask在JobTracker的表示為TaskInProgress
    6 ????for(int?i=0; i < numMapTasks; ++i) {
    7 ??????inputLength += splits[i].getInputDataLength();
    8 ??????maps[i] =?new?TaskInProgress(jobId, jobFile, splits[i], jobtracker, conf,?this, i, numSlotsPerMap);
    9 ????}

    而ReduceTask的個數,根據用戶在配置Job時指定的Reduce的個數,創建ReduceTask的代碼,如下所示:

    1//
    2// Create reduce tasks
    3//
    4 this.reduces =?new?TaskInProgress[numReduceTasks];
    5 for?(int?i =?0; i < numReduceTasks; i++) {
    6 ??reduces[i] =?new?TaskInProgress(jobId, jobFile, numMapTasks, i, jobtracker, conf,this, numSlotsPerReduce);
    7 ??nonRunningReduces.add(reduces[i]);
    8}

    除了創建MapTask和ReduceTask之外,還會創建setup和cleanup task,每個Job的MapTask和ReduceTask各對應一個,即共計2個setup task和2個cleanup task。setup task用來初始化MapTask/ReduceTask,而cleanup task用來清理MapTask/ReduceTask。創建setup和cleanup task,代碼如下所示:

    01// create cleanup two cleanup tips, one map and one reduce.
    02 cleanup =?new?TaskInProgress[2];?// cleanup task,map對應一個,reduce對應一個
    03?
    04// cleanup map tip. This map doesn't use any splits. Just assign an empty split.
    05TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
    06 cleanup[0] =?new?TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf,?this, numMapTasks,?1);
    07 cleanup[0].setJobCleanupTask();
    08?
    09// cleanup reduce tip.
    10 cleanup[1] =?new?TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks, jobtracker, conf,?this,?1);
    11 cleanup[1].setJobCleanupTask();
    12?
    13// create two setup tips, one map and one reduce.
    14 setup =?new?TaskInProgress[2];?// setup task,map對應一個,reduce對應一個
    15?
    16// setup map tip. This map doesn't use any split. Just assign an empty split.
    17 setup[0] =?new?TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf,?this, numMapTasks +?1,?1);
    18 setup[0].setJobSetupTask();
    19?
    20// setup reduce tip.
    21 setup[1] =?new?TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks +?1, jobtracker, conf,?this,?1);
    22 setup[1].setJobSetupTask();

    一個Job在JobInProgress中進行初始化Task,這里初始化Task使得該Job滿足被調度的要求,比如,知道一個Job有哪些Task組成,每個Task對應哪個split等等。在初始化完成后,置一個Task初始化完成標志,如下所示:

    01 synchronized(jobInitKillStatus){
    02 ??jobInitKillStatus.initDone =?true;
    03?
    04 ??// set this before the throw to make sure cleanup works properly
    05 ??tasksInited =?true;?// 置Task初始化完成標志
    06?
    07 ??if(jobInitKillStatus.killed) {
    08 ????throw?new?KillInterruptedException("Job "?+ jobId +?" killed in init");
    09 ??}
    10}

    在置tasksInited = true;后,該JobInProgress就可以被TaskScheduler進行調度了,調度時,是以Task(MapTask/ReduceTask)為單位分派給TaskTracker。而對于哪些TaskTracker可以運行Task,需要通過TaskTracker向JobTracker周期性發送的心跳得到TaskTracker的健康狀況信息、節點資源信息等來確定,是否該TaskTracker可以運行一個Job的一個或多個Task。

    總結

    以上是生活随笔為你收集整理的MapReduce V1:Job提交流程之JobTracker端分析的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。