日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

2021年大数据Spark(三十二):SparkSQL的External DataSource

發布時間:2023/11/28 生活经验 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Spark(三十二):SparkSQL的External DataSource 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

?

目錄

External DataSource

數據源與格式

text 數據

json 數據

csv 數據

parquet 數據

jdbc 數據

???????加載/保存數據-API

???????Load 加載數據

???????Save 保存數據

???????保存模式(SaveMode)

???????案例演示


External DataSource

在SparkSQL模塊,提供一套完成API接口,用于方便讀寫外部數據源的的數據(從Spark 1.4版本提供),框架本身內置外部數據源:

在Spark 2.4版本中添加支持Image Source(圖像數據源)和Avro Source

數據源與格式

?????數據分析處理中,數據可以分為結構化數據、非結構化數據及半結構化數據。

??1)、結構化數據(Structured)

結構化數據源可提供有效的存儲和性能。例如,Parquet和ORC等柱狀格式使從列的子集中提取值變得更加容易。

基于行的存儲格式(如Avro)可有效地序列化和存儲提供存儲優勢的數據。然而,這些優點通常以靈活性為代價。如因結構的固定性,格式轉變可能相對困難。

?2)、非結構化數據(UnStructured)

相比之下,非結構化數據源通常是自由格式文本或二進制對象,其不包含標記或元數據以定義數據的結構。

報紙文章,醫療記錄,圖像,應用程序日志通常被視為非結構化數據。這些類型的源通常要求數據周圍的上下文是可解析的。

?3)、半結構化數據(Semi-Structured)

半結構化數據源是按記錄構建的,但不一定具有跨越所有記錄的明確定義的全局模式。每個數據記錄都使用其結構信息進行擴充。

半結構化數據格式的好處是,它們在表達數據時提供了最大的靈活性,因為每條記錄都是自我描述的。但這些格式的主要缺點是它們會產生額外的解析開銷,并且不是特別為ad-hoc(特定)查詢而構建的。

text 數據

SparkSession加載文本文件數據,提供兩種方法,返回值分別為DataFrame和Dataset,前面【WordCount】中已經使用,下面看一下方法聲明:

可以看出textFile方法底層還是調用text方法,先加載數據封裝到DataFrame中,再使用as[String]方法將DataFrame轉換為Dataset,實際中推薦使用textFile方法,從Spark 2.0開始提供。

無論是text方法還是textFile方法讀取文本數據時,一行一行的加載數據,每行數據使用UTF-8編碼的字符串,列名稱為【value】。?

json 數據

實際項目中,有時處理數據以JSON格式存儲的,尤其后續結構化流式模塊:StructuredStreaming,從Kafka Topic消費數據很多時間是JSON個數據,封裝到DataFrame中,需要解析提取字段的值。以讀取github操作日志JSON數據為例,數據結構如下:

?1)、操作日志數據使用GZ壓縮:2015-03-01-11.json.gz,先使用json方法讀取。

?2)、使用textFile加載數據,對每條JSON格式字符串數據,使用SparkSQL函數庫functions中自帶get_json_obejct函數提取字段:id、type、public和created_at的值。

函數:get_json_obejct使用說明

示例代碼:


package cn.it.sqlimport org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}/*** SparkSQL讀取JSON格式文本數據*/
object SparkSQLJson {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]")// 通過裝飾模式獲取實例對象,此種方式為線程安全的.getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._// TODO: 從LocalFS上讀取json格式數據(壓縮)val jsonDF: DataFrame = spark.read.json("data/input/2015-03-01-11.json.gz")//jsonDF.printSchema()jsonDF.show(5, truncate = true)println("===================================================")val githubDS: Dataset[String] = spark.read.textFile("data/input/2015-03-01-11.json.gz")//githubDS.printSchema() // value 字段名稱,類型就是StringgithubDS.show(5,truncate = true)// TODO:使用SparkSQL自帶函數,針對JSON格式數據解析的函數import org.apache.spark.sql.functions._// 獲取如下四個字段的值:id、type、public和created_atval gitDF: DataFrame = githubDS.select(get_json_object($"value", "$.id").as("id"),get_json_object($"value", "$.type").as("type"),get_json_object($"value", "$.public").as("public"),get_json_object($"value", "$.created_at").as("created_at"))gitDF.printSchema()gitDF.show(10, truncate = false)// 應用結束,關閉資源spark.stop()}
}

