文章目錄
- 五-中, Spark 算子吐血總結
- 5.1.4.3 RDD 轉換算子(Transformation)
- 1. Value類型
- 1.1 `map`
- 1.2 `mapPartitions`
- 1.3 `mapPartitionsWithIndex`
- 1.4 `flatMap`
- 1.5 `glom`
- 1.6 `groupBy`
- 1.7 `filter`
- 1.8 `sample`
- 1.9 `distinct`
- 1.10 `coalesce`
- 1.11 `repartition`
- 1.12 `sortBy`
- 2. 雙Value類型
- 2.13 `intersection`
- 2.14 `union`
- 2.15 `subtract`
- 2.16 `zip`
- 3. Key-Value 類型
- 3.17 partitionBy
- 3.18 reduceByKey
- 3.19 groupByKey
- 3.20 aggregateByKey
- 3.21 foldByKey
- 3.22 combineByKey
- 3.23 sortByKey
- 3.24 join
- 3.25 leftOuterJoin
- 3.26 cogroup
- 5.1.4.4 RDD 行動算子 (Action)
- 1. reduce
- 2. collect
- 3. count
- 4. first
- 5. take
- 6. takeOrdered
- 7. aggregate
- 8. fold
- 9. countByKey
- 10. save相關的算子
- 11. foreach
五-中, Spark 算子吐血總結
5.1.4.3 RDD 轉換算子(Transformation)
什么是算子?
在流處理、交互式查詢中有個常用的概念是“算子”,在英文中被成為“Operation”,在數(shù)學上可以解釋為一個函數(shù)空間到另一個函數(shù)空間上的映射O:X->X,其實就是一個處理單元,往往是指一個函數(shù),在使用算子時往往會有輸入和輸出,算子則完成相應數(shù)據(jù)的轉化,比如:Group、Sort等都是算子。
從大方向來說, Spark算子(RDD方法)大致可以分為以下兩類:
Transformation 變換/轉換算子 : 這種變換并不觸發(fā)提交作業(yè), 而是完成作業(yè)中間過程處理; Transformation 操作是延遲計算的, 也就是說從一個RDD轉換為另一個RDD的轉換操作不是馬上執(zhí)行, 需要等到有Action操作(行動算子)的時候才會真正觸發(fā)運算;Action 行動算子: 這類算子會觸發(fā)SparkContext 提交Job作業(yè), 并將數(shù)據(jù)輸出到Spark系統(tǒng);
從小方向來說, Spark算子大致分為三類:
Value數(shù)據(jù)類型的Transformation算子, 這種變換并不觸發(fā)提交作業(yè), 針對處理的數(shù)據(jù)項是Value型的數(shù)據(jù);Key-Value數(shù)據(jù)類型的Transformation算子, 這種變換并不觸發(fā)提交作業(yè), 針對處理的數(shù)據(jù)項是Key-Value型的數(shù)據(jù)對;Action算子, 這類算子會觸發(fā)SparkContext提交Job作業(yè);
1. Value類型
1.1 map
函數(shù)簽名函數(shù)說明
| def map[U: ClassTag](f: T => U): RDD[U] | 將待處理的數(shù)據(jù)逐條進行映射轉換, 這里的轉換可以是類型的轉換, 也可以是值的轉換 |
| 其實就是Scala集合函數(shù)中的map((數(shù)據(jù)變量:數(shù)據(jù)類型) => {對每一條數(shù)據(jù)的映射操作}) |
val dataRDD
: RDD
[Int] = sparkContext
.makeRDD
(List
(1,2,3,4))
val dataRDD1
: RDD
[Int] = dataRDD
.map
(num
=> {num
* 2
})
val dataRDD2
: RDD
[String] = dataRDD1
.map
(num
=> {"" + num
})
map的并行計算🌰:
// 1. rdd的計算一個分區(qū)內(nèi)的數(shù)據(jù)是一個一個執(zhí)行邏輯
// 只有前面一個數(shù)據(jù)全部的邏輯執(zhí)行完畢后,才會執(zhí)行下一個數(shù)據(jù)。
// 分區(qū)內(nèi)數(shù)據(jù)的執(zhí)行是有序的。
// 2. 不同分區(qū)數(shù)據(jù)計算是無序的。
1.2 mapPartitions
函數(shù)簽名函數(shù)說明
| def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U] | 將待處理的數(shù)據(jù)以分區(qū)為單位發(fā)送到計算結點進行處理, 這里的處理是指可以進行任意的處理, 哪怕是過濾數(shù)據(jù) |
| mapPartition, 傳遞一個迭代器, 返回一個迭代器 |
| 什么, 你的不是迭代器怎么辦? 用List包裝, 再獲取集合的迭代器即可 |
栗子: 獲取每個數(shù)據(jù)分區(qū)的最大值
def main
(args
: Array
[String]): Unit = {val conf
: SparkConf
= new SparkConf
()conf
.setMaster
("local[*]")conf
.setAppName
("memoryRDD")val sc
: SparkContext
= new SparkContext
(conf
)val rdd
= sc
.makeRDD
(List
(1,2,3,4,5,6,7,8,9), 5)val mapRDD
:RDD
[Int] = rdd
.mapPartitions
(iter
=> List
(iter
.max
).iterator
)mapRDD
.collect
().foreach
(println
)mapRDD
.saveAsTextFile
("output")sc
.stop
()
}
Q: map 和 mapPartitions的區(qū)別 ?
角度區(qū)別
| 1.數(shù)據(jù)處理角度 | map算子是分區(qū)內(nèi)一個數(shù)據(jù)一個數(shù)據(jù)的執(zhí)行, 類似于串行操作; 而 mapPartitions算子是以分區(qū)為單位進行批處理操作; |
| 2.功能角度 | map算子主要目的是將數(shù)據(jù)源中的數(shù)據(jù)進行轉換和改變, 但不會減少或增多數(shù)據(jù); 而mapPatitiions算子需要傳遞一個迭代器,返回一個迭代器, 沒有要求?元素個數(shù)保持不變, 所以可增加或減少數(shù)據(jù); |
| 3. 性能的角度 | map算子類似于串行操作, 性能較低; mapPartitions算子類似于批處理, 性能較高; 但是mapPartitions算子會長時間占用內(nèi)存, 那么這樣會導致內(nèi)存可能不夠用, 出現(xiàn)內(nèi)存溢出錯誤, 內(nèi)存有限時, 使用map而不是mapPartitions |
1.3 mapPartitionsWithIndex
函數(shù)簽名函數(shù)說明
| def mapPartitionsWithIndex[U: ClassTag](f: (Index, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] | 將待處理的數(shù)據(jù)以分區(qū)為單位發(fā)送到計算節(jié)點進行處理, 這里的處理可以使任意處理哪怕是過濾數(shù)據(jù), 在處理時同時可以獲取當前分區(qū)索引 |
| 入?yún)⑹?分區(qū)索引, 迭代器), 出參是迭代器 |
舉個🌰:
def main
(args
: Array
[String]): Unit = {val conf
: SparkConf
= new SparkConf
()conf
.setMaster
("local[*]")conf
.setAppName
("memoryRDD")val sc
: SparkContext
= new SparkContext
(conf
)val rdd
: RDD
[Int] = sc
.makeRDD
(List
(1,2,3,4), 2)val rddWithIndex
: RDD
[Int] = rdd
.mapPartitionsWithIndex
((index
, iterator
) => {index
match {case 1 => iterator
case _
=> Nil
.iterator
}})rddWithIndex
.collect
().foreach
(println
)rddWithIndex
.saveAsTextFile
("output")sc
.stop
()}
如何直接打印數(shù)據(jù)所在的分區(qū)?
val mpiRDD
= rdd
.mapPartitionsWithIndex
((index
, iter
) => {iter
.map
(num
=> {(index
, num
)})})
1.4 flatMap
函數(shù)簽名函數(shù)說明
| def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] | 將處理的數(shù)據(jù)進行扁平化后再進行映射處理, 扁平映射算子 |
| 入?yún)⑹荰, 出參要求是一個可迭代的集合 |
def main
(args
: Array
[String]): Unit = {val conf
: SparkConf
= new SparkConf
()conf
.setMaster
("local[*]")conf
.setAppName
("memoryRDD")val sc
: SparkContext
= new SparkContext
(conf
)val rdd
= sc
.makeRDD
(List
(List
(1,2),3,List
(4,5)))val rddMap
= rdd
.flatMap
(dat
=> {dat
match {case i
: Int => List
(i
)case j
: List
[_
] => j
}})rddMap
.saveAsTextFile
("output")rddMap
.collect
().foreach
(println
)sc
.stop
()
}
1.5 glom
函數(shù)簽名函數(shù)說明
| def glom(): RDD[Array[T]] | 將同一個分區(qū)的數(shù)據(jù)直接轉換為相同類型的內(nèi)存數(shù)組進行處理, 分區(qū)不變 |
| 將同一個分區(qū)里的元素合并到一個array中 |
計算所有分區(qū)最大值求和(分區(qū)內(nèi)取最大值,分區(qū)間最大值求和)
def main
(args
: Array
[String]): Unit = {val sparkConf
= new SparkConf
().setMaster
("local[*]").setAppName
("Operator")val sc
= new SparkContext
(sparkConf
)val rdd
: RDD
[Int] = sc
.makeRDD
(List
(1,2,3,4), 2)val glomRDD
: RDD
[Array
[Int]] = rdd
.glom
()val maxRDD
: RDD
[Int] = glomRDD
.map
(array
=> {array
.max
})println
(maxRDD
.collect
().sum
)sc
.stop
()}
}
1.6 groupBy
函數(shù)簽名函數(shù)說明
| def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] | 將數(shù)據(jù)根據(jù)指定的規(guī)則進行分組, 分區(qū)默認不變, 但是數(shù)據(jù)會被打亂重新組合, 這就是shuffle. 極限情況下, 數(shù)據(jù)可能被分在同一個分區(qū)中; |
| groupBy(f:elem => 對 elem的操作) |
一個組的數(shù)據(jù)在一個分區(qū)中, 但是并不是說一個分區(qū)中只有一個組;
List(“Hello”, “hive”, “hbase”, “Hadoop”)根據(jù)單詞首寫字母進行分組。
sc.groupBy(_.charAt(0))
從服務器日志數(shù)據(jù) apache.log 中獲取每個時間段訪問量。
//1. 配置文件對象val conf = new SparkConf();conf.setAppName("groupby")conf.setMaster("local[*]")//2. sparkcontextval sc = new SparkContext(conf)//3. 創(chuàng)建RDDval rddString: RDD[String] = sc.textFile("datas/apache.log")//4.val hourRDD: RDD[(String, Int)] = rddString.map(line => {//取出時間val strArr = line.split(" ")val time = strArr(3)//對time格式化//指定格式化模式val sdfOfDate = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")//格式化時間val date: Date = sdfOfDate.parse(time)//指定格式化模式val sdfOfHour = new SimpleDateFormat("HH")//格式化時間, 獲取到了小時val hour: String = sdfOfHour.format(date)(hour, 1)})//5. 分組val hourOfGroupedRDD: RDD[(String, Iterable[(String, Int)])] = hourRDD.groupBy(_._1)//6. 匯總計算val res: RDD[(String, Int)] = hourOfGroupedRDD.map {case (hour: String, iter: Iterable[(String, Int)]) =>(hour, iter.size)}res.collect().foreach(println)
WordCount
1.7 filter
函數(shù)簽名函數(shù)說明
| def filter(f: T => Boolean): RDD[T] | 按照規(guī)則篩選過濾; 處理后, 分區(qū)不變, 但是分區(qū)內(nèi)的數(shù)據(jù)可能不均衡, 生產(chǎn)環(huán)境, 可能會出現(xiàn)數(shù)據(jù)傾斜; |
| 符合規(guī)則的數(shù)據(jù)保留,不符合規(guī)則的數(shù)據(jù)丟棄 |
簡單例子:
val dataRDD
= sparkContext
.makeRDD(List(1,2,3,4),1)
val dataRDD1
= dataRDD
.filter(_
%2 == 0)
從服務器日志數(shù)據(jù) apache.log 中獲取 2015 年 5 月 17 日的請求路徑
val conf
= new SparkConf
();conf
.setAppName
("groupby")conf
.setMaster
("local[*]")val sc
= new SparkContext
(conf
)val rddString
: RDD
[String] = sc
.textFile
("datas/apache.log")val timeAndPathRDD
: RDD
[(String, String)] = rddString
.map
(line
=> {val strArr
= line
.split
(" ")val timeAndPathTuple
: Tuple2
[String, String] = (strArr
(3), strArr
(6))val sdfOfDate
= new SimpleDateFormat
("dd/MM/yyyy:HH:mm:ss")val date
: Date
= sdfOfDate
.parse
(timeAndPathTuple
._1
)val sdfOfTime
= new SimpleDateFormat
("dd/MM/yyyy")val time
: String = sdfOfTime
.format
(date
)(time
, timeAndPathTuple
._2
)})val res
= timeAndPathRDD
.filter
(_
._1
.equals
("17/05/2015"))res
.collect
().foreach
(println
)
1.8 sample
函數(shù)簽名函數(shù)說明
| def sample(withReplacement: Boolean,fraction:Double,seed: Long = Utils.random.nextLong): RDD[T] | 根據(jù)指定的規(guī)則從數(shù)據(jù)集中抽取數(shù)據(jù) |
| withReplacement | 是否放回(抽獎) |
| fraction | 概率 |
| seed | 種子 |
val dataRDD
= sparkContext
.makeRDD
(List
(1,2,3,4),1)val dataRDD1
= dataRDD
.sample
(false, 0.5)val dataRDD2
= dataRDD
.sample
(true, 2)
1.9 distinct
函數(shù)簽名函數(shù)說明
| def distinct()(implicit ord: Ordering[T] = null): RDD[T] | 將數(shù)據(jù)集中重復的數(shù)據(jù)去重 |
| def distinct(numPartitions: Int)(implicit ord:Ordering[T] = null): RDD[T] | |
val dataRDD
= sparkContext
.makeRDD
(List
(1,2,3,4,1,2),1)
val dataRDD1
= dataRDD
.distinct
()
val dataRDD2
= dataRDD
.distinct
(2)
1.10 coalesce
函數(shù)簽名函數(shù)說明
| def coalesce(numPartitions: Int, shuffle: Boolean = false,partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T] | 根據(jù)數(shù)據(jù)量縮減分區(qū),用于大數(shù)據(jù)集過濾后, 提高小數(shù)據(jù)集的執(zhí)行效率 |
| 當spark程序中, 存在過多的小任務的時候, 可以通過coalesce方法, 收縮合并分區(qū), 減少分區(qū)的個數(shù), 減少任務調度成本 |
注意: coalesce 默認不會打亂分區(qū)中的數(shù)據(jù). 縮減分區(qū)主要是單純的進行分區(qū)間的合并, 為了避免可能的數(shù)據(jù)傾斜, 此方法的參數(shù) shuffle = true, 通過shuffle去平衡數(shù)據(jù);
val dataRDD
= sparkContext
.makeRDD(List(1,2,3,4,1,2),6)
val dataRDD1
= dataRDD
.coalesce(2)
Q: 如何擴大分區(qū)??
1.11 repartition
函數(shù)簽名函數(shù)說明
| def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] | 該操作內(nèi)部其實執(zhí)行的是coalesce, 參數(shù)shuffle的默認值為true |
| 無論是將分區(qū)數(shù)多的RDD 轉換為分區(qū)數(shù)少的 RDD,還是將分區(qū)數(shù)少的 RDD 轉換為分區(qū)數(shù)多的 RDD,repartition操作都可以完成,因為無論如何都會經(jīng) shuffle 過程。 |
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-EVOB3uZv-1646825459606)(2022-03-08-16-45-38.png)]
coalesce 和 repartition 區(qū)別?
1.12 sortBy
函數(shù)簽名函數(shù)說明
| def sortBy[K](f: (T) => K, ascending: Boolean = true,numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] | 在排序之前,可以將數(shù)據(jù)通過 f 函數(shù)進行處理,之后按照 f 函數(shù)處理的結果進行排序,默認為升序排列 |
| 排序后新產(chǎn)生的 RDD 的分區(qū)數(shù)與原 RDD 的分區(qū)數(shù)一致。中間存在 shuffle 的過程 |
2. 雙Value類型
2.13 intersection
函數(shù)簽名函數(shù)說明
| def intersection(other: RDD[T]): RDD[T] | 對源RDD和參數(shù)RDD求交集后返回一個新的RDD |
| |
2.14 union
函數(shù)簽名函數(shù)說明
| def union(other: RDD[T]): RDD[T] | 對源RDD和參數(shù)RDD求并集后返回一個新的RDD |
2.15 subtract
函數(shù)簽名函數(shù)說明
| def subtract(other: RDD[T]): RDD[T] | 以一個 RDD 元素為主,去除兩個 RDD 中重復元素,將其他元素保留下來 |
| |
2.16 zip
函數(shù)簽名函數(shù)說明
| def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] | 將兩個 RDD 中的元素,以鍵值對(拉鏈)的形式進行合并. |
| 鍵值對中的 Key 為第 1 個 RDD中的元素,Value 為第 2 個 RDD 中的相同位置的元素。 |
def main
(args
: Array
[String]): Unit = {val sparkConf
= new SparkConf
().setMaster
("local[*]").setAppName
("Operator")val sc
= new SparkContext
(sparkConf
)val rdd1
= sc
.makeRDD
(List
(1,2,3,4))val rdd2
= sc
.makeRDD
(List
(3,4,5,6))val rdd7
= sc
.makeRDD
(List
("3","4","5","6"))val rdd3
: RDD
[Int] = rdd1
.intersection
(rdd2
)println
(rdd3
.collect
().mkString
(","))val rdd4
: RDD
[Int] = rdd1
.union
(rdd2
)println
(rdd4
.collect
().mkString
(","))val rdd5
: RDD
[Int] = rdd1
.subtract
(rdd2
)println
(rdd5
.collect
().mkString
(","))val rdd6
: RDD
[(Int, Int)] = rdd1
.zip
(rdd2
)val rdd8
= rdd1
.zip
(rdd7
)println
(rdd6
.collect
().mkString
(","))sc
.stop
()}
思考一個問題:如果兩個 RDD 數(shù)據(jù)類型不一致怎么辦?
思考一個問題:如果兩個 RDD 數(shù)據(jù)分區(qū)不一致怎么辦?
思考一個問題:如果兩個 RDD 分區(qū)數(shù)據(jù)數(shù)量不一致怎么辦?
交集,并集和差集要求兩個數(shù)據(jù)源數(shù)據(jù)類型保持一致拉鏈操作兩個數(shù)據(jù)源的類型可以不一致[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-eyLK105S-1646825459608)(2022-03-08-17-00-31.png)]
3. Key-Value 類型
3.17 partitionBy
函數(shù)簽名函數(shù)說明
| def partitionBy(partitioner: Partitioner): RDD[(K, V)] | 將數(shù)據(jù)按照Partitioner重新進行分區(qū); Spark 默認的分區(qū)器是 HashPartitioner |
| |
-
思考一個問題:如果重分區(qū)的分區(qū)器和當前 RDD 的分區(qū)器一樣怎么辦?
-
思考一個問題:Spark 還有其他分區(qū)器嗎?
- RangePartitioner 一般在排序中使用
思考一個問題:如果想按照自己的方法進行數(shù)據(jù)分區(qū)怎么辦?
-
自己寫一個分區(qū)器(待補充)
思考一個問題:哪那么多問題?
3.18 reduceByKey
函數(shù)簽名函數(shù)說明
| def reduceByKey(func: (V, V) => V): RDD[(K, V)] | 可以將數(shù)據(jù)按照相同的Key對Value進行聚合 |
| def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] | |
val dataRDD1
= sparkContext
.makeRDD
(List
(("a",1),("b",2),("c",3)))
val dataRDD2
= dataRDD1
.reduceByKey
(_
+_
)
val dataRDD3
= dataRDD1
.reduceByKey
(_
+_
, 2)
Q: redeceByKey 和 groupBykey的區(qū)別?
從shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前對分區(qū)內(nèi)相同 key 的數(shù)據(jù)進行預聚合(combine)功能,這樣會減少落盤的數(shù)據(jù)量,而 groupByKey 只是進行分組,不存在數(shù)據(jù)量減少的問題,reduceByKey 性能比較高。
從功能的角度:reduceByKey 其實包含分組和聚合的功能。GroupByKey 只能分組,不能聚合,所以在分組聚合的場合下,推薦使用 reduceByKey,如果僅僅是分組而不需要聚合。那么還是只能使用 groupByKey
3.19 groupByKey
函數(shù)簽名函數(shù)說明
| def groupByKey(): RDD[(K, Iterable[V])] | 將數(shù)據(jù)源的數(shù)據(jù)根據(jù)key對value進行分組 |
| def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] | |
| def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] | |
| |
val dataRDD1
= sparkContext
.makeRDD
(List
(("a",1),("b",2),("c",3)))
val dataRDD2
= dataRDD1
.groupByKey
()
val dataRDD3
= dataRDD1
.groupByKey
(2)
val dataRDD4
= dataRDD1
.groupByKey
(new HashPartitioner
(2))
3.20 aggregateByKey
函數(shù)簽名函數(shù)說明
| def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] | 將數(shù)據(jù)根據(jù)不同的規(guī)則進行分區(qū)內(nèi)計算和分區(qū)間計算 |
| |
思考一個問題:分區(qū)內(nèi)計算規(guī)則和分區(qū)間計算規(guī)則相同怎么辦?
3.21 foldByKey
函數(shù)簽名函數(shù)說明
| def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] | 當分區(qū)內(nèi)計算規(guī)則和分區(qū)間計算規(guī)則相同時,aggregateByKey 就可以簡化為 foldByKey |
3.22 combineByKey
函數(shù)簽名函數(shù)說明
| def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)] | 最通用的對 key-value 型 rdd 進行聚集操作的聚集函數(shù)(aggregation function) |
| 類似于arrregate(), combineByKey() 允許用戶返回值的類型與輸入不一致 |
3.23 sortByKey
函數(shù)簽名函數(shù)說明
| def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)] | 在一個(K,V)的 RDD 上調用,K 必須實現(xiàn) Ordered 接口(特質),返回一個按照 key 進行排序 |
| 的 | |
3.24 join
函數(shù)簽名函數(shù)說明
| def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] | 在類型為(K,V)和(K,W)的 RDD 上調用,返回一個相同 key 對應的所有元素連接在一起的 |
| (K,(V,W))的 RDD | |
3.25 leftOuterJoin
函數(shù)簽名函數(shù)說明
| def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] | 類似于 SQL 語句的左外連接 |
3.26 cogroup
函數(shù)簽名函數(shù)說明
| def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] | 在類型為(K,V)和(K,W)的 RDD 上調用,返回一個(K,(Iterable,Iterable))類型的 RDD |
5.1.4.4 RDD 行動算子 (Action)
1. reduce
函數(shù)簽名函數(shù)說明
| def reduce(f: (T, T) => T): T | 聚集 RDD 中的所有元素,先聚合分區(qū)內(nèi)數(shù)據(jù),再聚合分區(qū)間數(shù)據(jù) |
2. collect
函數(shù)簽名函數(shù)說明
| def collect(): Array[T] | 在驅動程序中,以數(shù)組 Array 的形式返回數(shù)據(jù)集的所有元素 |
3. count
函數(shù)簽名函數(shù)說明
| def count(): Long | 返回 RDD 中元素的個數(shù) |
4. first
函數(shù)簽名函數(shù)說明
| def first(): T | 返回 RDD 中的第一個元素 |
| |
5. take
函數(shù)簽名函數(shù)說明
| def take(num: Int): Array[T] | 返回一個由 RDD 的前 n 個元素組成的數(shù)組 |
| |
6. takeOrdered
函數(shù)簽名函數(shù)說明
| def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] | 返回該 RDD 排序后的前 n 個元素組成的數(shù)組 |
| |
7. aggregate
函數(shù)簽名函數(shù)說明
| def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U | 分區(qū)的數(shù)據(jù)通過初始值和分區(qū)內(nèi)的數(shù)據(jù)進行聚合,然后再和初始值進行分區(qū)間的數(shù)據(jù)聚合 |
8. fold
函數(shù)簽名函數(shù)說明
| def fold(zeroValue: T)(op: (T, T) => T): T | 統(tǒng)計每種 key 的個數(shù) |
9. countByKey
函數(shù)簽名函數(shù)說明
| def countByKey(): Map[K, Long] | 折疊操作,aggregate 的簡化版操作 |
10. save相關的算子
函數(shù)簽名函數(shù)說明
| def saveAsTextFile(path: String): Unit | |
| def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit | 將數(shù)據(jù)保存到不同格式的文件中 |
| def saveAsObjectFile(path: String): Unit | |
| def saveAsSequenceFile(path: String,codec: Option[Class[_ <: CompressionCodec]] = None): Unit | |
11. foreach
總結
以上是生活随笔為你收集整理的五-中, Spark 算子 吐血总结(转化+行动算子共三十七个)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。