Spark Source Code Analysis -- SchedulableBuilder
SchedulableBuilder is a wrapper around the Schedulable tree:
at the Pool level (internal nodes), it schedules TaskSets (FIFO, FAIR);
at the TaskSetManager level (leaf nodes), it schedules the tasks inside a TaskSet (by locality) and tracks them (retries).
TaskSetManager
Wraps a TaskSet and mainly provides tracking and scheduling for the tasks inside that single TaskSet.
So its main interfaces are:
resourceOffer: given a resource offer, decide which task to schedule on it
statusUpdate: track task state changes
ClusterTaskSetManager
The TaskSetManager implementation used by ClusterScheduler
1 addPendingTask
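Locality has to be considered when scheduling: tasks that can run as close to their data as possible should run first.
Every task that has not run yet is a pending task, and pending tasks are stored, by locality granularity, in the following collections:
pendingTasksForExecutor, HashMap, the tasks assigned to each executor
pendingTasksForHost, HashMap, the tasks assigned to each host
pendingTasksForRack, HashMap, the tasks assigned to each rack
pendingTasksWithNoPrefs, ArrayBuffer, tasks with no locality preferences, which can run anywhere
allPendingTasks, ArrayBuffer, all pending tasks
speculatableTasks, HashSet, tasks eligible for speculative (duplicate) execution; anyone familiar with Hadoop will recognize the idea
Next, let's look at addPendingTask to see how a task gets added to each of these lists.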
addPendingTask(index: Int, readding: Boolean = false)
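It takes two parameters:
index: the task's index, used to fetch the task from the TaskSet
readding: whether the task is being re-added rather than added for the first time. When an executor fails, its tasks have to be put back into the lists. Duplicate entries in a list are harmless, because tasks that have already run are skipped automatically when a task is picked.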
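To make the readding rule concrete, here is a minimal Scala sketch of the same idea. It is a simplification under stated assumptions: preferences are plain host names, liveness is a fixed set `aliveHosts`, and the executor and rack lists are left out; `PendingLists` is a hypothetical name, not a Spark class.

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap}

// Hypothetical, simplified model of addPendingTask (hosts only, no executors/racks).
class PendingLists(preferredHosts: IndexedSeq[Seq[String]], aliveHosts: Set[String]) {
  val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
  val pendingTasksWithNoPrefs = new ArrayBuffer[Int]
  val allPendingTasks = new ArrayBuffer[Int]

  def addPendingTask(index: Int, readding: Boolean = false): Unit = {
    // Append for a new task, or for a re-added task that is not in this list yet
    def addTo(list: ArrayBuffer[Int]): Unit =
      if (!readding || !list.contains(index)) list += index

    var hadAliveLocations = false
    for (host <- preferredHosts(index) if aliveHosts.contains(host)) {
      addTo(pendingTasksForHost.getOrElseUpdate(host, new ArrayBuffer[Int]))
      hadAliveLocations = true
    }
    // No preference, or every preferred host is dead: the task can run anywhere
    if (!hadAliveLocations) addTo(pendingTasksWithNoPrefs)
    // Only brand-new tasks are appended; a re-added task is assumed to be there already
    if (!readding) allPendingTasks += index
  }
}
```

Re-adding task 1 in the usage below leaves every list unchanged, which is exactly the duplicate check that `addTo` performs.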
2 resourceOffer
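This solves how to schedule one task within the TaskSet. The main concern is locality, so the comments in the code below say most of it.
The most interesting part is how currentLocalityIndex is maintained:
It starts at 0, the most local level in myLocalityLevels (PROCESS_LOCAL whenever some task has an executor preference). At that level only tasks from pendingTasksForExecutor can be chosen, plus the no-preference tasks, which findTask checks at every level.
Every call to resourceOffer computes the time elapsed since the previous task launch. If it exceeds the wait for the current level (each locality level has its own timeout), currentLocalityIndex is incremented, once per expired wait, i.e. the constraint is progressively relaxed.
lastLaunchTime, which records that previous launch, is only updated when findTask succeeds inside resourceOffer. So the logic prefers more local tasks, and only when findTask keeps failing does it conclude that the constraint must be relaxed.
After relaxing, however, currentLocalityIndex can shrink again once a more local task is chosen, because every launch sets currentLocalityIndex from that task's locality.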
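The rise and fall of currentLocalityIndex can be simulated without Spark. This sketch assumes a hypothetical `DelayScheduler` class that receives the clock value as a parameter and represents locality levels as strings; it copies the loop of getAllowedLocalityLevel and the two bookkeeping assignments made after a successful findTask.

```scala
// Hypothetical model of the delay-scheduling state kept by ClusterTaskSetManager.
// levels: most local first; waits: per-level wait in ms; start: initial clock value.
class DelayScheduler(levels: IndexedSeq[String], waits: IndexedSeq[Long], start: Long) {
  var currentLocalityIndex = 0
  var lastLaunchTime = start

  // Same loop as getAllowedLocalityLevel: move up one level per expired wait,
  // charging that wait to lastLaunchTime so it is not counted twice
  def allowedLevel(curTime: Long): String = {
    while (curTime - lastLaunchTime >= waits(currentLocalityIndex) &&
           currentLocalityIndex < levels.length - 1) {
      lastLaunchTime += waits(currentLocalityIndex)
      currentLocalityIndex += 1
    }
    levels(currentLocalityIndex)
  }

  // Same bookkeeping as resourceOffer after findTask succeeds:
  // the launched task's level becomes the current level (it may go down)
  def launched(taskLevelIndex: Int, curTime: Long): Unit = {
    currentLocalityIndex = taskLevelIndex
    lastLaunchTime = curTime
  }
}
```

With three 3000 ms waits and no launch since time 0, an offer at 7000 ms jumps two levels to ANY; a single PROCESS_LOCAL launch then brings the index straight back to 0.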
3 statusUpdate
statusUpdate is handled mainly by notifying the DAGScheduler through the listener registered with ClusterScheduler.
Failed tasks, of course, also have to be put back into the pending lists.
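A toy model of the statusUpdate flow follows. It assumes a hypothetical `ToyTaskSet` whose `onEvent` callback stands in for the listener; the real taskFinished/taskLost methods are not listed in this article, so only the behaviour described above (notify on every state change, re-queue on failure) is modelled.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for Spark's TaskState values
sealed trait State
case object Finished extends State
case object Failed extends State
case object Lost extends State
case object Killed extends State
case object Running extends State

// onEvent plays the role of the listener that eventually reaches the DAGScheduler
class ToyTaskSet(numTasks: Int, onEvent: (Int, State) => Unit) {
  val pending = new ArrayBuffer[Int]
  pending ++= (0 until numTasks)
  var tasksFinished = 0

  // Pop from the end, like the stack-ordered pending lists
  def launch(): Option[Int] =
    if (pending.isEmpty) None else Some(pending.remove(pending.length - 1))

  def statusUpdate(index: Int, state: State): Unit = state match {
    case Finished =>
      tasksFinished += 1
      onEvent(index, state)
    case Failed | Lost | Killed =>
      if (!pending.contains(index)) pending += index // re-queue for another attempt
      onEvent(index, state)
    case _ => // other states (e.g. Running) need no bookkeeping here
  }
}
```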
```scala
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

// Set of pending tasks for each executor. These collections are actually
// treated as stacks, in which new tasks are added to the end of the
// ArrayBuffer and removed from the end. This makes it faster to detect
// tasks that repeatedly fail because whenever a task failed, it is put
// back at the head of the stack. They are also only cleaned up lazily;
// when a task is launched, it remains in all the pending lists except
// the one that it was launched from, but gets removed from them later.
private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]

// Set of pending tasks for each host. Similar to pendingTasksForExecutor,
// but at host level.
private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]

// Set of pending tasks for each rack -- similar to the above.
private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]

// Set containing pending tasks with no locality preferences.
val pendingTasksWithNoPrefs = new ArrayBuffer[Int]

// Set containing all pending tasks (also used as a stack, as above).
val allPendingTasks = new ArrayBuffer[Int]

// Tasks that can be speculated. Since these will be a small fraction of total
// tasks, we'll just hold them in a HashSet.
val speculatableTasks = new HashSet[Int]

// Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
val myLocalityLevels = computeValidLocalityLevels() // the locality levels present among this TaskSet's tasks
val localityWaits = myLocalityLevels.map(getLocalityWait) // default wait for each locality level (read from the configuration)

// Delay scheduling variables: we keep track of our current locality level and the time we
// last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
// We then move down if we manage to launch a "more local" task.
var currentLocalityIndex = 0 // index into myLocalityLevels; starts at 0, i.e. scheduling starts at the most local level
var lastLaunchTime = clock.getTime() // time of the last task launch; used for the timeout check, and on timeout currentLocalityIndex is incremented
```
```scala
/**
 * Add a task to all the pending-task lists that it should be on. If readding is set, we are
 * re-adding the task so only include it in each list if it's not already there.
 */
private def addPendingTask(index: Int, readding: Boolean = false) {
  // Utility method that adds `index` to a list only if readding=false or it's not already there
  def addTo(list: ArrayBuffer[Int]) {
    if (!readding || !list.contains(index)) { // a new task, or one not yet in this list
      list += index
    }
  }

  var hadAliveLocations = false
  for (loc <- tasks(index).preferredLocations) {
    for (execId <- loc.executorId) {
      if (sched.isExecutorAlive(execId)) {
        addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer)) // first, add it to the executor's list
        hadAliveLocations = true
      }
    }
    if (sched.hasExecutorsAliveOnHost(loc.host)) {
      addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer)) // add it to the host's list
      for (rack <- sched.getRackForHost(loc.host)) {
        addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer)) // add it to the rack's list
      }
      hadAliveLocations = true
    }
  }

  if (!hadAliveLocations) { // all of the above failed, or there were no preferred locations: use pendingTasksWithNoPrefs
    // Even though the task might've had preferred locations, all of those hosts or executors
    // are dead; put it in the no-prefs list so we can schedule it elsewhere right away.
    addTo(pendingTasksWithNoPrefs)
  }

  if (!readding) { // a new task must also go into allPendingTasks
    allPendingTasks += index // No point scanning this whole list to find the old task there
  }
}
```
```scala
/**
 * Dequeue a pending task for a given node and return its index and locality level.
 * Only search for tasks matching the given locality constraint.
 */
private def findTask(execId: String, host: String, locality: TaskLocality.Value)
  : Option[(Int, TaskLocality.Value)] =
{
  // Executor first
  for (index <- findTaskFromList(getPendingTasksForExecutor(execId))) { // findTaskFromList: dequeue a pending task from the given list and return its index
    return Some((index, TaskLocality.PROCESS_LOCAL))
  }

  // Node: check the locality constraint first
  if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) { // i.e. locality >= TaskLocality.NODE_LOCAL
    for (index <- findTaskFromList(getPendingTasksForHost(host))) {
      return Some((index, TaskLocality.NODE_LOCAL))
    }
  }

  // Rack: check the locality constraint first
  if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
    for {
      rack <- sched.getRackForHost(host)
      index <- findTaskFromList(getPendingTasksForRack(rack))
    } {
      return Some((index, TaskLocality.RACK_LOCAL))
    }
  }

  // Look for no-pref tasks after rack-local tasks since they can run anywhere.
  for (index <- findTaskFromList(pendingTasksWithNoPrefs)) {
    return Some((index, TaskLocality.PROCESS_LOCAL))
  }

  if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
    for (index <- findTaskFromList(allPendingTasks)) {
      return Some((index, TaskLocality.ANY))
    }
  }

  // Finally, if all else has failed, find a speculative task
  return findSpeculativeTask(execId, host, locality)
}
```
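findTaskFromList is called above but not listed. Going by the comment on the pending lists (they are stacks and are only cleaned up lazily), a plausible sketch pops indices from the end and silently drops stale entries. The `isDone` predicate is a hypothetical stand-in for whatever bookkeeping Spark uses to mark a task as already launched or finished.

```scala
import scala.collection.mutable.ArrayBuffer

object FindTaskSketch {
  // Pop from the end (stack order); entries for tasks that already ran are
  // discarded here, which is the "lazy cleanup" the pending-list comment describes
  def findTaskFromList(list: ArrayBuffer[Int], isDone: Int => Boolean): Option[Int] = {
    while (list.nonEmpty) {
      val index = list.remove(list.length - 1)
      if (!isDone(index)) return Some(index)
    }
    None
  }
}
```

In the usage below, task 2 is launched from the executor list but is still sitting in allPendingTasks; the next lookup on that list throws the stale 2 away and returns task 1.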
```scala
/**
 * Respond to an offer of a single executor from the scheduler by finding a task
 */
override def resourceOffer(
    execId: String,
    host: String,
    availableCpus: Int,
    maxLocality: TaskLocality.TaskLocality)
  : Option[TaskDescription] =
{
  if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) { // precondition: unfinished tasks remain and the offer has at least CPUS_PER_TASK free cores
    val curTime = clock.getTime()

    var allowedLocality = getAllowedLocalityLevel(curTime) // the locality level currently allowed by delay scheduling
    if (allowedLocality > maxLocality) { // must not exceed maxLocality, a cap passed in by the caller
      allowedLocality = maxLocality // We're not allowed to search for farther-away tasks
    }

    findTask(execId, host, allowedLocality) match { // findTask simply tries the list for each locality level in turn
      case Some((index, taskLocality)) => {
        // Found a task; do some bookkeeping and return a task description
        val task = tasks(index)
        val taskId = sched.newTaskId()
        // Figure out whether this should count as a preferred launch
        logInfo("Starting task %s:%d as TID %s on executor %s: %s (%s)".format(
          taskSet.id, index, taskId, execId, host, taskLocality))
        // Do various bookkeeping
        copiesRunning(index) += 1
        val info = new TaskInfo(taskId, index, curTime, execId, host, taskLocality)
        taskInfos(taskId) = info
        taskAttempts(index) = info :: taskAttempts(index)
        // Update our locality level for delay scheduling
        currentLocalityIndex = getLocalityIndex(taskLocality) // the index may decrease here, because taskLocality <= the allowed level
        lastLaunchTime = curTime // record the time of this launch
        // Serialize and return the task
        val startTime = clock.getTime()
        // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
        // we assume the task can be serialized without exceptions.
        val serializedTask = Task.serializeWithDependencies(
          task, sched.sc.addedFiles, sched.sc.addedJars, ser)
        val timeTaken = clock.getTime() - startTime
        increaseRunningTasks(1)
        logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
          taskSet.id, index, serializedTask.limit, timeTaken))
        val taskName = "task %s:%d".format(taskSet.id, index)
        if (taskAttempts(index).size == 1)
          taskStarted(task, info)
        return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask)) // finally return the task chosen by this offer
      }
      case _ =>
    }
  }
  return None
}

/**
 * Get the level we can launch tasks according to delay scheduling, based on current wait time.
 */
private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
  while (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) && // the wait at this level has expired
      currentLocalityIndex < myLocalityLevels.length - 1)
  {
    // Jump to the next locality level, and remove our waiting time for the current one since
    // we don't want to count it again on the next one
    lastLaunchTime += localityWaits(currentLocalityIndex)
    currentLocalityIndex += 1 // move currentLocalityIndex up by 1
  }
  myLocalityLevels(currentLocalityIndex)
}

/**
 * Find the index in myLocalityLevels for a given locality. This is also designed to work with
 * localities that are not in myLocalityLevels (in case we somehow get those) by returning the
 * next-biggest level we have. Uses the fact that the last value in myLocalityLevels is ANY.
 */
def getLocalityIndex(locality: TaskLocality.TaskLocality): Int = { // looks up the index of locality in myLocalityLevels
  var index = 0
  while (locality > myLocalityLevels(index)) {
    index += 1
  }
  index
}

/**
 * Compute the locality levels used in this TaskSet. Assumes that all tasks have already been
 * added to queues using addPendingTask.
 */
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = { // only inspects the pending lists to see which locality preferences this TaskSet's tasks have, from most to least local
  import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY}
  val levels = new ArrayBuffer[TaskLocality.TaskLocality]
  if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0) {
    levels += PROCESS_LOCAL
  }
  if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0) {
    levels += NODE_LOCAL
  }
  if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0) {
    levels += RACK_LOCAL
  }
  levels += ANY
  logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
  levels.toArray
}

/** Called by cluster scheduler when one of our tasks changes state */
override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  SparkEnv.set(env)
  state match {
    case TaskState.FINISHED =>
      taskFinished(tid, state, serializedData)
    case TaskState.LOST =>
      taskLost(tid, state, serializedData)
    case TaskState.FAILED =>
      taskLost(tid, state, serializedData)
    case TaskState.KILLED =>
      taskLost(tid, state, serializedData)
    case _ =>
  }
}

def taskStarted(task: Task[_], info: TaskInfo) {
  sched.listener.taskStarted(task, info)
}
} // end of ClusterTaskSetManager
```
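computeValidLocalityLevels and getLocalityIndex can be exercised in isolation. This sketch assumes a hypothetical `Locality` Enumeration in place of TaskLocality (Enumeration values compare by declaration order, which matches how the code above uses `>`), and passes the emptiness of the pending lists and the wait lookup in as parameters instead of reading Spark state.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for TaskLocality; values are ordered by declaration
object Locality extends Enumeration {
  val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
}

object LevelSketch {
  import Locality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY}

  // Keep a level only if some task wants it and its wait is non-zero; ANY is always last
  def computeValidLocalityLevels(hasExec: Boolean, hasHost: Boolean, hasRack: Boolean,
                                 localityWait: Locality.Value => Long): Array[Locality.Value] = {
    val levels = ArrayBuffer[Locality.Value]()
    if (hasExec && localityWait(PROCESS_LOCAL) != 0) levels += PROCESS_LOCAL
    if (hasHost && localityWait(NODE_LOCAL) != 0) levels += NODE_LOCAL
    if (hasRack && localityWait(RACK_LOCAL) != 0) levels += RACK_LOCAL
    levels += ANY
    levels.toArray
  }

  // First index whose level is >= locality; always terminates because ANY is last
  def getLocalityIndex(levels: Array[Locality.Value], locality: Locality.Value): Int = {
    var index = 0
    while (locality > levels(index)) index += 1
    index
  }
}
```

When PROCESS_LOCAL is missing from the levels, getLocalityIndex maps it to index 0 (NODE_LOCAL), the next-biggest level available, as the Scaladoc above describes; a level whose wait is 0 is skipped entirely.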
Pool
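An abstraction over a schedulableQueue. What counts as Schedulable?
According to the comment, both Pools and TaskSetManagers. The design is questionable here: the core interfaces of Pools and TaskSetManagers are completely different, and although TaskSetManager also implements the Pool-style methods, those implementations are meaningless.
Put simply, the author wanted to treat Pools and TaskSetManagers uniformly by generalizing over both, hence this design.
So a Pool can be seen as a container of TaskSetManagers; and since a Pool is itself Schedulable, a container can also hold other Pools.
The core interface is getSortedTaskSetQueue, which orders the TaskSetManagers (or pools) using the configured SchedulingAlgorithm.
Note that FIFO and FAIR are both used to schedule TaskSets, so the basic unit of Spark's scheduling is the stage.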
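The FIFOSchedulingAlgorithm comparator itself is not shown in this article. As an assumption based on the `priority` and `stageId` fields that every Schedulable exposes, a FIFO ordering could compare priority first and stageId second; `Entry` and `FifoSketch` are hypothetical names, and the comparator has the `(A, A) => Boolean` shape that `sortWith` expects.

```scala
// Hypothetical minimal entity carrying the two fields the FIFO rule is assumed to use
case class Entry(name: String, priority: Int, stageId: Int)

object FifoSketch {
  // Assumed FIFO rule: smaller priority (earlier job) first, then smaller stageId
  def comparator(s1: Entry, s2: Entry): Boolean =
    if (s1.priority != s2.priority) s1.priority < s2.priority
    else s1.stageId < s2.stageId
}
```

Sorting with it works the same way Pool does it: `queue.sortWith(FifoSketch.comparator)`.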
```scala
/**
 * An interface for schedulable entities.
 * there are two type of Schedulable entities(Pools and TaskSetManagers)
 */
private[spark] trait Schedulable {
  var parent: Schedulable
  // child queues
  def schedulableQueue: ArrayBuffer[Schedulable]
  def schedulingMode: SchedulingMode
  def weight: Int
  def minShare: Int
  def runningTasks: Int
  def priority: Int
  def stageId: Int
  def name: String

  def increaseRunningTasks(taskNum: Int): Unit
  def decreaseRunningTasks(taskNum: Int): Unit
  def addSchedulable(schedulable: Schedulable): Unit
  def removeSchedulable(schedulable: Schedulable): Unit
  def getSchedulableByName(name: String): Schedulable
  def executorLost(executorId: String, host: String): Unit
  def checkSpeculatableTasks(): Boolean
  def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager]
  def hasPendingTasks(): Boolean
}
```
```scala
package org.apache.spark.scheduler.cluster

//An Schedulable entity that represent collection of Pools or TaskSetManagers
private[spark] class Pool(
    val poolName: String,
    val schedulingMode: SchedulingMode,
    initMinShare: Int,
    initWeight: Int)
  extends Schedulable
  with Logging {

  var schedulableQueue = new ArrayBuffer[Schedulable] // buffers the child Schedulables (TaskSetManagers or Pools)
  var schedulableNameToSchedulable = new HashMap[String, Schedulable]
  var priority = 0
  var stageId = 0
  var name = poolName
  var parent: Schedulable = null

  var taskSetSchedulingAlgorithm: SchedulingAlgorithm = { // a SchedulingAlgorithm essentially defines a comparator, used below to sort the TaskSets
    schedulingMode match {
      case SchedulingMode.FAIR => new FairSchedulingAlgorithm() // Fair
      case SchedulingMode.FIFO => new FIFOSchedulingAlgorithm() // FIFO
    }
  }

  override def addSchedulable(schedulable: Schedulable) { // add a TaskSetManager
    schedulableQueue += schedulable
    schedulableNameToSchedulable(schedulable.name) = schedulable
    schedulable.parent = this
  }

  override def removeSchedulable(schedulable: Schedulable) { // remove a TaskSetManager
    schedulableQueue -= schedulable
    schedulableNameToSchedulable -= schedulable.name
  }

  override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = { // returns the TaskSetManagers in sorted order
    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
    val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator) // sortWith
    for (schedulable <- sortedSchedulableQueue) {
      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue() // the schedulable may itself be a pool, hence the recursive call
    }
    return sortedTaskSetQueue
  }
}
```
SchedulableBuilder
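As noted above, a Pool can contain TaskSetManagers as well as other pools, so together they naturally form a tree.
SchedulableBuilder wraps this Schedulable tree, building it from TaskSetManagers (leaf nodes) and pools (internal nodes).
Only the simplest builder, FIFO, is listed here, so the tree structure is not really visible.
FIFO is simple: a single Pool is enough. Every TaskSet is added to it with addSchedulable and read back out in sorted order.
The FAIR implementation is more complex and is not listed here; it will be analyzed later.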
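To see the tree that the FIFO builder hides, here is a hypothetical miniature in which pools nest: `Leaf` plays the TaskSetManager, `MiniPool` plays the Pool, and `sortedLeaves` mirrors the recursion in Pool.getSortedTaskSetQueue (sort one level, then descend into each child). The ordering rule (priority, then stageId) is an assumption for illustration, not Spark's FAIR algorithm.

```scala
// Hypothetical miniature of the Schedulable tree
sealed trait Node { def priority: Int; def stageId: Int }
case class Leaf(name: String, priority: Int, stageId: Int) extends Node
case class MiniPool(children: Seq[Node], priority: Int = 0, stageId: Int = 0) extends Node

object TreeSketch {
  // Assumed ordering: smaller priority first, then smaller stageId
  private def before(a: Node, b: Node): Boolean =
    if (a.priority != b.priority) a.priority < b.priority else a.stageId < b.stageId

  // Sort this level's children, then flatten each child recursively into leaves
  def sortedLeaves(node: Node): Seq[Leaf] = node match {
    case leaf: Leaf     => Seq(leaf)
    case pool: MiniPool => pool.children.sortWith(before).flatMap(sortedLeaves)
  }
}
```

A sub-pool is ordered among its siblings as a single unit, and only then are its own leaves ordered, which is how one Pool can sit inside another.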
```scala
/**
 * An interface to build Schedulable tree
 * buildPools: build the tree nodes(pools)
 * addTaskSetManager: build the leaf nodes(TaskSetManagers)
 */
private[spark] trait SchedulableBuilder {
  def buildPools()
  def addTaskSetManager(manager: Schedulable, properties: Properties)
}

private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
  extends SchedulableBuilder with Logging {

  override def buildPools() {
    // nothing
  }

  override def addTaskSetManager(manager: Schedulable, properties: Properties) {
    rootPool.addSchedulable(manager)
  }
}
```

Reposted from: https://www.cnblogs.com/fxjwind/p/3507307.html