MapReduce多用户任务调度器——容量调度器(Capacity Scheduler)原理和源码研究
前言:為了研究需要,將Capacity Scheduler和Fair Scheduler的原理和代碼進行學習,用兩篇文章作為記錄。如有理解錯誤之處,歡迎批評指正。
容量調度器(Capacity Scheduler)是Yahoo公司開發的多用戶調度器。多用戶調度器的使用場景很多,根據資料1的說法,Hadoop集群的用戶量越來越大,不同用戶提交的應用程序具有不同的服務質量要求(QoS):
1. 批處理作業:耗時較長,對完成時間沒有嚴格要求。如數據挖掘、機器學習等應用。
2. 交互式作業:期望及時返回結果。如Hive等應用。
3. 生產性作業:要求一定量的的資源保證。如統計值計算、垃圾數據分析等。
傳統的FIFO調度器不能滿足應用對響應時間和資源的多樣化要求,多用戶多隊列調度器應運而生。容量調度器即是其中被廣泛應用的一種。
一、基本思想
容量調度器以隊列為單位劃分資源,每個隊列都有資源使用的下限和上限。每個用戶也可以設定資源使用上限。一個隊列的剩余資源可以共享給另一個隊列,其他隊列使用后還可以歸還。管理員可以約束單個隊列、用戶或作業的資源使用。支持資源密集型作業,可以給某些作業分配多個slot(這是比較特殊的一點)。支持作業優先級,但不支持資源搶占。
這里明確一下用戶、隊列和作業之間的關系。Hadoop以隊列為單位管理資源,每個隊列分配到一定的資源,用戶只能向一個或幾個隊列提交作業。隊列管理體現為兩方面:1. 用戶權限管理:Hadoop用戶管理模塊建立在操作系統用戶和用戶組之間的映射之上,允許一個操作系統用戶或者用戶組對應一個或者多個隊列。同時可以配置每個隊列的管理員用戶。隊列信息配置在mapred-site.xml文件中,包括隊列的名稱,是否啟用權限管理功能等信息,且不支持動態加載。隊列權限選項配置在mapred-queue-acls.xml文件中,可以配置某個用戶或用戶組在某個隊列中的某種權限。權限包括作業提交權限和作業管理權限。2. 系統資源管理:管理員可以配置每個隊列和每個用戶的可用資源量信息,為調度器提供調度依據。這些信息配置在調度器自己的配置文件(如Capacity-Scheduler.xml)中。關于每個配置文件的常見內容見附錄。
二、整體架構
總體來說,容量調度器的工作流程分5個步驟:
1. 用戶提交作業到JobTracker。
2. JobTracker將提交的作業交給Capacity Scheduler的監聽器JobQueuesManager,并將作業加入等待隊列,由JobInitializationPoller線程初始化。
3. TaskTracker通過心跳信息要求JobTracker為其分配任務。
4. JobTracker調用Capacity Scheduler的assignTasks方法為其分配任務。
5. JobTracker將分配到的任務返回給TaskTracker。
接下,我們結合源代碼依次研究上述過程。
三、實現細節
1. 調度器的啟動
回憶一下,前面談到調度器啟動是由JobTracker調用調度器的start方法實現的,首先來看start方法:
?
// initialize our queues from the config settingsif (null == schedConf) {schedConf = new CapacitySchedulerConf();}首先生成配置對象,容量調度器定義了自己的配置對象,構造時會加載調度器自己的配置文件作為資源,并初始化一些默認的配置選項:
?
?
public CapacitySchedulerConf() {rmConf = new Configuration(false);rmConf.addResource(SCHEDULER_CONF_FILE);initializeDefaults();}private void initializeDefaults() {defaultUlimitMinimum = rmConf.getInt("mapred.capacity-scheduler.default-minimum-user-limit-percent", 100);defaultUserLimitFactor = rmConf.getFloat("mapred.capacity-scheduler.default-user-limit-factor", 1.0f);defaultSupportPriority = rmConf.getBoolean("mapred.capacity-scheduler.default-supports-priority", false);defaultMaxActiveTasksPerQueueToInitialize = rmConf.getInt("mapred.capacity-scheduler.default-maximum-active-tasks-per-queue", 200000);defaultMaxActiveTasksPerUserToInitialize = rmConf.getInt("mapred.capacity-scheduler.default-maximum-active-tasks-per-user", 100000);defaultInitToAcceptJobsFactor =rmConf.getInt("mapred.capacity-scheduler.default-init-accept-jobs-factor", 10);}例如,第一個默認值表示每個用戶的最低資源保障,默認為100%。第三個默認值表示是否考慮作業優先級,默認是不考慮。其他配置可以參考資料1中的講解。接下來,初始化隊列信息,隊列信息由QueueManager對象獲得,該對象的構造過程如下:
?
?
public QueueManager(Configuration conf) {checkDeprecation(conf);conf.addResource(QUEUE_ACLS_FILE_NAME);// Get configured ACLs and state for each queueaclsEnabled = conf.getBoolean("mapred.acls.enabled", false);queues.putAll(parseQueues(conf)); }synchronized private Map<String, Queue> parseQueues(Configuration conf) {Map<String, Queue> queues = new HashMap<String, Queue>();// First get the queue namesString[] queueNameValues = conf.getStrings("mapred.queue.names",new String[]{JobConf.DEFAULT_QUEUE_NAME});for (String name : queueNameValues) {Map queueACLs = getQueueAcls(name, conf);if (queueACLs == null) {LOG.error("The queue, " + name + " does not have a configured ACL list");}queues.put(name, new Queue(name, getQueueAcls(name, conf),getQueueState(name, conf), QueueMetrics.create(name, conf)));}return queues;}首先,獲取用戶權限配置文件mapred-queue-acls.xml。然后通過mapred-site.xml中的配置解析并生成隊列的列表queues。解析的過程是,先獲取每個隊列的名字,再通過名字獲取隊列的權限配置,最后依據這些信息以及隊列狀態和隊列度量對象構造一個隊列并加入結果列表。如上面代碼。在初始化隊列之前還有構造出每個隊列對應的CapacitySchedulerQueue對象:
?
?
Map<String, CapacitySchedulerQueue> parseQueues(Collection<String> queueNames, CapacitySchedulerConf schedConf) throws IOException {Map<String, CapacitySchedulerQueue> queueInfoMap = new HashMap<String, CapacitySchedulerQueue>();// Sanity check: there should be at least one queue. if (0 == queueNames.size()) {throw new IllegalStateException("System has no queue configured");}float totalCapacityPercent = 0.0f;for (String queueName: queueNames) {float capacityPercent = schedConf.getCapacity(queueName);if (capacityPercent == -1.0) {throw new IOException("Queue '" + queueName + "' doesn't have configured capacity!");} totalCapacityPercent += capacityPercent;// create our Queue and add to our hashmapCapacitySchedulerQueue queue = new CapacitySchedulerQueue(queueName, schedConf);queueInfoMap.put(queueName, queue);}if (Math.floor(totalCapacityPercent) != 100) {throw new IllegalArgumentException("Sum of queue capacities not 100% at "+ totalCapacityPercent);} return queueInfoMap;}容量調度器隊列對象被裝入一個以隊列名為鍵的map中返回并用于初始化。獲取隊列后要進行初始化,由函數initialize完成:
?
?
void initialize(QueueManager queueManager,Map<String, CapacitySchedulerQueue> newQueues,Configuration conf, CapacitySchedulerConf schedConf) {// Memory related configsinitializeMemoryRelatedConf(conf);// Setup queuesfor (Map.Entry<String, CapacitySchedulerQueue> e : newQueues.entrySet()) {String newQueueName = e.getKey();CapacitySchedulerQueue newQueue = e.getValue();CapacitySchedulerQueue currentQueue = queueInfoMap.get(newQueueName);if (currentQueue != null) {currentQueue.initializeQueue(newQueue);LOG.info("Updated queue configs for " + newQueueName);} else {queueInfoMap.put(newQueueName, newQueue);LOG.info("Added new queue: " + newQueueName);}}// Set SchedulingDisplayInfofor (String queueName : queueInfoMap.keySet()) {SchedulingDisplayInfo schedulingInfo = new SchedulingDisplayInfo(queueName, this);queueManager.setSchedulerInfo(queueName, schedulingInfo);}// Inform the queue manager jobQueuesManager.setQueues(queueInfoMap);// let our mgr objects know about the queuesmapScheduler.initialize(queueInfoMap);reduceScheduler.initialize(queueInfoMap);// scheduling tunablesmaxTasksPerHeartbeat = schedConf.getMaxTasksPerHeartbeat();maxTasksToAssignAfterOffSwitch = schedConf.getMaxTasksToAssignAfterOffSwitch();}具體過程如下:首先根據配置對象初始化跟內存相關的一些變量;然后檢查某個隊列是否在queueInfoMap數據結構中,若在,就更新隊列信息,若不在,則加入其中,該數據結構提供了一個快速通過隊列名訪問隊列的途徑;接下來設置每個隊列的調度信息用于展示或日志;然后將隊列map交給監聽器對象JobQueuesMananger;接著,將隊列信息再交給map和reduce調度器對象,每個調度器對象維護了可以獲取任務的隊列列表,用于調度時的隊列選擇;最后設置批量任務分配的最大數量。
?
上述過程中,用于不同任務調度的mapScheduler和reduceScheduler值得進一步研究。隊列會被加入到map或reduce調度器的優先級隊列中:
?
queuesForAssigningTasks.clear();queuesForAssigningTasks.addAll(queues.values());Collections.sort(queuesForAssigningTasks, queueComparator);隊列的優先級由queueComparator定義,map和reduce的比較器實現基本相同,只是任務類型不同:
?
?
public int compare(CapacitySchedulerQueue q1, CapacitySchedulerQueue q2) {// look at how much capacity they've filled. Treat a queue with// capacity=0 equivalent to a queue running at capacityTaskType taskType = getTaskType();double r1 = (0 == q1.getCapacity(taskType))? 1.0f:(double)q1.getNumSlotsOccupied(taskType)/(double) q1.getCapacity(taskType);double r2 = (0 == q2.getCapacity(taskType))? 1.0f:(double)q2.getNumSlotsOccupied(taskType)/(double) q2.getCapacity(taskType);if (r1<r2) return -1;else if (r1>r2) return 1;else return 0;}上述compare方法的實現表明,隊列的資源使用率越高,在隊列列表中的順序越靠后,優先級越低。也就是說,Capacity Scheduler總是選擇資源利用率最低的隊列。至此,隊列初始化分析完畢。
?
接下來,調度器將監聽器對象注冊到JobTracker:
?
// listen to job changestaskTrackerManager.addJobInProgressListener(jobQueuesManager);然后啟動初始化線程:
?
?
//Start thread for initializationif (initializationPoller == null) {this.initializationPoller = new JobInitializationPoller(jobQueuesManager, schedConf, queueNames, taskTrackerManager);}initializationPoller.init(queueNames.size(), schedConf);initializationPoller.setDaemon(true);initializationPoller.start();初始化線程initializationPoller是個后臺線程。init方法為每個隊列指定一個初始化線程,線程總數總是小于或等于隊列的數量。然后啟動每個初始化線程。
最后設置用于顯示調度器信息的Servlet:
?
?
if (taskTrackerManager instanceof JobTracker) {JobTracker jobTracker = (JobTracker) taskTrackerManager;HttpServer infoServer = jobTracker.infoServer;infoServer.setAttribute("scheduler", this);infoServer.addServlet("scheduler", "/scheduler",CapacitySchedulerServlet.class);}至此,調度器啟動完畢。
2. 作業初始化
由于初始化的作業不能得到調度會占用過多內存,容量調度器通過兩種策略初始化作業:1. 優先初始化最可能被調度器調度的作業;2. 限制用戶初始化作業數目。詳細過程如下:作業被提交到JobTracker后,調度器的監聽器調用jobAdded方法:
?
// add job to the right queueCapacitySchedulerQueue queue = getQueue(job.getProfile().getQueueName());這條語句將作業加入對應的隊列中。接下來調用隊列的addWaitingJob方法:
?
?
synchronized void addWaitingJob(JobInProgress job) throws IOException {JobSchedulingInfo jobSchedInfo = new JobSchedulingInfo(job);String user = job.getProfile().getUser();// Check acceptance limitscheckJobSubmissionLimits(job, user);waitingJobs.put(jobSchedInfo, job);// Update user statsUserInfo userInfo = users.get(user);if (userInfo == null) {userInfo = new UserInfo(comparator);users.put(user, userInfo);}userInfo.jobAdded(jobSchedInfo, job);}在該方法中,首先生成調度信息對象,此對象與默認的FIFO調度器的調度信息對象一樣。然后檢查三個約束:
?
1. 作業的任務數是否超過每個用戶最大任務數
?
if (job.desiredTasks() > maxActiveTasksPerUser) {throw new IOException("Job '" + job.getJobID() + "' from user '" + user +"' rejected since it has " + job.desiredTasks() + " tasks which" +" exceeds the limit of " + maxActiveTasksPerUser + " tasks per-user which can be initialized for queue '" + queueName + "'");}2. 隊列中等待初始化、已經初始化的作業數目和在運行的作業數不能超過可接受值
?
?
// Across all jobs in queueint queueWaitingJobs = getNumWaitingJobs();int queueInitializingJobs = getNumInitializingJobs();int queueRunningJobs = getNumRunningJobs();if ((queueWaitingJobs + queueInitializingJobs + queueRunningJobs) >= maxJobsToAccept) {throw new IOException("Job '" + job.getJobID() + "' from user '" + user + "' rejected since queue '" + queueName + "' already has " + queueWaitingJobs + " waiting jobs, " + queueInitializingJobs + " initializing jobs and " + queueRunningJobs + " running jobs - Exceeds limit of " +maxJobsToAccept + " jobs to accept");}3. 用戶等待初始化、已經初始化和在運行的作業數不能超過可接受值
?
?
// Across all jobs of the userint userWaitingJobs = getNumWaitingJobsByUser(user);int userInitializingJobs = getNumInitializingJobsByUser(user);int userRunningJobs = getNumRunningJobsByUser(user);if ((userWaitingJobs + userInitializingJobs + userRunningJobs) >= maxJobsPerUserToAccept) {throw new IOException("Job '" + job.getJobID() + "' rejected since user '" + user + "' already has " + userWaitingJobs + " waiting jobs, " +userInitializingJobs + " initializing jobs and " +userRunningJobs + " running jobs - " +" Exceeds limit of " + maxJobsPerUserToAccept + " jobs to accept" +" in queue '" + queueName + "' per user");}
若有一個約束不滿足,則拋出異常。否則將作業加入等待初始化隊列。最后調用調度器的jobAdded方法通知調度器:
?
?
// called when a job is addedsynchronized void jobAdded(JobInProgress job) throws IOException {CapacitySchedulerQueue queue = queueInfoMap.get(job.getProfile().getQueueName());// Inform the queuequeue.jobAdded(job);// setup scheduler specific job informationpreInitializeJob(job);}首先獲取隊列,然后告訴隊列有作業加入,并將隊列中提交該作業的用戶的作業數更新。最后依據配置計算每個任務需要的slot數目(容量調度器支持大內存作業)。
?
作業初始化線程的入口在JobInitializationPoller(以下簡稱poller)的run方法。
?
public void run() {while (running) {try {cleanUpInitializedJobsList();selectJobsToInitialize();if (!this.isInterrupted()) {Thread.sleep(sleepInterval);}} catch (InterruptedException e) {LOG.error("Job Initialization poller interrupted"+ StringUtils.stringifyException(e));}}}?
在該方法中,首先從initializedJobs數據結構中清除一些作業,這些作業是正在運行且獲得調度的作業或者是運行完成的作業。接著,調用selectJobsToInitialize方法來選擇等待初始化的作業。具體過程如下,對于每個隊列,首先選擇該隊列中處于waitingJobs列表中的作業:
?
ArrayList<JobInProgress> jobsToInitialize = getJobsToInitialize(queue);選擇的原則是:一看該作業是否已經初始化;若不是,二檢查隊列中作業總數(正在運行和正在初始化)和允許的活動任務數是否超過上限;若沒有,檢查提交該作業的用戶是不是有過多的作業(正在運行和正在初始化)或活動的任務;若仍不是,則進一步檢查作業是否處于PREP狀態(沒有被kill掉),然后放入篩選結果列表,并通知所在隊列,將其放入initializingJobs列表。以上過程詳見getJobsToInitialized方法的實現,這里不贅述。
?
下面獲取一個分配給該隊列的初始化線程,并將選擇初始化的作業加入屬于相應隊列的調度列表中:
?
JobInitializationThread t = threadsToQueueMap.get(queue);for (JobInProgress job : jobsToInitialize) {t.addJobsToQueue(queue, job);}每個初始化線程維護了一個map(jobsPerQueue),通過隊列名字可以找到由該線程初始化的隊列的作業調度列表。
最后,我們來看初始化線程JobInitializationThread的run方法,該方法中不停地調用initializeJobs方法:
?
?
void initializeJobs() {// while there are more jobs to initialize...while (currentJobCount.get() > 0) {Set<String> queues = jobsPerQueue.keySet();for (String queue : queues) {JobInProgress job = getFirstJobInQueue(queue);if (job == null) {continue;}LOG.info("Initializing job : " + job.getJobID() + " in Queue "+ job.getProfile().getQueueName() + " For user : "+ job.getProfile().getUser());if (startIniting) {setInitializingJob(job);ttm.initJob(job);setInitializingJob(null);} else {break;}}}}從代碼可見,獲取隊列中第一個作業,將其交給JobTracker的initJob初始化,初始化詳細過程見前面的一系列文章。至此,容量調度器的作業初始化過程分析完畢。
作為該小節的結束,這里說一下每個隊列中維護的幾個數據結構:
?
this.waitingJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);this.initializingJobs =new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);this.runningJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);正如名稱暗示的那樣,三個列表分別持有等待初始化的作業、正在初始化的作業和正在運行的作業。它們共有的參數為一個Comparator對象,用于定義列表中作業的順序。
它的初始化如下:
?
?
if (supportsPriorities) {// use the default priority-aware comparatorcomparator = JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR;}else {comparator = STARTTIME_JOB_COMPARATOR;}如果調度器支持優先級,則比較器對象初始化為FIFO調度器中的FIFO比較器,原則是首先比較優先級,再比較開始時間,最后比較作業ID。如果調度器不支持優先級,則初始化為開始時間比較器,即先來先服務。
?
初始化線程會從waitingJobs列表中選擇要初始化的作業,被選擇的作業會放入initializingJobs列表,初始化后得到調度的作業會進入runningJobs列表。有關作業的調度見下一小節。
3. 任務調度
容量調度器采用三層調度模型:首先選擇一個隊列,其次選擇一個作業,最后選擇作業的任務。任務選擇由調度器的assignTasks方法完成,下面詳述該方法。
首先調用下面方法更新各個隊列的資源使用信息:
?
updateAllQueues(mapClusterCapacity, reduceClusterCapacity);具體到每個隊列中,調用隊列的updateAll方法。
?
首先更新隊列最新的map和reduce資源量:
?
// Compute new capacities for maps and reducesmapSlots.updateCapacities(capacityPercent, maxCapacityPercent, mapClusterCapacity);reduceSlots.updateCapacities(capacityPercent, maxCapacityPercent, reduceClusterCapacity);接下來將以下信息更新到每個作業的調度信息對象中:
?
?
j.setSchedulingInfo(CapacityTaskScheduler.getJobQueueSchedInfo(numMapsRunningForThisJob, numRunningMapSlots,numReservedMapSlotsForThisJob,numReducesRunningForThisJob, numRunningReduceSlots,numReservedReduceSlotsForThisJob));包括:作業正在運行的map和reduce作業數,作業正在使用的map和reduce資源數和為這個作業保留的map和reduce資源數。
?
然后將每個作業的資源使用信息反映到該作業所在隊列的相關信息中:
?
update(TaskType.MAP, j, j.getProfile().getUser(), numMapsRunningForThisJob, numMapSlotsForThisJob);update(TaskType.REDUCE, j, j.getProfile().getUser(), numReducesRunningForThisJob, numReduceSlotsForThisJob);包括隊列中正在運行的任務數,正在使用的資源量和用戶使用的資源量等信息。
?
更新后,通過addMapTasks和addReduceTask兩個方法調度任務:
?
// schedule tasksList<Task> result = new ArrayList<Task>();addMapTasks(taskTracker, result, maxMapSlots, currentMapSlots);addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots);關于隊列和作業的優先級前面已經提到,這里關注任務的優先順序。在addMapTasks方法中,調用CapacityScheduler的assignTasks方法:
?
?
JobInProgress job = taskTracker.getJobForFallowSlot(type);if (job != null) {if (availableSlots >= job.getNumSlotsPerTask(type)) {// Unreserve taskTracker.unreserveSlots(type, job);// We found a suitable job. Get task from it.if (type == TaskType.MAP) {// Don't care about locality!job.overrideSchedulingOpportunities();}return obtainNewTask(taskTrackerStatus, job, true);} else {// Re-reserve the current tasktrackertaskTracker.reserveSlots(type, job, availableSlots);return TaskLookupResult.getMemFailedResult(); }}首先,判斷TaskTracker是否正為某個作業預留資源(該作業為內存密集型,一個任務可能需要多個slot,上次調度沒有足夠的slot分配,故將其預留給該作業用于下次調度。這是容量調度器的大內存任務調度機制),若有預留,則判斷當前可用的資源是否能滿足該作業,若能則不再預留資源,并調用obtainNewTask方法將資源分配給該作業執行;若不能,繼續將當前資源預留給該作業,并返回內存失敗的結果。
?
如果TaskTracker沒有為某個作業預留資源,對于隊列集合中的每個隊里,從中選擇一個作業,并調用obtainNewTask方法獲得一個任務。當遇到當前可用資源不能滿足一個任務時,也要預留資源。注意,每次獲取一個任務都會返回獲取的狀態,代碼如下:
?
for (CapacitySchedulerQueue queue : queuesForAssigningTasks) {//This call is for optimization if we are already over the//maximum-capacity we avoid traversing the queues.if (!queue.assignSlotsToQueue(type, 1)) {continue;}TaskLookupResult tlr = getTaskFromQueue(taskTracker, availableSlots, queue, assignOffSwitch);TaskLookupResult.LookUpStatus lookUpStatus = tlr.getLookUpStatus();if (lookUpStatus == TaskLookupResult.LookUpStatus.NO_TASK_FOUND) {continue; // Look in other queues.}// if we find a task, returnif (lookUpStatus == TaskLookupResult.LookUpStatus.LOCAL_TASK_FOUND ||lookUpStatus == TaskLookupResult.LookUpStatus.OFF_SWITCH_TASK_FOUND) {return tlr;}// if there was a memory mismatch, returnelse if (lookUpStatus == TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT) {return tlr;}}最后來分析一下,獲取任務的核心方法obtainNewTask:
?
?
TaskLookupResult obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job, boolean assignOffSwitch) throws IOException {ClusterStatus clusterStatus = scheduler.taskTrackerManager.getClusterStatus();int numTaskTrackers = clusterStatus.getTaskTrackers();int numUniqueHosts = scheduler.taskTrackerManager.getNumberOfUniqueHosts();// Inform the job it is about to get a scheduling opportunityjob.schedulingOpportunity();// First, try to get a 'local' taskTask t = job.obtainNewNodeOrRackLocalMapTask(taskTracker,numTaskTrackers,numUniqueHosts);if (t != null) {return TaskLookupResult.getTaskFoundResult(t, job); }// Next, try to get an 'off-switch' task if appropriate// Do not bother as much about locality for High-RAM jobsif (job.getNumSlotsPerMap() > 1 || (assignOffSwitch && job.scheduleOffSwitch(numTaskTrackers))) {t = job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers, numUniqueHosts);}return (t != null) ? TaskLookupResult.getOffSwitchTaskFoundResult(t, job) :TaskLookupResult.getNoTaskFoundResult();}與FIFO調度器的實現類似,首先也要試圖找到一個具有數據本地新的任務。若沒找到,則分配一個內存密集型任務或off-switch的任務。具體的分配過程參見前面的文章對FIFO任務調度的分析( MapReduce任務調度與執行原理之任務調度)。若仍然沒有找到,則返回沒有找到結果。
?
如果獲取到的任務數達到一次心跳返回的任務最大數量,則返回:
?
if (tasks.size() >= maxTasksPerHeartbeat) {return;}?
為了盡量提高任務的數據本地性,容量調度器采用了作業延遲調度機制:如果一個作業中未找到滿足數據本地性的任務,則會讓該作業跳過一定數目的機會,直到找到一個滿足數據本地性的任務或到達跳過次數上限。
?
if (job.getNumSlotsPerMap() > 1 || (assignOffSwitch && job.scheduleOffSwitch(numTaskTrackers))) {t = job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers, numUniqueHosts);}assignOffSwitch為true表示還未分配過不具有數據本地性的任務,scheduleOffSwitch用于判斷作業是否到達跳過次數上限:
?
?
public boolean scheduleOffSwitch(int numTaskTrackers) {long missedTaskTrackers = getNumSchedulingOpportunities();long requiredSlots = Math.min((desiredMaps() - finishedMaps()), numTaskTrackers);return (requiredSlots * localityWaitFactor) < missedTaskTrackers;}localityWaitFactor表示作業輸入數據所在結點數占結點總數的比例,requiredSlots表示作業還需要的資源數,二者的乘積來衡量跳過次數的上限,而missedTaskTrackers即為跳過次數。missedTaskTrackers每次分配任務時都會增加,如果分配到本地任務,則返回任務,該變量會重置為0;若沒有分配到,則表示跳過一次。在分配到非本地性任務后跳過次數也會重置為0。
?
reduce任務的分配機制相對簡單,只采用了大內存任務調度策略,調度器只要找到一個合適的reduce任務即返回,且沒有延遲調度。至此,容量調度器任務調度分析結束。
下一篇文章計劃學習Fair Scheduler。如有錯誤和問題,歡迎批評指正。
參考資料
?
【1】《Hadoop技術內幕--深入解析MapReduce架構設計與實現原理》董西成 【2】 ?Hadoop 1.0.0 源碼
?
2013年10月7日
?
?
?
?
總結
以上是生活随笔為你收集整理的MapReduce多用户任务调度器——容量调度器(Capacity Scheduler)原理和源码研究的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 对ios中CGContextRef和im
- 下一篇: 在新项目中要思考的技术点