spark-sql建表语句限制_第三篇|Spark SQL编程指南
在《第二篇|Spark Core編程指南》一文中,對Spark的核心模塊進行了講解。本文將討論Spark的另外一個重要模塊--Spark SQL,Spark SQL是在Shark的基礎之上構建的,于2014年5月發布。從名稱上可以看出,該模塊是Spark提供的關系型操作API,實現了SQL-on-Spark的功能。對于一些熟悉SQL的用戶,可以直接使用SQL在Spark上進行復雜的數據處理。通過本文,你可以了解到:
- Spark SQL簡介
- DataFrame API&DataSet API
- Catalyst Optimizer優化器
- Spark SQL基本操作
- Spark SQL的數據源
- RDD與DataFrame相互轉換
- Thrift ?server與Spark SQL CLI
Spark SQL簡介
Spark SQL是Spark的其中一個模塊,用于結構化數據處理。與基本的Spark RDD API不同,Spark SQL提供的接口為Spark提供了有關數據結構和正在執行的計算的更多信息,Spark SQL會使用這些額外的信息來執行額外的優化。使用SparkSQL的方式有很多種,包括SQL、DataFrame API以及Dataset API。值得注意的是,無論使用何種方式何種語言,其執行引擎都是相同的。實現這種統一,意味著開發人員可以輕松地在不同的API之間來回切換,從而使數據處理更加地靈活。
DataFrame API&DataSet API
DataFrame API
DataFrame代表一個不可變的分布式數據集合,其核心目的是讓開發者面對數據處理時,只關心要做什么,而不用關心怎么去做,將一些優化的工作交由Spark框架本身去處理。DataFrame是具有Schema信息的,也就是說可以被看做具有字段名稱和類型的數據,類似于關系型數據庫中的表,但是底層做了很多的優化。創建了DataFrame之后,就可以使用SQL進行數據處理。
用戶可以從多種數據源中構造DataFrame,例如:結構化數據文件,Hive中的表,外部數據庫或現有RDD。DataFrame API支持Scala,Java,Python和R,在Scala和Java中,row類型的DataSet代表DataFrame,即Dataset[Row]等同于DataFrame。
DataSet API
DataSet是Spark 1.6中添加的新接口,是DataFrame的擴展,它具有RDD的優點(強類型輸入,支持強大的lambda函數)以及Spark SQL的優化執行引擎的優點。可以通過JVM對象構建DataSet,然后使用函數轉換(map,flatMap,filter)。值得注意的是,Dataset API在Scala和 Java中可用,Python不支持Dataset API。
另外,DataSet API可以減少內存的使用,由于Spark框架知道DataSet的數據結構,因此在持久化DataSet時可以節省很多的內存空間。
Catalyst Optimizer優化器
在Catalyst中,存在兩種類型的計劃:
- 邏輯計劃(Logical Plan):定義數據集上的計算,尚未定義如何去執行計算。每個邏輯計劃定義了一系列的用戶代碼所需要的屬性(查詢字段)和約束(where條件),但是不定義該如何執行。具體如下圖所示:
- 物理計劃(Physical Plan):物理計劃是從邏輯計劃生成的,定義了如何執行計算,是可執行的。舉個栗子:邏輯計劃中的JOIN會被轉換為物理計劃中的sort merge JOIN。需要注意,Spark會生成多個物理計劃,然后選擇成本最低的物理計劃。具體如下圖所示:
在Spark SQL中,所有的算子操作會被轉換成AST(abstract syntax tree,抽象語法樹),然后將其傳遞給Catalyst優化器。該優化器是在Scala的函數式編程基礎會上構建的,Catalyst支持基于規則的(rule-based)和基于成本的(cost-based)優化策略。
Spark SQL的查詢計劃包括4個階段(見下圖):
- 1.分析
- 2.邏輯優化
- 3.物理計劃
- 4.生成代碼,將查詢部分編譯成Java字節碼
注意:在物理計劃階段,Catalyst會生成多個計劃,并且會計算每個計劃的成本,然后比較這些計劃的成本的大小,即基于成本的策略。在其他階段,都是基于規則的的優化策略。
分析
Unresolved Logical plan --> Logical plan。Spark SQL的查詢計劃首先起始于由SQL解析器返回的AST,或者是由API構建的DataFrame對象。在這兩種情況下,都會存在未處理的屬性引用(某個查詢字段可能不存在,或者數據類型錯誤),比如查詢語句:SELECT col FROM sales,關于字段col的類型,或者該字段是否是一個有效的字段,只有等到查看該sales表時才會清楚。當不能確定一個屬性字段的類型或者沒能夠與輸入表進行匹配時,稱之為未處理的。Spark SQL使用Catalyst的規則以及Catalog對象(能夠訪問數據源的表信息)來處理這些屬性。首先會構建一個Unresolved Logical Plan樹,然后作用一系列的規則,最后生成Logical Plan。
邏輯優化
Logical plan --> Optimized Logical Plan。邏輯優化階段使用基于規則的優化策略,比如謂詞下推、投影裁剪等。經過一些列優化過后,生成優化的邏輯計劃Optimized Logical Plan。
物理計劃
Optimized Logical Plan -->physical Plan。在物理計劃階段,Spark SQL會將優化的邏輯計劃生成多個物理執行計劃,然后使用Cost Model計算每個物理計劃的成本,最終選擇一個物理計劃。在這個階段,如果確定一張表很小(可以持久化到內存),Spark SQL會使用broadcast join。
需要注意的是,物理計劃器也會使用基于規則的優化策略,比如將投影、過濾操作管道化一個Spark的map算子。此外,還會將邏輯計劃階段的操作推到數據源端(支持謂詞下推、投影下推)。
代碼生成
查詢優化的最終階段是生成Java字節碼,使用Quasi quotes來完成這項工作的。
經過上面的分析,對Catalyst Optimizer有了初步的了解。關于Spark的其他組件是如何與Catalyst Optimizer交互的呢?具體如下圖所示:
如上圖所示:ML Pipelines, Structured streaming以及 GraphFrames都使用了DataFrame/Dataset APIs,并且都得益于 Catalyst optimiser。
Quick Start
創建SparkSession
SparkSession是Dataset與DataFrame API的編程入口,從Spark2.0開始支持。用于統一原來的HiveContext和SQLContext,為了兼容兩者,仍然保留這兩個入口。通過一個SparkSession入口,提高了Spark的易用性。下面的代碼展示了如何創建一個SparkSession:
import?org.apache.spark.sql.SparkSessionval?spark?=?SparkSession
??.builder()
??.appName("Spark?SQL?basic?example")
??.config("spark.some.config.option",?"some-value")
??.getOrCreate()
//導入隱式轉換,比如將RDD轉為DataFrame
import?spark.implicits._
創建DataFrame
創建完SparkSession之后,可以使用SparkSession從已經存在的RDD、Hive表或者其他數據源中創建DataFrame。下面的示例使用的是從一個JSON文件數據源中創建DataFrame:
/***?{"name":"Michael"}
*?{"name":"Andy",?"age":30}
*?{"name":"Justin",?"age":19}
*/
val?df?=?spark.read.json("E://people.json")
//輸出DataFrame的內容
df.show()
//?+----+-------+
//?|?age|???name|
//?+----+-------+
//?|null|Michael|
//?|??30|???Andy|
//?|??19|?Justin|
//?+----+-------+
DataFrame基本操作
創建完DataFrame之后,可以對其進行一些列的操作,具體如下面代碼所示:
//?打印該DataFrame的信息df.printSchema()
//?root
//?|--?age:?long?(nullable?=?true)
//?|--?name:?string?(nullable?=?true)
//?查詢name字段
df.select("name").show()
//?+-------+
//?|???name|
//?+-------+
//?|Michael|
//?|???Andy|
//?|?Justin|
//?+-------+
//?將每個人的age?+?1
df.select($"name",?$"age"?+?1).show()
//?+-------+---------+
//?|???name|(age?+?1)|
//?+-------+---------+
//?|Michael|?????null|
//?|???Andy|???????31|
//?|?Justin|???????20|
//?+-------+---------+
//?查找age大于21的人員信息
df.filter($"age"?>?21).show()
//?+---+----+
//?|age|name|
//?+---+----+
//?|?30|Andy|
//?+---+----+
//?按照age分組,統計每種age的個數
df.groupBy("age").count().show()
//?+----+-----+
//?|?age|count|
//?+----+-----+
//?|??19|????1|
//?|null|????1|
//?|??30|????1|
//?+----+-----+
在程序中使用SQL查詢
上面的操作使用的是**DSL(domain-specific language)**方式,還可以直接使用SQL對DataFrame進行操作,具體如下所示:
//?將DataFrame注冊為SQL的臨時視圖//?該方法創建的是一個本地的臨時視圖,生命周期與其綁定的SparkSession會話相關
//?即如果創建該view的session結束了,該view也就消失了
df.createOrReplaceTempView("people")
val?sqlDF?=?spark.sql("SELECT?*?FROM?people")
sqlDF.show()
//?+----+-------+
//?|?age|???name|
//?+----+-------+
//?|null|Michael|
//?|??30|???Andy|
//?|??19|?Justin|
//?+----+-------+
Global Temporary View
上面使用的是Temporary views的方式,該方式是Spark Session范圍的。如果將創建的view可以在所有session之間共享,可以使用Global Temporary View的方式創建view,具體如下:
//?將DataFrame注冊為全局臨時視圖(global?temporary?view)//?該方法創建的是一個全局的臨時視圖,生命周期與其綁定的Spark應用程序相關,
//?即如果應用程序結束,會自動被刪除
//?全局臨時視圖是可以跨Spark?Session的,系統保留的數據庫名為`global_temp`
//?當查詢時,必須要加上全限定名,如`SELECT?*?FROM?global_temp.view1`
df.createGlobalTempView("people")
//?全局臨時視圖默認的保留數據庫為:`global_temp`?
spark.sql("SELECT?*?FROM?global_temp.people").show()
//?+----+-------+
//?|?age|???name|
//?+----+-------+
//?|null|Michael|
//?|??30|???Andy|
//?|??19|?Justin|
//?+----+-------+
//?全局臨時視圖支持跨Spark?Session會話
spark.newSession().sql("SELECT?*?FROM?global_temp.people").show()
//?+----+-------+
//?|?age|???name|
//?+----+-------+
//?|null|Michael|
//?|??30|???Andy|
//?|??19|?Justin|
//?+----+-------+
創建DataSet
DataSet與RDD很類似,但是,RDD使用的Java的序列化器或者Kyro序列化,而DataSet使用的是Encoder對在網絡間傳輸的對象進行序列化的。創建DataSet的示例如下:
case?class?Person(name:?String,?age:?Long)//?創建DataSet
val?caseClassDS?=?Seq(Person("Andy",?32)).toDS()
caseClassDS.show()
//?+----+---+
//?|name|age|
//?+----+---+
//?|Andy|?32|
//?+----+---+
//?通過導入Spark的隱式轉換spark.implicits._
//?可以自動識別數據類型
val?primitiveDS?=?Seq(1,?2,?3).toDS()
primitiveDS.map(_?+?1).collect()?//?返回:?Array(2,?3,?4)
//?通過調用as方法,DataFrame可以轉為DataSet,
val?path?=?"E://people.json"
val?peopleDS?=?spark.read.json(path).as[Person]
peopleDS.show()
//?+----+-------+
//?|?age|???name|
//?+----+-------+
//?|null|Michael|
//?|??30|???Andy|
//?|??19|?Justin|
//?+----+-------+
RDD與DataFrame相互轉換
Spark SQL支持兩種不同的方式將RDD轉換為DataFrame。第一種是使用反射來推斷包含特定類型對象的RDD的模式,這種基于反射的方式可以提供更簡潔的代碼,如果在編寫Spark應用程序時,已經明確了schema,可以使用這種方式。第二種方式是通過可編程接口來構建schema,然后將其應用于現有的RDD。此方式編寫的代碼更冗長,此種方式創建的DataFrame,直到運行時才知道該DataFrame的列及其類型。
下面案例的數據集如下people.txt:
Tom,?29Bob,?30
Jack,?19
通過反射的方式
Spark SQL的Scala接口支持自動將包含樣例類的RDD轉換為DataFrame。樣例類定義表的schema。通過反射讀取樣例類的參數名稱,并映射成column的名稱。
object?RDD2DF_m1?{??//創建樣例類
??case?class??Person(name:?String,?age:?Int)
??def?main(args:?Array[String]):?Unit?=?{
????val?spark?=?SparkSession
??????.builder()
??????.appName("RDD2DF_m1")
??????.master("local")
??????.getOrCreate()
????Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
????Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
????runRDD2DF(spark)
??}
??private?def?runRDD2DF(spark:?SparkSession)?=?{
????//導入隱式轉換,用于RDD轉為DataFrame
????import?spark.implicits._
????//從文本文件中創建RDD,并將其轉換為DataFrame
????val?peopleDF?=?spark.sparkContext
??????.textFile("file:///E:/people.txt")
??????.map(_.split(","))
??????.map(attributes?=>?Person(attributes(0),?attributes(1).trim.toInt))
??????.toDF()
????//將DataFrame注冊成臨時視圖
????peopleDF.createOrReplaceTempView("people")
????//?運行SQL語句
????val?teenagersDF?=?spark.sql("SELECT?name,?age?FROM?people?WHERE?age?BETWEEN?13?AND?19")
????//?使用字段索引訪問列
????teenagersDF.map(teenager?=>?"Name:?"?+?teenager(0)).show()
????//?+----------+
????//?|?????value|
????//?+----------+
????//?|Name:?Jack|
????//?+----------+
????//?通過字段名訪問列
????teenagersDF.map(teenager?=>?"Name:?"?+?teenager.getAs[String]("name")).show()
????//?+------------+
????//?|???????value|
????//?+------------+
????//?|Name:?Jack|
????//?+------------+
??}
}
通過構建schema的方式
通過構建schema的方式創建DataFrame主要包括三步:
- 1.從原始RDD創建Row類型的RDD
- 2.使用StructType,創建schema
- 3.通過createDataFrame方法將schema應用于Row類型的RDD
??def?main(args:?Array[String]):?Unit?=?{
????val?spark?=?SparkSession
??????.builder()
??????.appName("RDD2DF_m1")
??????.master("local")
??????.getOrCreate()
????Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
????Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
????runRDD2DF(spark)
??}
??private?def?runRDD2DF(spark:?SparkSession)?=?{
????//導入隱式轉換,用于RDD轉為DataFrame
????import?spark.implicits._
????//創建原始RDD
????val?peopleRDD?=?spark.sparkContext.textFile("E:/people.txt")
????//step?1?將原始RDD轉換為ROW類型的RDD
????val?rowRDD?=?peopleRDD
??????.map(_.split(","))
??????.map(attributes?=>?Row(attributes(0),?attributes(1).trim.toInt))
????//step?2?創建schema
????val?schema?=?StructType(Array(
??????StructField("name",?StringType,?true),
??????StructField("age",?IntegerType,?true)
????))
????//step?3?創建DF
????val?peopleDF?=?spark.createDataFrame(rowRDD,?schema)
????//?將DataFrame注冊成臨時視圖
????peopleDF.createOrReplaceTempView("people")
????//?運行SQL語句
????val?results?=?spark.sql("SELECT?name?FROM?people")
????//?使用字段索引訪問列
????results.map(attributes?=>?"Name:?"?+?attributes(0)).show()
????//?+----------+
????//?|?????value|
????//?+----------+
????//?|?Name:?Tom|
????//?|?Name:?Bob|
????//?|?Name:?Jack|
????//?+----------+
??}
}
Spark SQL的數據源
Spark SQL支持通過DataFrame接口對各種數據源進行操作,可以使用關系轉換以及臨時視圖對DataFrame進行操作。常見的數據源包括以下幾種:
文件數據源
- Parquet文件
- JSON文件
- CSV文件
- ORC文件
????/**
??????*?讀取parquet文件數據源,并將結果寫入到parquet文件
??????*/
????val?usersDF?=?spark
??????.read
??????.load("E://users.parquet")
????usersDF.show()
????//?將DF保存到parquet文件
????usersDF
??????.select("name",?"favorite_color")
??????.write
??????.mode(SaveMode.Overwrite)
??????.save("E://namesAndFavColors.parquet")
????/**
??????*?讀取json文件數據源,并將結果寫入到parquet文件
??????*/
????val?peopleDF?=?spark
??????.read
??????.format("json")
??????.load("E://people.json")
????peopleDF.show()
????//?將DF保存到parquet文件
????peopleDF
??????.select("name",?"age")
??????.write
??????.format("parquet")
??????.mode(SaveMode.Overwrite)
??????.save("E://namesAndAges.parquet")
????/**
??????*?讀取CSV文件數據源
??????*/
????val?peopleDFCsv?=?spark.read.format("csv")
??????.option("sep",?";")
??????.option("inferSchema",?"true")
??????.option("header",?"true")
??????.load("E://people.csv")
????/**
??????*?將usersDF寫入到ORC文件
??????*/
????usersDF.write.format("orc")
??????.option("orc.bloom.filter.columns",?"favorite_color")
??????.option("orc.dictionary.key.threshold",?"1.0")
??????.option("orc.column.encoding.direct",?"name")
??????.mode(SaveMode.Overwrite)
??????.save("E://users_with_options.orc")
????/**
??????*?將peopleDF保存為持久化表,一般保存為Hive中
??????*/
????peopleDF
??????.write
??????.option("path","E://warehouse/people_bucketed")?//?保存路徑
??????.bucketBy(42,?"name")???????????//?按照name字段分桶
??????.sortBy("age")??????????????????//?按照age字段排序
??????.saveAsTable("people_bucketed")
????/**
??????*?將userDF保存為分區文件,類似于Hive分區表
??????*/
????usersDF
??????.write
??????.partitionBy("favorite_color")??//?分區字段
??????.format("parquet")????????//?文件格式
??????.mode(SaveMode.Overwrite)?//?保存模式
??????.save("E://namesPartByColor.parquet")
????/**
??????*
??????*/
????usersDF
??????.write
??????.option("path","E://warehouse/users_partitioned_bucketed")?//?保存路徑
??????.partitionBy("favorite_color")??//?分區
??????.bucketBy(42,?"name")???????????//?分桶
??????.saveAsTable("users_partitioned_bucketed")
????spark.sql("DROP?TABLE?IF?EXISTS?people_bucketed")
????spark.sql("DROP?TABLE?IF?EXISTS?users_partitioned_bucketed")
??}
保存模式
| SaveMode.ErrorIfExists(default) | 如果目標文件已經存在,則報異常 |
| SaveMode.Append | 如果目標文件或表已經存在,則將結果追加進去 |
| SaveMode.Overwrite | 如果目標文件或表已經存在,則覆蓋原有的內容 |
| SaveMode.Ignore | 類似于SQL中的CREATE TABLE IF NOT EXISTS,如果目標文件或表已經存在,則不做任何操作 |
保存為持久化表
DataFrame可以被保存為Hive的持久化表,值得注意的是,這種方式并不依賴與Hive的部署,也就是說Spark會使用Derby創建一個默認的本地Hive metastore,與createOrReplaceTempView不同,該方式會直接將結果物化。
對于基于文件的數據源( text, parquet, json等),在保存的時候可以指定一個具體的路徑,比如 df.write.option("path", "/some/path").saveAsTable("t")(存儲在指定路徑下的文件格式為parquet)。當表被刪除時,自定義的表的路徑和表數據不會被移除。如果沒有指定具體的路徑,spark默認的是warehouse的目錄(/user/hive/warehouse),當表被刪除時,默認的表路徑也會被刪除。
Hive數據源
見下面小節:Spark SQL集成Hive
JDBC數據源
Spark SQL還包括一個可以使用JDBC從其他數據庫讀取數據的數據源。與使用JdbcRDD相比,應優先使用此功能。這是因為結果作為DataFrame返回,它們可以在Spark SQL中輕松處理或與其他數據源連接。JDBC數據源也更易于使用Java或Python,因為它不需要用戶提供ClassTag。
可以使用Data Sources API將遠程數據庫中的表加載為DataFrame或Spark SQL臨時視圖。用戶可以在數據源選項中指定JDBC連接屬性。user并且password通常作為用于登錄數據源的連接屬性提供。除連接屬性外,Spark還支持以下不區分大小寫的選項:
| url | 要連接的JDBC URL |
| dbtable | 讀取或寫入的JDBC表 |
| query | 指定查詢語句 |
| driver | 用于連接到該URL的JDBC驅動類名 |
| partitionColumn, lowerBound, upperBound | 如果指定了這些選項,則必須全部指定。另外, numPartitions必須指定 |
| numPartitions | 表讀寫中可用于并行處理的最大分區數。這也確定了并發JDBC連接的最大數量。如果要寫入的分區數超過此限制,我們可以通過coalesce(numPartitions)在寫入之前進行調用將其降低到此限制 |
| queryTimeout | 默認為0,查詢超時時間 |
| fetchsize | JDBC的獲取大小,它確定每次要獲取多少行。這可以幫助提高JDBC驅動程序的性能 |
| batchsize | 默認為1000,JDBC批處理大小,這可以幫助提高JDBC驅動程序的性能。 |
| isolationLevel | 事務隔離級別,適用于當前連接。它可以是一個NONE,READ_COMMITTED,READ_UNCOMMITTED,REPEATABLE_READ,或SERIALIZABLE,對應于由JDBC的連接對象定義,缺省值為標準事務隔離級別READ_UNCOMMITTED。此選項僅適用于寫作。 |
| sessionInitStatement | 在向遠程數據庫打開每個數據庫會話之后,在開始讀取數據之前,此選項將執行自定義SQL語句,使用它來實現會話初始化代碼。 |
| truncate | 這是與JDBC writer相關的選項。當SaveMode.Overwrite啟用時,就會清空目標表的內容,而不是刪除和重建其現有的表。默認為false |
| pushDownPredicate | 用于啟用或禁用謂詞下推到JDBC數據源的選項。默認值為true,在這種情況下,Spark將盡可能將過濾器下推到JDBC數據源。 |
??def?main(args:?Array[String]):?Unit?=?{
????val?spark?=?SparkSession
??????.builder()
??????.appName("JdbcDatasetExample")
??????.master("local")?//設置為本地運行
??????.getOrCreate()
????Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
????Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
????runJdbcDatasetExample(spark)
??}
??private?def?runJdbcDatasetExample(spark:?SparkSession):?Unit?=?{
????//注意:從JDBC源加載數據
????val?jdbcPersonDF?=?spark.read
??????.format("jdbc")
??????.option("url",?"jdbc:mysql://localhost/mydb")
??????.option("dbtable",?"person")
??????.option("user",?"root")
??????.option("password",?"123qwe")
??????.load()
????//打印jdbcDF的schema
????jdbcPersonDF.printSchema()
????//打印數據
????jdbcPersonDF.show()
????val?connectionProperties?=?new?Properties()
????connectionProperties.put("user",?"root")
????connectionProperties.put("password",?"123qwe")
????//通過.jdbc的方式加載數據
????val?jdbcStudentDF?=?spark
??????.read
??????.jdbc("jdbc:mysql://localhost/mydb",?"student",?connectionProperties)
????//打印jdbcDF的schema
????jdbcStudentDF.printSchema()
????//打印數據
????jdbcStudentDF.show()
????//?保存數據到JDBC源
????jdbcStudentDF.write
??????.format("jdbc")
??????.option("url",?"jdbc:mysql://localhost/mydb")
??????.option("dbtable",?"student2")
??????.option("user",?"root")
??????.option("password",?"123qwe")
??????.mode(SaveMode.Append)
??????.save()
????jdbcStudentDF
??????.write
??????.mode(SaveMode.Append)
??????.jdbc("jdbc:mysql://localhost/mydb",?"student2",?connectionProperties)
??}
}
Spark SQL集成Hive
Spark SQL還支持讀取和寫入存儲在Apache Hive中的數據。但是,由于Hive具有大量依賴項,因此這些依賴項不包含在默認的Spark發布包中。如果可以在類路徑上找到Hive依賴項,Spark將自動加載它們。請注意,這些Hive依賴項也必須存在于所有工作節點(worker nodes)上,因為它們需要訪問Hive序列化和反序列化庫(SerDes)才能訪問存儲在Hive中的數據。
將hive-site.xml,core-site.xml以及hdfs-site.xml文件放在conf/下。
在使用Hive時,必須實例化一個支持Hive的SparkSession,包括連接到持久性Hive Metastore,支持Hive 的序列化、反序列化(serdes)和Hive用戶定義函數。沒有部署Hive的用戶仍可以啟用Hive支持。如果未配置hive-site.xml,則上下文(context)會在當前目錄中自動創建metastore_db,并且會創建一個由spark.sql.warehouse.dir配置的目錄,其默認目錄為spark-warehouse,位于啟動Spark應用程序的當前目錄中。請注意,自Spark 2.0.0以來,該在hive-site.xml中的hive.metastore.warehouse.dir屬性已被標記過時(deprecated)。使用spark.sql.warehouse.dir用于指定warehouse中的默認位置。可能需要向啟動Spark應用程序的用戶授予寫入的權限。
下面的案例為在本地運行(為了方便查看打印的結果),運行結束之后會發現在項目的目錄下E:\IdeaProjects\myspark創建了spark-warehouse和metastore_db的文件夾。可以看出沒有部署Hive的用戶仍可以啟用Hive支持,同時也可以將代碼打包,放在集群上運行。
object?SparkHiveExample?{??case?class?Record(key:?Int,?value:?String)
??def?main(args:?Array[String])?{
????val?spark?=?SparkSession
??????.builder()
??????.appName("Spark?Hive?Example")
??????.config("spark.sql.warehouse.dir",?"e://warehouseLocation")
??????.master("local")//設置為本地運行
??????.enableHiveSupport()
??????.getOrCreate()
????Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
????Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
????import?spark.implicits._
????import?spark.sql
????//使用Spark?SQL?的語法創建Hive中的表
????sql("CREATE?TABLE?IF?NOT?EXISTS?src?(key?INT,?value?STRING)?USING?hive")
????sql("LOAD?DATA?LOCAL?INPATH?'file:///e:/kv1.txt'?INTO?TABLE?src")
????//?使用HiveQL查詢
????sql("SELECT?*?FROM?src").show()
????//?+---+-------+
????//?|key|??value|
????//?+---+-------+
????//?|238|val_238|
????//?|?86|?val_86|
????//?|311|val_311|
????//?...
????//?支持使用聚合函數
????sql("SELECT?COUNT(*)?FROM?src").show()
????//?+--------+
????//?|count(1)|
????//?+--------+
????//?|????500?|
????//?+--------+
????//?SQL查詢的結果是一個DataFrame,支持使用所有的常規的函數
????val?sqlDF?=?sql("SELECT?key,?value?FROM?src?WHERE?key??0?ORDER?BY?key")
????//?DataFrames是Row類型的,?允許你按順序訪問列.
????val?stringsDS?=?sqlDF.map?{
??????case?Row(key:?Int,?value:?String)?=>?s"Key:?$key,?Value:?$value"
????}
????stringsDS.show()
????//?+--------------------+
????//?|???????????????value|
????//?+--------------------+
????//?|Key:?0,?Value:?val_0|
????//?|Key:?0,?Value:?val_0|
????//?|Key:?0,?Value:?val_0|
????//?...
????//可以通過SparkSession使用DataFrame創建一個臨時視圖
????val?recordsDF?=?spark.createDataFrame((1?to?100).map(i?=>?Record(i,?s"val_$i")))
????recordsDF.createOrReplaceTempView("records")
????//可以用DataFrame與Hive中的表進行join查詢
????sql("SELECT?*?FROM?records?r?JOIN?src?s?ON?r.key?=?s.key").show()
????//?+---+------+---+------+
????//?|key|?value|key|?value|
????//?+---+------+---+------+
????//?|??2|?val_2|??2|?val_2|
????//?|??4|?val_4|??4|?val_4|
????//?|??5|?val_5|??5|?val_5|
????//?...
????//創建一個Parquet格式的hive托管表,使用的是HQL語法,沒有使用Spark?SQL的語法("USING?hive")
????sql("CREATE?TABLE?IF?NOT?EXISTS?hive_records(key?int,?value?string)?STORED?AS?PARQUET")
????//讀取Hive中的表,轉換成了DataFrame
????val?df?=?spark.table("src")
????//將該DataFrame保存為Hive中的表,使用的模式(mode)為復寫模式(Overwrite)
????//即如果保存的表已經存在,則會覆蓋掉原來表中的內容
????df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
????//?查詢表中的數據
????sql("SELECT?*?FROM?hive_records").show()
????//?+---+-------+
????//?|key|??value|
????//?+---+-------+
????//?|238|val_238|
????//?|?86|?val_86|
????//?|311|val_311|
????//?...
????//?設置Parquet數據文件路徑
????val?dataDir?=?"/tmp/parquet_data"
????//spark.range(10)返回的是DataSet[Long]
????//將該DataSet直接寫入parquet文件
????spark.range(10).write.parquet(dataDir)
????//?在Hive中創建一個Parquet格式的外部表
????sql(s"CREATE?EXTERNAL?TABLE?IF?NOT?EXISTS?hive_ints(key?int)?STORED?AS?PARQUET?LOCATION?'$dataDir'")
????//?查詢上面創建的表
????sql("SELECT?*?FROM?hive_ints").show()
????//?+---+
????//?|key|
????//?+---+
????//?|??0|
????//?|??1|
????//?|??2|
????//?...
????//?開啟Hive動態分區
????spark.sqlContext.setConf("hive.exec.dynamic.partition",?"true")
????spark.sqlContext.setConf("hive.exec.dynamic.partition.mode",?"nonstrict")
????//?使用DataFrame?API創建Hive的分區表
????df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
????//分區鍵‘key’將會在最終的schema中被移除
????sql("SELECT?*?FROM?hive_part_tbl").show()
????//?+-------+---+
????//?|??value|key|
????//?+-------+---+
????//?|val_238|238|
????//?|?val_86|?86|
????//?|val_311|311|
????//?...
????spark.stop()
??}
}
Thrift ?server與Spark SQL CLI
可以使用JDBC/ODBC或者命令行訪問Spark SQL,通過這種方式,用戶可以直接使用SQL運行查詢,而不用編寫代碼。
Thrift JDBC/ODBC server
Thrift JDBC/ODBC server與Hive的HiveServer2向對應,可以使用Beeline訪問JDBC服務器。在Spark的sbin目錄下存在start-thriftserver.sh腳本,使用此腳本啟動JDBC/ODBC服務器:
./sbin/start-thriftserver.sh使用beeline訪問JDBC/ODBC服務器,Beeline會要求提供用戶名和密碼,在非安全模式下,只需輸入用戶名和空白密碼即可
beeline>?!connect?jdbc:hive2://localhost:10000Spark SQL CLI
Spark SQL CLI是在本地模式下運行Hive Metastore服務并執行從命令行輸入的查詢的便捷工具。請注意,Spark SQL CLI無法與Thrift JDBC服務器通信。
要啟動Spark SQL CLI,只需要在Spark的bin目錄中運行以下命令:
./spark-sql?總結
本文主要對Spark SQL進行了闡述,主要包括Spark SQL的介紹、DataFrame&DataSet API基本使用、Catalyst Optimizer優化器的基本原理、Spark SQL編程、Spark SQL數據源以及與Hive集成、Thrift ?server與Spark SQL CLI。下一篇將分享Spark Streaming編程指南。
總結
以上是生活随笔為你收集整理的spark-sql建表语句限制_第三篇|Spark SQL编程指南的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: exp导出excel oracle_如何
- 下一篇: 中统计字符串长度的函数_SQL Serv