Spark常用函数讲解之键值RDD转换
摘要:
RDD:彈性分布式數據集,是一種特殊集合?? 支持多種來源?? 有容錯機制?? 可以被緩存?? 支持并行操作,一個RDD代表一個分區里的數據集
RDD有兩種操作算子:
? ? ? ? Transformation(轉換):Transformation屬于延遲計算,當一個RDD轉換成另一個RDD時并沒有立即進行轉換,僅僅是記住 ? ? ? 了數據集的邏輯操作
? ? ? ? ?Ation(執行):觸發Spark作業的運行,真正觸發轉換算子的計算
?
本系列主要講解Spark中常用的函數操作:
? ? ? ? ?1.RDD基本轉換
? ? ? ? ?2.鍵-值RDD轉換
? ? ? ???3.Action操作篇
?
本節所講函數
1.mapValus
2.flatMapValues
3.comineByKey
4.foldByKey
5.reduceByKey
6.groupByKey
7.sortByKey
8.cogroup
9.join
10.LeftOutJoin
11.RightOutJoin
?
1.mapValus(fun):對[K,V]型數據中的V值map操作
(例1):對每個的的年齡加2
| 1 2 3 4 5 6 7 8 9 10 | object MapValues { ??def main(args: Array[String]) { ????val conf =?new?SparkConf().setMaster("local").setAppName("map") ????val sc =?new?SparkContext(conf) ????val list = List(("mobin",22),("kpop",20),("lufei",23)) ????val rdd = sc.parallelize(list) ????val mapValuesRDD = rdd.mapValues(_+2) ????mapValuesRDD.foreach(println) ??} } |
輸出:
(mobin,24) (kpop,22) (lufei,25)(RDD依賴圖:紅色塊表示一個RDD區,黑色塊表示該分區集合,下同)
?
?
2.flatMapValues(fun):對[K,V]型數據中的V值flatmap操作
(例2):
| 1 2 3 4 | //省略<br>val list = List(("mobin",22),("kpop",20),("lufei",23)) val rdd = sc.parallelize(list) val mapValuesRDD = rdd.flatMapValues(x => Seq(x,"male")) mapValuesRDD.foreach(println) |
輸出:
(mobin,22) (mobin,male) (kpop,20) (kpop,male) (lufei,23) (lufei,male)如果是mapValues會輸出:
(mobin,List(22, male)) (kpop,List(20, male)) (lufei,List(23, male))(RDD依賴圖)
?
?
3.comineByKey(createCombiner,mergeValue,mergeCombiners,partitioner,mapSideCombine)
?
? ?comineByKey(createCombiner,mergeValue,mergeCombiners,numPartitions)
?
? ?comineByKey(createCombiner,mergeValue,mergeCombiners)
?
createCombiner:在第一次遇到Key時創建組合器函數,將RDD數據集中的V類型值轉換C類型值(V => C),
如例3:
mergeValue:合并值函數,再次遇到相同的Key時,將createCombiner道理的C類型值與這次傳入的V類型值合并成一個C類型值(C,V)=>C,
如例3:
mergeCombiners:合并組合器函數,將C類型值兩兩合并成一個C類型值
如例3:
?
partitioner:使用已有的或自定義的分區函數,默認是HashPartitioner
?
mapSideCombine:是否在map端進行Combine操作,默認為true
?
注意前三個函數的參數類型要對應;第一次遇到Key時調用createCombiner,再次遇到相同的Key時調用mergeValue合并值
?
(例3):統計男性和女生的個數,并以(性別,(名字,名字....),個數)的形式輸出
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | object CombineByKey { ??def main(args: Array[String]) { ????val conf =?new?SparkConf().setMaster("local").setAppName("combinByKey") ????val sc =?new?SparkContext(conf) ????val people = List(("male",?"Mobin"), ("male",?"Kpop"), ("female",?"Lucy"), ("male",?"Lufei"), ("female",?"Amy")) ????val rdd = sc.parallelize(people) ????val combinByKeyRDD = rdd.combineByKey( ??????(x: String) => (List(x),?1), ??????(peo: (List[String], Int), x : String) => (x :: peo._1, peo._2 +?1), ??????(sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2)) ????combinByKeyRDD.foreach(println) ????sc.stop() ??} } |
輸出:
(male,(List(Lufei, Kpop, Mobin),3)) (female,(List(Amy, Lucy),2))過程分解:
Partition1: K="male" --> ("male","Mobin") --> createCombiner("Mobin") => peo1 = ( List("Mobin") , 1 ) K="male" --> ("male","Kpop") --> mergeValue(peo1,"Kpop") => peo2 = ( "Kpop" :: peo1_1 , 1 + 1 ) //Key相同調用mergeValue函數對值進行合并 K="female" --> ("female","Lucy") --> createCombiner("Lucy") => peo3 = ( List("Lucy") , 1 )Partition2: K="male" --> ("male","Lufei") --> createCombiner("Lufei") => peo4 = ( List("Lufei") , 1 ) K="female" --> ("female","Amy") --> createCombiner("Amy") => peo5 = ( List("Amy") , 1 )Merger Partition: K="male" --> mergeCombiners(peo2,peo4) => (List(Lufei,Kpop,Mobin)) K="female" --> mergeCombiners(peo3,peo5) => (List(Amy,Lucy))(RDD依賴圖)
?
4.foldByKey(zeroValue)(func)
?
? foldByKey(zeroValue,partitioner)(func)
?
? foldByKey(zeroValue,numPartitiones)(func)
?
foldByKey函數是通過調用CombineByKey函數實現的
?
zeroVale:對V進行初始化,實際上是通過CombineByKey的createCombiner實現的 ?V => ?(zeroValue,V),再通過func函數映射成新的值,即func(zeroValue,V),如例4可看作對每個V先進行 ?V=> 2?+ V ?
?
func:?Value將通過func函數按Key值進行合并(實際上是通過CombineByKey的mergeValue,mergeCombiners函數實現的,只不過在這里,這兩個函數是相同的)
例4:
| 1 2 3 4 5 | //省略 ????val people = List(("Mobin",?2), ("Mobin",?1), ("Lucy",?2), ("Amy",?1), ("Lucy",?3)) ????val rdd = sc.parallelize(people) ????val foldByKeyRDD = rdd.foldByKey(2)(_+_) ????foldByKeyRDD.foreach(println) |
輸出:
(Amy,2) (Mobin,4) (Lucy,6)先對每個V都加2,再對相同Key的value值相加。
?
?
5.reduceByKey(func,numPartitions):按Key進行分組,使用給定的func函數聚合value值,?numPartitions設置分區數,提高作業并行度
例5
| 1 2 3 4 5 6 | //省略 val arr = List(("A",3),("A",2),("B",1),("B",3)) val rdd = sc.parallelize(arr) val reduceByKeyRDD = rdd.reduceByKey(_ +_) reduceByKeyRDD.foreach(println) sc.stop |
輸出:
(A,5) (A,4)(RDD依賴圖)
?
6.groupByKey(numPartitions):按Key進行分組,返回[K,Iterable[V]],numPartitions設置分區數,提高作業并行度
例6:
| 1 2 3 4 5 6 | //省略 val arr = List(("A",1),("B",2),("A",2),("B",3)) val rdd = sc.parallelize(arr) val groupByKeyRDD = rdd.groupByKey() groupByKeyRDD.foreach(println) sc.stop |
輸出:
(B,CompactBuffer(2, 3)) (A,CompactBuffer(1, 2))?
以上foldByKey,reduceByKey,groupByKey函數最終都是通過調用combineByKey函數實現的
?
7.sortByKey(accending,numPartitions):返回以Key排序的(K,V)鍵值對組成的RDD,accending為true時表示升序,為false時表示降序,numPartitions設置分區數,提高作業并行度
例7:
| 1 2 3 4 5 6 | //省略sc val arr = List(("A",1),("B",2),("A",2),("B",3)) val rdd = sc.parallelize(arr) val sortByKeyRDD = rdd.sortByKey() sortByKeyRDD.foreach(println) sc.stop |
輸出:
(A,1) (A,2) (B,2) (B,3)?
8.cogroup(otherDataSet,numPartitions):對兩個RDD(如:(K,V)和(K,W))相同Key的元素先分別做聚合,最后返回(K,Iterator<V>,Iterator<W>)形式的RDD,numPartitions設置分區數,提高作業并行度
例8:
| 1 2 3 4 5 6 7 8 | //省略 val arr = List(("A",?1), ("B",?2), ("A",?2), ("B",?3)) val arr1 = List(("A",?"A1"), ("B",?"B1"), ("A",?"A2"), ("B",?"B2")) val rdd1 = sc.parallelize(arr,?3) val rdd2 = sc.parallelize(arr1,?3) val groupByKeyRDD = rdd1.cogroup(rdd2) groupByKeyRDD.foreach(println) sc.stop |
輸出:
(B,(CompactBuffer(2, 3),CompactBuffer(B1, B2))) (A,(CompactBuffer(1, 2),CompactBuffer(A1, A2)))(RDD依賴圖)
?
9.join(otherDataSet,numPartitions):對兩個RDD先進行cogroup操作形成新的RDD,再對每個Key下的元素進行笛卡爾積,numPartitions設置分區數,提高作業并行度
例9
| 1 2 3 4 5 6 7 | //省略 val arr = List(("A",?1), ("B",?2), ("A",?2), ("B",?3)) val arr1 = List(("A",?"A1"), ("B",?"B1"), ("A",?"A2"), ("B",?"B2")) val rdd = sc.parallelize(arr,?3) val rdd1 = sc.parallelize(arr1,?3) val groupByKeyRDD = rdd.join(rdd1) groupByKeyRDD.foreach(println) |
輸出:
(B,(2,B1)) (B,(2,B2)) (B,(3,B1)) (B,(3,B2))(A,(1,A1)) (A,(1,A2)) (A,(2,A1)) (A,(2,A2)(RDD依賴圖)
?
10.LeftOutJoin(otherDataSet,numPartitions):左外連接,包含左RDD的所有數據,如果右邊沒有與之匹配的用None表示,numPartitions設置分區數,提高作業并行度
例10:
| 1 2 3 4 5 6 7 8 | //省略 val arr = List(("A",?1), ("B",?2), ("A",?2), ("B",?3),("C",1)) val arr1 = List(("A",?"A1"), ("B",?"B1"), ("A",?"A2"), ("B",?"B2")) val rdd = sc.parallelize(arr,?3) val rdd1 = sc.parallelize(arr1,?3) val leftOutJoinRDD = rdd.leftOuterJoin(rdd1) leftOutJoinRDD .foreach(println) sc.stop |
輸出:
(B,(2,Some(B1))) (B,(2,Some(B2))) (B,(3,Some(B1))) (B,(3,Some(B2)))(C,(1,None))(A,(1,Some(A1))) (A,(1,Some(A2))) (A,(2,Some(A1))) (A,(2,Some(A2)))?
11.RightOutJoin(otherDataSet,?numPartitions):右外連接,包含右RDD的所有數據,如果左邊沒有與之匹配的用None表示,numPartitions設置分區數,提高作業并行度
例11:
| 1 2 3 4 5 6 7 8 | //省略 val arr = List(("A",?1), ("B",?2), ("A",?2), ("B",?3)) val arr1 = List(("A",?"A1"), ("B",?"B1"), ("A",?"A2"), ("B",?"B2"),("C","C1")) val rdd = sc.parallelize(arr,?3) val rdd1 = sc.parallelize(arr1,?3) val rightOutJoinRDD = rdd.rightOuterJoin(rdd1) rightOutJoinRDD.foreach(println) sc.stop |
輸出:
(B,(Some(2),B1)) (B,(Some(2),B2)) (B,(Some(3),B1)) (B,(Some(3),B2))(C,(None,C1))(A,(Some(1),A1)) (A,(Some(1),A2)) (A,(Some(2),A1)) (A,(Some(2),A2))?
以上例子源碼地址:https://github.com/Mobin-F/SparkExample/tree/master/src/main/scala/com/mobin/SparkRDDFun/TransFormation/RDDBase
總結
以上是生活随笔為你收集整理的Spark常用函数讲解之键值RDD转换的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark RDD使用详解2--RDD创
- 下一篇: Spark RDD中cache和pers