

Big Data in Practice, Lesson 16 (Part I): Spark Core 04

Published: 2023/12/18

1. Review of the Previous Lesson

2. Shuffle in Depth

  • 2.1 What is a shuffle
  • 2.2 Shuffle background
  • 2.3 Shuffle performance impact

3. Shuffle Operations in spark-shell

  • 3.1 Grouping data in IDEA
  • 3.2 coalesce and repartition in production
  • 3.3 reduceByKey and groupByKey
  • 3.4 The shuffle process of reduceByKey and groupByKey, illustrated
  • 3.5 Exploring the combiner of reduceByKey and groupByKey in the source

4. Extension: the aggregateByKey Operator

  • 4.1 collectAsMap

1. Review of the Previous Lesson

Big Data in Practice, Lesson 15 (Part I): Spark-Core03:
https://blog.csdn.net/zhikanjiani/article/details/91045640#id_4.2

  • Narrow and wide dependencies, and how they are defined for fault tolerance
  • Spark on YARN (client and cluster modes)
  • Key-value programming
  • YARN and HADOOP_CONF_DIR
    For YARN mode, do we need to edit slaves under $SPARK_HOME/conf and change localhost to hadoop002? No.

    When running on YARN, one machine acting as a client is enough; that is why we say Spark on YARN needs only a client.

    Q: Does Spark on YARN require starting the standalone daemons -- the scripts under $SPARK_HOME/sbin such as start-all.sh, start-master.sh, and start-slaves.sh (driven by the slaves file)?

    People reflexively start up all the Spark daemons before running on YARN, but a gateway machine plus spark-submit is all that is needed; no Spark processes have to be started at all.

2. Shuffle in Depth

2.1 What is a shuffle

  • Recap: an action triggers a job; inside a job, every shuffle splits off a new stage, and a stage is a set of tasks.

  See the official docs: http://spark.apache.org/docs/latest/rdd-programming-guide.html#shuffle-operations

A motivating example

  • Given a pile of call records, count how many calls were placed this month.
    The phone's call screen shows: contact, call time, call duration, and call type.

  • Statistical analysis in Spark comes down to the word-count pattern: emit (day + direction, 1), with "day + outgoing" as the key, then run reduceByKey().

  • Records with the same "day + outgoing" key must be shuffled to the same reducer -- could you accumulate them without doing that? No.

  • That is what a shuffle is: data sharing a particular characteristic is gathered onto one node for computation, here a +1 accumulation.
    Note: avoid operations that shuffle whenever you can.
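The key design above can be sketched with plain Scala collections standing in for RDDs (the record fields and sample data are invented for illustration; this is not the Spark API):

```scala
// Each record: (date, direction). We count outgoing calls per day --
// exactly the word-count pattern: key = (day, direction), value = 1.
val records = List(
  ("2019-06-01", "outgoing"), ("2019-06-01", "outgoing"),
  ("2019-06-01", "incoming"), ("2019-06-02", "outgoing")
)

// Emit (key, 1) pairs, then reduce by key -- what reduceByKey does in Spark.
val counts = records
  .map { case (day, dir) => ((day, dir), 1) }
  .groupBy(_._1)
  .map { case (k, vs) => (k, vs.map(_._2).sum) }

// counts(("2019-06-01", "outgoing")) is 2: two calls placed that day.
```

In Spark the same logic would be `rdd.map(r => ((r.day, r.dir), 1)).reduceByKey(_ + _)`, with the shuffle moving all pairs for one key to the same reducer.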

    • Shuffle operations (from the official docs)
      Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark's mechanism for re-distributing data so that it's grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.

    In short: the shuffle re-distributes data across partitions, typically copying it between machines, which incurs both disk I/O and network I/O -- that is what makes it complex and expensive.

    2.2 Shuffle background

  • To understand what happens during the shuffle we can consider the example of the reduceByKey operation.
  • The reduceByKey operation generates a new RDD where all values for a single key are combined into a tuple -- the key and the result of executing a reduce function against all values associated with that key.
  • The challenge is that not all values for a single key necessarily reside on the same partition, or even the same machine, but they must be co-located to compute the result.
  • Operations which can cause a shuffle include repartition operations like repartition and coalesce, 'ByKey' operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.

    2.3 Shuffle performance impact

  • The shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O. To organize data for the shuffle, Spark generates sets of tasks -- map tasks to organize the data, and a set of reduce tasks to aggregate it. This nomenclature comes from MapReduce and does not directly relate to Spark's map and reduce operations.
    • "Spark generates sets of tasks": a shuffle produces a new stage, and every stage produces a set of tasks.
  • Internally, results from individual map tasks are kept in memory until they can't fit; these are sorted based on the target partition and written to a single file. On the reduce side, tasks read the relevant sorted blocks.
    • In essence, each map task's output is buffered in memory (spilled when it no longer fits), and the reduce side reads back the sorted blocks the map side wrote.

    3. Shuffle Operations in spark-shell

    1. Start spark-shell:

    scala> val info = sc.textFile("hdfs://hadoop002:9000/wordcount/input/ruozeinput.txt")
    info: org.apache.spark.rdd.RDD[String] = hdfs://hadoop002:9000/wordcount/input/ruozeinput.txt MapPartitionsRDD[1] at textFile at <console>:24

    scala> info.partitions.length
    res0: Int = 2

    scala> val info1 = info.coalesce(1)
    info1: org.apache.spark.rdd.RDD[String] = CoalescedRDD[2] at coalesce at <console>:25

    scala> info1.partitions.length
    res1: Int = 1

    scala> val info2 = info.coalesce(4)
    info2: org.apache.spark.rdd.RDD[String] = CoalescedRDD[3] at coalesce at <console>:25

    scala> info2.partitions.length
    res2: Int = 2

    scala> val info3 = info.coalesce(4,true)
    info3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at coalesce at <console>:25

    scala> info3.partitions.length
    res3: Int = 4

    scala> info3.collect
    res4: Array[String] = Array(hello world, hello, hello world john)

    The coalesce method, explained:

  • def coalesce(numPartitions: Int, shuffle: Boolean = false,
    partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
    (implicit ord: Ordering[T] = null)
    • You pass a target partition count; the shuffle flag is optional and defaults to false. That is why coalesce(4) above could not grow 2 partitions into 4, while coalesce(4, true) could.
  • def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
      coalesce(numPartitions, shuffle = true)
    }
    • repartition just calls coalesce with shuffle = true, so it always shuffles.
  • Trigger the computation with collect:
    • scala> info3.collect
      res4: Array[String] = Array(hello world, hello, hello world john)
  • Using repartition:
    • scala> val info4 = info.repartition(5)
      info4: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at repartition at <console>:25

    • scala> info4.collect
      res6: Array[String] = Array(hello world john, hello world, hello)

    • scala> info.partitions.length
      res7: Int = 2

    To shrink 2 partitions down to 1, i.e. to re-distribute the data into fewer partitions, prefer coalesce: it avoids a shuffle.

    3.1 Grouping data in IDEA

    package spark01

    import org.apache.spark.{SparkConf, SparkContext}
    import scala.collection.mutable.ListBuffer

    object RepartitionApp {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf()
        sparkConf.setAppName("LogApp").setMaster("local[2]")
        val sc = new SparkContext(sparkConf)

        val students = sc.parallelize(List("黃帆","梅宇豪","秦朗","楊朝珅","王乾","沈兆乘","沈其文","陳思文"), 3)

        students.mapPartitionsWithIndex((index, partition) => {
          val stus = new ListBuffer[String]
          while (partition.hasNext) {                  // iterate over this partition
            stus += ("~~~~" + partition.next() + ", group: " + (index + 1))
          }
          stus.iterator
        }).foreach(println)                            // print the results

        sc.stop()
      }
    }

    mapPartitionsWithIndex() tags every element with its partition index, i.e. a group number; the parallelism of 3 passed to parallelize makes it exactly 3 groups.

    Requirement 1:

    The department downsizes from three groups to two. Change:

    • students.mapPartitionsWithIndex((index,partition) ...
      to:
      students.coalesce(2).mapPartitionsWithIndex((index,partition) ...

    Requirement 2:

    The department had three groups before the downsizing; re-shuffle everyone into 5 groups:
    students.repartition(5).mapPartitionsWithIndex((index,partition) ...

    To show the original partitioning and the repartition operation side by side, run:

    package Sparkcore04

    import org.apache.spark.{SparkConf, SparkContext}
    import scala.collection.mutable.ListBuffer

    object RepartitionApp {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf()
        sparkConf.setAppName("LogApp").setMaster("local[2]")
        val sc = new SparkContext(sparkConf)

        val students = sc.parallelize(List("梅宇豪","黃帆","楊超神","薛思雨","朱昱璇","周一虹","王曉嵐","沈兆乘","陳思文"), 3)

        students.mapPartitionsWithIndex((index, partition) => {
          val stus = new ListBuffer[String]
          while (partition.hasNext) {
            stus += ("~~~~" + partition.next() + ", group: " + (index + 1))
          }
          stus.iterator
        }).foreach(println)

        println("--------------------------- divider ---------------------------")

        students.repartition(4).mapPartitionsWithIndex((index, partition) => {
          val stus = new ListBuffer[String]
          while (partition.hasNext) {
            stus += ("~~~" + partition.next() + ", new group " + (index + 1))
          }
          stus.iterator
        }).foreach(println)

        sc.stop()
      }
    }

    3.2 coalesce and repartition in production

  • Suppose an RDD has 300 partitions, each holding a single record with id=100.

  • After a filter (id > 99), you still have 300 partitions, each with one record.

  • Now change the starting conditions:

  • 300 partitions with 100,000 records each; the same filter (id > 99) leaves one record per partition, so the output is 300 near-empty files.
  • Calling coalesce(1) at that point collapses the partitions, which greatly relieves the small-files problem: the partition count determines the number of output files.
    • repartition's use case is the opposite: scatter the data to raise parallelism.
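A minimal sketch of the scenario above, emulating partitions with plain Scala collections (sizes scaled down from 300 partitions to 6; none of this is the Spark API):

```scala
// 6 partitions of 5 records each; exactly one record per partition has id=100.
val partitions: List[List[Int]] =
  List.fill(6)(List(100, 1, 2, 3, 4))

// filter(id > 99): the partition count is unchanged; each partition keeps 1 record,
// so writing this out would produce 6 tiny files.
val filtered = partitions.map(_.filter(_ > 99))

// coalesce(1): merge the surviving records into a single partition / output file.
val coalesced = List(filtered.flatten)
```

Here `filtered.map(_.size)` is `List(1, 1, 1, 1, 1, 1)` while `coalesced.map(_.size)` is `List(6)`: same data, one output file instead of six.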

    3.3 reduceByKey and groupByKey

    1. Hand-writing a word count:

    Start spark-shell --master local[2] (here from SecureCRT) and run:
    sc.textFile("file:///home/hadoop/data/ruozeinput.txt").flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_).collect

    Look at the DAG: the operators are textFile, flatMap, and map; when reduceByKey is reached, the graph is split into one stage before it and one stage after it.

    Two stages means that for reduceByKey the (_,1) pairs are first written out (shuffle write), then read back in (shuffle read).
    reduceByKey's element type is (String, Int): a word and the number of times it occurs.

    2. The element types of reduceByKey and groupByKey:

    • scala> sc.textFile("file:///home/hadoop/data/ruozeinput.txt").flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_)
      res4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:25

    • scala> sc.textFile("file:///home/hadoop/data/ruozeinput.txt").flatMap(_.split("\t")).map((_,1)).groupByKey()
      res5: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[14] at groupByKey at <console>:25

    Word count with reduceByKey:

  • scala> sc.textFile("file:///home/hadoop/data/ruozeinput.txt").flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_).collect
    res10: Array[(String, Int)] = Array((hello,3), (world,2), (john,1))

    Word count with groupByKey:

  • scala> sc.textFile("file:///home/hadoop/data/ruozeinput.txt").flatMap(_.split("\t")).map((_,1)).groupByKey().map(x => (x._1, x._2.sum)).collect
    res11: Array[(String, Int)] = Array((hello,3), (world,2), (john,1))

  • Summary:

    Compare the two jobs in the UI: reduceByKey reads 53 B and shuffles 161 B, while groupByKey reads the same 53 B but shuffles 172 B.

  • groupByKey shuffles all the data, uncombined.

  • reduceByKey performs a local aggregation first -- a map-side combiner -- and only the combined results go through the shuffle, so less data moves.

    3.4 The shuffle process of reduceByKey and groupByKey, illustrated

    Suppose three map tasks produce the following pairs: map 1: (a,1)(b,1); map 2: (a,1)(b,1)(a,1)(b,1); map 3: (a,1)(b,1)(a,1)(b,1)(a,1)(b,1).

    groupByKey's shuffle process (figure): every pair crosses the wire unchanged.

    reduceByKey's shuffle process (figure): each map task combines locally first, so map 1 shuffles (a,1)(b,1), map 2 shuffles (a,2)(b,2), and map 3 shuffles (a,3)(b,3).

    reduceByKey shuffles less data precisely because of this map-side pre-aggregation.
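The difference in shuffled record counts can be sketched with plain Scala collections standing in for the three map tasks (this emulates only the record counts, not Spark's actual shuffle machinery):

```scala
// The three map tasks' outputs, as in the example above.
val mapOutputs: List[List[(String, Int)]] = List(
  List(("a",1),("b",1)),
  List(("a",1),("b",1),("a",1),("b",1)),
  List(("a",1),("b",1),("a",1),("b",1),("a",1),("b",1))
)

// groupByKey: every pair is shuffled as-is -> 12 records cross the wire.
val shuffledWithoutCombine = mapOutputs.map(_.size).sum

// reduceByKey: each map task combines locally, then shuffles the combined pairs
// (a,1)(b,1), (a,2)(b,2), (a,3)(b,3) -> only 6 records cross the wire.
val combined = mapOutputs.map(_.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) })
val shuffledWithCombine = combined.map(_.size).sum

// Either way, the reduce side ends up with the same final result.
val finalResult = combined.flatten.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }
```

Here `shuffledWithoutCombine` is 12 and `shuffledWithCombine` is 6, while `finalResult` is `Map("a" -> 6, "b" -> 6)` in both cases: same answer, half the shuffled records.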

    Extension: the aggregateByKey operator

    Some aggregations cannot be expressed with reduceByKey (its reduce function must take and return the value type), which motivates a new operator: aggregateByKey.
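As a sketch of what aggregateByKey adds -- a zero value plus separate within-partition and cross-partition functions, so the result type can differ from the value type -- here emulated with plain Scala collections rather than the RDD API:

```scala
// aggregateByKey(zero)(seqOp, combOp): within each partition, fold values into
// an accumulator of a *different* type with seqOp; then merge the per-partition
// accumulators with combOp. Example: collect each key's distinct values into a Set.
val partitions: List[List[(String, Int)]] = List(
  List(("a", 1), ("a", 2), ("b", 1)),
  List(("a", 2), ("b", 3))
)

val zero = Set.empty[Int]
val seqOp  = (acc: Set[Int], v: Int)    => acc + v   // within a partition
val combOp = (a: Set[Int], b: Set[Int]) => a ++ b    // across partitions

// Emulate: aggregate inside each partition, then merge across partitions.
val perPartition: List[Map[String, Set[Int]]] =
  partitions.map(_.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).foldLeft(zero)(seqOp)) })

val result: Map[String, Set[Int]] =
  perPartition.flatten.groupBy(_._1).map { case (k, sets) => (k, sets.map(_._2).reduce(combOp)) }

// result: Map("a" -> Set(1, 2), "b" -> Set(1, 3))
```

reduceByKey could not produce a Set[Int] from Int values directly, because its function has type (V, V) => V; aggregateByKey's (U, V) => U seqOp removes that restriction.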

    The source tells all.

    groupByKey's implementation, defined in PairRDDFunctions.scala:

    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
      // groupByKey shouldn't use map side combine because map side combine does not
      // reduce the amount of data shuffled and requires all map side data be inserted
      // into a hash table, leading to more objects in the old gen.
      val createCombiner = (v: V) => CompactBuffer(v)
      val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
      val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
      val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
        createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
      bufs.asInstanceOf[RDD[(K, Iterable[V])]]
    }

    Note that groupByKey explicitly passes mapSideCombine = false: it never combines on the map side.

    reduceByKey's path through the source ends up in combineByKeyWithClassTag:

    def combineByKeyWithClassTag[C](
        createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C,
        partitioner: Partitioner,
        mapSideCombine: Boolean = true,
        serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
      require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0

    Note that mapSideCombine defaults to true here, which is why reduceByKey combines on the map side.

    4.1 collectAsMap

    From the doc comment: all the data is loaded into the driver's memory, so a large result can overwhelm the driver and crash it.

    /**
     * Return the key-value pairs in this RDD to the master as a Map.
     *
     * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
     * one value per key is preserved in the map returned)
     *
     * @note this method should only be used if the resulting data is expected to be small, as
     * all the data is loaded into the driver's memory.
     */
    def collectAsMap(): Map[K, V] = self.withScope {
      val data = self.collect()
      val map = new mutable.HashMap[K, V]
      map.sizeHint(data.length)
      data.foreach { pair => map.put(pair._1, pair._2) }
      map
    }

    In RDD.scala:

    Remember: whenever you see runJob in a method's source, calling that method triggers an action.

    /**
     * Return the number of elements in the RDD.
     */
    def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

    /**
     * Return an array that contains all of the elements in this RDD.
     *
     * @note This method should only be used if the resulting array is expected to be small, as
     * all the data is loaded into the driver's memory.
     */
    def collect(): Array[T] = withScope {
      val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
      Array.concat(results: _*)
    }

    Array.concat(results: _*) -- the call site itself is not the varargs definition; `: _*` expands the sequence into a varargs parameter.
    Click into concat to go one level deeper:
    def concat[T: ClassTag](xss: Array[T]*): Array[T]  // this is where the varargs parameter is declared

    This came up in Scala lesson 04:
    println(sum(1.to(10): _*))
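A minimal self-contained sketch of the `: _*` adaptation (the sum function here is a stand-in for the one from that lesson):

```scala
// sum declares a varargs parameter: xs collects any number of Ints as a Seq.
def sum(xs: Int*): Int = xs.sum

// A call site can pass individual arguments...
val a = sum(1, 2, 3)        // 6

// ...or expand an existing sequence with `: _*`,
// just as Array.concat(results: _*) does in collect().
val b = sum(1.to(10): _*)   // 55
```

Without `: _*`, passing `1.to(10)` directly would not type-check, because a Range is one value, not a sequence of Int arguments.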
