日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark编程指引(四)----共享变量(广播变量和累加器)

發(fā)布時(shí)間:2024/9/27 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark编程指引(四)----共享变量(广播变量和累加器) 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

轉(zhuǎn)自:http://blog.csdn.net/happyanger6/article/details/46576831

共享變量
通常情況下,當(dāng)向Spark操作(如map,reduce)傳遞一個(gè)函數(shù)時(shí),它會(huì)在一個(gè)遠(yuǎn)程集群節(jié)點(diǎn)上執(zhí)行,它會(huì)使用函數(shù)中所有變量的副本。這些變量被復(fù)制到所有的機(jī)器上,遠(yuǎn)程機(jī)器上并沒有被更新的變量會(huì)向驅(qū)動(dòng)程序回傳。在任務(wù)之間使用通用的,支持讀寫的共享變量是低效的。盡管如此,Spark提供了兩種有限類型的共享變量,廣播變量和累加器。

廣播變量
廣播變量允許程序員將一個(gè)只讀的變量緩存在每臺(tái)機(jī)器上,而不用在任務(wù)之間傳遞變量。廣播變量可被用于有效地給每個(gè)節(jié)點(diǎn)一個(gè)大輸入數(shù)據(jù)集的副本。Spark還嘗試使用高效地廣播算法來分發(fā)變量,進(jìn)而減少通信的開銷。
Spark的動(dòng)作通過一系列的步驟執(zhí)行,這些步驟由分布式的洗牌操作分開。Spark自動(dòng)地廣播每個(gè)步驟每個(gè)任務(wù)需要的通用數(shù)據(jù)。這些廣播數(shù)據(jù)被序列化地緩存,在運(yùn)行任務(wù)之前被反序列化出來。這意味著當(dāng)我們需要在多個(gè)階段的任務(wù)之間使用相同的數(shù)據(jù),或者以反序列化形式緩存數(shù)據(jù)是十分重要的時(shí)候,顯式地創(chuàng)建廣播變量才有用。

通過在一個(gè)變量v上調(diào)用SparkContext.broadcast(v)可以創(chuàng)建廣播變量。廣播變量是圍繞著v的封裝,可以通過value方法訪問這個(gè)變量。舉例如下:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
在創(chuàng)建了廣播變量之后,在集群上的所有函數(shù)中應(yīng)該使用它來替代使用v.這樣v就不會(huì)不止一次地在節(jié)點(diǎn)之間傳輸了。另外,為了確保所有的節(jié)點(diǎn)獲得相同的變量,對(duì)象v在被廣播之后就不應(yīng)該再修改。

累加器
累加器是僅僅被相關(guān)操作累加的變量,因此可以在并行中被有效地支持。它可以被用來實(shí)現(xiàn)計(jì)數(shù)器和總和。Spark原生地只支持?jǐn)?shù)字類型的累加器,編程者可以添加新類型的支持。如果創(chuàng)建累加器時(shí)指定了名字,可以在Spark的UI界面看到。這有利于理解每個(gè)執(zhí)行階段的進(jìn)程。(對(duì)于Python還不支持)
累加器通過對(duì)一個(gè)初始化了的變量v調(diào)用SparkContext.accumulator(v)來創(chuàng)建。在集群上運(yùn)行的任務(wù)可以通過add或者”+=”方法在累加器上進(jìn)行累加操作。但是,它們不能讀取它的值。只有驅(qū)動(dòng)程序能夠讀取它的值,通過累加器的value方法。
下面的代碼展示了如何把一個(gè)數(shù)組中的所有元素累加到累加器上:
scala> val accum = sc.accumulator(0, “My Accumulator”)
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10
盡管上面的例子使用了內(nèi)置支持的累加器類型Int,但是開發(fā)人員也可以通過繼承AccumulatorParam類來創(chuàng)建它們自己的累加器類型。AccumulatorParam接口有兩個(gè)方法:
zero方法為你的類型提供一個(gè)0值。
addInPlace方法將兩個(gè)值相加。
假設(shè)我們有一個(gè)代表數(shù)學(xué)vector的Vector類。我們可以向下面這樣實(shí)現(xiàn):
object VectorAccumulatorParam extends AccumulatorParam[Vector] {
def zero(initialValue: Vector): Vector = {
Vector.zeros(initialValue.size)
}
def addInPlace(v1: Vector, v2: Vector): Vector = {
v1 += v2
}
}

// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(…))(VectorAccumulatorParam)
在Scala里,Spark提供更通用的累加接口來累加數(shù)據(jù),盡管結(jié)果的類型和累加的數(shù)據(jù)類型可能不一致(例如,通過收集在一起的元素來創(chuàng)建一個(gè)列表)。同時(shí),SparkContext..accumulableCollection方法來累加通用的Scala的集合類型。
累加器僅僅在動(dòng)作操作內(nèi)部被更新,Spark保證每個(gè)任務(wù)在累加器上的更新操作只被執(zhí)行一次,也就是說,重啟任務(wù)也不會(huì)更新。在轉(zhuǎn)換操作中,用戶必須意識(shí)到每個(gè)任務(wù)對(duì)累加器的更新操作可能被不只一次執(zhí)行,如果重新執(zhí)行了任務(wù)和作業(yè)的階段。
累加器并沒有改變Spark的惰性求值模型。如果它們被RDD上的操作更新,它們的值只有當(dāng)RDD因?yàn)閯?dòng)作操作被計(jì)算時(shí)才被更新。因此,當(dāng)執(zhí)行一個(gè)惰性的轉(zhuǎn)換操作,比如map時(shí),不能保證對(duì)累加器值的更新被實(shí)際執(zhí)行了。下面的代碼片段演示了此特性:
val accum = sc.accumulator(0)
data.map { x => accum += x; f(x) }
//在這里,accum的值仍然是0,因?yàn)闆]有動(dòng)作操作引起map被實(shí)際的計(jì)算.

總結(jié)

以上是生活随笔為你收集整理的Spark编程指引(四)----共享变量(广播变量和累加器)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。