日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark的Transformations算子(理解+实例)

發(fā)布時間:2023/12/10 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark的Transformations算子(理解+实例) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

把每個Transformations算子都敲著練習(xí)幾遍會理解的更深刻

Transformations算子之后要寫action算子才會進(jìn)行計算。

1. map(func)

描述:返回一個新的RDD,該RDD由每一個輸入元素經(jīng)過func函數(shù)轉(zhuǎn)換后組成

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val arr = Array(1,2,3,4,5,6)val numRDD = sc.parallelize(arr)val resultRDD = numRDD.map(x => x * x)resultRDD.foreach(println)} 結(jié)果: 1 4 9 16 25 36

2. filter(func)

描述:返回一個新的RDD,該RDD經(jīng)過func函數(shù)計算后返回值為true的輸入元素組成

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val arr = Array(1,2,3,4,5,6)//parallelize()創(chuàng)建個rddval numRDD = sc.parallelize(arr)val resultRDD = numRDD.map(_%2 == 0)resultRDD.foreach(println)resultRDD.take(100).foreach(println)resultRDD.collect()} 結(jié)果: false true false true false true

3.flatMap(func)

描述:類似map,到每個輸入元素可以被映射為0個或者多個輸入元素(所以func返回一個序列,而不是一個元素)

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chun").setMaster("local")val sc = new SparkContext(conf)val words = Array("hello python","hello hadoop","hello spark")val wordRDD = sc.parallelize(words)wordRDD.flatMap(_.split(" ")).collect.foreach(println)} 結(jié)果: hello python hello hadoop hello spark

4.mapPartitions(func)

描述:類似map,但獨立在RDD的每個分區(qū)上運行,因此在類型為T的RDD上運行時,,func函數(shù)的類型必須是Iterator => Iterator

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chun").setMaster("local")val sc = new SparkContext(conf)val array = Array(1,2,1,2,2,3,4,5,6,7,8,9)val arrayRDD = sc.parallelize(array)arrayRDD.mapPartitions(elements =>{val result = new ArrayBuffer[Int]()elements.foreach(e =>{result +=e})result.iterator}).foreach(println)} 結(jié)果: 121223456789

5.mapPartitionsWithIndex(func)

描述:類似于mapPartitions,但func帶有一個整形參數(shù)表示分片的索引值,因此在類型為T的RDD上運行時func函數(shù)的類型必須(int,Iterator)=> Iterator

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val arrayRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2) //2表示分區(qū)數(shù)arrayRDD.mapPartitionsWithIndex((index,elements) =>{println("partition index:" + index)val result = new ArrayBuffer[Int]()elements.foreach(e =>{result += e})result.iterator}).foreach(println)} 運行結(jié)果: partition index:0 1 2 3 4partition index:1 5 6 7 8 9

6.sample(WithReplacement,fraction,seed)

描述:根據(jù)fraction指定的比例對數(shù)據(jù)進(jìn)行采樣,可以選擇是否使用隨機(jī)數(shù)進(jìn)行替換,seed用于指定隨機(jī)數(shù)生成器種子

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val arrayRDD = sc.parallelize(1 to 10000)val sampleRDD = arrayRDD.sample(true,0.001) //true表示抽樣之后放回println(sampleRDD.count())}結(jié)果:10 def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val arrayRDD = sc.parallelize(1 to 10000)val sampleRDD = arrayRDD.sample(false,0.001) //false表示抽樣之后不放回println(sampleRDD.count())結(jié)果:9}

7.union(otherDataset)

描述:對源RDD和參數(shù)RDD求并集后并返回一個新的RDD

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val rdd1 = sc.parallelize(1 to 10)val rdd2 = sc.parallelize(11 to 20)val resultRDD = rdd1.union(rdd2)resultRDD.foreach(print)} 結(jié)果: 11121314151617181920

8.intersection(otherDataset)

描述:對源RDD和參數(shù)RDD求交集后并返回一個新的RDD

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val rdd1 = sc.parallelize(Array(1,3,5,7,8))val rdd2 = sc.parallelize(Array(3,5,7))val resultRDD = rdd1.intersection(rdd2)resultRDD.foreach(println)} 結(jié)果: 3 7 5

