Spark分区器HashPartitioner和RangePartitioner代码详解
轉載:
https://www.iteblog.com/archives/1522.html
在Spark中分區器直接決定了RDD中分區的個數;也決定了RDD中每條數據經過Shuffle過程屬于哪個分區;也決定了Reduce的個數。這三點看起來是不同的方面的,但其深層的含義是一致的。
我們需要注意的是,只有Key-Value類型的RDD才有分區的,非Key-Value類型的RDD分區的值是None的。
在Spark中,存在兩類分區函數:HashPartitioner和RangePartitioner,它們都是繼承自Partitioner,主要提供了每個RDD有幾個分區(numPartitions)以及對于給定的值返回一個分區ID(0~numPartitions-1),也就是決定這個值是屬于那個分區的。
文章目錄
- 1 HashPartitioner分區
- 2 RangePartitioner分區
- 3 確認邊界
HashPartitioner分區
HashPartitioner分區的原理很簡單,對于給定的key,計算其hashCode,并除于分區的個數取余,如果余數小于0,則用余數+分區的個數,最后返回的值就是這個key所屬的分區ID。實現如下:
| ? class HashPartitioner(partitions: Int) extends Partitioner { ??require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.") ? ??def numPartitions: Int = partitions ? ??def getPartition(key: Any): Int = key match { ????case null => 0 ????case _ => Utils.nonNegativeMod(key.hashCode, numPartitions) ??} ? ??override def equals(other: Any): Boolean = other match { ????case h: HashPartitioner => ??????h.numPartitions == numPartitions ????case _ => ??????false ??} ? ??override def hashCode: Int = numPartitions } |
RangePartitioner分區
從HashPartitioner分區的實現原理我們可以看出,其結果可能導致每個分區中數據量的不均勻,極端情況下會導致某些分區擁有RDD的全部數據,這顯然不是我們需要的。而RangePartitioner分區則盡量保證每個分區中數據量的均勻,而且分區與分區之間是有序的,也就是說一個分區中的元素肯定都是比另一個分區內的元素小或者大;但是分區內的元素是不能保證順序的。簡單的說就是將一定范圍內的數映射到某一個分區內。
前面討論過,RangePartitioner分區器的主要作用就是將一定范圍內的數映射到某一個分區內,所以它的實現中分界的算法尤為重要。這個算法對應的函數是rangeBounds。這個函數主要經歷了兩個過程:以Spark 1.1版本為界,Spark 1.1版本社區對rangeBounds函數進行了一次重大的重構。
因為在Spark 1.1版本之前,RangePartitioner分區對整個數據集進行了2次的掃描:一次是計算RDD中元素的個數;一次是進行采樣。具體的代碼如下:
| // An array of upper bounds for the first (partitions - 1) partitions private val rangeBounds: Array[K] = { ????if (partitions == 1) { ??????Array() ????} else { ??????val rddSize = rdd.count() ??????val maxSampleSize = partitions * 20.0 ??????val frac = math.min(maxSampleSize / math.max(rddSize, 1), 1.0) ??????val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sorted ??????if (rddSample.length == 0) { ????????Array() ??????} else { ????????val bounds = new Array[K](partitions - 1) ????????for (i <- 0 until partitions - 1) { ??????????val index = (rddSample.length - 1) * (i + 1) / partitions ??????????bounds(i) = rddSample(index) ????????} ????????bounds ??????} ????} } |
注意看里面的rddSize的計算和rdd.sample的計算。所以如果你進行一次sortByKey操作就會對RDD掃描三次!而我們都知道,分區函數性能對整個Spark作業的性能是有直接的影響,而且影響很大,直接影響作業運行的總時間,所以社區不得不對RangePartitioner中的rangeBounds算法進行重構。
在閱讀新版本的RangePartitioner之前,建議先去了解一下Reservoir sampling(水塘抽樣),因為其中的實現用到了Reservoir sampling算法進行采樣。
采樣總數
在新的rangeBounds算法總,采樣總數做了一個限制,也就是最大只采樣1e6的樣本(也就是1000000):
| val sampleSize = math.min(20.0 * partitions, 1e6) |
所以如果你的分區個數為5,則采樣樣本數量為100.0
父RDD中每個分區采樣樣本數
按照我們的思路,正常情況下,父RDD每個分區需要采樣的數據量應該是sampleSize/rdd.partitions.size,但是我們看代碼的時候發現父RDD每個分區需要采樣的數據量是正常數的3倍。
| val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt |
這是因為父RDD各分區中的數據量可能會出現傾斜的情況,乘于3的目的就是保證數據量小的分區能夠采樣到足夠的數據,而對于數據量大的分區會進行第二次采樣。
采樣算法
這個地方就是RangePartitioner分區的核心了,其內部使用的就是水塘抽樣,而這個抽樣特別適合那種總數很大而且未知,并無法將所有的數據全部存放到主內存中的情況。也就是我們不需要事先知道RDD中元素的個數(不需要調用rdd.count()了!)。其主要實現如下:
| ? ? val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition) ? def sketch[K : ClassTag]( ??????rdd: RDD[K], ??????sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = { ????val shift = rdd.id ????// val classTagK = classTag[K] // to avoid serializing the entire partitioner object ????val sketched = rdd.mapPartitionsWithIndex { (idx, iter) => ??????val seed = byteswap32(idx ^ (shift << 16)) ??????val (sample, n) = SamplingUtils.reservoirSampleAndCount( ????????iter, sampleSizePerPartition, seed) ??????Iterator((idx, n, sample)) ????}.collect() ????val numItems = sketched.map(_._2.toLong).sum ????(numItems, sketched) } ? def reservoirSampleAndCount[T: ClassTag]( ??????input: Iterator[T], ??????k: Int, ??????seed: Long = Random.nextLong()) ????: (Array[T], Int) = { ????val reservoir = new Array[T](k) ????// Put the first k elements in the reservoir. ????var i = 0 ????while (i < k && input.hasNext) { ??????val item = input.next() ??????reservoir(i) = item ??????i += 1 ????} ? ????// If we have consumed all the elements, return them. Otherwise do the replacement. ????if (i < k) { ??????// If input size < k, trim the array to return only an array of input size. ??????val trimReservoir = new Array[T](i) ??????System.arraycopy(reservoir, 0, trimReservoir, 0, i) ??????(trimReservoir, i) ????} else { ??????// If input size > k, continue the sampling process. ??????val rand = new XORShiftRandom(seed) ??????while (input.hasNext) { ????????val item = input.next() ????????val replacementIndex = rand.nextInt(i) ????????if (replacementIndex < k) { ??????????reservoir(replacementIndex) = item ????????} ????????i += 1 ??????} ??????(reservoir, i) } } |
RangePartitioner.sketch的第一個參數是rdd.map(_._1),也就是把父RDD的key傳進來,因為分區只需要對Key進行操作即可。該函數返回值是val (numItems, sketched) ,其中numItems相當于記錄rdd元素的總數;而sketched的類型是Array[(Int, Int, Array[K])],記錄的是分區的編號、該分區中總元素的個數以及從父RDD中每個分區采樣的數據。
sketch函數對父RDD中的每個分區進行采樣,并記錄下分區的ID和分區中數據總和。
reservoirSampleAndCount函數就是典型的水塘抽樣實現,唯一不同的是該算法還記錄下i的值,這個就是該分區中元素的總和。
我們之前討論過,父RDD各分區中的數據量可能不均勻,在極端情況下,有些分區內的數據量會占有整個RDD的絕大多數的數據,如果按照水塘抽樣進行采樣,會導致該分區所采樣的數據量不足,所以我們需要對該分區再一次進行采樣,而這次采樣使用的就是rdd的sample函數。實現如下:
| val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0) val candidates = ArrayBuffer.empty[(K, Float)] val imbalancedPartitions = mutable.Set.empty[Int] sketched.foreach { case (idx, n, sample) => ??if (fraction * n > sampleSizePerPartition) { ????imbalancedPartitions += idx ??} else { ????// The weight is 1 over the sampling probability. ????val weight = (n.toDouble / sample.size).toFloat ????for (key <- sample) { ??????candidates += ((key, weight)) ????} ??} } if (imbalancedPartitions.nonEmpty) { ??// Re-sample imbalanced partitions with the desired sampling probability. ??val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains) ??val seed = byteswap32(-rdd.id - 1) ??val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect() ??val weight = (1.0 / fraction).toFloat ??candidates ++= reSampled.map(x => (x, weight)) } |
我們可以看到,重新采樣的采樣因子和Spark 1.1之前的采樣因子一致。對于滿足于fraction * n > sampleSizePerPartition條件的分區,我們對其再一次采樣。所有采樣完的數據全部存放在candidates 中。
確認邊界
從上面的采樣算法可以看出,對于不同的分區weight的值是不一樣的,這個值對應的就是每個分區的采樣間隔。
| def determineBounds[K : Ordering : ClassTag]( ????candidates: ArrayBuffer[(K, Float)], ????partitions: Int): Array[K] = { ??val ordering = implicitly[Ordering[K]] ??val ordered = candidates.sortBy(_._1) ??val numCandidates = ordered.size ??val sumWeights = ordered.map(_._2.toDouble).sum ??val step = sumWeights / partitions ??var cumWeight = 0.0 ??var target = step ??val bounds = ArrayBuffer.empty[K] ??var i = 0 ??var j = 0 ??var previousBound = Option.empty[K] ??while ((i < numCandidates) && (j < partitions - 1)) { ????val (key, weight) = ordered(i) ????cumWeight += weight ????if (cumWeight > target) { ??????// Skip duplicate values. ??????if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) { ????????bounds += key ????????target += step ????????j += 1 ????????previousBound = Some(key) ??????} ????} ????i += 1 ??} ??bounds.toArray } |
這個函數最后返回的就是分區的劃分邊界。
注意,按照理想情況,選定的劃分邊界需要保證劃分后的分區中數據量是均勻的,但是這個算法中如果將cumWeight > target修改成cumWeight >= target的時候會保證各分區之間數據量更加均衡。可以看這里https://issues.apache.org/jira/browse/SPARK-10184。
定位分區ID
分區類的一個重要功能就是對給定的值計算其屬于哪個分區。這個算法并沒有太大的變化。
| def getPartition(key: Any): Int = { ??val k = key.asInstanceOf[K] ??var partition = 0 ??if (rangeBounds.length <= 128) { ????// If we have less than 128 partitions naive search ????while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) { ??????partition += 1 ????} ??} else { ????// Determine which binary search method to use only once. ????partition = binarySearch(rangeBounds, k) ????// binarySearch either returns the match location or -[insertion point]-1 ????if (partition < 0) { ??????partition = -partition-1 ????} ????if (partition > rangeBounds.length) { ??????partition = rangeBounds.length ????} ??} ??if (ascending) { ????partition ??} else { ????rangeBounds.length - partition ??} } |
如果分區邊界數組的大小小于或等于128的時候直接變量數組,否則采用二分查找法確定key屬于某個分區。
總結
以上是生活随笔為你收集整理的Spark分区器HashPartitioner和RangePartitioner代码详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark数据倾斜的完美解决
- 下一篇: Spark 资源调度及任务调度