日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

Spark源码分析之七:Task运行(一)

發布時間:2023/11/27 生活经验 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark源码分析之七:Task运行(一) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

在Task調度相關的兩篇文章《Spark源碼分析之五:Task調度(一)》與《Spark源碼分析之六:Task調度(二)》中,我們大致了解了Task調度相關的主要邏輯,并且在Task調度邏輯的最后,CoarseGrainedSchedulerBackend的內部類DriverEndpoint中的makeOffers()方法的最后,我們通過調用TaskSchedulerImpl的resourceOffers()方法,得到了TaskDescription序列的序列Seq[Seq[TaskDescription]],相關代碼如下:

?

[java]?view plain?copy ?
  1. //?調用scheduler的resourceOffers()方法,分配資源,并在得到資源后,調用launchTasks()方法,啟動tasks??
  2. ??????//?這個scheduler就是TaskSchedulerImpl??
  3. ??????launchTasks(scheduler.resourceOffers(workOffers))??
[java]?view plain?copy ?
  1. /**?
  2. ???*?Called?by?cluster?manager?to?offer?resources?on?slaves.?We?respond?by?asking?our?active?task?
  3. ???*?sets?for?tasks?in?order?of?priority.?We?fill?each?node?with?tasks?in?a?round-robin?manner?so?
  4. ???*?that?tasks?are?balanced?across?the?cluster.?
  5. ???*?
  6. ???*?被集群manager調用以提供slaves上的資源。我們通過按照優先順序詢問活動task集中的task來回應。?
  7. ???*?我們通過循環的方式將task調度到每個節點上以便tasks在集群中可以保持大致的均衡。?
  8. ???*/??
  9. ??def?resourceOffers(offers:?Seq[WorkerOffer]):?Seq[Seq[TaskDescription]]?=?synchronized?{??

? ? ? ? 這個TaskDescription很簡單,是傳遞到executor上即將被執行的Task的描述,通常由TaskSetManager的resourceOffer()方法生成。代碼如下:

?

?

[java]?view plain?copy ?
  1. /**?
  2. ?*?Description?of?a?task?that?gets?passed?onto?executors?to?be?executed,?usually?created?by?
  3. ?*?[[TaskSetManager.resourceOffer]].?
  4. ?*/??
  5. private[spark]?class?TaskDescription(??
  6. ????val?taskId:?Long,??
  7. ????val?attemptNumber:?Int,??
  8. ????val?executorId:?String,??
  9. ????val?name:?String,??
  10. ????val?index:?Int,????//?Index?within?this?task's?TaskSet??
  11. ????_serializedTask:?ByteBuffer)??
  12. ??extends?Serializable?{??
  13. ??
  14. ??//?Because?ByteBuffers?are?not?serializable,?wrap?the?task?in?a?SerializableBuffer??
  15. ??//?由于ByteBuffers不可以被序列化,所以將task包裝在SerializableBuffer中,_serializedTask為ByteBuffer類型的Task??
  16. ??private?val?buffer?=?new?SerializableBuffer(_serializedTask)??
  17. ????
  18. ??//?序列化后的Task,?取buffer的value??
  19. ??def?serializedTask:?ByteBuffer?=?buffer.value??
  20. ??
  21. ??
  22. ??override?def?toString:?String?=?"TaskDescription(TID=%d,?index=%d)".format(taskId,?index)??
  23. }??

? ? ? ? 此時,得到Seq[Seq[TaskDescription]],即Task被調度到相應executor上后(僅是邏輯調度,實際上并未分配到executor上執行),接下來要做的,便是真正的將Task分配到指定的executor上去執行,也就是本篇我們將要講的Task的運行。而這部分的開端,源于上述提到的CoarseGrainedSchedulerBackend的內部類DriverEndpoint中的launchTasks()方法,代碼如下:

?

?

[java]?view plain?copy ?
  1. //?Launch?tasks?returned?by?a?set?of?resource?offers??
  2. ????private?def?launchTasks(tasks:?Seq[Seq[TaskDescription]])?{??
  3. ??????
  4. ??????//?循環每個task??
  5. ??????for?(task?<-?tasks.flatten)?{??
  6. ??????????
  7. ????????//?序列化Task??
  8. ????????val?serializedTask?=?ser.serialize(task)??
  9. ??????????
  10. ????????//?序列化后的task的大小超出規定的上限??
  11. ????????//?即如果序列化后task的大小大于等于框架配置的Akka消息最大大小減去除序列化task或task結果外,一個Akka消息需要保留的額外大小的值??
  12. ????????if?(serializedTask.limit?>=?akkaFrameSize?-?AkkaUtils.reservedSizeBytes)?{??
  13. ????????????
  14. ??????????//?根據task的taskId,在TaskSchedulerImpl的taskIdToTaskSetManager中獲取對應的TaskSetManager??
  15. ??????????scheduler.taskIdToTaskSetManager.get(task.taskId).foreach?{?taskSetMgr?=>??
  16. ????????????try?{??
  17. ??????????????var?msg?=?"Serialized?task?%s:%d?was?%d?bytes,?which?exceeds?max?allowed:?"?+??
  18. ????????????????"spark.akka.frameSize?(%d?bytes)?-?reserved?(%d?bytes).?Consider?increasing?"?+??
  19. ????????????????"spark.akka.frameSize?or?using?broadcast?variables?for?large?values."??
  20. ??????????????msg?=?msg.format(task.taskId,?task.index,?serializedTask.limit,?akkaFrameSize,??
  21. ????????????????AkkaUtils.reservedSizeBytes)??
  22. ????????????????
  23. ??????????????//?調用TaskSetManager的abort()方法,標記對應TaskSetManager為失敗??
  24. ??????????????taskSetMgr.abort(msg)??
  25. ????????????}?catch?{??
  26. ??????????????case?e:?Exception?=>?logError("Exception?in?error?callback",?e)??
  27. ????????????}??
  28. ??????????}??
  29. ????????}??
  30. ????????else?{//?序列化后task的大小在規定的大小內??
  31. ????????????
  32. ??????????//?從executorDataMap中,根據task.executorId獲取executor描述信息executorData??
  33. ??????????val?executorData?=?executorDataMap(task.executorId)??
  34. ????????????
  35. ??????????//?executorData中,freeCores做相應減少??
  36. ??????????executorData.freeCores?-=?scheduler.CPUS_PER_TASK??
  37. ????????????
  38. ??????????//?利用executorData中的executorEndpoint,發送LaunchTask事件,LaunchTask事件中包含序列化后的task??
  39. ??????????executorData.executorEndpoint.send(LaunchTask(new?SerializableBuffer(serializedTask)))??
  40. ????????}??
  41. ??????}??
  42. ????}??

? ? ? ??launchTasks的執行邏輯很簡單,針對傳入的TaskDescription序列,循環每個Task,做以下處理:

?

? ? ? ? 1、首先對Task進行序列化,得到serializedTask;

? ? ? ? 2、針對序列化后的Task:serializedTask,判斷其大小:

? ? ? ? ? ? ? 2.1、序列化后的task的大小達到或超出規定的上限,即框架配置的Akka消息最大大小,減去除序列化task或task結果外,一個Akka消息需要保留的額外大小的值,則根據task的taskId,在TaskSchedulerImpl的taskIdToTaskSetManager中獲取對應的TaskSetManager,并調用其abort()方法,標記對應TaskSetManager為失敗;

? ? ? ? ? ? ? 2.2、序列化后的task的大小未達到上限,在規定的大小范圍內,則:

? ? ? ? ? ? ? ? ? ? ? ?2.2.1、從executorDataMap中,根據task.executorId獲取executor描述信息executorData;

? ? ? ? ? ? ? ? ? ? ? ?2.2.2、在executorData中,freeCores做相應減少;

? ? ? ? ? ? ? ? ? ? ? ?2.2.3、利用executorData中的executorEndpoint,即Driver端executor通訊端點的引用,發送LaunchTask事件,LaunchTask事件中包含序列化后的task,將Task傳遞到executor中去執行。

? ? ? ? 接下來,我們重點分析下上述流程。

? ? ? ? 先說下異常流程,即序列化后Task的大小超過上限時,對TaskSet標記為失敗的處理。入口方法為TaskSetManager的abort()方法,代碼如下:

?

[java]?view plain?copy ?
  1. def?abort(message:?String,?exception:?Option[Throwable]?=?None):?Unit?=?sched.synchronized?{??
  2. ??????
  3. ????//?TODO:?Kill?running?tasks?if?we?were?not?terminated?due?to?a?Mesos?error??
  4. ????//?調用DAGScheduler的taskSetFailed()方法,標記TaskSet運行失敗??
  5. ????sched.dagScheduler.taskSetFailed(taskSet,?message,?exception)??
  6. ??????
  7. ????//?標志位isZombie設置為true??
  8. ????isZombie?=?true??
  9. ??????
  10. ????//?滿足一定條件的情況下,將TaskSet標記為Finished??
  11. ????maybeFinishTaskSet()??
  12. ??}??

? ? ? ??abort()方法處理邏輯共分三步:

?

? ? ? ? 第一,調用DAGScheduler的taskSetFailed()方法,標記TaskSet運行失敗;

? ? ? ? 第二,標志位isZombie設置為true;

? ? ? ? 第三,滿足一定條件的情況下,將TaskSet標記為Finished。

? ? ? ? 首先看下DAGScheduler的taskSetFailed()方法,代碼如下:

?

[java]?view plain?copy ?
  1. /**?
  2. ???*?Called?by?the?TaskSetManager?to?cancel?an?entire?TaskSet?due?to?either?repeated?failures?or?
  3. ???*?cancellation?of?the?job?itself.?
  4. ???*/??
  5. ??def?taskSetFailed(taskSet:?TaskSet,?reason:?String,?exception:?Option[Throwable]):?Unit?=?{??
  6. ????eventProcessLoop.post(TaskSetFailed(taskSet,?reason,?exception))??
  7. ??}??

? ? ? ? 和第二篇文章《Spark源碼分析之二:Job的調度模型與運行反饋》中Job的調度模型一致,都是依靠事件隊列eventProcessLoop來完成事件的調度執行的,這里,我們在事件隊列eventProcessLoop中放入了一個TaskSetFailed事件。在DAGScheduler的事件處理調度函數doOnReceive()方法中,明確規定了事件的處理方法,代碼如下:

?

?

[java]?view plain?copy ?
  1. //?如果是TaskSetFailed事件,調用dagScheduler.handleTaskSetFailed()方法處理??
  2. ????case?TaskSetFailed(taskSet,?reason,?exception)?=>??
  3. ??????dagScheduler.handleTaskSetFailed(taskSet,?reason,?exception)??

? ? ? ? 下面,我們看下handleTaskSetFailed()這個方法。

?

[java]?view plain?copy ?
  1. private[scheduler]?def?handleTaskSetFailed(??
  2. ??????taskSet:?TaskSet,??
  3. ??????reason:?String,??
  4. ??????exception:?Option[Throwable]):?Unit?=?{??
  5. ??????
  6. ????//?根據taskSet的stageId獲取到對應的Stage,循環調用abortStage,終止該Stage??
  7. ????stageIdToStage.get(taskSet.stageId).foreach?{?abortStage(_,?reason,?exception)?}??
  8. ??????
  9. ????//?提交等待的Stages??
  10. ????submitWaitingStages()??
  11. ??}??

? ? ? ? 很簡單,首先通過taskSet的stageId獲取到對應的Stage,針對Stage,循環調用abortStage()方法,終止該Stage,然后調用submitWaitingStages()方法提交等待的Stages。我們先看下abortStage()方法,代碼如下:

?

?

[java]?view plain?copy ?
  1. /**?
  2. ???*?Aborts?all?jobs?depending?on?a?particular?Stage.?This?is?called?in?response?to?a?task?set?
  3. ???*?being?canceled?by?the?TaskScheduler.?Use?taskSetFailed()?to?inject?this?event?from?outside.?
  4. ???*?終止給定Stage上的所有Job。?
  5. ???*/??
  6. ??private[scheduler]?def?abortStage(??
  7. ??????failedStage:?Stage,??
  8. ??????reason:?String,??
  9. ??????exception:?Option[Throwable]):?Unit?=?{??
  10. ??????
  11. ????//?如果stageIdToStage中不存在對應的stage,說明stage已經被移除,直接返回??
  12. ????if?(!stageIdToStage.contains(failedStage.id))?{??
  13. ??????//?Skip?all?the?actions?if?the?stage?has?been?removed.??
  14. ??????return??
  15. ????}??
  16. ??????
  17. ????//?遍歷activeJobs中的ActiveJob,逐個調用stageDependsOn()方法,找出存在failedStage的祖先stage的activeJob,即dependentJobs??
  18. ????val?dependentJobs:?Seq[ActiveJob]?=??
  19. ??????activeJobs.filter(job?=>?stageDependsOn(job.finalStage,?failedStage)).toSeq??
  20. ??????
  21. ????//?標記failedStage的完成時間completionTime??
  22. ????failedStage.latestInfo.completionTime?=?Some(clock.getTimeMillis())??
  23. ??????
  24. ????//?遍歷dependentJobs,調用failJobAndIndependentStages()??
  25. ????for?(job?<-?dependentJobs)?{??
  26. ??????failJobAndIndependentStages(job,?s"Job?aborted?due?to?stage?failure:?$reason",?exception)??
  27. ????}??
  28. ????if?(dependentJobs.isEmpty)?{??
  29. ??????logInfo("Ignoring?failure?of?"?+?failedStage?+?"?because?all?jobs?depending?on?it?are?done")??
  30. ????}??
  31. ??}??

? ? ? ? 這個方法的處理邏輯主要分為四步:

?

? ? ? ? 1、如果stageIdToStage中不存在對應的stage,說明stage已經被移除,直接返回,這是對異常情況下的一種特殊處理;

? ? ? ? 2、遍歷activeJobs中的ActiveJob,逐個調用stageDependsOn()方法,找出存在failedStage的祖先stage的activeJob,即dependentJobs;

? ? ? ? 3、標記failedStage的完成時間completionTime;

? ? ? ? 4、遍歷dependentJobs,調用failJobAndIndependentStages()。

? ? ? ? 其它都好說,我們主要看下stageDependsOn()和failJobAndIndependentStages()這兩個方法。首先看下stageDependsOn()方法,代碼如下:

?

[java]?view plain?copy ?
  1. /**?Return?true?if?one?of?stage's?ancestors?is?target.?*/??
  2. ??//?如果參數stage的祖先是target,返回true??
  3. ??private?def?stageDependsOn(stage:?Stage,?target:?Stage):?Boolean?=?{??
  4. ??????
  5. ????//?如果stage即為target,返回true??
  6. ????if?(stage?==?target)?{??
  7. ??????return?true??
  8. ????}??
  9. ??????
  10. ????//?存儲處理過的RDD??
  11. ????val?visitedRdds?=?new?HashSet[RDD[_]]??
  12. ??????
  13. ????//?We?are?manually?maintaining?a?stack?here?to?prevent?StackOverflowError??
  14. ????//?caused?by?recursively?visiting??
  15. ????//?存儲待處理的RDD??
  16. ????val?waitingForVisit?=?new?Stack[RDD[_]]??
  17. ??????
  18. ????//?定義一個visit()方法??
  19. ????def?visit(rdd:?RDD[_])?{??
  20. ??????//?如果該RDD未被處理過的話,繼續處理??
  21. ??????if?(!visitedRdds(rdd))?{??
  22. ????????//?將RDD添加到visitedRdds中??
  23. ????????visitedRdds?+=?rdd??
  24. ??????????
  25. ????????//?遍歷RDD的依賴??
  26. ????????for?(dep?<-?rdd.dependencies)?{??
  27. ??????????dep?match?{??
  28. ????????????//?如果是ShuffleDependency??
  29. ????????????case?shufDep:?ShuffleDependency[_,?_,?_]?=>??
  30. ??????????????
  31. ??????????????//?獲得mapStage,并且如果stage的isAvailable為false的話,將其壓入waitingForVisit??
  32. ??????????????val?mapStage?=?getShuffleMapStage(shufDep,?stage.firstJobId)??
  33. ??????????????if?(!mapStage.isAvailable)?{??
  34. ????????????????waitingForVisit.push(mapStage.rdd)??
  35. ??????????????}??//?Otherwise?there's?no?need?to?follow?the?dependency?back??
  36. ????????????//?如果是NarrowDependency,直接將其壓入waitingForVisit??
  37. ????????????case?narrowDep:?NarrowDependency[_]?=>??
  38. ??????????????waitingForVisit.push(narrowDep.rdd)??
  39. ??????????}??
  40. ????????}??
  41. ??????}??
  42. ????}??
  43. ??????
  44. ????//?從stage的rdd開始處理,將其入棧waitingForVisit??
  45. ????waitingForVisit.push(stage.rdd)??
  46. ??????
  47. ????//?當waitingForVisit中存在數據,就調用visit()方法進行處理??
  48. ????while?(waitingForVisit.nonEmpty)?{??
  49. ??????visit(waitingForVisit.pop())??
  50. ????}??
  51. ??????
  52. ????//?根據visitedRdds中是否存在target的rdd判斷參數stage的祖先是否為target??
  53. ????visitedRdds.contains(target.rdd)??
  54. ??}??

? ? ? ? 這個方法主要是判斷參數stage是否為參數target的祖先stage,其代碼風格與stage劃分和提交中的部分代碼一樣,這在前面的兩篇文章中也提到過,在此不再贅述。而它主要是通過stage的rdd,并遍歷其上層依賴的rdd鏈,將每個stage的rdd加入到visitedRdds中,最后根據visitedRdds中是否存在target的rdd判斷參數stage的祖先是否為target。值得一提的是,如果RDD的依賴是NarrowDependency,直接將其壓入waitingForVisit,如果為ShuffleDependency,則需要判斷stage的isAvailable,如果為false,則將對應RDD壓入waitingForVisit。關于isAvailable,我在《Spark源碼分析之四:Stage提交》一文中具體闡述過,這里不再贅述。

? ? ? ? 接下來,我們再看下failJobAndIndependentStages()方法,這個方法的主要作用就是使得一個Job和僅被該Job使用的所有stages失敗,并清空有關狀態。代碼如下:

?

[java]?view plain?copy ?
  1. /**?Fails?a?job?and?all?stages?that?are?only?used?by?that?job,?and?cleans?up?relevant?state.?*/??
  2. ??//?使得一個Job和僅被該Job使用的所有stages失敗,并清空有關狀態??
  3. ??private?def?failJobAndIndependentStages(??
  4. ??????job:?ActiveJob,??
  5. ??????failureReason:?String,??
  6. ??????exception:?Option[Throwable]?=?None):?Unit?=?{??
  7. ??????
  8. ????//?構造一個異常,內容為failureReason??
  9. ????val?error?=?new?SparkException(failureReason,?exception.getOrElse(null))??
  10. ??????
  11. ????//?標志位,是否能取消Stages??
  12. ????var?ableToCancelStages?=?true??
  13. ??
  14. ????//?標志位,是否應該中斷線程??
  15. ????val?shouldInterruptThread?=??
  16. ??????if?(job.properties?==?null)?false??
  17. ??????else?job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL,?"false").toBoolean??
  18. ??
  19. ????//?Cancel?all?independent,?running?stages.??
  20. ????//?取消所有獨立的,正在運行的stages??
  21. ??????
  22. ????//?根據Job的jobId,獲取其stages??
  23. ????val?stages?=?jobIdToStageIds(job.jobId)??
  24. ??????
  25. ????//?如果stages為空,記錄錯誤日志??
  26. ????if?(stages.isEmpty)?{??
  27. ??????logError("No?stages?registered?for?job?"?+?job.jobId)??
  28. ????}??
  29. ??????
  30. ????//?遍歷stages,循環處理??
  31. ????stages.foreach?{?stageId?=>??
  32. ????????
  33. ??????//?根據stageId,獲取jobsForStage,即每個Job所包含的Stage信息??
  34. ??????val?jobsForStage:?Option[HashSet[Int]]?=?stageIdToStage.get(stageId).map(_.jobIds)??
  35. ????????
  36. ??????//?首先處理異常情況,即jobsForStage為空,或者jobsForStage中不包含當前Job??
  37. ??????if?(jobsForStage.isEmpty?||?!jobsForStage.get.contains(job.jobId))?{??
  38. ????????logError(??
  39. ??????????"Job?%d?not?registered?for?stage?%d?even?though?that?stage?was?registered?for?the?job"??
  40. ????????????.format(job.jobId,?stageId))??
  41. ??????}?else?if?(jobsForStage.get.size?==?1)?{??
  42. ????????//?如果stageId對應的stage不存在??
  43. ????????if?(!stageIdToStage.contains(stageId))?{??
  44. ??????????logError(s"Missing?Stage?for?stage?with?id?$stageId")??
  45. ????????}?else?{??
  46. ??????????//?This?is?the?only?job?that?uses?this?stage,?so?fail?the?stage?if?it?is?running.??
  47. ??????????//???
  48. ??????????val?stage?=?stageIdToStage(stageId)??
  49. ??????????if?(runningStages.contains(stage))?{??
  50. ????????????try?{?//?cancelTasks?will?fail?if?a?SchedulerBackend?does?not?implement?killTask??
  51. ????????????????
  52. ??????????????//?調用taskScheduler的cancelTasks()方法,取消stage內的tasks??
  53. ??????????????taskScheduler.cancelTasks(stageId,?shouldInterruptThread)??
  54. ????????????????
  55. ??????????????//?標記Stage為完成??
  56. ??????????????markStageAsFinished(stage,?Some(failureReason))??
  57. ????????????}?catch?{??
  58. ??????????????case?e:?UnsupportedOperationException?=>??
  59. ????????????????logInfo(s"Could?not?cancel?tasks?for?stage?$stageId",?e)??
  60. ??????????????ableToCancelStages?=?false??
  61. ????????????}??
  62. ??????????}??
  63. ????????}??
  64. ??????}??
  65. ????}??
  66. ??
  67. ????if?(ableToCancelStages)?{//?如果能取消Stages??
  68. ??????
  69. ??????//?調用job監聽器的jobFailed()方法??
  70. ??????job.listener.jobFailed(error)??
  71. ????????
  72. ??????//?為Job和獨立Stages清空狀態,獨立Stages的意思為該stage僅為該Job使用??
  73. ??????cleanupStateForJobAndIndependentStages(job)??
  74. ????????
  75. ??????//?發送一個SparkListenerJobEnd事件??
  76. ??????listenerBus.post(SparkListenerJobEnd(job.jobId,?clock.getTimeMillis(),?JobFailed(error)))??
  77. ????}??
  78. ??}??

