日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

五-中, Spark 算子 吐血总结(转化+行动算子共三十七个)

發(fā)布時間:2024/3/12 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 五-中, Spark 算子 吐血总结(转化+行动算子共三十七个) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

文章目錄

  • 五-中, Spark 算子吐血總結
      • 5.1.4.3 RDD 轉換算子(Transformation)
        • 1. Value類型
          • 1.1 `map`
          • 1.2 `mapPartitions`
          • 1.3 `mapPartitionsWithIndex`
          • 1.4 `flatMap`
          • 1.5 `glom`
          • 1.6 `groupBy`
          • 1.7 `filter`
          • 1.8 `sample`
          • 1.9 `distinct`
          • 1.10 `coalesce`
          • 1.11 `repartition`
          • 1.12 `sortBy`
        • 2. 雙Value類型
          • 2.13 `intersection`
          • 2.14 `union`
          • 2.15 `subtract`
          • 2.16 `zip`
        • 3. Key-Value 類型
          • 3.17 partitionBy
          • 3.18 reduceByKey
          • 3.19 groupByKey
          • 3.20 aggregateByKey
          • 3.21 foldByKey
          • 3.22 combineByKey
          • 3.23 sortByKey
          • 3.24 join
          • 3.25 leftOuterJoin
          • 3.26 cogroup
      • 5.1.4.4 RDD 行動算子 (Action)
        • 1. reduce
        • 2. collect
          • 3. count
          • 4. first
          • 5. take
          • 6. takeOrdered
          • 7. aggregate
          • 8. fold
          • 9. countByKey
          • 10. save相關的算子
          • 11. foreach

五-中, Spark 算子吐血總結

5.1.4.3 RDD 轉換算子(Transformation)

什么是算子?

在流處理、交互式查詢中有個常用的概念是“算子”,在英文中被成為“Operation”,在數(shù)學上可以解釋為一個函數(shù)空間到另一個函數(shù)空間上的映射O:X->X,其實就是一個處理單元,往往是指一個函數(shù),在使用算子時往往會有輸入和輸出,算子則完成相應數(shù)據(jù)的轉化,比如:Group、Sort等都是算子。


