Broadcast variables in Spark
How Spark handles Broadcast
First, let's look at some code that uses broadcast:
val values = List[Int](1, 2, 3)
val broadcastValues = sparkContext.broadcast(values)
rdd.mapPartitions { iter =>
  broadcastValues.value.foreach(println)
  iter
}
In the code above, we first create a collection, broadcast it through SparkContext's broadcast function,
and finally read the broadcast variable while iterating over each partition of the RDD.
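For completeness, the same pattern as a minimal stand-alone sketch (the app name, master URL, and data are illustrative only; this assumes a local Spark installation):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("broadcast-demo"))
    // Broadcast a lookup table once instead of shipping it inside every task closure.
    val lookup = sc.broadcast(Map(1 -> "one", 2 -> "two", 3 -> "three"))
    val named = sc.parallelize(Seq(1, 2, 3)).map(n => lookup.value.getOrElse(n, "?"))
    println(named.collect().mkString(", "))
    sc.stop()
  }
}
```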
?
接下來(lái)看看廣播變量的生成與數(shù)據(jù)的讀取實(shí)現(xiàn)部分:
def broadcast[T: ClassTag](value: T): Broadcast[T] = {
  assertNotStopped()
  if (classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass)) {
Note that an RDD cannot itself be broadcast; you have to collect() it first and broadcast the result.
    // This is a warning instead of an exception in order to avoid breaking
    // user programs that might have created RDD broadcast variables but not used them:
    logWarning("Can not directly broadcast RDDs; instead, call collect() and "
      + "broadcast the result (see SPARK-5063)")
  }
?
通過(guò)broadcastManager中的newBroadcast函數(shù)來(lái)進(jìn)行廣播.
  val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
  val callSite = getCallSite
  logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
  cleaner.foreach(_.registerBroadcastForCleanup(bc))
  bc
}
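The SPARK-5063 warning above corresponds to a common pitfall. As a hedged sketch (someRdd and sc are illustrative names), the wrong and right usage look like this:

```scala
// Wrong: broadcasting the RDD itself only triggers the SPARK-5063 warning,
// and the broadcast is useless on executors.
// val bad = sc.broadcast(someRdd)

// Right: materialize the data on the driver first, then broadcast the result.
val collected = someRdd.collect()
val good = sc.broadcast(collected)
```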
?
The BroadcastManager function that creates the broadcast variable simply delegates to the corresponding function of broadcastFactory.
The broadcastFactory instance is chosen via the spark.broadcast.factory configuration;
the default is TorrentBroadcastFactory.
def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
  broadcastFactory.newBroadcast[T](value_, isLocal,
    nextBroadcastId.getAndIncrement())
}
?
The factory method in TorrentBroadcastFactory simply constructs a TorrentBroadcast instance:
override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long)
    : Broadcast[T] = {
  new TorrentBroadcast[T](value_, id)
}
?
TorrentBroadcast實(shí)例生成時(shí)的處理流程:
這里基本的代碼部分是直接寫(xiě)入這個(gè)要廣播的變量,返回的值是這個(gè)變量所占用的block的個(gè)數(shù).
Broadcast的block的大小通過(guò)spark.broadcast.blockSize配置.默認(rèn)是4MB,
Broadcast的壓縮是否通過(guò)spark.broadcast.compress配置,默認(rèn)是true表示啟用,默認(rèn)情況下使用snappy的壓縮.
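As an illustration, these settings could be tuned when building the SparkConf (the values shown are examples, not recommendations):

```scala
val conf = new SparkConf()
  .set("spark.broadcast.blockSize", "4m")   // size of each broadcast piece
  .set("spark.broadcast.compress", "true")  // compress broadcast data
  .set("spark.broadcast.factory",
       "org.apache.spark.broadcast.TorrentBroadcastFactory")
```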
?
private val broadcastId = BroadcastBlockId(id)
/** Total number of blocks this broadcast variable contains. */
private val numBlocks: Int = writeBlocks(obj)
?
接下來(lái)生成一個(gè)lazy的屬性,這個(gè)屬性僅僅有在詳細(xì)的使用時(shí),才會(huì)運(yùn)行,在實(shí)例生成時(shí)不運(yùn)行(上面的演示樣例中的getValue.foreach時(shí)運(yùn)行).
@transient private lazy val _value: T = readBroadcastBlock()

override protected def getValue() = {
  _value
}
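This deferred, evaluate-once behavior is plain Scala lazy-val semantics, which a small stand-alone sketch can show (the names here are illustrative, not Spark's):

```scala
// Evaluation of a lazy val is deferred until first access, then cached.
var evaluations = 0
lazy val value: Int = { evaluations += 1; 42 }

assert(evaluations == 0) // nothing has run yet
val a = value            // first access triggers evaluation
val b = value            // second access reuses the cached result
assert(evaluations == 1 && a == 42 && b == 42)
```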
?
看看實(shí)例生成時(shí)的writeBlocks的函數(shù):
private def writeBlocks(value: T): Int = {
First the broadcast value is stored once in the local storage of the current node. This guarantees that if a task using the broadcast variable runs on this node, it can read the value directly from the local blockManager.
  SparkEnv.get.blockManager.putSingle(broadcastId, value,
    StorageLevel.MEMORY_AND_DISK,
    tellMaster = false)

Next the value is serialized, compressed, and split into chunks according to the configured block size; each chunk is at most blockSize bytes.
  val blocks =
    TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer,
      compressionCodec)

The serialized, compressed blocks are then iterated over, and each one is stored in the blockManager:
  blocks.zipWithIndex.foreach { case (block, i) =>
    SparkEnv.get.blockManager.putBytes(
      BroadcastBlockId(id, "piece" + i),
      block,
      StorageLevel.MEMORY_AND_DISK_SER,
      tellMaster = true)
  }
The function returns an Int: the number of blocks left after serializing, compressing, and chunking the value.
  blocks.length
}
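To make the chunking step concrete, here is a minimal pure-Scala sketch of the idea behind blockifyObject. Real serialization and compression are skipped, and chunk is a hypothetical helper, not Spark's API:

```scala
// Slice a serialized payload into fixed-size pieces, like the broadcast "piece" blocks.
def chunk(bytes: Array[Byte], blockSize: Int): Array[Array[Byte]] =
  bytes.grouped(blockSize).toArray

val payload = Array.fill[Byte](10 * 1024)(1) // pretend this is the serialized value
val pieces = chunk(payload, 4 * 1024)        // 4KB blocks for illustration
// two full 4KB pieces plus one 2KB remainder
assert(pieces.length == 3)
assert(pieces.last.length == 2 * 1024)
```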
?
In our example, reading the value triggers the lazy readBroadcastBlock function defined at instance initialization:
private def readBroadcastBlock(): T = Utils.tryOrIOException {
  TorrentBroadcast.synchronized {
    setConf(SparkEnv.get.conf)
First try to read the content of this broadcast variable directly from the local blockManager's storage. If it is found, the variable has either been read here before, or this task is running where the broadcast originated.
    SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match {
      case Some(x) =>
        x.asInstanceOf[T]
The branch below runs when this executor reads the broadcast variable for the first time. readBlocks fetches all of the variable's blocks; after deserialization, the value is stored in the local blockManager so that subsequent reads are served locally.
      case None =>
        logInfo("Started reading broadcast variable " + id)
        val startTimeMs = System.currentTimeMillis()
        val blocks = readBlocks()
        logInfo("Reading broadcast variable " + id + " took" +
          Utils.getUsedTimeMs(startTimeMs))
        val obj = TorrentBroadcast.unBlockifyObject[T](
          blocks, SparkEnv.get.serializer, compressionCodec)
        // Store the merged copy in BlockManager so other tasks on this executor don't
        // need to re-fetch it.
        SparkEnv.get.blockManager.putSingle(
          broadcastId, obj, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
        obj
    }
  }
}
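This local-first read is essentially a read-through cache. A stand-alone sketch of the pattern, where a mutable map stands in for the blockManager and fetchRemote for readBlocks (both names are illustrative):

```scala
import scala.collection.mutable

val cache = mutable.Map.empty[String, String] // stands in for local blockManager storage
var remoteFetches = 0

def fetchRemote(id: String): String = { remoteFetches += 1; s"value-of-$id" }

// Try the local cache first; on a miss, fetch, then cache for later readers.
def read(id: String): String =
  cache.getOrElseUpdate(id, fetchRemote(id))

assert(read("broadcast_0") == "value-of-broadcast_0") // miss: fetched remotely
assert(read("broadcast_0") == "value-of-broadcast_0") // hit: served from cache
assert(remoteFetches == 1)
```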
?
Finally, let's look at how readBlocks works:
private def readBlocks(): Array[ByteBuffer] = {
The array defined here holds the fetched blocks; numBlocks is the number of blocks the serialized broadcast variable occupies.
  val blocks = new Array[ByteBuffer](numBlocks)
  val bm = SparkEnv.get.blockManager
Now iterate over the blocks, in random order, so that readers do not all hit the same source first. Each block is first looked up locally; if it is not found locally, the blockManager fetches it from a remote location that holds the piece and stores a copy in the local blockManager. Finally, the function returns the collection of fetched blocks.
  for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
    val pieceId = BroadcastBlockId(id, "piece" + pid)
    logDebug(s"Reading piece $pieceId of $broadcastId")
    def getLocal: Option[ByteBuffer] = bm.getLocalBytes(pieceId)
    def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { block =>
      SparkEnv.get.blockManager.putBytes(
        pieceId,
        block,
        StorageLevel.MEMORY_AND_DISK_SER,
        tellMaster = true)
      block
    }
    val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse(
      throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
    blocks(pid) = block
  }
  blocks
}
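The randomized fetch order can be sketched on its own: pieces are fetched in shuffled order but written back at their original index, so the assembled array is always in order (fetchPiece is a hypothetical stand-in for the local/remote lookup):

```scala
import scala.util.Random

val numBlocks = 5
val blocks = new Array[String](numBlocks)

def fetchPiece(pid: Int): String = s"piece$pid" // stand-in for local/remote fetch

// Fetch in random order so readers spread load across sources,
// but store each piece at its own index so the result is correctly ordered.
for (pid <- Random.shuffle(Seq.range(0, numBlocks)))
  blocks(pid) = fetchPiece(pid)

assert(blocks.toSeq == Seq("piece0", "piece1", "piece2", "piece3", "piece4"))
```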