日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

2021年大数据Spark(二十五):SparkSQL的RDD、DF、DS相关操作

發布時間:2023/11/28 生活经验 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Spark(二十五):SparkSQL的RDD、DF、DS相关操作 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

?

目錄

?RDD、DF、DS相關操作

SparkSQL初體驗

SparkSession 應用入口

獲取DataFrame/DataSet

使用樣例類

指定類型+列名

自定義Schema

???????RDD、DF、DS相互轉換


RDD、DF、DS相關操作

SparkSQL初體驗

Spark 2.0開始,SparkSQL應用程序入口為SparkSession,加載不同數據源的數據,封裝到DataFrame/Dataset集合數據結構中,使得編程更加簡單,程序運行更加快速高效。

?

?

SparkSession 應用入口

SparkSession:這是一個新入口,取代了原本的SQLContext與HiveContext。對于DataFrame API的用戶來說,Spark常見的混亂源頭來自于使用哪個“context”。現在使用SparkSession,它作為單個入口可以兼容兩者,注意原本的SQLContext與HiveContext仍然保留,以支持向下兼容。

文檔:http://spark.apache.org/docs/2.4.5/sql-getting-started.html#starting-point-sparksession

?1)、添加MAVEN依賴

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.4.5</version></dependency>

?

?

2)、SparkSession對象實例通過建造者模式構建,代碼如下:

?

?

其中①表示導入SparkSession所在的包,②表示建造者模式構建對象和設置屬性,③表示導入SparkSession類中implicits對象object中隱式轉換函數

?3)、范例演示:構建SparkSession實例,加載文本數據,統計條目數。

