Spark Source Code Analysis -- RDD


For details about RDDs, refer to the Spark paper; below we walk through the source code.
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
Represents an immutable, partitioned collection of elements that can be operated on in parallel.

Internally, each RDD is characterized by five main properties:
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

RDDs fall into the following categories:

basic(org.apache.spark.rdd.RDD): This class contains the basic operations available on all RDDs, such as `map`, `filter`, and `persist`.

org.apache.spark.rdd.PairRDDFunctions: contains operations available only on RDDs of key-value pairs, such as `groupByKey` and `join`

org.apache.spark.rdd.DoubleRDDFunctions: contains operations available only on RDDs of Doubles

org.apache.spark.rdd.SequenceFileRDDFunctions: contains operations available on RDDs that can be saved as SequenceFiles
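These extra operations become available through implicit conversions. A minimal sketch, assuming the implicits brought in by `import org.apache.spark.SparkContext._` (e.g. rddToPairRDDFunctions) in the Spark version analyzed here; the local context and values are illustrative, not from the source:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._                      // brings rddToPairRDDFunctions and friends into scope

val sc = new SparkContext("local[2]", "pair-demo")          // illustrative local context
val words = sc.parallelize(Seq("a", "b", "a", "c"))
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)      // reduceByKey comes from PairRDDFunctions, not from RDD itself
println(counts.collect().mkString(", "))                    // e.g. (a,2), (b,1), (c,1)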


RDD is first of all a generic class: T is the type of the elements it stores, and data is always processed through Iterator[T].
An RDD is initialized with a SparkContext and its dependencies (Seq deps).
From the interfaces an RDD exposes, we can roughly see what an RDD is:
1. An RDD is a chunk of data, possibly a large one, so it cannot be guaranteed to fit in a single machine's memory; it therefore has to be split into partitions spread across the memory of the cluster's machines.
That is why we need getPartitions, partitioner (how to partition), and getPreferredLocations (how to take locality into account when computing each partition).

The definition of Partition is very simple: it only carries an id and contains no data.

trait Partition extends Serializable {
  /** Get the split's index within its parent RDD */
  def index: Int

  // A better default implementation of HashCode
  override def hashCode(): Int = index
}

2. RDDs are related to one another: an RDD can turn its parent RDD's data into its own data through its compute logic, so there is a lineage relationship between RDDs.
All of an RDD's dependencies can be retrieved via getDependencies.

3. An RDD can be persisted; the most common case is cache, i.e. StorageLevel.MEMORY_ONLY.

4. An RDD can be checkpointed to make failover more efficient: when the RDD chain is long, relying purely on replay is inefficient.

5. RDD.iterator produces the Iterator[T] used to iterate over the actual data.

6. Various transformations and actions can be applied to an RDD (a short usage sketch covering points 3, 4 and 6 follows below).
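A minimal usage sketch tying points 3, 4 and 6 together; the local context, checkpoint directory, and variable names are illustrative assumptions, not taken from the source above:

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext("local[2]", "rdd-demo")    // hypothetical local context
sc.setCheckpointDir("/tmp/rdd-checkpoints")          // assumed directory; must be set before checkpoint()

val nums = sc.parallelize(1 to 100, numSlices = 4)   // an RDD with 4 partitions
val squares = nums.map(n => n * n)                   // transformation: lazy, no job runs yet

squares.persist(StorageLevel.MEMORY_ONLY)            // point 3: same effect as squares.cache()
squares.checkpoint()                                 // point 4: written to the checkpoint dir once materialized

val total = squares.count()                          // point 6: the action triggers the job via sc.runJob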

abstract class RDD[T: ClassManifest](
    @transient private var sc: SparkContext,          // @transient: does not need to be serialized
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

  /** Auxiliary constructor, used to build RDDs with a one-to-one dependency on a single parent
   *  (this covers quite a few RDDs: filter, map, ...)
   *  Construct an RDD with just a one-to-one dependency on one parent */
  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))
    // Unlike the general case there is only one parent, so the parent RDD object is passed in directly

  // =======================================================================
  // Methods that should be implemented by subclasses of RDD
  // =======================================================================

  /** Implemented by subclasses to compute a given partition. */
  def compute(split: Partition, context: TaskContext): Iterator[T]

  /**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getPartitions: Array[Partition]

  /**
   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getDependencies: Seq[Dependency[_]] = deps

  /** Optionally overridden by subclasses to specify placement preferences. */
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  /** Optionally overridden by subclasses to specify how they are partitioned. */
  val partitioner: Option[Partitioner] = None

  // =======================================================================
  // Methods and fields available on all RDDs
  // =======================================================================

  /** The SparkContext that created this RDD. */
  def sparkContext: SparkContext = sc

  /** A unique ID for this RDD (within its SparkContext). */
  val id: Int = sc.newRddId()

  /** A friendly name for this RDD */
  var name: String = null

  /**
   * Set this RDD's storage level to persist its values across operations after the first time
   * it is computed. This can only be used to assign a new storage level if the RDD does not
   * have a storage level set yet.
   */
  def persist(newLevel: StorageLevel): RDD[T] = {
    // TODO: Handle changes of StorageLevel
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of an RDD after it was already assigned a level")
    }
    storageLevel = newLevel
    // Register the RDD with the SparkContext
    sc.persistentRdds(id) = this
    this
  }

  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)

  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  def cache(): RDD[T] = persist()

  /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
  def getStorageLevel = storageLevel

  // Our dependencies and partitions will be gotten by calling subclass's methods below, and will
  // be overwritten when we're checkpointed
  private var dependencies_ : Seq[Dependency[_]] = null
  @transient private var partitions_ : Array[Partition] = null

  /** An Option holding our checkpoint RDD, if we are checkpointed.
   *  Checkpointing stores the RDD in files on disk to make failover more efficient (replay is the alternative).
   *  If a checkpointRDD exists, the RDD data can be read from it directly, with no need to compute. */
  private def checkpointRDD: Option[RDD[T]] = checkpointData.flatMap(_.checkpointRDD)

  /**
   * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
   * This should ''not'' be called by users directly, but is available for implementors of custom
   * subclasses of RDD.
   */
  /** This is the core of how an RDD accesses its data: a Partition only carries an id, not the data itself.
   *  So how is the data actually fetched? See the storage module:
   *  cacheManager.getOrCompute maps the RDD and partition id to the corresponding block and reads the data from it. */
  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      // StorageLevel is not NONE, so this RDD has been persisted and can be read back directly
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
      computeOrReadCheckpoint(split, context) // Not persisted: recompute it, or read it from the checkpoint
    }
  }

  // Transformations (return a new RDD)
  // ...... the various transformation interfaces: map, union...

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))

  // Actions (launch a job to return a value to the user program)
  // ...... the various action interfaces: count, collect...

  /**
   * Return the number of elements in the RDD.
   */
  def count(): Long = {
    // runJob is only called inside actions, which is why transformations are lazy
    sc.runJob(this, (iter: Iterator[T]) => {
      var result = 0L
      while (iter.hasNext) {
        result += 1L
        iter.next()
      }
      result
    }).sum
  }

  // =======================================================================
  // Other internal methods and fields
  // =======================================================================

  /** Returns the first parent RDD */
  protected[spark] def firstParent[U: ClassManifest] = {
    dependencies.head.rdd.asInstanceOf[RDD[U]]
  }

  // ................
}
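To make the subclass contract above concrete, here is a minimal sketch of a custom RDD with no parents that simply serves a range of integers. RangePartition and SimpleRangeRDD are invented names for illustration (not part of Spark), and the package paths assume the version analyzed here:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition: just an index plus the sub-range it covers (no data, matching the Partition trait above)
class RangePartition(override val index: Int, val from: Int, val until: Int) extends Partition

// Hypothetical RDD with no parent dependencies (deps = Nil), so only getPartitions and compute need to be implemented
class SimpleRangeRDD(sc: SparkContext, n: Int, slices: Int) extends RDD[Int](sc, Nil) {

  override def getPartitions: Array[Partition] = {
    val step = math.max(1, n / slices)
    val parts = new Array[Partition](slices)
    for (i <- 0 until slices) {
      // the last partition absorbs the remainder
      parts(i) = new RangePartition(i, i * step, if (i == slices - 1) n else (i + 1) * step)
    }
    parts
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    val p = split.asInstanceOf[RangePartition]
    (p.from until p.until).iterator   // lazily produce the data belonging to this partition
  }
}

With this sketch, new SimpleRangeRDD(sc, 10, 3).collect() would return 0 through 9, computed partition by partition.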


Here we only discuss a few basic RDDs; pair RDDs will be covered separately.

FilteredRDD

One-to-one Dependency, FilteredRDD

filter builds a FilteredRDD, passing the current RDD as the first argument and the function f as the second; the return value is the filtered RDD.

/**
 * Return a new RDD containing only the elements that satisfy a predicate.
 */
def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))

In compute, the filter operation is applied to the parent RDD's Iterator[T].

private[spark] class FilteredRDD[T: ClassManifest](   // filter is a typical one-to-one dependency, so the auxiliary constructor is used
    prev: RDD[T],        // parent RDD
    f: T => Boolean)     // f, the predicate function
  extends RDD[T](prev) {

  // firstParent takes the first RDD out of deps, i.e. the prev RDD passed in;
  // with a one-to-one dependency the parent and child share the same partition information
  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override val partitioner = prev.partitioner   // Since filter cannot change a partition's keys

  override def compute(split: Partition, context: TaskContext) =
    firstParent[T].iterator(split, context).filter(f)   // compute is the logic that actually produces this RDD's data
}
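A small usage sketch (illustrative values, reusing a hypothetical sc): because FilteredRDD reuses the parent's partitions and partitioner, filtering never changes the partition layout:

val nums = sc.parallelize(1 to 10, numSlices = 4)    // parent RDD with 4 partitions
val evens = nums.filter(_ % 2 == 0)                  // builds a FilteredRDD with a OneToOneDependency on nums
println(evens.partitions.length)                     // 4 -- exactly the parent's partitions
println(evens.collect().mkString(","))               // 2,4,6,8,10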


UnionRDD

Range Dependency, still a narrow dependency

First, look at how union is used: UnionRDD's second argument is an array of the two RDDs, and the return value is the new RDD produced by unioning them.

/**
 * Return the union of this RDD and another one. Any identical elements will appear multiple
 * times (use `.distinct()` to eliminate them).
 */
def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other))


First, UnionPartition is defined. The union operation merely gathers the partitions of several RDDs into one RDD; the partitions themselves are unchanged, so the parent partitions can be reused directly.

Its three parameters are:
idx: the partition id, i.e. its index within the new UnionRDD
rdd: the parent RDD
splitIndex: the id of the parent partition

private[spark] class UnionPartition[T: ClassManifest](idx: Int, rdd: RDD[T], splitIndex: Int)
  extends Partition {

  var split: Partition = rdd.partitions(splitIndex)   // take the corresponding partition from the parent RDD and reuse it

  def iterator(context: TaskContext) = rdd.iterator(split, context)   // the Iterator can also be reused

  def preferredLocations() = rdd.preferredLocations(split)

  override val index: Int = idx   // the partition id is new, since after merging multiple RDDs the indices necessarily change
}

Then UnionRDD itself:

class UnionRDD[T: ClassManifest](
    sc: SparkContext,
    @transient var rdds: Seq[RDD[T]])   // the parent RDD Seq
  extends RDD[T](sc, Nil) {             // Nil since we implement getDependencies

  override def getPartitions: Array[Partition] = {
    val array = new Array[Partition](rdds.map(_.partitions.size).sum)   // the UnionRDD's partition count is the sum over all parent RDDs
    var pos = 0
    for (rdd <- rdds; split <- rdd.partitions) {
      array(pos) = new UnionPartition(pos, rdd, split.index)   // create all the UnionPartitions
      pos += 1
    }
    array
  }

  override def getDependencies: Seq[Dependency[_]] = {
    val deps = new ArrayBuffer[Dependency[_]]
    var pos = 0
    for (rdd <- rdds) {
      deps += new RangeDependency(rdd, 0, pos, rdd.partitions.size)   // create a RangeDependency per parent
      pos += rdd.partitions.size   // with a RangeDependency, pos advances by the whole range's size
    }
    deps
  }

  override def compute(s: Partition, context: TaskContext): Iterator[T] =
    s.asInstanceOf[UnionPartition[T]].iterator(context)   // Union's compute is trivial, nothing extra to do

  override def getPreferredLocations(s: Partition): Seq[String] =
    s.asInstanceOf[UnionPartition[T]].preferredLocations()
}
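A small usage sketch (illustrative values, reusing a hypothetical sc): every parent partition is kept, so the partition count of the union is simply the sum:

val a = sc.parallelize(1 to 4, numSlices = 2)
val b = sc.parallelize(3 to 6, numSlices = 3)
val u = a.union(b)                                   // a UnionRDD over Array(a, b) with a RangeDependency on each
println(u.partitions.length)                         // 5 = 2 + 3
println(u.collect().mkString(","))                   // 1,2,3,4,3,4,5,6 -- duplicates are kept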

Reposted from: https://www.cnblogs.com/fxjwind/p/3489107.html
