日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spark中的广播变量broadcast

發(fā)布時間:2024/4/13 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark中的广播变量broadcast 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Spark中的Broadcast處理

首先先來看一看broadcast的使用代碼:

val?values?=?List[Int](1,2,3)

val?broadcastValues?=?sparkContext.broadcast(values)

rdd.mapPartitions(iter?=>?{

??broadcastValues.getValue.foreach(println)

})

?

在上面的代碼中,首先生成了一個集合變量,把這個變量通過sparkContext的broadcast函數(shù)進(jìn)行廣播,

最后在rdd的每個partition的迭代時,使用這個廣播變量.

?

接下來看看廣播變量的生成與數(shù)據(jù)的讀取實現(xiàn)部分:

def?broadcast[T:?ClassTag](value:?T):?Broadcast[T]?=?{
??assertNotStopped()
??if?(classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass))?{

這里要注意,使用broadcast時,不能直接對RDD進(jìn)行broadcast的操作.
????//?This?is?a?warning?instead?of?an?exception?in?order?to?avoid?breaking

//???????user?programs?that
????//?might?have?created?RDD?broadcast?variables?but?not?used?them:
????logWarning("Can?not?directly?broadcast?RDDs;?instead,?call?collect()?and?"
??????+?"broadcast?the?result?(see?SPARK-5063)")
??}

?

通過broadcastManager中的newBroadcast函數(shù)來進(jìn)行廣播.
??val?bc?=?env.broadcastManager.newBroadcast[T](value,?isLocal)
??val?callSite?=?getCallSite
??logInfo("Created?broadcast?"?+?bc.id?+?"?from?"?+?callSite.shortForm)
??cleaner.foreach(_.registerBroadcastForCleanup(bc))
??bc
}

?

在BroadcastManager中生成廣播變量的函數(shù),這個函數(shù)直接使用的broadcastFactory的相應(yīng)函數(shù).

broadcastFactory的實例通過配置spark.broadcast.factory,

?????默認(rèn)是TorrentBroadcastFactory.

def?newBroadcast[T:?ClassTag](value_?:?T,?isLocal:?Boolean):?Broadcast[T]?=?{
??broadcastFactory.newBroadcast[T](value_,?isLocal,?

???????nextBroadcastId.getAndIncrement())
}

?

在TorrentBroadcastFactory中生成廣播變量的函數(shù):

在這里面,直接生成了一個TorrentBroadcast的實例.

override?def?newBroadcast[T:?ClassTag](value_?:?T,?isLocal:?Boolean,?id:?Long)

:?Broadcast[T]?=?{
??new?TorrentBroadcast[T](value_,?id)
}

?

TorrentBroadcast實例生成時的處理流程:

這里基本的代碼部分是直接寫入這個要廣播的變量,返回的值是這個變量所占用的block的個數(shù).

Broadcast的block的大小通過spark.broadcast.blockSize配置.默認(rèn)是4MB,

Broadcast的壓縮是否通過spark.broadcast.compress配置,默認(rèn)是true表示啟用,默認(rèn)情況下使用snappy的壓縮.

?

private?val?broadcastId?=?BroadcastBlockId(id)
/**?Total?number?of?blocks?this?broadcast?variable?contains.?*/
private?val?numBlocks:?Int?=?writeBlocks(obj)

?

接下來生成一個lazy的屬性,這個屬性僅僅有在詳細(xì)的使用時,才會運行,在實例生成時不運行(上面的演示樣例中的getValue.foreach時運行).

@transient?private?lazy?val?_value:?T?=?readBroadcastBlock()

override?protected?def?getValue()?=?{
??_value
}

?

看看實例生成時的writeBlocks的函數(shù):

