

Spark Streaming Job Creation, Scheduling, and Submission

Published: 2023/12/9

The previous post analyzed, from the source code, how data received by a Receiver is handed over to the BlockManager; the whole data-ingestion path is now up and running. Let us return to the post that analyzed JobScheduler.

```scala
// JobScheduler.scala line 62
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
```

The last several posts all branched out from receiverTracker.start(). With that covered, we move on to the next step:

```scala
// JobScheduler.scala line 83
jobGenerator.start()
```

The instantiation of jobGenerator was analyzed earlier. Digging into its start method, we learn the following:

  • It instantiates an eventLoop. This is not the same eventLoop as JobScheduler's; it is parameterized with a different event type.
  • EventLoop.start is called.
  • On first startup, startFirstTime runs.

```scala
// JobGenerator.scala line 78
/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}

// JobGenerator.scala line 189
/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}
```
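In startFirstTime, timer.getStartTime() aligns the first tick to the next batch-interval boundary, so batches always fall on multiples of the batch duration. A minimal Python sketch of that alignment (the function name is ours, not Spark's):

```python
def next_aligned_time(now_ms: int, period_ms: int) -> int:
    """Round now_ms up to the next strict multiple of period_ms,
    mirroring what a recurring batch timer's getStartTime does."""
    return ((now_ms // period_ms) + 1) * period_ms

# With a 5-second batch duration, the first batch fires at the next 5s boundary.
print(next_aligned_time(12_301, 5_000))  # -> 15000
```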

graph.start then hands off to DStreamGraph.start, which does the following:

  • Initializes all outputStreams, setting their first execution time; dependent DStreams are set up along with them.
  • If a remember duration is set, applies remember to all outputStreams; dependent DStreams likewise.
  • Validates before starting, mainly checking for conflicting checkpoint settings and the various Durations.
  • Starts all inputStreams. Scanning InputDStream and all of its subclasses in version 1.6.0 shows that the start method does nothing; as earlier posts showed, the inputStreams have already been handed over to ReceiverTracker.

```scala
// DStreamGraph.scala line 39
def start(time: Time) {
  this.synchronized {
    require(zeroTime == null, "DStream graph computation already started")
    zeroTime = time
    startTime = time
    outputStreams.foreach(_.initialize(zeroTime))
    outputStreams.foreach(_.remember(rememberDuration))
    outputStreams.foreach(_.validateAtStart)
    inputStreams.par.foreach(_.start())
  }
}
```

So far this is only some simple initialization; no data is being processed yet.

Back in JobGenerator, the recurring timer is now started:

```scala
// JobGenerator.scala line 193
timer.start(startTime.milliseconds)
```

Does this recurring timer look familiar? You have seen it before: in BlockGenerator.scala (lines 105 and 109), two threads are started, one of which is a recurring timer that periodically moves data into the queue awaiting push.

```scala
// RecurringTimer.scala line 59
def start(startTime: Long): Long = synchronized {
  nextTime = startTime
  thread.start()
  logInfo("Started timer for " + name + " at time " + nextTime)
  nextTime
}
```

The actual work is done by the function passed in at construction time: longTime => eventLoop.post(GenerateJobs(new Time(longTime))). Its input is a Long, and its body posts a GenerateJobs event to the eventLoop.

```scala
// JobGenerator.scala line 58
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
```

As long as the thread's state is not stopped, it keeps looping.

  • At construction the function above is passed in: callback: (Long) => Unit corresponds to longTime => eventLoop.post(GenerateJobs(new Time(longTime))).
  • On start, thread.start() runs the thread, whose run method executes loop.
  • loop calls triggerActionForNextInterval.
  • triggerActionForNextInterval invokes the callback supplied at construction, i.e. the function above.

```scala
// RecurringTimer.scala line 27
private[streaming] class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
  extends Logging {

  private val thread = new Thread("RecurringTimer - " + name) {
    setDaemon(true)
    override def run() { loop }
  }

  // RecurringTimer.scala line 56
  /**
   * Start at the given start time.
   */
  def start(startTime: Long): Long = synchronized {
    nextTime = startTime
    thread.start()
    logInfo("Started timer for " + name + " at time " + nextTime)
    nextTime
  }

  // RecurringTimer.scala line 92
  private def triggerActionForNextInterval(): Unit = {
    clock.waitTillTime(nextTime)
    callback(nextTime)
    prevTime = nextTime
    nextTime += period
    logDebug("Callback for " + name + " called at time " + prevTime)
  }

  // RecurringTimer.scala line 100
  /**
   * Repeatedly call the callback every interval.
   */
  private def loop() {
    try {
      while (!stopped) {
        triggerActionForNextInterval()
      }
      triggerActionForNextInterval()
    } catch {
      case e: InterruptedException =>
    }
  }

  // ... other code
}
```
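The pattern is compact enough to mirror in a few lines. Here is a hedged Python sketch of the same mechanism (not Spark's code): a daemon thread waits until the next scheduled tick, fires the callback with that tick's timestamp, then advances by one period.

```python
import threading
import time

class RecurringTimer:
    """Sketch of the RecurringTimer pattern: a daemon thread that waits
    until the next scheduled tick, invokes the callback with that tick's
    timestamp, then advances nextTime by one period."""

    def __init__(self, period_s, callback, name="RecurringTimer"):
        self.period = period_s
        self.callback = callback
        self.stopped = False
        self.next_time = 0.0
        self.thread = threading.Thread(target=self._loop, daemon=True, name=name)

    def start(self, start_time):
        self.next_time = start_time
        self.thread.start()
        return self.next_time

    def stop(self):
        self.stopped = True
        self.thread.join()

    def _trigger_action_for_next_interval(self):
        # Wait until the scheduled tick, then fire the callback with it.
        delay = self.next_time - time.monotonic()
        if delay > 0:
            time.sleep(delay)
        self.callback(self.next_time)
        self.next_time += self.period

    def _loop(self):
        while not self.stopped:
            self._trigger_action_for_next_interval()
```

Note that the callback receives the scheduled tick time, not the actual wall-clock time, so consumers downstream see exact batch boundaries.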

A GenerateJobs event message is thus posted on every tick; eventLoop.post simply appends it to eventQueue.

```scala
// EventLoop.scala line 102
def post(event: E): Unit = {
  eventQueue.put(event)
}
```

Meanwhile, the EventLoop's other member, eventThread, keeps taking event messages off the queue and passing each one to onReceive, which was overridden at instantiation:

```scala
// JobGenerator.scala line 86
eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
  override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
  override protected def onError(e: Throwable): Unit = {
    jobScheduler.reportError("Error in job generator", e)
  }
}
eventLoop.start()
```
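The EventLoop contract, where post() enqueues and a single daemon thread drains the queue and dispatches each event to onReceive, can be sketched in Python (a simplified analogue, not Spark's implementation; None is reserved here as an internal stop sentinel):

```python
import queue
import threading

class EventLoop:
    """Sketch of the EventLoop pattern: post() enqueues events, and a
    single daemon thread drains the queue, handing each event to
    on_receive (exceptions go to on_error)."""

    def __init__(self, name, on_receive, on_error=lambda e: None):
        self._queue = queue.Queue()
        self._on_receive = on_receive
        self._on_error = on_error
        self._thread = threading.Thread(target=self._run, daemon=True, name=name)

    def start(self):
        self._thread.start()

    def post(self, event):
        self._queue.put(event)

    def stop(self):
        # Events posted before stop() are still processed, in order.
        self._queue.put(None)
        self._thread.join()

    def _run(self):
        while True:
            event = self._queue.get()
            if event is None:
                return
            try:
                self._on_receive(event)
            except Exception as e:
                self._on_error(e)
```

Because a single thread consumes the queue, events are handled strictly in posting order, which is exactly what the batch-time ordering of GenerateJobs events relies on.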

onReceive dispatches to:

```scala
// JobGenerator.scala line 177
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    // other case classes
  }
}
```

A GenerateJobs case class is matched and handled by generateJobs(time: Time), which:

  • Fetches all the Blocks that ReceiverTracker collected for the current batch time, writing to the WAL if WAL is enabled.
  • Has DStreamGraph generate the jobs.
  • Submits the jobs.
  • Checkpoints, if checkpointing is configured.

```scala
// JobGenerator.scala line 240
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
```
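The control flow of generateJobs can be condensed into a Python sketch (the snake_case stub names mirror the Scala calls; this is an illustration with a recording stub, not Spark code):

```python
def generate_jobs(time, receiver_tracker, graph, scheduler, event_loop):
    """Allocate the batch's blocks, generate jobs from the graph,
    submit them on success (or report the failure), then always
    request a checkpoint for this batch time."""
    try:
        receiver_tracker.allocate_blocks_to_batch(time)
        jobs = graph.generate_jobs(time)
    except Exception as e:
        scheduler.report_error("Error generating jobs for time %s" % time, e)
    else:
        scheduler.submit_job_set((time, jobs))
    event_loop.post(("DoCheckpoint", time))

class _Recorder:
    """Minimal stand-in for the real components, recording each call."""
    def __init__(self): self.log = []
    def allocate_blocks_to_batch(self, t): self.log.append(("allocate", t))
    def generate_jobs(self, t): return ["job-at-%s" % t]
    def submit_job_set(self, job_set): self.log.append(("submit", job_set))
    def report_error(self, msg, e): self.log.append(("error", msg))
    def post(self, event): self.log.append(("post", event))

r = _Recorder()
generate_jobs(5000, r, r, r, r)
```

Note that the checkpoint event is posted regardless of whether job generation succeeded, just as the Scala code posts DoCheckpoint outside the Try/match.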

This code is not entirely obvious at first glance. At a quick look it reads like try {} catch { case ... }, but on closer inspection it is Try {} match {}. Tracing the code shows that Try, capitalized, is a companion object whose apply takes a by-name parameter and returns a Try instance. In scala.util.Try.scala:

```scala
// scala.util.Try.scala line 155
object Try {
  /** Constructs a `Try` using the by-name parameter. This
   * method will ensure any non-fatal exception is caught and a
   * `Failure` object is returned.
   */
  def apply[T](r: => T): Try[T] =
    try Success(r) catch {
      case NonFatal(e) => Failure(e)
    }
}
```
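The semantics are easy to emulate. A Python analogue of Try/Success/Failure (illustrative only; Python has no by-name parameters, so a zero-argument callable stands in):

```python
class Success:
    def __init__(self, value):
        self.value = value

class Failure:
    def __init__(self, exception):
        self.exception = exception

def Try(block):
    """Run the zero-argument block, wrapping its result in Success or
    any raised exception in Failure -- like scala.util.Try.apply."""
    try:
        return Success(block())
    except Exception as e:
        return Failure(e)

ok = Try(lambda: 21 * 2)    # Success(42)
bad = Try(lambda: 1 // 0)   # Failure(ZeroDivisionError)
```

The match in generateJobs then simply branches on which of the two cases came back.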

Try has two subclasses, both case classes: Success and Failure.

Back at the call site, the last expression in the Try block is graph.generateJobs(time). Following it, it returns outputStream.generateJob(time) for each output stream:

```scala
// DStreamGraph.scala line 111
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
```

As established earlier, the outputStreams are all ForEachDStream instances. ForEachDStream overrides generateJob:

  • parent.getOrCompute(time) returns an Option[RDD].
  • If an RDD is present, the result may be Some(new Job(time, jobFunc)).

```scala
// ForEachDStream.scala line 46
override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
```

So what is the ForEachDStream's parent? Consider our example:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}

object StreamingWordCountSelfScala {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("spark://master:7077").setAppName("StreamingWordCountSelfScala")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(5)) // harvest the data every 5 seconds
    val lines = ssc.socketTextStream("localhost", 9999) // listen on local socket port 9999
    val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) // flatMap, then reduce
    words.print() // print the results
    ssc.start() // start
    ssc.awaitTermination()
    ssc.stop(true)
  }
}
```

As described earlier, the DStream dependency chain in this example is SocketInputDStream << FlatMappedDStream << MappedDStream << ShuffledDStream << ForEachDStream.

Scanning DStream and all of its subclasses shows that only DStream itself defines getOrCompute; no subclass overrides it. So the call resolves to ShuffledDStream's inherited getOrCompute.

In the common case the RDD does not yet exist, so the orElse block runs:

```scala
// DStream.scala line 338
/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {
      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time) // line 352
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
```
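Stripped of time validation and persistence, getOrCompute is a per-batch memoization. A Python sketch of just that core (an assumed simplification, with a dict standing in for generatedRDDs):

```python
def make_get_or_compute(compute):
    """Memoize the 'RDD' generated for each batch time: compute(time)
    runs at most once per time; later lookups hit the cache.
    This is the essence of DStream.getOrCompute."""
    generated = {}  # time -> generated "RDD"

    def get_or_compute(time):
        if time not in generated:
            generated[time] = compute(time)
        return generated[time]

    return get_or_compute

calls = []
# The list.append trick just records each compute invocation.
g = make_get_or_compute(lambda t: calls.append(t) or ("rdd", t))
g(1000)
g(1000)  # second call is served from the cache
```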

ShuffledDStream.compute in turn calls parent.getOrCompute:

```scala
// ShuffledDStream.scala line 40
override def compute(validTime: Time): Option[RDD[(K, C)]] = {
  parent.getOrCompute(validTime) match {
    case Some(rdd) => Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
    case None => None
  }
}
```

MappedDStream.compute again calls its parent's getOrCompute, which in turn calls compute, and so the recursion continues:

```scala
// MappedDStream.scala line 34
override def compute(validTime: Time): Option[RDD[U]] = {
  parent.getOrCompute(validTime).map(_.map[U](mapFunc))
}
```

FlatMappedDStream.compute likewise calls its parent's getOrCompute, continuing the recursion:

```scala
// FlatMappedDStream.scala line 34
override def compute(validTime: Time): Option[RDD[U]] = {
  parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
}
```
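The recursion pattern, where each stream's compute asks its parent for the batch data and applies its own transformation, can be illustrated in Python (hypothetical class names; plain lists stand in for RDDs):

```python
class InputStream:
    """Leaf of the chain: hands out the raw batch for a given time."""
    def __init__(self, batches):
        self.batches = batches  # time -> list of records

    def get_or_compute(self, time):
        return self.batches.get(time, [])

class FlatMappedStream:
    def __init__(self, parent, f):
        self.parent, self.f = parent, f

    def get_or_compute(self, time):
        # Ask the parent for its batch, then flat-map over it.
        return [y for x in self.parent.get_or_compute(time) for y in self.f(x)]

class MappedStream:
    def __init__(self, parent, f):
        self.parent, self.f = parent, f

    def get_or_compute(self, time):
        # Ask the parent for its batch, then map over it.
        return [self.f(x) for x in self.parent.get_or_compute(time)]

# SocketInputDStream << FlatMappedDStream << MappedDStream, as in the example
lines = InputStream({5000: ["a b a"]})
words = MappedStream(FlatMappedStream(lines, lambda l: l.split(" ")), lambda w: (w, 1))
```

Nothing is computed until get_or_compute is called with a batch time, at which point the request walks the chain back to the input, mirroring how the lineage recursion bottoms out at the input DStream.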

The recursion stops when the DStream is the SocketInputDStream, i.e. the inputStream, whose compute is inherited from its parent class.

Setting aside the logic in the if branch for now, go straight to the else block, which enters createBlockRDD:

```scala
// ReceiverInputDStream.scala line 69
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {
    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

      // Create the BlockRDD
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}
```

At new BlockRDD[T](ssc.sc, validBlockIds) (line 127 below), the RDD is successfully instantiated:

```scala
// ReceiverInputDStream.scala line 94
private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
  if (blockInfos.nonEmpty) {
    val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

    // Are WAL record handles present with all the blocks
    val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

    if (areWALRecordHandlesPresent) {
      // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
      val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
      val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
      new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
    } else {
      // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
      // others then that is unexpected and log a warning accordingly.
      if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
        if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
          logError("Some blocks do not have Write Ahead Log information; " +
            "this is unexpected and data may not be recoverable after driver failures")
        } else {
          logWarning("Some blocks have Write Ahead Log information; this is unexpected")
        }
      }
      val validBlockIds = blockIds.filter { id =>
        ssc.sparkContext.env.blockManager.master.contains(id)
      }
      if (validBlockIds.size != blockIds.size) {
        logWarning("Some blocks could not be recovered as they were not found in memory. " +
          "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
          "for more details.")
      }
      new BlockRDD[T](ssc.sc, validBlockIds) // line 127
    }
  } else {
    // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
    // according to the configuration
    if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
      new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext, Array.empty, Array.empty, Array.empty)
    } else {
      new BlockRDD[T](ssc.sc, Array.empty)
    }
  }
}
```
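The branching in createBlockRDD reduces to a small decision table. A Python sketch of just that choice (illustrative only, returning the name of the RDD class that would be instantiated; block infos are modeled as dicts with an optional "wal_handle" key):

```python
def choose_block_rdd(block_infos, wal_enabled):
    """Mirror createBlockRDD's branching: a WAL-backed RDD only when
    every block carries a WAL record handle; otherwise a plain BlockRDD
    (over blocks still known to the BlockManager); with no blocks at
    all, the choice follows the WAL configuration."""
    if block_infos:
        if all(b.get("wal_handle") for b in block_infos):
            return "WriteAheadLogBackedBlockRDD"
        return "BlockRDD"
    return "WriteAheadLogBackedBlockRDD" if wal_enabled else "BlockRDD"
```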

This BlockRDD is a subclass of Spark Core's RDD and has no dependency RDDs. At this point, RDD instantiation is complete.

```scala
// BlockRDD.scala line 30
private[spark] class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
  extends RDD[T](sc, Nil)

// RDD.scala line 74
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging
```

So the fully reconstructed RDD is:

```scala
new BlockRDD[T](ssc.sc, validBlockIds).map(_.flatMap(flatMapFunc)).map(_.map[U](mapFunc))
  .combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)
```

which in our example is:

```scala
new BlockRDD[T](ssc.sc, validBlockIds).map(_.flatMap(t => t.split(" "))).map(_.map[U](t => (t, 1)))
  .combineByKey[C](t => t, (t1, t2) => t1 + t2, (t1, t2) => t1 + t2, partitioner, true)
```

and the final print is:

```scala
() => foreachFunc(new BlockRDD[T](ssc.sc, validBlockIds).map(_.flatMap(t => t.split(" ")))
  .map(_.map[U](t => (t, 1)))
  .combineByKey[C](t => t, (t1, t2) => t1 + t2, (t1, t2) => t1 + t2, partitioner, true), time)
```
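To see what the combineByKey arguments in this chain do, here is a single-partition Python sketch (merge_combiner is omitted because one partition never merges combiners from different partitions):

```python
def combine_by_key(pairs, create_combiner, merge_value):
    """Single-partition combineByKey: the first value seen for a key
    goes through create_combiner; subsequent values are folded in
    with merge_value."""
    acc = {}
    for k, v in pairs:
        acc[k] = merge_value(acc[k], v) if k in acc else create_combiner(v)
    return acc

# The example's word count: flatMap -> map -> combineByKey
lines = ["a b a"]
words = [(w, 1) for line in lines for w in line.split(" ")]
counts = combine_by_key(words, lambda v: v, lambda a, b: a + b)
print(counts)  # -> {'a': 2, 'b': 1}
```

This is exactly why reduceByKey(_ + _) expands to combineByKey(t => t, (t1, t2) => t1 + t2, ...): the identity createCombiner plus the reduce function as mergeValue.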

where foreachFunc is defined at DStream.scala line 766.

The RDD has now been fully instantiated through the DStream. Looking back, it should be clear in what sense a DStream is a template for RDDs.

But we are not done yet: back at ForEachDStream.scala line 46, the function above is passed as a constructor argument into a Job.


-------------- divider --------------

A supplementary flowchart of the Job creation process, from a student's blog in the customization course, slightly modified. (Figure not preserved in this reprint.)

A supplementary flowchart of RDD DAG creation, tracing the lineage back from the OutputDStream, from a student's blog. (Figure not preserved.)

The same lineage-based RDD DAG creation as it applies to this post's example, from a student's blog. (Figure not preserved.)

The next post will analyze Job submission from the source code. Stay tuned.


Reprinted from: https://my.oschina.net/corleone/blog/672999
