日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

深入理解spark两种调度模式:FIFO,FAIR模式

發布時間:2025/1/21 98 豆豆
生活随笔 收集整理的這篇文章主要介紹了 深入理解spark两种调度模式:FIFO,FAIR模式 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

深入理解spark-兩種調度模式FIFO,FAIR模式

前面我們應知道了一個任務提交會由DAG拆分為job,stage,task,最后提交給TaskScheduler,在提交taskscheduler中會根據master初始化taskscheduler和schedulerbackend兩個類,并且初始化一個調度池;

1.調度池比較

根據mode初始化調度池pool

def initialize(backend: SchedulerBackend) {this.backend = backend// temporarily set rootPool name to empty 這里可以看到調度池初始化最小設置為0rootPool = new Pool("", schedulingMode, 0, 0)schedulableBuilder = {schedulingMode match {case SchedulingMode.FIFO =>new FIFOSchedulableBuilder(rootPool)case SchedulingMode.FAIR =>new FairSchedulableBuilder(rootPool, conf)}}schedulableBuilder.buildPools()}

FIFO模式

這個會根據spark.scheduler.mode 來設置FIFO or FAIR,默認的是FIFO模式;

FIFO模式什么都不做,實現默認的schedulerableBUilder方法,建立的調度池也為空,addTasksetmaneger也是調用默認的;

可以簡單的理解為,默認模式FIFO什么也不做。。

FAIR模式

fair模式則重寫了buildpools的方法,讀取默認路徑 $SPARK_HOME/conf/fairscheduler.xml文件,也可以通過參數spark.scheduler.allocation.file設置用戶自定義配置文件。

文件中配置的是

poolname 線程池名

schedulermode 調度模式(FIFO,FAIR僅有兩種)

minshare 初始大小的線程核數

wight 調度池的權重

override def buildPools() {var is: Option[InputStream] = Nonetry {is = Option {schedulerAllocFile.map { f =>new FileInputStream(f)}.getOrElse {Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)}}is.foreach { i => buildFairSchedulerPool(i) }} finally {is.foreach(_.close())}// finally create "default" poolbuildDefaultPool()}

同時也重寫了addtaskmanager方法

override def addTaskSetManager(manager: Schedulable, properties: Properties) {var poolName = DEFAULT_POOL_NAMEvar parentPool = rootPool.getSchedulableByName(poolName)if (properties != null) {poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)parentPool = rootPool.getSchedulableByName(poolName)if (parentPool == null) {// we will create a new pool that user has configured in app// instead of being defined in xml fileparentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)rootPool.addSchedulable(parentPool)logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))}}parentPool.addSchedulable(manager)logInfo("Added task set " + manager.name + " tasks to pool " + poolName)}

這一段邏輯中是把配置文件中的pool,或者default pool放入rootPool中,然后把TaskSetManager存入rootPool對應的子pool;

2.調度算法比較

除了初始化的調度池不一致外,其實現的調度算法也不一致

實現的調度池Pool,在內部實現方法中也會根據mode不一致來實現調度的不同

var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {schedulingMode match {case SchedulingMode.FAIR =>new FairSchedulingAlgorithm()case SchedulingMode.FIFO =>new FIFOSchedulingAlgorithm()}}

FIFO模式

FIFO模式的調度方式很容易理解,比較stageID,誰小誰先執行;

這也很好理解,stageID小的任務一般來說是遞歸的最底層,是最先提交給調度池的;

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {val priority1 = s1.priorityval priority2 = s2.priorityvar res = math.signum(priority1 - priority2)if (res == 0) {val stageId1 = s1.stageIdval stageId2 = s2.stageIdres = math.signum(stageId1 - stageId2)}if (res < 0) {true} else {false}} }

FAIR模式

fair模式來說的話,稍微復雜一點;

但是還是比較容易看懂,

1.先比較兩個stage的 runningtask使用的核數,其實也可以理解為task的數量,誰小誰的優先級高;

2.比較兩個stage的 runningtask 權重,誰的權重大誰先執行;

3.如果前面都一直,則比較名字了(字符串比較),誰大誰先執行;

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {val minShare1 = s1.minShareval minShare2 = s2.minShareval runningTasks1 = s1.runningTasksval runningTasks2 = s2.runningTasksval s1Needy = runningTasks1 < minShare1val s2Needy = runningTasks2 < minShare2val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDoubleval minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDoubleval taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDoubleval taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDoublevar compare: Int = 0if (s1Needy && !s2Needy) {return true} else if (!s1Needy && s2Needy) {return false} else if (s1Needy && s2Needy) {compare = minShareRatio1.compareTo(minShareRatio2)} else {compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)}if (compare < 0) {true} else if (compare > 0) {false} else {s1.name < s2.name}}

總結:雖然了解一下spark的調度模式,以前在執行中基本都沒啥用到,沒想到spark還有這樣的隱藏功能。

與50位技術專家面對面20年技術見證,附贈技術全景圖

總結

以上是生活随笔為你收集整理的深入理解spark两种调度模式:FIFO,FAIR模式的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。