

Window functions in Spark SQL

Published: 2024/1/17


The Databricks blog gives this overview of window functions:

Spark SQL supports three kinds of window functions: ranking functions, analytic functions, and aggregate functions. The available ranking functions and analytic functions are summarized in the table below. For aggregate functions, users can use any existing aggregate function as a window function.

Window functions come in three kinds:

  • ranking functions
  • analytic functions
  • aggregate functions
  • The ranking and analytic functions are listed in the table below; any existing aggregate function (sum, avg, max, min, ...) can be used as a window function.

| Function Type | SQL | DataFrame API |
|--|--|--|
| Ranking | rank | rank |
| Ranking | dense_rank | denseRank |
| Ranking | percent_rank | percentRank |
| Ranking | ntile | ntile |
| Ranking | row_number | rowNumber |
| Analytic | cume_dist | cumeDist |
| Analytic | first_value | firstValue |
| Analytic | last_value | lastValue |
| Analytic | lag | lag |
| Analytic | lead | lead |

Let's start with an example.

Sample data (/root/score.json): student name, lesson, score.

```json
{"name":"A","lesson":"Math","score":100}
{"name":"B","lesson":"Math","score":100}
{"name":"C","lesson":"Math","score":99}
{"name":"D","lesson":"Math","score":98}
{"name":"A","lesson":"E","score":100}
{"name":"B","lesson":"E","score":99}
{"name":"C","lesson":"E","score":99}
{"name":"D","lesson":"E","score":98}
```

```shell
./spark-shell --master local  # start spark-shell in local mode
```

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext

sc.setLogLevel("WARN") // log level; optional
val hiveContext = new HiveContext(sc)
val df = hiveContext.read.json("file:///root/score.json")
case class Score(val name: String, val lesson: String, val score: Int)
df.registerTempTable("score") // register a temporary table

// The SQL version
val stat = """
  select name, lesson, score,
    ntile(2) over (partition by lesson order by score desc) as ntile_2,
    ntile(3) over (partition by lesson order by score desc) as ntile_3,
    row_number() over (partition by lesson order by score desc) as row_number,
    rank() over (partition by lesson order by score desc) as rank,
    dense_rank() over (partition by lesson order by score desc) as dense_rank,
    percent_rank() over (partition by lesson order by score desc) as percent_rank
  from score
  order by lesson, name, score
"""
hiveContext.sql(stat).show // run the query

// The same result via the DataFrame API
val window_spec = Window.partitionBy("lesson").orderBy(df("score").desc) // window spec shared by every column
df.select(df("name"), df("lesson"), df("score"),
  ntile(2).over(window_spec).as("ntile_2"),
  ntile(3).over(window_spec).as("ntile_3"),
  row_number().over(window_spec).as("row_number"),
  rank().over(window_spec).as("rank"),
  dense_rank().over(window_spec).as("dense_rank"),
  percent_rank().over(window_spec).as("percent_rank")
).orderBy("lesson", "name", "score").show
```
    • Both versions produce exactly the same output, shown below:

| name | lesson | score | ntile_2 | ntile_3 | row_number | rank | dense_rank | percent_rank |
|--|--|--|--|--|--|--|--|--|
| A | E | 100 | 1 | 1 | 1 | 1 | 1 | 0.0 |
| B | E | 99 | 1 | 1 | 2 | 2 | 2 | 0.3333333333333333 |
| C | E | 99 | 2 | 2 | 3 | 2 | 2 | 0.3333333333333333 |
| D | E | 98 | 2 | 3 | 4 | 4 | 3 | 1.0 |
| A | Math | 100 | 1 | 1 | 1 | 1 | 1 | 0.0 |
| B | Math | 100 | 1 | 1 | 2 | 1 | 1 | 0.0 |
| C | Math | 99 | 2 | 2 | 3 | 3 | 2 | 0.6666666666666666 |
| D | Math | 98 | 2 | 3 | 4 | 4 | 3 | 1.0 |
    • rank: tied values share the same rank, so rank values may be non-consecutive.
    • dense_rank: tied values share the same rank, but rank values are always consecutive.
    • row_number: a plain row number, like Excel's; ties never make it repeat or skip.
    • percent_rank = (rank - 1) / (count(score) - 1) within the partition.
    • ntile(n) splits the partition's ordered rows into n contiguous, near-equal buckets and returns the 1-based bucket index; when the split is uneven, the earlier buckets get the extra rows.
    • Comparing with the Python DataFrame API calls in the official blog post, the Scala syntax is almost identical. The blog post is linked in the references at the bottom.
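The ranking definitions above can be checked by hand. The following is a plain-Python sketch of those definitions (not Spark code, and the helper names are made up for illustration), applied to the Math partition's scores; it reproduces the rank, dense_rank, row_number, percent_rank, and ntile_3 columns of the table.

```python
# Plain-Python sketch of the ranking definitions above (not Spark code).
# Scores of one partition (lesson = "Math"), already sorted descending.
scores = [100, 100, 99, 98]
n = len(scores)

row_number = list(range(1, n + 1))                 # plain row numbers: 1..n
rank = [scores.index(s) + 1 for s in scores]       # ties share a rank; gaps follow
distinct = sorted(set(scores), reverse=True)
dense_rank = [distinct.index(s) + 1 for s in scores]  # ties share a rank; no gaps
percent_rank = [(r - 1) / (n - 1) for r in rank]   # (rank - 1) / (count - 1)

def ntile(n_buckets, n_rows):
    # Contiguous, near-equal buckets; the first (n_rows % n_buckets)
    # buckets each hold one extra row, mirroring SQL NTILE.
    base, extra = divmod(n_rows, n_buckets)
    out = []
    for b in range(1, n_buckets + 1):
        out += [b] * (base + (1 if b <= extra else 0))
    return out

print(row_number)    # [1, 2, 3, 4]
print(rank)          # [1, 1, 3, 4]
print(dense_rank)    # [1, 1, 2, 3]
print(ntile(3, n))   # [1, 1, 2, 3]
```

Every printed list matches the corresponding Math rows in the table above.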

    In the example above, every row of a partition took part in the window computation. Now consider two further scenarios:

  • How far each score is from the highest, lowest, and average score of its lesson. Each row is compared against an aggregate over its whole partition, so the frame is absolute: all rows of the window the current row belongs to.
  • After sorting each lesson's scores in descending order, how far each score is from the one ranked just above it. Each row depends on the row before it, so the frame is relative to the current row.
```scala
// How far each score is from its lesson's highest, lowest, and average score
// How far each score is from the one ranked just above it
val window_clause = Window.partitionBy(df("lesson")).orderBy(df("score").desc)
val window_spec2 = window_clause.rangeBetween(-Int.MaxValue, Int.MaxValue) // absolute frame: the whole partition
val window_spec3 = window_clause.rowsBetween(-1, 0) // relative frame: the previous row through the current row

df.select(df("name"), df("lesson"), df("score"),
  // current score minus the first score in the frame (i.e. the previous row's score)
  (df("score") - first("score").over(window_spec3)).as("score-last_score"),
  // distance from the lowest, highest, and average score of the lesson
  (min(df("score")).over(window_spec2)).as("min_score"),
  (df("score") - min(df("score")).over(window_spec2)).as("score-min"),
  (max(df("score")).over(window_spec2)).as("max_score"),
  (df("score") - max(df("score")).over(window_spec2)).as("score-max"),
  (avg(df("score")).over(window_spec2)).as("avg_score"),
  (df("score") - avg(df("score")).over(window_spec2)).as("score-avg")
).orderBy("lesson", "name", "score").show
```

| name | lesson | score | score-last_score | min_score | score-min | max_score | score-max | avg_score | score-avg |
|--|--|--|--|--|--|--|--|--|--|
| A | E | 100 | 0 | 98 | 2 | 100 | 0 | 99.0 | 1.0 |
| B | E | 99 | -1 | 98 | 1 | 100 | -1 | 99.0 | 0.0 |
| C | E | 99 | 0 | 98 | 1 | 100 | -1 | 99.0 | 0.0 |
| D | E | 98 | -1 | 98 | 0 | 100 | -2 | 99.0 | -1.0 |
| A | Math | 100 | 0 | 98 | 2 | 100 | 0 | 99.25 | 0.75 |
| B | Math | 100 | 0 | 98 | 2 | 100 | 0 | 99.25 | 0.75 |
| C | Math | 99 | -1 | 98 | 1 | 100 | -1 | 99.25 | -0.25 |
| D | Math | 98 | -1 | 98 | 0 | 100 | -2 | 99.25 | -1.25 |
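The rowsBetween(-1, 0) frame can be mimicked in plain Python (a sketch of the frame semantics only, not Spark code; the function name is made up). For each row of the descending-sorted partition, the frame holds the previous row and the current row, so score minus first("score") over the frame is the gap to the previous score, and 0 for the partition's first row:

```python
# Plain-Python sketch of rowsBetween(-1, 0): for each row the frame is
# [previous row, current row], so score - first(frame) compares each
# score with the one ranked just above it.
def score_minus_prev(scores_desc):
    out = []
    for i, s in enumerate(scores_desc):
        frame = scores_desc[max(0, i - 1): i + 1]  # previous row through current row
        out.append(s - frame[0])                   # first("score") over the frame
    return out

print(score_minus_prev([100, 99, 99, 98]))   # lesson "E":    [0, -1, 0, -1]
print(score_minus_prev([100, 100, 99, 98]))  # lesson "Math": [0, 0, -1, -1]
```

Both results match the score-last_score column of the table above.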

    To be continued:

    • A closer look at the analytic functions
    • A walk through the source code

    References:

  • percent_rank
  • the Databricks blog post
  • Reprinted from: https://my.oschina.net/corleone/blog/755393
