日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

spark sql 上个月_Spark学习之路 (十八)SparkSQL简单使用

發(fā)布時(shí)間:2023/12/10 数据库 55 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark sql 上个月_Spark学习之路 (十八)SparkSQL简单使用 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一、SparkSQL的進(jìn)化之路

1.0以前:

Shark

1.1.x開始:

SparkSQL(只是測(cè)試性的)? SQL

1.3.x:

SparkSQL(正式版本)+Dataframe

1.5.x:

SparkSQL 鎢絲計(jì)劃

1.6.x:

SparkSQL+DataFrame+DataSet(測(cè)試版本)

x:

SparkSQL+DataFrame+DataSet(正式版本)

SparkSQL:還有其他的優(yōu)化

StructuredStreaming(DataSet)

二、認(rèn)識(shí)SparkSQL

2.1 什么是SparkSQL?

spark SQL是spark的一個(gè)模塊,主要用于進(jìn)行結(jié)構(gòu)化數(shù)據(jù)的處理。它提供的最核心的編程抽象就是DataFrame。

2.2 SparkSQL的作用

提供一個(gè)編程抽象(DataFrame) 并且作為分布式 SQL?查詢引擎

DataFrame:它可以根據(jù)很多源進(jìn)行構(gòu)建,包括:結(jié)構(gòu)化的數(shù)據(jù)文件,hive中的表,外部的關(guān)系型數(shù)據(jù)庫,以及RDD

2.3 運(yùn)行原理

將?Spark SQL?轉(zhuǎn)化為?RDD,?然后提交到集群執(zhí)行

2.4 特點(diǎn)

(1)容易整合

(2)統(tǒng)一的數(shù)據(jù)訪問方式

(3)兼容 Hive

(4)標(biāo)準(zhǔn)的數(shù)據(jù)連接

2.5 SparkSession

SparkSession是Spark 2.0引如的新概念。SparkSession為用戶提供了統(tǒng)一的切入點(diǎn),來讓用戶學(xué)習(xí)spark的各項(xiàng)功能。

在spark的早期版本中,SparkContext是spark的主要切入點(diǎn),由于RDD是主要的API,我們通過sparkcontext來創(chuàng)建和操作RDD。對(duì)于每個(gè)其他的API,我們需要使用不同的context。例如,對(duì)于Streming,我們需要使用StreamingContext;對(duì)于sql,使用sqlContext;對(duì)于Hive,使用hiveContext。但是隨著DataSet和DataFrame的API逐漸成為標(biāo)準(zhǔn)的API,就需要為他們建立接入點(diǎn)。所以在spark2.0中,引入SparkSession作為DataSet和DataFrame API的切入點(diǎn),SparkSession封裝了SparkConf、SparkContext和SQLContext。為了向后兼容,SQLContext和HiveContext也被保存下來。

