Spark详解(六):Spark集群资源调度算法原理
1. 應(yīng)用程序之間
在Standalone模式下,Master提供里資源管理調(diào)度功能。在調(diào)度過程中,Master先啟動等待列表中應(yīng)用程序的Driver,這個Driver盡可能分散在集群的Worker節(jié)點上,然后根據(jù)集群的內(nèi)存和CPU使用情況,對等待運行的應(yīng)用程序進行資源分配。默認分配規(guī)則是有條件的FIFO,先分配的應(yīng)用程序會盡可能多的獲取滿足條件的資源,后分配的應(yīng)用程序只能在剩余資源中再次篩選。如果沒有合適資源的應(yīng)用程序只能等待。Master.scheduler方法如下:
private def schedule(): Unit = {if (state != RecoveryState.ALIVE) {return}// 隨機打亂Worker節(jié)點val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))val numWorkersAlive = shuffledAliveWorkers.sizevar curPos = 0// 這是只對Standalone下的Cluster模式才生效,client模式Driver是在客戶端for (driver <- waitingDrivers.toList) { var launched = falsevar numWorkersVisited = 0while (numWorkersVisited < numWorkersAlive && !launched) {val worker = shuffledAliveWorkers(curPos)numWorkersVisited += 1if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {launchDriver(worker, driver)waitingDrivers -= driverlaunched = true}curPos = (curPos + 1) % numWorkersAlive}}// 對等待應(yīng)用程序按照順序分配運行資源startExecutorsOnWorkers()}默認情況下,在Standalone模式下,每個應(yīng)用程序可以分配到的CPU核數(shù)可以由spark.deploy.defaultCores進行設(shè)置,但是該配置默認為Int.max,也就是不限制,從而應(yīng)用程序會盡可能獲取CPU資源。為了限制每個應(yīng)用程序使用CPU資源,用戶一方面可以設(shè)置spark.core.max配置項,約束每個應(yīng)用程序所能申請的最大CPU核數(shù);另一方面可以設(shè)置spark.executor.cores配置項,用于設(shè)置在每個Executor上啟動的CPU核數(shù)。
/*** Schedule and launch executors on workers*/private def startExecutorsOnWorkers(): Unit = {// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app// in the queue, then the second app, etc.// 使用FIFO算法運行應(yīng)用,即先注冊的應(yīng)用先運行for (app <- waitingApps if app.coresLeft > 0) {val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor// Filter out workers that don't have enough resources to launch an executorval usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE).filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&worker.coresFree >= coresPerExecutor.getOrElse(1)).sortBy(_.coresFree).reverse// 一種是spreadOutApps,就是把應(yīng)用運行在盡量多的Worker上,另一種是非spreadOutAppsval assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)// Now that we've decided how many cores to allocate on each worker, let's allocate them// 給每個worker分配完application要求的cpu core之后,遍歷worker啟動executorfor (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {allocateWorkerResourceToExecutors(app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))}}}對于Worker的分配策略有兩種:一種是盡量把應(yīng)用程序運行可能多的Worker上,這種分配算法不僅能充分利用集群資源,還有利于數(shù)據(jù)本地性;另一種就是應(yīng)用程序運行在盡量少的Worker上,這種適用于CPU密集型而內(nèi)存使用較少的場景。配置項為spark.deploy.spreadOut。主要代碼為:Master.scheduleExecutorsOnWorkers方法實現(xiàn)。
private def scheduleExecutorsOnWorkers(app: ApplicationInfo,usableWorkers: Array[WorkerInfo],spreadOutApps: Boolean): Array[Int] = {// 應(yīng)用程序中每個Executor所需要的CPU核數(shù)val coresPerExecutor = app.desc.coresPerExecutor// 每個Executor所需的最少核數(shù),如果設(shè)置了coresPerExecutor則為該值,否則為1val minCoresPerExecutor = coresPerExecutor.getOrElse(1)// 如果沒有設(shè)置coresPerExecutor,那么每個Worker上只有一個Executor,并盡可能分配資源val oneExecutorPerWorker = coresPerExecutor.isEmpty// 每個Executor需要分配多少內(nèi)存val memoryPerExecutor = app.desc.memoryPerExecutorMB// 集群中可用的Worker節(jié)點的數(shù)量val numUsable = usableWorkers.length// Worker節(jié)點所能提供的CPU核數(shù)數(shù)組val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker// Worker分配Executor個數(shù)數(shù)組val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker// 需要分配的CPU核數(shù),為應(yīng)用程序所需CPU核數(shù)和可用CPU核數(shù)最小值var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)/** Return whether the specified worker can launch an executor for this app. *//*** 返回指定的Worker節(jié)點是否能夠啟動Executor,滿足條件:* 1. 應(yīng)用程序需要分配CPU核數(shù)>=每個Executor所需的最少CPU核數(shù)* 2. 是否有足夠的CPU核數(shù),判斷條件為該Worker節(jié)點可用CPU核數(shù)-該Worker節(jié)點已分配的CPU核數(shù)>=每個Executor所需最少CPU核數(shù)* 如果在該Worker節(jié)點上允許啟動新的Executor,需要追加以下兩個條件:* 1. 判斷內(nèi)存是否足夠啟動Executor,其方法是:當(dāng)前Worker節(jié)點可用內(nèi)存-該Worker已分配的內(nèi)存>=每個Executor分配的內(nèi)存大小* 2. 已經(jīng)分配給該應(yīng)用程序的Executor數(shù)量+已經(jīng)運行該應(yīng)用程序的Executor數(shù)量<該應(yīng)用程序Executor設(shè)置的最大值*/def canLaunchExecutor(pos: Int): Boolean = {val keepScheduling = coresToAssign >= minCoresPerExecutorval enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor// If we allow multiple executors per worker, then we can always launch new executors.// Otherwise, if there is already an executor on this worker, just give it more cores.// 啟動新Executor條件是:該Worker節(jié)點允許啟動多個Executor或者在該Worker節(jié)點上沒有為該應(yīng)用程序分配Executorval launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0// 如果在該Worker節(jié)點上允許啟動多個Executor,那么該Executor節(jié)點滿足啟動條件就可以啟動新Executor,// 否則只能啟動一個Executor并盡可能的多分配CPU核數(shù)if (launchingNewExecutor) {val assignedMemory = assignedExecutors(pos) * memoryPerExecutorval enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutorval underLimit = assignedExecutors.sum + app.executors.size < app.executorLimitkeepScheduling && enoughCores && enoughMemory && underLimit} else {// We're adding cores to an existing executor, so no need// to check memory and executor limitskeepScheduling && enoughCores}}// Keep launching executors until no more workers can accommodate any// more executors, or if we have reached this application's limitsvar freeWorkers = (0 until numUsable).filter(canLaunchExecutor)// 在可用的Worker節(jié)點中啟動Executor,在Worker節(jié)點每次分配資源時,分配給Executor所需的最少CPU核數(shù),該過程是通過多次輪詢進行,// 直到?jīng)]有Worker節(jié)點滿足啟動Executor條件活著已經(jīng)達到應(yīng)用程序限制。在分配過程中Worker節(jié)點可能多次分配,// 如果該Worker節(jié)點可以啟動多個Executor,則每次分配的時候啟動新的Executor并賦予資源;// 如果該Worker節(jié)點只能啟動一個Executor,則每次分配的時候把資源追加到該Executorwhile (freeWorkers.nonEmpty) {freeWorkers.foreach { pos =>var keepScheduling = true// 滿足 keepScheduling標(biāo)志為真(第一次分配或者集中運行)和該Worker節(jié)點滿足// 啟動Executor條件時,進行資源分配while (keepScheduling && canLaunchExecutor(pos)) {// 每次分配CPU核數(shù)為Executor所需的最少CPU核數(shù)coresToAssign -= minCoresPerExecutorassignedCores(pos) += minCoresPerExecutor// If we are launching one executor per worker, then every iteration assigns 1 core// to the executor. Otherwise, every iteration assigns cores to a new executor.// 如果設(shè)置每個Executor啟動CPU核數(shù),則該Worker只能為該應(yīng)用程序啟動1個Executor,// 否則在每次分配中啟動1個新的Executorif (oneExecutorPerWorker) {assignedExecutors(pos) = 1} else {assignedExecutors(pos) += 1}// Spreading out an application means spreading out its executors across as// many workers as possible. If we are not spreading out, then we should keep// scheduling executors on this worker until we use all of its resources.// Otherwise, just move on to the next worker.// 如果是分散運行,則在某一Worker節(jié)點上做完資源分配立即移到下一個Worker節(jié)點,// 如果是集中運行,則持續(xù)在某一Worker節(jié)點上做資源分配,知道使用完該Worker節(jié)點所有資源。// 傳入的Worker節(jié)點列表是按照CPU核數(shù)倒序排列,在集中運行時,會盡可能少的使用Worker節(jié)點if (spreadOutApps) {keepScheduling = false}}}// 繼續(xù)從上一次分配完的可用Worker節(jié)點列表獲取滿足啟動Executor的Worker節(jié)點列表freeWorkers = freeWorkers.filter(canLaunchExecutor)}// 返回每個Worker節(jié)點分配的CPU核數(shù)assignedCores}tips:
關(guān)于branch-2.0,這個算法是在Spark 1.4.2的版本中優(yōu)化的。在以前,Worker節(jié)點中,只能為某應(yīng)用程序啟動一個Executor。輪詢分配資源時,Worker節(jié)點每次分配1個CPU核數(shù),這樣有可能會造成某個Worker節(jié)點最終分配CPU核數(shù)小于每個Executor所需CPU核數(shù),那么該節(jié)點將不啟動該Executor。例如:
在集群中有4個Worker節(jié)點,每個節(jié)點擁有16個CPU核數(shù),其中設(shè)置了spark.cores.max=48和spark.executor.cores=16,由于每個Worker只啟動一個Executor,按照每次分配一個CPU核數(shù),則每個Worker節(jié)點的Executor將分配到12個CPU核數(shù),每個由于12<16, 所以沒有Executor能啟動?,F(xiàn)在改進的算法是,如果設(shè)置了spark.executor.cores,那么每次分配的時候就分配這個指定的CPU核數(shù)
2. 作業(yè)以及調(diào)度階段之間
Spark應(yīng)用程序提交執(zhí)行時,會根據(jù)RDD依賴關(guān)系形成有向無環(huán)圖(DAG),然后交給DAGScheduler進行劃分作業(yè)和調(diào)度階段。這些作業(yè)之間沒有任何依賴關(guān)系,對于多個作業(yè)之間的調(diào)度,Spark目前提供了兩種不同的調(diào)度策略,一種是FIFO模式,這也是目前默認的模式;還有一種是FAIR模式,該模式的調(diào)度可以通過兩個參數(shù)來決定Job執(zhí)行的優(yōu)先模式,兩個參數(shù)分別是minShare(最小任務(wù)數(shù))和weight(任務(wù)的權(quán)重)。
2.1 創(chuàng)建調(diào)度池
在TaskSchedulerImpl.initialize方法中先創(chuàng)建根調(diào)度池rootPool對象,然后根據(jù)系統(tǒng)配置的調(diào)度模式創(chuàng)建調(diào)度創(chuàng)建器,針對兩種調(diào)度策略進行具體實例化FIFOSchedulableBuilder或者FairSchedulableBuilder,最終使用調(diào)度器創(chuàng)建buildPools方法根調(diào)度池rootPool下創(chuàng)建調(diào)度池。
def initialize(backend: SchedulerBackend) {this.backend = backend// temporarily set rootPool name to emptyrootPool = new Pool("", schedulingMode, 0, 0)schedulableBuilder = {schedulingMode match {case SchedulingMode.FIFO =>new FIFOSchedulableBuilder(rootPool)case SchedulingMode.FAIR =>new FairSchedulableBuilder(rootPool, conf)case _ =>throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")}}schedulableBuilder.buildPools()}2. 調(diào)度池中加入調(diào)度內(nèi)容
在TaskSchedulerImpl.submitTask方法中,先把調(diào)度階段拆分為任務(wù)集,然后把這些任務(wù)集交給管理器TaskSetManager進行管理,最后把該任務(wù)集的管理器加入到調(diào)度池中,等待分配執(zhí)行。在FIFO中,由于創(chuàng)建的buildPools方法為空,所以在根調(diào)度器rootPool中并沒有下級調(diào)度池,而是直接包含了一組TaskSetManager;而在Fair調(diào)度器中,根調(diào)度池rootPool中包含了下級調(diào)度池Pool,在這些下級調(diào)度池Pool包含一組TaskSetManager。
override def submitTasks(taskSet: TaskSet) {val tasks = taskSet.taskslogInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")this.synchronized {// 為每一個taskSet創(chuàng)建一個taskSetManager// taskSetManager在后面負責(zé),TaskSet的任務(wù)執(zhí)行狀況的監(jiān)視和管理val manager = createTaskSetManager(taskSet, maxTaskFailures)val stage = taskSet.stageIdval stageTaskSets =taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])// 把manager加入內(nèi)存緩存中stageTaskSets(taskSet.stageAttemptId) = managerval conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>ts.taskSet != taskSet && !ts.isZombie}if (conflictingTaskSet) {throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")}// 將該任務(wù)集的管理器加入到系統(tǒng)調(diào)度池中,由系統(tǒng)統(tǒng)一調(diào)配,該調(diào)度器屬于應(yīng)用級別// 支持FIFO和FAIR兩種,默認FIFOschedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)3. 提供已排序的任務(wù)集管理器
在TaskSchedulerImpl.resourceOffers方法中進行資源分配時,會從根調(diào)度池rootPool獲取以及排序的任務(wù)管理器,排序算法由兩種調(diào)度策略提供。
// 獲取按照調(diào)度策略排序好的TaskSetManager// 從rootPool中取出排序了的TaskSetManager// 在創(chuàng)建完TaskScheduler StandaloneSchedulerBackend之后,會執(zhí)行initialize()方法,其實會創(chuàng)建一個調(diào)度池// 這里就是所有提交的TaskSetManager,首先會放入這個調(diào)度池中,然后再執(zhí)行task分配算法的時候,會從這個調(diào)度池中,取出排好隊的TaskSetManagerval sortedTaskSets = rootPool.getSortedTaskSetQueue在FIFO調(diào)度策略中,由于根調(diào)度池rootPool直接包含了多個作業(yè)的任務(wù)管理器,在比較時,首先需要比較作業(yè)的優(yōu)先級(根據(jù)作業(yè)編號判斷,作業(yè)編號越小優(yōu)先級越高),如果是同一個作業(yè),則會比較調(diào)度階段優(yōu)先級(根據(jù)調(diào)度階段編號判斷,調(diào)度階段編號越小優(yōu)先級越高)
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {//獲取作業(yè)優(yōu)先級,實際上是作業(yè)編號val priority1 = s1.priorityval priority2 = s2.priorityvar res = math.signum(priority1 - priority2)//如果是同一個作業(yè),再比較調(diào)度階段優(yōu)先級if (res == 0) {val stageId1 = s1.stageIdval stageId2 = s2.stageIdres = math.signum(stageId1 - stageId2)}res < 0} }在FAIR調(diào)度策略中包含了兩層調(diào)度,第一層調(diào)度池rootPool中包含了下級調(diào)度池Pool,第二層為下級調(diào)度池Pool包含多個TaskSetManager。具體配置參見$SPARK_HOME/conf/fairscheduler.xml文件,在該文件中包含多個下級調(diào)度池Pool配置項,其中minShare(最小任務(wù)數(shù))和weight(任務(wù)的權(quán)重)用來設(shè)置第一級調(diào)度算法,而SchedulingMode參數(shù)是用來設(shè)置第二層調(diào)度算法。
在FAIR算法中,先獲取兩個調(diào)度器的饑餓程度,饑餓程度為正在運行的任務(wù)是否小于最小任務(wù),如果是,則表示該調(diào)度處于饑餓程度。獲取饑餓程度后進行如下比較:
- 如果某個調(diào)度處于解狀態(tài),另一個處于非饑餓狀態(tài),則先滿足處于饑餓狀態(tài)的調(diào)度
- 如果兩個調(diào)度都處于饑餓狀態(tài),則比較資源比,先們在這資源比較少的調(diào)度。
- 如果兩個調(diào)度處于非饑餓狀態(tài),則比較權(quán)重比,先滿足權(quán)重比少的調(diào)度。
- 以上情況均相同的情況,根據(jù)調(diào)度的名稱進行排序。
3. 任務(wù)之間
我們先了解數(shù)據(jù)本地型和延遲兩個概念。
3.1 數(shù)據(jù)本地型
數(shù)據(jù)的計算盡可能在數(shù)據(jù)所在的節(jié)點上進行,這樣可以減少數(shù)據(jù)在網(wǎng)絡(luò)上的傳輸,畢竟移動計算比移動數(shù)據(jù)代價來得小些。進一步看,數(shù)據(jù)如果在運行的節(jié)點的內(nèi)存中,就能夠進一步減少磁盤I/O的傳輸。在Spark中數(shù)據(jù)本地型優(yōu)先級從高到低為PROCESS_LOCAL>NODE_LOCAL>NO_PREF>PACK_LOCAL>ANY,即最好是任務(wù)運行的節(jié)點內(nèi)存中存在數(shù)據(jù),次好是同一個Node(同一個機器)上,再次是同機架,最后是同一個為。其中任務(wù)數(shù)據(jù)本地型通過以下情況來確定:
- 如果任務(wù)處于作業(yè)的開始的調(diào)度階段,這些任務(wù)對于的RDD分區(qū)都有首選運行位置,該位置也是任務(wù)運行的首選位置,數(shù)據(jù)本地性為NODE_LOCAL.
- 如果任務(wù)處于非作業(yè)開頭的調(diào)度階段,可以根據(jù)父調(diào)度階段運行的位置得到任務(wù)的首選位置,這種情況下,如果Executor處于活躍狀態(tài),則數(shù)據(jù)本地性為PROCESS_LOCAL;如果Executor不處于活動狀態(tài),但存在父調(diào)度階段運行結(jié)果,則數(shù)據(jù)本地性為NODE_LOCAL
- 如果沒有首選位置,則數(shù)據(jù)本地型為NO_PREF
3.2 延遲執(zhí)行
在任務(wù)分配運行節(jié)點時先判斷最佳運行節(jié)點是否空閑,如果該節(jié)點沒有足夠的資源運行該任務(wù),在這種情況下任務(wù)會等待一定的時間;如果在等待時間內(nèi)該節(jié)點釋放足夠的資源,則任務(wù)在該節(jié)點運行,如果還不足會找出次佳的節(jié)點運行。通過這樣的方式進行能夠讓任務(wù)運行在更高級別的數(shù)據(jù)本地性節(jié)點,從而減少磁盤I/O和網(wǎng)絡(luò)傳輸。一般來說PROCESS_LOCAL和NODE_LOCAL兩個數(shù)據(jù)本地性級別進行等待,系統(tǒng)默認延遲時間為3s。
Spakr任務(wù)分配的原則就是讓任務(wù)運行在數(shù)據(jù)本地性優(yōu)先級別更高的節(jié)點上,甚至可以為此等待一定的時間。該任務(wù)分配有TaskSchedulerImpl.resourceOffers方法實現(xiàn),在該方法中先對應(yīng)于程序獲取的資源(Worker節(jié)點)進行混洗,以使任務(wù)能夠更加均衡的分散在集群中運行。然后對任務(wù)集對應(yīng)的TaskManager根據(jù)設(shè)置地調(diào)度算法進行排序,最后對TaskSetManager中的任務(wù)按照數(shù)據(jù)本地性分配任務(wù)運行節(jié)點,在分配時先根據(jù)任務(wù)集的本地性從優(yōu)先級高到低進行分配任務(wù),在分配的過程中動態(tài)地判斷集群中節(jié)點運行的請,通過延遲執(zhí)行等待數(shù)據(jù)本地性更高的節(jié)點運行。
《新程序員》:云原生和全面數(shù)字化實踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀總結(jié)
以上是生活随笔為你收集整理的Spark详解(六):Spark集群资源调度算法原理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark详解(七):SparkCont
- 下一篇: Spark详解(八):Spark 容错以