日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 综合教程 >内容正文

综合教程

Spark SQL读写方法

發(fā)布時(shí)間:2023/12/13 综合教程 22 生活家
生活随笔 收集整理的這篇文章主要介紹了 Spark SQL读写方法 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一、DataFrame:有列名的RDD

首先,我們知道SparkSQL的目的是用sql語句去操作RDD,和Hive類似。SparkSQL的核心結(jié)構(gòu)是DataFrame,如果我們知道RDD里面的字段,也知道里面的數(shù)據(jù)類型,就好比關(guān)系型數(shù)據(jù)庫里面的一張表。那么我們就可以寫SQL,所以其實(shí)這兒我們是不能用面向?qū)ο蟮乃季S去編程的。我們最好的方式就是把抽象成為一張表,然后去用SQL語句去操作它。

DataFrame的存儲(chǔ)方式:它采用的存儲(chǔ)是類似于數(shù)據(jù)庫的表的形式進(jìn)行存儲(chǔ)的。一個(gè)數(shù)據(jù)表有幾部分組成:1、數(shù)據(jù),這個(gè)數(shù)據(jù)是一行一行進(jìn)行存儲(chǔ)的,一條記錄就是一行,2、數(shù)據(jù)表的數(shù)據(jù)字典,包括表的名稱,表的字段和字段的類型等元數(shù)據(jù)信息。那么DataFrame也是按照行進(jìn)行存儲(chǔ)的,這個(gè)類是Row,一行一行的進(jìn)行數(shù)據(jù)存儲(chǔ)。一般情況下處理粒度是行粒度的,不需要對(duì)其行內(nèi)數(shù)據(jù)進(jìn)行操作。

二、SparkSQL的程序入口:

在Spark2.0之前,是有sqlContext和hiveContext的概念的,因?yàn)檫@兩個(gè)概念難以區(qū)分,Spark2.0之后統(tǒng)一稱為SparkSession,除此之外SparkSession還封裝了SparkConf和SparkContext。

值得注意的一點(diǎn)是:Hive有很多依賴包,所以這些依賴包沒有包含在默認(rèn)的Spark包里面。如果Hive依賴的包能在classpath找到,Spark將會(huì)自動(dòng)加載它們。這些Hive依賴包必須復(fù)制到所有的工作節(jié)點(diǎn)上,因?yàn)樗鼈優(yōu)榱四軌蛟L問存儲(chǔ)在Hive的數(shù)據(jù),會(huì)調(diào)用Hive的序列化和反序列化(SerDes)包。Hive的配置文件hive-site.xml、core-site.xml(security配置)和hdfs-site.xml(HDFS配置)是保存在conf目錄下面。
當(dāng)使用Hive時(shí),必須初始化一個(gè)支持Hive的SparkSession,用戶即使沒有部署一個(gè)Hive的環(huán)境仍然可以使用Hive。當(dāng)沒有配置hive-site.xml時(shí),Spark會(huì)自動(dòng)在當(dāng)前應(yīng)用目錄創(chuàng)建metastore_db和創(chuàng)建由spark.sql.warehouse.dir配置的目錄,如果沒有配置,默認(rèn)是當(dāng)前應(yīng)用目錄下的spark-warehouse目錄。

注意:從Spark 2.0.0版本開始,hive-site.xml里面的hive.metastore.warehouse.dir屬性已經(jīng)被spark.sql.warehouse.dir替代,用于指定warehouse的默認(rèn)數(shù)據(jù)路徑(必須有寫權(quán)限)。

于是SparkSQL在與Hive有交互的情況下,需要指定支持Hive:

val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}")
val spark = SparkSession.builder().config(conf).config("spark.sql.warehouse.dir",
"hdfs://hadoop1:9000/user/hive/warehouse").enableHiveSupport().getOrCreate()

回到正題,程序入口:

1.6版本:

val conf=new SparkConf()
conf.setAppName(s"${this.getClass.getSimpleName}").setMaster("local")
val sc=new SparkContext(conf)
val sqlContext = new SQLContext(sc)

2.0版本:

SparkSQL的程序入口縮減為一句

 val sparkSession=SparkSession.builder().appName(s"${this.getClass.getSimpleName}").master("local").getOrCreate()

