日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spark DAGScheduler、TaskSchedule、Executor执行task源码分析

發(fā)布時(shí)間:2025/3/8 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark DAGScheduler、TaskSchedule、Executor执行task源码分析 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

摘要

spark的調(diào)度一直是我想搞清楚的東西,以及有向無環(huán)圖的生成過程、task的調(diào)度、rdd的延遲執(zhí)行是怎么發(fā)生的和如何完成的,還要就是RDD的compute都是在executor的哪個(gè)階段調(diào)用和執(zhí)行我們定義的函數(shù)的。這些都非常的基礎(chǔ)和困難。花一段時(shí)間終于弄白了其中的奧秘。總結(jié)起來,以便以后繼續(xù)完善。spark的調(diào)度分為兩級(jí)調(diào)度:DAGSchedule和TaskSchedule。DAGSchedule是根據(jù)job來生成相互依賴的stages,然后把stages以TaskSet形式傳遞給TaskSchedule來進(jìn)行任務(wù)的分發(fā)過程,里面的細(xì)節(jié)會(huì)慢慢的講解出來的,比較長(zhǎng)。

本文目錄

1、spark的RDD邏輯執(zhí)行鏈
2、spark的job的劃分、stage的劃分
3、spark的DAGScheduler的調(diào)度
4、spark的TaskSchedule的調(diào)度
5、executor如何執(zhí)行task以及我們定義的函數(shù)

spark的RDD的邏輯執(zhí)行鏈

都說spark進(jìn)行延遲執(zhí)行,通過RDD的DAG來生成相應(yīng)的Stage等,RDD的DAG的形成過程,是通過依賴來完成的,每一個(gè)RDD通過轉(zhuǎn)換算子的時(shí)候都會(huì)生成一個(gè)和多個(gè)子RDD,在通過轉(zhuǎn)換算子的時(shí)候,在創(chuàng)建一個(gè)新的RDD的時(shí)候,也會(huì)創(chuàng)建他們之間的依賴關(guān)系。因此他們是通過Dependencies連接起來的,RDD的依賴不是我們的重點(diǎn),如果想了解RDD的依賴,可以自行g(shù)oogle,RDD的依賴分為:1:1的OneToOneDependency,m:1的RangeDependency,還有m:n的ShuffleDependencies,其中OneToOneDependency和RangeDependency又被稱為NarrowDependency,這里的1:1,m:1,m:n的粒度是對(duì)于RDD的分區(qū)而言的。

依賴中最根本的是保留了父RDD,其rdd的方法就是返回父RDD的方法。這樣其就形成了一個(gè)鏈表形式的結(jié)構(gòu),通過最后面的RDD根據(jù)依賴,可以向前回溯到所有的父類RDD。
我們以map為例,來看一下依賴是如何產(chǎn)生的。

通過map其實(shí)其實(shí)創(chuàng)建了一個(gè)MapPartitonsRDD的RDD

然后我們看一下MapPartitonsRDD的主構(gòu)造函數(shù),其又對(duì)RDD進(jìn)行了賦值,其中父RDD就是上面的this對(duì)象指定的RDD,我們?cè)倏匆幌翿DD這個(gè)類的構(gòu)造函數(shù):

其又調(diào)用了RDD的主構(gòu)造函數(shù)

其實(shí)依賴都是在RDD的構(gòu)造函數(shù)中形成的。
通過上面的依賴轉(zhuǎn)換就形成了RDD額DAG圖
生成了一個(gè)RDD的DAG圖:

spark的job的劃分、stage的劃分
spark的Application劃分job其實(shí)挺簡(jiǎn)單的,一個(gè)Application劃分為幾個(gè)job,我們就要看這個(gè)Application中有多少個(gè)Action算子,一個(gè)Action算子對(duì)應(yīng)一個(gè)job,這個(gè)可以通過源碼來看出來,轉(zhuǎn)換算子是形成一個(gè)或者多個(gè)RDD,而Action算子是觸發(fā)job的提交。
比如上面的map轉(zhuǎn)換算子就是這樣的

而Action算子是這樣的:

通過runJob方法提交作業(yè)。stage的劃分是根據(jù)是否進(jìn)行shuflle過程來決定的,這個(gè)后面會(huì)細(xì)說。

spark的DAGScheduler的調(diào)度

當(dāng)我們通過客戶端,向spark集群提交作業(yè)時(shí),如果利用的資源管理器是yarn,那么客戶端向spark提交申請(qǐng)運(yùn)行driver進(jìn)程的機(jī)器,driver其實(shí)在spark中是沒有具體的類的,driver機(jī)器主要是用來運(yùn)行用戶編寫的代碼的地方,完成DAGScheduler和TaskSchedule,追蹤task運(yùn)行的狀態(tài)。記住,用戶編寫的主函數(shù)是在driver中運(yùn)行的,但是RDD轉(zhuǎn)換和執(zhí)行是在不同的機(jī)器上完成。其實(shí)driver主要負(fù)責(zé)作業(yè)的調(diào)度和分發(fā)。Action算子到stage的劃分和DAGScheduler的完成過程。
當(dāng)我們?cè)赿river進(jìn)程中運(yùn)行用戶定義的main函數(shù)的時(shí)候,首先會(huì)創(chuàng)建SparkContext對(duì)象,這個(gè)是我們與spark集群進(jìn)行交互的入口它會(huì)初始化很多運(yùn)行需要的環(huán)境,最主要的是初始化了DAGScheduler和TaskSchedule。

我們以這樣的的一個(gè)RDD的邏輯執(zhí)行圖來分析整個(gè)DAGScheduler的過程。

因?yàn)镈AGScheduler發(fā)生在driver進(jìn)程中,我們就沖Driver進(jìn)程運(yùn)行用戶定義的main函數(shù)開始。在上圖中RDD9是最后一個(gè)RDD并且其調(diào)用了Action算子,就會(huì)觸發(fā)作業(yè)的提交,其會(huì)調(diào)用SparkContext的runjob函數(shù),其經(jīng)過一系列的runJob的封裝,會(huì)調(diào)用DAGScheduler的runJob

在SparkContext中存在著runJob方法

----------------------------------------------

def runJob[T, U: ClassTag](
rdd: RDD[T], // rdd為上面提到的RDD邏輯執(zhí)行圖中的RDD9
func: (TaskContext, Iterator[T]) => U,這個(gè)方法也是RDD9調(diào)用Action算子傳入的函數(shù)
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}

----------------------------------------------

DAGScheduler的runJob

----------------------------------------------

def runJob[T, U](
rdd: RDD[T], //RDD9
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
//在這里會(huì)生成一個(gè)job的守護(hù)進(jìn)程waiter,用來等待作業(yè)提交執(zhí)行是否完成,其又調(diào)用了submitJob,其以下的代
//碼都是用來處運(yùn)行結(jié)果的一些log日志信息
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
waiter.completionFuture.value.get match {
case scala.util.Success(_) =>
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
case scala.util.Failure(exception) =>
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
// SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
val callerStackTrace = Thread.currentThread().getStackTrace.tail
exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
throw exception
}
}

----------------------------------------------

submitJob的源代碼

----------------------------------------------

def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
// 檢查RDD的分區(qū)是否合法
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
throw new IllegalArgumentException(
"Attempting to access a non-existent partition: " + p + ". " +
"Total number of partitions: " + maxPartitions)
}

val jobId = nextJobId.getAndIncrement()
if (partitions.size == 0) {
// Return immediately if the job is running 0 tasks
return new JobWaiter[U](this, jobId, 0, resultHandler)
}

assert(partitions.size > 0)
//這一塊是把我們的job繼續(xù)進(jìn)行封裝到JobSubmitted,然后放入到一個(gè)進(jìn)程中池里,spark會(huì)啟動(dòng)一個(gè)線程來處理我
//們提交的作業(yè)
val func2 = func.asInstanceOf[(TaskContext, Iterator[]) => ]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
}

----------------------------------------------

在DAGScheduler類中有一個(gè)DAGSchedulerEventProcessLoop的類,用來接收處理DAGScheduler的消息事件

JobSubmitted對(duì)象,因此會(huì)執(zhí)行第一個(gè)操作handleJobSubmitted,在這里我們要說一下,Stage的類型,在spark中有兩種類型的stage一種是ShuffleMapStage,和ResultStage,最后一個(gè)RDD對(duì)應(yīng)的Stage是ResultStage,遇到Shuffle過程的RDD被稱為ShuffleMapStage。

----------------------------------------------

private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[],//對(duì)應(yīng)RDD9
func: (TaskContext, Iterator[]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
// 先創(chuàng)建ResultStage。
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}

val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))

val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)
}

----------------------------------------------

上面的createResultStage其實(shí)就是RDD轉(zhuǎn)換為Stage的過程,方法如下

----------------------------------------------

