日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark RDD使用详解3--Value型Transformation算子

發(fā)布時間:2024/1/17 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark RDD使用详解3--Value型Transformation算子 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

處理數(shù)據(jù)類型為Value型的Transformation算子可以根據(jù)RDD變換算子的輸入分區(qū)與輸出分區(qū)關系分為以下幾種類型:

1)輸入分區(qū)與輸出分區(qū)一對一型?
2)輸入分區(qū)與輸出分區(qū)多對一型?
3)輸入分區(qū)與輸出分區(qū)多對多型?
4)輸出分區(qū)為輸入分區(qū)子集型?
5)還有一種特殊的輸入與輸出分區(qū)一對一的算子類型:Cache型。 Cache算子對RDD分區(qū)進行緩存

輸入分區(qū)與輸出分區(qū)一對一型

(1)map

將原來RDD的每個數(shù)據(jù)項通過map中的用戶自定義函數(shù)f映射轉(zhuǎn)變?yōu)橐粋€新的元素。源碼中的map算子相當于初始化一個RDD,新RDD叫作MapPartitionsRDD(this,sc.clean(f))。

圖中,每個方框表示一個RDD分區(qū),左側(cè)的分區(qū)經(jīng)過用戶自定義函數(shù)f:T->U映射為右側(cè)的新的RDD分區(qū)。但是實際只有等到Action算子觸發(fā)后,這個f函數(shù)才會和其他函數(shù)在一個Stage中對數(shù)據(jù)進行運算。 V1輸入f轉(zhuǎn)換輸出V’ 1。

源碼:

