

Spark PersistenceEngine and Leader Election Agent Internals, In Depth (Spark in Production series)

Published: 2024/4/14

This blog series distills cases from real production environments and pairs them with Spark source-code walkthroughs and hands-on guidance; please follow the series. Copyright notice: this Spark source-reading and production-practice series belongs to its author (Qin Kaixin). Reposting is prohibited; you are welcome to learn from it.

  • Spark in Production: Spark's built-in RPC framework and the RpcEnv infrastructure
  • Spark in Production: the Spark event listener bus flow
  • Spark in Production: the Spark storage subsystem's underlying architecture
  • Spark in Production: execution flow of Spark's MessageLoop threads
  • Spark in Production: first-level resource scheduling and SpreadOut mode, from the source
  • Spark in Production: second-level scheduling, the Stage partitioning algorithm and task scheduling details
  • Spark in Production: delay scheduling and the scheduling Pool architecture
  • Spark in Production: AppendOnlyMap, the task-level cache/aggregate/sort structure
  • Spark in Production: the ExternalSorter's design in the Spark Shuffle process
  • Spark in Production: the ShuffleExternalSorter's design in the Spark Shuffle process
  • Spark in Production: the ShuffleManager buffer SortShuffleWriter's design
  • Spark in Production: the ShuffleManager buffer UnsafeShuffleWriter's design
  • Spark in Production: the ShuffleManager buffer BypassMergeSortShuffleWriter's design
  • Spark in Production: BlockStoreShuffleReader internals, a core Shuffle component
  • Spark in Production: SortShuffleManager internals
  • Spark in Production: PersistenceEngine high-availability internals
  • Spark in Production: StreamingContext startup flow and the DStream template, from the source
  • Spark in Production: ReceiverTracker startup and receiver RDD task submission
  • Spark in Production: Spark Streaming's timed Batch-to-Block conversion, from the source
  • Spark in Production: JobGenerator's periodic data-processing logic, from the source
  • [Spark in Production: the Spark Streaming Graph processing-chain iteration, from the source]
  • [Spark in Production: the JobGenerator data-cleanup flow, from the source]
  • [Spark in Production: Spark Streaming fault-tolerance internals]
  • [Spark in Production: Spark Streaming No-Receiver Kafka pull internals]
  • [Spark in Production: Spark Streaming backpressure rate-control internals]

1 The PersistenceEngine

1.1 Starting the PersistenceEngine

  • A failure-recovery mode is chosen first; the main options are ZOOKEEPER and FILESYSTEM (with CUSTOM and the default NONE also possible).

    private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
  • The PersistenceEngine is initialized in the Master's onStart() method.

    val serializer = new JavaSerializer(conf)

    val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
      case "ZOOKEEPER" =>
        logInfo("Persisting recovery state to ZooKeeper")
        val zkFactory =
          new ZooKeeperRecoveryModeFactory(conf, serializer)
        (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
      case "FILESYSTEM" =>
        val fsFactory =
          new FileSystemRecoveryModeFactory(conf, serializer)
        (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
      case "CUSTOM" =>
        val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
        val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
          .newInstance(conf, serializer)
          .asInstanceOf[StandaloneRecoveryModeFactory]
        (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
      case _ =>
        (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
    }
    persistenceEngine = persistenceEngine_
    leaderElectionAgent = leaderElectionAgent_
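The match above is plain config-driven factory selection: each mode yields a (persistence engine, election agent) pair, falling back to a no-op engine when recovery is disabled. A minimal Python sketch of the same pattern (the class and agent names are hypothetical stand-ins, not Spark's API):

```python
# Sketch of config-driven recovery-mode dispatch. BlackHolePersistenceEngine
# mirrors the NONE fallback: it accepts writes and discards them.
class BlackHolePersistenceEngine:
    def persist(self, name, obj):
        pass  # NONE mode: nothing is persisted

class FileSystemPersistenceEngine:
    def __init__(self):
        self.store = {}  # stand-in for on-disk files
    def persist(self, name, obj):
        self.store[name] = obj

def create_recovery(conf):
    mode = conf.get("spark.deploy.recoveryMode", "NONE")
    if mode == "FILESYSTEM":
        return FileSystemPersistenceEngine(), "FileSystemLeaderAgent"
    return BlackHolePersistenceEngine(), "MonarchyLeaderAgent"

engine, agent = create_recovery({"spark.deploy.recoveryMode": "FILESYSTEM"})
engine.persist("app_app-1", {"name": "demo"})
```

The point of the pattern is that the rest of the Master never branches on the mode again; it only talks to the two returned objects.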

1.2 What the PersistenceEngine does

  • When the Master fails, the PersistenceEngine is what the new Master reads the persisted Application, Worker, and Driver details from.
  • It is equally responsible for writing those Application, Worker, and Driver details out in the first place.

(1) When the PersistenceEngine is invoked:

  • before a new Application is registered;
  • before a new Worker is registered;
  • when removeApplication and removeWorker are called.

For example:

persistenceEngine.removeWorker(worker)
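The contract is small: persist under a type prefix ("app_", "worker_", "driver_"), unpersist by the same name, and read everything back by prefix during recovery. An in-memory Python sketch of that contract (a model for illustration, not Spark's code):

```python
# In-memory stand-in for the PersistenceEngine contract: objects live under a
# type prefix, and recovery reads them back by prefix, sorted by name (which
# sorts by id, i.e. roughly by creation order).
class InMemoryPersistenceEngine:
    def __init__(self):
        self._store = {}

    def persist(self, name, obj):
        self._store[name] = obj

    def unpersist(self, name):
        self._store.pop(name, None)

    def read(self, prefix):
        return [v for k, v in sorted(self._store.items()) if k.startswith(prefix)]

    # These mirror the final add/remove helpers of the abstract class.
    def add_application(self, app_id, app): self.persist("app_" + app_id, app)
    def add_worker(self, worker_id, w):     self.persist("worker_" + worker_id, w)
    def add_driver(self, driver_id, d):     self.persist("driver_" + driver_id, d)

    def read_persisted_data(self):
        return self.read("app_"), self.read("driver_"), self.read("worker_")

engine = InMemoryPersistenceEngine()
engine.add_application("001", {"name": "job-a"})
engine.add_worker("w-1", {"host": "node1"})
apps, drivers, workers = engine.read_persisted_data()
```

Because the prefixes partition the namespace, a single flat store (files in one directory, znodes under one path) is enough to recover all three object types.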

1.3 The PersistenceEngine abstract template

abstract class PersistenceEngine {

  /**
   * Defines how the object is serialized and persisted. Implementation will
   * depend on the store used.
   */
  def persist(name: String, obj: Object): Unit

  /**
   * Defines how the object referred by its name is removed from the store.
   */
  def unpersist(name: String): Unit

  /**
   * Gives all objects, matching a prefix. This defines how objects are
   * read/deserialized back.
   */
  def read[T: ClassTag](prefix: String): Seq[T]

  final def addApplication(app: ApplicationInfo): Unit = {
    persist("app_" + app.id, app)
  }

  final def removeApplication(app: ApplicationInfo): Unit = {
    unpersist("app_" + app.id)
  }

  final def addWorker(worker: WorkerInfo): Unit = {
    persist("worker_" + worker.id, worker)
  }

  final def removeWorker(worker: WorkerInfo): Unit = {
    unpersist("worker_" + worker.id)
  }

  final def addDriver(driver: DriverInfo): Unit = {
    persist("driver_" + driver.id, driver)
  }

  final def removeDriver(driver: DriverInfo): Unit = {
    unpersist("driver_" + driver.id)
  }

  /**
   * Returns the persisted data sorted by their respective ids (which implies that they're
   * sorted by time of creation).
   */
  final def readPersistedData(
      rpcEnv: RpcEnv): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
    rpcEnv.deserialize { () =>
      (read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read[WorkerInfo]("worker_"))
    }
  }

  def close() {}
}

1.4 File-system-based and ZooKeeper-based persistence

  • File-system-based persistence: FileSystemPersistenceEngine

    private def serializeIntoFile(file: File, value: AnyRef) {
      val created = file.createNewFile()
      if (!created) { throw new IllegalStateException("Could not create file: " + file) }
      val fileOut = new FileOutputStream(file)
      var out: SerializationStream = null
      Utils.tryWithSafeFinally {
        out = serializer.newInstance().serializeStream(fileOut)
        out.writeObject(value)
      } {
        fileOut.close()
        if (out != null) {
          out.close()
        }
      }
    }
  • ZooKeeper-based persistence: ZooKeeperPersistenceEngine

    Curator is Netflix's open-source ZooKeeper client. Note that ZooKeeperPersistenceEngine stores the ApplicationInfo, WorkerInfo, and DriverInfo objects in separate znodes under ZooKeeper.

    Can ZooKeeper sustain this write volume? An open question.

    private val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
    private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)

    private def serializeIntoFile(path: String, value: AnyRef) {
      val serialized = serializer.newInstance().serialize(value)
      val bytes = new Array[Byte](serialized.remaining())
      serialized.get(bytes)
      zk.create().withMode(CreateMode.PERSISTENT).forPath(path, bytes)
    }
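Both variants share the same write discipline shown in the file-based serializeIntoFile earlier in this section: create the target exclusively (fail rather than overwrite), serialize one object per entry, and close streams even if serialization throws. A Python sketch of that discipline (a stand-in using pickle, not Spark's serializer):

```python
import os
import pickle
import tempfile

def serialize_into_file(path, value):
    # O_CREAT | O_EXCL mirrors file.createNewFile(): raise if the file exists,
    # so a stale entry is never silently overwritten.
    fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    f = os.fdopen(fd, "wb")
    try:
        pickle.dump(value, f)      # one serialized object per file
    finally:
        f.close()                  # close in all cases, like tryWithSafeFinally

tmp = os.path.join(tempfile.mkdtemp(), "worker_w-1")
serialize_into_file(tmp, {"host": "node1"})
with open(tmp, "rb") as fh:
    restored = pickle.load(fh)
```

The exclusive-create choice matters during recovery races: if two writers ever targeted the same name, the second fails loudly instead of corrupting the store.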

2 The leader election mechanism

Leader election here is essentially listener registration: once the agent observes that the Master has died, the registered callbacks fire.

The main implementations are:

  • ZooKeeperLeaderElectionAgent
  • MonarchyLeaderAgent

The rest of this section uses ZooKeeperLeaderElectionAgent as the example.

2.1 Letting ZooKeeper do the heavy lifting

  • A LeaderLatch is created on the /leader_election directory:

    val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"

    private def start() {
      logInfo("Starting ZooKeeper LeaderElection agent")
      zk = SparkCuratorUtil.newClient(conf)
      leaderLatch = new LeaderLatch(zk, WORKING_DIR)
      leaderLatch.addListener(this)
      leaderLatch.start()
    }

    private def updateLeadershipStatus(isLeader: Boolean) {
      if (isLeader && status == LeadershipStatus.NOT_LEADER) {
        status = LeadershipStatus.LEADER
        masterInstance.electedLeader()
      } else if (!isLeader && status == LeadershipStatus.LEADER) {
        status = LeadershipStatus.NOT_LEADER
        masterInstance.revokedLeadership()
      }
    }
  • Election then proceeds through the latch callbacks on /leader_election:

    override def isLeader() {
      synchronized {
        // could have lost leadership by now.
        if (!leaderLatch.hasLeadership) {
          return
        }
        logInfo("We have gained leadership")
        updateLeadershipStatus(true)
      }
    }

    override def notLeader() {
      synchronized {
        // could have gained leadership by now.
        if (leaderLatch.hasLeadership) {
          return
        }
        logInfo("We have lost leadership")
        updateLeadershipStatus(false)
      }
    }
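Note that updateLeadershipStatus is a two-state machine whose callbacks fire only on an actual transition, so repeated or stale isLeader/notLeader notifications are harmless. A Python sketch of that state machine (a model for illustration, not Spark's code):

```python
# Model of updateLeadershipStatus: callbacks fire only on a state change,
# making repeated notifications from the latch idempotent.
NOT_LEADER, LEADER = "NOT_LEADER", "LEADER"

class LeadershipTracker:
    def __init__(self, on_elected, on_revoked):
        self.status = NOT_LEADER
        self.on_elected = on_elected
        self.on_revoked = on_revoked

    def update(self, is_leader):
        if is_leader and self.status == NOT_LEADER:
            self.status = LEADER
            self.on_elected()          # like masterInstance.electedLeader()
        elif not is_leader and self.status == LEADER:
            self.status = NOT_LEADER
            self.on_revoked()          # like masterInstance.revokedLeadership()

events = []
tracker = LeadershipTracker(lambda: events.append("elected"),
                            lambda: events.append("revoked"))
tracker.update(True)   # NOT_LEADER -> LEADER, fires elected
tracker.update(True)   # no transition, no callback
tracker.update(False)  # LEADER -> NOT_LEADER, fires revoked
```

This is why the Scala callbacks can safely double-check leaderLatch.hasLeadership first: even if the check and the notification race, a redundant update is a no-op.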

3 What the Master does during election

The Master sends a message to itself to kick off recovery:

  • Master extends LeaderElectable and therefore implements the electedLeader method:

    override def electedLeader() {
      self.send(ElectedLeader)
    }
  • The Master's actions: beginRecovery and the CompleteRecovery message

    override def receive: PartialFunction[Any, Unit] = {
      case ElectedLeader =>
        val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
        state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
          RecoveryState.ALIVE
        } else {
          RecoveryState.RECOVERING
        }
        logInfo("I have been elected leader! New state: " + state)
        if (state == RecoveryState.RECOVERING) {
          beginRecovery(storedApps, storedDrivers, storedWorkers)  // <= the key step
          recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              self.send(CompleteRecovery)                          // <= the key step
            }
          }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
        }
  • The Master's beginRecovery

    private def beginRecovery(
        storedApps: Seq[ApplicationInfo],
        storedDrivers: Seq[DriverInfo],
        storedWorkers: Seq[WorkerInfo]) {
      for (app <- storedApps) {
        logInfo("Trying to recover app: " + app.id)
        try {
          registerApplication(app)
          app.state = ApplicationState.UNKNOWN
          app.driver.send(MasterChanged(self, masterWebUiUrl))
        } catch {
          case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
        }
      }
  • The Master's completeRecovery

    private def completeRecovery() {
      // Ensure "only-once" recovery semantics using a short synchronization period.
      if (state != RecoveryState.RECOVERING) { return }
      state = RecoveryState.COMPLETING_RECOVERY

      // Kill off any workers and apps that didn't respond to us.
      workers.filter(_.state == WorkerState.UNKNOWN).foreach(
        removeWorker(_, "Not responding for recovery"))
      apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

      // Update the state of recovered apps to RUNNING
      apps.filter(_.state == ApplicationState.WAITING).foreach(_.state = ApplicationState.RUNNING)

      // Reschedule drivers which were not claimed by any workers
      drivers.filter(_.worker.isEmpty).foreach { d =>
        logWarning(s"Driver ${d.id} was not found after master recovery")
        if (d.desc.supervise) {
          logWarning(s"Re-launching ${d.id}")
          relaunchDriver(d)
        } else {
          removeDriver(d.id, DriverState.ERROR, None)
          logWarning(s"Did not re-launch ${d.id} because it was not supervised")
        }
      }
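completeRecovery reduces to four filtering rules: drop workers and apps still UNKNOWN (they never reconnected within the timeout), promote WAITING apps to RUNNING, and for drivers left without a worker, relaunch only those marked supervised. A Python sketch of those rules (a model for illustration, not Spark's code):

```python
# Model of completeRecovery's filtering: unresponsive workers/apps are
# dropped, waiting apps become running, and unclaimed drivers are relaunched
# only if they were submitted with supervise=True.
def complete_recovery(workers, apps, drivers):
    workers[:] = [w for w in workers if w["state"] != "UNKNOWN"]
    apps[:] = [a for a in apps if a["state"] != "UNKNOWN"]
    for a in apps:
        if a["state"] == "WAITING":
            a["state"] = "RUNNING"
    relaunched, removed = [], []
    for d in drivers:
        if d["worker"] is None:  # no worker reclaimed this driver
            (relaunched if d["supervise"] else removed).append(d["id"])
    return relaunched, removed

workers = [{"id": "w1", "state": "ALIVE"}, {"id": "w2", "state": "UNKNOWN"}]
apps = [{"id": "a1", "state": "WAITING"}]
drivers = [{"id": "d1", "worker": None, "supervise": True},
           {"id": "d2", "worker": None, "supervise": False}]
relaunched, removed = complete_recovery(workers, apps, drivers)
```

The supervise flag is the pivot: it is the user's declaration that the driver is safe to restart, so recovery never relaunches a driver the user did not opt in for.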

4 Summary

Qin Kaixin, Shenzhen
