日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spark常用函数比较

發布時間:2025/3/20 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark常用函数比较 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

2019獨角獸企業重金招聘Python工程師標準>>>

算法分類:轉換(transformation)和執行(action)

查看算子使用demo

coalesce & repartition & partitionBy

reparation是coalesce的特殊情況 ,reparation會將coalesce中的shuffle參數設置為true,會使用HashPartitioner重新混洗分區,如果原有分區數據不均勻可以用reparation來重新混洗分區,使數據均勻分布,重新混洗過的分區和新的分區時寬依賴的關系

coalesce shuffle參數為false的情況 不會重新混洗分區,它是合并分區,比如把原來1000個分區合并成100個,父rdd和子rdd是窄依賴,

coalesce當shuffle參數設置為false時,如果設置的新partition數量大于之前的,則按照之前的分區數量重新分區。如果shuffle參數設置為true則效果和repartition一致。

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {coalesce(numPartitions, shuffle = true) }

partitionBy需要指定分區函數和分區數量

var rdd2=rdd.partitionBy(new HashPartitioner(2))
range
// range函數是閉開區間[) range(1,4,1) //輸出:1,2,3 // to 函數是閉閉區間[] sc.makeRDD(1 to 5,2) // 輸出:1,2,3,4,5
zip & zipWithIndex & zipWithUniqueId

zip

1.如果兩個RDD分區數不同,則拋出異常:Can’t zip RDDs with unequal numbers of partitions

2.如果兩個RDD的元素個數不同,則拋出異常:Can only zip RDDs with same number of elements in each partition

zipPartitions

zipPartitions函數將多個RDD按照partition組合成為新的RDD。

該函數需要組合的RDD具有相同的分區數,但對于每個分區內的元素數量沒有要求

var rdd1=sparkSession.range(1,4,1).rdd var rdd2=sparkSession.range(4,7,1).rdd var rdd3=sparkSession.range(7,10,1).rdd // zip函數用于將兩個RDD組合成Key/Value形式的RDD,這里默認兩個RDD的partition數量以及每個partition的元素數量都相同,否則會拋出異常。 var rdd5=rdd1 zip rdd2 zip rdd3 /*** +-----+---+* | _1| _2|* +-----+---+* |[1,4]| 7|* |[2,5]| 8|* |[3,6]| 9|* +-----+---+*/ // 該函數將RDD中的元素和這個元素在RDD中的ID(索引號)組合成鍵/值對。 var rdd6=rdd1.zipWithIndex/*** +---+---+* | _1| _2|* +---+---+* | 1| 0|* | 2| 1|* | 3| 2|* +---+---+*/ var rdd7=sparkSession.range(1,10,2).rdd // 該函數將RDD中元素和一個唯一ID組合成鍵/值對,該唯一ID生成算法如下: // 每個分區中第一個元素的唯一ID值為:該分區索引號 // 每個分區中第N個元素的唯一ID值為:(前一個元素的唯一ID值) + (該RDD總的分區數) var rdd8=rdd7.zipWithUniqueId()/*** +---+---+* | _1| _2|* +---+---+* | 1| 0|* | 3| 2|* | 5| 1|* | 7| 3|* | 9| 5|* +---+---+*/
mapPartitionsWithIndex
var rdd1 = sparkSession.sparkContext.makeRDD(Array((1, "A"), (2, "B"), (3, "C"), (4, "D")),2)// 函數作用同mapPartitions相同,不過提供了兩個參數,第一個參數為分區的索引var rdd2 = rdd1.mapPartitionsWithIndex {(partIdx, iter) => {var part_map = scala.collection.mutable.Map[String, List[(Int, String)]]()while (iter.hasNext) {var part_name = "part_" + partIdxvar elem = iter.next()if (part_map.contains(part_name)) {var elems = part_map(part_name)elems ::= elempart_map(part_name)=elems} else {part_map(part_name) = List[(Int, String)] {elem}}}part_map.iterator}}.collect()/*** +------+--------------+* | _1| _2|* +------+--------------+* |part_0|[[2,B], [1,A]]|* |part_1|[[4,D], [3,C]]|* +------+--------------+*/
map & mapValues
var rdd1=sparkSession.sparkContext.makeRDD(Array((1, "A"), (2, "B"), (3, "C"), (4, "D")),2) // 對[K,V]整體操作 var rdd3=rdd1.map(_+"_").foreach(println(_)) /*** (1,A)_* (3,C)_* (2,B)_* (4,D)_*/ var rdd2=rdd1.mapValues(_+"_")/*** +---+---+* | _1| _2|* +---+---+* | 1| A_|* | 2| B_|* | 3| C_|* | 4| D_|* +---+---+*/// 鍵值對轉換rdd1.map(_.swap).foreach(println(_))/*** (C,3)* (D,4)* (A,1)* (B,2)*/// 使用map實現mapValues rdd1.map(x=>(x._1,x._2+"_")).foreach(println(_))/*** (1,A_)* (2,B_)* (3,C_)* (4,D_)*/
flodByKey
val rdd4=sparkSession.sparkContext.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1))) val rdd5=rdd4.foldByKey(2)(_+_).collect()/*** +---+---+* | _1| _2|* +---+---+* | B| 5|* | A| 4|* | C| 3|* +---+---+*/
groupByKey & reduceByKey & aggregateByKey & flodByKey

