日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spark算子大全glom_(七)Spark Streaming 算子梳理 — repartition算子

發(fā)布時間:2025/3/8 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark算子大全glom_(七)Spark Streaming 算子梳理 — repartition算子 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

目錄
天小天:(一)Spark Streaming 算子梳理 — 簡單介紹streaming運行邏輯
天小天:(二)Spark Streaming 算子梳理 — flatMap和mapPartitions
天小天:(三)Spark Streaming 算子梳理 — transform算子
天小天:(四)Spark Streaming 算子梳理 — Kafka createDirectStream
天小天:(五)Spark Streaming 算子梳理 — foreachRDD
天小天:(六)Spark Streaming 算子梳理 — glom算子
天小天:(七)Spark Streaming 算子梳理 — repartition算子
天小天:(八)Spark Streaming 算子梳理 — window算子

前言

本文主要講解repartiion的作用及原理。

作用

repartition用來調(diào)整父RDD的分區(qū)數(shù),入?yún)檎{(diào)整之后的分區(qū)數(shù)。由于使用方法比較簡單,這里就不寫例子了。

源碼分析

接下來從源碼的角度去分析是如何實現(xiàn)重新分區(qū)的。

DStream

/*** Return a new DStream with an increased or decreased level of parallelism. Each RDD in the* returned DStream has exactly numPartitions partitions.*/def repartition(numPartitions: Int): DStream[T] = ssc.withScope {this.transform(_.repartition(numPartitions))}

從方法中可以看到,實現(xiàn)repartition的方式是通過Dstream的transform算子之間調(diào)用RDD的repartition算子實現(xiàn)的。

接下來就是看看RDD的repartition算子是如何實現(xiàn)的。

RDD

/*** Return a new RDD that has exactly numPartitions partitions.** Can increase or decrease the level of parallelism in this RDD. Internally, this uses* a shuffle to redistribute data.** If you are decreasing the number of partitions in this RDD, consider using `coalesce`,* which can avoid performing a shuffle.** TODO Fix the Shuffle+Repartition data loss issue described in SPARK-23207.*/def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {coalesce(numPartitions, shuffle = true)}

首先可以看到RDD的repartition的實現(xiàn)是調(diào)用時coalesce方法。其中入?yún)⒂袃蓚€第一個是numPartitions為重新分區(qū)后的分區(qū)數(shù)量,第二個參數(shù)為是否shuffle,這里的入?yún)閠rue代表會進行shuffle。

接下來看下coalesce是如何實現(xiàn)的。

def coalesce(numPartitions: Int, shuffle: Boolean = false,partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T] = withScope {require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")if (shuffle) {// 是否經(jīng)過shuffle,repartition是走這個邏輯/** Distributes elements evenly across output partitions, starting from a random partition. */// distributePartition是shuffle的邏輯,// 對迭代器中的每個元素分派不同的key,shuffle時根據(jù)這些key平均的把元素分發(fā)到下一個stage的各個partition中。val distributePartition = (index: Int, items: Iterator[T]) => {var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)items.map { t =>// Note that the hash code of the key will just be the key itself. The HashPartitioner// will mod it with the number of total partitions.position = position + 1(position, t)}} : Iterator[(Int, T)]// include a shuffle step so that our upstream tasks are still distributednew CoalescedRDD(new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition), // 為每個元素分配key,分配的邏輯為distributePartitionnew HashPartitioner(numPartitions)), // ShuffledRDD 根據(jù)key進行混洗numPartitions,partitionCoalescer).values} else {// 如果不經(jīng)過shuffle之間返回CoalescedRDDnew CoalescedRDD(this, numPartitions, partitionCoalescer)}}

從源碼中可以看到無論是否經(jīng)過shuffle最終返回的都是CoalescedRDD。其中區(qū)別是經(jīng)過shuffle需要為每個元素分配key,并根據(jù)key將所有的元素平均分配到task中。

CoalescedRDD

private[spark] class CoalescedRDD[T: ClassTag](@transient var prev: RDD[T], // 父RDDmaxPartitions: Int, // 最大partition數(shù)量,這里就是重新分區(qū)后的partition數(shù)量partitionCoalescer: Option[PartitionCoalescer] = None // 重新分區(qū)算法,入?yún)⒛J為None)extends RDD[T](prev.context, Nil) { // Nil since we implement getDependenciesrequire(maxPartitions > 0 || maxPartitions == prev.partitions.length,s"Number of partitions ($maxPartitions) must be positive.")if (partitionCoalescer.isDefined) {require(partitionCoalescer.get.isInstanceOf[Serializable],"The partition coalescer passed in must be serializable.")}override def getPartitions: Array[Partition] = {// 獲取重新算法,默認為DefaultPartitionCoalescerval pc = partitionCoalescer.getOrElse(new DefaultPartitionCoalescer())// coalesce方法是根據(jù)傳入的rdd和最大分區(qū)數(shù)計算出每個新的分區(qū)處理哪些舊的分區(qū)pc.coalesce(maxPartitions, prev).zipWithIndex.map {case (pg, i) => // pg為partitionGroup即舊的partition組成的集合,集合里的partition對應一個新的partitionval ids = pg.partitions.map(_.index).toArraynew CoalescedRDDPartition(i, prev, ids, pg.prefLoc) //組成一個新的parititon}}override def compute(partition: Partition, context: TaskContext): Iterator[T] = {// 當執(zhí)行到這里時分區(qū)已經(jīng)重新分配好了,這部分代碼也是執(zhí)行在新的分區(qū)的task中的。// 新的partition取出就的partition對應的所有partition并以此調(diào)用福rdd的迭代器執(zhí)行next計算。partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentPartition =>firstParent[T].iterator(parentPartition, context)}}override def getDependencies: Seq[Dependency[_]] = {Seq(new NarrowDependency(prev) {def getParents(id: Int): Seq[Int] =partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices})}override def clearDependencies() {super.clearDependencies()prev = null}/*** Returns the preferred machine for the partition. If split is of type CoalescedRDDPartition,* then the preferred machine will be one which most parent splits prefer too.* @param partition* @return the machine most preferred by split*/override def getPreferredLocations(partition: Partition): Seq[String] = {partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq} }

對于CoalescedRDD來講getPartitions方法是最核心的方法。舊的parition對應哪些新的partition就是在這個方法里計算出來的。具體的算法是在DefaultPartitionCoalescer的coalesce方法體現(xiàn)出來的。

compute方法是在新的task中執(zhí)行的,即分區(qū)已經(jīng)重新分配好,并且拉取父RDD指定parition對應的元素提供給下游迭代器計算。

圖示

寫下來用兩張圖解釋下是如何repartition

無shuffle

有shuffle

總結(jié)

以上repartition的邏輯基本就已經(jīng)介紹完了。其中DefaultPartitionCoalescer中重新分區(qū)的算法邏輯并沒有展開說。這里以后如果有時間會再寫一篇詳細介紹。

總結(jié)

以上是生活随笔為你收集整理的spark算子大全glom_(七)Spark Streaming 算子梳理 — repartition算子的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。