日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

【转】Spark源码分析之-scheduler模块

發布時間:2023/12/1 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【转】Spark源码分析之-scheduler模块 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

原文地址:http://jerryshao.me/architecture/2013/04/21/Spark%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90%E4%B9%8B-scheduler%E6%A8%A1%E5%9D%97/

?

?

Background

Spark在資源管理和調度方式上采用了類似于Hadoop?YARN的方式,最上層是資源調度器,它負責分配資源和調度注冊到Spark中的所有應用,Spark選用Mesos或是YARN等作為其資源調度框架。在每一個應用內部,Spark又實現了任務調度器,負責任務的調度和協調,類似于MapReduce。本質上,外層的資源調度和內層的任務調度相互獨立,各司其職。本文對于Spark的源碼分析主要集中在內層的任務調度器上,分析Spark任務調度器的實現。

Scheduler模塊整體架構

scheduler模塊主要分為兩大部分:

  • TaskSchedulerListener。TaskSchedulerListener部分的主要功能是監聽用戶提交的job,將job分解為不同的類型的stage以及相應的task,并向TaskScheduler提交task。

  • TaskScheduler。TaskScheduler接收用戶提交的task并執行。而TaskScheduler根據部署的不同又分為三個子模塊:

    • ClusterScheduler
    • LocalScheduler
    • MesosScheduler
  • TaskSchedulerListener

    Spark抽象了TaskSchedulerListener并在其上實現了DAGScheduler。DAGScheduler的主要功能是接收用戶提交的job,將job根據類型劃分為不同的stage,并在每一個stage內產生一系列的task,向TaskScheduler提交task。下面我們首先來看一下TaskSchedulerListener部分的類圖:

    • 用戶所提交的job在得到DAGScheduler的調度后,會被包裝成ActiveJob,同時會啟動JobWaiter阻塞監聽job的完成狀況。
    • 于此同時依據job中RDD的dependency和dependency屬性(NarrowDependency,ShufflerDependecy),DAGScheduler會根據依賴關系的先后產生出不同的stage DAG(result stage, shuffle map stage)。
    • 在每一個stage內部,根據stage產生出相應的task,包括ResultTask或是ShuffleMapTask,這些task會根據RDD中partition的數量和分布,產生出一組相應的task,并將其包裝為TaskSet提交到TaskScheduler上去。

    RDD的依賴關系和Stage的分類

    在Spark中,每一個RDD是對于數據集在某一狀態下的表現形式,而這個狀態有可能是從前一狀態轉換而來的,因此換句話說這一個RDD有可能與之前的RDD(s)有依賴關系。根據依賴關系的不同,可以將RDD分成兩種不同的類型:Narrow Dependency和Wide Dependency。

    • Narrow Dependency指的是?child RDD只依賴于parent RDD(s)固定數量的partition。
    • Wide Dependency指的是child RDD的每一個partition都依賴于parent RDD(s)所有partition。

    它們之間的區別可參看下圖:

    根據RDD依賴關系的不同,Spark也將每一個job分為不同的stage,而stage之間的依賴關系則形成了DAG。對于Narrow Dependency,Spark會盡量多地將RDD轉換放在同一個stage中;而對于Wide Dependency,由于Wide Dependency通常意味著shuffle操作,因此Spark會將此stage定義為ShuffleMapStage,以便于向MapOutputTracker注冊shuffle操作。對于stage的劃分可參看下圖,Spark通常將shuffle操作定義為stage的邊界。

    DAGScheduler

    在用戶創建SparkContext對象時,Spark會在內部創建DAGScheduler對象,并根據用戶的部署情況,綁定不同的TaskSechduler,并啟動DAGcheduler

    private var taskScheduler: TaskScheduler = {//... } taskScheduler.start() private var dagScheduler = new DAGScheduler(taskScheduler) dagScheduler.start()

    而DAGScheduler的啟動會在內部創建daemon線程,daemon線程調用run()從block queue中取出event進行處理。

    而run()會調用processEvent來處理不同的event。?

    private def run() {SparkEnv.set(env)while (true) {val event = eventQueue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS)if (event != null) {logDebug("Got event of type " + event.getClass.getName)}if (event != null) {if (processEvent(event)) {return}}val time = System.currentTimeMillis() // TODO: use a pluggable clock for testabilityif (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {resubmitFailedStages()} else {submitWaitingStages()}} }

    ?

    DAGScheduler處理的event包括:

    • JobSubmitted
    • CompletionEvent
    • ExecutorLost
    • TaskFailed
    • StopDAGScheduler

    根據event的不同調用不同的方法去處理。

    本質上DAGScheduler是一個生產者-消費者模型,用戶和TaskSchduler產生event將其放入block queue,daemon線程消費event并處理相應事件。

    Job的生與死

    既然用戶提交的job最終會交由DAGScheduler去處理,那么我們就來研究一下DAGScheduler處理job的整個流程。在這里我們分析兩種不同類型的job的處理流程。

    1.沒有shuffle和reduce的job

    val textFile = sc.textFile("README.md") textFile.filter(line => line.contains("Spark")).count()

    ?

    2.有shuffle和reduce的job

    val textFile = sc.textFile("README.md")textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)

    ?

    首先在對RDD的count()和reduceByKey()操作都會調用SparkContext的runJob()來提交job,而SparkContext的runJob()最終會調用DAGScheduler的runJob()

    runJob()會調用prepareJob()對job進行預處理,封裝成JobSubmitted事件,放入queue中,并阻塞等待job完成

    def runJob[T, U: ClassManifest](finalRdd: RDD[T],func: (TaskContext, Iterator[T]) => U,partitions: Seq[Int],callSite: String,allowLocal: Boolean,resultHandler: (Int, U) => Unit) {if (partitions.size == 0) {return}val (toSubmit, waiter) = prepareJob(finalRdd, func, partitions, callSite, allowLocal, resultHandler)eventQueue.put(toSubmit)waiter.awaitResult() match {case JobSucceeded => {}case JobFailed(exception: Exception) =>logInfo("Failed to run " + callSite)throw exception} }

    ?

    當daemon線程的processEvent()從queue中取出JobSubmitted事件后,會根據job劃分出不同的stage,并且提交stage:

    首先,對于任何的job都會產生出一個finalStage來產生和提交task。其次對于某些簡單的job,它沒有依賴關系,并且只有一個partition,這樣的job會使用local thread處理而并非提交到TaskScheduler上處理。

    case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener) =>val runId = nextRunId.getAndIncrement()val finalStage = newStage(finalRDD, None, runId)val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener)clearCacheLocs()if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {runLocally(job)} else {activeJobs += jobresultStageToJob(finalStage) = jobsubmitStage(finalStage)}

    其次,產生finalStage后,需要調用submitStage(),它根據stage之間的依賴關系得出stage DAG,并以依賴關系進行處理:

    private def submitStage(stage: Stage) {if (!waiting(stage) && !running(stage) && !failed(stage)) {val missing = getMissingParentStages(stage).sortBy(_.id)if (missing == Nil) {submitMissingTasks(stage)running += stage} else {for (parent <- missing) {submitStage(parent)}waiting += stage}} }

    ?

    對于新提交的job,finalStage的parent stage還未獲得,因此submitStage會調用getMissingParentStages()來獲得依賴關系:

    private def getMissingParentStages(stage: Stage): List[Stage] = {val missing = new HashSet[Stage]val visited = new HashSet[RDD[_]]def visit(rdd: RDD[_]) {if (!visited(rdd)) {visited += rddif (getCacheLocs(rdd).contains(Nil)) {for (dep <- rdd.dependencies) {dep match {case shufDep: ShuffleDependency[_,_] =>val mapStage = getShuffleMapStage(shufDep, stage.priority)if (!mapStage.isAvailable) {missing += mapStage}case narrowDep: NarrowDependency[_] =>visit(narrowDep.rdd)}}}}}visit(stage.rdd)missing.toList }

    這里parent stage是通過RDD的依賴關系遞歸遍歷獲得。對于Wide Dependecy也就是Shuffle Dependecy,Spark會產生新的mapStage作為finalStage的parent,而對于Narrow Dependecy?Spark則不會產生新的stage。這里對stage的劃分是按照上面提到的作為劃分依據的,因此對于本段開頭提到的兩種job,第一種job只會產生一個finalStage,而第二種job會產生finalStage和mapStage。

    ?

    當stage DAG產生以后,針對每個stage需要產生task去執行,故在這會調用submitMissingTasks():

    private def submitMissingTasks(stage: Stage) {val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)myPending.clear()var tasks = ArrayBuffer[Task[_]]()if (stage.isShuffleMap) {for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {val locs = getPreferredLocs(stage.rdd, p)tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)}} else {val job = resultStageToJob(stage)for (id <- 0 until job.numPartitions if (!job.finished(id))) {val partition = job.partitions(id)val locs = getPreferredLocs(stage.rdd, partition)tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)}}if (tasks.size > 0) {myPending ++= taskstaskSched.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority))if (!stage.submissionTime.isDefined) {stage.submissionTime = Some(System.currentTimeMillis())}} else {running -= stage} }

    首先根據stage所依賴的RDD的partition的分布,會產生出與partition數量相等的task,這些task根據partition的locality進行分布;其次對于finalStage或是mapStage會產生不同的task;最后所有的task會封裝到TaskSet內提交到TaskScheduler去執行。

    至此job在DAGScheduler內的啟動過程全部完成,交由TaskScheduler執行task,當task執行完后會將結果返回給DAGScheduler,DAGScheduler調用handleTaskComplete()處理task返回:

    private def handleTaskCompletion(event: CompletionEvent) {val task = event.taskval stage = idToStage(task.stageId)def markStageAsFinished(stage: Stage) = {val serviceTime = stage.submissionTime match {case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)case _ => "Unkown"}logInfo("%s (%s) finished in %s s".format(stage, stage.origin, serviceTime))running -= stage}event.reason match {case Success =>...task match {case rt: ResultTask[_, _] =>...case smt: ShuffleMapTask =>...}case Resubmitted =>...case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>...case other =>abortStage(idToStage(task.stageId), task + " failed: " + other)} }

    每個執行完成的task都會將結果返回給DAGScheduler,DAGScheduler根據返回結果來進行進一步的動作。

    RDD的計算

    RDD的計算是在task中完成的。我們之前提到task分為ResultTask和ShuffleMapTask,我們分別來看一下這兩種task具體的執行過程。

    • ResultTask

    ?

    override def run(attemptId: Long): U = {val context = new TaskContext(stageId, partition, attemptId)try {func(context, rdd.iterator(split, context))} finally {context.executeOnCompleteCallbacks()}}

    ?

    • ShuffleMapTask

    override def run(attemptId: Long): MapStatus = {val numOutputSplits = dep.partitioner.numPartitionsval taskContext = new TaskContext(stageId, partition, attemptId)try {val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])for (elem <- rdd.iterator(split, taskContext)) {val pair = elem.asInstanceOf[(Any, Any)]val bucketId = dep.partitioner.getPartition(pair._1)buckets(bucketId) += pair}val compressedSizes = new Array[Byte](numOutputSplits)val blockManager = SparkEnv.get.blockManagerfor (i <- 0 until numOutputSplits) {val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + ival iter: Iterator[(Any, Any)] = buckets(i).iteratorval size = blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false)compressedSizes(i) = MapOutputTracker.compressSize(size)}return new MapStatus(blockManager.blockManagerId, compressedSizes)} finally {taskContext.executeOnCompleteCallbacks()}}

    ResultTask和ShuffleMapTask都會調用RDD的iterator()來計算和轉換RDD,不同的是:ResultTask轉換完RDD后調用func()計算結果;而ShufflerMapTask則將其放入blockManager中用來shuffle。

    ?

    RDD的計算調用iterator(),iterator()在內部調用compute()從RDD依賴關系的根開始計算:

    final def iterator(split: Partition, context: TaskContext): Iterator[T] = {if (storageLevel != StorageLevel.NONE) {SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)} else {computeOrReadCheckpoint(split, context)} } private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {if (isCheckpointed) {firstParent[T].iterator(split, context)} else {compute(split, context)} }?

    至此大致分析了TaskSchedulerListener,包括DAGScheduler內部的結構,job生命周期內的活動,RDD是何時何地計算的。接下來我們分析一下task在TaskScheduler內干了什么。

    TaskScheduler

    前面也提到了Spark實現了三種不同的TaskScheduler,包括LocalSheduler、ClusterScheduler和MesosScheduler。LocalSheduler是一個在本地執行的線程池,DAGScheduler提交的所有task會在線程池中被執行,并將結果返回給DAGScheduler。MesosScheduler依賴于Mesos進行調度,筆者對Mesos了解甚少,因此不做分析。故此章節主要分析ClusterScheduler模塊。

    ClusterScheduler模塊與deploy模塊和executor模塊耦合較為緊密,因此在分析ClUsterScheduler時也會順帶介紹deploy和executor模塊。

    首先我們來看一下ClusterScheduler的類圖:

    ClusterScheduler的啟動會伴隨SparkDeploySchedulerBackend的啟動,而backend會將自己分為兩個角色:首先是driver,driver是一個local運行的actor,負責與remote的executor進行通行,提交任務,控制executor;其次是StandaloneExecutorBackend,Spark會在每一個slave node上啟動一個StandaloneExecutorBackend進程,負責執行任務,返回執行結果。

    ClusterScheduler的啟動

    在SparkContext實例化的過程中,ClusterScheduler被隨之實例化,同時賦予其SparkDeploySchedulerBackend:

    master match {...case SPARK_REGEX(sparkUrl) =>val scheduler = new ClusterScheduler(this)val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)scheduler.initialize(backend)schedulercase LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>...case _ =>...} } taskScheduler.start()

    ?

    ClusterScheduler的啟動會啟動SparkDeploySchedulerBackend,同時啟動daemon進程來檢查speculative task:

    override def start() {backend.start()if (System.getProperty("spark.speculation", "false") == "true") {new Thread("ClusterScheduler speculation check") {setDaemon(true)override def run() {while (true) {try {Thread.sleep(SPECULATION_INTERVAL)} catch {case e: InterruptedException => {}}checkSpeculatableTasks()}}}.start()} }

    ?

    SparkDeploySchedulerBacked的啟動首先會調用父類的start(),接著它會啟動client,并由client連接到master向每一個node的worker發送請求啟動StandaloneExecutorBackend。這里的client、master、worker涉及到了deploy模塊,暫時不做具體介紹。而StandaloneExecutorBackend則涉及到了executor模塊,它主要的功能是在每一個node創建task可以運行的環境,并讓task在其環境中運行。

    override def start() {super.start()val driverUrl = "akka://spark@%s:%s/user/%s".format(System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),StandaloneSchedulerBackend.ACTOR_NAME)val args = Seq(driverUrl, "", "", "")val command = Command("spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)val sparkHome = sc.getSparkHome().getOrElse(throw new IllegalArgumentException("must supply spark home for spark standalone"))val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome)client = new Client(sc.env.actorSystem, master, appDesc, this)client.start() }

    ?

    在StandaloneSchedulerBackend中會創建DriverActor,它就是local的driver,以actor的方式與remote的executor進行通信。

    override def start() {val properties = new ArrayBuffer[(String, String)]val iterator = System.getProperties.entrySet.iteratorwhile (iterator.hasNext) {val entry = iterator.nextval (key, value) = (entry.getKey.toString, entry.getValue.toString)if (key.startsWith("spark.")) {properties += ((key, value))}}driverActor = actorSystem.actorOf(Props(new DriverActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME) }

    ?

    在client實例化之前,會將StandaloneExecutorBackend的啟動環境作為參數傳遞給client,而client啟動時會將此提交給master,由master分發給所有node上的worker,worker會配置環境并創建進程啟動StandaloneExecutorBackend。

    至此ClusterScheduler的啟動,local driver的創建,remote executor環境的啟動所有過程都已結束,ClusterScheduler等待DAGScheduler提交任務。

    ClusterScheduler提交任務

    DAGScheduler會調用ClusterScheduler提交任務,任務會被包裝成TaskSetManager并等待調度:

    override def submitTasks(taskSet: TaskSet) {val tasks = taskSet.taskslogInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")this.synchronized {val manager = new TaskSetManager(this, taskSet)activeTaskSets(taskSet.id) = manageractiveTaskSetsQueue += managertaskSetTaskIds(taskSet.id) = new HashSet[Long]()if (hasReceivedTask == false) {starvationTimer.scheduleAtFixedRate(new TimerTask() {override def run() {if (!hasLaunchedTask) {logWarning("Initial job has not accepted any resources; " +"check your cluster UI to ensure that workers are registered")} else {this.cancel()}}}, STARVATION_TIMEOUT, STARVATION_TIMEOUT)}hasReceivedTask = true;}backend.reviveOffers() }

    ?

    在任務提交的同時會啟動定時器,如果任務還未被執行,定時器持續發出警告直到任務被執行。

    同時會調用StandaloneSchedulerBackend的reviveOffers(),而它則會通過actor向driver發送ReviveOffers,driver收到ReviveOffers后調用makeOffers():

    // Make fake resource offers on just one executor def makeOffers(executorId: String) {launchTasks(scheduler.resourceOffers(Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId))))) } // Launch tasks returned by a set of resource offers def launchTasks(tasks: Seq[Seq[TaskDescription]]) {for (task <- tasks.flatten) {freeCores(task.executorId) -= 1executorActor(task.executorId) ! LaunchTask(task)} }

    makeOffers()會向ClusterScheduler申請資源,并向executor提交LauchTask請求。

    接下來LaunchTask會進入executor模塊,StandaloneExecutorBackend在收到LaunchTask請求后會調用Executor執行task:

    override def receive = {case RegisteredExecutor(sparkProperties) =>... case RegisterExecutorFailed(message) =>...case LaunchTask(taskDesc) =>logInfo("Got assigned task " + taskDesc.taskId)executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>... } def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {threadPool.execute(new TaskRunner(context, taskId, serializedTask)) }

    ?

    Executor內部是一個線程池,每一個提交的task都會包裝為TaskRunner交由threadpool執行:

    class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)extends Runnable {override def run() {SparkEnv.set(env)Thread.currentThread.setContextClassLoader(urlClassLoader)val ser = SparkEnv.get.closureSerializer.newInstance()logInfo("Running task ID " + taskId)context.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)try {SparkEnv.set(env)Accumulators.clear()val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)updateDependencies(taskFiles, taskJars)val task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)logInfo("Its generation is " + task.generation)env.mapOutputTracker.updateGeneration(task.generation)val value = task.run(taskId.toInt)val accumUpdates = Accumulators.valuesval result = new TaskResult(value, accumUpdates)val serializedResult = ser.serialize(result)logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit)context.statusUpdate(taskId, TaskState.FINISHED, serializedResult)logInfo("Finished task ID " + taskId)} catch {case ffe: FetchFailedException => {val reason = ffe.toTaskEndReasoncontext.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))}case t: Throwable => {val reason = ExceptionFailure(t)context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))// TODO: Should we exit the whole executor here? On the one hand, the failed task may// have left some weird state around depending on when the exception was thrown, but on// the other hand, maybe we could detect that when future tasks fail and exit then.logError("Exception in task ID " + taskId, t)//System.exit(1)}}} }

    其中task.run()則真正執行了task中的任務,如前RDD的計算章節所述。返回值被包裝成TaskResult返回。

    至此task在ClusterScheduler內運行的流程有了一個大致的介紹,當然這里略掉了許多異常處理的分支,但這不影響我們對主線的了解。

    END

    至此對Spark的Scheduler模塊的主線做了一個順藤摸瓜式的介紹,Scheduler模塊作為Spark最核心的模塊之一,充分體現了Spark與MapReduce的不同之處,體現了Spark DAG思想的精巧和設計的優雅。

    當然Spark的代碼仍然在積極開發之中,當前的源碼分析在過不久后可能會變得沒有意義,但重要的是體會Spark區別于MapReduce的設計理念,以及DAG思想的應用。DAG作為對MapReduce框架的改進越來越受到大數據界的重視,hortonworks也提出了類似DAG的框架tez作為對MapReduce的改進。

    轉載于:https://www.cnblogs.com/vincent-hv/p/3334715.html

    總結

    以上是生活随笔為你收集整理的【转】Spark源码分析之-scheduler模块的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 人妻无码一区二区三区免费 | 91麻豆国产 | 性色免费视频 | 色偷偷亚洲| 97香蕉久久夜色精品国产 | 国产欧美精品一区二区三区 | 天天综合久久综合 | 丰满少妇高潮一区二区 | 一区二区三区免费看视频 | 久久成人资源 | 日本毛片在线看 | 欧美色图激情 | 亚洲精品国产精品乱码桃花 | 三级视频网 | 福利视频三区 | 女人夜夜春| 久久a久久 | 麻豆亚洲精品 | 国产噜噜噜噜久久久久久久久 | 在线你懂的视频 | 久久影院中文字幕 | 男女啪啪免费网站 | 久久综合操 | 激情综合视频 | 爱爱免费视频 | 超碰888| 快播久久| 日本欧美在线 | 人妻换人妻a片爽麻豆 | 爱爱中文字幕 | 国产无限制自拍 | 网站在线观看你懂的 | 久久久无码精品亚洲无少妇 | 久热网 | 尤物视频一区 | 国产一级网站 | 久久久青草| 韩国一级淫一片免费放 | 手机看片日韩久久 | 精品日本一区二区 | 色图自拍偷拍 | 亚洲www啪成人一区二区麻豆 | 亚洲AV无码国产日韩久久 | 久久久久无码国产精品一区李宗瑞 | 国产精品丝袜黑色高跟鞋的设计特点 | 熟妇高潮一区二区三区 | 青青插 | 国产成人久久精品麻豆二区 | 国产又爽又黄视频 | 99国产成人精品 | 住在隔壁的她动漫免费观看全集下载 | 韩国19主播内部福利vip | 爆乳熟妇一区二区三区霸乳 | 狼人精品一区二区三区在线 | 日本一区三区 | 台湾佬久久 | 男人吃奶视频 | 午夜影院久久 | 中文字幕欧美专区 | 综合天天色 | 久久精品视频网站 | 国产精品不卡在线观看 | 国产日韩一区二区在线观看 | 国产91精品久久久 | 韩国三级中文字幕hd浴缸戏 | 欧美透逼视频 | 久草超碰在线 | 欧美一区二区三区网站 | 永久毛片| 91九色porny视频 | 欧美丰满熟妇xxxxx | 免费香蕉视频 | 九九热精品视频 | 国产精品久久久久久久久久久久久久久久 | 毛片视频免费 | 97人人澡| 欧美粗暴se喷水 | 国产精品制服诱惑 | 日韩亚洲区 | 欧美专区在线 | 国产级毛片 | 看污网站 | 蜜桃麻豆视频 | 精品一区二区三区久久 | 成人欧美一区二区 | 爱爱视频在线看 | 亚洲天堂午夜 | 青青草成人影视 | 精品乱码一区二区三区四区 | 爱爱视频在线看 | 国产伦精品一区二区三区在线观看 | 亚洲砖区区免费 | 中文字幕15页 | 爱爱免费视频网站 | 国产中文字幕一区二区 | 中国少妇毛片 | 91精品国产99久久久久久红楼 | 中文字幕导航 | 少妇高潮av |