spark rdd Transformation和Action 剖析
1.看到 這篇總結(jié)的這么好, 就悄悄的轉(zhuǎn)過來,供學(xué)習(xí)
?
wordcount.toDebugString查看RDD的繼承鏈條
所以廣義的講,對任何函數(shù)進(jìn)行某一項操作都可以認(rèn)為是一個算子,甚至包括求冪次,開方都可以認(rèn)為是一個算子,只是有的算子我們用了一個符號來代替他所要進(jìn)行的運算罷了,所以大家看到算子就不要糾結(jié),他和f(x)的f沒區(qū)別,它甚至和加減乘除的基本運算符號都沒有區(qū)別,只是他可以對單對象操作罷了(有的符號比如大于、小于號要對多對象操作)。又比如取概率P{X<x},概率是集合{X<x}(他是屬于實數(shù)集的子集)對[0,1]區(qū)間的一個映射,我們知道實數(shù)域和[0,1]區(qū)間是可以一一映射的(這個后面再說),所以取概率符號P,我們認(rèn)為也是一個算子,和微分,積分算子算子沒區(qū)別。
總而言之,算子就是映射,就是關(guān)系,就是**變換**!
**mapPartitions(f)**
f函數(shù)的輸入輸出都是每個分區(qū)集合的迭代器Iterator
def mapPartitions[U](f: (Iterator[T])?=> Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
該函數(shù)和map函數(shù)類似,只不過映射函數(shù)的參數(shù)由RDD中的每一個元素變成了RDD中每一個分區(qū)的迭代器。如果在映射的過程中需要頻繁創(chuàng)建額外的對象,使用mapPartitions要比map高效的過。
比如,將RDD中的所有數(shù)據(jù)通過JDBC連接寫入數(shù)據(jù)庫,如果使用map函數(shù),可能要為每一個元素都創(chuàng)建一個connection,這樣開銷很大,如果使用mapPartitions,那么只需要針對每一個分區(qū)建立一個connection。
參數(shù)preservesPartitioning表示是否保留父RDD的partitioner分區(qū)信息。
參考文章:
http://lxw1234.com/archives/2015/07/348.htm
union(other: RDD[T])操作不去重,去重需要distinct()
subtract取兩個RDD中非公共的元素
sample返回RDD,takeSample直接返回數(shù)組(數(shù)組里面的元素為RDD中元素,類似于collect)
keyvalue之類的操作都在**PairRDDFunctions.scala**中
mapValues只對value進(jìn)行運算
groupBy相同key的元素的value組成集合
coGroup是在groupBy的基礎(chǔ)上
coGroup操作多個RDD,是兩個RDD里相同key的兩個value集合組成的元組
參考文章:
http://www.iteblog.com/archives/1280
**combineByKey和reduceByKey,groupByKey(內(nèi)部都是通過combineByKey)**
源碼分析:
????reduceByKey??mapSideCombine: Boolean = true
????
????groupByKey??mapSideCombine=false
所以優(yōu)先使用reduceByKey,參考文章:http://www.iteblog.com/archives/1357
**join操作**
本質(zhì)是先coGroup再笛卡爾積
????
??????def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
????this.cogroup(other, partitioner).flatMapValues( pair =>
??????for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
????)
??????}
??
**yield**?關(guān)鍵字的簡短總結(jié):
????針對每一次 for 循環(huán)的迭代, yield 會產(chǎn)生一個值,被循環(huán)記錄下來 (內(nèi)部實現(xiàn)上,像是一個緩沖區(qū)).
????當(dāng)循環(huán)結(jié)束后, 會返回所有 yield 的值組成的集合.
????返回集合的類型與被遍歷的集合類型是一致的.
參考文章:
http://unmi.cc/scala-yield-samples-for-loop/
cache persist也是lazy級別的
Action本質(zhì)sc.runJob
foreach
collect()相當(dāng)于toArray返回一個數(shù)組
collectAsMap()對keyvalue類型的RDD操作返回一個HashMap,key重復(fù)后面的元素會覆蓋前面的元素reduce
源碼解析:先調(diào)用collect()再放到HashMap[K, V]中
?????def collectAsMap(): Map[K, V] = {
????val data = self.collect()
????val map = new mutable.HashMap[K, V]
????map.sizeHint(data.length)
????data.foreach { pair => map.put(pair._1, pair._2) }
????map
??????}
????
**reduceByKeyLocally**相當(dāng)于reduceByKey+collectAsMap()
該函數(shù)將RDD[K,V]中每個K對應(yīng)的V值根據(jù)映射函數(shù)來運算,運算結(jié)果映射到一個Map[K,V]中,而不是RDD[K,V]。
參考文章:
http://lxw1234.com/archives/2015/07/360.htm
**lookup**也是針對keyvalue返回指定key對應(yīng)的value形成的seq
????def lookup(key: K): Seq[V]?
**reduce fold(每個分區(qū)是串行,有個初始值) aggregate(并行,與fold類似)**
前兩個元素作用的結(jié)果與第三元素作用依次類推
**SequenceFile**文件是Hadoop用來存儲二進(jìn)制形式的key-value對而設(shè)計的一種平面文件(Flat File)。目前,也有不少人在該文件的基礎(chǔ)之上提出了一些HDFS中小文件存儲的解決方案,他們的基本思路就是將小文件進(jìn)行合并成一個大文件,同時對這些小文件的位置信息構(gòu)建索引。不過,這類解決方案還涉及到Hadoop的另一種文件格式——**MapFile**文件。SequenceFile文件并不保證其存儲的key-value數(shù)據(jù)是按照key的某個順序存儲的,同時不支持append操作。
參考文章:http://blog.csdn.net/xhh198781/article/details/7693358
**saveAsTextFile**->TextOutputFormat??(key為null,value為元素toString)
**saveAsObjectFile**(二進(jìn)制)->saveAsSequenceFile->SequenceFileOutputFormat(key為null,value為BytesWritable)
cache\persist???
**checkpoint()**機(jī)制避免緩存丟失(內(nèi)存不足)要重新計算帶來的性能開銷,會導(dǎo)致另外一個作業(yè),比緩存更可靠
?
SparkContex.setCheckpointDir設(shè)置目錄位置
轉(zhuǎn)載于:https://www.cnblogs.com/nucdy/p/8029220.html
總結(jié)
以上是生活随笔為你收集整理的spark rdd Transformation和Action 剖析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring @EventListene
- 下一篇: ajax 三种数据格式