日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

Spark rdd 介绍,和案例介绍

發布時間:2024/9/27 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark rdd 介绍,和案例介绍 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1.2、創建RDD

1)由一個已經存在的Scala集合創建。
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

2)由外部存儲系統的數據集創建,包括本地的文件系統,還有所有Hadoop支持的數據集,比如HDFS、Cassandra、HBase等
val rdd2 = sc.textFile(“hdfs://mycluster/wordcount/input/2.txt”)

1.3、RDD編程API

1.3.1、Transformation

RDD中的所有轉換都是延遲加載的,也就是說,它們并不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正運行。這種設計讓Spark更加有效率地運行。

常用的Transformation:

轉換含義
map(func)返回一個新的RDD,該RDD由每一個輸入元素經過func函數轉換后組成
filter(func)返回一個新的RDD,該RDD由經過func函數計算后返回值為true的輸入元素組成
flatMap(func)類似于map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素)
mapPartitions(func)類似于map,但獨立地在RDD的每一個分片上運行,因此在類型為T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func)類似于mapPartitions,但func帶有一個整數參數表示分片的索引值,因此在類型為T的RDD上運行時,func的函數類型必須是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed)根據fraction指定的比例對數據進行采樣,可以選擇是否使用隨機數進行替換,seed用于指定隨機數生成器種子
union(otherDataset)對源RDD和參數RDD求并集后返回一個新的RDD
intersection(otherDataset)對源RDD和參數RDD求交集后返回一個新的RDD
distinct([numTasks]))對源RDD進行去重后返回一個新的RDD
groupByKey([numTasks])在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD
reduceByKey(func, [numTasks])在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的參數來設置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks])在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks])與sortByKey類似,但是更靈活
join(otherDataset, [numTasks])在類型為(K,V)和(K,W)的RDD上調用,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks])在類型為(K,V)和(K,W)的RDD上調用,返回一個(K(Iterable,Iterable))類型的RDD
cartesian(otherDataset)笛卡爾積
pipe(command, [envVars])
coalesce(numPartitions)
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)

1.3.2、Action

動作含義
reduce(func)通過func函數聚集RDD中的所有元素,這個功能必須是可交換且可并聯的
collect()在驅動程序中,以數組的形式返回數據集的所有元素
count()返回RDD的元素個數
first()返回RDD的第一個元素(類似于take(1))
take(n)返回一個由數據集的前n個元素組成的數組
takeSample(withReplacement,num, [seed])返回一個數組,該數組由從數據集中隨機采樣的num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用于指定隨機數生成器種子
takeOrdered(n, [ordering])
saveAsTextFile(path)將數據集的元素以textfile的形式保存到HDFS文件系統或者其他支持的文件系統,對于每個元素,Spark將會調用toString方法,將它裝換為文件中的文本
saveAsSequenceFile(path)將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統。
saveAsObjectFile(path)
countByKey()針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。
foreach(func)在數據集的每一個元素上,運行函數func進行更新。

1.4 練習Spark rdd的api

連接Spark-Shell:

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# bin/spark-shell --master spark://hadoop1:7077,hadoop2:7077

練習1

//通過并行化生成rdd val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10)) //對rdd1里的每一個元素乘2然后排序 val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true) //過濾出大于等于十的元素 val rdd3 = rdd2.filter(_ >= 10) //將元素以數組的方式在客戶端顯示 rdd3.collect

練習2:

val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j")) //將rdd1里面的每一個元素先切分在壓平 val rdd2 = rdd1.flatMap(_.split(' ')) rdd2.collect

練習3:

val rdd1 = sc.parallelize(List(5, 6, 4, 3)) val rdd2 = sc.parallelize(List(1, 2, 3, 4)) //求并集 val rdd3 = rdd1.union(rdd2) //求交集 val rdd4 = rdd1.intersection(rdd2) //去重 rdd3.distinct.collect rdd4.collect

練習4:

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2))) val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2))) //求jion val rdd3 = rdd1.join(rdd2) rdd3.collect //求并集 val rdd4 = rdd1 union rdd2 rdd4.collect //按key進行分組 val rdd5 = rdd4.groupByKey rdd5.collect

練習5:

val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2))) val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2))) //cogroup val rdd3 = rdd1.cogroup(rdd2) //注意cogroup與groupByKey的區別 rdd3.collect

練習6:

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5)) //reduce聚合 val rdd2 = rdd1.reduce(_ + _) rdd2.collect

練習7:

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1))) val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5))) val rdd3 = rdd1.union(rdd2) //按key進行聚合 val rdd4 = rdd3.reduceByKey(_ + _) rdd4.collect //按value的降序排序 val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1)) rdd5.collect

練習8:

mapPartitionsdef mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]該函數和map函數類似,只不過映射函數的參數由RDD中每一個元素變成了RDD中每一個分區的迭代器。如果在映射的過程中需要頻繁創建額外的對象,使用mapPartitions要比map高效的多比如:將RDD中的所有元素通過JDBC連接寫入數據庫,如果使用map函數,可能要為每一個元素都創建一個collection,這樣開銷很大,如果使用mapPartitions,那么只需要針對每一個分區建立一個connection.參數preservesPartitioning表示是否保留父RDD的partitioner分區信息。 //rdd1有兩個分區 scala> var rdd1 = sc.makeRDD(1 to 5,2)rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[63] at makeRDD at <console>:24 scala> rdd1.collectres27: Array[Int] = Array(1, 2, 3, 4, 5) //rdd3將rdd1中每個分區中的數值累加(通過mapPartitions來實現) scala> var rdd3 = rdd1.mapPartitions{ x => {| var result = List[Int]()| var i = 0| while(x.hasNext) {| i += x.next()| }| result.::(i).iterator| }}rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[64] at mapPartitions at <console>:26 //查看合并結果后的rdd3的值 scala> rdd3.collectres28: Array[Int] = Array(3, 12) //查看rdd3的分區大小 scala> rdd3.partitions.size res29: Int = 2

練習9:

mapPartitionsWithIndexdef mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]函數作用通mapPartitions,不過提供了兩個參數,第一個參數為分區的索引 例如: scala> var rdd1 = sc.makeRDD(1 to 25,4)rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[66] at makeRDD at <console>:24scala> var rdd2 = rdd1.mapPartitionsWithIndex{| (x,iter) => {| var result = List[String]()| var i = 0| while(iter.hasNext) {| i += iter.next()| }| result.::(x + "|" + i).iterator| }| }rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[67] at mapPartitionsWithIndex at <console>:26//獲取結果值(從返回的結果中可以看到) scala> rdd2.collectres30: Array[String] = Array(0|21, 1|57, 2|93, 3|154)再如: scala> val func = (index:Int,iter:Iterator[(Int)])=> {| iter.toList.map(x => "[partID:" + index + ",val:" + x + "]").iterator| } scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2) scala> rdd1.mapPartitionsWithIndex(func).collectres0: Array[String] = Array([partID:0,val:1], [partID:0,val:2], [partID:0,val:3], [partID:0,val:4], [partID:1,val:5], [part], [partID:1,val:7], [partID:1,val:8], [partID:1,val:9])

練習8:
aggregate函數將每個分區里的元素進行聚合,然后用combine函數將每個分區的結果和初始值(zerorValue)進行combine操作。這個函數最終返回的類型不需要和RDD中元素類型一致。
函數原型:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

