日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

040 DataFrame中的write与read编程

發布時間:2025/7/14 编程问答 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 040 DataFrame中的write与read编程 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一:SparkSQL支持的外部數據源

1.支持情況

  

?

2.External LIbraries

  不是內嵌的,看起來不支持。

  但是現在已經有很多開源插件,可以進行支持。

?

3.參考材料

·  支持的格式:https://github.com/databricks

?

二:準備

1.啟動服務

  RunJar是metastore服務,在hive那邊開啟。

  只需要啟動三個服務就可以了,以后runjar都要啟動,因為這里使用hive與spark集成了,不啟動這個服務,就會總是報錯。

  

?

2.啟動spark-shell

  

?

三:測試檢驗程序

1.DataFrame的構成

  

?

2.結果

  

?

3.測試

?  

?

4.結果

  

?

四:DataFrame的創建

1.創建SQLContext

  val sqlContext=new SQLContext(sc)

2.創建DataFrame(兩種方式)

  val df=sqlContext.#

  val df=sqlContext.read.#

3.DataFrame數據轉換

  val ndf=df.#.#

4.結果保存

  ndf.#

  ndf.write.#

?

五:DataFrame的保存

1.第一種方式

  將DataFrame轉換為RDD,RDD數據保存

?

2.第二種方式

  直接通過DataFrame的write屬性將數據寫出。

  但是有限制,必須有定義類實現,默認情況:SparkSQL只支持parquet,json,jdbc

?

六:兩個常用的網站(數據源問題)

1.金磚公司提供的一些插件

  

?

2.package網址

  https://spark-packages.org/

  

?

七:DataFrameReader編程模式

功能: 通過SQLContext提供的reader讀取器讀取外部數據源的數據,并形成DataFrame

1.源碼的主要方法

  format:給定數據源數據格式類型,eg: json、parquet
  schema:給定讀入數據的數據schema,可以不給定,不給定的情況下,進行數據類型推斷
  option:添加參數,這些參數在數據解析的時候可能會用到
  load:
    有參數的指從參數給定的path路徑中加載數據,比如:JSON、Parquet...
    無參數的指直接加載數據(根據option相關的參數)
  jdbc:讀取關系型數據庫的數據
  json:讀取json格式數據
  parquet:讀取parquet格式數據
  orc: 讀取orc格式數據
  table:直接讀取關聯的Hive數據庫中的對應表數據

?

八:Reader的程序測試

1.新建文件夾

  

?

2.上傳數據

  

?

3.加載json數據

  val df=sqlContext.read.format("json").load("spark/sql/people.json")

  結果:

  

?

4.數據展示

  df.show()

  結果:

  

?

5.數據注冊成臨時表并操作展示

  

  結果:

  

?

6.和上面的方法等效的方式

  sqlContext.sql("select * from json.`spark/sql/people.json`").show()

  結果:

  

?

7.讀取顯示parquet格式的數據

  sqlContext.read.format("parquet").load("spark/sql/users.parquet").show()

  結果:

  

?

8.加載mysql中的數據

  這個是服務器上的mysql。

  sqlContext.read.jdbc("jdbc:mysql://linux-hadoop01.ibeifeng.com:3306/mysql?user=root&password=123456", "user", new java.util.Properties()).show()

  這個地方比較特殊。

  第一次使用bin/spark-shell進入后,使用命令,效果如下:

  

  然后使用這種方式進行啟動,加上jar

  ?bin/spark-shell --jars /opt/softwares/mysql-connector-java-5.1.27-bin.jar --driver-class-path /opt/softwares/mysql-connector-java-5.1.27-bin.jar

  

?

九:DataFrameWriter編程模式

功能:將DataFrame的數據寫出到外部數據源

?

1.源碼主要方法

mode: 給定數據輸出的模式
  `overwrite`: overwrite the existing data.
  `append`: append the data.
  `ignore`: ignore the operation (i.e. no-op).
  `error`: default option, throw an exception at runtime.
format:給定輸出文件所屬類型, eg: parquet、json
option: 給定參數
partitionBy:給定分區字段(要求輸出的文件類型支持數據分區)
save: 觸發數據保存操作 --> 當該API被調用后,數據已經寫出到具體的數據保存位置了
jdbc:將數據輸出到關系型數據庫
  當mode為append的時候,數據追加方式是:
    先將表中的所有索引刪除
    再追加數據

  沒法實現,數據不存在就添加,存在就更新的需求

?

十:writer的程序測試

?1.讀取hive數據,形成DateFrame

  

?

2.結果保存為json格式

  自動創建存儲目錄。

  

  效果:

  

?

3.不再詳細粘貼結果了

1 讀取Hive表數據形成DataFrame 2 val df = sqlContext.read.table("common.emp") 3 4 結果保存json格式 5 df.select("empno","ename").write.mode("ignore").format("json").save("/beifeng/result/json") 6 df.select("empno","ename").write.mode("error").format("json").save("/beifeng/result/json") 7 df.select("empno","ename", "sal").write.mode("overwrite").format("json").save("/beifeng/result/json") 8 df.select("empno","ename").write.mode("append").format("json").save("/beifeng/result/json")\ 9 上面雖然在追加的時候加上了sal,但是解析沒有問題 10 sqlContext.read.format("json").load("/beifeng/result/json").show() 11 12 結果保存parquet格式 13 df.select("empno", "ename", "deptno").write.format("parquet").save("/beifeng/result/parquet01") 14 df.select("empno", "ename","sal", "deptno").write.mode("append").format("parquet").save("/beifeng/result/parquet01") ## 加上sal導致解析失敗,讀取數據的時候 15 16 sqlContext.read.format("parquet").load("/beifeng/result/parquet01").show(100) 17 sqlContext.read.format("parquet").load("/beifeng/result/parquet01/part*").show(100) 18 19 partitionBy按照給定的字段進行分區 20 df.select("empno", "ename", "deptno").write.format("parquet").partitionBy("deptno").save("/beifeng/result/parquet02") 21 sqlContext.read.format("parquet").load("/beifeng/result/parquet02").show(100)

?

?

?

?

  

?

轉載于:https://www.cnblogs.com/juncaoit/p/6777648.html

總結

以上是生活随笔為你收集整理的040 DataFrame中的write与read编程的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。