日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark学习笔记:数据读取和保存

發布時間:2024/1/17 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark学习笔记:数据读取和保存 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

spark所支持的文件格式

?

1.文本文件

在 Spark 中讀寫文本文件很容易。

當我們將一個文本文件讀取為?RDD?時,輸入的每一行?都會成為?RDD 的 一個元素

也可以將多個完整的文本文件一次性讀取為一個 pair RDD, 其中鍵是文件名,值是文件內容

?在 Scala 中讀取一個文本文件

1

2

val?inputFile?=?"file:///home/common/coding/coding/Scala/word-count/test.segmented"

val?textFile?=?sc.textFile(inputFile)

?在 Scala 中讀取給定目錄中的所有文件

1

val?input?=?sc.wholeTextFiles("file:///home/common/coding/coding/Scala/word-count")

?保存文本文件,Spark 將傳入的路徑作為目錄對待,會在那個目錄下輸出多個文件

1

2

textFile.saveAsTextFile("file:///home/common/coding/coding/Scala/word-count/writeback")

//textFile.repartition(1).saveAsTextFile 就能保存成一個文件

對于dataFrame文件,先使用.toJavaRDD?轉換成RDD,然后再使用 ?coalesce(1).saveAsTextFile

?

2.JSON

JSON 是一種使用較廣的半結構化數據格式。

讀取JSON,書中代碼有問題所以找了另外的一段讀取JSON的代碼

?build.sbt

1

"org.json4s"?%%?"json4s-jackson"?%?"3.2.11"

?代碼

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

import?org.apache.spark.SparkContext

import?org.apache.spark.SparkContext._

import?org.apache.spark.SparkConf

import?org.json4s._

import?org.json4s.jackson.JsonMethods._

import?org.json4s.jackson.Serialization

import?org.json4s.jackson.Serialization.{read, write}

?

/**

??* Created by common on 17-4-3.

??*/

?

case?class?Person(firstName:?String, lastName:?String, address:?List[Address]) {

??override?def?toString?=?s"Person(firstName=$firstName, lastName=$lastName, address=$address)"

}

?

case?class?Address(line1:?String, city:?String, state:?String, zip:?String) {

??override?def?toString?=?s"Address(line1=$line1, city=$city, state=$state, zip=$zip)"

}

?

object?WordCount {

??def?main(args:?Array[String]) {

????val?inputJsonFile?=?"file:///home/common/coding/coding/Scala/word-count/test.json"

????val?conf?=?new?SparkConf().setAppName("WordCount").setMaster("local")

????val?sc?=?new?SparkContext(conf)

????val?input5?=?sc.textFile(inputJsonFile)

????val?dataObjsRDD?=?input5.map { myrecord?=>

??????implicit?val?formats?=?DefaultFormats

??????// Workaround as????? DefaultFormats is not serializable

??????val?jsonObj?=?parse(myrecord)

??????//val addresses = jsonObj \ "address"

??????//println((addresses(0) \ "city").extract[String])

??????jsonObj.extract[Person]

????}

????dataObjsRDD.saveAsTextFile("file:///home/common/coding/coding/Scala/word-count/test1.json")

?

??}

?

?

}

?讀取的JSON文件

1

2

{"firstName":"John","lastName":"Smith","address":[{"line1":"1 main street","city":"San Francisco","state":"CA","zip":"94101"},{"line1":"1 main street","city":"sunnyvale","state":"CA","zip":"94000"}]}

{"firstName":"Tim","lastName":"Williams","address":[{"line1":"1 main street","city":"Mountain View","state":"CA","zip":"94300"},{"line1":"1 main street","city":"San Jose","state":"CA","zip":"92000"}]}

?輸出的文件

1

2

Person(firstName=John, lastName=Smith, address=List(Address(line1=1?main street, city=San Francisco, state=CA, zip=94101), Address(line1=1?main street, city=sunnyvale, state=CA, zip=94000)))

