日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark 常用算子详解(转换算子、行动算子、控制算子)

發布時間:2024/3/12 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark 常用算子详解(转换算子、行动算子、控制算子) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Spark簡介

Spark是專為大規模數據處理而設計的快速通用的計算引擎;
Spark擁有Hadoop MapReduce所具有的優點,但是運行速度卻比MapReduce有很大的提升,特別是在數據挖掘、機器學習等需要迭代的領域可提升100x倍的速度:

  • Spark是基于內存進行數據處理的,MapReduce是基于磁盤進行數據處理的;
  • Spark中具有DAG有向無環圖,DAG有向無環圖在此過程中減少了shuffle以及落地磁盤的次數;

  • Spark流程

  • Spark Application的運行環境:創建SparkConf對象
      • 可以設置Application name;
      • 可以設置運行模式及資源需求;
  • 創建SparkContext對象;
      • SparkContext向資源管理器申請運行Executor資源,并啟動StandaloneExecutorbackend;
      • Executor向SparkContext申請Task;
      • SparkContext將程序分發給Executor;
      • SparkContext構建成DAG圖,將DAG圖分解成Stage、將Taskset發送給Task Scheduler,最后由Task Scheduler將Task發送給Executor運行;
      • Task在Excutor上運行,運行完釋放所有的資源;
  • 基于Spark的上下文創建一個RDD,對RDD進行處理;
  • 應用程序中y有Action累算子來觸發Transformation類算子執行;
  • 關閉Spark上下文對象SparkContext;

  • value 類型

    細類型算子
    輸入分區與輸出分區一對一型map flatMap mapPartitions glom
    輸入分區與輸出分區多對一型union cartesain
    輸入分區與輸出分區多對多型groupBy
    輸出分區為輸入分區子集型filter distinct substract sample takeSample
    Cache型cache persist

    key-value類型

    細類型算子
    輸入分區與輸出分區一對一mapValues
    對單個RDD或兩個RDD聚集單個RDD聚集: combineByKey reduceByKey partitionBy; 兩個RDD聚集: Cogroup
    連接join leftOutJoin和 rightOutJoin

    Action算子

    細類型算子
    無輸出foreach
    HDFSsaveAsTextFile saveAsObjectFile
    Scala集合和數據類型collect collectAsMap reduceByKeyLocally lookup count top reduce fold aggregate

    轉換算子(Transformations)

    不觸發提交作業,只是完成作業中間過程處理;Transformation 操作是延遲計算的,也就是說從一個RDD 轉換生成另一個 RDD 的轉換操作不是馬上執行,需要等到有 Action 操作的時候才會真正觸發運算。Transformation參數類型為value或者key-value的形式;

    轉換算子是延遲執行的,也叫懶加載執行

    map

    將原來RDD的每個數據通過map中的用戶自定義函數映射為一個新的元素,源碼中map算子相當于初始化一個RDD --------- f(x)=x -> y

    • scala源碼 def map[U: ClassTag](f: T => U): RDD[U] = withScope {val cleanF = sc.clean(f)new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))}

    flatMap

    將原來 RDD 中的每個元素通過函數 f 轉換為新的元素,并將生成的 RDD 的每個集合中的元素合并為一個集合,內部創建 FlatMappedRDD(this,sc.clean(f))。 ------ f: T => TraversableOnce[U]

    • scala源碼

      def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {val cleanF = sc.clean(f)new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))}
    • 例子

      lines.flatMap{lines => {lines.split(" ")

    mapPartitions

    mapPartitions 函 數 獲 取 到 每 個 分 區 的 迭 代器,在 函 數 中 通 過 這 個 分 區 整 體 的 迭 代 器 對整 個 分 區 的 元 素 進 行 操 作。 內 部 實 現 是 生 成 -------f (iter)=>iter.f ilter(_>=3)

    • scala源碼

      def filter(f: T => Boolean): RDD[T] = withScope {val cleanF = sc.clean(f)new MapPartitionsRDD[T, T](this,(context, pid, iter) => iter.filter(cleanF),preservesPartitioning = true)}

    glom

    glom函數將每個分區形成一個數組,內部實現是返回的GlommedRDD。 圖4中的每個方框代表一個RDD分區。圖4中的方框代表一個分區。 該圖表示含有V1、 V2、 V3的分區通過函數glom形成一數組Array[(V1),(V2),(V3)]

    • scala源碼

      def glom(): RDD[Array[T]] = withScope {new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))}

    union

    使用 union 函數時需要保證兩個 RDD 元素的數據類型相同,返回的 RDD 數據類型和被合并的 RDD 元素數據類型相同,并不進行去重操作,保存所有元素。如果想去重可以使用 distinct()(并集)

    • scala源碼

      def union(other: RDD[T]): RDD[T] = withScope {sc.union(this, other)}

    cartesian

    對 兩 個 RDD 內 的 所 有 元 素 進 行 笛 卡 爾 積 操 作。 操 作 后, 內 部 實 現 返 回CartesianRDD。圖6中左側大方框代表兩個 RDD,大方框內的小方框代表 RDD 的分區。右側大方框代表合并后的 RDD,大方框內的小方框代表分區。圖6中的大方框代表RDD,大方框中的小方框代表RDD分區。

    • scala源碼

      def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {new CartesianRDD(sc, this, other)}

    groupBy

    將元素通過函數生成相應的 Key,數據就轉化為 Key-Value 格式,之后將 Key 相同的元素分為一組。

    函數實現如下:

    1)將用戶函數預處理:

    val cleanF = sc.clean(f)

    2)對數據 map 進行函數操作,最后再進行 groupByKey 分組操作。

    this.map(t => (cleanF(t), t)).groupByKey(p)
    • scala源碼

      def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {groupBy[K](f, defaultPartitioner(this))}

    distinct

    對數據進行去重

    • scala源碼

      /*** Return a new RDD containing the distinct elements in this RDD.*/def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)}/*** Return a new RDD containing the distinct elements in this RDD.*/def distinct(): RDD[T] = withScope {distinct(partitions.length)}

    subtract

    subtract相當于進行集合的差操作,RDD 1去除RDD 1和RDD 2交集中的所有元素;

    • scala源碼

      /*** Return an RDD with the elements from `this` that are not in `other`.** Uses `this` partitioner/partition size, because even if `other` is huge, the resulting* RDD will be <= us.*/def subtract(other: RDD[T]): RDD[T] = withScope {subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))}/*** Return an RDD with the elements from `this` that are not in `other`.*/def subtract(other: RDD[T], numPartitions: Int): RDD[T] = withScope {subtract(other, new HashPartitioner(numPartitions))}/*** Return an RDD with the elements from `this` that are not in `other`.*/def subtract(other: RDD[T],p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {if (partitioner == Some(p)) {// Our partitioner knows how to handle T (which, since we have a partitioner, is// really (K, V)) so make a new Partitioner that will de-tuple our fake tuplesval p2 = new Partitioner() {override def numPartitions: Int = p.numPartitionsoverride def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)}

    sample

    sample 將 RDD 這個集合內的元素進行采樣,獲取所有元素的子集。用戶可以設定是否有放回的抽樣、百分比、隨機種子,進而決定采樣方式。內部實現是生成 SampledRDD(withReplacement, fraction, seed)。

    函數參數設置:

    withReplacement=true,表示有放回的抽樣。 withReplacement=false,表示無放回的抽樣。
    • scala源碼
    def sample(withReplacement: Boolean,fraction: Double,seed: Long = Utils.random.nextLong): RDD[T] = {require(fraction >= 0,s"Fraction must be nonnegative, but got ${fraction}")withScope {require(fraction >= 0.0, "Negative fraction value: " + fraction)if (withReplacement) {new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)} else {new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)}}}

    takeSample

    takeSample()函數和上面的sample函數是一個原理,但是不使用相對比例采樣,而是按設定的采樣個數進行采樣,同時返回結果不再是RDD,而是相當于對采樣后的數據進行Collect(),返回結果集為單機的數組

    • scala源碼
    //返回集為數組def takeSample(withReplacement: Boolean,num: Int,seed: Long = Utils.random.nextLong): Array[T] = withScope {val numStDev = 10.0require(num >= 0, "Negative number of elements requested")require(num <= (Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt),"Cannot support a sample size > Int.MaxValue - " +s"$numStDev * math.sqrt(Int.MaxValue)")if (num == 0) {new Array[T](0)} else {val initialCount = this.count()if (initialCount == 0) {new Array[T](0)} else {val rand = new Random(seed)if (!withReplacement && num >= initialCount) {Utils.randomizeInPlace(this.collect(), rand)} else {val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,withReplacement)var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()// If the first sample didn't turn out large enough, keep trying to take samples;// this shouldn't happen often because we use a big multiplier for the initial sizevar numIters = 0while (samples.length < num) {logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()numIters += 1}Utils.randomizeInPlace(samples, rand).take(num)}}}}

    mapValues

    針對(key, Value)型數據中的value進行map操作,而不對key進行處理

    • scala源碼
    /*** Pass each value in the key-value pair RDD through a map function without changing the keys;* this also retains the original RDD's partitioning.*/def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {val cleanF = self.context.clean(f)new MapPartitionsRDD[(K, U), (K, V)](self,(context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },preservesPartitioning = true)}

    combineByKey

    • scala源碼
    def combineByKey[C](createCombiner: V => C, //C不存在的情況下,比如通過V創建seq CmergeValue: (C, V) => C, //當C已經存在的情況下需要merge,比如把item V加入到seq C中,或者疊加mergeCombiners: (C, C) => C, //合并兩個Cpartitioner: Partitioner, //Partitioner,Shuffle時需要的PartitionermapSideCombine: Boolean = true, //為了減小傳輸量,很多 combine 可以在 map端先做,比如疊加,可以先在一個 partition 中把所有相同的 key 的 value 疊加,再 shuff le。//傳輸需要序列化,用戶可以自定義序列化類serializer: Serializer = null): RDD[(K, C)] = self.withScope {combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,partitioner, mapSideCombine, serializer)(null)}

    reduceByKey

    reduceByKey是比combineByKey更簡單的一種情況,只是兩個值合并成一個值 -------- *(Int,Int V) >> (Int, IntC)*
    • scala源碼
    /*** Merge the values for each key using an associative and commutative reduce function. This will* also perform the merging locally on each mapper before sending results to a reducer, similarly* to a "combiner" in MapReduce.*/def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)}/*** Merge the values for each key using an associative and commutative reduce function. This will* also perform the merging locally on each mapper before sending results to a reducer, similarly* to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.*/def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {reduceByKey(new HashPartitioner(numPartitions), func)}/*** Merge the values for each key using an associative and commutative reduce function. This will* also perform the merging locally on each mapper before sending results to a reducer, similarly* to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/* parallelism level.*/def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {reduceByKey(defaultPartitioner(self), func)}

    partitionBy

    partitionBy函數對RDD進行分區操作;------ partitionBy(partitioner:Partitioner)

    • scala源碼
    /*** Return a copy of the RDD partitioned using the specified partitioner.*/def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {throw new SparkException("HashPartitioner cannot partition array keys.")}if (self.partitioner == Some(partitioner)) {self} else {new ShuffledRDD[K, V, V](self, partitioner)}}

    cogroup

    cogroup函數將兩個RDD進行協同劃分,對兩個RDD中的key-valuel類型的元素,每個RDD相同key的元素風別聚合為一個集合,并且返回兩個RDD中對應Key的元素集合的迭代器

    • scala源碼
    def cogroup[W1, W2, W3](other1: RDD[(K, W1)],other2: RDD[(K, W2)],other3: RDD[(K, W3)],partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {throw new SparkException("HashPartitioner cannot partition array keys.")}val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)cg.mapValues { case Array(vs, w1s, w2s, w3s) =>(vs.asInstanceOf[Iterable[V]],w1s.asInstanceOf[Iterable[W1]],w2s.asInstanceOf[Iterable[W2]],w3s.asInstanceOf[Iterable[W3]])}}

    join

    join 對兩個需要連接的 RDD 進行 cogroup函數操作,將相同 key 的數據能夠放到一個分區,在 cogroup 操作之后形成的新 RDD 對每個key 下的元素進行笛卡爾積的操作,返回的結果再展平,對應 key 下的所有元組形成一個集合。最后返回 RDD[(K, (V, W))]。

    • scala源碼
    def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {this.cogroup(other, partitioner).flatMapValues( pair =>for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w))}

    sortyByKey(sortBy)

    作用再(Key, Value)格式的數據上,根據Key進行升序或降序排序

    • scala源碼
    def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)] = self.withScope{val part = new RangePartitioner(numPartitions, self, ascending)new ShuffledRDD[K, V, V](self, part).setKeyOrdering(if (ascending) ordering else ordering.reverse)}def sortBy[K](f: (T) => K,ascending: Boolean = true,numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {this.keyBy[K](f).sortByKey(ascending, numPartitions).values}

    Action算子(行動算子)

    本質上在Action算子中通過SparkContext觸發SparkContext提交job作業。Action 算子會觸發 Spark 提交作業(Job),并將數據輸出 Spark系統。

    foreach

    foreach對RDD中的每個元素都應用f函數操作,不返回RDD和Array,而返回Uint;(遍歷)

    • scala源碼
    def foreach(f: T => Unit): Unit = withScope {val cleanF = sc.clean(f)sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))}

    saveAsTextFile

    函數將數據輸出,存儲到HDFS的制定目錄下

    saveAsObjectFile

    將風趣中的每10個元素組成一個Array,然后將這個Array序列化,映射為Null,BytesWritable(Y)的元素,寫入HDFS為SequenceFile的格式;

    map(x=>(NullWritable.get(),new BytesWritable(Utils.serialize(x))))

    colloect

    相當于toArray,collect將分布式的RDD返回為一個單機的scala Array數組,在這個數組上運用scala的函數式操作;


    左側方框代表 RDD 分區,右側方框代表單機內存中的數組。通過函數操作,將結果返回到 Driver 程序所在的節點,以數組形式存儲。

    collectAsMap

    collectAsMap對(K,V)型的RDD數據返回一個單機HashMap;
    對于重復K的RDD元素,后面的元素覆蓋前面的元素

    lookup

    Lookup函數對(Key,Value)型的RDD操作,返回指定Key對應的元素形成的Seq。 這個函數處理優化的部分在于,如果這個RDD包含分區器,則只會對應處理K所在的分區,然后返回由(K,V)形成的Seq。 如果RDD不包含分區器,則需要對全RDD元素進行暴力掃描處理,搜索指定K對應的元素。 ------- lookup(key:K):Seq[V]

    count

    count(計數器)返回整個RDD的元素個數

    defcount():Long=sc.runJob(this,Utils.getIteratorSize_).sum

    top

    top可返回最大的k個元素

    top(num:Int)(implicit ord:Ordering[T]):Array[T]
    • top返回最大的k個元素。
    • take返回最小的k個元素。
    • takeOrdered返回最小的k個元素,并且在返回的數組中保持元素的順序。
    • first相當于top(1)返回整個RDD中的前k個元素,可以定義排序的方式Ordering[T],返回的是一個含前k個元素的數組.

    reduce

    reduce函數相當于對RDD中的元素進行reduceLeft函數的操作; ---- Some(iter.reduceLeft(cleanF))

    reduceLeft先對兩個元素<K,V>進行reduce函數操作,然后將結果和迭代器取出的下一個元素<k,V>進行reduce函數操作,直到迭代器遍歷完所有元素,得到最后結果。在RDD中,先對每個分區中的所有元素<K,V>的集合分別進行reduceLeft。 每個分區形成的結果相當于一個元素<K,V>,再對這個結果集合進行reduceleft操作;

    fold

    fold和reduce的原理相同,但是與reduce不同,相當于每個reduce時,迭代器取的第一個元素是zeroValue;

    控制算子

    控制算子有三種,cache,persist,checkpoint,以上算子都可以將RDD持久化,持久化的單位是partition。cache和persist都是懶執行的。必須有一個action類算子觸發執行。checkpoint算子不僅能將RDD持久化到磁盤,還能切斷RDD之間的依賴關系;

    cache

    cache默認將RDD的數據持久化到內存中,相當與persist(MEMORY_ONLY)函數的功能;

    chche () = persist()=persist(StorageLevel.Memory_Only)

    persist

    可以指定持久話的級別,最常用的是MEMORY_ONLTY和MEMORY_AND_DISK;"_2"表示副本數;

    持久化有如下級別:

    cache和persist的注意事項

    • cache和persist都是懶執行,必須有一個action類算子觸發執行。
    • cache和persist算子的返回值可以賦值給一個變量,在其他job中直接使用這個變量就是使用持久化的數據了。持久化的單位是partition
    • cache和persist算子后不能立即緊跟action算子。

    checkpoint

    checkpoint將RDD持久化到磁盤,還可以切斷RDD之間的依賴關系

    • checkpoint 的執行原理:

      • 當RDD的job執行完畢后,會從finalRDD從后往前回溯。
      • 當回溯到某一個RDD調用了checkpoint方法,會對當前的RDD做一個標記。
      • Spark框架會自動啟動一個新的job,重新計算這個RDD的數據,將數據持久化到HDFS上。
    • 優化:對RDD執行checkpoint之前,最好對這個RDD先執行cache,這樣新啟動的job只需要將內存中的數據拷貝導HDFS上就可以,省去重新計算這一步;

    例如:

    SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("checkpoint");JavaSparkContext sc = new JavaSparkContext(conf);sc.setCheckpointDir("./checkpoint");JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1,2,3));parallelize.checkpoint();parallelize.count();sc.stop();

    總結

    以上是生活随笔為你收集整理的Spark 常用算子详解(转换算子、行动算子、控制算子)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。