A Thorough Understanding of Spark Streaming Through Examples - 3
Topics in this installment:

Demystifying the Spark Streaming job architecture and runtime mechanism
Demystifying the Spark Streaming fault-tolerance architecture and runtime mechanism
Any data that cannot be processed as a real-time stream is effectively worthless data. In the era of stream processing, Spark Streaming is highly attractive and has broad prospects. Add to that the Spark ecosystem, through which Streaming can conveniently call on powerful sibling frameworks such as SQL and MLlib, and it is well placed to dominate the field.
At runtime, Spark Streaming is less a streaming framework on top of Spark Core than the most complex application built on Spark Core. Once you can master an application as complex as Spark Streaming, no other application, however complex, will be a problem. Choosing Spark Streaming as the entry point for this source-customization series is therefore a natural move.
This session examines Spark Streaming's runtime mechanism from the overall architecture of jobs and of fault tolerance.
We start from the simplest example used previously:
// Word count over a socket source
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCountSelfScala")
val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
words.print()
ssc.start()
Tracing the source code reveals the following:
When the StreamingContext is initialized, the following objects are created:
// StreamingContext.scala line 183
private[streaming] val scheduler = new JobScheduler(this)
When the JobScheduler is initialized, it in turn initializes the jobGenerator and declares the receiverTracker field:
// JobScheduler.scala line 50
private val jobGenerator = new JobGenerator(this) // line 50
val clock = jobGenerator.clock
val listenerBus = new StreamingListenerBus()

// These two are created only when scheduler starts.
// eventLoop not being null means the scheduler has been started and not stopped
var receiverTracker: ReceiverTracker = null // line 56
Next, look at how the DStream is created:
// StreamingContext.scala line 327
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

// StreamingContext.scala line 345
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel) // line 351
}
// SocketInputDStream.scala line 33
private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  // This method is the key
  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}
Now look at ssc.start:
// StreamingContext.scala line 594
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()
          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            scheduler.start() // line 610
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
Line 610 calls scheduler.start(); scheduler is the JobScheduler created during the initialization described above.
// JobScheduler.scala line 62
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc) // line 80
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
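A quick aside on the EventLoop created here: it is essentially a daemon thread draining events from a queue and dispatching each one to onReceive. Below is a stripped-down sketch of that pattern only; SimpleEventLoop is our own illustrative class, while the real org.apache.spark.util.EventLoop adds error handling and orderly stop semantics.

import java.util.concurrent.LinkedBlockingQueue

// Minimal sketch of the event-loop pattern behind JobScheduler and JobGenerator:
// callers post events; a single daemon thread takes them off the queue in order.
abstract class SimpleEventLoop[E](name: String) {
  private val queue = new LinkedBlockingQueue[E]()
  private val thread = new Thread(name) {
    override def run(): Unit = {
      try {
        while (true) onReceive(queue.take()) // block until the next event, then dispatch it
      } catch {
        case _: InterruptedException => () // loop terminates when the thread is interrupted
      }
    }
  }
  thread.setDaemon(true)

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event)
  protected def onReceive(event: E): Unit
}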
Look at line 80, where the receiverTracker is instantiated:
// ReceiverTracker.scala line 101
private[streaming]
class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {

  private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
  private val receiverInputStreamIds = receiverInputStreams.map { _.id }
  private val receivedBlockTracker = new ReceivedBlockTracker(
    ssc.sparkContext.conf,
    ssc.sparkContext.hadoopConfiguration,
    receiverInputStreamIds,
    ssc.scheduler.clock,
    ssc.isCheckpointPresent,
    Option(ssc.checkpointDir)
  )
Then receiverTracker.start() and jobGenerator.start() are called:
// ReceiverTracker.scala line 148
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers() // line 157
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
launchReceivers()
// ReceiverTracker.scala line 413
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    // This is SocketInputDStream.getReceiver(); in this example it returns a
    // SocketReceiver, see SocketInputDStream.scala line 34
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers)) // line 423
}
Now, how is StartAllReceivers consumed?
// ReceiverTracker.scala line 448
// Local messages
case StartAllReceivers(receivers) =>
  // Schedule the receivers across executors as evenly as possible
  val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
  for (receiver <- receivers) {
    val executors = scheduledLocations(receiver.streamId)
    updateReceiverScheduledExecutors(receiver.streamId, executors)
    receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
    // Start the receiver; we will not dig deeper here, interested readers can follow the source
    startReceiver(receiver, executors)
  }
Back in JobScheduler.scala at line 83, jobGenerator.start() is called:
// JobGenerator.scala line 79
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
At this point, data reception and the job generator have both been started.
To recap: calling start on the StreamingContext internally starts the JobScheduler's start method, which begins the message loop. Inside JobScheduler.start, the JobGenerator and ReceiverTracker are constructed, and their start methods are invoked in turn:
1. Once started, the JobGenerator keeps generating Jobs, one per batchDuration (see the first sketch below).
2. Once started, the ReceiverTracker launches Receivers in the Spark cluster (strictly speaking, it first starts the ReceiverSupervisor on each Executor). When a Receiver gets data, it stores it on the Executor through the ReceiverSupervisor and sends the data's metadata to the ReceiverTracker on the Driver, which manages the received metadata through an internal ReceivedBlockTracker (see the second sketch below).
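For point 1, job generation is driven by a recurring timer inside the JobGenerator that posts a GenerateJobs event to its event loop once per batch interval. Here is a rough sketch of that timer pattern; SimpleRecurringTimer is our own simplification, not Spark's actual RecurringTimer class.

// Sketch: invoke a callback every periodMs milliseconds, the way JobGenerator's
// timer posts GenerateJobs(time) to its event loop once per batch interval.
class SimpleRecurringTimer(periodMs: Long, callback: Long => Unit) {
  private val thread = new Thread("simple-recurring-timer") {
    override def run(): Unit = {
      var nextTime = System.currentTimeMillis()
      while (!isInterrupted) {
        nextTime += periodMs
        val sleepMs = nextTime - System.currentTimeMillis()
        if (sleepMs > 0) Thread.sleep(sleepMs) // an interrupt here simply ends the loop
        callback(nextTime) // in Spark: eventLoop.post(GenerateJobs(new Time(nextTime)))
      }
    }
  }
  thread.setDaemon(true)
  def start(): Unit = thread.start()
}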
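For point 2, the handoff from Receiver to ReceiverSupervisor is visible in the public receiver API: calling store() inside a receiver delegates to the supervisor, which writes blocks on the Executor and reports their metadata to the Driver. As an illustration, here is a minimal custom receiver; ConstantReceiver is a made-up example class, not something from the Spark source.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Illustrative receiver: emits the same line once per second.
class ConstantReceiver(line: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  override def onStart(): Unit = {
    new Thread("constant-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          store(line) // hands the data to the ReceiverSupervisor on the Executor
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  override def onStop(): Unit = () // the loop above exits once isStopped() returns true
}

Such a receiver would be wired in with ssc.receiverStream(new ConstantReceiver("hello")), after which the ReceiverTracker schedules and launches it exactly as traced above.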
Every BatchInterval produces a concrete Job. Note that this Job is not a job in the Spark Core sense: it is merely the DAG of RDDs generated from the DStreamGraph for that batch; from a Java point of view it is comparable to an instance of the Runnable interface. To actually run, a Job must be submitted to the JobScheduler, which uses a thread from its thread pool to submit the Job to the cluster; the real work is triggered inside that thread by the RDD action. The two sketches after the list below illustrate this shape and its configuration.

Why use a thread pool?
1. Jobs are generated continuously, so a thread pool improves efficiency; this mirrors the way Tasks are executed through a thread pool inside each Executor.
2. The Jobs may be configured to use FAIR scheduling, which also requires multi-threading support.
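As a mental model of the Job-plus-thread-pool shape described above, consider the following sketch. It is a deliberate simplification under our own names (StreamingJob, JobPoolSketch), not the actual JobScheduler/JobHandler code.

import java.util.concurrent.Executors

// Sketch: a streaming "Job" is just a named closure whose body triggers the RDD
// action for one batch; the scheduler hands it to a fixed thread pool to run.
class StreamingJob(val batchTimeMs: Long, body: () => Unit) {
  def run(): Unit = body()
}

object JobPoolSketch {
  // pool size mirrors spark.streaming.concurrentJobs, which defaults to 1
  private val jobExecutor = Executors.newFixedThreadPool(1)

  def submit(job: StreamingJob): Unit =
    jobExecutor.execute(new Runnable {
      // the real JobHandler also reports job start/completion events around this call
      override def run(): Unit = job.run()
    })
}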
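On point 2: the pool size is read from the spark.streaming.concurrentJobs property (undocumented, defaulting to 1), and FAIR scheduling is switched on through spark.scheduler.mode. A plausible configuration might look like the following; the values are illustrative, and raising concurrentJobs above 1 should be done with care since batches may then finish out of order.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("StreamingWordCountSelfScala")
  .set("spark.streaming.concurrentJobs", "2") // size of the job-executor thread pool
  .set("spark.scheduler.mode", "FAIR")        // concurrent batch jobs share resources fairly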
Part Two: a look at Spark Streaming from the fault-tolerance perspective
We know the relationship between DStream and RDD: as time passes, a DStream continually produces RDDs, and an operation on a DStream is really an operation on RDDs at fixed points in time. So in a sense, Spark Streaming's DStream-based fault tolerance reduces to the fault tolerance of each RDD formed at each interval, and that is one of Spark Streaming's cleverest aspects.
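This "RDDs over time" relationship can be observed directly with foreachRDD, which hands you the RDD materialized for each batch. Continuing with the words stream from the opening example:

// Each batch interval, the DStream materializes one RDD from its lineage and
// passes it here together with the batch time.
words.foreachRDD { (rdd, time) =>
  println(s"Batch at $time produced an RDD with ${rdd.partitions.length} partitions")
}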
As a resilient distributed dataset, an RDD shows its resilience mainly in these respects:
1. Automatic allocation between memory and disk, with memory preferred.

2. Lineage-based fault tolerance.

3. Failed tasks are retried a configured number of times.

4. Failed stages are retried automatically.

5. Reuse via checkpoint and persist (see the sketch after this list).

6. Elastic data scheduling: the DAG and the tasks are independent of resource management.

7. Highly elastic data partitioning.
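To make points 2 and 5 concrete, here is a small sketch on plain RDDs; the checkpoint directory and data are illustrative, and sc is assumed to be an existing SparkContext.

import org.apache.spark.storage.StorageLevel

sc.setCheckpointDir("/tmp/spark-checkpoints") // illustrative path; use HDFS in production

val pairs = sc.parallelize(1 to 1000).map(n => (n % 10, n))
pairs.persist(StorageLevel.MEMORY_AND_DISK) // reuse across actions without recomputation

val sums = pairs.reduceByKey(_ + _) // wide dependency: this introduces a shuffle
sums.checkpoint()                   // truncate the lineage after the shuffle
sums.count()                        // the first action materializes and writes the checkpoint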
Building on these properties, RDD fault tolerance comes in two main flavors: checkpointing, and lineage-based recovery. In general Spark chooses lineage, because checkpointing a large dataset is expensive. In some situations, however, for instance when the lineage chain grows too long and complex, checkpointing becomes necessary.
Considering RDD dependencies: within a stage all dependencies are narrow, so lineage-based recovery is convenient and efficient there. Between stages lie wide dependencies, which introduce shuffles, and in that case checkpointing works better. In short: use lineage within a stage, and checkpoint between stages.
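Within Spark Streaming itself this trade-off surfaces through ssc.checkpoint plus an optional per-DStream checkpoint interval; the official guidance is to checkpoint roughly every 5 to 10 batch intervals rather than every batch. A sketch against the earlier example, with an illustrative directory:

ssc.checkpoint("/tmp/streaming-checkpoints") // root for metadata and data checkpoints

// Checkpoint periodically instead of every batch, so lineage recovery covers
// most failures while checkpointing stays cheap.
words.checkpoint(Durations.seconds(25)) // 5x the 5-second batch interval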
后續(xù)的會有什么更深的內(nèi)幕?且聽下回分解。
Reposted from: https://my.oschina.net/corleone/blog/669520