Spark SQL之RDD转DataFrame
準備文件
首先準備好測試文件info.txt,內容如下:
1,vincent,20 2,sarah,19 3,sofia,29 4,monica,26將RDD轉成DataFrame
方式一:反射
可以使用反射來推斷包含了特定數據類型的RDD的元數據
代碼如下:
當得到DataFrame之后就可以進行其他的相應操作了,例如進行過濾:infoDF.filter(infoDF.col("age") > 25).show():輸出如下:
隨后可以將DataFrame轉成一張表。
我們可以通過infoDF.createOrReplaceTempView("infos")注冊成一張表,好處就是可以直接通過SQL的方式進行處理。
方式二:編程方式
當我們的Schema并不能提前定義時,就需要這種方式來實現了。這種方式必須要遵從如下三個步驟:
代碼試下如下:
package cn.ac.iie.sparkimport org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession}/*** DataFrame和RDD的互操作*/ object DataFrameRDDApp {def main(args: Array[String]): Unit = {val sparkSessionApp = SparkSession.builder().appName("DataFrameRDDApp").master("local[2]").getOrCreate()// infoReflection(sparkSessionApp)program(sparkSessionApp)sparkSessionApp.close()}private def program(sparkSessionApp: SparkSession) = {val rdd = sparkSessionApp.sparkContext.textFile("file:///E:/test/infos.txt")val infoRDD = rdd.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))val structType = StructType(Array(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("age", IntegerType, true)))val infoDF = sparkSessionApp.createDataFrame(infoRDD, structType)infoDF.printSchema()infoDF.show()}private def infoReflection(sparkSessionApp: SparkSession) = {// 將RDD轉成DataFrameval rdd = sparkSessionApp.sparkContext.textFile("file:///E:/test/infos.txt")// 注意需要導入隱式轉換import sparkSessionApp.implicits._val infoDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()infoDF.show()infoDF.filter(infoDF.col("age") > 25).show()infoDF.createOrReplaceTempView("infos")sparkSessionApp.sql("select * from infos where age > 25").show()}case class Info(id:Int, name:String, age:Int){} }這種方式拿到DataFrame之后,依然可以進行其他的相關API操作。
兩種方式的優缺點
DataFrame和RDD互操作的兩種方式:
反射:case class。
這種方式事先需要知道你的字段、字段類型
編程方式:Row
如果第一種情況不能滿足要求,無法事先知道字段與類型
優先考慮第一種方式。因為實現較為簡單。
總結:DataFrame = RDD + Schema
RDD僅僅知道里面裝的是什么對象(user),但是無法知道這個user里有哪些屬性,以及屬性的字段是什么類型的。所以我們直接處理RDD是有一定的困難,因此需要自己執行Schema表結構,將Schema作用于RDD中,就可以看做是一個表了。接下來就可以方便的進行操作了。
同時DataFrame優勢:DataFrame底層使用了Catalyst進行優化。
DataFrame還支持text、json、parquet以及其他外部數據源格式。將外部數據源的數據注冊到sparksql中,成為DataFrame,然后就可以使用DataFrame自身提供的API進行操作了?;蛘呖梢宰猿梢粡埍韴绦衧ql語句。執行自己的API或sql,最終形成的邏輯執行計劃都是一樣的。
總結
以上是生活随笔為你收集整理的Spark SQL之RDD转DataFrame的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SparkSQL之DataFrame A
- 下一篇: SparkSQL之DataFrame案例