reduceByKey現在map過程中先進行聚合,再到reduce端聚合,減少數據太大帶來的壓力,減小RPC過程中的傳輸壓力。groupByKey是直接在reduce端進行聚合的,所以效率比reduceByKey低。

foldByKey和reduceByKey的功能是相似的,都是在map端先進行聚合,再到reduce聚合。不同的是flodByKey需要傳入一個參數。該參數是計算的初始值。

groupByKey是對每個key進行合并操作,但只生成一個sequence,groupByKey本身不能自定義操作函數。spark只能先將所有的鍵值對都移動,這樣的后果是集群節點之間的開銷很大,導致傳輸延時,詳情。

val words = Array("one", "two", "two", "three", "three", "three") val wordsRDD = sparkSession.sparkContext.parallelize(words).map(word => (word, 1)) val wordsCountWithGroup = wordsRDD.groupByKey().map(w => (w._1, w._2.sum)).collect() val wordsCountWithReduce = wordsRDD.reduceByKey(_ + _).collect() val wordsCountWithAggregate=wordsRDD.aggregateByKey(0)((u:Int,v)=>u+v,_+_).foreach(println)// aggregate簡寫seqOp和comOp使用同一個函數 val wordsCountWithFlod=wordsRDD.flodByKey(0)(_+_) val wordsCountWithCombe=wordsRDD.combineByKey((v: Int) => v,(c: Int, v: Int) => c+v,(c1: Int, c2: Int) => c1 + c2).collect
combineByKey