兩個(gè)版本一個(gè)獲得sqlContext(或者h(yuǎn)iveContext),一個(gè)獲得sparkSession。

三、算了,還是放在一起寫吧。。

case  class Person(var name:String,var age:Int)
object Test {
  def main(args: Array[String]): Unit = {
    //1.6版本入口
    val conf=new SparkConf()
    conf.setAppName(s"${this.getClass.getSimpleName}").setMaster("local")
    val sc=new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
//第一種創(chuàng)建DataFrame的方式:直接讀取列式存儲(chǔ)的格式,可以直接形成DataFrame(后續(xù)怎么操作呢?)
    val df: DataFrame = sqlContext.read.json("")
    //第二種創(chuàng)建DataFrame的方式:因?yàn)閞dd沒有toDF()方法,需要進(jìn)行隱式轉(zhuǎn)化,通過map后形成一個(gè)數(shù)組
    import sqlContext.implicits._
    val df: DataFrame = sc.textFile("C:\Users\wangyongxiang\Desktop\plan\person.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
//第二種方法的另一種形態(tài),用sqlContext或者sparkSession的createDataFrame(),其實(shí)和toDF()方法是雷同的
    val rdd: RDD[Person] = sc.textFile("C:\Users\wangyongxiang\Desktop\plan\person.txt")
      .map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
    val df: DataFrame = sqlContext.createDataFrame(rdd)
    //第三種創(chuàng)建DataFrame:生成一個(gè)RowRDD,然后給出構(gòu)造的描述
    val rdd=sc.textFile("C:\Users\wangyongxiang\Desktop\plan\person.txt")
    val rowRDD: RDD[Row] = rdd.map(_.split(",")).map(p=>Row(p(0),p(1).trim.toInt))
    val schame=StructType(
      StructField("name",StringType,true)::
      StructField("age",IntegerType,true)::Nil
    )
    val df: DataFrame = sqlContext.createDataFrame(rowRDD,schame)
 
    //后續(xù)代碼,可以創(chuàng)建臨時(shí)視圖作為查詢,與mysql互操作要?jiǎng)?chuàng)建臨時(shí)視圖才能做查詢
//用hiveContext則直接在hive中創(chuàng)建表,然后將數(shù)據(jù)load到hive表中,可以直接進(jìn)行條件查詢,無需創(chuàng)建臨時(shí)視圖,后面與hive集成會(huì)有說明
    df.registerTempTable("person")
    sqlContext.sql("select * from person where age>21").show()
//將處理后的數(shù)據(jù)用jdbc保存到mysql數(shù)據(jù)庫中成為一張表,注意這里要使用user而不能使用username,因?yàn)橄到y(tǒng)也有一個(gè)username,會(huì)覆蓋你的用戶名
    val properties=new Properties()
    properties.put("user","root")
    properties.put("password","root")
    df.write.mode(SaveMode.Overwrite)jdbc("jdbc:mysql://localhost:3306/test","test",properties)
  }
}

四、load和save操作。

object saveAndLoadTest {
  def main(args: Array[String]): Unit = {
    val conf =new SparkConf().setAppName("").setMaster("local")
    val sc=new SparkContext(conf)
    val sqlContext=new SQLContext(sc)
 
    //read,load:讀取
    sqlContext.read.json("")
//  sqlContext.read.jdbc("url","table",properties)
    sqlContext.read.load("parquet路徑")
    sqlContext.read.format("json").load("路徑")
    val df: DataFrame = sqlContext.read.format("parquet").load("路徑")
 
    //write,save保存
    df.write.parquet("路徑.parquet")
    df.write.json("路徑.json")
//  df.write.jdbc("url","table",properties)
    df.write.format("parquet").save("路徑.parquet")
    df.write.format(("json")).save("路徑.json")
    //保存模式可選擇覆蓋,追加等
    df.write.mode(SaveMode.Overwrite).save("")
  }
}

個(gè)人理解是read和load都是讀取的作用,write和save都是保存的作用,通過上述的代碼,我們可以完成文件格式轉(zhuǎn)換的工作,將效率低的一些格式轉(zhuǎn)化成parquet這種sparksql原生支持的文件類型

總結(jié)

以上是生活随笔為你收集整理的Spark SQL读写方法的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。