日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RDD -- Transformation算子分析

發(fā)布時間:2023/12/14 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RDD -- Transformation算子分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

RDD

RDD(Resilient Distributed Datasets) ,彈性分布式數據集, 是分布式內存的一個抽象概念,RDD提供了一種高度受限的共享內存模型,即RDD是只讀的記錄分區(qū)的集合,只能通過在其他RDD執(zhí)行確定的轉換操作(如map、join和group by)而創(chuàng)建,然而這些限制使得實現容錯的開銷很低。對開發(fā)者而言,RDD可以看作是Spark的一個對象,它本身運行于內存中,如讀文件是一個RDD,對文件計算是一個RDD,結果集也是一個RDD ,不同的分片、 數據之間的依賴 、key-value類型的map數據都可以看做RDD。(注意:來自百度百科)


RDD 操作分類

RDD操作分為兩種算子:Transformation 和 Actions。這兩種算子區(qū)分本質是否觸發(fā)任務提交。
Transformation:只是把依賴關系和轉換關系記錄在血統中并不會觸發(fā)任務提交。
Actions:遇到這種算子就會觸發(fā)任務提交,并把結果返回。


Transformation:

map(func)
filter(func)
flatMap(func)
mapPartitions(func)
mapPartitionsWithIndex(func)
sample(withReplacement, fraction, seed)
union(otherDataset)
intersection(otherDataset)
distinct([numPartitions]))
groupByKey([numPartitions])
reduceByKey(func, [numPartitions])
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
sortByKey([ascending], [numPartitions])
join(otherDataset, [numPartitions])
cogroup(otherDataset, [numPartitions])
cartesian(otherDataset)
pipe(command, [envVars])
coalesce(numPartitions)
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)


RDD 繼承關系

map

官網 API 介紹

map(func) Return a new distributed dataset formed by passing each element of the source through a function func.

源碼

/*** Return a new RDD by applying a function to all elements of this RDD.*/def map[U: ClassTag](f: T => U): RDD[U] = withScope {val cleanF = sc.clean(f)new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))}

mapPartitions

官網 API 介紹

mapPartitions(func) Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator => Iterator when running on an RDD of type T.

源碼

/*** Return a new RDD by first applying a function to all elements of this* RDD, and then flattening the results.*/def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {val cleanF = sc.clean(f)new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))}

Transformations 算子都是一樣,創(chuàng)建一個新的RDD,并沒有去提交計算任務。


例子


map

map:對集合中每個元素操作

def map[U: ClassTag](f: T => U): RDD[U]

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)val b = a.map(x => (x.length, x))b.collect.foreach(println)// (3,dog)// (5,tiger)// (4,lion)// (3,cat)// (7,panther)// (5,eagle)

filter

filter:過濾

def filter(f: T => Boolean): RDD[T]

val a = sc.parallelize(1 to 10, 3)val b = a.filter(_ % 2 == 0)b.collect.foreach(println)// 2// 4// 6// 8// 10

flatMap

flatMap和map很像,多了一個壓扁過程

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

val a = sc.parallelize(1 to 10, 5)a.flatMap(1 to _).collect.foreach(println)// 1// 1// 2// 1// 2// 3// 1// 2// 3// 4// 1// 2// 3// 4// 5// 1// 2// 3// 4// 5// 6// 1// 2// 3// 4// 5// 6// 7// 1// 2// 3// 4// 5// 6// 7// 8// 1// 2// 3// 4// 5// 6// 7// 8// 9// 1// 2// 3// 4// 5// 6// 7// 8// 9// 10

mapPartitions

mapPartitions:在每個分區(qū)中執(zhí)行map操作,和map操作的單位為單個元素,mapPartitions操作的單位為分區(qū),在map操作數據庫等消耗資源時,用mapPartitions優(yōu)化。

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

val a = sc.parallelize(1 to 9, 3)def myfunc[T](iter: Iterator[T]): Iterator[(T, T)] = {var res = List[(T, T)]()var pre = iter.nextwhile (iter.hasNext) {val cur = iter.nextres.::=(pre, cur)pre = cur}res.iterator}a.mapPartitions(myfunc).collect.foreach(println)// (2,3)// (1,2)// (5,6)// (4,5)// (8,9)// (7,8)

mapPartitionsWithIndex

