Spark SQL(五)之数据加载与存储
一、數據加載
(1)默認數據源(parquet)
最簡單加載數據的方式,所有操作都使用默認數據源(parquet)。如果指定默認數據源需要配置?spark.sql.sources.default參數。
Dataset<Row> manDF = spark.read().load("hdfs://master:9000/test.parquet"); manDF.select("name", "desc").write().save("hdfs://master:9000/test1.parquet");(2)手動指定選項
可以手動指定將要使用的數據源以及要傳遞給數據源的任何其他選項。數據源通過其全名指定(即org.apache.spark.sql.parquet),但內置的來源。
也可以使用自己的短名稱(json,parquet,jdbc,orc,libsvm,csv,text)。從任何數據源類型加載的DataFrame都可以使用此語法轉換為其他類型。
請參閱API文檔以獲取內置源的可用選項,例如?org.apache.spark.sql.DataFrameReader和org.apache.spark.sql.DataFrameWriter。此處記錄的選項也應通過非Scala Spark API(例如PySpark)應用。
Dataset<Row> manDF = spark.read().format("json").load("hdfs://master:9000/test.json"); manDF.select("name", "desc").write().format("parquet").save("hdfs://master:9000/test1.parquet");?(3)加載CSV文件
Dataset<Row> manDF = spark.read().format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("hdfs://master:9000/test.csv");(4)寫操作期使用額外的option
控制ORC數據源的Bloom過濾器和字典編碼,以下ORC示例將創建Bloom過濾器,并僅將字典編碼用于age。對于parquet,也存在parquet.enable.dictionary。要查找有關其他ORC / Parquet選項的更多詳細信息,請訪問Apache ORC / Parquet官方網站。
manDF.write.format("orc").option("orc.bloom.filter.columns", "age").option("orc.dictionary.key.threshold", "1.0").option("orc.column.encoding.direct", "name").save("hdfs://master:9000/man.orc")?
二、保存模式
保存操作可以選擇帶SaveMode,指定如何處理現有數據(如果存在)。重要的是要認識到這些保存模式不使用任何鎖定,也不是原子的。另外,執行時Overwrite,將在寫出新數據之前刪除數據。
| SaveMode.ErrorIfExists?(默認) | "error" or "errorifexists"?(默認) | 將DataFrame保存到數據源時,如果已經存在數據,則將引發異常。 |
| SaveMode.Append | "append" | 將DataFrame保存到數據源時,如果已經存在數據/表,則應該將DataFrame的內容附加到現有數據中。 |
| SaveMode.Overwrite | "overwrite" | 覆蓋模式意味著將DataFrame保存到數據源時,如果已經存在數據/表,則預期現有數據將被DataFrame的內容覆蓋。 |
| SaveMode.Ignore | "ignore" | 忽略模式意味著在將DataFrame保存到數據源時,如果已經存在數據,則期望保存操作不保存DataFrame的內容并且不更改現有數據。這類似于CREATE TABLE IF NOT EXISTSSQL中的。 |
保存到永久表
DataFrames也可以使用以下saveAsTable?命令作為持久性表保存到Hive Metastore中。請注意,使用此功能不需要現有的Hive部署。Spark將為您創建一個默認的本地Hive Metastore(使用Derby)。與createOrReplaceTempView命令不同,?saveAsTable它將具體化DataFrame的內容并在Hive元存儲中創建一個指向數據的指針。即使您重新啟動Spark程序,持久表仍將存在,只要您保持與同一metastore的連接即可。可以通過使用表名稱table在上調用方法來創建持久表的DataFrame?SparkSession。
對于基于文件的數據源,例如文本,鑲木地板,json等,您可以通過path選項指定自定義表路徑?,例如df.write.option("path", "/some/path").saveAsTable("t")。刪除表后,自定義表路徑將不會刪除,并且表數據仍然存在。如果未指定自定義表路徑,Spark會將數據寫入倉庫目錄下的默認表路徑。刪除表時,默認表路徑也將被刪除。
從Spark 2.1開始,持久數據源表在Hive元存儲中存儲了按分區的元數據。這帶來了幾個好處:
- 由于元存儲只能返回查詢的必要分區,因此不再需要在第一個查詢中將所有分區發現到表中。
- Hive DDL(例如,ALTER TABLE PARTITION ... SET LOCATION現在可用于使用Datasource API創建的表)。
請注意,在創建外部數據源表(帶有path選項的表)時,默認情況下不會收集分區信息。要同步元存儲中的分區信息,可以調用MSCK REPAIR TABLE。
?
三、分組,分類和分區
對于基于文件的數據源,也可以對輸出進行存儲和分類或分區。
(1)桶和排序
存儲桶和排序僅適用于持久表
cityDF.write().bucketBy(10, "city").sortBy("area").saveAsTable("city_buckets");(2)分區
而分區可以既使用save和saveAsTable使用DataSet API時
manDF.write().partitionBy("age").format("json").save("hdfs://master:9000/man.json");(3)分區和桶
對表使用分區和存儲桶
manDF.write().partitionBy("age").bucketBy(18, "name").saveAsTable("man_partition_buckets")partitionBy創建一個分區結構描述“分區發現”部分,它對具有高聚集數的列適用性有限。相反, bucketBy將數據分布在固定數量的存儲桶中,在唯一值的數量不受限制時可以使用。
?
?
總結
以上是生活随笔為你收集整理的Spark SQL(五)之数据加载与存储的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 电脑不断重启故障诊断电脑不断重启原因
- 下一篇: Spark SQL(六)之加载数据的参数