日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

[scala-spark]10. RDD转换操作

發布時間:2025/3/15 编程问答 14 豆豆
生活随笔 收集整理的這篇文章主要介紹了 [scala-spark]10. RDD转换操作 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

RDD提供了一組非常豐富的操作來操作數據,如:map,flatMap,filter等轉換操作,以及SaveAsTextFile,conutByKey等行動操作。這里僅僅綜述了轉換操作。

  • map

map是對RDD中的每一個元素都執行一個指定的函數來產生一個新的RDD,RDD之間的元素是一對一的關系。

val rdd1: RDD[Int] = sc.parallelize(1 to 9, 3) val rdd2: RDD[Int] = rdd1.map(_ * 2) printResult("map", rdd2) // 結果:map >> List(2, 4, 6, 8, 10, 12, 14, 16, 18)
  • flapMap

flatMap類似于map,但是每一個輸入元素,會被映射為0到多個出輸出元素(即func函數的返回值是一個Seq,而不是單一元素)的新的RDD,RDD之間的元素是一對多關系。

val rdd3: RDD[Int] = rdd2.filter(x => x > 10).flatMap(x => x to 21) printResult("flatMap", rdd3) // 結果:flatMap >> List(12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 14, 15, 16, 17, 18, 19, 20, 21, 16, 17, 18, 19, 20, 21, 18, 19, 20, 21)
  • filter

filter是對RDD元素進行過濾,返回一個新的數據集,有經過func函數后返回值為true的元素組成。

val rdd4 = rdd2.filter(x => x > 11) printResult("filter", rdd4) // 結果:filter >> List(12, 14, 16, 18)
  • mapPartitions

mapPartitions是map的一個變種。map的輸入函數應用于RDD中的每一個元素,而mapPartitions的輸入函數應用于每一個分區的數據,也就是把每一個分區中的內容作為整體來處理。

函數定義:

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U] val rdd5: RDD[(Int, Int)] = rdd1.repartition(2).mapPartitions((iter: Iterator[Int]) => { val lst: ListBuffer[(Int, Int)] = new ListBuffer[(Int, Int)]() var prev: Int = 0 var current: Int = 0while (iter.hasNext) {current = iter.nextlst += ((prev, current))prev = current}lst.iterator}) printResult("mapPartitions", rdd5) 結果:mapPartitions >> List((0,1), (1,3), (3,5), (5,7), (7,9), (0,2), (2,4), (4,6), (6,8))
  • mapPartitionsWithIndex

mapPartitionsWithIndex于mapPartitions的功能類似,只是多傳入split index而已,所有func函數必須是(Int,Iterator<T>)=> Iterator<U>類型。

val rdd6 = rdd1.repartition(2).mapPartitionsWithIndex((idx, iter) => { val lst = new ListBuffer[String]() var sum = 0 while (iter.hasNext) {sum += iter.next}lst += (idx + ":" + sum)lst.iterator }) printResult("mapPartitionsWithIndex", rdd6) // 結果:mapPartitionsWithIndex >> List(0:25, 1:20)
  • sample

sample(withReplacemet,fraction,seed)是根據給定的隨機種子seed,隨機抽樣出數量為fraction的數據。其中,withReplacement:是否放回抽樣;fraction:比抽樣比例,0.1表示抽樣10%,seed:隨機種子,相同的seed得到的隨機序列是一樣的。所以,如果沒有設置seed,同一段代碼執行兩遍得到的隨機序列是一樣的。
?

val sampleRDD = sc.parallelize(0 to 1000, 2) val rdd7 = sampleRDD.sample(false, 0.02, 2) printResult("sample", rdd7) // 結果:sample >> List(10, 42, 110, 121, 145, 158, 166, 234, 237, 253, 266, 343, 354, 393, 404, 457, 460, 662, 728, 738, 806, 808, 868, 887, 889, 934, 952)
  • union

union(otherDataset)是數據的合并,返回一個新的數據集,由原數據集和otherDataset合并而成的一個數據集RDD。

val rdd8 = rdd1.union(rdd2) printResult("union", rdd8) // 結果:union >> List(1, 2, 3, 4, 5, 6, 7, 8, 9, 2, 4, 6, 8, 10, 12, 14, 16, 18)
  • intersection

intersection(otherDataset)是數據交集,返回一個新的數據集,它是兩個數據集的交集數據。

val rdd9 = rdd8.intersection(rdd1) printResult("intersection", rdd9) // 結果:intersection >> List(4, 8, 1, 9, 5, 6, 2, 3, 7)
  • distinct

distince(numPartitions)是對數據集進行去重,返回一個新的數據集,它是對兩個數據集去掉重復數據后得到的一個數據集。其中,numPartitions參數是設置任務并行數量。

val rdd10 = rdd8.union(rdd9).distinct(2) printResult("distinct", rdd10) // 結果:distinct >> List(4, 16, 14, 6, 8, 12, 18, 10, 2, 1, 3, 7, 9, 5)
  • groupByKey

groupByKey(partitioner)是對數據進行分組操作,在一個由(K,V)對組成的數據集上調用,返回一個(K,Seq[V])對的數據集。注意,默認情況下,使用8個并行任務進行分組,可以傳入partitioner參數設置并行任務的分區數.

