日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 综合教程 >内容正文

综合教程

Spark算子讲解(一)

發(fā)布時間:2024/6/21 综合教程 27 生活家
生活随笔 收集整理的這篇文章主要介紹了 Spark算子讲解(一) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

1:Zip算子

def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]

將兩個RDD做zip操作,如果當兩個RDD分區(qū)數(shù)目不一樣的話或每一個分區(qū)數(shù)目不一樣的話則會異常。

例如:

val rdd1 = sc.parallelize(Array(1,2,3,4,5,6),2)
val rdd2 = sc.parallelize(Array(1,2,3,4,5,6),3)
rdd.zip(rdd1).collect

異常信息:

java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(2, 3)
  at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)

例如:

val rdd1 = sc.parallelize(Array(1,2,3,4,5,6),2)
val rdd2 = sc.parallelize(Array(1,2,3,4,5,6,7),2)
rdd.zip(rdd1).collect

異常信息:

Caused by: org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition

2:zipPartitions

以分區(qū)為單位進行zip操作,要求分區(qū)數(shù)目相等。否則異常。

val rdd1 = sc.parallelize(Array(1,2,3,4,5,6),2)
val rdd2 = sc.parallelize(Array(1,2,3,4,5,6,7),2)
val func = (x:Iterator[Int], y:Iterator[Int])=>x.toSeq.++(y.toSeq).toIterator
rdd1.zipPartitions(rdd2)(func).collect

3:zipWithIndex

給RDD中的每一個元素添加上索引號,組成二元組。索引號從0開始并且索引號類型是Long,當RDD分區(qū)大于1個時候需要出發(fā)一個Spark Job。

4:zipWithUniqueId

var rdd1 = sc.makeRDD(Seq("A","B","C","D","E","F"),2)
//rdd1有兩個分區(qū),
rdd1.zipWithUniqueId().collect
 Array[(String, Long)] = Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5))
//總分區(qū)數(shù)為2
//第一個分區(qū)第一個元素ID為0,第二個分區(qū)第一個元素ID為1
//第一個分區(qū)第二個元素ID為0+2=2,第一個分區(qū)第三個元素ID為2+2=4
//第二個分區(qū)第二個元素ID為1+2=3,第二個分區(qū)第三個元素ID為3+2=5

其實就是按照每一個的分區(qū)的每一個元素的順序進行編號。這個算子不需要出發(fā)作業(yè)到集群運行。

5:union

RDD求并集操作,不會自動去重。

res31: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> rdd2.collect
collect   collectAsync

scala> rdd2.collect
res32: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7)

scala> rdd1.union(rdd2).collect
collect   collectAsync

scala> rdd1.union(rdd2).collect
res34: Array[Int] = Array(1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7)//不去重

6:distinct

scala> unionRDD.collect
res38: Array[Int] = Array(1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7)
scala> unionRDD.distinct.collect
res39: Array[Int] = Array(4, 1, 5, 6, 2, 3, 7)

實現(xiàn)去重。

7:treeReduce

treeReduce有點類似于reduce函數(shù),也不需要傳入初始值,只不過這個算子使用一個多層樹的形式來進行reduce操作。

scala> rdd1.collect
res42: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> rdd1.treeReduce((x,y)=>x+y)
res43: Int = 21

8:aggregate

scala> rdd1.collect
res53: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> rdd1.partitions.length
res54: Int = 2

scala> rdd1.aggregate(1)((x,y)=>x+y,(x,y)=>x+y)
res56: Int = 24

scala> rdd1.repartition(3).aggregate(1)((x,y)=>x+y,(x,y)=>x+y)
res57: Int = 25

我們設(shè)置的聚集函數(shù)的ZeroValue值是1,這個值會每一個分區(qū)聚集時候使用一次,最后在聚集所有分區(qū)時候在使用一次。

我們這里面分區(qū)內(nèi)部元素計算函數(shù)是:

(x,y)=>x+y

分區(qū)之間的聚集函數(shù):

(x,y)=>x+y

由于rdd1默認是2個分區(qū),所以在計算兩個分區(qū)時候使用兩次,相當于+1,最后合并兩個分區(qū)時候有使用一次,相當于再加1.所以一共加3,,即:

