日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Spark SQL之RDD转DataFrame

發布時間:2024/9/16 数据库 54 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark SQL之RDD转DataFrame 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

準備文件

首先準備好測試文件info.txt,內容如下:

1,vincent,20 2,sarah,19 3,sofia,29 4,monica,26

將RDD轉成DataFrame

方式一:反射

可以使用反射來推斷包含了特定數據類型的RDD的元數據
代碼如下:

package cn.ac.iie.sparkimport org.apache.spark.sql.SparkSession/*** DataFrame和RDD的互操作*/ object DataFrameRDDApp {def main(args: Array[String]): Unit = {val sparkSessionApp = SparkSession.builder().appName("DataFrameRDDApp").master("local[2]").getOrCreate()// 將RDD轉成DataFrameval rdd = sparkSessionApp.sparkContext.textFile("file:///E:/test/infos.txt")// 注意需要導入隱式轉換import sparkSessionApp.implicits._val infoDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()infoDF.show()sparkSessionApp.close()}case class Info(id:Int, name:String, age:Int){} }


當得到DataFrame之后就可以進行其他的相應操作了,例如進行過濾:infoDF.filter(infoDF.col("age") > 25).show():輸出如下:

隨后可以將DataFrame轉成一張表。
我們可以通過infoDF.createOrReplaceTempView("infos")注冊成一張表,好處就是可以直接通過SQL的方式進行處理。

infoDF.createOrReplaceTempView("infos")sparkSessionApp.sql("select * from infos where age > 25").show()

方式二:編程方式

當我們的Schema并不能提前定義時,就需要這種方式來實現了。這種方式必須要遵從如下三個步驟:

  • 創建一個Rows的RDD
  • 定義一個Schema(使用StructType)
  • 使用createDataFrame將schema作用于Rows
  • 代碼試下如下:

    package cn.ac.iie.sparkimport org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession}/*** DataFrame和RDD的互操作*/ object DataFrameRDDApp {def main(args: Array[String]): Unit = {val sparkSessionApp = SparkSession.builder().appName("DataFrameRDDApp").master("local[2]").getOrCreate()// infoReflection(sparkSessionApp)program(sparkSessionApp)sparkSessionApp.close()}private def program(sparkSessionApp: SparkSession) = {val rdd = sparkSessionApp.sparkContext.textFile("file:///E:/test/infos.txt")val infoRDD = rdd.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))val structType = StructType(Array(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("age", IntegerType, true)))val infoDF = sparkSessionApp.createDataFrame(infoRDD, structType)infoDF.printSchema()infoDF.show()}private def infoReflection(sparkSessionApp: SparkSession) = {// 將RDD轉成DataFrameval rdd = sparkSessionApp.sparkContext.textFile("file:///E:/test/infos.txt")// 注意需要導入隱式轉換import sparkSessionApp.implicits._val infoDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()infoDF.show()infoDF.filter(infoDF.col("age") > 25).show()infoDF.createOrReplaceTempView("infos")sparkSessionApp.sql("select * from infos where age > 25").show()}case class Info(id:Int, name:String, age:Int){} }

    這種方式拿到DataFrame之后,依然可以進行其他的相關API操作。

    兩種方式的優缺點

    DataFrame和RDD互操作的兩種方式:

    反射:case class。

    這種方式事先需要知道你的字段、字段類型

    編程方式:Row

    如果第一種情況不能滿足要求,無法事先知道字段與類型
    優先考慮第一種方式。因為實現較為簡單。

    總結:DataFrame = RDD + Schema

    RDD僅僅知道里面裝的是什么對象(user),但是無法知道這個user里有哪些屬性,以及屬性的字段是什么類型的。所以我們直接處理RDD是有一定的困難,因此需要自己執行Schema表結構,將Schema作用于RDD中,就可以看做是一個表了。接下來就可以方便的進行操作了。
    同時DataFrame優勢:DataFrame底層使用了Catalyst進行優化。
    DataFrame還支持text、json、parquet以及其他外部數據源格式。將外部數據源的數據注冊到sparksql中,成為DataFrame,然后就可以使用DataFrame自身提供的API進行操作了?;蛘呖梢宰猿梢粡埍韴绦衧ql語句。執行自己的API或sql,最終形成的邏輯執行計劃都是一樣的。

    總結

    以上是生活随笔為你收集整理的Spark SQL之RDD转DataFrame的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。