9.distinct([numTasks])

描述:對源RDD進(jìn)行去重,返回一個新的RDD

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val arr = Array(Tuple3("max","math",90),("max","englist",85),("mike","math",100))val scoreRDD = sc.parallelize(arr)val studentNumber = scoreRDD.map(_._1).distinct().collect()println(studentNumber.mkString(","))} 結(jié)果: max,mike

10.groupByKey([numTasks])

描述:在一個(k,v)形式的RDD上調(diào)用,返回一個(k,Iterator[V])的RDD

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)var x =0val arr = Array("chun1 chun2 chun3 chun1 chun1 chun2", "chun1")val arrRDD = sc.parallelize(arr)val resultRDD = arrRDD.flatMap(_.split(" ")).map((_,1)).groupByKey()//resultRDD.foreach(println)resultRDD.foreach(element => {println(element._1+" "+element._2.size)})} chun1 4 chun3 1 chun2 2

11.reduceByKey(func,[numTasks])

描述:在一個(k,v)形式的RDD上調(diào)用,返回一個(k,v)的RDD,使用指定的reduce函數(shù),將相同key的值聚集到一起,與groupBy類似,reudce任務(wù)的個數(shù)可以通過第二個參數(shù)來設(shè)置

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val arr =Array("chun1 chun2 chun3 chun1 chun1 chun2","chun1")val arrRDD=sc.parallelize(arr)val resultRDD = a.flatMap(_.split(" ")).map(x=>((x,1))).reduceByKey(_+_).collect.foreach(println)} 結(jié)果: (chun1,4) (chun3,1) (chun2,2)

12.aggregateByKey(zeroValue)(seqOp,combOP,[numTasks])

描述:當(dāng)調(diào)用(k,v)對的數(shù)據(jù)集時,返回(K,U)數(shù)據(jù)集,其中每個key的值使用給定的聚合函數(shù)和中性‘零’進(jìn)行聚合,與groupyKey類似,reduce任務(wù)的數(shù)量可以通過可選的第二個參數(shù)進(jìn)行配置

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val data = List((1,3),(1,4),(2,3),(3,6),(1,2),(3,8))val rdd =sc.parallelize(data)rdd.aggregateByKey(0)(math.max(_,_),_+_).collect(.foreach(println()))} 結(jié)果:(1,4) (3,8) (2,3)

13.sortByKey([ascending],[numTasks])

描述:在一個(k,v)形式的RDD上調(diào)用,k必須實現(xiàn)Ordered接口,返回一個按照key進(jìn)行排序的(k,v)的RDD

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val scores = Array(Tuple2("mike",80),("max",90),("bob",100))val scoresRDD = sc.parallelize(scores)val sortByKeyRDD = scoresRDD.map(x => (x._2,x._1)).sortByKey(false).map(x =>(x._2,x._1) //把元組k,v換位值進(jìn)行排序后,再換回來)sortByKeyRDD.collect.foreach(println)} (bob,100) (max,90) (mike,80)

14.join(otherDataset,[numTasks])

描述:當(dāng)調(diào)用(k,v)和(k,w)類型的數(shù)據(jù)集時,返回一個(k,(v,w))形式的數(shù)據(jù)集,支持left outer join、right outer join 和full outer join

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)//學(xué)生信息val students = Array(Tuple2(1,"max"),Tuple2(2,"mike"),Tuple2(3,"bob"))//分?jǐn)?shù)val scores = Array(Tuple2(1,90),Tuple2(2,120),Tuple2(3,80))val stuRDD = sc.parallelize(students)val scoresRDD = sc.parallelize(scores)//兩組kv對join,返回的是(k,(v,w))val resultRDD = stuRDD.join(scoresRDD).sortByKey()resultRDD.foreach(x => {println("id:" +x._1 +" name:"+x._2._1 + " score:"+x._2._2)println("=========================")})} 結(jié)果:id:1 name:max score:90 ========================= id:2 name:mike score:120 ========================= id:3 name:bob score:80 =========================

15.cogroup(otherDataset,[numTasks])