mapPartitionsWithIndex:函數作用同mapPartitions,不過提供了兩個參數,第一個參數為分區(qū)的索引
def main(args: Array[String]): Unit = {//first()//second()third()}def first(): Unit = {val x = sc.parallelize(List(1, 2, 3, 4, 5, 7, 8, 9, 10), 3)def myfunc1(index: Int, iter: Iterator[Int]): Iterator[String] = {iter.map(x => index + ", " + x)}x.mapPartitionsWithIndex(myfunc1).collect().foreach(println)// 0, 1// 0, 2// 0, 3// 1, 4// 1, 5// 1, 7// 2, 8// 2, 9// 2, 10}def second(): Unit = {val randRDD = sc.parallelize(List((2, "cat"), (6, "mouse"), (7, "cup"), (3, "book"), (4, "tv"), (1, "screen"), (5, "heater")), 3)val rPartitioner = new RangePartitioner(3, randRDD)val partitioned = randRDD.partitionBy(rPartitioner)def myfunc2(index: Int, iter: Iterator[(Int, String)]): Iterator[String] = {iter.map(x => "[partID: " + index + ", val:" + x + "]")}partitioned.mapPartitionsWithIndex(myfunc2).collect().foreach(println)// [partID: 0, val:(2,cat)]// [partID: 0, val:(3,book)]// [partID: 0, val:(1,screen)]// [partID: 1, val:(4,tv)]// [partID: 1, val:(5,heater)]// [partID: 2, val:(6,mouse)]// [partID: 2, val:(7,cup)]}def third(): Unit = {val z = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)def myfunc3(index: Int, iter: Iterator[Int]): Iterator[String] = {iter.map(x => "[partID:" + index + ", val:" + x + "]")}z.mapPartitionsWithIndex(myfunc3).collect().foreach(println)// [partID:0, val:1]// [partID:0, val:2]// [partID:0, val:3]// [partID:1, val:4]// [partID:1, val:5]// [partID:1, val:6]}

sample

sample : 從原來RDD隨機抽樣出一部分元素組成一個新的RDD

def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T]

def main(args: Array[String]): Unit = {first()}def first(): Unit ={val a = sc.parallelize(1 to 10000,3)a.sample(false,0.001,444).collect().foreach(println)}// 120// 424// 477// 2349// 2691// 2773// 2988// 5143// 6449// 6659// 9820

union, ++

union:對于兩個數據集進行合并操作(不會去除重復元素)

def ++(other: RDD[T]): RDD[T]
def union(other: RDD[T]): RDD[T]

val a = sc.parallelize(1 to 7,1)val b = sc.parallelize(5 to 10,2)a.union(b).collect().foreach(println)a.++(b).collect().foreach(println)// 1// 2// 3// 4// 5// 6// 7// 5// 6// 7// 8// 9// 10

intersection

intersection : 求這個數據集的交集(會去除重復元素)

def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T]): RDD[T]

val x = sc.parallelize(1 to 20)val y = sc.parallelize(5 to 25)x.intersection(y).sortBy(x => x,true).collect().foreach(println)// 5// 6// 7// 8// 9// 10// 11// 12// 13// 14// 15// 16// 17// 18// 19// 20

distinct

distinct:去重

def distinct(): RDD[T]
def distinct(numPartitions: Int): RDD[T]

val x = sc.parallelize(1 to 10)x.union(x).distinct().collect().foreach(println)// 8// 1// 9// 10// 2// 3// 4// 5// 6// 7

groupByKey

groupByKey和reduceByKey雖然兩個函數都能得出正確的結果, 但reduceByKey函數更適合使用在大數據集上。 這是因為Spark知道它可以在每個分區(qū)移動數據之前將輸出數據與一個共用的key結合。

reduceByKey

reduceByKey:類似于mapreduce的reduce階段

def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

val a = sc.parallelize(List("dog","cat","owl","gnu","ant",""))val animal = sc.parallelize(List("Lion","Deer","Leopard","Monkey","Elephant","Chimpanzees","Horse","Bear","Donkey","Kangaroo","Ox","Hedgehog","Sheep","Rhinoceros"))val b = a.union(animal).map(x => (x.length,x))b.reduceByKey((x,y)=> x+",\t"+y).collect().foreach(println)// (0,)// (8,Elephant, Kangaroo, Hedgehog)// (10,Rhinoceros)// (2,Ox)// (11,Chimpanzees)// (3,dog, cat, owl, gnu, ant)// (4,Lion, Deer, Bear)// (5,Horse, Sheep)// (6,Monkey, Donkey)// (7,Leopard)