aggregate 聚合,先在分區內進行聚合,然后再將每個分區的結果一起結果進行聚合scala> def func1(index:Int,iter:Iterator[(Int)]):Iterator[String] = {| iter.toList.map(x => "[partID:" + index + ",val:" + x + "]").iterator| }func1: (index: Int, iter: Iterator[Int])Iterator[String]//創建一個并行化的RDD,有兩個分區 scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[77] at parallelize at <console>:24 //通過下面的代碼可以看到rdd1中內容再兩個分區內的分布情況,通過下面的結果可以看出有兩個分區,分別是partID:0和partID:1 scala> rdd1.mapPartitionsWithIndex(func1).collectres56: Array[String] = Array([partID:0,val:1], [partID:0,val:2], [partID:0,val:3], [partID:0,val:4], [partID:1,val:5], [partID:1,val:6], [partID:1,val:7], [partID:1,val:8], [partID:1,val:9]) //下面的執行步驟是: //一:01取出最大值112取出最大值223取出最大值334取出最大值4===》第一個分區的最大值是4 //二:05取出最大值556取出最大值667取出最大值778取出最大值889取出最大值9====>第二個分區的最大值是9 //三:后面的執行邏輯是:_+_,就是說將兩個分區的最大結果值求和,執行的結果是:(0) + 4+9=13 scala> rdd1.aggregate(0)(math.max(_,_),_+_) res57: Int = 13//下面的執行步驟是: //一:31取出最大值332取出最大值333取出最大值334取出最大值4===》第一個分區的最大值是4 //二:35取出最大值556取出最大值667取出最大值778取出最大值889取出最大值9====>第二個分區的最大值是9 //三:后面的執行邏輯是:_+_,就是說將兩個分區的最大結果值求和,執行的結果是:(3)+4+9=16 scala> rdd1.aggregate(3)(math.max(_,_),_+_) res62: Int = 16//下面的執行步驟是: //一:51取出最大值552取出最大值553取出最大值554取出最大值5===》第一個分區的最大值是5 //二:55取出最大值556取出最大值667取出最大值778取出最大值889取出最大值9====>第二個分區的最大值是9 //三:后面的執行邏輯是:_+_,就是說將兩個分區的最大結果值求和,執行的結果是:(5)+5+9=19 scala> rdd1.aggregate(5)(math.max(_,_),_+_) res58: Int = 19再如: //下面的執行步驟是: //一:81取出最大值882取出最大值883取出最大值884取出最大值8===》第一個分區的最大值是8 //二:85取出最大值886取出最大值887取出最大值888取出最大值889取出最大值9====>第二個分區的最大值是9 //三:后面的執行邏輯是:_+_,就是說將兩個分區的最大結果值求和,執行的結果是:(8)+8+9=25 scala> rdd1.aggregate(8)(math.max(_,_),_+_) res58: Int = 19再如: //下面的執行步驟是: //一:101取出最大值10102取出最大值10103取出最大值10104取出最大值10===》第一個分區的最大值是10 //二:105取出最大值10106取出最大值10107取出最大值10108取出最大值10109取出最大值10====>第二個分區的最大值是10 //三:后面的執行邏輯是:_+_,就是說將兩個分區的最大結果值求和,執行的結果是:(10)+10+10=30 scala> rdd1.aggregate(10)(math.max(_,_),_+_) res58: Int = 30================================================================================ 下面是字符串的聚合 scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[79] at parallelize at <console>:24scala> def fun2(index:Int,iter:Iterator[(String)]):Iterator[String] = {| iter.toList.map(x => "[partID:" + index + ",val:" + x + "]").iterator| } fun2: (index: Int, iter: Iterator[String])Iterator[String] //通過下面的結果可以知道:"a","b","c"在partID:0中,"d","e","f"在partID:1中 scala> rdd2.mapPartitionsWithIndex(fun2).collect res63: Array[String] = Array([partID:0,val:a], [partID:0,val:b], [partID:0,val:c], [partID:1,val:d], [partID:1,val:e], [partID:1,val:f]) //下面的運行順序是: //一、"""a"相加得"a","a""b"相加得"ab","ab""c"相加得"abc",第一個分區得到的結果是:"abc" //一、"""d"相加得"d","d""e"相加得"de","ed""f"相加得"def",第一個分區得到的結果是:"def" //三、由于是并行的計算,所以可能是第一個分區先執行完,此時的結果是:"" + "abc" + "def" ===》"abcdef";若是第二個分區先執行完,此時的結果是:"" + "def" + "abc" ===》"defabc" scala> rdd2.aggregate("")(_+_,_+_) res64: String = abcdef scala> rdd2.aggregate("")(_+_,_+_) res65: String = defabc//下面的運行順序是: //一、"=""a"相加得"=a","=a""b"相加得"=ab","=ab""c"相加得"=abc",第一個分區得到的結果是:"=abc" //一、"=""d"相加得"=d","=d""e"相加得"=de","=ed""f"相加得"=def",第一個分區得到的結果是:"=def" //三、由于是并行的計算,所以可能是第一個分區先執行完,此時的結果是:"=" + "=abc" + "=def" ===》"==abc=def";若是第二個分區先執行完,此時的結果是:"="+"=def" + "=abc" ===》"==def=abc" //下面的結果中分別是:res68: String = ==def=abc 和 res69: String = ==abc=def,和上面的推算結果一致 scala> rdd2.aggregate("=")(_ + _, _ + _) res68: String = ==def=abc scala> rdd2.aggregate("=")(_ + _, _ + _) res69: String = ==abc=defval rdd3 = sc.parallelize(List("12","23","345","4567"),2) //通過下面可以知道有兩個分區,并且每個分區中有不同的值 scala> rdd3.mapPartitionsWithIndex(fun2).collect res70: Array[String] = Array([partID:0,val:12], [partID:0,val:23], [partID:1,val:345], [partID:1,val:4567]) //下面的運行步驟是(scala> "".length結果是res72: Int = 0),(scala>"12".length結果是res73:Int=2): //一:"".length"12".length求出最大值2,得到字符串是"2";"2".length和"23".length求出最大值2,得到的字符串是2;所以第一個分區計算出的結果是:"2" //二:"".length"345".length求出最大值3,得到字符串是"3";"3".length和"4567".length求出最大值4,得到的字符串是4;所以第一個分區計算出的結果是:"4" //三:得到的結果最后執行x+y,由于是并行計算所以可能是"24"或者"42" scala> rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y) res75: String = 24 scala> rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y) res76: String = 42//下面求最小值: scala> val rdd4 = sc.parallelize(List("12","23","345",""),2) rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[84] at parallelize at <console>:24 scala> rdd4.mapPartitionsWithIndex(fun2).collect res79: Array[String] = Array([partID:0,val:12], [partID:0,val:23], [partID:1,val:345], [partID:1,val:]) //運行過程是: //一:"".length"12".length求出最小值0,得到字符串是"0";"0".length和"23".length求出最小值1,得到的字符串是0;所以第一個分區計算出的結果是:"0" //二:"".length"345".length求出最小值0,得到字符串是"0";"0".length和"".length求出最小值0,得到的字符串是0;所以第一個分區計算出的結果是:"0" //三:得到的結果最后執行x+y,由于是并行計算所以可能是"01""10" scala> rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y) res85: String = 10scala> rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y) res86: String = 01val rdd5 = sc.parallelize(List("12","23","","345"),2) //運行過程是: //一:"".length"12".length求出最小值0,得到字符串是"0";"0".length和"23".length求出最小值1,得到的字符串是0;所以第一個分區計算出的結果是:"0" //二:"".length"".length求出最小值0,得到字符串是"0";"0".length和"345".length求出最小值1,得到的字符串是1;所以第一個分區計算出的結果是:"1" //三:得到的結果最后執行x+y,由于是并行計算所以可能是"1"或 rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)再如案例: scala> def seqOP(a:Int, b:Int) : Int = {| println("seqOp: " + a + "\t" + b)| math.min(a,b)| } seqOP: (a: Int, b: Int)Intscala> def combOp(a:Int, b:Int): Int = {| println("combOp: " + a + "\t" + b)| a + b| } combOp: (a: Int, b: Int)Intscala> val z = sc. parallelize ( List (1 ,2 ,3 ,4 ,5 ,6) , 2) //這里要注意的是上面的z是Int類型的,所以下面要用于集合迭代的類型也是Int類型的。 scala> def fun2(index:Int,iter:Iterator[(Int)]):Iterator[String] = {| iter.toList.map(x => "[partID:" + index + ",val:" + x + "]").iterator| } fun2: (index: Int, iter: Iterator[Int])Iterator[String] //通過下面的方式顯示出每個值所在的分區 scala> z.mapPartitionsWithIndex(fun2).collect res94: Array[String] = Array([partID:0,val:1], [partID:0,val:2], [partID:0,val:3], [partID:1,val:4], [partID:1,val:5], [partID:1,val:6]) //下面的含義是:兩個分區每個里面先單獨執行seqOP,兩個都執行完成之后,再執行comOp邏輯,所以下面的運行過程是: //一、31執行seqOP的最小值是112執行seqOP間的最小值是113執行seqOP的最小值是1,第一個分區得到的結果是1 //二、34執行seqOP的最小值是335執行seqOP間的最小值是336執行seqOP的最小值是3,第一個分區得到的結果是3 //三、接著執行comOp邏輯,(3)和分區一種的1執行combOp得到的結果是:3+1=44接著和分區二中的3執行combOp得到的結果是4+3=7,所以最后的結果是:7 scala> z.aggregate(3)(seqOP, combOp) combOp:3 1 combOp:4 3 res95: Int = 7//再次驗證: //一、21執行seqOP的最小值是112執行seqOP間的最小值是113執行seqOP的最小值是1,第一個分區得到的結果是1 //二、24執行seqOP的最小值是225執行seqOP間的最小值是226執行seqOP的最小值是2,第一個分區得到的結果是2 //三、接著執行comOp邏輯,(2)和分區一種的1執行combOp得到的結果是:2+1=33接著和分區二中的2執行combOp得到的結果是3 + 2=5,所以最后的結果是:5 scala> z.aggregate(2)(seqOP, combOp) [Stage 105:> (0 + 0) / 2]combOp:2 1 combOp:3 2 res96: Int = 5 //下面的同樣: scala> def seqOp(a:String, b:String) : String = {| println("seqOp: " + a + "\t" + b)| math.min(a.length , b.length ).toString| } seqOp: (a: String, b: String)Stringscala> def combOp(a:String, b:String) : String = {| println("combOp: " + a + "\t" + b)| a + b| } combOp: (a: String, b: String)Stringscala> val z = sc. parallelize ( List ("12" ,"23" ,"345" ,"4567") ,2) scala> z. aggregate ("")(seqOp, combOp) seqOp: 345 seqOp: 12 seqOp: 0 4567 seqOp: 0 23 combOp: 1 combOp: 1 1res25: String = 11

