Spark笔记:复杂RDD的API的理解(上)
本篇接著講解RDD的API,講解那些不是很容易理解的API,同時(shí)本篇文章還將展示如何將外部的函數(shù)引入到RDD的API里使用,最后通過對RDD的API深入學(xué)習(xí),我們還講講一些和RDD開發(fā)相關(guān)的scala語法。
1) ?aggregate(zeroValue)(seqOp,combOp)
?該函數(shù)的功能和reduce函數(shù)一樣,也是對數(shù)據(jù)進(jìn)行聚合操作,不過aggregate可以返回和原RDD不同的數(shù)據(jù)類型,使用時(shí)候還要提供初始值。
我們來看看下面的用法,代碼如下:
| 1 2 3 4 | val?rddInt:?RDD[Int]?=?sc.parallelize(List(1,?2,?3,?4,?5),?1) ? val?rddAggr1:?(Int, Int)?=?rddInt.aggregate((0,?0))((x, y)?=> (x._1?+ y, x._2?+?1), (x, y)?=> (x._1?+ y._1, x._2?+ y._2)) println("====aggregate 1====:"?+ rddAggr1.toString())?// (15,5) |
該方法是將有數(shù)字組成的RDD的數(shù)值進(jìn)行求和,同時(shí)還要統(tǒng)計(jì)元素的個(gè)數(shù),這樣我們就可以計(jì)算出一個(gè)平均值,這點(diǎn)在實(shí)際運(yùn)算中是非常有用的。
假如讀者不太懂scala語言,或者就算懂那么一點(diǎn)點(diǎn)scala語法,該API的使用還是讓人很難理解的,這個(gè)x是什么東西,這個(gè)y又是什么東西,為什么它們放在一起這么運(yùn)算就可以得到預(yù)期結(jié)果呢?
其實(shí)aggregate方法使用了scala里元組的結(jié)構(gòu),元組是scala里很具特色的數(shù)據(jù)結(jié)構(gòu),我們看看下面的代碼:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | val?tuple2Param1:Tuple2[String,Int]?=?Tuple2("x01",12)// 標(biāo)準(zhǔn)定義二元組 val?tuple2Param2:(String,Int)?=?("x02",29)// 字面量定義二元組 ? /* 結(jié)果: x01:12*/ println("====tuple2Param1====:"?+ tuple2Param1._1?+?":"?+ tuple2Param1._2) /* 結(jié)果: x02:29 */ println("====tuple2Param2====:"?+ tuple2Param2._1?+?":"?+ tuple2Param2._2) ? val?tuple6Param1:Tuple6[String,Int,Int,Int,Int,String]?=?Tuple6("xx01",1,2,3,4,"x1x")// 標(biāo)準(zhǔn)定義6元組 val?tuple6Param2:(String,Int,Int,Int,Int,String)?=?("xx02",1,2,3,4,"x2x")// 字面量定義6元組 ? /* 結(jié)果: xx01:1:2:3:4:x1x */ println("====tuple6Param1====:"?+ tuple6Param1._1?+?":"?+ tuple6Param1._2?+?":"?+ tuple6Param1._3?+?":"?+ tuple6Param1._4?+?":"?+ tuple6Param1._5?+?":"?+ tuple6Param1._6) /* 結(jié)果: xx02:1:2:3:4:x2x */ println("====tuple6Param2====:"?+ tuple6Param2._1?+?":"?+ tuple6Param2._2?+?":"?+ tuple6Param2._3?+?":"?+ tuple6Param2._4?+?":"?+ tuple6Param2._5?+?":"?+ tuple6Param2._6) |
元組在scala里使用Tuple來構(gòu)造,不過實(shí)際運(yùn)用中我們會(huì)給Tuple帶上數(shù)字后綴,例如Tuple2就是二元組它包含兩個(gè)元素,Tuple6是6元組它包含6個(gè)元素,元組看起來很像數(shù)組,但是數(shù)組只能存儲(chǔ)相同數(shù)據(jù)類型的數(shù)據(jù)結(jié)構(gòu),而元組是可以存儲(chǔ)不同數(shù)據(jù)類型的數(shù)據(jù)結(jié)構(gòu),元組里元素訪問使用_1,_2這樣的形式,第一個(gè)元素是從1開始標(biāo)記的,這點(diǎn)和數(shù)組是不同的。實(shí)際使用中我們很少使用Tuple構(gòu)造元組,而是使用字面量定義方式(參見代碼注釋),由此我們可以看出spark里鍵值對RDD其實(shí)就是使用二元組來表示鍵值對數(shù)據(jù)結(jié)構(gòu),回到aggregate方法,它的運(yùn)算也是通過二元組這種數(shù)據(jù)結(jié)構(gòu)完成的。
下面我們來看看aggregate的運(yùn)算過程,這里我將aggregate方法里的算子都使用外部函數(shù),代碼如下所示:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | def?aggrFtnOne(par:?((Int, Int), Int)):?(Int, Int)?=?{ ??/* ?????*aggregate的初始值為(0,0): ??????====aggrFtnOne Param===:((0,0),1) ??====aggrFtnOne Param===:((1,1),2) ??====aggrFtnOne Param===:((3,2),3) ??====aggrFtnOne Param===:((6,3),4) ??====aggrFtnOne Param===:((10,4),5)*/ ??/* ?????*aggregate的初始值為(1,1): ??????====aggrFtnOne Param===:((1,1),1) ??????====aggrFtnOne Param===:((2,2),2) ??????====aggrFtnOne Param===:((4,3),3) ??????====aggrFtnOne Param===:((7,4),4) ??????====aggrFtnOne Param===:((11,5),5) ?????* */ ??println("====aggrFtnOne Param===:"?+ par.toString()) ??val?ret:?(Int, Int)?=?(par._1._1?+ par._2, par._1._2?+?1) ??ret } ? def?aggrFtnTwo(par:?((Int, Int), (Int, Int))):?(Int, Int)?=?{ ??/*aggregate的初始值為(0,0):::::((0,0),(15,5))*/ ??/*aggregate的初始值為(1,1):::::((1,1),(16,6))*/ ??println("====aggrFtnTwo Param===:"?+ par.toString()) ??val?ret:?(Int, Int)?=?(par._1._1?+ par._2._1, par._1._2?+ par._2._2) ??ret } ? ??val?rddAggr2:?(Int, Int)?=?rddInt.aggregate((0,?0))((x, y)?=> aggrFtnOne(x, y), (x, y)?=> aggrFtnTwo(x, y))?// 參數(shù)可以省略元組的括號(hào) ??println("====aggregate 2====:"?+ rddAggr2.toString())?// (15,5) ? ??val?rddAggr3:?(Int, Int)?=?rddInt.aggregate((1,?1))((x, y)?=> aggrFtnOne((x, y)), (x, y)?=> aggrFtnTwo((x, y)))?// 參數(shù)使用元組的括號(hào) ??println("====aggregate 3====:"?+ rddAggr3.toString())?// (17,7) |
由以上代碼我們就可以清晰看到aggregate方法的實(shí)際運(yùn)算過程了。
aggrFtnOne方法的參數(shù)格式是((Int, Int), Int),這個(gè)復(fù)合二元組里第二個(gè)元素才是實(shí)際的值,而第一個(gè)元素就是我們給出的初始化值,第一個(gè)元素里的第一個(gè)值就是我們實(shí)際求和的值,里面第二個(gè)元素就是累計(jì)記錄元素個(gè)數(shù)的值。
aggrFtnTwo方法的參數(shù)里的二元組第一個(gè)元素還是初始化值,第二個(gè)元素則是aggrFtnOne計(jì)算的結(jié)果,這樣我們就可以計(jì)算出我們要的結(jié)果。
作為對比我將初始化參數(shù)改為(1,1)二元組,最終結(jié)果在求和運(yùn)算以及計(jì)算元素個(gè)數(shù)上都會(huì)加2,這是因?yàn)槌跏蓟祪纱螀⑷肭蠛退碌?#xff0c;由上面代碼我們可以很清晰的看到原因所在。
如果我們想要結(jié)果二元組里第一個(gè)元素求積那么初始化值就不能是(0,0),而應(yīng)該是(1,0),理解了原理我們就很清晰知道初始值該如何設(shè)定了,具體代碼如下:
| 1 2 | val?rddAggr4:?(Int, Int)?=?rddInt.aggregate((1,?0))((x, y)?=> (x._1?* y, x._2?+?1), (x, y)?=> (x._1?* y._1, x._2?+ y._2)) println("====aggregate 4====:"?+ rddAggr4.toString())?// (120,5) |
2) fold(zero)(func)
該函數(shù)和reduce函數(shù)功能一樣,只不過使用時(shí)候需要加上初始化值。
代碼如下所示:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | def?foldFtn(par:?(Int, Int)):?Int?=?{ ??/*fold初始值為0: ??????=====foldFtn Param====:(0,1) ??????=====foldFtn Param====:(1,2) ??????=====foldFtn Param====:(3,3) ??????=====foldFtn Param====:(6,4) ??????=====foldFtn Param====:(10,5) ??????=====foldFtn Param====:(0,15) ?????* */ ??/* ?????* fold初始值為1: ??????=====foldFtn Param====:(1,1) ??????=====foldFtn Param====:(2,2) ??????=====foldFtn Param====:(4,3) ??????=====foldFtn Param====:(7,4) ??????=====foldFtn Param====:(11,5) ??????=====foldFtn Param====:(1,16) ?????* */ ??println("=====foldFtn Param====:"?+ par.toString()) ??val?ret:?Int?=?par._1?+ par._2 ??ret } ? ??val?rddFold2:?Int?=?rddInt.fold(0)((x, y)?=> foldFtn(x, y))?// 參數(shù)可以省略元組的括號(hào) ??println("====fold 2=====:"?+ rddFold2)?// 15 ? ??val?rddFold3:?Int?=?rddInt.fold(1)((x, y)?=> foldFtn((x, y)))?// 參數(shù)使用元組的括號(hào) ??println("====fold 3====:"?+ rddFold3)?// 17 |
我們發(fā)現(xiàn)當(dāng)初始化值為1時(shí)候,求和增加的不是1而是2,原因就是fold計(jì)算時(shí)候?yàn)榱藴慅R一個(gè)完整的二元組,在第一個(gè)元素計(jì)算以及最后一個(gè)元素計(jì)算時(shí)候都會(huì)讓初始化值湊數(shù)組成二元組,因此初始值會(huì)被使用兩遍求和,因此實(shí)際結(jié)果就不是增加1,而是增加2了。
作為對比我們看看reduce實(shí)際運(yùn)算過程,代碼如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | def?reduceFtn(par:(Int,Int)):Int?=?{ ??/* ???* ======reduceFtn Param=====:1:2 ???????????======reduceFtn Param=====:3:3 ?????======reduceFtn Param=====:6:4 ?????======reduceFtn Param=====:10:5 ???*/ ??println("======reduceFtn Param=====:"?+ par._1?+?":"?+ par._2) ??par._1?+ par._2 } ? ??val?rddReduce1:Int?=?rddInt.reduce((x,y)?=> x + y) ??println("====rddReduce 1====:"?+ rddReduce1)// 15 ??? ??val?rddReduce2:Int?=?rddInt.reduce((x,y)?=> reduceFtn(x,y)) ??println("====rddReduce 2====:"?+ rddReduce2)// 15 |
3) combineByKey[C](createCombiner: Int => C, mergeValue: (C, Int) => C, mergeCombiners: (C, C) => C): RDD[(String, C)]
combineByKey作用是使用不同的返回類型合并具有相同鍵的值,combineByKey適用鍵值對RDD,普通RDD是沒有這個(gè)方法。
有上面定義我們看到combineByKey會(huì)經(jīng)過三輪運(yùn)算,前一個(gè)運(yùn)算步驟結(jié)果就是下一個(gè)運(yùn)算步驟的參數(shù),我們看下面的代碼:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | def?combineFtnOne(par:Int):(Int,Int)?=?{ ??/* ???* ====combineFtnOne Param====:2 ?????====combineFtnOne Param====:5 ?????====combineFtnOne Param====:8 ?????====combineFtnOne Param====:3 ???*/ ??println("====combineFtnOne Param====:"?+ par) ??val?ret:(Int,Int)?=?(par,1) ??ret } ? def?combineFtnTwo(par:((Int,Int),Int)):(Int,Int)?=?{ ??/* ????====combineFtnTwo Param====:((2,1),12) ????====combineFtnTwo Param====:((8,1),9) ???* */ ??println("====combineFtnTwo Param====:"?+ par.toString()) ??val?ret:(Int,Int)?=?(par._1._1?+ par._2,par._1._2?+?1) ??ret } ? def?combineFtnThree(par:((Int,Int),(Int,Int))):(Int,Int)?=?{ ??/* ???* 無結(jié)果打印 ???*/ ??println("@@@@@@@@@@@@@@@@@@") ??println("====combineFtnThree Param===:"?+ par.toString()) ??val?ret:(Int,Int)?=?(par._1._1?+ par._2._1,par._1._2?+ par._2._2) ??ret } ? ??val?rddPair:?RDD[(String, Int)]?=?sc.parallelize(List(("x01",?2), ("x02",?5), ("x03",?8), ("x04",?3), ("x01",?12), ("x03",?9)),?1) ??? ??/* def combineByKey[C](createCombiner: Int => C, mergeValue: (C, Int) => C, mergeCombiners: (C, C) => C): RDD[(String, C)] */??? ??val?rddCombine1:RDD[(String,(Int,Int))]?=?rddPair.combineByKey(x?=> (x,?1), (com:?(Int, Int), x)?=> (com._1?+ x, com._2?+?1), (com1:?(Int, Int), com2:?(Int, Int))?=> (com1._1?+ com2._1, com1._2?+ com2._2)) ??println("====combineByKey 1====:"?+ rddCombine1.collect().mkString(","))?// (x02,(5,1)),(x03,(17,2)),(x01,(14,2)),(x04,(3,1)) ??? ??val?rddCombine2:RDD[(String,(Int,Int))]?=?rddPair.combineByKey(x?=> combineFtnOne(x), (com:?(Int, Int), x)?=> combineFtnTwo(com,x), (com1:?(Int, Int), com2:?(Int, Int))?=> combineFtnThree(com1,com2)) ??println("=====combineByKey 2====:"?+ rddCombine2.collect().mkString(","))?// (x02,(5,1)),(x03,(17,2)),(x01,(14,2)),(x04,(3,1)) |
這個(gè)算法和上面aggregate求和方法很像,不過combineByKey很奇怪,它第三個(gè)算子似乎并沒有被執(zhí)行,第二個(gè)算子打印的信息也不齊備,不過我認(rèn)為它們都執(zhí)行了,只不過有些語句沒有打印出來,至于原因?yàn)楹?#xff0c;我以后再研究下吧。
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的Spark笔记:复杂RDD的API的理解(上)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark笔记:RDD基本操作(上)
- 下一篇: Spark笔记:复杂RDD的API的理解