Understanding Spark 2.1 Core in Depth (13): SparkEnv Source Code Analysis
SparkEnv holds the runtime-environment objects for a running Spark instance (master, worker, executor, and so on): it manages the serializer, the Akka actor system, the block manager, the map output tracker, and other components. SparkEnv is used mainly internally, and may later become internal-only. Its most important method is createDriverEnv, which takes three required parameters: conf: SparkConf, isLocal: Boolean, and listenerBus: LiveListenerBus. LiveListenerBus listens for events and dispatches them to registered listeners.
```scala
private[spark] def createDriverEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus,
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
  assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
  assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
  val hostname = conf.get("spark.driver.host")
  val port = conf.get("spark.driver.port").toInt
  create(
    conf,
    SparkContext.DRIVER_IDENTIFIER,
    hostname,
    port,
    isDriver = true,
    isLocal = isLocal,
    listenerBus = listenerBus,
    mockOutputCommitCoordinator = mockOutputCommitCoordinator
  )
}
```
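The LiveListenerBus passed in above follows a simple listener-bus pattern. The toy sketch below (Event, Listener, and ListenerBus are stand-in names, not Spark classes) shows only the core mechanics: listeners register with the bus and every posted event is dispatched to each of them. The real LiveListenerBus additionally queues events and dispatches them asynchronously on a dedicated thread.

```scala
// Stand-in event hierarchy; Spark's real events are SparkListenerEvent subclasses.
sealed trait Event
case class JobStart(jobId: Int) extends Event
case class JobEnd(jobId: Int) extends Event

// A listener receives every event posted to the bus.
trait Listener {
  def onEvent(event: Event): Unit
}

class ListenerBus {
  private var listeners = List.empty[Listener]

  // Preserve registration order so listeners fire in the order they were added.
  def addListener(l: Listener): Unit =
    listeners = listeners :+ l

  // Synchronously dispatch one event to every registered listener.
  def post(event: Event): Unit =
    listeners.foreach(_.onEvent(event))
}

// Register a listener that records what it sees, then post two events.
val log = scala.collection.mutable.Buffer.empty[String]
val bus = new ListenerBus
bus.addListener(new Listener {
  def onEvent(event: Event): Unit = log += event.toString
})
bus.post(JobStart(1))
bus.post(JobEnd(1))
```

Because dispatch here is synchronous, a slow listener would block the caller; that is precisely why the real bus puts events on a queue drained by its own thread.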
createDriverEnv then delegates to the create method, which constructs the core components: SecurityManager, ActorSystem, MapOutputTracker, ShuffleManager, ShuffleMemoryManager, BlockTransferService, BlockManagerMaster, BlockManager, BroadcastManager, CacheManager, HttpFileServer, and MetricsSystem:
```scala
private def create(
    conf: SparkConf,
    executorId: String,
    hostname: String,
    port: Int,
    isDriver: Boolean,
    isLocal: Boolean,
    listenerBus: LiveListenerBus = null,
    numUsableCores: Int = 0,
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {

  if (isDriver) {
    assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
  }

  val securityManager = new SecurityManager(conf)

  val (actorSystem, boundPort) = {
    val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
    AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)
  }

  if (isDriver) {
    conf.set("spark.driver.port", boundPort.toString)
  } else {
    conf.set("spark.executor.port", boundPort.toString)
  }

  def instantiateClass[T](className: String): T = {
    val cls = Class.forName(className, true, Utils.getContextOrSparkClassLoader)
    try {
      cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
        .newInstance(conf, new java.lang.Boolean(isDriver))
        .asInstanceOf[T]
    } catch {
      case _: NoSuchMethodException =>
        try {
          cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
        } catch {
          case _: NoSuchMethodException =>
            cls.getConstructor().newInstance().asInstanceOf[T]
        }
    }
  }

  def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
    instantiateClass[T](conf.get(propertyName, defaultClassName))
  }

  val serializer = instantiateClassFromConf[Serializer](
    "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
  logDebug(s"Using serializer: ${serializer.getClass}")

  val closureSerializer = instantiateClassFromConf[Serializer](
    "spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")

  def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
    if (isDriver) {
      logInfo("Registering " + name)
      actorSystem.actorOf(Props(newActor), name = name)
    } else {
      AkkaUtils.makeDriverRef(name, conf, actorSystem)
    }
  }

  val mapOutputTracker = if (isDriver) {
    new MapOutputTrackerMaster(conf)
  } else {
    new MapOutputTrackerWorker(conf)
  }

  mapOutputTracker.trackerActor = registerOrLookup(
    "MapOutputTracker",
    new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

  val shortShuffleMgrNames = Map(
    "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
    "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
  val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
  val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
  val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

  val shuffleMemoryManager = new ShuffleMemoryManager(conf)

  val blockTransferService =
    conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
      case "netty" =>
        new NettyBlockTransferService(conf, securityManager, numUsableCores)
      case "nio" =>
        new NioBlockTransferService(conf, securityManager)
    }

  val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
    "BlockManagerMaster",
    new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)

  val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
    serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,
    numUsableCores)

  val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

  val cacheManager = new CacheManager(blockManager)

  val httpFileServer =
    if (isDriver) {
      val fileServerPort = conf.getInt("spark.fileserver.port", 0)
      val server = new HttpFileServer(conf, securityManager, fileServerPort)
      server.initialize()
      conf.set("spark.fileserver.uri", server.serverUri)
      server
    } else {
      null
    }

  val metricsSystem = if (isDriver) {
    MetricsSystem.createMetricsSystem("driver", conf, securityManager)
  } else {
    conf.set("spark.executor.id", executorId)
    val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
    ms.start()
    ms
  }

  val sparkFilesDir: String = if (isDriver) {
    Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
  } else {
    "."
  }

  if (conf.contains("spark.cache.class")) {
    logWarning("The spark.cache.class property is no longer being used! Specify storage " +
      "levels using the RDD.persist() method instead.")
  }

  val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
    new OutputCommitCoordinator(conf)
  }
  val outputCommitCoordinatorActor = registerOrLookup("OutputCommitCoordinator",
    new OutputCommitCoordinatorActor(outputCommitCoordinator))
  outputCommitCoordinator.coordinatorActor = Some(outputCommitCoordinatorActor)

  new SparkEnv(
    executorId,
    actorSystem,
    serializer,
    closureSerializer,
    cacheManager,
    mapOutputTracker,
    shuffleManager,
    broadcastManager,
    blockTransferService,
    blockManager,
    securityManager,
    httpFileServer,
    sparkFilesDir,
    metricsSystem,
    shuffleMemoryManager,
    outputCommitCoordinator,
    conf)
}
```
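The instantiateClass helper inside create relies on a constructor-fallback reflection pattern: try the richest constructor first, and fall back to simpler ones when it does not exist. The self-contained sketch below reproduces that pattern using JDK classes as stand-ins for Spark components (the instantiate helper here is illustrative, not Spark code); a (String) constructor plays the role of (SparkConf, Boolean).

```scala
// Try the preferred (String) constructor; on NoSuchMethodException fall back
// to the no-arg constructor, mirroring instantiateClass's fallback chain.
def instantiate[T](className: String, arg: String): T = {
  val cls = Class.forName(className)
  try {
    cls.getConstructor(classOf[String]).newInstance(arg).asInstanceOf[T]
  } catch {
    case _: NoSuchMethodException =>
      cls.getConstructor().newInstance().asInstanceOf[T]
  }
}

// java.lang.StringBuilder has a (String) constructor: the first branch succeeds.
val sb = instantiate[java.lang.StringBuilder]("java.lang.StringBuilder", "abc")

// java.util.ArrayList has no (String) constructor: the no-arg fallback is used.
val list = instantiate[java.util.ArrayList[AnyRef]]("java.util.ArrayList", "unused")
```

This is why pluggable components such as the serializer or shuffle manager only need to expose one of the expected constructor shapes: create can load them by class name from configuration without knowing their concrete type at compile time.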