練習10:
aggregateByKey

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2) def func2(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator } scala> pairRDD.mapPartitionsWithIndex(func2).collect res99: Array[String] = Array([partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)], [partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)]) //執行過程是: //1、每個分區中的內容都按照key先進行分組, //第一個分區分組后的結果是:(cat,(2,5))、(mouse,(4)) //第二個分區分組后的結果是:(cat,(12))、(dog,(12))、(mouse,(2)) //2、接著0,分別和每組中的結果比對, //對于分區一:0和cat中的2比較,得到最大值2;2和cat中的5比較,得到的最大結果是5。同樣mouse執行相同操作。所以最終得到的是:(cat,(5)),(mouse,(4)) //對于分區二:0和cat中的12比較,得到的最大值12。依次類推,最終得到的結果是:(cat,(12))、(dog,(12))、(mouse,(2)) //3、接著0和分區一和分區二中每個最大值相加,最終得到的結果是: // (cat,(5)) + (cat,(12)) ? (cat,(5 + 12)) ==> (cat,(17)) //(mouse,(4)) + (mouse,(2)) ? (mouse,(4 + 2)) ==> (mouse,(6)) //(dog,(12)) ? (dog,(12)) ==> (dog,(12)) pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect//執行過程是: //1、每個分區中的內容都按照key先進行分組, //第一個分區分組后的結果是:(cat,(2,5))、(mouse,(4)) //第二個分區分組后的結果是:(cat,(12))、(dog,(12))、(mouse,(2)) //2、接著100,分別和每組中的結果比對, //對于分區一:100和cat中的2比較,得到最大值100;100和cat中的5比較,得到的最大結果是100。同樣mouse執行相同操作。所以最終得到的是:(cat,(100)),(mouse,(100)) //對于分區二:100和cat中的12比較,得到的最大值100。依次類推,最終得到的結果是:(cat,(100))、(dog,(100))、(mouse,(100)) //3、接著100和分區一和分區二中每個最大值相加,最終得到的結果是: //(cat,(100)) + (cat,(100)) ? (cat,(100 + 100)) ==> (cat,(200)) //(mouse,(100)) + (mouse,(100)) ? (mouse,(100 + 100)) ==> (mouse,(200)) //(dog,(100)) + (dog,(100)) ? (dog,(100)) ==> (dog,(100)) pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect

