sparkRDD相关操作
RDD(彈性分布式數(shù)據(jù)集)。RDD以分區(qū)中的每一行進(jìn)行分布式計(jì)算。父子依賴關(guān)系。
一、RDD創(chuàng)建操作
1)數(shù)據(jù)集合
Val data=Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
Val distData = sc.parallelize(data, 3) #分區(qū),生成RDD數(shù)據(jù)集
Val distData =sc.parallelize(1 to 10, 2) #2是并行程度,指定多少線程同時(shí)執(zhí)行。
distData.collect
distData.take(1)
sc.makeRDD(1 to 10, 4).map(e=> {val tname=Thread.currentThread().getName; println(tname + ":" +e)}).collect
2)外部讀取
Val distFile1 = sc.textFile(“data.txt”) /#本地當(dāng)前目錄下文件或指定目錄下文件
Val distFile2 = sc.textFile(“hdfs://192.168.1.100:9000/input/data.txt”)#HDFS文件
textFile(“/input/001.txt, /input/002.txt”)#讀取多個(gè)文件
textFile(“/input/*.txt”)#讀取含通配符路徑
二、RDD轉(zhuǎn)換操作(不會(huì)立即執(zhí)行,返回RDD)
1) Map
Map是對(duì)RDD中每個(gè)元素都執(zhí)行一個(gè)指定的函數(shù)來(lái)生成一個(gè)新的RDD
Val rdd1=sc.parallelize(1 to 9, 3)
Val rdd2=rdd1.map(x=>x*2)
Rdd2.collect
2) Filter
Filter是對(duì)RDD元素進(jìn)行過(guò)濾,返回一個(gè)新的數(shù)據(jù)集,是經(jīng)過(guò)func函數(shù)后返回值為True的原元素組成
Val rdd3=rdd2.filter (x=>x>10)
(12, 14, 16)
3) Top
提取rdd最大的n個(gè)元素
rdd1.top(1)
rdd1.top(1)(scala.math.Ordering.String.reverse) #倒序
4) flatMap
類似于map,但是它是一對(duì)多關(guān)系
rdd3.flatMap(x => x to 20)
(12,13,14,15,16,17,18,19,20,14,15,16,17,18,19,20,16,17,18,19,20)
5) mapPartitons
是map的一種變種,mapPartitions的輸入函數(shù)是每個(gè)分區(qū)的數(shù)據(jù),也就是把每個(gè)分區(qū)中的內(nèi)容作為整體來(lái)處理 。
6) repatition
再分區(qū)
rdd.repartition(4)
7) sample
Sample(withReplacement, fraction, seed) 第一個(gè)參數(shù)是是否為又放回抽樣,第二個(gè)參數(shù)是比例。
Val a=sc.parallelize(1 to 10000,3)
a.sample(false, 0.1).collect().foreach(println)
8) union
數(shù)據(jù)合并,返回一個(gè)新的數(shù)據(jù)集
Val rdd8=rdd1.union(rdd3)
Rdd8.collect
9) intersection
數(shù)據(jù)交集
Val rdd9=rdd8.intersection(rdd1)
10) distinct
數(shù)據(jù)去重
Val rdd10=rdd8.union(rdd9).distinct
11) groupBy
對(duì)RDD元素進(jìn)行分組
val rdd = sc.parallelize(Array((“tom”,10),(“tomas”,12),(“tomlee”,12),(“tomsan”,10))
val rdd2 = rdd.groupBy(e => e _2)
rdd2.collect()
Array((10.CompareBuffer((tom,10),(tomsam,10)),12.CompareBuffer((tomas,12),(tomlee,12))))
12) groupByKey
根據(jù)Key進(jìn)行分組,迭代部分都是value
Val rdd0=sc.parallelize(Array((1,1),(1,2),(1,3),(2,1),(2,2),(2,3)),3)
Val rdd1=rdd0.groupByKey()
Array((1,ArrayBuffer(1,2,3)),(2,ArrayBuffer(1,2,3)))
13) groupWith
兩個(gè)RDD
100->tom
200->tomas
100->20
200->30
Val rdd1 = sc.makeRDD(Array((100,”tom”),(200, “tomas”)))
Val rdd2 = sc.makeRDD(Array((100, 20),(200, 30)))
rdd1.groupWith(rdd2)
Array((100,(CompactBuffer(tom),CompactBuffer(20))),(200,(CompactBuffer(tomas),CompactBuffer(30))))
14) reduceBykey
數(shù)組的分組聚合操作
Val rdd12=rdd0.reduceByKey((x,y)=>x+y)
Array((1,6),(2,6))
15) aggregeteByKey
更加靈活的一個(gè)函數(shù)。三個(gè)參數(shù),第一個(gè)是初始值,第二個(gè)是給每個(gè)元素值進(jìn)行的函數(shù)操作,第三個(gè)是根據(jù)Key做相應(yīng)的合并操作
Val z=sc.parallelize(list((1,3),(1,2),(1,4),(2,3)))
z.aggregateByKey(0)(math.max(_,_),_+_) #先將每個(gè)value值與初始值0比較大小,然后根據(jù)Key求和。
Array((2,3),(1,9))
16) conbineByKey
更加靈活的一個(gè)函數(shù),與reduceByKey不同,它可以同時(shí)計(jì)算求和和求次數(shù)。根據(jù)Key進(jìn)行聚合操作。
17) sortByKey
排序操作
Val rdd14=rdd0.sortBykey()
Rdd14.collect
18) join
連接兩個(gè)RDD,形成新的rdd,(跟groupwith區(qū)別是join是一對(duì)一,groupwith是分組,相同的放一起)
(1 tom) join (1 100) --> 1, (tom, 100)
(2 tomas) join (2 80) --> 2, (tomas, 80)
Val rdd1 = sc.makeRDD(Array((1, “tom”), (2, “tomas”)))
Val rdd1 = sc.makeRDD(Array((1, 100), (2, 80)))
rdd1.join(rdd2).collect()
array((1,(tom,800)),(2,(tomas,700)))
19) intersection
提取RDD之間的交集
val rdd1 = sc.makeRDD(Array(“tom”,”tomas”,”tomaslee”))
val rdd2 = sc.makeRDD(Array(“tomas”,”tomaslee”,”tomason”))
rdd1.intersection().collect()
Array(tomaslee, tomas)
20) cogroup
輸入數(shù)據(jù)集(k, v)和另外一個(gè)數(shù)據(jù)集(k, w)進(jìn)行cogroup,得到一個(gè)格式(k, Seq[v], Seq[W])的數(shù)據(jù)集。
rdd0 = sc.makeRDD(Array((1, “tom”),(2,”tomas”),(3,”tomasLee”) ))
rdd0.cogroup(sc.makeRDD(Array((1,”hebei”),(2,”henan”),(3,”hexi”))))
Array((1,(CompactBuffer(tom),CompactBuffer(hebei))), (2,(CompactBuffer(tomas),CompactBuffer(henan))), (3,(CompactBuffer(tomasLee),CompactBuffer(hex))))
21) cache / persist
cache是特殊的persist,只在內(nèi)存中對(duì)RDD的結(jié)果進(jìn)行保存(一旦關(guān)掉就沒(méi)有了)。
val rdd = sc.makeRDD(1 to 10).map(e=>(println(e);e))
rdd.collect
rdd.cache
rdd.collect
rdd.presist() == rdd.persist(StorageLevel.MEMORY_ONLY)
rdd.presist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
22) pipe
對(duì)每個(gè)分區(qū)執(zhí)行shell命令
val r = sc.makeRDD(1 to 5).pipe(“echo hahah”).collect #hahah個(gè)數(shù)同分區(qū)數(shù)
val rdd18=sc.parallelize(1 to 9,3)
rdd18.pipe(“head -n 1”).collect #取每個(gè)分區(qū)的第一個(gè)數(shù)
23) randomSplit
Val rdd19=rdd1.randomSplit(Array(0.3,0.7), 1)
rdd19(0).collect
rdd19(1).collect
24) Zip
Val rdd21_1=sc.parallelize(Array(1,2,3,4), 3)
Val rdd21_2=sc.parallelize(Array(“a”,”b”,”c”,”d”), 3)
Val rdd21_3=rdd21_1.zip(rdd21_2)
將每個(gè)分區(qū)的所有元素放到一個(gè)數(shù)組中,形成RDD,RDD的每個(gè)元素是數(shù)組,數(shù)組長(zhǎng)度等于分區(qū)個(gè)數(shù)
Val rdd = sc. parallelize(1 to 10, 4).glom().collect()
Array<array<1,2>, array<3,4,5>,array<6,7>,array<8,9,10>>
25) keyBy
將rdd的元素和一個(gè)變換之后的值組合形成元組
val rdd = sc.makeRDD(1 to 10)
rdd.keyBy(_ * 2).collect
Array[(Int, Int)] = Array((2,1), (4,2), (6,3), (8,4), (10,5), (12,6), (14,7), (16,8), (18,9), (20,10))
26) max | min | mean
Rdd.max
27) repartitionAndSortWithinPartitions
通過(guò)指定分區(qū)函數(shù)實(shí)現(xiàn)再分區(qū)并在分區(qū)內(nèi)排序
二、RDD行動(dòng)操作(會(huì)立即執(zhí)行,返回?cái)?shù)組)
1) reduce
val rdd1=sc.parallelize(1 to 9, 3)
val rdd2=rdd1.reduce(_+_)
2) collect
3) count
4) first
5) take
6) takesample
類似于sample,但takeSample是行動(dòng)操作,所以返回的是數(shù)組
Rdd1.takeSample(true, 4)
7) takeOrdered
takeOrdered(n, [ordering])是返回包含隨機(jī)的n個(gè)元素的數(shù)組,按照順序輸出
8)SaveAsTextFile
把數(shù)據(jù)集中的元素寫到一個(gè)文本文件,Spark會(huì)對(duì)每個(gè)元素調(diào)用toString方法來(lái)把每個(gè)元素存成文本文件的一行。
r.saveAsTextFile(“/home/centos/aa”)
cd aa/
find .
ll
nano part-0000
9) saveAsSequenceFile
r.map(w=>(w,1)).saveAsSequenceFile(“home/centos/bb”)
cd bb/
ls
hdfs dfs -text file:///home/centos/bb/part-00000
10)countByKey
對(duì)于(k,v)類型的RDD,返回一個(gè)(k,int)的map, int為k的個(gè)數(shù)。
Val rdd =
sc.makeRDD(Array(1,”tom”),(2,”tomas”),(1,”tomasLee”))).countByKey.foreach(e=>println(e))
結(jié)果 (1,2)
(2,1)
11)foreach
Foreach(func)是對(duì)數(shù)據(jù)集中的每個(gè)元素都執(zhí)行func函數(shù)
總結(jié)
以上是生活随笔為你收集整理的sparkRDD相关操作的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 2018-2019 1 20165203
- 下一篇: Python——因子分析(KMO检验和B