Using window functions for statistics in Spark SQL
In the previous article we aggregated by netType. Now we add another dimension and compute the statistics per city:
```scala
def main(args: Array[String]): Unit = {
  val spark = SparkSession.builder()
    .appName("TopNStatJob")
    .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
    .master("local[2]")
    .getOrCreate()

  val accessDF = spark.read.format("parquet").load("file:///E:/test/clean")
  // accessDF.printSchema()
  accessDF.show(false)

  // Most popular Top N by netType
  // netTypeAccessTopNStat(spark, accessDF)

  // Top N courses per city
  cityTypeAccessTopNStat(spark, accessDF)

  spark.stop
}

/**
 * Compute the Top 3 courses per city
 *
 * @param spark
 * @param accessDF
 */
def cityTypeAccessTopNStat(spark: SparkSession, accessDF: DataFrame): Unit = {
  val cityAccessTopNDF = accessDF
    .filter(accessDF.col("day") === "20190702" && accessDF.col("netType") === "wifi")
    .groupBy("day", "uid", "city")
    .agg(count("uid").as("times"))
    .orderBy(desc("times"))

  cityAccessTopNDF.show(false)

  // Using a window function in Spark SQL
  cityAccessTopNDF.select(
    cityAccessTopNDF("day"),
    cityAccessTopNDF("uid"),
    cityAccessTopNDF("city"),
    cityAccessTopNDF("times"),
    row_number().over(
      Window.partitionBy("city").orderBy(cityAccessTopNDF("times").desc)
    ).as("times_rank")
  ).filter("times_rank <= 3").show(false)
}
```

The run output is as follows:
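To see what `row_number().over(Window.partitionBy(...))` does in isolation, here is a minimal, self-contained sketch with a small hypothetical in-memory dataset (city names and counts are made up for illustration). The ranking restarts at 1 for every `city` partition, which is why filtering `times_rank <= 3` yields a per-city Top 3:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

object WindowRankSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("WindowRankSketch")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data: (city, uid, times)
    val df = Seq(
      ("beijing", 1L, 30L),
      ("beijing", 2L, 20L),
      ("beijing", 3L, 10L),
      ("beijing", 4L, 5L),
      ("shanghai", 5L, 25L)
    ).toDF("city", "uid", "times")

    // row_number() numbers rows 1, 2, 3, ... within each city,
    // ordered by times descending
    val ranked = df.withColumn(
      "times_rank",
      row_number().over(Window.partitionBy("city").orderBy($"times".desc))
    )

    // Keep only the top 3 rows per city; uid 4 in beijing is dropped
    ranked.filter("times_rank <= 3").show(false)

    spark.stop()
  }
}
```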
Writing the results to MySQL
Create the table:
```sql
create table day_netType_city_access_topn_stat (
  day varchar(8) not null,
  uid bigint(10) not null,
  city varchar(20) not null,
  times bigint(10) not null,
  times_rank bigint(10) not null,
  primary key (day, uid)
)
```

Create an entity:
```scala
package cn.ac.iie.log

case class DayCityNetTypeAccessStat(day: String, uid: Long, city: String, times: Long, times_rank: Long)
```

Create the DAO:
```scala
/**
 * Batch-save DayCityNetTypeAccessStat records to the database
 *
 * @param list
 */
def insertDayNetTypeCityAccessTopN(list: ListBuffer[DayCityNetTypeAccessStat]): Unit = {
  var connection: Connection = null
  var pstmt: PreparedStatement = null
  try {
    connection = MysqlUtils.getConnection()
    // Disable auto-commit so the whole batch is committed manually
    connection.setAutoCommit(false)
    val sql = "insert into day_netType_city_access_topn_stat (day, uid, city, times, times_rank) values (?,?,?,?,?)"
    pstmt = connection.prepareStatement(sql)
    for (ele <- list) {
      pstmt.setString(1, ele.day)
      pstmt.setLong(2, ele.uid)
      pstmt.setString(3, ele.city)
      pstmt.setLong(4, ele.times)
      pstmt.setLong(5, ele.times_rank)
      pstmt.addBatch()
    }
    pstmt.executeBatch() // execute the batch insert
    // Commit manually
    connection.commit()
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    MysqlUtils.release(connection, pstmt)
  }
}
```

Write the results to MySQL:
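The DAO relies on a `MysqlUtils` helper that the article does not show. A minimal sketch of what it might look like, using plain JDBC, is below; the connection URL, user, and password are placeholder assumptions:

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

object MysqlUtils {

  // Placeholder connection details; replace with your own database settings
  def getConnection(): Connection =
    DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/stats?useSSL=false",
      "root",
      "password"
    )

  // Close the statement first, then the connection, tolerating nulls
  def release(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {
      if (pstmt != null) pstmt.close()
    } finally {
      if (connection != null) connection.close()
    }
  }
}
```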
```scala
// Write the statistics to MySQL
try {
  top3DF.foreachPartition(partitionOfRecords => {
    val list = new ListBuffer[DayCityNetTypeAccessStat]
    partitionOfRecords.foreach(info => {
      val day = info.getAs[String]("day")
      val uid = info.getAs[String]("uid").toLong
      val city = info.getAs[String]("city")
      val times = info.getAs[Long]("times")
      val timesRank = info.getAs[Int]("times_rank")
      list.append(DayCityNetTypeAccessStat(day, uid, city, times, timesRank))
    })
    StatDao.insertDayNetTypeCityAccessTopN(list)
  })
} catch {
  case e: Exception => e.printStackTrace()
}
```
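As an aside, for a simple append like this Spark's built-in JDBC writer can replace the hand-rolled DAO entirely; this sketch assumes the same placeholder connection details as above:

```scala
import java.util.Properties

// Placeholder credentials; replace with your own
val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "password")

// Appends the ranked DataFrame straight into the MySQL table,
// letting Spark manage partition-level connections and batching
top3DF.write
  .mode("append")
  .jdbc("jdbc:mysql://localhost:3306/stats", "day_netType_city_access_topn_stat", props)
```

The hand-rolled `foreachPartition` approach gives finer control (manual commits, custom error handling), while `DataFrameWriter.jdbc` is less code for the common case.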