日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark详解(九):Spark存储原理分析

發布時間:2025/4/16 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark详解(九):Spark存储原理分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1. 整體架構

Spark存儲介質包括內存和磁盤等。Spakr的存儲采用了主從模式,也就是Master/Slave模式,整個存儲模塊使用了前面介紹的RPC的通信方式。其中,Master負責整個應用程序運行期間的數據塊元數據的管理和維護,而Slave一方面負責本地數據塊的狀態信息上報給Master,另一方面接受從Master傳來的執行命令,如獲取數據塊狀態、刪除RDD/數據塊等命令。在每個Slave中存在數據傳輸通道,根據需要在Slave之間進行遠程數據的讀取和寫入

(1)在應用程序啟動后,SparkContext會創建Driver端的SpakrEnv,在該SparkEnv中實列化BlockManager和BlockManagerMaster,在BlockManagerMaster內部創建消息通信的終端點BlockManagerMasterEndPoint

在Executor啟動時候,也會創建SparkEnv,在該SparkEnv中實列化BlockManger和負責數據傳輸服務的BlockTransferService終端點的引用。在BlockManger初始化的過程中,一方面會加入BlockManagerMasterEndpoint終端點的引用,另一方面會創建Executor消息通信BlockManagerSlaveEndpoint終端點,并把該終端點的引用注冊到Driver中,這樣Dirver和Executor相互持有通信終端點的引用,可以在應用程序執行過程中進行消息通信。

// 創建遠程數據傳輸服務,使用Nettyval blockTransferService =new NettyBlockTransferService(conf, securityManager, hostname, numUsableCores)//創建BlockMangerMaster,如果是Dirver端 在BlockMangerMaster內部,則創建終端點BlockManagerMasterEndpoint// 如果是Executor,則創建BlockManagerSlaveEndpoint的引用val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(BlockManagerMaster.DRIVER_ENDPOINT_NAME,new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),conf, isDriver)// NB: blockManager is not valid until initialize() is called later.// 創建BlockManager,如果是Driver端包含BlockManagerMaster,如果是Executor包含的是BlockManagerMaster的引用,另外BlockManager包含了// 數據傳輸服務,當BlockManager調用initalize()方法初始化時真正生效val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,blockTransferService, securityManager, numUsableCores)

其中BlockManger初始化代碼如下,如果是Exeucor創建其消息通信的終端點BlockMangerSlaveEndpoint,并向Driver發送RegisterBlockManger消息,把該Executor的BlockManger和其包含的BlockMangerSlaveEndPoint注冊到BlockManagerMaster中。