注意:

  • 同一個partition才會走mergeValue
  • 不同partition才會走mergeCombiners
  • /*** 參考:* https://www.jianshu.com/p/d7552ea4f882* https://cloud.tencent.com/developer/ask/98711* 該函數用于將RDD[K,V]轉換成RDD[K,C],這里的V類型和C類型可以相同也可以不同。** def combineByKey[C](* createCombiner: V => C,* mergeValue: (C, V) => C,* mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {* combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)* }** 參數的含義如下:* createCombiner:組合器函數,用于將V類型轉換成C類型,輸入參數為RDD[K,V]中的V,輸出為C* mergeValue:在每個分區上執行;合并值函數,將一個C類型和一個V類型值合并成一個C類型,輸入參數為(C,V),輸出為C,* mergeCombiners:將不同分區的結果合并;合并組合器函數,用于將兩個C類型值合并成一個C類型,輸入參數為(C,C),輸出為C* numPartitions:結果RDD分區數,默認保持原有的分區數* partitioner:分區函數,默認為HashPartitioner* mapSideCombine:是否需要在Map端進行combine操作,類似于MapReduce中的combine,默認為true* serializer:序列化類,默認為null*/// 對各個科目求平均值val scores = sparkSession.sparkContext.makeRDD(List(("chinese", 88) , ("chinese", 90) , ("math", 60), ("math", 87)),2)var avgScoresRdd=scores.combineByKey((x:Int)=>(x,1),(c:(Int,Int),x:Int)=>(c._1+x,c._2+1),(c1:(Int,Int),c2:(Int,Int))=>(c1._1+c2._1,c1._2+c2._2))sparkSession.createDataFrame(avgScoresRdd).show()var avgScores=avgScoresRdd.map{ case (key, value) => (key, value._1 / value._2.toFloat) }//.map(x=>(x,(x._1/x._2))
    cogroup & union

    cogroup相當于SQL中的全外連接full outer join,返回左右RDD中的記錄,關聯不上的為空。可指定分區數和分區函數,返回的是key和每個RDD的迭代器

    def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] var rdd1 = sparkSession.sparkContext.makeRDD(Array(("A","1"),("B","2")),2) var rdd2 = sparkSession.sparkContext.makeRDD(Array(("A","3"),("C","4")),2) var rdd3 = sparkSession.sparkContext.makeRDD(Array(("A","5"),("C","6"),("D","8")),2) rdd1.cogroup(rdd2,rdd3).collect().foreach(x=>println("("+x._1+","+x._2._1+","+x._2._2+x._2._3+")"))/*** output:* (B,CompactBuffer(2),CompactBuffer()CompactBuffer())* (D,CompactBuffer(),CompactBuffer()CompactBuffer(8))* (A,CompactBuffer(1),CompactBuffer(3)CompactBuffer(5))* (C,CompactBuffer(),CompactBuffer(4)CompactBuffer(6))*/ rdd1.union(rdd2).collect().foreach(x=>println("("+x._1+","+x._2)+")")
    jion
    // join相當于SQL中的內關聯join,只返回兩個RDD根據K可以關聯上的結果,join只能用于兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可。 def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] // leftOuterJoin類似于SQL中的左外關聯left outer join,返回結果以前面的RDD為主,關聯不上的記錄為空。只能用于兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可 def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] // rightOuterJoin類似于SQL中的有外關聯right outer join,返回結果以參數中的RDD為主,關聯不上的記錄為空。只能用于兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可。 def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] // cogroup相當于SQL中的全外連接full outer join,返回左右RDD中的記錄,關聯不上的為空。 def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

    注意:

    rdd1.leftOuterJoin(rdd2)和rdd2.rightOuterJoin(rdd1)的結果是相同的,但是輸出格式是不一致的,不管是left jion還是right jion,輸出結果都是先輸出左邊的rdd對應的列,再輸出右邊的RDD對象的列

    union & intersection & subtract

    subtractByKey和基本轉換操作中的subtract類似,返回在主RDD中出現,并且不在otherRDD中出現的元素,可指定輸出分區數量和分區函數。

    transformation
    • map/mapValues/flatMap/mapPartitions/mapPartitionsWithIndex
    • filter
    • distinct:并局部無序而整體有序返回
    action
    • rdd.foreach
    • rdd.first
    • rdd.take(10): 從第一個分區的第一行數據開始取,不排序
    • rdd.takeOrdered(10):與top函數類似,但是與top函數的排序方式相反
    • rdd.top(10):默認按照降序的方式取前10個元素,可自定義排序規則
    • rdd.sortBy(x=>x._2,true):按照RDD第二列進行升序排列(false為降序)
    • rdd.countByValue():countByValue()函數與tuple元組中的(k,v)中的v 沒有關系,這點要搞清楚,countByValue是針對Rdd中的每一個元素對象。
    • rdd.aggregate(1)({(x:Int,y:Int)=>x+y},{(sum1:Int,sum2:Int)=>sum1+sum2})
    • rdd. fold(1)()(x:Int,y:Int)=>x+y): aggregate簡寫seqOp和comOp使用同一個函數
    • saveAsTextFile,saveAsObjectFile,saveAsSequenceFile
    • rdd.takeSample
    sparkSql
    object aggregatesFun extends Catalogs_Tutorial{import org.apache.spark.sql.functions._questionsDataFrame.filter("id > 400 and id< 450").filter("owner_userid is not null").join(dfTags,dfQuestions.col("id").equalTo(dfTags("id"))).groupBy(dfQuestions.col("owner_userid")).agg( avg("score"),max("answer_count")) // .sparkSession.conf.set("retainGroupColumns",false) // 結果是否展示分組字段.show() } +------------+----------+-----------------+ |owner_userid|avg(score)|max(answer_count)| +------------+----------+-----------------+ | 268| 26.0| 1| | 136| 57.6| 9| | 123| 20.0| 3| +------------+----------+-----------------+
    統計函數
    • 基本統計函數:avg,mean,max,min,sum
    • 高級統計函數:皮爾遜相關性(corr),協方差(cov),頻繁項(freqItems),交叉表(crosstabe),行列轉換(透視(pivot)),抽樣(sample)分層抽樣(sampleBy),詞頻統計(countMinSketch),布隆過濾器
    • 顯示對dataFrame的統計結果:describe,包含標準差(stddev)和avg,max,min,count
    手寫wordCount
    object LocalWorldCount {def main(args: Array[String]): Unit = {val conf=new SparkConf()conf.setAppName("my first spark local App")conf.setMaster("local")val sc=new SparkContext(conf)val lines=sc.textFile("file:\\E:\\data\\worldCount.txt")val words=lines.flatMap(line=>line.split(" "))val pairs=words.map(word=>(word,1))val worldCount=pairs.reduceByKey(_+_)val sortedWordCount=worldCount.map(pair=>(pair._2,pair._1)).sortByKey(true).map(pair=>(pair._2,pair._1))sortedWordCount.collect.foreach(println)sc.stop()} } // 對應sql lines.
    算子選擇

    mapPartitions/reduceByKey/foreachPartition/

    使用filter之后進行coalesce操作。

    使用repartitionAndSortWithinPartitions替代repartition與sort類操作。

    repartitionAndSortWithinPartitions是Spark官網推薦的一個算子。官方建議,如果是需要在repartition重分區之后還要進行排序,就可以直接使用repartitionAndSortWithinPartitions算子。因為該算子可以一邊進行重分區的shuffle操作,一邊進行排序。shuffle與sort兩個操作同時進行,比先shuffle再sort來說,性能可能是要高的。

    轉載于:https://my.oschina.net/freelili/blog/3037961

    總結

    以上是生活随笔為你收集整理的spark常用函数比较的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。