Spark学习笔记:数据读取和保存
spark所支持的文件格式
?
1.文本文件
在 Spark 中讀寫文本文件很容易。
當我們將一個文本文件讀取為?RDD?時,輸入的每一行?都會成為?RDD 的 一個元素。
也可以將多個完整的文本文件一次性讀取為一個 pair RDD, 其中鍵是文件名,值是文件內容。
?在 Scala 中讀取一個文本文件
| 1 2 | val?inputFile?=?"file:///home/common/coding/coding/Scala/word-count/test.segmented" val?textFile?=?sc.textFile(inputFile) |
?在 Scala 中讀取給定目錄中的所有文件
| 1 | val?input?=?sc.wholeTextFiles("file:///home/common/coding/coding/Scala/word-count") |
?保存文本文件,Spark 將傳入的路徑作為目錄對待,會在那個目錄下輸出多個文件
| 1 2 | textFile.saveAsTextFile("file:///home/common/coding/coding/Scala/word-count/writeback") //textFile.repartition(1).saveAsTextFile 就能保存成一個文件 |
對于dataFrame文件,先使用.toJavaRDD?轉換成RDD,然后再使用 ?coalesce(1).saveAsTextFile
?
2.JSON
JSON 是一種使用較廣的半結構化數據格式。
讀取JSON,書中代碼有問題所以找了另外的一段讀取JSON的代碼
?build.sbt
| 1 | "org.json4s"?%%?"json4s-jackson"?%?"3.2.11" |
?代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | import?org.apache.spark.SparkContext import?org.apache.spark.SparkContext._ import?org.apache.spark.SparkConf import?org.json4s._ import?org.json4s.jackson.JsonMethods._ import?org.json4s.jackson.Serialization import?org.json4s.jackson.Serialization.{read, write} ? /** ??* Created by common on 17-4-3. ??*/ ? case?class?Person(firstName:?String, lastName:?String, address:?List[Address]) { ??override?def?toString?=?s"Person(firstName=$firstName, lastName=$lastName, address=$address)" } ? case?class?Address(line1:?String, city:?String, state:?String, zip:?String) { ??override?def?toString?=?s"Address(line1=$line1, city=$city, state=$state, zip=$zip)" } ? object?WordCount { ??def?main(args:?Array[String]) { ????val?inputJsonFile?=?"file:///home/common/coding/coding/Scala/word-count/test.json" ????val?conf?=?new?SparkConf().setAppName("WordCount").setMaster("local") ????val?sc?=?new?SparkContext(conf) ????val?input5?=?sc.textFile(inputJsonFile) ????val?dataObjsRDD?=?input5.map { myrecord?=> ??????implicit?val?formats?=?DefaultFormats ??????// Workaround as????? DefaultFormats is not serializable ??????val?jsonObj?=?parse(myrecord) ??????//val addresses = jsonObj \ "address" ??????//println((addresses(0) \ "city").extract[String]) ??????jsonObj.extract[Person] ????} ????dataObjsRDD.saveAsTextFile("file:///home/common/coding/coding/Scala/word-count/test1.json") ? ??} ? ? } |
?讀取的JSON文件
| 1 2 | {"firstName":"John","lastName":"Smith","address":[{"line1":"1 main street","city":"San Francisco","state":"CA","zip":"94101"},{"line1":"1 main street","city":"sunnyvale","state":"CA","zip":"94000"}]} {"firstName":"Tim","lastName":"Williams","address":[{"line1":"1 main street","city":"Mountain View","state":"CA","zip":"94300"},{"line1":"1 main street","city":"San Jose","state":"CA","zip":"92000"}]} |
?輸出的文件
| 1 2 | Person(firstName=John, lastName=Smith, address=List(Address(line1=1?main street, city=San Francisco, state=CA, zip=94101), Address(line1=1?main street, city=sunnyvale, state=CA, zip=94000))) Person(firstName=Tim, lastName=Williams, address=List(Address(line1=1?main street, city=Mountain View, state=CA, zip=94300), Address(line1=1?main street, city=San Jose, state=CA, zip=92000))) |
?
3.逗號分割值與制表符分隔值
逗號分隔值(CSV)文件每行都有固定數目的字段,字段間用逗號隔開(在制表符分隔值文件,即 TSV 文 件中用制表符隔開)。
如果恰好CSV 的所有數據字段均沒有包含換行符,你也可以使用?textFile()?讀取并解析數據,
build.sbt
| 1 | "au.com.bytecode"?%?"opencsv"?%?"2.4" |
3.1 讀取CSV文件
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | import?java.io.StringReader ? import?org.apache.spark.SparkContext import?org.apache.spark.SparkContext._ import?org.apache.spark.SparkConf import?org.json4s._ import?org.json4s.jackson.JsonMethods._ import?org.json4s.jackson.Serialization import?org.json4s.jackson.Serialization.{read, write} import?au.com.bytecode.opencsv.CSVReader ? /** ??* Created by common on 17-4-3. ??*/ ? object?WordCount { ??def?main(args:?Array[String]) { ? ????val?input?=?sc.textFile("/home/common/coding/coding/Scala/word-count/sample_map.csv") ????val?result6?=?input.map{ line?=> ??????val?reader?=?new?CSVReader(new?StringReader(line)); ??????reader.readNext(); ????} ????for(result <- result6){ ??????for(re <- result){ ????????println(re) ??????} ????} ? ??} ? } |
?CSV文件內容
輸出
| 1 2 3 4 5 6 | 0 Front Left /usr/share/alsa/samples/Front_Left.wav 1 Front Right /usr/share/alsa/samples/Front_Right.wav |
?
如果在字段中嵌有換行符,就需要完整讀入每個文件,然后解析各段。如果每個文件都很大,讀取和解析的過程可能會很不幸地成為性能瓶頸。
?代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | import?java.io.StringReader ? import?org.apache.spark.SparkContext import?org.apache.spark.SparkContext._ import?org.apache.spark.SparkConf import?org.json4s._ import?org.json4s.jackson.JsonMethods._ import?org.json4s.jackson.Serialization import?org.json4s.jackson.Serialization.{read, write} import?scala.collection.JavaConversions._ import?au.com.bytecode.opencsv.CSVReader ? /** ??* Created by common on 17-4-3. ??*/ ? case?class?Data(index:?String, title:?String, content:?String) ? object?WordCount { ??def?main(args:?Array[String]) { ? ????val?conf?=?new?SparkConf().setAppName("WordCount").setMaster("local") ????val?sc?=?new?SparkContext(conf) ????val?input?=?sc.wholeTextFiles("/home/common/coding/coding/Scala/word-count/sample_map.csv") ????val?result?=?input.flatMap {?case?(_, txt)?=> ??????val?reader?=?new?CSVReader(new?StringReader(txt)); ??????reader.readAll().map(x?=> Data(x(0), x(1), x(2))) ????} ????for(res <- result){ ??????println(res) ????} ??} ? } |
?輸出
| 1 2 | Data(0,Front Left,/usr/share/alsa/samples/Front_Left.wav) Data(1,Front Right,/usr/share/alsa/samples/Front_Right.wav) |
?或者
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | import?java.io.StringReader ? import?org.apache.spark.SparkContext import?org.apache.spark.SparkContext._ import?org.apache.spark.SparkConf import?org.json4s._ import?org.json4s.jackson.JsonMethods._ import?org.json4s.jackson.Serialization import?org.json4s.jackson.Serialization.{read, write} import?scala.collection.JavaConversions._ import?au.com.bytecode.opencsv.CSVReader ? /** ??* Created by common on 17-4-3. ??*/ ? case?class?Data(index:?String, title:?String, content:?String) ? object?WordCount { ??def?main(args:?Array[String]) { ? ????val?conf?=?new?SparkConf().setAppName("WordCount").setMaster("local") ????val?sc?=?new?SparkContext(conf) ????val?input?=?sc.wholeTextFiles("/home/common/coding/coding/Scala/word-count/sample_map.csv") //wholeTextFiles讀出來是一個RDD(String,String) ????val?result?=?input.flatMap {?case?(_, txt)?=> ??????val?reader?=?new?CSVReader(new?StringReader(txt)); ??????//reader.readAll().map(x => Data(x(0), x(1), x(2))) ??????reader.readAll() ????} ????result.collect().foreach(x?=> { ??????x.foreach(println); println("======") ????}) ? ??} } |
?輸出
| 1 2 3 4 5 6 7 8 | 0 Front Left /usr/share/alsa/samples/Front_Left.wav ====== 1 Front Right /usr/share/alsa/samples/Front_Right.wav ====== |
?
3.2 保存CSV
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | import?java.io.{StringReader, StringWriter} ? import?org.apache.spark.SparkContext import?org.apache.spark.SparkContext._ import?org.apache.spark.SparkConf import?org.json4s._ import?org.json4s.jackson.JsonMethods._ import?org.json4s.jackson.Serialization import?org.json4s.jackson.Serialization.{read, write} ? import?scala.collection.JavaConversions._ import?au.com.bytecode.opencsv.{CSVReader, CSVWriter} ? /** ??* Created by common on 17-4-3. ??*/ ? case?class?Data(index:?String, title:?String, content:?String) ? object?WordCount { ??def?main(args:?Array[String]) { ? ????val?conf?=?new?SparkConf().setAppName("WordCount").setMaster("local") ????val?sc?=?new?SparkContext(conf) ????val?inputRDD?=?sc.parallelize(List(Data("index",?"title",?"content"))) ????inputRDD.map(data?=> List(data.index, data.title, data.content).toArray) ??????.mapPartitions { data?=> ????????val?stringWriter?=?new?StringWriter(); ????????val?csvWriter?=?new?CSVWriter(stringWriter); ????????csvWriter.writeAll(data.toList) ????????Iterator(stringWriter.toString) ??????}.saveAsTextFile("/home/common/coding/coding/Scala/word-count/sample_map_out") ??} } |
?輸出
| 1 | "index","title","content" |
?
4.SequenceFile?是由沒有相對關系結構的鍵值對文件組成的常用 Hadoop 格式。
SequenceFile 文件有同步標記, Spark 可 以用它來定位到文件中的某個點,然后再與記錄的邊界對齊。這可以讓 Spark 使 用多個節點高效地并行讀取 SequenceFile 文件。SequenceFile 也是Hadoop MapReduce 作 業中常用的輸入輸出格式,所以如果你在使用一個已有的 Hadoop 系統,數據很有可能是以 S equenceFile 的格式供你使用的。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | import?org.apache.hadoop.io.{IntWritable, Text} import?org.apache.spark.SparkContext import?org.apache.spark.SparkContext._ import?org.apache.spark.SparkConf ? /** ??* Created by common on 17-4-6. ??*/ object?SparkRDD { ? ??def?main(args:?Array[String]) { ????val?conf?=?new?SparkConf().setAppName("WordCount").setMaster("local") ????val?sc?=?new?SparkContext(conf) ? ????//寫sequenceFile, ????val?rdd?=?sc.parallelize(List(("Panda",?3), ("Kay",?6), ("Snail",?2))) ????rdd.saveAsSequenceFile("output") ? ????//讀sequenceFile ????val?output?=?sc.sequenceFile("output", classOf[Text], classOf[IntWritable]). ??????map{case?(x, y)?=> (x.toString, y.get())} ????output.foreach(println) ? ??} } |
?
?
5.對象文件
對象文件看起來就像是對 SequenceFile 的簡單封裝,它允許存儲只包含值的 RDD。和 SequenceFile 不一樣的是,對象文件是使用 Java 序列化寫出的。
如果你修改了你的類——比如增減了幾個字段——已經生成的對象文件就不再可讀了。
讀取文件——用 SparkContext 中的 objectFile() 函數接收一個路徑,返回對應的 RDD。
寫入文件——要 保存對象文件, 只需在 RDD 上調用 saveAsObjectFile
?
6.Hadoop輸入輸出格式
除了 Spark 封裝的格式之外,也可以與任何 Hadoop 支持的格式交互。Spark 支持新舊兩套Hadoop 文件 API,提供了很大的靈活性。
舊的API:hadoopFile,使用舊的 API 實現的 Hadoop 輸入格式
新的API:newAPIHadoopFile?
接收一個路徑以及三個類。第一個類是“格式”類,代表輸入格式。第二個類是鍵的類,最后一個類是值的類。如果需要設定額外的 H adoop 配置屬性,也可以傳入一個 conf 對象。
KeyValueTextInputFormat 是最簡單的 Hadoop 輸入格式之一,可以用于從文本文件中讀取鍵值對數據。每一行都會被獨立處理,鍵和值之間用制表符隔開。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | import?org.apache.hadoop.io.{IntWritable, LongWritable, MapWritable, Text} import?org.apache.spark.SparkContext import?org.apache.spark.SparkConf import?org.apache.spark._ import?org.apache.hadoop.mapreduce.Job import?org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat import?org.apache.hadoop.mapreduce.lib.output.TextOutputFormat import?org.apache.spark.rdd._ ? /** ??* Created by common on 17-4-6. ??*/ object?SparkRDD { ? ??def?main(args:?Array[String]) { ????val?conf?=?new?SparkConf().setAppName("WordCount").setMaster("local") ????val?sc?=?new?SparkContext(conf) ? ????//使用老式 API 讀取 KeyValueTextInputFormat(),以JSON文件為例子 ????//注意引用的包是org.apache.hadoop.mapred.KeyValueTextInputFormat //??? val input = sc.hadoopFile[Text, Text, KeyValueTextInputFormat]("input/test.json").map { //????? case (x, y) => (x.toString, y.toString) //??? } //??? input.foreach(println) ? ????// 讀取文件,使用新的API,注意引用的包是org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat ????val?job?=?new?Job() ????val?data?=?sc.newAPIHadoopFile("input/test.json"?, ??????classOf[KeyValueTextInputFormat], ??????classOf[Text], ??????classOf[Text], ??????job.getConfiguration) ????data.foreach(println) ? ????//保存文件,注意引用的包是org.apache.hadoop.mapreduce.lib.output.TextOutputFormat ????data.saveAsNewAPIHadoopFile( ??????"input/test1.json", ??????classOf[Text], ??????classOf[Text], ??????classOf[TextOutputFormat[Text,Text]], ??????job.getConfiguration) ? ??} } |
?
?
Hadoop 的非文件系統數據源
除 了?hadoopFile() 和 saveAsHadoopFile()?這 一 大 類 函 數, 還 可 以 使 用?hadoopDataset/saveAsHadoopDataSet 和 newAPIHadoopDataset/ saveAsNewAPIHadoopDataset?來訪問 Hadoop 所支持的非文件系統的存儲格式。例如,許多像?HBase 和 MongoDB?這樣的鍵值對存儲都提供了用來直接讀取 Hadoop 輸入格式的接口。我們可以在 Spark 中很方便地使用這些格式。
?
7.文件壓縮
Spark 原生的輸入方式( textFile 和 sequenceFile)可以自動處理一些類型的壓縮。在讀取壓縮后的數據時,一些壓縮編解碼器可以推測壓縮類型。?
這些壓縮選項只適用于支持壓縮的 Hadoop 格式,也就是那些寫出到文件系統的格式。寫入數據庫的 Hadoop 格式一般沒有實現壓縮支持。如果數據庫中有壓縮過的記錄,那應該是數據庫自己配置的。
總結
以上是生活随笔為你收集整理的Spark学习笔记:数据读取和保存的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spark | scala | 线性代数
- 下一篇: spark向量