15.RDD 创建内幕解析
第15課:RDD創建內幕
Spark應用程序運行過程中,第一個RDD代表了Spark應用程序輸入數據的來源,之后通過Trasformation來對RDD進行各種算子的轉換,來實現具體的算法
Spark中的基本方式:
1)?????? 使用程序中的集合創建
這種方式的實際意義主要用于測試。
2)?????? 使用本地文件系統創建
這種方式的實際意義主要用于測試大量數據的文件
3)?????? 使用HDFS創建RDD
這種方式為生產環境中最常用的創建RDD的方式
4)?????? 基于DB創建
5)?????? 基于NoSQL:例如HBase
6)?????? 基于S3(SC3)創建
7)?????? 基于數據流創建
1)?????? 通過集合創建
代碼:
object RDDBasedOnCollection {
def main (args: Array[String]) {
val conf = new SparkConf()//create SparkConf
conf.setAppName("RDDBasedOnCollection")//set app name
conf.setMaster("local")//run local
val sc =new SparkContext(conf)
val numbers = 1 to 100? //創建一個Scala集合
val rdd = sc.parallelize(numbers)
val sum =rdd.reduce(_+_)? //1+2=3 3+3=6 6+4=10
println("1+2+...+99+100"+"="+sum)
? }
}
結果:
?
2)?????? 通過本地文件系統創建
代碼:
object RDDBasedOnLocalFile {def main (args: Array[String]) {
val conf = new SparkConf()//create SparkConf
conf.setAppName("RDDBasedOnCollection")//set app name
conf.setMaster("local")//run local
val sc =new SparkContext(conf)
val rdd = sc.textFile("C:/Users/feng/IdeaProjects/WordCount/src/SparkText.txt")
val linesLength=rdd.map(line=>line.length())
val sum = linesLength.reduce(_+_)
println("the total characters of the file"+"="+sum)
}
}
結果:
?
3)?????? 通過HDFS創建RDD
代碼:
?val wordcount = sc.textFile("/library/wordcount/input/licenses").flatMap(_.split(" ")).map(word=>(word,1)).reduceByKey(_+_).filter(pair=>pair._2>20).collect().foreach(println)
結果:
?
關于spark并行度:
1.默認并行度為程序分配到的cpu core的數目
2.可以手動設置并行度,并行度最佳實踐
???????? 1. 2-4 partitions for each CPU core
???????? 2.綜合考慮cpu和 內存
?
注:本內容原型來自 IMP 課程筆記
如果技術上有什么疑問,歡迎加我QQ交流: 1106373297?轉載于:https://www.cnblogs.com/zhouyf/p/5424158.html
總結
以上是生活随笔為你收集整理的15.RDD 创建内幕解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SpringMVC一例 是否需要重定向
- 下一篇: EventSource