日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 综合教程 >内容正文

综合教程

sparkRDD相关操作

發(fā)布時(shí)間:2023/12/13 综合教程 24 生活家
生活随笔 收集整理的這篇文章主要介紹了 sparkRDD相关操作 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

RDD(彈性分布式數(shù)據(jù)集)。RDD以分區(qū)中的每一行進(jìn)行分布式計(jì)算。父子依賴關(guān)系。

一、RDD創(chuàng)建操作

1)數(shù)據(jù)集合

Val data=Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

Val distData = sc.parallelize(data, 3) #分區(qū),生成RDD數(shù)據(jù)集

Val distData =sc.parallelize(1 to 10, 2) #2是并行程度,指定多少線程同時(shí)執(zhí)行。

distData.collect

distData.take(1)

sc.makeRDD(1 to 10, 4).map(e=> {val tname=Thread.currentThread().getName; println(tname + ":" +e)}).collect

2)外部讀取

Val distFile1 = sc.textFile(“data.txt”) /#本地當(dāng)前目錄下文件或指定目錄下文件

Val distFile2 = sc.textFile(“hdfs://192.168.1.100:9000/input/data.txt”)#HDFS文件

textFile(“/input/001.txt, /input/002.txt”)#讀取多個(gè)文件

textFile(“/input/*.txt”)#讀取含通配符路徑

二、RDD轉(zhuǎn)換操作(不會(huì)立即執(zhí)行,返回RDD)

1) Map

Map是對(duì)RDD中每個(gè)元素都執(zhí)行一個(gè)指定的函數(shù)來(lái)生成一個(gè)新的RDD

Val rdd1=sc.parallelize(1 to 9, 3)

Val rdd2=rdd1.map(x=>x*2)

Rdd2.collect

2) Filter

Filter是對(duì)RDD元素進(jìn)行過(guò)濾,返回一個(gè)新的數(shù)據(jù)集,是經(jīng)過(guò)func函數(shù)后返回值為True的原元素組成

Val rdd3=rdd2.filter (x=>x>10)

(12, 14, 16)

3) Top

提取rdd最大的n個(gè)元素

rdd1.top(1)

rdd1.top(1)(scala.math.Ordering.String.reverse) #倒序

4) flatMap

類似于map,但是它是一對(duì)多關(guān)系

rdd3.flatMap(x => x to 20)

(12,13,14,15,16,17,18,19,20,14,15,16,17,18,19,20,16,17,18,19,20)

5) mapPartitons

是map的一種變種,mapPartitions的輸入函數(shù)是每個(gè)分區(qū)的數(shù)據(jù),也就是把每個(gè)分區(qū)中的內(nèi)容作為整體來(lái)處理 。

6) repatition

再分區(qū)

rdd.repartition(4)

7) sample

Sample(withReplacement, fraction, seed) 第一個(gè)參數(shù)是是否為又放回抽樣,第二個(gè)參數(shù)是比例。

Val a=sc.parallelize(1 to 10000,3)

a.sample(false, 0.1).collect().foreach(println)

8) union

數(shù)據(jù)合并,返回一個(gè)新的數(shù)據(jù)集

Val rdd8=rdd1.union(rdd3)

Rdd8.collect

9) intersection

數(shù)據(jù)交集

Val rdd9=rdd8.intersection(rdd1)

10) distinct

數(shù)據(jù)去重

Val rdd10=rdd8.union(rdd9).distinct

11) groupBy

對(duì)RDD元素進(jìn)行分組

val rdd = sc.parallelize(Array((“tom”,10),(“tomas”,12),(“tomlee”,12),(“tomsan”,10))

val rdd2 = rdd.groupBy(e => e _2)

rdd2.collect()

Array((10.CompareBuffer((tom,10),(tomsam,10)),12.CompareBuffer((tomas,12),(tomlee,12))))

12) groupByKey

根據(jù)Key進(jìn)行分組,迭代部分都是value

Val rdd0=sc.parallelize(Array((1,1),(1,2),(1,3),(2,1),(2,2),(2,3)),3)

Val rdd1=rdd0.groupByKey()

Array((1,ArrayBuffer(1,2,3)),(2,ArrayBuffer(1,2,3)))

13) groupWith

兩個(gè)RDD

100->tom

200->tomas

100->20

200->30

Val rdd1 = sc.makeRDD(Array((100,”tom”),(200, “tomas”)))

Val rdd2 = sc.makeRDD(Array((100, 20),(200, 30)))

rdd1.groupWith(rdd2)

Array((100,(CompactBuffer(tom),CompactBuffer(20))),(200,(CompactBuffer(tomas),CompactBuffer(30))))

14) reduceBykey

數(shù)組的分組聚合操作

Val rdd12=rdd0.reduceByKey((x,y)=>x+y)

Array((1,6),(2,6))

15) aggregeteByKey

更加靈活的一個(gè)函數(shù)。三個(gè)參數(shù),第一個(gè)是初始值,第二個(gè)是給每個(gè)元素值進(jìn)行的函數(shù)操作,第三個(gè)是根據(jù)Key做相應(yīng)的合并操作

Val z=sc.parallelize(list((1,3),(1,2),(1,4),(2,3)))

z.aggregateByKey(0)(math.max(_,_),_+_) #先將每個(gè)value值與初始值0比較大小,然后根據(jù)Key求和。

Array((2,3),(1,9))

16) conbineByKey

更加靈活的一個(gè)函數(shù),與reduceByKey不同,它可以同時(shí)計(jì)算求和和求次數(shù)。根據(jù)Key進(jìn)行聚合操作。

17) sortByKey

排序操作

Val rdd14=rdd0.sortBykey()

Rdd14.collect

18) join

連接兩個(gè)RDD,形成新的rdd,(跟groupwith區(qū)別是join是一對(duì)一,groupwith是分組,相同的放一起)

(1 tom) join (1 100) --> 1, (tom, 100)

(2 tomas) join (2 80) --> 2, (tomas, 80)

Val rdd1 = sc.makeRDD(Array((1, “tom”), (2, “tomas”)))

Val rdd1 = sc.makeRDD(Array((1, 100), (2, 80)))

rdd1.join(rdd2).collect()

array((1,(tom,800)),(2,(tomas,700)))

19) intersection

提取RDD之間的交集

val rdd1 = sc.makeRDD(Array(“tom”,”tomas”,”tomaslee”))

val rdd2 = sc.makeRDD(Array(“tomas”,”tomaslee”,”tomason”))

rdd1.intersection().collect()

Array(tomaslee, tomas)

20) cogroup

輸入數(shù)據(jù)集(k, v)和另外一個(gè)數(shù)據(jù)集(k, w)進(jìn)行cogroup,得到一個(gè)格式(k, Seq[v], Seq[W])的數(shù)據(jù)集。

rdd0 = sc.makeRDD(Array((1, “tom”),(2,”tomas”),(3,”tomasLee”) ))

rdd0.cogroup(sc.makeRDD(Array((1,”hebei”),(2,”henan”),(3,”hexi”))))

Array((1,(CompactBuffer(tom),CompactBuffer(hebei))), (2,(CompactBuffer(tomas),CompactBuffer(henan))), (3,(CompactBuffer(tomasLee),CompactBuffer(hex))))

21) cache / persist

cache是特殊的persist,只在內(nèi)存中對(duì)RDD的結(jié)果進(jìn)行保存(一旦關(guān)掉就沒(méi)有了)。

val rdd = sc.makeRDD(1 to 10).map(e=>(println(e);e))

rdd.collect

rdd.cache

rdd.collect

rdd.presist() == rdd.persist(StorageLevel.MEMORY_ONLY)

rdd.presist(org.apache.spark.storage.StorageLevel.DISK_ONLY)

22) pipe

對(duì)每個(gè)分區(qū)執(zhí)行shell命令

val r = sc.makeRDD(1 to 5).pipe(“echo hahah”).collect #hahah個(gè)數(shù)同分區(qū)數(shù)

val rdd18=sc.parallelize(1 to 9,3)

rdd18.pipe(“head -n 1”).collect #取每個(gè)分區(qū)的第一個(gè)數(shù)

23) randomSplit

Val rdd19=rdd1.randomSplit(Array(0.3,0.7), 1)

rdd19(0).collect

rdd19(1).collect

24) Zip

Val rdd21_1=sc.parallelize(Array(1,2,3,4), 3)

Val rdd21_2=sc.parallelize(Array(“a”,”b”,”c”,”d”), 3)

Val rdd21_3=rdd21_1.zip(rdd21_2)

將每個(gè)分區(qū)的所有元素放到一個(gè)數(shù)組中,形成RDD,RDD的每個(gè)元素是數(shù)組,數(shù)組長(zhǎng)度等于分區(qū)個(gè)數(shù)

Val rdd = sc. parallelize(1 to 10, 4).glom().collect()

Array<array<1,2>, array<3,4,5>,array<6,7>,array<8,9,10>>

25) keyBy

將rdd的元素和一個(gè)變換之后的值組合形成元組

val rdd = sc.makeRDD(1 to 10)

rdd.keyBy(_ * 2).collect

Array[(Int, Int)] = Array((2,1), (4,2), (6,3), (8,4), (10,5), (12,6), (14,7), (16,8), (18,9), (20,10))

26) max | min | mean

Rdd.max

27) repartitionAndSortWithinPartitions

通過(guò)指定分區(qū)函數(shù)實(shí)現(xiàn)再分區(qū)并在分區(qū)內(nèi)排序

二、RDD行動(dòng)操作(會(huì)立即執(zhí)行,返回?cái)?shù)組)

1) reduce

val rdd1=sc.parallelize(1 to 9, 3)

val rdd2=rdd1.reduce(_+_)

2) collect

3) count

4) first

5) take

6) takesample

類似于sample,但takeSample是行動(dòng)操作,所以返回的是數(shù)組

Rdd1.takeSample(true, 4)

7) takeOrdered

takeOrdered(n, [ordering])是返回包含隨機(jī)的n個(gè)元素的數(shù)組,按照順序輸出

8)SaveAsTextFile

把數(shù)據(jù)集中的元素寫到一個(gè)文本文件,Spark會(huì)對(duì)每個(gè)元素調(diào)用toString方法來(lái)把每個(gè)元素存成文本文件的一行。

r.saveAsTextFile(“/home/centos/aa”)

cd aa/

find .

ll

nano part-0000

9) saveAsSequenceFile

r.map(w=>(w,1)).saveAsSequenceFile(“home/centos/bb”)

cd bb/

ls

hdfs dfs -text file:///home/centos/bb/part-00000

10)countByKey

對(duì)于(k,v)類型的RDD,返回一個(gè)(k,int)的map, int為k的個(gè)數(shù)。

Val rdd =

sc.makeRDD(Array(1,”tom”),(2,”tomas”),(1,”tomasLee”))).countByKey.foreach(e=>println(e))

結(jié)果 (1,2)

(2,1)

11)foreach

Foreach(func)是對(duì)數(shù)據(jù)集中的每個(gè)元素都執(zhí)行func函數(shù)

總結(jié)

以上是生活随笔為你收集整理的sparkRDD相关操作的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 蜜臀久久99精品久久久 | 国产天堂一区 | 一级黄色在线视频 | www.天堂av | 亚洲热在线观看 | 亚洲人一区 | 国产女人18毛片水真多 | 色屋永久 | 最好看的2019年中文在线观看 | avtt2015| 97在线免费观看 | 国产黄a三级三级三级看三级男男 | 无码人妻一区二区三区在线 | 中文第一页 | 国产特黄大片aaaa毛片 | 岛国精品在线 | 强制憋尿play黄文尿奴 | 成 年人 黄 色 片 | 欧美手机看片 | 日韩经典第一页 | 中文字幕精品一区二区三区精品 | 欧美大片高清免费观看 | 欧美视频网址 | 激情久久av一区av二区av三区 | 免费看a网站 | 成人在线观看免费爱爱 | 精品一区二区三区四区五区六区 | 中文在线字幕免费观 | 黄色小说在线免费观看 | 精品国产一区二区在线 | 99久精品视频 | 成年人在线观看av | 8090理论片午夜理伦片 | 欧美岛国国产 | 欧美一级免费视频 | 国精产品一区二区三区 | 男人天堂a在线 | 亚洲成人18 | 五月天婷婷视频 | 中文字幕亚洲欧美日韩在线不卡 | 夜夜嗨av一区二区三区免费区 | 色秀视频网 | 蜜桃av中文字幕 | 欧美裸体精品 | 美女精品一区 | 亚洲成av人片一区二区梦乃 | 色偷偷噜噜噜亚洲男人的天堂 | 色女人网站 | 日韩性av | 日本黄网站色大片免费观看 | 亚洲av毛片一区二二区三三区 | 男人干女人视频 | 超碰人人草人人干 | 午夜在线播放视频 | 久操操| 西野翔夫の目の前で犯在线 | 啪啪亚洲| 99视频国产精品 | 国产传媒精品 | 欧美顶级metart裸体全部自慰 | 久久精品国产熟女亚洲AV麻豆 | 国产视频久久久久 | 欧美视频亚洲 | 乳女教师の诱惑julia | 中文 欧美 日韩 | 国产久草av | 韩国三级bd高清中字2021 | av在线地址 | 99r热| 亚洲 小说 欧美 激情 另类 | 嫩草影院在线免费观看 | 亚洲国产日韩一区无码精品久久久 | 亚洲免费精品视频在线观看 | 9.1成人看片免费版 日韩经典在线 | 久久无码人妻一区二区三区 | 人人爱操 | 精品国产乱码久久久人妻 | 日本成人午夜视频 | 麻豆网站入口 | 17c国产精品一区二区 | 久艹伊人| 99视频网站 | 欧美视频综合 | 国产网站在线看 | 日本一区二区三区在线免费观看 | 欧洲色av | 黄色小视频在线播放 | eeuss一区 | 午夜免费一区 | 色妞网 | 成人亚洲天堂 | 午夜伦理影院 | 韩漫动漫免费大全在线观看 | 日本一本久草 | 中文字幕人妻色偷偷久久 | 欧美午夜精品一区 | 欧美一级视频免费 | 成人福利影院 | 天堂视频免费在线观看 |