練習11:
checkpoint (知識點可以查看:http://blog.csdn.net/tototuzuoquan/article/details/74838936)
為當前RDD設置檢查點。該函數將會創建一個二進制的文件,并存儲到checkpoint目錄中,該目錄是用SparkContext.setCheckpointDir()設置的。在checkpoint的過程中,該RDD的所有依賴于父RDD中的信息將全部被移出。對RDD進行checkpoint操作并不會馬上被執行,必須執行Action操作才能觸發。
函數原型:
def checkpoint()
實例:

scala> val data = sc.parallelize(1 to 100000,15) data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[94] at parallelize at <console>:24 scala> sc.setCheckpointDir("/iteblog") 17/07/07 19:17:22 WARN spark.SparkContext: Spark is not running in local mode, therefore the checkpoint directory must not be on the local filesystem. Directory '/iteblog' appears to be on the local filesystem. scala> data.checkpoint scala> data.count res105: Long = 100000[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls /iteblog Found 1 items drwxr-xr-x - root supergroup 0 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c [root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c Found 1 items drwxr-xr-x - root supergroup 0 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94 [root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94 Found 15 items -rw-r--r-- 3 root supergroup 71219 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00000 -rw-r--r-- 3 root supergroup 71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00001 -rw-r--r-- 3 root supergroup 71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00002 -rw-r--r-- 3 root supergroup 71219 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00003 -rw-r--r-- 3 root supergroup 71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00004 -rw-r--r-- 3 root supergroup 71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00005 -rw-r--r-- 3 root supergroup 71219 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00006 -rw-r--r-- 3 root supergroup 71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00007 -rw-r--r-- 3 root supergroup 71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00008 -rw-r--r-- 3 root supergroup 71219 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00009 -rw-r--r-- 3 root supergroup 71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00010 -rw-r--r-- 3 root supergroup 71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00011 -rw-r--r-- 3 root supergroup 71219 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00012 -rw-r--r-- 3 root supergroup 71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00013 -rw-r--r-- 3 root supergroup 71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00014 [root@hadoop2 hadoop-2.8.0]# 執行完count之后,會在/iteblog目錄下產生出多個(數量和你分區個數有關)二進制的文件。 //設置檢查點,將文件最終輸出到下面的位置上 scala> sc.setCheckpointDir("hdfs://mycluster/wordcount/ck")scala> val rdd = sc.textFile("hdfs://mycluster/wordcount/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) rdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[100] at reduceByKey at <console>:24scala> rdd.checkpointscala> rdd.isCheckpointed res108: Boolean = falsescala> rdd.count res109: Long = 289 scala> rdd.isCheckpointed res110: Boolean = truescala> rdd.getCheckpointFile res111: Option[String] = Some(hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100)scala> [root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100 Found 10 items -rw-r--r-- 3 root supergroup 147 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/_partitioner -rw-r--r-- 3 root supergroup 867 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00000 -rw-r--r-- 3 root supergroup 721 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00001 -rw-r--r-- 3 root supergroup 1091 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00002 -rw-r--r-- 3 root supergroup 1030 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00003 -rw-r--r-- 3 root supergroup 944 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00004 -rw-r--r-- 3 root supergroup 810 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00005 -rw-r--r-- 3 root supergroup 964 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00006 -rw-r--r-- 3 root supergroup 1011 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00007 -rw-r--r-- 3 root supergroup 974 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00008

練習12:
coalesce, repartition

coalesce:對RDD中的分區重新進行合并
函數原型:
def coalesce(numPartitions: Int, shuffle: Boolean = false)
    (implicit ord: Ordering[T] = null): RDD[T]
  返回一個新的RDD,且該RDD的分區個數等于numPartitions個數。如果shuffle設置為true,這回進行shuffle。

scala> var data = sc.parallelize(List(1,2,3,4)) data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[104] at parallelize at <console>:24scala> data.partitions.length res115: Int = 6scala> val result = data.coalesce(2,false) result: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[105] at coalesce at <console>:26scala> result.partitions.length res116: Int = 2scala> result.toDebugString res117: String = (2) CoalescedRDD[105] at coalesce at <console>:26 []| ParallelCollectionRDD[104] at parallelize at <console>:24 []scala> val result1 = data.coalesce(2,true) result1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[109] at coalesce at <console>:26scala> result1.toDebugString res118: String = (2) MapPartitionsRDD[109] at coalesce at <console>:26 []| CoalescedRDD[108] at coalesce at <console>:26 []| ShuffledRDD[107] at coalesce at <console>:26 []+-(6) MapPartitionsRDD[106] at coalesce at <console>:26 []| ParallelCollectionRDD[104] at parallelize at <console>:24 []scala> 從上面可以看出shuffle為false的時候并不進行shuffle操作;而為true的時候會進行shuffle操作。RDD.partitions.length可以獲取相關RDD的分區數。再如下面的例子: scala> val rdd1 = sc.parallelize(1 to 10,10) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[102] at parallelize at <console>:24scala> rdd1.collect res112: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)scala> rdd1.partitions.length res113: Int = 10scala> val rdd2 = rdd1.coalesce(2,false) rdd2: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[103] at coalesce at <console>:26scala> rdd1.partitions.length res114: Int = 10scala>

練習13:
collectAsMap
功能和collect函數類似,該函數用于Pair RDD,最終返回Map類型的結果
函數原型:
def collectAsMap(): Map[K, V]

scala> val rdd = sc.parallelize(List(("a",1),("b",2))) rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[111] at parallelize at <console>:24scala> rdd.collectAsMap res119: scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)scala>從結果我們可以看出,如果RDD中同一個key中存在多個Value,那么后面的Value將會把前面的Value覆蓋,最終得到的結果就是Key唯一,而且對應一個Value。

練習14:
combineByKey
使用用戶設置好的聚合函數對每個Key中的Value進行組合(combine)。可以將輸入類型為RDD[(K,V)]轉成RDD[(K,C)]
函數原型:
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) : RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine:
Boolean = true, serializer: Serializer = null): RDD[(K, C)]
第一個和第二個函數都是基于第三個函數實現的,使用的是HashPartitioner,Serializer為null。而第三個函數我們可以指定分區,如果需要使用Serializer的話也可以指定。combineByKey函數比較重要,我們熟悉地諸如aggregateByKey、foldByKey、reduceByKey等函數都是基于函數實現的。默認情況在Map端進行組合操作。

scala> val data = sc.parallelize(List((1, "www"), (1, "iteblog"), (1, "com"), (2, "bbs"), (2, "iteblog"), (2, "com"), (3, "good"))) data: org.apache.spark.rdd.RDD[(Int, String)] =ParallelCollectionRDD[15] at parallelize at <console>:12scala> val result = data.combineByKey(List(_), (x: List [String], y: String) => y :: x, (x: List[String], y: List[String]) => x ::: y) result: org.apache.spark.rdd.RDD[(Int, List[String])] =ShuffledRDD[19] at combineByKey at <console>:14scala> result.collect res20: Array[(Int, List[String])] = Array((1,List(www, iteblog, com)),(2,List(bbs, iteblog, com)), (3,List(good)))scala> val data = sc.parallelize(List(("iteblog", 1), ("bbs", 1), ("iteblog", 3))) data: org.apache.spark.rdd.RDD[(String, Int)] =ParallelCollectionRDD[24] at parallelize at <console>:12scala> val result = data.combineByKey(x => x, (x: Int, y:Int) => x + y, (x:Int, y: Int) => x + y) result: org.apache.spark.rdd.RDD[(String, Int)] =ShuffledRDD[25] at combineByKey at <console>:14scala> result.collect res27: Array[(String, Int)] = Array((iteblog,4), (bbs,1))再如: val rdd1 = sc.textFile("hdfs://mycluster/wordcount/input").flatMap(_.split(" ")).map((_, 1)) val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) rdd2.collectval rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) rdd3.collectval rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3) val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3) val rdd6 = rdd5.zip(rdd4) val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n)

練習15
countByKey

scala> val rdd1 = sc.parallelize(List(("a",1),("b",2),("b",2),("c",2),("c",1))) rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24scala> rdd1.countByKey res0: scala.collection.Map[String,Long] = Map(a -> 1, b -> 2, c -> 2) scala> rdd1.countByValue res1: scala.collection.Map[(String, Int),Long] = Map((b,2) -> 2, (a,1) -> 1, (c,2) -> 1, (c,1) -> 1)scala>

練習16:
filterByRange

scala> val rdd1 = sc.parallelize(List(("e",5),("c",3),("d",4),("c",2),("a",1))) rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24scala> val rdd2 = rdd1.filterByRange("b","d") rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[7] at filterByRange at <console>:26scala> rdd2.collect res2: Array[(String, Int)] = Array((c,3), (d,4), (c,2))

練習17:
flatMapValues

scala> a.flatMapValues(_.split(" ")) res5: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[10] at flatMapValues at <console>:27scala> a.flatMapValues(_.split(" ")).collect res6: Array[(String, String)] = Array((a,1), (a,2), (b,3), (b,4))

練習18:
foldByKey

scala> val rdd1 = sc.parallelize(List("dog","wolf","cat","bear"),2) rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[12] at parallelize at <console>:24scala> val rdd2 = rdd1.map(x => (x.length,x)) rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[13] at map at <console>:26scala> rdd2.collect res7: Array[(Int, String)] = Array((3,dog), (4,wolf), (3,cat), (4,bear))scala> val rdd3 = rdd2.foldByKey("")(_+_) rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[14] at foldByKey at <console>:28scala> rdd3.collect res8: Array[(Int, String)] = Array((4,bearwolf), (3,dogcat))scala> rdd3.collect res9: Array[(Int, String)] = Array((4,wolfbear), (3,catdog))scala> val rdd = sc.textFile("hdfs://mycluster/wordcount/input/2.txt").flatMap(_.split(" ")).map((_,1)) rdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[18] at map at <console>:24scala> rdd.foldByKey(0)(_+_) res10: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[19] at foldByKey at <console>:27scala> rdd.foldByKey(0)(_+_).collect res11: Array[(String, Int)] = Array((role,1), (Play,1), (fraud,1), (level,1), (business,2), (improve,1), (platforms,1), (order,1), (big,1), (with,1), (scientist,,1), (active,1), (valuable,1), (data,5), (information,1), (Cooperate,1), (Collecting,1), (framework,1), (E-commerce/payment,1), (acquired,1), (root,1), (accurate,1), (solutions,1), (analysis;Maintenance,1), (problems,1), (them,1), (Analyze,1), (models,1), (analysis,3), (realize,1), (actual,1), (weight,1), (compare,1), (risk,1), (anti-fraud,1), (key,1), (related,1), (base,1), (Support,1), (against,1), (automatic,1), (to,2), (platform,2), (company's,1), (in,2), (needs,,1), (provide,2), (implement,1), (affecting,1), (strategy,1), (of,1), (reports,1), (management,1), (detection,,1), (for,1), (work,,1), (cause,1), (an,1), (verify,1),... scala>

foreachPartition
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
rdd1.foreachPartition(x => println(x.reduce(_ + _)))

練習19:
keyBy

scala> val rdd1 = sc.parallelize(List("dog","salmon","salmon","rat","elephant"),3) rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[21] at parallelize at <console>:24scala> val rdd2 = rdd1.keyBy(_.length) rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[22] at keyBy at <console>:26scala> rdd2.collect res12: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))scala>