package cn.itcast.sqlimport org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}/*** Author itcast* Desc 演示SparkSQL*/
object SparkSQLDemo00_hello {def main(args: Array[String]): Unit = {//1.準備SparkSQL開發環境println(this.getClass.getSimpleName)println(this.getClass.getSimpleName.stripSuffix("$"))val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")val df1: DataFrame = spark.read.text("data/input/text")val df2: DataFrame = spark.read.json("data/input/json")val df3: DataFrame = spark.read.csv("data/input/csv")val df4: DataFrame = spark.read.parquet("data/input/parquet")df1.printSchema()df1.show(false)df2.printSchema()df2.show(false)df3.printSchema()df3.show(false)df4.printSchema()df4.show(false)df1.coalesce(1).write.mode(SaveMode.Overwrite).text("data/output/text")df2.coalesce(1).write.mode(SaveMode.Overwrite).json("data/output/json")df3.coalesce(1).write.mode(SaveMode.Overwrite).csv("data/output/csv")df4.coalesce(1).write.mode(SaveMode.Overwrite).parquet("data/output/parquet")//關閉資源sc.stop()spark.stop()}
}

?

使用SparkSession加載數據源數據,將其封裝到DataFrame或Dataset中,直接使用show函數就可以顯示樣本數據(默認顯示前20條)。

Spark2.0使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口來實現其對數據加載、轉換、處理等功能。SparkSession實現了SQLContext及HiveContext所有功能。 SparkSession支持從不同的數據源加載數據,并把數據轉換成DataFrame,并且支持把DataFrame轉換成SQLContext自身中的表,然后使用SQL語句來操作數據。SparkSession亦提供了HiveQL以及其他依賴于Hive的功能的支持。

?

獲取DataFrame/DataSet

?????實際項目開發中,往往需要將RDD數據集轉換為DataFrame,本質上就是給RDD加上Schema信息,官方提供兩種方式:類型推斷和自定義Schema。

官方文檔:http://spark.apache.org/docs/2.4.5/sql-getting-started.html#interoperating-with-rdds

?

?

???????使用樣例類

當RDD中數據類型CaseClass樣例類時,通過反射Reflecttion獲取屬性名稱和類型,構建Schema,應用到RDD數據集,將其轉換為DataFrame。

package cn.itcast.sqlimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}/*** Author itcast* Desc 演示基于RDD創建DataFrame--使用樣例類*/
object CreateDataFrameDemo1 {case class Person(id:Int,name:String,age:Int)def main(args: Array[String]): Unit = {//1.準備環境-SparkSessionval spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.加載數據val lines: RDD[String] = sc.textFile("data/input/person.txt")//3.切割//val value: RDD[String] = lines.flatMap(_.split(" "))//錯誤的val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))//4.將每一行(每一個Array)轉為樣例類(相當于添加了Schema)val personRDD: RDD[Person] = linesArrayRDD.map(arr=>Person(arr(0).toInt,arr(1),arr(2).toInt))//5.將RDD轉為DataFrame(DF)//注意:RDD的API中沒有toDF方法,需要導入隱式轉換!import spark.implicits._val personDF: DataFrame = personRDD.toDF//6.查看約束personDF.printSchema()//7.查看分布式表中的數據集personDF.show(6,false)//false表示不截斷列名,也就是列名很長的時候不會用...代替}
}

?

此種方式要求RDD數據類型必須為CaseClass,轉換的DataFrame中字段名稱就是CaseClass中屬性名稱。

???????指定類型+列名

除了上述兩種方式將RDD轉換為DataFrame以外,SparkSQL中提供一個函數:toDF,通過指定列名稱,將數據類型為元組的RDD或Seq轉換為DataFrame,實際開發中也常常使用。

?

package cn.itcast.sqlimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}/*** Author itcast* Desc 演示基于RDD創建DataFrame--使用類型加列名*/
object CreateDataFrameDemo2 {def main(args: Array[String]): Unit = {//1.準備環境-SparkSessionval spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.加載數據val lines: RDD[String] = sc.textFile("data/input/person.txt")//3.切割//val value: RDD[String] = lines.flatMap(_.split(" "))//錯誤的val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))//4.將每一行(每一個Array)轉為三元組(相當于有了類型!)val personWithColumnsTypeRDD: RDD[(Int, String, Int)] = linesArrayRDD.map(arr=>(arr(0).toInt,arr(1),arr(2).toInt))//5.將RDD轉為DataFrame(DF)并指定列名//注意:RDD的API中沒有toDF方法,需要導入隱式轉換!import spark.implicits._val personDF: DataFrame = personWithColumnsTypeRDD.toDF("id","name","age")//6.查看約束personDF.printSchema()//7.查看分布式表中的數據集personDF.show(6,false)//false表示不截斷列名,也就是列名很長的時候不會用...代替}
}

?

???????自定義Schema

依據RDD中數據自定義Schema,類型為StructType,每個字段的約束使用StructField定義,具體步驟如下:

?第一步、RDD中數據類型為Row:RDD[Row]

?第二步、針對Row中數據定義Schema:StructType

?第三步、使用SparkSession中方法將定義的Schema應用到RDD[Row]上;

package cn.itcast.sqlimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}/*** Author itcast* Desc 演示基于RDD創建DataFrame--使用StructType*/
object CreateDataFrameDemo3 {def main(args: Array[String]): Unit = {//1.準備環境-SparkSessionval spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.加載數據val lines: RDD[String] = sc.textFile("data/input/person.txt")//3.切割//val value: RDD[String] = lines.flatMap(_.split(" "))//錯誤的val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))//4.將每一行(每一個Array)轉為Rowval rowRDD: RDD[Row] = linesArrayRDD.map(arr=>Row(arr(0).toInt,arr(1),arr(2).toInt))//5.將RDD轉為DataFrame(DF)并指定列名//注意:RDD的API中沒有toDF方法,需要導入隱式轉換!import spark.implicits._/*val schema: StructType = StructType(StructField("id", IntegerType, false) ::StructField("name", StringType, false) ::StructField("age", IntegerType, false) :: Nil)*/val schema: StructType = StructType(List(StructField("id", IntegerType, false),StructField("name", StringType, false),StructField("age", IntegerType, false)))val personDF: DataFrame = spark.createDataFrame(rowRDD,schema)//6.查看約束personDF.printSchema()//7.查看分布式表中的數據集personDF.show(6,false)//false表示不截斷列名,也就是列名很長的時候不會用...代替}}

此種方式可以更加體會到DataFrame = RDD[Row] + Schema組成,在實際項目開發中靈活的選擇方式將RDD轉換為DataFrame。

?

???????RDD、DF、DS相互轉換

實際項目開發中,常常需要對RDD、DataFrame及Dataset之間相互轉換,其中要點就是Schema約束結構信息。

?1)、RDD轉換DataFrame或者Dataset

轉換DataFrame時,定義Schema信息,兩種方式

轉換為Dataset時,不僅需要Schema信息,還需要RDD數據類型為CaseClass類型

?2)、Dataset或DataFrame轉換RDD

