日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

2021年大数据Spark(二十七):SparkSQL案例一花式查询和案例二WordCount

發(fā)布時(shí)間:2023/11/28 生活经验 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Spark(二十七):SparkSQL案例一花式查询和案例二WordCount 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

目錄

案例一:花式查詢

案例二:WordCount

基于DSL編程

基于SQL編程

具體演示代碼如下:


?

案例一:花式查詢

package cn.itcast.sqlimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}/*** Author itcast* Desc 演示SparkSQL的各種花式查詢*/
object FlowerQueryDemo {case class Person(id:Int,name:String,age:Int)def main(args: Array[String]): Unit = {//1.準(zhǔn)備環(huán)境-SparkSessionval spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.加載數(shù)據(jù)val lines: RDD[String] = sc.textFile("data/input/person.txt")//3.切割//val value: RDD[String] = lines.flatMap(_.split(" "))//錯(cuò)誤的val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))//4.將每一行(每一個(gè)Array)轉(zhuǎn)為樣例類(相當(dāng)于添加了Schema)val personRDD: RDD[Person] = linesArrayRDD.map(arr=>Person(arr(0).toInt,arr(1),arr(2).toInt))//5.將RDD轉(zhuǎn)為DataFrame(DF)//注意:RDD的API中沒有toDF方法,需要導(dǎo)入隱式轉(zhuǎn)換!import spark.implicits._val personDF: DataFrame = personRDD.toDF//6.查看約束personDF.printSchema()//7.查看分布式表中的數(shù)據(jù)集personDF.show(6,false)//false表示不截?cái)嗔忻?也就是列名很長的時(shí)候不會用...代替//演示SQL風(fēng)格查詢//0.注冊表名//personDF.registerTempTable("t_person")//已經(jīng)過時(shí)//personDF.createTempView("t_person")//創(chuàng)建表,如果已存在則報(bào)錯(cuò):TempTableAlreadyExistsException//personDF.createOrReplaceGlobalTempView("t_person")//創(chuàng)建全局表,可以夸session使用,查詢的時(shí)候使用:SELECT * FROM global_temp.表名;生命周期太大,一般不用personDF.createOrReplaceTempView("t_person")//創(chuàng)建一個(gè)臨時(shí)表,只有當(dāng)前session可用!且表如果存在會替換!//1.查看name字段的數(shù)據(jù)spark.sql("select name from t_person").show//2.查看?name 和age字段數(shù)據(jù)spark.sql("select name,age from t_person").show//3.查詢所有的name和age,并將age+1spark.sql("select name,age,age+1 from t_person").show//4.過濾age大于等于25的spark.sql("select name,age from t_person where age >=25").show//5.統(tǒng)計(jì)年齡大于30的人數(shù)spark.sql("select count(age) from t_person where age >30").show//6.按年齡進(jìn)行分組并統(tǒng)計(jì)相同年齡的人數(shù)spark.sql("select age,count(age) from t_person group by age").show//演示DSL風(fēng)格查詢//1.查看name字段的數(shù)據(jù)import org.apache.spark.sql.functions._personDF.select(personDF.col("name")).showpersonDF.select(personDF("name")).showpersonDF.select(col("name")).showpersonDF.select("name").show//2.查看?name 和age字段數(shù)據(jù)personDF.select(personDF.col("name"),personDF.col("age")).showpersonDF.select("name","age").show//3.查詢所有的name和age,并將age+1//personDF.select("name","age","age+1").show//錯(cuò)誤,沒有age+1這一列//personDF.select("name","age","age"+1).show//錯(cuò)誤,沒有age1這一列personDF.select(col("name"),col("age"),col("age")+1).showpersonDF.select($"name",$"age",$"age"+1).show//$表示將"age"變?yōu)榱肆袑ο?先查詢再和+1進(jìn)行計(jì)算personDF.select('name,'age,'age+1).show//'表示將age變?yōu)榱肆袑ο?先查詢再和+1進(jìn)行計(jì)算//4.過濾age大于等于25的,使用filter方法/where方法過濾personDF.select("name","age").filter("age>=25").showpersonDF.select("name","age").where("age>=25").show//5.統(tǒng)計(jì)年齡大于30的人數(shù)personDF.where("age>30").count()//6.按年齡進(jìn)行分組并統(tǒng)計(jì)相同年齡的人數(shù)personDF.groupBy("age").count().show}}

?

???????案例二:WordCount

前面使用RDD封裝數(shù)據(jù),實(shí)現(xiàn)詞頻統(tǒng)計(jì)WordCount功能,從Spark 1.0開始,一直到Spark 2.0,建立在RDD之上的一種新的數(shù)據(jù)結(jié)構(gòu)DataFrame/Dataset發(fā)展而來,更好的實(shí)現(xiàn)數(shù)據(jù)處理分析。DataFrame 數(shù)據(jù)結(jié)構(gòu)相當(dāng)于給RDD加上約束Schema,知道數(shù)據(jù)內(nèi)部結(jié)構(gòu)(字段名稱、字段類型),提供兩種方式分析處理數(shù)據(jù):DataFrame API(DSL編程)和SQL(類似HiveQL編程),下面以WordCount程序?yàn)槔幊虒?shí)現(xiàn),體驗(yàn)DataFrame使用。

