Understanding Spark 2.1 Core in Depth (6): Standalone Mode — Principles and Source Code Analysis
Starting the AppClient
The call stack is as follows:
- StandaloneSchedulerBackend.start
- StandaloneAppClient.start
- StandaloneAppClient.ClientEndpoint.onStart
- StandaloneAppClient.registerWithMaster
- StandaloneAppClient.tryRegisterAllMasters
- StandaloneAppClient.registerWithMaster
- StandaloneAppClient.ClientEndpoint.onStart
- StandaloneAppClient.start
- Master.receive
- Master.createApplication
- Master.registerApplication
- StandaloneAppClient.ClientEndpoint.receive
StandaloneSchedulerBackend.start
In Standalone mode, the backend inside SparkContext is StandaloneSchedulerBackend. In StandaloneSchedulerBackend.start we can see:
```scala
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
val initialExecutorLimit =
  if (Utils.isDynamicAllocationEnabled(conf)) {
    Some(0)
  } else {
    None
  }
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
  appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
// create the AppClient
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
// start the AppClient
client.start()
```
StandaloneAppClient.start
```scala
def start() {
  // create the ClientEndpoint, which triggers its onStart
  endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
}
```
StandaloneAppClient.ClientEndpoint.onStart
It calls registerWithMaster:
```scala
override def onStart(): Unit = {
  try {
    registerWithMaster(1)
  } catch {
    case e: Exception =>
      logWarning("Failed to connect to master", e)
      markDisconnected()
      stop()
  }
}
```
StandaloneAppClient.registerWithMaster
```scala
private def registerWithMaster(nthRetry: Int) {
  // register the current app with all masters;
  // once one master connects successfully, the other attempts are cancelled
  registerMasterFutures.set(tryRegisterAllMasters())
  registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
    override def run(): Unit = {
      if (registered.get) {
        registerMasterFutures.get.foreach(_.cancel(true))
        registerMasterThreadPool.shutdownNow()
      } else if (nthRetry >= REGISTRATION_RETRIES) {
        // if the maximum number of retries (3 by default) is reached, mark the client as dead
        markDead("All masters are unresponsive! Giving up.")
      } else {
        registerMasterFutures.get.foreach(_.cancel(true))
        registerWithMaster(nthRetry + 1)
      }
    }
  }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
}
```
StandaloneAppClient.tryRegisterAllMasters
It sends the RegisterApplication message to the Master:
```scala
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  for (masterAddress <- masterRpcAddresses) yield {
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = try {
        if (registered.get) {
          return
        }
        logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
        val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
        masterRef.send(RegisterApplication(appDescription, self))
      } catch {
        case ie: InterruptedException => // Cancelled
        case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
      }
    })
  }
}
```
Master.receive
Master.receive receives and handles the RegisterApplication message:
```scala
case RegisterApplication(description, driver) =>
  // if this Master is in STANDBY state
  if (state == RecoveryState.STANDBY) {
    // ignore
  } else {
    logInfo("Registering app " + description.name)
    // create the app
    val app = createApplication(description, driver)
    // register the app
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    // persist it
    persistenceEngine.addApplication(app)
    // reply with the RegisteredApplication message
    driver.send(RegisteredApplication(app.id, self))
    // resource scheduling
    schedule()
  }
```
Let's take a closer look at how the Master registers an app.
Master.createApplication
First, the app is created:
```scala
private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef):
    ApplicationInfo = {
  val now = System.currentTimeMillis()
  val date = new Date(now)
  // generate the appId from the date
  val appId = newApplicationId(date)
  // build the app info from the timestamp, appId, description, date, driver and default cores
  new ApplicationInfo(now, appId, desc, date, driver, defaultCores)
}
```
Master.registerApplication
Then it is registered:
```scala
private def registerApplication(app: ApplicationInfo): Unit = {
  // if an app is already registered at this address, return
  val appAddress = app.driver.address
  if (addressToApp.contains(appAddress)) {
    logInfo("Attempted to re-register application at same address: " + appAddress)
    return
  }
  // register the appSource with applicationMetricsSystem
  applicationMetricsSystem.registerSource(app.appSource)
  // add the app to the set: HashSet[ApplicationInfo]
  apps += app
  // update id -> app: HashMap[String, ApplicationInfo]
  idToApp(app.id) = app
  // update endpoint -> app: HashMap[RpcEndpointRef, ApplicationInfo]
  endpointToApp(app.driver) = app
  // update address -> app: HashMap[RpcAddress, ApplicationInfo]
  addressToApp(appAddress) = app
  // add it to the waiting list: ArrayBuffer[ApplicationInfo]
  waitingApps += app
  if (reverseProxy) {
    webUi.addProxyTargets(app.id, app.desc.appUiUrl)
  }
}
```
StandaloneAppClient.ClientEndpoint.receive
```scala
case RegisteredApplication(appId_, masterRef) =>
  // The Spark comment notes two deficiencies in this code:
  // 1. A master may receive multiple registration requests and reply with
  //    multiple RegisteredApplication messages, e.g. when the network is unstable.
  // 2. If the master is changing, multiple RegisteredApplication messages may arrive.
  // set the appId
  appId.set(appId_)
  // mark as registered
  registered.set(true)
  // record the master
  master = Some(masterRef)
  // notify the listener that we are connected
  listener.connected(appId.get)
```
Logical Resource Scheduling
As we saw in the previous section, the last line of Master.receive when it handles the RegisterApplication message is:
```scala
// resource scheduling
schedule()
```
Now let's walk through resource scheduling.
The call stack is as follows:
- Master.schedule
- Master.startExecutorsOnWorkers
- Master.scheduleExecutorsOnWorkers
- Master.allocateWorkerResourceToExecutors
- Master.startExecutorsOnWorkers
Master.schedule
This method schedules resources among the waiting apps. It is called every time a new app joins or the available resources change:
```scala
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) {
    return
  }
  // get the alive workers and shuffle them
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  // number of alive workers
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  // allocate resources to drivers.
  // the scheduling policy is FIFO: a driver that arrives earlier
  // gets its resource requirements satisfied first
  for (driver <- waitingDrivers.toList) {
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  // launch executors on the workers
  startExecutorsOnWorkers()
}
```
Master.startExecutorsOnWorkers
Next, let's look at how executors are launched:
```scala
private def startExecutorsOnWorkers(): Unit = {
  // FIFO scheduling again
  for (app <- waitingApps if app.coresLeft > 0) {
    val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
    // filter out workers that do not have enough resources to launch an executor,
    // then sort by free cores in descending order
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse
    // schedule executors on the workers:
    // decide how many cores each worker contributes to this app
    val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
    // allocate
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
    }
  }
}
```
Master.scheduleExecutorsOnWorkers
Next we come to the core logic: scheduling executor resources on the workers. Before looking at the current Spark code, let's see how this part was implemented before Spark 1.4:
```scala
val numUsable = usableWorkers.length
// record how many cores have been assigned on each worker
val assigned = new Array[Int](numUsable)
var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
var pos = 0
while (toAssign > 0) {
  // iterate over the workers; if the current worker still has free resources,
  // assign one core from it.
  // keep going until all worker resources are handed out,
  // or the app's demand is satisfied.
  if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
    toAssign -= 1
    assigned(pos) += 1
  }
  pos = (pos + 1) % numUsable
}
```
This code was changed in Spark 1.4. Think about it for a moment: what is wrong with the code above?
The problem is that cores are allocated one at a time. Imagine a cluster with 4 workers, each with 16 cores. A user wants to launch 3 executors, each with 16 cores, so they would configure:
```
spark.cores.max = 48
spark.executor.cores = 16
```
Clearly the cluster has enough resources to satisfy this request. But if only one core can be allocated at a time, the end result is that each worker gets 12 cores assigned. Since 12 < 16, not a single executor can be launched.
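To make the failure mode concrete, here is a minimal standalone Scala sketch (a toy reproduction, not Spark source; the worker and core counts simply follow the example above) of the pre-1.4 one-core-at-a-time loop:

```scala
// Toy version of the pre-1.4 allocation loop:
// 4 workers x 16 free cores, spark.cores.max = 48.
object OldCoreAllocation extends App {
  val coresFree = Array(16, 16, 16, 16)               // free cores per worker
  val assigned  = new Array[Int](coresFree.length)    // cores handed to the app
  var toAssign  = math.min(48, coresFree.sum)         // app.coresLeft vs. cluster total

  var pos = 0
  while (toAssign > 0) {
    // hand out one core at a time, round-robin over the workers
    if (coresFree(pos) - assigned(pos) > 0) {
      toAssign -= 1
      assigned(pos) += 1
    }
    pos = (pos + 1) % coresFree.length
  }

  println(assigned.mkString(", "))  // 12, 12, 12, 12 -> no worker can host a 16-core executor
}
```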
Now let's go back and look at how this logic is implemented in the current source:
```scala
private def scheduleExecutorsOnWorkers(
    app: ApplicationInfo,
    usableWorkers: Array[WorkerInfo],
    spreadOutApps: Boolean): Array[Int] = {
  val coresPerExecutor = app.desc.coresPerExecutor
  val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
  val oneExecutorPerWorker = coresPerExecutor.isEmpty
  val memoryPerExecutor = app.desc.memoryPerExecutorMB
  val numUsable = usableWorkers.length
  // record how many cores have been assigned on each worker
  val assignedCores = new Array[Int](numUsable)
  // record how many executors have been assigned on each worker
  val assignedExecutors = new Array[Int](numUsable)
  // total remaining cores to assign
  var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

  // whether an executor can be launched on this worker
  def canLaunchExecutor(pos: Int): Boolean = {
    // omitted here; expanded below
  }

  // filter the workers that can launch an executor
  var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
  // keep scheduling until no worker can take another executor
  while (freeWorkers.nonEmpty) {
    freeWorkers.foreach { pos =>
      var keepScheduling = true
      while (keepScheduling && canLaunchExecutor(pos)) {
        // minCoresPerExecutor is the user-set spark.executor.cores
        coresToAssign -= minCoresPerExecutor
        assignedCores(pos) += minCoresPerExecutor
        // if the user did not set spark.executor.cores,
        // oneExecutorPerWorker is true, i.e. all cores in assignedCores
        // belong to one executor;
        // if spark.executor.cores is set,
        // the worker's assignedExecutors is incremented by 1
        if (oneExecutorPerWorker) {
          assignedExecutors(pos) = 1
        } else {
          assignedExecutors(pos) += 1
        }
        // there are two allocation strategies:
        // 1. spread out: distribute an app across as many nodes as possible,
        //    which makes better use of the cluster's resources.
        //    In this case spreadOutApps is true, so after assigning one executor
        //    on this worker we stop and move on to the next worker.
        // 2. consolidate: pack an app onto as few nodes as possible,
        //    which suits CPU-intensive apps with modest memory usage.
        //    In this case spreadOutApps is false, so we keep assigning
        //    executors on this worker in the next iteration.
        if (spreadOutApps) {
          keepScheduling = false
        }
      }
    }
    freeWorkers = freeWorkers.filter(canLaunchExecutor)
  }
  assignedCores
}
```
Next, look at the nested function canLaunchExecutor:
```scala
def canLaunchExecutor(pos: Int): Boolean = {
  // condition 1: the cluster's remaining cores >= spark.executor.cores
  val keepScheduling = coresToAssign >= minCoresPerExecutor
  // condition 2: the worker's remaining cores >= spark.executor.cores
  val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
  // condition 3: spark.executor.cores is set,
  // or this worker has not been assigned an executor yet
  val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
  if (launchingNewExecutor) {
    val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
    // condition 4: the worker's remaining memory >= spark.executor.memory
    val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
    // condition 5: after adding this executor,
    // the total number of executors stays below the app's executor limit
    val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
    // if condition 3 holds,
    // return true only when conditions 1, 2, 4 and 5 all hold
    keepScheduling && enoughCores && enoughMemory && underLimit
  } else {
    // if condition 3 does not hold,
    // i.e. this worker carries a single executor,
    // return true when conditions 1 and 2 hold.
    // In this case assignedExecutors is not incremented.
    keepScheduling && enoughCores
  }
}
```
From the source above we can see clearly that the new logic introduced in Spark 1.4 simply changes the allocation unit from a single core to a whole executor (i.e. spark.executor.cores cores at a time). When a worker carries only one executor (i.e. spark.executor.cores is not set), it falls back to the original per-core behaviour.
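For comparison, here is an equally simplified sketch of the executor-granularity allocation applied to the same example (again a toy: the memory and executor-limit checks from canLaunchExecutor are left out):

```scala
// Simplified post-1.4 allocation: cores are handed out in chunks of spark.executor.cores.
// 4 workers x 16 free cores, spark.cores.max = 48, spark.executor.cores = 16.
object NewCoreAllocation extends App {
  val coresFree           = Array(16, 16, 16, 16)
  val assignedCores       = new Array[Int](coresFree.length)
  val minCoresPerExecutor = 16                        // spark.executor.cores
  var coresToAssign       = math.min(48, coresFree.sum)
  val spreadOutApps       = true

  // only the core-related conditions of canLaunchExecutor are modelled here
  def canLaunchExecutor(pos: Int): Boolean =
    coresToAssign >= minCoresPerExecutor &&
      coresFree(pos) - assignedCores(pos) >= minCoresPerExecutor

  var freeWorkers = coresFree.indices.filter(canLaunchExecutor)
  while (freeWorkers.nonEmpty) {
    freeWorkers.foreach { pos =>
      var keepScheduling = true
      while (keepScheduling && canLaunchExecutor(pos)) {
        coresToAssign -= minCoresPerExecutor
        assignedCores(pos) += minCoresPerExecutor
        if (spreadOutApps) keepScheduling = false     // move on to the next worker
      }
    }
    freeWorkers = freeWorkers.filter(canLaunchExecutor)
  }

  println(assignedCores.mkString(", "))  // 16, 16, 16, 0
}
```

Three workers each receive a full 16-core executor and the fourth is left untouched, so the user's request can actually be satisfied.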
It is worth noting that:
```scala
// keep scheduling until no worker can take another executor
while (freeWorkers.nonEmpty)
```
An app will consume as much of the cluster's resources as it can, so setting the spark.cores.max parameter is essential!
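As an illustration of where these caps are set on the application side, here is a minimal sketch; the app name, master URL and values are made-up examples, not taken from this post:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical settings: without spark.cores.max the app would try to grab
// every free core in the standalone cluster.
val conf = new SparkConf()
  .setAppName("capped-app")
  .setMaster("spark://master-host:7077")   // assumption: your standalone master URL
  .set("spark.executor.cores", "4")        // cores per executor
  .set("spark.cores.max", "16")            // at most 16 cores in total -> 4 executors

val sc = new SparkContext(conf)
```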
Master.allocateWorkerResourceToExecutors
Now let's return to Master.startExecutorsOnWorkers mentioned above and dive into allocateWorkerResourceToExecutors:
```scala
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // number of executors on this worker;
  // 1 if spark.executor.cores is not set
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  // cores given to each executor;
  // if spark.executor.cores is not set, all cores assigned on this worker
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    // create the executor description,
    // add it to the app info and return it
    val exec = app.addExecutor(worker, coresToAssign)
    // launch it
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
  }
}
```
Note that
```scala
app.state = ApplicationState.RUNNING
```
does not remove the app from the waitingApps queue. If the app did not get enough executors in this round of scheduling, then the next time cluster resources change, scheduling runs again, the app is visited again in waitingApps, and its coresLeft is still greater than 0.
```scala
for (app <- waitingApps if app.coresLeft > 0)
```
Let's run a small experiment here:
- Our test cluster has 4 workers with 8 cores each:
- For the first app, we request 4 executors, each with 4 cores:
We can see the cluster resources (screenshots shown in the original post):
app1's executors:
- For the second app, we request 4 executors, each with 6 cores:
The cluster resources:
app2's executors:
We can see that Spark allocated only 3 executors to app2.
- When we exit app1,
the cluster resource status becomes:
app2's executors:
We find that a new executor on "worker-20170102151129" has been added.
In fact, as long as an app in the cluster has not finished, it remains in waitingApps; only when the app finishes is it removed from waitingApps:
```scala
def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
  // ...
  waitingApps -= app
  // ...
}
```
Physical Resource Scheduling and Launching Executors
Next, once the logical resource scheduling is done, let's look at the physical resource scheduling, i.e. actually launching the executors.
The call stack is as follows:
- Master.launchExecutor
- Worker.receive
- ExecutorRunner.start
- ExecutorRunner.fetchAndRunExecutor
- ExecutorRunner.start
- CoarseGrainedExecutorBackend.main
- CoarseGrainedExecutorBackend.run
- CoarseGrainedExecutorBackend.onStart
- CoarseGrainedExecutorBackend.run
- CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply
- CoarseGrainedExecutorBackend.receive
Master.launchExecutor
```scala
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  // add the executor info to the worker info
  worker.addExecutor(exec)
  // send the LaunchExecutor message to the worker
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  // send the ExecutorAdded message to the driver
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
```
Worker.receive
The worker handles the LaunchExecutor message as follows:
```scala
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

      // create the executor's working directory;
      // shuffle results persisted to disk live under this directory.
      // the disks on a node should be as equal in size as possible,
      // and SPARK_WORKER_DIR should be configured on every disk for better IO performance
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }

      // create local dirs for the app;
      // they are deleted when the app finishes
      val appLocalDirs = appDirectories.getOrElse(appId,
        Utils.getOrCreateLocalRootDirs(conf).map { dir =>
          val appDir = Utils.createDirectory(dir, namePrefix = "executor")
          Utils.chmod700(appDir)
          appDir.getAbsolutePath()
        }.toSeq)
      appDirectories(appId) = appLocalDirs
      // create the ExecutorRunner
      val manager = new ExecutorRunner(
        appId,
        execId,
        appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
        cores_,
        memory_,
        self,
        workerId,
        host,
        webUi.boundPort,
        publicAddress,
        sparkHome,
        executorDir,
        workerUri,
        conf,
        appLocalDirs, ExecutorState.RUNNING)
      executors(appId + "/" + execId) = manager
      // start the ExecutorRunner
      manager.start()
      coresUsed += cores_
      memoryUsed += memory_
      // send the ExecutorStateChanged message to the Master
      sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
    } catch {
      case e: Exception =>
        logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
        if (executors.contains(appId + "/" + execId)) {
          executors(appId + "/" + execId).kill()
          executors -= appId + "/" + execId
        }
        sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
          Some(e.toString), None))
    }
  }
```
ExecutorRunner.start
Next, let's look into ExecutorRunner:
```scala
private[worker] def start() {
  // create the worker thread
  workerThread = new Thread("ExecutorRunner for " + fullId) {
    override def run() { fetchAndRunExecutor() }
  }
  // start the worker thread
  workerThread.start()
  // register a shutdown hook,
  // used to kill the executor when the worker shuts down
  shutdownHook = ShutdownHookManager.addShutdownHook { () =>
    if (state == ExecutorState.RUNNING) {
      state = ExecutorState.FAILED
    }
    killProcess(Some("Worker shutting down"))
  }
}
```
ExecutorRunner.fetchAndRunExecutor
The workerThread mainly calls fetchAndRunExecutor; let's look at this method:
```scala
private def fetchAndRunExecutor() {
  try {
    // create the process builder
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
      memory, sparkHome.getAbsolutePath, substituteVariables)
    val command = builder.command()
    val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
    logInfo(s"Launch command: $formattedCommand")

    // set the process builder's working directory
    builder.directory(executorDir)
    // set environment variables for the process builder
    builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
    builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

    val baseUrl =
      if (conf.getBoolean("spark.ui.reverseProxy", false)) {
        s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
      } else {
        s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
      }
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

    // start the process builder, i.e. create the process
    process = builder.start()
    val header = "Spark Executor Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)

    // redirect its stdout and stderr to files
    val stdout = new File(executorDir, "stdout")
    stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

    val stderr = new File(executorDir, "stderr")
    Files.write(header, stderr, StandardCharsets.UTF_8)
    stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

    // wait for the process to exit.
    // when the driver tells the process to exit,
    // the executor exits and returns 0 or a non-zero exit code
    val exitCode = process.waitFor()
    state = ExecutorState.EXITED
    val message = "Command exited with code " + exitCode
    // send the ExecutorStateChanged message to the Worker
    worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
  } catch {
    case interrupted: InterruptedException =>
      logInfo("Runner thread for executor " + fullId + " interrupted")
      state = ExecutorState.KILLED
      killProcess(None)
    case e: Exception =>
      logError("Error running executor", e)
      state = ExecutorState.FAILED
      killProcess(Some(e.toString))
  }
}
```
CoarseGrainedExecutorBackend.main
The process the builder starts is a CoarseGrainedExecutorBackend instance; let's look at its main function:
```scala
def main(args: Array[String]) {
  var driverUrl: String = null
  var executorId: String = null
  var hostname: String = null
  var cores: Int = 0
  var appId: String = null
  var workerUrl: Option[String] = None
  val userClassPath = new mutable.ListBuffer[URL]()

  // parse the arguments
  var argv = args.toList
  while (!argv.isEmpty) {
    argv match {
      case ("--driver-url") :: value :: tail =>
        driverUrl = value
        argv = tail
      case ("--executor-id") :: value :: tail =>
        executorId = value
        argv = tail
      case ("--hostname") :: value :: tail =>
        hostname = value
        argv = tail
      case ("--cores") :: value :: tail =>
        cores = value.toInt
        argv = tail
      case ("--app-id") :: value :: tail =>
        appId = value
        argv = tail
      case ("--worker-url") :: value :: tail =>
        workerUrl = Some(value)
        argv = tail
      case ("--user-class-path") :: value :: tail =>
        userClassPath += new URL(value)
        argv = tail
      case Nil =>
      case tail =>
        System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
        printUsageAndExit()
    }
  }

  if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
    appId == null) {
    printUsageAndExit()
  }

  // call run
  run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
  System.exit(0)
}
```
CoarseGrainedExecutorBackend.run
```scala
private def run(
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    appId: String,
    workerUrl: Option[String],
    userClassPath: Seq[URL]) {

  Utils.initDaemon(log)

  SparkHadoopUtil.get.runAsSparkUser { () =>
    Utils.checkHost(hostname)

    val executorConf = new SparkConf
    val port = executorConf.getInt("spark.executor.port", 0)
    val fetcher = RpcEnv.create(
      "driverPropsFetcher",
      hostname,
      port,
      executorConf,
      new SecurityManager(executorConf),
      clientMode = true)
    val driver = fetcher.setupEndpointRefByURI(driverUrl)
    // send the RetrieveSparkAppConfig message to the driver
    // and build the properties from the reply
    val cfg = driver.askWithRetry[SparkAppConfig](RetrieveSparkAppConfig)
    val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
    fetcher.shutdown()

    // create the SparkEnv from these properties
    val driverConf = new SparkConf()
    for ((key, value) <- props) {
      if (SparkConf.isExecutorStartupConf(key)) {
        driverConf.setIfMissing(key, value)
      } else {
        driverConf.set(key, value)
      }
    }
    if (driverConf.contains("spark.yarn.credentials.file")) {
      logInfo("Will periodically update credentials from: " +
        driverConf.get("spark.yarn.credentials.file"))
      SparkHadoopUtil.get.startCredentialUpdater(driverConf)
    }

    val env = SparkEnv.createExecutorEnv(
      driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)

    // create the CoarseGrainedExecutorBackend endpoint
    env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
      env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
    // create the WorkerWatcher endpoint,
    // which sends heartbeats to the worker
    // to tell it this process is still alive
    workerUrl.foreach { url =>
      env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
    }
    env.rpcEnv.awaitTermination()
    SparkHadoopUtil.get.stopCredentialUpdater()
  }
}
```
CoarseGrainedExecutorBackend.onStart
new CoarseGrainedExecutorBackend triggers CoarseGrainedExecutorBackend.onStart:
```scala
override def onStart() {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // send the RegisterExecutor message to the driver
    driver = Some(ref)
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    case Success(msg) =>
    case Failure(e) =>
      exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
  }(ThreadUtils.sameThread)
}
```
CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply
```scala
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
  if (executorDataMap.contains(executorId)) {
    executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
    context.reply(true)
  } else {
    // record the executor's information
    val executorAddress = if (executorRef.address != null) {
      executorRef.address
    } else {
      context.senderAddress
    }
    logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
    addressToExecutorId(executorAddress) = executorId
    totalCoreCount.addAndGet(cores)
    totalRegisteredExecutors.addAndGet(1)
    val data = new ExecutorData(executorRef, executorRef.address, hostname,
      cores, cores, logUrls)
    CoarseGrainedSchedulerBackend.this.synchronized {
      executorDataMap.put(executorId, data)
      if (currentExecutorIdCounter < executorId.toInt) {
        currentExecutorIdCounter = executorId.toInt
      }
      if (numPendingExecutors > 0) {
        numPendingExecutors -= 1
        logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
      }
    }
    // send the RegisteredExecutor message to the executor
    executorRef.send(RegisteredExecutor)
    context.reply(true)
    listenerBus.post(
      SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
    makeOffers()
  }
```
The logic of makeOffers() was already covered in "Understanding Spark 2.1 Core in Depth (3): Task Scheduler Principles and Source Code Analysis". It mainly schedules tasks and sends them to the executors.
CoarseGrainedExecutorBackend.receive
After CoarseGrainedExecutorBackend receives the RegisteredExecutor message from the driver:
```scala
case RegisteredExecutor =>
  logInfo("Successfully registered with driver")
  try {
    // create the Executor
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
  } catch {
    case NonFatal(e) =>
      exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
  }
```
At this point, the executor has been launched successfully!
Summary
Starting from StandaloneSchedulerBackend.start, the AppClient registers the application with the Master; the Master then performs logical resource scheduling (schedule and scheduleExecutorsOnWorkers) and physically launches executors on the workers, which finally register back with the driver and are ready to run tasks.