日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

031 广播变量与累加器

發(fā)布時(shí)間:2025/4/16 编程问答 21 豆豆
生活随笔 收集整理的這篇文章主要介紹了 031 广播变量与累加器 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

1.廣播變量機(jī)制

  將傳遞給task的值,變成傳遞給executor。

  為什么可以共用,因?yàn)閠ask是executor下的線程。

  只讀的變量,在task中不允許修改

  

?

2.累加器介紹

  在只寫(xiě)的變量,在task中只允許被修改,不允許讀的操作。

  但是在driver中就只能讀操作。

  

?

?3.程序

  需求一:對(duì)應(yīng)于MR中的累加器,累積計(jì)算次數(shù)

  需求二:將累加器做成共享變量來(lái)使用。避免了shuffle過(guò)程,提高了效率。

1 package com.ibeifeng.senior.accumulator 2 3 import org.apache.spark.{AccumulableParam, SparkConf, SparkContext} 4 5 import scala.collection.mutable 6 import scala.util.Random 7 8 /** 9 * Spark累加器 10 * Created by ibf on 02/15. 11 */ 12 object AccumulatorDemo { 13 def main(args: Array[String]): Unit = { 14 val conf = new SparkConf() 15 //.setMaster("local[*]") // local模式下默認(rèn)不進(jìn)行失敗重啟機(jī)制 16 .setMaster("local[*,4]") // 開(kāi)啟local模式的失敗重啟機(jī)制,重啟次數(shù)4-1=3次 17 .setAppName("accumulator") 18 val sc = SparkContext.getOrCreate(conf) 19 20 // =============================== 21 val rdd = sc.parallelize(Array( 22 "hadoop,spark,hbase", 23 "spark,hbase,hadoop", 24 "", 25 "spark,hive,hue", 26 "spark,hadoop", 27 "spark,,hadoop,hive", 28 "spark,hbase,hive", 29 "hadoop,hbase,hive", 30 "hive,hbase,spark,hadoop", 31 "hive,hbase,hadoop,hue" 32 ), 5) 33 34 // 需求一:實(shí)現(xiàn)WordCount程序,同時(shí)統(tǒng)計(jì)輸入的記錄數(shù)量以及最終輸出結(jié)果的數(shù)量 35 val inputRecords = sc.accumulator(0, "Input Record Size") 36 val outputRecords = sc.accumulator(0, "Output Record Size") 37 rdd.flatMap(line => { 38 // 累計(jì)數(shù)量 39 inputRecords += 1 40 val nline = if (line == null) "" else line 41 // 進(jìn)行數(shù)據(jù)分割、過(guò)濾、數(shù)據(jù)轉(zhuǎn)換 42 nline.split(",") 43 .map(word => (word.trim, 1)) // 數(shù)據(jù)轉(zhuǎn)換 44 .filter(_._1.nonEmpty) // word非空,進(jìn)行數(shù)據(jù)過(guò)濾 45 }) 46 .reduceByKey(_ + _) 47 .foreachPartition(iter => { 48 iter.foreach(record => { 49 // 累計(jì)數(shù)據(jù) 50 outputRecords += 1 51 println(record) 52 }) 53 }) 54 55 println(s"Input Size:${inputRecords.value}") 56 println(s"Ouput Size:${outputRecords.value}") 57



58 // 需求二:假設(shè)wordcount的最終結(jié)果可以在driver/executor節(jié)點(diǎn)的內(nèi)存中保存下,要求不通過(guò)reduceByKey相關(guān)API實(shí)現(xiàn)wordcount程序 59 /** 60 * 1. 每個(gè)分區(qū)進(jìn)行wordcount的統(tǒng)計(jì),將結(jié)果保存到累加器中 61 * 2. 當(dāng)分區(qū)全部執(zhí)行完后,各個(gè)分區(qū)的累加器數(shù)據(jù)進(jìn)行聚合操作 62 */ 63 val mapAccumulable = sc.accumulable(mutable.Map[String, Int]())(MapAccumulableParam)//MapAccumulableParam是強(qiáng)制轉(zhuǎn)換 64 try 65 rdd.foreachPartition(iter => { 66 val index = Random.nextInt(2) // index的取值范圍[0,1] 67 iter.foreach(line => { 68 val r = 1 / index 69 print(r) 70 val nline = if (line == null) "" else line 71 // 進(jìn)行數(shù)據(jù)分割、過(guò)濾、數(shù)據(jù)轉(zhuǎn)換 72 nline.split(",") 73 .filter(_.trim.nonEmpty) // 過(guò)濾空單詞 74 .map(word => { 75 mapAccumulable += word // 統(tǒng)計(jì)word出現(xiàn)的次數(shù) 76 }) 77 }) 78 }) 79 catch { 80 case e: Exception => println(s"異常:${e.getMessage}") 81 } 82 println("result================") 83 mapAccumulable.value.foreach(println) 84 85 Thread.sleep(100000) 86 } 87 } 88 89 90 object MapAccumulableParam extends AccumulableParam[mutable.Map[String, Int], String] { 91 /** 92 * 添加一個(gè)string的元素到累加器中 93 * 94 * @param r 95 * @param t 96 * @return 97 */ 98 override def addAccumulator(r: mutable.Map[String, Int], t: String): mutable.Map[String, Int] = { 99 r += t -> (1 + r.getOrElse(t, 0)) 100 } 101 102 /** 103 * 合并兩個(gè)數(shù)據(jù) 104 * 105 * @param r1 106 * @param r2 107 * @return 108 */ 109 override def addInPlace(r1: mutable.Map[String, Int], r2: mutable.Map[String, Int]): mutable.Map[String, Int] = { 110 r2.foldLeft(r1)((a, b) => { 111 a += b._1 -> (a.getOrElse(b._1, 0) + b._2) 112 }) 113 } 114 115 /** 116 * 返回初始值 117 * 118 * @param initialValue 119 * @return 120 */ 121 override def zero(initialValue: mutable.Map[String, Int]): mutable.Map[String, Int] = initialValue 122 }

?

轉(zhuǎn)載于:https://www.cnblogs.com/juncaoit/p/6542166.html

總結(jié)

以上是生活随笔為你收集整理的031 广播变量与累加器的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。