描述:當(dāng)調(diào)用(k,v)和(k,w)類型的數(shù)據(jù)集時,返回(k,(Iterator,Iterator))元組的數(shù)據(jù)集

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun1").setMaster("local")val sc = new SparkContext(conf)//學(xué)生信息val students = Array(("class1","max"),("class1","mike"),("class2","bob"))//分?jǐn)?shù)val scores = Array(("class1",90),("class1",120),("class2",80))val stuRDD = sc.parallelize(students)val scoresRDD = sc.parallelize(scores)val resultRDD = stuRDD.cogroup(scoresRDD).sortByKey()resultRDD.foreach(x =>{println("class:"+x._1)x._2._1.foreach(println)x._2._2.foreach(println) //可以去掉只顯示名字println("===========")})} 結(jié)果:class:class1 max mike 90 120 =========== class:class2 bob 80 ===========

16.cartesian(otherDataset)

描述:當(dāng)調(diào)用T和U類型的數(shù)據(jù)集時,返回一個(T,U)類型的數(shù)據(jù)集

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val arr1 = sc.parallelize(Array(1,3,5))val arr2 = sc.parallelize(Array(2,4,6))arr1.cartesian(arr2).collect().foreach(println)} (1,2) (1,4) (1,6) (3,2) (3,4) (3,6) (5,2) (5,4) (5,6)

17.pipe(command,[envVars])

描述:通過shell命令(例如perl或bash腳本)對RDD的每個分區(qū)進(jìn)行管道連接。RDD元素寫入進(jìn)程的stdin,輸出到其stdout的行作為字符串的RDD返回

18.coalesce(numpartitions)

描述:將RDD中的分區(qū)數(shù)減少到numpartitions,在過濾大型數(shù)據(jù)集后,可以更高效地運行操作

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val rdd1 = sc.parallelize(1 to 20,10)println(rdd1.partitions.length) //10var rdd2 = rdd1.coalesce(15,true)println(rdd2.partitions.length) //15var rdd3 = rdd1.repartition(15)println(rdd3.partitions.length) //15var rdd4 = rdd1.coalesce(15,false) //這種是不可以重新分區(qū)的println(rdd4.partitions.length) //10var rdd5 = rdd1.coalesce(2,false)println(rdd5.partitions.length) //2rdd5.foreach(print) //第一個區(qū):12345678910 第二個區(qū):11121314151617181920var rdd6 = rdd1.coalesce(2,true)println(rdd6.partitions.length) //2rdd6.foreach(print) //第一個區(qū):135791113151719 第二個區(qū):2468101214161820}

19.repartiton(numPartitions)

描述:隨機(jī)重組RDD中的數(shù)據(jù),以創(chuàng)建更多或更少的分區(qū),并在分區(qū)之間進(jìn)行平衡,總是會產(chǎn)生shuffle操作


repartition和coalesce

他們兩個都是RDD的分區(qū)進(jìn)行重新劃分,repartition只是coalesce接口中shuffle為true的簡易實現(xiàn),(假設(shè)RDD有N個分區(qū),需要重新劃分成M個分區(qū))
1)、N<M。一般情況下N個分區(qū)有數(shù)據(jù)分布不均勻的狀況,利用HashPartitioner函數(shù)將數(shù)據(jù)重新分區(qū)為M個,這時需要將shuffle設(shè)置為true。
2)如果N>M并且N和M相差不多,(假如N是1000,M是100)那么就可以將N個分區(qū)中的若干個分區(qū)合并成一個新的分區(qū),最終合并為M個分區(qū),這時可以將shuff設(shè)置為false,在shuffl為false的情況下,如果M>N時,coalesce為無效的,不進(jìn)行shuffle過程,父RDD和子RDD之間是窄依賴關(guān)系。
3)如果N>M并且兩者相差懸殊,這時如果將shuffle設(shè)置為false,父子RDD是窄依賴關(guān)系,他們同處在一個Stage中,就可能造成spark程序的并行度不夠,從而影響性能,如果在M為1的時候,為了使coalesce之前的操作有更好的并行度,可以講shuffle設(shè)置為true。
總之:如果shuff為false時,如果傳入的參數(shù)大于現(xiàn)有的分區(qū)數(shù)目,RDD的分區(qū)數(shù)不變,也就是說不經(jīng)過shuffle,是無法將RDDde分區(qū)數(shù)變多的。

