Spark RDD Introduction and Examples


1.2 Creating RDDs

1) From an existing Scala collection:
val rdd1 = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8))

2) From a dataset in an external storage system, including the local file system and any dataset Hadoop supports, such as HDFS, Cassandra or HBase:
val rdd2 = sc.textFile("hdfs://mycluster/wordcount/input/2.txt")

1.3 RDD Programming API

1.3.1 Transformations

All transformations on an RDD are lazily evaluated: they do not compute their results right away. Instead, they only record the operations applied to the base dataset (for example, a file). The transformations actually run only when an action requires a result to be returned to the driver. This design lets Spark execute more efficiently.
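A minimal sketch of this laziness in the spark-shell (assuming the SparkContext sc and the same HDFS file used later in this article): every line except the last only builds up lineage, and the job runs only when the action is called.

val lines  = sc.textFile("hdfs://mycluster/wordcount/input/2.txt")  // nothing is read yet
val words  = lines.flatMap(_.split(" "))                            // still nothing executed
val pairs  = words.map((_, 1))                                      // lineage only
val counts = pairs.reduceByKey(_ + _)                               // still a transformation
counts.take(5)                                                      // action: the job actually runs here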

Commonly used transformations (a short combined example follows the table):

Transformation | Meaning
map(func) | Returns a new RDD formed by passing each element of the source through the function func.
filter(func) | Returns a new RDD formed by selecting those elements of the source on which func returns true.
flatMap(func) | Similar to map, but each input element can be mapped to 0 or more output elements (so func should return a sequence rather than a single element).
mapPartitions(func) | Similar to map, but runs separately on each partition of the RDD, so when running on an RDD of type T, func must be of type Iterator[T] => Iterator[U].
mapPartitionsWithIndex(func) | Similar to mapPartitions, but func also takes an integer parameter representing the partition index, so when running on an RDD of type T, func must be of type (Int, Iterator[T]) => Iterator[U].
sample(withReplacement, fraction, seed) | Samples a fraction of the data, with or without replacement, using the given random-number-generator seed.
union(otherDataset) | Returns a new RDD containing the union of the source RDD and the argument RDD.
intersection(otherDataset) | Returns a new RDD containing the intersection of the source RDD and the argument RDD.
distinct([numTasks]) | Returns a new RDD containing the distinct elements of the source RDD.
groupByKey([numTasks]) | When called on an RDD of (K, V) pairs, returns an RDD of (K, Iterable[V]) pairs.
reduceByKey(func, [numTasks]) | When called on an RDD of (K, V) pairs, returns an RDD of (K, V) pairs in which the values for each key are aggregated using the given reduce function; as with groupByKey, the number of reduce tasks can be set through an optional second argument.
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | When called on an RDD of (K, V) pairs, aggregates the values of each key using the given combine functions and a neutral zero value; the result value type may differ from the input value type.
sortByKey([ascending], [numTasks]) | When called on an RDD of (K, V) pairs where K implements Ordered, returns an RDD of (K, V) pairs sorted by key.
sortBy(func, [ascending], [numTasks]) | Like sortByKey, but more flexible: sorts by the value returned by func.
join(otherDataset, [numTasks]) | When called on RDDs of type (K, V) and (K, W), returns an RDD of (K, (V, W)) pairs with all pairs of elements for each key.
cogroup(otherDataset, [numTasks]) | When called on RDDs of type (K, V) and (K, W), returns an RDD of (K, (Iterable[V], Iterable[W])) tuples.
cartesian(otherDataset) | Cartesian product.
pipe(command, [envVars]) | Pipes each partition of the RDD through an external shell command; elements are written to the process's stdin and its stdout lines come back as an RDD of strings.
coalesce(numPartitions) | Decreases the number of partitions in the RDD to numPartitions.
repartition(numPartitions) | Reshuffles the data in the RDD to produce exactly numPartitions partitions, always performing a full shuffle.
repartitionAndSortWithinPartitions(partitioner) | Repartitions the RDD according to the given partitioner and, within each resulting partition, sorts records by their keys.
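As a quick sketch of how several of these transformations compose (assuming only the SparkContext sc of the spark-shell):

val nums    = sc.parallelize(1 to 20, 4)
val evens   = nums.filter(_ % 2 == 0)       // keep even numbers
val keyed   = evens.map(n => (n % 3, n))    // key each number by n mod 3
val summed  = keyed.reduceByKey(_ + _)      // sum the values per key
val sorted  = summed.sortByKey()            // sort by key
sorted.collect                              // Array((0,36), (1,30), (2,44))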

1.3.2 Actions

Action | Meaning
reduce(func) | Aggregates all elements of the RDD using the function func, which must be commutative and associative so that it can be computed correctly in parallel.
collect() | Returns all elements of the dataset to the driver program as an array.
count() | Returns the number of elements in the RDD.
first() | Returns the first element of the RDD (similar to take(1)).
take(n) | Returns an array with the first n elements of the dataset.
takeSample(withReplacement, num, [seed]) | Returns an array with a random sample of num elements of the dataset, with or without replacement, optionally using the given random-number-generator seed.
takeOrdered(n, [ordering]) | Returns the first n elements of the RDD using either their natural order or a custom ordering.
saveAsTextFile(path) | Writes the elements of the dataset as a text file to HDFS or another supported file system; Spark calls toString on each element to convert it to a line of text.
saveAsSequenceFile(path) | Writes the elements of the dataset as a Hadoop SequenceFile in the given directory, on HDFS or any other Hadoop-supported file system.
saveAsObjectFile(path) | Writes the elements of the dataset using Java serialization, in a format that can be loaded back with SparkContext.objectFile().
countByKey() | For an RDD of type (K, V), returns a (K, Int) map giving the number of elements for each key.
foreach(func) | Runs the function func on each element of the dataset, typically for side effects such as updating an accumulator or writing to external storage.
A short sketch using several of these actions follows.
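A minimal sketch on a small RDD (assuming the spark-shell's sc):

val rdd = sc.parallelize(List(3, 1, 4, 1, 5, 9, 2, 6))
rdd.count            // 8
rdd.first            // 3
rdd.take(3)          // Array(3, 1, 4)
rdd.takeOrdered(3)   // Array(1, 1, 2)
rdd.reduce(_ + _)    // 31
rdd.collect          // Array(3, 1, 4, 1, 5, 9, 2, 6)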

1.4 Exercises with the Spark RDD API

Connect to the spark-shell:

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# bin/spark-shell --master spark://hadoop1:7077,hadoop2:7077

Exercise 1:

// Create an RDD by parallelizing a collection
val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
// Multiply every element of rdd1 by 2, then sort in ascending order
val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)
// Keep only the elements greater than or equal to 10
val rdd3 = rdd2.filter(_ >= 10)
// Return the elements to the driver as an array
rdd3.collect

Exercise 2:

val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
// Split each element of rdd1 and flatten the results
val rdd2 = rdd1.flatMap(_.split(' '))
rdd2.collect

Exercise 3:

val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
// Union
val rdd3 = rdd1.union(rdd2)
// Intersection
val rdd4 = rdd1.intersection(rdd2)
// Deduplicate the union
rdd3.distinct.collect
rdd4.collect

Exercise 4:

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
// Join by key
val rdd3 = rdd1.join(rdd2)
rdd3.collect
// Union
val rdd4 = rdd1 union rdd2
rdd4.collect
// Group by key
val rdd5 = rdd4.groupByKey
rdd5.collect

Exercise 5:

val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
// cogroup
val rdd3 = rdd1.cogroup(rdd2)
// Note the difference between cogroup and groupByKey
rdd3.collect

Exercise 6:

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
// reduce is an action: it aggregates the elements and returns an Int, not an RDD
val rdd2 = rdd1.reduce(_ + _)
rdd2

Exercise 7:

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
val rdd3 = rdd1.union(rdd2)
// Aggregate the values by key
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.collect
// Sort by value in descending order
val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
rdd5.collect
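The swap-sort-swap trick above can also be written more directly with sortBy, which sorts by an arbitrary key function. A sketch equivalent to rdd5, reusing rdd4 from this exercise:

// Sort the (word, count) pairs by their count, descending, without swapping the tuple
val rdd5b = rdd4.sortBy(_._2, ascending = false)
rdd5b.collect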

Exercise 8:

mapPartitions

def mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

This function is similar to map, except that the mapping function receives an iterator over each partition of the RDD rather than the individual elements. If the mapping needs to repeatedly create an expensive extra object, mapPartitions can be much more efficient than map. For example, when writing all elements of an RDD to a database over JDBC, map might require creating one connection per element, which is very costly, while mapPartitions only needs one connection per partition. The parameter preservesPartitioning indicates whether the partitioner of the parent RDD is preserved.

// rdd1 has two partitions
scala> var rdd1 = sc.makeRDD(1 to 5, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[63] at makeRDD at <console>:24

scala> rdd1.collect
res27: Array[Int] = Array(1, 2, 3, 4, 5)

// rdd3 sums the values within each partition of rdd1, implemented with mapPartitions
scala> var rdd3 = rdd1.mapPartitions{ x => {
     |   var result = List[Int]()
     |   var i = 0
     |   while(x.hasNext) {
     |     i += x.next()
     |   }
     |   result.::(i).iterator
     | }}
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[64] at mapPartitions at <console>:26

// The per-partition sums
scala> rdd3.collect
res28: Array[Int] = Array(3, 12)

// The number of partitions of rdd3
scala> rdd3.partitions.size
res29: Int = 2
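A small sketch of the one-expensive-object-per-partition pattern described above; the date formatter here is just a stand-in for any costly resource such as a JDBC connection, and the input values are made up for illustration:

import java.text.SimpleDateFormat

val timestamps = sc.parallelize(List(0L, 86400000L, 172800000L), 2)
val formatted = timestamps.mapPartitions { iter =>
  // Created once per partition rather than once per element
  val fmt = new SimpleDateFormat("yyyy-MM-dd")
  iter.map(ms => fmt.format(new java.util.Date(ms)))
}
formatted.collect   // three date strings; the exact values depend on the JVM time zone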

Exercise 9:

mapPartitionsWithIndex

def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

Works like mapPartitions, except that the function takes two parameters, the first of which is the partition index.

For example:

scala> var rdd1 = sc.makeRDD(1 to 25, 4)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[66] at makeRDD at <console>:24

scala> var rdd2 = rdd1.mapPartitionsWithIndex{
     |   (x, iter) => {
     |     var result = List[String]()
     |     var i = 0
     |     while(iter.hasNext) {
     |       i += iter.next()
     |     }
     |     result.::(x + "|" + i).iterator
     |   }
     | }
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[67] at mapPartitionsWithIndex at <console>:26

// Each result is "partitionIndex|sumOfThatPartition"
scala> rdd2.collect
res30: Array[String] = Array(0|21, 1|57, 2|93, 3|154)

Another example:

scala> val func = (index: Int, iter: Iterator[(Int)]) => {
     |   iter.toList.map(x => "[partID:" + index + ",val:" + x + "]").iterator
     | }
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
scala> rdd1.mapPartitionsWithIndex(func).collect
res0: Array[String] = Array([partID:0,val:1], [partID:0,val:2], [partID:0,val:3], [partID:0,val:4], [partID:1,val:5], [partID:1,val:6], [partID:1,val:7], [partID:1,val:8], [partID:1,val:9])

Exercise 10:
aggregate

The aggregate function first aggregates the elements within each partition using seqOp, and then uses the combine function combOp to merge the per-partition results together with the initial value (zeroValue). The type of the final result does not have to match the element type of the RDD.
Function signature:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

aggregate: aggregate within each partition first, then merge the per-partition results.

scala> def func1(index: Int, iter: Iterator[(Int)]): Iterator[String] = {
     |   iter.toList.map(x => "[partID:" + index + ",val:" + x + "]").iterator
     | }
func1: (index: Int, iter: Iterator[Int])Iterator[String]

// Create a parallelized RDD with two partitions
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[77] at parallelize at <console>:24

// The output below shows how the elements of rdd1 are distributed over the two partitions, partID:0 and partID:1
scala> rdd1.mapPartitionsWithIndex(func1).collect
res56: Array[String] = Array([partID:0,val:1], [partID:0,val:2], [partID:0,val:3], [partID:0,val:4], [partID:1,val:5], [partID:1,val:6], [partID:1,val:7], [partID:1,val:8], [partID:1,val:9])

// Execution steps with zeroValue 0:
// 1. Partition 0: max(0,1)=1, max(1,2)=2, max(2,3)=3, max(3,4)=4, so the maximum of partition 0 is 4
// 2. Partition 1: max(0,5)=5, max(5,6)=6, max(6,7)=7, max(7,8)=8, max(8,9)=9, so the maximum of partition 1 is 9
// 3. combOp is _+_, i.e. the zero value and the per-partition maxima are summed: 0 + 4 + 9 = 13
scala> rdd1.aggregate(0)(math.max(_,_), _+_)
res57: Int = 13

// With zeroValue 3:
// 1. Partition 0: max(3,1)=3, max(3,2)=3, max(3,3)=3, max(3,4)=4, so the maximum of partition 0 is 4
// 2. Partition 1: max(3,5)=5, ..., max(8,9)=9, so the maximum of partition 1 is 9
// 3. combOp: 3 + 4 + 9 = 16
scala> rdd1.aggregate(3)(math.max(_,_), _+_)
res62: Int = 16

// With zeroValue 5:
// 1. Partition 0: max of 5 and 1..4 is 5
// 2. Partition 1: max of 5 and 5..9 is 9
// 3. combOp: 5 + 5 + 9 = 19
scala> rdd1.aggregate(5)(math.max(_,_), _+_)
res58: Int = 19

// With zeroValue 8:
// 1. Partition 0: max of 8 and 1..4 is 8
// 2. Partition 1: max of 8 and 5..9 is 9
// 3. combOp: 8 + 8 + 9 = 25
scala> rdd1.aggregate(8)(math.max(_,_), _+_)
res59: Int = 25

// With zeroValue 10:
// 1. Partition 0: max of 10 and 1..4 is 10
// 2. Partition 1: max of 10 and 5..9 is 10
// 3. combOp: 10 + 10 + 10 = 30
scala> rdd1.aggregate(10)(math.max(_,_), _+_)
res60: Int = 30

================================================================================
String aggregation:

scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"), 2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[79] at parallelize at <console>:24

scala> def fun2(index: Int, iter: Iterator[(String)]): Iterator[String] = {
     |   iter.toList.map(x => "[partID:" + index + ",val:" + x + "]").iterator
     | }
fun2: (index: Int, iter: Iterator[String])Iterator[String]

// The output below shows that "a","b","c" are in partID:0 and "d","e","f" are in partID:1
scala> rdd2.mapPartitionsWithIndex(fun2).collect
res63: Array[String] = Array([partID:0,val:a], [partID:0,val:b], [partID:0,val:c], [partID:1,val:d], [partID:1,val:e], [partID:1,val:f])

// Execution order:
// 1. Partition 0: "" + "a" = "a", "a" + "b" = "ab", "ab" + "c" = "abc", so partition 0 yields "abc"
// 2. Partition 1: "" + "d" = "d", "d" + "e" = "de", "de" + "f" = "def", so partition 1 yields "def"
// 3. Because the partitions finish in a non-deterministic order, the result is either "" + "abc" + "def" = "abcdef" or "" + "def" + "abc" = "defabc"
scala> rdd2.aggregate("")(_+_, _+_)
res64: String = abcdef
scala> rdd2.aggregate("")(_+_, _+_)
res65: String = defabc

// With zeroValue "=":
// 1. Partition 0: "=" + "a" = "=a", "=a" + "b" = "=ab", "=ab" + "c" = "=abc", so partition 0 yields "=abc"
// 2. Partition 1: "=" + "d" = "=d", "=d" + "e" = "=de", "=de" + "f" = "=def", so partition 1 yields "=def"
// 3. Depending on which partition finishes first, the result is "=" + "=abc" + "=def" = "==abc=def" or "=" + "=def" + "=abc" = "==def=abc"
scala> rdd2.aggregate("=")(_ + _, _ + _)
res68: String = ==def=abc
scala> rdd2.aggregate("=")(_ + _, _ + _)
res69: String = ==abc=def

val rdd3 = sc.parallelize(List("12","23","345","4567"), 2)
// The output below shows which values end up in which of the two partitions
scala> rdd3.mapPartitionsWithIndex(fun2).collect
res70: Array[String] = Array([partID:0,val:12], [partID:0,val:23], [partID:1,val:345], [partID:1,val:4567])

// Execution steps (note that "".length is 0 and "12".length is 2):
// 1. Partition 0: max("".length, "12".length) = 2, giving "2"; max("2".length, "23".length) = 2, giving "2"; partition 0 yields "2"
// 2. Partition 1: max("".length, "345".length) = 3, giving "3"; max("3".length, "4567".length) = 4, giving "4"; partition 1 yields "4"
// 3. combOp is x + y, and because of the parallel execution order the result can be "24" or "42"
scala> rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
res75: String = 24
scala> rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
res76: String = 42

// Now take the minimum of the lengths:
scala> val rdd4 = sc.parallelize(List("12","23","345",""), 2)
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[84] at parallelize at <console>:24
scala> rdd4.mapPartitionsWithIndex(fun2).collect
res79: Array[String] = Array([partID:0,val:12], [partID:0,val:23], [partID:1,val:345], [partID:1,val:])

// Execution steps:
// 1. Partition 0: min("".length, "12".length) = 0, giving "0"; min("0".length, "23".length) = 1, giving "1"; partition 0 yields "1"
// 2. Partition 1: min("".length, "345".length) = 0, giving "0"; min("0".length, "".length) = 0, giving "0"; partition 1 yields "0"
// 3. combOp is x + y, so depending on the execution order the result is "10" or "01"
scala> rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res85: String = 10
scala> rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res86: String = 01

val rdd5 = sc.parallelize(List("12","23","","345"), 2)
// Execution steps:
// 1. Partition 0: min("".length, "12".length) = 0, giving "0"; min("0".length, "23".length) = 1, giving "1"; partition 0 yields "1"
// 2. Partition 1: min("".length, "".length) = 0, giving "0"; min("0".length, "345".length) = 1, giving "1"; partition 1 yields "1"
// 3. Both partitions yield "1", so the result is "11" regardless of the order
rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)

Another example:

scala> def seqOP(a: Int, b: Int): Int = {
     |   println("seqOp: " + a + "\t" + b)
     |   math.min(a, b)
     | }
seqOP: (a: Int, b: Int)Int

scala> def combOp(a: Int, b: Int): Int = {
     |   println("combOp: " + a + "\t" + b)
     |   a + b
     | }
combOp: (a: Int, b: Int)Int

scala> val z = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
// Note that z contains Ints, so the function used to inspect the partitions also iterates over Ints
scala> def fun2(index: Int, iter: Iterator[(Int)]): Iterator[String] = {
     |   iter.toList.map(x => "[partID:" + index + ",val:" + x + "]").iterator
     | }
fun2: (index: Int, iter: Iterator[Int])Iterator[String]

// Show which values end up in which partition
scala> z.mapPartitionsWithIndex(fun2).collect
res94: Array[String] = Array([partID:0,val:1], [partID:0,val:2], [partID:0,val:3], [partID:1,val:4], [partID:1,val:5], [partID:1,val:6])

// Each partition first runs seqOP on its own; only when both are done does combOp run:
// 1. Partition 0: min(3,1)=1, min(1,2)=1, min(1,3)=1, so partition 0 yields 1
// 2. Partition 1: min(3,4)=3, min(3,5)=3, min(3,6)=3, so partition 1 yields 3
// 3. combOp: 3 + 1 = 4, then 4 + 3 = 7, so the final result is 7
scala> z.aggregate(3)(seqOP, combOp)
combOp: 3   1
combOp: 4   3
res95: Int = 7

// Checking again with zeroValue 2:
// 1. Partition 0: min(2,1)=1, min(1,2)=1, min(1,3)=1, so partition 0 yields 1
// 2. Partition 1: min(2,4)=2, min(2,5)=2, min(2,6)=2, so partition 1 yields 2
// 3. combOp: 2 + 1 = 3, then 3 + 2 = 5, so the final result is 5
scala> z.aggregate(2)(seqOP, combOp)
combOp: 2   1
combOp: 3   2
res96: Int = 5

// The same idea with strings:
scala> def seqOp(a: String, b: String): String = {
     |   println("seqOp: " + a + "\t" + b)
     |   math.min(a.length, b.length).toString
     | }
seqOp: (a: String, b: String)String

scala> def combOp(a: String, b: String): String = {
     |   println("combOp: " + a + "\t" + b)
     |   a + b
     | }
combOp: (a: String, b: String)String

scala> val z = sc.parallelize(List("12", "23", "345", "4567"), 2)
scala> z.aggregate("")(seqOp, combOp)
seqOp:      345
seqOp:      12
seqOp: 0    4567
seqOp: 0    23
combOp:     1
combOp: 1   1
res25: String = 11
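A typical practical use of aggregate is computing several statistics in one pass. As a small additional sketch, reusing the rdd1 of 1 to 9 above, the sum and the count can be carried together in the accumulator and the average derived at the end:

// zeroValue is a (sum, count) pair; the result type differs from the element type Int
val (sum, count) = rdd1.aggregate((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),       // seqOp: fold one element into the partition accumulator
  (a, b)   => (a._1 + b._1, a._2 + b._2)      // combOp: merge two partition accumulators
)
val avg = sum.toDouble / count                // 5.0 for the elements 1 to 9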

Exercise 11:
aggregateByKey

val pairRDD = sc.parallelize(List(("cat",2), ("cat", 5), ("mouse", 4), ("cat", 12), ("dog", 12), ("mouse", 2)), 2)
def func2(index: Int, iter: Iterator[(String, Int)]): Iterator[String] = {
  iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
scala> pairRDD.mapPartitionsWithIndex(func2).collect
res99: Array[String] = Array([partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)], [partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)])

// Execution with zeroValue 0:
// 1. Within each partition the values are grouped by key:
//    partition 0: (cat,(2,5)), (mouse,(4))
//    partition 1: (cat,(12)), (dog,(12)), (mouse,(2))
// 2. The zero value 0 is folded into each group with math.max:
//    partition 0: max(0,2)=2, max(2,5)=5 for cat; max(0,4)=4 for mouse, giving (cat,5), (mouse,4)
//    partition 1: max(0,12)=12 for cat; likewise (dog,12), (mouse,2)
// 3. The per-partition maxima are then summed across partitions for each key:
//    (cat,5) + (cat,12)    => (cat,17)
//    (mouse,4) + (mouse,2) => (mouse,6)
//    (dog,12)              => (dog,12)
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect

// Execution with zeroValue 100:
// 1. The per-partition grouping is the same as above.
// 2. The zero value 100 is folded into each group with math.max:
//    partition 0: (cat,100), (mouse,100)
//    partition 1: (cat,100), (dog,100), (mouse,100)
// 3. The per-partition maxima are summed across partitions for each key:
//    (cat,100) + (cat,100)     => (cat,200)
//    (mouse,100) + (mouse,100) => (mouse,200)
//    (dog,100)                 => (dog,100)
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
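As with aggregate, a common real use of aggregateByKey is carrying a richer accumulator per key. A small sketch reusing pairRDD from above to compute the average value per key in a single pass:

// The accumulator per key is a (sum, count) pair
val avgByKey = pairRDD
  .aggregateByKey((0, 0))(
    (acc, v) => (acc._1 + v, acc._2 + 1),     // fold a value into the per-partition accumulator
    (a, b)   => (a._1 + b._1, a._2 + b._2)    // merge accumulators from different partitions
  )
  .mapValues { case (sum, count) => sum.toDouble / count }
avgByKey.collect   // e.g. Array((dog,12.0), (cat,6.333...), (mouse,3.0)); the order may vary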

Exercise 12:
checkpoint (more background: http://blog.csdn.net/tototuzuoquan/article/details/74838936)

Sets a checkpoint for the current RDD. The function writes the RDD as binary files into the checkpoint directory, which is set with SparkContext.setCheckpointDir(). During checkpointing, all of the RDD's references to its parent RDDs are removed. Calling checkpoint on an RDD does not execute anything immediately; an action must run before the checkpoint is actually written.
Function signature:
def checkpoint()
Example:

scala> val data = sc.parallelize(1 to 100000, 15)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[94] at parallelize at <console>:24
scala> sc.setCheckpointDir("/iteblog")
17/07/07 19:17:22 WARN spark.SparkContext: Spark is not running in local mode, therefore the checkpoint directory must not be on the local filesystem. Directory '/iteblog' appears to be on the local filesystem.
scala> data.checkpoint
scala> data.count
res105: Long = 100000

[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls /iteblog
Found 1 items
drwxr-xr-x   - root supergroup          0 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c
Found 1 items
drwxr-xr-x   - root supergroup          0 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94
Found 15 items
-rw-r--r--   3 root supergroup      71219 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00000
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00001
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00002
-rw-r--r--   3 root supergroup      71219 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00003
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00004
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00005
-rw-r--r--   3 root supergroup      71219 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00006
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00007
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00008
-rw-r--r--   3 root supergroup      71219 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00009
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00010
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00011
-rw-r--r--   3 root supergroup      71219 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00012
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00013
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00014
[root@hadoop2 hadoop-2.8.0]#

After count runs, several binary files (one per partition) appear under the /iteblog directory.

// Set the checkpoint directory so that the files end up in the location below
scala> sc.setCheckpointDir("hdfs://mycluster/wordcount/ck")

scala> val rdd = sc.textFile("hdfs://mycluster/wordcount/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[100] at reduceByKey at <console>:24

scala> rdd.checkpoint

scala> rdd.isCheckpointed
res108: Boolean = false

scala> rdd.count
res109: Long = 289

scala> rdd.isCheckpointed
res110: Boolean = true

scala> rdd.getCheckpointFile
res111: Option[String] = Some(hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100)

[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100
Found 10 items
-rw-r--r--   3 root supergroup        147 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/_partitioner
-rw-r--r--   3 root supergroup        867 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00000
-rw-r--r--   3 root supergroup        721 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00001
-rw-r--r--   3 root supergroup       1091 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00002
-rw-r--r--   3 root supergroup       1030 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00003
-rw-r--r--   3 root supergroup        944 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00004
-rw-r--r--   3 root supergroup        810 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00005
-rw-r--r--   3 root supergroup        964 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00006
-rw-r--r--   3 root supergroup       1011 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00007
-rw-r--r--   3 root supergroup        974 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00008
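One practical note: the checkpoint is written by a separate job after the action runs, so the RDD's lineage is computed twice unless the RDD is cached first. A short sketch of the usual pattern, built on the same word-count RDD as above:

sc.setCheckpointDir("hdfs://mycluster/wordcount/ck")
val rdd = sc.textFile("hdfs://mycluster/wordcount/input")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
rdd.cache()        // keep the data in memory so the checkpoint job does not recompute the lineage
rdd.checkpoint()   // still lazy; nothing is written yet
rdd.count()        // the action triggers both the job and the checkpoint write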

Exercise 13:
coalesce, repartition

coalesce: merges the partitions of an RDD into a smaller number of partitions.
Function signature:
def coalesce(numPartitions: Int, shuffle: Boolean = false)
    (implicit ord: Ordering[T] = null): RDD[T]
Returns a new RDD whose number of partitions equals numPartitions. If shuffle is set to true, a shuffle is performed.

scala> var data = sc.parallelize(List(1,2,3,4))
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[104] at parallelize at <console>:24

scala> data.partitions.length
res115: Int = 6

scala> val result = data.coalesce(2, false)
result: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[105] at coalesce at <console>:26

scala> result.partitions.length
res116: Int = 2

scala> result.toDebugString
res117: String =
(2) CoalescedRDD[105] at coalesce at <console>:26 []
 |  ParallelCollectionRDD[104] at parallelize at <console>:24 []

scala> val result1 = data.coalesce(2, true)
result1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[109] at coalesce at <console>:26

scala> result1.toDebugString
res118: String =
(2) MapPartitionsRDD[109] at coalesce at <console>:26 []
 |  CoalescedRDD[108] at coalesce at <console>:26 []
 |  ShuffledRDD[107] at coalesce at <console>:26 []
 +-(6) MapPartitionsRDD[106] at coalesce at <console>:26 []
    |  ParallelCollectionRDD[104] at parallelize at <console>:24 []

As the lineage above shows, no shuffle happens when shuffle is false, while a shuffle is performed when it is true. RDD.partitions.length returns the number of partitions of an RDD. Another example:

scala> val rdd1 = sc.parallelize(1 to 10, 10)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[102] at parallelize at <console>:24

scala> rdd1.collect
res112: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> rdd1.partitions.length
res113: Int = 10

scala> val rdd2 = rdd1.coalesce(2, false)
rdd2: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[103] at coalesce at <console>:26

// coalesce returns a new RDD; the original rdd1 keeps its 10 partitions
scala> rdd1.partitions.length
res114: Int = 10
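repartition(n) is simply coalesce(n, shuffle = true), so the usual rule of thumb is: shrink the partition count with coalesce to avoid a shuffle, and grow it with repartition. A short sketch, reusing the rdd1 with 10 partitions from above:

// Shrinking the number of partitions: coalesce avoids a shuffle
val fewer = rdd1.coalesce(2)
fewer.partitions.length        // 2

// Growing the number of partitions requires a shuffle, so use repartition
val more = rdd1.repartition(20)
more.partitions.length         // 20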

Exercise 14:
collectAsMap
Similar to collect, but for pair RDDs; the result is returned as a Map.
Function signature:
def collectAsMap(): Map[K, V]

scala> val rdd = sc.parallelize(List(("a",1), ("b",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[111] at parallelize at <console>:24

scala> rdd.collectAsMap
res119: scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)

As the result shows, if the same key appears with several values in the RDD, later values overwrite earlier ones, so the final map contains exactly one value per key.

Exercise 15:
combineByKey
Combines the values of each key using user-supplied functions; the input RDD[(K, V)] is turned into an RDD[(K, C)], where C may be a different type.
Function signatures:
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C, partitioner: Partitioner,
  mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
The first two variants are implemented in terms of the third, using a HashPartitioner and a null Serializer. The third variant lets you specify the partitioner and, if needed, the serializer. combineByKey is an important function: familiar operations such as aggregateByKey, foldByKey and reduceByKey are implemented on top of it. By default it combines on the map side.

scala> val data = sc.parallelize(List((1, "www"), (1, "iteblog"), (1, "com"), (2, "bbs"), (2, "iteblog"), (2, "com"), (3, "good")))
data: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[15] at parallelize at <console>:12

scala> val result = data.combineByKey(List(_), (x: List[String], y: String) => y :: x, (x: List[String], y: List[String]) => x ::: y)
result: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[19] at combineByKey at <console>:14

scala> result.collect
res20: Array[(Int, List[String])] = Array((1,List(www, iteblog, com)), (2,List(bbs, iteblog, com)), (3,List(good)))

scala> val data = sc.parallelize(List(("iteblog", 1), ("bbs", 1), ("iteblog", 3)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[24] at parallelize at <console>:12

scala> val result = data.combineByKey(x => x, (x: Int, y: Int) => x + y, (x: Int, y: Int) => x + y)
result: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[25] at combineByKey at <console>:14

scala> result.collect
res27: Array[(String, Int)] = Array((iteblog,4), (bbs,1))

More examples:

val rdd1 = sc.textFile("hdfs://mycluster/wordcount/input").flatMap(_.split(" ")).map((_, 1))
val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd2.collect

val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd3.collect

val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val rdd6 = rdd5.zip(rdd4)
val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n)
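A classic way to see all three functions at work is a per-key average; the sketch below is an additional example in the same style, with made-up scores:

val scores = sc.parallelize(List(("tom", 88), ("jerry", 95), ("tom", 74), ("jerry", 67), ("tom", 90)), 2)
val avg = scores.combineByKey(
  (v: Int) => (v, 1),                                           // createCombiner: first value of a key in a partition
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),        // mergeValue: fold in another value from the same partition
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)  // mergeCombiners: merge accumulators across partitions
).mapValues { case (sum, count) => sum.toDouble / count }
avg.collect   // e.g. Array((tom,84.0), (jerry,81.0)); the order may vary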

Exercise 16:
countByKey

scala> val rdd1 = sc.parallelize(List(("a",1), ("b",2), ("b",2), ("c",2), ("c",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

// countByKey counts how many pairs there are for each key
scala> rdd1.countByKey
res0: scala.collection.Map[String,Long] = Map(a -> 1, b -> 2, c -> 2)

// countByValue counts how many times each complete (key, value) pair occurs
scala> rdd1.countByValue
res1: scala.collection.Map[(String, Int),Long] = Map((b,2) -> 2, (a,1) -> 1, (c,2) -> 1, (c,1) -> 1)

Exercise 17:
filterByRange

scala> val rdd1 = sc.parallelize(List(("e",5), ("c",3), ("d",4), ("c",2), ("a",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24

// Keep only the pairs whose key falls in the range "b" to "d" (inclusive)
scala> val rdd2 = rdd1.filterByRange("b", "d")
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[7] at filterByRange at <console>:26

scala> rdd2.collect
res2: Array[(String, Int)] = Array((c,3), (d,4), (c,2))

Exercise 18:
flatMapValues

// Assumed input (the definition of a was not shown in the original transcript):
scala> val a = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))

scala> a.flatMapValues(_.split(" "))
res5: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[10] at flatMapValues at <console>:27

scala> a.flatMapValues(_.split(" ")).collect
res6: Array[(String, String)] = Array((a,1), (a,2), (b,3), (b,4))

Exercise 19:
foldByKey

scala> val rdd1 = sc.parallelize(List("dog","wolf","cat","bear"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[12] at parallelize at <console>:24

scala> val rdd2 = rdd1.map(x => (x.length, x))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[13] at map at <console>:26

scala> rdd2.collect
res7: Array[(Int, String)] = Array((3,dog), (4,wolf), (3,cat), (4,bear))

// Fold the values of each key, starting from the empty string; the concatenation order within a key is not deterministic
scala> val rdd3 = rdd2.foldByKey("")(_+_)
rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[14] at foldByKey at <console>:28

scala> rdd3.collect
res8: Array[(Int, String)] = Array((4,bearwolf), (3,dogcat))

scala> rdd3.collect
res9: Array[(Int, String)] = Array((4,wolfbear), (3,catdog))

scala> val rdd = sc.textFile("hdfs://mycluster/wordcount/input/2.txt").flatMap(_.split(" ")).map((_,1))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[18] at map at <console>:24

scala> rdd.foldByKey(0)(_+_)
res10: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[19] at foldByKey at <console>:27

scala> rdd.foldByKey(0)(_+_).collect
res11: Array[(String, Int)] = Array((role,1), (Play,1), (fraud,1), (level,1), (business,2), (improve,1), (platforms,1), (order,1), (big,1), (with,1), (scientist,,1), (active,1), (valuable,1), (data,5), (information,1), (Cooperate,1), (Collecting,1), (framework,1), (E-commerce/payment,1), (acquired,1), (root,1), (accurate,1), (solutions,1), (analysis;Maintenance,1), (problems,1), (them,1), (Analyze,1), (models,1), (analysis,3), (realize,1), (actual,1), (weight,1), (compare,1), (risk,1), (anti-fraud,1), (key,1), (related,1), (base,1), (Support,1), (against,1), (automatic,1), (to,2), (platform,2), (company's,1), (in,2), (needs,,1), (provide,2), (implement,1), (affecting,1), (strategy,1), (of,1), (reports,1), (management,1), (detection,,1), (for,1), (work,,1), (cause,1), (an,1), (verify,1),...

foreachPartition
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
rdd1.foreachPartition(x => println(x.reduce(_ + _)))
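foreachPartition is the action counterpart of mapPartitions and is the usual way to write an RDD to an external store with one connection per partition. A rough sketch of that pattern, reusing rdd1 from above; the JDBC URL, table and credentials are placeholders, not part of this article:

import java.sql.DriverManager

rdd1.foreachPartition { iter =>
  // One connection per partition instead of one per element
  val conn = DriverManager.getConnection("jdbc:mysql://dbhost:3306/test", "user", "password")
  val stmt = conn.prepareStatement("INSERT INTO numbers(n) VALUES (?)")
  try {
    iter.foreach { n =>
      stmt.setInt(1, n)
      stmt.executeUpdate()
    }
  } finally {
    stmt.close()
    conn.close()
  }
}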

Exercise 20:
keyBy

scala> val rdd1 = sc.parallelize(List("dog","salmon","salmon","rat","elephant"), 3)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[21] at parallelize at <console>:24

// keyBy turns each element into a (key, element) pair, with the key produced by the given function
scala> val rdd2 = rdd1.keyBy(_.length)
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[22] at keyBy at <console>:26

scala> rdd2.collect
res12: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))

Exercise 21:
keys and values

scala> val rdd1 = sc.parallelize(List("dog","tiger","lion","cat","panther","eagle"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at parallelize at <console>:24

scala> val rdd2 = rdd1.map(x => (x.length, x))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[24] at map at <console>:26

scala> rdd2.keys.collect
res13: Array[Int] = Array(3, 5, 4, 3, 7, 5)

scala> rdd2.values.collect
res14: Array[String] = Array(dog, tiger, lion, cat, panther, eagle)