? ? ? ? 處理過程還是很簡單的,讀者可以通過上述源碼和注釋自行補腦,這里就先略過了。

?

? ? ? ? 下面,再說下正常情況下,即序列化后Task大小未超過上限時,LaunchTask事件的發送及executor端的響應。代碼再跳轉到CoarseGrainedSchedulerBackend的內部類DriverEndpoint中的launchTasks()方法。正常情況下處理流程主要分為三大部分:

? ? ? ? 1、從executorDataMap中,根據task.executorId獲取executor描述信息executorData;

? ? ? ? 2、在executorData中,freeCores做相應減少;

? ? ? ? 3、利用executorData中的executorEndpoint,即Driver端executor通訊端點的引用,發送LaunchTask事件,LaunchTask事件中包含序列化后的task,將Task傳遞到executor中去執行。

? ? ? ? 我們重點看下第3步,利用Driver端持有的executor描述信息executorData中的executorEndpoint,即Driver端executor通訊端點的引用,發送LaunchTask事件給executor,將Task傳遞到executor中去執行。那么executor中是如何接收LaunchTask事件的呢?答案就在CoarseGrainedExecutorBackend中。

? ? ? ? 我們先說下這個CoarseGrainedExecutorBackend,類的定義如下所示:

?

[java]?view plain?copy ?
  1. private[spark]?class?CoarseGrainedExecutorBackend(??
  2. ????override?val?rpcEnv:?RpcEnv,??
  3. ????driverUrl:?String,??
  4. ????executorId:?String,??
  5. ????hostPort:?String,??
  6. ????cores:?Int,??
  7. ????userClassPath:?Seq[URL],??
  8. ????env:?SparkEnv)??
  9. ??extends?ThreadSafeRpcEndpoint?with?ExecutorBackend?with?Logging?{??

? ? ? ? 由上面的代碼我們可以知道,它實現了ThreadSafeRpcEndpoint和ExecutorBackend兩個trait,而ExecutorBackend的定義如下:

?

?

[java]?view plain?copy ?
  1. /**?
  2. ?*?A?pluggable?interface?used?by?the?Executor?to?send?updates?to?the?cluster?scheduler.?
  3. ?*?一個被Executor用來發送更新到集群調度器的可插拔接口。?
  4. ?*/??
  5. private[spark]?trait?ExecutorBackend?{??
  6. ????
  7. ??//?唯一的一個statusUpdate()方法??
  8. ??//?需要Long類型的taskId、TaskState類型的state、ByteBuffer類型的data三個參數??
  9. ??def?statusUpdate(taskId:?Long,?state:?TaskState,?data:?ByteBuffer)??
  10. }??

?

? ? ? ? 那么它自然就有兩種主要的任務,第一,作為endpoint提供driver與executor間的通訊功能;第二,提供了executor任務執行時狀態匯報的功能。

? ? ? ??CoarseGrainedExecutorBackend到底是什么呢?這里我們先不深究,留到以后分析,你只要知道它是Executor的一個后臺輔助進程,和Executor是一對一的關系,向Executor提供了與Driver通訊、任務執行時狀態匯報兩個基本功能即可。

? ? ? ? 接下來,我們看下CoarseGrainedExecutorBackend是如何處理LaunchTask事件的。做為RpcEndpoint,在其處理各類事件或消息的receive()方法中,定義如下:

?

[java]?view plain?copy ?
  1. case?LaunchTask(data)?=>??
  2. ??????if?(executor?==?null)?{??
  3. ????????logError("Received?LaunchTask?command?but?executor?was?null")??
  4. ????????System.exit(1)??
  5. ??????}?else?{??
  6. ????????
  7. ????????//?反序列話task,得到taskDesc??
  8. ????????val?taskDesc?=?ser.deserialize[TaskDescription](data.value)??
  9. ????????logInfo("Got?assigned?task?"?+?taskDesc.taskId)??
  10. ??????????
  11. ????????//?調用executor的launchTask()方法加載task??
  12. ????????executor.launchTask(this,?taskId?=?taskDesc.taskId,?attemptNumber?=?taskDesc.attemptNumber,??
  13. ??????????taskDesc.name,?taskDesc.serializedTask)??
  14. ??????}??

? ? ? ? 首先,會判斷對應的executor是否為空,為空的話,記錄錯誤日志并退出,不為空的話,則按照如下流程處理:

?

? ? ? ? 1、反序列話task,得到taskDesc;

? ? ? ? 2、調用executor的launchTask()方法加載task。

? ? ? ? 那么,重點就落在了Executor的launchTask()方法中,代碼如下:

?

[java]?view plain?copy ?
  1. def?launchTask(??
  2. ??????context:?ExecutorBackend,??
  3. ??????taskId:?Long,??
  4. ??????attemptNumber:?Int,??
  5. ??????taskName:?String,??
  6. ??????serializedTask:?ByteBuffer):?Unit?=?{??
  7. ????????
  8. ????//?新建一個TaskRunner??
  9. ????val?tr?=?new?TaskRunner(context,?taskId?=?taskId,?attemptNumber?=?attemptNumber,?taskName,??
  10. ??????serializedTask)??
  11. ????????
  12. ????//?將taskId與TaskRunner的對應關系存入runningTasks??
  13. ????runningTasks.put(taskId,?tr)??
  14. ??????
  15. ????//?線程池執行TaskRunner??
  16. ????threadPool.execute(tr)??
  17. ??}??

? ? ? ? 非常簡單,創建一個TaskRunner對象,然后將taskId與TaskRunner的對應關系存入runningTasks,將TaskRunner扔到線程池中去執行即可。

?

? ? ? ? 我們先看下這個TaskRunner類。我們先看下Class及其成員變量的定義,如下:

?

[java]?view plain?copy ?
  1. class?TaskRunner(??
  2. ??????execBackend:?ExecutorBackend,??
  3. ??????val?taskId:?Long,??
  4. ??????val?attemptNumber:?Int,??
  5. ??????taskName:?String,??
  6. ??????serializedTask:?ByteBuffer)??
  7. ????extends?Runnable?{??
  8. ??????
  9. ????//?TaskRunner繼承了Runnable??
  10. ??
  11. ????/**?Whether?this?task?has?been?killed.?*/??
  12. ????//?標志位,task是否被殺掉??
  13. ????@volatile?private?var?killed?=?false??
  14. ??
  15. ????/**?How?much?the?JVM?process?has?spent?in?GC?when?the?task?starts?to?run.?*/??
  16. ????@volatile?var?startGCTime:?Long?=?_??
  17. ??
  18. ????/**?
  19. ?????*?The?task?to?run.?This?will?be?set?in?run()?by?deserializing?the?task?binary?coming?
  20. ?????*?from?the?driver.?Once?it?is?set,?it?will?never?be?changed.?
  21. ?????*??
  22. ?????*?需要運行的task。它將在反序列化來自driver的task二進制數據時在run()方法被設置,一旦被設置,它將不會再發生改變。?
  23. ?????*/??
  24. ????@volatile?var?task:?Task[Any]?=?_??
  25. }??

? ? ? ? 由類的定義我們可以看出,TaskRunner繼承了Runnable,所以它本質上是一個線程,故其可以被放到線程池中去運行。它所包含的成員變量,主要有以下幾個:

?

? ? ? ? 1、execBackend:Executor后臺輔助進程,提供了與Driver通訊、狀態匯報等兩大基本功能,實際上傳入的是CoarseGrainedExecutorBackend實例;

? ? ? ? 2、taskId:Task的唯一標識;

? ? ? ? 3、attemptNumber:Task運行的序列號,Spark與MapReduce一樣,可以為拖后腿任務啟動備份任務,即推測執行原理,如此,就需要通過taskId加attemptNumber來唯一標識一個Task運行實例;

? ? ? ? 4、serializedTask:ByteBuffer類型,序列化后的Task,包含的是Task的內容,通過發序列化它來得到Task,并運行其中的run()方法來執行Task;

? ? ? ? 5、killed:Task是否被殺死的標志位;

? ? ? ? 6、task:Task[Any]類型,需要運行的Task,它將在反序列化來自driver的task二進制數據時在run()方法被設置,一旦被設置,它將不會再發生改變;

? ? ? ?7、startGCTime:JVM在task開始運行后,進行垃圾回收的時間。

? ? ? ? 另外,既然是一個線程,TaskRunner必須得提供run()方法,該run()方法就是TaskRunner線程在線程池中被調度時,需要執行的方法,我們來看下它的定義:

?

[java]?view plain?copy ?
  1. override?def?run():?Unit?=?{??
  2. ??????
  3. ??????//?Step1:Task及其運行時需要的輔助對象構造??
  4. ????????
  5. ??????//?獲取任務內存管理器??
  6. ??????val?taskMemoryManager?=?new?TaskMemoryManager(env.memoryManager,?taskId)??
  7. ????????
  8. ??????//?反序列化開始時間??
  9. ??????val?deserializeStartTime?=?System.currentTimeMillis()??
  10. ????????
  11. ??????//?當前線程設置上下文類加載器??
  12. ??????Thread.currentThread.setContextClassLoader(replClassLoader)??
  13. ????????
  14. ??????//?從SparkEnv中獲取序列化器??
  15. ??????val?ser?=?env.closureSerializer.newInstance()??
  16. ??????logInfo(s"Running?$taskName?(TID?$taskId)")??
  17. ????????
  18. ??????//?execBackend更新狀態TaskState.RUNNING??
  19. ??????execBackend.statusUpdate(taskId,?TaskState.RUNNING,?EMPTY_BYTE_BUFFER)??
  20. ??????var?taskStart:?Long?=?0??
  21. ????????
  22. ??????//?計算垃圾回收的時間??
  23. ??????startGCTime?=?computeTotalGcTime()??
  24. ??
  25. ??????try?{??
  26. ????????//?調用Task的deserializeWithDependencies()方法,反序列化Task,得到Task運行需要的文件taskFiles、jar包taskFiles和Task二進制數據taskBytes??
  27. ????????val?(taskFiles,?taskJars,?taskBytes)?=?Task.deserializeWithDependencies(serializedTask)??
  28. ????????updateDependencies(taskFiles,?taskJars)??
  29. ??????????
  30. ????????//?反序列化Task二進制數據taskBytes,得到task實例??
  31. ????????task?=?ser.deserialize[Task[Any]](taskBytes,?Thread.currentThread.getContextClassLoader)??
  32. ??????????
  33. ????????//?設置Task的任務內存管理器??
  34. ????????task.setTaskMemoryManager(taskMemoryManager)??
  35. ??
  36. ????????//?If?this?task?has?been?killed?before?we?deserialized?it,?let's?quit?now.?Otherwise,??
  37. ????????//?continue?executing?the?task.??
  38. ????????//?如果此時Task被kill,拋出異常,快速退出??
  39. ????????if?(killed)?{??
  40. ??????????//?Throw?an?exception?rather?than?returning,?because?returning?within?a?try{}?block??
  41. ??????????//?causes?a?NonLocalReturnControl?exception?to?be?thrown.?The?NonLocalReturnControl??
  42. ??????????//?exception?will?be?caught?by?the?catch?block,?leading?to?an?incorrect?ExceptionFailure??
  43. ??????????//?for?the?task.??
  44. ??????????throw?new?TaskKilledException??
  45. ????????}??
  46. ??
  47. ????????logDebug("Task?"?+?taskId?+?"'s?epoch?is?"?+?task.epoch)??
  48. ????????//?mapOutputTracker更新Epoch??
  49. ????????env.mapOutputTracker.updateEpoch(task.epoch)??
  50. ??
  51. ????????//?Run?the?actual?task?and?measure?its?runtime.??
  52. ????????//?運行真正的task,并度量它的運行時間??
  53. ??????????
  54. ????????//?Step2:Task運行??
  55. ??????????
  56. ????????//?task開始時間??
  57. ????????taskStart?=?System.currentTimeMillis()??
  58. ??????????
  59. ????????//?標志位threwException設置為true,標識Task真正執行過程中是否拋出異常??
  60. ????????var?threwException?=?true??
  61. ??????????
  62. ????????//?調用Task的run()方法,真正執行Task,并獲得運行結果value??
  63. ????????val?(value,?accumUpdates)?=?try?{??
  64. ??????????
  65. ??????????//?調用Task的run()方法,真正執行Task??
  66. ??????????val?res?=?task.run(??
  67. ????????????taskAttemptId?=?taskId,??
  68. ????????????attemptNumber?=?attemptNumber,??
  69. ????????????metricsSystem?=?env.metricsSystem)??
  70. ????????????
  71. ??????????//?標志位threwException設置為false??
  72. ??????????threwException?=?false??
  73. ????????????
  74. ??????????//?返回res,Task的run()方法中,res的定義為(T,?AccumulatorUpdates)??
  75. ??????????//?這里,前者為任務運行結果,后者為累加器更新??
  76. ??????????res??
  77. ????????}?finally?{??
  78. ????????????
  79. ??????????//?通過任務內存管理器清理所有的分配的內存??
  80. ??????????val?freedMemory?=?taskMemoryManager.cleanUpAllAllocatedMemory()??
  81. ??????????if?(freedMemory?>?0)?{??
  82. ????????????val?errMsg?=?s"Managed?memory?leak?detected;?size?=?$freedMemory?bytes,?TID?=?$taskId"??
  83. ????????????if?(conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak",?false)?&&?!threwException)?{??
  84. ??????????????throw?new?SparkException(errMsg)??
  85. ????????????}?else?{??
  86. ??????????????logError(errMsg)??
  87. ????????????}??
  88. ??????????}??
  89. ????????}??
  90. ??????????
  91. ????????//?task完成時間??
  92. ????????val?taskFinish?=?System.currentTimeMillis()??
  93. ??
  94. ????????//?If?the?task?has?been?killed,?let's?fail?it.??
  95. ????????//?如果task被殺死,拋出TaskKilledException異常??
  96. ????????if?(task.killed)?{??
  97. ??????????throw?new?TaskKilledException??
  98. ????????}??
  99. ??
  100. ????????//?Step3:Task運行結果處理??
  101. ??????????
  102. ????????//?通過Spark獲取Task運行結果序列化器??
  103. ????????val?resultSer?=?env.serializer.newInstance()??
  104. ??????????
  105. ????????//?結果序列化前的時間點??
  106. ????????val?beforeSerialization?=?System.currentTimeMillis()??
  107. ??????????
  108. ????????//?利用Task運行結果序列化器序列化Task運行結果,得到valueBytes??
  109. ????????val?valueBytes?=?resultSer.serialize(value)??
  110. ??????????
  111. ????????//?結果序列化后的時間點??
  112. ????????val?afterSerialization?=?System.currentTimeMillis()??
  113. ??
  114. ????????//?度量指標體系相關,暫不介紹??
  115. ????????for?(m?<-?task.metrics)?{??
  116. ??????????//?Deserialization?happens?in?two?parts:?first,?we?deserialize?a?Task?object,?which??
  117. ??????????//?includes?the?Partition.?Second,?Task.run()?deserializes?the?RDD?and?function?to?be?run.??
  118. ??????????m.setExecutorDeserializeTime(??
  119. ????????????(taskStart?-?deserializeStartTime)?+?task.executorDeserializeTime)??
  120. ??????????//?We?need?to?subtract?Task.run()'s?deserialization?time?to?avoid?double-counting??
  121. ??????????m.setExecutorRunTime((taskFinish?-?taskStart)?-?task.executorDeserializeTime)??
  122. ??????????m.setJvmGCTime(computeTotalGcTime()?-?startGCTime)??
  123. ??????????m.setResultSerializationTime(afterSerialization?-?beforeSerialization)??
  124. ??????????m.updateAccumulators()??
  125. ????????}??
  126. ??
  127. ????????//?構造DirectTaskResult,同時包含Task運行結果valueBytes和累加器更新值accumulator?updates??
  128. ????????val?directResult?=?new?DirectTaskResult(valueBytes,?accumUpdates,?task.metrics.orNull)??
  129. ??????????
  130. ????????//?序列化DirectTaskResult,得到serializedDirectResult??
  131. ????????val?serializedDirectResult?=?ser.serialize(directResult)??
  132. ??????????
  133. ????????//?獲取Task運行結果大小??
  134. ????????val?resultSize?=?serializedDirectResult.limit??
  135. ??
  136. ????????//?directSend?=?sending?directly?back?to?the?driver??
  137. ????????//?directSend的意思就是直接發送結果至Driver端??
  138. ????????val?serializedResult:?ByteBuffer?=?{??
  139. ??????????
  140. ??????????//?如果Task運行結果大小大于所有Task運行結果的最大大小,序列化IndirectTaskResult??
  141. ??????????//?IndirectTaskResult為存儲在Worker上BlockManager中DirectTaskResult的一個引用??
  142. ??????????if?(maxResultSize?>?0?&&?resultSize?>?maxResultSize)?{??
  143. ????????????logWarning(s"Finished?$taskName?(TID?$taskId).?Result?is?larger?than?maxResultSize?"?+??
  144. ??????????????s"(${Utils.bytesToString(resultSize)}?>?${Utils.bytesToString(maxResultSize)}),?"?+??
  145. ??????????????s"dropping?it.")??
  146. ????????????ser.serialize(new?IndirectTaskResult[Any](TaskResultBlockId(taskId),?resultSize))??
  147. ??????????}??
  148. ??????????//?如果?Task運行結果大小超過Akka除去需要保留的字節外最大大小,則將結果寫入BlockManager??
  149. ??????????//?即運行結果無法通過消息傳遞??
  150. ??????????else?if?(resultSize?>=?akkaFrameSize?-?AkkaUtils.reservedSizeBytes)?{??
  151. ??????????????
  152. ????????????val?blockId?=?TaskResultBlockId(taskId)??
  153. ????????????env.blockManager.putBytes(??
  154. ??????????????blockId,?serializedDirectResult,?StorageLevel.MEMORY_AND_DISK_SER)??
  155. ????????????logInfo(??
  156. ??????????????s"Finished?$taskName?(TID?$taskId).?$resultSize?bytes?result?sent?via?BlockManager)")??
  157. ????????????ser.serialize(new?IndirectTaskResult[Any](blockId,?resultSize))??
  158. ??????????}???
  159. ??????????//?Task運行結果比較小的話,直接返回,通過消息傳遞??
  160. ??????????else?{??
  161. ????????????logInfo(s"Finished?$taskName?(TID?$taskId).?$resultSize?bytes?result?sent?to?driver")??
  162. ????????????serializedDirectResult??
  163. ??????????}??
  164. ????????}??
  165. ??
  166. ????????//?execBackend更新狀態TaskState.FINISHED??
  167. ????????execBackend.statusUpdate(taskId,?TaskState.FINISHED,?serializedResult)??
  168. ??
  169. ??????}?catch?{//?處理各種異常信息??
  170. ??????????
  171. ????????case?ffe:?FetchFailedException?=>??
  172. ??????????val?reason?=?ffe.toTaskEndReason??
  173. ??????????execBackend.statusUpdate(taskId,?TaskState.FAILED,?ser.serialize(reason))??
  174. ??
  175. ????????case?_:?TaskKilledException?|?_:?InterruptedException?if?task.killed?=>??
  176. ??????????logInfo(s"Executor?killed?$taskName?(TID?$taskId)")??
  177. ??????????execBackend.statusUpdate(taskId,?TaskState.KILLED,?ser.serialize(TaskKilled))??
  178. ??
  179. ????????case?cDE:?CommitDeniedException?=>??
  180. ??????????val?reason?=?cDE.toTaskEndReason??
  181. ??????????execBackend.statusUpdate(taskId,?TaskState.FAILED,?ser.serialize(reason))??
  182. ??
  183. ????????case?t:?Throwable?=>??
  184. ??????????//?Attempt?to?exit?cleanly?by?informing?the?driver?of?our?failure.??
  185. ??????????//?If?anything?goes?wrong?(or?this?was?a?fatal?exception),?we?will?delegate?to??
  186. ??????????//?the?default?uncaught?exception?handler,?which?will?terminate?the?Executor.??
  187. ??????????logError(s"Exception?in?$taskName?(TID?$taskId)",?t)??
  188. ??
  189. ??????????val?metrics:?Option[TaskMetrics]?=?Option(task).flatMap?{?task?=>??
  190. ????????????task.metrics.map?{?m?=>??
  191. ??????????????m.setExecutorRunTime(System.currentTimeMillis()?-?taskStart)??
  192. ??????????????m.setJvmGCTime(computeTotalGcTime()?-?startGCTime)??
  193. ??????????????m.updateAccumulators()??
  194. ??????????????m??
  195. ????????????}??
  196. ??????????}??
  197. ??????????val?serializedTaskEndReason?=?{??
  198. ????????????try?{??
  199. ??????????????ser.serialize(new?ExceptionFailure(t,?metrics))??
  200. ????????????}?catch?{??
  201. ??????????????case?_:?NotSerializableException?=>??
  202. ????????????????//?t?is?not?serializable?so?just?send?the?stacktrace??
  203. ????????????????ser.serialize(new?ExceptionFailure(t,?metrics,?false))??
  204. ????????????}??
  205. ??????????}??
  206. ????????????
  207. ??????????//?execBackend更新狀態TaskState.FAILED??
  208. ??????????execBackend.statusUpdate(taskId,?TaskState.FAILED,?serializedTaskEndReason)??
  209. ??
  210. ??????????//?Don't?forcibly?exit?unless?the?exception?was?inherently?fatal,?to?avoid??
  211. ??????????//?stopping?other?tasks?unnecessarily.??
  212. ??????????if?(Utils.isFatalError(t))?{??
  213. ????????????SparkUncaughtExceptionHandler.uncaughtException(t)??
  214. ??????????}??
  215. ??
  216. ??????}?finally?{??
  217. ????????
  218. ????????//?最后,無論運行成功還是失敗,將task從runningTasks中移除??
  219. ????????runningTasks.remove(taskId)??
  220. ??????}??
  221. ????}??

? ? ? ? 如此長的一個方法,好長好大,哈哈!不過,縱觀全篇,無非三個Step就可搞定:

?

? ? ? ? 1、Step1:Task及其運行時需要的輔助對象構造;

? ? ? ? 2、Step2:Task運行;

? ? ? ? 3、Step3:Task運行結果處理。

? ? ? ? 對, 就這么簡單!鑒于時間與篇幅問題,我們這里先講下主要流程,細節方面的東西留待下節繼續。

? ? ? ? 下面,我們一個個Step來看,首先看下Step1:Task及其運行時需要的輔助對象構造,主要包括以下步驟:

? ? ? ? 1.1、構造TaskMemoryManager任務內存管理器,即taskMemoryManager;

? ? ? ??1.2、記錄反序列化開始時間;

? ? ? ??1.3、當前線程設置上下文類加載器;

? ? ? ??1.4、從SparkEnv中獲取序列化器ser;

? ? ? ??1.5、execBackend更新狀態TaskState.RUNNING;

? ? ? ??1.6、計算垃圾回收時間;

? ? ? ??1.7、調用Task的deserializeWithDependencies()方法,反序列化Task,得到Task運行需要的文件taskFiles、jar包taskFiles和Task二進制數據taskBytes;

? ? ? ? 1.8、反序列化Task二進制數據taskBytes,得到task實例;

? ? ? ??1.9、設置Task的任務內存管理器;

? ? ? ? 1.10、如果此時Task被kill,拋出異常,快速退出;

? ? ? ??

? ? ? ? 接下來,是Step2:Task運行,主要流程如下:

? ? ? ??2.1、獲取task開始時間;

? ? ? ??2.2、標志位threwException設置為true,標識Task真正執行過程中是否拋出異常;

? ? ? ??2.3、調用Task的run()方法,真正執行Task,并獲得運行結果value,和累加器更新accumUpdates;

? ? ? ??2.4、標志位threwException設置為false;

? ? ? ??2.5、通過任務內存管理器taskMemoryManager清理所有的分配的內存;

? ? ? ??2.6、獲取task完成時間;

? ? ? ? 2.7、如果task被殺死,拋出TaskKilledException異常。

? ? ? ??

? ? ? ? 最后一步,Step3:Task運行結果處理,大體流程如下:

? ? ? ? 3.1、通過SparkEnv獲取Task運行結果序列化器;

? ? ? ? 3.2、獲取結果序列化前的時間點;

? ? ? ? 3.3、利用Task運行結果序列化器序列化Task運行結果value,得到valueBytes;

? ? ? ??3.4、獲取結果序列化后的時間點;

? ? ? ??3.5、度量指標體系相關,暫不介紹;

? ? ? ??3.6、構造DirectTaskResult,同時包含Task運行結果valueBytes和累加器更新值accumulator updates;

? ? ? ? 3.7、序列化DirectTaskResult,得到serializedDirectResult;

? ? ? ? 3.8、獲取Task運行結果大小;

? ? ? ? 3.9、處理Task運行結果:

? ? ? ? ? ? ? ? ?3.9.1、如果Task運行結果大小大于所有Task運行結果的最大大小,序列化IndirectTaskResult,IndirectTaskResult為存儲在Worker上BlockManager中DirectTaskResult的一個引用;

? ? ? ? ? ? ? ? ?3.9.2、如果 Task運行結果大小超過Akka除去需要保留的字節外最大大小,則將結果寫入BlockManager,Task運行結果比較小的話,直接返回,通過消息傳遞;

? ? ? ? ? ? ? ? ?3.9.3、Task運行結果比較小的話,直接返回,通過消息傳遞

? ? ? ??3.10、execBackend更新狀態TaskState.FINISHED;

? ? ? ??最后,無論運行成功還是失敗,將task從runningTasks中移除。

? ? ? ? 至此,Task的運行主體流程已經介紹完畢,剩余的部分細節,包括Task內run()方法的具體執行,還有任務內存管理器、序列化器、累加更新,還有部分異常情況處理,狀態匯報等等其他更為詳細的內容留到下篇再講吧!

? ? ? ? 明天還要工作,洗洗睡了!

?

博客原地址:http://blog.csdn.net/lipeng_bigdata/article/details/50726216

轉載于:https://www.cnblogs.com/jirimutu01/p/5274461.html

總結

以上是生活随笔為你收集整理的Spark源码分析之七:Task运行(一)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

午夜久操 | 国产中文在线播放 | 久久综合五月 | 国产综合精品一区二区三区 | 久免费 | 青青草视频精品 | 丁香婷五月 | 午夜av片| 亚洲永久精品视频 | 国产高清视频免费观看 | 天天插综合 | 国产中文字幕在线免费观看 | 欧美一区二区日韩一区二区 | 97在线视频免费播放 | 日韩欧美中文 | 久久人人爽视频 | 在线av资源 | 免费国产黄线在线观看视频 | 亚洲电影自拍 | 国产日韩欧美精品在线观看 | 亚洲jizzjizz日本少妇 | 久草香蕉在线 | 国产精品久久久久影院日本 | 免费的黄色av| 天天艹天天干天天 | 成人超碰97 | 国产精品理论视频 | 久久精品一区二区 | 黄色三级在线观看 | 日本最新一区二区三区 | 亚洲黄色高清 | 91黄色小网站 | 婷婷色资源 | 欧美国产高清 | 在线a人片免费观看视频 | 日韩在线观看视频网站 | 蜜臀av性久久久久蜜臀aⅴ涩爱 | 久久av在线播放 | 国产成人精品综合 | 久久久国产电影 | 少妇啪啪av入口 | 久草在线资源网 | 国产精品毛片久久久久久久 | 久久久精品午夜 | 亚洲黄色免费电影 | bbbb操bbbb| 成人国产精品入口 | 黄色国产精品 | 日本少妇高清做爰视频 | 亚洲人成在线观看 | 天天干夜夜爱 | 日韩欧美电影在线 | 极品久久久久久久 | 日韩免费在线观看视频 | 天天射色综合 | 视频二区在线视频 | 福利视频一区二区 | 国产99久久 | 中文字幕人成人 | 欧美三级高清 | 国产精品久久久久久吹潮天美传媒 | 伊人天天干 | 午夜精品久久久久久中宇69 | 亚洲情感电影大片 | 黄污在线观看 | 亚洲精品乱码久久久久久久久久 | 久久精品理论 | 又爽又黄又无遮挡网站动态图 | 91av中文字幕 | 天天曰夜夜爽 | 日韩国产精品久久久久久亚洲 | 久久免费视频国产 | 色婷婷福利| 99色视频 | 国产成人久久久77777 | 97国产视频 | 免费一级片观看 | 99综合视频 | 深爱开心激情 | 三级大片网站 | 欧美日韩精品在线观看视频 | 日韩精品免费在线观看视频 | 国产成人精品亚洲a | 日韩免费电影一区二区三区 | 成人影片在线免费观看 | 亚洲专区在线 | 欧美色噜噜 | 久久久久久网站 | 久久精品一二三 | 99精品在线免费 | 久草视频在线资源站 | 国产特级毛片 | 17videosex性欧美| 最近日本字幕mv免费观看在线 | 婷婷去俺也去六月色 | 伊甸园av在线 | 三级av网站| 人操人| 日韩激情中文字幕 | 性色大片在线观看 | 少妇bbw撒尿 | 免费99精品国产自在在线 | 在线有码中文 | 成人一区影院 | 涩五月婷婷 | 蜜桃传媒一区二区 | 亚洲天堂首页 | 欧美另类v | 丁香视频在线观看 | 国产高清在线观看av | 午夜色影院 | 日韩电影一区二区在线 | 六月丁香激情综合色啪小说 | 中文字幕一区在线 | 在线看片成人 | 国产一区二区在线播放 | 久久综合一本 | 999久久国产| 人人涩 | 丁香婷婷激情网 | 久久超碰免费 | 国产色视频一区 | 婷婷色资源| 黄网在线免费观看 | 天堂久色| 国产色在线,com | 国产高清专区 | 黄色视屏在线免费观看 | 欧美色综合天天久久综合精品 | 免费观看福利视频 | 亚洲第五色综合网 | 国语麻豆 | 香蕉影视 | 免费观看视频的网站 | 亚洲成人黄色在线观看 | 国产视频中文字幕 | 一级一级一片免费 | 国产精品国产三级国产不产一地 | 国产黄色大片免费看 | 国产精品theporn| 五月开心综合 | 亚洲黄色软件 | 91中文字幕永久在线 | 99r国产精品 | 成人久久久精品国产乱码一区二区 | 日韩视频免费观看高清完整版在线 | 免费在线黄色av | 亚洲伊人第一页 | 字幕网在线观看 | 一级免费黄色 | 波多野结衣视频一区 | 在线免费色 | 国产又粗又猛又黄又爽的视频 | 亚洲 欧美 91 | 九九九九色| 伊人天堂网 | 国产色视频网站 | 久草在| 婷婷六月天在线 | 粉嫩av一区二区三区四区在线观看 | 99久热在线精品视频观看 | 久久婷婷网 | 91av视频播放 | 久草视频在线观 | 96亚洲精品久久 | 国产成人一级 | 蜜臀av.com | 日韩a在线观看 | 成人国产电影在线观看 | 亚洲第一中文字幕 | 精品国产一区二区三区久久 | 欧美一级性生活 | 日韩一级片大全 | 超碰在线免费97 | 在线观看成人福利 | 久久久高清一区二区三区 | 亚洲国内精品 | 中文字幕91在线 | 久久久久国产精品视频 | 97国产大学生情侣酒店的特点 | 欧美精品视 | 亚州中文av | 国产精品理论片在线播放 | 一区二区三区免费看 | 8x成人免费视频 | 毛片网站观看 | 天天干干 | 日本少妇视频 | 福利片免费看 | 99电影| 日韩视频在线一区 | 国产一级电影在线 | 九九热精品国产 | av手机在线播放 | 亚洲精品免费在线播放 | 色综合久久88色综合天天免费 | 9999在线视频 | 久久久久久国产精品久久 | 国产精品毛片一区二区三区 | 中国精品一区二区 | 国产麻豆精品一区 | 二区三区中文字幕 | 蜜臀精品久久久久久蜜臀 | 又黄又刺激视频 | 亚洲男男gⅴgay双龙 | 久久艹在线| 91精品久久久久久久91蜜桃 | 欧美国产日韩中文 | 午夜男人影院 | 亚洲激情久久 | 最新中文字幕在线资源 | 免费日韩高清 | 国产精品福利在线 | 日韩精品网址 | 久久电影中文字幕视频 | 91久久国产自产拍夜夜嗨 | av免费网站观看 | 国产精品2019 | 在线视频观看国产 | 激情亚洲综合在线 | 狠狠色丁香久久婷婷综合五月 | 在线韩国电影免费观影完整版 | 欧美精品一区二区在线播放 | 热久久免费视频精品 | 久久狠狠一本精品综合网 | 亚洲a成人v | 在线成人一区二区 | 91porny九色在线播放 | 欧美黄色软件 | 亚洲精品国产精品国自 | av片在线观看免费 | 亚洲五月花| 亚洲精品黄网站 | 国产精品亚 | 色网av| 91精品国产综合久久福利不卡 | 韩国一区二区三区视频 | www婷婷 | 在线成人一区 | 日韩一级片网址 | 深夜国产在线 | 92国产精品久久久久首页 | 中文字幕超清在线免费 | av在线电影播放 | 一二三久久久 | 中文字幕第一页在线 | 在线 国产 亚洲 欧美 | 91精品视频免费 | 四虎影视8848dvd | 成人 亚洲 欧美 | 97精品超碰一区二区三区 | 精品视频999 | 久久久久国产成人精品亚洲午夜 | av激情五月| www.五月婷婷 | 美女黄频网站 | 欧美成人中文字幕 | 亚洲精品国产精品国自产 | 色88久久 | 日韩首页 | 亚洲一区二区三区在线看 | 国产成人综合精品 | 奇米四色影狠狠爱7777 | 日韩免费在线观看视频 | 国产精品欧美久久久久无广告 | 国产一区成人在线 | 91视频在线看 | 日日夜夜av | 在线观看视频一区二区 | 欧美一区二区三区免费观看 | av一级免费 | 日韩av在线影视 | 99国产成+人+综合+亚洲 欧美 | 一区二区亚洲精品 | 国产在线污 | 精品国产亚洲一区二区麻豆 | 中文字幕在线播放一区 | 麻豆首页 | 国产一在线精品一区在线观看 | 精品色综合 | 国产一区二区成人 | 亚洲美女免费精品视频在线观看 | 午夜精品久久久久久中宇69 | 蜜臀一区二区三区精品免费视频 | 亚洲综合五月天 | 国产成人免费av电影 | 国产在线欧美 | 亚洲一区久久 | 欧美成a人片在线观看久 | 在线免费高清一区二区三区 | 国产999在线 | 精品国产免费av | 国产成人精品福利 | 欧美日韩高清一区二区 国产亚洲免费看 | 91在线国产观看 | 国产伦理一区 | 国产亚洲精品久久久久久移动网络 | 国产亚洲精品成人 | 国产黄网在线 | 波多野结依在线观看 | 激情五月av | 日韩亚洲国产精品 | 久久久这里有精品 | 婷婷久久综合九色综合 | 九九爱免费视频 | 天天操天天操天天操天天 | 91福利视频免费 | 91精品国产一区二区在线观看 | 久久一区二| 九九热精品视频在线观看 | 久久免费看视频 | 午夜av日韩| 国产精品美女www爽爽爽视频 | 亚洲高清国产视频 | av中文在线 | 香蕉影院在线播放 | 日日日日| 人人舔人人干 | 91视频观看免费 | 国产亚洲日本 | 97在线免费观看视频 | 国产日产欧美在线观看 | 国产精品国产亚洲精品看不卡15 | 久久成人资源 | 亚洲国产精品一区二区久久,亚洲午夜 | 一级做a视频 | 久久久成人精品 | 久久精品中文 | 韩日精品在线 | 91视频久久久久久 | 久久夜夜操 | 国产v亚洲v | 国产精品嫩草在线 | 三级黄色大片在线观看 | 香蕉97视频观看在线观看 | 丝袜美女视频网站 | 免费在线国产视频 | 精品国产一区二 | av黄色av| 中文字幕在线播放一区 | 99久久影视 | 精品国产乱码一区二 | 国产亚洲精品久久久久久无几年桃 | 国产成人精品在线观看 | 中文字幕视频一区 | 97超碰人人澡人人爱学生 | 欧美黄色特级片 | 97国产| 国产91亚洲精品 | 亚洲综合色网站 | 国产成人精品国内自产拍免费看 | 97在线观看免费高清完整版在线观看 | 日韩美女免费线视频 | 国产亚洲视频系列 | 99精品在这里 | 在线影院中文字幕 | 天天爽夜夜爽人人爽一区二区 | 亚洲人人爱 | 韩日三级av| 99视频精品免费观看, | 亚洲精品视频在线观看网站 | 欧美精品在线一区二区 | 国产精品区在线观看 | 国产小视频91 | 亚洲国产精品视频在线观看 | 久久综合久色欧美综合狠狠 | 成人影视免费看 | 国产精品久久久久久电影 | 国产麻豆精品免费视频 | av大片免费| 国产一区在线免费 | 亚洲成人精品影院 | 亚洲影院天堂 | 久久久午夜电影 | 亚洲精品永久免费视频 | 一级免费观看 | 黄a在线看 | 97品白浆高清久久久久久 | 国产中文字幕视频在线观看 | 综合网伊人 | 黄色avwww | 欧美日韩中 | 日韩精品一区二区三区在线视频 | 午夜精品电影一区二区在线 | 亚洲精品视频免费在线 | 久久精品在线免费观看 | 亚洲第一av在线 | 久久免费视频99 | 黄色视屏在线免费观看 | 亚洲五月六月 | 91久久精品日日躁夜夜躁国产 | 亚洲无吗av | 国产精品一区二区中文字幕 | av免费线看 | 最新国产精品拍自在线播放 | 欧洲一区二区在线观看 | 国产精品一区二区在线观看 | 国产精品美女久久久久久久久 | 亚洲精品视频在线看 | 久久精品亚洲精品国产欧美 | 国产一在线精品一区在线观看 | 一区二区三区高清不卡 | 国产精品久久久久久久免费 | 国产69精品久久久久99尤 | 91网页版免费观看 | 插综合网 | 免费看黄色小说的网站 | 在线视频欧美日韩 | 99r在线 | 国产精品免费久久 | 成人一区二区在线 | 国产一区二区视频在线 | 久久久久久久久久久精 | 日韩视频免费 | 美女久久视频 | 国产免费视频一区二区裸体 | 国产91对白在线播 | 麻豆91网站 | 日韩在线观看影院 | 不卡精品 | 国产亚洲久一区二区 | 日韩久久精品一区二区三区 | 中文字幕一区二区在线播放 | 日韩黄视频 | 91最新网址在线观看 | 成年人看片网站 | 国产午夜视频在线观看 | 亚洲作爱视频 | 97在线视 | 久久久精品视频网站 | 在线观看91网站 | 国产xx在线 | 免费成人在线观看视频 | 最新中文字幕在线播放 | av超碰在线 | 日韩免费在线网站 | 中文字幕中文字幕 | 亚洲精品av中文字幕在线在线 | 亚洲色五月| 91精品欧美一区二区三区 | 永久免费精品视频 | www在线观看视频 | 亚洲精品中文字幕视频 | 天天综合网在线观看 | 国产高清福利在线 | 日韩在线网址 | 狠狠的干狠狠的操 | 免费观看一级视频 | 免费观看一区二区 | 国产精品18久久久 | 欧美老女人xx | 久久激情婷婷 | 精品一区在线看 | 精品国产aⅴ一区二区三区 在线直播av | 国产精品久久久久久久电影 | 91成人黄色 | 色视频在线免费 | av大片免费在线观看 | 久久久久久久久久免费 | 国内视频 | 久草在线在线精品观看 | 黄色日本片| 国产一级淫片在线观看 | 69久久夜色精品国产69 | www蜜桃视频 | 国产打女人屁股调教97 | 精品视频在线观看 | 久久超级碰| 免费观看视频黄 | 亚洲一级片av | 国产在线观看高清视频 | 三级av在线免费观看 | 成人国产亚洲 | 久久免费美女视频 | 成人一级片在线观看 | 亚洲免费观看在线视频 | 天天干天天干天天操 | 免费在线观看午夜视频 | 亚洲精品在线视频观看 | 欧美精品一区二区蜜臀亚洲 | 超级碰碰免费视频 | 色综合久久五月 | 国产精品第三页 | 中文字幕在线网 | 999久久久久久久久久久 | 久久美女免费视频 | 天天插夜夜操 | 色视频网站在线观看一=区 a视频免费在线观看 | 亚洲精品麻豆视频 | 精品1区二区 | 四虎在线免费观看 | www.少妇| 色www免费视频 | 国产在线精品国自产拍影院 | 一本一本久久a久久精品综合妖精 | 日韩在线观看不卡 | 亚洲精区二区三区四区麻豆 | 日韩电影中文,亚洲精品乱码 | 干干操操 | 中文字幕av电影下载 | 亚洲免费小视频 | 国产视频 亚洲视频 | 国产精品不卡在线观看 | 97在线视频免费看 | 国产午夜精品一区二区三区嫩草 | 香蕉在线视频播放网站 | 久久国产电影院 | 亚洲最大免费成人网 | 欧美成人猛片 | 激情综合网五月婷婷 | 久久久午夜视频 | 精品一区二区三区四区在线 | 91久久精| 99理论片| 日韩免费在线观看 | 精品在线视频一区 | 欧美激情视频一二三区 | 国产一级二级在线播放 | 天天操天天拍 | 天天曰| 九九九国产 | 久久久亚洲麻豆日韩精品一区三区 | 久久精品电影院 | 91在线区 | 一区二区三区在线不卡 | 成人av在线播放网站 | 国产免费一区二区三区网站免费 | 一区二区三区电影在线播 | 最近的中文字幕大全免费版 | 99爱在线| 91桃花视频 | 亚洲精品国产欧美在线观看 | 久草视频免费在线观看 | 69精品视频在线观看 | 手机成人av | 六月丁香激情综合色啪小说 | 久久成人免费电影 | 天天操天天添天天吹 | av电影在线免费观看 | 久久字幕精品一区 | 五月激情天 | 久久国产香蕉视频 | 在线观看黄网 | 不卡av电影在线 | 婷婷色网| 久久久久国产精品一区二区 | 日韩资源在线 | 中文字幕在线播放一区二区 | 日本精品午夜 | 亚洲婷婷综合色高清在线 | 精品主播网红福利资源观看 | 激情久久一区二区三区 | 91麻豆精品国产91 | 成人av在线网址 | 欧美成人一区二区 | 日韩欧美观看 | 一区二区三区免费在线播放 | 久久这里只精品 | 亚洲精品网址在线观看 | 91夜夜夜| 激情av网 | 国产成人精品一区二区三区福利 | 麻豆视频大全 | 九九精品视频在线观看 | 亚洲片在线观看 | 精品二区视频 | 97超碰在线资源 | 国产综合婷婷 | a在线免费观看视频 | 日韩精品一区二区三区第95 | 久久久久久久影院 | 欧美精品在线观看免费 | 日女人电影 | av电影久久 | 久草在线资源免费 | 黄色av观看| 99热官网 | 伊人六月 | 亚洲精品美女久久久久网站 | 久久久午夜精品理论片中文字幕 | 一级一片免费视频 | av电影 一区二区 | 亚洲视频 视频在线 | 日韩欧美高清免费 | 蜜桃视频成人在线观看 | 97精品视频在线 | 国产淫片| 日韩欧美在线播放 | 亚洲国产一区二区精品专区 | 精品国产激情 | 国产精品乱码久久久 | 手机av在线网站 | 一区二区网 | 亚洲精品动漫成人3d无尽在线 | 国产亚洲激情视频在线 | 狠狠干美女 | 欧洲精品视频一区 | 亚洲久草网| 天堂av网址 | 91av精品| 在线观看亚洲专区 | 午夜精品av在线 | 色哟哟国产精品 | 在线观看亚洲精品 | 中文免费观看 | 久久最新网址 | 国产尤物在线观看 | 免费看特级毛片 | 99视频导航| 国产福利91精品一区 | 香蕉视频亚洲 | 伊人天堂久久 | 制服丝袜在线 | 天天干天天弄 | 亚洲另类视频在线观看 | 色天天综合久久久久综合片 | 日本69hd| 国产精品色在线 | 福利视频| 91av在线播放视频 | 久久久污 | 人人插人人舔 | 色之综合网 | 日韩区欠美精品av视频 | 国产精品k频道 | 免费看一级一片 | 91高清在线 | 久久天天躁狠狠躁夜夜不卡公司 | 91爱爱免费观看 | 香蕉久草 | 草莓视频在线观看免费观看 | 欧美日韩一区二区视频在线观看 | 夜夜爱av| 免费三级a | 亚洲精品动漫成人3d无尽在线 | se婷婷| 久久草在线免费 | 亚洲在线不卡 | 又黄又爽又湿又无遮挡的在线视频 | av成年人电影 | 色综合天天色 | 91精品在线免费 | 成人福利在线观看 | 天天干,狠狠干 | 精品久久久久免费极品大片 | av在线一级 | 久久久久北条麻妃免费看 | 91精品国自产在线观看欧美 | 久久精品5 | 亚洲a网 | 国产在线播放一区二区三区 | 日韩毛片久久久 | 粉嫩高清一区二区三区 | 日韩一级电影在线观看 | 亚洲最大av| 国产在线视频不卡 | 免费能看的黄色片 | 福利视频一二区 | 91丨九色丨国产在线 | 在线观看黄色免费视频 | 天天操天天爱天天爽 | 日本激情视频中文字幕 | 五月婷婷在线播放 | 久久免费视频在线 | 2020天天干夜夜爽 | 激情视频国产 | 久久香蕉国产 | 国产精品嫩草影院123 | 亚洲一区二区三区在线看 | 日韩sese | 欧美在线视频一区二区三区 | 欧美日韩亚洲在线观看 | 最新中文字幕在线播放 | 国产 在线观看 | 国产精品99久久久久久宅男 | 国产亚洲精品免费 | 香蕉视频久久久 | 中文字幕国语官网在线视频 | 18国产精品白浆在线观看免费 | 免费看污在线观看 | 韩日电影在线观看 | 久久大香线蕉app | 日韩欧美精品在线观看视频 | 国产精品日韩久久久久 | 欧美性生活免费 | 免费在线观看一区 | 欧美久久久久久久久久久久 | 国产精品久久久久久久久久免费看 | 免费观看www视频 | av片子在线观看 | 欧美成人性战久久 | 日本韩国在线不卡 | 91看片麻豆| 四虎影视国产精品免费久久 | 亚洲最新精品 | 久久精品欧美日韩精品 | 久久精品一区二区三区中文字幕 | 国产精品一区二区精品视频免费看 | 在线观看91精品国产网站 | 丝袜av网站 | 美女搞黄国产视频网站 | 日韩精品中文字幕在线不卡尤物 | 国产精品久久久影视 | 99久久日韩精品视频免费在线观看 | 人人涩 | 日韩一区二区三免费高清在线观看 | 欧美大香线蕉线伊人久久 | 亚洲成av人片在线观看香蕉 | 久青草国产在线 | 国产欧美日韩一区 | 亚洲黑丝少妇 | 五月婷婷天堂 | 免费黄a| 91香蕉视频黄色 | 久草在线观看 | 正在播放国产一区 | 国产在线国产 | 欧美性护士 | 亚洲一区二区三区精品在线观看 | 精品国产免费一区二区三区五区 | 国产亚洲成人网 | 午夜视频在线观看一区二区三区 | 国产又黄又猛又粗 | 国产一级性生活视频 | 99re国产 | 亚洲精区二区三区四区麻豆 | 狠狠操狠狠干2017 | 黄免费在线观看 | 午夜视频在线网站 | 黄色在线成人 | 亚洲国产成人精品在线观看 | 色在线中文字幕 | 亚州激情视频 | 二区精品视频 | 国产精彩在线视频 | 日韩电影精品 | 欧美无极色 | 国产一区二区在线播放 | 91九色在线视频观看 | 97日日| 91热精品 | 91视频首页 | 国产在线更新 | 成人久久久久久久久 | 亚洲欧洲成人精品av97 | 99久久一区 | 成人九九视频 | 亚洲精品视频大全 | 日韩r级电影在线观看 | 五月综合 | av色一区 | 亚洲精品短视频 | av三区在线 | 亚洲欧美日本国产 | 精精国产xxxx视频在线播放 | 国产精品国产亚洲精品看不卡15 | 欧美日韩国产一区二 | 国产麻豆果冻传媒在线观看 | 国产中年夫妇高潮精品视频 | 国产中文字幕在线 | 日本高清免费中文字幕 | 久久热首页 | 国产精品美女久久久久久久久久久 | 97超碰在线资源 | 国产精品一区在线 | 欧美日韩一区二区三区视频 | www.夜夜操 | 久久久综合香蕉尹人综合网 | 激情av综合 | 成人av.com | 日韩一区二区三区视频在线 | 亚洲欧美激情精品一区二区 | 性色av免费在线观看 | 国产精品一区免费看8c0m | 最新影院| 日韩av午夜在线观看 | 黄网av在线| 午夜视频日本 | 久久久久久美女 | 狠狠躁日日躁 | 国产精品一区二区电影 | 亚洲精品免费播放 | 欧美日韩午夜 | 亚洲精品videossex少妇 | 日韩中文字幕在线不卡 | 国产精品一区二区三区在线看 | 国产裸体永久免费视频网站 | 99精品国产一区二区三区麻豆 | 亚洲精品乱码久久久久久写真 | 欧美 亚洲 另类 激情 另类 | 91在线porny国产在线看 | 在线观看一区 | 国产精品热视频 | 国产高清福利在线 | 亚洲一区视频免费观看 | 免费av看片| 婷婷射五月 | 久久久久激情 | 欧美日韩中文在线 | 国产在线播放一区 | 日韩高清在线一区二区 | 午夜久久网站 | 爱爱一区| 成人国产精品久久久春色 | 国产又粗又猛又黄又爽的视频 | 一级久久精品 | 国产精品片 | 亚洲色综合 | 久久久亚洲麻豆日韩精品一区三区 | 欧美另类高潮 | 久久久久久久久黄色 | 中文字幕在线观看的网站 | 国产精品一区二区久久精品 | 中文字幕在线观看不卡 | 天天综合操 | 婷婷久久国产 | 亚洲人毛片 | 亚洲色图激情文学 | 91视频中文字幕 | 九九九九精品九九九九 | 视频 国产区 | 国产精品观看在线亚洲人成网 | 日韩免费电影 | 91麻豆精品国产91久久久久久久久 | 久久手机精品视频 | 久草香蕉在线 | av中文在线播放 | 狠狠操在线 | 久久视频免费 | 成人性生交大片免费观看网站 | 日韩 国产 | 日韩在线小视频 | 成人一级免费电影 | 国产九九在线 | 中文字幕日本在线 | 天天干天天射天天插 | 97电影网手机版 | 少妇性aaaaaaaaa视频 | 精品国产乱码久久久久久天美 | 日韩在线观看一区 | 亚洲精品成人免费 | 日韩美在线观看 | 麻豆94tv免费版 | 天堂av免费观看 | 国产精品系列在线观看 | 国产精品99在线播放 | 99久久精品免费看国产一区二区三区 | 91在线视频在线观看 | 国产精品h在线观看 | 婷婷色 亚洲| 91香蕉嫩草 | 中文资源在线播放 | 欧美精品黑人性xxxx | 91在线成人 | 国产精品网红直播 | 97人人添人澡人人爽超碰动图 | 中国美女一级看片 | 91九色视频 | 久久久国产一区 | 亚洲精品乱码久久久久久蜜桃动漫 | 91丨九色丨蝌蚪丨对白 | 五月婷婷欧美视频 | 久久国产露脸精品国产 | 操高跟美女| 午夜 在线 | 国产午夜精品一区二区三区欧美 | 久久国产手机看片 | 九九视频免费观看视频精品 | 777视频在线观看 | 99国产免费网址 | 97精品一区二区三区 | 一级免费片 | 青青河边草观看完整版高清 | www.色午夜,com | av免费网站在线观看 | 国产精品欧美日韩在线观看 | 国产精品一区二区av麻豆 | 精品久久久久久久久久 | 午夜精品一区二区三区在线视频 | 欧美黄污视频 | 欧美一级大片在线观看 | 最新av电影网址 | 国产99久久九九精品免费 | 亚洲精品永久免费视频 | 一本色道久久综合亚洲二区三区 | 少妇激情久久 | 在线视频欧美精品 | 视频1区2区| 色在线亚洲 | 伊人狠狠 | 精品国产一区二区三区久久久蜜月 | 人人干人人超 | 国产日韩欧美在线看 | 三上悠亚一区二区在线观看 | 欧美韩日精品 | 国产午夜一区 | 美女久久精品 | 日韩一区二区三区观看 | 国产成人精品日本亚洲999 | 久久婷婷国产色一区二区三区 | 天天干天天上 | 蜜臀久久99静品久久久久久 | 欧美一区二区精品在线 | 久草在线电影网 | 人人爽人人爽人人片av | 久草免费在线视频 | 日韩免费不卡av | 久久精品美女视频 | 97视频免费播放 | 亚洲欧洲成人精品av97 | 不卡的av片 | 国产精品一区二区av日韩在线 | 国产在线精品区 | 国产精品中文在线 | 亚洲精品午夜久久久久久久 | 国产精品99久久99久久久二8 | 99久久精品免费看国产免费软件 | 亚洲精品国产第一综合99久久 | 成人在线播放网站 | 蜜桃视频在线视频 | 亚洲精品综合欧美二区变态 | 国产高清在线不卡 | 激情大尺度视频 | 91精品在线免费观看 | 五月婷香蕉久色在线看 | 精品三级av| 最近中文字幕高清字幕在线视频 | 欧美a视频在线观看 | 久久露脸国产精品 | 在线观看久久 | 美女国产免费 | 国产九色在线播放九色 | 在线观看中文字幕第一页 | 超碰在线免费97 | 亚洲国产一区二区精品专区 | 免费男女羞羞的视频网站中文字幕 | 天天干,天天草 | 91精品国产高清自在线观看 | 中文超碰字幕 | 色婷婷激情四射 | 久久人人爽视频 | 成人网色| 天天色播| 黄色小说免费在线观看 | 性色大片在线观看 | 69人人 | 一区二区三区日韩在线 | 久草电影在线观看 | 天天干天天操天天拍 | 久久久久国产精品一区二区 | 久久精品视频免费播放 | 精品久久久久久一区二区里番 | 亚洲理论片在线观看 | 97人人澡人人爽人人模亚洲 | 欧美精品亚州精品 | 久久久久久美女 | www.黄色片网站| 一级国产视频 | 久草在线一免费新视频 | 久久成人久久 | 手机成人在线 | 久久精品99精品国产香蕉 | 91干干干 | 在线黄色av| 成人中文字幕+乱码+中文字幕 | 亚洲做受高潮欧美裸体 | 久久精品日产第一区二区三区乱码 | 国产一及片| 日日操天天爽 | 91麻豆精品国产自产在线游戏 | 日本特黄特色aaa大片免费 | 夜夜爽www | 亚洲视频免费 | 中文字幕第 | 91免费版在线 | 日韩视频免费观看高清完整版在线 | 天天操夜夜看 | 天天色天| 波多野结衣电影一区二区三区 | 久久视频在线免费观看 | 亚洲免费观看视频 | 久久久久久久久久福利 | 天天鲁一鲁摸一摸爽一爽 | 毛片永久免费 | 91麻豆精品国产91久久久久久 | 国产流白浆高潮在线观看 | 亚洲久草在线 | 精品欧美一区二区在线观看 | 国产流白浆高潮在线观看 | 久久精品美女 | 9i看片成人免费看片 | 天天干天天做 | 亚洲国产精品资源 | 激情五月婷婷 | 久久精品老司机 | 国产又黄又爽又猛视频日本 | 日韩免费在线观看网站 | 18网站在线观看 |