日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

内存参数 计算_Spark统一内存管理的实现

發(fā)布時間:2023/12/10 56 豆豆
生活随笔 收集整理的這篇文章主要介紹了 内存参数 计算_Spark统一内存管理的实现 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

本文從源碼角度分析spark統(tǒng)一內(nèi)存管理的實(shí)現(xiàn)原理。

統(tǒng)一內(nèi)存管理對象的創(chuàng)建

統(tǒng)一內(nèi)存管理對象在SparkEnv中進(jìn)行創(chuàng)建和管理,這樣內(nèi)存管理就在Driver和Executor端中都可以使用。在SparkEnv的create函數(shù)中,創(chuàng)建內(nèi)存管理對象的實(shí)現(xiàn)代碼如下:

? ?val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false) ?val memoryManager: MemoryManager = ? ? ?if (useLegacyMemoryManager) { ? ? ? ?new StaticMemoryManager(conf, numUsableCores) ? ? } else { // spark2默認(rèn)使用統(tǒng)一內(nèi)存管理模式,所以執(zhí)行這里 ? ? ? ?UnifiedMemoryManager(conf, numUsableCores) ? ? }

從以上代碼片段可知,使用靜態(tài)內(nèi)存管理還是統(tǒng)一內(nèi)存管理,是由參數(shù):spark.memory.useLegacyMode決定的。從spark-2.0開始默認(rèn)都是使用統(tǒng)一內(nèi)存管理,一般不會修改該參數(shù)。

所以,一般情況下,默認(rèn)會創(chuàng)建統(tǒng)一內(nèi)存管理:UnifiedMemoryManager對象。這幾個對象之間的關(guān)系,如圖1所示:

? ? ? ? ? ? ? ? ? ? ? ? ?圖1 內(nèi)存管理對象和SparkContext

統(tǒng)一內(nèi)存管理初始化

在創(chuàng)建統(tǒng)一內(nèi)存管理對象時,會進(jìn)行初始化操作。為了便于管理和分配內(nèi)存,在初始化初始化時會把內(nèi)存分成幾個部分:預(yù)留內(nèi)存,用戶內(nèi)存,執(zhí)行和存儲內(nèi)存。

統(tǒng)一內(nèi)存管理對象初始化時的主要步驟如下:

(1)計算JVM可用的最大內(nèi)存,保存在變量:systemMemory中,默認(rèn)從參數(shù)spark.testing.memory獲取值但一般不設(shè)置,所以會獲取:Runtime.getRuntime.maxMemory的值。

(2)計算需要預(yù)留的內(nèi)存數(shù):reservedMemory,先取參數(shù):spark.testing.reservedMemory的值,但一般不設(shè)置,此時使用默認(rèn)值:300M。

(3)計算系統(tǒng)使用內(nèi)存的最小值,它是預(yù)留內(nèi)存的1.5倍,也就是:minSystemMemory=reservedMemory * 1.5,若系統(tǒng)使用內(nèi)存比這個值小:systemMemory < minSystemMemory,則報錯:請增加spark.driver.memory的值。

(4)獲取executor的內(nèi)存值:val executorMemory = conf.getSizeAsBytes("spark.executor.memory"),若executorMemory < minSystemMemory,則報錯:請增加spark.executor.memory的值。

(5)計算系統(tǒng)可用內(nèi)存的總量,系統(tǒng)內(nèi)存-預(yù)留內(nèi)存,得到spark可以使用的總內(nèi)存:usableMemory = systemMemory - reservedMemory

(6)計算任務(wù)執(zhí)行和存儲可用內(nèi)存總量。計算公式是:usableMemory * memoryFraction。其中memoryFraction是一個小數(shù),是配置項spark.memory.fraction的值,默認(rèn)值是0.6。

(7)最大可用內(nèi)存已經(jīng)計算出來了,此時可以創(chuàng)建UnifiedMemoryManager對象了,代碼如下:

? ? new UnifiedMemoryManager( ? ? ?conf, ? ? ?maxHeapMemory = maxMemory, ? ? ?onHeapStorageRegionSize = ? ? ? (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong, ? ? ?numCores = numCores)

從創(chuàng)建統(tǒng)一內(nèi)存管理對象的代碼中可以看出,默認(rèn)情況下任務(wù)的執(zhí)行內(nèi)存和存儲內(nèi)存是個占50%。可以通過參數(shù)spark.memory.storageFraction來調(diào)整執(zhí)行內(nèi)存和存儲內(nèi)存的占比。

完成統(tǒng)一內(nèi)存初始化后,內(nèi)存的劃分情況如圖2所示:

? ? ? ? ? ? ? ? ? ? ? ? ? ? 圖2 統(tǒng)一內(nèi)存初始化內(nèi)存分布

統(tǒng)一內(nèi)存管理的實(shí)現(xiàn)

前面已經(jīng)說明,統(tǒng)一內(nèi)存管理是在UnifiedMemoryManager類中實(shí)現(xiàn)的。下面我們來分析統(tǒng)一內(nèi)存管理的實(shí)現(xiàn)邏輯。

該類的聲明如下:

?private[spark] class UnifiedMemoryManager private[memory] ( ? ?conf: SparkConf, ? ?val maxHeapMemory: Long, ? ?onHeapStorageRegionSize: Long, ? ?numCores: Int)

統(tǒng)一內(nèi)存管理為spark提供了靈活使用內(nèi)存的機(jī)制。它把一塊大的可使用的內(nèi)存分成執(zhí)行內(nèi)存和存儲內(nèi)存。執(zhí)行內(nèi)存主要被Executor在執(zhí)行任務(wù)時使用,而存儲內(nèi)存主要用來存儲數(shù)據(jù)塊。

該類的成員變量說明如下

  • onHeapStorageRegionSize堆內(nèi)內(nèi)存區(qū)的大小,以字節(jié)為單位。該內(nèi)存區(qū)不是靜態(tài)保留的; 執(zhí)行器可以在必要時進(jìn)行借用。僅當(dāng)實(shí)際存儲內(nèi)存使用量超過此區(qū)域時,才能清除緩存塊。

  • maxHeapMemory最大可用堆內(nèi)存。該成員變量是通過函數(shù)getMaxMemory計算而來的,具體的計算方法見下面的分析。

  • numCores核數(shù)。

獲取執(zhí)行內(nèi)存

在執(zhí)行當(dāng)前任務(wù)內(nèi)存不足時會需要申請執(zhí)行內(nèi)存。申請內(nèi)存的過程可能會向存儲內(nèi)存池(StorageMemoryPool)借用一部分內(nèi)存,并把這部分內(nèi)存添加到執(zhí)行內(nèi)存池(ExecutionMemoryPool)中。能夠向存儲內(nèi)存池借用內(nèi)存必須滿足以下條件之一:

(1)存儲池的空閑內(nèi)存大于0;

(2)存儲是否已經(jīng)借用了執(zhí)行池的內(nèi)存。通過:存儲內(nèi)存池目前的大小減去初始化設(shè)置的存儲內(nèi)存池的大小是否大于0來進(jìn)行判斷,也就是計算storagePool.poolSize - storageRegionSize是否大于0。若大于0(已借用)表示可以分配。

在借用存儲內(nèi)存時,可能會把存儲池中的內(nèi)存釋放一部分,若這部分內(nèi)存的rdd設(shè)置了useDisk級別,還會把這些內(nèi)存的數(shù)據(jù)寫入磁盤,否則,這些內(nèi)存中的存儲數(shù)據(jù)就丟失了。

內(nèi)存塊的釋放是在MemoryStore對象中完成(后面的文章會詳細(xì)分析這實(shí)現(xiàn)),官方文檔中提到過,釋放老的內(nèi)存塊的算法是LRU(最近最少使用),這是由于在MemoryStore中內(nèi)存塊是以LinkedHashMap的結(jié)構(gòu)組織的,在鏈表的頭部就是“最近最少使用”的內(nèi)存塊。這部分內(nèi)容在分析MemoryStore的實(shí)現(xiàn)時再繼續(xù)講解。

下面分析獲取執(zhí)行內(nèi)存操作的實(shí)現(xiàn)邏輯。

acquireExecutionMemory函數(shù)

在統(tǒng)一內(nèi)存管理中實(shí)現(xiàn)獲取執(zhí)行內(nèi)存的函數(shù)是:acquireExecutionMemory。該函數(shù)的原型如下:

? ?override private[memory] def acquireExecutionMemory( ? ? ?numBytes: Long, ? ? ?taskAttemptId: Long, ? ? ?memoryMode: MemoryMode): Long = synchronized {...}

該函數(shù)嘗試為目前的執(zhí)行任務(wù)獲取numBytes執(zhí)行內(nèi)存。對于該函數(shù)需要注意以下幾點(diǎn):

(1)它嘗試獲取numBytes字節(jié)大小的內(nèi)存,返回能夠獲取的字節(jié)數(shù),若返回0,則表示無法分配內(nèi)存;

(2)它是同步函數(shù),所以當(dāng)有多個任務(wù)調(diào)用該函數(shù)時可能會阻塞,直到有足夠的內(nèi)存,這樣做是為了在把數(shù)據(jù)進(jìn)行持久化之前,讓每個任務(wù)都有機(jī)會獲取到1/2N的內(nèi)存(其中N是運(yùn)行的任務(wù)數(shù))。

(3)當(dāng)老的任務(wù)占用很多內(nèi)存,而新任務(wù)數(shù)又不斷增加時,阻塞就可能會發(fā)生。

實(shí)現(xiàn)邏輯

獲取執(zhí)行內(nèi)存操作的實(shí)現(xiàn)邏輯如下:

(1)根據(jù)參數(shù)memoryMode的值來選擇操作:若是堆內(nèi)模式(ON_HEAP),獲取堆內(nèi)的執(zhí)行和存儲池總量和堆內(nèi)可用存儲內(nèi)存總量,以及總的堆內(nèi)內(nèi)存大小。若是堆外模式(OFF_HEAP),獲取堆外的執(zhí)行和存儲池總量和堆外可用存儲內(nèi)存總量,以及總的堆外內(nèi)存大小。這一步的代碼實(shí)現(xiàn)如下:

? ? ?val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match { ? ? ?// 堆內(nèi)模式 ? ? ?case MemoryMode.ON_HEAP => ( ? ? ? ?onHeapExecutionMemoryPool, ? ? ? ?onHeapStorageMemoryPool, ? ? ? ?onHeapStorageRegionSize, ? ? ? ?maxHeapMemory) ? ? ?// 堆外模式 ? ? ?case MemoryMode.OFF_HEAP => ( ? ? ? ?offHeapExecutionMemoryPool, ? ? ? ?offHeapStorageMemoryPool, ? ? ? ?offHeapStorageMemory, ? ? ? ?maxOffHeapMemory) ? }

(2)判斷是否需要增加執(zhí)行內(nèi)存池(ExecutionPool)。當(dāng)執(zhí)行內(nèi)存池中空閑內(nèi)存量小于需要申請的內(nèi)存量時,則會嘗試增加執(zhí)行池。嘗試增加執(zhí)行池的過程,本質(zhì)上就是向存儲池StorageMemoryPool借用內(nèi)存的過程。能夠成功借用存儲池的內(nèi)存,需要滿足以下兩個條件之一:1)存儲池有空閑內(nèi)存;2)存儲池的量大于初始化的量。(也就是說,已經(jīng)向執(zhí)行內(nèi)存池借用了一些內(nèi)存,存儲池大小增加了)

另外,這個過程可能執(zhí)行多次,每次嘗試都必須能夠獲取到一些內(nèi)存,可能會清除掉一些內(nèi)存中的數(shù)據(jù)塊,以防其他任務(wù)在緩存大的數(shù)據(jù)塊和清除數(shù)據(jù)之間進(jìn)行反復(fù)。那么,為什么每次只能清除一些內(nèi)存呢?這是因為在MemoryStore中,內(nèi)存是以MemoryEntry對象來組織和管理的,清理時也是以這個為單位進(jìn)行的,而每個這樣的對象的大小是不同的。

嘗試增加執(zhí)行內(nèi)存池大小的實(shí)現(xiàn)代碼如下:

?def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = { ? ? ?if (extraMemoryNeeded > 0) { ? ? ? ?// 可以分配內(nèi)存的條件:1.存儲池有空閑內(nèi)存 或 2.存儲池已經(jīng)借用了執(zhí)行池的內(nèi)存 ? ? ? ?val memoryReclaimableFromStorage = math.max( ? ? ? ? ?storagePool.memoryFree, ? ? ? ? ?storagePool.poolSize - storageRegionSize) ? ? ? ? ? ? ? ?if (memoryReclaimableFromStorage > 0) { ? ? ? ? ?// 通過下面的函數(shù)來釋放存儲內(nèi)存池的內(nèi)存,減少存儲內(nèi)存池的大小。 ? ? ? ? ?val spaceToReclaim = storagePool.freeSpaceToShrinkPool( ? ? ? ? ? ?math.min(extraMemoryNeeded, memoryReclaimableFromStorage)) ? ? ? ? ?// 到這里,說明存儲內(nèi)存池的空間已經(jīng)釋放,這一步只需要減少存儲內(nèi)存池的大小即可 ? ? ? ? ?storagePool.decrementPoolSize(spaceToReclaim) ? ? ? ? ?// 增加執(zhí)行內(nèi)存池大小的量 ? ? ? ? ?executionPool.incrementPoolSize(spaceToReclaim) ? ? ? } ? ? } ? }

要注意的是,執(zhí)行內(nèi)存池將借用的內(nèi)存均勻地分配給活動任務(wù),以限制每個任務(wù)的執(zhí)行內(nèi)存分配。保持這個大于執(zhí)行池大小是很重要的,這不考慮可以通過清除存儲而釋放的潛在內(nèi)存。另外,這個數(shù)量應(yīng)該保持在“maxMemory”以下,以便在任務(wù)中執(zhí)行內(nèi)存分配的公平性,否則,任務(wù)可能占用超過其平均份額的執(zhí)行內(nèi)存。

(3)然后調(diào)用executionPool.acquireMemory來獲取內(nèi)存,該函數(shù)的聲明如下:

?private[memory] def acquireMemory( ? ? ?numBytes: Long, // 想要獲取的內(nèi)存數(shù) ? ? ?taskAttemptId: Long, // 想要獲取內(nèi)存的任務(wù)數(shù) ? ? ?maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit, // 一個回調(diào)函數(shù),用來增長內(nèi)存池的大小 ? ? ?computeMaxPoolSize: () => Long = () => poolSize // 回調(diào)函數(shù),用來獲取某個時刻允許獲取內(nèi)存的最大值 ? ? ): Long = lock.synchronized

該函數(shù)嘗試為給定任務(wù)獲取numBytes大小的內(nèi)存,并返回獲取到的內(nèi)存大小。該函數(shù)可能會阻塞,直到有足夠的內(nèi)存再返回。該函數(shù)的執(zhí)行邏輯大致如下:

  • 添加任務(wù)到taskMemory這個map中,該map保存了任務(wù)id和申請的內(nèi)存大小的對應(yīng)關(guān)系。

  • 調(diào)用maybeGrowExecutionPool回調(diào)函數(shù)來向storeage申請內(nèi)存,若內(nèi)存不夠該函數(shù)會釋放掉一些存儲內(nèi)存。一次釋放的內(nèi)存可能不夠,所以該函數(shù)可能會嘗試多次。

  • maybeGrowExecutionPool會調(diào)用memoryStore.evictBlocksToFreeSpace函數(shù),在該函數(shù)中會根據(jù)rdd和內(nèi)存模式等參數(shù)來清除一些內(nèi)存塊,釋放對應(yīng)大小的內(nèi)存,具體的實(shí)現(xiàn)過程在后面分析。

獲取存儲內(nèi)存

獲取存儲內(nèi)存的過程比獲取執(zhí)行內(nèi)存的過程要相對簡單。因為,獲取存儲內(nèi)存時不會強(qiáng)制釋放正在使用的執(zhí)行內(nèi)存,而只能從執(zhí)行池的空閑內(nèi)存中申請。

所以,申請存儲內(nèi)存的步驟主要是以下幾步:

(1)判斷需要申請的內(nèi)存數(shù)量,是否大于存儲池的空閑內(nèi)存量。若大于(存儲池的內(nèi)存量不夠),則向執(zhí)行池的空閑內(nèi)存申請一部分內(nèi)存。要注意:可能執(zhí)行池的空閑內(nèi)存也不夠,或根本就沒有空閑內(nèi)存。

(2)調(diào)用存儲池的內(nèi)存獲取函數(shù)獲取內(nèi)存,若空閑內(nèi)存不夠,則需要從存儲池中按LRU算法釋放一部分內(nèi)存。

獲取存儲內(nèi)存是在函數(shù)acquireStorageMemory中實(shí)現(xiàn),下面我們來分析一下該函數(shù)的具體實(shí)現(xiàn)。

acquireStorageMemory函數(shù)

該函數(shù)的原型如下:

? override def acquireStorageMemory( ? ? ?blockId: BlockId, ? ? ?numBytes: Long, ? ? ?memoryMode: MemoryMode): Boolean = synchronized {...}

該函數(shù)的參數(shù):

  • memoryMode: MemoryMode:該參數(shù)是內(nèi)存的模式,主要有兩種:ON_HEAP或OFF_HEAP。

  • numBytes:需要申請的內(nèi)存大小,單位是bytes

  • blockId:數(shù)據(jù)塊的ID,也是可能會被釋放的數(shù)據(jù)塊。若該id為空,則會通過LRU算法尋找需要釋放塊對應(yīng)的內(nèi)存。

該函數(shù)是一個同步函數(shù),若是多個線程同時調(diào)用該函數(shù),可能會阻塞。

實(shí)現(xiàn)分析

該函數(shù)的主要實(shí)現(xiàn)邏輯如下:

(1)根據(jù)參數(shù)memoryMode來獲取此種模式下的最大可以用存儲內(nèi)存,保存在變量maxMemory中。

(2)判斷內(nèi)存申請量(即參數(shù)numBytes)是否大于maxMemory,若申請內(nèi)存大于最大可用內(nèi)存,會失敗。報錯:該blockid的數(shù)據(jù)塊需要的內(nèi)存超過最大使用內(nèi)存。

(3)若申請的內(nèi)存大小:numBytes大于存儲池的空閑內(nèi)存大小,則需要從執(zhí)行池中“借用”一些空閑內(nèi)存。借用的意思是,從執(zhí)行池的空閑內(nèi)存中獲取一部分內(nèi)存,但要注意:最多從執(zhí)行池中借用空閑內(nèi)存量,不會釋放任務(wù)正在使用的執(zhí)行內(nèi)存。實(shí)現(xiàn)代碼如下:

?if (numBytes > storagePool.memoryFree) { // 所需內(nèi)存量大于可用存儲空閑內(nèi)存量,需要從執(zhí)行池中申請一部分內(nèi)存 ? ? ?val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, ? ? ? ?numBytes - storagePool.memoryFree) //最多獲取執(zhí)行池中空閑的內(nèi)存量大小 ? ? ?executionPool.decrementPoolSize(memoryBorrowedFromExecution) ?// 執(zhí)行內(nèi)存池減少內(nèi)存數(shù) ? ? ?storagePool.incrementPoolSize(memoryBorrowedFromExecution) // 存儲內(nèi)存池增加對應(yīng)內(nèi)存 ? }

注意:這一步是體現(xiàn)統(tǒng)一內(nèi)存思想的重要的一步。

(4)若能夠從執(zhí)行內(nèi)存池中借用成功,這一步就直接在存儲內(nèi)存池中申請內(nèi)存了。代碼很簡單,就是調(diào)用存儲內(nèi)存池的內(nèi)存申請函數(shù):

? ? ?storagePool.acquireMemory(blockId, numBytes)
storagePool#acquireMemory函數(shù)

該函數(shù)來完成存儲池的內(nèi)存申請工作。要注意,此時的存儲池可能有空閑的內(nèi)存,也可能沒有空閑內(nèi)存。當(dāng)存儲池沒有空閑內(nèi)存時,需要把已有的某些數(shù)據(jù)塊從存儲池中清除,以滿足當(dāng)前數(shù)據(jù)塊的存儲需要。

該函數(shù)的實(shí)現(xiàn)邏輯如下:

(1) 計算需要釋放的內(nèi)存量

需要申請的內(nèi)存量減去空閑的內(nèi)存量,就是需要釋放的內(nèi)存量。也就是說,需要從已經(jīng)使用的存儲內(nèi)存塊中釋放一部分內(nèi)存。

? ? val numBytesToFree = math.max(0, numBytes - memoryFree)

(2) 第(1)步已經(jīng)計算出來需要釋放的內(nèi)存量了。下面調(diào)用StorageMemoryPool.acquireMemory函數(shù)來申請內(nèi)存,釋放一定的數(shù)據(jù)塊。該函數(shù)會調(diào)用MemoryStore.evictBlocksToFreeSpace來清除數(shù)據(jù)塊。會被清除的數(shù)據(jù)塊的判斷如下:

?def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = { //存儲模式相同,且blockId沒有被RDD占用 或則不是要替換相同RDD的不同數(shù)據(jù)塊 ? ?entry.memoryMode == memoryMode && ? (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))}

若有可以釋放的數(shù)據(jù)塊,還需要獲取一把寫鎖,加鎖的目的是防止目前還有其他的線程在讀該數(shù)據(jù)塊。當(dāng)鎖獲取成功后,就可以開始刪除數(shù)據(jù)塊了,具體的刪除過程是通過blockInfoManager.removeBlock來進(jìn)行的。該函數(shù)會把需要清除的元數(shù)據(jù)和數(shù)據(jù)塊從blockManager中刪除。

釋放內(nèi)存塊:MemoryStore#evictBlocksToFreeSpace函數(shù)

這是MemoryStore類的成員函數(shù),該函數(shù)完成內(nèi)存塊的釋放,若存儲級別包含useDisk還會把內(nèi)存中的數(shù)據(jù)保存到磁盤中。該函數(shù)的原型如下:

? ?private[spark] def evictBlocksToFreeSpace( ? ? ?blockId: Option[BlockId], ? ? ?space: Long, ? ? ?memoryMode: MemoryMode): Long = {

其中的blockId是數(shù)據(jù)塊的id,每個id都對應(yīng)一個內(nèi)存塊。釋放內(nèi)存塊的邏輯如下:

(1)遍歷內(nèi)存塊的隊列。這是一個LinkedHashMap,最后一次被訪問的內(nèi)存塊節(jié)點(diǎn)會放到鏈表的后面,這樣最近沒有被訪問的內(nèi)存塊就在隊列的頭部。

(2)檢查內(nèi)存塊是否可以被釋放。釋放內(nèi)存塊需要滿足以下條件:

1)內(nèi)存塊的模式必須和參數(shù)中memoryMode的值相等;

2)該blockId對應(yīng)的內(nèi)存塊沒有被其他RDD占用,或則不是要替換相同RDD的不同數(shù)據(jù)塊。

(3)若滿足以上兩個條件,就會釋放該內(nèi)存塊。釋放內(nèi)存塊的過程如下:

1)確認(rèn)內(nèi)存塊的寫鎖已經(jīng)鎖上了;

2)通過blockId的信息檢查存儲級別是否包含useDisk,若包含則把內(nèi)存的數(shù)據(jù)寫入到磁盤上。寫入磁盤 的過程是通過DiskStore對象來完成的。

(4)由于實(shí)際的內(nèi)存是通過MemoryStore來管理的,所以,最后一步就是從memoryStore中刪除并釋放blockId對應(yīng)的內(nèi)存塊,并減少M(fèi)emoryStore的內(nèi)存數(shù)量。到此,就完成了內(nèi)存釋放的整個過程。至于MemoryStore是如何釋放內(nèi)存的,會在分析MemoryStore時進(jìn)行分析。

計算可用堆內(nèi)存儲內(nèi)存:maxOnHeapStorageMemory函數(shù)

該函數(shù)用來計算堆內(nèi)可用內(nèi)存,邏輯很簡單:就是使用總的堆內(nèi)存儲內(nèi)存-為執(zhí)行器可分配的堆內(nèi)內(nèi)存:

? override def maxOnHeapStorageMemory: Long = synchronized { // 計算可用堆內(nèi)內(nèi)存 ? maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed }
計算對外存儲內(nèi)存:maxOffHeapStorageMemory函數(shù)

該函數(shù)用來計算可用堆外內(nèi)存:使用總堆外內(nèi)存-為執(zhí)行器分配的堆外內(nèi)存:

? override def maxOffHeapStorageMemory: Long = synchronized { // 計算可用堆外內(nèi)存 ? maxOffHeapMemory - offHeapExecutionMemoryPool.memoryUsed }

小結(jié)

本文講述了spark統(tǒng)一內(nèi)存管理的實(shí)現(xiàn)原理。從實(shí)現(xiàn)層面來看,Spark的統(tǒng)一內(nèi)存管理都是在UnifiedMemoryManager類中實(shí)現(xiàn)。不管是執(zhí)行還是存儲內(nèi)存不足時,都可以向?qū)Ψ浇栌脙?nèi)存。但內(nèi)存不足時,可以根據(jù)LRU來釋放存儲正在使用的內(nèi)存,但不能釋放執(zhí)行時正在使用的內(nèi)存。

另外,最終的內(nèi)存塊釋放和數(shù)據(jù)塊的持久化是通過MemoryStore,DiskStore以及BlockManager這幾個系統(tǒng)來完成的,這些組件的原理會在后面的文章中繼續(xù)分析。

總結(jié)

以上是生活随笔為你收集整理的内存参数 计算_Spark统一内存管理的实现的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。