日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

SparkSQL 将统计结果保存到Mysql

發布時間:2024/9/16 数据库 57 豆豆
生活随笔 收集整理的這篇文章主要介紹了 SparkSQL 将统计结果保存到Mysql 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

在MySQL創建表

create table day_netType_access_topn_stat ( day varchar(8) not null, uid bigint(10) not null, times bigint(10) not null, primary key (day, uid) )

查看表結構:

創建Entity

package cn.ac.iie.log/*** 每天訪問次數實體類* @param day* @param uid* @param times*/ case class DayNetTypeAccessStat (day: String, uid:Long, times:Long)

創建Dao

insert

package cn.ac.iie.logimport java.sql.{Connection, PreparedStatement}import scala.collection.mutable.ListBuffer/*** 各個維度統計的DAO操作*/ object StatDao {/*** 批量保存DayVideoAccessStat到數據庫** @param list*/def insertNetTypeAccessTopN(list: ListBuffer[DayNetTypeAccessStat]): Unit = {var connection: Connection = nullvar pstmt: PreparedStatement = nulltry {connection = MysqlUtils.getConnection()// 設置手動提交connection.setAutoCommit(false)val sql = "insert into day_netType_access_topn_stat (day, uid, times) values (?,?,?)"pstmt = connection.prepareStatement(sql)for (ele <- list) {pstmt.setString(1, ele.day)pstmt.setLong(2, ele.uid)pstmt.setLong(3, ele.times)pstmt.addBatch()}pstmt.executeBatch() // 執行批量處理// 手動提交connection.commit()} catch {case e: Exception => e.printStackTrace()} finally {MysqlUtils.release(connection, pstmt)}} }

這里insert數據時,最好使用批處理,提交使用batch操作,手動提交。

將數據保存到Mysql中

package cn.ac.iie.logimport org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.functions._import scala.collection.mutable.ListBuffer/*** TopN 統計spark作業*/ object TopNStatJob {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("TopNStatJob").config("spark.sql.sources.partitionColumnTypeInference.enabled","false").master("local[2]").getOrCreate()val accessDF = spark.read.format("parquet").load("file:///E:/test/clean") // accessDF.printSchema()accessDF.show(false)// 最受歡迎的TopN netTypenetTypeAccessTopNStat(spark, accessDF)spark.stop}/*** 最受歡迎的TopN netType* @param spark* @param accessDF*/def netTypeAccessTopNStat(spark: SparkSession, accessDF: DataFrame): Unit = {accessDF.createOrReplaceTempView("access_logs")val wifiAccessTopNDF = spark.sql("select day,uid,count(1) as times from access_logs where day='20190702' and netType='wifi' group by day,uid order by times desc") // wifiAccessTopNDF.show(false)// 將統計結果寫入到Mysql中try{wifiAccessTopNDF.foreachPartition(partitionOfRecords => {val list = new ListBuffer[DayNetTypeAccessStat]partitionOfRecords.foreach(info => {val day = info.getAs[String]("day")val uid = info.getAs[Long]("uid")val times = info.getAs[Long]("times")list.append(DayNetTypeAccessStat(day, uid, times))})StatDao.insertNetTypeAccessTopN(list)})} catch {case e: Exception => e.printStackTrace()}} }

總結

以上是生活随笔為你收集整理的SparkSQL 将统计结果保存到Mysql的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。