運行結果:

???????csv 數據

在機器學習中,常常使用的數據存儲在csv/tsv文件格式中,所以SparkSQL中也支持直接讀取格式數據,從2.0版本開始內置數據源。關于CSV/TSV格式數據說明:

SparkSQL中讀取CSV格式數據,可以設置一些選項,重點選項:

?1)、分隔符:sep

默認值為逗號,必須單個字符

?2)、數據文件首行是否是列名稱:header

默認值為false,如果數據文件首行是列名稱,設置為true

?3)、是否自動推斷每個列的數據類型:inferSchema

默認值為false,可以設置為true

官方提供案例:

當讀取CSV/TSV格式數據文件首行是否是列名稱,讀取數據方式(參數設置)不一樣的 。

?第一點:首行是列的名稱,如下方式讀取數據文件

???????//?TODO:?讀取TSV格式數據val?ratingsDF:?DataFrame?=?spark.read//?設置每行數據各個字段之間的分隔符,?默認值為?逗號.option("sep",?"\t")//?設置數據文件首行為列名稱,默認值為?false.option("header",?"true")//?自動推薦數據類型,默認值為false.option("inferSchema",?"true")//?指定文件的路徑.csv("datas/ml-100k/u.dat")ratingsDF.printSchema()ratingsDF.show(10,?truncate?=?false)

?第二點:首行不是列的名稱,如下方式讀取數據(設置Schema信息)

??????//?定義Schema信息val?schema?=?StructType(StructField("user_id",?IntegerType,?nullable?=?true)?::StructField("movie_id",?IntegerType,?nullable?=?true)?::StructField("rating",?DoubleType,?nullable?=?true)?::StructField("timestamp",?StringType,?nullable?=?true)?::?Nil)//?TODO:?讀取TSV格式數據val?mlRatingsDF:?DataFrame?=?spark.read//?設置每行數據各個字段之間的分隔符,?默認值為?逗號.option("sep",?"\t")//?指定Schema信息.schema(schema)//?指定文件的路徑.csv("datas/ml-100k/u.data")mlRatingsDF.printSchema()mlRatingsDF.show(5,?truncate?=?false)

?????將DataFrame數據保存至CSV格式文件,演示代碼如下:

示例代碼??

??????/***?將電影評分數據保存為CSV格式數據*/mlRatingsDF//?降低分區數,此處設置為1,將所有數據保存到一個文件中.coalesce(1).write//?設置保存模式,依據實際業務場景選擇,此處為覆寫.mode(SaveMode.Overwrite).option("sep",?",")//?TODO:?建議設置首行為列名.option("header",?"true").csv("datas/ml-csv-"?+?System.nanoTime())

package cn.it.sqlimport org.apache.spark.SparkContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}/*** SparkSQL 讀取CSV/TSV格式數據:* i). 指定Schema信息* ii). 是否有header設置*/
object SparkSQLCsv {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]")// 通過裝飾模式獲取實例對象,此種方式為線程安全的.getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._/*** 實際企業數據分析中* csv\tsv格式數據,每個文件的第一行(head, 首行),字段的名稱(列名)*/// TODO: 讀取CSV格式數據val ratingsDF: DataFrame = spark.read// 設置每行數據各個字段之間的分隔符, 默認值為 逗號.option("sep", "\t")// 設置數據文件首行為列名稱,默認值為?false.option("header", "true")// 自動推薦數據類型,默認值為false.option("inferSchema", "true")// 指定文件的路徑.csv("data/input/rating_100k_with_head.data")ratingsDF.printSchema()ratingsDF.show(10, truncate = false)println("=======================================================")// 定義Schema信息val schema = StructType(StructField("user_id", IntegerType, nullable = true) ::StructField("movie_id", IntegerType, nullable = true) ::StructField("rating", DoubleType, nullable = true) ::StructField("timestamp", StringType, nullable = true) :: Nil)// TODO: 讀取CSV格式數據val mlRatingsDF: DataFrame = spark.read// 設置每行數據各個字段之間的分隔符, 默認值為 逗號.option("sep", "\t")// 指定Schema信息.schema(schema)// 指定文件的路徑.csv("data/input/rating_100k.data")mlRatingsDF.printSchema()mlRatingsDF.show(10, truncate = false)println("=======================================================")/*** 將電影評分數據保存為CSV格式數據*/mlRatingsDF// 降低分區數,此處設置為1,將所有數據保存到一個文件中.coalesce(1).write// 設置保存模式,依據實際業務場景選擇,此處為覆寫.mode(SaveMode.Overwrite).option("sep", ",")// TODO: 建議設置首行為列名.option("header", "true").csv("data/output/ml-csv-" + System.currentTimeMillis())// 關閉資源spark.stop()}}

