日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

040 DataFrame中的write与read编程

發(fā)布時間:2025/7/14 49 豆豆
生活随笔 收集整理的這篇文章主要介紹了 040 DataFrame中的write与read编程 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一:SparkSQL支持的外部數(shù)據(jù)源

1.支持情況

  

?

2.External LIbraries

  不是內(nèi)嵌的,看起來不支持。

  但是現(xiàn)在已經(jīng)有很多開源插件,可以進行支持。

?

3.參考材料

·  支持的格式:https://github.com/databricks

?

二:準備

1.啟動服務

  RunJar是metastore服務,在hive那邊開啟。

  只需要啟動三個服務就可以了,以后runjar都要啟動,因為這里使用hive與spark集成了,不啟動這個服務,就會總是報錯。

  

?

2.啟動spark-shell

  

?

三:測試檢驗程序

1.DataFrame的構(gòu)成

  

?

2.結(jié)果

  

?

3.測試

?  

?

4.結(jié)果

  

?

四:DataFrame的創(chuàng)建

1.創(chuàng)建SQLContext

  val sqlContext=new SQLContext(sc)

2.創(chuàng)建DataFrame(兩種方式)

  val df=sqlContext.#

  val df=sqlContext.read.#

3.DataFrame數(shù)據(jù)轉(zhuǎn)換

  val ndf=df.#.#

4.結(jié)果保存

  ndf.#

  ndf.write.#

?

五:DataFrame的保存

1.第一種方式

  將DataFrame轉(zhuǎn)換為RDD,RDD數(shù)據(jù)保存

?

2.第二種方式

  直接通過DataFrame的write屬性將數(shù)據(jù)寫出。

  但是有限制,必須有定義類實現(xiàn),默認情況:SparkSQL只支持parquet,json,jdbc

?

六:兩個常用的網(wǎng)站(數(shù)據(jù)源問題)

1.金磚公司提供的一些插件

  

?

2.package網(wǎng)址

  https://spark-packages.org/

  

?

七:DataFrameReader編程模式

功能: 通過SQLContext提供的reader讀取器讀取外部數(shù)據(jù)源的數(shù)據(jù),并形成DataFrame

1.源碼的主要方法

  format:給定數(shù)據(jù)源數(shù)據(jù)格式類型,eg: json、parquet
  schema:給定讀入數(shù)據(jù)的數(shù)據(jù)schema,可以不給定,不給定的情況下,進行數(shù)據(jù)類型推斷
  option:添加參數(shù),這些參數(shù)在數(shù)據(jù)解析的時候可能會用到
  load:
    有參數(shù)的指從參數(shù)給定的path路徑中加載數(shù)據(jù),比如:JSON、Parquet...
    無參數(shù)的指直接加載數(shù)據(jù)(根據(jù)option相關的參數(shù))
  jdbc:讀取關系型數(shù)據(jù)庫的數(shù)據(jù)
  json:讀取json格式數(shù)據(jù)
  parquet:讀取parquet格式數(shù)據(jù)
  orc: 讀取orc格式數(shù)據(jù)
  table:直接讀取關聯(lián)的Hive數(shù)據(jù)庫中的對應表數(shù)據(jù)

?

八:Reader的程序測試

1.新建文件夾

  

?

2.上傳數(shù)據(jù)

  

?

3.加載json數(shù)據(jù)

  val df=sqlContext.read.format("json").load("spark/sql/people.json")

  結(jié)果:

  

?

4.數(shù)據(jù)展示

  df.show()

  結(jié)果:

  

?

5.數(shù)據(jù)注冊成臨時表并操作展示

  

  結(jié)果:

  

?

6.和上面的方法等效的方式

  sqlContext.sql("select * from json.`spark/sql/people.json`").show()

  結(jié)果:

  

?

7.讀取顯示parquet格式的數(shù)據(jù)

  sqlContext.read.format("parquet").load("spark/sql/users.parquet").show()

  結(jié)果:

  