大方向來說, Spark算子(RDD方法)大致可以分為以下兩類:

  • Transformation 變換/轉換算子 : 這種變換并不觸發(fā)提交作業(yè), 而是完成作業(yè)中間過程處理; Transformation 操作是延遲計算的, 也就是說從一個RDD轉換為另一個RDD的轉換操作不是馬上執(zhí)行, 需要等到有Action操作(行動算子)的時候才會真正觸發(fā)運算;
  • Action 行動算子: 這類算子會觸發(fā)SparkContext 提交Job作業(yè), 并將數(shù)據(jù)輸出到Spark系統(tǒng);
  • 小方向來說, Spark算子大致分為三類:

  • Value數(shù)據(jù)類型的Transformation算子, 這種變換并不觸發(fā)提交作業(yè), 針對處理的數(shù)據(jù)項是Value型的數(shù)據(jù);
  • Key-Value數(shù)據(jù)類型的Transformation算子, 這種變換并不觸發(fā)提交作業(yè), 針對處理的數(shù)據(jù)項是Key-Value型的數(shù)據(jù)對;
  • Action算子, 這類算子會觸發(fā)SparkContext提交Job作業(yè);
  • 1. Value類型

    1.1 map
    函數(shù)簽名函數(shù)說明
    def map[U: ClassTag](f: T => U): RDD[U]將待處理的數(shù)據(jù)逐條進行映射轉換, 這里的轉換可以是類型的轉換, 也可以是值的轉換
    其實就是Scala集合函數(shù)中的map((數(shù)據(jù)變量:數(shù)據(jù)類型) => {對每一條數(shù)據(jù)的映射操作})
    val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4)) val dataRDD1: RDD[Int] = dataRDD.map(num => {num * 2 }) val dataRDD2: RDD[String] = dataRDD1.map(num => {"" + num })

    map的并行計算🌰:
    // 1. rdd的計算一個分區(qū)內(nèi)的數(shù)據(jù)是一個一個執(zhí)行邏輯
    // 只有前面一個數(shù)據(jù)全部的邏輯執(zhí)行完畢后,才會執(zhí)行下一個數(shù)據(jù)。
    // 分區(qū)內(nèi)數(shù)據(jù)的執(zhí)行是有序的。
    // 2. 不同分區(qū)數(shù)據(jù)計算是無序的。

    1.2 mapPartitions
    函數(shù)簽名函數(shù)說明
    def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U]將待處理的數(shù)據(jù)以分區(qū)為單位發(fā)送到計算結點進行處理, 這里的處理是指可以進行任意的處理, 哪怕是過濾數(shù)據(jù)
    mapPartition, 傳遞一個迭代器, 返回一個迭代器
    什么, 你的不是迭代器怎么辦? 用List包裝, 再獲取集合的迭代器即可

    栗子: 獲取每個數(shù)據(jù)分區(qū)的最大值

    def main(args: Array[String]): Unit = {//1. 新建配置文件對象val conf: SparkConf = new SparkConf()conf.setMaster("local[*]")conf.setAppName("memoryRDD")//2. 新建sparkContextval sc: SparkContext = new SparkContext(conf)//3. 讀取文件val rdd= sc.makeRDD(List(1,2,3,4,5,6,7,8,9), 5)//由上面講過的內(nèi)存中數(shù)據(jù)的分區(qū)規(guī)則/**分區(qū)號: 數(shù)據(jù)* 0: 1* 1: 2,3* 2: 4,5* 3: 6,7* 4: 8,9*///mapPartitons()的入?yún)⒑统鰠⒍际堑?!!!!//mapOartitions()一次計算的是一個分區(qū)的數(shù)據(jù)val mapRDD:RDD[Int] = rdd.mapPartitions(//iter.max 求出每個分區(qū)的最大值, 是int//但是這個函數(shù)要求出參是迭代器!iter => List(iter.max).iterator)// mapPartitions : 可以以分區(qū)為單位進行數(shù)據(jù)轉換操作// 但是會將整個分區(qū)的數(shù)據(jù)加載到內(nèi)存進行引用// 如果處理完的數(shù)據(jù)是不會被釋放掉,存在對象的引用。// 在內(nèi)存較小,數(shù)據(jù)量較大的場合下,容易出現(xiàn)內(nèi)存溢出。//4. collect 從內(nèi)存中收集數(shù)據(jù), 迭代輸出mapRDD.collect().foreach(println)mapRDD.saveAsTextFile("output")//5. 關閉資源sc.stop() }

    Q: map 和 mapPartitions的區(qū)別 ?

    角度區(qū)別
    1.數(shù)據(jù)處理角度map算子是分區(qū)內(nèi)一個數(shù)據(jù)一個數(shù)據(jù)的執(zhí)行, 類似于串行操作; 而 mapPartitions算子是以分區(qū)為單位進行批處理操作;
    2.功能角度map算子主要目的是將數(shù)據(jù)源中的數(shù)據(jù)進行轉換和改變, 但不會減少或增多數(shù)據(jù); 而mapPatitiions算子需要傳遞一個迭代器,返回一個迭代器, 沒有要求?元素個數(shù)保持不變, 所以可增加或減少數(shù)據(jù);
    3. 性能的角度map算子類似于串行操作, 性能較低; mapPartitions算子類似于批處理, 性能較高; 但是mapPartitions算子會長時間占用內(nèi)存, 那么這樣會導致內(nèi)存可能不夠用, 出現(xiàn)內(nèi)存溢出錯誤, 內(nèi)存有限時, 使用map而不是mapPartitions
    1.3 mapPartitionsWithIndex
    函數(shù)簽名函數(shù)說明
    def mapPartitionsWithIndex[U: ClassTag](f: (Index, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]將待處理的數(shù)據(jù)以分區(qū)為單位發(fā)送到計算節(jié)點進行處理, 這里的處理可以使任意處理哪怕是過濾數(shù)據(jù), 在處理時同時可以獲取當前分區(qū)索引
    入?yún)⑹?分區(qū)索引, 迭代器), 出參是迭代器

    舉個🌰:

    //就記住這一點就會用了 // mapWithPartitionsIndex, 入?yún)⑹?index, iterator), 出參是 iterator def main(args: Array[String]): Unit = {//1. 新建配置文件對象val conf: SparkConf = new SparkConf()conf.setMaster("local[*]")conf.setAppName("memoryRDD")//2. 新建sparkContextval sc: SparkContext = new SparkContext(conf)//3. 讀取文件val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)val rddWithIndex: RDD[Int] = rdd.mapPartitionsWithIndex((index, iterator) => {index match {case 1 => iteratorcase _ => Nil.iterator}})//4. 存儲結果到文件rddWithIndex.collect().foreach(println)rddWithIndex.saveAsTextFile("output")//5. 關閉資源sc.stop()}

    如何直接打印數(shù)據(jù)所在的分區(qū)?

    val mpiRDD = rdd.mapPartitionsWithIndex((index, iter) => {// 1, 2, 3, 4//(0,1)(2,2),(4,3),(6,4)iter.map(num => {(index, num)})})
    1.4 flatMap
    函數(shù)簽名函數(shù)說明
    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]將處理的數(shù)據(jù)進行扁平化后再進行映射處理, 扁平映射算子
    入?yún)⑹荰, 出參要求是一個可迭代的集合
    def main(args: Array[String]): Unit = {//1. 新建配置文件對象val conf: SparkConf = new SparkConf()conf.setMaster("local[*]")conf.setAppName("memoryRDD")//2. 新建sparkContextval sc: SparkContext = new SparkContext(conf)//3. 讀取文件val rdd = sc.makeRDD(List(List(1,2),3,List(4,5)))val rddMap = rdd.flatMap(dat => {dat match {case i: Int => List(i)case j: List[_] => j}})//4. 存儲結果到文件rddMap.saveAsTextFile("output")rddMap.collect().foreach(println)//5. 關閉資源sc.stop() }
    1.5 glom
    函數(shù)簽名函數(shù)說明
    def glom(): RDD[Array[T]]同一個分區(qū)的數(shù)據(jù)直接轉換為相同類型的內(nèi)存數(shù)組進行處理, 分區(qū)不變
    將同一個分區(qū)里的元素合并到一個array中

    計算所有分區(qū)最大值求和(分區(qū)內(nèi)取最大值,分區(qū)間最大值求和)

    def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - glomval rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)// 【1,2】,【3,4】// 【2】,【4】// 【6】val glomRDD: RDD[Array[Int]] = rdd.glom()val maxRDD: RDD[Int] = glomRDD.map(array => {array.max})println(maxRDD.collect().sum)sc.stop()} }
    1.6 groupBy
    函數(shù)簽名函數(shù)說明
    def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]將數(shù)據(jù)根據(jù)指定的規(guī)則進行分組, 分區(qū)默認不變, 但是數(shù)據(jù)會被打亂重新組合, 這就是shuffle. 極限情況下, 數(shù)據(jù)可能被分在同一個分區(qū)中;
    groupBy(f:elem => 對 elem的操作)

    一個組的數(shù)據(jù)在一個分區(qū)中, 但是并不是說一個分區(qū)中只有一個組;

  • List(“Hello”, “hive”, “hbase”, “Hadoop”)根據(jù)單詞首寫字母進行分組。
  • sc.groupBy(_.charAt(0))
  • 從服務器日志數(shù)據(jù) apache.log 中獲取每個時間段訪問量。
  • //1. 配置文件對象val conf = new SparkConf();conf.setAppName("groupby")conf.setMaster("local[*]")//2. sparkcontextval sc = new SparkContext(conf)//3. 創(chuàng)建RDDval rddString: RDD[String] = sc.textFile("datas/apache.log")//4.val hourRDD: RDD[(String, Int)] = rddString.map(line => {//取出時間val strArr = line.split(" ")val time = strArr(3)//對time格式化//指定格式化模式val sdfOfDate = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")//格式化時間val date: Date = sdfOfDate.parse(time)//指定格式化模式val sdfOfHour = new SimpleDateFormat("HH")//格式化時間, 獲取到了小時val hour: String = sdfOfHour.format(date)(hour, 1)})//5. 分組val hourOfGroupedRDD: RDD[(String, Iterable[(String, Int)])] = hourRDD.groupBy(_._1)//6. 匯總計算val res: RDD[(String, Int)] = hourOfGroupedRDD.map {case (hour: String, iter: Iterable[(String, Int)]) =>(hour, iter.size)}res.collect().foreach(println)

  • WordCount
  • 1.7 filter
    函數(shù)簽名函數(shù)說明
    def filter(f: T => Boolean): RDD[T]按照規(guī)則篩選過濾; 處理后, 分區(qū)不變, 但是分區(qū)內(nèi)的數(shù)據(jù)可能不均衡, 生產(chǎn)環(huán)境, 可能會出現(xiàn)數(shù)據(jù)傾斜;
    符合規(guī)則的數(shù)據(jù)保留,不符合規(guī)則的數(shù)據(jù)丟棄

    簡單例子:

    val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1) val dataRDD1 = dataRDD.filter(_%2 == 0)

    從服務器日志數(shù)據(jù) apache.log 中獲取 2015 年 5 月 17 日的請求路徑

    //1. 配置文件對象val conf = new SparkConf();conf.setAppName("groupby")conf.setMaster("local[*]")//2. sparkcontextval sc = new SparkContext(conf)//3. 創(chuàng)建RDDval rddString: RDD[String] = sc.textFile("datas/apache.log")//4.val timeAndPathRDD: RDD[(String, String)] = rddString.map(line => {//取出時間val strArr = line.split(" ")val timeAndPathTuple: Tuple2[String, String] = (strArr(3), strArr(6))//對time格式化//指定格式化模式val sdfOfDate = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")//格式化時間val date: Date = sdfOfDate.parse(timeAndPathTuple._1)//指定格式化模式val sdfOfTime = new SimpleDateFormat("dd/MM/yyyy")//格式化時間, 獲取到了小時val time: String = sdfOfTime.format(date)(time, timeAndPathTuple._2)})//過濾val res = timeAndPathRDD.filter(_._1.equals("17/05/2015"))res.collect().foreach(println)
    1.8 sample
    函數(shù)簽名函數(shù)說明
    def sample(withReplacement: Boolean,fraction:Double,seed: Long = Utils.random.nextLong): RDD[T]根據(jù)指定的規(guī)則從數(shù)據(jù)集中抽取數(shù)據(jù)
    withReplacement是否放回(抽獎)
    fraction概率
    seed種子

    val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)// 抽取數(shù)據(jù)不放回(伯努利算法)// 伯努利算法:又叫 0、1 分布。例如扔硬幣,要么正面,要么反面。// 具體實現(xiàn):根據(jù)種子和隨機算法算出一個數(shù)和第二個參數(shù)設置幾率比較,小于第二個參數(shù)要,大于不要// 第一個參數(shù):抽取的數(shù)據(jù)是否放回,false:不放回// 第二個參數(shù):抽取的幾率,范圍在[0,1]之間,0:全不取;1:全取;// 第三個參數(shù):隨機數(shù)種子val dataRDD1 = dataRDD.sample(false, 0.5)// 抽取數(shù)據(jù)放回(泊松算法)// 第一個參數(shù):抽取的數(shù)據(jù)是否放回,true:放回;false:不放回// 第二個參數(shù):重復數(shù)據(jù)的幾率,范圍大于等于 0.表示每一個元素被期望抽取到的次數(shù)// 第三個參數(shù):隨機數(shù)種子val dataRDD2 = dataRDD.sample(true, 2)
    1.9 distinct
    函數(shù)簽名函數(shù)說明
    def distinct()(implicit ord: Ordering[T] = null): RDD[T]將數(shù)據(jù)集中重復的數(shù)據(jù)去重
    def distinct(numPartitions: Int)(implicit ord:Ordering[T] = null): RDD[T]
    val dataRDD = sparkContext.makeRDD(List(1,2,3,4,1,2),1) val dataRDD1 = dataRDD.distinct() val dataRDD2 = dataRDD.distinct(2)

    1.10 coalesce
    函數(shù)簽名函數(shù)說明
    def coalesce(numPartitions: Int, shuffle: Boolean = false,partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]根據(jù)數(shù)據(jù)量縮減分區(qū),用于大數(shù)據(jù)集過濾后, 提高小數(shù)據(jù)集的執(zhí)行效率
    當spark程序中, 存在過多的小任務的時候, 可以通過coalesce方法, 收縮合并分區(qū), 減少分區(qū)的個數(shù), 減少任務調度成本

    注意: coalesce 默認不會打亂分區(qū)中的數(shù)據(jù). 縮減分區(qū)主要是單純的進行分區(qū)間的合并, 為了避免可能的數(shù)據(jù)傾斜, 此方法的參數(shù) shuffle = true, 通過shuffle去平衡數(shù)據(jù);

    val dataRDD = sparkContext.makeRDD(List(1,2,3,4,1,2),6) val dataRDD1 = dataRDD.coalesce(2)

    Q: 如何擴大分區(qū)??

    • 其實, 直接用下面的這個方法就可以了.
    1.11 repartition
    函數(shù)簽名函數(shù)說明
    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]該操作內(nèi)部其實執(zhí)行的是coalesce, 參數(shù)shuffle的默認值為true
    無論是將分區(qū)數(shù)多的RDD 轉換為分區(qū)數(shù)少的 RDD,還是將分區(qū)數(shù)少的 RDD 轉換為分區(qū)數(shù)多的 RDD,repartition操作都可以完成,因為無論如何都會經(jīng) shuffle 過程。

    [外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-EVOB3uZv-1646825459606)(2022-03-08-16-45-38.png)]

    coalesce 和 repartition 區(qū)別?

    1.12 sortBy
    函數(shù)簽名函數(shù)說明
    def sortBy[K](f: (T) => K, ascending: Boolean = true,numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]在排序之前,可以將數(shù)據(jù)通過 f 函數(shù)進行處理,之后按照 f 函數(shù)處理的結果進行排序,默認為升序排列
    排序后新產(chǎn)生的 RDD 的分區(qū)數(shù)與原 RDD 的分區(qū)數(shù)一致。中間存在 shuffle 的過程

    2. 雙Value類型

    2.13 intersection
    函數(shù)簽名函數(shù)說明
    def intersection(other: RDD[T]): RDD[T]對源RDD和參數(shù)RDD求交集后返回一個新的RDD
    2.14 union
    函數(shù)簽名函數(shù)說明
    def union(other: RDD[T]): RDD[T]源RDD參數(shù)RDD求并集后返回一個新的RDD
    2.15 subtract
    函數(shù)簽名函數(shù)說明
    def subtract(other: RDD[T]): RDD[T]以一個 RDD 元素為主,去除兩個 RDD 中重復元素,將其他元素保留下來
    2.16 zip
    函數(shù)簽名函數(shù)說明
    def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]將兩個 RDD 中的元素,以鍵值對(拉鏈)的形式進行合并.
    鍵值對中的 Key 為第 1 個 RDD中的元素,Value 為第 2 個 RDD 中的相同位置的元素。
    def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - 雙Value類型// 交集,并集和差集要求兩個數(shù)據(jù)源數(shù)據(jù)類型保持一致// 拉鏈操作兩個數(shù)據(jù)源的類型可以不一致val rdd1 = sc.makeRDD(List(1,2,3,4))val rdd2 = sc.makeRDD(List(3,4,5,6))val rdd7 = sc.makeRDD(List("3","4","5","6"))// 交集 : 【3,4】val rdd3: RDD[Int] = rdd1.intersection(rdd2)//val rdd8 = rdd1.intersection(rdd7)println(rdd3.collect().mkString(","))// 并集 : 【1,2,3,4,3,4,5,6】val rdd4: RDD[Int] = rdd1.union(rdd2)println(rdd4.collect().mkString(","))// 差集 : 【1,2】val rdd5: RDD[Int] = rdd1.subtract(rdd2)println(rdd5.collect().mkString(","))// 拉鏈 : 【1-3,2-4,3-5,4-6】val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)val rdd8 = rdd1.zip(rdd7)println(rdd6.collect().mkString(","))sc.stop()}

    思考一個問題:如果兩個 RDD 數(shù)據(jù)類型不一致怎么辦?
    思考一個問題:如果兩個 RDD 數(shù)據(jù)分區(qū)不一致怎么辦?
    思考一個問題:如果兩個 RDD 分區(qū)數(shù)據(jù)數(shù)量不一致怎么辦?

  • 交集,并集和差集要求兩個數(shù)據(jù)源數(shù)據(jù)類型保持一致
  • 拉鏈操作兩個數(shù)據(jù)源的類型可以不一致
  • [外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-eyLK105S-1646825459608)(2022-03-08-17-00-31.png)]
  • 3. Key-Value 類型

    3.17 partitionBy
    函數(shù)簽名函數(shù)說明
    def partitionBy(partitioner: Partitioner): RDD[(K, V)]將數(shù)據(jù)按照Partitioner重新進行分區(qū); Spark 默認的分區(qū)器是 HashPartitioner

    • 思考一個問題:如果重分區(qū)的分區(qū)器和當前 RDD 的分區(qū)器一樣怎么辦?

      • 產(chǎn)生的結果都是一樣的
    • 思考一個問題:Spark 還有其他分區(qū)器嗎?

      • RangePartitioner 一般在排序中使用
        思考一個問題:如果想按照自己的方法進行數(shù)據(jù)分區(qū)怎么辦?
    • 自己寫一個分區(qū)器(待補充)
      思考一個問題:哪那么多問題?

    3.18 reduceByKey
    函數(shù)簽名函數(shù)說明
    def reduceByKey(func: (V, V) => V): RDD[(K, V)]可以將數(shù)據(jù)按照相同的Key對Value進行聚合
    def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
    val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3))) val dataRDD2 = dataRDD1.reduceByKey(_+_) val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)

    Q: redeceByKey 和 groupBykey的區(qū)別?

    從shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前對分區(qū)內(nèi)相同 key 的數(shù)據(jù)進行預聚合(combine)功能,這樣會減少落盤的數(shù)據(jù)量,而 groupByKey 只是進行分組,不存在數(shù)據(jù)量減少的問題,reduceByKey 性能比較高。
    從功能的角度:reduceByKey 其實包含分組和聚合的功能。GroupByKey 只能分組,不能聚合,所以在分組聚合的場合下,推薦使用 reduceByKey,如果僅僅是分組而不需要聚合。那么還是只能使用 groupByKey

    3.19 groupByKey
    函數(shù)簽名函數(shù)說明
    def groupByKey(): RDD[(K, Iterable[V])]將數(shù)據(jù)源的數(shù)據(jù)根據(jù)key對value進行分組
    def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
    val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3))) val dataRDD2 = dataRDD1.groupByKey() val dataRDD3 = dataRDD1.groupByKey(2) val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))
    3.20 aggregateByKey
    函數(shù)簽名函數(shù)說明
    def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]將數(shù)據(jù)根據(jù)不同的規(guī)則進行分區(qū)內(nèi)計算和分區(qū)間計算

    思考一個問題:分區(qū)內(nèi)計算規(guī)則和分區(qū)間計算規(guī)則相同怎么辦?

    3.21 foldByKey
    函數(shù)簽名函數(shù)說明
    def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]當分區(qū)內(nèi)計算規(guī)則和分區(qū)間計算規(guī)則相同時,aggregateByKey 就可以簡化為 foldByKey
    3.22 combineByKey
    函數(shù)簽名函數(shù)說明
    def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)]最通用的對 key-value 型 rdd 進行聚集操作的聚集函數(shù)(aggregation function)
    類似于arrregate(), combineByKey() 允許用戶返回值的類型與輸入不一致
    3.23 sortByKey
    函數(shù)簽名函數(shù)說明
    def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]在一個(K,V)的 RDD 上調用,K 必須實現(xiàn) Ordered 接口(特質),返回一個按照 key 進行排序
    3.24 join
    函數(shù)簽名函數(shù)說明
    def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]在類型為(K,V)和(K,W)的 RDD 上調用,返回一個相同 key 對應的所有元素連接在一起的
    (K,(V,W))的 RDD
    3.25 leftOuterJoin
    函數(shù)簽名函數(shù)說明
    def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]類似于 SQL 語句的左外連接
    3.26 cogroup
    函數(shù)簽名函數(shù)說明
    def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]在類型為(K,V)和(K,W)的 RDD 上調用,返回一個(K,(Iterable,Iterable))類型的 RDD

    5.1.4.4 RDD 行動算子 (Action)

    1. reduce

    函數(shù)簽名函數(shù)說明
    def reduce(f: (T, T) => T): T聚集 RDD 中的所有元素,先聚合分區(qū)內(nèi)數(shù)據(jù),再聚合分區(qū)間數(shù)據(jù)

    2. collect

    函數(shù)簽名函數(shù)說明
    def collect(): Array[T]在驅動程序中,以數(shù)組 Array 的形式返回數(shù)據(jù)集的所有元素
    3. count
    函數(shù)簽名函數(shù)說明
    def count(): Long返回 RDD 中元素的個數(shù)
    4. first
    函數(shù)簽名函數(shù)說明
    def first(): T返回 RDD 中的第一個元素
    5. take
    函數(shù)簽名函數(shù)說明
    def take(num: Int): Array[T]返回一個由 RDD 的前 n 個元素組成的數(shù)組
    6. takeOrdered
    函數(shù)簽名函數(shù)說明
    def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]返回該 RDD 排序后的前 n 個元素組成的數(shù)組
    7. aggregate
    函數(shù)簽名函數(shù)說明
    def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U分區(qū)的數(shù)據(jù)通過初始值和分區(qū)內(nèi)的數(shù)據(jù)進行聚合,然后再和初始值進行分區(qū)間的數(shù)據(jù)聚合
    8. fold
    函數(shù)簽名函數(shù)說明
    def fold(zeroValue: T)(op: (T, T) => T): T統(tǒng)計每種 key 的個數(shù)
    9. countByKey
    函數(shù)簽名函數(shù)說明
    def countByKey(): Map[K, Long]折疊操作,aggregate 的簡化版操作
    10. save相關的算子
    函數(shù)簽名函數(shù)說明
    def saveAsTextFile(path: String): Unit
    def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit將數(shù)據(jù)保存到不同格式的文件中
    def saveAsObjectFile(path: String): Unit
    def saveAsSequenceFile(path: String,codec: Option[Class[_ <: CompressionCodec]] = None): Unit
    11. foreach
    函數(shù)簽名函數(shù)說明

    總結

    以上是生活随笔為你收集整理的五-中, Spark 算子 吐血总结(转化+行动算子共三十七个)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。