

In-Depth Understanding of Spark 2.1 Core (V): How Standalone Mode Works, with Source Code Analysis

Published: 2024/1/23

Overview

The previous posts in this series looked at Spark's scheduling. In this post we take a more macroscopic view of scheduling and discuss Spark's deployment modes. Spark supports the following deployment modes:

  • local mode
  • local-cluster mode
  • Standalone mode
  • YARN mode
  • Mesos mode

We first give a brief introduction to the YARN modes, and then dive into Standalone mode.
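For orientation, here is a hedged sketch of how a user selects each mode when submitting an application; the application class and jar path are placeholders, and the host name is hypothetical:

# Local mode: driver and executors in one JVM, 4 threads
spark-submit --master "local[4]" --class example.App app.jar

# Standalone mode: point at a running Master
spark-submit --master spark://master-host:7077 --class example.App app.jar

# YARN: cluster mode vs. client mode
spark-submit --master yarn --deploy-mode cluster --class example.App app.jar
spark-submit --master yarn --deploy-mode client --class example.App app.jar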

Introduction to YARN Mode

About YARN

YARN is a framework for resource management and task scheduling. It consists of three main modules: the ResourceManager (RM), the NodeManager (NM), and the ApplicationMaster (AM).

The ResourceManager monitors, allocates, and manages all resources; an ApplicationMaster handles the scheduling and coordination of one specific application; a NodeManager maintains one node.

The RM has absolute control over all applications and full authority over resource allocation. Each AM negotiates resources with the RM and communicates with NodeManagers to execute and monitor tasks. The relationship between these modules is shown in the figure.

Yarn Cluster Mode

The flow of Spark's Yarn Cluster mode is as follows:

  • The local YARN Client submits the App to the Yarn Resource Manager.
  • The Yarn Resource Manager picks a YARN Node Manager and uses it to:
    • Create an ApplicationMaster. The SparkContext acts as the app managed by this ApplicationMaster, and creates a YarnClusterScheduler and a YarnClusterSchedulerBackend.
    • Choose containers in the cluster in which to launch CoarseGrainedExecutorBackend processes, which start the Spark executors.
  • The ApplicationMaster and the CoarseGrainedExecutorBackends communicate via remote calls.

Yarn Client Mode

The flow of Spark's Yarn Client mode is as follows:

  • The SparkContext is started locally and creates a YarnClientClusterScheduler and a YarnClientClusterSchedulerBackend.
  • The YarnClientClusterSchedulerBackend starts a yarn.Client and uses it to submit the App to the Yarn Resource Manager.
  • The Yarn Resource Manager picks a YARN Node Manager and uses it to choose containers in the cluster in which to launch CoarseGrainedExecutorBackend processes, which start the Spark executors.
  • The YarnClientClusterSchedulerBackend and the CoarseGrainedExecutorBackends communicate via remote calls.

Introduction to Standalone Mode

  • The app starts. During SparkContext initialization, the DAGScheduler and TaskScheduler are created first, then the SparkDeploySchedulerBackend, which internally starts a DriverEndpoint and a ClientEndpoint.
  • The ClientEndpoint registers the app with the Master. On receiving the registration, the Master adds the app to the list of apps waiting to run, where it waits for the Master to assign it workers.
  • Once the app has been assigned workers, the Master tells each Worker's WorkerEndpoint to create a CoarseGrainedExecutorBackend process, inside which the executor (the execution container) is created.
  • When an executor has been created, it notifies the Master and the DriverEndpoint that creation is complete, registers with the SparkContext, and then waits for the DriverEndpoint to send task-execution messages.
  • The SparkContext assigns TaskSets to the CoarseGrainedExecutorBackends, which execute them on the executors according to the scheduling policy. For details see "In-Depth Understanding of Spark 2.1 Core (II): The DAG Scheduler" and "(III): The Task Scheduler".
  • While processing tasks, a CoarseGrainedExecutorBackend reports task status to the DriverEndpoint, and Spark reacts according to the execution result. When a TaskSet is finished, the next one is sent. For details see "In-Depth Understanding of Spark 2.1 Core (IV): Result Handling and Fault Tolerance".
  • When the app finishes, the SparkContext reclaims resources: it destroys the Workers' CoarseGrainedExecutorBackend processes and then unregisters itself.
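The registration handshake described above can be reduced to a toy model. This is a simplification for illustration only: the message and class names below are hypothetical stand-ins for Spark's RPC endpoints and the messages in org.apache.spark.deploy.DeployMessages, and the one-executor-per-worker assignment replaces the Master's real scheduling policy.

```scala
object StandaloneSketch {
  // Illustrative messages; real Spark defines these in DeployMessages.
  sealed trait Msg
  final case class RegisterApplication(appName: String) extends Msg
  final case class LaunchExecutor(appId: String, execId: Int) extends Msg

  class Worker {
    var launched: List[LaunchExecutor] = Nil
    def receive(m: LaunchExecutor): Unit = launched = m :: launched
  }

  class Master(workers: Seq[Worker]) {
    private var nextAppId = 0
    // On registration, assign the app one executor per worker
    // (the real Master schedules over available cores and memory).
    def receive(m: RegisterApplication): String = {
      val appId = s"app-$nextAppId"
      nextAppId += 1
      workers.zipWithIndex.foreach { case (w, i) =>
        w.receive(LaunchExecutor(appId, i))
      }
      appId
    }
  }

  def run(): (String, Seq[Int]) = {
    val workers = Seq(new Worker, new Worker)
    val master = new Master(workers)
    val appId = master.receive(RegisterApplication("demo"))
    (appId, workers.flatMap(_.launched.map(_.execId)).sorted)
  }
}
```

Running StandaloneSketch.run() registers one app and launches one executor on each of the two toy workers.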
Standalone: Starting the Cluster

Starting the Master

master.Master

Let's first look at what the Master object's main function does:

private[deploy] object Master extends Logging {
  val SYSTEM_NAME = "sparkMaster"
  val ENDPOINT_NAME = "Master"

  def main(argStrings: Array[String]) {
    Utils.initDaemon(log)
    // Create the SparkConf
    val conf = new SparkConf
    // Parse the arguments against the SparkConf
    val args = new MasterArguments(argStrings, conf)
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    rpcEnv.awaitTermination()
  }

  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    // Create the Master endpoint
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
    val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
    // Return the Master RpcEnv,
    // the web UI port,
    // and the port of the other (REST) service
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  }
}

master.MasterArguments

Next, let's look at how the Master parses its arguments:

private[master] class MasterArguments(args: Array[String], conf: SparkConf) extends Logging {
  // Default configuration
  var host = Utils.localHostName()
  var port = 7077
  var webUiPort = 8080
  // Spark properties file, conf/spark-defaults.conf by default
  var propertiesFile: String = null

  // Check environment variables
  if (System.getenv("SPARK_MASTER_IP") != null) {
    logWarning("SPARK_MASTER_IP is deprecated, please use SPARK_MASTER_HOST")
    host = System.getenv("SPARK_MASTER_IP")
  }
  if (System.getenv("SPARK_MASTER_HOST") != null) {
    host = System.getenv("SPARK_MASTER_HOST")
  }
  if (System.getenv("SPARK_MASTER_PORT") != null) {
    port = System.getenv("SPARK_MASTER_PORT").toInt
  }
  if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
    webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
  }

  parse(args.toList)

  // Load the default properties into the SparkConf
  propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)

  // The SPARK_MASTER_WEBUI_PORT environment variable
  // is overridden by the spark.master.ui.port Spark property
  if (conf.contains("spark.master.ui.port")) {
    webUiPort = conf.get("spark.master.ui.port").toInt
  }

  // Parse the command-line arguments.
  // Command-line arguments override both environment variables and Spark properties.
  @tailrec
  private def parse(args: List[String]): Unit = args match {
    case ("--ip" | "-i") :: value :: tail =>
      Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
      host = value
      parse(tail)
    case ("--host" | "-h") :: value :: tail =>
      Utils.checkHost(value, "Please use hostname " + value)
      host = value
      parse(tail)
    case ("--port" | "-p") :: IntParam(value) :: tail =>
      port = value
      parse(tail)
    case "--webui-port" :: IntParam(value) :: tail =>
      webUiPort = value
      parse(tail)
    case ("--properties-file") :: value :: tail =>
      propertiesFile = value
      parse(tail)
    case ("--help") :: tail =>
      printUsageAndExit(0)
    case Nil =>
    case _ =>
      printUsageAndExit(1)
  }

  private def printUsageAndExit(exitCode: Int) {
    System.err.println(
      "Usage: Master [options]\n" +
      "\n" +
      "Options:\n" +
      "  -i HOST, --ip HOST      Hostname to listen on (deprecated, please use --host or -h)\n" +
      "  -h HOST, --host HOST    Hostname to listen on\n" +
      "  -p PORT, --port PORT    Port to listen on (default: 7077)\n" +
      "  --webui-port PORT       Port for web UI (default: 8080)\n" +
      "  --properties-file FILE  Path to a custom Spark properties file.\n" +
      "                          Default is conf/spark-defaults.conf.")
    System.exit(exitCode)
  }
}

We can see that these settings take effect with the following precedence:

system environment variables < properties in spark-defaults.conf < command-line arguments < application-level settings
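This precedence chain can be expressed as a small resolution function. This is only a sketch of the rule stated above, not Spark code; the function and parameter names are invented for illustration.

```scala
object SettingPrecedence {
  // Resolve one setting from its possible sources.
  // Later sources win: env var < properties file < command line < app-level setting.
  def resolve(default: Int,
              envVar: Option[Int],
              propertiesFile: Option[Int],
              commandLine: Option[Int],
              appLevel: Option[Int]): Int =
    appLevel.orElse(commandLine).orElse(propertiesFile).orElse(envVar).getOrElse(default)
}
```

For example, the Master web UI port starts from the default 8080 and can be overridden at each successive level, with an application-level setting winning over all the others.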

Starting the Worker

worker.Worker

Let's first look at what the Worker object's main function does:

private[deploy] object Worker extends Logging {
  val SYSTEM_NAME = "sparkWorker"
  val ENDPOINT_NAME = "Worker"

  def main(argStrings: Array[String]) {
    Utils.initDaemon(log)
    // Create the SparkConf
    val conf = new SparkConf
    // Parse the arguments against the SparkConf
    val args = new WorkerArguments(argStrings, conf)
    val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
      args.memory, args.masters, args.workDir, conf = conf)
    rpcEnv.awaitTermination()
  }

  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      cores: Int,
      memory: Int,
      masterUrls: Array[String],
      workDir: String,
      workerNumber: Option[Int] = None,
      conf: SparkConf = new SparkConf): RpcEnv = {
    val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
    val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
    // Create the Worker endpoint
    rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
      masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
    rpcEnv
  }
  ***

worker.WorkerArguments

worker.WorkerArguments is similar to master.MasterArguments:

private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {
  var host = Utils.localHostName()
  var port = 0
  var webUiPort = 8081
  var cores = inferDefaultCores()
  var memory = inferDefaultMemory()
  var masters: Array[String] = null
  var workDir: String = null
  var propertiesFile: String = null

  // Check environment variables
  if (System.getenv("SPARK_WORKER_PORT") != null) {
    port = System.getenv("SPARK_WORKER_PORT").toInt
  }
  if (System.getenv("SPARK_WORKER_CORES") != null) {
    cores = System.getenv("SPARK_WORKER_CORES").toInt
  }
  if (conf.getenv("SPARK_WORKER_MEMORY") != null) {
    memory = Utils.memoryStringToMb(conf.getenv("SPARK_WORKER_MEMORY"))
  }
  if (System.getenv("SPARK_WORKER_WEBUI_PORT") != null) {
    webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt
  }
  if (System.getenv("SPARK_WORKER_DIR") != null) {
    workDir = System.getenv("SPARK_WORKER_DIR")
  }

  parse(args.toList)

  // Load the default properties into the SparkConf
  propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)

  if (conf.contains("spark.worker.ui.port")) {
    webUiPort = conf.get("spark.worker.ui.port").toInt
  }

  checkWorkerMemory()

  @tailrec
  private def parse(args: List[String]): Unit = args match {
    case ("--ip" | "-i") :: value :: tail =>
      Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
      host = value
      parse(tail)
    case ("--host" | "-h") :: value :: tail =>
      Utils.checkHost(value, "Please use hostname " + value)
      host = value
      parse(tail)
    case ("--port" | "-p") :: IntParam(value) :: tail =>
      port = value
      parse(tail)
    case ("--cores" | "-c") :: IntParam(value) :: tail =>
      cores = value
      parse(tail)
    case ("--memory" | "-m") :: MemoryParam(value) :: tail =>
      memory = value
      parse(tail)
    // Work directory
    case ("--work-dir" | "-d") :: value :: tail =>
      workDir = value
      parse(tail)
    case "--webui-port" :: IntParam(value) :: tail =>
      webUiPort = value
      parse(tail)
    case ("--properties-file") :: value :: tail =>
      propertiesFile = value
      parse(tail)
    case ("--help") :: tail =>
      printUsageAndExit(0)
    case value :: tail =>
      if (masters != null) { // Two positional arguments were given
        printUsageAndExit(1)
      }
      masters = Utils.parseStandaloneMasterUrls(value)
      parse(tail)
    case Nil =>
      if (masters == null) { // No positional argument was given
        printUsageAndExit(1)
      }
    case _ =>
      printUsageAndExit(1)
  }
  ***

Resource Cleanup

In the overview we mentioned that "when the app finishes, the SparkContext reclaims resources: it destroys the Workers' CoarseGrainedExecutorBackend processes and then unregisters itself." Let's now look at how the Master and the Executors learn that an Application has exited.
The call stack is as follows:

    • SparkContext.stop
      • DAGScheduler.stop
        • TaskSchedulerImpl.stop
          • CoarseGrainedSchedulerBackend.stop
            • CoarseGrainedSchedulerBackend.stopExecutors
              • CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply
                • CoarseGrainedExecutorBackend.receive
                  • Executor.stop
              • CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply

    SparkContext.stop

SparkContext.stop calls DAGScheduler.stop:

***
if (_dagScheduler != null) {
  Utils.tryLogNonFatalError {
    _dagScheduler.stop()
  }
  _dagScheduler = null
}
***

    DAGScheduler.stop

DAGScheduler.stop calls TaskSchedulerImpl.stop:

def stop() {
  // Stop the message scheduler
  messageScheduler.shutdownNow()
  // Stop the event processing loop
  eventProcessLoop.stop()
  // Call TaskSchedulerImpl.stop
  taskScheduler.stop()
}

    TaskSchedulerImpl.stop

TaskSchedulerImpl.stop calls CoarseGrainedSchedulerBackend.stop:

override def stop() {
  // Stop speculative execution
  speculationScheduler.shutdown()
  // Call CoarseGrainedSchedulerBackend.stop
  if (backend != null) {
    backend.stop()
  }
  // Stop result fetching
  if (taskResultGetter != null) {
    taskResultGetter.stop()
  }
  starvationTimer.cancel()
}

    CoarseGrainedSchedulerBackend.stop

override def stop() {
  // Call stopExecutors()
  stopExecutors()
  try {
    if (driverEndpoint != null) {
      // Send the StopDriver message
      driverEndpoint.askWithRetry[Boolean](StopDriver)
    }
  } catch {
    case e: Exception =>
      throw new SparkException("Error stopping standalone scheduler's driver endpoint", e)
  }
}

    CoarseGrainedSchedulerBackend.stopExecutors

Let's first look at CoarseGrainedSchedulerBackend.stopExecutors:

def stopExecutors() {
  try {
    if (driverEndpoint != null) {
      logInfo("Shutting down all executors")
      // Send the StopExecutors message
      driverEndpoint.askWithRetry[Boolean](StopExecutors)
    }
  } catch {
    case e: Exception =>
      throw new SparkException("Error asking standalone scheduler to shut down executors", e)
  }
}

    CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply

The DriverEndpoint receives the message and replies:

case StopExecutors =>
  logInfo("Asking each executor to shut down")
  for ((_, executorData) <- executorDataMap) {
    // Send a StopExecutor message to each CoarseGrainedExecutorBackend
    executorData.executorEndpoint.send(StopExecutor)
  }
  context.reply(true)

    CoarseGrainedExecutorBackend.receive

The CoarseGrainedExecutorBackend receives the message:

case StopExecutor =>
  stopping.set(true)
  logInfo("Driver commanded a shutdown")
  // The Executor is not shut down directly here,
  // because it must first return an acknowledgement
  // to CoarseGrainedSchedulerBackend.
  // So the strategy is to send itself a Shutdown message
  // and handle the shutdown there.
  self.send(Shutdown)

case Shutdown =>
  stopping.set(true)
  new Thread("CoarseGrainedExecutorBackend-stop-executor") {
    override def run(): Unit = {
      // executor.stop() calls SparkEnv.stop(),
      // which blocks until the RpcEnv has fully shut down.
      // But if executor.stop() ran on the same thread as the RpcEnv,
      // the RpcEnv could not shut down until executor.stop() returned,
      // which would be a deadlock.
      // Therefore a new thread is needed.
      executor.stop()
    }
  }.start()
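The "acknowledge first, then stop yourself from a fresh thread" pattern above can be reduced to a minimal sketch. This is illustrative only: Spark's RpcEnv is replaced here by a hand-rolled single-threaded event loop, and all names are invented.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}

// Minimal model of the CoarseGrainedExecutorBackend shutdown pattern:
// the event loop must not block on its own shutdown, so the blocking
// stop work is handed to a freshly created thread.
object ShutdownSketch {
  sealed trait Msg
  case object StopExecutor extends Msg
  case object Shutdown extends Msg

  val inbox = new LinkedBlockingQueue[Msg]()
  val stopped = new CountDownLatch(1)

  def eventLoop(): Unit = {
    var running = true
    while (running) {
      inbox.take() match {
        case StopExecutor =>
          // Do not stop here: re-enqueue a Shutdown message so an
          // acknowledgement could be sent first (as in the real backend).
          inbox.put(Shutdown)
        case Shutdown =>
          // Stop from a separate thread so the event loop itself is free
          // to exit; blocking here would deadlock a real RpcEnv.
          new Thread("stop-executor") {
            override def run(): Unit = stopped.countDown()
          }.start()
          running = false
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val t = new Thread(() => eventLoop())
    t.start()
    inbox.put(StopExecutor)
    t.join()
    stopped.await()
    println("stopped cleanly")
  }
}
```

The key design point mirrored here is that the handler for StopExecutor only schedules the shutdown; the actual blocking work never runs on the message-dispatch thread.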

    Executor.stop

def stop(): Unit = {
  env.metricsSystem.report()
  // Stop the heartbeater
  heartbeater.shutdown()
  heartbeater.awaitTermination(10, TimeUnit.SECONDS)
  // Shut down the thread pool
  threadPool.shutdown()
  if (!isLocal) {
    // Stop the SparkEnv
    env.stop()
  }
}

    CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply

Returning to CoarseGrainedSchedulerBackend.stop: after stopExecutors() finishes, it sends a StopDriver message to the driverEndpoint. CoarseGrainedSchedulerBackend.DriverEndpoint receives the message and replies:

case StopDriver =>
  context.reply(true)
  // Stop the driverEndpoint itself
  stop()

Summary

This post surveyed Spark's deployment modes, briefly introduced the two YARN modes, and then walked through the Standalone mode source: how the Master and Worker start up and parse their arguments, and how resources are reclaimed when an application finishes.