Person(firstName=Tim, lastName=Williams, address=List(Address(line1=1?main street, city=Mountain View, state=CA, zip=94300), Address(line1=1?main street, city=San Jose, state=CA, zip=92000)))

?

3.逗號分割值與制表符分隔值

逗號分隔值(CSV)文件每行都有固定數目的字段,字段間用逗號隔開(在制表符分隔值文件,即 TSV 文 件中用制表符隔開)。

如果恰好CSV 的所有數據字段均沒有包含換行符,你也可以使用?textFile()?讀取并解析數據,

build.sbt

1

"au.com.bytecode"?%?"opencsv"?%?"2.4"

3.1 讀取CSV文件

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

import?java.io.StringReader

?

import?org.apache.spark.SparkContext

import?org.apache.spark.SparkContext._

import?org.apache.spark.SparkConf

import?org.json4s._

import?org.json4s.jackson.JsonMethods._

import?org.json4s.jackson.Serialization

import?org.json4s.jackson.Serialization.{read, write}

import?au.com.bytecode.opencsv.CSVReader

?

/**

??* Created by common on 17-4-3.

??*/

?

object?WordCount {

??def?main(args:?Array[String]) {

?

????val?input?=?sc.textFile("/home/common/coding/coding/Scala/word-count/sample_map.csv")

????val?result6?=?input.map{ line?=>

??????val?reader?=?new?CSVReader(new?StringReader(line));

??????reader.readNext();

????}

????for(result <- result6){

??????for(re <- result){

????????println(re)

??????}

????}

?

??}

?

}

?CSV文件內容

輸出

1

2

3

4

5

6

0

Front Left

/usr/share/alsa/samples/Front_Left.wav

1

Front Right

/usr/share/alsa/samples/Front_Right.wav

?

如果在字段中嵌有換行符,就需要完整讀入每個文件,然后解析各段。如果每個文件都很大,讀取和解析的過程可能會很不幸地成為性能瓶頸。

?代碼

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

import?java.io.StringReader

?

import?org.apache.spark.SparkContext

import?org.apache.spark.SparkContext._

import?org.apache.spark.SparkConf

import?org.json4s._

import?org.json4s.jackson.JsonMethods._

import?org.json4s.jackson.Serialization

import?org.json4s.jackson.Serialization.{read, write}

import?scala.collection.JavaConversions._

import?au.com.bytecode.opencsv.CSVReader

?

/**

??* Created by common on 17-4-3.

??*/

?

case?class?Data(index:?String, title:?String, content:?String)

?

object?WordCount {

??def?main(args:?Array[String]) {

?

????val?conf?=?new?SparkConf().setAppName("WordCount").setMaster("local")

????val?sc?=?new?SparkContext(conf)

????val?input?=?sc.wholeTextFiles("/home/common/coding/coding/Scala/word-count/sample_map.csv")

????val?result?=?input.flatMap {?case?(_, txt)?=>

??????val?reader?=?new?CSVReader(new?StringReader(txt));

??????reader.readAll().map(x?=> Data(x(0), x(1), x(2)))

????}

????for(res <- result){

??????println(res)

????}

??}

?

}

?輸出

1

2

Data(0,Front Left,/usr/share/alsa/samples/Front_Left.wav)

Data(1,Front Right,/usr/share/alsa/samples/Front_Right.wav)

?或者

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

import?java.io.StringReader

?

import?org.apache.spark.SparkContext

import?org.apache.spark.SparkContext._

import?org.apache.spark.SparkConf

import?org.json4s._

import?org.json4s.jackson.JsonMethods._

import?org.json4s.jackson.Serialization

import?org.json4s.jackson.Serialization.{read, write}

import?scala.collection.JavaConversions._

import?au.com.bytecode.opencsv.CSVReader

?

/**

??* Created by common on 17-4-3.

??*/

?

case?class?Data(index:?String, title:?String, content:?String)

?