1+2+3+4+5+6=21,21+3 =24.另一個只因為多一個分區(qū),所以多累加1.

9:treeAggregate

和8中聚集算子效果一樣,只不過使用的是樹的層次結(jié)構(gòu)聚集。

10:top

返回前面n個最大元素,可以定義排序規(guī)則

11:takeSample

def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]

隨機采樣,抽取num個樣例。可以指定是否重復抽取,隨機數(shù)種子是一個生成隨機數(shù)的初始條件,可以使用系統(tǒng)時間戳作為種子值。

當不允許重復抽取時候,num數(shù)目大于rdd元素數(shù)目不會報錯,此時只會抽取rdd的所有元素。

12:takeOrdered

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

抽取出num個個最小的元素,唯一和top區(qū)別就是top抽取大的,takeOrdered抽取小的。

13:take

def take(num: Int): Array[T]

返回num個數(shù)據(jù),一般當數(shù)據(jù)較大的時候如果collect操作會導致Driver內(nèi)存溢出,所以此時可以使用take攜帶少量數(shù)據(jù)到Driver。

14:subtract

def subtract(other: RDD[T]): RDD[T]

返回一個在當前RDD中且不在other中的元素所生成的RDD

15:sortBy

def sortBy[K](f: (T) ? K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

例如:

scala> rdd1.collect
res19: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val rdd2 = rdd1.map(x=>(x,scala.util.Random.nextInt(100),scala.util.Random.nextInt(100)))
rdd2: org.apache.spark.rdd.RDD[(Int, Int, Int)] = MapPartitionsRDD[33] at map at <console>:26

scala> rdd2.collect
collect   collectAsync

scala> rdd2.collect
res20: Array[(Int, Int, Int)] = Array((1,87,34), (2,5,62), (3,51,60), (4,72,33), (5,33,23))

scala> res13.sortBy(x=>x._3).collect
res21: Array[(Int, Int, Int)] = Array((3,26,12), (4,45,28), (1,12,37), (2,71,67), (5,80,96))

16:sample

def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]

隨機采樣,是否重復采樣,抽取數(shù)據(jù)的百分比例。

17:repartition

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

重新創(chuàng)建一個只有numPartitions個分區(qū)的RDD,提高分區(qū)數(shù)或降低分區(qū)數(shù)會改變并行度,內(nèi)部實現(xiàn)實現(xiàn)需要shuffle。如果需要降低RDD的分區(qū)數(shù)的話盡可能使用coalesce算子,它會避免shuffle的發(fā)生。

18:coalesce

def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]

降低原來RDD的分區(qū)數(shù)目到numPartitions個分區(qū)。例如由1000個分區(qū)降到100個分區(qū)的話,這樣是一個窄依賴,因此不需要shuffle過程。

但是如果RDD原本有2個分區(qū)的話,當我們調(diào)用coalesce(5)的話,生成的RDD分區(qū)還將是2,不會增加,但是如果調(diào)用coalesce(1)的話,則會生成分區(qū)個數(shù)為1的RDD。(coalesce只會減少分區(qū)數(shù),不會增加分區(qū)數(shù))。

拓展:如果我們的RDD分區(qū)數(shù)為1的話,我們可以傳遞shuffle=true,當計算時候會進行shuflle分布到多個節(jié)點進行計算。

19:checkpoint

def checkpoint(): Unit

Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint directory set with SparkContext#setCheckpointDir and all references to its parent RDDs will be removed. This function must be called before any job has been executed on this RDD. It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation.

20:cartesian

def cartesian[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]

兩個RDD生成笛卡爾積。

scala> rdd1.collect
res37: Array[Int] = Array(1, 2, 3, 4, 5)

scala> rdd2.collect
res38: Array[Int] = Array(6, 7, 8, 9, 10)

scala> rdd1.cartesian(rdd2).collect
res39: Array[(Int, Int)] = Array((1,6), (1,7), (2,6), (2,7), (1,8), (1,9), (1,10), (2,8), (2,9), (2,10), (3,6), (3,7), (4,6), (4,7), (5,6), (5,7), (3,8), (3,9), (3,10), (4,8), (4,9), (4,10), (5,8), (5,9), (5,10))

