日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

2021年大数据Flink(四十五):​​​​​​扩展阅读 双流Join

發布時間:2023/11/28 生活经验 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Flink(四十五):​​​​​​扩展阅读 双流Join 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

擴展閱讀??雙流Join

介紹

Window Join

Interval Join

???????代碼演示1

???????代碼演示2

重點注意


擴展閱讀??雙流Join

介紹

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html

https://zhuanlan.zhihu.com/p/340560908

https://blog.csdn.net/andyonlines/article/details/108173259

?

雙流Join是Flink面試的高頻問題。一般情況下說明以下幾點就可以hold了:

  1. Join大體分類只有兩種:Window Join和Interval Join。
  • Window Join又可以根據Window的類型細分出3種:

Tumbling Window Join、Sliding Window Join、Session Widnow Join。

Windows類型的join都是利用window的機制,先將數據緩存在Window State中,當窗口觸發計算時,執行join操作;

  • interval join也是利用state存儲數據再處理,區別在于state中的數據有失效機制,依靠數據觸發數據清理;

目前Stream join的結果是數據的笛卡爾積;

?

Window Join

  • Tumbling Window Join

執行翻滾窗口聯接時,具有公共鍵和公共翻滾窗口的所有元素將作為成對組合聯接,并傳遞給JoinFunction或FlatJoinFunction。因為它的行為類似于內部連接,所以一個流中的元素在其滾動窗口中沒有來自另一個流的元素,因此不會被發射!

如圖所示,我們定義了一個大小為2毫秒的翻滾窗口,結果窗口的形式為[0,1]、[2,3]、。。。。該圖顯示了每個窗口中所有元素的成對組合,這些元素將傳遞給JoinFunction。注意,在翻滾窗口[6,7]中沒有發射任何東西,因為綠色流中不存在與橙色元素⑥和⑦結合的元素。

?

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(TumblingEventTimeWindows.of(Time.milliseconds(2))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});
  • Sliding Window Join

在執行滑動窗口聯接時,具有公共鍵和公共滑動窗口的所有元素將作為成對組合聯接,并傳遞給JoinFunction或FlatJoinFunction。在當前滑動窗口中,一個流的元素沒有來自另一個流的元素,則不會發射!請注意,某些元素可能會連接到一個滑動窗口中,但不會連接到另一個滑動窗口中!

在本例中,我們使用大小為2毫秒的滑動窗口,并將其滑動1毫秒,從而產生滑動窗口[-1,0],[0,1],[1,2],[2,3]…。x軸下方的連接元素是傳遞給每個滑動窗口的JoinFunction的元素。在這里,您還可以看到,例如,在窗口[2,3]中,橙色②與綠色③連接,但在窗口[1,2]中沒有與任何對象連接。

?

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */)).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});
  • Session Window Join

?

在執行會話窗口聯接時,具有相同鍵(當“組合”時滿足會話條件)的所有元素以成對組合方式聯接,并傳遞給JoinFunction或FlatJoinFunction。同樣,這執行一個內部連接,所以如果有一個會話窗口只包含來自一個流的元素,則不會發出任何輸出!

在這里,我們定義了一個會話窗口連接,其中每個會話被至少1ms的間隔分割。有三個會話,在前兩個會話中,來自兩個流的連接元素被傳遞給JoinFunction。在第三個會話中,綠色流中沒有元素,所以⑧和⑨沒有連接!

?


import?org.apache.flink.api.java.functions.KeySelector;import?org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;import?org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer>?orangeStream?=?...DataStream<Integer>?greenStream?=?...orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(EventTimeSessionWindows.withGap(Time.milliseconds(1))).apply?(new?JoinFunction<Integer,?Integer,?String>?(){@Overridepublic?String?join(Integer?first,?Integer?second)?{return?first?+?","?+?second;}});

?

???????Interval Join

前面學習的Window Join必須要在一個Window中進行JOIN,那如果沒有Window如何處理呢?

interval join也是使用相同的key來join兩個流(流A、流B),

并且流B中的元素中的時間戳,和流A元素的時間戳,有一個時間間隔。

b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]

or

a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

?

也就是:

流B的元素的時間戳 ≥ 流A的元素時間戳 + 下界,且,流B的元素的時間戳 ≤ 流A的元素時間戳 + 上界。

?

在上面的示例中,我們將兩個流“orange”和“green”連接起來,其下限為-2毫秒,上限為+1毫秒。默認情況下,這些邊界是包含的,但是可以應用.lowerBoundExclusive()和.upperBoundExclusive來更改行為

orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound


import?org.apache.flink.api.java.functions.KeySelector;import?org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;import?org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer>?orangeStream?=?...DataStream<Integer>?greenStream?=?...orangeStream.keyBy(<KeySelector>).intervalJoin(greenStream.keyBy(<KeySelector>)).between(Time.milliseconds(-2),?Time.milliseconds(1)).process?(new?ProcessJoinFunction<Integer,?Integer,?String(){@Overridepublic?void?processElement(Integer?left,?Integer?right,?Context?ctx,?Collector<String>?out)?{out.collect(first?+?","?+?second);}});

?

???????代碼演示1

  • 需求

來做個案例:

使用兩個指定Source模擬數據,一個Source是訂單明細,一個Source是商品數據。我們通過window join,將數據關聯到一起。

?

  • 思路

1、Window Join首先需要使用where和equalTo指定使用哪個key來進行關聯,此處我們通過應用方法,基于GoodsId來關聯兩個流中的元素。

2、設置5秒的滾動窗口,流的元素關聯都會在這個5秒的窗口中進行關聯。

3、apply方法中實現將兩個不同類型的元素關聯并生成一個新類型的元素。

package cn.lanson.extend;import com.alibaba.fastjson.JSON;
import lombok.Data;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;/*** Author lanson* Desc*/
public class JoinDemo01 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 構建商品數據流DataStream<Goods> goodsDS = env.addSource(new GoodsSource()).assignTimestampsAndWatermarks(new GoodsWatermark());// 構建訂單明細數據流DataStream<OrderItem> orderItemDS = env.addSource(new OrderItemSource()).assignTimestampsAndWatermarks(new OrderItemWatermark());// 進行關聯查詢DataStream<FactOrderItem> factOrderItemDS = orderItemDS.join(goodsDS)// join條件:第一個流orderItemDS的GoodsId == 第二個流goodsDS的GoodsId.where(OrderItem::getGoodsId).equalTo(Goods::getGoodsId)//指定窗口.window(TumblingEventTimeWindows.of(Time.seconds(5)))//處理join結果.apply((OrderItem item, Goods goods) -> {FactOrderItem factOrderItem = new FactOrderItem();factOrderItem.setGoodsId(goods.getGoodsId());factOrderItem.setGoodsName(goods.getGoodsName());factOrderItem.setCount(new BigDecimal(item.getCount()));factOrderItem.setTotalMoney(goods.getGoodsPrice().multiply(new BigDecimal(item.getCount())));return factOrderItem;});factOrderItemDS.print();env.execute("滾動窗口JOIN");}//商品類@Datapublic static class Goods {private String goodsId;private String goodsName;private BigDecimal goodsPrice;public static List<Goods> GOODS_LIST;public static Random r;static ?{r = new Random();GOODS_LIST = new ArrayList<>();GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));}public static Goods randomGoods() {int rIndex = r.nextInt(GOODS_LIST.size());return GOODS_LIST.get(rIndex);}public Goods() {}public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {this.goodsId = goodsId;this.goodsName = goodsName;this.goodsPrice = goodsPrice;}@Overridepublic String toString() {return JSON.toJSONString(this);}}//訂單明細類@Datapublic static class OrderItem {private String itemId;private String goodsId;private Integer count;@Overridepublic String toString() {return JSON.toJSONString(this);}}//關聯結果@Datapublic static class FactOrderItem {private String goodsId;private String goodsName;private BigDecimal count;private BigDecimal totalMoney;@Overridepublic String toString() {return JSON.toJSONString(this);}}//構建一個商品Stream源(這個好比就是維表)public static class GoodsSource extends RichSourceFunction<Goods> {private Boolean isCancel;@Overridepublic void open(Configuration parameters) throws Exception {isCancel = false;}@Overridepublic void run(SourceContext sourceContext) throws Exception {while(!isCancel) {Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {isCancel = true;}}//構建訂單明細Stream源public static class OrderItemSource extends RichSourceFunction<OrderItem> {private Boolean isCancel;private Random r;@Overridepublic void open(Configuration parameters) throws Exception {isCancel = false;r = new Random();}@Overridepublic void run(SourceContext sourceContext) throws Exception {while(!isCancel) {Goods goods = Goods.randomGoods();OrderItem orderItem = new OrderItem();orderItem.setGoodsId(goods.getGoodsId());orderItem.setCount(r.nextInt(10) + 1);orderItem.setItemId(UUID.randomUUID().toString());sourceContext.collect(orderItem);orderItem.setGoodsId("111");sourceContext.collect(orderItem);TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {isCancel = true;}}//構建水印分配器,學習測試直接使用系統時間了public static class GoodsWatermark implements WatermarkStrategy<Goods> {@Overridepublic TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return (element, recordTimestamp) -> System.currentTimeMillis();}@Overridepublic WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<Goods>() {@Overridepublic void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis()));}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis()));}};}}//構建水印分配器,學習測試直接使用系統時間了public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {@Overridepublic TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return (element, recordTimestamp) -> System.currentTimeMillis();}@Overridepublic WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<OrderItem>() {@Overridepublic void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis()));}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis()));}};}}
}