練習20:
keys values

scala> val rdd1 = sc.parallelize(List("dog","tiger","lion","cat","panther","eagle"),2) rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at parallelize at <console>:24scala> val rdd2 = rdd1.map(x => (x.length,x)) rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[24] at map at <console>:26scala> rdd2.keys.collect res13: Array[Int] = Array(3, 5, 4, 3, 7, 5)scala> rdd2.values.collect res14: Array[String] = Array(dog, tiger, lion, cat, panther, eagle)

總結

以上是生活随笔為你收集整理的Spark rdd 介绍,和案例介绍的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

久久久999精品视频 国产美女免费观看 | 国产婷婷精品 | 亚洲成av人电影 | 午夜精品成人一区二区三区 | 国产精品一区久久久久 | 精品亚洲免a | 在线中文视频 | 国产91精品看黄网站 | 狠狠插狠狠干 | 999久久a精品合区久久久 | 久草爱 | 色网免费观看 | 又紧又大又爽精品一区二区 | 国产一区国产二区在线观看 | 国产亚洲成av人片在线观看桃 | 成人一区二区在线观看 | 91久久国产自产拍夜夜嗨 | 在线看av网址 | 欧美国产大片 | 丁香视频五月 | 在线观看视频在线观看 | 欧美一区二区三区四区夜夜大片 | 看黄色.com| 中文字幕高清视频 | 99热这里是精品 | 狠狠躁夜夜a产精品视频 | 亚洲精品国 | 韩日精品在线 | 一区二区视频网站 | 久草在线中文视频 | av在线一级 | 中文字幕在线免费看线人 | 婷婷丁香国产 | 久久精品日韩 | 碰超在线 | 亚洲国产中文在线 | 国产一级不卡视频 | 国产视频精品免费 | 国产精品一区免费观看 | 成人精品国产免费网站 | 2017狠狠干 | 麻豆精品在线视频 | 久久激情五月丁香伊人 | 日韩精品一区二区三区水蜜桃 | 五月在线 | 国际精品久久久久 | 欧美性久久久久久 | 91麻豆精品国产91久久久无需广告 | 国产网站色 | 一区二区精品 | 美女福利视频网 | 久99久在线视频 | 五月天久久综合 | 成人综合婷婷国产精品久久免费 | 亚州免费视频 | 国产精品一区二区av麻豆 | 中文在线a√在线 | 欧美久久久久久久久久 | 日韩中文字幕免费视频 | av电影免费 | 色婷婷国产精品一区在线观看 | 久久精品9 | 综合黄色网 | 天天操天天干天天综合网 | www久久九 | 99免费精品| av黄色免费在线观看 | 成人小电影在线看 | 狠狠五月天 | 日韩在线观看中文字幕 | 丁香 久久 综合 | 国产精品久久久视频 | 久久久国产精品人人片99精片欧美一 | 国产精品亚洲片在线播放 | 黄视频色网站 | 久久一区二区三区国产精品 | 亚洲日本va在线观看 | aa级黄色大片 | 欧美一级性生活视频 | 五月婷婷综合在线 | 国产伦精品一区二区三区四区视频 | 国产91精品久久久久 | 亚洲欧美日韩在线看 | 日韩一区二区三区在线观看 | 成人免费视频视频在线观看 免费 | 国产成人一区在线 | 99久久影院| 天天弄天天干 | 日韩免费在线视频观看 | 伊人久久影视 | 国产区在线 | 在线97| 女人18毛片a级毛片一区二区 | 九九有精品 | 色综合婷婷久久 | 正在播放五月婷婷狠狠干 | 最近最新mv字幕免费观看 | 中文日韩在线视频 | 97在线视频网站 | 亚洲经典中文字幕 | av一级一片 | 色欧美综合 | 精品产品国产在线不卡 | 五月综合在线观看 | 天堂av在线中文在线 | 欧美一级片免费播放 | 亚洲国产人午在线一二区 | 最近日本字幕mv免费观看在线 | 四虎国产精品免费观看视频优播 | 欧美性生活久久 | 国产小视频网站 | 成年人视频免费在线播放 | 亚洲国产美女精品久久久久∴ | 成人九九视频 | 亚洲伊人第一页 | 999国产在线| 91久久久国产精品 | 国产精品久久久久久久av电影 | 国产一区二区三区黄 | 久久视| av色网站 | 精品一区精品二区高清 | 美女黄频在线观看 | 久久一级电影 | 久久天天躁狠狠躁夜夜不卡公司 | 国产免费一区二区三区网站免费 | 国产区网址| 高清色免费 | 日韩中文在线字幕 | av在线网站大全 | 天堂va在线观看 | 五月开心六月伊人色婷婷 | 国产原创中文在线 | 欧美在线一级片 | 国产电影黄色av | 精品专区一区二区 | 婷婷精品国产欧美精品亚洲人人爽 | 日韩视频免费 | 在线国产一区二区 | 在线观看黄网 | 欧美做受69| 国外成人在线视频网站 | 精品一区 精品二区 | 超黄视频网站 | 天天插狠狠干 | 午夜精品一区二区三区可下载 | 丁香婷婷激情网 | 国产精品久久久久毛片大屁完整版 | 久久av免费| 麻豆视频在线 | 97av在线视频| 日本久久精 | 在线观看一区 | 国产精品s色 | 国产精品高潮在线观看 | 日本精品久久久久影院 | 成人免费在线播放视频 | av天天澡天天爽天天av | 色老板在线视频 | 日本三级久久 | 亚洲一级黄色 | 中文字幕一区二区三区在线播放 | 免费黄色av片 | 国内精品在线观看视频 | 97日日 | 国模视频一区二区 | 久久久久久久久毛片 | 91欧美精品 | 在线播放精品一区二区三区 | 男女全黄一级一级高潮免费看 | 99热官网 | 亚洲午夜av | 国产91在线播放 | 国产精品你懂的在线观看 | 在线高清 | 国产精品自产拍在线观看中文 | 国产成人精品福利 | www国产亚洲精品久久麻豆 | 久久久久久电影 | 亚洲深夜影院 | 92精品国产成人观看免费 | 亚洲高清免费在线 | 亚洲视频在线观看网站 | free. 性欧美.com | 亚洲精品午夜国产va久久成人 | 狠狠干激情 | 国产一区久久 | 久草资源在线观看 | 精品久久国产一区 | 久久躁日日躁aaaaxxxx | 91色九色 | 亚洲免费av电影 | 六月丁香激情综合色啪小说 | 中文字幕第一页在线播放 | 日韩久久视频 | 视频成人 | 亚州精品国产 | 91色偷偷| 色88久久| 国产精品中文字幕在线播放 | 在线免费视频你懂的 | 婷婷在线五月 | 日本精品视频在线观看 | 99在线精品视频 | 国产午夜精品一区二区三区四区 | 国产丝袜美腿在线 | 国产日产欧美在线观看 | 三日本三级少妇三级99 | 伊人永久 | 中文字幕在线播出 | 久久永久免费 | 国产视频一区二区在线播放 | 中文字幕第一页av | 国产精品久久在线 | 亚洲精品视频在线播放 | 久久理论片 | 97超碰总站 | 91九色蝌蚪视频 | 91精品国产乱码 | 丁香5月婷婷 | 久久久久久久久久久高潮一区二区 | 视频二区在线视频 | 午夜精品久久一牛影视 | 欧美色一色 | 一区二区三区免费在线观看 | 日韩免费一级a毛片在线播放一级 | 国产精品视频内 | av超碰在线 | 97视频久久久 | 精品国产自在精品国产精野外直播 | 最近更新好看的中文字幕 | 中文字幕日本在线 | 国产成人精品一区二区三区在线 | 日韩av电影网站在线观看 | 伊人黄| 国产高清免费观看 | 超碰在线观看97 | 色婷婷免费 | 久久久首页 | 视频三区在线 | 国产美女网站视频 | 天堂入口网站 | 中文电影网| 国产精品黄色在线观看 | 国产麻豆精品免费视频 | 久久免费视频在线观看6 | 视频一区在线播放 | 久久成人高清视频 | 亚洲永久精品在线观看 | av在线之家电影网站 | 99久久99久久精品国产片果冰 | 在线观看中文字幕网站 | 91桃色在线免费观看 | 欧美日韩国产欧美 | 国产精品久久久久久久久久久久午夜 | av在线网站观看 | 日韩欧美精品一区 | 99久久99久久综合 | 日韩理论片 | 久草免费在线视频观看 | 国产精品igao视频网入口 | 亚洲在线视频免费观看 | 在线观看午夜av | 天天干人人干 | 精品一区二区三区久久 | 欧美精品二区 | 97免费在线观看视频 | 久久精品视频网站 | 国产中文字幕在线看 | 久热免费在线观看 | 人人干97| 操操操操网 | 国产三级久久久 | 国产手机av | www亚洲一区 | 久草在线视频精品 | 黄色网www | 天天干,狠狠干 | 首页中文字幕 | 亚州av成人 | 91九色蝌蚪视频 | 在线观看91久久久久久 | 在线中文字幕一区二区 | 国产成人精品一区二区三区福利 | 日本护士撒尿xxxx18 | 69av在线播放| 深爱五月激情网 | 特级西西444www大精品视频免费看 | 色婷婷播放 | 00av视频| 91看片淫黄大片一级在线观看 | 亚洲夜夜综合 | 久久久久久欧美二区电影网 | 日韩网站免费观看 | av成人黄色 | 在线蜜桃视频 | 日韩日韩日韩日韩 | 免费观看全黄做爰大片国产 | 欧美亚洲国产日韩 | 综合色狠狠 | 中文字幕免费高 | 日韩一二区在线 | 久久国产高清 | 国产精品黄色影片导航在线观看 | 在线播放精品一区二区三区 | 正在播放 国产精品 | 久久久久久久久久久综合 | 成人国产精品久久久久久亚洲 | 99久热在线精品视频观看 | 亚洲精品婷婷 | 天天插天天射 | 国产一区二区三精品久久久无广告 | 久久综合一本 | 欧美一二三区在线观看 | 99精品国产免费久久久久久下载 | 日本大片免费观看在线 | 瑞典xxxx性hd极品 | 国产精品剧情在线亚洲 | 国产夫妻性生活自拍 | 精品国产三级a∨在线欧美 免费一级片在线观看 | 国产美女视频黄a视频免费 久久综合九色欧美综合狠狠 | 麻豆视频国产在线观看 | 欧美不卡视频在线 | 国产黄色精品 | 91久色蝌蚪 | 日韩在线观看网址 | 97在线看| 欧美日韩伦理一区 | 五月天国产精品 | www.久久视频 | 日韩性久久 | 中文字幕黄色av | av综合 日韩 | 久久综合精品国产一区二区三区 | 深夜男人影院 | 日韩免| 国产九九九精品视频 | 丁香午夜 | 亚洲一区美女视频在线观看免费 | 91在线麻豆| 久久久久久高潮国产精品视 | 啪啪激情网 | 正在播放一区 | 国内精品久久久久久中文字幕 | 成人久久18免费网站 | 日韩高清成人在线 | 成人久久久久久久久久 | 国产中文字幕一区 | 亚洲精品系列 | 狠狠干综合 | 久久久久久久久久久久av | 久操操| 中文字幕在线观看国产 | 在线免费91| 五月婷在线播放 | 亚洲日本国产 | 色射爱 | 国产手机免费视频 | 国产高清99 | 欧美做受xxx | 国产成人99av超碰超爽 | 国产精品资源在线 | 天天爽天天爽夜夜爽 | 在线电影 一区 | 国产精品美女久久久 | 西西人体4444www高清视频 | 亚洲精品一区二区三区在线观看 | 人人澡超碰碰97碰碰碰软件 | 丁香综合av | 国产成人精品综合久久久 | 国产99色| 亚洲欧洲av在线 | 亚洲欧美乱综合图片区小说区 | 亚洲天堂精品 | 国产精品入口麻豆 | 精品96久久久久久中文字幕无 | 最近中文字幕完整视频高清1 | 中文字幕在线观看第三页 | 99 久久久久 | 美女精品在线观看 | 干干干操操操 | 色在线最新 | 亚洲一区二区三区在线看 | 久久99久久久久久 | 在线看污网站 | 国产视频资源 | 91天堂素人约啪 | 精品黄色在线观看 | 韩国av在线 | 国产精品欧美久久久久天天影视 | 亚洲国产免费 | 日韩欧美一区二区在线观看 | 在线视频a | 免费福利视频网 | 婷婷色中文网 | 精品国产免费人成在线观看 | 国产精品久久久久久久久大全 | 成人毛片一区 | 亚洲欧美国产精品 | 久久久久久蜜av免费网站 | 精品三级av | 亚洲精品久久久久58 | 在线一区av | 日韩二区在线观看 | 成av在线| 精品国模一区二区 | 国产人成在线视频 | 91麻豆高清视频 | 97精品国自产拍在线观看 | 九色视频自拍 | 国产精品视频地址 | 国产女做a爱免费视频 | 99久久精品久久亚洲精品 | 欧美日韩国产精品一区二区亚洲 | 午夜色站 | 国产精品久久久久久99 | 成人免费中文字幕 | 免费a v视频| 亚洲小视频在线观看 | 婷婷 中文字幕 | 美女视频黄是免费的 | 亚洲国产网站 | 天天搞夜夜骑 | 亚洲高清不卡av | 在线黄频 | 成人在线免费看 | 日本丰满少妇免费一区 | 国产精品一区二区三区视频免费 | 午夜在线免费观看视频 | 青草视频在线播放 | 成人久久久久久久久久 | 国产91丝袜在线播放动漫 | 国产粉嫩在线观看 | 超碰av在线| 久久免费成人网 | 欧美精品国产综合久久 | av在线免费观看不卡 | 久久久久久影视 | 亚洲三级性片 | 色综合久| 久久爽久久爽久久av东京爽 | 麻豆国产视频 | 青青射 | 人人爽夜夜爽 | 麻豆高清免费国产一区 | 欧美日韩高清 | 最近中文字幕在线中文高清版 | 二区视频在线观看 | 麻豆91网站| 免费av网站在线 | 日本精品视频免费 | 色人久久 | 欧美国产日韩激情 | 精品在线观看一区二区 | 一区二区三区在线视频观看58 | 国产小视频在线观看免费 | 中文字幕在线视频第一页 | 深爱五月激情五月 | 久久精品视频3 | 亚洲自拍偷拍色图 | 久久色中文字幕 | 成 人 黄 色 视频 免费观看 | 中文字幕久久精品一区 | 亚洲视频分类 | 久久成人福利 | 欧美日韩在线视频观看 | 亚洲闷骚少妇在线观看网站 | av三级在线免费观看 | 六月激情| 在线观看视频免费播放 | 在线视频 成人 | 久久久精品国产一区二区三区 | 国产原创在线 | 黄色的视频 | 国产精品热视频 | 国产一区二区精品 | 日韩久久久久久久久久 | 狠狠地操 | 国产成人黄色av | 久久成人18免费网站 | 日韩久久精品一区 | 色综合天天色综合 | 人人爽爽人人 | 久久久国产一区二区三区 | 亚洲理论在线观看 | 色视频在线免费观看 | 国产精品2020 | 天天操天天干天天操天天干 | 99久久99视频只有精品 | 狠狠色免费| 国产一级片直播 | 婷婷99| 国产91电影在线观看 | av丁香花| 亚洲精品乱码久久久久久蜜桃不爽 | 欧美a级片免费看 | 伊人五月在线 | 91视频中文字幕 | 999久久国产精品免费观看网站 | 日韩免费视频在线观看 | 国产精品99久久久久久久久久久久 | 欧美一二三在线 | 少妇高潮流白浆在线观看 | 国产成人在线观看 | 在线国产高清 | 国产在线一区二区三区播放 | 一区二区中文字幕在线播放 | 黄色免费在线视频 | 国产成人三级一区二区在线观看一 | 男女视频国产 | 中文字幕在线一区二区三区 | 日本精品午夜 | 五月色婷 | 国产精品成人一区二区 | 久久久久亚洲精品中文字幕 | 国产成人一区二区三区 | 91在线文字幕 | 四虎影视av| 91视频中文字幕 | 黄色一级在线免费观看 | 国产精品美女视频 | 干干夜夜 | 激情久久综合网 | 亚洲免费国产 | 天天摸天天操天天爽 | 欧美黄色高清 | 国产精品久久久久久一区二区 | 成人蜜桃视频 | 综合网成人| 91精品爽啪蜜夜国产在线播放 | 国产一区免费在线 | 久久国产精品一二三区 | 福利二区视频 | 国产免费亚洲 | 一区二区丝袜 | 欧美在线不卡一区 | 日韩影片在线观看 | 波多野结衣动态图 | 草久中文字幕 | 99精品在线观看视频 | 国产无遮挡又黄又爽馒头漫画 | 精品一二区 | 久久久久国产一区二区 | 四虎国产精品免费观看视频优播 | 波多野结衣视频一区二区 | 99亚洲精品在线 | 欧美精品久久久久性色 | 日韩欧美视频免费在线观看 | 久久久久久欧美二区电影网 | 亚洲国产欧美在线看片xxoo | 91在线观看欧美日韩 | 国产成人精品一区二区三区福利 | 国产精品久久久久婷婷 | 97超碰免费在线 | 久久精品79国产精品 | 黄色三级免费网址 | www好男人 | 国产不卡av在线播放 | 狠狠干激情 | 在线观看视频你懂 | 婷婷丁香激情 | 高清av免费看| 欧美另类xxx | 2019久久精品| 日韩精品一区二区在线视频 | 久草在线99 | 国产成人久久久77777 | 中文字幕在线观看视频一区 | 国产精品手机看片 | 天天爱天天爽 | 久久久久二区 | 国产精品a级 | 九九九在线观看 | 蜜臀av性久久久久av蜜臀三区 | 天天干天天操天天搞 | 久草视频免费播放 | 色综合久久综合中文综合网 | 色综合久久中文综合久久牛 | 超碰97在线看 | 麻豆高清免费国产一区 | 久久久电影网站 | 国产69久久精品成人看 | 久久99视频免费观看 | 91精品啪在线观看国产线免费 | 91精品国产自产老师啪 | 色香网| 亚洲国产精品女人久久久 | 中文字幕在线观看视频一区二区三区 | 日韩中文在线电影 | 成人av地址 | 亚洲精品啊啊啊 | 91精品免费在线视频 | 在线观看精品视频 | 久久精品亚洲国产 | 久热这里有精品 | 欧美日韩中文字幕综合视频 | 四虎成人av| 日韩大片在线看 | 欧美日一级片 | 国产剧情av在线播放 | 91传媒免费在线观看 | 在线视频 一区二区 | 男女视频91| 深爱婷婷网 | 国产aa免费视频 | 就色干综合 | 午夜精品久久久久久 | 天天色天天操天天爽 | 亚洲一区二区三区在线看 | 亚洲成人免费在线观看 | 99av在线视频 | 91黄色小网站 | 亚洲成a人片在线观看中文 中文字幕在线视频第一页 狠狠色丁香婷婷综合 | 激情 一区二区 | 日韩精品一区二区三区在线播放 | 日韩免费网址 | 欧美日韩一区二区在线观看 | 成人精品一区二区三区中文字幕 | 91女神的呻吟细腰翘臀美女 | 久久在线观看 | 成人国产在线 | 亚洲成av人片在线观看无 | 国产99久久久精品视频 | 国产精品美女免费看 | 久草在线最新免费 | 亚洲欧美一区二区三区孕妇写真 | 男女视频国产 | 欧美日韩久 | 日本性xxxxx 亚洲精品午夜久久久 | 99久久国产免费,99久久国产免费大片 | 精品一区二区三区久久 | 成人免费在线播放视频 | 日日干天夜夜 | 六月色丁 | 天天操天天透 | 天天干天天插伊人网 | 在线观看国产www | 射射射av| 久久久久中文字幕 | 亚洲一区久久 | 国产精品欧美精品 | 91精品在线视频观看 | 久久久久国产精品午夜一区 | 久久av福利 | 成人永久视频 | 色噜噜在线观看视频 | 天天操天天操 | 欧美精品一二三 | 久久69精品 | 欧美性做爰猛烈叫床潮 | 在线高清一区 | www.99久久.com| 麻豆 videos| 综合网久久 | 一级黄色片在线免费观看 | 美女国产在线 | 96视频在线 | 欧美最猛性xxxxx(亚洲精品) | 国产手机精品视频 | 一本一本久久a久久精品综合妖精 | 草莓视频在线观看免费观看 | 在线观看黄色国产 | 天天草夜夜 | 国产男女无遮挡猛进猛出在线观看 | 在线免费观看av网站 | 日本在线观看黄色 | 又黄又爽又色无遮挡免费 | 午夜精品999 | 久久99国产精品二区护士 | 亚洲专区 国产精品 | 国产精品久久久久一区二区三区共 | 国内精品久久久久影院男同志 | 亚洲 成人 一区 | 亚洲精品国产片 | 99re国产视频| 亚洲国产欧美在线看片xxoo | 国产精品久久久久久久av大片 | 午夜精品久久久久久久爽 | 亚洲精品在线视频播放 | 亚洲精品国产欧美在线观看 | 91免费看黄 | 97精品国产97久久久久久粉红 | 欧美精品被 | 91在线观看高清 | 国产精品一区二区你懂的 | 81精品国产乱码久久久久久 | 国产精品第一页在线观看 | 日韩精品一区二区三区高清免费 | 国产午夜精品一区二区三区嫩草 | 精品国产欧美 | 日韩在线视频不卡 | 亚洲精品一区二区在线观看 | 在线观看黄色小视频 | 97色婷婷成人综合在线观看 | 91激情视频在线 | 精品在线一区二区三区 | av网址aaa| 久久tv| 日本韩国精品一区二区在线观看 | 免费黄色在线网站 | 国产精品免费不卡 | 免费a v观看 | 色噜噜狠狠狠狠色综合 | 91视频高清免费 | 在线免费观看成人 | h久久| 99九九99九九九视频精品 | 国产美女精品在线 | 亚洲精品国产品国语在线 | 天天操天天色天天射 | 久久精品国产v日韩v亚洲 | 蜜臀久久99精品久久久无需会员 | 日日操日日插 | 日韩高清dvd | 亚洲综合欧美精品电影 | 黄色中文字幕在线 | 天天干.com| 成人中文字幕+乱码+中文字幕 | 国产在线国偷精品产拍 | 国产字幕在线看 | 五月婷婷在线视频观看 | 开心激情综合网 | 久久久精品免费看 | 国产成人精品一区二区三区福利 | 亚洲国产精品成人av | 色播五月激情综合网 | a天堂中文在线 | 亚洲精品一区中文字幕乱码 | 欧美一二三四在线 | 久久爱www. | 亚洲午夜久久久久久久久 | av三级av | 亚洲资源 | 国产一区二区视频在线播放 | 2024av| 国产精品久久久久久影院 | 最新精品视频在线 | 综合伊人av | 精品成人国产 | 亚洲色图 校园春色 | 在线 国产一区 | 亚洲精品乱码久久久久久蜜桃动漫 | 日韩精品国产一区 | 欧美精品天堂 | 成人宗合网 | 亚洲成人中文在线 | 99爱精品在线 | 97小视频 | 中文字幕色在线 | 久久久国产精华液 | 国产精品久久久免费看 | 国产在线视频在线观看 | 精品国偷自产在线 | av一区二区三区在线观看 | 日韩国产在线观看 | 国产精品久久久久999 | 亚洲一区二区三区精品在线观看 | 免费在线观看污网站 | 天天操天天怕 | 综合网伊人 | 男女激情片在线观看 | 超碰在线97免费 | 99久久精品久久久久久清纯 | 国产99久久久国产精品免费二区 | 久久久久久久久久久久久国产精品 | 成人在线视频免费观看 | 久久这里只有精品1 | 亚州精品天堂中文字幕 | 在线性视频日韩欧美 | 91黄色小视频 | 五月婷影院| 在线视频福利 | 午夜黄色大片 | av大片免费在线观看 | 九九九电影免费看 | 日韩高清在线一区二区三区 | 亚欧洲精品视频在线观看 | 国产不卡精品 | 国内精品免费 | 超碰在线97国产 | 精品福利网站 | 久久国产视屏 | 国产精品久久久影视 | 91人人爽久久涩噜噜噜 | 综合久久婷婷 | 黄色小说在线免费观看 | 日韩中文字幕免费视频 | 国产在线视频资源 | 婷婷精品在线 | 国产精品高潮呻吟久久av无 | 日韩欧美国产激情在线播放 | 久久久免费高清视频 | 久久99精品国产麻豆宅宅 | 天天干天天干天天干天天干天天干天天干 | 国产视频久久 | 免费观看91视频大全 | 中文字幕有码在线播放 | 最近能播放的中文字幕 | 国产精品免费视频一区二区 | 亚洲国产剧情 | 免费三级黄色片 | 91丨porny丨九色| 青青草华人在线视频 | 国产高清绿奴videos | 亚洲人久久久 | 五月开心六月伊人色婷婷 | 成人网页在线免费观看 | av色综合网 | 免费亚洲片 | 在线小视频国产 | 国产视频欧美视频 | 高清在线观看av | 日韩1级片 | 日韩精品在线视频 | 国产精品午夜久久久久久99热 | 中文字幕免费在线 | 亚洲精品一区二区在线观看 | 欧美综合国产 | 91视频在线观看免费 | 中文字幕在线不卡国产视频 | 在线欧美小视频 | 国产精品乱码在线 | 免费av视屏 | 国产精品白丝av | 少妇bbw搡bbbb搡bbbb | 免费69视频 | 97成人在线 | 国产日韩欧美综合在线 | 日韩欧美在线免费观看 | 国内外成人在线视频 | 久久婷婷国产色一区二区三区 | 亚洲视频免费在线看 | 天天做日日做天天爽视频免费 | 国产精品亚洲人在线观看 | 久草视频免费在线观看 | 日韩欧美在线综合网 | 九色在线视频 | 黄网站免费大全入口 | 一区二区电影网 | 少妇精69xxtheporn | 久久精品视频在线播放 | 精品91久久久久 | 激情在线五月天 | 中文字幕乱码亚洲精品一区 | 碰碰影院 | 亚洲精品一区二区久 | 黄网站免费久久 | 午夜国产一区 | 手机av永久免费 | 黄色三级在线观看 | 日韩激情一二三区 | 欧美aa级 | 九九热99视频 | 激情片av | 国产九九精品视频 | 色婷婷亚洲精品 | 久草视频观看 | 午夜免费视频网站 | 色香网| 日日夜夜免费精品视频 | 国产一区二区三区高清播放 | 在线欧美最极品的av | 国产综合香蕉五月婷在线 | 国产精品免费看久久久8精臀av | 精品一区久久 | 韩日电影在线观看 | 激情喷水 | 免费成人短视频 | 国产小视频精品 | 亚洲视频1区2区 | 99热在线观看 | 日本精品视频在线观看 | 国产黄色精品在线 | 日韩一二区在线观看 | 美女视频黄是免费的 | 欧洲精品二区 | 日韩av网页 | 亚洲视频免费在线观看 | 日韩免费在线观看视频 | 九九在线精品视频 | 亚洲国产精品一区二区久久hs | 久二影院 | 开心综合网 | 精品1区2区3区 | 激情五月在线视频 | 亚洲精品大片www | 欧美国产精品久久久久久免费 | 精品久久久网 | 国产原厂视频在线观看 | 亚洲理论片 | 欧美精品乱码久久久久 | 久久久99精品免费观看 | 日韩视频在线观看视频 | av亚洲产国偷v产偷v自拍小说 | 亚洲国产经典视频 | 91视视频在线直接观看在线看网页在线看 | 国产精品18久久久久久久久 | 婷婷丁香色 | 美女视频久久黄 | 亚洲精品tv久久久久久久久久 | 免费不卡中文字幕视频 | 麻豆传媒一区二区 | 久久久天堂 | 久久久精品视频网站 | 麻豆传媒视频观看 | 少妇自拍av | 五月婷婷六月丁香 | 亚洲综合在线一区二区三区 | 国产午夜av | 精品久久电影 | 国产午夜精品一区二区三区欧美 | 国产色就色 | 久一网站| 97电影手机 | 蜜臀av一区| 国产精品系列在线观看 | 午夜 免费 | 伊甸园永久入口www 99热 精品在线 | 色999视频 | 欧美性极品xxxx做受 | 午夜精品剧场 | 亚洲免费婷婷 | 国产精品videoxxxx | www.午夜| 国产小视频在线免费观看 | 夜夜骑日日 | 国产精品原创在线 | 欧美 亚洲 另类 激情 另类 | 欧美不卡视频在线 | 亚洲一区二区三区四区精品 | 中文字幕在线观看一区 | av在线播放中文字幕 | 久草www | 亚洲精品美女 | 国产黄av | 国产欧美日韩精品一区二区免费 | 久久9精品| 91成人免费在线 | 成人在线播放av | 国产中文字幕91 | av大全在线播放 | 六月丁香在线视频 | 色婷婷激情五月 | 国产 日韩 欧美 在线 | 国产在线观看91 | 国产日韩精品欧美 | 国内久久 | 国产精品成人自产拍在线观看 | 日韩欧美国产精品 | 久99热| 日韩免费中文字幕 | 91九色国产在线 | 一区二区三区高清不卡 | 99999精品视频 | 成年人在线免费看 | 免费看v片网站 | 又黄又爽又刺激视频 | 六月色丁 | 日韩精品在线免费播放 | 黄色三级在线看 | 91成人小视频 | 麻豆精品在线视频 | 九九精品视频在线观看 | 亚洲.www | 国偷自产中文字幕亚洲手机在线 | 天天插天天狠天天透 | 久久九九久久 | 免费久久99精品国产婷婷六月 | 国产小视频免费在线网址 | 一本色道久久精品 | 婷婷六月天综合 | 四虎在线视频 | 天天爱天天插 | av黄网站 | 亚洲人xxx| 亚洲成人精品在线观看 | 狠狠色丁香婷婷综合久久片 | 亚洲做受高潮欧美裸体 | jizz999| 久草在线免 | 久久成人免费视频 | 欧美国产大片 | 国产中文字幕在线看 | 国产亚洲精品久久19p | 在线免费黄色 | 国产女人18毛片水真多18精品 | 女人18精品一区二区三区 | 伊香蕉大综综综合久久啪 | 亚洲精品在线一区二区 | 五月婷婷视频在线观看 | 丁香婷五月 | 亚洲国产成人精品在线 | 亚洲va男人天堂 | 色综合色综合色综合 | 欧美日韩中| 国产韩国日本高清视频 | 看黄色91 | 国产精品99久久久久久久久久久久 |