Spark算子讲解(一)
1:Zip算子
def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]
將兩個RDD做zip操作,如果當兩個RDD分區(qū)數(shù)目不一樣的話或每一個分區(qū)數(shù)目不一樣的話則會異常。
例如:
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6),2) val rdd2 = sc.parallelize(Array(1,2,3,4,5,6),3) rdd.zip(rdd1).collect
異常信息:
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(2, 3) at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
例如:
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6),2) val rdd2 = sc.parallelize(Array(1,2,3,4,5,6,7),2) rdd.zip(rdd1).collect
異常信息:
Caused by: org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition
2:zipPartitions
以分區(qū)為單位進行zip操作,要求分區(qū)數(shù)目相等。否則異常。
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6),2) val rdd2 = sc.parallelize(Array(1,2,3,4,5,6,7),2) val func = (x:Iterator[Int], y:Iterator[Int])=>x.toSeq.++(y.toSeq).toIterator rdd1.zipPartitions(rdd2)(func).collect
3:zipWithIndex
給RDD中的每一個元素添加上索引號,組成二元組。索引號從0開始并且索引號類型是Long,當RDD分區(qū)大于1個時候需要出發(fā)一個Spark Job。
4:zipWithUniqueId
var rdd1 = sc.makeRDD(Seq("A","B","C","D","E","F"),2)
//rdd1有兩個分區(qū),
rdd1.zipWithUniqueId().collect
Array[(String, Long)] = Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5))
//總分區(qū)數(shù)為2
//第一個分區(qū)第一個元素ID為0,第二個分區(qū)第一個元素ID為1
//第一個分區(qū)第二個元素ID為0+2=2,第一個分區(qū)第三個元素ID為2+2=4
//第二個分區(qū)第二個元素ID為1+2=3,第二個分區(qū)第三個元素ID為3+2=5
其實就是按照每一個的分區(qū)的每一個元素的順序進行編號。這個算子不需要出發(fā)作業(yè)到集群運行。
5:union
RDD求并集操作,不會自動去重。
res31: Array[Int] = Array(1, 2, 3, 4, 5, 6) scala> rdd2.collect collect collectAsync scala> rdd2.collect res32: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7) scala> rdd1.union(rdd2).collect collect collectAsync scala> rdd1.union(rdd2).collect res34: Array[Int] = Array(1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7)//不去重
6:distinct
scala> unionRDD.collect res38: Array[Int] = Array(1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7) scala> unionRDD.distinct.collect res39: Array[Int] = Array(4, 1, 5, 6, 2, 3, 7)
實現(xiàn)去重。
7:treeReduce
treeReduce有點類似于reduce函數(shù),也不需要傳入初始值,只不過這個算子使用一個多層樹的形式來進行reduce操作。
scala> rdd1.collect res42: Array[Int] = Array(1, 2, 3, 4, 5, 6) scala> rdd1.treeReduce((x,y)=>x+y) res43: Int = 21
8:aggregate
scala> rdd1.collect res53: Array[Int] = Array(1, 2, 3, 4, 5, 6) scala> rdd1.partitions.length res54: Int = 2 scala> rdd1.aggregate(1)((x,y)=>x+y,(x,y)=>x+y) res56: Int = 24 scala> rdd1.repartition(3).aggregate(1)((x,y)=>x+y,(x,y)=>x+y) res57: Int = 25
我們設(shè)置的聚集函數(shù)的ZeroValue值是1,這個值會每一個分區(qū)聚集時候使用一次,最后在聚集所有分區(qū)時候在使用一次。
我們這里面分區(qū)內(nèi)部元素計算函數(shù)是:
(x,y)=>x+y
分區(qū)之間的聚集函數(shù):
(x,y)=>x+y
由于rdd1默認是2個分區(qū),所以在計算兩個分區(qū)時候使用兩次,相當于+1,最后合并兩個分區(qū)時候有使用一次,相當于再加1.所以一共加3,,即:
1+2+3+4+5+6=21,21+3 =24.另一個只因為多一個分區(qū),所以多累加1.
9:treeAggregate
和8中聚集算子效果一樣,只不過使用的是樹的層次結(jié)構(gòu)聚集。
10:top
返回前面n個最大元素,可以定義排序規(guī)則
11:takeSample
def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
隨機采樣,抽取num個樣例。可以指定是否重復抽取,隨機數(shù)種子是一個生成隨機數(shù)的初始條件,可以使用系統(tǒng)時間戳作為種子值。
當不允許重復抽取時候,num數(shù)目大于rdd元素數(shù)目不會報錯,此時只會抽取rdd的所有元素。
12:takeOrdered
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
抽取出num個個最小的元素,唯一和top區(qū)別就是top抽取大的,takeOrdered抽取小的。
13:take
def take(num: Int): Array[T]
返回num個數(shù)據(jù),一般當數(shù)據(jù)較大的時候如果collect操作會導致Driver內(nèi)存溢出,所以此時可以使用take攜帶少量數(shù)據(jù)到Driver。
14:subtract
def subtract(other: RDD[T]): RDD[T]
返回一個在當前RDD中且不在other中的元素所生成的RDD
15:sortBy
def sortBy[K](f: (T) ? K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
例如:
scala> rdd1.collect res19: Array[Int] = Array(1, 2, 3, 4, 5) scala> val rdd2 = rdd1.map(x=>(x,scala.util.Random.nextInt(100),scala.util.Random.nextInt(100))) rdd2: org.apache.spark.rdd.RDD[(Int, Int, Int)] = MapPartitionsRDD[33] at map at <console>:26 scala> rdd2.collect collect collectAsync scala> rdd2.collect res20: Array[(Int, Int, Int)] = Array((1,87,34), (2,5,62), (3,51,60), (4,72,33), (5,33,23)) scala> res13.sortBy(x=>x._3).collect res21: Array[(Int, Int, Int)] = Array((3,26,12), (4,45,28), (1,12,37), (2,71,67), (5,80,96))
16:sample
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
隨機采樣,是否重復采樣,抽取數(shù)據(jù)的百分比例。
17:repartition
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
重新創(chuàng)建一個只有numPartitions個分區(qū)的RDD,提高分區(qū)數(shù)或降低分區(qū)數(shù)會改變并行度,內(nèi)部實現(xiàn)實現(xiàn)需要shuffle。如果需要降低RDD的分區(qū)數(shù)的話盡可能使用coalesce算子,它會避免shuffle的發(fā)生。
18:coalesce
def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]
降低原來RDD的分區(qū)數(shù)目到numPartitions個分區(qū)。例如由1000個分區(qū)降到100個分區(qū)的話,這樣是一個窄依賴,因此不需要shuffle過程。
但是如果RDD原本有2個分區(qū)的話,當我們調(diào)用coalesce(5)的話,生成的RDD分區(qū)還將是2,不會增加,但是如果調(diào)用coalesce(1)的話,則會生成分區(qū)個數(shù)為1的RDD。(coalesce只會減少分區(qū)數(shù),不會增加分區(qū)數(shù))。
拓展:如果我們的RDD分區(qū)數(shù)為1的話,我們可以傳遞shuffle=true,當計算時候會進行shuflle分布到多個節(jié)點進行計算。
19:checkpoint
def checkpoint(): Unit
Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint directory set with SparkContext#setCheckpointDir and all references to its parent RDDs will be removed. This function must be called before any job has been executed on this RDD. It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation.
20:cartesian
def cartesian[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]
兩個RDD生成笛卡爾積。
scala> rdd1.collect res37: Array[Int] = Array(1, 2, 3, 4, 5) scala> rdd2.collect res38: Array[Int] = Array(6, 7, 8, 9, 10) scala> rdd1.cartesian(rdd2).collect res39: Array[(Int, Int)] = Array((1,6), (1,7), (2,6), (2,7), (1,8), (1,9), (1,10), (2,8), (2,9), (2,10), (3,6), (3,7), (4,6), (4,7), (5,6), (5,7), (3,8), (3,9), (3,10), (4,8), (4,9), (4,10), (5,8), (5,9), (5,10))
21:cache
def cache(): RDD.this.type
將RDD緩存,緩存級別為:MEMORY_ONLY
22:persist
def persist(): RDD.this.type
將RDD緩存,緩存級別為:MEMORY_ONLY
23:persist
def persist(newLevel: StorageLevel): RDD.this.type
指定緩存級別,在第一次被計算之后進行緩存。
24:keyBy
def keyBy[K](f: (T) ? K): RDD[(K, T)]
根據(jù)函數(shù)f進行選取key,例如:
scala> rdd1.collect res43: Array[Int] = Array(1, 2, 3, 4, 5) scala> rdd1.keyBy(x=>x*x).collect res44: Array[(Int, Int)] = Array((1,1), (4,2), (9,3), (16,4), (25,5))
25:intersection
def intersection(other: RDD[T]): RDD[T]
求兩個RDD的交集
總結(jié)
以上是生活随笔為你收集整理的Spark算子讲解(一)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 爆炒翅尖怎么做
- 下一篇: 高锰酸钾可以治脚气吗 用什么泡脚治脚气效