val rddMap: RDD[(Int, Int)] = rdd1.map(item => (item % 3, item)) val rdd11 = rddMap.groupByKey(); printResult("groupByKey", rdd11) // 結果:groupByKey >> List((0,CompactBuffer(3, 6, 9)), (2,CompactBuffer(2, 5, 8)), (1,CompactBuffer(1, 4, 7)))
  • reduceByKey

reduceByKey(func, numPartitions)是對數據進行分組聚合操作,在一個(K,V)對的數據集上使用,返回一個(K,V)對的數據集。Key是相同的值,都被使用指定的reduce函數聚合到一起。和groupByKey類似,可以通過參數numPartitions設置并行任務的分區數。

val rdd12 = rddMap.reduceByKey((x, y) => x + y) printResult("reduceByKey", rdd12) // 結果:reduceByKey >> List((0,18), (2,15), (1,12))
  • sortByKey

sortByKey(ascending, numPartitions)是對RDD中的數據集進行排序操作,對(K,V)類型的數據按照K進行排序,其中K需要實現Ordered方法。

第一個參數是ascending,該參數決定排序后RDD中的元素是升序還是降序,默認是true,按升序排序。
第二個參數是numPartitions,即排序分區的并行任務個數。

val rdd15 = rddMap.sortByKey(false) printResult("sortByKey", rdd15) // 結果:sortByKey >> List((2,2), (2,5), (2,8), (1,1), (1,4), (1,7), (0,3), (0,6), (0,9))
  • aggregateByKey