?

8.加載mysql中的數(shù)據(jù)

  這個是服務器上的mysql。

  sqlContext.read.jdbc("jdbc:mysql://linux-hadoop01.ibeifeng.com:3306/mysql?user=root&password=123456", "user", new java.util.Properties()).show()

  這個地方比較特殊。

  第一次使用bin/spark-shell進入后,使用命令,效果如下:

  

  然后使用這種方式進行啟動,加上jar

  ?bin/spark-shell --jars /opt/softwares/mysql-connector-java-5.1.27-bin.jar --driver-class-path /opt/softwares/mysql-connector-java-5.1.27-bin.jar

  

?

九:DataFrameWriter編程模式

功能:將DataFrame的數(shù)據(jù)寫出到外部數(shù)據(jù)源

?

1.源碼主要方法

mode: 給定數(shù)據(jù)輸出的模式
  `overwrite`: overwrite the existing data.
  `append`: append the data.
  `ignore`: ignore the operation (i.e. no-op).
  `error`: default option, throw an exception at runtime.
format:給定輸出文件所屬類型, eg: parquet、json
option: 給定參數(shù)
partitionBy:給定分區(qū)字段(要求輸出的文件類型支持數(shù)據(jù)分區(qū))
save: 觸發(fā)數(shù)據(jù)保存操作 --> 當該API被調(diào)用后,數(shù)據(jù)已經(jīng)寫出到具體的數(shù)據(jù)保存位置了
jdbc:將數(shù)據(jù)輸出到關系型數(shù)據(jù)庫
  當mode為append的時候,數(shù)據(jù)追加方式是:
    先將表中的所有索引刪除
    再追加數(shù)據(jù)

  沒法實現(xiàn),數(shù)據(jù)不存在就添加,存在就更新的需求

?

十:writer的程序測試

?1.讀取hive數(shù)據(jù),形成DateFrame

  

?

2.結(jié)果保存為json格式

  自動創(chuàng)建存儲目錄。

  

  效果:

  

?

3.不再詳細粘貼結(jié)果了

1 讀取Hive表數(shù)據(jù)形成DataFrame 2 val df = sqlContext.read.table("common.emp") 3 4 結(jié)果保存json格式 5 df.select("empno","ename").write.mode("ignore").format("json").save("/beifeng/result/json") 6 df.select("empno","ename").write.mode("error").format("json").save("/beifeng/result/json") 7 df.select("empno","ename", "sal").write.mode("overwrite").format("json").save("/beifeng/result/json") 8 df.select("empno","ename").write.mode("append").format("json").save("/beifeng/result/json")\ 9 上面雖然在追加的時候加上了sal,但是解析沒有問題 10 sqlContext.read.format("json").load("/beifeng/result/json").show() 11 12 結(jié)果保存parquet格式 13 df.select("empno", "ename", "deptno").write.format("parquet").save("/beifeng/result/parquet01") 14 df.select("empno", "ename","sal", "deptno").write.mode("append").format("parquet").save("/beifeng/result/parquet01") ## 加上sal導致解析失敗,讀取數(shù)據(jù)的時候 15 16 sqlContext.read.format("parquet").load("/beifeng/result/parquet01").show(100) 17 sqlContext.read.format("parquet").load("/beifeng/result/parquet01/part*").show(100) 18 19 partitionBy按照給定的字段進行分區(qū) 20 df.select("empno", "ename", "deptno").write.format("parquet").partitionBy("deptno").save("/beifeng/result/parquet02") 21 sqlContext.read.format("parquet").load("/beifeng/result/parquet02").show(100)

?

?

?

?

  

?

轉(zhuǎn)載于:https://www.cnblogs.com/juncaoit/p/6777648.html

總結(jié)

以上是生活随笔為你收集整理的040 DataFrame中的write与read编程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。