

RDD, Part 4: Value-Type Transformation Operators

Published: 2025/3/17

Value-type Transformation operators can be classified by the relationship between an operator's input partitions and its output partitions:

1) input partitions map one-to-one to output partitions;
2) many input partitions map to one output partition;
3) input partitions map many-to-many to output partitions;
4) the output partitions are a subset of the input partitions;
5) a special one-to-one type: Cache operators, which cache RDD partitions.

One-to-One: Input Partitions to Output Partitions

(1) map

map(func): every element of the dataset is transformed by a user-defined function, producing a new RDD (a MappedRDD).

```scala
package com.sf.transform.base

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object Map {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("map")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(1 to 10)  // create the RDD
    val map = rdd.map(_ * 2)           // multiply every element by 2
    map.foreach(x => print(x + " "))
    sc.stop()
  }
}
```

Output: 2 4 6 8 10 12 14 16 18 20

(In the RDD dependency diagrams, a red block represents an RDD partition and a black block the set of partitions; the same applies below.)

(2) flatMap

Like map, but each input element can be mapped to zero or more output elements, and the results are "flattened" into a single output.

```scala
package com.sf.transform.base

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object FlatMap {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("flatmap")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(1 to 5)
    val fm = rdd.flatMap(x => (1 to x)).collect()
    fm.foreach(x => print(x + " "))
    sc.stop()
  }
}
```

Output: 1 1 2 1 2 3 1 2 3 4 1 2 3 4 5

With map instead of flatMap the output would be: Range(1) Range(1, 2) Range(1, 2, 3) Range(1, 2, 3, 4) Range(1, 2, 3, 4, 5)

(RDD dependency diagram)

(3) mapPartitions

mapPartitions obtains an iterator over each partition and operates on the entire partition through that iterator. Internally it produces a MapPartitionsRDD.

It is similar to map, but where map's function is applied to each element of each partition, mapPartitions' function is applied to each partition as a whole.

func type: Iterator[T] => Iterator[U]. Given N elements in M partitions, map's function is called N times, while mapPartitions' function is called M times. When the mapping repeatedly creates objects, mapPartitions is much more efficient than map. For example, when writing to a database, map would create a connection object per element, while mapPartitions creates one connection object per partition.

(Example 3) Print the names of the women:

```scala
package com.sf.transform.base

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object MapPartitions {
  // operate on a whole partition through its iterator
  def partitionsFun(iter: Iterator[(String, String)]): Iterator[String] = {
    var woman = List[String]()
    while (iter.hasNext) {
      val next = iter.next()
      next match {
        case (_, "female") => woman = next._1 :: woman
        case _ =>
      }
    }
    woman.iterator
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("mappartitions")
    val sc = new SparkContext(conf)
    val list = List(("kpop", "female"), ("zorro", "male"), ("mobin", "male"), ("lucy", "female"))
    val rdd = sc.parallelize(list, 2) // the second argument is the number of partitions
    val mp = rdd.mapPartitions(partitionsFun)
    mp.collect.foreach(x => print(x + " "))
    sc.stop()
  }
}
```

Output: kpop lucy

The same effect can be achieved in a single line:

val mp = rdd.mapPartitions(x => x.filter(_._2 == "female")).map(x => x._1)

The longer form above is used only to demonstrate defining the function separately.

(RDD dependency diagram)

(4) mapPartitionsWithIndex

mapPartitionsWithIndex(func): like mapPartitions, except the function receives an extra partition-index parameter.

func type: (Int, Iterator[T]) => Iterator[U]

(Example 4) Example 3 with the commented-out index parts restored:

```scala
package com.sf.transform.base

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object MapPartitions {
  def partitionsFun(iter: Iterator[(String, String)]): Iterator[String] = {
    var woman = List[String]()
    while (iter.hasNext) {
      val next = iter.next()
      next match {
        case (_, "female") => woman = next._1 :: woman
        case _ =>
      }
    }
    woman.iterator
  }

  // overload with a partition-index parameter, for mapPartitionsWithIndex
  def partitionsFun(index: Int, iter: Iterator[(String, String)]): Iterator[String] = {
    var woman = List[String]()
    while (iter.hasNext) {
      val next = iter.next()
      next match {
        case (_, "female") => woman = "[" + index + "]" + next._1 :: woman
        case _ =>
      }
    }
    woman.iterator
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("mappartitions")
    val sc = new SparkContext(conf)
    val list = List(("kpop", "female"), ("zorro", "male"), ("mobin", "male"), ("lucy", "female"))
    val rdd = sc.parallelize(list, 2) // the second argument is the number of partitions
    val mp = rdd.mapPartitions(partitionsFun)
    mp.collect.foreach(x => print(x + " "))
    println()
    val mp2 = rdd.mapPartitionsWithIndex(partitionsFun)
    mp2.collect.foreach(x => print(x + " "))
    sc.stop()
  }
}
```

Output (with partition indices):

kpop lucy
[0]kpop [1]lucy
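The N-versus-M call-count difference described above can be illustrated without a cluster. The following is a plain-Python sketch (not Spark code) that mimics per-element map versus per-partition mapPartitions over a list split into two partitions:

```python
# Plain-Python sketch (no Spark): count how often the user function runs
# under a per-element map versus a per-partition mapPartitions.
calls = {"map": 0, "mapPartitions": 0}

partitions = [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]  # 10 elements in 2 partitions

def per_element(x):
    calls["map"] += 1            # runs once per element (N times)
    return x * 2

def per_partition(it):
    calls["mapPartitions"] += 1  # runs once per partition (M times);
    return [x * 2 for x in it]   # e.g. open one DB connection here

mapped = [per_element(x) for part in partitions for x in part]
partition_mapped = [y for part in partitions for y in per_partition(iter(part))]

assert mapped == partition_mapped  # identical results...
print(calls)                       # {'map': 10, 'mapPartitions': 2}
```

With N = 10 elements in M = 2 partitions, the element-wise function runs 10 times and the partition-wise function only twice, which is exactly why per-partition setup work (connections, buffers) belongs in mapPartitions.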

(5) glom

glom forms each partition into an array; internally it returns an RDD[Array[T]].

In the diagram, a box represents a partition. A partition holding V1, V2, V3 becomes, through glom, the single array Array(V1, V2, V3).

Source:

```scala
/**
 * Return an RDD created by coalescing all elements within each partition into an array.
 */
def glom(): RDD[Array[T]] = {
  new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}
```
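As a plain-Python sketch (not Spark code) of what glom changes for downstream operators:

```python
# Plain-Python sketch (no Spark): an RDD is a flat sequence of elements spread
# over partitions; glom coalesces each partition into a single array element.
partitions = [["V1", "V2", "V3"], ["U1", "U2"]]

elements = [x for part in partitions for x in part]  # what map sees: 5 elements
glommed = [part[:] for part in partitions]           # what glom yields: 2 arrays
print(len(elements), len(glommed))                   # 5 2
```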

Many-to-One: Input Partitions to Output Partitions

(1) union

union requires that the two RDDs have the same element type; the returned RDD has that element type as well. It performs no deduplication and keeps every element; use distinct() if you want deduplication. The ++ operator is equivalent to union.

In the diagram, the large boxes on the left represent the two input RDDs and the large box on the right the merged RDD; the small boxes are partitions. The RDD containing V1, V2 ... U4 and the RDD containing V1, V8 ... U8 are merged with all elements kept: V1, V1, V2, V8 form one partition, and the remaining elements are merged likewise.

Source:

```scala
/**
 * Return the union of this RDD and another one. Any identical elements will appear multiple
 * times (use `.distinct()` to eliminate them).
 */
def union(other: RDD[T]): RDD[T] = {
  if (partitioner.isDefined && other.partitioner == partitioner) {
    new PartitionerAwareUnionRDD(sc, Array(this, other))
  } else {
    new UnionRDD(sc, Array(this, other))
  }
}
```
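The keep-duplicates behavior can be shown with a plain-Python analogue (not Spark code):

```python
# Plain-Python sketch (no Spark): union keeps duplicates; distinct removes them.
rdd1 = ["V1", "V2", "U4"]
rdd2 = ["V1", "V8", "U8"]

merged = rdd1 + rdd2          # like rdd1.union(rdd2), or rdd1 ++ rdd2
print(merged.count("V1"))     # 2 -- duplicates survive the union

deduped = set(merged)         # like .distinct(); element order is not guaranteed
print(sorted(deduped))        # ['U4', 'U8', 'V1', 'V2', 'V8']
```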

(2) cartesian

Performs the Cartesian product over all elements of two RDDs; internally it returns a CartesianRDD.

In the diagram, the large boxes on the left represent the two input RDDs and the large box on the right the resulting RDD; the small boxes are partitions. For example, V1 paired with W1, W2, Q5 from the other RDD yields (V1, W1), (V1, W2), (V1, Q5).

Source:

```scala
/**
 * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
 * elements (a, b) where a is in `this` and b is in `other`.
 */
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
```
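The pairing described above matches the standard Cartesian product, sketched here in plain Python (not Spark code):

```python
from itertools import product

# Plain-Python sketch (no Spark): the Cartesian product pairs every element
# of the first collection with every element of the second.
left = ["V1", "V2"]
right = ["W1", "W2", "Q5"]

pairs = list(product(left, right))  # like left_rdd.cartesian(right_rdd)
print(pairs)
assert len(pairs) == len(left) * len(right)  # always |left| * |right| pairs
```

Note the result size: cartesian on large RDDs is expensive precisely because the output has |this| * |other| elements.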

Many-to-Many: Input Partitions to Output Partitions

groupBy

Generates a key for each element via the supplied function, turning the data into key-value form, then groups elements that share a key.

In `val cleanF = sc.clean(f)`, sc.clean preprocesses the user function; `this.map(t => (cleanF(t), t)).groupByKey(p)` first maps each element to a (key, element) pair and then groups by key with groupByKey. The partitioner p fixes the number of partitions and the partitioning function, and hence the degree of parallelism.

In the diagram, a box represents an RDD partition; elements with the same key are merged into one group. For example, V1 and V2 are merged into one key-value pair with key "V" and value "V1, V2", forming (V, Seq(V1, V2)).

Source:

```scala
/**
 * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
 * mapping to that key. The ordering of elements within each group is not guaranteed, and
 * may even differ each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 */
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
  groupBy[K](f, defaultPartitioner(this))

/**
 * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
 * mapping to that key. The ordering of elements within each group is not guaranteed, and
 * may even differ each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 */
def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
  groupBy(f, new HashPartitioner(numPartitions))

/**
 * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
 * mapping to that key. The ordering of elements within each group is not guaranteed, and
 * may even differ each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 */
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
    : RDD[(K, Iterable[T])] = {
  val cleanF = sc.clean(f)
  this.map(t => (cleanF(t), t)).groupByKey(p)
}
```
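The map-then-groupByKey pattern shown in the source can be sketched in plain Python (not Spark code):

```python
from collections import defaultdict

# Plain-Python sketch (no Spark) of groupBy's map-then-groupByKey pattern:
# key each element with f, then collect the elements that share a key.
def group_by(elements, f):
    groups = defaultdict(list)
    for t in elements:           # this.map(t => (cleanF(t), t))
        groups[f(t)].append(t)   # ...then groupByKey(p)
    return dict(groups)

data = ["V1", "V2", "W1", "W2", "W3"]
grouped = group_by(data, lambda s: s[0])  # key = first character
print(grouped)                            # {'V': ['V1', 'V2'], 'W': ['W1', 'W2', 'W3']}
```

As the source's doc comment warns, in Spark this pattern shuffles whole elements; when the goal is a per-key aggregate, reduceByKey or aggregateByKey is much cheaper.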

Subset: Output Partitions as a Subset of Input Partitions

(1) filter

filter filters the elements: the predicate f is applied to each element, elements for which it returns true are kept in the RDD, and those for which it returns false are dropped. Internally this amounts to FilteredRDD(this, sc.clean(f)).

In the diagram, each box represents an RDD partition; T can be any type. The user-defined predicate f is applied to every data item, and the items that satisfy it (return true) are kept. For example, V2 and V3 are filtered out while V1 is kept, and the partition is renamed V1'.

Source:

```scala
/**
 * Return a new RDD containing only the elements that satisfy a predicate.
 */
def filter(f: T => Boolean): RDD[T] = {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (context, pid, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}
```

(2) distinct

distinct deduplicates the elements of an RDD.

In the diagram, each box represents a partition; distinct deduplicates the data. For example, of the duplicates V1, V1 only one V1 is kept.

Source:

```scala
/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(): RDD[T] = distinct(partitions.size)
```
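The source shows that distinct is built from reduceByKey rather than being a primitive; a plain-Python sketch (not Spark code) of the same (x, null) trick:

```python
# Plain-Python sketch (no Spark) of how distinct is built from reduceByKey:
# map each x to (x, None), collapse duplicates per key, then keep the keys.
def distinct(elements):
    reduced = {}
    for x in elements:     # map(x => (x, null))
        reduced[x] = None  # reduceByKey((x, y) => x): duplicates collapse
    return list(reduced)   # .map(_._1): keep only the keys

print(distinct(["V1", "V1", "V2", "U1", "V2"]))  # ['V1', 'V2', 'U1']
```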

(3) subtract

subtract performs set difference: RDD1 minus the intersection of RDD1 and RDD2.

In the diagram, the large boxes on the left represent the two input RDDs and the large box on the right the resulting RDD; the small boxes are partitions. V1 occurs in both RDDs, so by the difference rule the new RDD does not keep it; V2 occurs in the first RDD but not in the second, so the new RDD contains V2.

Source:

```scala
/**
 * Return an RDD with the elements from `this` that are not in `other`.
 *
 * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
 * RDD will be <= us.
 */
def subtract(other: RDD[T]): RDD[T] =
  subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))

/**
 * Return an RDD with the elements from `this` that are not in `other`.
 */
def subtract(other: RDD[T], numPartitions: Int): RDD[T] =
  subtract(other, new HashPartitioner(numPartitions))

/**
 * Return an RDD with the elements from `this` that are not in `other`.
 */
def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = {
  if (partitioner == Some(p)) {
    // Our partitioner knows how to handle T (which, since we have a partitioner, is
    // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
    val p2 = new Partitioner() {
      override def numPartitions = p.numPartitions
      override def getPartition(k: Any) = p.getPartition(k.asInstanceOf[(Any, _)]._1)
    }
    // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
    // anyway, and when calling .keys, will not have a partitioner set, even though
    // the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be
    // partitioned by the right/real keys (e.g. p).
    this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
  } else {
    this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
  }
}
```
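The per-element semantics reduce to a simple membership test, sketched here in plain Python (not Spark code):

```python
# Plain-Python sketch (no Spark): subtract keeps the elements of the first
# collection that do not appear anywhere in the second.
def subtract(this, other):
    exclude = set(other)  # elements to remove
    return [x for x in this if x not in exclude]

rdd1 = ["V1", "V2", "V3"]
rdd2 = ["V1", "W1"]
print(subtract(rdd1, rdd2))  # ['V2', 'V3'] -- V1 is in both, so it is dropped
```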

(4) sample

sample draws a subset of the RDD's elements. The caller sets whether sampling is done with replacement, the sampling fraction, and the random seed, which together determine the sampling scheme.

Parameters:

withReplacement = true: sample with replacement;
withReplacement = false: sample without replacement.

Each box is an RDD partition. Sampling 50% of the data from V1, V2, U1, U2, U3, U4 yields, for example, V1, U1, U2, which form the new RDD.

Source:

```scala
/**
 * Return a sampled subset of this RDD.
 */
def sample(withReplacement: Boolean,
    fraction: Double,
    seed: Long = Utils.random.nextLong): RDD[T] = {
  require(fraction >= 0.0, "Negative fraction value: " + fraction)
  if (withReplacement) {
    new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
  } else {
    new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
  }
}
```
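The BernoulliSampler used in the without-replacement branch keeps each element independently with probability `fraction`, so the sample size is only approximate. A plain-Python sketch (not Spark code):

```python
import random

# Plain-Python sketch (no Spark) of Bernoulli (without-replacement) sampling:
# each element is kept independently with probability `fraction`, so the
# sample size is only approximately fraction * n -- just like RDD.sample.
def bernoulli_sample(elements, fraction, seed):
    rng = random.Random(seed)
    return [x for x in elements if rng.random() < fraction]

data = list(range(100))
sampled = bernoulli_sample(data, 0.5, seed=42)
print(len(sampled))  # roughly 50, but not exactly 50
```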

(5) takeSample

takeSample() works on the same principle as sample above, but instead of a fractional rate it samples a fixed number of elements, and instead of an RDD it returns the equivalent of calling collect() on the sampled data: a local array on a single machine.

In the diagram, the boxes on the left represent partitions on the distributed nodes, and the box on the right the result array returned to the single machine. Sampling one element returns V1.

Source:

```scala
/**
 * Return a fixed-size sampled subset of this RDD in an array
 *
 * @param withReplacement whether sampling is done with replacement
 * @param num size of the returned sample
 * @param seed seed for the random number generator
 * @return sample of specified size in an array
 */
def takeSample(withReplacement: Boolean,
    num: Int,
    seed: Long = Utils.random.nextLong): Array[T] = {
  val numStDev = 10.0

  if (num < 0) {
    throw new IllegalArgumentException("Negative number of elements requested")
  } else if (num == 0) {
    return new Array[T](0)
  }

  val initialCount = this.count()
  if (initialCount == 0) {
    return new Array[T](0)
  }

  val maxSampleSize = Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt
  if (num > maxSampleSize) {
    throw new IllegalArgumentException("Cannot support a sample size > Int.MaxValue - " +
      s"$numStDev * math.sqrt(Int.MaxValue)")
  }

  val rand = new Random(seed)
  if (!withReplacement && num >= initialCount) {
    return Utils.randomizeInPlace(this.collect(), rand)
  }

  val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
    withReplacement)

  var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()

  // If the first sample didn't turn out large enough, keep trying to take samples;
  // this shouldn't happen often because we use a big multiplier for the initial size
  var numIters = 0
  while (samples.length < num) {
    logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
    samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
    numIters += 1
  }

  Utils.randomizeInPlace(samples, rand).take(num)
}
```
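The contrast with sample is the exact size guarantee: without replacement, the result is just a uniformly chosen fixed-size subset. A plain-Python sketch (not Spark code):

```python
import random

# Plain-Python sketch (no Spark): takeSample returns exactly `num` elements
# as a local collection, unlike sample's approximate fraction. Without
# replacement this is a uniformly drawn fixed-size subset.
def take_sample(elements, num, seed):
    rng = random.Random(seed)
    return rng.sample(elements, num)  # exact size, no replacement

data = list(range(10))
picked = take_sample(data, 3, seed=7)
print(len(picked))  # always exactly 3
```

Like the real takeSample, this collects the result to the driver, so `num` must be small enough to fit in local memory.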

Cache

(1) cache

cache caches RDD elements from disk into memory; it is equivalent to persist(MEMORY_ONLY).

In the diagram, each box represents an RDD partition; on the left the partitions are all stored on disk, and the cache operator caches the data in memory.

Source:

```scala
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()
```

(2) persist

persist caches an RDD; where the data are cached is determined by the StorageLevel enumeration.
Several combinations exist: DISK means disk, MEMORY means memory, and SER means the data are stored serialized. StorageLevel names the storage mode; for example, MEMORY_AND_DISK_SER means the data may be stored in memory and on disk, serialized. The other levels follow the same pattern.

In the diagram, boxes represent RDD partitions; disk means stored on disk, mem stored in memory. The data start out entirely on disk; persist(MEMORY_AND_DISK) caches them in memory, but some partitions do not fit: the partitions holding V1, V2, V3 are stored on disk, while the partitions holding U1, U2 remain in memory.

Source:

```scala
/**
 * Set this RDD's storage level to persist its values across operations after the first time
 * it is computed. This can only be used to assign a new storage level if the RDD does not
 * have a storage level set yet.
 */
def persist(newLevel: StorageLevel): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  sc.persistRDD(this)
  // Register the RDD with the ContextCleaner for automatic GC-based cleanup
  sc.cleaner.foreach(_.registerRDDForCleanup(this))
  storageLevel = newLevel
  this
}

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
```

Original source: http://blog.csdn.net/jasonding1354
