SparkRDD——行动算子
一、行動算子定義
spark的算子可以分為trans action算子 以及 action算子 ,即變換/轉換 算子。如果執行一個RDD算子并不觸發作業的提交,僅僅只是記錄作業中間處理過程,那么這就是trans action算子 ,相反如果執行這個 RDD 時會觸發 Spark Context 提交 Job 作業,那么它就是 action算子及行動算子。
總結來說就是在Spark中,轉換算子并不會馬上進行運算的,即所謂的“惰性運算”,而是在遇到行動算子時才會執行相應的語句的,觸發Spark的任務調度并開始進行計算。
我們可以將行動算子分為兩類:
- 1,數據運算類:主要用于觸發RDD計算,并得到計算結果返回給Spark程序或Shell界面;
- 2,數據存儲類:用于觸發RDD計算后,將結果保存到外部存儲系統中,如HDFS文件系統或數據庫。
二、總覽
一、數據運算類: 1、reduce 將rdd中的數據進行聚合,先進行分區內聚合,在進行分區間聚合 2、collect 將rdd中的數據按分區號采集,并以數組的形式返回所有數據 3、collectAsMap 收集Key/Value型RDD中的元素,并以map的形式返回數據 4、foreach 循環遍歷分區內數據,該算子執行位置是在Executor端 5、count 計算rdd中數據個數 6、first 取rdd中數據的第一個 7、take 取rdd中數據的前num個 8、takeOrdered 將rdd中的數據進行排序后取前num個 9、aggregate 類似于aggregateByKey算子,同樣兩個參數列表,分別傳遞初始值和分區內計算規則和分區間計算規則。 10、fold 簡化版的aggregate,分區內計算規則和分區間計算規則一樣。 11、countByKey 根據鍵值對中的key進行計數,返回一個map,對應了每個key在rdd中出現的次數。 12、countByValue 根據rdd中數據的數據值進行計數,注不是鍵值對中的value,同樣返回一個map,對應每個數據出現的次數。 13、max 求rdd中數據的最大值 14、min 求rdd中數據的最小值 二、數據存儲類: 1、saveAsTextFile 存儲為文本文件 2、saveAsObjectFile 存儲為二進制文件 3、saveAsSequenceFile 要求數據必須為<k,v>類型, 保存為 Sequencefile文件注:sequenceFile文件是Hadoop用來存儲二進制形式的 (Key,Value) 對而設計的一種平面文件。詳細可以看這篇文章了解:鏈接
三、數據運算類action算子
1、reduce
通過傳入的方法聚集rdd中所有的元素,先聚合分區內的數據,再聚合分區間的數據
def reduce(f: (T, T) => T): T
2、collect
數據采集,以數組Array的形式按分區順序返回數據集中的所有元素
def collect(): Array[T]
3、collectAsMap
收集Key/Value型RDD中的元素,并以map的形式返回數據
注:只有key/value類型的RDD才有這個方法
def collectAsMap(): Map[K, V]
4、foreach
循環遍歷分區內數據,該算子執行位置是在Executor端
def foreach(f: T => Unit): Unit
5、count
返回rdd中元素的個數,即collect返回的數組的長度
def count(): Long
6、first
返回rdd中的第一個元素,即collect返回的數組的第一個元素
def first(): T
7、take
返回rdd中的前n個元素,即collect返回的數組的前n個元素
def take(num: Int): Array[T]
8、takeOrdered
返回rdd中排序后的前n個元素
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
9、aggregate
與aggregateByKey類似,需要傳入兩個參數列表,列表元素意義也相同
- 第一個列表,傳入初始的比較值
- 第二個參數列表傳入兩個函數,分別表示分區內計算規則和分區間計算規則
aggregateByKey:初始值只會參與分區內計算
aggregate:初始值既會參與分區內計算也會參與分區間計算
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
10、fold
類似于foldByKey,即當aggregate的分區內和分區間計算規則相同時可以簡化使用fold,只需要傳入一個計算規則
def fold(zeroValue: T)(op: (T, T) => T): T
11、countByKey
用于統計鍵值對類型的數據中每個key出現的個數
def countByKey(): Map[K, Long]
12、countByValue
根據rdd中數據的數據值進行計數,注不是鍵值對中的value,同樣返回一個map,對應每個數據出現的次數。
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]
13、max && min
返回rdd數據集中的最大值/最小值
def max()(implicit ord: Ordering[T]): T = withScope {this.reduce(ord.max) } def min()(implicit ord: Ordering[T]): T = withScope {this.reduce(ord.min) } val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4)) val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("c", 1))) println(rdd.max()) println(rdd2.max()) println(rdd.min()) println(rdd2.min())總結
以上是生活随笔為你收集整理的SparkRDD——行动算子的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 前瞻:数据科学中的探索性数据分析(DEA
- 下一篇: 一个学员去了互联网大厂一个笔试题分享