提高spark任务稳定性1 - Blacklist 机制
文章目錄
- 背景
- 介紹
- 相關(guān)參數(shù)
- 實現(xiàn)細(xì)節(jié)
- TaskSetBlacklist
- BlacklistTracker
- 黑名單判斷的時機
- 如果所有的節(jié)點都被加入了黑名單?
- 結(jié)語
背景
一個 spark 應(yīng)用的產(chǎn)生過程: 獲取需求 -> 編寫spark代碼 -> 測試通過 -> 扔上平臺調(diào)度。
往往應(yīng)用會正常運行一段時間,突然有一天運行失敗,或是失敗了一次才運行成功。
從開發(fā)者的角度看,我的代碼沒問題,測試也通過了,之前一段都運行好好的,怎么突然就失敗了呢?為什么我重新調(diào)度又能正常運行了,是不是你們平臺不穩(wěn)定?是什么導(dǎo)致了上述問題?
分布式集群中,特別是高負(fù)載的情況下,就會引發(fā)很多意想不到的問題,例如:
為什么 task 失敗后還會被 schedular 重新調(diào)度在原來的 node 或是 executor上?
數(shù)據(jù)本地性(spark會優(yōu)先把task調(diào)度在有相應(yīng)數(shù)據(jù)的節(jié)點上)導(dǎo)致。
是否只能聽天由命,每次失敗后重新調(diào)度? 如果任務(wù)有SLA的限制怎么辦?
介紹
spark 2.1 中增加了 blacklist 機制,當(dāng)前(2.3.0)還是試驗性質(zhì)的功能,黑名單機制允許你設(shè)置 task 在 executor / node 上失敗次數(shù)的閾值, 從而避免了一路走到黑的情況出現(xiàn)。 ?
相關(guān)參數(shù)
| spark.blacklist.enabled | false | 是否開啟黑名單機制 |
| spark.blacklist.timeout | 1h | 對于被加入 application 黑名單的 executor/節(jié)點 ,多長時間后無條件的移出黑名單以運行新任務(wù) |
| spark.blacklist.task.maxTaskAttemptsPerExecutor | 1 | 對于同一個 task 在某個 executor 中的失敗重試閾值。達(dá)到閾值后,在執(zhí)行這個 task 時,該 executor 將被加入黑名單 |
| spark.blacklist.task.maxTaskAttemptsPerNode | 2 | 對于同一個 task 在某個節(jié)點上的失敗重試閾值。達(dá)到閾值后,在執(zhí)行這個 task 時,該節(jié)點將被加入黑名單 |
| spark.blacklist.stage.maxFailedTasksPerExecutor | 2 | 一個 stage 中,不同的 task 在同一個 executor 的失敗閾值。達(dá)到閾值后,在執(zhí)行這個 stage 時該 executor 將會被加入黑名單 |
| spark.blacklist.stage.maxFailedExecutorsPerNode | 2 | 一個 stage 中,不同的 executor 加入黑名單的閾值。達(dá)到閾值后,在執(zhí)行這個 stage 時該節(jié)點將會被加入黑名單 |
| spark.blacklist.application.maxFailedTasksPerExecutor | 2 | 在同一個 executor 中,不同的 task的失敗閾值 。達(dá)到閾值后,在整個 appliction 運行期間,該 executor 都會被加入黑名單,加入時間超過 spark.blacklist.timeout 后,自動從黑名單中移除。值得注意的是,如果開啟了 dynamic allocation,這些 executor 可能會由于空閑時間過長被回收。 |
| spark.blacklist.application.maxFailedExecutorsPerNode | 2 | 在一個節(jié)點中,不同 executor 加入 application 黑名單的閾值。達(dá)到這個閾值后,該節(jié)點會進(jìn)入 application 黑名單,加入時間超過 spark.blacklist.timeout 后,自動從黑名單中移除。值得注意的是,如果開啟了 dynamic allocation,該節(jié)點上的 executor 可能會由于空閑時間過長被回收。 |
| spark.blacklist.killBlacklistedExecutors | false | 如果開啟該配置,spark 會自動關(guān)閉并重啟加入黑名單的 executor,如果整個節(jié)點都加入了黑名單,則該節(jié)點上的所有 executor 都會被關(guān)閉。 |
| spark.blacklist.application.fetchFailure.enabled | false | 如果開啟該配置,當(dāng)發(fā)生 fetch failure時,立即將該 executor 加入到黑名單。要是開啟了 external shuffle service,整個節(jié)點都會被加入黑名單。 |
實現(xiàn)細(xì)節(jié)
因為是實驗性質(zhì)的功能,所以代碼可能會隨時變動。
只貼出部分核心代碼。
TaskSetBlacklist
黑名單賬本:
//k:executor v:該executor上每個 task 的失敗情況(task失敗的次數(shù)和最近一次失敗時間) val execToFailures = new HashMap[String, ExecutorFailuresInTaskSet]()//k:節(jié)點,v:該節(jié)點上有失敗任務(wù)的 executor private val nodeToExecsWithFailures = new HashMap[String, HashSet[String]]() //k:節(jié)點, v:該節(jié)點上加入黑名單的 taskId private val nodeToBlacklistedTaskIndexes = new HashMap[String, HashSet[Int]]()//加入黑名單的 executor private val blacklistedExecs = new HashSet[String]() //加入黑名單的 node private val blacklistedNodes = new HashSet[String]() // 判斷 executor 是否加入了給定 task 的黑名單 def isExecutorBlacklistedForTask(executorId: String, index: Int): Boolean = {execToFailures.get(executorId).exists { execFailures =>execFailures.getNumTaskFailures(index) >= MAX_TASK_ATTEMPTS_PER_EXECUTOR} }//判斷 node 是否加入了給定 task 的黑名單 def isNodeBlacklistedForTask(node: String, index: Int): Boolean = {nodeToBlacklistedTaskIndexes.get(node).exists(_.contains(index)) }當(dāng)有task失敗時,TaskSetManager 會調(diào)用更新黑名單的操作:
閾值參數(shù):
- MAX_TASK_ATTEMPTS_PER_EXECUTOR:每個 executor 上最大的任務(wù)重試次數(shù)
- MAX_TASK_ATTEMPTS_PER_NODE:每個 node 上最大的任務(wù)重試次數(shù)
- MAX_FAILURES_PER_EXEC_STAGE:一個 stage 中,每個executor 上最多任務(wù)失敗次數(shù)
- MAX_FAILED_EXEC_PER_NODE_STAGE:一個 stage 中,每個節(jié)點上 executor 的最多失敗次數(shù)
BlacklistTracker
實現(xiàn)原理和TaskSetBlacklist,下文就不再貼出黑名單判斷,黑名單對象等代碼。
與 TaskSetBlacklist 不同的是,在一個 taskSet 完全成功之前,BlacklistTracker 無法獲取到任務(wù)失敗的情況。
核心代碼:
當(dāng)一個 taskSet 執(zhí)行成功時會調(diào)用以下代碼,流程如下:
- 將 executor 及其對應(yīng)的到期時間加入到 application 的黑名單中,從executor失敗列表中移除該 executor,并更新 nextExpiryTime,用于下次啟動任務(wù)的時候判斷黑名單是否已到期
- 根據(jù) spark.blacklist.killBlacklistedExecutors 判斷是否要殺死 executor
- 更新 node 上的 executor 失敗次數(shù)
- 如果一個節(jié)點上的 executor 的失敗次數(shù)大于等于閾值并且不在黑名單中
- 將 node 及其對應(yīng)的到期時間加入到 application 的黑名單中
- 如果開啟了 spark.blacklist.killBlacklistedExecutors,則將此 node 上的所有 executor 殺死
- BLACKLIST_TIMEOUT_MILLIS:加入黑名單后的過期時間
- MAX_FAILURES_PER_EXEC:每個executor上最多的task失敗次數(shù)
- MAX_FAILED_EXEC_PER_NODE: 每個節(jié)點上加入黑名單的executor的最大數(shù)量
黑名單判斷的時機
一個 stage 提交的調(diào)用鏈:
TaskSchedulerImpl.submitTasks ->
CoarseGrainedSchedulerBackend.reviveOffers ->
CoarseGrainedSchedulerBackend.makeOffers ->
TaskSchedulerImpl.resourceOffers ->
TaskSchedulerImpl.resourceOfferSingleTaskSet ->
CoarseGrainedSchedulerBackend.launchTasks
appliaction 級別的黑名單在 TaskSchedulerImpl.resourceOffers 中完成判斷,stage/task 級別的黑名單在 TaskSchedulerImpl.resourceOfferSingleTaskSet 中完成判斷。
如果所有的節(jié)點都被加入了黑名單?
如果將task的重試次數(shù)設(shè)置的比較高,有可能會出現(xiàn)這個問題,這個時候。將會中斷這個 stage 的執(zhí)行
TaskSchedulerImpl.resourceOffers
if (!launchedAnyTask) {taskSet.abortIfCompletelyBlacklisted(hostToExecutors) }結(jié)語
簡單的來說,對于一個 application ,提供了三種級別的黑名單可以用于 executor/node: task blacklist -> stage blacklist -> application blacklist
通過這些黑名單的設(shè)置可以避免由于 task 反復(fù)調(diào)度在有問題的 executor/node (壞盤,磁盤滿了,shuffle fetch 失敗,環(huán)境錯誤等)上,進(jìn)而導(dǎo)致整個 Application 運行失敗的情況。
tip: BlacklistTracker.updateBlacklistForFetchFailure 在當(dāng)前版本(2.3.0)存在BUG SPARK-24021,將在 2.3.1 進(jìn)行修復(fù)。如果打開了 spark.blacklist.application.fetchFailure.enabled 配置將會受到影響。
總結(jié)
以上是生活随笔為你收集整理的提高spark任务稳定性1 - Blacklist 机制的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 120亿光年外发现大量水:为地球储量14
- 下一篇: 基带信号与载波信号