日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Flink自定义SQL连接器

發布時間:2024/1/18 数据库 86 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink自定义SQL连接器 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1. 為什么要自定義連接器

通常我們會有這樣的需求,在使用Flink SQL將指標進行聚合計算完成之后,想要寫入到我們想要寫入的中間件時,例如opentsdb時序數據庫,可能會發現Flink官方并沒有給我們提供opentsdb的連接器,這個時候我們就需要自己去定義連接器

2. 自定義連接器的步驟

  • 創建TableFactory,子類有StreamTableSourceFactory和StreamTableSinkFactory
  • 創建TableSink和TableSource,子類有AppendStreamTableSink和StreamTableSource
  • 自定義校驗器ConnectorDescriptorValidator
  • 創建子類繼承RichSinkFunction
  • 在resource目錄下創建META-INF/services,并且創建org.apache.flink.table.factories.TableFactory文件
  • 3. 各步驟解釋

    這里我們以sink的角度來解釋一下各個步驟,source的角度是類似的

    3.1 StreamTableSinkFactory

    我們需要創建自己的Factory去實現StreamTableSinkFactory,主要關注它的幾個方法:createStreamTableSink(Map),requiredContext(),supportedProperties():定義connector支持的配置

    createStreamTableSink(Map):創建StreamTableSink

    requiredContext():唯一標識這個connector的類型,即connector.type

    3.2 AppendStreamTableSink

    這是一個追加流,當然還有upsertStreamTableSink和RetractStreamTableSink,根據自己的需求去使用,它們之間的區別略過

    consumeDataStream():這是我們重點關注的方法,這個方法用于消費數據流中的數據然后通過addSink調用RichSinkFunction,將數據進行消費

    3.3 RichSinkFunction

    我們自己實現的Sink方法,主要有三個方法

    invoke(Row,Context):關鍵代碼,我們在這里獲取數據然后進行操作

    open:一般進行初始化操作,例如初始化一些客戶端如httpClient,kafkaClient

    close:結束時調用,一般進行關閉操作例如客戶端的關閉

    3.4 ConnectorDescriptorValidator

    4. 實戰代碼

    4.1 生成數據

    首先我們將數據推送到我們的Kafka中:

    創建JavaBean:

    public class KafkaTestBean {private Double bandWidth;private Long app_time;private Double packet;private String networkLineId;public KafkaTestBean(Double bandWidth, Long app_time, Double packet, String networkLineId) {this.bandWidth = bandWidth;this.app_time = app_time;this.packet = packet;this.networkLineId = networkLineId;}public Double getPacket() {return packet;}public void setPacket(Double packet) {this.packet = packet;}public KafkaTestBean() {}public Double getBandWidth() {return bandWidth;}public void setBandWidth(Double bandWidth) {this.bandWidth = bandWidth;}public Long getApp_time() {return app_time;}public void setApp_time(Long app_time) {this.app_time = app_time;}public String getNetworkLineId() {return networkLineId;}public void setNetworkLineId(String networkLineId) {this.networkLineId = networkLineId;} }

    Kafka消息生成器:

    public class KafkaMessageGenerator {public static void main(String[] args) throws Exception{//配置信息Properties props = new Properties();//kafka服務器地址props.put("bootstrap.servers", "192.168.245.11:9092");//設置數據key和value的序列化處理類props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//創建生產者實例KafkaProducer<String,String> producer = new KafkaProducer<>(props);while (true){KafkaTestBean bean = new KafkaTestBean();bean.setNetworkLineId(UUID.randomUUID().toString());bean.setBandWidth(generateBandWidth());bean.setApp_time(System.currentTimeMillis() / 1000);bean.setPacket(generateBandWidth());ProducerRecord<String,String> record = new ProducerRecord("firsttopic", JSONObject.toJSONString(bean));producer.send(record);Thread.sleep(1000);}}private static Double generateBandWidth() {String s1 = String.valueOf((int) ((Math.random()) * 10));String s2 = String.valueOf((int) ((Math.random()) * 10));return Double.parseDouble(s1.concat(".").concat(s2));} }

    4.2 實現功能

    實現這么一個功能:從kafka接受數據然后進行聚合計算,寫入到opentsdb中

    4.2.1 創建TableFactory

    package com.cxc.flink.extend;import com.alibaba.fastjson.JSONObject; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.descriptors.FormatDescriptorValidator; import org.apache.flink.table.descriptors.Schema; import org.apache.flink.table.factories.StreamTableSinkFactory; import org.apache.flink.table.factories.StreamTableSourceFactory; import org.apache.flink.table.sinks.StreamTableSink; import org.apache.flink.table.sources.StreamTableSource; import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.flink.types.Row;import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map;import static com.cxc.flink.extend.CustomizedConnectorDescriptorValidator.*; import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;/*** create by chenxichao*/ public class CustomizedTableSourceSinkFactory implements StreamTableSinkFactory<Row>, StreamTableSourceFactory<Row> {private CustomizedConnectorDescriptorValidator customizedConnectorDescriptorValidator;@Overridepublic StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {customizedConnectorDescriptorValidator = new CustomizedConnectorDescriptorValidator();final DescriptorProperties descriptorProperties = new DescriptorProperties(true);descriptorProperties.putProperties(properties);//參數校驗customizedConnectorDescriptorValidator.validate(descriptorProperties);final TableSchema schema = TableSchemaUtils.getPhysicalSchema(descriptorProperties.getTableSchema(Schema.SCHEMA));String job = descriptorProperties.getString(CONNECTOR_JOB);String metrics = descriptorProperties.getString(CONNECTOR_METRICS);String address = descriptorProperties.getString(CONNECTOR_ADDRESS);String format = descriptorProperties.getString(FORMAT_TYPE);return new CustomizedTableSink(job, metrics, address, schema,format);}@Overridepublic StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {return null;}@Overridepublic Map<String, String> requiredContext() {/*** 這里connector類型,通過這個配置flink有且只能discover一種connector*/Map<String,String> context = new HashMap<>();context.put(CONNECTOR_TYPE,CONNECTOR_TYPE_VALUE_CUSTOMIZE);return context;}/*** 這里是自定義connector支持的配置* @return*/@Overridepublic List<String> supportedProperties() {List<String> supportProperties = new ArrayList<>();supportProperties.add(CONNECTOR_JOB);supportProperties.add(CONNECTOR_METRICS);supportProperties.add(CONNECTOR_ADDRESS);//schemasupportProperties.add(Schema.SCHEMA + ".#." + Schema.SCHEMA_DATA_TYPE);supportProperties.add(Schema.SCHEMA + ".#." + Schema.SCHEMA_NAME);supportProperties.add(FormatDescriptorValidator.FORMAT_TYPE);return supportProperties;}}

    4.2.2 創建StreamTableSink

    package com.cxc.flink.extend;import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.sinks.AppendStreamTableSink; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.types.DataType; import org.apache.flink.table.utils.TableConnectorUtils; import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions;import java.util.Arrays;/*** create by chenxichao*/ public class CustomizedTableSink implements AppendStreamTableSink<Row> {private final String job;private final String metrics;private final String address;private final TableSchema schema;private final String formatType;public CustomizedTableSink(String job, String metrics, String address, TableSchema schema,String formatType) {this.job = Preconditions.checkNotNull(job, "job must not be null.");this.metrics = Preconditions.checkNotNull(metrics, "metrics must not be null.");this.address = Preconditions.checkNotNull(address, "address must not be null.");this.schema = TableSchemaUtils.checkNoGeneratedColumns(schema);this.formatType = Preconditions.checkNotNull(formatType,"formatType must not be null");}@Overridepublic void emitDataStream(DataStream<Row> dataStream) {}@Overridepublic DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {return dataStream.addSink(new CustomizedSinkFunction(this.address)).setParallelism(dataStream.getParallelism()).name(TableConnectorUtils.generateRuntimeName(this.getClass(),getFieldNames()));}@Overridepublic TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {if (!Arrays.equals(getFieldNames(), fieldNames) || !Arrays.equals(getFieldTypes(), fieldTypes)) {throw new ValidationException("Reconfiguration with different fields is not allowed. " +"Expected: " + Arrays.toString(getFieldNames()) + " / " + Arrays.toString(getFieldTypes()) + ". " +"But was: " + Arrays.toString(fieldNames) + " / " + Arrays.toString(fieldTypes));}return this;}@Overridepublic TypeInformation<Row> getOutputType() {return schema.toRowType();}@Overridepublic String[] getFieldNames() {return schema.getFieldNames();}@Overridepublic TypeInformation<?>[] getFieldTypes() {return schema.getFieldTypes();} }

    4.2.3 創建RickSinkFunction

    package com.cxc.flink.extend;import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.formats.json.JsonRowSerializationSchema; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.types.Row;/*** create by chenxichao*/ public class CustomizedSinkFunction extends RichSinkFunction<Row> {private String address;public CustomizedSinkFunction(String address){this.address = address;}@Overridepublic void invoke(Row value, Context context) {//打印即可System.out.println("send to " + address + "---" + value);} }

    4.2.4 創建ConnectorDescriptorValidator

    package com.cxc.flink.extend;import org.apache.flink.table.descriptors.ConnectorDescriptorValidator; import org.apache.flink.table.descriptors.DescriptorProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory;/*** create by chenxichao*/ public class CustomizedConnectorDescriptorValidator extends ConnectorDescriptorValidator {public static final String CONNECTOR_JOB = "connector.job";public static final String CONNECTOR_METRICS = "connector.metrics";public static final String CONNECTOR_ADDRESS = "connector.address";public static final String CONNECTOR_TYPE_VALUE_CUSTOMIZE = "customize";public static final String FORMAT_TYPE = "format.type";private Logger logger = LoggerFactory.getLogger(this.getClass());@Overridepublic void validate(DescriptorProperties properties) {/*** 這里對連接屬性進行校驗*/logger.info("開始校驗連接器參數");super.validate(properties);logger.info("連接器參數校驗完畢");} }

    4.2.5 創建META-INF/services

    TableFactory是利用Java的SPI去發現工廠的,可以在TableServiceFactory的discoverFactories()方法去查看源碼

    在目錄下創建META-INF/services/org.apache.flink.table.factories.TableFactory

    內容為:

    com.cxc.flink.extend.CustomizedTableSourceSinkFactory

    如果不創建該文件則會導致ServiceLoader找不到該工廠,使用就會報錯

    4.2.6 SQL代碼

    寫一個Flink SQL測試程序,計算之后發送到我們自定義的sink中

    import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.java.StreamTableEnvironment;/*** create by chenxichao*/ public class SQLExtendTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,settings);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//輸入表的sql語句StringBuilder INPUT_SQL = new StringBuilder();INPUT_SQL.append("CREATE TABLE bandWidthInputTable (").append("networkLineId VARCHAR,bandWidth DOUBLE,app_time BIGINT,packet DOUBLE,").append("ts as TO_TIMESTAMP(FROM_UNIXTIME(app_time,'yyyy-MM-dd HH:mm:ss')),").append("WATERMARK FOR ts AS ts - INTERVAL '5' second) ").append("WITH(").append("'connector.type' = 'kafka',").append("'connector.version' = 'universal',").append("'connector.topic' = 'firsttopic',").append("'connector.properties.group.id' = 'start_log_group',").append("'connector.properties.zookeeper.connect' = '192.168.245.11:2181',").append("'connector.properties.bootstrap.servers' = '192.168.245.11:9092',").append("'format.type' = 'json'").append(")");tableEnv.sqlUpdate(INPUT_SQL.toString());//輸出表的sql語句StringBuilder OUT_TABLE_SQL = new StringBuilder();OUT_TABLE_SQL.append("CREATE TABLE bandWidthOutputTable (").append("metricset VARCHAR,`value` DOUBLE,`timestamp` BIGINT,networkLineId VARCHAR)").append("WITH(").append("'connector.type' = 'customize',").append("'connector.address' = '192.168.245.138:8081',").append("'connector.job' = 'testextendjob',").append("'connector.metrics' = 'testmetric',").append("'format.type' = 'json'").append(")");tableEnv.sqlUpdate(OUT_TABLE_SQL.toString());String sql = "INSERT INTO bandWidthOutputTable SELECT 'nmct_line_metric_bandwidth' as metricset,sum(bandWidth),avg(app_time),networkLineId FROM bandWidthInputTable GROUP BY tumble(ts,interval '5' second),networkLineId";tableEnv.sqlUpdate(sql);env.execute("window sql job");} }

    5. 功能擴展

    上述代碼我們可以實現一個小小的功能就是打印在控制臺,但是我們發現我們的數據并不是一個標準的JSON格式,在很多場景中我們都需要我們的數據是JSON,那么如何實現呢?

    這里通過翻閱Flink kafka連接器,發現一個SerializationSchema接口,這樣就很簡單了,找到實現類JsonRowSerializationSchema 它內部利用jackson進行json序列化,直接使用即可

    5.1 修改RichSinkFunction

    在invoke方法中處理數據然后進行序列化操作

    package com.cxc.flink.extend;import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.formats.json.JsonRowSerializationSchema; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.types.Row;/*** create by chenxichao*/ public class CustomizedSinkFunction extends RichSinkFunction<Row> {private String address;private SerializationSchema<Row> serializationSchema;public CustomizedSinkFunction(TypeInformation<Row> rowType,String address,String formatType){this.address = address;if(formatType.equals("json")){this.serializationSchema = new JsonRowSerializationSchema.Builder(rowType).build();}else{throw new RuntimeException("current custom format only support json serializer");}}@Overridepublic void invoke(Row value, Context context) {//打印即可byte[] serialize = this.serializationSchema.serialize(value);String jsonValue = new String(serialize);System.out.println("send to " + address + "---" + jsonValue);}}

    5.2 修改StreamTableSink

    創建SinkFunction的時候將數據schema傳入,為了生成jsonNode

    package com.cxc.flink.extend;import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.sinks.AppendStreamTableSink; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.utils.TableConnectorUtils; import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions;import java.util.Arrays;/*** create by chenxichao*/ public class CustomizedTableSink implements AppendStreamTableSink<Row> {private final String job;private final String metrics;private final String address;private final TableSchema schema;private final String formatType;public CustomizedTableSink(String job, String metrics, String address, TableSchema schema,String formatType) {this.job = Preconditions.checkNotNull(job, "job must not be null.");this.metrics = Preconditions.checkNotNull(metrics, "metrics must not be null.");this.address = Preconditions.checkNotNull(address, "address must not be null.");this.schema = TableSchemaUtils.checkNoGeneratedColumns(schema);this.formatType = Preconditions.checkNotNull(formatType,"formatType must not be null");}@Overridepublic void emitDataStream(DataStream<Row> dataStream) {}@Overridepublic DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {return dataStream.addSink(new CustomizedSinkFunction(this.schema.toRowType(),this.address,formatType)).setParallelism(dataStream.getParallelism()).name(TableConnectorUtils.generateRuntimeName(this.getClass(),getFieldNames()));}@Overridepublic TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {if (!Arrays.equals(getFieldNames(), fieldNames) || !Arrays.equals(getFieldTypes(), fieldTypes)) {throw new ValidationException("Reconfiguration with different fields is not allowed. " +"Expected: " + Arrays.toString(getFieldNames()) + " / " + Arrays.toString(getFieldTypes()) + ". " +"But was: " + Arrays.toString(fieldNames) + " / " + Arrays.toString(fieldTypes));}return this;}@Overridepublic TypeInformation<Row> getOutputType() {return schema.toRowType();}@Overridepublic String[] getFieldNames() {return schema.getFieldNames();}@Overridepublic TypeInformation<?>[] getFieldTypes() {return schema.getFieldTypes();} }

    總結

    以上是生活随笔為你收集整理的Flink自定义SQL连接器的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    日韩小视频网站 | 久久激情五月丁香伊人 | 一区二区三区在线不卡 | 成人黄色大片网站 | 亚洲精品乱码久久久久久蜜桃91 | 久久久久中文字幕 | 国产精品观看在线亚洲人成网 | 中文在线最新版天堂 | 午夜精品一区二区三区四区 | 色开心| 日本午夜在线亚洲.国产 | 亚洲国产精品va在线 | 国产一区二区在线播放 | 西西大胆免费视频 | 99中文在线 | 国产精品成 | 欧美精品在线一区二区 | 极品久久久 | 精品视频在线看 | 免费观看mv大片高清 | 国产成人精品三级 | 在线激情av电影 | www日韩在线观看 | 91精品成人 | 青草视频在线免费 | 欧美日韩不卡一区二区三区 | 91亚洲国产成人 | 欧美精品亚洲二区 | 免费精品久久久 | 成人毛片网 | 18国产精品福利片久久婷 | 一区二区三区精品在线视频 | 这里有精品在线视频 | 亚洲精品一区二区三区在线观看 | 亚洲精品tv | 久久久免费观看视频 | 91精品影视 | aa级黄色大片| 五月天婷婷丁香花 | 成人午夜免费剧场 | 91亚洲视频在线观看 | 激情 婷婷 | 夜添久久精品亚洲国产精品 | 午夜精品视频免费在线观看 | 少妇视频一区 | 91在线视频精品 | 久久国产精品色av免费看 | 狠狠干成人综合网 | 在线免费高清一区二区三区 | 色婷婷久久久 | 黄色在线成人 | 欧美午夜寂寞影院 | 日韩欧美视频在线免费观看 | 日本狠狠干 | www.黄色片.com | 欧美精品一区二区蜜臀亚洲 | 日韩一区视频在线 | 视频在线亚洲 | 免费在线观看视频a | 国产99久久久国产精品免费看 | 国产 一区二区三区 在线 | 久久婷五月| 国产二区视频在线观看 | 精品美女久久久久久免费 | 久久香蕉电影 | 偷拍精偷拍精品欧洲亚洲网站 | 黄色大片免费播放 | 天天干夜夜爱 | av中文资源在线 | 成人毛片网 | 欧美成年黄网站色视频 | 亚洲成人二区 | www.91国产| 久久精品一 | www亚洲国产| 国产精品系列在线播放 | 久久99九九99精品 | 伊人五月天av | 午夜国产影院 | 久久久免费毛片 | 日韩中文字幕一区 | 精品福利在线观看 | 欧美精品久久久久 | 色 中文字幕 | 午夜色婷婷 | 欧美在线视频日韩 | 婷婷在线视频 | 免费在线观看成人小视频 | 日韩精品网址 | 天天干天天操 | 欧美日韩国产精品一区二区 | 五月天综合在线 | 香蕉看片 | 国产男女免费完整视频 | 国产亚洲日 | 色99之美女主播在线视频 | 激情文学丁香 | 中日韩在线视频 | 亚洲综合一区二区精品导航 | 色播六月天 | 国产黄色大全 | 美女久久久久久久 | 国产高清第一页 | 国产视频在线观看免费 | 91视视频在线直接观看在线看网页在线看 | 91热爆视频| 日韩电影一区二区三区在线观看 | 国产不卡免费av | 91 中文字幕 | 日韩69av| 欧美日韩综合在线 | 国精产品一二三线999 | 一区二区精品在线视频 | 国产精品久久久久久久久久99 | 外国av网 | 91久久偷偷做嫩草影院 | 又黄又网站 | 欧美另类高清 videos | 色婷婷97 | 精品久久久久久亚洲综合网站 | 天天操天天操天天 | 久久色中文字幕 | 国产精品专区h在线观看 | 97视频在线观看视频免费视频 | 在线a视频免费观看 | a电影免费看 | 国产啊v在线观看 | 毛片网站免费在线观看 | av黄色免费网站 | 99re8这里有精品热视频免费 | 韩日视频在线 | 四虎成人精品永久免费av九九 | 97在线观看免费高清 | 久久亚洲国产精品 | 99热超碰| 中文久久精品 | 国产欧美久久久精品影院 | 国内精品视频免费 | a级片韩国 | 婷婷久操 | 国产免费视频在线 | 日日躁你夜夜躁你av蜜 | 91久久久久久国产精品 | 久久99在线 | 麻豆视频免费在线播放 | 婷婷中文字幕在线观看 | 日韩乱理 | 1000部18岁以下禁看视频 | 天天狠狠干 | 啪啪肉肉污av国网站 | 亚洲精品在线免费 | 粉嫩av一区二区三区四区五区 | 欧美日韩免费观看一区=区三区 | 国产精品久久久久免费a∨ 欧美一级性生活片 | 91视频免费国产 | 91超在线| 国产精品免费在线观看视频 | 欧美另类xxxx | 91在线在线观看 | 91在线免费播放视频 | 免费视频a | 国产在线观 | 久久免费视频这里只有精品 | 国产日韩精品一区二区 | av直接看 | www.天天射.com | 国产精品综合av一区二区国产馆 | 在线观看成人 | 亚洲欧美视频在线 | 成人黄色大片在线免费观看 | 五月婷在线播放 | 91av资源网 | 婷婷九月激情 | 夜夜操网站 | 国产精品黄 | 精品久久网站 | 日韩在线观看av | 狠狠操精品 | 蜜桃视频在线视频 | 国内精品一区二区 | 欧美国产精品一区二区 | 在线观看国产永久免费视频 | 69精品久久 | 欧美日韩p片 | 久久精品国产亚洲精品2020 | 久久久久免费 | 99久久超碰中文字幕伊人 | 最近中文字幕高清字幕免费mv | 国产视频一二区 | 国内精品久久久久久久久久久 | 天天插天天 | 日韩免费一级电影 | 欧美日韩性视频 | 99产精品成人啪免费网站 | 日韩高清免费电影 | 美国av大片 | 国产午夜精品理论片在线 | 91最新网址 | 91字幕| 天堂av在线 | 久久综合之合合综合久久 | 日韩 在线a | 特级黄色片免费看 | www激情com| 亚洲婷婷在线视频 | 欧美一区二区三区在线观看 | 久久视频这里有精品 | 亚洲区二区 | 日韩成人一级大片 | 国产在线精品一区二区三区 | 911在线| 国产日韩在线观看一区 | 国产在线中文字幕 | 91污污视频在线观看 | 一级精品视频在线观看宜春院 | 91看片在线看片 | 日日操天天操夜夜操 | 91在线入口 | 欧美日韩一区二区视频在线观看 | 成人午夜黄色影院 | 日韩在线观看中文字幕 | 日韩欧美网址 | 97香蕉超级碰碰久久免费软件 | 久久99精品视频 | 蜜桃视频在线视频 | 国产亚洲婷婷免费 | 欧美激情在线看 | 日本少妇高清做爰视频 | 波多野结衣资源 | 亚洲情婷婷 | 天天干天天天天 | 在线看国产精品 | 日韩一二区在线 | 久久视频一区二区 | 国产精品欧美一区二区三区不卡 | 久久免费播放视频 | 91精品久久久久久综合乱菊 | 国产很黄很色的视频 | 免费在线成人av | 日韩h在线观看 | 成人在线免费小视频 | 正在播放日韩 | 欧美激情综合色综合啪啪五月 | 区一区二在线 | 丁香九月婷婷 | 91综合在线| 狠狠躁日日躁狂躁夜夜躁av | 天天干天天操天天拍 | 欧美日韩在线观看视频 | 91综合视频在线观看 | 人人射人人 | 欧美日韩国产精品一区二区亚洲 | 精品国产亚洲在线 | av免费网| 五月在线| 91麻豆操| 91在线中字| 中文字幕乱码视频 | 天躁狠狠躁 | 97久久久免费福利网址 | 久久午夜羞羞影院 | 四虎免费av | 99亚洲精品视频 | 日韩精品一区二区三区高清免费 | 91在线国内视频 | 久久精彩免费视频 | 免费a级毛片在线看 | 三级av网站| 精品国产美女 | 韩国三级av在线 | 免费麻豆| 在线亚洲播放 | 成人97视频| 最新中文字幕在线资源 | 国产69精品久久久久久久久久 | 色婷婷 亚洲 | 九九免费在线观看 | 狠狠伊人 | 久久免费视频6 | 亚洲手机天堂 | 日韩在线短视频 | 久久成视频 | 日韩欧美在线不卡 | 中文字幕成人在线观看 | 国产vs久久 | 久久久久久高潮国产精品视 | 日本中文字幕在线一区 | 婷婷六月综合亚洲 | 成人黄色在线观看视频 | 天天艹天天操 | 在线亚洲精品 | 99re在线视频观看 | 国产麻豆传媒 | 99精彩视频在线观看免费 | 免费国产亚洲视频 | 国产精品黄 | 国产高清成人在线 | 天天操天天射天天爱 | 天天综合网国产 | 毛片的网址| 日韩欧美在线观看 | 在线观看av网站 | 久久婷亚洲五月一区天天躁 | 日韩r级电影在线观看 | 日本久久综合网 | 色999精品| 久久国产精品久久w女人spa | 久久精品2 | 欧洲精品视频一区 | 成人免费在线网 | 最近能播放的中文字幕 | 久久久久久高清 | 美女免费av | 狠狠操狠狠 | va视频在线| 亚洲一区黄色 | 亚洲欧美国产精品久久久久 | 人人舔人人射 | 欧美国产日韩一区二区 | 99热在线看 | 久草在线视频首页 | www夜夜操| 国产成人一级 | 色人久久 | 五月激情av| 18性欧美xxxⅹ性满足 | 欧美日韩视频在线观看免费 | 午夜精品一区二区三区在线 | 91av在线播放视频 | 狠狠操天天射 | 97成人资源| 99久久精品免费看国产四区 | 成人在线视频你懂的 | 在线天堂视频 | 色激情在线| 日韩电影一区二区在线 | 中文字幕 在线 一 二 | 日韩在线观看高清 | 欧美一区二区在线免费看 | 日b视频在线观看网址 | 免费在线观看不卡av | 六月丁香激情综合色啪小说 | 久久精品xxx | 国产xx在线 | 精品国产99 | av国产网站 | 欧美一级免费 | 在线视频福利 | 免费看搞黄视频网站 | 五月天综合色激情 | 夜夜躁狠狠躁日日躁视频黑人 | 日韩欧美一区二区三区视频 | 99超碰在线播放 | 久久精品草 | 麻豆视频免费观看 | www.黄色在线 | 久久精品欧美日韩精品 | 久久中文网 | 色999在线| 97超碰资源网 | 国产成人精品一区二三区 | 在线观看国产日韩欧美 | 免费看污网站 | 日韩黄色大片在线观看 | 三级av免费观看 | 国产美女视频 | 婷婷婷国产在线视频 | 日日夜夜草| 国产成人精品综合久久久久99 | 激情久久伊人 | 国产主播99 | 91免费看黄 | 麻豆系列在线观看 | 日韩精品久久久久久久电影99爱 | 国产一区二区高清视频 | 成人h动漫精品一区二 | 97精品国产一二三产区 | 天天爱天天草 | 亚洲欧美综合 | 欧美日韩免费一区二区 | 免费黄色在线网站 | 久久婷婷亚洲 | 97狠狠操 | 天天操欧美 | 操操操影院 | 午夜精品一区二区三区视频免费看 | 伊人影院av | 蜜桃麻豆www久久囤产精品 | 中文资源在线官网 | 国产欧美高清 | 少妇自拍av| 在线免费观看国产视频 | 国产视频一区二区三区在线 | 青青草国产免费 | 国产韩国日本高清视频 | 高清日韩一区二区 | 97偷拍视频| 日本中文字幕在线观看 | 99中文在线 | 日韩xxxxxxxxx| 午夜精品久久久久久久久久久久 | 久草在线免 | 久久久久久久久久电影 | 日韩有码在线播放 | 久久免费视频在线观看30 | 天天想夜夜操 | 国产日韩视频在线播放 | 亚洲婷婷丁香 | 日本夜夜草视频网站 | 久久视频精品在线观看 | 黄色一级大片在线免费看产 | 在线观看一级片 | 特级xxxxx欧美 | 久草在线免费资源站 | 欧美日本在线视频 | 一级性av | 久久免费高清视频 | 色丁香婷婷 | 日日夜夜亚洲 | 欧美一区二区在线免费看 | 欧美片一区二区三区 | 久久精品99北条麻妃 | 狠狠地操 | 在线va视频 | 丝袜美腿在线播放 | 国产精品免费久久久久久久久久中文 | 一级黄色片在线 | 国产一二三四在线视频 | 国产不卡在线观看 | 日韩国产精品久久久久久亚洲 | 久久五月婷婷丁香社区 | 国产一级做a爱片久久毛片a | 国内成人av| 欧美在线一级片 | 成人欧美一区二区三区黑人麻豆 | a一片一级 | 国产护士av| 日韩免费在线视频观看 | 91九色在线 | 国内精品二区 | 国产精品粉嫩 | 在线之家免费在线观看电影 | 成人一级影视 | 亚洲伊人av | 色一级片| 成人黄色大片网站 | 丁香av | 国产精品专区在线观看 | 日本不卡一区二区 | 深夜免费小视频 | 蜜桃视频在线观看一区 | 欧美精品二 | 久久艹国产视频 | 四虎影视精品永久在线观看 | 国产最新视频在线 | 久久久99国产精品免费 | av在线超碰 | 欧美在线99 | 人人插人人舔 | 性色在线视频 | 91精品国产综合久久久久久久 | 中文字幕的 | 亚洲综合欧美精品电影 | 日韩欧美在线中文字幕 | 天天操天天操天天操天天操 | 日韩欧美网站 | 日本久久免费电影 | 中文字幕资源在线观看 | 亚洲精品视频在线 | 狠狠色丁香婷综合久久 | 337p西西人体大胆瓣开下部 | 在线观看中文字幕av | 日日爱夜夜爱 | 国产精品你懂的在线观看 | 欧美大码xxxx | 日日天天狠狠 | 韩国精品视频在线观看 | 国产精品美女www爽爽爽视频 | 在线va网站| 91刺激视频 | 日韩理论 | 日韩精品久久久免费观看夜色 | 国产品久精国精产拍 | 黄色软件在线看 | 亚洲波多野结衣 | 国产精品一区欧美 | 久久国产福利 | 欧美日韩一区二区免费在线观看 | 99精品视频免费全部在线 | 91成品视频 | 日本精品视频在线观看 | 久久久免费看视频 | 国产精品video爽爽爽爽 | 天天操天天曰 | 国产视频精品久久 | www.亚洲激情.com | 亚洲综合在线视频 | 婷婷国产一区二区三区 | 久久午夜精品 | 亚洲电影免费 | 国产视频精品视频 | 337p西西人体大胆瓣开下部 | 国产在线精 | 日韩av一区二区在线 | 欧美成人xxx| 国产黄色播放 | 精品成人国产 | 天天激情综合 | 伊人手机在线 | 99精品国产一区二区三区麻豆 | 欧美做受xxx | 国产高清免费视频 | 探花视频免费在线观看 | 国产成人一区二区三区在线观看 | 久久久久看片 | 国产精品久99 | 日韩精品视频免费专区在线播放 | 超碰伊人网 | 久精品在线观看 | 外国av网| 男女免费av | 999电影免费在线观看 | 五月激情视频 | 日韩久久久久久久久 | 国产免费视频在线 | 欧洲激情在线 | www.国产在线观看 | 韩国在线一区二区 | 国产综合在线观看视频 | 狠狠艹夜夜干 | 国产精品精品国产婷婷这里av | 中文字幕视频网 | 很黄很黄的网站免费的 | 中日韩在线视频 | 中文字幕一区在线观看视频 | 久久久福利| 波多野结衣综合网 | 免费在线观看一级片 | 九九九九精品九九九九 | 成人日批视频 | 九九免费在线观看视频 | 97中文字幕 | 国产精品久久久久一区 | 97精产国品一二三产区在线 | www久久久久| 五月婷在线观看 | 亚洲综合在线发布 | 免费在线h| 国内精品久久久久久久影视麻豆 | 狠狠干成人综合网 | 国产欧美精品xxxx另类 | 久久爱www. | 国产精品h在线观看 | 主播av在线| 国产资源免费在线观看 | 国产一级片一区二区三区 | 国产午夜三级 | 日本黄色免费观看 | 一区在线免费观看 | 爱色婷婷| 成年人视频在线免费观看 | 色多视频在线观看 | 国产视频1区2区 | a电影免费看 | 992tv人人网tv亚洲精品 | 午夜精品久久久久久中宇69 | 久久免视频 | 天天草天天干天天射 | 91香蕉亚洲精品 | 久久精品一区二区三区国产主播 | 国产精品黄网站在线观看 | 美女久久久久 | 日韩免费看 | 又紧又大又爽精品一区二区 | 五月婷婷精品 | 久久久国产电影 | 天天色天天色天天色 | 992tv人人草 黄色国产区 | 精品国内自产拍在线观看视频 | 亚洲日本精品视频 | 成人国产精品久久久 | av电影在线不卡 | 五月天.com| 五月婷在线观看 | 久草视频资源 | 日日夜夜免费精品视频 | 亚洲国产精品第一区二区 | 免费av片在线 | 国产96在线观看 | 欧美激情综合五月 | 亚洲美女精品视频 | 免费激情网 | 成年人免费观看在线视频 | 欧美日韩三级在线观看 | 午夜精品久久久久久久99婷婷 | 中文字幕在线免费 | 特级大胆西西4444www | 伊人天天色| 九色视频自拍 | 欧美精品久久久久久 | 天天躁天天操 | 81精品国产乱码久久久久久 | 91麻豆精品国产91久久久更新时间 | 99色在线播放 | 91大神精品视频在线观看 | 97视频久久久 | 成人免费色 | 国产成人av在线 | 天天玩夜夜操 | 狠狠操天天操 | 午夜精品一区二区国产 | 亚洲天堂精品视频在线观看 | 97超碰中文字幕 | 人人超碰97 | 久久网站最新地址 | 视频成人永久免费视频 | 一区中文字幕在线观看 | av在线专区 | 中文字幕在线久一本久 | 中文字幕视频一区 | 久久久成人精品 | 亚洲一区久久 | 91久久一区二区 | 国产精品久久久久久久久久久久午夜 | av成人资源 | 不卡的av电影 | 日日夜夜操av | 久久精品激情 | 精品福利视频在线观看 | 日韩视频免费 | 亚洲乱码中文字幕综合 | 视色网站| 国内精品久久久久国产 | 免费观看一区二区三区视频 | 在线观看黄色大片 | 亚洲精品videossex少妇 | 日本黄色大片免费看 | 91丨九色丨蝌蚪丰满 | 波多野结衣在线视频免费观看 | 国产在线欧美日韩 | 日韩电影中文字幕 | 亚洲四虎| 欧美性春潮 | 99精品视频免费在线观看 | 欧美日韩高清一区二区三区 | 欧美日韩一区二区三区不卡 | 成人黄色电影视频 | 麻豆免费视频 | 国产高清av在线播放 | 在线久久| av大片免费 | 狠狠色丁香婷婷综合欧美 | 天天操天天射天天添 | 人人玩人人添人人澡97 | 色综合久久88 | 麻豆国产精品视频 | 九九综合九九 | 亚洲精品综合在线 | 婷婷伊人五月天 | 国产又粗又长的视频 | 国产精品毛片一区视频播不卡 | 久久综合狠狠综合 | 久久国产露脸精品国产 | 久久精品网站免费观看 | 欧美一区三区四区 | 国内精品中文字幕 | 国产精品美女免费 | 欧美激情xxxx性bbbb | 欧美 亚洲 另类 激情 另类 | 国产成人精品av在线 | 成人日韩av| 久久精品日产第一区二区三区乱码 | 国内精品在线看 | 六月丁香激情网 | 视频一区二区在线观看 | 久久久69 | 日韩激情视频在线观看 | 成人在线免费看视频 | 久久久精品久久日韩一区综合 | 麻豆av一区二区三区在线观看 | 日韩久久精品一区二区 | 最近中文字幕完整视频高清1 | 成人在线观看av | 日韩欧美在线观看一区二区 | 一区二区精品在线 | 精品人人人人 | 最新日韩视频在线观看 | 国产xxxxx在线观看 | 亚洲最大的av网站 | 亚洲精品国精品久久99热一 | 国产精品大全 | 2023国产精品自产拍在线观看 | 日韩精品1区2区 | 丁香六月激情 | 91精品国产麻豆国产自产影视 | 久久综合桃花 | 国产无套精品久久久久久 | 精品少妇一区二区三区在线 | 日韩精品视频免费在线观看 | 亚洲日本中文字幕在线观看 | 午夜黄色 | 成人蜜桃 | 国产成人久久精品亚洲 | 亚洲高清视频在线观看 | 青青河边草手机免费 | 色香天天 | 久艹视频在线观看 | www视频在线观看 | 中文字幕精品www乱入免费视频 | av.com在线| 国产99免费视频 | 国产精品久久久久久一区二区三区 | 中文字幕在线一二 | 欧美午夜久久 | 亚洲精品小视频在线观看 | 久久8| 欧美一级电影免费观看 | 婷婷六月网 | 久色小说 | 99热只有精品在线观看 | 中文字幕av一区二区三区四区 | 午夜骚影| 国产免费专区 | 国产在线播放一区 | 丝袜美腿一区 | 久久调教视频 | 天天插天天干 | 亚洲精品国产免费 | 99久e精品热线免费 99国产精品久久久久久久久久 | 91网在线观看 | 国产精品久久久电影 | 国产永久免费观看 | 欧美亚洲另类在线视频 | 99精品免费在线 | 午夜影院三级 | 在线免费观看视频一区二区三区 | 久久免费视屏 | 亚洲 欧洲 国产 精品 | 久草久草在线观看 | 久久国产福利 | www·22com天天操 | 国产精品v a免费视频 | 干 操 插 | 992tv人人草| 午夜久久影院 | 国产精品久久久久久久久久ktv | 九九免费精品视频在线观看 | 精品一区91 | 亚洲成人av在线播放 | 在线一区观看 | 国产视频中文字幕 | 国产91精品看黄网站 | 一区二区三区四区精品视频 | 久久的色 | 91九色在线观看 | 国产精品3区 | 日韩av手机在线观看 | 91久久国产露脸精品国产闺蜜 | 色精品视频 | 国产婷婷vvvv激情久 | 啪啪小视频网站 | 中文字幕在线观看第一页 | 久久福利综合 | 免费影视大全推荐 | 亚洲欧美激情精品一区二区 | 超碰在线98| 99精品视频免费观看视频 | 麻豆精品传媒视频 | 婷婷综合视频 | www.天天射.com | 综合久久久久久 | 日韩一级黄色片 | 国产高清区 | 久青草国产在线 | 日韩欧美在线高清 | 中文字幕色在线 | 99爱视频| 九九一级片 | 在线观看av国产 | 99视频精品免费观看, | 久久午夜精品视频 | 亚洲三级黄色 | 国产午夜精品一区二区三区四区 | 日韩成人xxxx | 国产成人无码AⅤ片在线观 日韩av不卡在线 | 高潮久久久 | 欧美日韩亚洲第一 | 一区二区三区精品在线视频 | 福利视频一区二区 | 在线观看亚洲成人 | 国产精品成人av电影 | 久草在线视频网站 | 激情电影影院 | 久久曰视频 | 国产无套视频 | 国产日韩精品一区二区在线观看播放 | 久草av在线播放 | 人人干天天干 | 亚洲黄色在线免费观看 | 国产福利av| 玖玖在线看 | av中文字幕第一页 | 永久免费看av | 色综合久久天天 | 成人免费看电影 | 免费看国产黄色 | 日韩一级电影在线观看 | 超碰免费av | 中文字幕第一页在线播放 | 欧美日韩国产一区二 | 欧美精品一区二区在线观看 | 日韩高清在线不卡 | 亚洲香蕉在线观看 | 中日韩在线视频 | 韩日精品在线 | 超碰97成人 | 天天干天天操天天操 | 成人h动漫精品一区二 | 伊人www22综合色 | 999成人 | 国产区精品 | 香蕉视频在线免费 | 超碰大片 | 天天干 夜夜操 | 久久视频这里只有精品 | 九九久久成人 | 亚洲视频h | 干天天| av网站大全免费 | 午夜久久福利影院 | 中文字幕在线免费看 | 国产成人精品aaa | 国产视频亚洲精品 | 99精品欧美一区二区 | 精品视频在线免费观看 | 国产原创中文在线 | 永久免费精品视频 | 色婷婷在线播放 | 日韩a在线播放 | 欧美日性视频 | 免费看短 | 国产精品去看片 | 久久99久久99精品免观看软件 | 波多野结衣一区二区三区中文字幕 | 色国产视频 | 免费一级日韩欧美性大片 | wwwwww黄| 欧美国产日韩在线观看 | 国产男女免费完整视频 | 亚洲精品久久激情国产片 | av中文字幕在线播放 | 亚洲精品h | 婷婷色在线 | 久久免费精彩视频 | 国产馆在线播放 | 婷婷激情小说网 | 蜜臀久久99精品久久久酒店新书 | 日本中文字幕网 | 91精品一区二区三区久久久久久 | 亚洲黄色在线观看 | 在线91视频 | 一级一片免费看 | 操老逼免费视频 | 久久精品96| 日韩a在线观看 | 少妇bbbb搡bbbb桶 | 波多野结依在线观看 | 国产精品第一视频 | 日韩精品无码一区二区三区 | 超碰在线成人 | 在线免费观看视频 | 人人干人人超 | 岛国精品一区二区 | 久草在线最新免费 | 国产一级一片免费播放放a 一区二区三区国产欧美 | 911免费视频 | 在线天堂中文www视软件 | 国产精品午夜久久 | 色av网站 | 一本一道波多野毛片中文在线 | 天天操夜夜干 | 韩日精品在线 | 狠狠色丁香婷婷综合基地 | 99精品欧美一区二区蜜桃免费 | 天天射天天操天天 | 丁香5月婷婷久久 | 欧美孕交vivoestv另类 | 国产探花视频在线播放 | 狠狠五月婷婷 | 国产精品入口麻豆www | 亚洲女在线 | 亚洲国产中文字幕在线观看 | 夜夜躁天天躁很躁波 | 天天射成人 | 天天操婷婷 | 精品视频www | 国产剧情一区二区 | 亚洲国产精品女人久久久 | 在线观看视频一区二区三区 | 99久国产 | 操久| 国产免费久久精品 | 婷婷开心久久网 | 免费看片黄色 | 亚洲欧美乱综合图片区小说区 | 欧洲黄色片 | 国产精品毛片一区视频播不卡 | 久久毛片高清国产 | 91av资源网 | 午夜国产成人 | 一本一本久久aa综合精品 | 国产成视频在线观看 | 在线激情网 | 美女久久久久久久久久久 | 男女拍拍免费视频 | 在线黄色毛片 | www.福利视频 | 国产亚洲视频在线 | 久久精品在线免费观看 | 国产在线资源 | 国产成人av在线影院 | 日韩高清在线看 | 亚洲九九九 | 91人人澡人人爽 | 中文字幕免费看 | 亚洲精品视频中文字幕 | 天天添夜夜操 | 丁香激情综合国产 | 日韩大片在线免费观看 | 成人av一级片 | 欧美日韩性视频在线 | 波多野结衣在线视频一区 | 亚洲国产精品成人女人久久 | 99r在线观看 | 西西444www大胆高清图片 | a在线一区 | wwwwww国产| 欧美男男激情videos | 在线免费av网 | 国产一级免费片 | 国产精品日韩在线播放 | 亚洲极色| 日韩在线一区二区免费 | 最近2019好看的中文字幕免费 | 日韩视频在线不卡 | 69亚洲视频 | 99视频在线免费 | 日韩精品久久久久久久电影99爱 | 国产精品久久精品国产 | 婷婷五月情 | 99久精品视频 | av电影在线免费 | 国产手机在线 | 99久久99久国产黄毛片 | 色av色av色av | 亚洲精品在线观看网站 | 福利一区二区 | 国产亚洲一级高清 | 色多多视频在线 | 66av99精品福利视频在线 | 日韩 在线 | 亚洲性少妇性猛交wwww乱大交 | 91免费在线视频 | 国产精品一区二区 91 | 日韩免费看 | 久草在线网址 | 久久久观看 | 右手影院亚洲欧美 | 欧美精品久久久久久久久免 | 国产91学生| 九九热中文字幕 | 911久久香蕉国产线看观看 | 国内精品小视频 | 精品国产电影 | 欧美日韩精品久久久 | 久久久天堂 | 麻豆免费观看视频 | 亚洲国产高清视频 | 日韩啪啪小视频 | 免费观看一区 | 特级a老妇做爰全过程 | 欧美综合色| av短片在线| 6080yy精品一区二区三区 | 狠狠狠色丁香婷婷综合激情 | 日韩精品久久久久久久电影99爱 | 91中文字幕在线观看 | 日本中文在线观看 | 国产精品国产三级国产不产一地 | 又黄又刺激又爽的视频 | 午夜资源站 | 国产在线国偷精品产拍 | 美女视频黄免费的 | 人人爽久久涩噜噜噜网站 | 亚洲砖区区免费 | 亚洲最快最全在线视频 | 中文区中文字幕免费看 | 色综合久久综合 |