???????parquet 數據

SparkSQL模塊中默認讀取數據文件格式就是parquet列式存儲數據通過參數【spark.sql.sources.default】設置,默認值為【parquet】。

示例代碼:

直接load加載parquet數據和指定parquet格式加載數據。

運行程序結果:


package cn.it.sqlimport org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}/*** SparkSQL讀取Parquet列式存儲數據*/
object SparkSQLParquet {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]")// 通過裝飾模式獲取實例對象,此種方式為線程安全的.getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._// TODO: 從LocalFS上讀取parquet格式數據val usersDF: DataFrame = spark.read.parquet("data/input/users.parquet")usersDF.printSchema()usersDF.show(10, truncate = false)println("==================================================")// SparkSQL默認讀取文件格式為parquetval df = spark.read.load("data/input/users.parquet")df.printSchema()df.show(10, truncate = false)// 應用結束,關閉資源spark.stop()}
}

???????jdbc 數據

回顧在SparkCore中讀取MySQL表的數據通過JdbcRDD來讀取的,在SparkSQL模塊中提供對應接口,提供三種方式讀取數據:

?方式一:單分區模式

?方式二:多分區模式,可以設置列的名稱,作為分區字段及列的值范圍和分區數目

?方式三:高度自由分區模式,通過設置條件語句設置分區數據及各個分區數據范圍

當加載讀取RDBMS表的數據量不大時,可以直接使用單分區模式加載;當數據量很多時,考慮使用多分區及自由分區方式加載。

從RDBMS表中讀取數據,需要設置連接數據庫相關信息,基本屬性選項如下:

演示代碼如下:

//?連接數據庫三要素信息val?url:?String?=?"jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true"val?table:?String?=?"db_shop.so"//?存儲用戶和密碼等屬性val?props:?Properties?=?new?Properties()props.put("driver",?"com.mysql.cj.jdbc.Driver")props.put("user",?"root")props.put("password",?"123456")//?TODO:?從MySQL數據庫表:銷售訂單表?so//?def?jdbc(url:?String,?table:?String,?properties:?Properties):?DataFrameval?sosDF:?DataFrame?=?spark.read.jdbc(url,?table,?props)println(s"Count?=?${sosDF.count()}")sosDF.printSchema()sosDF.show(10,?truncate?=?false)

可以使用option方法設置連接數據庫信息,而不使用Properties傳遞,代碼如下:

//?TODO:?使用option設置參數val?dataframe:?DataFrame?=?spark.read.format("jdbc").option("driver",?"com.mysql.cj.jdbc.Driver").option("url",?"jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true").option("user",?"root").option("password",?"123456").option("dbtable",?"db_shop.so").load()dataframe.show(5,?truncate?=?false)

???????加載/保存數據-API

????SparkSQL提供一套通用外部數據源接口,方便用戶從數據源加載和保存數據,例如從MySQL表中既可以加載讀取數據:load/read,又可以保存寫入數據:save/write。

由于SparkSQL沒有內置支持從HBase表中加載和保存數據,但是只要實現外部數據源接口,也能像上面方式一樣讀取加載數據。

???????Load 加載數據

在SparkSQL中讀取數據使用SparkSession讀取,并且封裝到數據結構Dataset/DataFrame中。

