10.Spark之RDD及编程接口
2019獨角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
1.起點Hello World
????val sc = new SparkContext("spark://...", "Hello World", "SPARK_HOME路徑", "APP_JAR路徑")
????val file = sc.textFile("hdfs:///root/Log")
????val filterRDD = file.filter(_.contains("Hello World"))
????filterRDD.cache()
????filterRDD.count()
第 1 行:在Spark中做任何操作,首先要創(chuàng)建一個Spark的上下文。
第 2 行:通過sc變量,利用textFile接口從HDFS文件系統(tǒng)讀入Log文件,返回一個變量file。
第 3 行:對file變量進行過濾操作。判斷每一行字符串是否包含“Hello World”字符串,生成新的變量filterRDD。
第 4 行:對filterRDD進行cache操作,以便后續(xù)操作重用filterRDD這個變量。
第 5 行:對filterRDD進行count計數(shù)操作,最后返回包含“Hello World”字符串的文本行數(shù)。
????短短五行程序,卻包含了Spark中很多重要的概念。下面逐一介紹Spark編程中的重要概念。
彈性分布式數(shù)據(jù)集RDD(Resilient Distributed DataSets):程序中的file和filterRDD變量都是RDD。
創(chuàng)建操作(creation operation):RDD的初始創(chuàng)建都是由SparkContext來負(fù)責(zé)的,將內(nèi)存中的集合或者外部文件系統(tǒng)作為輸入源。
轉(zhuǎn)換操作(transformation operation):將一個RDD通過一定的操作變換成另一個RDD,比如file通過filter操作變換成filterRDD,所以filter就是一個轉(zhuǎn)換操作。
控制操作(control operation):對RDD進行持久化。可以讓RDD保存在磁盤或者內(nèi)存中,以便后續(xù)重復(fù)使用。比如cache接口默認(rèn)將filterRDD緩存在內(nèi)存中。
行動操作 (action operation):由于Spark是惰性計算(lazy computing)的,所以對于任何RDD進行操作,都會出發(fā)Spark作業(yè)的運行,從而產(chǎn)生最終的結(jié)果。例如對filterRDD進行的count操作就是一個行動操作。Spark中的行動操作基本分為兩類,一類操作結(jié)果變成Scala集合或者標(biāo)量,另一類就將RDD保存到外部文件或者數(shù)據(jù)庫系統(tǒng)中。
????對于一個Spark數(shù)據(jù)處理程序而言,一般情況下RDD與操作之間的關(guān)系如下圖所示,經(jīng)過創(chuàng)建操作、轉(zhuǎn)換操作、控制操作、行動操作來完成一個作業(yè)。當(dāng)然在一個Spark應(yīng)用程序中,可以有多個行動操作,也就是有多個作業(yè)存在。
????
2.RDD的五個接口
????RDD是彈性分布式數(shù)據(jù)集,即一個RDD代表一個被分區(qū)的只讀數(shù)據(jù)集。一個RDD的生成只有兩種來源,在Hello World中已有所提現(xiàn):
來自內(nèi)存集合和外部存儲系統(tǒng)
通過轉(zhuǎn)換操作來自于其他RDD
????RDD沒必要隨時被實例化。由于RDD的接口只支持粗粒度的操作(即一個操作會被應(yīng)用到RDD的所有數(shù)據(jù)上),所以只要通過記錄下這些作用在RDD上的轉(zhuǎn)換操作,來構(gòu)建RDD的繼承關(guān)系(lineage),就可以有效的進行容錯處理,而不需要將實際的RDD數(shù)據(jù)進行拷貝。這對于RDD來說是一項非常強大的功能。也即是在一個Spark應(yīng)用程序中,我們所用到的每個RDD,在丟失或者操作失敗后都是可以重建的。
????應(yīng)用程序開發(fā)者還可以對RDD進行另外兩個方面的控制操作:持久化和分區(qū)。
開發(fā)者可以指明需要重用哪些RDD,選擇一種存儲策略(例如in-memory storage)將它們保存起來,以備使用。
開發(fā)者還可以讓RDD根據(jù)記錄中的鍵值在集群的機器之間重新分區(qū)。這對于RDD的位置優(yōu)化是非常有作用的。例如讓將要進行join操作的兩個數(shù)據(jù)集以同樣的方式進行哈希分區(qū)。
????如何表示這樣一個分區(qū)、高效容錯、支持持久化的分布式數(shù)據(jù)集呢?一般情況下抽象的RDD需要包含以下五個接口:
| partition | 分區(qū),一個RDD會有一個或者多個分區(qū) |
| preferredLocations(p) | 對于分區(qū)p而言,返回數(shù)據(jù)本地化計算的節(jié)點 |
| dependencies() | RDD的依賴關(guān)系 |
| compute(p, context) | 對于分區(qū)p而言,進行迭代計算 |
| partitioner() | RDD的分區(qū)函數(shù) |
2.1RDD分區(qū)(partitions)
????對于一個RDD而言,分區(qū)的多少涉及對這個RDD進行并行計算的粒度,每一個RDD分區(qū)的計算操作都在一個單獨的任務(wù)中被執(zhí)行。
????對于RDD分區(qū)而言,用戶可以自行指定多少分區(qū),如果未指定就會使用默認(rèn)值。可以利用RDD的成員變量partitions所返回的partition數(shù)組的大小來查詢一個RDD被劃分的分區(qū)數(shù)。例如,我們利用spark-shell交互式命令終端測試一下:
????指定分區(qū)數(shù)的情況:
????scala> val rdd = sc.parallelize(1 to 100, 2)
????scala> rdd.partitions.size
????
????未指定分區(qū)數(shù)的情況:(系統(tǒng)默認(rèn)的分區(qū)數(shù)是這個程序所分配到的資源的CPU核的個數(shù))
????scala> val rdd = sc.parallelize(1 to 100)
????scala> rdd.partitions.size
????
2.2RDD優(yōu)先位置(preferredLocations)
????RDD優(yōu)先位置屬性與Spark中的調(diào)度有關(guān),返回的是此RDD的每個partition所存儲的位置,按照“移動數(shù)據(jù)不如移動計算”的理念,在Spark進行任務(wù)調(diào)度的時候,盡可能的將任務(wù)分配到數(shù)據(jù)塊所存儲的位置。
????以從Hadoop中讀取數(shù)據(jù)生成RDD為例,preferredLocations返回每一個數(shù)據(jù)塊所在的機器名或者IP地址,如果每一塊數(shù)據(jù)是多份存儲的,那么就會返回多個機器地址。
????scala> var rdd = sc.textFile("hdfs://master:9000/input/wordcount.txt")
????scala> val hadoopRDD = rdd.dependencies(0).rdd
????scala> hadoopRDD.partitions.size
????scala> hadoopRDD.preferredLocations(hadoopRDD.partitions(0))
????
2.3RDD依賴關(guān)系(dependencies)
????由于RDD是粗粒度的操作數(shù)據(jù)集,每一個轉(zhuǎn)換操作都會生成一個新的RDD,所以RDD之間就會形成類似于流水線一樣的前后依賴關(guān)系。在Spark中存在兩種類型的依賴:窄依賴(Narrow Dependencies)、寬依賴(Wide Dependencies)。
窄依賴:每一個父RDD的分區(qū)最多只被子RDD的一個分區(qū)所使用。
寬依賴:多個子RDD的分區(qū)依賴于同一個父RDD的分區(qū)。
????在Spark中明確區(qū)分這兩種依賴關(guān)系有兩個方面的原因:
窄依賴可以在集群的一個節(jié)點上如流水線般執(zhí)行,可以計算所有父RDD的分區(qū);
寬依賴需要取得父RDD的所有分區(qū)上的數(shù)據(jù)進行計算,將會執(zhí)行類似于MapReduce一樣的Shuffle操作。
窄依賴在節(jié)點計算失敗后的恢復(fù)會更加有效,只需重新計算對應(yīng)的父RDD的分區(qū),而且可以在其它節(jié)點并行計算;
在寬依賴的繼承關(guān)系中,一個節(jié)點的失敗將會導(dǎo)致其父RDD的多個分區(qū)重新計算,這個代價是非常高的。
????
2.4RDD分區(qū)計算(compute)
????對于Spark中每個RDD的計算都是以partition(分區(qū))為單位的,而且RDD中的compute函數(shù)都是在對迭代器進行復(fù)合,不需要保存每次計算的結(jié)果。
????在下面程序中,rdd變量是一個被分成兩個分區(qū)的1~10集合,在rdd上連續(xù)進行轉(zhuǎn)換操作map和filter,由于compute函數(shù)只返回相應(yīng)分區(qū)數(shù)據(jù)的迭代器,所以只有最終實例化時才能顯示出兩個分區(qū)的最終計算結(jié)果。
?
2.5RDD分區(qū)函數(shù)(partitioner)
????在Spark中目前實現(xiàn)了兩種類型的分區(qū)函數(shù):HashPartitioner(哈希分區(qū))和RangePartitioner(區(qū)域分區(qū))。需要注意的是partitioner這個屬性只存在于(K, V)類型的RDD中,對于非(K, V)類型的partitioner的值就是None。partitioner函數(shù)既決定了RDD本身的分區(qū)數(shù)量,也可以作為父RDD Shuffle輸出(MapOutPut)中每個分區(qū)進行數(shù)據(jù)切割的依據(jù)。
????下面以HashPartitioner為例說明partitioner的功能。
?
3.RDD的四種操作
3.1創(chuàng)建操作
集合創(chuàng)建操作:RDD可以由內(nèi)部集合來生成,Spark提供了兩類函數(shù)來實現(xiàn):parallelize和makeRDD。
存儲創(chuàng)建操作:Spark整個生態(tài)系統(tǒng)與Hadoop完全兼容,Hadoop支持的文件類型或者數(shù)據(jù)庫類型,它同樣支持。
3.2轉(zhuǎn)換操作
基本轉(zhuǎn)換操作:
map[U: classTag](f: T => U): RDD(U):將RDD中類型為T的元素一對一地映射為類型為U的元素。
distinct(): RDD[T]:返回RDD中所有不一樣的元素。
flatMap[U: classTag](f: T => TraversableOnce[U]): RDD[U]:將RDD中的每一個元素進行一對多轉(zhuǎn)換。
repartition(numPartitions: Int): RDD[T]:相當(dāng)于coalesce函數(shù)中shuffle=true時的簡易實現(xiàn)。
coalesce(numPartitions: Int,shuffle: Boolean=false): RDD[T]:對RDD的分區(qū)進行重新分區(qū)。
randomSplit(weights: Array[Double],seed: Long=System.nanoTime): Array[RDD[T]]:根據(jù)weights權(quán)重將一個RDD切分成多個RDD。
glom():RDD[Array[T]]:將RDD每一個分區(qū)中類型為T的元素轉(zhuǎn)換成Array[T],這樣每一個分區(qū)就只有一個數(shù)組元素。
union(other: RDD[T]): RDD[T]:將兩個RDD集合中的數(shù)據(jù)進行合并,不會去重。
intersection(other: RDD[T]): RDD[T]:返回兩個RDD集合中的數(shù)據(jù)的交集,不含重復(fù)元素。
intersection(other: RDD[T], partitioner: Partitioner): RDD[T]:同上
subtract(other: RDD[T]): RDD[T]:返回在主RDD集合中出現(xiàn)但不在other中出現(xiàn)的元素。
subtract(other: RDD[T], partitioner: Partitioner): RDD[T]:同上
mapPatitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartiton: Boolean=false): RDD[U]
mapPatitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartiton: Boolean=false): RDD[U]
?
zip[U: ClassTag](other: RDD[U]): RDD[(T,U)]:將兩個RDD組成Key/Value形式的RDD,但它們的分區(qū)數(shù)量和元素數(shù)量必須相同,否則相同系統(tǒng)會拋出異常
zipPartitons(參數(shù)分多種情況,不一一列舉了):將多個RDD按照分區(qū)組成新的RDD,分區(qū)數(shù)相同,元素數(shù)沒有要求。
?
zipWithIndex(): RDD[(T,Long)]:將RDD中的元素和這個元素的ID組成鍵/值對。
zipWithUniqueId(): RDD[(T,Long)]:將RDD中的元素和一個唯一的ID組成鍵/值對。
鍵值RDD轉(zhuǎn)換操作
partitionBy(partitioner: Partitioner): RDD[(K,V)]:根據(jù)partitioner函數(shù)生成新的ShuffledRDD,原RDD重新分區(qū)
mapValues[U](f: V=>U): RDD[(K,U)]:針對[K,V]中的V進行map操作。
flatMapValues[U](f: V=>TraversableOnce[U]): RDD[(K,U)]:針對[K,V]中的V進行flatMap操作。
combineByKey(3個方法參數(shù)不同):
foldByKey(3個方法參數(shù)不同):
reduceByKey(3個方法參數(shù)不同):
groupByKey(3個方法參數(shù)不同):
cogroup(3個方法參數(shù)不同):
join(3個方法參數(shù)不同):
leftOuterJoin(3個方法參數(shù)不同):
rightOuterJoin(3個方法參數(shù)不同):
subtractByKey(3個方法參數(shù)不同):
3.3控制操作
cache(): RDD[T]:
persist(): RDD[T]:
persist(level: StorageLevel): RDD[T]:
在Spark中對RDD進行持久化操作是一項非常重要的功能,可以將RDD持久化在不同層次的存儲介質(zhì)中,以便后續(xù)的操作能夠重復(fù)使用,這對iterative(迭代)和interactive(交互)的應(yīng)用來說會極大的提高性能。
checkpoint:
checkpoint接口是將RDD持久化在HDFS中,與persist(如果也持久化在磁盤上)的一個區(qū)別是checkpoint將會切斷此RDD之前的依賴關(guān)系,而persist接口依然保留著RDD的依賴關(guān)系。checkpoint的主要作用有如下兩點:
(1)如果一個Spark程序會長時間駐留運行(如Spark Streaming一般會7*24小時運行),過長的依賴將會占用很多系統(tǒng)資源,定期將RDD進行checkpoint操作,能夠有效地節(jié)省系統(tǒng)資源。
(2)維護過長的依賴關(guān)系還會出現(xiàn)一個問題,如果Spark在運行過程中出現(xiàn)節(jié)點失敗的情況,那么RDD進行容錯重算的成本會非常高。
3.4行動操作
????行動操作是和轉(zhuǎn)換操作相對應(yīng)的一種對RDD的操作類型,在Spark程序中每調(diào)用一次行動操作,都會觸發(fā)一次Spark的調(diào)度 ? ? 并返回相應(yīng)的結(jié)果。從API來看,行動操作可以分為兩種類型:
行動操作將標(biāo)量或者集合返回給Spark的客戶端程序,比如返回RDD中數(shù)據(jù)集的數(shù)量或者一部分符合條件的數(shù)據(jù)。
first:返回RDD中的第一個元素。
count:返回RDD中元素的個數(shù)。
reduce(f: (T,T)=>T):對RDD中的元素進行二元計算,返回計算結(jié)果。
collect()/toArray():以集合形式返回RDD的元素。
take(num: Int):將RDD作為集合,返回集合中[0, num-1]下標(biāo)的元素。
top(num: Int):按照默認(rèn)的或者指定的排序規(guī)則,返回前num個元素。
takeOrdered(num: Int):以與top相反的排序規(guī)則,返回前num個元素。
aggregate[U](zeroValue: U)(seqOp: (U,T)=>U, combOp(U,U)=>U):aggregate行動操作中主要需要提供兩個函數(shù)。一個是seqOp函數(shù),其將RDD(元素類型為T,可以和U為同一類型)中的每一個分區(qū)的數(shù)據(jù)聚合成類型為U的值。另一個函數(shù)combOp將各個分區(qū)聚合起來的值合并在一起得到最終類型為U的返回值。
fold(zeroValue: T)(op: (T,T)=>T):aggregate的便利接口,op操作既是seqOp操作,也是combOp操作。
lookup(key: K): Seq[V]:lookup是針對(K,V)類型RDD的行動操作,對于給定的鍵值,返回與此鍵值對應(yīng)的所有值。
行動操作將RDD直接保存到外部文件系統(tǒng)或者數(shù)據(jù)庫中,比如HDFS文件系統(tǒng)中。
saveAsTextFile(path: String):
saveAsTextFile(path: String, codec: Class[_<:CompressionCodec]):
saveAsObjectFile(path: String):
saveAsHadoopFile[F<:OutputFormat[K,V]](path: String):
saveAsHadoopFile[F<:OutputFormat[K,V]](path: String, codec: Class[_<:CompressionCodec]):
saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: class[_], outputFormatClass: Class[]......):
saveAsHadoopDataset(conf: JobConf):
這是舊版本API中提供的七個將RDD存儲到外部介質(zhì)的函數(shù),前六個都是saveAsHadoopDataset的簡易實現(xiàn)版本,僅支持將RDD存儲到HDFS中,而saveAsHadoopDataset的參數(shù)類型是JobConf,所以它還可以將RDD保存到其它數(shù)據(jù)庫中,例如Hbase、MongoDB、Cassandra等。
Spark針對新版本Hadoop API提供了三個行動操作函數(shù)。
saveAsNewAPIHadoopFile[F<:NewOutputFormat[K,V]](path: String)(implicit fm: ClassTag[F]):
saveAsNewAPIHadoopFile(path: String, keyClass: Class[_],......):
saveAsNewAPIHadoopDataset(conf: Configuration):
前兩個API支持將RDD保存到HDFS中,第三個則支持所有MapReduce兼容的輸入輸出類型。
轉(zhuǎn)載于:https://my.oschina.net/xingkongxia/blog/611056
總結(jié)
以上是生活随笔為你收集整理的10.Spark之RDD及编程接口的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Oracle 查询当前日期
- 下一篇: WebPack在多页应用项目中的探索