def initialize(appId: String): Unit = {// 在Executor中啟動遠程數據傳輸服務blockTransferService,根據配置啟動傳輸服務器BlockTransferService// 該服務器啟動后等待其他節點發送請求blockTransferService.init(this)shuffleClient.init(appId)// 獲取BlockManager的編號blockManagerId = BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port)// 獲取Shuffle服務編號,如果啟動外部Shuffle服務,則加入外部Shuffle服務端口信息,// 否則使用BlockManager的編號shuffleServerId = if (externalShuffleServiceEnabled) {logInfo(s"external shuffle service port = $externalShuffleServicePort")BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)} else {blockManagerId}// 把Executor的BlockManager注冊到BlockManagerMaster中,啟動包括其終端點BlockMangaerSlaveEndPoint的引用,Master端持有該引用可以向Executor發送信息master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)// Register Executors' configuration with the local shuffle service, if one should exist.// 如果外部Shuffle服務啟動并且為Executor節點,則注冊該外部Shuffle服務if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {registerWithExternalShuffleServer()}}

(2)當寫入、更新或刪除數據完畢后,發送數據塊的最新狀態消息UpdateBlockInfo給BlockMangerMasterEndPoint終端點,由其更新數據塊的元數據。該終端點的元數據存放在BlockMangerMasterEndPint的3個HashMap中。

// Mapping from block manager id to the block manager's information.// 該HashMap中存放了BlockMangerId與BlockMangerInfo的對應,其中BlockMangerInfo包含了Executor內存使用情況、數據塊的使用情況、已被緩存的數據塊和Executor終端點引用// 通過該引用可以向Execuotr發送消息private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]// Mapping from executor ID to block manager ID.// 該HashMap中存放了ExecutorID和BlockMangerID對應列表private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]// Mapping from block id to the set of block managers that have the block.// 該HashMap存放了BlockId和BlockManagerId序列對應的列表,原因在于一個數據塊可能存在多個副本,保持在多個Executor中

在更新數據的元數據時,更新BlockManagerInfo和BlockLocations兩個列表

  • 在處理BokcMangerInfo時,傳入的BlockMangerId、blockId和SotrageLevel等參數,通過這些參數判斷數據的操作是插入、更新還是刪除操作。
  • 在處理blockLoacations,根據blockId判斷blockLocations中是否包含該數據庫。如果包含該數據塊,則根據數據塊的操作,當進行數據更新時,更新數據塊所在的BlockMangerid信息,當進行數據刪除時,則移除該BlockMangerid信息,在刪除過程中判斷數據塊對應的Executor是否為空,如果為空表示在集群中刪除了該數據塊,則在blockLoactions刪除該數據塊信息。

(3)應用程序數據存儲后,在獲取遠程節點數據、獲取RDD執行的首選位置等操作時需要根據數據塊的編號查詢數據塊所處的位置,此時發送GetLoacations或獲取數據塊的位置信息。

(4)Spark提供刪除RDD、數據塊、廣播變量的方式。當數據需要刪除的時候,提交刪除信息給BlockMangerSlaveEndPoint終端點,在該終端點發起刪除操作,刪除操作一方面需要刪除Driver端元數據信息,另一方面需要發送消息通知Executor,刪除對應的物理數據。

首先在SparkContext中調用unpersistRDD方法,在該方法中發送removeRDD消息給BlockMangerMasterEndPoint;然后,該終端點接收到消息時,從blockLocations列表中找出RDD對應的數據存在BlockManagerId列表,查詢完畢之后,更新blockLoactions和blockMangerInfo兩個元數據列表;最后,把獲取的BlockManger列表,發送消息給所在的BlockMangerSlaveEndPoint終端點,通知其刪除該Executor上的RDD,刪除時調用BlockManager的removeRDD方法,刪除Executor上RDD對應的數據塊。

private def removeRdd(rddId: Int): Future[Seq[Int]] = {// First remove the metadata for the given RDD, and then asynchronously remove the blocks// from the slaves.// Find all blocks for the given RDD, remove the block from both blockLocations and// the blockManagerInfo that is tracking the blocks.// 在blockLocations和blockManagerInfo中刪除該RDD的數據元信息// 首先根據RDD編號獲取該RDD存儲的數據塊信息val blocks = blockLocations.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)blocks.foreach { blockId =>//然后根據該數據塊的信息找出這些數據塊在blockManagerId中的列表,遍歷這些列表并刪除// BlockManager包含該數據塊的元數據,同時刪除blockLocations對應數據塊的元數據val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))blockLocations.remove(blockId)}// Ask the slaves to remove the RDD, and put the result in a sequence of Futures.// The dispatcher is used as an implicit argument into the Future sequence construction.//最后發送RemoveRDD消息給Executor,通知其刪除RDDval removeMsg = RemoveRdd(rddId)Future.sequence(blockManagerInfo.values.map { bm =>bm.slaveEndpoint.ask[Int](removeMsg)}.toSeq)}

在實際研究存儲首先之前,我們在來看一下Spark存儲模塊之間的關系,如下圖所示,整個模塊中BlockManger時其核心,它不僅提供存儲模塊處理各種存儲方式的讀寫方法,而且為Shuffle模塊提供數據處理等操作。

BlockManger存在與Dirver端和每個Executor中,在Driver端的BlockManger保存了數據的元數據信息,而在Executor的BlockManger根據接受到消息進行操作:

  • 當Executor的BlockManger接受到讀取數據時,根據數據塊所在節點是否為本地使用BlockManger不同的方法進行處理。如果在本地,則直接調用MemeoryStore和DiskStore中的取方法getVlaues/getBytes進行讀取;如果在遠程,則調用BlockTransferService的服務進行獲取遠程數據節點上的數據。
  • 當Executor的BlockManger接收到寫入數據時,如果不需要創建副本,則調用BlockStore的接口方法進行處理,根據數據寫入的存儲模型,決定調用對應的寫入方法。

2. 存儲級別

Spark時基于內存的計算,但是RDD的數據不僅可以存儲到內存中,還可以使用persist或者cache 方顯示的將RDD的數據存儲到內存或者磁盤中。

private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {// TODO: Handle changes of StorageLevel// 如果RDD指定了非NONE的存儲級別,該存儲級別不能被修改if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {throw new UnsupportedOperationException("Cannot change storage level of an RDD after it was already assigned a level")}// If this is the first time this RDD is marked for persisting, register it// with the SparkContext for cleanups and accounting. Do this only once.// 當RDD原來的存儲級別為None時,可以對RDD進行持久化處理,在處理之前需要先清楚SparkContext中原來的存儲元數據,然后加入該持久信息if (storageLevel == StorageLevel.NONE) {sc.cleaner.foreach(_.registerRDDForCleanup(this))sc.persistRDD(this)}// 當RDD原來的存儲級別為NONE時,把RDD存儲級別修改為傳入的新值storageLevel = newLevelthis}

persist操作時控制操作的一種,它只是改變了原有的RDD的元數據信息,并沒有進行數據的存儲操作操作,正在進行是在RDD的iteratior方法中。對于cache方法而言,它只是persist的特例,即persist的方法參數為MEMORY_ONLY的情況。

在StorageLevel類中,根據useDisk、useMmeory、uesOffHeap、deserialized、replicaiton5個參數的組和。Spakr提供了12中存儲級別的緩存策略,這可以將RDD持久化到內存、磁盤和外部存儲系統,或者是以序列化的方式持久化到內存等,甚至可以在集群中不同節點之間存儲多個副本呢。

self.useDisk = useDisk self.useMemory = useMemory self.useOffHeap = useOffHeap self.deserialized = deserialized self.replication = replicationStorageLevel.DISK_ONLY = StorageLevel(True, False, False, False) StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2) StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False) StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2) StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False) StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2) StorageLevel.OFF_HEAP = StorageLevel(True, True, True, False, 1) 存儲級別描述
NONE不進行數據存儲
MEMORY_ONLY將RDD作為反序列化的對象存儲JVM中。如果RDD不能被內存裝下,一些分區將不會被緩存,并且在需要的時候被重新計算。這是默認的存儲級別
MEMORY_AND_DISK將RDD作為反序列化的對象存儲到JVM中。如果RDD不能內存裝下,超出的分區將被保存在磁盤上,并且在需要的時候被讀取
MEMORY_ONLY_SER將RDD作為序列化的對象進行存儲
MEMORY_AND_DISK_SER與MEMORY_ONLY_SER類似,但是把超出的內存部分分區將存儲到硬盤中而不是每次需要的時候進行重新計算
DISK_ONLY只將RDD分區存儲到硬盤上
DISK_ONLY_2與上述的存儲級別一樣,但是將每個分區都復制到兩個集群之上
OFF_HEAP可以將RDD存儲到分布式文件系統中,如Alluxio

