Understanding Spark 2.1 Core (3): Task Scheduler Principles and Source Code Analysis

Submitting Tasks
The call stack is as follows:

- TaskSchedulerImpl.submitTasks
  - CoarseGrainedSchedulerBackend.reviveOffers
    - CoarseGrainedSchedulerBackend.DriverEndpoint.makeOffers
      - TaskSchedulerImpl.resourceOffers
        - TaskSchedulerImpl.resourceOfferSingleTaskSet
      - CoarseGrainedSchedulerBackend.DriverEndpoint.launchTasks

TaskSchedulerImpl.submitTasks
TaskSchedulerImpl implements the TaskScheduler trait and overrides submitTasks:

```scala
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    // Create the TaskSetManager
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    // Hand the manager over to the scheduler
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  // Ask the backend to allocate resources
  backend.reviveOffers()
}
```
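The starvation timer in the middle of this method is a self-cancelling `java.util.Timer` pattern: warn at a fixed rate until the first task launches, then cancel. A standalone sketch of just that pattern (the `StarvationDemo` name and the 10 ms period are illustrative, not Spark's values):

```scala
import java.util.{Timer, TimerTask}
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger

// Sketch of the starvation-warning pattern used in submitTasks:
// fire at a fixed rate, "warn" while no task has launched yet,
// cancel the task once one has.
object StarvationDemo {
  @volatile var hasLaunchedTask = false
  val warnings = new AtomicInteger(0)

  def main(args: Array[String]): Unit = {
    val done = new CountDownLatch(1)
    val timer = new Timer(true)
    timer.scheduleAtFixedRate(new TimerTask {
      override def run(): Unit = {
        if (!hasLaunchedTask) {
          warnings.incrementAndGet() // stands in for logWarning(...)
        } else {
          this.cancel()              // stop warning once a task launched
          done.countDown()
        }
      }
    }, 10, 10)
    Thread.sleep(50)        // let a few warning ticks fire
    hasLaunchedTask = true  // simulate the first task launching
    done.await()
    timer.cancel()
    println(s"warnings before first launch: ${warnings.get}")
  }
}
```

The real code uses a much longer period (STARVATION_TIMEOUT_MS) and only installs the timer for non-local runs that have not yet received a task.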
CoarseGrainedSchedulerBackend.reviveOffers
Let us now look at the last line of the code above:

```scala
backend.reviveOffers()
```

First, step back and see how the TaskScheduler is started:

```scala
override def start() {
  backend.start()

  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    speculationScheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        checkSpeculatableTasks()
      }
    }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}
```
As we can see, TaskScheduler.start calls backend.start().

backend is a SchedulerBackend, a trait implemented by the CoarseGrainedSchedulerBackend class. Here is CoarseGrainedSchedulerBackend's start:
```scala
override def start() {
  val properties = new ArrayBuffer[(String, String)]
  for ((key, value) <- scheduler.sc.conf.getAll) {
    if (key.startsWith("spark.")) {
      properties += ((key, value))
    }
  }

  driverEndpoint = createDriverEndpointRef(properties)
}
```
We can see that CoarseGrainedSchedulerBackend.start creates driverEndpoint, an RPC endpoint. RpcEndpoint is a trait; ThreadSafeRpcEndpoint extends it, and CoarseGrainedSchedulerBackend's inner class DriverEndpoint implements ThreadSafeRpcEndpoint.

CoarseGrainedSchedulerBackend's reviveOffers simply sends a ReviveOffers message to this RPC endpoint:

```scala
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
```
CoarseGrainedSchedulerBackend.DriverEndpoint.makeOffers
DriverEndpoint has two kinds of functions for sending messages: send, which expects no reply, and ask, which does.

Correspondingly, there are two kinds of receiving functions. One is receive, which handles a message without replying to the sender.
The other is receiveAndReply, which replies to the sender:

```scala
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {

  case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
    if (executorDataMap.contains(executorId)) {
      executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
      context.reply(true)
    } else {
      val executorAddress = if (executorRef.address != null) {
          executorRef.address
        } else {
          context.senderAddress
        }
      logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
      addressToExecutorId(executorAddress) = executorId
      totalCoreCount.addAndGet(cores)
      totalRegisteredExecutors.addAndGet(1)
      val data = new ExecutorData(executorRef, executorRef.address, hostname,
        cores, cores, logUrls)
      CoarseGrainedSchedulerBackend.this.synchronized {
        executorDataMap.put(executorId, data)
        if (currentExecutorIdCounter < executorId.toInt) {
          currentExecutorIdCounter = executorId.toInt
        }
        if (numPendingExecutors > 0) {
          numPendingExecutors -= 1
          logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
        }
      }
      executorRef.send(RegisteredExecutor)
      context.reply(true)
      listenerBus.post(
        SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
      makeOffers()
    }

  case StopDriver =>
    context.reply(true)
    stop()

  case StopExecutors =>
    logInfo("Asking each executor to shut down")
    for ((_, executorData) <- executorDataMap) {
      executorData.executorEndpoint.send(StopExecutor)
    }
    context.reply(true)

  case RemoveExecutor(executorId, reason) =>
    executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
    removeExecutor(executorId, reason)
    context.reply(true)

  case RetrieveSparkAppConfig =>
    val reply = SparkAppConfig(sparkProperties,
      SparkEnv.get.securityManager.getIOEncryptionKey())
    context.reply(reply)
}
```
The ReviveOffers message sent earlier by CoarseGrainedSchedulerBackend's reviveOffers is received in receive, which then calls makeOffers:

```scala
case ReviveOffers =>
  makeOffers()
```
The work makeOffers does:

```scala
private def makeOffers() {
  // Filter out executors that have been killed
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  // Build WorkerOffers from activeExecutors,
  // i.e. the resources each executor can currently provide
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toIndexedSeq
  // scheduler.resourceOffers allocates the resources,
  // then launchTasks sends the tasks out
  launchTasks(scheduler.resourceOffers(workOffers))
}
```
The core of launchTasks is sending a LaunchTask message to the executor:

```scala
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
```

TaskSchedulerImpl.resourceOffers
Let us now dig into scheduler.resourceOffers, the resource allocation function from the previous section:
```scala
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each node as alive, remember its hostname,
  // and track whether any new executor has joined
  var newExecAvail = false
  for (o <- offers) {
    if (!hostToExecutors.contains(o.host)) {
      hostToExecutors(o.host) = new HashSet[String]()
    }
    if (!executorIdToRunningTaskIds.contains(o.executorId)) {
      hostToExecutors(o.host) += o.executorId
      executorAdded(o.executorId, o.host)
      executorIdToHost(o.executorId) = o.host
      executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Randomly shuffle the offers to avoid piling tasks onto the same machines
  val shuffledOffers = Random.shuffle(offers)
  // Build a TaskDescription buffer for each worker
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  // Record each worker's available CPUs
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  // Get the TaskSetManagers sorted by the scheduling policy
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    // If a new executor has joined, the TaskSetManager
    // must recompute its locality preferences
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Take each TaskSet in scheduling order, then offer it resources
  // at each locality level in increasing order of distance.
  // Locality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  for (taskSet <- sortedTaskSets) {
    var launchedAnyTask = false
    var launchedTaskAtCurrentMaxLocality = false
    // Allocate by locality, nearest first
    for (currentMaxLocality <- taskSet.myLocalityLevels) {
      do {
        // resourceOfferSingleTaskSet allocates resources for one TaskSet;
        // it returns false once no more tasks can be assigned
        // at this locality level
        launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
          taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
        launchedAnyTask |= launchedTaskAtCurrentMaxLocality
      } while (launchedTaskAtCurrentMaxLocality)
    }
    if (!launchedAnyTask) {
      taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    }
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
```
Two orderings are involved here. First the scheduler sorts the TaskSets:

```scala
val sortedTaskSets = rootPool.getSortedTaskSetQueue
```

Then, after taking out each TaskSet, resources are allocated to its tasks by locality level, from nearest to farthest.
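The level-by-level allocation loop can be sketched in isolation. The code below is a toy model, not Spark's implementation: pending tasks are reduced to counts keyed by the only locality level that accepts them, and `assign` mirrors the for/do-while structure of resourceOffers:

```scala
import scala.collection.mutable

// Toy sketch of the per-TaskSet loop in resourceOffers: walk the locality
// levels from nearest to farthest, and at each level keep "offering" until
// nothing more can be launched there (the do-while in the source).
object LocalityDemo {
  val levels = Seq("PROCESS_LOCAL", "NODE_LOCAL", "NO_PREF", "RACK_LOCAL", "ANY")

  def assign(pendingByLevel: Map[String, Int]): Seq[String] = {
    val pending = mutable.Map(pendingByLevel.toSeq: _*)
    val launched = mutable.ArrayBuffer[String]()
    for (level <- levels) {
      var launchedAtLevel = false
      do {
        // An "offer" succeeds while tasks at this level remain
        launchedAtLevel = pending.getOrElse(level, 0) > 0
        if (launchedAtLevel) {
          pending(level) -= 1
          launched += level
        }
      } while (launchedAtLevel)
    }
    launched.toSeq
  }

  def main(args: Array[String]): Unit = {
    println(assign(Map("PROCESS_LOCAL" -> 2, "RACK_LOCAL" -> 1)).mkString(", "))
    // PROCESS_LOCAL, PROCESS_LOCAL, RACK_LOCAL
  }
}
```

The point of the do-while is that all process-local work is exhausted before any rack-local or arbitrary placement is considered.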
TaskSchedulerImpl.resourceOfferSingleTaskSet
Next, let us look at the concrete implementation of allocating resources for a single TaskSet:
```scala
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]]): Boolean = {
  var launchedTask = false
  // Iterate over the executors
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        // Ask the TaskSet for a task that this execId/host can run
        // within the maximum distance maxLocality; resourceOffers
        // iterates maxLocality from nearest to farthest
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet
          taskIdToExecutorId(tid) = execId
          executorIdToRunningTaskIds(execId).add(tid)
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          return launchedTask
      }
    }
  }
  return launchedTask
}
```
CoarseGrainedSchedulerBackend.DriverEndpoint.launchTasks
Back in CoarseGrainedSchedulerBackend.DriverEndpoint.makeOffers, the last step is launchTasks, which sends the tasks out:
```scala
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = ser.serialize(task)
    // If the serialized task exceeds the RPC message size limit, abort it
    if (serializedTask.limit >= maxRpcMessageSize) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
            "spark.rpc.message.maxSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      // Deduct the task's cores from the executor's bookkeeping
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK

      logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
        s"${executorData.executorHost}.")

      // Send the LaunchTask message to the executorEndpoint
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
```
Upon receiving the LaunchTask message (which carries SerializableBuffer(serializedTask)), the executor endpoint starts executing the task.
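The oversized-task guard in launchTasks boils down to: serialize first, compare against the limit, and abort instead of sending when too large. Below is a standalone sketch of that pattern using plain Java serialization; `RpcSizeGuard` and its 128 KB limit are illustrative choices (Spark's actual default for `spark.rpc.message.maxSize` is 128 MB):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Toy version of the size guard in launchTasks: serialize the task payload,
// refuse to "send" anything at or over the limit, and produce the kind of
// descriptive error message the scheduler would use to abort the task set.
object RpcSizeGuard {
  val maxRpcMessageSize: Int = 128 * 1024 // illustrative limit, not Spark's default

  def serialize(obj: Serializable): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bos)
    out.writeObject(obj)
    out.close()
    bos.toByteArray
  }

  /** Returns Right(bytes) if sendable, Left(error message) otherwise. */
  def trySend(task: Serializable): Either[String, Array[Byte]] = {
    val bytes = serialize(task)
    if (bytes.length >= maxRpcMessageSize)
      Left(s"Serialized task was ${bytes.length} bytes, exceeds max allowed " +
        s"$maxRpcMessageSize. Consider using broadcast variables for large values.")
    else Right(bytes)
  }

  def main(args: Array[String]): Unit = {
    println(trySend("small task").isRight)        // a tiny payload passes
    println(trySend("x" * (256 * 1024)).isLeft)   // an oversized one is rejected
  }
}
```

Serializing before checking matters: the closure itself may be small, but captured variables can blow the payload past the limit, which is exactly why the error message suggests broadcast variables.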
Task Scheduling
Pool.getSortedTaskSetQueue
In the previous chapter we saw that TaskSchedulerImpl.resourceOffers calls:

```scala
val sortedTaskSets = rootPool.getSortedTaskSetQueue
```

to obtain the TaskSetManagers sorted by the scheduling policy. Let us now look at this line in depth.

rootPool is a Pool object. A Pool is a schedulable entity that represents a collection of Pools or a collection of TaskSets; in other words, Schedulable is a trait implemented by both the Pool class and the TaskSetManager class.
getSortedTaskSetQueue:
```scala
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
  // Build the TaskSetManager array
  var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
  // Sort the schedulable entities
  val sortedSchedulableQueue =
    schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
  for (schedulable <- sortedSchedulableQueue) {
    // Collect the TaskSetManagers from each schedulable entity
    sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
  }
  sortedTaskSetQueue
}
```
The scheduling algorithm taskSetSchedulingAlgorithm is set when the Pool is constructed, to either FairSchedulingAlgorithm or FIFOSchedulingAlgorithm depending on the SchedulingMode:

```scala
var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
  schedulingMode match {
    case SchedulingMode.FAIR =>
      new FairSchedulingAlgorithm()
    case SchedulingMode.FIFO =>
      new FIFOSchedulingAlgorithm()
    case _ =>
      val msg = s"Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead."
      throw new IllegalArgumentException(msg)
  }
}
```
TaskSchedulerImpl.initialize
So when is the Pool created? A look at TaskSchedulerImpl's initialization answers that:

```scala
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // Create the rootPool with an empty name
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      // During initialization, TaskSchedulerImpl creates a different
      // schedulableBuilder depending on the SchedulingMode
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
    }
  }
  schedulableBuilder.buildPools()
}
```
FIFO Scheduling
FIFOSchedulableBuilder.addTaskSetManager
Next, let us go back to TaskSchedulerImpl.submitTasks and its call to schedulableBuilder.addTaskSetManager:

```scala
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
```

and examine addTaskSetManager in depth.
schedulableBuilder is a SchedulableBuilder, a trait implemented by two classes: FIFOSchedulableBuilder and FairSchedulableBuilder.

We cover FIFOSchedulableBuilder first. Its addTaskSetManager:
```scala
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
  rootPool.addSchedulable(manager)
}
```
Now look at addSchedulable:

```scala
override def addSchedulable(schedulable: Schedulable) {
  require(schedulable != null)
  schedulableQueue.add(schedulable)
  schedulableNameToSchedulable.put(schedulable.name, schedulable)
  schedulable.parent = this
}
```
In effect this adds the manager to schedulableQueue (here a FIFO queue), registers the manager's name in schedulableNameToSchedulable, a ConcurrentHashMap[String, Schedulable], and sets the manager's parent to rootPool.
FIFOSchedulableBuilder.buildPools()
上述后一行代碼:
schedulableBuilder.buildPools()- 1
buildPools differs between the two builders. For FIFOSchedulableBuilder it is empty:

```scala
override def buildPools() {
  // nothing
}
```

This is because rootPool contains no nested Pools; as described above, each manager's parent is set directly to rootPool. The result is a two-level tree: level 0 is rootPool, and the leaf nodes are the individual managers.
FIFOSchedulingAlgorithm
With everything in place, we can now look at FIFO's core scheduling algorithm, FIFOSchedulingAlgorithm.
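The ordering it implements — lower job ID first, then lower stage ID within the same job — can be tried out in isolation. In the sketch below, `JobInfo` and `fifoLess` are illustrative stand-ins for Spark's Schedulable trait and comparator, mirroring only the comparison logic:

```scala
// Standalone sketch of the FIFO comparison rule. JobInfo is a toy class:
// in Spark, priority is in fact the job ID carried by a Schedulable.
case class JobInfo(name: String, priority: Int, stageId: Int)

object FifoDemo {
  // Mirrors FIFOSchedulingAlgorithm.comparator: compare job IDs first,
  // then stage IDs when the job IDs are equal.
  def fifoLess(s1: JobInfo, s2: JobInfo): Boolean = {
    var res = math.signum(s1.priority - s2.priority)
    if (res == 0) {
      res = math.signum(s1.stageId - s2.stageId)
    }
    res < 0
  }

  def main(args: Array[String]): Unit = {
    val queue = Seq(
      JobInfo("job1-stage3", priority = 1, stageId = 3),
      JobInfo("job0-stage1", priority = 0, stageId = 1),
      JobInfo("job1-stage2", priority = 1, stageId = 2))
    println(queue.sortWith(fifoLess).map(_.name).mkString(", "))
    // job0-stage1, job1-stage2, job1-stage3
  }
}
```

Earlier jobs always drain first; within a job, earlier stages go first.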
```scala
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    // priority is in fact the job ID
    val priority1 = s1.priority
    val priority2 = s2.priority
    // Compare job IDs first
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      // If the job IDs are equal, compare stage IDs
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    res < 0
  }
}
```

FAIR Scheduling
FairSchedulableBuilder.addTaskSetManager
FairSchedulableBuilder's addTaskSetManager is more involved than FIFOSchedulableBuilder's:
```scala
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
  // Start from the default parent pool
  var poolName = DEFAULT_POOL_NAME
  var parentPool = rootPool.getSchedulableByName(poolName)
  // If properties are set, take the pool name from them
  if (properties != null) {
    // FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
    poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
    parentPool = rootPool.getSchedulableByName(poolName)
    // If rootPool does not have this pool yet,
    if (parentPool == null) {
      // create a new pool from the app-level configuration
      // rather than from the XML file
      parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
        DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
      rootPool.addSchedulable(parentPool)
      logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
        poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
    }
  }
  // Add the manager to the pool
  parentPool.addSchedulable(manager)
  logInfo("Added task set " + manager.name + " tasks to pool " + poolName)
}
```
FairSchedulableBuilder.buildPools()
FairSchedulableBuilder.buildPools builds the scheduling tree from the $SPARK_HOME/conf/fairscheduler.xml file, which looks roughly like this:
```xml
<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>
```
The parsing itself happens in buildFairSchedulerPool:
```scala
private def buildFairSchedulerPool(is: InputStream) {
  // Load the XML file
  val xml = XML.load(is)
  // Iterate over the pool nodes
  for (poolNode <- (xml \\ POOLS_PROPERTY)) {

    val poolName = (poolNode \ POOL_NAME_PROPERTY).text
    var schedulingMode = DEFAULT_SCHEDULING_MODE
    var minShare = DEFAULT_MINIMUM_SHARE
    var weight = DEFAULT_WEIGHT

    val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
    if (xmlSchedulingMode != "") {
      try {
        schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
      } catch {
        case e: NoSuchElementException =>
          logWarning(s"Unsupported schedulingMode: $xmlSchedulingMode, " +
            s"using the default schedulingMode: $schedulingMode")
      }
    }

    val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
    if (xmlMinShare != "") {
      minShare = xmlMinShare.toInt
    }

    val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
    if (xmlWeight != "") {
      weight = xmlWeight.toInt
    }

    // Build a new Pool from the XML configuration
    val pool = new Pool(poolName, schedulingMode, minShare, weight)
    // and add it to the rootPool
    rootPool.addSchedulable(pool)
    logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
      poolName, schedulingMode, minShare, weight))
  }
}
```
As we can see, FAIR scheduling is not simply "fair" scheduling. The XML configuration first produces a number of pools under rootPool, and each app then places its TaskSetManagers into one of those pools according to its spark.scheduler.pool setting. rootPool, in turn, also schedules across the Pools themselves.

So the FAIR policy really contains two levels of scheduling: the first across the Pools inside rootPool, the second across the TaskSetManagers inside each Pool. In fairscheduler.xml, weight (task weight) and minShare (minimum number of tasks) configure the first level, which always uses the FAIR algorithm, while each pool's second-level algorithm is chosen by its schedulingMode.

For a single app in Standalone mode, having multiple FAIR pools may seem pointless, since the app can only pick one pool. We can, however, select the pool in code:
```scala
sc.setLocalProperty("spark.scheduler.pool", "Pool_1")
```

FairSchedulingAlgorithm
Next, let us walk through the FAIR algorithm itself.
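Before reading the source, here is a standalone sketch of its comparison rules; `PoolInfo` and `fairLess` are illustrative stand-ins for Spark's Schedulable and comparator, mirroring only the logic:

```scala
// Toy model of a FAIR-scheduled entity: how many tasks it is running,
// its guaranteed minimum share, and its weight.
case class PoolInfo(name: String, runningTasks: Int, minShare: Int, weight: Int)

object FairDemo {
  // Mirrors FairSchedulingAlgorithm.comparator:
  // 1. an entity below its minShare beats one that is not;
  // 2. if both are below, the smaller runningTasks/minShare ratio wins;
  // 3. otherwise the smaller runningTasks/weight ratio wins;
  // 4. ties break on name.
  def fairLess(s1: PoolInfo, s2: PoolInfo): Boolean = {
    val s1Needy = s1.runningTasks < s1.minShare
    val s2Needy = s2.runningTasks < s2.minShare
    val minShareRatio1 = s1.runningTasks.toDouble / math.max(s1.minShare, 1.0)
    val minShareRatio2 = s2.runningTasks.toDouble / math.max(s2.minShare, 1.0)
    val taskToWeightRatio1 = s1.runningTasks.toDouble / s1.weight
    val taskToWeightRatio2 = s2.runningTasks.toDouble / s2.weight

    var compare = 0
    if (s1Needy && !s2Needy) {
      return true
    } else if (!s1Needy && s2Needy) {
      return false
    } else if (s1Needy && s2Needy) {
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }
    if (compare < 0) true
    else if (compare > 0) false
    else s1.name < s2.name
  }

  def main(args: Array[String]): Unit = {
    val pools = Seq(
      PoolInfo("busy",    runningTasks = 8, minShare = 2, weight = 1),
      PoolInfo("starved", runningTasks = 1, minShare = 2, weight = 1),
      PoolInfo("heavy",   runningTasks = 8, minShare = 2, weight = 4))
    // "starved" is below its minShare, so it wins outright; between the two
    // satisfied pools, the higher weight gives "heavy" the smaller
    // tasks/weight ratio (8/4 = 2 vs 8/1 = 8).
    println(pools.sortWith(fairLess).map(_.name).mkString(", "))
    // starved, heavy, busy
  }
}
```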
```scala
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    // Is s1 running fewer tasks than its minimum share?
    val s1Needy = runningTasks1 < minShare1
    // Is s2 running fewer tasks than its minimum share?
    val s2Needy = runningTasks2 < minShare2
    // minShareRatio = running tasks / minimum share;
    // it measures load: the smaller, the less loaded
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
    // taskToWeightRatio = running tasks / weight;
    // a larger weight means higher priority,
    // so a smaller taskToWeightRatio wins
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble

    var compare = 0
    // If s1 is below its minimum share and s2 is not, s1 goes first
    if (s1Needy && !s2Needy) {
      return true
    }
    // If s2 is below its minimum share and s1 is not, s2 goes first
    else if (!s1Needy && s2Needy) {
      return false
    }
    // If both are below their minimum share,
    // the smaller minShareRatio goes first
    else if (s1Needy && s2Needy) {
      compare = minShareRatio1.compareTo(minShareRatio2)
    }
    // If neither is below its minimum share,
    // the smaller taskToWeightRatio goes first
    else {
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }
    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      s1.name < s2.name
    }
  }
}
```

At this point, all of TaskScheduler's work before sending tasks to the executors is complete.
總結(jié)
以上是生活随笔為你收集整理的深入理解Spark 2.1 Core (三):任务调度器的原理与源码分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 深入理解Spark 2.1 Core (
- 下一篇: 深入理解Spark 2.1 Core (