日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark Shuffle Write阶段磁盘文件分析

發布時間:2024/1/23 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark Shuffle Write阶段磁盘文件分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

流程分析

入口處:

org.apache.spark.scheduler.ShuffleMapTask.runTask


override def runTask(context: TaskContext): MapStatus = {// Deserialize the RDD using the broadcast variable. val threadMXBean = ManagementFactory.getThreadMXBean val deserializeStartTime = System.currentTimeMillis()val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {threadMXBean.getCurrentThreadCpuTime} else 0L val ser = SparkEnv.get.closureSerializer.newInstance()val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime} else 0L var writer: ShuffleWriter[Any, Any] = null try {val manager = SparkEnv.get.shuffleManagerwriter = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])writer.stop(success = true).get} catch {case e: Exception =>try {if (writer != null) {writer.stop(success = false)}} catch {case e: Exception =>log.debug("Could not stop writer", e)}throw e} }

這里manager 拿到的是

先看private[spark] trait ShuffleManager? 是一個接口,

SortShuffleManager實現了該接口。

private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging

org.apache.spark.shuffle.sort.SortShuffleWriter

我們看他是如何拿到可以寫磁盤的那個sorter的。

override def getWriter[K, V](handle: ShuffleHandle,mapId: Int,context: TaskContext): ShuffleWriter[K, V] = {numMapsForShuffle.putIfAbsent(handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)val env = SparkEnv.get handle match {case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>new UnsafeShuffleWriter(env.blockManager,shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],context.taskMemoryManager(),unsafeShuffleHandle,mapId,context,env.conf)case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>new BypassMergeSortShuffleWriter(env.blockManager,shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],bypassMergeSortHandle,mapId,context,env.conf)case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)} } 這里case了2種情況:/** * Subclass of [[BaseShuffleHandle]], used to identify when we've chosen to use the * serialized shuffle. 是否序列化 */ private[spark] class SerializedShuffleHandle[K, V](shuffleId: Int,numMaps: Int,dependency: ShuffleDependency[K, V, V])extends BaseShuffleHandle(shuffleId, numMaps, dependency) { }/** * Subclass of [[BaseShuffleHandle]], used to identify when we've chosen to use the * bypass merge sort shuffle path. 繞過歸并排序的shuffle路徑。 */ private[spark] class BypassMergeSortShuffleHandle[K, V](shuffleId: Int,numMaps: Int,dependency: ShuffleDependency[K, V, V])extends BaseShuffleHandle(shuffleId, numMaps, dependency) { }這里再看看BaseShuffleHandle /** * A basic ShuffleHandle implementation that just captures registerShuffle's parameters. */ private[spark] class BaseShuffleHandle[K, V, C](shuffleId: Int,val numMaps: Int,val dependency: ShuffleDependency[K, V, C])extends ShuffleHandle(shuffleId)
繼續看abstract class ShuffleHandle(val shuffleId: Int) extends Serializable {}是一個抽象類,實現了序列化。

