Understanding Spark Streaming in Depth Through Examples (3)
Topics in this installment:

Demystifying the Spark Streaming job architecture and runtime mechanism

Demystifying the Spark Streaming fault-tolerance architecture and runtime mechanism
Data that cannot be processed as a real-time stream quickly loses its value. In the stream-processing era, Spark Streaming is highly attractive and has broad prospects. Backed by the Spark ecosystem, Streaming can easily call on other powerful frameworks such as SQL and MLlib, which positions it to dominate the field.
At runtime, Spark Streaming is not so much a streaming framework on top of Spark Core as it is the most complex application built on Spark Core. If you can master an application as complex as Spark Streaming, no other complex application will pose a problem. That is also why Spark Streaming is the natural entry point for this source-code study series.
This lesson examines Spark Streaming's runtime mechanism from the overall architecture of jobs and of fault tolerance.
We start from the simplest example used before:
```scala
// Word count over a socket source
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCountSelfScala")
val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
words.print()
ssc.start()
ssc.awaitTermination() // keep the application running (missing from the original snippet)
```
Tracing the source code reveals the following.

When the StreamingContext is initialized, it creates these objects:
```scala
// StreamingContext.scala line 183
private[streaming] val scheduler = new JobScheduler(this)
```
When the JobScheduler is initialized, it creates a jobGenerator and declares the receiverTracker field:
```scala
// JobScheduler.scala line 50
private val jobGenerator = new JobGenerator(this) // line 50
val clock = jobGenerator.clock
val listenerBus = new StreamingListenerBus()

// These two are created only when scheduler starts.
// eventLoop not being null means the scheduler has been started and not stopped
var receiverTracker: ReceiverTracker = null // line 56
```
Next, look at how the DStream is created:
```scala
// StreamingContext.scala line 327
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

// StreamingContext.scala line 345
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel) // line 351
}
```
```scala
// SocketInputDStream.scala line 33
private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  // This method is the key
  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}
```
Now look at ssc.start:
```scala
// StreamingContext.scala line 594
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()
          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            scheduler.start() // line 610
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
```
Line 610 calls scheduler.start; this scheduler is the JobScheduler created during initialization:
```scala
// JobScheduler.scala line 62
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc) // line 80
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
```
At line 80 the receiverTracker is instantiated:
```scala
// ReceiverTracker.scala line 101
private[streaming]
class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {

  private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
  private val receiverInputStreamIds = receiverInputStreams.map { _.id }
  private val receivedBlockTracker = new ReceivedBlockTracker(
    ssc.sparkContext.conf,
    ssc.sparkContext.hadoopConfiguration,
    receiverInputStreamIds,
    ssc.scheduler.clock,
    ssc.isCheckpointPresent,
    Option(ssc.checkpointDir))
```
Then receiverTracker.start and jobGenerator.start are called:
```scala
// ReceiverTracker.scala line 148
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers() // line 157
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
```
launchReceivers()
```scala
// ReceiverTracker.scala line 413
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    // This is SocketInputDStream.getReceiver(); in our example it returns a
    // SocketReceiver, see SocketInputDStream.scala line 34
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers)) // line 423
}
```
Let's see how the StartAllReceivers message is consumed:
```scala
// ReceiverTracker.scala line 448
// Local messages
case StartAllReceivers(receivers) =>
  // Spread the receivers across executors as evenly as possible
  val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
  for (receiver <- receivers) {
    val executors = scheduledLocations(receiver.streamId)
    updateReceiverScheduledExecutors(receiver.streamId, executors)
    receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
    // Start the receiver; we won't dig deeper here -- read the source if interested
    startReceiver(receiver, executors)
  }
Back at JobScheduler.scala line 83, jobGenerator.start:
```scala
// JobGenerator.scala line 79
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
```
At this point, both message reception and the job generator are up and running.
To recap: inside StreamingContext.start, the JobScheduler's start method is launched on a new thread to run the message loop. Within JobScheduler.start, the JobGenerator and ReceiverTracker are constructed, and their respective start methods are called:
1. Once started, the JobGenerator keeps generating jobs, one per batchDuration.

2. Once started, the ReceiverTracker first launches the Receivers in the Spark cluster (in fact, a ReceiverSupervisor is started on the Executor first). When a Receiver receives data, the ReceiverSupervisor stores it on the Executor and sends the data's metadata to the ReceiverTracker on the Driver, which manages the received metadata through a ReceivedBlockTracker.
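The per-batch generation in point 1 is driven by a recurring timer inside JobGenerator that posts a GenerateJobs event to its event loop every batchDuration. A minimal sketch of that idea, using a plain JVM scheduler instead of Spark's internal RecurringTimer (the names here are illustrative, not Spark's API):

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Illustrative stand-in for JobGenerator's timer: every batch interval,
// emit a "generate jobs for time t" event, just as Spark posts
// GenerateJobs(time) to the JobGenerator's event loop.
object BatchTimerSketch {
  def main(args: Array[String]): Unit = {
    val batchDurationMs = 5000L
    val timer = Executors.newSingleThreadScheduledExecutor()
    timer.scheduleAtFixedRate(new Runnable {
      def run(): Unit = {
        val batchTime = System.currentTimeMillis()
        // In Spark this is where DStreamGraph.generateJobs(time) builds
        // the RDD DAG for the batch
        println(s"GenerateJobs($batchTime)")
      }
    }, 0, batchDurationMs, TimeUnit.MILLISECONDS)
  }
}
```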
Each batch interval produces a concrete Job. This "Job" is not a job in the Spark Core sense; it is merely the DAG of RDDs generated from the DStreamGraph. In Java terms, it is like an instance of the Runnable interface. To actually run, the Job must be submitted to the JobScheduler, which uses a thread pool to find a dedicated thread and submit the Job to the cluster (the real job is triggered inside that thread by an RDD action).
Why use a thread pool?

1. Jobs are generated continuously, so a thread pool improves efficiency; this mirrors how Tasks are executed through a thread pool inside an Executor.

2. FAIR scheduling may be configured for jobs, which also requires multi-threading support.
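A minimal sketch of this thread-pool idea: each generated batch is wrapped in a Runnable-like handler and handed to a pool, so submitting one batch's job never blocks generating the next. This is a simplified analogue, not Spark's actual code; the pool size corresponds to the real configuration property spark.streaming.concurrentJobs (default 1):

```scala
import java.util.concurrent.Executors

// Simplified analogue of JobScheduler's job executor pool.
// MiniJobScheduler and submitJob are illustrative names, not Spark's API.
class MiniJobScheduler(concurrentJobs: Int = 1) {
  private val jobExecutor = Executors.newFixedThreadPool(concurrentJobs)

  // A "job" here is just a thunk, like Spark Streaming's Job wrapping the
  // RDD action for one batch: running it is what triggers the real cluster job.
  def submitJob(batchTime: Long)(body: => Unit): Unit =
    jobExecutor.execute(new Runnable {
      def run(): Unit = {
        println(s"running job for batch $batchTime")
        body // in Spark, the RDD action inside here submits the actual job
      }
    })

  def stop(): Unit = jobExecutor.shutdown()
}
```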
Part 2: A Look at Spark Streaming from the Fault-Tolerance Perspective
We know that the relationship between a DStream and RDDs is that RDDs are produced continuously as time passes, and operating on a DStream means operating on RDDs at fixed points in time. In a sense, then, Spark Streaming's DStream-based fault tolerance reduces to the fault tolerance of each RDD it produces, and this is one of Spark Streaming's clever design choices.
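You can see this DStream-to-RDD relationship directly in user code: foreachRDD hands you the concrete RDD generated for each batch interval. A small sketch, reusing the names from the earlier socket example:

```scala
// Each batch interval, the `words` DStream materializes one new RDD;
// operating on the DStream is really operating on that interval's RDD.
words.foreachRDD { (rdd, time) =>
  println(s"Batch at $time: ${rdd.partitions.length} partitions")
}
```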
As a resilient distributed dataset, an RDD's resilience is mainly reflected in:

1. Automatic allocation between memory and disk, preferring memory.

2. Lineage-based fault tolerance.

3. A failed task is retried a configured number of times.

4. A failed stage is automatically retried.

5. Reuse via checkpoint and persist.

6. Elastic data scheduling: the DAG and tasks are decoupled from resource management.

7. Highly elastic data partitioning.
Given these RDD properties, there are essentially two fault-tolerance mechanisms: checkpointing, and lineage-based recovery. In general, Spark favors lineage, because checkpointing a large dataset is expensive. In some cases, however, such as when the lineage chain grows too long and complex, a checkpoint becomes necessary.

Considering RDD dependencies: within a stage all dependencies are narrow, so lineage-based recovery is convenient and efficient there. Between stages there are wide dependencies involving shuffle operations, where a checkpoint is the better choice. In short: rely on lineage within a stage, and checkpoints between stages.
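In user code, enabling these fault-tolerance mechanisms in Spark Streaming takes just a couple of calls; a sketch reusing the earlier example's names (the HDFS path and the checkpoint interval are placeholders):

```scala
// 1. Enable metadata/data checkpointing for the streaming application
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint") // placeholder path

// 2. Cut a long lineage on a DStream by checkpointing it periodically
// (a common rule of thumb is 5-10x the batch interval)
words.checkpoint(Durations.seconds(25))
```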
后續(xù)的會(huì)有什么更深的內(nèi)幕?且聽下回分解。
Reposted from: https://my.oschina.net/corleone/blog/669520