SparkSession實(shí)質(zhì)上是SQLContext和HiveContext的組合(未來可能還會(huì)加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。SparkSession內(nèi)部封裝了sparkContext,所以計(jì)算實(shí)際上是由sparkContext完成的。

特點(diǎn):

----為用戶提供一個(gè)統(tǒng)一的切入點(diǎn)使用Spark?各項(xiàng)功能

----允許用戶通過它調(diào)用?DataFrame?和?Dataset?相關(guān) API?來編寫程序

----減少了用戶需要了解的一些概念,可以很容易的與?Spark?進(jìn)行交互

----與?Spark?交互之時(shí)不需要顯示的創(chuàng)建?SparkConf, SparkContext?以及 SQlContext,這些對(duì)象已經(jīng)封閉在?SparkSession?中

2.7 DataFrames

在Spark中,DataFrame是一種以RDD為基礎(chǔ)的分布式數(shù)據(jù)集,類似于傳統(tǒng)數(shù)據(jù)庫中的二維表格。DataFrame與RDD的主要區(qū)別在于,前者帶有schema元信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱和類型。這使得Spark SQL得以洞察更多的結(jié)構(gòu)信息,從而對(duì)藏于DataFrame背后的數(shù)據(jù)源以及作用于DataFrame之上的變換進(jìn)行了針對(duì)性的優(yōu)化,最終達(dá)到大幅提升運(yùn)行時(shí)效率的目標(biāo)。反觀RDD,由于無從得知所存數(shù)據(jù)元素的具體內(nèi)部結(jié)構(gòu),Spark Core只能在stage層面進(jìn)行簡(jiǎn)單、通用的流水線優(yōu)化。

三、RDD轉(zhuǎn)換成為DataFrame

使用spark1.x版本的方式

測(cè)試數(shù)據(jù)目錄:/home/hadoop/apps/spark/examples/src/main/resources(spark的安裝目錄里面)

people.txt

3.1 方式一:通過?case class?創(chuàng)建?DataFrames(反射)

//定義case class,相當(dāng)于表結(jié)構(gòu)

case class People(var name:String,varage:Int)objectTestDataFrame1 {

def main(args: Array[String]): Unit={

val conf= new SparkConf().setAppName("RDDToDataFrame").setMaster("local")

val sc= newSparkContext(conf)

val context= newSQLContext(sc)//將本地的數(shù)據(jù)讀入 RDD, 并將 RDD 與 case class 關(guān)聯(lián)

val peopleRDD = sc.textFile("E:\\666\\people.txt")

.map(line=> People(line.split(",")(0), line.split(",")(1).trim.toInt))

import context.implicits._//將RDD 轉(zhuǎn)換成 DataFrames

val df =peopleRDD.toDF//將DataFrames創(chuàng)建成一個(gè)臨時(shí)的視圖

df.createOrReplaceTempView("people")//使用SQL語句進(jìn)行查詢

context.sql("select * from people").show()

}

}

運(yùn)行結(jié)果

3.2 方式二:通過?structType?創(chuàng)建?DataFrames(編程接口)

objectTestDataFrame2 {

def main(args: Array[String]): Unit={

val conf= new SparkConf().setAppName("TestDataFrame2").setMaster("local")

val sc= newSparkContext(conf)

val sqlContext= newSQLContext(sc)

val fileRDD= sc.textFile("E:\\666\\people.txt")//將 RDD 數(shù)據(jù)映射成 Row,需要 import org.apache.spark.sql.Row

val rowRDD: RDD[Row] = fileRDD.map(line =>{

val fields= line.split(",")

Row(fields(0), fields(1).trim.toInt)

})//創(chuàng)建 StructType 來定義結(jié)構(gòu)

val structType: StructType =StructType(//字段名,字段類型,是否可以為空

StructField("name", StringType, true) ::

StructField("age", IntegerType, true) :: Nil

)/**

* rows: java.util.List[Row],

* schema: StructType

**/val df: DataFrame=sqlContext.createDataFrame(rowRDD,structType)

df.createOrReplaceTempView("people")

sqlContext.sql("select * from people").show()

}

}

運(yùn)行結(jié)果

3.3 方式三:通過 json 文件創(chuàng)建?DataFrames

objectTestDataFrame3 {

def main(args: Array[String]): Unit={

val conf= new SparkConf().setAppName("TestDataFrame2").setMaster("local")

val sc= newSparkContext(conf)

val sqlContext= newSQLContext(sc)

val df: DataFrame= sqlContext.read.json("E:\\666\\people.json")

df.createOrReplaceTempView("people")

sqlContext.sql("select * from people").show()

}

}

四、DataFrame的read和save和savemode

4.1 數(shù)據(jù)的讀取

objectTestRead {

def main(args: Array[String]): Unit={

val conf= new SparkConf().setAppName("TestDataFrame2").setMaster("local")

val sc= newSparkContext(conf)

val sqlContext= newSQLContext(sc)//方式一

val df1 = sqlContext.read.json("E:\\666\\people.json")

val df2= sqlContext.read.parquet("E:\\666\\users.parquet")//方式二

val df3 = sqlContext.read.format("json").load("E:\\666\\people.json")

val df4= sqlContext.read.format("parquet").load("E:\\666\\users.parquet")//方式三,默認(rèn)是parquet格式

val df5 = sqlContext.load("E:\\666\\users.parquet")

}

}

4.2 數(shù)據(jù)的保存

objectTestSave {

def main(args: Array[String]): Unit={

val conf= new SparkConf().setAppName("TestDataFrame2").setMaster("local")

val sc= newSparkContext(conf)

val sqlContext= newSQLContext(sc)

val df1= sqlContext.read.json("E:\\666\\people.json")//方式一

df1.write.json("E:\\111")

df1.write.parquet("E:\\222")//方式二

df1.write.format("json").save("E:\\333")

df1.write.format("parquet").save("E:\\444")//方式三

df1.write.save("E:\\555")

}

}

4.3 數(shù)據(jù)的保存模式

使用mode

df1.write.format("parquet").mode(SaveMode.Ignore).save("E:\\444")

五、數(shù)據(jù)源

5.1 數(shù)據(jù)源只json

參考4.1

5.2 數(shù)據(jù)源之parquet

參考4.1

5.3 數(shù)據(jù)源之Mysql

objectTestMysql {

def main(args: Array[String]): Unit={

val conf= new SparkConf().setAppName("TestMysql").setMaster("local")

val sc= newSparkContext(conf)

val sqlContext= newSQLContext(sc)

val url= "jdbc:mysql://192.168.123.102:3306/hivedb"val table= "dbs"val properties= newProperties()

properties.setProperty("user","root")

properties.setProperty("password","root")//需要傳入Mysql的URL、表明、properties(連接數(shù)據(jù)庫的用戶名密碼)

val df =sqlContext.read.jdbc(url,table,properties)

df.createOrReplaceTempView("dbs")

sqlContext.sql("select * from dbs").show()

}

}

運(yùn)行結(jié)果

5.4 數(shù)據(jù)源之Hive

(1)準(zhǔn)備工作

在pom.xml文件中添加依賴

org.apache.spark

spark-hive_2.11

2.3.0

開發(fā)環(huán)境則把resource文件夾下添加hive-site.xml文件,集群環(huán)境把hive的配置文件要發(fā)到$SPARK_HOME/conf目錄下

javax.jdo.option.ConnectionURL

jdbc:mysql://localhost:3306/hivedb?createDatabaseIfNotExist=true

JDBC connect string for a JDBC metastore

javax.jdo.option.ConnectionDriverName

com.mysql.jdbc.Driver

Driver class name for a JDBC metastore

javax.jdo.option.ConnectionUserName

root

username to use against metastore database

javax.jdo.option.ConnectionPassword

root

password to use against metastore database

hive.metastore.warehouse.dir

/hive/warehouse

hive default warehouse, if nessecory, change it

(2)測(cè)試代碼

object TestHive {

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)

val sc = new SparkContext(conf)

val sqlContext = new HiveContext(sc)

sqlContext.sql("select * from myhive.student").show()

}

}

運(yùn)行結(jié)果

總結(jié)

以上是生活随笔為你收集整理的spark sql 上个月_Spark学习之路 (十八)SparkSQL简单使用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。