?
  • /**

  • * Return a new RDD by applying a function to all elements of this RDD.

  • */

  • def map[U: ClassTag](f: T => U): RDD[U] = {

  • val cleanF = sc.clean(f)

  • new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))

  • }

  • (2)flatMap

    將原來RDD中的每個元素通過函數(shù)f轉(zhuǎn)換為新的元素,并將生成的RDD的每個集合中的元素合并為一個集合。 內(nèi)部創(chuàng)建FlatMappedRDD(this,sc.clean(f))。

    圖中,小方框表示RDD的一個分區(qū),對分區(qū)進行flatMap函數(shù)操作,flatMap中傳入的函數(shù)為f:T->U,T和U可以是任意的數(shù)據(jù)類型。將分區(qū)中的數(shù)據(jù)通過用戶自定義函數(shù)f轉(zhuǎn)換為新的數(shù)據(jù)。外部大方框可以認為是一個RDD分區(qū),小方框代表一個集合。 V1、 V2、 V3在一個集合作為RDD的一個數(shù)據(jù)項,轉(zhuǎn)換為V’ 1、 V’ 2、 V’ 3后,將結(jié)合拆散,形成為RDD中的數(shù)據(jù)項。

    源碼:

    ?

    ?
  • /**

  • * Return a new RDD by first applying a function to all elements of this

  • * RDD, and then flattening the results.

  • */

  • def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = {

  • val cleanF = sc.clean(f)

  • new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))

  • }

  • ?

    (3)mapPartitions

    mapPartitions函數(shù)獲取到每個分區(qū)的迭代器,在函數(shù)中通過這個分區(qū)整體的迭代器對整個分區(qū)的元素進行操作。 內(nèi)部實現(xiàn)是生成MapPartitionsRDD。

    圖中,用戶通過函數(shù)f(iter) => iter.filter(_>=3)對分區(qū)中的所有數(shù)據(jù)進行過濾,>=3的數(shù)據(jù)保留。一個方塊代表一個RDD分區(qū),含有1、 2、 3的分區(qū)過濾只剩下元素3。

    源碼:

    ?

    ?
  • /**

  • * Return a new RDD by applying a function to each partition of this RDD.

  • *

  • * `preservesPartitioning` indicates whether the input function preserves the partitioner, which

  • * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.

  • */

  • def mapPartitions[U: ClassTag](

  • f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {

  • val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter)

  • new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)

  • }

  • ?

    (4)glom

    glom函數(shù)將每個分區(qū)形成一個數(shù)組,內(nèi)部實現(xiàn)是返回的RDD[Array[T]]。?

    圖中,方框代表一個分區(qū)。 該圖表示含有V1、 V2、 V3的分區(qū)通過函數(shù)glom形成一個數(shù)組Array[(V1),(V2),(V3)]。

    源碼:

    ?

    ?
  • /**

  • * Return an RDD created by coalescing all elements within each partition into an array.

  • */

  • def glom(): RDD[Array[T]] = {

  • new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))

  • }

  • ?

    輸入分區(qū)與輸出分區(qū)多對一型

    (1)union

    使用union函數(shù)時需要保證兩個RDD元素的數(shù)據(jù)類型相同,返回的RDD數(shù)據(jù)類型和被合并的RDD元素數(shù)據(jù)類型相同,并不進行去重操作,保存所有元素。如果想去重,可以使用distinct()。++符號相當于uion函數(shù)操作。

    圖中,左側(cè)的大方框代表兩個RDD,大方框內(nèi)的小方框代表RDD的分區(qū)。右側(cè)大方框代表合并后的RDD,大方框內(nèi)的小方框代表分區(qū)。含有V1,V2…U4的RDD和含有V1,V8…U8的RDD合并所有元素形成一個RDD。V1、V1、V2、V8形成一個分區(qū),其他元素同理進行合并。

    源碼:

    ?

    ?
  • /**

  • * Return the union of this RDD and another one. Any identical elements will appear multiple

  • * times (use `.distinct()` to eliminate them).

  • */

  • def union(other: RDD[T]): RDD[T] = {

  • if (partitioner.isDefined && other.partitioner == partitioner) {

  • new PartitionerAwareUnionRDD(sc, Array(this, other))

  • } else {

  • new UnionRDD(sc, Array(this, other))

  • }

  • }

  • ?

    (2)certesian

    對兩個RDD內(nèi)的所有元素進行笛卡爾積操作。操作后,內(nèi)部實現(xiàn)返回CartesianRDD。?

    左側(cè)的大方框代表兩個RDD,大方框內(nèi)的小方框代表RDD的分區(qū)。右側(cè)大方框代表合并后的RDD,大方框內(nèi)的小方框代表分區(qū)。?
    大方框代表RDD,大方框中的小方框代表RDD分區(qū)。 例如,V1和另一個RDD中的W1、 W2、 Q5進行笛卡爾積運算形成(V1,W1)、(V1,W2)、(V1,Q5)。

    源碼:

    ?

    ?
  • /**

  • * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of

  • * elements (a, b) where a is in `this` and b is in `other`.

  • */

  • def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)

  • ?

    輸入分區(qū)與輸出分區(qū)多對多型

    groupBy

    將元素通過函數(shù)生成相應的Key,數(shù)據(jù)就轉(zhuǎn)化為Key-Value格式,之后將Key相同的元素分為一組。

    val cleanF = sc.clean(f)中sc.clean函數(shù)將用戶函數(shù)預處理;?
    this.map(t => (cleanF(t), t)).groupByKey(p)對數(shù)據(jù)map進行函數(shù)操作,再對groupByKey進行分組操作。其中,p中確定了分區(qū)個數(shù)和分區(qū)函數(shù),也就決定了并行化的程度。


    圖中,方框代表一個RDD分區(qū),相同key的元素合并到一個組。 例如,V1,V2合并為一個Key-Value對,其中key為“ V” ,Value為“ V1,V2” ,形成V,Seq(V1,V2)。

    源碼:

    ?

    ?
  • /**

  • * Return an RDD of grouped items. Each group consists of a key and a sequence of elements

  • * mapping to that key. The ordering of elements within each group is not guaranteed, and

  • * may even differ each time the resulting RDD is evaluated.

  • *

  • * Note: This operation may be very expensive. If you are grouping in order to perform an

  • * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]

  • * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.

  • */

  • def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =

  • groupBy[K](f, defaultPartitioner(this))

  • ?
  • /**

  • * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements

  • * mapping to that key. The ordering of elements within each group is not guaranteed, and

  • * may even differ each time the resulting RDD is evaluated.

  • *

  • * Note: This operation may be very expensive. If you are grouping in order to perform an

  • * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]

  • * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.

  • */

  • def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =

  • groupBy(f, new HashPartitioner(numPartitions))

  • ?
  • /**

  • * Return an RDD of grouped items. Each group consists of a key and a sequence of elements

  • * mapping to that key. The ordering of elements within each group is not guaranteed, and

  • * may even differ each time the resulting RDD is evaluated.

  • *

  • * Note: This operation may be very expensive. If you are grouping in order to perform an

  • * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]

  • * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.

  • */

  • def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)

  • : RDD[(K, Iterable[T])] = {

  • val cleanF = sc.clean(f)

  • this.map(t => (cleanF(t), t)).groupByKey(p)

  • }

  • ?

    輸出分區(qū)為輸入分區(qū)子集型

    (1)filter

    filter的功能是對元素進行過濾,對每個元素應用f函數(shù),返回值為true的元素在RDD中保留,返回為false的將過濾掉。 內(nèi)部實現(xiàn)相當于生成FilteredRDD(this,sc.clean(f))。

    圖中,每個方框代表一個RDD分區(qū)。 T可以是任意的類型。通過用戶自定義的過濾函數(shù)f,對每個數(shù)據(jù)項進行操作,將滿足條件,返回結(jié)果為true的數(shù)據(jù)項保留。 例如,過濾掉V2、 V3保留了V1,將區(qū)分命名為V1’。

    源碼:

    ?

    ?
  • /**

  • * Return a new RDD containing only the elements that satisfy a predicate.

  • */

  • def filter(f: T => Boolean): RDD[T] = {

  • val cleanF = sc.clean(f)

  • new MapPartitionsRDD[T, T](

  • this,

  • (context, pid, iter) => iter.filter(cleanF),

  • preservesPartitioning = true)

  • }

  • ?

    (2)distinct

    distinct將RDD中的元素進行去重操作。?

    圖中,每個方框代表一個分區(qū),通過distinct函數(shù),將數(shù)據(jù)去重。 例如,重復數(shù)據(jù)V1、 V1去重后只保留一份V1。

    源碼:

    ?

    ?
  • /**

  • * Return a new RDD containing the distinct elements in this RDD.

  • */

  • def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =

  • map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

  • ?
  • /**

  • * Return a new RDD containing the distinct elements in this RDD.

  • */

  • def distinct(): RDD[T] = distinct(partitions.size)

  • (3)subtract

    subtract相當于進行集合的差操作,RDD 1去除RDD 1和RDD 2交集中的所有元素。?

    圖中,左側(cè)的大方框代表兩個RDD,大方框內(nèi)的小方框代表RDD的分區(qū)。右側(cè)大方框代表合并后的RDD,大方框內(nèi)的小方框代表分區(qū)。V1在兩個RDD中均有,根據(jù)差集運算規(guī)則,新RDD不保留,V2在第一個RDD有,第二個RDD沒有,則在新RDD元素中包含V2。

    源碼:

    ?

    ?
  • /**

  • * Return an RDD with the elements from `this` that are not in `other`.

  • *

  • * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting

  • * RDD will be <= us.

  • */

  • def subtract(other: RDD[T]): RDD[T] =

  • subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))

  • ?
  • /**

  • * Return an RDD with the elements from `this` that are not in `other`.

  • */

  • def subtract(other: RDD[T], numPartitions: Int): RDD[T] =

  • subtract(other, new HashPartitioner(numPartitions))

  • ?
  • /**

  • * Return an RDD with the elements from `this` that are not in `other`.

  • */

  • def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = {

  • if (partitioner == Some(p)) {

  • // Our partitioner knows how to handle T (which, since we have a partitioner, is

  • // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples

  • val p2 = new Partitioner() {

  • override def numPartitions = p.numPartitions

  • override def getPartition(k: Any) = p.getPartition(k.asInstanceOf[(Any, _)]._1)

  • }

  • // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies

  • // anyway, and when calling .keys, will not have a partitioner set, even though

  • // the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be

  • // partitioned by the right/real keys (e.g. p).

  • this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys

  • } else {

  • this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys

  • }

  • }

  • ?

    (4)sample

    sample將RDD這個集合內(nèi)的元素進行采樣,獲取所有元素的子集。用戶可以設定是否有放回的抽樣、百分比、隨機種子,進而決定采樣方式。?
    參數(shù)說明:

    withReplacement=true, 表示有放回的抽樣;?
    withReplacement=false, 表示無放回的抽樣。


    每個方框是一個RDD分區(qū)。通過sample函數(shù),采樣50%的數(shù)據(jù)。V1、V2、U1、U2、U3、U4采樣出數(shù)據(jù)V1和U1、U2,形成新的RDD。

    源碼:

    ?

    ?
  • /**

  • * Return a sampled subset of this RDD.

  • */

  • def sample(withReplacement: Boolean,

  • fraction: Double,

  • seed: Long = Utils.random.nextLong): RDD[T] = {

  • require(fraction >= 0.0, "Negative fraction value: " + fraction)

  • if (withReplacement) {

  • new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)

  • } else {

  • new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)

  • }

  • }

  • ?

    (5)takeSample

    takeSample()函數(shù)和上面的sample函數(shù)是一個原理,但是不使用相對比例采樣,而是按設定的采樣個數(shù)進行采樣,同時返回結(jié)果不再是RDD,而是相當于對采樣后的數(shù)據(jù)進行collect(),返回結(jié)果的集合為單機的數(shù)組。

    圖中,左側(cè)的方框代表分布式的各個節(jié)點上的分區(qū),右側(cè)方框代表單機上返回的結(jié)果數(shù)組。通過takeSample對數(shù)據(jù)采樣,設置為采樣一份數(shù)據(jù),返回結(jié)果為V1。

    源碼:

    ?

    ?
  • /**

  • * Return a fixed-size sampled subset of this RDD in an array

  • *

  • * @param withReplacement whether sampling is done with replacement

  • * @param num size of the returned sample

  • * @param seed seed for the random number generator

  • * @return sample of specified size in an array

  • */

  • def takeSample(withReplacement: Boolean,

  • num: Int,

  • seed: Long = Utils.random.nextLong): Array[T] = {

  • val numStDev = 10.0

  • ?
  • if (num < 0) {

  • throw new IllegalArgumentException("Negative number of elements requested")

  • } else if (num == 0) {

  • return new Array[T](0)

  • }

  • ?
  • val initialCount = this.count()

  • if (initialCount == 0) {

  • return new Array[T](0)

  • }

  • ?
  • val maxSampleSize = Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt

  • if (num > maxSampleSize) {

  • throw new IllegalArgumentException("Cannot support a sample size > Int.MaxValue - " +

  • s"$numStDev * math.sqrt(Int.MaxValue)")

  • }

  • ?
  • val rand = new Random(seed)

  • if (!withReplacement && num >= initialCount) {

  • return Utils.randomizeInPlace(this.collect(), rand)

  • }

  • ?
  • val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,

  • withReplacement)

  • ?
  • var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()

  • ?
  • // If the first sample didn't turn out large enough, keep trying to take samples;

  • // this shouldn't happen often because we use a big multiplier for the initial size

  • var numIters = 0

  • while (samples.length < num) {

  • logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")

  • samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()

  • numIters += 1

  • }

  • ?
  • Utils.randomizeInPlace(samples, rand).take(num)

  • }

  • ?

    Cache型

    (1)cache

    cache將RDD元素從磁盤緩存到內(nèi)存,相當于persist(MEMORY_ONLY)函數(shù)的功能。?

    圖中,每個方框代表一個RDD分區(qū),左側(cè)相當于數(shù)據(jù)分區(qū)都存儲在磁盤,通過cache算子將數(shù)據(jù)緩存在內(nèi)存。

    源碼:

    ?

    ?
  • /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */

  • def cache(): this.type = persist()

  • ?

    (2)persist

    persist函數(shù)對RDD進行緩存操作。數(shù)據(jù)緩存在哪里由StorageLevel枚舉類型確定。?
    有幾種類型的組合,DISK代表磁盤,MEMORY代表內(nèi)存,SER代表數(shù)據(jù)是否進行序列化存儲。StorageLevel是枚舉類型,代表存儲模式,如,MEMORY_AND_DISK_SER代表數(shù)據(jù)可以存儲在內(nèi)存和磁盤,并且以序列化的方式存儲。 其他同理。



    圖中,方框代表RDD分區(qū)。 disk代表存儲在磁盤,mem代表存儲在內(nèi)存。 數(shù)據(jù)最初全部存儲在磁盤,通過persist(MEMORY_AND_DISK)將數(shù)據(jù)緩存到內(nèi)存,但是有的分區(qū)無法容納在內(nèi)存,例如:圖3-18中將含有V1,V2,V3的RDD存儲到磁盤,將含有U1,U2的RDD仍舊存儲在內(nèi)存。

    源碼:

    ?

    ?
  • /**

  • * Set this RDD's storage level to persist its values across operations after the first time

  • * it is computed. This can only be used to assign a new storage level if the RDD does not

  • * have a storage level set yet..

  • */

  • def persist(newLevel: StorageLevel): this.type = {

  • // TODO: Handle changes of StorageLevel

  • if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {

  • throw new UnsupportedOperationException(

  • "Cannot change storage level of an RDD after it was already assigned a level")

  • }

  • sc.persistRDD(this)

  • // Register the RDD with the ContextCleaner for automatic GC-based cleanup

  • sc.cleaner.foreach(_.registerRDDForCleanup(this))

  • storageLevel = newLevel

  • this

  • }

  • ?
  • /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */

  • def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

  • 原文鏈接:http://blog.csdn.net/jasonding1354

    總結(jié)

    以上是生活随笔為你收集整理的Spark RDD使用详解3--Value型Transformation算子的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。