繼續new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
類的定義:private[spark] class SortShuffleWriter[K, V, C](shuffleBlockResolver: IndexShuffleBlockResolver,handle: BaseShuffleHandle[K, V, C],mapId: Int,context: TaskContext)extends ShuffleWriter[K, V] with Logging然后write操作
/** Write a bunch of records to this task's output */ 一串 bunch override def write(records: Iterator[Product2[K, V]]): Unit = {sorter = if (dep.mapSideCombine) {require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")new ExternalSorter[K, V, C](context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)} else {// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't // care whether the keys get sorted in each partition; that will be done on the reduce side // if the operation being run is sortByKey. new ExternalSorter[K, V, V](context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)}sorter.insertAll(records)// Don't bother including the time to open the merged output file in the shuffle write time, // because it just opens a single file, so is typically too fast to measure accurately // (see SPARK-3570). val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)val tmp = Utils.tempFileWith(output)try {val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)val partitionLengths = sorter.writePartitionedFile(blockId, tmp)shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)} finally {if (tmp.exists() && !tmp.delete()) {logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")}} } 我們分析的線路假設需要做mapSideCombine sorter = if (dep.mapSideCombine) { require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") new ExternalSorter[K, V, C](dep.aggregator, Some(dep.partitioner), dep.keyOrdering, de.serializer)

接著將map的輸出放到sorter當中:

sorter.insertAll(records) //備注一下sorter位置 //private var sorter: ExternalSorter[K, V, _] = null //import org.apache.spark.util.collection.ExternalSorter
def insertAll(records: Iterator[Product2[K, V]]): Unit = {// TODO: stop combining if we find that the reduction factor isn't high val shouldCombine = aggregator.isDefinedif (shouldCombine) {// Combine values in-memory first using our AppendOnlyMap val mergeValue = aggregator.get.mergeValueval createCombiner = aggregator.get.createCombinervar kv: Product2[K, V] = null val update = (hadValue: Boolean, oldValue: C) => {if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)}while (records.hasNext) {addElementsRead()kv = records.next()map.changeValue((getPartition(kv._1), kv._1), update)maybeSpillCollection(usingMap = true)}} else {// Stick values into our buffer while (records.hasNext) {addElementsRead()val kv = records.next()buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])maybeSpillCollection(usingMap = false)}} }

其中insertAll 的流程是這樣的:

while (records.hasNext) { addElementsRead() kv = records.next() map.changeValue((getPartition(kv._1), kv._1), update)maybeSpillCollection(usingMap = true)}

private val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024 // Size of object batches when reading/writing from serializers. // // Objects are written in batches, with each batch using its own serialization stream. This // cuts down on the size of reference-tracking maps constructed when deserializing a stream. // // NOTE: Setting this too low can cause excessive copying when serializing, since some serializers // grow internal data structures by growing + copying every time the number of objects doubles. private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 10000)// Data structures to store in-memory objects before we spill. Depending on whether we have an // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we // store them in an array buffer. @volatile private var map = new PartitionedAppendOnlyMap[K, C] @volatile private var buffer = new PartitionedPairBuffer[K, C]

里面的map 其實就是PartitionedAppendOnlyMap,這個是全內存的一個結構。當把這個寫滿了,才會觸發spill操作。你可以看到maybeSpillCollection在PartitionedAppendOnlyMap每次更新后都會被調用。

一旦發生呢個spill后,產生的文件名稱是:

"temp_shuffle_" + id

邏輯在這:

val (blockId, file) = diskBlockManager.createTempShuffleBlock() def createTempShuffleBlock(): (TempShuffleBlockId, File) = { var blockId = new TempShuffleBlockId(UUID.randomUUID()) while (getFile(blockId).exists()) { blockId = new TempShuffleBlockId(UUID.randomUUID()) } (blockId, getFile(blockId))}

產生的所有 spill文件被被記錄在一個數組里:

private val spills = new ArrayBuffer[SpilledFile]

迭代完一個task對應的partition數據后,會做merge操作,把磁盤上的spill文件和內存的,迭代處理,得到一個新的iterator,這個iterator的元素會是這個樣子的:

(p, mergeWithAggregation( iterators, aggregator.get.mergeCombiners, keyComparator,ordering.isDefined))

其中p 是reduce 對應的partitionId, p對應的所有數據都會在其對應的iterator中。

接著會獲得最后的輸出文件名:

val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)

文件名格式會是這樣的:

"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"

其中reduceId 是一個固定值NOOP_REDUCE_ID,默認為0。

然后開始真實寫入文件

val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)

寫入文件的過程過程是這樣的:

for ((id, elements) <- this.partitionedIterator) { if (elements.hasNext) { val writer = blockManager.getDiskWriter(blockId,outputFile, serInstance,fileBufferSize, context.taskMetrics.shuffleWriteMetrics.get) for (elem <- elements) { writer.write(elem._1, elem._2) } writer.commitAndClose() val segment = writer.fileSegment() lengths(id) = segment.length } }

剛剛我們說了,這個 this.partitionedIterator 其實內部元素是reduce partitionID -> 實際record 的 iterator,所以它其實是順序寫每個分區的記錄,寫完形成一個fileSegment,并且記錄偏移量。這樣后續每個的reduce就可以根據偏移量拿到自己需要的數據。對應的文件名,前面也提到了,是:

"shuffle_" + shuffleId + "_" + mapId + "_" + NOOP_REDUCE_ID + ".data"

剛剛我們說偏移量,其實是存在內存里的,所以接著要持久化,通過下面的writeIndexFile來完成:

shuffleBlockResolver.writeIndexFile(dep.shuffleId,mapId, partitionLengths)

具體的文件名是:

"shuffle_" + shuffleId + "_" + mapId + "_" + NOOP_REDUCE_ID + ".index"

至此,一個task的寫入操作完成,對應一個文件。

最終結論

所以最后的結論是,一個Executor 最終對應的文件數應該是:

MapNum (注:不包含index文件)

同時持有并且會進行寫入的文件數最多為::

CoreNum
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的Spark Shuffle Write阶段磁盘文件分析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。