Spark详解(十):SparkShuffle机制原理分析
1. Spark Shuffle簡介
在Hadoop的MapReduce框架中Shuffle是連接Map和Reduce的橋梁,Map的輸出要用到Reduce中必須經過Shuffle這個環節。由于Shuffle階段涉及到磁盤的讀寫和網絡傳輸,因此Shuffle的性能高低直接影響到整個程序的性能和吞吐量。
Spakr Shuffle定義:Shuffle在中文的意思是“洗牌、混洗”的意思,在MapReduce過程中需要各個節點上的同一個類型數據匯集到某一節點中進行計算,把這些分布在不同節點的數據按照一定的規則匯集到一起的過程稱為Shuffle。
但是在Spark Shuffle中存在如下問題:
- 數據量非常大,達到TB甚至PB級別。這些數據分散到數百臺甚至數千臺的集群中運行,如果管理為后續的任務創建數據眾多的文件,以及處理大小超過內存的數據量呢?
- 如果對結果進行序列化和反序列化,以及傳輸之前如何進行壓縮呢?
2. Shuffle的寫操作
Spark在Shuffle的處理方式是一個迭代的過程,從最開始避免Hadoop多余的排序(即在Reduce之前獲取的數據經過排序),提供了基于哈希的Shuffle寫操作,但是這種方式在Map和Reduce的數量較大的情況下寫文件的數量大和緩存開銷過大的問題。為了解決這個問題,在Spark1.2版本中默認的Shuffle寫替換為基于排序的Shuffle寫,該操作會把所有的結果寫入到一個文件中,同時生成一個索引文件進行定位。
2.1 基于哈希的Shuffle寫過程
在Spark1.0之間實現的是基于哈希的Shuffle寫過程。在該機制中每個Mapper會根據Reduce的數量創建相應的Bucket,bucket的數據是M*R,其中M是Map的個數,R是Reduce的個數;Mapper生成的結果會根據設置地Partition算法填充到每個bucket中,這里的bucket是一個抽象的概念,在該機制中每一個bucket是一個文件;當Reduce啟動時,會根據任務的編號和鎖依賴的Mapper的編號從遠程或者本地取得相應的bucket作為Reduce的輸入進行處理,其處理流程如圖所示:
相比較于傳統的MapReduce,Spark假定大多數情況下Shuffle的數據排序是沒有必要的,比如WordCount,強制進行排序只能使得性能變差,因此Spark并不在Reduce端進行Merge Sort,而是使用聚合(Aggerator)。聚合實際上是一個HashMap,它以當前任務輸出結果作為Key的鍵值,以任意要combine類型為值,當在Word Count的Reduce進行單詞統計的時候,它會將Shuffle讀到的每一個鍵值對更新或者插入到HashMap中。這樣就不需要預先對所有的鍵值對進行mergeSort,而是來一個處理一個,省下了外部排序這個過程。
在HashShuffleWriter的writer方法中,通過ShuffleDependency是否定義了Aggregatror判斷是否需要在Map端對數據進行聚合操作,如果需要對數據進行聚合處理。然后調用ShuffleWriterGroup的writers方法得到一個DiskBlockObjectWriter對象,調用該對象的writer方法寫入。
2.2 基于排序的Shuffle寫操作
基于Hash的Shuffle寫操作能夠較好的完成Shuffle的數據寫入,但是存在兩大問題:
- 每個Shuffle Map Task作為后續的任務創建一個單獨的文件,因此在運行過程中文件的數量是M*R。這對于文件系統來說是一個很大的負擔,同時shuffle數據量不大而文件非常多的情況,隨機寫入會嚴重降低I/O的性能。
- 雖然Shuffle寫數據不需要存儲在內存再寫到磁盤,但是DiskBlockObjectWriter所帶來的開銷也是一個不容小視的內存開銷。
為了緩解Shuffle過程中產生過多的文件和Writer Handler的緩存開銷過大的問題,在SPark 1.1 版本中解決了Hadoop在Shuffle中的處理方式,引入了基于排序的Shuffle寫操作機制。在該機制中,每個Shuffle Map Task不會為后續的每個任務創建單獨的文件,而是會將所有的結果寫入到同一個文件中,對應生成一個Index文件進行索引。通過這種機制避免了大量文件的產生,一方面可以降低文件系統管理眾多文件的開銷,另一方面可以減少Writer Handler緩存所占用的內存大小,節省了內存同時避免了GC的風險和頻率。
前面我知道基于哈希的Shuffle寫操作輸出結果是放在HashMap中,沒有經過排序,但是對于一些如groupbyKey操作,如果使用HashMap,則需要將所有的鍵值對放入HashMap中并且將值合并成一個數組。可以想象未來能夠放下所欲數據,必須確保每一個Partition足夠小,并且內存能夠放下,這對于內存來說是一個很大的挑戰。為了減少內存的使用,可以將Aggregator的操作從內存轉移到磁盤中,在結束的時候再將這些不同的文件進行歸并排序,從而減少內存的使用量。
對于Shuffle的寫操作,主要是在SortShuffleWriter的write方法。在該方法中,首先判斷輸出結果在Map端是否需要合并(Combine), 如果需要合并,則外部排序中進行聚合并排序;如果不需要,則外部排序中不進行聚合和排序,例如sortByKey操作在Reduce端會進行聚合并排序。確認外部排序方式后,在外部排序中將使用PartitionedAppendOnlyMap來存放數據,當排序中的Map占用的內存已經超越了使用的閾值,則將Map中的內容溢寫到磁盤中,每一次溢寫產生一個不同的文件,當所有數據處理完畢后,在外部排序中有可能一部分計算結果在內存中,另一部分計算結果溢寫到一或多個文件中,這時通過merge操作將內存和spill文件中的內容合并整到一個文件中。
SortShuffleWriter的write方法代碼如下:
/** Write a bunch of records to this task's output */override def write(records: Iterator[Product2[K, V]]): Unit = {// 獲取Shuffle Map Task 的輸出結果的排序方式sorter = if (dep.mapSideCombine) {// 當輸出結果需要Combine,那么外部排序算法進行聚合require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")new ExternalSorter[K, V, C](context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)} else {// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't// care whether the keys get sorted in each partition; that will be done on the reduce side// if the operation being run is sortByKey.// 在這種情況下,我們既沒有將聚合器也沒有傳遞給排序器,因為我們不關心Key是否在每個分區中排序;// 如果正在運行的操作是sortByKey,那么將排序階段將在reduce端完成。// 其他情況下,當輸出結果不需要進行Combine操作,那么Shuffle Write將不進行聚合排序操作new ExternalSorter[K, V, V](context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)}// 根據獲取的排序方式,對數據進行排序并且寫入到內存緩沖區中。如果排序中的Map占用的內存// 已經超越了閾值,則將Map中的內容溢寫到磁盤中,每一次溢寫都將產生一個不同的文件sorter.insertAll(records)// Don't bother including the time to open the merged output file in the shuffle write time,// because it just opens a single file, so is typically too fast to measure accurately// (see SPARK-3570).// 通過Shuffle編號和Map編號獲取該數據文件val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)val tmp = Utils.tempFileWith(output)try {// 通過Shuffle編號和Block編號獲取ShuffleBlock編號val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)// 外部排序中有可能一部分計算結果放在內存中,另一部分計算結果溢寫產生一個或者多個文件之中,// 這個時候通過Merge Sort操作將內存和splil文件的內容整合到一個文件中val partitionLengths = sorter.writePartitionedFile(blockId, tmp)// 創建索引文件,將每個partitoon在數據文件中的起始位置和結束位置寫入到索引文件中shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)// 將元數據信息寫入到MapStatus中,后續的任務可以通過該MapStatus得到處理結果信息mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)} finally {if (tmp.exists() && !tmp.delete()) {logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")}}}在ExternalSorter的insterAll方法中,先判斷是否需要進行聚合(Aggregation),如果需要,則根據鍵值進行合并(Combine), 然后把這些數據寫入到內存緩沖區中,如果排序中Map占用的內存超過了閾值,則將Map中的內容溢寫到磁盤中,每一次溢寫產生一個不同的文件。如果不需要聚合,把數據排序寫到內存緩沖區。
def insertAll(records: Iterator[Product2[K, V]]): Unit = {// TODO: stop combining if we find that the reduction factor isn't high// 根據外部排序中是否需要進行聚合操作(Aggregator)val shouldCombine = aggregator.isDefinedif (shouldCombine) {// Combine values in-memory first using our AppendOnlyMap// 如果需要聚合,則使用PartitionedAppendOnlyMap根據鍵值進行合并val mergeValue = aggregator.get.mergeValueval createCombiner = aggregator.get.createCombinervar kv: Product2[K, V] = nullval update = (hadValue: Boolean, oldValue: C) => {if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)}// 對數據進行排序寫入到內存緩沖區中,如果排序中的Map占用的內存以及超越了使用的閾值,// 則對Map中的內容溢寫到磁盤中,每一次溢寫產生一個不同的文件while (records.hasNext) {addElementsRead()kv = records.next()map.changeValue((getPartition(kv._1), kv._1), update)maybeSpillCollection(usingMap = true)}} else {// Stick values into our buffer// 外部排序中,不需要聚合操作// 對數據進行排序寫入到內存緩沖區中while (records.hasNext) {addElementsRead()val kv = records.next()buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])maybeSpillCollection(usingMap = false)}}}3. Shuffle 讀操作
(1)在SparkEnv啟動時,會對ShuffleManage、BlockManager和MapOutputTracker等實例化。ShuffleManager配置項有SortShuffleManager和自定義的ShuffleManager兩種,
SortShuffleManager實例化BlockStoreShuffleReader,持有的實例是IndexShuffleBlockResolver實例。
(2)在BlockStoreShuffleReader的read方法中,調用mapOutputTracker的getMapSizesByExecutorId方法,由Executor的MapOutputTrackerWorker發送獲取結果狀態的
GetMapOutputStatuses消息給Driver端的MapOutputTrackerMaster,請求獲取上游Shuffle輸出結果對應的MapStatus,其中存放了結果數據信息,也就是我們之前在Spark作業執行中介紹的ShuffleMapTask執行結果元信息。
(3)知道Shuffle結果的位置信息后,對這些位置進行篩選,判斷是從本地還是遠程獲取這些數據。如果是本地直接調用BlockManager的getBlockData方法,在讀取數據的時候會根據寫入方式的不同采取不同的ShuffleBlockResolver讀取;如果是在遠程節點上,需要通過Netty網絡方式讀取數據。
在遠程讀取的時候會采用多線程的方式進行讀取,一般來說,會啟動5個線程到5個節點進行讀取數據,每次請求的數據大小不回超過系統設置的1/5,該大小由spark.reducer.maxSizeInFlight配置項進行設置,默認情況該配置為48MB。
(6)讀取數據后,判斷ShuffleDependency是否定義聚合(Aggregation), 如果需要,則根據鍵值進行聚合。在上游ShuffleMapTask已經做了合并,則在合并數據的基礎上做鍵值聚合。待數據處理完畢后,使用外部排序(ExternalSorter)對數據進行排序并放入存儲中。
Shuffle Read 類調用關系圖:
創建ShuffleBlockFetcherIterator,一個迭代器,它獲取多個塊,對于本地塊,從本地讀取對于遠程塊,通過遠程方法讀取
如果reduce端需要聚合:如果map端已經聚合過了,則對讀取到的聚合結果進行聚合; 如果map端沒有聚合,則針對未合并的<k,v>進行聚合
如果需要對key排序,則進行排序。基于sort的shuffle實現過程中,默認只是按照partitionId排序。在每一個partition內部并沒有排序,因此添加了keyOrdering變量,提供是否需要對分區內部的key排序
源碼:
Shuffle讀的起點是由ShuffledRDD.computer發起的,在該方法中會調用ShuffleManager的getReader方法,在前面我們已經知道Sort Based Shuffle使用的是BlockStoreShuffleReader的read方式。
// ResultTask或者ShuffleMapTask,在執行到ShuffledRDD時// 會調用compute方法來計算partition的數據override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]// 獲取Reader(BlockStoreShuffleReader),拉取shuffleMapTask/ResultTask,需要聚合的數據SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context).read().asInstanceOf[Iterator[(K, C)]]}(2)在BlockStoreShuffleReader的read方法里先實例化ShuffleBlockFetcherIterator,在該實例化過程中,通過MapOutputTracker的getMapSizeByExecutorId獲取上游ShuffleMapTask輸出的元數據。先嘗試在本地的mapStatus獲取,如果獲取不到,則通過RPC通行框架,發送消息給MapOutputTrackerMaster,
請求獲取該ShuffleMapTask輸出數據的元數據,獲取這些元數據轉換成Seq[(BlockManagerId, Seq[(BlockId, Long)])]的序列。在這個序列中的元素包括兩部分信息,BlockManagerId可以定位數據所處的Executor,而Seq[(BlockId,Long)]可以定位Executor的數據塊編號和獲取數據的大小。
在MapOutputTracker的getMapSizesByExecutorId方法代碼如下:
def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int): Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")// 通過shuffleId獲取上游ShuffleMapTask輸出數據的元數據val statuses = getStatuses(shuffleId)// Synchronize on the returned array because, on the driver, it gets mutated in place// 使用同步的方式把獲取到的MapStatuses轉為Seq[(BlockManagerId, Seq[(BlockId, Long)])]格式statuses.synchronized {return MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)}}獲取上游的ShuffleMapTask輸出數據的元數據是在getStatuses方法中,在該方法中通過同步的方式嘗試在本地mapStatus中讀取,如果成功獲取,則返回這些信息;如果失敗,則通過RPC通信框架發送請求到MapOutputTrackerMaster進行獲取。
private def getStatuses(shuffleId: Int): Array[MapStatus] = {// 根據ShuffleMapTask的編號嘗試從本地獲取輸出結果的元數據MpaStatus,// 如果不能獲取這些信息,則向MapOutPutTracekrMaster請求獲取val statuses = mapStatuses.get(shuffleId).orNullif (statuses == null) {logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")val startTime = System.currentTimeMillisvar fetchedStatuses: Array[MapStatus] = nullfetching.synchronized {// Someone else is fetching it; wait for them to be done// 其他人在讀取該信息,等待其他人讀取完畢后再進行讀取while (fetching.contains(shuffleId)) {try {fetching.wait()} catch {case e: InterruptedException =>}}// Either while we waited the fetch happened successfully, or// someone fetched it in between the get and the fetching.synchronized.// 使用同步操作讀取指定Shuffle編號的數據,該操作要么成功讀取,要么其他人同時在讀取,此時把讀取Shuffle編號// 加入到fetching讀取列表中,以便后續中讀取。fetchedStatuses = mapStatuses.get(shuffleId).orNullif (fetchedStatuses == null) {// We have to do the fetch, get others to wait for us.fetching += shuffleId}}if (fetchedStatuses == null) {// We won the race to fetch the statuses; do sologInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)// This try-finally prevents hangs due to timeouts:try {// 發送消息給MapOutputTrackerMaster,獲取該ShuffleMapTask輸出的元數據val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))// 對獲取的元數據進行反序列化fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)logInfo("Got the output locations")mapStatuses.put(shuffleId, fetchedStatuses)} finally {fetching.synchronized {fetching -= shuffleIdfetching.notifyAll()}}}(3)獲取讀取數據位置信息后,返回到ShuffleBlockFetcherIterator的initalize方法,該方法是Shuffle讀的核心代碼所在。在該方法中通過調用splitLocalRemoteBlocks方法對獲取的數據位置信息進行區分,判斷數據所處的位置是本地節點還是遠程節點。如果是遠程節點使用fetchUpToMaxBytes方法,從遠程節點匯總獲取數據;如果是本地節點使用fetchLocalBlock方法獲取數據。
private[this] def initialize(): Unit = {// Add a task completion callback (called in both success case and failure case) to cleanup.context.addTaskCompletionListener(_ => cleanup())// Split local and remote blocks. 切分本地和遠程block// 對獲取數據位置的元數據進行分區,區分為本地節點還是遠程節點val remoteRequests = splitLocalRemoteBlocks()// Add the remote requests into our queue in a random orderfetchRequests ++= Utils.randomize(remoteRequests)assert ((0 == reqsInFlight) == (0 == bytesInFlight),"expected reqsInFlight = 0 but found reqsInFlight = " + reqsInFlight +", expected bytesInFlight = 0 but found bytesInFlight = " + bytesInFlight)// Send out initial requests for blocks, up to our maxBytesInFlight// 對于遠程節點數據,使用Netty網絡方式讀取fetchUpToMaxBytes()val numFetches = remoteRequests.size - fetchRequests.sizelogInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))// Get Local Blocks// 對于本地數據,sort Based Shuffle使用的是IndexShuffleBlockResolver的getBlockData方法獲取數據fetchLocalBlocks()logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))}劃分本地節點還是遠程節點的splitLocalRemoteBlocks方法中劃分數據讀取方式:
private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {// Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them// smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5// nodes, rather than blocking on reading output from one node.// 設置每次請求的大小不超過maxBytesInFlight的1/5,該閾值由spark.reducer.maxSizeInFlight配置,默認48MBval targetRequestSize = math.max(maxBytesInFlight / 5, 1L)logDebug("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize)// Split local and remote blocks. Remote blocks are further split into FetchRequests of size// at most maxBytesInFlight in order to limit the amount of data in flight.val remoteRequests = new ArrayBuffer[FetchRequest]// Tracks total number of blocks (including zero sized blocks)var totalBlocks = 0for ((address, blockInfos) <- blocksByAddress) {totalBlocks += blockInfos.sizeif (address.executorId == blockManager.blockManagerId.executorId) {// Filter out zero-sized blocks// 當數據和所在BlockManager在一個節點時,把該信息加入到localBlocks列表中,// 需要過濾大小為0的數據塊localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)numBlocksToFetch += localBlocks.size} else {val iterator = blockInfos.iteratorvar curRequestSize = 0Lvar curBlocks = new ArrayBuffer[(BlockId, Long)]while (iterator.hasNext) {val (blockId, size) = iterator.next()// Skip empty blocksif (size > 0) {// 對于不空數據塊,把其信息加入到列表中curBlocks += ((blockId, size))remoteBlocks += blockIdnumBlocksToFetch += 1curRequestSize += size} else if (size < 0) {throw new BlockException(blockId, "Negative block size " + size)}// 按照不大于maxBytesInFlight的標準,把這些需要處理數據組合在一起if (curRequestSize >= targetRequestSize) {// Add this FetchRequestremoteRequests += new FetchRequest(address, curBlocks)curBlocks = new ArrayBuffer[(BlockId, Long)]logDebug(s"Creating fetch request of $curRequestSize at $address")curRequestSize = 0}}// Add in the final request// 剩余的處理數據組成一次請求if (curBlocks.nonEmpty) {remoteRequests += new FetchRequest(address, curBlocks)}}}logInfo(s"Getting $numBlocksToFetch non-empty blocks out of $totalBlocks blocks")remoteRequests}(4)數據讀取完畢后,回到BlockStoreShuffleReader的read方法,判斷是否定義聚合,如果需要,則根據鍵值調用Aggregator的combineCombinersByKey
方法進行聚合。聚合完畢,使用外部排序(ExternalSorter的insrtAll)對數據進行排序并放入內存中
總結
以上是生活随笔為你收集整理的Spark详解(十):SparkShuffle机制原理分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark详解(九):Spark存储原理
- 下一篇: Spark详解(十一):Spark运行架