20.repartitionAndSortWithinPartitions(partitioner)

描述:根據(jù)給定的分區(qū)重新分區(qū)RDD,在每個結(jié)果分區(qū)中,根據(jù)它們的鍵對記錄進(jìn)行排序。這比調(diào)用重新分區(qū)更有效,然后在每個分區(qū)中進(jìn)行排序,因為它可以將排序推入到洗牌機(jī)器中。

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("chunchun").setMaster("local")val sc = new SparkContext(conf)val arrayRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),3) //3表示分區(qū)數(shù)arrayRDD.mapPartitionsWithIndex((index,elements) =>{ //index為索引值,elements數(shù)據(jù)println("partition index:" + index)val result = new ArrayBuffer[Int]()elements.foreach(e =>{result += e})result.iterator}).foreach(println)} 結(jié)果: partition index:0 1 2 3 partition index:1 4 5 6 partition index:2 7 8 9

總結(jié)

以上是生活随笔為你收集整理的Spark的Transformations算子(理解+实例)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 成人无码视频 | 亚洲精品久久久久中文字幕二区 | 久久久久人妻一区精品色欧美 | 伊人成人22| 伊人99在线| 寡妇高潮一级视频免费看 | 午夜精品久久久 | 美女隐私免费观看 | 在线观看国产黄色 | 国产精品一卡二卡在线观看 | xx久久| 国产精品va在线 | 欧美日本韩国一区 | 国产大片中文字幕在线观看 | 亚洲国产精品一 | 91艹| 国产又粗又猛又爽视频 | 日韩a在线观看 | 国产交换配乱淫视频免费 | 丰满少妇大力进入 | 亚洲黄色影视 | 国产欧美熟妇另类久久久 | av免费观看网址 | 99热一区 | 在线人成 | 天堂中文在线资源 | 精品久久网 | 用力挺进新婚白嫩少妇 | 国产又大又粗又爽 | 欧美激情一二三 | 一级黄毛片 | 久久成人免费电影 | 亚洲精品一区二区三区新线路 | 三上悠亚ssⅰn939无码播放 | www.com毛片 | 蜜臀一区 | 熟女人妻在线视频 | 午夜激情久久久 | 精品少妇爆乳无码av无码专区 | 自拍偷拍色图 | 国产精品无 | 色呦呦网站在线观看 | 亚洲欧美网 | 精品人妻一区二区三区四区不卡 | 成人软件在线观看 | 少妇被狂c下部羞羞漫画 | 香蕉小视频 | 大地资源中文第三页 | 脱美女衣服亲摸揉视频 | 91官网在线观看 | 亚洲av不卡一区二区 | 女人毛片视频 | 欧美处女| 男女视频久久 | 香蕉久热 | 成人福利社 | 国产欧美二区 | 国产又粗又猛又黄又爽的视频 | 国产欧美精品在线观看 | 久久久久伊人 | 国产真实的和子乱拍在线观看 | 欧美激情精品久久久久久蜜臀 | 日韩av在线播放观看 | 国产自产 | 亚洲二三区 | 爽妇网国产精品 | 国产精品成人免费精品自在线观看 | 天天看天天摸天天操 | 在线天堂中文字幕 | 中文字幕视频在线观看 | 色涩涩| 久久久精品91| 最新99热| 日韩精品一二 | 亚洲欧美日韩综合在线 | av片免费在线播放 | 日韩欧美啪啪 | 欧美国产一二三区 | 亚洲第一视频在线观看 | 国产成人a亚洲精品 | 无码av天堂一区二区三区 | 黄色a网站 | 182tv福利视频 | 国产又爽又黄免费软件 | www.日本在线观看 | 亚洲色婷婷一区二区三区 | 欧美性xxxxx| 男女午夜免费视频 | 国产在线你懂得 | 色哟哟一区 | 亚洲激情一区二区三区 | 午夜影院久久久 | 91麻豆免费看| 日日骚网 | 在线视频观看免费 | 国产精品国产三级国产传播 | 九九爱视频 | 国产精品久久久影院 | 美女又爽又黄视频毛茸茸 |