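Transformation operators that process Value-type data can be divided into the following types, according to the relationship between the input partitions and the output partitions of the RDD transformation:
1) one-to-one between input and output partitions;
2) many-to-one between input and output partitions;
3) many-to-many between input and output partitions;
4) output partitions are a subset of input partitions;
5) a special kind of one-to-one operator: the Cache type. Cache operators cache RDD partitions.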
One-to-one between input and output partitions
(1)map
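map(func): every element of the dataset is converted by a user-defined function, and the results form a new RDD, called a MappedRDD. (In later Spark versions, such as the one whose source is quoted below, map returns a MapPartitionsRDD instead.)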
package com.sf.transform.base

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object Map {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("map")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(1 to 10)  // create the RDD
    val map = rdd.map(_ * 2)           // multiply every element of the RDD by 2
    map.foreach(x => print(x + " "))   // print each element
    sc.stop()
  }
}
Output: 2 4 6 8 10 12 14 16 18 20
(RDD dependency diagram: a red block represents one RDD partition and a black block represents the set of partitions; the same applies below.)
(2)flatMap
Similar to map, but each input element can be mapped to zero or more output items, and the results are finally "flattened" into a single output.
package com.sf.transform.base

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object FlatMap {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("flatMap")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(1 to 5)
    // each element x expands to the sequence 1..x, and the sequences are flattened
    val fm = rdd.flatMap(x => 1 to x).collect()
    fm.foreach(x => print(x + " "))
    sc.stop()
  }
}
Output: 1 1 2 1 2 3 1 2 3 4 1 2 3 4 5
If map is used instead of flatMap, the output is:
Range(1) Range(1, 2) Range(1, 2, 3) Range(1, 2, 3, 4) Range(1, 2, 3, 4, 5)
(RDD dependency diagram)
(3)mapPartitions
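mapPartitions obtains an iterator for each partition, and the function operates on all of that partition's elements through this iterator. Internally it produces a MapPartitionsRDD.
It is similar to map, but map applies the function to each element of each partition, while mapPartitions applies it once to each partition as a whole.
Type of func: Iterator[T] => Iterator[U]. Suppose there are N elements in M partitions: the function passed to map is called N times, while the function passed to mapPartitions is called M times. When the mapping keeps creating expensive objects, mapPartitions can be much more efficient than map. For example, when writing data to a database, map would need a connection object for every element, whereas mapPartitions needs only one connection object per partition.
(Example 3) Output the names of the women: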
package com.sf.transform.base

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object MapPartitions {
  // the function applied to each partition
  def partitionsFun(/*index : Int,*/ iter: Iterator[(String, String)]): Iterator[String] = {
    var woman = List[String]()
    while (iter.hasNext) {
      val next = iter.next()
      next match {
        case (_, "female") => woman = /*"[" + index + "]" +*/ next._1 :: woman
        case _ =>
      }
    }
    woman.iterator
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("mappartitions")
    val sc = new SparkContext(conf)
    val list = List(("kpop", "female"), ("zorro", "male"), ("mobin", "male"), ("lucy", "female"))
    val rdd = sc.parallelize(list, 2)  // the second argument is the number of partitions
    val mp = rdd.mapPartitions(partitionsFun)
    /*val mp = rdd.mapPartitionsWithIndex(partitionsFun)*/
    mp.collect.foreach(x => print(x + " "))  // collect the elements into an Array, then print them
    sc.stop()
  }
}
Output: kpop lucy
The same result can be achieved with a single statement: val mp = rdd.mapPartitions(x => x.filter(_._2 == "female")).map(x => x._1). The longer form is used here to demonstrate how to define the function.
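The N-versus-M call count described above can be checked without a cluster. The sketch below is a plain-Scala model, not Spark code. It assumes an "RDD" is a `List` of partitions, each partition a `List` of elements. The names `MapVsMapPartitions`, `mapModel`, `mapPartitionsModel` and `callCounts` are hypothetical.

```scala
// Plain-Scala model, not Spark: an "RDD" is a List of partitions, each a List of elements.
object MapVsMapPartitions {
  type Partitions[T] = List[List[T]]

  // Like map: the user function is invoked once per element (N calls in total).
  def mapModel[T, U](parts: Partitions[T])(f: T => U): Partitions[U] =
    parts.map(_.map(f))

  // Like mapPartitions: the user function is invoked once per partition (M calls in total)
  // and receives an iterator over all elements of that partition.
  def mapPartitionsModel[T, U](parts: Partitions[T])(f: Iterator[T] => Iterator[U]): Partitions[U] =
    parts.map(p => f(p.iterator).toList)

  // Returns (calls made by the map style, calls made by the mapPartitions style) for the same job.
  def callCounts(parts: Partitions[Int]): (Int, Int) = {
    var mapCalls = 0
    var partitionCalls = 0
    val viaMap = mapModel(parts) { x => mapCalls += 1; x * 2 }
    val viaPartitions = mapPartitionsModel(parts) { it => partitionCalls += 1; it.map(_ * 2) }
    require(viaMap == viaPartitions) // both styles produce the same data
    (mapCalls, partitionCalls)
  }
}
```

For 5 elements spread over 2 partitions, `callCounts` returns `(5, 2)`. The per-element function ran five times and the per-partition function twice. In real Spark code, the per-partition function is where a single database connection would be opened and then reused for every element of the iterator.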
(RDD dependency diagram)
mapPartitionsWithIndex(func): similar to mapPartitions, except that the function takes an additional parameter, the partition index.
Type of func: (Int, Iterator[T]) => Iterator[U]
(Example 4) This is Example 3 with the commented-out parts (highlighted in orange in the original post) restored:
package com.sf.transform.base

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object MapPartitions {
  // the function applied to each partition
  def partitionsFun(iter: Iterator[(String, String)]): Iterator[String] = {
    var woman = List[String]()
    while (iter.hasNext) {
      val next = iter.next()
      next match {
        case (_, "female") => woman = next._1 :: woman
        case _ =>
      }
    }
    woman.iterator
  }

  // the same function, but it also receives the partition index
  def partitionsFun(index: Int, iter: Iterator[(String, String)]): Iterator[String] = {
    var woman = List[String]()
    while (iter.hasNext) {
      val next = iter.next()
      next match {
        case (_, "female") => woman = "[" + index + "]" + next._1 :: woman
        case _ =>
      }
    }
    woman.iterator
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("mappartitions")
    val sc = new SparkContext(conf)
    val list = List(("kpop", "female"), ("zorro", "male"), ("mobin", "male"), ("lucy", "female"))
    val rdd = sc.parallelize(list, 2)  // the second argument is the number of partitions
    val mp = rdd.mapPartitions(partitionsFun)
    println()
    mp.collect.foreach(x => print(x + " "))   // collect the elements into an Array, then print them
    println()
    val mp2 = rdd.mapPartitionsWithIndex(partitionsFun)
    mp2.collect.foreach(x => print(x + " "))  // same, but each name carries its partition index
    sc.stop()
  }
}
Output (the second line includes the partition index):
kpop lucy
[0]kpop [1]lucy
(4)glom
glom turns each partition into an array; internally it returns an RDD[Array[T]].
In the figure, each box represents a partition. The figure shows a partition containing V1, V2 and V3 being turned by glom into the array Array[(V1),(V2),(V3)].
Source code:
/**
 * Return an RDD created by coalescing all elements within each partition into an array.
 */
def glom(): RDD[Array[T]] = {
  new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}
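The following plain-Scala model (not Spark code) makes the "one array per partition" behavior concrete. It assumes partitions are represented as a `List` of `List`s, and `GlomModel` is a hypothetical name. It mirrors `Iterator(iter.toArray)` from the source above, which is also why an empty partition still yields one empty array.

```scala
import scala.reflect.ClassTag

// Plain-Scala model of glom (not Spark): each partition becomes exactly one Array.
object GlomModel {
  // Mirrors (context, pid, iter) => Iterator(iter.toArray): one array per partition,
  // so an empty partition still produces one (empty) array.
  def glom[T: ClassTag](parts: List[List[T]]): List[Array[T]] =
    parts.map(p => p.iterator.toArray)
}
```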
Many-to-one between input and output partitions
(1)union
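When using union, the two RDDs must have elements of the same data type, and the returned RDD has that same element type. union does not remove duplicates; all elements are kept. To remove duplicates, use distinct(). The ++ operator is equivalent to union.
In the figure, the large boxes on the left represent the two RDDs, and the small boxes inside them represent their partitions. The large box on the right represents the merged RDD, and the small boxes inside it represent its partitions. The RDD containing V1, V2, …, U4 and the RDD containing V1, V8, …, U8 are merged into one RDD containing all their elements. V1, V1, V2 and V8 form one partition, and the other elements are merged in the same way.
Source code:
/**
 * Return the union of this RDD and another one. Any identical elements will appear multiple
 * times (use `.distinct()` to eliminate them).
 */
def union(other: RDD[T]): RDD[T] = {
  if (partitioner.isDefined && other.partitioner == partitioner) {
    new PartitionerAwareUnionRDD(sc, Array(this, other))
  } else {
    new UnionRDD(sc, Array(this, other))
  }
}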
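Both branches of `union` in the source can be modeled in plain Scala. As before, an RDD is assumed to be a `List` of partitions, and `UnionModel` and its method names are hypothetical, not Spark API.

- **`UnionRDD` branch:** the result lists the parents' partitions one after the other, so its partition count is the sum of the two.
- **`PartitionerAwareUnionRDD` branch:** both parents share a partitioner, so partition i of the result combines partition i of each parent.

Neither branch removes duplicates.

```scala
// Plain-Scala model (not Spark) of the two branches of union.
object UnionModel {
  // UnionRDD branch: keep every partition of both parents, side by side; no deduplication.
  def union[T](a: List[List[T]], b: List[List[T]]): List[List[T]] = a ++ b

  // PartitionerAwareUnionRDD branch: parents with the same partitioner have the same number
  // of partitions, and partition i of the result combines partition i of each parent.
  def partitionerAwareUnion[T](a: List[List[T]], b: List[List[T]]): List[List[T]] = {
    require(a.length == b.length, "parents with the same partitioner have equal partition counts")
    a.zip(b).map { case (pa, pb) => pa ++ pb }
  }
}
```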
(2)cartesian
cartesian computes the Cartesian product of all elements of the two RDDs. Internally it returns a CartesianRDD.
In the figure, the large boxes on the left represent the two RDDs, and the small boxes inside them represent RDD partitions. The large box on the right represents the resulting RDD, and the small boxes inside it represent its partitions. For example, V1 combined with W1, W2 and Q5 from the other RDD produces (V1,W1), (V1,W2) and (V1,Q5).
Source code:
/**
 * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
 * elements (a, b) where a is in `this` and b is in `other`.
 */
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
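CartesianRDD creates one output partition for every pair of input partitions, so inputs with m and n partitions produce m * n output partitions. Below is a plain-Scala model of that layout, not Spark code. It assumes RDDs are `List`s of partitions, and `CartesianModel` is a hypothetical name.

```scala
// Plain-Scala model (not Spark) of the CartesianRDD partition layout.
object CartesianModel {
  // Output partition (i, j) pairs every element of partition i of `a` with every element
  // of partition j of `b`, so there are a.length * b.length output partitions.
  def cartesian[T, U](a: List[List[T]], b: List[List[U]]): List[List[(T, U)]] =
    for (pa <- a; pb <- b) yield for (x <- pa; y <- pb) yield (x, y)
}
```

The number of output elements is the product of the input sizes, which is why cartesian becomes expensive quickly on large RDDs.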
Many-to-many between input and output partitions
groupBy
groupBy applies a function to each element to generate a key, which turns the data into key-value form, and then groups together the elements that have the same key.
In val cleanF = sc.clean(f), sc.clean preprocesses the user function: it runs Spark's closure cleaner so that the closure can be serialized and shipped to the executors.
this.map(t => (cleanF(t), t)).groupByKey(p) first maps every element to a (key, element) pair and then groups the pairs with groupByKey. The partitioner p determines the number of partitions and the partitioning function, and therefore the degree of parallelism.
In the figure, each box represents an RDD partition, and elements with the same key are merged into one group. For example, V1 and V2 are combined into one key-value pair whose key is "V" and whose value is "V1, V2", forming (V, Seq(V1, V2)).
Source code:
/**
 * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
 * mapping to that key. The ordering of elements within each group is not guaranteed, and
 * may even differ each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 */
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
  groupBy[K](f, defaultPartitioner(this))

/**
 * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
 * mapping to that key. The ordering of elements within each group is not guaranteed, and
 * may even differ each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 */
def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
  groupBy(f, new HashPartitioner(numPartitions))

/**
 * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
 * mapping to that key. The ordering of elements within each group is not guaranteed, and
 * may even differ each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 */
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
    : RDD[(K, Iterable[T])] = {
  val cleanF = sc.clean(f)
  this.map(t => (cleanF(t), t)).groupByKey(p)
}
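The two steps in the groupBy source can be modeled in plain Scala: first tag each element with its key, then shuffle by key. This is a sketch, not Spark code, and it rests on two assumptions. First, an RDD is a `List` of partitions. Second, `bucket` is a hypothetical stand-in for `HashPartitioner`: it takes the non-negative `hashCode` modulo the partition count and puts null keys in partition 0. `GroupByModel` is likewise a made-up name.

```scala
// Plain-Scala model (not Spark) of groupBy(f, p) = map(t => (f(t), t)).groupByKey(p).
object GroupByModel {
  // Hypothetical stand-in for HashPartitioner: non-negative hashCode modulo n (null keys go to 0).
  def bucket(key: Any, n: Int): Int =
    if (key == null) 0 else { val m = key.hashCode % n; if (m < 0) m + n else m }

  def groupBy[T, K](parts: List[List[T]], numPartitions: Int)(f: T => K): List[Map[K, List[T]]] = {
    // Step 1, like this.map(t => (cleanF(t), t)): pair every element with its key.
    val keyed = parts.flatten.map(t => (f(t), t))
    // Step 2, like groupByKey(p): send each pair to partition bucket(key) (the shuffle),
    // then gather all values that share a key into one group inside that partition.
    (0 until numPartitions).toList.map { i =>
      keyed
        .filter { case (k, _) => bucket(k, numPartitions) == i }
        .groupBy(_._1)
        .map { case (k, kvs) => k -> kvs.map(_._2) }
    }
  }
}
```

Every key ends up in exactly one output partition. This is why groupBy needs a shuffle: elements with the same key, such as U1 and U2 in the test data, may start out in different input partitions.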
Output partitions are a subset of input partitions
(1)filter
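filter selects elements: the function f is applied to every element, elements for which it returns true are kept in the RDD, and elements for which it returns false are filtered out. In older Spark versions this was implemented as FilteredRDD(this, sc.clean(f)); in the version quoted below it is a MapPartitionsRDD that filters each partition's iterator.
In the figure, each box represents an RDD partition, and T can be any type. The user-defined predicate f is applied to every data item, and the items that satisfy it (return true) are kept. For example, V2 and V3 are filtered out and V1 is kept; the resulting partition is named V1'.
Source code:
/**
 * Return a new RDD containing only the elements that satisfy a predicate.
 */
def filter(f: T => Boolean): RDD[T] = {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (context, pid, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}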
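Because the filter source only wraps each partition's iterator, filter never moves data between partitions. Each output partition is a subset of the matching input partition, and `preservesPartitioning = true` keeps the parent's partitioner. The plain-Scala model below illustrates this. It is not Spark code: partitions are assumed to be `List`s, and `FilterModel` is a hypothetical name.

```scala
// Plain-Scala model (not Spark) of filter as a per-partition iterator filter.
object FilterModel {
  // Like MapPartitionsRDD(this, (context, pid, iter) => iter.filter(cleanF), preservesPartitioning = true):
  // the predicate runs inside each partition, no data moves, and the partition count is unchanged.
  def filter[T](parts: List[List[T]])(p: T => Boolean): List[List[T]] =
    parts.map(part => part.iterator.filter(p).toList)
}
```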
(2)distinct
distinct removes duplicate elements from the RDD.
In the figure, each box represents a partition, and distinct removes the duplicate data. For example, of the duplicates V1 and V1, only one V1 is kept.
Source code:
/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(): RDD[T] = distinct(partitions.size)
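The distinct source reuses reduceByKey in three steps. Each element becomes a key with a dummy null value, one pair survives per key after the shuffle, and the key is then extracted again. Here is a plain-Scala model of those three steps, not Spark code. `DistinctModel` is a hypothetical name, and `bucket` is an assumed stand-in for hash partitioning.

```scala
// Plain-Scala model (not Spark) of distinct(numPartitions) =
//   map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
object DistinctModel {
  // Hypothetical stand-in for HashPartitioner: non-negative hashCode modulo n (null keys go to 0).
  private def bucket(key: Any, n: Int): Int =
    if (key == null) 0 else { val m = key.hashCode % n; if (m < 0) m + n else m }

  def distinct[T](parts: List[List[T]], numPartitions: Int): List[List[T]] = {
    // Step 1: map(x => (x, null)) turns each element into a key with a dummy value.
    val pairs = parts.flatten.map(x => (x, null))
    // Step 2: reduceByKey((x, y) => x, numPartitions) shuffles pairs by key and keeps one per key.
    val reduced = (0 until numPartitions).toList.map { i =>
      pairs
        .filter { case (k, _) => bucket(k, numPartitions) == i }
        .groupBy(_._1)
        .values
        .map(_.reduce((first, _) => first))
        .toList
    }
    // Step 3: map(_._1) drops the dummy value again.
    reduced.map(_.map(_._1))
  }
}
```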
(3)subtract
subtract performs a set difference: it removes from RDD 1 every element that RDD 1 shares with RDD 2.
In the figure, the large boxes on the left represent the two RDDs, and the small boxes inside them represent their partitions. The large box on the right represents the resulting RDD, and the small boxes inside it represent its partitions. V1 appears in both RDDs, so by the rules of set difference it is not kept in the new RDD; V2 appears in the first RDD but not in the second, so it is included in the new RDD.
Source code:
/**
 * Return an RDD with the elements from `this` that are not in `other`.
 *
 * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
 * RDD will be <= us.
 */
def subtract(other: RDD[T]): RDD[T] =
  subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))

/**
 * Return an RDD with the elements from `this` that are not in `other`.
 */
def subtract(other: RDD[T], numPartitions: Int): RDD[T] =
  subtract(other, new HashPartitioner(numPartitions))

/**
 * Return an RDD with the elements from `this` that are not in `other`.
 */
def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = {
  if (partitioner == Some(p)) {
    // Our partitioner knows how to handle T (which, since we have a partitioner, is
    // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
    val p2 = new Partitioner() {
      override def numPartitions = p.numPartitions
      override def getPartition(k: Any) = p.getPartition(k.asInstanceOf[(Any, _)]._1)
    }
    // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
    // anyway, and when calling .keys, will not have a partitioner set, even though
    // the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be
    // partitioned by the right/real keys (e.g. p).
    this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
  } else {
    this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
  }
}
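In the common case where `this` has no partitioner, subtract becomes `subtractByKey` on `(x, null)` pairs. The result is partitioned by a `HashPartitioner` with this RDD's partition count. The plain-Scala model below covers only that case and does not model the `p2` de-tupling branch. `SubtractModel` and `bucket` are hypothetical names, not Spark API.

```scala
// Plain-Scala model (not Spark) of subtract(other) when `this` has no partitioner:
//   this.map(x => (x, null)).subtractByKey(other.map((_, null)), HashPartitioner(this.partitions.size)).keys
object SubtractModel {
  // Hypothetical stand-in for HashPartitioner: non-negative hashCode modulo n (null keys go to 0).
  private def bucket(key: Any, n: Int): Int =
    if (key == null) 0 else { val m = key.hashCode % n; if (m < 0) m + n else m }

  def subtract[T](thisParts: List[List[T]], otherParts: List[List[T]]): List[List[T]] = {
    val n = thisParts.length                 // the result uses this RDD's partition count
    val otherKeys = otherParts.flatten.toSet // the keys of other.map((_, null))
    (0 until n).toList.map { i =>
      // Keep every element hashed to partition i whose key never occurs in `other`.
      // Duplicates in `this` survive, because subtractByKey drops by key, not by pair.
      thisParts.flatten.filter(x => bucket(x, n) == i && !otherKeys.contains(x))
    }
  }
}
```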
(4)sample
sample samples the elements of the RDD and returns a subset of them. The user can choose whether to sample with replacement, the fraction to sample, and the random seed, and these settings determine how sampling is done.
Parameters:
withReplacement = true: sampling with replacement;
withReplacement = false: sampling without replacement.
In the figure, each box is an RDD partition. sample is used to sample 50% of the data: from V1, V2, U1, U2, U3 and U4, the elements V1, U1 and U2 are sampled and form a new RDD.
Source code:
/**
 * Return a sampled subset of this RDD.
 */
def sample(withReplacement: Boolean,
    fraction: Double,
    seed: Long = Utils.random.nextLong): RDD[T] = {
  require(fraction >= 0.0, "Negative fraction value: " + fraction)
  if (withReplacement) {
    new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
  } else {
    new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
  }
}
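Here is a plain-Scala model of the two samplers that the source chooses between. It is a sketch, not Spark's implementation:

- **Without replacement (Bernoulli branch):** each element is kept independently with probability `fraction`.
- **With replacement (Poisson branch):** each element is repeated a Poisson(`fraction`)-distributed number of times, drawn with Knuth's multiplication method.

How each partition's generator is derived from `seed` is an assumption, and `SampleModel` is a hypothetical name.

```scala
import scala.util.Random

// Plain-Scala model (not Spark) of sample(withReplacement, fraction, seed).
object SampleModel {
  // Assumption: each partition gets its own generator seeded from one master Random(seed),
  // so the same seed always reproduces the same sample.
  private def perPartitionRngs(n: Int, seed: Long): List[Random] = {
    val master = new Random(seed)
    List.fill(n)(new Random(master.nextLong()))
  }

  // withReplacement = false (Bernoulli idea): keep each element independently with probability `fraction`.
  def bernoulli[T](parts: List[List[T]], fraction: Double, seed: Long): List[List[T]] = {
    require(fraction >= 0.0 && fraction <= 1.0, s"Invalid fraction: $fraction")
    parts.zip(perPartitionRngs(parts.length, seed)).map { case (p, rng) =>
      p.filter(_ => rng.nextDouble() < fraction)
    }
  }

  // withReplacement = true (Poisson idea): emit each element k times, with k ~ Poisson(fraction).
  def poisson[T](parts: List[List[T]], fraction: Double, seed: Long): List[List[T]] = {
    require(fraction >= 0.0, s"Negative fraction value: $fraction")
    parts.zip(perPartitionRngs(parts.length, seed)).map { case (p, rng) =>
      p.flatMap(x => List.fill(poissonDraw(rng, fraction))(x))
    }
  }

  // Knuth's multiplication method for one Poisson(lambda) draw; adequate for small lambda.
  private def poissonDraw(rng: Random, lambda: Double): Int = {
    val limit = math.exp(-lambda)
    var k = 0
    var product = rng.nextDouble()
    while (product > limit) {
      k += 1
      product *= rng.nextDouble()
    }
    k
  }
}
```

Because every element is decided independently, `fraction` only fixes the expected sample size. A 50% sample of six elements, as in the figure, need not contain exactly three of them.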
(5)takeSample
takeSample() works on the same principle as sample above, but instead of sampling by a relative fraction it samples a specified number of elements. Its result is no longer an RDD: it is as if collect() had been called on the sampled data, and the result is a local array on a single machine (the driver). Strictly speaking, this makes takeSample an action rather than a transformation; note the count() and collect() calls in the source below.
In the figure, the boxes on the left represent the partitions on the various nodes of the cluster, and the box on the right represents the result array returned on a single machine. takeSample is set to sample one element, and the returned result is V1.
Source code:
/**
 * Return a fixed-size sampled subset of this RDD in an array
 *
 * @param withReplacement whether sampling is done with replacement
 * @param num size of the returned sample
 * @param seed seed for the random number generator
 * @return sample of specified size in an array
 */
def takeSample(withReplacement: Boolean,
    num: Int,
    seed: Long = Utils.random.nextLong): Array[T] = {
  val numStDev = 10.0

  if (num < 0) {
    throw new IllegalArgumentException("Negative number of elements requested")
  } else if (num == 0) {
    return new Array[T](0)
  }

  val initialCount = this.count()
  if (initialCount == 0) {
    return new Array[T](0)
  }

  val maxSampleSize = Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt
  if (num > maxSampleSize) {
    throw new IllegalArgumentException("Cannot support a sample size > Int.MaxValue - " +
      s"$numStDev * math.sqrt(Int.MaxValue)")
  }

  val rand = new Random(seed)
  if (!withReplacement && num >= initialCount) {
    return Utils.randomizeInPlace(this.collect(), rand)
  }

  val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
    withReplacement)

  var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()

  // If the first sample didn't turn out large enough, keep trying to take samples;
  // this shouldn't happen often because we use a big multiplier for the initial size
  var numIters = 0
  while (samples.length < num) {
    logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
    samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
    numIters += 1
  }

  Utils.randomizeInPlace(samples, rand).take(num)
}
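The following plain-Scala model shows the contract of takeSample: a fixed-size sample returned as a local collection. It is deliberately simplified and is not Spark's algorithm. The Spark source first draws a slightly oversized `sample()`, retrying if it comes back too small, then collects, shuffles, and takes `num`. This sketch instead collects everything and shuffles directly, which is only reasonable for small data. `TakeSampleModel` is a hypothetical name.

```scala
import scala.util.Random

// Simplified plain-Scala model (not Spark) of takeSample's result: `num` elements in a local collection.
object TakeSampleModel {
  def takeSample[T](parts: List[List[T]], withReplacement: Boolean, num: Int, seed: Long): List[T] = {
    if (num < 0) throw new IllegalArgumentException("Negative number of elements requested")
    val all = parts.flatten.toVector // plays the role of collect(): everything ends up on the driver
    if (num == 0 || all.isEmpty) Nil
    else {
      val rand = new Random(seed)
      if (withReplacement) List.fill(num)(all(rand.nextInt(all.length)))
      else rand.shuffle(all).take(num).toList // num >= count returns every element, shuffled
    }
  }
}
```

As in the Spark source, asking for at least as many elements as exist, without replacement, simply returns all of them in random order.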
Cache type
(1)cache
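cache keeps the RDD's elements in memory (in the figure, data that was originally on disk) and is equivalent to persist(MEMORY_ONLY). Like persist, it is lazy: it only marks the RDD, and the partitions are stored the first time an action computes them.
In the figure, each box represents an RDD partition. On the left, all data partitions are stored on disk; the cache operator caches the data in memory.
Source code:
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()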
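The benefit of cache can be shown with a small counter. Without caching, every action recomputes the data from its lineage; with caching, the first action stores the result and later actions reuse it. The model below is hypothetical (`CacheModel.Dataset` is an invented class, not part of Spark) and only imitates that behavior.

```scala
// Hypothetical model (not Spark's classes) of why cache() helps.
object CacheModel {
  final class Dataset[T](compute: () => List[T]) {
    private var cacheRequested = false
    private var cached: Option[List[T]] = None

    // Like cache() = persist(MEMORY_ONLY): only marks the dataset; nothing is computed yet.
    def cache(): this.type = { cacheRequested = true; this }

    // An "action": returns cached data if present, otherwise computes it (and caches it if requested).
    def collect(): List[T] = cached.getOrElse {
      val data = compute()
      if (cacheRequested) cached = Some(data)
      data
    }
  }
}
```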
(2)persist
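persist caches the RDD. Where the data is cached is determined by the StorageLevel.
Several combinations are available: DISK means disk, MEMORY means memory, and SER means the data is stored in serialized form. StorageLevel represents the storage mode; it is a class whose companion object defines these combinations as constants, so it is used much like an enumeration. For example, MEMORY_AND_DISK_SER means the data may be stored in memory and on disk, in serialized form. The other levels follow the same pattern.
In the figure, boxes represent RDD partitions; disk means stored on disk and mem means stored in memory. Initially all data is stored on disk. persist(MEMORY_AND_DISK) caches the data in memory, but some partitions cannot fit in memory. In Figure 3-18, for example, the partition containing V1, V2, V3 is stored on disk, while the partition containing U1, U2 stays in memory.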
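Two rules can be modeled in plain Scala: the placement shown in the figure, and the "no changing levels" check in the persist source. This is a simplified, hypothetical model, not Spark's classes. `Level` keeps only three of the flags that Spark's real `StorageLevel` carries; the real one also has `useOffHeap` and `replication`. The greedy `place` function is an illustrative assumption: it fills memory in partition order and sends the overflow to disk only when the level allows it. Spark's documentation says that under MEMORY_ONLY, partitions that do not fit are not cached and are recomputed when needed, and the model follows that.

```scala
// Hypothetical, simplified model of storage levels (not Spark's StorageLevel class).
object PersistModel {
  final case class Level(useDisk: Boolean, useMemory: Boolean, deserialized: Boolean)

  val NONE                = Level(useDisk = false, useMemory = false, deserialized = false)
  val MEMORY_ONLY         = Level(useDisk = false, useMemory = true,  deserialized = true)
  val MEMORY_AND_DISK     = Level(useDisk = true,  useMemory = true,  deserialized = true)
  val MEMORY_AND_DISK_SER = Level(useDisk = true,  useMemory = true,  deserialized = false)

  final class Rdd {
    var storageLevel: Level = NONE

    // Same rule as RDD.persist in the source: once a level is set, a different one is rejected.
    def persist(newLevel: Level): this.type = {
      if (storageLevel != NONE && newLevel != storageLevel)
        throw new UnsupportedOperationException(
          "Cannot change storage level of an RDD after it was already assigned a level")
      storageLevel = newLevel
      this
    }
    def persist(): this.type = persist(MEMORY_ONLY)
    def cache(): this.type = persist()
  }

  // Assumed greedy placement: fill memory in partition order; overflow goes to disk if the
  // level uses disk, otherwise it is "not stored" (and would be recomputed when needed).
  def place(sizes: List[Long], memoryCapacity: Long, level: Level): List[String] = {
    var free = memoryCapacity
    sizes.map { size =>
      if (level.useMemory && size <= free) { free -= size; "memory" }
      else if (level.useDisk) "disk"
      else "not stored"
    }
  }
}
```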
Source code:
/**
 * Set this RDD's storage level to persist its values across operations after the first time
 * it is computed. This can only be used to assign a new storage level if the RDD does not
 * have a storage level set yet..
 */
def persist(newLevel: StorageLevel): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  sc.persistRDD(this)
  // Register the RDD with the ContextCleaner for automatic GC-based cleanup
  sc.cleaner.foreach(_.registerRDDForCleanup(this))
  storageLevel = newLevel
  this
}

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

Original link: http://blog.csdn.net/jasonding1354