spark广播变量 和 累加器
1 為什么使用廣播變量 和 累加器
變量存在的問(wèn)題:在spark程序中,當(dāng)一個(gè)傳遞給Spark操作(例如map和reduce)的函數(shù)在遠(yuǎn)程節(jié)點(diǎn)上面運(yùn)行時(shí),Spark操作實(shí)際上操作的是這個(gè)函數(shù)所用變量的一個(gè)獨(dú)立副本。這些變量會(huì)被復(fù)制到每臺(tái)機(jī)器上,并且這些變量在遠(yuǎn)程機(jī)器上的所有更新都不會(huì)傳遞回驅(qū)動(dòng)程序,通常跨任務(wù)的讀寫變量是低效的。
廣播變量的目的就是解決變量存在的問(wèn)題,變量聲明為廣播變量,那么知識(shí)每個(gè)executor擁有一份,這個(gè)executor啟動(dòng)的task會(huì)共享這個(gè)變量,節(jié)省了通信的成本和服務(wù)器的資源。
總的來(lái)說(shuō):累加器是用來(lái)對(duì)信息進(jìn)行聚合,廣播變量是用來(lái)分發(fā)較大的只讀對(duì)象。
?
2 如何定義? 和? 還原? 廣播變量
int a = 3; Broadcast<Integer> broadcast = sc.broadcast(a); //定義廣播變量int c = broadcast.value; //還原廣播變量
?
3 廣播變量注意事項(xiàng)
(1)變量一旦被定義為一個(gè)廣播變量,那么這個(gè)變量只能讀,不能修改
(2)能不能將一個(gè)RDD使用廣播變量廣播出去?
?????? 不能,因?yàn)镽DD是不存儲(chǔ)數(shù)據(jù)的。可以將RDD的結(jié)果廣播出去。
(3) 廣播變量只能在Driver端定義,不能在Executor端定義。
(4) 在Driver端可以修改廣播變量的值,在Executor端無(wú)法修改廣播變量的值。
(5)如果executor端用到了Driver的變量,如果不使用廣播變量在Executor有多少task就有多少Driver端的變量副本。
(6)如果Executor端用到了Driver的變量,如果使用廣播變量在每個(gè)Executor中只有一份Driver端的變量副本。
?
4 廣播變量的優(yōu)化
當(dāng)廣播一個(gè)比較大的值時(shí),選擇既快又好的序列化格式是很重要的。因?yàn)槿绻蛄谢瘜?duì)象的時(shí)間很長(zhǎng)或者傳送時(shí)間太久,這段時(shí)間很容易出現(xiàn)性能瓶頸。
默認(rèn)情況下,spark會(huì)使用java內(nèi)建的序列化庫(kù)。建議選擇kryo序列化工具,使用方法設(shè)置spark.serializer為org.apache.spark.serializer.KryoSerializer;
最好強(qiáng)制要求這種注冊(cè),設(shè)置spark.kryo.registrationRequired為true;
SparkConf conf = new SparkConf();conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer");conf.set("spark.kryo.registrationRequired","true");conf.registerKryoClasses(Array(classOf[myClass]),classOf(MyOtherClass));這樣還會(huì)有其他的問(wèn)題,如果代碼中引用的類沒(méi)有序列化,會(huì)報(bào)異常,最簡(jiǎn)單的方式是實(shí)現(xiàn)序列化接口。
?
5 累加器和定義和還原
累加器只是一個(gè)只寫變量
LongAccumulator accumulator = new LongAccumulator();accumulator.add(1);long count = accumulator.count();?
?
參考文獻(xiàn):扎心了,老鐵
轉(zhuǎn)載于:https://www.cnblogs.com/parent-absent-son/p/9956574.html
總結(jié)
以上是生活随笔為你收集整理的spark广播变量 和 累加器的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: removeTask
- 下一篇: 激活prompt