?

???????代碼演示2

1、通過keyBy將兩個流join到一起

2、interval join需要設置流A去關聯哪個時間范圍的流B中的元素。此處,我設置的下界為-1、上界為0,且上界是一個開區間。表達的意思就是流A中某個元素的時間,對應上一秒的流B中的元素。

3、process中將兩個key一樣的元素,關聯在一起,并加載到一個新的FactOrderItem對象中

package cn.lanson.extend;import com.alibaba.fastjson.JSON;
import lombok.Data;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;/*** Author lanson* Desc*/
public class JoinDemo02 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 構建商品數據流DataStream<Goods> goodsDS = env.addSource(new GoodsSource()).assignTimestampsAndWatermarks(new GoodsWatermark());// 構建訂單明細數據流DataStream<OrderItem> orderItemDS = env.addSource(new OrderItemSource()).assignTimestampsAndWatermarks(new OrderItemWatermark());// 進行關聯查詢SingleOutputStreamOperator<FactOrderItem> factOrderItemDS = orderItemDS.keyBy(OrderItem::getGoodsId).intervalJoin(goodsDS.keyBy(Goods::getGoodsId)).between(Time.seconds(-1), Time.seconds(0))//.upperBoundExclusive().process(new ProcessJoinFunction<OrderItem, Goods, FactOrderItem>() {@Overridepublic void processElement(OrderItem left, Goods right, Context ctx, Collector<FactOrderItem> out) throws Exception {FactOrderItem factOrderItem = new FactOrderItem();factOrderItem.setGoodsId(right.getGoodsId());factOrderItem.setGoodsName(right.getGoodsName());factOrderItem.setCount(new BigDecimal(left.getCount()));factOrderItem.setTotalMoney(right.getGoodsPrice().multiply(new BigDecimal(left.getCount())));out.collect(factOrderItem);}});factOrderItemDS.print();env.execute("Interval JOIN");}//商品類@Datapublic static class Goods {private String goodsId;private String goodsName;private BigDecimal goodsPrice;public static List<Goods> GOODS_LIST;public static Random r;static {r = new Random();GOODS_LIST = new ArrayList<>();GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));}public static Goods randomGoods() {int rIndex = r.nextInt(GOODS_LIST.size());return GOODS_LIST.get(rIndex);}public Goods() {}public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {this.goodsId = goodsId;this.goodsName = goodsName;this.goodsPrice = goodsPrice;}@Overridepublic String toString() {return JSON.toJSONString(this);}}//訂單明細類@Datapublic static class OrderItem {private String itemId;private String goodsId;private Integer count;@Overridepublic String toString() {return JSON.toJSONString(this);}}//關聯結果@Datapublic static class FactOrderItem {private String goodsId;private String goodsName;private BigDecimal count;private BigDecimal totalMoney;@Overridepublic String toString() {return JSON.toJSONString(this);}}//構建一個商品Stream源(這個好比就是維表)public static class GoodsSource extends RichSourceFunction<Goods> {private Boolean isCancel;@Overridepublic void open(Configuration parameters) throws Exception {isCancel = false;}@Overridepublic void run(SourceContext sourceContext) throws Exception {while (!isCancel) {Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {isCancel = true;}}//構建訂單明細Stream源public static class OrderItemSource extends RichSourceFunction<OrderItem> {private Boolean isCancel;private Random r;@Overridepublic void open(Configuration parameters) throws Exception {isCancel = false;r = new Random();}@Overridepublic void run(SourceContext sourceContext) throws Exception {while (!isCancel) {Goods goods = Goods.randomGoods();OrderItem orderItem = new OrderItem();orderItem.setGoodsId(goods.getGoodsId());orderItem.setCount(r.nextInt(10) + 1);orderItem.setItemId(UUID.randomUUID().toString());sourceContext.collect(orderItem);orderItem.setGoodsId("111");sourceContext.collect(orderItem);TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {isCancel = true;}}//構建水印分配器,學習測試直接使用系統時間了public static class GoodsWatermark implements WatermarkStrategy<Goods> {@Overridepublic TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return (element, recordTimestamp) -> System.currentTimeMillis();}@Overridepublic WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<Goods>() {@Overridepublic void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis()));}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis()));}};}}//構建水印分配器,學習測試直接使用系統時間了public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {@Overridepublic TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return (element, recordTimestamp) -> System.currentTimeMillis();}@Overridepublic WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<OrderItem>() {@Overridepublic void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis()));}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis()));}};}}
}

