理解spark闭包以及broadcast(转载)
什么叫閉包:
跨作用域訪問(wèn)函數(shù)變量。
又指的一個(gè)擁有許多變量和綁定了這些變量的環(huán)境的表達(dá)式(通常是一個(gè)函數(shù)),因而這些變量也是該表達(dá)式的一部分。
Spark閉包的問(wèn)題引出:?
在spark中實(shí)現(xiàn)統(tǒng)計(jì)List(1,2,3)的和。如果使用下面的代碼,程序打印的結(jié)果不是6,而是0。這個(gè)和我們編寫(xiě)單機(jī)程序的認(rèn)識(shí)有很大不同。為什么呢?
test.scala代碼如下:
import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.log4j.Logger import org.apache.log4j.Level object Test {def main(args:Array[String]):Unit = {Logger.getLogger("org").setLevel(Level.OFF)Logger.getLogger("akka").setLevel(Level.OFF)Logger.getRootLogger().setLevel(Level.ERROR)val spark = SparkSession.builder.appName("Intro").config("spark.master", "local").getOrCreate();val rdd = spark.sparkContext.parallelize(List(1,2,3))var counter = 0//warn: don't do thisrdd.foreach(x => counter += x)println("Counter value: "+counter)spark.sparkContext.stop() } }運(yùn)行方法:
scala -classpath $(echo *.jar ~/bigdata/spark-2.3.1-bin-hadoop2.7/jars/*.jar| tr ' ' ':') test.scala
?
問(wèn)題分析:?
counter是在foreach函數(shù)外部定義的,也就是在driver程序中定義,而foreach函數(shù)是屬于rdd對(duì)象的,rdd函數(shù)的執(zhí)行位置是各個(gè)worker節(jié)點(diǎn)(或者說(shuō)worker進(jìn)程),main函數(shù)是在driver節(jié)點(diǎn)上(或者說(shuō)driver進(jìn)程上)執(zhí)行的,所以當(dāng)counter變量在driver中定義,被在rdd中使用的時(shí)候,出現(xiàn)了變量的“跨域”問(wèn)題,也就是閉包問(wèn)題。
問(wèn)題解釋:?
對(duì)于上面程序中的counter變量,由于在main函數(shù)和在rdd對(duì)象的foreach函數(shù)是屬于不同“閉包”的,所以,傳進(jìn)foreach中的counter是一個(gè)副本,初始值都為0。foreach中疊加的是counter的副本,不管副本如何變化,都不會(huì)影響到main函數(shù)中的counter,所以最終打印出來(lái)的counter為0.
當(dāng)用戶提交了一個(gè)用scala語(yǔ)言寫(xiě)的Spark程序,Spark框架會(huì)調(diào)用哪些組件呢?首先,這個(gè)Spark程序就是一個(gè)“Application”,程序里面的mian函數(shù)就是“Driver Program”, 前面已經(jīng)講到它的作用,只是,dirver程序的可能運(yùn)行在客戶端,也有可有可能運(yùn)行在spark集群中,這取決于spark作業(yè)提交時(shí)參數(shù)的選定,比如,yarn-client和yarn-cluster就是分別運(yùn)行在客戶端和spark集群中。在driver程序中會(huì)有RDD對(duì)象的相關(guān)代碼操作,比如下面代碼的newRDD.map()
class Test{def main(args: Array[String]) {val sc = new SparkContext(new SparkConf())val newRDD = sc.textFile("")newRDD.map(data => {//do somethingprintln(data.toString)})} }涉及到RDD的代碼,比如上面RDD的map操作,它們是在Worker節(jié)點(diǎn)上面運(yùn)行的,所以spark會(huì)透明地幫用戶把這些涉及到RDD操作的代碼傳給相應(yīng)的worker節(jié)點(diǎn)。
如果在RDD map函數(shù)中調(diào)用了在函數(shù)外部定義的對(duì)象,因?yàn)檫@些對(duì)象需要通過(guò)網(wǎng)絡(luò)從driver所在節(jié)點(diǎn)傳給其他的worker節(jié)點(diǎn),所以要求這些類是可序列化的,比如在Java或者scala中實(shí)現(xiàn)Serializable類,除了java這種序列化機(jī)制,還可以選擇其他方式,使得序列化工作更加高效。
worker節(jié)點(diǎn)接收到程序之后,在spark資源管理器的指揮下運(yùn)行RDD程序。
不同worker節(jié)點(diǎn)之間的運(yùn)行操作是并行的。
? 在worker節(jié)點(diǎn)上所運(yùn)行的RDD中代碼的變量是保存在worker節(jié)點(diǎn)上面的,在spark編程中,很多時(shí)候用戶需要在driver程序中進(jìn)行相關(guān)數(shù)據(jù)操作之后把該數(shù)據(jù)傳給RDD對(duì)象的方法以做進(jìn)一步處理,這時(shí)候,spark框架會(huì)自動(dòng)幫用戶把這些數(shù)據(jù)通過(guò)網(wǎng)絡(luò)傳給相應(yīng)的worker節(jié)點(diǎn)。
除了這種以變量的形式定義傳輸數(shù)據(jù)到worker節(jié)點(diǎn)之外,spark還另外提供了兩種機(jī)制,分別是broadcast和accumulator。
相比于變量的方式,在一定場(chǎng)景下使用broadcast比較有優(yōu)勢(shì),因?yàn)樗鶑V播的數(shù)據(jù)在每一個(gè)worker節(jié)點(diǎn)上面只存一個(gè)副本,而在spark算子中使用到的外部變量會(huì)在每一個(gè)用到它的task中保存一個(gè)副本,即使這些task在同一個(gè)節(jié)點(diǎn)上面。
所以當(dāng)數(shù)據(jù)量比較大的時(shí)候,建議使用廣播而不是外部變量。
#####################以上是轉(zhuǎn)載的內(nèi)容###########################
好了,這里加點(diǎn)東西,
如果是broadcast方式如何使用呢?代碼如下:
import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.log4j.Logger import org.apache.log4j.Level object Test {def main(args:Array[String]):Unit = {Logger.getLogger("org").setLevel(Level.OFF)Logger.getLogger("akka").setLevel(Level.OFF)Logger.getRootLogger().setLevel(Level.ERROR)val spark = SparkSession.builder.appName("Intro").config("spark.master", "local").getOrCreate();val broadcastVar = spark.sparkContext.broadcast(Array("orange","apple","pear","orange"))val dictionary = Map(("man"-> "noun"), ("is"->"verb"),("mortal"->"adjective"))def getElementsCount(word :String, dictionary:Map[String,String]):(String,Int) = {dictionary.filter{ case (wording,wordType) => wording.equals((word))}.map(x => (x._2,1)).headOption.getOrElse(("unknown" -> 1))//some dummy logic}val words = spark.sparkContext.parallelize(Array("man","is","mortal","mortal","1234","789","456","is","man"))val grammarElementCounts = words.map( word =>getElementsCount(word,dictionary)).reduceByKey((x,y) => x+y)grammarElementCounts.collect().foreach(println)spark.sparkContext.stop() } }運(yùn)行方式是:
scala -classpath $(echo *.jar ~/bigdata/spark-2.3.
1-bin-hadoop2.7/jars/*.jar| tr ' ' ':') broadcast_test.scala
運(yùn)行結(jié)果:
(adjective,2)
(noun,2)
(verb,2)
(unknown,3)
?
?
如果是accumulate的方式如何計(jì)數(shù)呢?accumulate_test.scala代碼如下:
import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.log4j.Logger import org.apache.log4j.Level object Test {def main(args:Array[String]):Unit = {Logger.getLogger("org").setLevel(Level.OFF)Logger.getLogger("akka").setLevel(Level.OFF)Logger.getRootLogger().setLevel(Level.ERROR)val spark = SparkSession.builder.appName("Intro").config("spark.master", "local").getOrCreate();val rdd = spark.sparkContext.parallelize(List(1,2,3))var counter = spark.sparkContext.accumulator(0)//warn: don't do thisrdd.foreach(x => counter += x)println("Counter value: "+counter)spark.sparkContext.stop() } }運(yùn)行方法:
scala -classpath $(echo *.jar ~/bigdata/spark-2.3.
1-bin-hadoop2.7/jars/*.jar| tr ' ' ':') accumulate_test.scala
運(yùn)行結(jié)果:
6
#######################################
另外關(guān)于Node數(shù)量和Executor數(shù)量放個(gè)圖
?
參考文獻(xiàn):
https://blog.csdn.net/liangyihuai/article/details/56840473
https://www.cnblogs.com/sunshisonghit/p/6063296.html?utm_source=itdadao&utm_medium=referral
http://www.huaxiaozhuan.com/%E5%B7%A5%E5%85%B7/spark/chapters/04_acc_broadcast.html
https://blog.knoldus.com/broadcast-variables-in-spark-how-and-when-to-use-them/
?
?
總結(jié)
以上是生活随笔為你收集整理的理解spark闭包以及broadcast(转载)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: ubuntu下面的背光键盘的使用
- 下一篇: 一句话讲清楚RDD是什么?