object?WordCount {

??def?main(args:?Array[String]) {

?

????val?conf?=?new?SparkConf().setAppName("WordCount").setMaster("local")

????val?sc?=?new?SparkContext(conf)

????val?input?=?sc.wholeTextFiles("/home/common/coding/coding/Scala/word-count/sample_map.csv")  //wholeTextFiles讀出來是一個RDD(String,String)

????val?result?=?input.flatMap {?case?(_, txt)?=>

??????val?reader?=?new?CSVReader(new?StringReader(txt));

??????//reader.readAll().map(x => Data(x(0), x(1), x(2)))

??????reader.readAll()

????}

????result.collect().foreach(x?=> {

??????x.foreach(println); println("======")

????})

?

??}

}

?輸出

1

2

3

4

5

6

7

8

0

Front Left

/usr/share/alsa/samples/Front_Left.wav

======

1

Front Right

/usr/share/alsa/samples/Front_Right.wav

======

?

3.2 保存CSV

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

import?java.io.{StringReader, StringWriter}

?

import?org.apache.spark.SparkContext

import?org.apache.spark.SparkContext._

import?org.apache.spark.SparkConf

import?org.json4s._

import?org.json4s.jackson.JsonMethods._

import?org.json4s.jackson.Serialization

import?org.json4s.jackson.Serialization.{read, write}

?

import?scala.collection.JavaConversions._

import?au.com.bytecode.opencsv.{CSVReader, CSVWriter}

?

/**

??* Created by common on 17-4-3.

??*/

?

case?class?Data(index:?String, title:?String, content:?String)

?

object?WordCount {

??def?main(args:?Array[String]) {

?

????val?conf?=?new?SparkConf().setAppName("WordCount").setMaster("local")

????val?sc?=?new?SparkContext(conf)

????val?inputRDD?=?sc.parallelize(List(Data("index",?"title",?"content")))

????inputRDD.map(data?=> List(data.index, data.title, data.content).toArray)

??????.mapPartitions { data?=>

????????val?stringWriter?=?new?StringWriter();

????????val?csvWriter?=?new?CSVWriter(stringWriter);

????????csvWriter.writeAll(data.toList)

????????Iterator(stringWriter.toString)

??????}.saveAsTextFile("/home/common/coding/coding/Scala/word-count/sample_map_out")

??}

}

?輸出

1

"index","title","content"

?

4.SequenceFile?是由沒有相對關系結構的鍵值對文件組成的常用 Hadoop 格式。

SequenceFile 文件有同步標記, Spark 可 以用它來定位到文件中的某個點,然后再與記錄的邊界對齊。這可以讓 Spark 使 用多個節點高效地并行讀取 SequenceFile 文件。SequenceFile 也是Hadoop MapReduce 作 業中常用的輸入輸出格式,所以如果你在使用一個已有的 Hadoop 系統,數據很有可能是以 S equenceFile 的格式供你使用的。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

import?org.apache.hadoop.io.{IntWritable, Text}

import?org.apache.spark.SparkContext

import?org.apache.spark.SparkContext._

import?org.apache.spark.SparkConf

?

/**

??* Created by common on 17-4-6.

??*/

object?SparkRDD {

?

??def?main(args:?Array[String]) {

????val?conf?=?new?SparkConf().setAppName("WordCount").setMaster("local")

????val?sc?=?new?SparkContext(conf)

?

????//寫sequenceFile,

????val?rdd?=?sc.parallelize(List(("Panda",?3), ("Kay",?6), ("Snail",?2)))

????rdd.saveAsSequenceFile("output")

?

????//讀sequenceFile

????val?output?=?sc.sequenceFile("output", classOf[Text], classOf[IntWritable]).

??????map{case?(x, y)?=> (x.toString, y.get())}

????output.foreach(println)

?

??}

}

?

?

5.對象文件

對象文件看起來就像是對 SequenceFile 的簡單封裝,它允許存儲只包含值的 RDD。和 SequenceFile 不一樣的是,對象文件是使用 Java 序列化寫出的。

