日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

2021年大数据Spark(十三):Spark Core的RDD创建

發(fā)布時間:2023/11/28 生活经验 17 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Spark(十三):Spark Core的RDD创建 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

?

RDD的創(chuàng)建

官方文檔:http://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds

如何將數(shù)據(jù)封裝到RDD集合中,主要有兩種方式:并行化本地集合(Driver Program中)和引用加載外部存儲系統(tǒng)(如HDFS、Hive、HBase、Kafka、Elasticsearch等)數(shù)據(jù)集

?

?

?

?

并行化集合

由一個已經(jīng)存在的 Scala 集合創(chuàng)建,集合并行化,集合必須時Seq本身或者子類對象。

?

?

演示范例代碼,從List列表構(gòu)建RDD集合:

package cn.itcast.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** Spark 采用并行化的方式構(gòu)建Scala集合Seq中的數(shù)據(jù)為RDD* ?- 將Scala集合轉(zhuǎn)換為RDD* ?????sc.parallelize(seq)* ?- 將RDD轉(zhuǎn)換為Scala中集合* ?????rdd.collect()* ?????rdd.collectAsMap()*/
object SparkParallelizeTest {def main(args: Array[String]): Unit = {// 創(chuàng)建應用程序入口SparkContext實例對象val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 1、Scala中集合Seq序列存儲數(shù)據(jù)val linesSeq: Seq[String] = Seq("hello me you her","hello you her","hello her","hello")// 2、并行化集合創(chuàng)建RDD數(shù)據(jù)集/*def parallelize[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism): RDD[T]*/val inputRDD: RDD[String] = sc.parallelize(linesSeq, numSlices = 2)//val inputRDD: RDD[String] = sc.makeRDD(linesSeq, numSlices = 2)// 3、調(diào)用集合RDD中函數(shù)處理分析數(shù)據(jù)val resultRDD: RDD[(String, Int)] = inputRDD.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)// 4、保存結(jié)果RDD到外部存儲系統(tǒng)(HDFS、MySQL、HBase。。。。)resultRDD.foreach(println)// 應用程序運行結(jié)束,關閉資源sc.stop()}
}

?

外部存儲系統(tǒng)

由外部存儲系統(tǒng)的數(shù)據(jù)集創(chuàng)建,包括本地的文件系統(tǒng),還有所有 Hadoop支持的數(shù)據(jù)集,比如 HDFS、Cassandra、HBase 等。實際使用最多的方法:textFile,讀取HDFS或LocalFS上文本文件,指定文件路徑和RDD分區(qū)數(shù)目。

?

范例演示:從文件系統(tǒng)讀取數(shù)據(jù),設置分區(qū)數(shù)目為2,代碼如下。

package cn.itcast.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** 從HDFS/LocalFS文件系統(tǒng)加載文件數(shù)據(jù),封裝為RDD集合, 可以設置分區(qū)數(shù)目* ?- 從文件系統(tǒng)加載* ?????sc.textFile("")* ?- 保存文件系統(tǒng)* ?????rdd.saveAsTextFile("")*/
object SparkFileSystemTest {def main(args: Array[String]): Unit = {// 創(chuàng)建應用程序入口SparkContext實例對象val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 1、從文件系統(tǒng)加載數(shù)據(jù),創(chuàng)建RDD數(shù)據(jù)集/*def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String]*/val inputRDD: RDD[String] = sc.textFile("data/input/words.txt",2)println(s"Partitions Number : ${inputRDD.getNumPartitions}")// 2、調(diào)用集合RDD中函數(shù)處理分析數(shù)據(jù)val resultRDD: RDD[(String, Int)] = inputRDD.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)// 3、保存結(jié)果RDD到外部存儲系統(tǒng)(HDFS、MySQL、HBase。。。。)resultRDD.foreach(println)// 應用程序運行結(jié)束,關閉資源sc.stop()}}

其中文件路徑:可以指定文件名稱,可以指定文件目錄,可以使用通配符指定。

?

小文件讀取

?????在實際項目中,有時往往處理的數(shù)據(jù)文件屬于小文件(每個文件數(shù)據(jù)數(shù)據(jù)量很小,比如KB,幾十MB等),文件數(shù)量又很大,如果一個個文件讀取為RDD的一個個分區(qū),計算數(shù)據(jù)時很耗時性能低下,使用SparkContext中提供:wholeTextFiles類,專門讀取小文件數(shù)據(jù)。

?

范例演示:讀取10個小文件數(shù)據(jù),每個文件大小小于1MB,設置RDD分區(qū)數(shù)目為2。

package cn.itcast.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** 采用SparkContext#wholeTextFiles()方法讀取小文件*/
object SparkWholeTextFileTest {def main(args: Array[String]): Unit = {// 創(chuàng)建應用程序入口SparkContext實例對象val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// wholeTextFiles()val filesRDD: RDD[(String, String)] = sc.wholeTextFiles("data/input/ratings10", minPartitions = 2)filesRDD.map(_._1).foreach(println)val inputRDD: RDD[String] = filesRDD.flatMap(_._2.split("\\n"))println(s"Partitions Number = ${inputRDD.getNumPartitions}")println(s"Count = ${inputRDD.count()}")// 應用程序運行結(jié)束,關閉資源sc.stop()}
}

?

實際項目中,可以先使用wholeTextFiles方法讀取數(shù)據(jù),設置適當RDD分區(qū),再將數(shù)據(jù)保存到文件系統(tǒng),以便后續(xù)應用讀取處理,大大提升性能。

總結(jié)

以上是生活随笔為你收集整理的2021年大数据Spark(十三):Spark Core的RDD创建的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。