由于Dataset或DataFrame底層就是RDD,所以直接調用rdd函數即可轉換

dataframe.rdd 或者dataset.rdd

?3)、DataFrame與Dataset之間轉換

由于DataFrame為Dataset特例,所以Dataset直接調用toDF函數轉換為DataFrame

當將DataFrame轉換為Dataset時,使用函數as[Type],指定CaseClass類型即可。

?

?

?

RDD、DataFrame和DataSet之間的轉換如下,假設有個樣例類:case?class?Emp(name:?String),相互轉換

RDD轉換到DataFrame:rdd.toDF(“name”)RDD轉換到Dataset:rdd.map(x => Emp(x)).toDSDataFrame轉換到Dataset:df.as[Emp]DataFrame轉換到RDD:df.rddDataset轉換到DataFrame:ds.toDFDataset轉換到RDD:ds.rdd

注意:

RDD與DataFrame或者DataSet進行操作,都需要引入隱式轉換import spark.implicits._,其中的spark是SparkSession對象的名稱!


package cn.itcast.sqlimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}/*** Author itcast* Desc 演示基于RDD/DataFrame/DataSet三者之間的相互轉換*/
object TransformationDemo {case class Person(id:Int,name:String,age:Int)def main(args: Array[String]): Unit = {//1.準備環境-SparkSessionval spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.加載數據val lines: RDD[String] = sc.textFile("data/input/person.txt")//3.切割//val value: RDD[String] = lines.flatMap(_.split(" "))//錯誤的val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))//4.將每一行(每一個Array)轉為樣例類(相當于添加了Schema)val personRDD: RDD[Person] = linesArrayRDD.map(arr=>Person(arr(0).toInt,arr(1),arr(2).toInt))//5.將RDD轉為DataFrame(DF)//注意:RDD的API中沒有toDF方法,需要導入隱式轉換!import spark.implicits._//轉換1:rdd-->dfval personDF: DataFrame = personRDD.toDF //注意:DataFrame沒有泛型//轉換2:rdd-->dsval personDS: Dataset[Person] = personRDD.toDS() //注意:Dataset具有泛型//轉換3:df-->rddval rdd: RDD[Row] = personDF.rdd //注意:DataFrame沒有泛型,也就是不知道里面是Person,所以轉為rdd之后統一的使用Row表示里面是很多行//轉換4:ds-->rddval rdd1: RDD[Person] = personDS.rdd //注意:Dataset具有泛型,所以轉為rdd之后還有原來泛型!//轉換5:ds-->dfval dataFrame: DataFrame = personDS.toDF()//轉換5:df-->dsval personDS2: Dataset[Person] = personDF.as[Person]//目前DataFrame和DataSet使用類似,如:也有show/createOrReplaceTempView/selectpersonDS.show()personDS.createOrReplaceTempView("t_person")personDS.select("name").show()}
}

?

總結

以上是生活随笔為你收集整理的2021年大数据Spark(二十五):SparkSQL的RDD、DF、DS相关操作的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。