Spark详解(四):Spark组件以及消息通信原理
1. Spark核心基本概念
- Application(應用程序):指用戶編寫的Spark應用程序,包含驅動程序(Driver)和分布在集群中多個節點之上的Executor代碼,在執行過程中由一個或多個作業組成。
- Driver(驅動程序):Spark中的Driver即運行上述Application的main函數并且創建SparkContext,其中創建SparkContext的目的是為了準備Spark應用程序的運行環境。在Spark中由SparkContext負責與ClusterManager通信,進行資源申請、任務的分配和監控等;當Executor部分運行完成時候,Driver負責將SparkContext關閉。通常用SparkContext 代表Driver。
- Cluster Manager(集群資源管理器):指在集群上獲取資源的外部服務,目前有以下幾種。
- Standalane:Spark原生的資源管理,是由Master負責資源的管理。
- Hadoop Yarn:由Yarn中的ResourceManager負責資源的管理。
- Mesos:由Mesos中的Mesos Master負責資源管理
- Worker(工作節點):集群中任何可以運行Application代碼的節點,類似于YARN
的NodeManager - Master(總控進程):Spark Standalane運行模型下的主節點,負責資源管理和分配資源來運行Spark Application。
- Exeuctor(執行進程):Application運行Worker節點上的一個進程,該進程負責運行Task,并負責將數據存在內存或者磁盤上,每個Application都有各自一批的Executor。在Spark On Yarn 模式下,其進程名字為CoarseGraniedExecutorBackend,類似于Hadoop MapReduce中的Yarn Child。
2. 消息通信原理
2.1 Spark消息通信架構
在Spark中定義了通信框架接口,這些接口實現中調用Netty的具體方法(在Spark 2.0版本之前使用的是Akka)。在框架中以RpcEndPoint和RpcEndPointRef實現了Actor和ActorRef的相關動作,其中RpcEndPointRef是RpcEndPoint的引用,在消息通信中消息發送方持有引用RpcEndPointRef,它們之間的關系如下圖所示:
通信框架使用了工廠設計模式實現,這種設計方式實現了對Netty的解耦,能夠根據需要引入其他的消息通信工具。
具體的實現步驟如下:首先定義了RpcEnv和RpcEnvFactory兩個抽象類,在RpcEnv中定義了RPC通信框架的啟動、停止和關閉等抽象方法,在RpcEnvFactory中定義了創建抽象方法。然后在NettyRpcEnv和NettoyRpcEnvFactory類使用Netty對繼承的方法進行了實現。
在各個模塊中的實現,如Master和Worker等,會先使用RpcEnv的靜態方法創建RpcEnv實例,然后實例化Master,由于Master繼承與ThreadSafeRpcEndPoin,創建的Master實例是一個線程安全的終端點,接著調用RpcEnv的啟動終端點方法,把Master的終端點和其對應的引用注冊到RpcEnv中。在消息通信中,其他對象只需要獲取到了Master終端點的引用,就能發送消息給Master進行通信。下面是Master.scala中的startRpcEnvAndEndPoint方法:
/*** Start the Master and return a three tuple of:* (1) The Master RpcEnv* (2) The web UI bound port* (3) The REST server bound port, if any*/def startRpcEnvAndEndpoint(host: String,port: Int,webUiPort: Int,conf: SparkConf): (RpcEnv, Int, Option[Int]) = {val securityMgr = new SecurityManager(conf)val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)(rpcEnv, portsResponse.webUIPort, portsResponse.restPort)}2.2 Spark啟動消息通信
Spark啟動過程中主要是進行Master和Worker之間的通信,其消息發送關系如下,首先由worker節點向Master發送注冊消息,然后Master處理完畢后,返回注冊成功消息或失敗消息。
其詳細過程如下:
(1) 當Master啟動后,隨之啟動各Worker,Worker啟動時會創建通信環境RpcEnv和終端點EndPoint,并向Master發送注冊Worker的消息RegisterWorker.Worker.tryRegisterAllMasters方法如下:
// 因為Master可能不止一個 private def tryRegisterAllMasters(): Array[JFuture[_]] = {masterRpcAddresses.map { masterAddress =>registerMasterThreadPool.submit(new Runnable {override def run(): Unit = {try {logInfo("Connecting to master " + masterAddress + "...")// 獲取Master終端點的引用val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)registerWithMaster(masterEndpoint)} catch {}... }private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {// 根據Master節點的引用發送注冊信息masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(workerId, host, port, self, cores, memory, workerWebUiUrl)).onComplete {// 返回注冊成功或失敗的結果// This is a very fast action so we can use "ThreadUtils.sameThread"case Success(msg) =>Utils.tryLogNonFatalError {handleRegisterResponse(msg)}case Failure(e) =>logError(s"Cannot register with master: ${masterEndpoint.address}", e)System.exit(1)}(ThreadUtils.sameThread)}(2) Master收到消息后,需要對Worker發送的信息進行驗證、記錄。如果注冊成功,則發送RegisteredWorker消息給對應的Worker,告訴Worker已經完成注冊,
隨之進行步驟3,即Worker定期發送心跳給Master;如果注冊過程中失敗,則會發送RegisterWorkerFailed消息,Woker打印出錯日志并結束Worker啟動。Master.receiverAndReply方法如下:
(3) 當Worker接收到注冊成功后,會定時發送心跳信息Heartbeat給Master,以便Master了解Worker的實時狀態。間隔時間可以在spark.worker.timeout中設置,注意,該設置值的1/4為心跳間隔。
2.3 Spark運行時消息通信
用戶提交應用程序時,應用程序的SparkContext會向Master發送注冊應用信息,并由Master給該應用分配Executor,Executor啟動后會向SparkContext發送注冊成功消息;
當SparkContext的RDD觸發行動操作后,通過DAGScheduler進行劃分stage,并將stage轉化為TaskSet,接著由TaskScheduler向注冊的Executor發送執行消息,Executor接收到任務消息后啟動并運行;最后當所有任務運行時,由Driver處理結果并回收資源。如下圖所示:
Spark啟動過程中主要是進行Master和Worker之間的通信,其消息發送關系如下,首先由worker節點向Master發送注冊消息,然后Master處理完畢后,返回注冊成功消息或失敗消息。
其詳細過程如下:
(1) 在SparkContext創建過程中會先實例化SchedulerBackend對象,standalone模式中實際創建的是StandaloneSchedulerBackend對象,在該對象啟動過程中會繼承父類DriverEndpoint和創建StandaloneAppClient的ClientEndpoint兩個終端點。
在ClientEndpoint的tryRegisterAllMasters方法中創建注冊線程池registerMasterThreadPool, 在該線程池中啟動注冊線程并向Master發送RegisterApplication注冊應用的消息,代碼如下:
/*** Register with all masters asynchronously and returns an array `Future`s for cancellation.*/private def tryRegisterAllMasters(): Array[JFuture[_]] = {// 遍歷所有的Master, 這是一個for推導式,會構造會一個集合for (masterAddress <- masterRpcAddresses) yield {// 在線程池中啟動注冊線程,當該線程讀到應用注冊成功標識registered==true時退出注冊線程registerMasterThreadPool.submit(new Runnable {override def run(): Unit = try {if (registered.get) { // private val registered = new AtomicBoolean(false) 原子類型return}logInfo("Connecting to master " + masterAddress.toSparkURL + "...")val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)// 發送注冊消息masterRef.send(RegisterApplication(appDescription, self))} catch {case ie: InterruptedException => // Cancelledcase NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)}})}}當Master接收到注冊應用消息時,在registerApplication方法中記錄應用信息并把該應用加入到等待運行列表中,發送注冊成功消息
RegisteredApplication給ClientEndpoint,同時調用startExecutorsOnWorkers方法運行應用。Master.startExecutorsOnWorkers方法代碼如下:
(2) StandaloneAppClient.ClientEndpoint接收到Master發送的RegisteredApplication消息,需要把注冊標識registered置為true。代碼如下:
case RegisteredApplication(appId_, masterRef) =>appId.set(appId_)registered.set(true)master = Some(masterRef)listener.connected(appId.get)(3) 在Master類的starExecutorsOnWorkers方法中分配資源運行應用程序時,調用allocateWorkerResourceToExecutors方法實現在Worker中啟動Executor。當
Worker收到Master發送過來的LaunchExecutor消息,先實例化ExecutorRunner對象,在ExecutorRunner啟動中會創建進程生成器ProcessBuilder, 然后由該生成器使用command創建CoarseGrainedExecutorBackend對象,該對象是Executor運行的容器,最后Worker發送ExecutorStateChanged消息給Master,通知Executor容器已經創建完畢。
在ExecutorRunner創建中調用了fetchAndRunExecutor方法進行實現,在該方法中command內容在StandaloneSchedulerBackend中定義,指定構造Executor運行容器CoarseGrainedExecutorBackend,
代碼如下:
(4) Master接收到Worker發送的ExecutorStateChanged消息,代碼如下:
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>// 找到executor對應的app,然后flatMap,通過app內部的緩存獲取executor信息val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))execOption match {case Some(exec) =>// 設置executor的當前狀態val appInfo = idToApp(appId)val oldState = exec.stateexec.state = stateif (state == ExecutorState.RUNNING) {assert(oldState == ExecutorState.LAUNCHING,s"executor $execId state transfer from $oldState to RUNNING is illegal")appInfo.resetRetryCount()}// 向Driver發送ExecutorUpdated消息exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))...(5) 在3中的CoarseGrainedExecutorBackend啟動方法onStart中,會發送注冊Executor消息RegisterExecutor給DriverEndpoint,DriverEndpoint先判斷該Executor是否已經注冊,在makeOffers()方法
中分配運行任務資源,最后發送LaunchTask消息執行任務。
(6) CoarseGrainedExecutorBackend接收到Executor注冊成功RegisteredExecutor消息時,在CoarseGrainedExecutorBackend容器中實例化
Executor對象。啟動完畢后,會定時向Driver發送心跳信息, 等待接收從DriverEndpoint發送執行任務的消息。CoarseGrainedExecutorBackend處理注冊成功代碼如下:
(7) CoarseGrainedExecutorBackend的Executor啟動后接收從DriverEndpoint發送的LaunchTask執行任務消息,任務執行是在Executor的launchTask方法實現的。在執行時會創建TaskRunner進程,由該進程進行任務處理,
處理完畢后發送StateUpdate消息返回給CoarseGrainedExecutorBackend。任務執行和獲取結果見后?
(8) 在TaskRunner執行任務完成時,會向DriverEndpoint發送StatusUpdate消息,DriverEndpoint接收到消息會調用TaskSchedulerImpl的statusUpdate方法,根據任務執行不同的結果處理,處理完畢后再給該Executor分配執行任務。代碼如下:
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {var failedExecutor: Option[String] = Nonevar reason: Option[ExecutorLossReason] = Nonesynchronized {try {taskIdToTaskSetManager.get(tid) match {case Some(taskSet) =>if (state == TaskState.LOST) {// TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,// where each executor corresponds to a single task, so mark the executor as failed.val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException("taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))if (executorIdToRunningTaskIds.contains(execId)) {reason = Some(SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))removeExecutor(execId, reason.get)failedExecutor = Some(execId)}}// 調用TaskSchedulerImpl的statusUpdate方法,根據任務執行不同的結果處理if (TaskState.isFinished(state)) {cleanupTaskState(tid)taskSet.removeRunningTask(tid)if (state == TaskState.FINISHED) {// 任務執行成功后,回收該Executor運行該任務的CPU,再根據實際情況分配任務taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)}}case None =>logError(("Ignoring update with state %s for TID %s because its task set is gone (this is " +"likely the result of receiving duplicate task finished status updates) or its " +"executor has been marked as failed.").format(state, tid))}} catch {case e: Exception => logError("Exception in statusUpdate", e)}}// Update the DAGScheduler without holding a lock on this, since that can deadlockif (failedExecutor.isDefined) {assert(reason.isDefined)dagScheduler.executorLost(failedExecutor.get, reason.get)backend.reviveOffers()}}總結
以上是生活随笔為你收集整理的Spark详解(四):Spark组件以及消息通信原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark详解(三):Spark编程模型
- 下一篇: Spark详解(五):Spark作业执行