aggregateByKey(zeroValue, numPartitions)(seqOp: (U, V) => U,combOp: (U, U) => U)和reduceByKey的不同在于:reduceByKey輸入/輸出都是(K,V),而aggregateByKey輸出是(K,U),可以不同于輸入(K,U)。
ggregateByKey的三個參數如下: ? ? ? ?

  • zeroValue:U,初始值,比如初始值為0或空列表。 ? ? ? ?
  • numPartitions:指定并行任務的分區數。 ? ? ? ?
  • seqOp:(U,V)=>U,seq操作符,描述如何將T合并入U,比如如何將item合并到列表中。 ? ? ? ?
  • combOp: (U, U) => U,comb操作符,描述如何合并兩個U,比如合并兩個列表。 ? ? ? ?
  • 所以,可以將aggregateByKey函數抽象成更高級的,更靈活的reduce和group的組合。

    val rdd13 = rddMap.aggregateByKey(0)(seqOp = (x, y) => {math.max(x, y)},combOp = (x, y) => {x + y}) printResult("aggregateByKey", rdd13) // 結果:aggregateByKey >> List((0,12), (2,10), (1,11))
    • combineByKey

    combineByKey(createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,numPartitions: Int)是對RDD中的數據按照Key進行聚合操作。聚合操作的邏輯是通過自定義函數提供給combineByKey。?把(K,V)類型的RDD轉換為(K,C)類型的RDD,C和Vk可以不一樣。
    combineyey的三個參數如下: ?

  • ?createCombiner:在遍歷(k,v)時,如果combineByKey的第一次遇到值為K的key(類型K),那么將對這個(K,V)調用createCombiner函數,將V轉換為c(類型C,聚合對象的類型)。 ? ? ?
  • mergeValue:在遍歷(k,v)時,如果comineByKey不是第一次遇到值為k的key(類型為K),那么將對這個(k,v)調用mergeValue函數,它的作用是將v累加到聚合對象(類型C)中,mergeValue的類型是(C,V)=>C,參數中的C遍歷到此處的聚合對象,然后對v進行聚合得到新的聚合對象值。 ? ? ?
  • mergeCombiners:combineByKey實在分布式環境中執行的,RDD的每個分區單獨進行combineBykey操作,最后需要對各個分區的結果進行最后的聚合。它的函數類型是(C,C)=>C,每個參數是分區聚合得到的聚合對象。?
  • val rdd14 = rdd1.map(item => (item % 4, item)).mapValues(v => v.toDouble).combineByKey((v: Double) => (v, 1),(c: (Double, Int), v: Double) => (c._1 + v, c._2 + 1),(c1: (Double, Int), c2: (Double, Int)) => (c1._1 + c2._1, c1._2 + c2._2)) printResult("combineByKey", rdd14) // 結果:combineByKey >> List((0,(12.0,2)), (2,(8.0,2)), (1,(15.0,3)), (3,(10.0,2)))
    • jion

    join(other, partitioner)是連接操作,將數據的數據集(K,V)和另外一個數據集(K,W)進行join,得到(K,(V,W))。該操作是對于相同K的V和W集合進行笛卡爾積操作,也即V和W的所有組合。連接操作除了join外,還有左連接,右連接,全連接操作函數:leftOuterJoin,rightOuterJoin和fullOuterJoin,它們的用法基本上是一樣的。

    val rddMap1 = rdd1.map(item => (item % 4, item)) val rdd16 = rddMap.join(rddMap1) printResult("join", rdd16) // 結果:join >> List((0,(3,4)), (0,(3,8)), (0,(6,4)), (0,(6,8)), (0,(9,4)), (0,(9,8)), (2,(2,2)), (2,(2,6)), (2,(5,2)), (2,(5,6)), (2,(8,2)), (2,(8,6)), (1,(1,1)), (1,(1,5)), (1,(1,9)), (1,(4,1)), (1,(4,5)), (1,(4,9)), (1,(7,1)), (1,(7,5)), (1,(7,9)))
    • pipe

    pipe(command: Seq[String],env: Map[String, String])是以shell命令處理RDD數據。

    val rdd19: RDD[String] = rdd1.pipe("head -n 1") printResult("pipe", rdd19) // 結果:pipe >> List(1,4,7)
    • subtract

    subtract(other: RDD[T], numPartitions: Int)是RDD對other數據集進行減法操作,將輸入的rdd中的元素減去other中的元素,得到他們差值的一個新的RDD。

    val rdd21 = rdd1.subtract(sc.parallelize(1 to 5)) printResult("subtract", rdd21) // 結果:subtract >> List(6, 9, 7, 8)
    • zip

    zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]是對兩個RDD進行拉鏈操作,得到一個新的RDD[T,U]。拉鏈操作還包括:zipWitIndex和zipPartitinos。

    val rdd22 = rdd1.repartition(3).zip(sc.parallelize(Array("A", "B", "C", "D", "E", "F", "G", "H", "I"), 3)) printResult("zip", rdd22) // 結果: zip >> List((3,A), (6,B), (8,C), (1,D), (4,E), (9,F), (2,G), (5,H), (7,I))
    • coalesce 和?repartition

    coalesce(numPartitions: Int)是對RDD進行重分區,默認不進行shuffle,且該RDD的分區個數等于numPartitions個數。

    repartition(numPartitions: Int)是將RDD進行重新分區,分區過程中會進行shuffle,調整分區數量為numPartitions。

    總結

    以上是生活随笔為你收集整理的[scala-spark]10. RDD转换操作的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 国产一级在线观看视频 | 免费a视频 | 91精品欧美一区二区三区 | 狠狠干91| 欧美cccc极品丰满hd | 日本猛少妇色xxxxx | 久久久久国产 | 国产原创在线视频 | 久久久久久久99 | 天天综合网天天综合色 | 夜夜爽爽 | 亚洲资源在线观看 | 全黄一级片 | 大香蕉视频一区二区 | 日本japanese极品少妇 | 自拍偷拍五月天 | 91爱视频 | 欧美日韩激情在线观看 | 日韩一级免费观看 | 97人人射 | 国产操操操 | 强伦人妻一区二区三区 | 女同一区二区 | 男人天堂tv | 亚洲午夜久久久久久久久 | 亚洲国产精品成人av | 四川丰满妇女毛片四川话 | 69性影院 | 天堂av一区二区 | 女生下面流水视频 | 国产高清视频在线免费观看 | 免费视频色 | 色一情一乱一伦一区二区三区 | 香蕉视频污视频 | 一区二区三区在线观看免费视频 | 精品一区二区三区成人免费视频 | 福利视频不卡 | 少妇高潮毛片 | www.色黄| 少妇一区二区三区 | 欧美黄色图片 | 97人妻精品一区二区三区免费 | 日本精品不卡 | 黑人一区二区三区 | 8x8x最新网址 | 嫩草一二三 | 婷婷深爱激情 | 伊人久久综合影院 | 无码人妻丰满熟妇区毛片蜜桃精品 | 日本综合视频 | cao死你 | 精品人妻码一区二区三区红楼视频 | 日本黄色网址大全 | 亚洲视频免费在线播放 | 91视频在线观看 | 欧美激情精品久久久久久免费 | 国产精品高清无码 | 久久伊人成人网 | 美女视频免费在线观看 | 好色婷婷| 国产成人无码AA精品区 | 狠狠综合| 欧洲一区二区三区在线 | 97香蕉超级碰碰久久免费软件 | 九色婷婷| 免费黄色三级网站 | 天天干天天天 | 精品国产一区一区二区三亚瑟 | 人人妻人人澡人人爽久久av | 羞羞网站在线看 | 黄色一级在线播放 | 欧洲一区二区三区 | 国产精品1000| 国产一二三在线 | 国产女人水真多18毛片18精品 | 亚洲欧美999 | 粗喘呻吟撞击猛烈疯狂 | 精品国产乱码久久久久久蜜臀网站 | 寻找身体恐怖电影免费播放 | 日韩精品短片 | 肉丝袜脚交视频一区二区 | 最近中文字幕免费 | 亚洲av电影天堂男人的天堂 | 久久久一二三区 | 欧美三级在线视频 | 影音先锋国产精品 | 成人福利视频网 | 蜜臀人妻四季av一区二区不卡 | 国产精品suv一区二区 | 欧美极品在线 | 动漫精品一区一码二码三码四码 | 欧美videossex另类| 国产欧美啪啪 | av资源网址 | 亚洲最大黄色 | 精品人妻一区二区三区蜜桃视频 | 国产专区一 | 12av毛片 | wwwyoujizz日本|