基于DSL編程

使用SparkSession加載文本數(shù)據(jù),封裝到Dataset/DataFrame中,調(diào)用API函數(shù)處理分析數(shù)據(jù)(類似RDD中API函數(shù),如flatMap、map、filter等),編程步驟:

?第一步、構(gòu)建SparkSession實(shí)例對象,設(shè)置應(yīng)用名稱和運(yùn)行本地模式;

?第二步、讀取HDFS上文本文件數(shù)據(jù);

?第三步、使用DSL(Dataset?API),類似RDD?API處理分析數(shù)據(jù);

?第四步、控制臺打印結(jié)果數(shù)據(jù)和關(guān)閉SparkSession;

?

基于SQL編程

也可以實(shí)現(xiàn)類似HiveQL方式進(jìn)行詞頻統(tǒng)計(jì),直接對單詞分組group by,再進(jìn)行count即可,步驟如下:

?第一步、構(gòu)建SparkSession對象,加載文件數(shù)據(jù),分割每行數(shù)據(jù)為單詞;

?第二步、將DataFrame/Dataset注冊為臨時(shí)視圖(Spark 1.x中為臨時(shí)表);

?第三步、編寫SQL語句,使用SparkSession執(zhí)行獲取結(jié)果;

?第四步、控制臺打印結(jié)果數(shù)據(jù)和關(guān)閉SparkSession;

?

具體演示代碼如下:

package cn.itcast.sqlimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}/*** Author itcast* Desc 使用SparkSQL完成WordCount---SQL風(fēng)格和DSL風(fēng)格*/
object WordCount {def main(args: Array[String]): Unit = {//1.準(zhǔn)備環(huán)境val spark: SparkSession = SparkSession.builder().appName("WordCount").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._//2.加載數(shù)據(jù)//val rdd: RDD[String] = sc.textFile("data/input/words.txt")//可以使用該方式,然后使用昨天的知識將rdd轉(zhuǎn)為df/dsval df: DataFrame = spark.read.text("data/input/words.txt")val ds: Dataset[String] = spark.read.textFile("data/input/words.txt")//df.show()//查看分布式表數(shù)據(jù)//ds.show()//查看分布式表數(shù)據(jù)//3.做WordCount//切割//df.flatMap(_.split(" ")) //注意:直接這樣寫報(bào)錯(cuò)!因?yàn)閐f沒有泛型,不知道_是String!//df.flatMap(row=>row.getAs[String]("value").split(" "))val wordsDS: Dataset[String] = ds.flatMap(_.split(" "))//wordsDS.show()//使用SQL風(fēng)格做WordCountwordsDS.createOrReplaceTempView("t_words")val sql:String ="""|select value,count(*) as count|from t_words|group by value|order by count desc|""".stripMarginspark.sql(sql).show()//使用DSL風(fēng)格做WordCountwordsDS.groupBy("value").count().orderBy($"count".desc).show()/*+-----+-----+|value|count|+-----+-----+|hello| ???4|| ?her| ???3|| ?you| ???2|| ??me| ???1|+-----+-----++-----+-----+|value|count|+-----+-----+|hello| ???4|| ?her| ???3|| ?you| ???2|| ??me| ???1|+-----+-----+*/}
}

?

無論使用DSL還是SQL編程方式,底層轉(zhuǎn)換為RDD操作都是一樣,性能一致,查看WEB UI監(jiān)控中Job運(yùn)行對應(yīng)的DAG圖如下:

?

從上述的案例可以發(fā)現(xiàn)將數(shù)據(jù)封裝到Dataset/DataFrame中,進(jìn)行處理分析,更加方便簡潔,這就是Spark框架中針對結(jié)構(gòu)化數(shù)據(jù)處理模:Spark SQL模塊。

官方文檔:http://spark.apache.org/sql/

總結(jié)

以上是生活随笔為你收集整理的2021年大数据Spark(二十七):SparkSQL案例一花式查询和案例二WordCount的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。