日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

10.Spark之RDD及编程接口

發(fā)布時間:2025/3/20 编程问答 16 豆豆
生活随笔 收集整理的這篇文章主要介紹了 10.Spark之RDD及编程接口 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

2019獨角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>

1.起點Hello World

????val sc = new SparkContext("spark://...", "Hello World", "SPARK_HOME路徑", "APP_JAR路徑")

????val file = sc.textFile("hdfs:///root/Log")

????val filterRDD = file.filter(_.contains("Hello World"))

????filterRDD.cache()

????filterRDD.count()

  • 第 1 行:在Spark中做任何操作,首先要創(chuàng)建一個Spark的上下文。

  • 第 2 行:通過sc變量,利用textFile接口從HDFS文件系統(tǒng)讀入Log文件,返回一個變量file。

  • 第 3 行:對file變量進行過濾操作。判斷每一行字符串是否包含“Hello World”字符串,生成新的變量filterRDD。

  • 第 4 行:對filterRDD進行cache操作,以便后續(xù)操作重用filterRDD這個變量。

  • 第 5 行:對filterRDD進行count計數(shù)操作,最后返回包含“Hello World”字符串的文本行數(shù)。

????短短五行程序,卻包含了Spark中很多重要的概念。下面逐一介紹Spark編程中的重要概念。

  • 彈性分布式數(shù)據(jù)集RDD(Resilient Distributed DataSets):程序中的file和filterRDD變量都是RDD。

  • 創(chuàng)建操作(creation operation):RDD的初始創(chuàng)建都是由SparkContext來負(fù)責(zé)的,將內(nèi)存中的集合或者外部文件系統(tǒng)作為輸入源。

  • 轉(zhuǎn)換操作(transformation operation):將一個RDD通過一定的操作變換成另一個RDD,比如file通過filter操作變換成filterRDD,所以filter就是一個轉(zhuǎn)換操作。

  • 控制操作(control operation):對RDD進行持久化。可以讓RDD保存在磁盤或者內(nèi)存中,以便后續(xù)重復(fù)使用。比如cache接口默認(rèn)將filterRDD緩存在內(nèi)存中。

  • 行動操作 (action operation):由于Spark是惰性計算(lazy computing)的,所以對于任何RDD進行操作,都會出發(fā)Spark作業(yè)的運行,從而產(chǎn)生最終的結(jié)果。例如對filterRDD進行的count操作就是一個行動操作。Spark中的行動操作基本分為兩類,一類操作結(jié)果變成Scala集合或者標(biāo)量,另一類就將RDD保存到外部文件或者數(shù)據(jù)庫系統(tǒng)中。

  • ????對于一個Spark數(shù)據(jù)處理程序而言,一般情況下RDD與操作之間的關(guān)系如下圖所示,經(jīng)過創(chuàng)建操作、轉(zhuǎn)換操作、控制操作、行動操作來完成一個作業(yè)。當(dāng)然在一個Spark應(yīng)用程序中,可以有多個行動操作,也就是有多個作業(yè)存在。

    ????

    2.RDD的五個接口

    ????RDD是彈性分布式數(shù)據(jù)集,即一個RDD代表一個被分區(qū)的只讀數(shù)據(jù)集。一個RDD的生成只有兩種來源,在Hello World中已有所提現(xiàn):

  • 來自內(nèi)存集合和外部存儲系統(tǒng)

  • 通過轉(zhuǎn)換操作來自于其他RDD

  • ????RDD沒必要隨時被實例化。由于RDD的接口只支持粗粒度的操作(即一個操作會被應(yīng)用到RDD的所有數(shù)據(jù)上),所以只要通過記錄下這些作用在RDD上的轉(zhuǎn)換操作,來構(gòu)建RDD的繼承關(guān)系(lineage),就可以有效的進行容錯處理,而不需要將實際的RDD數(shù)據(jù)進行拷貝。這對于RDD來說是一項非常強大的功能。也即是在一個Spark應(yīng)用程序中,我們所用到的每個RDD,在丟失或者操作失敗后都是可以重建的。

    ????應(yīng)用程序開發(fā)者還可以對RDD進行另外兩個方面的控制操作:持久化和分區(qū)。

  • 開發(fā)者可以指明需要重用哪些RDD,選擇一種存儲策略(例如in-memory storage)將它們保存起來,以備使用。

  • 開發(fā)者還可以讓RDD根據(jù)記錄中的鍵值在集群的機器之間重新分區(qū)。這對于RDD的位置優(yōu)化是非常有作用的。例如讓將要進行join操作的兩個數(shù)據(jù)集以同樣的方式進行哈希分區(qū)。

  • ????如何表示這樣一個分區(qū)、高效容錯、支持持久化的分布式數(shù)據(jù)集呢?一般情況下抽象的RDD需要包含以下五個接口:

    partition分區(qū),一個RDD會有一個或者多個分區(qū)
    preferredLocations(p)對于分區(qū)p而言,返回數(shù)據(jù)本地化計算的節(jié)點
    dependencies()RDD的依賴關(guān)系
    compute(p, context)對于分區(qū)p而言,進行迭代計算
    partitioner()RDD的分區(qū)函數(shù)

    2.1RDD分區(qū)(partitions)

    ????對于一個RDD而言,分區(qū)的多少涉及對這個RDD進行并行計算的粒度,每一個RDD分區(qū)的計算操作都在一個單獨的任務(wù)中被執(zhí)行。

    ????對于RDD分區(qū)而言,用戶可以自行指定多少分區(qū),如果未指定就會使用默認(rèn)值。可以利用RDD的成員變量partitions所返回的partition數(shù)組的大小來查詢一個RDD被劃分的分區(qū)數(shù)。例如,我們利用spark-shell交互式命令終端測試一下:

    ????指定分區(qū)數(shù)的情況:

    ????scala> val rdd = sc.parallelize(1 to 100, 2)

    ????scala> rdd.partitions.size

    ????

    ????未指定分區(qū)數(shù)的情況:(系統(tǒng)默認(rèn)的分區(qū)數(shù)是這個程序所分配到的資源的CPU核的個數(shù))

    ????scala> val rdd = sc.parallelize(1 to 100)

    ????scala> rdd.partitions.size

    ????

    2.2RDD優(yōu)先位置(preferredLocations)

    ????RDD優(yōu)先位置屬性與Spark中的調(diào)度有關(guān),返回的是此RDD的每個partition所存儲的位置,按照“移動數(shù)據(jù)不如移動計算”的理念,在Spark進行任務(wù)調(diào)度的時候,盡可能的將任務(wù)分配到數(shù)據(jù)塊所存儲的位置。

    ????以從Hadoop中讀取數(shù)據(jù)生成RDD為例,preferredLocations返回每一個數(shù)據(jù)塊所在的機器名或者IP地址,如果每一塊數(shù)據(jù)是多份存儲的,那么就會返回多個機器地址。

    ????scala> var rdd = sc.textFile("hdfs://master:9000/input/wordcount.txt")

    ????scala> val hadoopRDD = rdd.dependencies(0).rdd

    ????scala> hadoopRDD.partitions.size

    ????scala> hadoopRDD.preferredLocations(hadoopRDD.partitions(0))

    ????

    2.3RDD依賴關(guān)系(dependencies)

    ????由于RDD是粗粒度的操作數(shù)據(jù)集,每一個轉(zhuǎn)換操作都會生成一個新的RDD,所以RDD之間就會形成類似于流水線一樣的前后依賴關(guān)系。在Spark中存在兩種類型的依賴:窄依賴(Narrow Dependencies)、寬依賴(Wide Dependencies)。

    • 窄依賴:每一個父RDD的分區(qū)最多只被子RDD的一個分區(qū)所使用。

    • 寬依賴:多個子RDD的分區(qū)依賴于同一個父RDD的分區(qū)。

    ????在Spark中明確區(qū)分這兩種依賴關(guān)系有兩個方面的原因:

  • 窄依賴可以在集群的一個節(jié)點上如流水線般執(zhí)行,可以計算所有父RDD的分區(qū);

    寬依賴需要取得父RDD的所有分區(qū)上的數(shù)據(jù)進行計算,將會執(zhí)行類似于MapReduce一樣的Shuffle操作。

  • 窄依賴在節(jié)點計算失敗后的恢復(fù)會更加有效,只需重新計算對應(yīng)的父RDD的分區(qū),而且可以在其它節(jié)點并行計算;

    在寬依賴的繼承關(guān)系中,一個節(jié)點的失敗將會導(dǎo)致其父RDD的多個分區(qū)重新計算,這個代價是非常高的。

  • ????

    2.4RDD分區(qū)計算(compute)

    ????對于Spark中每個RDD的計算都是以partition(分區(qū))為單位的,而且RDD中的compute函數(shù)都是在對迭代器進行復(fù)合,不需要保存每次計算的結(jié)果。

    ????在下面程序中,rdd變量是一個被分成兩個分區(qū)的1~10集合,在rdd上連續(xù)進行轉(zhuǎn)換操作map和filter,由于compute函數(shù)只返回相應(yīng)分區(qū)數(shù)據(jù)的迭代器,所以只有最終實例化時才能顯示出兩個分區(qū)的最終計算結(jié)果。


    2.5RDD分區(qū)函數(shù)(partitioner)

    ????在Spark中目前實現(xiàn)了兩種類型的分區(qū)函數(shù):HashPartitioner(哈希分區(qū))和RangePartitioner(區(qū)域分區(qū))。需要注意的是partitioner這個屬性只存在于(K, V)類型的RDD中,對于非(K, V)類型的partitioner的值就是None。partitioner函數(shù)既決定了RDD本身的分區(qū)數(shù)量,也可以作為父RDD Shuffle輸出(MapOutPut)中每個分區(qū)進行數(shù)據(jù)切割的依據(jù)。

    ????下面以HashPartitioner為例說明partitioner的功能。

    ?

    3.RDD的四種操作

    3.1創(chuàng)建操作

  • 集合創(chuàng)建操作:RDD可以由內(nèi)部集合來生成,Spark提供了兩類函數(shù)來實現(xiàn):parallelize和makeRDD。

  • 存儲創(chuàng)建操作:Spark整個生態(tài)系統(tǒng)與Hadoop完全兼容,Hadoop支持的文件類型或者數(shù)據(jù)庫類型,它同樣支持。

  • 3.2轉(zhuǎn)換操作

  • 基本轉(zhuǎn)換操作:

    map[U: classTag](f: T => U): RDD(U):將RDD中類型為T的元素一對一地映射為類型為U的元素。

    distinct(): RDD[T]:返回RDD中所有不一樣的元素。

    flatMap[U: classTag](f: T => TraversableOnce[U]): RDD[U]:將RDD中的每一個元素進行一對多轉(zhuǎn)換。


    repartition(numPartitions: Int): RDD[T]:相當(dāng)于coalesce函數(shù)中shuffle=true時的簡易實現(xiàn)。

    coalesce(numPartitions: Int,shuffle: Boolean=false): RDD[T]:對RDD的分區(qū)進行重新分區(qū)。

    randomSplit(weights: Array[Double],seed: Long=System.nanoTime): Array[RDD[T]]:根據(jù)weights權(quán)重將一個RDD切分成多個RDD。

    glom():RDD[Array[T]]:將RDD每一個分區(qū)中類型為T的元素轉(zhuǎn)換成Array[T],這樣每一個分區(qū)就只有一個數(shù)組元素。

    union(other: RDD[T]): RDD[T]:將兩個RDD集合中的數(shù)據(jù)進行合并,不會去重。

    intersection(other: RDD[T]): RDD[T]:返回兩個RDD集合中的數(shù)據(jù)的交集,不含重復(fù)元素。

    intersection(other: RDD[T], partitioner: Partitioner): RDD[T]:同上

    subtract(other: RDD[T]): RDD[T]:返回在主RDD集合中出現(xiàn)但不在other中出現(xiàn)的元素。

    subtract(other: RDD[T], partitioner: Partitioner): RDD[T]:同上

    mapPatitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartiton: Boolean=false): RDD[U]

    mapPatitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartiton: Boolean=false): RDD[U]

    ?

    zip[U: ClassTag](other: RDD[U]): RDD[(T,U)]:將兩個RDD組成Key/Value形式的RDD,但它們的分區(qū)數(shù)量和元素數(shù)量必須相同,否則相同系統(tǒng)會拋出異常

    zipPartitons(參數(shù)分多種情況,不一一列舉了):將多個RDD按照分區(qū)組成新的RDD,分區(qū)數(shù)相同,元素數(shù)沒有要求。

    zipWithIndex(): RDD[(T,Long)]:將RDD中的元素和這個元素的ID組成鍵/值對。

    zipWithUniqueId(): RDD[(T,Long)]:將RDD中的元素和一個唯一的ID組成鍵/值對。

  • 鍵值RDD轉(zhuǎn)換操作

    partitionBy(partitioner: Partitioner): RDD[(K,V)]:根據(jù)partitioner函數(shù)生成新的ShuffledRDD,原RDD重新分區(qū)

    mapValues[U](f: V=>U): RDD[(K,U)]:針對[K,V]中的V進行map操作。

    flatMapValues[U](f: V=>TraversableOnce[U]): RDD[(K,U)]針對[K,V]中的V進行flatMap操作。

    combineByKey(3個方法參數(shù)不同)

    foldByKey(3個方法參數(shù)不同)

    reduceByKey(3個方法參數(shù)不同)

    groupByKey(3個方法參數(shù)不同)

    cogroup(3個方法參數(shù)不同)

    join(3個方法參數(shù)不同)

    leftOuterJoin(3個方法參數(shù)不同)

    rightOuterJoin(3個方法參數(shù)不同)

    subtractByKey(3個方法參數(shù)不同)

  • 3.3控制操作

  • cache(): RDD[T]

    persist(): RDD[T]

    persist(level: StorageLevel): RDD[T]

    在Spark中對RDD進行持久化操作是一項非常重要的功能,可以將RDD持久化在不同層次的存儲介質(zhì)中,以便后續(xù)的操作能夠重復(fù)使用,這對iterative(迭代)和interactive(交互)的應(yīng)用來說會極大的提高性能。

  • checkpoint

    checkpoint接口是將RDD持久化在HDFS中,與persist(如果也持久化在磁盤上)的一個區(qū)別是checkpoint將會切斷此RDD之前的依賴關(guān)系,而persist接口依然保留著RDD的依賴關(guān)系。checkpoint的主要作用有如下兩點:

    (1)如果一個Spark程序會長時間駐留運行(如Spark Streaming一般會7*24小時運行),過長的依賴將會占用很多系統(tǒng)資源,定期將RDD進行checkpoint操作,能夠有效地節(jié)省系統(tǒng)資源。

    (2)維護過長的依賴關(guān)系還會出現(xiàn)一個問題,如果Spark在運行過程中出現(xiàn)節(jié)點失敗的情況,那么RDD進行容錯重算的成本會非常高。

  • 3.4行動操作

    ????行動操作是和轉(zhuǎn)換操作相對應(yīng)的一種對RDD的操作類型,在Spark程序中每調(diào)用一次行動操作,都會觸發(fā)一次Spark的調(diào)度 ? ? 并返回相應(yīng)的結(jié)果。從API來看,行動操作可以分為兩種類型:

  • 行動操作將標(biāo)量或者集合返回給Spark的客戶端程序,比如返回RDD中數(shù)據(jù)集的數(shù)量或者一部分符合條件的數(shù)據(jù)。

    first:返回RDD中的第一個元素。

    count:返回RDD中元素的個數(shù)。

    reduce(f: (T,T)=>T):對RDD中的元素進行二元計算,返回計算結(jié)果。

    collect()/toArray():以集合形式返回RDD的元素。

    take(num: Int):將RDD作為集合,返回集合中[0, num-1]下標(biāo)的元素。

    top(num: Int):按照默認(rèn)的或者指定的排序規(guī)則,返回前num個元素。

    takeOrdered(num: Int):以與top相反的排序規(guī)則,返回前num個元素。

    aggregate[U](zeroValue: U)(seqOp: (U,T)=>U, combOp(U,U)=>U):aggregate行動操作中主要需要提供兩個函數(shù)。一個是seqOp函數(shù),其將RDD(元素類型為T,可以和U為同一類型)中的每一個分區(qū)的數(shù)據(jù)聚合成類型為U的值。另一個函數(shù)combOp將各個分區(qū)聚合起來的值合并在一起得到最終類型為U的返回值。

    fold(zeroValue: T)(op: (T,T)=>T)aggregate的便利接口,op操作既是seqOp操作,也是combOp操作。

    lookup(key: K): Seq[V]:lookup是針對(K,V)類型RDD的行動操作,對于給定的鍵值,返回與此鍵值對應(yīng)的所有值。


  • 行動操作將RDD直接保存到外部文件系統(tǒng)或者數(shù)據(jù)庫中,比如HDFS文件系統(tǒng)中。

    saveAsTextFile(path: String)

    saveAsTextFile(path: String, codec: Class[_<:CompressionCodec])

    saveAsObjectFile(path: String)

    saveAsHadoopFile[F<:OutputFormat[K,V]](path: String)

    saveAsHadoopFile[F<:OutputFormat[K,V]](path: String, codec: Class[_<:CompressionCodec])

    saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: class[_], outputFormatClass: Class[]......)

    saveAsHadoopDataset(conf: JobConf)

    這是舊版本API中提供的七個將RDD存儲到外部介質(zhì)的函數(shù),前六個都是saveAsHadoopDataset的簡易實現(xiàn)版本,僅支持將RDD存儲到HDFS中,而saveAsHadoopDataset的參數(shù)類型是JobConf,所以它還可以將RDD保存到其它數(shù)據(jù)庫中,例如Hbase、MongoDB、Cassandra等。

    Spark針對新版本Hadoop API提供了三個行動操作函數(shù)。

    saveAsNewAPIHadoopFile[F<:NewOutputFormat[K,V]](path: String)(implicit fm: ClassTag[F])

    saveAsNewAPIHadoopFile(path: String, keyClass: Class[_],......)

    saveAsNewAPIHadoopDataset(conf: Configuration)

    前兩個API支持將RDD保存到HDFS中,第三個則支持所有MapReduce兼容的輸入輸出類型。

  • 轉(zhuǎn)載于:https://my.oschina.net/xingkongxia/blog/611056

    總結(jié)

    以上是生活随笔為你收集整理的10.Spark之RDD及编程接口的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。