Big Data in Practice, Lesson 16 (Part 1) - Spark-Core04
一、上次課回顧
二、Shuffle剖析
- 2.1 Shuffle簡(jiǎn)介
- 2.2 Shuffle背景
- 2.3 Shuffle Performance Impact(Shuffle 性能上的影響)
三、shuffle在Spark-shell操作
- 3.1 IDEA下進(jìn)行分組
- 3.2 coalesce和repartition 在生產(chǎn)中的使用
- 3.3 reduceByKey和groupByKey分析
- 3.4 圖解reduceByKey和groupByKey的shuffle過(guò)程
- 3.5 探究源碼reduceByKey和groupByKey的combiner
四、擴(kuò)展:aggregateByKey算子
- 4.1 collectAsMap
1. Review of the Last Lesson

Big Data in Practice, Lesson 15 (Part 1) - Spark-Core03:
https://blog.csdn.net/zhikanjiani/article/details/91045640#id_4.2

YARN and HADOOP_CONF_DIR

For YARN mode, do we need to edit $SPARK_HOME/conf/slaves and change localhost to hadoop002? No. When running on YARN, the machine only acts as a client; that is why Spark on YARN is said to need nothing more than a client.

Q: Does Spark on YARN require starting the Spark standalone daemons ($SPARK_HOME/sbin/start-all.sh, start-master.sh, start-slaves.sh, the slaves file)?

No. There is no need to bring up the Spark standalone nodes at all. A gateway machine plus spark-submit is all it takes; no extra processes are required.
2. Shuffle Analysis

2.1 Shuffle Overview

- Recap: an action triggers a job; within a job, each shuffle splits off a new stage, and a stage is a set of tasks.

See the official docs: http://spark.apache.org/docs/latest/rdd-programming-guide.html#shuffle-operations

Requirement:

Given a pile of call records, count how many outgoing calls were made this month.
On a phone's call screen you see: contact, call time, call duration, call log.
Statistical analysis in Spark boils down to word count: emit (day + direction, 1), with "day + direction" as the key, then apply reduceByKey().
Records sharing the same "day + direction" key must be shuffled to the same reducer; without gathering them you cannot accumulate them.
This is the essence of a shuffle: data with a particular characteristic is gathered onto one node for computation -- here, a +1 accumulation.
Note: avoid shuffle operations whenever possible.
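The requirement above can be sketched as a word-count-style Spark job. This is a minimal sketch; the record layout, field values, and app name are made-up illustrations, not from the lesson:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CallRecordApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CallRecordApp").setMaster("local[2]"))

    // Hypothetical records: (day, direction); "out" marks an outgoing call
    val records = sc.parallelize(Seq(
      ("2019-06-01", "out"), ("2019-06-01", "out"),
      ("2019-06-01", "in"),  ("2019-06-02", "out")
    ))

    // (day + direction, 1) as the key, then reduceByKey: records with the
    // same key are shuffled to the same reducer and summed there
    val outgoingPerDay = records
      .filter(_._2 == "out")
      .map { case (day, dir) => (s"$day-$dir", 1) }
      .reduceByKey(_ + _)

    outgoingPerDay.collect().foreach(println)
    sc.stop()
  }
}
```

The reduceByKey step is exactly where the shuffle happens: all (day-out, 1) pairs for the same day must land on the same reducer before they can be summed.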
- Shuffle operations (from the official docs)

Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark's mechanism for re-distributing data so that it's grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.

In short: the shuffle redistributes data across partitions, which typically means copying it between executors and machines, incurring disk I/O and network I/O; that is what makes the shuffle complex and expensive.
2.2 Shuffle Background

- We use reduceByKey to understand what happens during a shuffle.
- reduceByKey generates a new RDD where all values for a single key are combined into a tuple: the key and the result of running a reduce function against all values associated with that key (keys with the same characteristic end up on the same reducer).
- Not all values for a single key necessarily reside in the same partition, or even on the same machine. The challenge: the results span partitions, yet they must be co-located to compute the final value.
- Which operations can trigger a shuffle? Repartition operations such as repartition and coalesce, ByKey operations such as reduceByKey and groupByKey, and join operations such as cogroup and join.
2.3 Shuffle Performance Impact

- Spark generates sets of tasks: a job produces stages, each shuffle introduces a new stage, and each stage consists of a set of tasks.
- Internally, results from individual map tasks are kept in memory; on the reduce side, tasks read the relevant sorted blocks (the map-side output).
3. Shuffle Operations in spark-shell

1. Start spark-shell:
scala> val info = sc.textFile("hdfs://hadoop002:9000/wordcount/input/ruozeinput.txt")
info: org.apache.spark.rdd.RDD[String] = hdfs://hadoop002:9000/wordcount/input/ruozeinput.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> info.partitions.length
res0: Int = 2

scala> val info1 = info.coalesce(1)
info1: org.apache.spark.rdd.RDD[String] = CoalescedRDD[2] at coalesce at <console>:25

scala> info1.partitions.length
res1: Int = 1

scala> val info2 = info.coalesce(4)
info2: org.apache.spark.rdd.RDD[String] = CoalescedRDD[3] at coalesce at <console>:25

scala> info2.partitions.length
res2: Int = 2

scala> val info3 = info.coalesce(4,true)
info3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at coalesce at <console>:25

scala> info3.partitions.length
res3: Int = 4

scala> info3.collect
res4: Array[String] = Array(hello world, hello, hello world john)

The coalesce method, explained:
The full signature in RDD.scala:

def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
            (implicit ord: Ordering[T] = null): RDD[T]

- You pass a partition count; the true/false shuffle flag is optional and defaults to false.

repartition is just a wrapper:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}

- It simply calls coalesce with shuffle = true, so repartition always shuffles.
scala> info3.collect
res4: Array[String] = Array(hello world, hello, hello world john)

scala> val info4 = info.repartition(5)
info4: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at repartition at <console>:25

scala> info4.collect
res6: Array[String] = Array(hello world john, hello world, hello)

scala> info.partitions.length
res7: Int = 2

When shrinking the partition count (e.g. going from 2 partitions down) to redistribute data, use coalesce so you avoid a shuffle.
3.1 Grouping in IDEA:

package spark01

import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer

object RepartitionApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    sparkConf.setAppName("LogApp").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val students = sc.parallelize(List("黃帆","梅宇豪","秦朗","楊朝珅","王乾","沈兆乘","沈其文","陳思文"), 3)
    students.mapPartitionsWithIndex((index, partition) => {
      val stus = new ListBuffer[String]
      while (partition.hasNext) { // iterate over this partition
        stus += ("~~~~" + partition.next() + ", group: " + (index + 1))
      }
      stus.iterator
    }).foreach(println) // print the results
    sc.stop()
  }
}

mapPartitionsWithIndex() tags each partition with a group number; the parallelism set in parallelize makes it explicitly 3 groups.

Requirement 1:
The department downsizes and three groups become two. Modify as follows:

- students.mapPartitionsWithIndex((index, partition) ...

becomes:

students.coalesce(2).mapPartitionsWithIndex((index, partition) ...

Requirement 2:

Before the layoff there were three groups; re-group them into 5 groups:

students.repartition(5).mapPartitionsWithIndex((index, partition) ...

To show the original partitioning and the repartition operation directly, run the following code:
package Sparkcore04

import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer

object RepartitionApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    sparkConf.setAppName("LogApp").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val students = sc.parallelize(List("梅宇豪","黃帆","楊超神","薛思雨","朱昱璇","周一虹","王曉嵐","沈兆乘","陳思文"), 3)
    students.mapPartitionsWithIndex((index, partition) => {
      val stus = new ListBuffer[String]
      while (partition.hasNext) {
        stus += ("~~~~" + partition.next() + ", group: " + (index + 1))
      }
      stus.iterator
    }).foreach(println)

    println("---------------------------separator---------------------------")

    students.repartition(4).mapPartitionsWithIndex((index, partition) => {
      val stus = new ListBuffer[String]
      while (partition.hasNext) {
        stus += ("~~~" + partition.next() + ", new group " + (index + 1))
      }
      stus.iterator
    }).foreach(println)
    sc.stop()
  }
}

3.2 Using coalesce and repartition in Production:
Suppose an RDD has 300 partitions, each holding a single record "id=100".
A filter (id > 99) keeps everything: the result is still 300 partitions, each with one record.
Now change the starting condition so the filter drops most of the data: you are left with 300 partitions that are mostly empty, and 300 tasks doing almost nothing. coalesce collapses those small partitions without a shuffle.
- repartition use case: scatter the data to raise parallelism.
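A minimal sketch of the filter-then-coalesce scenario above; the app name and the concrete numbers are illustrative assumptions:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CoalesceApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CoalesceApp").setMaster("local[2]"))

    // 300 partitions, one record each
    val ids = sc.parallelize(1 to 300, 300)

    // filter keeps the partition layout: still 300 partitions,
    // most of them now empty
    val filtered = ids.filter(_ > 290)
    println(filtered.partitions.length)   // still 300

    // collapse the small partitions without a shuffle
    val compacted = filtered.coalesce(10)
    println(compacted.partitions.length)  // 10

    sc.stop()
  }
}
```

Without the coalesce, a downstream action would schedule 300 near-empty tasks; after it, only 10.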
3.3 reduceByKey and groupByKey Analysis

1. Hand-write a word count:

Start spark-shell --master local[2] (e.g. from SecureCRT) and run:

sc.textFile("file:///home/hadoop/data/ruozeinput.txt").flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_).collect

Look at the DAG: the first operator is textFile, the second flatMap, the third map; hitting reduceByKey splits the job in two -- one stage before, one after.
Two stages: for reduceByKey, the (_,1) records are first written out, then read back in.
reduceByKey operates on [String, Int] pairs: the word and its count.
2. The data types of reduceByKey and groupByKey:

scala> sc.textFile("file:///home/hadoop/data/ruozeinput.txt").flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_)
res4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:25

scala> sc.textFile("file:///home/hadoop/data/ruozeinput.txt").flatMap(_.split("\t")).map((_,1)).groupByKey()
res5: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[14] at groupByKey at <console>:25
Word count via reduceByKey:
res10: Array[(String, Int)] = Array((hello,3), (world,2), (john,1))

Word count via groupByKey (followed by a map such as .map(x => (x._1, x._2.sum)) to sum each key's Iterable):
res11: Array[(String, Int)] = Array((hello,3), (world,2), (john,1))
Summary:

Compare the two jobs in the UI: reduceByKey reads 53 B of input and shuffles 161 B; groupByKey reads the same 53 B but shuffles 172 B.
groupByKey ships all the data uncomputed.
reduceByKey performs local aggregation first (a map-side combiner); only the combined results go through the shuffle, so less data moves.
3.4 The Shuffle Process of reduceByKey and groupByKey, Illustrated

Suppose three map tasks hold the data: map 1: (a,1)(b,1); map 2: (a,1)(b,1)(a,1)(b,1); map 3: (a,1)(b,1)(a,1)(b,1)(a,1)(b,1).

groupByKey's shuffle: every (key, 1) pair crosses the network unchanged.
reduceByKey's shuffle: each map task aggregates locally first, so map 2 ships only (a,2)(b,2) and map 3 only (a,3)(b,3).

reduceByKey shuffles less data because the map-side aggregation shrinks it before the shuffle.
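The map-side combine idea can be simulated with plain Scala collections. This is a counting sketch of the three map tasks above, not Spark's actual implementation:

```scala
object CombinerDemo {
  // the three "map tasks" from the example above
  val maps: Seq[Seq[(String, Int)]] = Seq(
    Seq(("a", 1), ("b", 1)),
    Seq(("a", 1), ("b", 1), ("a", 1), ("b", 1)),
    Seq(("a", 1), ("b", 1), ("a", 1), ("b", 1), ("a", 1), ("b", 1)))

  // groupByKey-style shuffle: every record crosses the "network"
  def shuffledWithoutCombine: Int = maps.map(_.size).sum

  // reduceByKey-style shuffle: each map task pre-aggregates per key first,
  // so it only ships one record per distinct key
  def shuffledWithCombine: Int =
    maps.map(m => m.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }.size).sum

  def main(args: Array[String]): Unit = {
    println(shuffledWithoutCombine) // 12 records shuffled
    println(shuffledWithCombine)    // 6 records shuffled (2 keys x 3 map tasks)
  }
}
```

Twelve records shrink to six here; on real data with few distinct keys and many values, the savings are far larger.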
4. Extension: the aggregateByKey Operator

Some problems cannot be solved with reduceByKey alone -- its reduce function must take and return the value type -- which motivates this new operator.
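A minimal aggregateByKey sketch (the app name and sample data are made up): computing a per-key average is exactly the kind of problem reduceByKey cannot express, because the accumulator type (sum, count) differs from the value type. aggregateByKey takes a zero value plus separate within-partition and cross-partition merge functions:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AggregateByKeyApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("AggregateByKeyApp").setMaster("local[2]"))

    val pairs = sc.parallelize(Seq(("a", 1), ("a", 5), ("b", 2), ("b", 4)), 2)

    // zero value (0, 0) = (sum, count); seqOp folds a value into the
    // accumulator inside a partition, combOp merges accumulators across
    // partitions -- note the accumulator type differs from the value type
    val avgPerKey = pairs
      .aggregateByKey((0, 0))(
        (acc, v) => (acc._1 + v, acc._2 + 1),
        (a, b) => (a._1 + b._1, a._2 + b._2))
      .mapValues { case (sum, cnt) => sum.toDouble / cnt }

    avgPerKey.collect().foreach(println) // (a,3.0) and (b,3.0)
    sc.stop()
  }
}
```

Like reduceByKey, aggregateByKey performs map-side combining with seqOp before the shuffle.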
There are no secrets in front of the source code.

3.5 Exploring the combiner in the reduceByKey and groupByKey source

The groupByKey method, defined in PairRDDFunctions.scala:
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
Note that groupByKey explicitly passes mapSideCombine = false: no map-side combine is used.
The reduceByKey path (reduceByKey delegates to combineByKeyWithClassTag):
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0

Note that mapSideCombine defaults to true here, so reduceByKey performs a map-side combine.
4.1 collectAsMap

Note: all the data is loaded into the driver's memory; with a large dataset the driver cannot hold it and will crash.

Defined in PairRDDFunctions.scala:

/**
 * Return the key-value pairs in this RDD to the master as a Map.
 *
 * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
 * one value per key is preserved in the map returned)
 *
 * @note this method should only be used if the resulting data is expected to be small, as
 * all the data is loaded into the driver's memory.
 */
def collectAsMap(): Map[K, V] = self.withScope {
  val data = self.collect()
  val map = new mutable.HashMap[K, V]
  map.sizeHint(data.length)
  data.foreach { pair => map.put(pair._1, pair._2) }
  map
}
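A quick illustration of both warnings in that doc comment (app name and data are made up): a duplicate key keeps only one value, and the whole result must fit on the driver:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CollectAsMapApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CollectAsMapApp").setMaster("local[2]"))

    // duplicate key "a": collectAsMap keeps only one value per key
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    val m = pairs.collectAsMap()

    // the entire map lives in driver memory -- only safe for small results
    println(m.size) // 2, not 3: "a" kept a single value
    sc.stop()
  }
}
```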
Remember: whenever you see runJob in the source, that method triggers an action. In RDD.scala:
/**
 * Return the number of elements in the RDD.
 */
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

/**
 * Return an array that contains all of the elements in this RDD.
 *
 * @note This method should only be used if the resulting array is expected to be small, as
 * all the data is loaded into the driver's memory.
 */
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

Array.concat(results: _*) ==> the `: _*` at the call site is not itself the varargs declaration; it expands the sequence into varargs.
Click into concat for the next layer of source:

def concat[T: ClassTag](xss: Array[T]*): Array[T] // this is where the varargs parameter is declared

This came up in the Scala04 lesson:

println(sum(1.to(10): _*))
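A small self-contained illustration of varargs and `: _*` expansion; the `sum` helper here is hypothetical, written for this demo rather than taken from the lesson's code:

```scala
object VarargsDemo {
  // `xs: Int*` declares varargs: the callee receives the arguments as a Seq[Int]
  def sum(xs: Int*): Int = xs.foldLeft(0)(_ + _)

  def main(args: Array[String]): Unit = {
    println(sum(1, 2, 3))      // individual arguments: prints 6
    println(sum(1.to(10): _*)) // `: _*` expands the Range into varargs: prints 55
  }
}
```

Passing `1.to(10)` without `: _*` would not compile: a Seq is not automatically spread into a varargs parameter.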