private?def?writeBlocks(value:?T):?Int?=?{

這里先把這個廣播變量保存一份到當(dāng)前的task的storage中,這樣做是保證在讀取時,假設(shè)要使用這個廣播變量的task就是本地的task時,直接從blockManager中本地讀取.
??SparkEnv.get.blockManager.putSingle(broadcastId,?value,?

StorageLevel.MEMORY_AND_DISK,
????tellMaster?=?false)

?

這里依據(jù)block的設(shè)置大小,對value進(jìn)行序列化/壓縮分塊,每個塊的大小為blocksize的大小,
??val?blocks?=
????TorrentBroadcast.blockifyObject(value,?blockSize,?SparkEnv.get.serializer,?

????compressionCodec)

?

這里把序列化并壓縮分塊后的blocks進(jìn)行迭代,存儲到blockManager中,
??blocks.zipWithIndex.foreach?{?case?(block,?i)?=>
????SparkEnv.get.blockManager.putBytes(
??????BroadcastBlockId(id,?"piece"?+?i),
??????block,
??????StorageLevel.MEMORY_AND_DISK_SER,
??????tellMaster?=?true)
??}

這個函數(shù)的返回值是一個int類型的值,這個值就是序列化壓縮存儲后block的個數(shù).
??blocks.length
}

?

在我們的演示樣例中,使用getValue時,會運行實例初始化時定義的lazy的函數(shù)readBroadcastBlock:

private?def?readBroadcastBlock():?T?=?Utils.tryOrIOException?{
??TorrentBroadcast.synchronized?{
????setConf(SparkEnv.get.conf)

這里先從local端的blockmanager中直接讀取storage中相應(yīng)此廣播變量的內(nèi)容,假設(shè)能讀取到,表示這個廣播變量已經(jīng)讀取過來或者說這個task就是廣播的本地executor.
????SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next())?match?{
??????case?Some(x)?=>
????????x.asInstanceOf[T]

以下這部分運行時,表示這個廣播變量在當(dāng)前的executor中是第一次讀取,通過readBlocks函數(shù)去讀取這個廣播變量的全部的blocks,反序列化后,直接把這個廣播變量存儲到本地的blockManager中,下次讀取時,就能夠直接從本地進(jìn)行讀取.
??????case?None?=>
????????logInfo("Started?reading?broadcast?variable?"?+?id)
????????val?startTimeMs?=?System.currentTimeMillis()
????????val?blocks?=?readBlocks()
????????logInfo("Reading?broadcast?variable?"?+?id?+?"?took"?+?

??????????????Utils.getUsedTimeMs(startTimeMs))

????????val?obj?=?TorrentBroadcast.unBlockifyObject[T](
??????????blocks,?SparkEnv.get.serializer,?compressionCodec)
????????//?Store?the?merged?copy?in?BlockManager?so?other?tasks?on?this?executor?don't
????????//?need?to?re-fetch?it.
????????SparkEnv.get.blockManager.putSingle(
??????????broadcastId,?obj,?StorageLevel.MEMORY_AND_DISK,?tellMaster?=?false)
????????obj
????}
??}
}

?

最后再看看readBlocks函數(shù)的處理流程:

private?def?readBlocks():?Array[ByteBuffer]?=?{

這里定義的變量用于存儲讀取到的block的信息,numBlocks是廣播變量序列化后所占用的block的個數(shù).
??val?blocks?=?new?Array[ByteBuffer](numBlocks)
??val?bm?=?SparkEnv.get.blockManager

這里開始迭代讀取每個block的內(nèi)容,這里的讀取是先從local中進(jìn)行讀取,假設(shè)local中沒有讀取到數(shù)據(jù)時,通過blockManager讀取遠(yuǎn)端的數(shù)據(jù),通過讀取這個block相應(yīng)的location從這個location去讀取這個block的內(nèi)容,并存儲到本地的blockManager中.最后,這個函數(shù)返回讀取到的blocks的集合.
??for?(pid?<-?Random.shuffle(Seq.range(0,?numBlocks)))?{
????val?pieceId?=?BroadcastBlockId(id,?"piece"?+?pid)
????logDebug(s"Reading?piece?$pieceId?of?$broadcastId")

????def?getLocal:?Option[ByteBuffer]?=?bm.getLocalBytes(pieceId)
????def?getRemote:?Option[ByteBuffer]?=?bm.getRemoteBytes(pieceId).map?{?block?=>
??????SparkEnv.get.blockManager.putBytes(
????????pieceId,
????????block,
????????StorageLevel.MEMORY_AND_DISK_SER,
????????tellMaster?=?true)
??????block
????}
????val?block:?ByteBuffer?=?getLocal.orElse(getRemote).getOrElse(
??????throw?new?SparkException(s"Failed?to?get?$pieceId?of?$broadcastId"))
????blocks(pid)?=?block
??}
??blocks
}

總結(jié)

以上是生活随笔為你收集整理的spark中的广播变量broadcast的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。