如果你修改了你的類——比如增減了幾個字段——已經生成的對象文件就不再可讀了。

讀取文件——用 SparkContext 中的 objectFile() 函數接收一個路徑,返回對應的 RDD。

寫入文件——要 保存對象文件, 只需在 RDD 上調用 saveAsObjectFile

?

6.Hadoop輸入輸出格式

除了 Spark 封裝的格式之外,也可以與任何 Hadoop 支持的格式交互。Spark 支持新舊兩套Hadoop 文件 API,提供了很大的靈活性。

舊的API:hadoopFile,使用舊的 API 實現的 Hadoop 輸入格式

新的API:newAPIHadoopFile?
接收一個路徑以及三個類。第一個類是“格式”類,代表輸入格式。第二個類是鍵的類,最后一個類是值的類。如果需要設定額外的 H adoop 配置屬性,也可以傳入一個 conf 對象。

KeyValueTextInputFormat 是最簡單的 Hadoop 輸入格式之一,可以用于從文本文件中讀取鍵值對數據。每一行都會被獨立處理,鍵和值之間用制表符隔開。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

import?org.apache.hadoop.io.{IntWritable, LongWritable, MapWritable, Text}

import?org.apache.spark.SparkContext

import?org.apache.spark.SparkConf

import?org.apache.spark._

import?org.apache.hadoop.mapreduce.Job

import?org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat

import?org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

import?org.apache.spark.rdd._

?

/**

??* Created by common on 17-4-6.

??*/

object?SparkRDD {

?

??def?main(args:?Array[String]) {

????val?conf?=?new?SparkConf().setAppName("WordCount").setMaster("local")

????val?sc?=?new?SparkContext(conf)

?

????//使用老式 API 讀取 KeyValueTextInputFormat(),以JSON文件為例子

????//注意引用的包是org.apache.hadoop.mapred.KeyValueTextInputFormat

//??? val input = sc.hadoopFile[Text, Text, KeyValueTextInputFormat]("input/test.json").map {

//????? case (x, y) => (x.toString, y.toString)

//??? }

//??? input.foreach(println)

?

????// 讀取文件,使用新的API,注意引用的包是org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat

????val?job?=?new?Job()

????val?data?=?sc.newAPIHadoopFile("input/test.json"?,

??????classOf[KeyValueTextInputFormat],

??????classOf[Text],

??????classOf[Text],

??????job.getConfiguration)

????data.foreach(println)

?

????//保存文件,注意引用的包是org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

????data.saveAsNewAPIHadoopFile(

??????"input/test1.json",

??????classOf[Text],

??????classOf[Text],

??????classOf[TextOutputFormat[Text,Text]],

??????job.getConfiguration)

?

??}

}

?

?

Hadoop 的非文件系統數據源

除 了?hadoopFile() 和 saveAsHadoopFile()?這 一 大 類 函 數, 還 可 以 使 用?hadoopDataset/saveAsHadoopDataSet 和 newAPIHadoopDataset/ saveAsNewAPIHadoopDataset?來訪問 Hadoop 所支持的非文件系統的存儲格式。例如,許多像?HBase 和 MongoDB?這樣的鍵值對存儲都提供了用來直接讀取 Hadoop 輸入格式的接口。我們可以在 Spark 中很方便地使用這些格式。

?

7.文件壓縮

Spark 原生的輸入方式( textFile 和 sequenceFile)可以自動處理一些類型的壓縮。在讀取壓縮后的數據時,一些壓縮編解碼器可以推測壓縮類型。?
這些壓縮選項只適用于支持壓縮的 Hadoop 格式,也就是那些寫出到文件系統的格式。寫入數據庫的 Hadoop 格式一般沒有實現壓縮支持。如果數據庫中有壓縮過的記錄,那應該是數據庫自己配置的。

創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的Spark学习笔记:数据读取和保存的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。