Spark笔记:RDD基本操作(下)
上一篇里我提到可以把RDD當作一個數組,這樣我們在學習spark的API時候很多問題就能很好理解了。上篇文章里的API也都是基于RDD是數組的數據模型而進行操作的。
Spark是一個計算框架,是對mapreduce計算框架的改進,mapreduce計算框架是基于鍵值對也就是map的形式,之所以使用鍵值對是人們發現世界上大部分計算都可以使用map這樣的簡單計算模型進行計算。但是Spark里的計算模型卻是數組形式,RDD如何處理Map的數據格式了?本篇文章就主要講解RDD是如何處理Map的數據格式。
Pair RDD及鍵值對RDD,Spark里創建Pair RDD也是可以通過兩種途徑,一種是從內存里讀取,一種是從文件讀取。
首先是從文件讀取,上篇里我們看到使用textFile方法讀取文件,讀取的文件是按行組織成一個數組,要讓其變成map格式就的進行轉化,代碼如下所示:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | /* ?* 測試文件數據: ?* x01,1,4 ???x02,11,1 x01,3,9 x01,2,6 ???x02,18,12 ???x03,7,9 ?* ?* */ val?rddFile:RDD[(String,String)]?=?sc.textFile("file:///F:/sparkdata01.txt",?1).map { x?=> (x.split(",")(0),x.split(",")(1) +?","?+ x.split(",")(2)) } val?rFile:RDD[String]?=?rddFile.keys println("=========createPairMap File=========") println(rFile.collect().mkString(","))// x01,x02,x01,x01,x02,x03 println("=========createPairMap File=========") |
我們由此可以看到以讀取文件方式構造RDD,我們需要使用map函數進行轉化,讓其變成map的形式。
下面是通過內存方式進行創建,代碼如下:
| 1 2 3 4 5 | val?rdd:RDD[(String,Int)]?=?sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26))) val?r:RDD[(String,Int)]?=?rdd.reduceByKey((x,y)?=> x + y) println("=========createPairMap=========") println(r.collect().mkString(","))// (k01,29),(k03,2),(k02,6) println("=========createPairMap=========") |
RDD任然是數組形式,只不過數組的元素是("k01",3)格式是scala里面特有的Tuple2及二元組,元組可以當作一個集合,這個集合可以是各種不同數據類型組合而成,二元組就是只包含兩個元素的元組。
由此可見Pair RDD也是數組,只不過是一個元素為二元組的數組而已,上篇里對RDD的操作也是同樣適用于Pair RDD的。
下面是Pair RDD的API講解,同樣我們先說轉化操作的API:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 | reduceByKey:合并具有相同鍵的值; groupByKey:對具有相同鍵的值進行分組; keys:返回一個僅包含鍵值的RDD; values:返回一個僅包含值的RDD; sortByKey:返回一個根據鍵值排序的RDD; flatMapValues:針對Pair RDD中的每個值應用一個返回迭代器的函數,然后對返回的每個元素都生成一個對應原鍵的鍵值對記錄; mapValues:對Pair RDD里每一個值應用一個函數,但是不會對鍵值進行操作; combineByKey:使用不同的返回類型合并具有相同鍵的值; subtractByKey:操作的RDD我們命名為RDD1,參數RDD命名為參數RDD,剔除掉RDD1里和參數RDD中鍵相同的元素; join:對兩個RDD進行內連接; rightOuterJoin:對兩個RDD進行連接操作,第一個RDD的鍵必須存在,第二個RDD的鍵不再第一個RDD里面有那么就會被剔除掉,相同鍵的值會被合并; leftOuterJoin:對兩個RDD進行連接操作,第二個RDD的鍵必須存在,第一個RDD的鍵不再第二個RDD里面有那么就會被剔除掉,相同鍵的值會被合并; cogroup:將兩個RDD里相同鍵的數據分組在一起 |
下面就是行動操作的API了,具體如下:
| 1 2 3 | countByKey:對每個鍵的元素進行分別計數; collectAsMap:將結果變成一個map; lookup:在RDD里使用鍵值查找數據 |
接下來我再提提那些不是很常用的RDD操作,具體如下:
轉化操作的:
| 1 | sample:對RDD采樣; |
行動操作:
| 1 2 3 4 5 | take(num):返回RDD里num個元素,隨機的; top(num):返回RDD里最前面的num個元素,這個方法實用性還比較高; takeSample:從RDD里返回任意一些元素; sample:對RDD里的數據采樣; takeOrdered:從RDD里按照提供的順序返回最前面的num個元素 |
接下來就是示例代碼了,如下所示:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 | package?cn.com.sparktest ? import?org.apache.spark.SparkConf import?org.apache.spark.SparkConf import?org.apache.spark.SparkContext import?org.apache.spark.SparkContext._ import?org.apache.spark.rdd.RDD import?org.apache.spark.util.collection.CompactBuffer ? object?SparkPairMap { ??? ??val?conf:SparkConf?=?new?SparkConf().setAppName("spark pair map").setMaster("local[2]") ??val?sc:SparkContext?=?new?SparkContext(conf) ?? ??/** ???* 構建Pair RDD ???*/ ??def?createPairMap():Unit?=?{ ????val?rdd:RDD[(String,Int)]?=?sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26))) ????val?r:RDD[(String,Int)]?=?rdd.reduceByKey((x,y)?=> x + y) ????println("=========createPairMap=========") ????println(r.collect().mkString(","))// (k01,29),(k03,2),(k02,6) ????println("=========createPairMap=========") ????? ????/* ?????* 測試文件數據: ?????* x01,1,4 ?????????????x02,11,1 ?????????????x01,3,9 ?????????????x01,2,6 ???????x02,18,12 ???????x03,7,9 ?????* ?????* */ ????val?rddFile:RDD[(String,String)]?=?sc.textFile("file:///F:/sparkdata01.txt",?1).map { x?=> (x.split(",")(0),x.split(",")(1) +?","?+ x.split(",")(2)) } ????val?rFile:RDD[String]?=?rddFile.keys ????println("=========createPairMap File=========") ????println(rFile.collect().mkString(","))// x01,x02,x01,x01,x02,x03 ????println("=========createPairMap File=========") ??} ??? ??/** ???* 關于Pair RDD的轉化操作和行動操作 ???*/ ??def?pairMapRDD(path:String):Unit?=?{ ????val?rdd:RDD[(String,Int)]?=?sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26))) ????val?other:RDD[(String,Int)]?=?sc.parallelize(List(("k01",29)),?1) ????? ????// 轉化操作 ????val?rddReduce:RDD[(String,Int)]?=?rdd.reduceByKey((x,y)?=> x + y) ????println("====reduceByKey===:"?+ rddReduce.collect().mkString(","))// (k01,29),(k03,2),(k02,6) ????val?rddGroup:RDD[(String,Iterable[Int])]?=?rdd.groupByKey() ????println("====groupByKey===:"?+ rddGroup.collect().mkString(","))// (k01,CompactBuffer(3, 26)),(k03,CompactBuffer(2)),(k02,CompactBuffer(6)) ????val?rddKeys:RDD[String]?=?rdd.keys ????println("====keys=====:"?+ rddKeys.collect().mkString(","))// k01,k02,k03,k01 ????val?rddVals:RDD[Int]?=?rdd.values ????println("======values===:"?+ rddVals.collect().mkString(","))// 3,6,2,26 ????val?rddSortAsc:RDD[(String,Int)]?=?rdd.sortByKey(true,?1) ????val?rddSortDes:RDD[(String,Int)]?=?rdd.sortByKey(false,?1) ????println("====rddSortAsc=====:"?+ rddSortAsc.collect().mkString(","))// (k01,3),(k01,26),(k02,6),(k03,2) ????println("======rddSortDes=====:"?+ rddSortDes.collect().mkString(","))// (k03,2),(k02,6),(k01,3),(k01,26) ????val?rddFmVal:RDD[(String,Int)]?=?rdd.flatMapValues { x?=> List(x +?10) } ????println("====flatMapValues===:"?+ rddFmVal.collect().mkString(","))// (k01,13),(k02,16),(k03,12),(k01,36) ????val?rddMapVal:RDD[(String,Int)]?=?rdd.mapValues { x?=> x +?10?} ????println("====mapValues====:"?+ rddMapVal.collect().mkString(","))// (k01,13),(k02,16),(k03,12),(k01,36) ????val?rddCombine:RDD[(String,(Int,Int))]?=?rdd.combineByKey(x?=> (x,1), (param:(Int,Int),x)?=> (param._1?+ x,param._2?+?1), (p1:(Int,Int),p2:(Int,Int))?=> (p1._1?+ p2._1,p1._2?+ p2._2)) ????println("====combineByKey====:"?+ rddCombine.collect().mkString(","))//(k01,(29,2)),(k03,(2,1)),(k02,(6,1)) ????val?rddSubtract:RDD[(String,Int)]?=?rdd.subtractByKey(other); ????println("====subtractByKey====:"?+ rddSubtract.collect().mkString(","))// (k03,2),(k02,6) ????val?rddJoin:RDD[(String,(Int,Int))]?=?rdd.join(other) ????println("=====rddJoin====:"?+ rddJoin.collect().mkString(","))// (k01,(3,29)),(k01,(26,29)) ????val?rddRight:RDD[(String,(Option[Int],Int))]?=?rdd.rightOuterJoin(other) ????println("====rightOuterJoin=====:"?+ rddRight.collect().mkString(","))// (k01,(Some(3),29)),(k01,(Some(26),29)) ????val?rddLeft:RDD[(String,(Int,Option[Int]))]?=?rdd.leftOuterJoin(other) ????println("=====rddLeft=====:"?+ rddLeft.collect().mkString(","))// (k01,(3,Some(29))),(k01,(26,Some(29))),(k03,(2,None)),(k02,(6,None)) ????val?rddCogroup:?RDD[(String, (Iterable[Int], Iterable[Int]))]?=?rdd.cogroup(other) ????println("=====cogroup=====:"?+ rddCogroup.collect().mkString(","))// (k01,(CompactBuffer(3, 26),CompactBuffer(29))),(k03,(CompactBuffer(2),CompactBuffer())),(k02,(CompactBuffer(6),CompactBuffer())) ????? ????// 行動操作 ????val?resCountByKey?=?rdd.countByKey() ????println("=====countByKey=====:"?+ resCountByKey)// Map(k01 -> 2, k03 -> 1, k02 -> 1) ????val?resColMap?=?rdd.collectAsMap() ????println("=====resColMap=====:"?+ resColMap)//Map(k02 -> 6, k01 -> 26, k03 -> 2) ????val?resLookup?=?rdd.lookup("k01") ????println("====lookup===:"?+ resLookup)?// WrappedArray(3, 26) ??} ??? ??/** ???* 其他一些不常用的RDD操作 ???*/ ??def?otherRDDOperate(){ ????val?rdd:RDD[(String,Int)]?=?sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26))) ????? ????println("=====first=====:"?+ rdd.first())//(k01,3) ????val?resTop?=?rdd.top(2).map(x?=> x._1?+?";"?+ x._2) ????println("=====top=====:"?+ resTop.mkString(","))// k03;2,k02;6 ????val?resTake?=?rdd.take(2).map(x?=> x._1?+?";"?+ x._2) ????println("=======take====:"?+ resTake.mkString(","))// k01;3,k02;6 ????val?resTakeSample?=?rdd.takeSample(false,?2).map(x?=> x._1?+?";"?+ x._2) ????println("=====takeSample====:"?+ resTakeSample.mkString(","))// k01;26,k03;2 ????val?resSample1?=?rdd.sample(false,?0.25) ????val?resSample2?=?rdd.sample(false,?0.75) ????val?resSample3?=?rdd.sample(false,?0.5) ????println("=====sample======:"?+ resSample1.collect().mkString(","))// 無 ????println("=====sample======:"?+ resSample2.collect().mkString(","))// (k01,3),(k02,6),(k01,26) ????println("=====sample======:"?+ resSample3.collect().mkString(","))// (k01,3),(k01,26) ??} ??? ??def?main(args:?Array[String]):?Unit?=?{ ????createPairMap() ????pairMapRDD("file:///F:/sparkdata01.txt") ????otherRDDOperate() ??} ??? } |
本篇到此就將我知道的spark的API全部講完了,兩篇文章里的示例代碼都是經過測試的,可以直接運行,大家在閱讀代碼時候最好注意這個特點:我在寫RDD轉化代碼時候都是很明確的寫上了轉化后的RDD的數據類型,這樣做的目的就是讓讀者更加清晰的認識不同RDD轉化后的數據類型,這點在實際開發里非常重要,在實際的計算里我們經常會不同的計算算法不停的轉化RDD的數據類型,而使用scala開發spark程序時候,我發現scala和javascript很類似,我們不去指定返回值數據類型,scala編譯器也會自動推算結果的數據類型,因此編碼時候我們可以不指定具體數據類型。這個特點就會讓我們在實際開發里碰到種種問題,因此我在示例代碼里明確了RDD轉化后的數據類型。
在使用Pair RDD時候,我們要引入:
| 1 | import?org.apache.spark.SparkContext._ |
否則代碼就有可能報錯,說找不到對應的方法,這個引入就是scala里導入的隱世類型轉化的功能,原理和上段文字說到的內容差不多。
? ? ? 開發spark程序不僅僅只可以使用scala,還可以使用python,java,不過scala使用起來更加方便,spark的API簡單清晰,這樣的編程大大降低了原先使用mapreduce編程的難度,但是如果我們要深入掌握這些API那么就要更加深入的學習下scala。下一篇我就根據spark里RDD的API講解一些scala的語法,通過這些語法讓我們更好的掌握Spark的API。
總結
以上是生活随笔為你收集整理的Spark笔记:RDD基本操作(下)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark Java API:forea
- 下一篇: Spark笔记:RDD基本操作(上)