文章目錄
- 五-中, Spark 算子吐血總結
- 5.1.4.3 RDD 轉換算子(Transformation)
- 1. Value類型
- 1.1 `map`
- 1.2 `mapPartitions`
- 1.3 `mapPartitionsWithIndex`
- 1.4 `flatMap`
- 1.5 `glom`
- 1.6 `groupBy`
- 1.7 `filter`
- 1.8 `sample`
- 1.9 `distinct`
- 1.10 `coalesce`
- 1.11 `repartition`
- 1.12 `sortBy`
- 2. 雙Value類型
- 2.13 `intersection`
- 2.14 `union`
- 2.15 `subtract`
- 2.16 `zip`
- 3. Key-Value 類型
- 3.17 partitionBy
- 3.18 reduceByKey
- 3.19 groupByKey
- 3.20 aggregateByKey
- 3.21 foldByKey
- 3.22 combineByKey
- 3.23 sortByKey
- 3.24 join
- 3.25 leftOuterJoin
- 3.26 cogroup
- 5.1.4.4 RDD 行動算子 (Action)
- 1. reduce
- 2. collect
- 3. count
- 4. first
- 5. take
- 6. takeOrdered
- 7. aggregate
- 8. fold
- 9. countByKey
- 10. save相關的算子
- 11. foreach
五-中, Spark 算子吐血總結
5.1.4.3 RDD 轉換算子(Transformation)
什么是算子?
在流處理、交互式查詢中有個常用的概念是“算子”,在英文中被成為“Operation”,在數學上可以解釋為一個函數空間到另一個函數空間上的映射O:X->X,其實就是一個處理單元,往往是指一個函數,在使用算子時往往會有輸入和輸出,算子則完成相應數據的轉化,比如:Group、Sort等都是算子。
從大方向來說, Spark算子(RDD方法)大致可以分為以下兩類:
Transformation 變換/轉換算子 : 這種變換并不觸發提交作業, 而是完成作業中間過程處理; Transformation 操作是延遲計算的, 也就是說從一個RDD轉換為另一個RDD的轉換操作不是馬上執行, 需要等到有Action操作(行動算子)的時候才會真正觸發運算;Action 行動算子: 這類算子會觸發SparkContext 提交Job作業, 并將數據輸出到Spark系統;
從小方向來說, Spark算子大致分為三類:
Value數據類型的Transformation算子, 這種變換并不觸發提交作業, 針對處理的數據項是Value型的數據;Key-Value數據類型的Transformation算子, 這種變換并不觸發提交作業, 針對處理的數據項是Key-Value型的數據對;Action算子, 這類算子會觸發SparkContext提交Job作業;
1. Value類型
1.1 map
函數簽名函數說明
| def map[U: ClassTag](f: T => U): RDD[U] | 將待處理的數據逐條進行映射轉換, 這里的轉換可以是類型的轉換, 也可以是值的轉換 |
| 其實就是Scala集合函數中的map((數據變量:數據類型) => {對每一條數據的映射操作}) |
val dataRDD
: RDD
[Int] = sparkContext
.makeRDD
(List
(1,2,3,4))
val dataRDD1
: RDD
[Int] = dataRDD
.map
(num
=> {num
* 2
})
val dataRDD2
: RDD
[String] = dataRDD1
.map
(num
=> {"" + num
})
map的并行計算🌰:
// 1. rdd的計算一個分區內的數據是一個一個執行邏輯
// 只有前面一個數據全部的邏輯執行完畢后,才會執行下一個數據。
// 分區內數據的執行是有序的。
// 2. 不同分區數據計算是無序的。
1.2 mapPartitions
函數簽名函數說明
| def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U] | 將待處理的數據以分區為單位發送到計算結點進行處理, 這里的處理是指可以進行任意的處理, 哪怕是過濾數據 |
| mapPartition, 傳遞一個迭代器, 返回一個迭代器 |
| 什么, 你的不是迭代器怎么辦? 用List包裝, 再獲取集合的迭代器即可 |
栗子: 獲取每個數據分區的最大值
def main
(args
: Array
[String]): Unit = {val conf
: SparkConf
= new SparkConf
()conf
.setMaster
("local[*]")conf
.setAppName
("memoryRDD")val sc
: SparkContext
= new SparkContext
(conf
)val rdd
= sc
.makeRDD
(List
(1,2,3,4,5,6,7,8,9), 5)val mapRDD
:RDD
[Int] = rdd
.mapPartitions
(iter
=> List
(iter
.max
).iterator
)mapRDD
.collect
().foreach
(println
)mapRDD
.saveAsTextFile
("output")sc
.stop
()
}
Q: map 和 mapPartitions的區別 ?
角度區別
| 1.數據處理角度 | map算子是分區內一個數據一個數據的執行, 類似于串行操作; 而 mapPartitions算子是以分區為單位進行批處理操作; |
| 2.功能角度 | map算子主要目的是將數據源中的數據進行轉換和改變, 但不會減少或增多數據; 而mapPatitiions算子需要傳遞一個迭代器,返回一個迭代器, 沒有要求?元素個數保持不變, 所以可增加或減少數據; |
| 3. 性能的角度 | map算子類似于串行操作, 性能較低; mapPartitions算子類似于批處理, 性能較高; 但是mapPartitions算子會長時間占用內存, 那么這樣會導致內存可能不夠用, 出現內存溢出錯誤, 內存有限時, 使用map而不是mapPartitions |
1.3 mapPartitionsWithIndex
函數簽名函數說明
| def mapPartitionsWithIndex[U: ClassTag](f: (Index, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] | 將待處理的數據以分區為單位發送到計算節點進行處理, 這里的處理可以使任意處理哪怕是過濾數據, 在處理時同時可以獲取當前分區索引 |
| 入參是(分區索引, 迭代器), 出參是迭代器 |
舉個🌰:
def main
(args
: Array
[String]): Unit = {val conf
: SparkConf
= new SparkConf
()conf
.setMaster
("local[*]")conf
.setAppName
("memoryRDD")val sc
: SparkContext
= new SparkContext
(conf
)val rdd
: RDD
[Int] = sc
.makeRDD
(List
(1,2,3,4), 2)val rddWithIndex
: RDD
[Int] = rdd
.mapPartitionsWithIndex
((index
, iterator
) => {index
match {case 1 => iterator
case _
=> Nil
.iterator
}})rddWithIndex
.collect
().foreach
(println
)rddWithIndex
.saveAsTextFile
("output")sc
.stop
()}
如何直接打印數據所在的分區?
val mpiRDD
= rdd
.mapPartitionsWithIndex
((index
, iter
) => {iter
.map
(num
=> {(index
, num
)})})
1.4 flatMap
函數簽名函數說明
| def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] | 將處理的數據進行扁平化后再進行映射處理, 扁平映射算子 |
| 入參是T, 出參要求是一個可迭代的集合 |
def main
(args
: Array
[String]): Unit = {val conf
: SparkConf
= new SparkConf
()conf
.setMaster
("local[*]")conf
.setAppName
("memoryRDD")val sc
: SparkContext
= new SparkContext
(conf
)val rdd
= sc
.makeRDD
(List
(List
(1,2),3,List
(4,5)))val rddMap
= rdd
.flatMap
(dat
=> {dat
match {case i
: Int => List
(i
)case j
: List
[_
] => j
}})rddMap
.saveAsTextFile
("output")rddMap
.collect
().foreach
(println
)sc
.stop
()
}
1.5 glom
函數簽名函數說明
| def glom(): RDD[Array[T]] | 將同一個分區的數據直接轉換為相同類型的內存數組進行處理, 分區不變 |
| 將同一個分區里的元素合并到一個array中 |
計算所有分區最大值求和(分區內取最大值,分區間最大值求和)
def main
(args
: Array
[String]): Unit = {val sparkConf
= new SparkConf
().setMaster
("local[*]").setAppName
("Operator")val sc
= new SparkContext
(sparkConf
)val rdd
: RDD
[Int] = sc
.makeRDD
(List
(1,2,3,4), 2)val glomRDD
: RDD
[Array
[Int]] = rdd
.glom
()val maxRDD
: RDD
[Int] = glomRDD
.map
(array
=> {array
.max
})println
(maxRDD
.collect
().sum
)sc
.stop
()}
}
1.6 groupBy
函數簽名函數說明
| def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] | 將數據根據指定的規則進行分組, 分區默認不變, 但是數據會被打亂重新組合, 這就是shuffle. 極限情況下, 數據可能被分在同一個分區中; |
| groupBy(f:elem => 對 elem的操作) |
一個組的數據在一個分區中, 但是并不是說一個分區中只有一個組;
List(“Hello”, “hive”, “hbase”, “Hadoop”)根據單詞首寫字母進行分組。
sc.groupBy(_.charAt(0))
從服務器日志數據 apache.log 中獲取每個時間段訪問量。
//1. 配置文件對象val conf = new SparkConf();conf.setAppName("groupby")conf.setMaster("local[*]")//2. sparkcontextval sc = new SparkContext(conf)//3. 創建RDDval rddString: RDD[String] = sc.textFile("datas/apache.log")//4.val hourRDD: RDD[(String, Int)] = rddString.map(line => {//取出時間val strArr = line.split(" ")val time = strArr(3)//對time格式化//指定格式化模式val sdfOfDate = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")//格式化時間val date: Date = sdfOfDate.parse(time)//指定格式化模式val sdfOfHour = new SimpleDateFormat("HH")//格式化時間, 獲取到了小時val hour: String = sdfOfHour.format(date)(hour, 1)})//5. 分組val hourOfGroupedRDD: RDD[(String, Iterable[(String, Int)])] = hourRDD.groupBy(_._1)//6. 匯總計算val res: RDD[(String, Int)] = hourOfGroupedRDD.map {case (hour: String, iter: Iterable[(String, Int)]) =>(hour, iter.size)}res.collect().foreach(println)
WordCount
1.7 filter
函數簽名函數說明
| def filter(f: T => Boolean): RDD[T] | 按照規則篩選過濾; 處理后, 分區不變, 但是分區內的數據可能不均衡, 生產環境, 可能會出現數據傾斜; |
| 符合規則的數據保留,不符合規則的數據丟棄 |
簡單例子:
val dataRDD
= sparkContext
.makeRDD(List(1,2,3,4),1)
val dataRDD1
= dataRDD
.filter(_
%2 == 0)
從服務器日志數據 apache.log 中獲取 2015 年 5 月 17 日的請求路徑
val conf
= new SparkConf
();conf
.setAppName
("groupby")conf
.setMaster
("local[*]")val sc
= new SparkContext
(conf
)val rddString
: RDD
[String] = sc
.textFile
("datas/apache.log")val timeAndPathRDD
: RDD
[(String, String)] = rddString
.map
(line
=> {val strArr
= line
.split
(" ")val timeAndPathTuple
: Tuple2
[String, String] = (strArr
(3), strArr
(6))val sdfOfDate
= new SimpleDateFormat
("dd/MM/yyyy:HH:mm:ss")val date
: Date
= sdfOfDate
.parse
(timeAndPathTuple
._1
)val sdfOfTime
= new SimpleDateFormat
("dd/MM/yyyy")val time
: String = sdfOfTime
.format
(date
)(time
, timeAndPathTuple
._2
)})val res
= timeAndPathRDD
.filter
(_
._1
.equals
("17/05/2015"))res
.collect
().foreach
(println
)
1.8 sample
函數簽名函數說明
| def sample(withReplacement: Boolean,fraction:Double,seed: Long = Utils.random.nextLong): RDD[T] | 根據指定的規則從數據集中抽取數據 |
| withReplacement | 是否放回(抽獎) |
| fraction | 概率 |
| seed | 種子 |
val dataRDD
= sparkContext
.makeRDD
(List
(1,2,3,4),1)val dataRDD1
= dataRDD
.sample
(false, 0.5)val dataRDD2
= dataRDD
.sample
(true, 2)
1.9 distinct
函數簽名函數說明
| def distinct()(implicit ord: Ordering[T] = null): RDD[T] | 將數據集中重復的數據去重 |
| def distinct(numPartitions: Int)(implicit ord:Ordering[T] = null): RDD[T] | |
val dataRDD
= sparkContext
.makeRDD
(List
(1,2,3,4,1,2),1)
val dataRDD1
= dataRDD
.distinct
()
val dataRDD2
= dataRDD
.distinct
(2)
1.10 coalesce
函數簽名函數說明
| def coalesce(numPartitions: Int, shuffle: Boolean = false,partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T] | 根據數據量縮減分區,用于大數據集過濾后, 提高小數據集的執行效率 |
| 當spark程序中, 存在過多的小任務的時候, 可以通過coalesce方法, 收縮合并分區, 減少分區的個數, 減少任務調度成本 |
注意: coalesce 默認不會打亂分區中的數據. 縮減分區主要是單純的進行分區間的合并, 為了避免可能的數據傾斜, 此方法的參數 shuffle = true, 通過shuffle去平衡數據;
val dataRDD
= sparkContext
.makeRDD(List(1,2,3,4,1,2),6)
val dataRDD1
= dataRDD
.coalesce(2)
Q: 如何擴大分區??
1.11 repartition
函數簽名函數說明
| def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] | 該操作內部其實執行的是coalesce, 參數shuffle的默認值為true |
| 無論是將分區數多的RDD 轉換為分區數少的 RDD,還是將分區數少的 RDD 轉換為分區數多的 RDD,repartition操作都可以完成,因為無論如何都會經 shuffle 過程。 |
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-EVOB3uZv-1646825459606)(2022-03-08-16-45-38.png)]
coalesce 和 repartition 區別?
1.12 sortBy
函數簽名函數說明
| def sortBy[K](f: (T) => K, ascending: Boolean = true,numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] | 在排序之前,可以將數據通過 f 函數進行處理,之后按照 f 函數處理的結果進行排序,默認為升序排列 |
| 排序后新產生的 RDD 的分區數與原 RDD 的分區數一致。中間存在 shuffle 的過程 |
2. 雙Value類型
2.13 intersection
函數簽名函數說明
| def intersection(other: RDD[T]): RDD[T] | 對源RDD和參數RDD求交集后返回一個新的RDD |
| |
2.14 union
函數簽名函數說明
| def union(other: RDD[T]): RDD[T] | 對源RDD和參數RDD求并集后返回一個新的RDD |
2.15 subtract
函數簽名函數說明
| def subtract(other: RDD[T]): RDD[T] | 以一個 RDD 元素為主,去除兩個 RDD 中重復元素,將其他元素保留下來 |
| |
2.16 zip
函數簽名函數說明
| def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] | 將兩個 RDD 中的元素,以鍵值對(拉鏈)的形式進行合并. |
| 鍵值對中的 Key 為第 1 個 RDD中的元素,Value 為第 2 個 RDD 中的相同位置的元素。 |
def main
(args
: Array
[String]): Unit = {val sparkConf
= new SparkConf
().setMaster
("local[*]").setAppName
("Operator")val sc
= new SparkContext
(sparkConf
)val rdd1
= sc
.makeRDD
(List
(1,2,3,4))val rdd2
= sc
.makeRDD
(List
(3,4,5,6))val rdd7
= sc
.makeRDD
(List
("3","4","5","6"))val rdd3
: RDD
[Int] = rdd1
.intersection
(rdd2
)println
(rdd3
.collect
().mkString
(","))val rdd4
: RDD
[Int] = rdd1
.union
(rdd2
)println
(rdd4
.collect
().mkString
(","))val rdd5
: RDD
[Int] = rdd1
.subtract
(rdd2
)println
(rdd5
.collect
().mkString
(","))val rdd6
: RDD
[(Int, Int)] = rdd1
.zip
(rdd2
)val rdd8
= rdd1
.zip
(rdd7
)println
(rdd6
.collect
().mkString
(","))sc
.stop
()}
思考一個問題:如果兩個 RDD 數據類型不一致怎么辦?
思考一個問題:如果兩個 RDD 數據分區不一致怎么辦?
思考一個問題:如果兩個 RDD 分區數據數量不一致怎么辦?
交集,并集和差集要求兩個數據源數據類型保持一致拉鏈操作兩個數據源的類型可以不一致[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-eyLK105S-1646825459608)(2022-03-08-17-00-31.png)]
3. Key-Value 類型
3.17 partitionBy
函數簽名函數說明
| def partitionBy(partitioner: Partitioner): RDD[(K, V)] | 將數據按照Partitioner重新進行分區; Spark 默認的分區器是 HashPartitioner |
| |
3.18 reduceByKey
函數簽名函數說明
| def reduceByKey(func: (V, V) => V): RDD[(K, V)] | 可以將數據按照相同的Key對Value進行聚合 |
| def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] | |
val dataRDD1
= sparkContext
.makeRDD
(List
(("a",1),("b",2),("c",3)))
val dataRDD2
= dataRDD1
.reduceByKey
(_
+_
)
val dataRDD3
= dataRDD1
.reduceByKey
(_
+_
, 2)
Q: redeceByKey 和 groupBykey的區別?
從shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前對分區內相同 key 的數據進行預聚合(combine)功能,這樣會減少落盤的數據量,而 groupByKey 只是進行分組,不存在數據量減少的問題,reduceByKey 性能比較高。
從功能的角度:reduceByKey 其實包含分組和聚合的功能。GroupByKey 只能分組,不能聚合,所以在分組聚合的場合下,推薦使用 reduceByKey,如果僅僅是分組而不需要聚合。那么還是只能使用 groupByKey
3.19 groupByKey
函數簽名函數說明
| def groupByKey(): RDD[(K, Iterable[V])] | 將數據源的數據根據key對value進行分組 |
| def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] | |
| def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] | |
| |
val dataRDD1
= sparkContext
.makeRDD
(List
(("a",1),("b",2),("c",3)))
val dataRDD2
= dataRDD1
.groupByKey
()
val dataRDD3
= dataRDD1
.groupByKey
(2)
val dataRDD4
= dataRDD1
.groupByKey
(new HashPartitioner
(2))
3.20 aggregateByKey
函數簽名函數說明
| def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] | 將數據根據不同的規則進行分區內計算和分區間計算 |
| |
思考一個問題:分區內計算規則和分區間計算規則相同怎么辦?
3.21 foldByKey
函數簽名函數說明
| def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] | 當分區內計算規則和分區間計算規則相同時,aggregateByKey 就可以簡化為 foldByKey |
3.22 combineByKey
函數簽名函數說明
| def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)] | 最通用的對 key-value 型 rdd 進行聚集操作的聚集函數(aggregation function) |
| 類似于arrregate(), combineByKey() 允許用戶返回值的類型與輸入不一致 |
3.23 sortByKey
函數簽名函數說明
| def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)] | 在一個(K,V)的 RDD 上調用,K 必須實現 Ordered 接口(特質),返回一個按照 key 進行排序 |
| 的 | |
3.24 join
函數簽名函數說明
| def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] | 在類型為(K,V)和(K,W)的 RDD 上調用,返回一個相同 key 對應的所有元素連接在一起的 |
| (K,(V,W))的 RDD | |
3.25 leftOuterJoin
函數簽名函數說明
| def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] | 類似于 SQL 語句的左外連接 |
3.26 cogroup
函數簽名函數說明
| def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] | 在類型為(K,V)和(K,W)的 RDD 上調用,返回一個(K,(Iterable,Iterable))類型的 RDD |
5.1.4.4 RDD 行動算子 (Action)
1. reduce
函數簽名函數說明
| def reduce(f: (T, T) => T): T | 聚集 RDD 中的所有元素,先聚合分區內數據,再聚合分區間數據 |
2. collect
函數簽名函數說明
| def collect(): Array[T] | 在驅動程序中,以數組 Array 的形式返回數據集的所有元素 |
3. count
函數簽名函數說明
| def count(): Long | 返回 RDD 中元素的個數 |
4. first
函數簽名函數說明
| def first(): T | 返回 RDD 中的第一個元素 |
| |
5. take
函數簽名函數說明
| def take(num: Int): Array[T] | 返回一個由 RDD 的前 n 個元素組成的數組 |
| |
6. takeOrdered
函數簽名函數說明
| def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] | 返回該 RDD 排序后的前 n 個元素組成的數組 |
| |
7. aggregate
函數簽名函數說明
| def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U | 分區的數據通過初始值和分區內的數據進行聚合,然后再和初始值進行分區間的數據聚合 |
8. fold
函數簽名函數說明
| def fold(zeroValue: T)(op: (T, T) => T): T | 統計每種 key 的個數 |
9. countByKey
函數簽名函數說明
| def countByKey(): Map[K, Long] | 折疊操作,aggregate 的簡化版操作 |
10. save相關的算子
函數簽名函數說明
| def saveAsTextFile(path: String): Unit | |
| def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit | 將數據保存到不同格式的文件中 |
| def saveAsObjectFile(path: String): Unit | |
| def saveAsSequenceFile(path: String,codec: Option[Class[_ <: CompressionCodec]] = None): Unit | |
11. foreach
總結
以上是生活随笔為你收集整理的五-中, Spark 算子 吐血总结(转化+行动算子共三十七个)的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。