重點注意

注意:后面項目中涉及到雙流

接下來的內容面試常問

雙流Join是Flink面試的高頻問題。一般情況下說明以下幾點就可以hold了:
1.Join大體分類只有兩種:Window Join和Interval Join。
2.Window Join又可以根據Window的類型細分出3種:
Tumbling 、Sliding 、Session Widnow Join。
3.Windows類型的join都是利用window的機制,先將數據緩存在Window State中,當窗口觸發計算時,執行join操作;
4.interval join也是利用state存儲數據再處理,區別在于state中的數據有失效機制,依靠數據觸發數據清理;

看官網示例說明

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html

總結

以上是生活随笔為你收集整理的2021年大数据Flink(四十五):​​​​​​扩展阅读 双流Join的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 日本一区二区三区免费看 | 欧美三级在线视频 | 黄色三级在线 | 亚洲黄色在线观看 | 免费a大片 | 一区www | 免费一级特黄特色毛片久久看 | 可以免费看的av毛片 | 91免费国产视频 | 中文在线字幕av | 欧美黄网在线观看 | 青青草久久伊人 | 日本人的性生活视频 | 国产91熟女高潮一区二区 | 女尊高h男高潮呻吟 | 日本一区二区免费高清视频 | 亚洲乱熟女一区二区三区小说 | 超碰免费在线观看 | ktv做爰视频一区二区 | 国产精品无码久久久久久 | 99re这里 | 精品人妻一区二区三区日产 | 黄一区二区三区 | 亚洲夜色 | 亚洲免费黄色网 | 午夜免费一级片 | 欧洲av一区二区 | 色天天色 | 91直接进入 | 国产午夜久久 | 好吊色在线视频 | 亚洲一区二区三区四区视频 | 日韩免费高清一区二区 | 成为性瘾网黄的yy对象后 | 日本老太婆做爰视频 | 国产在线视频卡一卡二 | 国产精品18久久久久久无码 | www.国产黄色 | 国产又黄又湿 | 日韩专区一区 | 国产一区二区三区在线观看 | 成人拍拍拍 | 国产操操操 | 国产免费三片 | 美女赤身免费网站 | aⅴ在线免费观看 | 国产大学生视频 | 丝袜国产一区 | 久久叉 | 亚洲一区精品视频在线观看 | 免费无遮挡在线观看视频网站 | 成人午夜激情视频 | 嫩色av | 久久99国产精品成人 | 国产视频1区2区3区 国产欧美一区二区精品性色99 | 在线免费激情视频 | 久久国产精品久久久久久电车 | 黄色免费视频 | 亚洲乱亚洲 | 免费看国产黄色片 | 国产福利片在线观看 | 亚洲成人伊人 | 久久一级电影 | 精品中文字幕视频 | 国产成人无码精品久在线观看 | 免费看国产曰批40分钟粉红裤头 | 国内黄色一级片 | 99热这里是精品 | 成年网站在线观看 | 男生草女生视频 | hitomi一区二区三区精品 | 一卡二卡在线 | 玖玖国产精品视频 | 韩国av在线 | 波多野结衣亚洲 | 99久久精品日本一区二区免费 | 无码黑人精品一区二区 | 看日本毛片| 可以免费观看的av网站 | 欧美黄网站在线观看 | 777久久 | 黄色国产视频 | 久色精品 | 99reav| 亚洲激情啪啪 | 尤物在线视频观看 | 国产电影一区在线观看 | 福利网址在线 | 校园激情av | 视频一区在线播放 | 欧美大奶在线 | 亚洲瘦老头同性xxxxx | 日韩综合第一页 | 国产尤物视频 | 香蕉成视频人app下载安装 | 男人操女人动漫 | 波多野吉衣在线视频 | 91成人一区二区三区 | 国产高清视频在线 |