Spark 2.1 Core in Depth (Part 5): How Standalone Mode Runs, with Source Code Analysis


Overview

The previous posts covered Spark's scheduling in detail. In this post we look at Spark from a more macroscopic angle and discuss its deployment modes. Spark can be deployed in the following modes:

  • local mode
  • local-cluster mode
  • Standalone mode
  • YARN mode
  • Mesos mode

We will first give a brief introduction to YARN mode, and then dive into Standalone mode.
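For orientation, the deployment mode is normally selected by the master URL handed to the SparkContext. The snippet below is a sketch for illustration only; the host names, ports, and resource figures are placeholders, and in practice the master is usually supplied through spark-submit rather than hard-coded.

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative master URLs for the modes listed above (all values are placeholders).
    object MasterUrlExamples {
      val localMode        = "local[*]"                  // local mode: one JVM, all cores
      val localClusterMode = "local-cluster[2,1,1024]"   // local-cluster: 2 workers, 1 core and 1024 MB each
      val standaloneMode   = "spark://master-host:7077"  // Standalone master
      val yarnMode         = "yarn"                      // YARN (client or cluster, chosen by the deploy mode)
      val mesosMode        = "mesos://mesos-master:5050" // Mesos master

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("mode-demo").setMaster(localMode)
        val sc = new SparkContext(conf)
        println(sc.parallelize(1 to 10).sum())  // trivial job to show the context works
        sc.stop()
      }
    }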

Introduction to YARN Mode

About YARN

YARN is a framework for resource management and job scheduling. It consists of three main modules: the ResourceManager (RM), the NodeManager (NM), and the ApplicationMaster (AM).

The ResourceManager monitors, allocates, and manages all resources; an ApplicationMaster handles the scheduling and coordination of one specific application; a NodeManager maintains a single node.

For all applications, the RM has absolute control and full authority over resource allocation, while each AM negotiates resources with the RM and communicates with the NodeManagers to run and monitor tasks.

YARN Cluster Mode

In Spark's YARN Cluster mode the flow is as follows:

  • The YARN Client on the local machine submits the app to the YARN ResourceManager.
  • The ResourceManager picks a YARN NodeManager and uses it to:
    • create an ApplicationMaster; the SparkContext belongs to the app managed by this ApplicationMaster, and a YarnClusterScheduler and a YarnClusterSchedulerBackend are created;
    • select containers in the cluster to launch CoarseGrainedExecutorBackend processes, which start the Spark executors.
  • The ApplicationMaster and the CoarseGrainedExecutorBackend processes communicate through remote calls.

YARN Client Mode

In Spark's YARN Client mode the flow is as follows:

  • A SparkContext is started locally, creating a YarnClientClusterScheduler and a YarnClientClusterSchedulerBackend.
  • The YarnClientClusterSchedulerBackend starts a yarn.Client, which submits the app to the YARN ResourceManager.
  • The ResourceManager picks a YARN NodeManager and uses it to select containers in the cluster that launch CoarseGrainedExecutorBackend processes, which start the Spark executors.
  • The YarnClientClusterSchedulerBackend and the CoarseGrainedExecutorBackend processes communicate through remote calls.

Standalone Mode Overview

  • The app starts. During SparkContext initialization, the DAGScheduler and TaskScheduler are created first, and then the StandaloneSchedulerBackend (named SparkDeploySchedulerBackend in older releases) is initialized, which in turn starts a DriverEndpoint and a ClientEndpoint.
  • The ClientEndpoint registers the app with the Master. On receiving the registration, the Master adds the app to the list of apps waiting to run and waits until it can assign workers to it.
  • Once workers have been assigned to the app, the Master tells each Worker's WorkerEndpoint to create a CoarseGrainedExecutorBackend process, inside which the executor (the execution container) is created.
  • When an executor has been created, it notifies the Master and the DriverEndpoint, registers with the SparkContext, and then waits for the DriverEndpoint to send it tasks to run.
  • The SparkContext hands TaskSets to the CoarseGrainedExecutorBackend processes, which run them on their executors according to the scheduling policy. See "Spark 2.1 Core in Depth (Part 2): The DAG Scheduler Implementation and Source Code Analysis" and "Spark 2.1 Core in Depth (Part 3): The Task Scheduler Implementation and Source Code Analysis".
  • While processing tasks, a CoarseGrainedExecutorBackend reports task status to the DriverEndpoint, and Spark reacts according to the outcome; once a TaskSet has completed, the next one is sent. See "Spark 2.1 Core in Depth (Part 4): Result Handling and Fault Tolerance, with Source Code Analysis".
  • When the app finishes, the SparkContext reclaims resources, tears down the Workers' CoarseGrainedExecutorBackend processes, and then unregisters itself. A minimal driver-side sketch of this lifecycle is shown right after this list.
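To make the lifecycle above concrete, here is a minimal, hypothetical standalone-mode driver. The master URL, app name, and resource settings are placeholders; creating the SparkContext triggers the registration and executor-launch steps described above, and sc.stop() triggers the teardown path analysed later in this post.

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical standalone-mode driver; host name and settings are placeholders.
    object StandaloneLifecycleDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("standalone-lifecycle-demo")
          .setMaster("spark://master-host:7077")  // the app registers with this Master
          .set("spark.executor.memory", "1g")     // memory requested per executor
          .set("spark.cores.max", "2")            // cap on cores used by this app

        // Creating the SparkContext starts the DriverEndpoint/ClientEndpoint and
        // leads the Master to assign workers and launch CoarseGrainedExecutorBackends.
        val sc = new SparkContext(conf)
        try {
          // The TaskSets produced by this job are dispatched to the executors.
          val count = sc.parallelize(1 to 1000, numSlices = 4).filter(_ % 3 == 0).count()
          println(s"multiples of 3: $count")
        } finally {
          // Triggers the resource reclamation described above: executors are
          // stopped and the application is unregistered.
          sc.stop()
        }
      }
    }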
Starting a Standalone Cluster

Starting the Master

master.Master

Let's first look at what the Master object's main method does:

    private[deploy] object Master extends Logging {
      val SYSTEM_NAME = "sparkMaster"
      val ENDPOINT_NAME = "Master"

      def main(argStrings: Array[String]) {
        Utils.initDaemon(log)
        // Create the SparkConf
        val conf = new SparkConf
        // Parse the arguments (using the SparkConf)
        val args = new MasterArguments(argStrings, conf)
        val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
        rpcEnv.awaitTermination()
      }

      def startRpcEnvAndEndpoint(
          host: String,
          port: Int,
          webUiPort: Int,
          conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
        val securityMgr = new SecurityManager(conf)
        val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
        // Create the Master endpoint
        val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
          new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
        val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
        // Return the Master's RpcEnv,
        // the web UI port,
        // and the port of the other (REST) service
        (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
      }
    }

master.MasterArguments

Next, let's see how the Master parses its arguments:

    private[master] class MasterArguments(args: Array[String], conf: SparkConf) extends Logging {
      // Defaults
      var host = Utils.localHostName()
      var port = 7077
      var webUiPort = 8080
      // The Spark properties file,
      // by default spark-defaults.conf
      var propertiesFile: String = null

      // Check the environment variables
      if (System.getenv("SPARK_MASTER_IP") != null) {
        logWarning("SPARK_MASTER_IP is deprecated, please use SPARK_MASTER_HOST")
        host = System.getenv("SPARK_MASTER_IP")
      }
      if (System.getenv("SPARK_MASTER_HOST") != null) {
        host = System.getenv("SPARK_MASTER_HOST")
      }
      if (System.getenv("SPARK_MASTER_PORT") != null) {
        port = System.getenv("SPARK_MASTER_PORT").toInt
      }
      if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
        webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
      }

      parse(args.toList)

      // This mutates the SparkConf
      propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)

      // The SPARK_MASTER_WEBUI_PORT environment variable
      // is overridden by the Spark property spark.master.ui.port
      if (conf.contains("spark.master.ui.port")) {
        webUiPort = conf.get("spark.master.ui.port").toInt
      }

      // Parse the command-line arguments;
      // these override the corresponding environment variables
      @tailrec
      private def parse(args: List[String]): Unit = args match {
        case ("--ip" | "-i") :: value :: tail =>
          Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
          host = value
          parse(tail)

        case ("--host" | "-h") :: value :: tail =>
          Utils.checkHost(value, "Please use hostname " + value)
          host = value
          parse(tail)

        case ("--port" | "-p") :: IntParam(value) :: tail =>
          port = value
          parse(tail)

        case "--webui-port" :: IntParam(value) :: tail =>
          webUiPort = value
          parse(tail)

        case ("--properties-file") :: value :: tail =>
          propertiesFile = value
          parse(tail)

        case ("--help") :: tail =>
          printUsageAndExit(0)

        case Nil => // No-op: all arguments consumed

        case _ =>
          printUsageAndExit(1)
      }

      private def printUsageAndExit(exitCode: Int) {
        System.err.println(
          "Usage: Master [options]\n" +
          "\n" +
          "Options:\n" +
          "  -i HOST, --ip HOST     Hostname to listen on (deprecated, please use --host or -h) \n" +
          "  -h HOST, --host HOST   Hostname to listen on\n" +
          "  -p PORT, --port PORT   Port to listen on (default: 7077)\n" +
          "  --webui-port PORT      Port for web UI (default: 8080)\n" +
          "  --properties-file FILE Path to a custom Spark properties file.\n" +
          "                         Default is conf/spark-defaults.conf.")
        System.exit(exitCode)
      }
    }

From the code above we can see the precedence of these settings: built-in defaults < environment variables < command-line arguments. In addition, for the web UI port the Spark property spark.master.ui.port (typically set in spark-defaults.conf) overrides all of the above.

Starting the Worker

worker.Worker

Let's look at what the Worker object's main method does:

    private[deploy] object Worker extends Logging {
      val SYSTEM_NAME = "sparkWorker"
      val ENDPOINT_NAME = "Worker"

      def main(argStrings: Array[String]) {
        Utils.initDaemon(log)
        // Create the SparkConf
        val conf = new SparkConf
        // Parse the arguments (using the SparkConf)
        val args = new WorkerArguments(argStrings, conf)
        val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
          args.memory, args.masters, args.workDir, conf = conf)
        rpcEnv.awaitTermination()
      }

      def startRpcEnvAndEndpoint(
          host: String,
          port: Int,
          webUiPort: Int,
          cores: Int,
          memory: Int,
          masterUrls: Array[String],
          workDir: String,
          workerNumber: Option[Int] = None,
          conf: SparkConf = new SparkConf): RpcEnv = {
        val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
        val securityMgr = new SecurityManager(conf)
        val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
        val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
        // Create the Worker endpoint
        rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
          masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
        rpcEnv
      }

      ***

worker.WorkerArguments

worker.WorkerArguments is very similar to master.MasterArguments:

    private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {
      var host = Utils.localHostName()
      var port = 0
      var webUiPort = 8081
      var cores = inferDefaultCores()
      var memory = inferDefaultMemory()
      var masters: Array[String] = null
      var workDir: String = null
      var propertiesFile: String = null

      // Check the environment variables
      if (System.getenv("SPARK_WORKER_PORT") != null) {
        port = System.getenv("SPARK_WORKER_PORT").toInt
      }
      if (System.getenv("SPARK_WORKER_CORES") != null) {
        cores = System.getenv("SPARK_WORKER_CORES").toInt
      }
      if (conf.getenv("SPARK_WORKER_MEMORY") != null) {
        memory = Utils.memoryStringToMb(conf.getenv("SPARK_WORKER_MEMORY"))
      }
      if (System.getenv("SPARK_WORKER_WEBUI_PORT") != null) {
        webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt
      }
      if (System.getenv("SPARK_WORKER_DIR") != null) {
        workDir = System.getenv("SPARK_WORKER_DIR")
      }

      parse(args.toList)

      // This mutates the SparkConf
      propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)

      if (conf.contains("spark.worker.ui.port")) {
        webUiPort = conf.get("spark.worker.ui.port").toInt
      }

      checkWorkerMemory()

      @tailrec
      private def parse(args: List[String]): Unit = args match {
        case ("--ip" | "-i") :: value :: tail =>
          Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
          host = value
          parse(tail)

        case ("--host" | "-h") :: value :: tail =>
          Utils.checkHost(value, "Please use hostname " + value)
          host = value
          parse(tail)

        case ("--port" | "-p") :: IntParam(value) :: tail =>
          port = value
          parse(tail)

        case ("--cores" | "-c") :: IntParam(value) :: tail =>
          cores = value
          parse(tail)

        case ("--memory" | "-m") :: MemoryParam(value) :: tail =>
          memory = value
          parse(tail)

        // The working directory
        case ("--work-dir" | "-d") :: value :: tail =>
          workDir = value
          parse(tail)

        case "--webui-port" :: IntParam(value) :: tail =>
          webUiPort = value
          parse(tail)

        case ("--properties-file") :: value :: tail =>
          propertiesFile = value
          parse(tail)

        case ("--help") :: tail =>
          printUsageAndExit(0)

        case value :: tail =>
          if (masters != null) {  // Two positional arguments were given
            printUsageAndExit(1)
          }
          masters = Utils.parseStandaloneMasterUrls(value)
          parse(tail)

        case Nil =>
          if (masters == null) {  // No positional argument was given
            printUsageAndExit(1)
          }

        case _ =>
          printUsageAndExit(1)
      }

      ***

Reclaiming Resources

In the overview we said that "when the app finishes, the SparkContext reclaims resources, tears down the Workers' CoarseGrainedExecutorBackend processes, and then unregisters itself." Let's now look at how the Master and the executors learn that the application has exited.
The call stack is as follows:

  • SparkContext.stop
    • DAGScheduler.stop
      • TaskSchedulerImpl.stop
        • CoarseGrainedSchedulerBackend.stop
          • CoarseGrainedSchedulerBackend.stopExecutors
            • CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply
              • CoarseGrainedExecutorBackend.receive
                • Executor.stop
          • CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply

SparkContext.stop

SparkContext.stop calls DAGScheduler.stop:

    ***
    if (_dagScheduler != null) {
      Utils.tryLogNonFatalError {
        _dagScheduler.stop()
      }
      _dagScheduler = null
    }
    ***

DAGScheduler.stop

DAGScheduler.stop calls TaskSchedulerImpl.stop:

    def stop() {
      // Stop the message scheduler
      messageScheduler.shutdownNow()
      // Stop the event processing loop
      eventProcessLoop.stop()
      // Call TaskSchedulerImpl.stop
      taskScheduler.stop()
    }

TaskSchedulerImpl.stop

TaskSchedulerImpl.stop calls CoarseGrainedSchedulerBackend.stop:

    override def stop() {
      // Stop speculative execution
      speculationScheduler.shutdown()
      // Call CoarseGrainedSchedulerBackend.stop
      if (backend != null) {
        backend.stop()
      }
      // Stop the result getter
      if (taskResultGetter != null) {
        taskResultGetter.stop()
      }
      starvationTimer.cancel()
    }

CoarseGrainedSchedulerBackend.stop

CoarseGrainedSchedulerBackend.stop first stops the executors and then stops the driver endpoint:

    override def stop() {
      // Call stopExecutors()
      stopExecutors()
      try {
        if (driverEndpoint != null) {
          // Send the StopDriver message
          driverEndpoint.askWithRetry[Boolean](StopDriver)
        }
      } catch {
        case e: Exception =>
          throw new SparkException("Error stopping standalone scheduler's driver endpoint", e)
      }
    }

CoarseGrainedSchedulerBackend.stopExecutors

Let's first look at CoarseGrainedSchedulerBackend.stopExecutors:

    def stopExecutors() {
      try {
        if (driverEndpoint != null) {
          logInfo("Shutting down all executors")
          // Send the StopExecutors message
          driverEndpoint.askWithRetry[Boolean](StopExecutors)
        }
      } catch {
        case e: Exception =>
          throw new SparkException("Error asking standalone scheduler to shut down executors", e)
      }
    }

CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply

The DriverEndpoint receives this message and replies:

    case StopExecutors =>
      logInfo("Asking each executor to shut down")
      for ((_, executorData) <- executorDataMap) {
        // Send the StopExecutor message to each CoarseGrainedExecutorBackend
        executorData.executorEndpoint.send(StopExecutor)
      }
      context.reply(true)

CoarseGrainedExecutorBackend.receive

CoarseGrainedExecutorBackend receives the message:

    case StopExecutor =>
      stopping.set(true)
      logInfo("Driver commanded a shutdown")
      // The executor is not shut down directly here,
      // because it must first send the acknowledgement back to
      // CoarseGrainedSchedulerBackend.
      // So the strategy is to send a Shutdown message to itself
      // and handle the shutdown there.
      self.send(Shutdown)

    case Shutdown =>
      stopping.set(true)
      new Thread("CoarseGrainedExecutorBackend-stop-executor") {
        override def run(): Unit = {
          // executor.stop() calls `SparkEnv.stop()`,
          // which blocks until the RpcEnv has fully shut down.
          // But if `executor.stop()` ran on the same thread as the RpcEnv,
          // the RpcEnv could not finish until `executor.stop()` returned,
          // which would be a deadlock.
          // Hence a new thread is created here.
          executor.stop()
        }
      }.start()
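The comments above explain why the shutdown is bounced to a dedicated thread. As a minimal, non-Spark illustration of that pattern (all names below are made up for the sketch): a handler running inside a single-threaded message loop must hand any blocking stop work to another thread rather than wait on the very loop it is running on.

    import java.util.concurrent.{Executors, TimeUnit}

    // Illustrative sketch only: a single-threaded "message loop" whose blocking
    // stop must not run on the loop's own thread.
    object StopOnSeparateThreadSketch {
      private val messageLoop = Executors.newSingleThreadExecutor()

      // Simulates a blocking stop (like Executor.stop -> SparkEnv.stop):
      // it waits for the message loop to drain and terminate.
      private def blockingStop(): Unit = {
        messageLoop.shutdown()
        messageLoop.awaitTermination(10, TimeUnit.SECONDS)
      }

      def main(args: Array[String]): Unit = {
        // The "Shutdown" handler runs inside the loop. Calling blockingStop()
        // directly here would make the loop wait on the task that is currently
        // running, so the work is handed to a fresh thread instead.
        messageLoop.submit(new Runnable {
          override def run(): Unit =
            new Thread("stop-helper") {
              override def run(): Unit = blockingStop()
            }.start()
        })
      }
    }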

Executor.stop

Executor.stop shuts down the heartbeater and the thread pool, and finally stops the SparkEnv:

    def stop(): Unit = {
      env.metricsSystem.report()
      // Shut down the heartbeater
      heartbeater.shutdown()
      heartbeater.awaitTermination(10, TimeUnit.SECONDS)
      // Shut down the thread pool
      threadPool.shutdown()
      if (!isLocal) {
        // Stop the SparkEnv
        env.stop()
      }
    }

CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply

Going back to CoarseGrainedSchedulerBackend.stop: after stopExecutors() returns, it sends the StopDriver message to driverEndpoint. CoarseGrainedSchedulerBackend.DriverEndpoint receives the message and replies:

    case StopDriver =>
      context.reply(true)
      // Stop the driverEndpoint
      stop()

Summary

This post surveyed Spark's deployment modes, briefly described the YARN cluster and client flows, and then examined Standalone mode in detail: how the Master and Worker are started and how their arguments are parsed, and how resources are reclaimed when an application finishes, following the call chain from SparkContext.stop all the way down to Executor.stop.