Broadcast variables in Spark
First, let's look at some code that uses a broadcast variable:

val values = List[Int](1, 2, 3)
val broadcastValues = sparkContext.broadcast(values)
rdd.mapPartitions { iter =>
  broadcastValues.value.foreach(println)
  iter
}

The code above first creates a collection, broadcasts it through sparkContext's broadcast function, and finally reads the broadcast variable while iterating over each partition of the rdd.
Next, let's look at how a broadcast variable is created and how its data is read back:
def broadcast[T: ClassTag](value: T): Broadcast[T] = {
  assertNotStopped()
  if (classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass)) {

Note that an RDD cannot itself be broadcast; attempting it only logs a warning (see SPARK-5063).

    // This is a warning instead of an exception in order to avoid breaking
    // user programs that might have created RDD broadcast variables but not used them:
    logWarning("Can not directly broadcast RDDs; instead, call collect() and "
      + "broadcast the result (see SPARK-5063)")
  }

The actual broadcast is performed through broadcastManager's newBroadcast function.

  val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
  val callSite = getCallSite
  logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
  cleaner.foreach(_.registerBroadcastForCleanup(bc))
  bc
}
newBroadcast in BroadcastManager simply delegates to the corresponding function of broadcastFactory. The factory instance is selected through the spark.broadcast.factory configuration; the default is TorrentBroadcastFactory.

def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
  broadcastFactory.newBroadcast[T](value_, isLocal,
    nextBroadcastId.getAndIncrement())
}
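The delegation above can be sketched in plain Scala. Everything below uses hypothetical, stripped-down names (SimpleBroadcast, SimpleFactory, SimpleTorrentFactory), not Spark's real interfaces; the real factory class is additionally loaded reflectively from the spark.broadcast.factory setting and initialized with the Spark conf.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical, stripped-down stand-ins for Spark's broadcast interfaces.
trait SimpleBroadcast[T] { def id: Long }

trait SimpleFactory {
  def newBroadcast[T](value: T, isLocal: Boolean, id: Long): SimpleBroadcast[T]
}

class SimpleTorrentFactory extends SimpleFactory {
  def newBroadcast[T](value: T, isLocal: Boolean, id: Long): SimpleBroadcast[T] = {
    val bid = id
    new SimpleBroadcast[T] { def id: Long = bid }
  }
}

// BroadcastManager-style delegation: one factory instance, plus an atomic
// counter that hands every broadcast a fresh id.
val broadcastFactory: SimpleFactory = new SimpleTorrentFactory
val nextBroadcastId = new AtomicLong(0)

def newBroadcast[T](value: T, isLocal: Boolean): SimpleBroadcast[T] =
  broadcastFactory.newBroadcast(value, isLocal, nextBroadcastId.getAndIncrement())
```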
The corresponding function in TorrentBroadcastFactory simply creates a TorrentBroadcast instance:

override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long)
    : Broadcast[T] = {
  new TorrentBroadcast[T](value_, id)
}
When a TorrentBroadcast instance is constructed, the value to broadcast is written out immediately; the result is the number of blocks the variable occupies. The block size is configured through spark.broadcast.blockSize (default 4MB). Whether broadcast data is compressed is controlled by spark.broadcast.compress (default true); snappy compression is used by default.

private val broadcastId = BroadcastBlockId(id)
/** Total number of blocks this broadcast variable contains. */
private val numBlocks: Int = writeBlocks(obj)
Next, a lazy field is defined. It is evaluated only when the value is actually used, not when the instance is constructed (in the example above, it runs when the value is accessed inside foreach):

@transient private lazy val _value: T = readBroadcastBlock()
override protected def getValue() = {
  _value
}
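The effect of the @transient lazy val can be demonstrated with a small plain-Scala sketch (hypothetical names, not Spark code): the expensive read runs at most once, on first access, and never at construction time.

```scala
// readCount tracks how often the "expensive" read actually runs.
var readCount = 0

// Stand-in for readBroadcastBlock: pretend this fetches and deserializes blocks.
def readBroadcastBlock(): List[Int] = {
  readCount += 1
  List(1, 2, 3)
}

class LazyHolder {
  // Evaluated on first access only, like TorrentBroadcast's _value;
  // @transient keeps the cached value out of task serialization.
  @transient lazy val value: List[Int] = readBroadcastBlock()
}

val holder = new LazyHolder // construction alone does not trigger the read
```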
Now look at the writeBlocks function called during construction:

private def writeBlocks(value: T): Int = {

First, a copy of the broadcast variable is stored into the current task's storage. This guarantees that a task reading the variable on the same executor can read it directly from the local blockManager.

  SparkEnv.get.blockManager.putSingle(broadcastId, value,
    StorageLevel.MEMORY_AND_DISK,
    tellMaster = false)

Then the value is serialized, compressed, and split according to the configured block size, each piece being at most blockSize bytes:

  val blocks =
    TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer,
      compressionCodec)

The serialized, compressed pieces are then iterated over and stored into the blockManager:

  blocks.zipWithIndex.foreach { case (block, i) =>
    SparkEnv.get.blockManager.putBytes(
      BroadcastBlockId(id, "piece" + i),
      block,
      StorageLevel.MEMORY_AND_DISK_SER,
      tellMaster = true)
  }

The function's return value is an Int: the number of blocks after serialization, compression, and storage.

  blocks.length
}
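The serialize-and-chunk step can be illustrated without Spark. This is a simplified sketch, not Spark's blockifyObject: it uses plain Java serialization and skips the compression codec, but shows where numBlocks comes from.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Serialize a value and cut the resulting bytes into blockSize-sized chunks.
def blockify(value: AnyRef, blockSize: Int): Array[Array[Byte]] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(value)
  oos.close()
  bos.toByteArray.grouped(blockSize).toArray
}

// Reassemble the chunks and deserialize, mirroring unBlockifyObject.
def unblockify[T](blocks: Array[Array[Byte]]): T = {
  val ois = new ObjectInputStream(new ByteArrayInputStream(blocks.flatten))
  ois.readObject().asInstanceOf[T]
}

// A tiny block size makes the chunking visible; Spark's default is 4MB.
val blocks = blockify(List[Int](1, 2, 3), blockSize = 32)
val numBlocks = blocks.length // this is what writeBlocks returns
```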
In our example, accessing the value triggers the lazy readBroadcastBlock function defined at construction:

private def readBroadcastBlock(): T = Utils.tryOrIOException {
  TorrentBroadcast.synchronized {
    setConf(SparkEnv.get.conf)

First, try to read the variable's content directly from the local blockManager's storage. If it is found, the variable has either already been fetched here before, or this task runs on the executor that created the broadcast.

    SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match {
      case Some(x) =>
        x.asInstanceOf[T]

The branch below runs when this is the first read of the broadcast variable on the current executor: readBlocks fetches all the variable's blocks, they are deserialized, and the result is stored into the local blockManager so that later reads are served locally.

      case None =>
        logInfo("Started reading broadcast variable " + id)
        val startTimeMs = System.currentTimeMillis()
        val blocks = readBlocks()
        logInfo("Reading broadcast variable " + id + " took" +
          Utils.getUsedTimeMs(startTimeMs))
        val obj = TorrentBroadcast.unBlockifyObject[T](
          blocks, SparkEnv.get.serializer, compressionCodec)
        // Store the merged copy in BlockManager so other tasks on this executor don't
        // need to re-fetch it.
        SparkEnv.get.blockManager.putSingle(
          broadcastId, obj, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
        obj
    }
  }
}
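The local-first logic above is a read-through cache. Here is a minimal plain-Scala sketch of that pattern (hypothetical names; a mutable.Map stands in for the local blockManager):

```scala
import scala.collection.mutable

// Stand-in for the executor-local blockManager store.
val localStore = mutable.Map[String, List[Int]]()
var remoteFetches = 0

// Simulates readBlocks + unBlockifyObject: fetch and reassemble all pieces.
def fetchAndMerge(id: String): List[Int] = {
  remoteFetches += 1
  List(1, 2, 3)
}

// Local-first read: serve from the local store when possible; otherwise fetch
// remotely and cache the merged copy so later tasks on this executor hit locally.
def readBroadcast(id: String): List[Int] =
  localStore.getOrElseUpdate(id, fetchAndMerge(id))
```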
Finally, the readBlocks function:

private def readBlocks(): Array[ByteBuffer] = {

The array below holds the blocks read so far; numBlocks is the number of blocks the serialized broadcast variable occupies.

  val blocks = new Array[ByteBuffer](numBlocks)
  val bm = SparkEnv.get.blockManager

The loop reads each block's content, trying the local blockManager first. If a block is not available locally, it is fetched from a remote blockManager (using the block's location) and then stored into the local blockManager. The function returns the collection of blocks it has read.

  for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
    val pieceId = BroadcastBlockId(id, "piece" + pid)
    logDebug(s"Reading piece $pieceId of $broadcastId")
    def getLocal: Option[ByteBuffer] = bm.getLocalBytes(pieceId)
    def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { block =>
      SparkEnv.get.blockManager.putBytes(
        pieceId,
        block,
        StorageLevel.MEMORY_AND_DISK_SER,
        tellMaster = true)
      block
    }
    val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse(
      throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
    blocks(pid) = block
  }
  blocks
}
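Note the Random.shuffle over the piece ids: each executor visits the pieces in a random order, so different executors start from different pieces and the fetch load spreads across the cluster BitTorrent-style, instead of every executor requesting piece 0 first. Each piece still lands at its own index, so the result stays in logical order. A sketch of that access pattern:

```scala
import scala.util.Random

val numBlocks = 5
val fetched = new Array[String](numBlocks)

// Visit piece ids in a random order (spreads fetch load across sources),
// but store each piece at its own index so the result is in logical order.
for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
  fetched(pid) = "piece" + pid // stand-in for the fetched bytes
}
```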