21:cache

def cache(): RDD.this.type

將RDD緩存,緩存級別為:MEMORY_ONLY

22:persist

def persist(): RDD.this.type

將RDD緩存,緩存級別為:MEMORY_ONLY

23:persist

def persist(newLevel: StorageLevel): RDD.this.type

指定緩存級別,在第一次被計算之后進行緩存。

24:keyBy

def keyBy[K](f: (T) ? K): RDD[(K, T)]

根據(jù)函數(shù)f進行選取key,例如:

scala> rdd1.collect
res43: Array[Int] = Array(1, 2, 3, 4, 5)

scala> rdd1.keyBy(x=>x*x).collect
res44: Array[(Int, Int)] = Array((1,1), (4,2), (9,3), (16,4), (25,5))

25:intersection

def intersection(other: RDD[T]): RDD[T]

求兩個RDD的交集

總結(jié)

以上是生活随笔為你收集整理的Spark算子讲解(一)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 狠狠鲁狠狠干 | 欧美日韩精品一区二区 | 欧美一级做a爰片免费视频 成人激情在线观看 | 视频免费观看在线 | av5566| 日韩亚洲欧美一区二区 | 欧美日韩精品一区二区在线播放 | 99久| 69视频在线播放 | 国产女主播在线 | 欧美草逼网 | 久久福利视频网 | 国产欧美一区二区三区国产幕精品 | 大陆av在线 | 韩日av| 日本wwwwwww| 蜜臀av性久久久久蜜臀aⅴ | 亚洲精品乱码久久久久久国产主播 | 欧美日韩一二三 | 久久视频免费 | 射网站 | 国产一区二区在线精品 | 性视频免费| 欧美激情精品久久久久 | 美女久久久久 | 欧美日韩一区二区久久 | 性色av一区二区三区在线观看 | 日本高清不卡码 | 成人a级网站 | 亚洲精品视频二区 | 95香蕉视频 | 五级 黄 色 片 | 播放男人添女人下边视频 | 国产视频在线观看一区二区 | 亚洲码欧美码一区二区三区 | 91亚洲精品在线 | 147人体做爰大胆图片成人 | 国产精品国产三级国产专播精品人 | 中文字字幕在线中文 | 亚洲偷偷| 日本xxxwww| 欧美黑人一区 | 超碰成人免费 | 黄视频免费在线观看 | 国产午夜福利片 | 岛国精品资源网站 | 人妻熟女一区二区aⅴ水 | 日本色片网站 | 成人hd| 亚洲综合日韩 | 中文字幕国产精品 | 夜夜视频 | 一道本在线观看 | 欧美日韩在线免费观看视频 | 国产免费一区二区三区在线播放 | 草草免费视频 | 亚洲精品v天堂中文字幕 | 一级美女黄色片 | 91视频色 | 黄色的一级片 | 91最新入口| 亚洲少妇在线 | 欧美日韩中文在线观看 | 天天躁夜夜躁狠狠躁 | 青青草娱乐视频 | 性猛交娇小69hd | 午夜在线免费视频 | 久久这里只有精品99 | 欧美精品二区三区四区免费看视频 | 已满十八岁免费观看全集动漫 | 精品xxxxx| 天堂资源av| 中文av一区二区三区 | 91色视频在线 | 成人在线视频一区二区三区 | 国产亚洲欧美一区二区 | 麻豆视频免费 | 中文字幕在线国产 | 干干干操操操 | 天天摸夜夜添狠狠添婷婷 | 亚洲天堂av在线播放 | 成年人在线免费观看视频网站 | 亚洲精品乱码久久久久 | 网友自拍一区 | а中文在线天堂 | 深夜福利网址 | 丁香啪啪综合成人亚洲 | 乳揉みま痴汉4在线播放 | 免费福利av| 中文字幕亚洲无线码在线一区 | 先锋资源一区 | 色婷婷aⅴ一区二区三区 | 亚洲一级片免费 | 中文字幕一区二区三区乱码 | 成人av影院 | 爱福利视频一区二区 | jizz黄色片| www.youjizz.com久久 | 亚洲一区毛片 |