Saprk排序
1、基礎排序算子sortBy和sortByKey
在Spark中存在兩種對RDD進行排序的函數,分別是 sortBy和sortByKey函數。sortBy是對標準的RDD進行排序,它是從Spark0.9.0之后才引入的。而sortByKey函數是對PairRDD進行排序,也就是有Key和Value的RDD。下面將分別對這兩個函數的實現以及使用進行說明。
1.1?sortBy
sortBy是在RDD下的
該函數最多可以傳三個參數: 第一個參數是一個函數,該函數的也有一個帶T泛型的參數,返回類型和RDD中元素的類型是一致的; 第二個參數是ascending,這參數決定排序后RDD中的元素是升序還是降序,默認是true,也就是升序; 第三個參數是numPartitions,該參數決定排序后的RDD的分區個數,默認排序后的分區個數和排序之前的個數相等,即為this.partitions.size。
從sortBy函數的實現可以看出,第一個參數是必須傳入的,而后面的兩個參數可以不傳入。而且sortBy函數函數的實現依賴于sortByKey函數,關于sortByKey函數后面會進行說明。keyBy函數也是RDD類中進行實現的,它的主要作用就是將傳進來的每個元素作用于f(x)中,并返回tuples類型的元素,也就變成了Key-Value類型的RDD了
1.2?sortByKey
sortByKey是在OrderedRDDFunctions類下的。sortByKey函數作用于Key-Value形式的RDD,并對Key進行排序
該函數返回的RDD一定是ShuffledRDD類型的,因為對源RDD進行排序,必須進行Shuffle操作,而Shuffle操作的結果RDD就是ShuffledRDD。其實這個函數的實現很優雅,里面用到了RangePartitioner,它可以使得相應的范圍Key數據分到同一個partition中,然后內部用到了mapPartitions對每個partition中的數據進行排序,而每個partition中數據的排序用到了標準的sort機制,避免了大量數據的shuffle。
在轉換調用sortByKey方法時,會從上下文中提取Ordering[K],private val ordering = implicitly[Ordering[K]]其中implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](rdd: RDD[(K, V)])這里的k:Ordering意思是指k必須是可以轉換成ordering的子類。下面有2種方式
//1、定義K的排序時使用隱式值 implicit val ord: Ordering[Person] = new Ordering[Person] { override def compare(x: Person, y: Person): Int = { x.name.compareTo(y.name) }}sc.textFile("").foreachRDD(rdd => rdd.map(msg => (Person(msg), msg)).sortByKey())//2、key類實現Ordered[k]接口中的compare方法 case class Person(name:String) extends Ordered[Person]{override def compare(that: Person): Int = {this.name.compare(that.name)} def run(): Unit ={println("Person...")} }sc.textFile("").foreachRDD(rdd => rdd.map(msg => (Person(msg), msg)).sortByKey())2、二次排序
二次排序就是指排序的時候考慮2個維度。如我們排序的時候按照第一個列降序排序,有一種情況,第一列的Key相同怎么排,這個時候可能就需要借助二次排序考慮第二列。
比較明智的方法是自定義排序的key,不采用其他的方式是因為現在二次排序,后面可能3次,5次等,采用自定義key的方式只要重新復寫自定義的key,就能用sortByKey。
spark要實現比較,就要實現Orderd這個排序的接口,另外一般也會序列化Serializable。sortByKey會根據Key進行排序,但是如果二次排序的話sortByKey不知道key是什么構建的想法就是,基于已有的數據自定義二次排序的key,sortByKey基于這個自定義的key進行比較。我們用mapToPair重新構造內容,加了自定義的key,value的內容就是已有的內容,根據排序然后把自定義的key去掉。
package com.quinto.sort/*** 自定義二次排序的key*/ class SecondarySortKey(val first:Int,val second:Int) extends Ordered[SecondarySortKey] with Serializable {override def compare(that: SecondarySortKey): Int = {if(this.first - that.first!=0){this.first - that.first}else{this.second - that.second}} } package com.quinto.sortimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object SecondarySortApp {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("SecondarySort").setMaster("local[*]")val sc = new SparkContext(conf)val lines = sc.textFile("E:\\test\\a.txt")val sorted = lines.map(lines=> ( new SecondarySortKey(lines.split(" ")(0).toInt,lines.split(" ")(1).toInt),lines))sorted.sortByKey(false).map(line => line._2).collect().foreach(println)} }3、TopN
3.1 基礎TopN
排序后直接使用take算子取出前幾個,take算子后返回的是數組,不是rdd,不能用collect。
package com.quinto.sortimport org.apache.spark.{SparkConf, SparkContext}object BasicYopNApp {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("BasicYopNApp").setMaster("local[*]")val sc = new SparkContext(conf)val lines = sc.textFile("E:\\test\\a.txt")//①生成k-v鍵值對方便sortByKey進行排序(Int已經實現了排序比較的接口)②降序排序③過濾出排序內容本身⑤獲取排名前5放入元素內容,元素內容構成一個Arraylines.map(line=>(line.toInt,line)).sortByKey(false).map(line=>line._2).take(5).foreach(println)} }3.2 分組TopN
分組排序就是有不同類型的數據,不同數據中每一種類型數據里面的TopN。
object GroupSortTopN {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(s"${GroupSortTopN.getClass.getSimpleName}").setMaster("local[*]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data/test.txt")//按照科目進行排序val course2Info:RDD[(String, String)] = lines.map(line => {val spaceIndex = line.indexOf(" ")val course = line.substring(0, spaceIndex)val info = line.substring(spaceIndex + 1)(course, info)})//按照科目排序,指的是科目內排序,不是科目間的排序,所以需要把每個科目的信息匯總val course2Infos:RDD[(String, Iterable[String])] = course2Info.groupByKey()//按照key進行分組//分組內的排序val sorted:RDD[(String, mutable.TreeSet[String])] = course2Infos.map{case (course, infos) => {val topN = mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) = {val xScore = x.split("\\s+")(1)val yScore = y.split("\\s+")(1)yScore.compareTo(xScore)}})for(info <- infos) {topN.add(info)}(course, topN.take(5))}}sorted.foreach(println)sc.stop()} }3.3?優化分組TopN
上述在編碼過程當中使用groupByKey,我們說著這個算子的性能很差,因為沒有本地預聚合,所以應該在開發過程當中盡量避免使用,能用其它代替就代替。使用combineByKey模擬
object GroupSortByCombineByKeyTopN {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(s"${GroupSortByCombineByKeyTopN.getClass.getSimpleName}").setMaster("local[*]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data/test.txt")//按照科目進行排序val course2Info:RDD[(String, String)] = lines.map(line => {val spaceIndex = line.indexOf(" ")val course = line.substring(0, spaceIndex)val info = line.substring(spaceIndex + 1)(course, info)})//按照科目排序,指的是科目內排序,不是科目間的排序,所以需要把每個科目的信息匯總val sorted= course2Info.combineByKey(createCombiner, mergeValue, mergeCombiners)sorted.foreach(println)sc.stop()}def createCombiner(info:String): mutable.TreeSet[String] = {val ts = new mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) = {val xScore = x.split("\\s+")(1)val yScore = y.split("\\s+")(1)yScore.compareTo(xScore)}})ts.add(info)ts}def mergeValue(ab:mutable.TreeSet[String], info:String): mutable.TreeSet[String] = {ab.add(info)if(ab.size > 5) {ab.take(5)} else {ab}}def mergeCombiners(ab:mutable.TreeSet[String], ab1: mutable.TreeSet[String]): mutable.TreeSet[String] = {for (info <- ab1) {ab.add(info)}if(ab.size > 5) {ab.take(5)} else {ab}} }?
總結
- 上一篇: 计算机硬盘怎么设置ntfs,每次设置系统
- 下一篇: Codeforces Round #69