aggregateByKey

aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ? U, combOp: (U, U) ? U) 先局部操作,再全局操作

zeroValue:分區(qū)操作初始值
seqOp:分區(qū)內操作規(guī)則
combOp:全局操作規(guī)則

def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ? U, combOp: (U, U) ? U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
def aggregateByKey[U](zeroValue: U, numPartitions: Int)(seqOp: (U, V) ? U, combOp: (U, U) ? U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
def aggregateByKey[U](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) ? U, combOp: (U, U) ? U)(implicit arg0: ClassTag[U]): RDD[(K, U)]

val pairRDD = sc.parallelize(List(("cat", 2), ("cat", 5), ("mouse", 4), ("cat", 12), ("dog", 12), ("mouse", 2)))println(pairRDD.partitions.length)def func(index: Int, iter: Iterator[(String, Int)]): Iterator[String] = {iter.map(x => "partID:" + index + ",val:" + x)}pairRDD.mapPartitionsWithIndex(func).collect().foreach(println)// partID:0,val:(cat,2)// partID:1,val:(cat,5)// partID:1,val:(mouse,4)// partID:2,val:(cat,12)// partID:3,val:(dog,12)// partID:3,val:(mouse,2)pairRDD.aggregateByKey(0)(math.max(_, _), math.max(_, _)).collect().foreach(println)// (dog,12)// (mouse,4)// (cat,12)pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect().foreach(println)// (dog,12)// (mouse,6)// (cat,19)pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect().foreach(println)// (dog,100)// (mouse,200)// (cat,300)pairRDD.aggregateByKey(100)(_ + _, _ + _).collect().foreach(println)// (dog,112)// (mouse,206)// (cat,319)

join

join:相同的key join

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

val a = sc.parallelize(List("dog","salmon","salmon","rat","elephant"))val b = a.keyBy(_.length)b.collect().foreach(println)val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"))val d = c.keyBy(_.length)d.collect().foreach(println)b.join(d).collect().foreach(println)

cogroup, groupWith

cogroup / groupWith : 是對最多三個RDD里key相同的,合并成集合

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], IterableW1], Iterable[W2]))]

val a = sc.parallelize(List(1,2,1,3,4,5,1,2,3,1,2,3))val b = a.map(x=>(x,"b"))b.collect().foreach(println)val c = a.map((_,"c"))val d = a.map(x=>(x,"d"))c.collect().foreach(println)d.collect().foreach(println)b.cogroup(c).collect().foreach(println)b.groupWith(c).collect().foreach(println)b.cogroup(c,d).collect().foreach(println)val x = sc.parallelize(List((1,"apple"),(2,"banana"),(3,"orange"),(4,"kiwi")))val y = sc.parallelize(List((5,"computer"),(1,"laptop"),(1,"desktop"),(4,"iPad")))x.cogroup(y).collect().foreach(println)

repartitionAndSortWithinPartitions

repartitionAndSortWithinPartitions :根據給定的分區(qū)程序重新分區(qū)RDD,并在每個結果分區(qū)中根據鍵對記錄進行排序。

def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]

val randRDD = sc.parallelize(List((2,"cat"),(6,"mouse"),(7,"cup"),(3,"book"),(4,"tv"),(1,"screen"),(5,"heater")),3)val rPartitioner = new RangePartitioner(3,randRDD)val partitioned2 = randRDD.repartitionAndSortWithinPartitions(rPartitioner)partitioned2.mapPartitionsWithIndex(myfunc).collect().foreach(println)def myfunc2(index:Int,iter:Iterator[(Int,String)]):Iterator[String] = {iter.map(x => "partID:"+index+", val:"+x)}partitioned2.mapPartitionsWithIndex(myfunc2).collect().foreach(println)

車遙遙,馬憧憧。

君游東山東復東,安得奮飛逐西風。

愿我如星君如月,夜夜流光相皎潔。

月暫晦,星常明。

留明待月復,三五共盈盈。


總結

以上是生活随笔為你收集整理的RDD -- Transformation算子分析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。