/*
創(chuàng)建ResultStage的時(shí)候,它會(huì)調(diào)用相關(guān)函數(shù)
*/
private def createResultStage(
rdd: RDD[], //對(duì)應(yīng)上圖的RDD9
func: (TaskContext, Iterator[]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}

/**

  • 形成ResultStage依賴的父Stage
    */
    private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
    }
    /**
  • 采用的是深度優(yōu)先遍歷找到Action算子的父依賴中的寬依賴
  • 這個(gè)是最主要的方法,要看懂這個(gè)方法,其實(shí)后面的就好理解,最好結(jié)合這例子上面給出的RDD邏輯依賴圖,比*
  • 較容易看出來,根據(jù)上面的RDD邏輯依賴圖,其返回的ShuffleDependency就是RDD2和RDD1,RDD7和RDD6的依
    賴,如果存在A<-B<-C,這兩個(gè)都是shuffle依賴,那么對(duì)于C其只返回B的shuffle依賴,而不會(huì)返回A
    /
    private[scheduler] def getShuffleDependencies(
    rdd: RDD[]): HashSet[ShuffleDependency[, , ]] = {
    //用來存放依賴
    val parents = new HashSet[ShuffleDependency[, , ]]
    //遍歷過的RDD放入這個(gè)里面
    val visited = new HashSet[RDD[]]
    //創(chuàng)建一個(gè)待遍歷RDD的棧結(jié)構(gòu)
    val waitingForVisit = new ArrayStack[RDD[]]
    //壓入finalRDD,邏輯圖中的RDD9
    waitingForVisit.push(rdd)
    //循環(huán)遍歷這個(gè)棧結(jié)構(gòu)
    while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    // 如果RDD沒有被遍歷過執(zhí)行其中的代碼
    if (!visited(toVisit)) {
    //然后把其放入已經(jīng)遍歷隊(duì)列中
    visited += toVisit
    //得到依賴,我們知道依賴中存放的有父RDD的對(duì)象
    toVisit.dependencies.foreach {
    //如果這個(gè)依賴是shuffle依賴,則放入返回隊(duì)列中
    case shuffleDep: ShuffleDependency[, , ] =>
    parents += shuffleDep
    case dependency =>
    //如果不是shuffle依賴,把其父RDD壓入待訪問棧中,從而進(jìn)行循環(huán)
    waitingForVisit.push(dependency.rdd)
    }
    }
    }
    parents
    }
    /創(chuàng)建shuffleMapStage,根據(jù)上面得到的兩個(gè)Shuffle對(duì)象,分別創(chuàng)建了兩個(gè)shuffleMapStage
    /
    /
    def createShuffleMapStage(shuffleDep: ShuffleDependency[, , _], jobId: Int): ShuffleMapStage = {
    //這個(gè)RDD其實(shí)就是RDD1和RDD6
    val rdd = shuffleDep.rdd
    val numTasks = rdd.partitions.length
    val parents = getOrCreateParentStages(rdd, jobId) //查看這兩個(gè)ShuffleMapStage是否存在父Shuffle的Stage
    val id = nextStageId.getAndIncrement()
    //創(chuàng)建ShuffleMapStage,下面是更新一下SparkContext的狀態(tài)
    val stage = new ShuffleMapStage(
    id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
    stageIdToStage(id) = stage
    shuffleIdToMapStage(shuffleDep.shuffleId) = stage
    updateJobIdStageIdMaps(jobId, stage)

    if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
    }
    stage
    }

    ----------------------------------------------

    通過上面的源代碼分析,結(jié)合RDD的邏輯執(zhí)行圖,我們可以看出,這個(gè)job擁有三個(gè)Stage,一個(gè)ResultStage,兩個(gè)ShuffleMapStage,一個(gè)ShuffleMapStage中的RDD是RDD1,另一個(gè)stage中的RDD是RDD6,從而,以上完成了RDD到Stage的切分工作。當(dāng)切分完成后在handleJobSubmitted這個(gè)方法的最后,調(diào)用提交stage的方法。

submitStage源代碼比較簡(jiǎn)單,它會(huì)檢查我們當(dāng)前的stage依賴的父stage是否已經(jīng)執(zhí)行完成,如果沒有執(zhí)行完成會(huì)循環(huán)提交其父stage等待其父stage執(zhí)行完成了,才提交我們當(dāng)前的stage進(jìn)行執(zhí)行。

----------------------------------------------

private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get)
} else {
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}

----------------------------------------------

提交task的方法源代碼,我們按照剛才的三個(gè)stage中,提交的是前兩個(gè)stage的過程來看待這個(gè)源代碼。以包含RDD1的stage為例

----------------------------------------------

private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
// Get our pending tasks and remember them in our pendingTasks entry
stage.pendingPartitions.clear()

// 計(jì)算需要計(jì)算的分區(qū)數(shù) val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()// Use the scheduling pool, job group, description, etc. from an ActiveJob associated // with this Stage val properties = jobIdToActiveJob(jobId).propertiesrunningStages += stage// 封裝stage的一些信息,得到stage到分區(qū)數(shù)的映射關(guān)系,即一個(gè)stage對(duì)應(yīng)多少個(gè)分區(qū)需要計(jì)算 stage match {case s: ShuffleMapStage =>outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)case s: ResultStage =>outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.rdd.partitions.length - 1) }

//得到每個(gè)分區(qū)對(duì)應(yīng)的具體位置,即分區(qū)的數(shù)據(jù)位于集群的哪臺(tái)機(jī)器上。
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
} catch {
case NonFatal(e) =>
stage.makeNewStageAttempt(partitionsToCompute.size)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
// 這個(gè)把上面stage要計(jì)算的分區(qū)和每個(gè)分區(qū)對(duì)應(yīng)的物理位置進(jìn)行了從新封裝,放在了latestInfo里面
stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

//序列化我們剛才得到的信息,以便在driver機(jī)器和work機(jī)器之間進(jìn)行傳輸
var taskBinary: Broadcast[Array[Byte]] = null
try {
// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
// For ResultTask, serialize and broadcast (rdd, func).
val taskBinaryBytes: Array[Byte] = stage match {
case stage: ShuffleMapStage =>
JavaUtils.bufferToArray(
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
case stage: ResultStage =>
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}

taskBinary = sc.broadcast(taskBinaryBytes) } catch {// In the case of a failure during serialization, abort the stage.case e: NotSerializableException =>abortStage(stage, "Task not serializable: " + e.toString, Some(e))runningStages -= stage// Abort executionreturncase NonFatal(e) =>abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))runningStages -= stagereturn }

//封裝stage構(gòu)成taskSet集合,ShuffleMapStage對(duì)應(yīng)的task為ShuffleMapTask,而ResultStage對(duì)應(yīng)的taskSet為ResultTask
val tasks: Seq[Task[_]] = try {
stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)
}

case stage: ResultStage =>partitionsToCompute.map { id =>val p: Int = stage.partitions(id)val part = stage.rdd.partitions(p)val locs = taskIdToLocations(id)new ResultTask(stage.id, stage.latestInfo.attemptId,taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)} }

} catch {
case NonFatal(e) =>
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}

//提交task給TaskSchedule
if (tasks.size > 0) {
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
stage.pendingPartitions ++= tasks.map(_.partitionId)
logDebug("New pending partitions: " + stage.pendingPartitions)
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should mark
// the stage as completed here in case there are no tasks to run
markStageAsFinished(stage, None)

val debugString = stage match {case stage: ShuffleMapStage =>s"Stage ${stage} is actually done; " +s"(available: ${stage.isAvailable}," +s"available outputs: ${stage.numAvailableOutputs}," +s"partitions: ${stage.numPartitions})"case stage : ResultStage =>s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})" } logDebug(debugString)submitWaitingChildStages(stage)

}
}

----------------------------------------------

到此,完成了整個(gè)DAGScheduler的調(diào)度。

spark的TaskSchedule的調(diào)度

spark的Task的調(diào)度,我們要明白其調(diào)度過程,其根據(jù)不同的資源管理器擁有不同的調(diào)度策略,因此也擁有不同的調(diào)度守護(hù)進(jìn)程,這個(gè)守護(hù)進(jìn)程管理著集群的資源信息,spark提供了一個(gè)基本的守護(hù)進(jìn)程的類,來完成與driver和executor的交互:CoarseGrainedSchedulerBackend,它應(yīng)該運(yùn)行在集群資源管理器上,比如yarn等。他收集了集群work機(jī)器的一般資源信息。當(dāng)我們形成tasks將要進(jìn)行調(diào)度的時(shí)候,driver進(jìn)程會(huì)與其通信,請(qǐng)求資源的分配和調(diào)度,其會(huì)把最優(yōu)的work節(jié)點(diǎn)分配給task來執(zhí)行其任務(wù)。而TaskScheduleImpl實(shí)現(xiàn)了task調(diào)度的過程,采用的調(diào)度算法默認(rèn)的是FIFO的策略,也可以采用公平調(diào)度策略。

當(dāng)我們提交task時(shí),其會(huì)創(chuàng)建一個(gè)管理task的類TaskSetManager,然后把其加入到任務(wù)調(diào)度池中。

----------------------------------------------

override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
// 創(chuàng)建taskSetManager,以下為更新一下狀態(tài)
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
val conflictingTaskSet = stageTaskSets.exists { case (, ts) =>
ts.taskSet != taskSet && !ts.isZombie
}
if (conflictingTaskSet) {
throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
s" ${stageTaskSets.toSeq.map{._2.taskSet.id}.mkString(",")}")
}
//把封裝好的taskSet,加入到任務(wù)調(diào)度隊(duì)列中。
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

if (!isLocal && !hasReceivedTask) {starvationTimer.scheduleAtFixedRate(new TimerTask() {override def run() {if (!hasLaunchedTask) {logWarning("Initial job has not accepted any resources; " +"check your cluster UI to ensure that workers are registered " +"and have sufficient resources")} else {this.cancel()}}}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS) } hasReceivedTask = true

}
//這個(gè)地方就是向資源管理器發(fā)出請(qǐng)求,請(qǐng)求任務(wù)的調(diào)度
backend.reviveOffers()
}

/*

*這個(gè)方法是位于CoarseGrainedSchedulerBackend類中,driver進(jìn)程會(huì)想集群管理器發(fā)送請(qǐng)求資源的請(qǐng)求。
/
override def reviveOffers() {
driverEndpoint.send(ReviveOffers)
}

----------------------------------------------

當(dāng)其收到這個(gè)請(qǐng)求時(shí),其會(huì)調(diào)用這樣的方法。

----------------------------------------------

override def receive: PartialFunction[Any, Unit] = {
case StatusUpdate(executorId, taskId, state, data) =>
scheduler.statusUpdate(taskId, state, data.value)
if (TaskState.isFinished(state)) {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
executorInfo.freeCores += scheduler.CPUS_PER_TASK
makeOffers(executorId)
case None =>
// Ignoring the update since we don't know about the executor.
logWarning(s"Ignored task status update ($taskId state $state) " +
s"from unknown executor with ID $executorId")
}
}
//發(fā)送的請(qǐng)求滿足這個(gè)條件
case ReviveOffers =>
makeOffers()

case KillTask(taskId, executorId, interruptThread) =>
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread))
case None =>
// Ignoring the task kill since the executor is not registered.
logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
}
}

/*

*這個(gè)方法是搜集集群上現(xiàn)在還在活著的機(jī)器的相關(guān)信息。并且進(jìn)行封裝成WorkerOffer類,

  • 然后其會(huì)調(diào)用TaskSchedulerImpl中的resourceOffers方法,來進(jìn)行篩選,篩選出符合請(qǐng)求資源的機(jī)器,來執(zhí)行我們當(dāng)前的任務(wù)
    /
    private def makeOffers() {
    // Filter out executors under killing
    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
    }.toIndexedSeq
    launchTasks(scheduler.resourceOffers(workOffers))
    }

/*
得到集群中空閑機(jī)器的信息后,我們通過此方法來篩選出滿足我們這次任務(wù)要求的機(jī)器,然后返回TaskDescription類
*這個(gè)類封裝了task與excutor的相關(guān)信息

  • /
    def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    // Mark each slave as alive and remember its hostname
    // Also track if new executor is added
    var newExecAvail = false
    //檢查work是否已經(jīng)存在了,把不存在的加入到work調(diào)度池中
    for (o <- offers) {
    if (!hostToExecutors.contains(o.host)) {
    hostToExecutors(o.host) = new HashSet[String]()
    }
    if (!executorIdToRunningTaskIds.contains(o.executorId)) {
    hostToExecutors(o.host) += o.executorId
    executorAdded(o.executorId, o.host)
    executorIdToHost(o.executorId) = o.host
    executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
    newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
    hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
    }
    // 打亂work機(jī)器的順序,以免每次分配任務(wù)時(shí)都在同一個(gè)機(jī)器上進(jìn)行。避免某一個(gè)work計(jì)算壓力太大。
    val shuffledOffers = Random.shuffle(offers)
    //對(duì)于每一work,創(chuàng)建一個(gè)與其核數(shù)大小相同的數(shù)組,數(shù)組的大小決定了這臺(tái)work上可以并行執(zhí)行task的數(shù)目.
    val tasks = shuffledOffers.map(o => new ArrayBufferTaskDescription)
    //取出每臺(tái)機(jī)器的cpu核數(shù)
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    //從task任務(wù)調(diào)度池中,按照我們的調(diào)度算法,取出需要執(zhí)行的任務(wù)
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
    taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
    taskSet.executorAdded()
    }
    }
    // 下面的這個(gè)循環(huán),是用來標(biāo)記task根據(jù)work的信息來標(biāo)定數(shù)據(jù)本地化的程度的。當(dāng)我們?cè)趛arn資源管理器,以--driver-mode配置
    //為client時(shí),我們就會(huì)在打出來的日志上看出每一臺(tái)機(jī)器上運(yùn)行task的數(shù)據(jù)本地化程度。同時(shí)還會(huì)選擇每個(gè)task對(duì)應(yīng)的work機(jī)器
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    for (taskSet <- sortedTaskSets) {
    var launchedAnyTask = false
    var launchedTaskAtCurrentMaxLocality = false
    for (currentMaxLocality <- taskSet.myLocalityLevels) {
    do {
    launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
    taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
    launchedAnyTask |= launchedTaskAtCurrentMaxLocality
    } while (launchedTaskAtCurrentMaxLocality)
    }
    if (!launchedAnyTask) {
    taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    }
    }

    if (tasks.size > 0) {
    hasLaunchedTask = true
    }
    //返回taskDescription對(duì)象
    return tasks
    }

/*
task選擇執(zhí)行其任務(wù)的work其實(shí)是在這個(gè)函數(shù)中實(shí)現(xiàn)的,從這個(gè)可以看出,一臺(tái)work上其實(shí)是可以運(yùn)行多個(gè)task,主要是看如何
*進(jìn)行算法調(diào)度

  • /
    private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
    var launchedTask = false
    //循環(huán)所有的機(jī)器,找適合此機(jī)器的task
    for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    //判斷其剩余的cpu核數(shù)是否滿足我們的最低配置,滿足則為其分配任務(wù),否則不為其分配任務(wù)。
    if (availableCpus(i) >= CPUS_PER_TASK) {
    try {
    //這個(gè)for中的resourOffer就是來判斷其標(biāo)記任務(wù)數(shù)據(jù)本地化的程度的。task(i)其實(shí)是一個(gè)數(shù)組,數(shù)組大小和其cpu核心數(shù)大小相同。
    for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
    tasks(i) += task
    val tid = task.taskId
    taskIdToTaskSetManager(tid) = taskSet
    taskIdToExecutorId(tid) = execId
    executorIdToRunningTaskIds(execId).add(tid)
    availableCpus(i) -= CPUS_PER_TASK
    assert(availableCpus(i) >= 0)
    launchedTask = true
    }
    } catch {
    case e: TaskNotSerializableException =>
    logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
    // Do not offer resources for this task, but don't throw an error to allow other
    // task sets to be submitted.
    return launchedTask
    }
    }
    }
    return launchedTask
    }

    ----------------------------------------------

    以上完成了從TaskSet到task和work機(jī)器的綁定過程的所有任務(wù)。下面就是如何發(fā)送task到executor進(jìn)行執(zhí)行。在makeOffers()方法中調(diào)用了launchTasks方法,這個(gè)方法其實(shí)就是發(fā)送task作業(yè)到指定的機(jī)器上。只此,spark TaskSchedule的調(diào)度就此結(jié)束。

executor如何執(zhí)行task以及我們定義的函數(shù)

當(dāng)TaskSchedule完成對(duì)task的調(diào)度時(shí),task需要在work機(jī)器上來進(jìn)行執(zhí)行。此時(shí),work機(jī)器就會(huì)啟動(dòng)一個(gè)Backend的守護(hù)進(jìn)程,用來完成與driver和資源管理器的通信。這個(gè)Backend就是CoarseGrainedExecutorBackend,啟動(dòng)的main主函數(shù)為,從main函數(shù)中可以看出,其主要進(jìn)行參數(shù)的解析,然后運(yùn)行run方法。

----------------------------------------------

def main(args: Array[String]) {
var driverUrl: String = null
var executorId: String = null
var hostname: String = null
var cores: Int = 0
var appId: String = null
var workerUrl: Option[String] = None
val userClassPath = new mutable.ListBuffer[URL]()
var argv = args.toList
while (!argv.isEmpty) {
argv match {
case ("--driver-url") :: value :: tail =>
driverUrl = value
argv = tail
case ("--executor-id") :: value :: tail =>
executorId = value
argv = tail
case ("--hostname") :: value :: tail =>
hostname = value
argv = tail
case ("--cores") :: value :: tail =>
cores = value.toInt
argv = tail
case ("--app-id") :: value :: tail =>
appId = value
argv = tail
case ("--worker-url") :: value :: tail =>
// Worker url is used in spark standalone mode to enforce fate-sharing with worker
workerUrl = Some(value)
argv = tail
case ("--user-class-path") :: value :: tail =>
userClassPath += new URL(value)
argv = tail
case Nil =>
case tail =>
// scalastyle:off println
System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
// scalastyle:on println
printUsageAndExit()
}
}
if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
appId == null) {
printUsageAndExit()
}
run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
System.exit(0)
}
/*
可以看出,run方法只是進(jìn)行了一些需要運(yùn)行task所需要的環(huán)境進(jìn)行配置。并且創(chuàng)建相應(yīng)的運(yùn)行環(huán)境。

  • /
    private def run(
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    appId: String,
    workerUrl: Option[String],
    userClassPath: Seq[URL]) {

    Utils.initDaemon(log)

    SparkHadoopUtil.get.runAsSparkUser { () =>
    // Debug code
    Utils.checkHost(hostname)

    // Bootstrap to fetch the driver's Spark properties.
    val executorConf = new SparkConf
    val port = executorConf.getInt("spark.executor.port", 0)
    val fetcher = RpcEnv.create(
    "driverPropsFetcher",
    hostname,
    port,
    executorConf,
    new SecurityManager(executorConf),
    clientMode = true)
    val driver = fetcher.setupEndpointRefByURI(driverUrl)
    val cfg = driver.askWithRetrySparkAppConfig
    val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
    fetcher.shutdown()

    // Create SparkEnv using properties we fetched from the driver.
    val driverConf = new SparkConf()
    for ((key, value) <- props) {
    // this is required for SSL in standalone mode
    if (SparkConf.isExecutorStartupConf(key)) {
    driverConf.setIfMissing(key, value)
    } else {
    driverConf.set(key, value)
    }
    }
    if (driverConf.contains("spark.yarn.credentials.file")) {
    logInfo("Will periodically update credentials from: " +
    driverConf.get("spark.yarn.credentials.file"))
    SparkHadoopUtil.get.startCredentialUpdater(driverConf)
    }

    val env = SparkEnv.createExecutorEnv(
    driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)

    env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
    env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
    workerUrl.foreach { url =>
    env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
    }
    env.rpcEnv.awaitTermination()
    SparkHadoopUtil.get.stopCredentialUpdater()
    }
    }

    ----------------------------------------------

    其執(zhí)行函數(shù)的調(diào)用過程如下:

我們知道當(dāng)我們完成TaskSchedule的調(diào)度時(shí),是通過rpc發(fā)送了一個(gè)消息,如下圖所示,當(dāng)work機(jī)器的Backend啟動(dòng)以后,其會(huì)與driver進(jìn)程進(jìn)行rpc通信,當(dāng)其收到LaunchTask的消息后,其會(huì)執(zhí)行下面的代碼。

我們可以看出此方法存在很多的情況,根據(jù)接收到的不同的消息,執(zhí)行不同的代碼。我們上面執(zhí)行的是LaunchTask的請(qǐng)求。

----------------------------------------------

override def receive: PartialFunction[Any, Unit] = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
try {
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
} catch {
case NonFatal(e) =>
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
}

case RegisterExecutorFailed(message) =>
exitExecutor(1, "Slave registration failed: " + message)
//提交任務(wù)時(shí),執(zhí)行這樣的操作。
case LaunchTask(data) =>
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
//先反序列化
val taskDesc = ser.deserializeTaskDescription
logInfo("Got assigned task " + taskDesc.taskId)
//然后執(zhí)行l(wèi)aunchTask操作。
executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
taskDesc.name, taskDesc.serializedTask)
}

case KillTask(taskId, _, interruptThread) =>
if (executor == null) {
exitExecutor(1, "Received KillTask command but executor was null")
} else {
executor.killTask(taskId, interruptThread)
}

case StopExecutor =>
stopping.set(true)
logInfo("Driver commanded a shutdown")
// Cannot shutdown here because an ack may need to be sent back to the caller. So send
// a message to self to actually do the shutdown.
self.send(Shutdown)

case Shutdown =>
stopping.set(true)
new Thread("CoarseGrainedExecutorBackend-stop-executor") {
override def run(): Unit = {
// executor.stop() will call SparkEnv.stop() which waits until RpcEnv stops totally.
// However, if executor.stop() runs in some thread of RpcEnv, RpcEnv won't be able to
// stop until executor.stop() returns, which becomes a dead-lock (See SPARK-14180).
// Therefore, we put this line in a new thread.
executor.stop()
}
}.start()
}

----------------------------------------------

Executor的相關(guān)源代碼,從源碼中我們可以看出,對(duì)于Task,其創(chuàng)建了一個(gè)TaskRunner的線程,并且把其放入到執(zhí)行隊(duì)列中進(jìn)行執(zhí)行。

----------------------------------------------

def launchTask(
context: ExecutorBackend,
taskId: Long,
attemptNumber: Int,
taskName: String,
serializedTask: ByteBuffer): Unit = {
val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
serializedTask)
runningTasks.put(taskId, tr)
threadPool.execute(tr)
}

----------------------------------------------

從下面可以看出,其定義的就是一個(gè)線程,那我們就看一下這個(gè)線程的run方法。

----------------------------------------------

override def run(): Unit = {
//初始化線程運(yùn)行需要的一些環(huán)境
val threadMXBean = ManagementFactory.getThreadMXBean
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
//得到當(dāng)前進(jìn)程的類加載器
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = env.closureSerializer.newInstance()
logInfo(s"Running $taskName (TID $taskId)")
//更新相關(guān)的狀態(tài)
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var taskStart: Long = 0
var taskStartCpu: Long = 0
startGCTime = computeTotalGcTime()

try {

//反序列化類相關(guān)的依賴,得到相關(guān)的參數(shù)
val (taskFiles, taskJars, taskProps, taskBytes) =
Task.deserializeWithDependencies(serializedTask)

// Must be set before updateDependencies() is called, in case fetching dependencies// requires access to properties contained within (e.g. for access control).Executor.taskDeserializationProps.set(taskProps)

//更新依賴配置
updateDependencies(taskFiles, taskJars)
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
task.localProperties = taskProps
task.setTaskMemoryManager(taskMemoryManager)

// If this task has been killed before we deserialized it, let's quit now. Otherwise,// continue executing the task.if (killed) {// Throw an exception rather than returning, because returning within a try{} block// causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl// exception will be caught by the catch block, leading to an incorrect ExceptionFailure// for the task.throw new TaskKilledException}logDebug("Task " + taskId + "'s epoch is " + task.epoch)

//追蹤緩存數(shù)據(jù)的位置
env.mapOutputTracker.updateEpoch(task.epoch)

// Run the actual task and measure its runtime.taskStart = System.currentTimeMillis()taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {threadMXBean.getCurrentThreadCpuTime} else 0Lvar threwException = true

//運(yùn)行任務(wù)的run方法來運(yùn)行task,主要就是下面的task.run方法,它又會(huì)調(diào)用runTask方法來真正執(zhí)行task,前面我們提到過,job變
//為stage有兩種,ShuffleMapStage和ResultStage,那么其對(duì)應(yīng)的也有兩個(gè)Task:ShuffleMapTask和ResultTask,不同的task類型,執(zhí)行不同的run方法。
val value = try {
val res = task.run(
taskAttemptId = taskId,
attemptNumber = attemptNumber,
metricsSystem = env.metricsSystem)
threwException = false
res
} finally {
//下面就是根據(jù)上面的運(yùn)行結(jié)果,來進(jìn)行一些判斷和日志的打出
val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()

if (freedMemory > 0 && !threwException) {val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {throw new SparkException(errMsg)} else {logWarning(errMsg)}}if (releasedLocks.nonEmpty && !threwException) {val errMsg =s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +releasedLocks.mkString("[", ", ", "]")if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false)) {throw new SparkException(errMsg)} else {logWarning(errMsg)}}}val taskFinish = System.currentTimeMillis()val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {threadMXBean.getCurrentThreadCpuTime} else 0L// If the task has been killed, let's fail it.if (task.killed) {throw new TaskKilledException}val resultSer = env.serializer.newInstance()val beforeSerialization = System.currentTimeMillis()val valueBytes = resultSer.serialize(value)val afterSerialization = System.currentTimeMillis()// Deserialization happens in two parts: first, we deserialize a Task object, which// includes the Partition. Second, Task.run() deserializes the RDD and function to be run.task.metrics.setExecutorDeserializeTime((taskStart - deserializeStartTime) + task.executorDeserializeTime)task.metrics.setExecutorDeserializeCpuTime((taskStartCpu - deserializeStartCpuTime) + task.executorDeserializeCpuTime)// We need to subtract Task.run()'s deserialization time to avoid double-countingtask.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)task.metrics.setExecutorCpuTime((taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)// Note: accumulator updates must be collected after TaskMetrics is updatedval accumUpdates = task.collectAccumulatorUpdates()// TODO: do not serialize value twiceval directResult = new DirectTaskResult(valueBytes, accumUpdates)val serializedDirectResult = ser.serialize(directResult)val resultSize = serializedDirectResult.limit// directSend = sending directly back to the driverval serializedResult: ByteBuffer = {if (maxResultSize > 0 && resultSize > maxResultSize) {logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +s"dropping it.")ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))} else if (resultSize > maxDirectResultSize) {val blockId = TaskResultBlockId(taskId)env.blockManager.putBytes(blockId,new ChunkedByteBuffer(serializedDirectResult.duplicate()),StorageLevel.MEMORY_AND_DISK_SER)logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))} else {logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")serializedDirectResult}}execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)} catch {case ffe: FetchFailedException =>val reason = ffe.toTaskFailedReasonsetTaskFinishedAndClearInterruptStatus()execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))case _: TaskKilledException =>logInfo(s"Executor killed $taskName (TID $taskId)")setTaskFinishedAndClearInterruptStatus()execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))case _: InterruptedException if task.killed =>logInfo(s"Executor interrupted and killed $taskName (TID $taskId)")setTaskFinishedAndClearInterruptStatus()execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))case CausedBy(cDE: CommitDeniedException) =>val reason = cDE.toTaskFailedReasonsetTaskFinishedAndClearInterruptStatus()execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))case t: Throwable =>// Attempt to exit cleanly by informing the driver of our failure.// If anything goes wrong (or this was a fatal exception), we will delegate to// the default uncaught exception handler, which will terminate the Executor.logError(s"Exception in $taskName (TID $taskId)", t)// Collect latest accumulator values to report back to the driverval accums: Seq[AccumulatorV2[_, _]] =if (task != null) {task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)task.collectAccumulatorUpdates(taskFailed = true)} else {Seq.empty}val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))val serializedTaskEndReason = {try {ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))} catch {case _: NotSerializableException =>// t is not serializable so just send the stacktraceser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))}}setTaskFinishedAndClearInterruptStatus()execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)// Don't forcibly exit unless the exception was inherently fatal, to avoid// stopping other tasks unnecessarily.if (Utils.isFatalError(t)) {SparkUncaughtExceptionHandler.uncaughtException(t)}} finally {runningTasks.remove(taskId) }

}
}

----------------------------------------------

前面我們提到過,job變?yōu)閟tage有兩種,ShuffleMapStage和ResultStage,那么其對(duì)應(yīng)的也有兩個(gè)Task:ShuffleMapTask和
ResultTask,不同的task類型,執(zhí)行不同的Task.runTask方法。Task.run方法中調(diào)用了runTask的方法,這個(gè)方法在上面兩個(gè)Task類中都進(jìn)行了重寫。
ShuffleMapTask的runTask方法

----------------------------------------------

override def runTask(context: TaskContext): MapStatus = {
// Deserialize the RDD using the broadcast variable.
//首先進(jìn)行一些初始化操作
val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
//反序列化,這里的rdd,其實(shí)是我們進(jìn)行shuffle之前的最后一個(gè)rdd,這個(gè)我們?cè)谇懊嬉呀?jīng)說到的。
val (rdd, dep) = ser.deserialize[(RDD[], ShuffleDependency[, , ])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L
//下面就是把每一個(gè)shuffle之前的stage的最后一個(gè)rdd進(jìn)行寫入操作,但是沒有看到task執(zhí)行我們寫的函數(shù),也沒有看到其調(diào)用compute函數(shù)以及rdd之間的管道執(zhí)行也沒有體現(xiàn)出來,往下看,會(huì)揭露這些問題的面紗。
var writer: ShuffleWriter[Any, Any] = null
try {
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[ <: Product2[Any, Any]]])
writer.stop(success = true).get
} catch {
case e: Exception =>
try {
if (writer != null) {
writer.stop(success = false)
}
} catch {
case e: Exception =>
log.debug("Could not stop writer", e)
}
throw e
}
}

----------------------------------------------

對(duì)于上面紅色部分的問題,我們?cè)谶@里進(jìn)行詳細(xì)的解釋。RDD會(huì)根據(jù)依賴關(guān)系來形成一個(gè)有向無環(huán)圖,通過最后一個(gè)RDD和其依賴,我們就可以反向查找其對(duì)應(yīng)的所有父類。如果沒有shuffle過程,那么其就會(huì)形成管道,形成管道的好處就是所有RDD的中間結(jié)果不需要進(jìn)行存儲(chǔ),直接就把我們的定義的多個(gè)函數(shù)串連起來,從輸入到輸出中間結(jié)果不需要存儲(chǔ),節(jié)省了時(shí)間和空間。同時(shí)我們也知道RDD的中間結(jié)果可以持久化到內(nèi)存或者硬盤上,spark對(duì)于這個(gè)是可以追蹤到的。

通過上面的分析,我們可以看出,executor中

正是我們RDD往前回溯的開始。對(duì)于shuffle過程和ResultTask的runTask的執(zhí)行過程以后會(huì)在慢慢跟進(jìn)。

轉(zhuǎn)載于:https://blog.51cto.com/9269309/2091219

創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)

總結(jié)

以上是生活随笔為你收集整理的spark DAGScheduler、TaskSchedule、Executor执行task源码分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

日韩,精品电影 | 婷婷av综合 | 婷婷综合网 | 欧美不卡视频在线 | 国产成年免费视频 | 色吊丝在线永久观看最新版本 | 伊人www22综合色 | 在线小视频 | 国产成人综合图片 | 日韩免费久久 | 在线看国产日韩 | 激情av在线播放 | 国产激情小视频在线观看 | 91精品老司机久久一区啪 | 国产最新在线 | 成人va天堂| 欧美一级片 | 久久福利国产 | 国产成人精品电影久久久 | 国产手机av | 免费亚洲黄色 | 久久国产视频网站 | 黄网站免费大全入口 | 色婷婷福利视频 | 日韩av在线免费看 | 国产成人精品久久久久蜜臀 | 国产一区二区精品 | 免费在线激情电影 | 国产精品亚洲a | 欧美日韩亚洲精品在线 | 亚洲经典视频在线观看 | 久久伊人八月婷婷综合激情 | 在线国产福利 | 日韩一区二区三区高清免费看看 | 92中文资源在线 | 婷婷色综 | 五月婷婷丁香激情 | 九九九在线 | 国产无套一区二区三区久久 | 久久国产精品成人免费浪潮 | 激情av综合 | 国产精品成人免费一区久久羞羞 | 亚洲国产高清在线观看视频 | 天天综合导航 | 成人黄色小说在线观看 | 精品播放 | 亚洲九九九在线观看 | 国产不卡av在线播放 | 国产又粗又硬又长又爽的视频 | 亚洲精品玖玖玖av在线看 | 亚洲精品午夜aaa久久久 | 91丨九色丨国产丨porny精品 | 日韩av成人免费看 | 97精品国产97久久久久久 | 久久99精品久久久久久 | 免费的成人av | 婷婷六月天丁香 | 日韩视频在线不卡 | 91九色精品女同系列 | 国产精品高潮呻吟久久久久 | 免费网站黄 | 中文字幕在线观看免费高清完整版 | 久久综合久色欧美综合狠狠 | 国产视频欧美视频 | 精品日韩中文字幕 | 国产手机视频在线观看 | 99久久精品视频免费 | 亚洲高清视频在线观看免费 | 亚洲第一区在线观看 | 久久人人爽人人爽人人片av软件 | 五月天伊人网 | 999久久久免费精品国产 | av电影亚洲 | 国产又黄又爽又猛视频日本 | 亚洲国产精品成人av | 成人免费视频网 | 日日麻批40分钟视频免费观看 | 日本黄色免费电影网站 | 国产美女视频网站 | 91精品伦理 | 成人黄色av网站 | 国产精品大片在线观看 | 欧美视屏一区二区 | 亚洲天天综合 | 在线国产精品一区 | 五月婷婷在线观看视频 | 成人午夜黄色影院 | 99久久婷婷国产精品综合 | 91九色蝌蚪视频网站 | 免费在线观看不卡av | 成人a大片| 果冻av在线 | 99精品国产一区二区 | 色综合天天色综合 | 久久久久久久久久福利 | 成人a视频在线观看 | 国产精品男女 | 91亚色视频 | 久久在线免费 | 色婷婷骚婷婷 | 福利网址在线观看 | 91精品视频免费观看 | 天天天天色射综合 | 99视频精品视频高清免费 | 特级黄色视频毛片 | 波多野结衣视频网址 | 免费黄色在线网址 | 综合久久影院 | 色多多污污| 久久久久久久久久久黄色 | 超碰在线人人 | 久久精品www人人爽人人 | 夜夜夜| 欧美精品一区二区蜜臀亚洲 | 日韩理论片 | 天天爽天天爽 | 97精品一区 | 婷婷色综合色 | 日p在线观看 | 夜夜躁狠狠燥 | 久久久久久久久久毛片 | 久热爱| 顶级欧美色妇4khd | 亚洲日本三级 | 少妇资源站 | 久久在线视频在线 | 中国黄色一级大片 | 精品视频中文字幕 | 日韩二级毛片 | 日韩精品不卡在线 | 亚洲女欲精品久久久久久久18 | 国产精品亚洲片夜色在线 | 婷婷丁香社区 | 福利久久久 | 国产一级高清视频 | 成人av影视 | 九九国产视频 | 91丝袜美腿| 粉嫩aⅴ一区二区三区 | 91大神免费视频 | 成人免费一区二区三区在线观看 | 五月婷婷.com| 国产精品麻豆三级一区视频 | 天天综合网久久 | 久草在线免费看视频 | 岛国av在线免费 | av网站免费在线 | 亚洲欧美日韩国产一区二区三区 | 欧美激情精品久久久久 | 久热爱 | 97色在线视频| 亚洲视频在线免费看 | 午夜资源站 | 久久精品亚洲一区二区三区观看模式 | 国产精品女人久久久久久 | 精品视频一区在线 | 国产精品女同一区二区三区久久夜 | 欧美一区,二区 | 日日干天天爽 | 精品国产大片 | 六月久久婷婷 | 欧美黑吊大战白妞欧美 | 91免费在线| 欧美精品一区二区蜜臀亚洲 | 久久综合九色九九 | 国产高清成人 | 久久精品国产亚洲aⅴ | 成人av在线看 | 久久综合狠狠综合久久综合88 | 国产婷婷色 | 日产乱码一二三区别免费 | 国产视频2021 | 日韩专区在线观看 | 99亚洲国产| 91视频在线免费 | 人人舔人人爽 | 久久夜夜爽 | 婷婷在线五月 | 日日日视频| 日韩一区二区三区在线看 | 黄网站色欧美视频 | 日韩免费电影一区二区三区 | 亚洲一级久久 | 日本性生活一级片 | 色在线中文字幕 | 97碰碰碰| av电影在线免费观看 | 日韩网站在线播放 | 久久99久久精品国产 | 中文字幕乱视频 | 久久亚洲人 | 黄色亚洲在线 | 狠狠色丁香婷婷综合久小说久 | 国产中文字幕亚洲 | 色婷婷综合五月 | 亚洲成av人电影 | 国产精品h在线观看 | 99精品在线直播 | 国产在线探花 | 午夜视频免费在线观看 | 911久久香蕉国产线看观看 | av动态图片| 欧美日本一二三 | 精品久久久久久国产偷窥 | 九九热免费在线观看 | 免费观看日韩 | 国产日韩中文字幕在线 | 国产精品久久电影网 | 久久久久麻豆v国产 | 国产亚洲在线观看 | 久久影视网 | 激情五月激情综合网 | 91视频免费视频 | 日韩三级精品 | 99热免费在线 | 99这里精品 | 日本精品视频一区二区 | 欧美性大战久久久久 | 精品黄色视| 国模一二三区 | 欧美久久99 | 激情视频免费在线 | 欧美在线不卡一区 | 国产在线观看av | 91片网 | 亚洲国产成人久久 | 国产在线a不卡 | 在线观看av免费 | 奇米网8888 | 中文在线免费观看 | 欧美精品久久久久久久久免 | 欧美日韩三级在线观看 | 久久久精品一区二区 | 狠狠狠色丁香婷婷综合激情 | 亚洲理论在线观看电影 | 69国产成人综合久久精品欧美 | 成人动漫一区二区 | 美女网站免费福利视频 | 国产精品久99 | 成人午夜剧场在线观看 | 尤物一区二区三区 | 日本中文一级片 | 波多野结衣在线中文字幕 | 韩国av永久免费 | 久久国产成人午夜av影院宅 | 四虎免费av | 色婷婷丁香 | 日韩欧美精品在线视频 | 免费看污在线观看 | 九色视频网址 | 日日干,天天干 | 8x成人免费视频 | 色婷婷六月天 | 国产91学生粉嫩喷水 | 久久久精品一区二区 | av在线免费观看网站 | 天天天干天天射天天天操 | 欧美一级视频免费 | 玖玖在线精品 | 三级黄色免费片 | 国产精品久久久久久久av大片 | 日韩大片免费观看 | 少妇自拍av | 日韩黄在线观看 | 欧美精品免费一区二区 | 在线观看视频99 | 在线观看成人毛片 | 狠狠色丁香婷婷综合欧美 | 欧美性大胆 | 97在线观看免费视频 | 蜜臀aⅴ精品一区二区三区 久久视屏网 | 五月婷婷黄色网 | 亚洲欧美视频 | 久久久久久久99 | 精品国产诱惑 | 天天干夜夜想 | 中文字幕免费高清 | 色婷婷福利 | 日本性生活一级片 | 国色天香第二季 | 欧美一区二区三区特黄 | 欧美精品久久久久a | 国产亚洲精品久久久久久 | 99精品毛片 | 日日夜夜免费精品视频 | 午夜私人影院久久久久 | 在线视频观看国产 | 999超碰| 911av视频 | 在线免费观看黄色 | 超碰97在线资源站 | 成人 亚洲 欧美 | av电影久久| 三级午夜片 | 亚洲最大av网 | 亚洲成人网av | 国产精品久久久久久久久久东京 | 日韩精品一区二区三区视频播放 | 国产高清免费在线观看 | 欧美 亚洲 另类 激情 另类 | 黄p网站在线观看 | 中文字幕久久精品 | 91精品国自产拍天天拍 | 国产91精品欧美 | 精品一区二区久久久久久久网站 | 久久久一本精品99久久精品66 | 51久久成人国产精品麻豆 | 97精品国产97久久久久久春色 | av网址在线播放 | 插综合网| 91精品久久久久久 | 国内精品美女在线观看 | 国产69精品久久99不卡的观看体验 | 狠狠色狠狠色合久久伊人 | 免费在线激情电影 | 美女视频黄在线 | 国产精品11| 主播av在线 | 91九色porn在线资源 | 欧美小视频在线观看 | 国产在线精品二区 | a天堂一码二码专区 | av成人资源| 亚洲日韩欧美视频 | 97人人模人人爽人人少妇 | 一区二区三区污 | 天天干人人插 | 久久精品之| 国产一级视频免费看 | 久久视| 精品亚洲视频在线 | 色噜噜在线观看视频 | 久久综合久久综合九色 | 99情趣网视频 | 99国产精品久久久久久久久久 | 久久久久女人精品毛片九一 | 日韩中文字幕免费视频 | 天天综合网久久 | 嫩嫩影院理论片 | 亚洲综合少妇 | 日韩高清免费在线观看 | 免费网站黄色 | 久久免费毛片视频 | 91欧美视频网站 | 五月婷婷久久丁香 | 婷婷在线免费视频 | 国产精品毛片一区二区在线 | 香蕉视频最新网址 | 国产精品网红福利 | 免费观看一级视频 | 国产在线不卡视频 | 天堂在线一区 | 久久黄色网址 | 综合婷婷丁香 | 99热网站| 色婷婷狠狠五月综合天色拍 | 日韩黄在线观看 | 麻豆91精品视频 | 狠狠狠狠狠狠天天爱 | 中文字幕日韩免费视频 | 在线观看免费av网站 | 欧美在线视频一区二区三区 | 国产一级视频免费看 | 婷婷五月在线视频 | 亚洲精品自在在线观看 | 五月天色婷婷丁香 | 日韩在线观看不卡 | 天天干天天玩天天操 | www.五月天婷婷.com | 国产一区二区观看 | 亚洲天天综合网 | 日韩欧美精品一区二区 | 免费国产在线视频 | 91精品日韩 | 在线观av | 天天干天天射天天爽 | 国产男女爽爽爽免费视频 | 久久亚洲免费视频 | www激情网| 欧美精品二区 | 91自拍成人 | 日韩av不卡在线观看 | 亚洲人成网站精品片在线观看 | 亚洲欧美综合 | 精品免费观看 | 五月婷婷丁香在线观看 | 97人人爽人人 | 欧美激情综合色综合啪啪五月 | 日韩有码网站 | 91麻豆精品久久久久久 | 国产亚洲精品久久久久5区 成人h电影在线观看 | 亚洲欧洲xxxx| 国产精品h在线观看 | 4438全国亚洲精品观看视频 | 久久国产精品色婷婷 | 韩国精品在线观看 | 久久综合欧美精品亚洲一区 | 成人国产精品一区 | 国产精品麻豆三级一区视频 | 三级黄免费看 | 96精品视频 | 中文字幕在线看视频 | 91精品国产一区二区在线观看 | 视频成人永久免费视频 | 久草精品网 | 国产第页 | 国模视频一区二区三区 | 亚洲精品在线观看不卡 | 91自拍91| 日韩两性视频 | 亚洲成av人片在线观看www | 久久精品久久久久久久 | 欧美日韩亚洲第一 | 中文字幕在线日 | 久久久久中文 | 久久久免费精品国产一区二区 | 婷婷色六月天 | 黄色av免费 | 91久久久久久久 | 又黄又爽又色无遮挡免费 | 亚洲欧洲日韩在线观看 | 国产最新福利 | 欧美一级高清片 | 最新国产精品视频 | 国产色婷婷在线 | 欧美日韩天堂 | 亚洲天堂香蕉 | 97超碰.com | 日韩在线视频网址 | 毛片网在线播放 | av天天在线观看 | 伊在线视频 | 免费色网 | 天天综合网天天 | 91色国产在线 | 在线观看视频精品 | 国产一区免费看 | 国产黄色片在线 | 亚洲精品在线国产 | 黄色精品一区 | 亚洲国产mv | 狠狠做六月爱婷婷综合aⅴ 日本高清免费中文字幕 | 正在播放国产一区二区 | av网址在线播放 | 久久国产精品免费一区 | 国产999精品久久久影片官网 | 91免费日韩 | 在线播放第一页 | 日韩在线观看你懂得 | 日韩久久视频 | 中文字幕av电影下载 | 在线视频国产区 | 国产五月婷 | 久草在线看片 | 91色偷偷 | 亚州精品天堂中文字幕 | 国产精品毛片久久久 | 国产高清在线 | 精品在线免费观看 | 午夜私人影院久久久久 | www99精品 | 日韩电影一区二区在线观看 | 97人人模人人爽人人喊网 | 欧美日韩国产一区二区在线观看 | 中文字幕乱码亚洲精品一区 | 亚洲精品中文字幕在线 | 粉嫩aⅴ一区二区三区 | 最近久乱中文字幕 | 丁香电影小说免费视频观看 | 最新日韩视频在线观看 | 水蜜桃亚洲一二三四在线 | www天天操 | 亚洲激情五月 | 黄色小说在线观看视频 | 91亚洲免费| 亚洲精品午夜久久久久久久 | 欧美美女激情18p | 国产精品久久久久9999 | 中文字幕一区二区三区久久蜜桃 | 悠悠av资源片 | 亚洲成a人片在线观看中文 中文字幕在线视频第一页 狠狠色丁香婷婷综合 | 综合久久精品 | 国产免费观看久久黄 | 伊人资源视频在线 | 日韩精品最新在线观看 | 成人影片免费 | 在线观看中文字幕一区二区 | 国产一区二区免费看 | 91mv.cool在线观看 | 欧美在线一级片 | 麻豆网站免费观看 | 国产中文字幕在线观看 | 免费久久久| 国产精品日韩在线播放 | 91麻豆免费版 | 91网站观看 | 欧美九九九 | 久草在线免费看视频 | 久久精品婷婷 | 国产99一区视频免费 | 婷婷深爱五月 | 亚洲国产成人高清精品 | 婷婷婷国产在线视频 | 精品国产一区二区三区久久久蜜臀 | 精品国产乱码久久久久久三级人 | 韩日av一区二区 | 久久激情五月丁香伊人 | 黄色av一级 | 久久久久久久久久久久电影 | 日韩 国产 | www.天堂av| 一级久久精品 | 久久99精品久久只有精品 | 亚洲在线精品视频 | 亚洲美女免费精品视频在线观看 | 成片免费观看视频 | 久久一区二区三区国产精品 | 久av在线| 黄色av一区二区三区 | 亚洲精品小视频 | 婷婷五天天在线视频 | 日日夜夜综合网 | 亚洲欧美成人综合 | 亚洲精品国偷自产在线91正片 | 国产精品久久久视频 | 欧美日韩免费一区 | 国产精品成人自产拍在线观看 | 麻豆一二 | 久久国内精品 | 麻豆精品传媒视频 | 欧美一级片免费观看 | 99re热精品视频 | 久久综合婷婷国产二区高清 | 国产免费精彩视频 | 在线免费观看黄色小说 | 中文国产在线观看 | 久久免费黄色网址 | 久久看毛片| 伊人在线视频 | 日韩在线观看三区 | 免费看黄色毛片 | 国产91在| 狠狠激情中文字幕 | 精品五月天 | 成人欧美亚洲 | 97在线播放 | 久久精品男人的天堂 | 在线免费观看国产黄色 | 五月开心六月婷婷 | 波多野结衣日韩 | 在线电影 你懂得 | 又爽又黄又无遮挡网站动态图 | 一区二区三区四区在线 | 国产极品尤物在线 | 日日日视频 | 日本最新高清不卡中文字幕 | 久久精品久久久久久久 | 亚洲va欧美 | 精品91| 青青啪 | 成年人黄色av | 天天曰天天干 | 国产一区在线播放 | 在线 成人 | 激情综合五月 | 精品久久一区二区三区 | 国产精品久久久久久久电影 | 一区二区网 | a在线观看免费视频 | 日韩欧美高清 | 国产馆在线播放 | 中文国产在线观看 | 久久一区二区三区超碰国产精品 | 久久免费的精品国产v∧ | 亚洲激情影院 | 国产精品视频地址 | 国产免费不卡av | 美女在线黄 | 午夜影视av| 国产日韩欧美视频在线观看 | 久久精品视频在线 | 五月婷婷亚洲 | 少妇bbb| 久久免费视频这里只有精品 | 日韩网站在线观看 | 中文字幕高清av | 99日韩精品 | 天天天在线综合网 | 国产91在线播放 | 四虎影视8848dvd | 亚洲精品国产精品乱码在线观看 | 丁香激情网 | 亚洲性少妇性猛交wwww乱大交 | 亚洲在线网址 | 99热只有精品在线观看 | 在线观看日韩国产 | 国产一级二级三级在线观看 | 久久久久久久久久网站 | 99视频在线免费看 | 黄色片视频免费 | 亚洲天堂在线观看完整版 | 美女精品国产 | 蜜臀久久99精品久久久久久网站 | 亚洲精品网页 | 国产福利不卡视频 | 国产福利a | 夜又临在线观看 | 啪啪动态视频 | www操操| 欧美精品久久久久性色 | 五月花婷婷| 国产成人精品午夜在线播放 | 欧美aaaxxxx做受视频 | 黄色大全视频 | 欧美精品一区二区免费 | 最近中文字幕视频网 | 精品在线视频观看 | 天天操狠狠操 | 最新国产在线 | 91豆花在线观看 | 久久 地址 | 欧洲精品视频一区 | 狠狠躁夜夜躁人人爽超碰97香蕉 | 天天伊人网 | 国产在线播放观看 | 九九有精品 | 亚洲精品视频 | 国产精品精品久久久久久 | 性色在线视频 | 精品国产91亚洲一区二区三区www | 麻豆一区在线观看 | 日日夜夜操操操操 | 亚洲v精品 | 国内精品久久久久久久影视麻豆 | 91久久奴性调教 | 久99久视频 | 欧美成人在线网站 | 五月婷婷丁香在线观看 | 91精品国产91 | 国产精品成久久久久 | 日韩| 日日夜夜网站 | 九九九九热精品免费视频点播观看 | 国产资源站 | 天堂在线v| www日韩欧美 | wwwww.国产 | 色综合久久五月 | 国产视频1| 午夜在线免费视频 | 亚洲精品动漫久久久久 | 国产美女在线精品免费观看 | 狠狠干综合 | 国产精品久久久一区二区三区网站 | 欧美精品久久久久久久久久久 | 国产美女搞久久 | 成人av免费 | 久久精品欧美一区二区三区麻豆 | 国产午夜精品一区二区三区嫩草 | 日韩在线免费视频 | 久久久久久久久综合 | 国内精品二区 | 手机看片久久 | 色综合久久88色综合天天人守婷 | 亚洲va欧美va人人爽 | 久久理论电影网 | a色网站 | 成人久久久精品国产乱码一区二区 | 欧美一区在线看 | av三级在线看 | 国产精品久久久久免费观看 | 日韩一级片观看 | 97超碰国产精品 | 992tv在线| 在线91播放 | 一级免费黄色 | 在线免费观看羞羞视频 | 亚洲一级黄色 | 亚洲天堂激情 | www色综合| 中文字幕网站视频在线 | 国产精品成久久久久三级 | 色综合天天综合在线视频 | 久久免费视频1 | 91精品老司机久久一区啪 | 欧美日韩国产综合网 | 丁香六月天婷婷 | 韩国一区二区三区视频 | 黄色一区三区 | 色狠狠久久av五月综合 | 国产99久久久国产精品免费二区 | 午夜视频不卡 | 国产亚洲精品v | 国产在线视频导航 | 国产一级片直播 | 久久国产欧美日韩 | 日本韩国在线不卡 | 国内精品久久久久久久久 | 麻豆手机在线 | 日本黄色免费大片 | a级国产乱理论片在线观看 特级毛片在线观看 | 成人免费一级片 | 日日干天天 | 久久亚洲欧美日韩精品专区 | 伊人精品在线 | 天天爽天天搞 | 91.精品高清在线观看 | 日日夜夜精品视频 | 在线免费观看黄色av | 亚洲国产伊人 | av成人黄色| 操操操夜夜操 | 国产精品一区二区三区电影 | 天天操比 | 久久超碰97 | 日本中文字幕视频 | 激情综合一区 | 五月天狠狠操 | 中文字幕日本在线观看 | 久久手机免费视频 | 久久久久成人精品亚洲国产 | 精品久久一区 | 精品国产一区二区三区四区在线观看 | www免费黄色 | 日韩在线短视频 | 国产精品久久久久久久久久久久午夜 | 国产999精品久久久久久麻豆 | 日韩视频中文字幕在线观看 | 欧美一级裸体视频 | 麻豆首页 | 中文字幕av在线不卡 | 一区二区三区在线观看 | 日韩av在线免费看 | 久久国产女人 | 精品国产视频一区 | 亚洲精品网站 | 日韩亚洲欧美中文字幕 | 国内精品久久久久久久 | 色网址99| 久久久久久久久网站 | 在线观看国产日韩欧美 | 亚洲另类人人澡 | 麻豆视频在线播放 | 成人免费在线播放 | 国产精品久久婷婷六月丁香 | 亚洲黄色免费网站 | 国产激情久久久 | 国产精品国产亚洲精品看不卡15 | 欧美黄色免费 | 久99精品 | 伊人天天狠天天添日日拍 | 成年人免费看片 | 最新精品视频在线 | 香蕉久草 | 久草在线视频网 | 天天爽夜夜操 | 日韩免费电影一区二区三区 | 国产色在线,com | 日韩一片| 久久手机在线视频 | 成人久久18免费 | 91九色蝌蚪视频 | av网站免费在线 | 综合久久久久久久 | 91日韩在线专区 | 九九99 | aav在线 | 欧美色婷| 久草在线手机观看 | 国产一区电影在线观看 | 欧美9999 | 久久久免费观看视频 | 99在线视频观看 | 国产精品免费视频网站 | 久久夜色精品国产欧美乱 | 91成人精品一区在线播放 | 国产精品一码二码三码在线 | 国产不卡在线观看 | 亚洲一区二区视频在线播放 | 久久99精品久久久久久清纯直播 | 在线免费观看麻豆视频 | 国产日产精品一区二区三区四区的观看方式 | 91天堂影院 | 日韩欧美国产激情在线播放 | 久久精品—区二区三区 | 久久美女电影 | 久久精品国产成人 | 日韩在线观看免费 | 在线看片视频 | 国内精品久久久久久久久久久久 | 九九九热精品免费视频观看 | 国产精品久久久久久久久搜平片 | 久久天堂精品视频 | 天天干,夜夜操 | 一本一本久久a久久精品综合妖精 | 国产精品欧美日韩 | 欧美午夜精品久久久久久浪潮 | 日本三级吹潮在线 | 国产一区二区在线免费播放 | 国产精品理论视频 | 久久草在线免费 | 激情在线网 | 五月天六月丁香 | 99久久99久久精品免费 | 日韩三区在线 | 亚洲免费公开视频 | 午夜在线看| 97精品国产97久久久久久免费 | 久久久亚洲麻豆日韩精品一区三区 | 青青久视频 | 91av在线视频免费观看 | www最近高清中文国语在线观看 | 久久av中文字幕片 | 久草网站在线观看 | 国内一级片在线观看 | 国产美女主播精品一区二区三区 | 中文字幕精品三区 | 久久99精品国产麻豆婷婷 | 91天堂影院 | 一区中文字幕电影 | 色五月情| 国产999精品久久久影片官网 | 在线免费视| 久久亚洲精品国产亚洲老地址 | 人人艹人人 | 中文字幕日韩伦理 | 国产又粗又猛又黄又爽 | av久久在线 | 欧美日韩午夜 | 在线精品观看 | 瑞典xxxx性hd极品 | 99c视频高清免费观看 | 色午夜| 色婷五月天 | 久久精品中文视频 | 成人久久亚洲 | 日韩a级免费视频 | 91最新地址永久入口 | 日韩精品一区二区久久 | 午夜精品久久久久久久99无限制 | 五月综合在线观看 | 亚洲激情综合 | 欧美极品少妇xxxxⅹ欧美极品少妇xxxx亚洲精品 | 亚洲高清视频在线 | 久久97视频 | 久久综合毛片 | 久久免费视频1 | 亚洲视频久久久 | 久草综合视频 | 国产精品嫩草影视久久久 | 亚洲成人av电影在线 | 青青草在久久免费久久免费 | 国产中文字幕视频在线观看 | 成人动图 | 国产最顶级的黄色片在线免费观看 | 亚洲综合色激情五月 | 91福利视频一区 | 中文字幕国产在线 | 久久精品毛片基地 | 黄网站色 | 欧美日韩3p | 欧美日韩不卡一区二区三区 | 久久国产片 | 亚洲精品乱码久久久一二三 | 国产成人一区二区在线观看 | 国产五月色婷婷六月丁香视频 | 亚洲精品一区二区18漫画 | 欧美精品二 | 欧美激情视频一区 | 91av影视| 在线看片视频 | 日韩啪啪小视频 | 亚洲成人精品av | 中文字幕av专区 | 国产精品乱码久久久久 | 亚洲精品在线观看的 | 麻豆一区二区三区视频 | 午夜视频在线观看一区二区 | 久久久久国产精品厨房 | 97视频在线免费播放 | 国产成人免费av电影 | 国产一区二区在线播放视频 | 91综合久久一区二区 | www激情久久 | 日韩有码中文字幕在线 | 黄色软件在线观看 | 69国产成人综合久久精品欧美 | 日本韩国精品一区二区在线观看 | 日韩成人精品 | 丁香5月婷婷 | 在线观看亚洲免费视频 | 日本99精品| 天天干人人干 | 在线视频欧美精品 | 在线观看成年人 | 久草在线一免费新视频 | 91亚洲精品视频 | 91日韩精品一区 | 香蕉视频免费在线播放 | 91九色在线视频 | 午夜视频一区二区三区 | 夜夜爱av | 亚洲人在线7777777精品 | 天天天操天天天干 | 天天天色综合a | 91高清免费看 | 久久视频在线观看免费 | 欧美日韩一区二区三区在线免费观看 | 99精品黄色片免费大全 | 高清不卡一区二区在线 | 国产 一区二区三区 在线 | 色com| 一二三区在线 | 国产97色在线 | 久久夜av| 超碰日韩在线 | 欧美日韩裸体免费视频 | 99精品区| 99精品色 | 欧美亚洲国产一卡 | 蜜臀av性久久久久av蜜臀妖精 | a在线免费| 久久久久二区 | 9在线观看免费高清完整版 玖玖爱免费视频 | 午夜精品久久久久久久99热影院 | 91爱爱电影 | 日韩成人黄色 | 欧美激情综合五月色丁香小说 | 最近中文字幕免费av | 又紧又大又爽精品一区二区 | 亚洲最大av | 一本一道久久a久久精品蜜桃 | 极品国产91在线网站 | 国产福利一区二区三区在线观看 | 欧美日韩午夜爽爽 | av成人黄色 | 亚洲免费av在线播放 | 一区二区三区 亚洲 | 亚洲jizzjizz日本少妇 | 999毛片| 亚洲国产精品激情在线观看 | 精品一区二区在线看 | 久草在线视频首页 | 天天综合网久久 | 国产成人一区二区精品非洲 | 成人久久国产 | av大片免费在线观看 | 中文字幕电影高清在线观看 | 欧美精品天堂 | 一区二区三区在线免费观看视频 | 久久伦理网 | 深爱婷婷激情 | 日韩av在线一区二区 | 九九九毛片 | 国产午夜精品一区二区三区在线观看 | 久久久久观看 | 日韩美女黄色片 | 中文字幕在线播放日韩 | 中日韩在线视频 | 日韩精品视频在线免费观看 | 国产视频欧美视频 | 天天干国产| 国产特级毛片aaaaaaa高清 | 96视频免费在线观看 | 国产在线综合视频 | 日韩一级黄色av | 99热这里只有精品在线观看 | 96久久欧美麻豆网站 | 日韩成人免费在线电影 | 丁香花中文字幕 | 色婷婷精品大在线视频 | 精品国内自产拍在线观看视频 | 91福利免费 | 亚洲爱视频 | adn—256中文在线观看 | 开心色停停 | 天天干天天做天天爱 | 人人爽人人乐 | 国产精品精品国产色婷婷 | 日韩精品免费一区 | 操操操av| 狠狠的干狠狠的操 | 中文字幕在线看片 | 国产原创在线 | 97人人视频| 亚洲区另类春色综合小说 | 日韩免费在线视频观看 | 欧美日韩国产亚洲乱码字幕 | 久99久精品| 国产视频日本 | 日韩高清国产精品 | 开心激情网五月天 | 九九视频在线观看视频6 | 97国产大学生情侣白嫩酒店 | 国内精品99 | 麻豆视频大全 | 欧美九九视频 | 日日干av |