Spark笔记:RDD基本操作(上)
本文主要是講解spark里RDD的基礎(chǔ)操作。RDD是spark特有的數(shù)據(jù)模型,談到RDD就會提到什么彈性分布式數(shù)據(jù)集,什么有向無環(huán)圖,本文暫時不去展開這些高深概念,在閱讀本文時候,大家可以就把RDD當作一個數(shù)組,這樣的理解對我們學(xué)習(xí)RDD的API是非常有幫助的。本文所有示例代碼都是使用scala語言編寫的。
Spark里的計算都是操作RDD進行,那么學(xué)習(xí)RDD的第一個問題就是如何構(gòu)建RDD,構(gòu)建RDD從數(shù)據(jù)來源角度分為兩類:第一類是從內(nèi)存里直接讀取數(shù)據(jù),第二類就是從文件系統(tǒng)里讀取,當然這里的文件系統(tǒng)種類很多常見的就是HDFS以及本地文件系統(tǒng)了。
第一類方式從內(nèi)存里構(gòu)造RDD,使用的方法:makeRDD和parallelize方法,如下代碼所示:
?
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | /* 使用makeRDD創(chuàng)建RDD */ /* List */ val?rdd01?=?sc.makeRDD(List(1,2,3,4,5,6)) val?r01?=?rdd01.map { x?=> x * x } println(r01.collect().mkString(",")) /* Array */ val?rdd02?=?sc.makeRDD(Array(1,2,3,4,5,6)) val?r02?=?rdd02.filter { x?=> x <?5} println(r02.collect().mkString(",")) ? val?rdd03?=?sc.parallelize(List(1,2,3,4,5,6),?1) val?r03?=?rdd03.map { x?=> x +?1?} println(r03.collect().mkString(",")) /* Array */ val?rdd04?=?sc.parallelize(List(1,2,3,4,5,6),?1) val?r04?=?rdd04.filter { x?=> x >?3?} println(r04.collect().mkString(",")) |
?
大家看到了RDD本質(zhì)就是一個數(shù)組,因此構(gòu)造數(shù)據(jù)時候使用的是List(鏈表)和Array(數(shù)組)類型。
第二類方式是通過文件系統(tǒng)構(gòu)造RDD,代碼如下所示:
| 1 2 3 | val?rdd:RDD[String]?=?sc.textFile("file:///D:/sparkdata.txt",?1) val?r:RDD[String]?=?rdd.flatMap { x?=> x.split(",") } println(r.collect().mkString(",")) |
這里例子使用的是本地文件系統(tǒng),所以文件路徑協(xié)議前綴是file://。
構(gòu)造了RDD對象了,接下來就是如何操作RDD對象了,RDD的操作分為轉(zhuǎn)化操作(transformation)和行動操作(action),RDD之所以將操作分成這兩類這是和RDD惰性運算有關(guān),當RDD執(zhí)行轉(zhuǎn)化操作時候,實際計算并沒有被執(zhí)行,只有當RDD執(zhí)行行動操作時候才會促發(fā)計算任務(wù)提交,執(zhí)行相應(yīng)的計算操作。區(qū)別轉(zhuǎn)化操作和行動操作也非常簡單,轉(zhuǎn)化操作就是從一個RDD產(chǎn)生一個新的RDD操作,而行動操作就是進行實際的計算。
下面是RDD的基礎(chǔ)操作API介紹:
| 操作類型 | 函數(shù)名 | 作用 |
| 轉(zhuǎn)化操作 | map() | 參數(shù)是函數(shù),函數(shù)應(yīng)用于RDD每一個元素,返回值是新的RDD |
| flatMap() | 參數(shù)是函數(shù),函數(shù)應(yīng)用于RDD每一個元素,將元素數(shù)據(jù)進行拆分,變成迭代器,返回值是新的RDD | |
| filter() | 參數(shù)是函數(shù),函數(shù)會過濾掉不符合條件的元素,返回值是新的RDD | |
| distinct() | 沒有參數(shù),將RDD里的元素進行去重操作 | |
| union() | 參數(shù)是RDD,生成包含兩個RDD所有元素的新RDD | |
| intersection() | 參數(shù)是RDD,求出兩個RDD的共同元素 | |
| subtract() | 參數(shù)是RDD,將原RDD里和參數(shù)RDD里相同的元素去掉 | |
| cartesian() | 參數(shù)是RDD,求兩個RDD的笛卡兒積 | |
| 行動操作 | collect() | 返回RDD所有元素 |
| count() | RDD里元素個數(shù) | |
| countByValue() | 各元素在RDD中出現(xiàn)次數(shù) | |
| reduce() | 并行整合所有RDD數(shù)據(jù),例如求和操作 | |
| fold(0)(func) | 和reduce功能一樣,不過fold帶有初始值 | |
| aggregate(0)(seqOp,combop) | 和reduce功能一樣,但是返回的RDD數(shù)據(jù)類型和原RDD不一樣 | |
| foreach(func) | 對RDD每個元素都是使用特定函數(shù) |
下面是以上API操作的示例代碼,如下:
轉(zhuǎn)化操作:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | val?rddInt:RDD[Int]?=?sc.makeRDD(List(1,2,3,4,5,6,2,5,1)) val?rddStr:RDD[String]?=?sc.parallelize(Array("a","b","c","d","b","a"),?1) val?rddFile:RDD[String]?=?sc.textFile(path,?1) ? val?rdd01:RDD[Int]?=?sc.makeRDD(List(1,3,5,3)) val?rdd02:RDD[Int]?=?sc.makeRDD(List(2,4,5,1)) ? /* map操作 */ println("======map操作======") println(rddInt.map(x?=> x +?1).collect().mkString(",")) println("======map操作======") /* filter操作 */ println("======filter操作======") println(rddInt.filter(x?=> x >?4).collect().mkString(",")) println("======filter操作======") /* flatMap操作 */ println("======flatMap操作======") println(rddFile.flatMap { x?=> x.split(",") }.first()) println("======flatMap操作======") /* distinct去重操作 */ println("======distinct去重======") println(rddInt.distinct().collect().mkString(",")) println(rddStr.distinct().collect().mkString(",")) println("======distinct去重======") /* union操作 */ println("======union操作======") println(rdd01.union(rdd02).collect().mkString(",")) println("======union操作======") /* intersection操作 */ println("======intersection操作======") println(rdd01.intersection(rdd02).collect().mkString(",")) println("======intersection操作======") /* subtract操作 */ println("======subtract操作======") println(rdd01.subtract(rdd02).collect().mkString(",")) println("======subtract操作======") /* cartesian操作 */ println("======cartesian操作======") println(rdd01.cartesian(rdd02).collect().mkString(",")) println("======cartesian操作======") |
行動操作代碼如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | val?rddInt:RDD[Int]?=?sc.makeRDD(List(1,2,3,4,5,6,2,5,1)) val?rddStr:RDD[String]?=?sc.parallelize(Array("a","b","c","d","b","a"),?1) ? /* count操作 */ println("======count操作======") println(rddInt.count()) println("======count操作======")?? /* countByValue操作 */ println("======countByValue操作======") println(rddInt.countByValue()) println("======countByValue操作======") /* reduce操作 */ println("======countByValue操作======") println(rddInt.reduce((x ,y)?=> x + y)) println("======countByValue操作======") /* fold操作 */ println("======fold操作======") println(rddInt.fold(0)((x ,y)?=> x + y)) println("======fold操作======") /* aggregate操作 */ println("======aggregate操作======") val?res:(Int,Int)?=?rddInt.aggregate((0,0))((x,y)?=> (x._1?+ x._2,y),(x,y)?=> (x._1?+ x._2,y._1?+ y._2)) println(res._1?+?","?+ res._2) println("======aggregate操作======") /* foeach操作 */ println("======foeach操作======") println(rddStr.foreach { x?=> println(x) }) println("======foeach操作======") |
RDD操作暫時先學(xué)習(xí)到這里,剩下的內(nèi)容在下一篇里再談了,下面我要說說如何開發(fā)spark,安裝spark的內(nèi)容我后面會使用專門的文章進行講解,這里我們假設(shè)已經(jīng)安裝好了spark,那么我們就可以在已經(jīng)裝好的spark服務(wù)器上使用spark-shell進行與spark交互的shell,這里我們直接可以敲打代碼編寫spark程序。但是spark-shell畢竟使用太麻煩,而且spark-shell一次只能使用一個用戶,當另外一個用戶要使用spark-shell就會把前一個用戶踢掉,而且shell也沒有IDE那種代碼補全,代碼校驗的功能,使用起來很是痛苦。
不過spark的確是一個神奇的框架,這里的神奇就是指spark本地開發(fā)調(diào)試非常簡單,本地開發(fā)調(diào)試不需要任何已經(jīng)裝好的spark系統(tǒng),我們只需要建立一個項目,這個項目可以是java的也可以是scala,然后我們將spark-assembly-1.6.1-hadoop2.6.0.jar這樣的jar放入項目的環(huán)境里,這個時候我們就可以在本地開發(fā)調(diào)試spark程序了。
大家請看我們裝有scala插件的eclipse里的完整代碼:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 | package?cn.com.sparktest ? import?org.apache.spark.SparkConf import?org.apache.spark.SparkConf import?org.apache.spark.SparkContext import?org.apache.spark.rdd.RDD ? object?SparkTest { ??val?conf:SparkConf?=?new?SparkConf().setAppName("xtq").setMaster("local[2]") ??val?sc:SparkContext?=?new?SparkContext(conf) ??? ??/** ???* 創(chuàng)建數(shù)據(jù)的方式--從內(nèi)存里構(gòu)造數(shù)據(jù)(基礎(chǔ)) ???*/ ??def?createDataMethod():Unit?=?{ ????/* 使用makeRDD創(chuàng)建RDD */ ????/* List */ ????val?rdd01?=?sc.makeRDD(List(1,2,3,4,5,6)) ????val?r01?=?rdd01.map { x?=> x * x } ????println("===================createDataMethod:makeRDD:List=====================") ????println(r01.collect().mkString(",")) ????println("===================createDataMethod:makeRDD:List=====================") ????/* Array */ ????val?rdd02?=?sc.makeRDD(Array(1,2,3,4,5,6)) ????val?r02?=?rdd02.filter { x?=> x <?5} ????println("===================createDataMethod:makeRDD:Array=====================") ????println(r02.collect().mkString(",")) ????println("===================createDataMethod:makeRDD:Array=====================") ????? ????/* 使用parallelize創(chuàng)建RDD */ ????/* List */ ????val?rdd03?=?sc.parallelize(List(1,2,3,4,5,6),?1) ????val?r03?=?rdd03.map { x?=> x +?1?} ????println("===================createDataMethod:parallelize:List=====================") ????println(r03.collect().mkString(",")) ????println("===================createDataMethod:parallelize:List=====================") ????/* Array */ ????val?rdd04?=?sc.parallelize(List(1,2,3,4,5,6),?1) ????val?r04?=?rdd04.filter { x?=> x >?3?} ????println("===================createDataMethod:parallelize:Array=====================") ????println(r04.collect().mkString(",")) ????println("===================createDataMethod:parallelize:Array=====================") ??} ??? ??/** ???* 創(chuàng)建Pair Map ???*/ ??def?createPairRDD():Unit?=?{ ????val?rdd:RDD[(String,Int)]?=?sc.makeRDD(List(("key01",1),("key02",2),("key03",3))) ????val?r:RDD[String]?=?rdd.keys ????println("===========================createPairRDD=================================") ????println(r.collect().mkString(",")) ????println("===========================createPairRDD=================================") ??} ??? ??/** ???* 通過文件創(chuàng)建RDD ???* 文件數(shù)據(jù): ???*??? key01,1,2.3 ??????????key02,5,3.7 ??????key03,23,4.8 ??????key04,12,3.9 ??????key05,7,1.3 ???*/ ??def?createDataFromFile(path:String):Unit?=?{ ????val?rdd:RDD[String]?=?sc.textFile(path,?1) ????val?r:RDD[String]?=?rdd.flatMap { x?=> x.split(",") } ????println("=========================createDataFromFile==================================") ????println(r.collect().mkString(",")) ????println("=========================createDataFromFile==================================") ??} ??? ??/** ???* 基本的RDD操作 ???*/ ??def?basicTransformRDD(path:String):Unit?=?{ ????val?rddInt:RDD[Int]?=?sc.makeRDD(List(1,2,3,4,5,6,2,5,1)) ????val?rddStr:RDD[String]?=?sc.parallelize(Array("a","b","c","d","b","a"),?1) ????val?rddFile:RDD[String]?=?sc.textFile(path,?1) ????? ????val?rdd01:RDD[Int]?=?sc.makeRDD(List(1,3,5,3)) ????val?rdd02:RDD[Int]?=?sc.makeRDD(List(2,4,5,1)) ? ????/* map操作 */ ????println("======map操作======") ????println(rddInt.map(x?=> x +?1).collect().mkString(",")) ????println("======map操作======") ????/* filter操作 */ ????println("======filter操作======") ????println(rddInt.filter(x?=> x >?4).collect().mkString(",")) ????println("======filter操作======") ????/* flatMap操作 */ ????println("======flatMap操作======") ????println(rddFile.flatMap { x?=> x.split(",") }.first()) ????println("======flatMap操作======") ????/* distinct去重操作 */ ????println("======distinct去重======") ????println(rddInt.distinct().collect().mkString(",")) ????println(rddStr.distinct().collect().mkString(",")) ????println("======distinct去重======") ????/* union操作 */ ????println("======union操作======") ????println(rdd01.union(rdd02).collect().mkString(",")) ????println("======union操作======") ????/* intersection操作 */ ????println("======intersection操作======") ????println(rdd01.intersection(rdd02).collect().mkString(",")) ????println("======intersection操作======") ????/* subtract操作 */ ????println("======subtract操作======") ????println(rdd01.subtract(rdd02).collect().mkString(",")) ????println("======subtract操作======") ????/* cartesian操作 */ ????println("======cartesian操作======") ????println(rdd01.cartesian(rdd02).collect().mkString(",")) ????println("======cartesian操作======")??? ??} ??? ??/** ???* 基本的RDD行動操作 ???*/ ??def?basicActionRDD():Unit?=?{ ????val?rddInt:RDD[Int]?=?sc.makeRDD(List(1,2,3,4,5,6,2,5,1)) ????val?rddStr:RDD[String]?=?sc.parallelize(Array("a","b","c","d","b","a"),?1) ????? ????/* count操作 */ ????println("======count操作======") ????println(rddInt.count()) ????println("======count操作======")?? ????/* countByValue操作 */ ????println("======countByValue操作======") ????println(rddInt.countByValue()) ????println("======countByValue操作======") ????/* reduce操作 */ ????println("======countByValue操作======") ????println(rddInt.reduce((x ,y)?=> x + y)) ????println("======countByValue操作======") ????/* fold操作 */ ????println("======fold操作======") ????println(rddInt.fold(0)((x ,y)?=> x + y)) ????println("======fold操作======") ????/* aggregate操作 */ ????println("======aggregate操作======") ????val?res:(Int,Int)?=?rddInt.aggregate((0,0))((x,y)?=> (x._1?+ x._2,y),(x,y)?=> (x._1?+ x._2,y._1?+ y._2)) ????println(res._1?+?","?+ res._2) ????println("======aggregate操作======") ????/* foeach操作 */ ????println("======foeach操作======") ????println(rddStr.foreach { x?=> println(x) }) ????println("======foeach操作======")??? ??} ??? ??def?main(args:?Array[String]):?Unit?=?{ ????println(System.getenv("HADOOP_HOME")) ????createDataMethod() ????createPairRDD() ????createDataFromFile("file:///D:/sparkdata.txt") ????basicTransformRDD("file:///D:/sparkdata.txt") ????basicActionRDD() ????/*打印結(jié)果*/ ????/*D://hadoop ===================createDataMethod:makeRDD:List===================== 1,4,9,16,25,36 ===================createDataMethod:makeRDD:List===================== ===================createDataMethod:makeRDD:Array===================== 1,2,3,4 ===================createDataMethod:makeRDD:Array===================== ===================createDataMethod:parallelize:List===================== 2,3,4,5,6,7 ===================createDataMethod:parallelize:List===================== ===================createDataMethod:parallelize:Array===================== 4,5,6 ===================createDataMethod:parallelize:Array===================== ===========================createPairRDD================================= key01,key02,key03 ===========================createPairRDD================================= key01,1,2.3,key02,5,3.7,key03,23,4.8,key04,12,3.9,key05,7,1.3 =========================createDataFromFile================================== 2,3,4,5,6,7,3,6,2 ======map操作====== ======filter操作====== 5,6,5 ======filter操作====== ======flatMap操作====== key01 ======flatMap操作====== ======distinct去重====== 4,6,2,1,3,5 ======distinct去重====== ======union操作====== 1,3,5,3,2,4,5,1 ======union操作====== ======intersection操作====== 1,5 ======intersection操作====== ======subtract操作====== 3,3 ======subtract操作====== ======cartesian操作====== (1,2),(1,4),(3,2),(3,4),(1,5),(1,1),(3,5),(3,1),(5,2),(5,4),(3,2),(3,4),(5,5),(5,1),(3,5),(3,1) ======cartesian操作====== ======count操作====== 9 ======count操作====== ======countByValue操作====== Map(5 -> 2, 1 -> 2, 6 -> 1, 2 -> 2, 3 -> 1, 4 -> 1) ======countByValue操作====== ======countByValue操作====== 29 ======countByValue操作====== ======fold操作====== 29 ======fold操作====== ======aggregate操作====== 19,10 ======aggregate操作====== ======foeach操作====== a b c d b a ======foeach操作======*/ ??} } |
Spark執(zhí)行時候我們需要構(gòu)造一個SparkContenxt的環(huán)境變量,構(gòu)造環(huán)境變量時候需要構(gòu)造一個SparkConf對象,例如代碼:setAppName("xtq").setMaster("local[2]")
appName就是spark任務(wù)名稱,master為local[2]是指使用本地模式,啟動2個線程完成spark任務(wù)。
在eclipse里運行spark程序時候,會報出如下錯誤:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | java.io.IOException:?Could not locate executable?null\bin\winutils.exe in the Hadoop binaries. ????at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:355) ????at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:370) ????at org.apache.hadoop.util.Shell.<clinit>(Shell.java:363) ????at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79) ????at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:104) ????at org.apache.hadoop.security.Groups.<init>(Groups.java:86) ????at org.apache.hadoop.security.Groups.<init>(Groups.java:66) ????at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:280) ????at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:271) ????at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:248) ????at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:763) ????at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:748) ????at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:621) ????at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2160) ????at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2160) ????at scala.Option.getOrElse(Option.scala:120) ????at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2160) ????at org.apache.spark.SparkContext.<init>(SparkContext.scala:322) ????at cn.com.sparktest.SparkTest$.<init>(SparkTest.scala:10) ????at cn.com.sparktest.SparkTest$.<clinit>(SparkTest.scala) ????at cn.com.sparktest.SparkTest.main(SparkTest.scala) |
該錯誤不會影響程序的運算,但總是讓人覺得不舒服,這個問題是因為spark運行依賴于hadoop,可是在window下其實是無法安裝hadoop,只能使用cygwin模擬安裝,而新版本的hadoop在windows下使用需要使用winutils.exe,解決這個問題很簡單,就是下載一個winutils.exe,注意下自己操作系統(tǒng)是32位還是64位,找到對應(yīng)版本,然后放置在這樣的目錄下:
D:\hadoop\bin\winutils.exe
然后再環(huán)境變量里定義HADOOP_HOME= D:\hadoop
環(huán)境變量的改變要重啟eclipse,這樣環(huán)境變量才會生效,這個時候程序運行就不會報出錯誤了。
總結(jié)
以上是生活随笔為你收集整理的Spark笔记:RDD基本操作(上)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark笔记:RDD基本操作(下)
- 下一篇: Spark笔记:复杂RDD的API的理解