日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

通过案例对SparkStreaming透彻理解-3

發(fā)布時間:2024/4/17 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 通过案例对SparkStreaming透彻理解-3 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

2019獨角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>

本期內(nèi)容:

  • 解密Spark Streaming Job架構(gòu)和運行機制

  • 解密Spark Streaming 容錯架構(gòu)和運行機制

  • ?

      一切不能進(jìn)行實時流處理的數(shù)據(jù)都是無效的數(shù)據(jù)。在流處理時代,SparkStreaming有著強大吸引力,而且發(fā)展前景廣闊,加之Spark的生態(tài)系統(tǒng),Streaming可以方便調(diào)用其他的諸如SQL,MLlib等強大框架,它必將一統(tǒng)天下。

      Spark Streaming運行時與其說是Spark Core上的一個流式處理框架,不如說是Spark Core上的一個最復(fù)雜的應(yīng)用程序。如果可以掌握Spark streaming這個復(fù)雜的應(yīng)用程序,那么其他的再復(fù)雜的應(yīng)用程序都不在話下了。這里選擇Spark Streaming作為版本定制的切入點也是大勢所趨。

    ????本節(jié)課通過從job和容錯的整體架構(gòu)上來考察Spark Streaming的運行機制。

    用之前已有的最簡單的例子:

    //?Socket來源的單詞計數(shù) //?YY課堂:每天20:00現(xiàn)場授課頻道68917580 val?sparkConf?=?new?SparkConf().setMaster("local[2]").setAppName("StreamingWordCountSelfScala") val?ssc?=?new?StreamingContext(sparkConf,?Durations.seconds(5)) val?lines?=?ssc.socketTextStream("localhost",?9999) val?words?=?lines.flatMap(_.split("?")).map((_,?1)).reduceByKey(_?+?_) words.print() ssc.start()

    ?

    跟蹤源碼可以發(fā)現(xiàn):

    在初始化 StreamingContext時,創(chuàng)建了如下幾個對象:

    //?StreamingContext.scala?line?183 private[streaming]?val?scheduler?=?new?JobScheduler(this)

    ?

    而JobScheduler在初始化的時候,會初始化jobGenerator,且包含receiverTracker。

    //?JobScheduler.scala?line?50 private?val?jobGenerator?=?new?JobGenerator(this)?//?line?50 val?clock?=?jobGenerator.clock val?listenerBus?=?new?StreamingListenerBus()//?These?two?are?created?only?when?scheduler?starts. //?eventLoop?not?being?null?means?the?scheduler?has?been?started?and?not?stopped var?receiverTracker:?ReceiverTracker?=?null?//?56

    ?

    再看創(chuàng)建DStream的部分

    //?StreamingContext.scala?line?327 def?socketTextStream(hostname:?String,port:?Int,storageLevel:?StorageLevel?=?StorageLevel.MEMORY_AND_DISK_SER_2):?ReceiverInputDStream[String]?=?withNamedScope("socket?text?stream")?{socketStream[String](hostname,?port,?SocketReceiver.bytesToLines,?storageLevel) }//?StreamingContext.scala?line?345 def?socketStream[T:?ClassTag](hostname:?String,port:?Int,converter:?(InputStream)?=>?Iterator[T],storageLevel:?StorageLevel):?ReceiverInputDStream[T]?=?{new?SocketInputDStream[T](this,?hostname,?port,?converter,?storageLevel)?//?line?351 }

    ?

    ?

    //?SocketInputDStream.scala?line?33 private[streaming] class?SocketInputDStream[T:?ClassTag](ssc_?:?StreamingContext,host:?String,port:?Int,bytesToObjects:?InputStream?=>?Iterator[T],storageLevel:?StorageLevel)?extends?ReceiverInputDStream[T](ssc_)?{//?這個方法是關(guān)鍵def?getReceiver():?Receiver[T]?=?{new?SocketReceiver(host,?port,?bytesToObjects,?storageLevel)} }

    ?

    再看 ssc.start

    //?StreamingContext.scala?line?594 def?start():?Unit?=?synchronized?{state?match?{case?INITIALIZED?=>startSite.set(DStream.getCreationSite())StreamingContext.ACTIVATION_LOCK.synchronized?{StreamingContext.assertNoOtherContextIsActive()try?{validate()//?Start?the?streaming?scheduler?in?a?new?thread,?so?that?thread?local?properties//?like?call?sites?and?job?groups?can?be?reset?without?affecting?those?of?the//?current?thread.ThreadUtils.runInNewThread("streaming-start")?{sparkContext.setCallSite(startSite.get)sparkContext.clearJobGroup()sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL,?"false")scheduler.start()?//?line?610}state?=?StreamingContextState.ACTIVE}?catch?{case?NonFatal(e)?=>logError("Error?starting?the?context,?marking?it?as?stopped",?e)scheduler.stop(false)state?=?StreamingContextState.STOPPEDthrow?e}StreamingContext.setActiveContext(this)}shutdownHookRef?=?ShutdownHookManager.addShutdownHook(StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)//?Registering?Streaming?Metrics?at?the?start?of?the?StreamingContextassert(env.metricsSystem?!=?null)env.metricsSystem.registerSource(streamingSource)uiTab.foreach(_.attach())logInfo("StreamingContext?started")case?ACTIVE?=>logWarning("StreamingContext?has?already?been?started")case?STOPPED?=>throw?new?IllegalStateException("StreamingContext?has?already?been?stopped")} }

    ?

    第610行,調(diào)用了scheduler.start,scheduler就是之前初始化是產(chǎn)生的JobScheduler。

    //?JobScheduler.scala?line?62 def?start():?Unit?=?synchronized?{if?(eventLoop?!=?null)?return?//?scheduler?has?already?been?startedlogDebug("Starting?JobScheduler")eventLoop?=?new?EventLoop[JobSchedulerEvent]("JobScheduler")?{override?protected?def?onReceive(event:?JobSchedulerEvent):?Unit?=?processEvent(event)override?protected?def?onError(e:?Throwable):?Unit?=?reportError("Error?in?job?scheduler",?e)}eventLoop.start()//?attach?rate?controllers?of?input?streams?to?receive?batch?completion?updatesfor?{inputDStream?<-?ssc.graph.getInputStreamsrateController?<-?inputDStream.rateController}?ssc.addStreamingListener(rateController)listenerBus.start(ssc.sparkContext)receiverTracker?=?new?ReceiverTracker(ssc)?//?line?80inputInfoTracker?=?new?InputInfoTracker(ssc)receiverTracker.start()jobGenerator.start()logInfo("Started?JobScheduler") }

    ?

    請看80行,將receiverTracker初始化:

    //?ReceiverTracker.scala?line?101 private[streaming] class?ReceiverTracker(ssc:?StreamingContext,?skipReceiverLaunch:?Boolean?=?false)?extends?Logging?{private?val?receiverInputStreams?=?ssc.graph.getReceiverInputStreams()private?val?receiverInputStreamIds?=?receiverInputStreams.map?{?_.id?}private?val?receivedBlockTracker?=?new?ReceivedBlockTracker(ssc.sparkContext.conf,ssc.sparkContext.hadoopConfiguration,receiverInputStreamIds,ssc.scheduler.clock,ssc.isCheckpointPresent,Option(ssc.checkpointDir))

    ?

    調(diào)用receiverTracker.start和jobGenerator.star

    //?ReceiverTracker.scala?line?148 /**?Start?the?endpoint?and?receiver?execution?thread.?*/ def?start():?Unit?=?synchronized?{if?(isTrackerStarted)?{throw?new?SparkException("ReceiverTracker?already?started")}if?(!receiverInputStreams.isEmpty)?{endpoint?=?ssc.env.rpcEnv.setupEndpoint("ReceiverTracker",?new?ReceiverTrackerEndpoint(ssc.env.rpcEnv))if?(!skipReceiverLaunch)?launchReceivers()?//?line?157logInfo("ReceiverTracker?started")trackerState?=?Started} }

    ?

    launchReceivers()

    //?ReceiverTracker.scala?line?413 private?def?launchReceivers():?Unit?=?{val?receivers?=?receiverInputStreams.map(nis?=>?{val?rcvr?=?nis.getReceiver()?//?這個就是SocketInputDStream.getReceiver(),本例中是SocketReceiver?,見SocketInputDStream.scala?line?34rcvr.setReceiverId(nis.id)rcvr})runDummySparkJob()logInfo("Starting?"?+?receivers.length?+?"?receivers")endpoint.send(StartAllReceivers(receivers))?//?line?423 }

    ?

    看看StartAllReceivers是如何被消費的?

    //?ReceiverTracker.scala?line?448 //?Local?messages case?StartAllReceivers(receivers)?=>val?scheduledLocations?=?schedulingPolicy.scheduleReceivers(receivers,?getExecutors)?//?盡量負(fù)載均勻for?(receiver?<-?receivers)?{val?executors?=?scheduledLocations(receiver.streamId)updateReceiverScheduledExecutors(receiver.streamId,?executors)receiverPreferredLocations(receiver.streamId)?=?receiver.preferredLocationstartReceiver(receiver,?executors)?//?啟動接收器,不再進(jìn)一步深究,有興趣的可以繼續(xù)查看源碼}

    ?

    再回到JobScheduler.scala line 83,jobGenerator.start

    //?JobGenerator.scala?line?79 def?start():?Unit?=?synchronized?{if?(eventLoop?!=?null)?return?//?generator?has?already?been?started//?Call?checkpointWriter?here?to?initialize?it?before?eventLoop?uses?it?to?avoid?a?deadlock.//?See?SPARK-10125checkpointWritereventLoop?=?new?EventLoop[JobGeneratorEvent]("JobGenerator")?{override?protected?def?onReceive(event:?JobGeneratorEvent):?Unit?=?processEvent(event)override?protected?def?onError(e:?Throwable):?Unit?=?{jobScheduler.reportError("Error?in?job?generator",?e)}}eventLoop.start()if?(ssc.isCheckpointPresent)?{restart()}?else?{startFirstTime()} }

    ?

    至此消息接收和Job生成器已啟動。

    ?

    在StreamingContext調(diào)用start方法的內(nèi)部其實是會啟動JobScheduler的Start方法,進(jìn)行消息循環(huán),在JobScheduler的start內(nèi)部會構(gòu)造JobGenerator和ReceiverTacker,并且調(diào)用JobGenerator和ReceiverTacker的start方法

    ?

      1.JobGenerator啟動后會不斷的根據(jù)batchDuration生成一個個的Job

    ?

      2.ReceiverTracker啟動后首先在Spark Cluster中啟動Receiver(其實是在Executor中先啟動ReceiverSupervisor),在Receiver收到數(shù)據(jù)后會通過ReceiverSupervisor存儲到Executor并且把數(shù)據(jù)的Metadata信息發(fā)送給Driver中的ReceiverTracker,在ReceiverTracker內(nèi)部會通過ReceivedBlockTracker來管理接受到的元數(shù)據(jù)信息

    ?

      每個BatchInterval會產(chǎn)生一個具體的Job,其實這里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD的DAG而已,從Java角度講,相當(dāng)于Runnable接口實例,此時要想運行Job需要提交給JobScheduler,在JobScheduler中通過線程池的方式找到一個單獨的線程來提交Job到集群運行(其實是在線程中基于RDD的Action觸發(fā)真正的作業(yè)的運行)。

    ?

      為什么使用線程池呢?

    ?

       1.作業(yè)不斷生成,所以為了提升效率,我們需要線程池;這和在Executor中通過線程池執(zhí)行Task有異曲同工之妙;

    ?

    ?  2.有可能設(shè)置了Job的FAIR公平調(diào)度的方式,這個時候也需要多線程的支持。

    ?

      第二部分:從容錯架構(gòu)的角度透視Spark Streaming

    ?

      我們知道DStream與RDD的關(guān)系就是隨著時間流逝不斷的產(chǎn)生RDD,對DStream的操作就是在固定時間上操作RDD。所以從某種意義上而言,Spark Streaming的基于DStream的容錯機制,實際上就是劃分到每一次形成的RDD的容錯機制,這也是Spark Streaming的高明之處。

    ?

      RDD作為 分布式彈性數(shù)據(jù)集,它的彈性主要體現(xiàn)在:

    ?

      1.自動的分配內(nèi)存和硬盤,優(yōu)先基于內(nèi)存

    ?

      2.基于lineage容錯機制

    ?

      3.task會指定次數(shù)的重試

    ?

      4.stage失敗會自動重試

    ?

      5.checkpoint和persist 復(fù)用

    ?

      6.數(shù)據(jù)調(diào)度彈性:DAG,TASK和資源管理無關(guān)。

    ?

      7.數(shù)據(jù)分片的高度彈性

    ?

      基于RDD的特性,它的容錯機制主要就是兩種:一是checkpoint,二是基于lineage(血統(tǒng))的容錯。一般而言,spark選擇血統(tǒng)容錯,因為對于大規(guī)模的數(shù)據(jù)集,做檢查點的成本很高。但是有的情況下,不如說lineage鏈條過于復(fù)雜和冗長,這時候就需要做checkpoint。

    ?

      考慮到RDD的依賴關(guān)系,每個stage內(nèi)部都是窄依賴,此時一般基于lineage容錯,方便高效。在stage之間,是寬依賴,產(chǎn)生了shuffle操作,這種情況下,做檢查點則更好??偨Y(jié)來說,stage內(nèi)部做lineage,stage之間做checkpoint。

    后續(xù)的會有什么更深的內(nèi)幕?且聽下回分解。

    轉(zhuǎn)載于:https://my.oschina.net/corleone/blog/669520

    與50位技術(shù)專家面對面20年技術(shù)見證,附贈技術(shù)全景圖

    總結(jié)

    以上是生活随笔為你收集整理的通过案例对SparkStreaming透彻理解-3的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。