简单的combineByKey算子【看完就懂系列】
代碼先行:?
val conf = new SparkConf().setMaster("local").setAppName("CbkDemo")val sc = new SparkContext(conf)sc.setLogLevel("error")val rdd: RDD[(String, Double)] = sc.parallelize(Array(("George", 88.0), ("George", 95.0), ("George", 88.0),("KangKang", 93.0),("KangKang", 95.0), ("KangKang", 98.0),("limu", 98.0))) // val rdd2: RDD[(String, Double)] = rdd.coalesce(3)//求和/*** createCombiner: V => C ,這個函數把當前的值作為參數,此時我們可以對其做些附加操作(類型轉換)并把它返回?(這一步類似于初始化操作)* mergeValue: (C, V) => C,該函數把元素V合并到之前的元素C(createCombiner)上 (這個操作在每個分區內進行)* mergeCombiners: (C, C) => C,該函數把2個元素C合并 (這個操作在不同分區間進行)*/val res = rdd.combineByKey(x => {println(s"$x******");x},(x: Double, y: Double) => {println(s"$x%%%%%%$y");x+y},(x: Double, y: Double) => {println(s"$x@@@@@@$y");x+y})res.foreach(println)輸出結果:
88.0****** 88.0%%%%%%95.0 183.0%%%%%%88.0 93.0****** 93.0%%%%%%95.0 188.0%%%%%%98.0 98.0****** (George,271.0) (KangKang,286.0) (limu,98.0)圖示:
那么怎么走第三部呢?
mergeCombiners: (C, C) => C,該函數把2個元素C合并 (這個操作在不同分區間進行) val conf = new SparkConf().setMaster("local").setAppName("CbkDemo")val sc = new SparkContext(conf)sc.setLogLevel("error")val rdd: RDD[(String, Double)] = sc.parallelize(Array(("George", 88.0), ("George", 95.0), ("George", 88.0),("George", 88.0), ("George", 95.0), ("George", 88.0),("George", 88.0), ("George", 95.0), ("George", 88.0),("George", 88.0), ("George", 95.0), ("George", 88.0),("KangKang", 93.0),("KangKang", 95.0), ("KangKang", 98.0),("limu", 98.0)),3) // val rdd2: RDD[(String, Double)] = rdd.coalesce(3)//求和/*** createCombiner: V => C ,這個函數把當前的值作為參數,此時我們可以對其做些附加操作(類型轉換)并把它返回?(這一步類似于初始化操作)* mergeValue: (C, V) => C,該函數把元素V合并到之前的元素C(createCombiner)上 (這個操作在每個分區內進行)* mergeCombiners: (C, C) => C,該函數把2個元素C合并 (這個操作在不同分區間進行)*/val res = rdd.combineByKey(x => {println(s"$x******");x},(x: Double, y: Double) => {println(s"$x%%%%%%$y");x+y},(x: Double, y: Double) => {println(s"$x@@@@@@$y");x+y})res.foreach(println)結果展示:
88.0****** 88.0%%%%%%95.0 183.0%%%%%%88.0 271.0%%%%%%88.0 359.0%%%%%%95.0 88.0****** 88.0%%%%%%88.0 176.0%%%%%%95.0 271.0%%%%%%88.0 359.0%%%%%%88.0 95.0****** 95.0%%%%%%88.0 93.0****** 93.0%%%%%%95.0 188.0%%%%%%98.0 98.0****** 454.0@@@@@@447.0 901.0@@@@@@183.0 (George,1084.0) (limu,98.0) (KangKang,286.0)圖示:
【總結】
方法的第一個操作在相同分區相同key的時候只操作一次,然后一直進行第二個操作,如果不同分區中有相同的key值則進行第三步操作,否則不執行第三步操作。【因為第二步操作已經把結果算出來了】
友情提示:之所以我們的輸出第二步操作時沒有輸出最終結果,原因在于,為了返回值。我們把輸出語句放在了前面,也就是說輸出語句后,還有一步加的操作。
?
趁熱打鐵:
val conf = new SparkConf().setMaster("local").setAppName("CbkDemo")val sc = new SparkContext(conf)sc.setLogLevel("error")val rdd: RDD[(String, Double)] = sc.parallelize(Array(("George", 88.0), ("George", 95.0), ("George", 88.0),("George", 88.0), ("George", 95.0), ("George", 88.0),("George", 88.0), ("George", 95.0), ("George", 88.0),("George", 88.0), ("George", 95.0), ("George", 88.0),("KangKang", 93.0),("KangKang", 95.0), ("KangKang", 98.0),("limu", 98.0)),3)//求平均數val res: RDD[(String, (Int, Double))] = rdd.combineByKey(score => (1, score),(total: (Int, Double), newScore) => (total._1 + 1, total._2 + newScore),(total: (Int, Double), sum: (Int, Double)) => (total._1 + sum._1, total._2 + sum._2))val fin: RDD[(String, Double)] = res.map{case (name,(num,score)) => (name,score/num)}fin.foreach(println)輸出結果:
(George,90.33333333333333) (limu,98.0) (KangKang,95.33333333333333)再此總結:【誰讓它不是很好理解】
combineByKey
是針對不同partition進行操作的。它的第一個參數用于數據初始化(后面著重講),第二個是針對一個partition的combine操作函數,第三個是在所有partition都combine完畢后,針對所有臨時結果進行combine操作的函數。
友情補充:
關于數據初始化
之前有人說,初始化是對每個數據進行操作,這其實是錯誤的。應該是針對每個partition中,每個key下的第一個數據進行操作。這句話怎么理解呢?看代碼:
val rdd1 = sc.parallelize(List(1,2,2,3,3,3,3,4,4,4,4,4), 2)
val rdd2 = rdd1.map((_, 1))
val rdd3 = rdd2.combineByKey(-_, (x:Int, y:Int) => x + y, (x:Int, y:Int) => x + y)
val rdd4 = rdd2.combineByKey(+_, (x:Int, y:Int) => x + y, (x:Int, y:Int) => x + y)
?
rdd2.collect
rdd3.collect
rdd4.collect
?
Array((1,1), (2,1), (2,1), (3,1), (3,1), (3,1), (3,1), (4,1), (4,1), (4,1), (4,1), (4,1))
Array((4,3), (2,0), (1,-1), (3,0))
Array((4,5), (2,2), (1,1), (3,4)) ?
?
在上述代碼中,(1,1), (2,1), (2,1), (3,1), (3,1), (3,1) 被劃分到第一個partition,(3,1), (4,1), (4,1), (4,1), (4,1), (4,1) 被劃分到第二個。于是有如下操作:
?
(1, 1):由于只有1個,所以在值取負的情況下,自然輸出(1, -1)
(2, 1):由于有2個,第一個取負,第二個不變,因此combine后為(2, 0)
(3, 1):partition1中有3個,參照上述規則,combine后為(3, 1),partition2中有1個,因此combine后為(3, -1)。在第二次combine時,不會有初始化操作,因此直接相加,結果為(3, 0)
(4, 1):過程同上,結果為(4, 3)
總結
以上是生活随笔為你收集整理的简单的combineByKey算子【看完就懂系列】的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 小练习——过滤掉出现次数最多的数据
- 下一篇: Hadoop的TextInputForm