Spark API 详解(转)
spark中,slice=partition,一個slice對應一個task,啟動task的數量上限取決于集群中核的數量
sc.parallelize(0?until numMappers, numMappers)中的numMappers就是slice的數量[1]
下面的圖來自[3]
在spark調優中,增大RDD分區數目,可以增大任務并行度
?
map(function)?
map是對RDD中的每個元素都執行一個指定的函數來產生一個新的RDD。
任何原RDD中的元素在新RDD中都有且只有一個元素與之對應。
scala> val a = sc.parallelize(1 to 9, 3)//這里的3表示的是把數據分成幾份 a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24scala> val b = a.map(x => x*2) b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:25scala> a.collect res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)scala> b.collect res2: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)當然map也可以把Key變成Key-Value對
scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2) a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:24scala> val b = a.map(x => (x, 1)) b: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at map at <console>:25scala> b.collect.foreach(println(_)) (dog,1) (tiger,1) (lion,1) (cat,1) (panther,1) ( eagle,1)mapPartitions(function)?
map()的輸入函數是應用于RDD中每個元素,
而mapPartitions()的輸入函數是應用于每個Partition(一個RDD可以對應于多個Partition)
關于這個具體案例可以參見[4]
?
?
mapValues(function)?
原RDD中的Key保持不變,與新的Value一起組成新的RDD中的元素。因此,該函數只適用于元素為KV對的RDD
mapWith和flatMapWith?
感覺用得不多,參考http://blog.csdn.net/jewes/article/details/39896301
?
flatMap(function)?
與map類似,區別是原RDD中的元素經map處理后只能生成一個元素,而原RDD中的元素經flatmap處理后可生成多個元素
注意,上面的flatMap中的字符串,其實是一個函數
?
flatMapValues(function)
scala> val a = sc.parallelize(List((1,2),(3,4),(5,6))) a: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:24scala> val b = a.flatMapValues(x=>1 to x) b: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[8] at flatMapValues at <console>:25scala> b.collect.foreach(println(_)) (1,1) (1,2) (3,1) (3,2) (3,3) (3,4) (5,1) (5,2) (5,3) (5,4) (5,5) (5,6)?
參考鏈接:
[1]https://blog.csdn.net/robbyo/article/details/50623339
[2]https://blog.csdn.net/guotong1988/article/details/50555185
[3]https://blog.csdn.net/weixin_38750084/article/details/83021819
[4]https://blog.csdn.net/appleyuchi/article/details/88371867
總結
以上是生活随笔為你收集整理的Spark API 详解(转)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 命令行中只用scala来运行一个spar
- 下一篇: scala中的case中的变量为啥没有定