3. RDD存儲調用

RDD 包含多個Partition,每個Partition對應一個或者多個數據塊Block,每個Block,每個Block擁有唯一的編號BlockId,對應于數據塊編號規則為:“rdd”+rddId+"_"+splitIndex,其中splitIndex為該數據塊對應的Partition序號。

首先RDD通過Transfermation操作,比如map獲取flatMap操作,調用RDD構造相應的MapPartitionsRDD。然后通過在提交作業之后,運行相應的Task,執行MapPartitionsRDD的compute方法,在compute方法中調用RDD的iterator方法。

實際發送數據操作是任務執行的時候發生的,RDD調用iterator方法時發生的。在調用過程中,先根據數據塊Block的編號在判斷是否已經按照指定的存儲級別進行存儲,如果存在該數據塊Block,則從本地或遠程節點讀取數據;如果不存在該數據塊Block,則調用RDD的計算方法輸出結果,并把結果按照指定的存儲級別進行存儲。

iterator 函數實現大體是這么個流程:
1 若標記了有緩存,則取緩存,取不到則進行”計算或讀檢查點”。完了再存入緩存,以備后續使用。
2 若未標記有緩存,則直接進行”計算或讀檢查點”。
3 “計算或讀檢查點”這個過程也做兩個判斷:有做過checkpoint,沒有做過checkpoint。做過checkpoint則可以讀取到檢查點數據返回。無則調該rdd的實現類的computer函數計算。computer函數實現方式就是向上遞歸“獲取父rdd分區數據進行計算”,直到遇到檢查點rdd獲取有緩存的rdd。

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {if (storageLevel != StorageLevel.NONE) {// 如果存在存儲級別,嘗試讀取內存的數據進行迭代計算getOrCompute(split, context)} else {// 如果不存在存儲級別,則直接讀取數據進行迭代計算或者讀取檢查點結構進行迭代計算computeOrReadCheckpoint(split, context)}}

其中調用的getOrCompute時方法存儲邏輯的核心。

private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {// 通過RDD編號和partition序號獲取數據塊的Block的編號val blockId = RDDBlockId(id, partition.index)var readCachedBlock = true// This method is called on executors, so we need call SparkEnv.get instead of sc.env.// 由于該方法由Executor調用,可以使用SparkEnv代替sc.env// 根據數據塊Block的編號先讀取數據,然后在更新數據,這里是讀寫數據的入口SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {// 如果數據塊不在內存中,則嘗試讀取檢查點結果進行迭代計算readCachedBlock = falsecomputeOrReadCheckpoint(partition, context)}) match {case Left(blockResult) =>// 對getOrElseUpdate返回結果進行處理,該結果表示處理成功,記錄結果度量信息if (readCachedBlock) {val existingMetrics = context.taskMetrics().inputMetricsexistingMetrics.incBytesRead(blockResult.bytes)new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {override def next(): T = {existingMetrics.incRecordsRead(1)delegate.next()}}} else {new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])}// 對getOrElseUpdate返回結果進行處理,該結果表示保存失敗,例如數據太大無法放到內存中// 并且也無法保存到磁盤中,把該返回結果給調用者,由其決定如何處理。case Right(iter) =>new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])}}

在getOrCompute調用getOrElseUpdate方法,該方法時存儲讀寫數據的入口點。

def getOrElseUpdate[T](blockId: BlockId,level: StorageLevel,classTag: ClassTag[T],makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {// Attempt to read the block from local or remote storage. If it's present, then we don't need// to go through the local-get-or-put path.// 嘗試從本地或遠程存儲中讀取塊。 如果它存在,那么我們不需要通過local-get-or-put路徑。get[T](blockId)(classTag) match {case Some(block) =>return Left(block)case _ =>// Need to compute the block.}// Initially we hold no locks on this block.// 寫數據的入口doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {case None =>// doPut() didn't hand work back to us, so the block already existed or was successfully// stored. Therefore, we now hold a read lock on the block.val blockResult = getLocalValues(blockId).getOrElse {// Since we held a read lock between the doPut() and get() calls, the block should not// have been evicted, so get() not returning the block indicates some internal error.releaseLock(blockId)throw new SparkException(s"get() failed for block $blockId even though we held a lock")}// We already hold a read lock on the block from the doPut() call and getLocalValues()// acquires the lock again, so we need to call releaseLock() here so that the net number// of lock acquisitions is 1 (since the caller will only call release() once).releaseLock(blockId)Left(blockResult)case Some(iter) =>// The put failed, likely because the data was too large to fit in memory and could not be// dropped to disk. Therefore, we need to pass the input iterator back to the caller so// that they can decide what to do with the values (e.g. process them without caching).Right(iter)}}

4. 讀數據的過程

BlockManager的get方法是讀數據的入口點,在讀取時分為本地讀取和遠程節點讀取兩個步驟。本地讀取使用getLocalValues方法,該方法根據不同的存儲級別直接調用不同存儲實現方法;而遠程讀取使用getRemoteValues方法,在getRemoteVulaes方法中調用了GetRemoteBytes方法,在方法中調用遠程數據傳輸類BLockTransferService的fetchBlockSync進行處理,使用Netty的fetchBlocks方法讀取數據,整個數據讀取類調用如下:

5.寫數據過程

前面分析當中,我們了解到BlockManger的doPutIterator方法是寫數據的入口。在該方法中,根據數據是否緩存到內存中進行處理。如果不緩存到內存中,則調用BlockManager的putIterator方法直接存儲磁盤;如果緩存到內存中,則先判斷數據存儲級別是否進行了反序列化。如果設置了反序列化,則說明獲取的數據為值類型,調用putIteratorAsVaules方法把數據存入內存;如果沒有設置反序列化,則獲取的數據為字節類型,調用putIteratorAsBytes方法把數據放入內存中。在把數據存入內存中的時候,則需要判斷內存中展開數據大小是否滿足,當足夠調用BlockManger的putArray方法寫入內存,否則把數據寫入到磁盤中。

在寫入數據完成的時候,一方面吧數據塊的元數據發送給Driver端的BlockMangerMasterEndPoint終端點,請求其更新數據元數據,另一方面判斷是否需要創建數據副本,如果需要調用replicate方法,把數據寫到遠程節點上,類似于讀取遠程節點的數據,Spark提供Netty方式寫入數據。

總結

以上是生活随笔為你收集整理的Spark详解(九):Spark存储原理分析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。