Spark In Depth (4): Spark Components and Message Communication Principles

Published: 2025/4/16

1. Spark Core Concepts

  • Application: the Spark application written by the user, consisting of driver-side code (Driver) and Executor code distributed across many nodes in the cluster; during execution it comprises one or more jobs.
  • Driver: the process that runs the Application's main function and creates the SparkContext. The SparkContext is created to prepare the application's runtime environment; it is responsible for communicating with the Cluster Manager, requesting resources, and assigning and monitoring tasks. When the Executor side finishes running, the Driver closes the SparkContext. The SparkContext is usually taken to represent the Driver.
  • Cluster Manager: an external service for acquiring resources on the cluster. There are currently several:
  • Standalone: Spark's native resource management, in which the Master manages the resources.
  • Hadoop YARN: the ResourceManager in YARN manages the resources.
  • Mesos: the Mesos Master manages the resources.
  • Worker: any node in the cluster that can run Application code, similar to the NodeManager in YARN.
  • Master: the master process in Spark Standalone mode, responsible for managing resources and allocating them to run Spark Applications.
  • Executor: a process on a Worker node that runs an Application's Tasks and stores data in memory or on disk; each Application has its own set of Executors. In Spark on YARN mode the process is named CoarseGrainedExecutorBackend, similar to YarnChild in Hadoop MapReduce.

2. Message Communication Principles

2.1 Spark Message Communication Architecture

Spark defines a communication framework interface whose implementations call into Netty's concrete methods (before Spark 2.0, Akka was used). Within the framework, RpcEndpoint and RpcEndpointRef play the roles of Actor and ActorRef: RpcEndpointRef is a reference to an RpcEndpoint, and in message communication the sender holds the RpcEndpointRef. Their relationship is shown in the figure below:
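The endpoint/reference split described above can be sketched in a few lines. This is an illustrative Python sketch of the pattern, not Spark's actual API (all class and method names here are invented): the sender holds only a lightweight reference, and messages land in the endpoint's inbox for processing.

```python
import queue

class RpcEndpoint:
    """Receives messages; plays the role Spark's RpcEndpoint (formerly an Actor) plays."""
    def __init__(self):
        self.inbox = queue.Queue()

    def receive(self, message):
        raise NotImplementedError

    def process_one(self):
        # Take one message from the inbox and dispatch it to receive()
        self.receive(self.inbox.get())

class RpcEndpointRef:
    """A handle held by senders; plays the role of RpcEndpointRef (formerly ActorRef)."""
    def __init__(self, endpoint):
        self._endpoint = endpoint

    def send(self, message):
        # Fire-and-forget: enqueue the message on the endpoint's inbox
        self._endpoint.inbox.put(message)

class EchoEndpoint(RpcEndpoint):
    def __init__(self):
        super().__init__()
        self.seen = []

    def receive(self, message):
        self.seen.append(message)

endpoint = EchoEndpoint()
ref = RpcEndpointRef(endpoint)   # senders hold the ref, never the endpoint itself
ref.send("RegisterWorker")
endpoint.process_one()
print(endpoint.seen)  # -> ['RegisterWorker']
```

The same shape underlies the Master/Worker exchanges below: a module registers its endpoint, hands out references, and peers communicate only through those references.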

The communication framework is implemented with the factory design pattern, which decouples it from Netty and makes it possible to plug in other messaging tools as needed.

The concrete implementation proceeds as follows. First, two abstractions, RpcEnv and RpcEnvFactory, are defined: RpcEnv declares the abstract methods for starting, stopping, and shutting down the RPC framework, and RpcEnvFactory declares the abstract creation method. Then the NettyRpcEnv and NettyRpcEnvFactory classes implement the inherited methods using Netty.
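A minimal sketch of that factory arrangement (Python for illustration; the names are invented stand-ins for RpcEnv, RpcEnvFactory, and their Netty implementations): callers depend only on the abstract factory, so the transport can be swapped without touching them.

```python
from abc import ABC, abstractmethod

class RpcEnv(ABC):
    """Abstract RPC environment: endpoint setup and shutdown, mirroring the abstract class."""
    @abstractmethod
    def setup_endpoint(self, name, endpoint): ...
    @abstractmethod
    def shutdown(self): ...

class RpcEnvFactory(ABC):
    """Abstract factory: each transport supplies its own create()."""
    @abstractmethod
    def create(self, config): ...

class NettyLikeRpcEnv(RpcEnv):
    """Stand-in for a Netty-backed implementation."""
    def __init__(self, config):
        self.config = config
        self.endpoints = {}

    def setup_endpoint(self, name, endpoint):
        # Register the endpoint under its name and return the name as its "reference"
        self.endpoints[name] = endpoint
        return name

    def shutdown(self):
        self.endpoints.clear()

class NettyLikeRpcEnvFactory(RpcEnvFactory):
    def create(self, config):
        return NettyLikeRpcEnv(config)

# Callers program against RpcEnvFactory/RpcEnv only
env = NettyLikeRpcEnvFactory().create({"host": "localhost", "port": 7077})
ref_name = env.setup_endpoint("Master", object())
print(ref_name)  # -> Master
```

Swapping in a different transport then means supplying another RpcEnvFactory subclass, which is exactly the decoupling the text describes.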

In each module's implementation, such as Master and Worker, an RpcEnv instance is first created with RpcEnv's static method, and then the Master is instantiated. Because Master inherits from ThreadSafeRpcEndpoint, the created Master instance is a thread-safe endpoint. Next, RpcEnv's endpoint-setup method is called to register the Master endpoint and its reference with the RpcEnv. In message communication, any other object that obtains a reference to the Master endpoint can then send it messages. Below is the startRpcEnvAndEndpoint method in Master.scala:

/**
 * Start the Master and return a three tuple of:
 *   (1) The Master RpcEnv
 *   (2) The web UI bound port
 *   (3) The REST server bound port, if any
 */
def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
  val securityMgr = new SecurityManager(conf)
  val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
  val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
    new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
  (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}

2.2 Spark Startup Message Communication

During Spark startup the main communication is between the Master and the Workers. The message flow is as follows: a Worker node first sends a registration message to the Master, and after the Master finishes processing it returns a registration-success or registration-failure message.

The detailed process is as follows:

(1) After the Master starts, each Worker is started in turn. On startup a Worker creates its communication environment (RpcEnv) and its endpoint, and sends the Master a RegisterWorker registration message. The Worker.tryRegisterAllMasters method is as follows:

// There may be more than one Master
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  masterRpcAddresses.map { masterAddress =>
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          logInfo("Connecting to master " + masterAddress + "...")
          // Obtain a reference to the Master endpoint
          val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
          registerWithMaster(masterEndpoint)
        } catch {
          ...
        }
      }
    })
  }
}

private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
  // Send the registration message via the Master endpoint reference
  masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
      workerId, host, port, self, cores, memory, workerWebUiUrl))
    .onComplete {
      // Handle the registration success or failure result
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      case Success(msg) =>
        Utils.tryLogNonFatalError {
          handleRegisterResponse(msg)
        }
      case Failure(e) =>
        logError(s"Cannot register with master: ${masterEndpoint.address}", e)
        System.exit(1)
    }(ThreadUtils.sameThread)
}

(2) When the Master receives the message, it validates and records the information sent by the Worker. If registration succeeds, it sends a RegisteredWorker message to that Worker to confirm registration is complete, and step 3 follows: the Worker periodically sends heartbeats to the Master. If registration fails, the Master sends a RegisterWorkerFailed message, and the Worker logs the error and aborts its startup. The Master.receiveAndReply method is as follows:

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterWorker(id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
    logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
      workerHost, workerPort, cores, Utils.megabytesToString(memory)))
    // The Master is in STANDBY state
    if (state == RecoveryState.STANDBY) {
      context.reply(MasterInStandby)
    } else if (idToWorker.contains(id)) { // This Worker is already in the registration list
      context.reply(RegisterWorkerFailed("Duplicate worker ID"))
    } else {
      val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
        workerRef, workerWebUiUrl)
      // registerWorker adds the Worker to the registration list
      if (registerWorker(worker)) {
        persistenceEngine.addWorker(worker)
        context.reply(RegisteredWorker(self, masterWebUiUrl))
        schedule()
      } else {
        val workerAddress = worker.endpoint.address
        logWarning("Worker registration failed. Attempted to re-register worker at same " +
          "address: " + workerAddress)
        context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
          + workerAddress))
      }
    }
  ...
}

(3) Once a Worker has registered successfully, it periodically sends Heartbeat messages to the Master so the Master can track the Worker's live status. The timeout is configured with spark.worker.timeout; note that the heartbeat interval is one quarter of that value.
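The quarter rule means that with the default spark.worker.timeout of 60 seconds, a Worker heartbeats every 15 seconds, so the Master tolerates several missed heartbeats before expiring the Worker. A quick sketch of the arithmetic:

```python
def heartbeat_interval_seconds(worker_timeout_seconds):
    # The Worker sends a Heartbeat every quarter of spark.worker.timeout,
    # so up to three heartbeats can be lost before the Master times the Worker out
    return worker_timeout_seconds / 4

print(heartbeat_interval_seconds(60))  # -> 15.0
```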

2.3 Spark Runtime Message Communication

When a user submits an application, the application's SparkContext sends an application-registration message to the Master, and the Master allocates Executors to the application; once started, the Executors send registration-success messages back to the SparkContext.

When an RDD action is triggered in the SparkContext, the DAGScheduler divides the job into stages and converts each stage into a TaskSet; the TaskScheduler then sends execution messages to the registered Executors, which start running the tasks on receipt. Finally, when all tasks have finished running, the Driver processes the results and reclaims the resources. This is shown in the figure below:


The detailed process is as follows:

(1) During SparkContext creation, a SchedulerBackend object is instantiated first; in standalone mode this is actually a StandaloneSchedulerBackend. While starting, this object creates two endpoints: the DriverEndpoint inherited from its parent class, and the ClientEndpoint of StandaloneAppClient.

In ClientEndpoint's tryRegisterAllMasters method, a registration thread pool registerMasterThreadPool is created; registration threads started in this pool send the Master a RegisterApplication message to register the application. The code is as follows:

/**
 * Register with all masters asynchronously and returns an array `Future`s for cancellation.
 */
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  // Iterate over all Masters; this for-comprehension builds a collection
  for (masterAddress <- masterRpcAddresses) yield {
    // Start a registration thread in the thread pool; the thread exits once it reads
    // the registration flag registered == true
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = try {
        if (registered.get) { // private val registered = new AtomicBoolean(false), an atomic flag
          return
        }
        logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
        val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
        // Send the registration message
        masterRef.send(RegisterApplication(appDescription, self))
      } catch {
        case ie: InterruptedException => // Cancelled
        case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
      }
    })
  }
}

When the Master receives the application-registration message, its registerApplication method records the application information and adds the application to the waiting list, sends a RegisteredApplication success message to the ClientEndpoint, and then calls the startExecutorsOnWorkers method (via schedule) to run the application. The relevant Master code is as follows:

case RegisterApplication(description, driver) =>
  // TODO Prevent repeated registrations from some driver
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    val app = createApplication(description, driver)
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    // Persist the Application with the persistence engine
    persistenceEngine.addApplication(app)
    driver.send(RegisteredApplication(app.id, self))
    schedule()
  }

private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) {
    return
  }
  // Shuffle the Worker nodes randomly
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  // Launch Drivers in order, spreading them across different Worker nodes where possible
  for (driver <- waitingDrivers.toList) {
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  startExecutorsOnWorkers()
}

private def startExecutorsOnWorkers(): Unit = {
  // Run applications in FIFO order: the first application to register runs first
  for (app <- waitingApps if app.coresLeft > 0) {
    val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
    // Filter out workers that don't have enough resources to launch an executor
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse
    // With spreadOutApps the application is spread across as many Workers as possible;
    // otherwise it is packed onto as few Workers as possible
    val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

    // Now that we've decided how many cores to allocate on each worker, let's allocate them
    // After allocating the cores the application requested, launch executors on each worker
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
    }
  }
}
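The body of scheduleExecutorsOnWorkers is not shown above; its core idea can be sketched as follows. This is a simplified illustration, not Spark's actual implementation (the real method assigns coresPerExecutor at a time and also checks memory limits): with spread-out scheduling, cores are assigned round-robin across workers, otherwise one worker is filled before moving to the next.

```python
def schedule_executors_on_workers(cores_left, worker_free_cores, spread_out):
    """Assign cores_left cores across workers with the given free core counts.
    Returns the per-worker assignment list."""
    assigned = [0] * len(worker_free_cores)
    free = list(worker_free_cores)
    pos = 0
    while cores_left > 0 and any(f > 0 for f in free):
        if free[pos] > 0:
            assigned[pos] += 1
            free[pos] -= 1
            cores_left -= 1
            if spread_out:
                # Spread out: move to the next worker after each core
                pos = (pos + 1) % len(free)
        else:
            # This worker is exhausted; try the next one
            pos = (pos + 1) % len(free)
    return assigned

print(schedule_executors_on_workers(6, [4, 4, 4], spread_out=True))   # -> [2, 2, 2]
print(schedule_executors_on_workers(6, [4, 4, 4], spread_out=False))  # -> [4, 2, 0]
```

The two outputs show the trade-off the comment in startExecutorsOnWorkers refers to: spread-out favors data locality across the cluster, pack-in favors fewer, larger allocations.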

(2) When StandaloneAppClient.ClientEndpoint receives the RegisteredApplication message from the Master, it sets the registration flag registered to true. The code is as follows:

case RegisteredApplication(appId_, masterRef) =>
  appId.set(appId_)
  registered.set(true)
  master = Some(masterRef)
  listener.connected(appId.get)

(3) When the Master's startExecutorsOnWorkers method allocates resources to run the application, it calls the allocateWorkerResourceToExecutors method to launch Executors on Workers. When a Worker receives the LaunchExecutor message from the Master, it first instantiates an ExecutorRunner object. During startup the ExecutorRunner creates a ProcessBuilder, which uses the command to create a CoarseGrainedExecutorBackend object, the container in which the Executor runs. Finally, the Worker sends an ExecutorStateChanged message to the Master to report that the Executor container has been created.

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

      // Create the executor's working directory
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }

      // Create the executor's local directories; the worker deletes them when the
      // application finishes
      val appLocalDirs = appDirectories.getOrElse(appId,
        Utils.getOrCreateLocalRootDirs(conf).map { dir =>
          val appDir = Utils.createDirectory(dir, namePrefix = "executor")
          Utils.chmod700(appDir)
          appDir.getAbsolutePath()
        }.toSeq)
      appDirectories(appId) = appLocalDirs

      // The ExecutorRunner creates the CoarseGrainedExecutorBackend object using the command
      // from the application description; that command is built in the start method of
      // StandaloneSchedulerBackend
      val manager = new ExecutorRunner(
        appId,
        execId,
        appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
        cores_,
        memory_,
        self,
        workerId,
        host,
        webUi.boundPort,
        publicAddress,
        sparkHome,
        executorDir,
        workerUri,
        conf,
        appLocalDirs,
        ExecutorState.RUNNING)
      executors(appId + "/" + execId) = manager
      manager.start() // Start the ExecutorRunner
      coresUsed += cores_
      memoryUsed += memory_
      sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
    } catch {
      ...
    }
  }

The ExecutorRunner does this in its fetchAndRunExecutor method. The command it runs is defined in StandaloneSchedulerBackend and specifies constructing the Executor container, CoarseGrainedExecutorBackend. The code is as follows:

private def fetchAndRunExecutor() {
  try {
    // Create the process builder from the application description and environment config
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
      memory, sparkHome.getAbsolutePath, substituteVariables)
    val command = builder.command()
    val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
    logInfo(s"Launch command: $formattedCommand")

    // Add the execution directory and other information to the builder
    builder.directory(executorDir)
    builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
    builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

    // Add webUI log urls
    val baseUrl =
      s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

    // Start the builder, creating the CoarseGrainedExecutorBackend instance
    process = builder.start()
    val header = "Spark Executor Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)

    // Capture the CoarseGrainedExecutorBackend instance's output
    val stdout = new File(executorDir, "stdout")
    stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
    val stderr = new File(executorDir, "stderr")
    Files.write(header, stderr, StandardCharsets.UTF_8)
    stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

    // Wait for CoarseGrainedExecutorBackend to finish; when it exits, send the exit
    // status to the Worker
    val exitCode = process.waitFor()
    state = ExecutorState.EXITED
    val message = "Command exited with code " + exitCode
    worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
  } catch {
    ...
  }
}

(4) The Master receives the ExecutorStateChanged message sent by the Worker. The code is as follows:

case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
  // Find the app the executor belongs to, then look up the executor info
  // in the app's internal cache via flatMap
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
  execOption match {
    case Some(exec) =>
      // Set the executor's current state
      val appInfo = idToApp(appId)
      val oldState = exec.state
      exec.state = state
      if (state == ExecutorState.RUNNING) {
        assert(oldState == ExecutorState.LAUNCHING,
          s"executor $execId state transfer from $oldState to RUNNING is illegal")
        appInfo.resetRetryCount()
      }
      // Send an ExecutorUpdated message to the Driver
      exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))
      ...

(5) In the onStart method of the CoarseGrainedExecutorBackend launched in step 3, a RegisterExecutor message is sent to the DriverEndpoint to register the Executor. The DriverEndpoint first checks whether the Executor is already registered, allocates task-execution resources in the makeOffers() method, and finally sends LaunchTask messages to run tasks.

case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
  if (executorDataMap.contains(executorId)) {
    executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
    context.reply(true)
  } else {
    ...
    // Record the executor ID and the number of cores it will use
    addressToExecutorId(executorAddress) = executorId
    totalCoreCount.addAndGet(cores)
    totalRegisteredExecutors.addAndGet(1)
    val data = new ExecutorData(executorRef, executorRef.address, hostname,
      cores, cores, logUrls)
    // Record the executor ID and its details in a key-value map
    CoarseGrainedSchedulerBackend.this.synchronized {
      executorDataMap.put(executorId, data)
      if (currentExecutorIdCounter < executorId.toInt) {
        currentExecutorIdCounter = executorId.toInt
      }
      if (numPendingExecutors > 0) {
        numPendingExecutors -= 1
        logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
      }
    }
    // Reply that registration is complete and post an executor-added event
    // on the listener bus
    executorRef.send(RegisteredExecutor)
    context.reply(true)
    listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
    // Allocate task resources and send LaunchTask messages to run tasks
    makeOffers()
  }

(6) When CoarseGrainedExecutorBackend receives the RegisteredExecutor registration-success message, it instantiates an Executor object inside the CoarseGrainedExecutorBackend container. Once started, the Executor sends periodic heartbeats to the Driver and waits for task-execution messages from the DriverEndpoint. The registration-success handling in CoarseGrainedExecutorBackend is as follows:

// Registration with the driver succeeded; a RegisteredExecutor message came back
case RegisteredExecutor =>
  logInfo("Successfully registered with driver")
  try {
    // Create the Executor; it sends periodic heartbeats to the Driver and waits for tasks
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
  } catch {
    ...
  }

(7) After the Executor in CoarseGrainedExecutorBackend starts, it receives the LaunchTask task-execution messages sent by the DriverEndpoint. Task execution is implemented in the Executor's launchTask method: a TaskRunner is created for each task and handles it, and when processing finishes a StatusUpdate message is sent back to CoarseGrainedExecutorBackend. Task execution and result retrieval are covered in a later part.

def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  // Create a TaskRunner for each task
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  // Put the TaskRunner into the in-memory cache
  runningTasks.put(taskId, tr)
  // Submit the TaskRunner to the thread pool, where it queues automatically
  threadPool.execute(tr)
}

(8) When a TaskRunner finishes executing its task, it sends a StatusUpdate message to the DriverEndpoint. On receiving it, the DriverEndpoint calls TaskSchedulerImpl's statusUpdate method, which handles the different task outcomes; after handling, further tasks are assigned to that Executor. The code is as follows:

def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  var failedExecutor: Option[String] = None
  var reason: Option[ExecutorLossReason] = None
  synchronized {
    try {
      taskIdToTaskSetManager.get(tid) match {
        case Some(taskSet) =>
          if (state == TaskState.LOST) {
            // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
            // where each executor corresponds to a single task, so mark the executor as failed.
            val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
              "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
            if (executorIdToRunningTaskIds.contains(execId)) {
              reason = Some(SlaveLost(
                s"Task $tid was lost, so marking the executor as lost as well."))
              removeExecutor(execId, reason.get)
              failedExecutor = Some(execId)
            }
          }
          // Handle the different task-execution results
          if (TaskState.isFinished(state)) {
            cleanupTaskState(tid)
            taskSet.removeRunningTask(tid)
            if (state == TaskState.FINISHED) {
              // On success, reclaim the CPU the Executor used for this task,
              // then assign further tasks as appropriate
              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
            } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
              taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
            }
          }
        case None =>
          logError(
            ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
              "likely the result of receiving duplicate task finished status updates) or its " +
              "executor has been marked as failed.").format(state, tid))
      }
    } catch {
      case e: Exception => logError("Exception in statusUpdate", e)
    }
  }
  // Update the DAGScheduler without holding a lock on this, since that can deadlock
  if (failedExecutor.isDefined) {
    assert(reason.isDefined)
    dagScheduler.executorLost(failedExecutor.get, reason.get)
    backend.reviveOffers()
  }
}