DataFrameReader專門用于加載load讀取外部數據源的數據,基本格式如下:

SparkSQL模塊本身自帶支持讀取外部數據源的數據:

總結起來三種類型數據,也是實際開發中常用的:

?第一類:文件格式數據

文本文件text、csv文件和json文件

?第二類:列式存儲數據

Parquet格式、ORC格式

?第三類:數據庫表

關系型數據庫RDBMS:MySQL、DB2、Oracle和MSSQL

Hive倉庫表

官方文檔:http://spark.apache.org/docs/2.4.5/sql-data-sources-load-save-functions.html

此外加載文件數據時,可以直接使用SQL語句,指定文件存儲格式和路徑:

???????Save 保存數據

SparkSQL模塊中可以從某個外部數據源讀取數據,就能向某個外部數據源保存數據,提供相應接口,通過DataFrameWrite類將數據進行保存。

與DataFrameReader類似,提供一套規則,將數據Dataset保存,基本格式如下:

SparkSQL模塊內部支持保存數據源如下:

所以使用SpakrSQL分析數據時,從數據讀取,到數據分析及數據保存,鏈式操作,更多就是ETL操作。當將結果數據DataFrame/Dataset保存至Hive表中時,可以設置分區partition和分桶bucket,形式如下:

???????保存模式(SaveMode

?????將Dataset/DataFrame數據保存到外部存儲系統中,考慮是否存在,存在的情況下的下如何進行保存,DataFrameWriter中有一個mode方法指定模式:

通過源碼發現SaveMode時枚舉類,使用Java語言編寫,如下四種保存模式:

?第一種:Append 追加模式,當數據存在時,繼續追加;

?第二種:Overwrite 覆寫模式,當數據存在時,覆寫以前數據,存儲當前最新數據;

?第三種:ErrorIfExists?存在及報錯;

?第四種:Ignore 忽略,數據存在時不做任何操作;

實際項目依據具體業務情況選擇保存模式,通常選擇Append和Overwrite模式。

???????案例演示

package cn.it.sqlimport java.util.Propertiesimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}/*** Author itcast* Desc 先準備一個df/ds,然后再將該df/ds的數據寫入到不同的數據源中,最后再從不同的數據源中讀取*/
object DataSourceDemo{case class Person(id:Int,name:String,age:Int)def main(args: Array[String]): Unit = {//1.準備環境-SparkSession和DFval spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")val lines: RDD[String] = sc.textFile("data/input/person.txt")val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))val personRDD: RDD[Person] = linesArrayRDD.map(arr=>Person(arr(0).toInt,arr(1),arr(2).toInt))import spark.implicits._val personDF: DataFrame = personRDD.toDFpersonDF.show(6,false)/*+---+--------+---+|id |name ???|age|+---+--------+---+|1 ?|zhangsan|20 ||2 ?|lisi ???|29 ||3 ?|wangwu ?|25 ||4 ?|zhaoliu |30 ||5 ?|tianqi ?|35 ||6 ?|kobe ???|40 |+---+--------+---+*///2.將personDF寫入到不同的數據源personDF.write.mode(SaveMode.Overwrite).json("data/output/json")personDF.write.mode(SaveMode.Overwrite).csv("data/output/csv")personDF.write.mode(SaveMode.Overwrite).parquet("data/output/parquet")val prop = new Properties()prop.setProperty("user","root")prop.setProperty("password","root")personDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)println("寫入成功!")//personDF.write.text("data/output/text")//會報錯, Text data source supports only a single column, and you have 3 columns.personDF.coalesce(1).write.mode(SaveMode.Overwrite).json("data/output/json1")//personDF.repartition(1)//3.從不同的數據源讀取數據val df1: DataFrame = spark.read.json("data/output/json")val df2: DataFrame = spark.read.csv("data/output/csv").toDF("id_my","name","age")val df3: DataFrame = spark.read.parquet("data/output/parquet")val df4: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)df1.show()df2.show()df3.show()df4.show()}
}

總結

以上是生活随笔為你收集整理的2021年大数据Spark(三十二):SparkSQL的External DataSource的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。