日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

FlinkSQL实战开发

發布時間:2024/1/16 数据库 64 coder
生活随笔 收集整理的這篇文章主要介紹了 FlinkSQL实战开发 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

FlinkSQL實戰開發

1、基礎知識

FlinkSQL分為Table API和SQL API,是架構于Flink Core之上用SQL予以方便快捷地進行結構化數據處理的上層庫。

  • 工作流程

SQL和Table在進入Flink以后轉化成統一的數據結構表達形式,也就是邏輯計劃(logic plan),其中catalog提供元數據信息,用于后續的優化,邏輯計劃是優化的入門,經過一系列規則后,Flink把初始的邏輯計劃優化為物理計劃(phy plan),物理計劃通過代碼構造器翻譯為Transformation,最后轉換為工作圖(job graph)。

整個過程沒有單獨的流處理和批處理,因為流處理和批處理優化過程和擴建都是共享的。

  • 編程模型

創建Flink SQL運行環境。

將數據源定義成表。

執行SQL語義查詢。

將查詢結果輸出到目標表中。

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <flink.version>1.15.2</flink.version>
    <scala.version>2.12.2</scala.version>
    <log4j.version>2.12.1</log4j.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--flink客戶端-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--本地運行的webUI-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime-web</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--flink與kafka整合-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-base</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.16</version>
    </dependency>
    <!--狀態后端-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--日志系統-->
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>${log4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>${log4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>${log4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-core</artifactId>
      <version>5.3.21</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.3.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-cep</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--json格式依賴-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--csv格式依賴-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-csv</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Flink SQL -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Flink CDC 的依賴 -->
    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>2.3.0</version>
    </dependency>

    <!-- flink與File整合的依賴 -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-files</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <!-- Flink On Hive-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-hive_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>3.1.2</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.calcite.avatica</groupId>
          <artifactId>avatica</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.calcite</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.logging.log4j</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.alibaba.fastjson2</groupId>
      <artifactId>fastjson2</artifactId>
      <version>2.0.41</version>
    </dependency>
      <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-compress</artifactId>
      <version>1.21</version>
    </dependency>
      <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-compress</artifactId>
      <version>1.21</version>
    </dependency>
  </dependencies>
  • emp.txt數據
{"empno":7369,"ename":"SMITH","job":"CLERK","mgr":7902,"hiredate":345830400000,"sal":800.0,"comm":null,"deptno":20}
{"empno":7499,"ename":"ALLEN","job":"SALESMAN","mgr":7698,"hiredate":351446400000,"sal":1600.0,"comm":300.0,"deptno":30}
{"empno":7521,"ename":"WARD","job":"SALESMAN","mgr":7698,"hiredate":351619200000,"sal":1250.0,"comm":500.0,"deptno":30}
{"empno":7566,"ename":"JONES","job":"MANAGER","mgr":7839,"hiredate":354988800000,"sal":2975.0,"comm":null,"deptno":20}
{"empno":7654,"ename":"MARTIN","job":"SALESMAN","mgr":7698,"hiredate":370454400000,"sal":1250.0,"comm":1400.0,"deptno":30}
{"empno":7698,"ename":"BLAKE","job":"MANAGER","mgr":7839,"hiredate":357494400000,"sal":2850.0,"comm":null,"deptno":30}
{"empno":7782,"ename":"CLARK","job":"MANAGER","mgr":7839,"hiredate":360864000000,"sal":2450.0,"comm":null,"deptno":10}
{"empno":7788,"ename":"SCOTT","job":"ANALYST","mgr":7566,"hiredate":553100400000,"sal":3000.0,"comm":null,"deptno":20}
{"empno":7839,"ename":"KING","job":"PRESIDENT","mgr":null,"hiredate":374774400000,"sal":5000.0,"comm":null,"deptno":10}
{"empno":7844,"ename":"TURNER","job":"SALESMAN","mgr":7698,"hiredate":368726400000,"sal":1500.0,"comm":0.0,"deptno":30}
{"empno":7876,"ename":"ADAMS","job":"CLERK","mgr":7788,"hiredate":553100400000,"sal":1100.0,"comm":null,"deptno":20}
{"empno":7900,"ename":"JAMES","job":"CLERK","mgr":7698,"hiredate":376156800000,"sal":950.0,"comm":null,"deptno":30}
{"empno":7902,"ename":"FORD","job":"ANALYST","mgr":7566,"hiredate":376156800000,"sal":3000.0,"comm":null,"deptno":20}
{"empno":7934,"ename":"MILLER","job":"CLERK","mgr":7782,"hiredate":380563200000,"sal":1300.0,"comm":null,"deptno":10}
  • JAVA代碼
    public static void main(String[] args) throws Exception {
        //快速入門
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment);
        //讀取文本文件數據轉為Table 對象
        DataStream<Emp> source = environment.readTextFile("data/emp.txt")
                .map(lines ->JSONObject.parseObject(lines, Emp.class));
        //把JAVA對象轉為table對象
        //注意Emp對象中hiredate時間戳是Long類型
//        {"empno":7499,"ename":"ALLEN","job":"SALESMAN","mgr":7698,"hiredate":351446400000,"sal":1600.0,"comm":300.0,"deptno":30}
        Table table = tableEnv.fromDataStream(source);
        table.select(Expressions.$("*")).execute().print();
    }
  • 運行環境

TableEnvironment是Table API和SQL的核心概念:

  • 內部catalog中注冊Table
  • 注冊外部的catalog
  • 加載可插拔模式
  • 執行SQL查詢
  • 注冊自定義函數(scalar table aggregation)
  • DataStream和Table之間的轉換

Table與特定的TableEnvironment綁定,不能在同一條查詢中使用不同的TableEnvironment中的表。

輸入源流式還是批式,Table API和SQL查詢都會轉換成DataStream程序。

Table對象的標識位:CataLog.DB.Table

  • 創建方式一
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment);
  • 創建方式二
   EnvironmentSettings build = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment TabEnv = TableEnvironment.create(build);
  • 創建表

標識符由三個部分組成:catalog 名稱、數據庫名稱以及對象名稱。

如果catalog或者數據庫沒有指明,就會使用當前默認值。

Table可以是虛擬的(視圖views)也可以是常規的表Tables,其中視圖是臨時的存儲在內存中,會話結束臨時表就消失,而tables表示永久化保存的外部數據物理表。

表分類:臨時表(僅存在flink會話中)永久表(元數據保存在catalog中)屏蔽特性(臨時表與永久表同名,臨時表存在永久表就無法訪問,刪除臨時表就可以訪問永久表)

  • 案例
 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(environment);
        Table table = tabEnv.fromDataStream(source);
//Table table = tabEnv.fromDataStream(source,$("deptno").as("dno"));查詢指定列數據并設置別名。
        tabEnv.createTemporaryView("t_emp",table);
        tabEnv.sqlQuery("select * from t_emp").execute().print();
  • DataStream轉Table對象
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(environment);
        //設置別名并查詢指定列數據
        Table table = tabEnv.fromDataStream(source,$("deptno").as("dno"));
  • createTemporaryView

創建臨時視圖(臨時表),第一個參數是注冊的表名([catalog.db.]tableName),第二個參數可以是Tabe對象也可以是DataStream對象,第三個參數是指定的列字段名(可選)。

      Table table = tabEnv.fromDataStream(source);
//Table table = tabEnv.fromDataStream(source,$("deptno").as("dno"));查詢指定列數據并設置別名。
        tabEnv.createTemporaryView("t_emp",table);
=========================================================================================
  DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(environment);
        //設置別名  并指定查詢的列數據
        tabEnv.createTemporaryView("t_emp",source,$("deptno").as("dd"));
        tabEnv.sqlQuery("select * from t_emp").execute().print();

  • 數據類型
  • 原子類型:DataStream中支持的數據類型,Table也是支持的,也就是基本數據類和通用類型(Integer、Double、String等)
  • Tuple類型:從f0開始計數,f0 f1 f2,所有字段都可以被重新排序,也可以提前一部分字段。
  • Pojo類型:Flink 也支持多種數據類型組合成的“復合類型”,最典型的就是簡單 Java 對象(POJO 類型)。將 POJO 類型的 DataStream 轉換成 Table,如果不指定字段名稱,就會直接使用原始 POJO 類型 中的字段名稱。Pojo字段可以被重新排序、提取和重命名。
  • Row類型:Flink 中還定義了一個在關系型表中更加通用的數據類型——行(Row),它是 Table 中數據的基 本組織形式。長度固定,無法推斷出每個字段的類型,在使用時必須聲明具體的類型信息。
  StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = environment.readTextFile("data/dept.txt");
  //所謂的字段重新排序就是查詢出來的指定字段順序可以自定義 
StreamTableEnvironment.create(environment).fromDataStream(source,$("f1")).execute().print();
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(environment);
        DataStreamSource<Row> source = environment.fromElements(Row.ofKind(RowKind.INSERT, "張三", 20)
                , Row.ofKind(RowKind.INSERT, "李四", 25)
                //RowKind.UPDATE_BEFORE  打標記的作用
                , Row.ofKind(RowKind.UPDATE_BEFORE, "yy", 12)
                , Row.ofKind(RowKind.UPDATE_AFTER, "aaa", 18));
        Table table = tabEnv.fromChangelogStream(source);
        table.execute().print();
  • 查詢表

Table API 是關于 Scala 和 Java 的集成語言式查詢 API。與 SQL 相反,Table API 的查詢不是由字符串指定,而是在宿主語言中逐步構建。

table.groupBy(...).select() ,其中 groupBy(...) 指定 table 的分組,而 select(...) 在 table 分組上的投影

 //{"empno":7369,"ename":"SMITH","job":"CLERK","mgr":7902,"hiredate":345830400000,"sal":800.0,"comm":null,"deptno":20}
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
        Table table = StreamTableEnvironment.create(environment).fromDataStream(source);
        table.where($("deptno").isEqual(10)).select($("ename"), $("job")).execute().print();
        table.groupBy($("deptno")).select($("deptno"),$("sal").avg().as("sal_avg")).execute().print();

  • SQL語法

StreamTableEnvironment對象有兩個常用的方法:sqlQuery()和executeSql()兩個方法。

  • sqlQuery()主要用于查詢數據,并且可以查詢混用。
  • executeSql()可以用來增刪改查數據都可以。
 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
StreamTableEnvironment tbl = StreamTableEnvironment.create(environment);
        tbl.createTemporaryView("t_emp_demo",source);
        String sql="select deptno,avg(sal) " +
                " from t_emp_demo " +
                " group by deptno ";
         tbl.executeSql(sql).print();
=========================================================================================
 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
        Table empTable = tableEnvironment.fromDataStream(source);
tableEnvironment.sqlQuery("select * from "+empTable).execute().print();
  • 輸出表

insertInto:Table通過寫入TableSink輸出。TableSink是一個通用接口,包括:

  • 用于支持多種文件格式(如CSV、Apache Parquest、Apache Avro)
  • 存儲系統(如JDBC、Apache Hbase、Apache Cassandra、Es)
  • 消息隊列系統(如Apache kafka、Rabbit MQ)
  • 控制臺寫入并輸出
 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        tblEnv.createTemporaryView("t_emp_d",source);
        Table tableSource = tblEnv.fromDataStream(source, $("empno"), $("ename"), $("job"));
        String sql=
                "create table t_emp_r(" +
                "empno Integer," +
                "ename String," +
                "job String) " +
                "with ( " +
                "'connector'='print')";
         tblEnv.executeSql(sql);
         tableSource.insertInto("t_emp_r").execute();
         //t_emp_r 不能當做表進行查詢 只能當做sink端
//        tblEnv.executeSql("select * from t_emp_r").print();
        environment.execute();
  • kafka寫入
 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        String sqlSource="create table kafka_source( " +
                "deptno int," +
                "dname String," +
                "loc String)" +
                "with (" +
                "'connector'='kafka'," +
                "'topic'='flink_kafka_source'," +
                "'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
                "'properties.group.id'='flink-zwf'," +
                "'scan.startup.mode'='earliest-offset'," +
                "'format'='csv')";
        tblEnv.executeSql(sqlSource);

        String sqlSink="create table kafka_sink( " +
                "deptno int," +
                "dname String," +
                "loc String)" +
                "with (" +
                "'connector'='kafka'," +
                "'topic'='flink_kafka_sink'," +
                "'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
                "'properties.group.id'='flink-zwf'," +
                "'scan.startup.mode'='earliest-offset'," +
                "'format'='json')";
        tblEnv.executeSql(sqlSink);
         //從一張表查詢數據插入到另外一張表中
        tblEnv.sqlQuery("select * from kafka_source").insertInto("kafka_sink").execute();
  • 查看執行計劃
tblEnv.sqlQuery("select * from kafka_source").insertInto("kafka_sink").printExplain();
  • 表對象轉換為流對象

將一個Table對象轉換成DataStream,直接調用表環境中國的ToDataStream();

  tableEnv.toDataStream(table).print();
  • toChangelogStream

對于有更新操作的表,我們不要視圖直接把它轉換成DataStream打印,而是記錄一下它的更新日志(change log)。

對于表的更新操作的表,就變成了一條更新日志的流,可轉換成流打印輸出。

規則:Insert插入操作編碼是add消息。Delete刪除操作編碼為retract消息 update更新操作則為編碼更改行的retract消息和更新后行的add消息

tableEnv.toChangelogStream(table).print();
  • JDBC連接

Flink 支持連接到多個使用方言(dialect)的數據庫,如 MySQL、Oracle、PostgreSQL、Derby 等。其中,Derby 通常是用于測試目的。下表列出了從關系數據庫數據類型到 Flink SQL 數據類型的類型映射,映射表可以使得在 Flink 中定義 JDBC 表更加簡單。

  • 常見的數據類型映射

  • 依賴
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc</artifactId>
  <version>1.15.4</version>
</dependency>
   <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.16</version>
    </dependency>
  • 案例
 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
         String jdbcSQL=
                 "create table jdbc_scott_emp(" +
                 "empno int," +
                 "ename string," +
                 "job string," +
                 "mgr int," +
                 "hiredate date," +
                 "sal double," +
                 "comm double," +
                 "deptno int)" +
                 "with (" +
                 "'connector'='jdbc'," +
                 "'url'='jdbc:mysql://master:3306/scott?serverTimeZone=Asia/Shanghai'," +
                 "'table-name'='emp'," +
                 "'driver'='com.mysql.cj.jdbc.Driver'," +
                 "'username'='root'," +
                 "'password'='Root@123456.')";
        tblEnv.executeSql(jdbcSQL);
        tblEnv.sqlQuery("select * from jdbc_scott_emp").execute().print();
  • SQL語句(jdbc數據插入操作、時態關聯創建維表)
-- 從另一張表 "T" 將數據寫入到 JDBC 表中
INSERT INTO MyUserTable
SELECT id, name, age, status FROM T;


-- JDBC 表在時態表關聯中作為維表
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable.id;
  • DataGen SQL連接器

用于生成模擬數據,DataGen 連接器允許按數據生成規則進行讀取。

不支持復雜類型: Array,Map,Row。 請用計算列構造這些類型。

連接器參數

  • 案例
 //按照一定規則隨機生成數據
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        String SqlStr="CREATE TABLE datagen (\n" +
                " f_sequence INT,\n" +
                " f_random INT,\n" +
                " f_random_str STRING,\n" +
                " ts AS localtimestamp,\n" +
                " WATERMARK FOR ts AS ts\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='5',\n" +
                " 'fields.f_sequence.kind'='sequence',\n" +
                " 'fields.f_sequence.start'='1',\n" +
                " 'fields.f_sequence.end'='1000',\n" +
                " 'fields.f_random.min'='1',\n" +
                " 'fields.f_random.max'='1000',\n" +
                " 'fields.f_random_str.length'='10'\n" +
                ")";
        tblEnv.executeSql(SqlStr);
        tblEnv.sqlQuery("select * from datagen").execute().print();
  • Upsert Kafka SQL連接器

由于flink是流式計算,會出現相同的key值數據寫入,在寫入kafka中,同一個key生成的value值會不斷被更新(update -u u+標記),如果沒有重復的key則被插入(insert +i標記),如果value為空值就會被標記刪除(delete +d標記)。

作為 sink,upsert-kafka 連接器可以消費 changelog 流。它會將 INSERT/UPDATE_AFTER 數據作為正常的 Kafka 消息寫入并將 DELETE 數據以 value 為空的 Kafka 消息寫入(表示對應 key 的消息被刪除)。Flink 將根據主鍵列的值對數據進行分區,從而保證主鍵上的消息有序,因此同一主鍵上的更新/刪除消息將落在同一分區中。

  • 案例
//使用datagen模擬數據
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        String dataGen=
                "create table t_dataGen(" +
                        "deptno int," +
                        "salnum int," +
                        "ts AS localtimestamp," +
                        "WATERMARK FOR ts AS ts" +
                        ") with ( " +
                        "'connector'='datagen'," +
                        "'rows-per-second'='2'," +
                        "'fields.deptno.min'='88'," +
                        "'fields.deptno.max'='99'," +
                        "'fields.salnum.min'='10'," +
                        "'fields.salnum.max'='20')";
        tblEnv.executeSql(dataGen);
//        tblEnv.sqlQuery("select deptno,sum(salnum) as salnum from t_dataGen group by deptno").execute().print();
        //kafka sink端
        String kafkaSink="create table upsert_kafka_num(" +
                "deptno int," +
                "salnum int," +
                "primary key(deptno) not enforced)" +
                "with(" +
                "'connector'='upsert-kafka'," +
                "'topic'='upsert_kafka'," +
                "'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
                "'key.format'='csv'," +
                "'value.format'='json')";
        tblEnv.executeSql(kafkaSink);
          //插入數據
        tblEnv.executeSql("insert into upsert_kafka_num select deptno,sum(salnum) as salnum from t_dataGen group by deptno");
  • FileSystem連接器

文件系統分為:本地文件系統、外部文件系統。

本地文件系統:ink 原生支持本地機器上的文件系統,包括任何掛載到本地文件系統的 NFS 或 SAN 驅動器,默認即可使用,無需額外配置。本地文件可通過 file:// URI Scheme 引用。

外部文件系統:常見的有HDFS、clickhouse、HBase,上述文件系統可以并且需要作為插件使用。

使用外部文件系統時,在啟動 Flink 之前需將對應的 JAR 文件從 opt 目錄復制到 Flink 發行版 plugin 目錄下的某一文件夾中。

  • 本地文件測試
 public static void main(String[] args) {
        //設置環境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        String sqlDemo="create table t_dept_d(" +
                "deptno int," +
                "dname string," +
                "loc string)" +
                "with(" +
                "'connector'='filesystem'," +
                "'path'='data/dept.txt'," +
                "'format'='csv'" +
                ")";
         tblEnv.executeSql(sqlDemo);
         tblEnv.sqlQuery("select * from t_dept_d").execute().print();
    }
  • HDFS分布文件系統測試
  <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.3.4</version>
    </dependency>
<!--加載一些其他配置文件 比如core-site.xml dfs-core.xml yarn-site.xml等配置文件進resource目錄-->
  StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        String hdfsSql="create table dfs_dept(" +
                "deptno int," +
                "dname string," +
                "loc string)" +
                "with (" +
                "'connector'='filesystem'," +
                "'path'='hdfs://hdfs-zwf/dept.txt'," +
                "'format'='csv')";
            tblEnv.executeSql(hdfsSql);
            tblEnv.sqlQuery("select * from dfs_dept").execute().print();

    }

4、Schema結構

  • Pythsical column

物理字段:源自于外部存儲系統本身schema中的字段

  • kafka消息的key、value中的字段
  • mysql表中的字段
  • hive表中的字段
  • parquet文件中的字段
  • computed column

表達式字段:在物理字段上施加一個sql表達式,并將表達式結果定義為一個字段.

// 第一種sqlAPI 
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);

        String sqlStr="create table upsert_info(" +
                "deptno int," +
                "salnum2 as salnum*100,"+    //計算列
                "salnum int)" +
                "with (" +
                "'connector'='kafka'," +
                "'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
                "'properties.group.id'='zwf'," +
                "'topic'='upsert_kafka'," +
                "'scan.startup.mode'='earliest-offset'," +
                "'format'='json')";
        tblEnv.executeSql(sqlStr);
        tblEnv.sqlQuery("select * from upsert_info").execute().print();


//第二種方式  TableAPI
        tblEnv.createTable("kafka_dept", TableDescriptor.forConnector("kafka")
                        .schema(Schema.newBuilder()
                                .column("deptno", DataTypes.INT())
                                .column("salnum",DataTypes.INT())
                                .columnByExpression("salpluns","salnum*100")
                                .build()).option("connector","kafka")
                .option("topic","upsert_kafka")
                .option("scan.startup.mode","earliest-offset")
                .option("properties.bootstrap.servers","node1:9092,master:9092,node2:9092")
                .format("json").build());
        tblEnv.sqlQuery("select * from kafka_dept").execute().print();
  • metadata column

元數據字段:來源于connector從外部存儲系統中獲取到外部系統元信息。

kafka消息,通常意義上的數據內容是在record的key和value中,但是kafka還會攜帶所屬partition、offset、timestamp等元信息。而flink的連接器可以獲取并暴露這些元信息,允許用戶將信息定義成flinksql表中的字段。

//第一種sqlAPi
        String sqlStr="create table upsert_info(" +
                "deptno int," +
                "salnum2 as salnum*100," +   //計算列
                "event_time timestamp_ltz(3) metadata from 'timestamp',"+    //metadata列
                "salnum int)" +
                "with (" +
                "'connector'='kafka'," +
                "'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
                "'properties.group.id'='zwf'," +
                "'topic'='upsert_kafka'," +
                "'scan.startup.mode'='earliest-offset'," +
                "'format'='json')";
        tblEnv.executeSql(sqlStr);
        tblEnv.sqlQuery("select * from upsert_info").execute().print();

//第二種方式  TableAPI
        tblEnv.createTable("kafka_dept", TableDescriptor.forConnector("kafka")
                        .schema(Schema.newBuilder()
                                .column("deptno", DataTypes.INT())
                                .column("salnum",DataTypes.INT())
                                
                                //metadata column
                                .columnByMetadata("event_time",DataTypes.TIMESTAMP_LTZ(2),"timestamp",true)
                                .columnByMetadata("k_offset",DataTypes.INT(),"offset",true)
                                .build()).option("connector","kafka")
                .option("topic","upsert_kafka")
                .option("scan.startup.mode","earliest-offset")
                .option("properties.bootstrap.servers","node1:9092,master:9092,node2:9092")
                .format("json").build());
        tblEnv.sqlQuery("select * from kafka_dept").execute().print();
  • 主鍵約束

單字段主鍵約束語法:

// SQL API
id INT PRIMARY KEY NOT ENFORCED,
name STRING

// Table Api
tblEnv.createTable("kafka_dept", TableDescriptor.forConnector("kafka")
                        .schema(Schema.newBuilder()
                                .column("deptno", DataTypes.INT())
                                //設置主鍵字段 primary key
                                .primaryKey("deptno")
                                .column("salnum",DataTypes.INT())
                                //metadata column
                                .columnByMetadata("event_time",DataTypes.TIMESTAMP_LTZ(2),"timestamp",true)
                                .columnByMetadata("k_offset",DataTypes.INT(),"offset",true)
                                .build()).option("connector","kafka")

多字段主鍵約束語法:

-- SQL API
id,
name,
PRIMARY KEY(id,name) NOT ENFORCED

//Table API    
 tblEnv.createTable("kafka_dept", TableDescriptor.forConnector("kafka")
                        .schema(Schema.newBuilder()
                                .column("deptno", DataTypes.INT())
                                //設置主鍵字段 primary key
                                .primaryKey("deptno","event_time")
                                .column("salnum",DataTypes.INT())
                                //metadata column
                                .columnByMetadata("event_time",DataTypes.TIMESTAMP_LTZ(2),"timestamp",true)
                                .columnByMetadata("k_offset",DataTypes.INT(),"offset",true)
                                .build()).option("connector","kafka")
                    
                    
//第一種sqlAPi
        String sqlStr="create table upsert_info(" +
                "deptno int," +//計算列
                "event_time timestamp_ltz(3) metadata from 'timestamp',"+  //metadata列
                "dname string," +
                "loc string," +
                "primary key(deptno,loc) not enforced)" +
                "with (" +
                "'connector'='upsert-kafka'," +
                "'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
                "'properties.group.id'='zwf'," +
                "'topic'='flink_kafka_source'," +
                "'key.format'='csv'," +
                "'value.format'='json')";
        tblEnv.executeSql(sqlStr);
        tblEnv.sqlQuery("select * from upsert_info").execute().print();

注意的是:kafka連接器模式下不能設置主鍵,但是upsert-kafka連接器模式必須設置主鍵!主鍵字段不能有空值

在upsert-kafka模式下,key和value值不能為空,否則在csv模式中會解析失敗!

5、FlinkSQL Format

connector 連接器:對接外部存儲時, 根據外部存儲中的數據格式不同, 需要用到不同的 format 組件;

format 組件:作用就是告訴連接器, 如何解析外部存儲中的數據及映射到表 schema;

使用基本步驟:

  • 導入format組件的jar依賴
  • 指導format組件名稱
  • 設置format組件所需的參數
  • FlinkSQL支持的Format

  • 案例
<!--json格式依賴-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--csv格式依賴-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-csv</artifactId>
      <version>${flink.version}</version>
    </dependency>
  • 案例
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true',
'csv.allow-comments' = 'true'
)

CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)

動態表 是 Flink 的支持流數據的 Table API 和 SQL 的核心概念。與表示批處理數據的靜態表不同,動態表是隨時間變化的。可以像查詢靜態批處理表一樣查詢它們。查詢動態表將生成一個連續查詢(Continuous Query) 。一個連續查詢永遠不會終止,結果會生成一個動態表。查詢不斷更 新其(動態)結果表,以反映其(動態)輸入表上的更改。本質上,動態表上的連續查詢非常類似于定 義物化視圖的查詢。

需要注意的是,連續查詢的結果在語義上總是等價于以批處理模式在輸入表快照上執行的相同查詢的結果。

與spark、hive組件中的表最大不同之處在于flink SQL中的表是動態表。flink核心就是對有界或者*的數據流處理,是流式持續處理的過程。

  • 連續查詢

在動態表上計算一個連續查詢,生成一個新的動態表。與批處理查詢不同,連續查詢從不終止,根據其輸入表上的更新其結果表。在任何時候,連續查詢的結果在語義上與批處理模式在輸入表快照上執行相同查詢的結果相同。

  • 事件時間

創建表的DDL,增加一個字段,通過watermark語句來定義事件時間屬性。

WATERMARK 語句主要用來定義水位線(watermark)的生成表達式,這個表達式會將帶有事件 時間戳的字段標記為事件時間屬性,并在它基礎上給出水位線的延遲時間。

//水位線   設置延遲時間5s
        String eventTime="create table proc_dept_tbl(" +
                "deptno int," +
                "dname string," +
                "loc string," +
                "ts timestamp_ltz(3) metadata from 'timestamp'," +
                "watermark for ts as ts-interval '5' second" +    // pt是事件處理
                ")with( " +
                "'connector'='kafka'," +
                "'topic'='flink_kafka_sink'," +
                "'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
                "'properties.group.id'='zwf'," +
                "'scan.startup.mode'='earliest-offset'," +
                "'format'='json')";
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        tblEnv.executeSql(eventTime);
        tblEnv.sqlQuery("select * from proc_dept_tbl").execute().print();


//Table API
 tblEnv.createTable("t_water_mark", TableDescriptor.forConnector("kafka")
                         .option("topic","flink_kafka_sink")
                         .option("properties.bootstrap.servers","node1:9092,master:9092,node2:9092")
                         .option("properties.group.id","zwf")
                         .option("scan.startup.mode","earliest-offset")
                         .format("json")
                         .schema(Schema.newBuilder()
                                 .column("deptno",DataTypes.INT())
                                 .column("dname",DataTypes.STRING())
                                 .column("loc",DataTypes.STRING())
                                 .columnByMetadata("ts",DataTypes.TIMESTAMP_LTZ(3),"timestamp",true)
                                 .watermark("ts","ts-interval '5' second").build()).build());
        tblEnv.sqlQuery("select deptno,dname,ts from t_water_mark").execute().print();
  • 處理時間

定義處理時間屬性時,必須要額外聲明一個字段,專門用來保存當前的處理時間

在創建表的 DDL(CREATE TABLE 語句)中,可以增加一個額外的字段,通過調用系統內置的 PROCTIME()函數來指定當前的處理時間屬性,返回的類型是 TIMESTAMP_LTZ

  • 案例
   //Flink SQL 水位線   處理時間
        String procTime="create table proc_dept_tbl(" +
                "deptno int," +
                "dname string," +
                "loc string," +
                "pt as proctime()" +    // pt是事件處理
                ")with( " +
                "'connector'='kafka'," +
                "'topic'='flink_kafka_sink'," +
                "'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
                "'properties.group.id'='zwf'," +
                "'scan.startup.mode'='earliest-offset'," +
                "'format'='json')";
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        tblEnv.executeSql(procTime);
        tblEnv.sqlQuery("select * from proc_dept_tbl").execute().print();


//使用TableApi執行
 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
        Table table = tblEnv.fromDataStream(source, Schema.newBuilder()
                        .column("empno",DataTypes.INT())
                        .column("ename", DataTypes.STRING())
                        .column("job",DataTypes.STRING())
                        .column("mgr",DataTypes.INT())
                        .column("hiredate",DataTypes.BIGINT())
                        .column("sal",DataTypes.DOUBLE())
                        .column("comm",DataTypes.DOUBLE())
                        .column("deptno",DataTypes.INT())
                        .columnByExpression("ts","proctime()")
                .build());
 tblEnv.sqlQuery("select empno,ename,ts from"+table.toString()).execute().print();
  • DataStream定義時間

處理時間屬性同樣可以在將DataStream轉換為表的時候來定義。我們調用fromDataStream()方法 創建表時,可以用.proctime()后綴來指定處理時間屬性字段

由于處理時間是系統時間,原始數據中并沒有這個字段,所以處理時間屬性一定不能定義在一個已 有字段上,只能定義在表結構所有字段的最后,作為額外的邏輯字段出現

      //快速入門
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
//        environment.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment);
        //讀取文本文件數據轉為Table 對象
        DataStream<Emp> source = environment.readTextFile("data/emp.txt")
                .map(lines ->JSONObject.parseObject(lines, Emp.class));
        //把JAVA對象轉為table對象
        //注意Emp對象中hiredate時間戳是Long類型
//        {"empno":7499,"ename":"ALLEN","job":"SALESMAN","mgr":7698,"hiredate":351446400000,"sal":1600.0,"comm":300.0,"deptno":30}
        Table table = tableEnv.fromDataStream(source,$("empno"),$("ename"),$("ts").proctime());

        table.select($("*")).execute().print();

7、FlinkSQL 窗口TVF

  • TVF窗口化表值函數
  • 目前flink提供了以下幾個窗口:
    • 滑動窗口
    • 滾動窗口
    • 累積窗口
    • 會話窗口
  • 窗口TVF的返回值中,除去原始表中的所有列,增加描述窗口的額外3個列:

? 窗口起始點:窗口開始起始時間

? 窗口結束點:窗口結束時間

? 窗口時間:窗口結束時間-1

滾動窗口在DataStream API中的定義完全一樣,是長度固定、時間對齊、無重疊的窗口,一般用于周期性的統計計算。

Tumble(table data,timecol,size[,offset])函數三個必需參數:

data:表參數,此表需要包含一個時間屬性列

timecol:一個描述符,指示數據的哪個時間屬性列應該映射到滾動的窗口

size:指定滾動窗口的大小

  • 案例
 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        //執行SQL  隨機生成gid和sales   gid隨機值取10到20 sales隨機值取1到9
        //ts 使用本地時間 水位線是本地時間延遲5s
        tblEnv.executeSql("CREATE TABLE t_goods (\n" +
                " gid INT,\n" +
                " sales INT,\n" +
                " ts AS localtimestamp,\n" +
                " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='1',\n" +
                " 'fields.gid.min'='10',\n" +
                " 'fields.gid.max'='20',\n" +
                " 'fields.sales.min'='1',\n" +
                " 'fields.sales.max'='9'\n" +
                ")");
//           tblEnv.sqlQuery("select * from t_goods").execute().print();
        //使用滾動窗口  5s滾動計算一次
//        String tumbleWin="select * from table(tumble(table t_goods,descriptor(ts),interval '5' second))";
//        tblEnv.sqlQuery(tumbleWin).execute().print();
        //每個時間窗口中每個guid中總銷售額信息
        tblEnv.sqlQuery(
                "select window_start,window_end,gid,sum(sales) as sum_sales " +
                        "from table(tumble(table t_goods,descriptor(ts),interval '5' second))" +
                        "group by window_start,window_end,gid"
        ).execute().print();
  • 滑動窗口

Hopping windows也稱為"sliding windows"

HOP函數分配的窗口覆蓋大小間隔內的行,并根據時間屬性性列移動每個窗口

HOP函數有三個必需的參數:HOP(Table data,slide,size[,offset])

  • data:表格值,帶有時間戳字段的表格。
  • slide:指定順序hopping窗口開始之間的持續時間。
  • size:指定hopping窗口寬度的持續時間,size必須是slide的整數倍

  • 案例
  //滑動窗口表值函數  窗口表值函數
        //隨機生成gid大小是10到20 sales大小是1到10
       String datagen="create table t_datagen(" +
               "gid int," +
               "sales int," +
               "ts as localtimestamp," +
               "watermark for ts as ts-interval '5' second" +
               ") with (" +
               "'connector'='datagen'," +
               "'rows-per-second'='10'," +
               "'fields.gid.min'='10'," +
               "'fields.gid.max'='20'," +
               "'fields.sales.min'='1'," +
               "'fields.sales.max'='10')";
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        tblEnv.executeSql(datagen);
//        tblEnv.sqlQuery("select * from t_datagen").execute().print();
        //窗口大小是15s 滑動3s
         tblEnv.sqlQuery("select gid,sum(sales),window_start,window_end from table(hop(table t_datagen,descriptor(ts),interval '3' second,interval '15' second)) group by window_start,window_end,gid").execute().print();
  • 累積窗口

CUMULATE函數將元素分配給覆蓋在初始步長間隔內的行,并將每一步擴展為多一個步長(保持 window start固定),直到最大窗口大小。

可以把cumulative函數看作應用TUMBLE窗口,首先使用最大窗口大小,然后將每個滾動窗口分 割成幾個具有相同窗口開始和窗口結束步長差異的窗口。

因此,累積窗口確實是重疊的,而且沒有固定的大小。

cumulate函數有三個必須的參數:

cumulate(table data,descriptor(timecol),step,size)——必須參數有以下:

  • data:表格參數,表格必須包含一個時間屬性列
  • timecol:時間屬性字段,也就是使用那個時間。
  • step:連續累積窗口結束之間增加的窗口大小的持續時間。
  • size:累積窗口的最大寬度的持續時間。大小必須是步長的整數倍。
  • 案例
 //累加窗口大小時間
        //滑動窗口表值函數  窗口表值函數
        //隨機生成gid大小是10到20 sales大小是1到10
        String datagen="create table t_datagen(" +
                "gid int," +
                "sales int," +
                "ts as localtimestamp," +
                "watermark for ts as ts-interval '5' second" +
                ") with (" +
                "'connector'='datagen'," +
                "'rows-per-second'='10'," +
                "'fields.gid.min'='10'," +
                "'fields.gid.max'='20'," +
                "'fields.sales.min'='1'," +
                "'fields.sales.max'='10')";
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
           environment.setParallelism(1);
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        tblEnv.executeSql(datagen);
//每3s計算一次 并進行累加  比如19:45-19:48:10  19:48-19:51:20=>19:45-19:51:30
        tblEnv.sqlQuery("select window_start,window_end,gid,sum(sales) as sales_sum from table(cumulate(table t_datagen,descriptor(ts),interval '3' second,interval '15' second)) group by window_start,window_end,gid").execute().print();
  • 分組去重

group+distinct:表示分組+去重,在用于uv統計時就需要!

  • 案例
   //用于網站統計 uv 用戶訪問數  pv 頁面訪問數
        String websiteSQL="create table wbSiteNum(" +
                "gid int," +
                "url string," +
                "ts as localtimestamp," +
                "watermark for ts as ts-interval '5' second" +
                ")with(" +
                "'connector'='datagen'," +
                "'fields.gid.min'='1000'," +
                "'fields.gid.max'='2000'," +
                "'fields.url.length'='10')";
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        tblEnv.executeSql(websiteSQL);
//        tblEnv.sqlQuery("select * from wbSiteNum").execute().print();
      tblEnv.sqlQuery(
              "select count(distinct gid) as uv,count(url) as pv\n" +
                      "from wbSiteNum"
      ).execute().print();

8、FlinkSQL聚合函數

  • 分組聚合

在SQL中一般所說的聚合,通過一些內置的函數來實現,比如SUM、MAX、MIN、AVG、以及count。

它得特點是對多條輸入數據進行計算,得到一個唯一的值,屬于多對一的轉換。比如我們可以通過下面的代碼計算輸入數據的個數。更多時候,我們通過group by子句指定分組的鍵,從而對數據按照某個字段做一個分組統計。

  • 案例
 //分組求和
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        tblEnv.sqlQuery("SELECT pid, sum(num) AS total\n" +
                "FROM (VALUES\n" +
                " ('省1','市1','縣1',100),\n" +
                " ('省1','市2','縣2',101),\n" +
                " ('省1','市2','縣1',102),\n" +
                " ('省2','市1','縣4',103),\n" +
                " ('省2','市2','縣1',104),\n" +
                " ('省2','市2','縣1',105),\n" +
                " ('省3','市1','縣1',106),\n" +
                " ('省3','市2','縣1',107),\n" +
                " ('省3','市2','縣2',108),\n" +
                " ('省4','市1','縣1',109),\n" +
                " ('省4','市2','縣1',110))\n" +
                "AS t_person_num(pid, cid, xid,num)\n" +
                "GROUP BY pid;").execute().print();
  • rollup

維度的上卷,字段維度從細粒度上轉變粗粒度!

  //分組求和  rollup(pid,cid,xid)  維度從粗粒度到細粒度 pid->cid->xid
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        tblEnv.sqlQuery("SELECT pid, sum(num) AS total\n" +
                "FROM (VALUES\n" +
                " ('省1','市1','縣1',100),\n" +
                " ('省1','市2','縣2',101),\n" +
                " ('省1','市2','縣1',102),\n" +
                " ('省2','市1','縣4',103),\n" +
                " ('省2','市2','縣1',104),\n" +
                " ('省2','市2','縣1',105),\n" +
                " ('省3','市1','縣1',106),\n" +
                " ('省3','市2','縣1',107),\n" +
                " ('省3','市2','縣2',108),\n" +
                " ('省4','市1','縣1',109),\n" +
                " ('省4','市2','縣1',110))\n" +
                "AS t_person_num(pid, cid, xid,num)\n" +
                "GROUP BY rollup(pid,cid,xid)").execute().print();
  • cube

所有維度分組顯示,也就是正方體原則!比如(col1,col2,col3)2^3個維度表示。

tableEnvironment.sqlQuery("SELECT pid, cid, xid, sum(num) AS total\n" +
            "FROM (VALUES\n" +
            " ('省1','市1','縣1',100),\n" +
          " ('省1','市2','縣2',101),\n" +
           " ('省1','市2','縣1',102),\n" +
          " ('省2','市1','縣4',103),\n" +
            " ('省2','市2','縣1',104),\n" +
           " ('省2','市2','縣1',105),\n" +
             " ('省3','市1','縣1',106),\n" +
          " ('省3','市2','縣1',107),\n" +
           " ('省3','市2','縣2',108),\n" +
            " ('省4','市1','縣1',109),\n" +
                 " ('省4','市2','縣1',110))\n" +
           "AS t_person_num(pid, cid, xid, num)\n" +
          "GROUP BY CUBE(pid, cid, xid)").execute().print();
  • grouping Sets

自定義維度分組,以下案例(pid, cid, xid),(pid, cid),(pid), ()自定義四個維度分組。

  //自定義維度分組 
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        tblEnv.sqlQuery("SELECT pid, sum(num) AS total\n" +
                "FROM (VALUES\n" +
                " ('省1','市1','縣1',100),\n" +
                " ('省1','市2','縣2',101),\n" +
                " ('省1','市2','縣1',102),\n" +
                " ('省2','市1','縣4',103),\n" +
                " ('省2','市2','縣1',104),\n" +
                " ('省2','市2','縣1',105),\n" +
                " ('省3','市1','縣1',106),\n" +
                " ('省3','市2','縣1',107),\n" +
                " ('省3','市2','縣2',108),\n" +
                " ('省4','市1','縣1',109),\n" +
                " ('省4','市2','縣1',110))\n" +
                "AS t_person_num(pid, cid, xid,num)\n" +
                "GROUP BY GROUPING SETS ((pid, cid, xid),(pid, cid),(pid), ())").execute().print();

9、開窗函數

比如說,我們可以以每一行數據為基準,計算它之前 1 小時內所有數據的平均值;也可以計算它 之前 10 個數的平均值。 就好像是在每一行上打開了一扇窗戶、收集數據進行統計一樣,這就是所謂的“開窗函數”。

分組聚合、窗口 TVF聚合都是“多對一”的關系,將數據分組之后每組只會得到一個聚合結果;

而開窗函數是對每行都要做一次開窗聚合,因此聚合之后表中的行數不會有任何減少,是一 個“多對多”的關系.

  • 基本語法
SELECT 
<聚合函數> OVER ( [PARTITION BY <字段 1>[, <字段 2>, ...]] ORDER BY <時間屬性字段> <開窗范圍>)
, ... 
FROM ...
  • over():關鍵字前面是一個聚合函數,它會應用在后面over定義的窗口上,有如下參數:

? 1、partition by(可選)

? 用來指定分區的鍵,類似于group by的分組,這部分是可選的。

? 2、order by (必選)

? OVER 窗口是基于當前行擴展出的一段數據范圍,選擇的標準可以 基于時間也可以基于數量 。

? 在 Flink 的流處理中,目前只支持按照時間屬性的升序排列,所以這里 ORDER BY 后面 的字段必須是定義好的時間屬性

? 開窗范圍:

? 1、對于開窗函數而言,還有一個必須要指定的就是開窗的范圍,也就是到底要擴展多少行來做聚合。

? 2、這個范圍是由between<下界>and<上界>來定義,也就是"從下界到上界"的范圍。

? 3、目前上界只能是current row,也就是定義一個”從之前某一行到當前行“的范圍

? 4、開窗選擇的范圍可以基于時間,也可以基于數據的數量。所以開窗范圍還應該在兩種模式之間做出選擇:

  • 行間隔(rows intervals )
  1. 行間隔以rows為前綴,就是直接確定要選多少行,由當前行出發向前選取多少行。
  2. 例如開窗函數選擇當前行之前的5行數據:ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
  • 范圍間隔(range intervals 以時間劃分范圍)

    • 范圍間隔:范圍間隔以range為前綴,就是基于order by指定時間字段去選擇一個范圍,一般就是當前行時間戳之前的一段時間。
    • 例如:開窗范圍選擇當前行之前1小時的數據:RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
  • 案例

//執行環境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
        //執行SQL
        tableEnvironment.executeSql("CREATE TABLE t_goods (\n" +
                " gid STRING,\n" +
                " type INT,\n" +
                " price INT,\n" +
                " ts AS localtimestamp,\n" +
                " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='1',\n" +
                " 'fields.gid.length'='10',\n" +
                " 'fields.type.min'='1',\n" +
                " 'fields.type.max'='5',\n" +
                " 'fields.price.min'='1',\n" +
                " 'fields.price.max'='9'\n" +
                ")");
        //截止當前 前10s的每個類型的平均價格
        tableEnvironment.sqlQuery(
                "select tg.*,avg(price) over(partition by type order by ts range between interval '10' second preceding and current row) as price_avg\n" +
                        "from t_goods tg"
        ).execute().print();



        //截止當前 前10行的每個類型商品的平均價格
        tableEnvironment.sqlQuery(
                "select tg.*,avg(price) over(partition by type order by ts rows between 10 preceding and current row) as price_avg\n" +
                        "from t_goods tg"
        ).execute().print();
  • TopN

在 Flink SQL 中,是通過 OVER 聚合和一個條件篩選來實現TopN的。

利用row_number()函數為每一行數據聚合得到一個排序之后的行號,行號為row_num,并在外層的查詢中以row_num<=N作為條件進行篩選,就可以得到根據排序字段統計的topN結果了。

FlinkSQL專門用over聚合做了優化實現,只有在topN的應用場景中,over窗口oder by后才可以指定其他排序字段,要實現top N要嚴格按照上面格式定義,否則FlinkSQL優化器將無法正常解析。而且目前TableApi不支持row_number()函數,只有SQL API實現TopN方式

SELECT ... FROM ( SELECT ..., 
ROW_NUMBER() OVER ( [PARTITION BY <字段 1>[, <字段 1>...]] ORDER BY <排序字段 1> [asc|desc][, <排序字段 2> [asc|desc]...] ) AS row_num FROM ...) 
WHERE row_num <= N [AND <其它條件>]
  • 案例
 //窗口排序
        String dataGenDemo="create table t_datagen(" +
                "gid string," +
                "price int," +
                "type int," +
                "ts as localtimestamp," +
                "watermark for ts as ts-interval '10' second" +
                ")with(" +
                "'connector'='datagen'," +
                "'fields.gid.length'='10'," +
                "'rows-per-second'='10'," +
                "'fields.price.min'='100'," +
                "'fields.price.max'='999'," +
                "'fields.type.min'='1'," +
                "'fields.type.max'='1')";
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        tblEnv.executeSql(dataGenDemo);
//        tblEnv.sqlQuery("select * from t_datagen").execute().print();
        String topNStr="select * from\n" +
                "(select d.*,row_number() over(partition by type order by price desc) as row_num from\n"+
                "t_datagen d) where row_num<=3";
         tblEnv.sqlQuery(topNStr).execute().print();

=========================================================================================
      //滾動窗口每5s滾動一次 每種類型排名前3的商品信息
        String topNWin=" select *\n" +
                "         from(\n" +
                "          select *,row_number() over(partition by type order by price desc) as row_num\n" +
                "          from table(tumble(table t_datagen,descriptor(ts),interval '5' second))\n" +
                "          ) where row_num<=3";
        tblEnv.sqlQuery(topNWin).execute().print();
=========================================================================================

  • 窗口TopN
//查詢10秒內  每個窗口銷售總額最高的前三名的種類
        String topNWinSql="select * " +
                "        from(select type,t_price,window_start,window_end,row_number() over(partition by window_start,window_end order by t_price desc) as row_num\n" +
                "        from (\n" +
                "        select type,window_start,window_end,sum(price) as t_price\n" +
                "         from table(tumble(table t_datagen,descriptor(ts),interval '10' second))\n" +
                "         group by type,window_start,window_end\n" +
                "        ))where row_num<=3";
        tblEnv.sqlQuery(topNWinSql).execute().print();

//查詢10秒內  每個種類中銷售總額最高的前三名的商品
        String topNWinSql="select * " +
                "  from (select gid,type,window_start,window_end,row_number() over(partition by window_start,window_end,type,gid order by price desc) as row_num\n" +
                "        from (\n" +
                "        select *\n" +
                "         from table(tumble(table t_datagen,descriptor(ts),interval '10' second))\n" +
                "        ) )" +
                "where row_num<=3";
        tblEnv.sqlQuery(topNWinSql).execute().print();

10、Join窗口聯結

與標準SQL一致,Flink SQL的常規聯結分為內聯結(inner join)和外聯結(outer join),區別在于結果中是否包含不符合聯結條件的行。目前僅支持等值條件作為聯結條件,也就是關鍵字ON后面必須是判斷兩表中字段相等的邏輯表達式

  • 等值內聯結,會返回兩表中符合聯接條件的所有行組合(動態表關聯)
    //生成兩股數據流
        String dataStr="create table  dataGen_demo(" +
                "gid string," +
                "type int," +
                "price int," +
                "ts1 as localtimestamp," +
                "watermark for ts1 as ts1-interval '5' second" +
                ") with (" +
                "'connector'='datagen'," +
                "'rows-per-second'='1'," +
                "'fields.gid.length'='10'," +
                "'fields.type.min'='1'," +
                "'fields.type.max'='30'," +
                "'fields.price.min'='100'," +
                "'fields.price.max'='999')";
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        tblEnv.executeSql(dataStr);
//        tblEnv.sqlQuery("select * from dataGen_demo").execute().print();

        String dataStr1="create table  dataGen_demo1(" +
                "type int," +
                "tname string," +
                "price int," +
                "ts2 as localtimestamp," +
                "watermark for ts2 as ts2-interval '5' second" +
                ") with (" +
                "'connector'='datagen'," +
                "'rows-per-second'='1'," +
                "'fields.tname.length'='10'," +
                "'fields.type.kind'='sequence'," +
                "'fields.type.start'='1'," +
                "'fields.type.end'='50'," +
                "'fields.price.min'='300'," +
                "'fields.price.max'='400')";
        tblEnv.executeSql(dataStr1);
        tblEnv.sqlQuery("select * from dataGen_demo inner join dataGen_demo1 on dataGen_demo.type=dataGen_demo1.type").execute().print();
  • 等值外聯結

left join: 左外連接 ,左表數據全部顯示,在內存等待數據匹配,匹配后刪除原來未匹配的數據重新顯示。

right join: 右外連接,右表數據全部顯示,在內存等待數據匹配,匹配后刪除原來未匹配的數據重新顯示。

full join:不管數據是否匹配,左右表的數據全部顯示,不管哪個表在內存中匹配到數據都先刪除未匹配的數據,重新顯示已經匹配的數據。

tblEnv.sqlQuery("select * from dataGen_demo left join dataGen_demo1 on dataGen_demo.type=dataGen_demo1.type").execute().print();

tblEnv.sqlQuery("select * from dataGen_demo full join dataGen_demo1 on dataGen_demo.type=dataGen_demo1.type").execute().print();
  • 時間間隔聯接查詢

兩條流的join對應著SQL中兩個表的join,是流處理中特有的聯結方式。

目前 Flink SQL 還不支持窗口聯結,而間隔聯結則已經實現,這里除了符合約束條件的兩條中數據的笛卡爾積,多了一個時間間隔的限制。

具體語法:間隔聯結不需要用join關鍵字,直接在from后將聯結兩表列出來的就可以,用逗號分割。聯結條件用where子句來定義,用一個等值表達式描述。交叉聯結之后用where進行條件篩選,效果跟內聯結inner join... on ... 非常類似,我們可以在where子句中,聯結條件后用and追加一個時間間隔的限制條件。

 String dataStr1="create table  dataGen_demo1(" +
                "type int," +
                "tname string," +
                "price int," +
                "ts2 as localtimestamp," +
                "watermark for ts2 as ts2-interval '5' second" +
                ") with (" +
                "'connector'='datagen'," +
                "'rows-per-second'='1'," +
                "'fields.tname.length'='10'," +
                "'fields.type.kind'='sequence'," +
                "'fields.type.start'='1'," +
                "'fields.type.end'='50'," +
                "'fields.price.min'='300'," +
                "'fields.price.max'='400')";
        tblEnv.executeSql(dataStr1);
        tblEnv.sqlQuery("select * from dataGen_demo d,dataGen_demo1 g where d.type=g.type and d.ts1 between g.ts2-interval '5' second and g.ts2+interval '5' second").execute().print();

11、FlinkSQL Client

Flink提供了SQL Client,有了它我們可以向hive的beeline一樣直接在控制臺編寫SQL并提交作業。

Flink SQL client支持運行在standalone集群和yarn集群上。提交任務的命令有所不同。

  • Standalone集群(普通模式啟動)
##啟動集群、前提已經配置好flink環境變量
start-cluster.sh
##啟動客戶端
sql-client.sh embedded
  • Yarn集群

前提要開啟hadoop-yarn大數據架構。

flink每次啟動yarn-session,都會創建一個/temp/.yarn-properties-root文件,記錄了最近一次提交的yarn session對應的Application ID。注意:啟動Yarn Session和SQL client必須使用相同的用戶。

##啟動YarnSession模式 前提已經配置好flink環境變量
yarn-session.sh -n 3 -jm 1024 -tm 1024

##啟動客戶端  必須與上面命令在同一個服務器節點上
sql-client.sh embedded -s yarn-session

## 客戶端控制臺測試
select 'hello word';  #測試連接是否成功
SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;  #測試數據

# client界面執行下面命令
# 在專門的界面展示,使用分頁table格式。可按照界面下方說明,使用快捷鍵前后翻頁和退出到SQL命令行
SET sql-client.execution.result-mode = table;
# changelog格式展示,可展示數據增(I)刪(D)改(U)
SET sql-client.execution.result-mode = changelog;
# 接近傳統數據庫的展示方式,不使用專門界面
SET sql-client.execution.result-mode = tableau;
  • 安裝依賴

如果運行sql client時,需要使用第三方依賴包時,就需要將項目中用到的依賴放入flink安裝位置的lib目錄下

例如:flink-connector-kafka_2.11-1.13.2.jar: 讀寫Kafka支持。

12、FlinkSQL 官方文檔

  • Table API

https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/tableapi/

Table API 是批處理和流處理的統一的關系型 API。Table API 的查詢不需要修改代碼就可以采用批 輸入或流輸入來運行。Table API 是 SQL 語言的超集,并且是針對 Apache Flink 專門設計的。 Table API 集成了 Scala,Java 和 Python 語言的 API。Table API 的查詢是使用 Java,Scala 或 Python 語言嵌入的風格定義的,有諸如自動補全和語法校驗的 IDE 支持,而不是像普通 SQL 一樣 使用字符串類型的值來指定查詢。

  • SQL API

https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sql/overview/

Flink 所支持的 SQL 語言,包括數據定義語言(Data Definition Language,DDL)、數據操縱語(Data Manipulation Language,DML)以及查詢語言。Flink 對 SQL 的支持基于實現了 SQL 標準的 Apache Calcite。

13、FlinkSQL函數

SQL中,我們可以把一些數據的轉換操作包裝起來,嵌入到SQL查詢中統一調用,這是函數。

Flink的Table API和SQL同樣提供了函數的功能。兩者在調用時略有不同:

  • Table API中的函數是通過數據對象的方法調用來實現的
  • SQL則是直接引用函數名稱,傳入數據作為參數。
  • Table API是內嵌在java語言中,很多方法需要在類中額外添加,目前支持的函數比較少。

官方文檔:https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/functions/overview/

  • 函數類型

Flink 中的函數有兩個劃分標準:

  • 一個劃分標準是:系統函數和catalog函數。
  • 一個劃分標準是臨時函數和持久函數
  • 因此提供了4種函數:臨時性系統函數、系統函數、臨時性catalog函數、catalog函數

flink中可以通過精確、模糊兩種引用方式引用函數:精確函數允許用戶跨catalog、數據庫,也就是指定catalog和database函數;模糊函數不用指定catalog和database使用默認catalog和database。

  • 系統函數

系統函數(System Functions)也叫內置函數(Built-in Functions),是在系統中預先實現好的 功能模塊。可以通過固定的函數名直接調用,實現想要的轉換操作。又分為兩大類:標量函數和聚合函數。

函數分類:標量函數、聚合函數、時間間隔單位和時間點標識符、列函數

  • 標量函數:

  • 自定義函數

Flink 的 Table API 和 SQL 提供了多種自定義函數的接口,以抽象類的形式定義。

當前UDF主要有以下幾類:

  • 標量函數:將輸入的標量值轉換成一個新的標量值
  • 表函數:將標量值轉換成一個或多個新的行數據,也就是擴展成一個表。
  • 聚合函數:將多行數據里的標量值轉換成一個新的標量值。
  • 表聚合函數:將多行數據里的標量值轉換成一個或多個新的 行數據。
  • UDF標量函數

自定義方式:需要自定義一個類來繼承抽象類 ScalarFunction,并實現叫作 eval() 的求值方法。

標量函數的行為就取決于求值方法的定義,它必須是公有的(public),而且名字必須是 eval

求值方法 eval 可以重載多次,任何數據類型都可作為求值方法的參數和返回值類型,寫完后將類注冊到表環境就可以直接在SQL中調用了。

  • 代碼實現
import org.apache.flink.table.functions.ScalarFunction;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-13 21:34
 */
//自定義標量函數
public class ScalarUDFDemo extends ScalarFunction {
    // 接受任意類型輸入,返回 INT 型輸出 必須使用公共權限的eval方法
    public String eval(String input) {
        //字符串連接字符串長度
        return input.concat(String.valueOf(input.length()));
    }

}

 //創建模擬數據
        //執行環境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        //執行SQL
        tableEnvironment.executeSql("CREATE TABLE t_datagen (\n" +
                " f_sequence INT,\n" +
                " f_random INT,\n" +
                " f_random_str STRING,\n" +
                " ts AS localtimestamp,\n" +
                " WATERMARK FOR ts AS ts\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='1',\n" +
                " 'fields.f_sequence.kind'='sequence',\n" +
                " 'fields.f_sequence.start'='1',\n" +
                " 'fields.f_sequence.end'='1000',\n" +
                " 'fields.f_random.min'='1',\n" +
                " 'fields.f_random.max'='1000',\n" +
                " 'fields.f_random_str.length'='10'\n" +
                ")");
//         tableEnvironment.sqlQuery("select * from t_datagen").execute().print();
       //使用Table API 直接內嵌函數執行  第一種方式
//        tableEnvironment.from("t_datagen").select(call(ScalarUDFDemo.class, $("f_random_str"))).execute().print();
       //第二種方式
        tableEnvironment.createTemporarySystemFunction("sfsl",ScalarUDFDemo.class);
        tableEnvironment.sqlQuery("select sfsl(f_random_str) from t_datagen").execute().print();
  • UDF表值函數

自定義方式:

要實現自定義的表函數,需要自定義類來繼承抽象類 TableFunction,內部必須要實現的也 是一個名為 eval 的求值方法

與標量函數不同的是,TableFunction 類本身是有一個泛型參數T 的,這就是表函數返回數據的類型。

eval()方法沒有返回類型,內部也沒有 return語句,是通過調用 collect()方法來發送想要 輸出的行數據的

  • 數據
1,尋夢環游記,喜劇:8_動畫:7_冒險:3_音樂:9_家庭:6
2,至愛梵高,劇情:8_傳記:7_動畫:3
3,小丑回魂,劇情:6_兒童:7_恐怖:9
  • 案例代碼
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-13 21:53
 */

/**
 * Row<type STRING,score INT> 輸出字段名type、score 數據類型分別是STRING、INT
 */
@FunctionHint(output = @DataTypeHint("Row<type STRING,score INT>"))
public class UDFTableFunction extends TableFunction<Row> {
    //輸入數據類型是字符串

    /**
     * 喜劇:8_動畫:7_冒險:3_音樂:9_家庭:6
     * @param line
     */
    public void eval(String line){
        String[] split = line.split("_");
        for (String s : split) {
            String[] v = s.split(":");
            collect(Row.of(v[0],Integer.parseInt(v[1])));
        }
    }
}


=========================================================================================

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        //使用FileSystem讀取文件
        String fs="create table t_movie(" +
                "id int," +
                "name string," +
                "types string" +
                ") with (" +
                "'connector'='filesystem'," +
                "'path'='data/movie.txt'," +
                "'format'='csv')";
        //sql讀取數據
        tblEnv.executeSql(fs);
//        tblEnv.sqlQuery("select * from t_movie").execute().print();
        //Table API
           tblEnv
                 .from("t_movie")
                 .joinLateral(call(UDFTableFunction.class, $("types")).as("type", "score"))
                 .select($("id"),$("name"),$("type"),$("score"))
                 .execute()
                 .print();
           //SQL API
        tblEnv.createTemporarySystemFunction("tbl_f",UDFTableFunction.class);
        tblEnv.sqlQuery("select id,name,type,score from t_movie ,lateral table(tbl_f(types))").execute().print();

  • UDF聚合函數

自定義方式:

  • 自定義聚合函數需要繼承抽象類 AggregateFunction。
  • AggregateFunction 有兩個泛型參數,T 表示聚合輸出的結果類型,ACC 則表示聚 合的中間狀態類型。
  • 每個 AggregateFunction 都 必須 實現以下幾個方法:
  • createAccumulator():這是創建累加器的方法。沒有輸入參數,返回類型為累加器類型 ACC
  • accumulate(): 這是進行聚合計算的核心方法,每來一行數據都會調用。它的第一個參數是確定 的,就是當前的累加器,類型為 ACC,表示當前聚合的中間狀態;
  • getValue():這是得到最終返回結果的方法。輸入參數是 ACC 類型的累加器,輸出類型為 T。 在遇到復雜類型時,Flink 的類型推導可能會無法得到正確的結果。
  • 代碼實現
package com.zwf.udf;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.AggregateFunction;
/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-13 22:31
 */

/**
 * AggregateFunction<Double, Tuple2<Integer,Integer>> 輸出類型是Double  中間狀態類型是Tuple2<Integer,Integer>
 *     必須要實現getValue()  createAccumulator() accumulate() 三個方法
 */
public class UDFAggregationDemo extends AggregateFunction<Double, Tuple2<Integer,Integer>> {

    /**
     * 輸出的函數邏輯代碼
     * @param integerIntegerTuple2
     * @return
     */
    @Override
    public Double getValue(Tuple2<Integer, Integer> integerIntegerTuple2) {
        if (integerIntegerTuple2.f0==0){
            return 0.0;
        }
        return integerIntegerTuple2.f0*1.0/integerIntegerTuple2.f1;
    }

    /**
     *
     * @return 初始化中間狀態值
     */
    @Override
    public Tuple2<Integer, Integer> createAccumulator() {
        return Tuple2.of(0,0);
    }
     //輸入類型是兩個int類型數據

    /**
     *   如果不加 @FunctionHint(input = {@DataTypeHint("INT"), @DataTypeHint("INT")})注解
     *   傳入的字段數據類必須有not null的約束
     * @param acc
     * @param weight
     * @param price
     */
     @FunctionHint(input = {@DataTypeHint("INT"), @DataTypeHint("INT")})
    public void accumulate(Tuple2<Integer,Integer> acc ,Integer weight,Integer price){
        acc.f0+=weight*price;
        acc.f1+=weight;
    }

}

========================================================================================

package com.zwf.flinkSQL;

import com.zwf.udf.UDFAggregationDemo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-13 22:43
 */
public class UDFDemo3 {
    public static void main(String[] args) {
        //執行環境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        //執行SQL
        tableEnvironment.executeSql("CREATE TABLE t_order (\n" +
                " id INT,\n" +
                " type INT,\n" +
                " weight INT,\n" +
                " price INT\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='1',\n" +
                " 'fields.id.kind'='sequence',\n" +
                " 'fields.id.start'='1',\n" +
                " 'fields.id.end'='1000',\n" +
                " 'fields.type.min'='1',\n" +
                " 'fields.type.max'='3',\n" +
                " 'fields.weight.min'='10',\n" +
                " 'fields.weight.max'='20',\n" +
                " 'fields.price.min'='100',\n" +
                " 'fields.price.max'='200'\n" +
                ")");
        tableEnvironment.createTemporarySystemFunction("aggre", UDFAggregationDemo.class);
        tableEnvironment.sqlQuery("select type,aggre(weight,price) from t_order group by type").execute().print();

    }

}

  • UDF表值聚合函數

用戶自定義表聚合函數(UDTAGG)可以把一行或多行數據(也就是一個表)聚合成另一張表,結果表中可以有多行多列

自定義方式:

  • createAccumulator():創建累加器的方法,與 AggregateFunction 中用法相同
  • accumulate():聚合計算的核心方法,與 AggregateFunction 中用法相同
  • emitValue():所有輸入行處理完成后,輸出最終計算結果的方法。這個方法對應著 AggregateFunction中的 getValue()方法;區別在于 emitValue 沒 有輸出類型,而輸入參數有兩個:第一個是 ACC類型的累加器 第二個則是用于輸出數據的“收集器”out,它的類型為 Collect。
  • 代碼
package com.zwf.udf;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-13 22:56
 */

/**
 * TableAggregateFunction<out,acc>: out 輸出類型  acc中間值類型
 */
public class TableAggregateUDF extends TableAggregateFunction<String, Tuple3<Integer,Integer,Boolean>> {
    /**
     * 初始化中間值
     * @return
     */
    @Override
    public Tuple3<Integer, Integer, Boolean> createAccumulator() {
        return Tuple3.of(0,0,false);
    }

    /**
     *
     * @param acc 中間值
     * @param price 輸入值
     */
    public void accumulate(Tuple3<Integer,Integer,Boolean> acc,Integer price){
        if(price>acc.f0){
            acc.f0=price;
            acc.f1=acc.f0;
            acc.f2=true;
        }else if (price>acc.f1){
          acc.f1=price;
          acc.f2=true;
        }else {
            acc.f2=false;
        }

    }

    /**
     *
     * @param acc 中間值
     * @param out 輸出集合
     */
    public void emitValue(Tuple3<Integer, Integer, Boolean> acc, Collector<String> out){
        if(acc.f2){
         acc.f2=false;
         out.collect("First[" + acc.f0 + "]Second[" + acc.f1 + "]");
        }
    }
}

=========================================================================================
    
package com.zwf.flinkSQL;

import com.zwf.udf.TableAggregateUDF;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-13 23:06
 */
public class UDFDemo4 {

    public static void main(String[] args) {
        //執行環境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        //執行SQL
        tableEnvironment.executeSql("CREATE TABLE t_order (\n" +
                " id INT,\n" +
                " type INT,\n" +
                " price INT\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='1',\n" +
                " 'fields.id.kind'='sequence',\n" +
                " 'fields.id.start'='1',\n" +
                " 'fields.id.end'='1000',\n" +
                " 'fields.type.min'='1',\n" +
                " 'fields.type.max'='3',\n" +
                " 'fields.price.min'='100',\n" +
                " 'fields.price.max'='200'\n" +
                ")");

        //普通查詢
        // tableEnvironment.sqlQuery("select * from t_order").execute().print();

        // 注冊函數
        tableEnvironment.createTemporarySystemFunction("tafop", TableAggregateUDF.class);
        tableEnvironment.sqlQuery("select type,tafop(price) from t_order group by type").execute().print();

    }
}

14、FlinkSQL CDC

CDC,Change Data Capture變動數據獲取的簡稱,使用CDC從數據庫獲取已提交的更改并將這些更改發送到下游,供下游使用。

  • Flink CDC

在以前的數據同步中,如果想實時獲取數據庫的數據,一般采用架構就是采用第三方工具,比如canal、debezium等,實時采集數據庫的變更日志,然后將數據發送到kafka消息隊列,最后通過其他組件、比如flink、spark等消費kafka中的數據,計算之后發送到下游系統。

新架構下flink直接消費數據庫的增量日志,替代了原來的數據采集層,然后直接對數據進行計算, 最后將計算結果發送到下游.

工作原理:啟動MySQL CDC源時,它將獲取一個全局讀取鎖(FLUSH TABLES WITH READ LOCK),該 鎖將阻止其他數據庫的寫入。然后,它讀取當前binlog位置以及數據庫和表的schema之后, 將釋放 全局讀取鎖。然后,它掃描數據庫表并從先前記錄的位置讀取binlog。Flink將定期執 行checkpoints以記錄binlog位置。如果發生故障,作業將重新啟動并從checkpoint完成的 binlog位置恢復。因此,它保證了僅一次的語義。

優點:開箱即用,簡單易上手 減少維護的組件,簡化實時鏈路,減輕部署成本 減小端到端延遲

  • ChangeLOg

Flink SQL 內部支持了完整的 changelog 機制,所以 Flink 對接 CDC 數據只需要把CDC 數據轉換成 Flink 認識的數據,以便更好支持和集成 CDC

重構后的 TableSource 輸出的都是 RowData 數據結構,代表了一行的數據。在RowData 上面會 有一個元數據的信息,我們稱為 RowKind.

RowKind 里面包括了插入、更新前、更新后、刪除,這樣和數據庫里面的 binlog 概念十分類似。

通過 Debezium 采集的 JSON 格式,包含了舊數據和新數據行以及原數據信息,對接 Debezium JSON 的數據,其實就是將這種原始的 JSON 數據轉換成 Flink 認識的 RowData。

  • mysql CDC

官方文檔:https://github.com/ververica/flink-cdc-connectors

mysql數據庫的數據新增或者修改,將實時獲取到flink上進行計算處理并傳輸到下游!

目前支持的數據庫有以下:

  • Mysql修改配置文件 (vim /etc/my.cnf)
# 服務器ID
server_id=12345
log_bin=/var/lib/mysql/mysql-bin
expire_logs_days=7
# 必須為ROW
binlog_format=ROW
binlog_cache_size=16M
max_binlog_size=100M
max_binlog_cache_size=256M
relay_log_recovery=1
# 必須為FULL,MySQL-5.7后才有該參數
binlog_row_image=FULL
expire_logs_days=30
binlog_do_db=scott
  • 創建數據庫表
DROP TABLE IF EXISTS `dept`;
CREATE TABLE `dept` (
`deptno` int(11) NOT NULL,
`dname` varchar(255) DEFAULT NULL,
`loc` varchar(255) DEFAULT NULL,
PRIMARY KEY (`deptno`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
--代碼運行之后再開始插入數據
INSERT INTO `dept` VALUES ('10', 'ACCOUNTING', 'NEW YORK');
INSERT INTO `dept` VALUES ('20', 'RESEARCH', 'DALLAS');
INSERT INTO `dept` VALUES ('30', 'SALES', 'CHICAGO');
INSERT INTO `dept` VALUES ('40', 'OPERATIONS', 'BOSTON');
  • pom.xml
<!-- Flink CDC 的依賴 -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<!--驅動包版本必須是8.0.27及其以上版本-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>

  • 代碼實現
 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        //創建表
        tableEnvironment.executeSql("CREATE TABLE flink_cdc_dept (\n" +
                "     deptno INT,\n" +
                "     dname STRING,\n" +
                "     loc STRING,\n" +
                "     PRIMARY KEY(deptno) NOT ENFORCED\n" +
                "     ) WITH (\n" +
                "     'connector' = 'mysql-cdc',\n" +
                "     'hostname' = '192.168.147.120',\n" +
                "     'port' = '3306',\n" +
                "     'username' = 'root',\n" +
                "     'password' = 'Root@123456.',\n" +
                "     'database-name' = 'scott',\n" +
                "     'table-name' = 'dept')");

        //簡單查詢
        tableEnvironment.sqlQuery("select * from flink_cdc_dept").execute().print();

Catalog 提供了元數據信息,例如數據庫、表、分區、視圖以及數據庫或其他外部系統中存儲的函 數和信息。

元數據可以是臨時的,例如臨時表、或者通過 TableEnvironment 注冊的 UDF。 元數據也可以是持久化的,例如 Hive Metastore 中的元數據。 Catalog 提供了一個統一的API,用于管理元數據,并使其可以從 Table API 和 SQL 查詢語句中來 訪問。

GenericInMemoryCatalog: 基于內存實現,所有元數據只在session聲明周期可用

JdbcCatalog:將flink通過jdbc協議連接到關系數據庫。Postgres Catalog 和 MySQL Catalog 是目前 JDBC Catalog 僅有的兩種實現

HiveCatalog:作為原生 Flink 元數據的持久化存儲,以及作為讀寫現有 Hive 元數據的接口

用戶自定義Catalog:編寫類實現對應的 CatalogFactory 接口來自定義開發Catalog

  • 連接hive集群

將flink catalog中的元數據信息持久化存儲到hive metastore對應的元數據庫中,flink打通hive集成,如同使用spark SQL或者impala操作hive中的數據一樣,直接使用flink直接讀寫hive中的表。

  • pom.xml
<!-- Flink On Hive-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>1.15.2</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
<exclusions>
<exclusion>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
  • 連接hive寄去哪
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir = "/opt/hive-conf";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
//獲取hive中元數據注冊flink中的catalog。
tableEnv.registerCatalog("myhive", hive);
// set the HiveCatalog as the current catalog of the session
//使用hive中的catalog
tableEnv.useCatalog("myhive") 
  • 相關配置參數參考:

flink提供了兩種優化器:

  • RBO(基于規則的優化器)
  • CBO(基于成本的優化器)

優化方案:

  • 基于 Apache Calcite 的子查詢解相關
  • 投影下推(Projection Pushdown)
  • 分區剪裁(Partition Prune)
  • 謂詞下推(Predicate Pushdown)
  • 常量折疊(Constant Folding)
  • 子計劃消除重復數據以避免重復計算
  • 特殊子查詢重寫:使用left semi-joins left anti-join
  • 可選 join 重新排序: 通過 table.optimizer.join-reorder-enabled 啟用

優化器不僅基于計劃,而且還基于可從數據源獲得的豐富統計信息以及每個算子(例如 io,cpu, 網絡和內存)的細粒度成本來做出明智的決策。

  • 常量折疊(常量替換)

常量折疊:對sql中的常量的加減乘除等操作進行預計算,避免執行過程頻繁對常量重復執行加減 乘除計算: 折疊前:1+2+t1.value;折疊后:3+t1.value.

  • 謂詞下推

在from數據源中過濾出重要數據,降低了數據的掃描范圍,提升了數據庫查詢的效率!

  • 投影下推(列裁剪)

投影下推:可以用來避免加載不需要的字段,只需要查詢出需要查詢的數據庫字段。由于SQL中沒用到,加載多余字段就是浪費,所以將project操作下推執行,就不需要加載無 用字段。而且此時假如是列存儲,只需要加載指定的列,優化更大。

  • Hash Join

兩表進行join時,先把大表中的重要數據過濾出來變成小表,然后通過sortmergejoin, hashjoin, boradcasthashjoin,把表中數據過濾后再進行join,減少笛卡爾積值。

  • Transformation Tree

  • 性能調整

MiniBatch 聚合:MiniBatch 聚合的核心思想是將一組輸入的數據緩存在聚合算子內部的緩沖區中。當輸入的數據被觸發處理時,每個 key 只需一個操作即可訪問狀態。這樣可以大大減少狀態開銷并獲得更好的吞 吐量。但是,這可能會增加一些延遲,因為它會緩沖一些記錄而不是立即處理它們。這是吞吐量和 延遲之間的權衡。

  • Local-Global 聚合

Local-Global 聚合是為解決數據傾斜問題提出的,通過將一組聚合分為兩個階段,首先在上游進行 本地聚合,然后在下游進行全局聚合,類似于 MapReduce 中的 Combine + Reduce 模式。

  • 拆分distinct 聚合

把要去重的字段中的使用hash shuffle打散到不同分區中進行分區,然后進行去重字段聚合計算!

SELECT day, SUM(cnt)
FROM (
SELECT day, COUNT(DISTINCT user_id) as cnt
FROM T
GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day

  • distinct 聚合過濾

使用filter對去重的字段進行過濾,過濾后去重字段值后最后進行分組聚合!

SELECT
day,
COUNT(DISTINCT user_id) AS total_uv,
COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone'))
AS app_uv,
COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS
web_uv
FROM T
GROUP BY day

17、SQL時間日期轉換

-- flinksql里面最常用的事情就是時間格式轉換,比如各種時間格式轉換成TIMESTAMP(3).
now() bigint      -- CAST(TO_TIMESTAMP(log_time) as TIMESTAMP(3)) ,log_time=now()
localtimestamp timestamp(3)
timestamp  -- 不帶括號數字表示timestamp(6)
now() 1403006911000 bigint -- 毫秒時間戳數值 1528257600000

localtimestamp 1636272032500 timestamp(3) -- 毫秒時間戳
timestamp(3) 1636272032500 -- 毫秒時間戳
timestamp(9)
timestamp(6)
TIMESTAMP(9) TO_TIMESTAMP(BIGINT time)
TIMESTAMP(9) TO_TIMESTAMP(STRING time)
TIMESTAMP(9) TO_TIMESTAMP(STRING time, STRING format)
BIGINT TIMESTAMP_TO_MS(TIMTSTAMP time)

BIGINT TIMESTAMP_TO_MS(STRING time, STRING format)

TO_DATE(CAST(LOCALTIMESTAMP AS VARCHAR))
FROM_UNIXTIME(TIMESTAMP_TO_MS(localtimestamp)/1000, ‘yyyy-MM-dd HH:mm:ss’) event_time   -- 6點到6點
time_pt as cast(to_timestamp(eventTime - 6 * 3600 * 1000) as TIMESTAMP(3)) -- 偏移6小時

總結

以上是生活随笔為你收集整理的FlinkSQL实战开发的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

成人三级黄色 | 国产亚洲综合在线 | 91刺激视频 | 欧美伦理电影一区二区 | 亚洲综合视频在线观看 | 91成人网在线观看 | 成片免费观看视频 | 日韩资源视频 | 欧美日韩免费在线观看视频 | 色婷婷导航 | 久久手机免费观看 | 国内精品久久影院 | 国产视频一区二区在线 | 久久视频99 | 日本性久久 | 日韩欧美精品在线 | 丰满少妇在线观看资源站 | 久草观看视频 | 99久久夜色精品国产亚洲96 | 久久精品综合一区 | 伊人超碰在线 | 天天射天天艹 | 亚洲精品久久久蜜臀下载官网 | 韩日av一区二区 | 97超碰成人 | 色噜噜噜噜 | 国产精品扒开做爽爽的视频 | 久草新在线 | 一区二区三区免费在线播放 | av免费在线免费观看 | 久久免费一级片 | 激情视频免费在线 | 91在线观看黄 | 91少妇精拍在线播放 | 四虎亚洲精品 | 亚洲三级网站 | 国产探花| 99久在线精品99re8热视频 | 天天操综合 | 精品国产免费久久 | 色人久久 | 成人毛片在线观看视频 | 久久精品高清 | 国产又粗又猛又黄又爽视频 | 成人动漫精品一区二区 | 日韩高清毛片 | 亚洲国产成人在线 | www久久九| 亚洲乱亚洲乱亚洲 | 亚洲免费在线播放视频 | 91黄色成人 | 91av原创| 久久久精品在线观看 | av夜夜操| 一级电影免费在线观看 | 免费观看的黄色 | 99久久影视 | 亚洲第一区精品 | 国产高清视频在线观看 | 97精产国品一二三产区在线 | 天天干天天插伊人网 | 日韩在线免费看 | 91精品国产九九九久久久亚洲 | 国产露脸91国语对白 | 欧美激情奇米色 | 亚洲最新视频在线播放 | 日操操| 欧美日韩成人一区 | 成人高清在线观看 | 日韩在线电影 | 国产91在线播放 | 国产又粗又猛又爽又黄的视频免费 | 天天操天天干天天操天天干 | 国产 视频 高清 免费 | 中文字幕高清免费日韩视频在线 | 999久久久国产精品 高清av免费观看 | 天天天干 | av中文字幕网 | 国产一区二区综合 | www.香蕉| 国产 中文 日韩 欧美 | 成人午夜电影在线观看 | 亚洲五月 | 日韩精品第一区 | 五月激情亚洲 | 色婷婷在线视频 | 免费看污黄网站 | 中文在线a∨在线 | 亚洲激情av | 国产一性一爱一乱一交 | 亚洲色图美腿丝袜 | 日本在线视频网址 | 亚洲精品乱码久久久久久久久久 | 久久国产精品免费 | 日韩理论片在线观看 | 91超碰免费在线 | 久久综合操| 97av影院 | 99视频在线精品 | 一级性av | 婷婷av网站 | www.com黄色| 成人a v视频 | 中文字幕观看av | 日日草视频 | 国产精品一区二区三区久久久 | 在线播放91| 黄色av高清| 精品福利视频在线观看 | 欧美色婷 | 国产亚洲精品久久久久秋 | 色综合天天色 | 精品国产一区二区三区不卡 | 久久久久久福利 | 黄p网站在线观看 | 丁香在线观看完整电影视频 | 黄色在线观看网站 | 欧美俄罗斯性视频 | 黄色小网站在线观看 | 国产精品美女久久久久久 | 久久久久在线视频 | 亚洲激情av| 亚洲区色 | 久草在线国产 | 国产黄色大片 | 国产盗摄精品一区二区 | 日韩精品一区二区三区中文字幕 | 手机在线黄色网址 | 国产精品av久久久久久无 | 国产成人精品不卡 | 亚洲精品成人av在线 | 色狠狠干 | 国产99中文字幕 | 911在线| 人人爱在线视频 | 久久亚洲私人国产精品 | 亚洲乱亚洲乱妇 | 久久综合色婷婷 | zzijzzij亚洲成熟少妇 | 国模精品一区二区三区 | 最近中文国产在线视频 | 日韩在线激情 | 精品久久久久久久久久 | 国产精品国内免费一区二区三区 | 久久激情小说 | av午夜电影 | 日韩系列 | 欧美日在线观看 | 亚洲一区二区精品3399 | 亚洲视频一 | 一区二区三区在线免费播放 | 欧美在线aaa | 中文字幕丰满人伦在线 | 日本中文字幕在线电影 | 久久dvd| 五月婷婷在线观看 | 91精品国产乱码 | 亚洲综合精品视频 | 丁香六月婷婷开心婷婷网 | 97av影院 | 中文字幕亚洲精品日韩 | 成人黄色在线播放 | 在线观看亚洲成人 | 中文字幕乱在线伦视频中文字幕乱码在线 | 成人av一区二区在线观看 | 久久久999精品视频 国产美女免费观看 | 国产毛片久久 | 99久热在线精品视频 | 国产日韩中文在线 | 91桃色在线免费观看 | 午夜av在线电影 | 午夜精品一区二区三区在线 | 久久久精品一区二区三区 | 婷婷色五 | 人人视频网站 | 欧美黑吊大战白妞欧美 | 国产香蕉视频在线播放 | 免费日韩av片 | 久久久久一区二区三区 | 日韩精选在线观看 | 中文资源在线观看 | 亚洲精品视频在线免费 | 日韩欧美在线观看一区二区三区 | 亚洲精品国久久99热 | 色99久久| 免费国产在线精品 | 九九九九精品九九九九 | 国产资源av| 天天草综合 | 天天做天天爱天天综合网 | av色一区 | 91理论片午午伦夜理片久久 | 免费在线观看成人小视频 | 国产精品欧美日韩在线观看 | 1000部18岁以下禁看视频 | 亚洲欧美婷婷六月色综合 | 色资源网免费观看视频 | 免费在线观看的av网站 | 97精品超碰一区二区三区 | 久久人人爽人人人人片 | 亚洲爱视频 | 在线观看a视频 | 午夜精品久久久久久久99无限制 | 中文字幕永久免费 | 国产精品久久久久久久久大全 | 国产黄色大片免费看 | 欧美一区二视频在线免费观看 | 最近日本中文字幕a | 久久精品久久久久电影 | 国产亚洲免费观看 | 777xxx欧美 | 欧美日韩有码 | www.色午夜,com| 亚洲人人射 | 最新国产精品拍自在线播放 | 久久久久国产成人精品亚洲午夜 | av手机版| 黄色一级大片在线免费看国产一 | 欧美少妇xxx| 久久人人爽人人片 | 99视频一区二区 | 久久久久久影视 | 欧美一区二区三区免费观看 | 日韩在线观看电影 | 午夜色性片 | 久久精品99国产精品亚洲最刺激 | 日韩高清在线看 | 又色又爽又黄高潮的免费视频 | 在线欧美日韩 | 韩国av电影在线观看 | 国产精品视频内 | 亚洲最新视频在线 | 国内精品久久久 | 欧美欧美| 91久久久国产精品 | www.夜色.com | 在线成人短视频 | 久久国产美女 | 操久久网| 九九久久久 | 97超碰资源 | 日韩大片在线 | 99热手机在线 | 亚洲精品美女视频 | 波多野结衣精品 | 麻豆激情电影 | 国产原厂视频在线观看 | 人人爽人人做 | 久草在线最新视频 | 国产视频一区在线免费观看 | 色综合久久久久综合 | 六月丁香激情综合 | 久久任你操 | 婷婷丁香五 | 亚洲精品动漫成人3d无尽在线 | 亚洲砖区区免费 | 亚洲影院一区 | 丝袜美女视频网站 | 国产精品久久久久婷婷 | 在线播放亚洲 | 高清精品在线 | 免费看的黄色片 | 中文字幕在线观看免费高清完整版 | 精品在线免费视频 | 91免费网址 | 国产亚洲久一区二区 | 日韩在线网址 | 在线你懂 | 国产色道 | 色婷婷亚洲婷婷 | 日韩免费电影网站 | 久久九九国产精品 | 奇米网在线观看 | 久久精品免视看 | 日韩理论 | 91桃色国产在线播放 | 国产精品久久久久婷婷 | 亚洲最大av网 | 天天曰夜夜操 | 狠狠躁日日躁狂躁夜夜躁 | 四虎影视国产精品免费久久 | 久久综合网色—综合色88 | 国产美女精彩久久 | 成人中文字幕在线 | 成人av电影免费在线观看 | 91九色蝌蚪视频在线 | 久久人人精品 | 91免费试看 | 97精品国产 | 精品夜夜嗨av一区二区三区 | 日韩一级成人av | 久久久午夜精品理论片中文字幕 | 日本精品视频在线播放 | 在线观看成人福利 | 欧美日韩国产在线 | 国产精品久久久久国产精品日日 | 91九色视频导航 | 日韩在线观看视频免费 | 国产精品99久久久久久有的能看 | 亚洲午夜精品在线观看 | 欧美激情精品一区 | 免费成人在线网站 | 亚洲综合成人婷婷小说 | 在线天堂视频 | 91视频在线观看下载 | 国产色网| 99在线观看精品 | 日韩一三区 | 99在线热播精品免费99热 | 日本婷婷色 | 在线观看韩国av | 黄色免费视频在线观看 | 免费黄色网址网站 | 色.www| 国产视频丨精品|在线观看 国产精品久久久久久久久久久久午夜 | 2019中文字幕网站 | 人人爽人人爽人人片av | 国产黄色大片 | 成人国产一区 | 亚洲国产精品传媒在线观看 | 精品国产伦一区二区三区免费 | 91亚洲国产成人久久精品网站 | 天天爽天天碰狠狠添 | 免费观看黄色av | 91视频这里只有精品 | 美女网站在线 | 欧美一二三区在线观看 | 深爱婷婷激情 | 久久精品久久久久 | 免费三级黄色 | 久草com | 超碰在线人人 | 日韩精品一区二 | 色偷偷88欧美精品久久久 | 亚洲成人精品 | 月丁香婷婷 | 婷婷狠狠操 | 特级黄色片免费看 | 五月天久久| 国产色拍 | 午夜精品视频一区二区三区在线看 | 成人91av | 一区二区三区韩国免费中文网站 | 草久视频在线 | 国产91精品在线观看 | 亚洲爱爱视频 | 久草视频在 | 24小时日本在线www免费的 | av中文字幕在线播放 | 日本最新中文字幕 | 亚洲日b视频 | 99麻豆视频 | 天天操天天色综合 | www.婷婷com| 日本中文字幕视频 | 久久亚洲美女 | 最近最新最好看中文视频 | 免费看黄20分钟 | 久久激情五月丁香伊人 | 国产aa精品 | 高清av在线 | 97视频入口免费观看 | 一级免费观看 | 91麻豆精品国产自产在线游戏 | 国产精品视频线看 | 国产精品久久久久一区二区三区 | 西西444www大胆无视频 | 99r在线视频 | 97色国产| 337p欧美 | 国产网站av | 日日干天天插 | 久久欧美综合 | 免费看污在线观看 | 色噜噜日韩精品一区二区三区视频 | 99热这里只有精品免费 | 日韩欧美网址 | 国产精品va在线观看入 | 亚洲日韩中文字幕在线播放 | 日韩精品一区在线观看 | 伊人久在线 | 午夜久久成人 | 日韩三级在线 | 伊人欧美 | 免费观看久久久 | 99视频在线观看免费 | 在线观看视频一区二区三区 | 久久精品视频国产 | 综合在线亚洲 | 久草香蕉在线视频 | 亚洲成色777777在线观看影院 | 成人a免费视频 | 黄色av免费电影 | www91在线观看 | 午夜.dj高清免费观看视频 | 精品在线观看一区二区三区 | 国产小视频在线看 | 在线电影av | 久久黄色小说 | 天天操天天操天天 | 国产999精品久久久 免费a网站 | 福利视频一区二区 | 中文字幕在线视频第一页 | 亚洲男男gaygay无套 | 91麻豆精品国产91久久久更新时间 | 久久99精品久久久久久清纯直播 | va视频在线观看 | 亚洲欧洲精品一区二区 | 一区二区中文字幕在线观看 | 亚洲码国产日韩欧美高潮在线播放 | 久久久影院一区二区三区 | 91污视频在线 | 日韩一区二区三区在线看 | 日日噜噜噜噜夜夜爽亚洲精品 | 国产日本在线播放 | 国产一级一级国产 | 91九色成人 | 成人羞羞免费 | 国产亚洲在线视频 | 日韩在线视频精品 | 91av美女| 婷婷色吧 | 婷婷在线精品视频 | 日韩色高清 | 91在线入口| 视频一区二区精品 | 精品久久久久一区二区国产 | 亚洲美女视频网 | 欧美怡红院 | 国产美女免费视频 | 丝袜+亚洲+另类+欧美+变态 | 免费视频久久久 | 日韩天天操 | 在线欧美中文字幕 | av线上看| 探花视频网站 | 97视频在线观看视频免费视频 | 97视频免费播放 | 色婷婷综合久色 | av激情五月 | 午夜在线免费观看 | 国产另类av | 日韩在线视频观看 | 欧美激情综合五月 | 日韩一区二区三区免费电影 | 激情综合久久 | 色综合天天综合网国产成人网 | 日日夜夜天天射 | 狠狠躁夜夜a产精品视频 | 国产va饥渴难耐女保洁员在线观看 | 欧美 亚洲 另类 激情 另类 | 在线久热 | 国产色道| 最新av免费在线观看 | 免费男女羞羞的视频网站中文字幕 | 韩国三级一区 | 久草在线手机观看 | 人人草在线视频 | 欧美在线观看视频 | 人人爽人人| 国产精品欧美久久久久久 | 国产精品情侣视频 | 在线观看国产日韩欧美 | 激情视频91 | 手机色在线| 国产在线视频在线观看 | 91久久在线观看 | a久久久久久 | 色就干| 日本特黄特色aaa大片免费 | 国产成人精品av久久 | 国产不卡在线观看视频 | 久99久视频 | 亚洲专区一二三 | 91女人18片女毛片60分钟 | 午夜av免费在线观看 | 久久不见久久见免费影院 | 国产美女视频一区 | 日韩免费视频一区二区 | 亚洲精品乱码久久久久久蜜桃欧美 | 精品国产精品久久 | 成人免费视频免费观看 | 国产精品一区二区av麻豆 | 国产精久久久久久妇女av | 丝袜足交在线 | 国产99久久99热这里精品5 | 日韩网站免费观看 | 国产精品久久久久高潮 | 午夜视频亚洲 | 69精品人人人人 | av在线收看 | 欧美精品二 | 亚洲老妇xxxxxx | 日韩精品视频在线免费观看 | 国产精品久久久久久久久久不蜜月 | 在线观看国产永久免费视频 | 激情五月亚洲 | 成人电影毛片 | 91大神电影 | 91精品国产91久久久久 | 麻豆av一区二区三区在线观看 | 日韩一区在线免费观看 | 色综合久久久久综合体桃花网 | 成人午夜电影网 | 日本公妇在线观看 | 免费看国产a | 欧美国产一区在线 | 亚洲污视频| 久草在线99 | 日本久久久亚洲精品 | 欧美日韩国内在线 | 日韩经典一区二区三区 | 国产 日韩 欧美 中文 在线播放 | 成年人国产精品 | 六月丁香激情综合色啪小说 | 91精品国产麻豆 | 国产一级免费视频 | 精品久久久99| 狠狠狠狠干 | 综合网色 | 美女视频网 | 操一草| 国产乱对白刺激视频不卡 | 超碰免费成人 | 国产精品1区 | 91精品91 | 久久极品 | 五月黄色| 免费网站在线观看人 | 久久免费看毛片 | 在线免费观看黄色 | 国产一区欧美二区 | 久草在线综合网 | www.国产在线| 日韩欧美一区二区三区在线观看 | 99视频精品在线 | 久久久久久久毛片 | 亚洲,播放 | 成人午夜av电影 | 免费看的av片 | 欧美日bb | 亚洲精品久久久久www | 国产短视频在线播放 | 免费日韩电影 | 亚洲一区精品二人人爽久久 | 伊人小视频 | 久久精品精品电影网 | 丁香激情综合国产 | 精品免费观看视频 | 成人资源在线播放 | 人人狠 | 在线观看av大片 | 蜜臀av一区二区 | 福利片免费看 | 欧美综合色在线图区 | 国产成人免费高清 | 福利电影一区二区 | 成人在线观看日韩 | 国产一区在线视频 | 欧美极品少妇xbxb性爽爽视频 | 国产99久久久精品 | 在线视频专区 | 国产又粗又猛又爽又黄的视频先 | 天天综合区| 久久免费视频一区 | 国产精品免费在线视频 | 欧美一区三区四区 | www.日日日.com | 中文字幕 91 | 中文字幕日韩国产 | 欧美日韩精品免费观看视频 | 国产69精品久久久久久久久久 | 91麻豆网| 亚洲综合射 | 欧美日韩91 | 久久av免费 | 久久精品福利 | 在线观看精品视频 | 国产视频精选 | 国产视 | 在线观看免费av片 | 国产在线一区二区 | av不卡免费看 | 日本久久免费视频 | 中文字幕在线看视频国产中文版 | 国产成人精品久久亚洲高清不卡 | 黄色免费网站大全 | 插婷婷| 国产黄色片一级 | 欧美色图狠狠干 | 中文字幕成人 | 久久久久久蜜av免费网站 | 国产品久精国精产拍 | 精品一区二区三区四区在线 | 91精品国产福利在线观看 | 国产精品日韩在线 | 在线亚洲精品 | 丁香视频全集免费观看 | 欧美日韩一区二区在线观看 | 国产精品久久久一区二区三区网站 | 欧美在线视频精品 | 日日干美女| 中文字幕一区二区三区在线视频 | 中文字幕在线视频一区二区三区 | 日日爱视频 | 97色在线观看 | 国产二区电影 | 日本久久高清视频 | 性色视频在线 | 日韩欧美国产激情在线播放 | 四虎4hu永久免费 | 911精品视频 | 黄色软件视频大全免费下载 | 国产69熟| 玖玖精品视频 | 久久久精品一区二区三区 | 日本中文乱码卡一卡二新区 | 中文字幕在线国产 | 国产区精品视频 | 国产精品v欧美精品 | 91视频久久久久久 | 天天色天天射天天综合网 | 五月综合久久 | 91av在线播放视频 | 在线免费观看不卡av | 欧美va天堂va视频va在线 | 国产精品porn | 久久情爱| 日韩欧美一区二区在线观看 | 国产成人久久 | 久久成人国产精品免费软件 | 丝袜av一区| 特黄色大片| 日韩电影中文字幕在线 | 懂色av一区二区在线播放 | 日本精品视频免费 | 亚洲天堂视频在线 | 久久国产精品一区二区 | 99热网站| 二区视频在线观看 | 人人艹人人 | 911在线| 综合网欧美 | 激情伊人五月天 | 久久r精品 | 69精品| 国产 视频 高清 免费 | 91在线你懂的| 日韩精品一区二区三区免费观看 | 国产精品久久久久婷婷 | 蜜臀av性久久久久蜜臀av | 91丨九色丨蝌蚪丨老版 | 在线观看网站黄 | 国产精品va在线 | 99视频免费在线观看 | 特黄免费av | 日韩欧美国产激情在线播放 | 亚洲美女视频在线观看 | 欧美日韩在线观看一区 | www国产亚洲精品久久麻豆 | 97超碰.com | 色婷婷中文 | 色人久久| 五月婷婷中文字幕 | 久久久久亚洲精品成人网小说 | 久久久久久久久精 | 久草精品电影 | 菠萝菠萝在线精品视频 | av在线免费观看不卡 | 日韩深夜在线观看 | 色婷婷综合五月 | 欧美一级片在线观看视频 | 一级片黄色片网站 | 亚洲综合五月 | 97超级碰碰碰碰久久久久 | 黄色毛片在线看 | 国产精品久久久久久久久久免费 | 97免费在线视频 | 九九电影在线 | 日韩在线观看第一页 | 狠狠狠狠狠狠天天爱 | 在线黄色国产电影 | 国产婷婷视频在线 | 一本大道久久精品懂色aⅴ 五月婷社区 | 五月婷婷丁香网 | 国产小视频在线免费观看视频 | av在线播放免费 | 欧美一级片在线免费观看 | 日本一区二区不卡高清 | 国产精品女视频 | 免费久久久 | 久操免费视频 | 亚洲网久久 | 伊人激情网 | 欧美一级片免费播放 | 日韩精品一区二区三区在线播放 | 99精品视频在线播放免费 | 日韩欧美在线观看一区二区三区 | 99精品国产兔费观看久久99 | 久久久久免费精品视频 | 亚洲精品国产第一综合99久久 | 亚洲美女视频在线观看 | 欧美一级专区免费大片 | 久久免费的精品国产v∧ | www激情网| 中文字幕在线免费97 | 国内精品久久久久影院男同志 | 欧美欧美 | 色吊丝在线永久观看最新版本 | 日日爱夜夜爱 | 综合天堂av久久久久久久 | 亚洲激情在线观看 | 久久99深爱久久99精品 | 免费av网址在线观看 | 亚洲三级影院 | 精品国产免费人成在线观看 | 成人国产网站 | 日本韩国精品一区二区在线观看 | 亚洲播放一区 | 国产伦精品一区二区三区免费 | 婷婷综合视频 | 成人免费共享视频 | 欧洲不卡av | 看片的网址| 国产成人精品一区二三区 | 久久成人高清视频 | 夜夜操综合网 | 99r在线精品 | 国产一区二区精 | 在线观看中文字幕dvd播放 | 五月婷激情 | 成人在线视频你懂的 | 天天干 天天摸 天天操 | 高清中文字幕av | 国产精品久久久久影院日本 | 亚洲乱码精品久久久久 | 日韩色爱 | 99热官网 | 狠日日| 国产黄色a | 九九热久久免费视频 | 激情xxxx | 国产视频精品久久 | 日本黄色免费在线观看 | 国产精品欧美精品 | 日日摸日日爽 | 久久国产精品99精国产 | 成人资源网 | 黄色在线观看免费网站 | 国产精品日韩在线观看 | 国产999精品久久久久久 | 久久久久久久久久久成人 | 亚洲国产精品成人综合 | 国产精品视频区 | 婷婷综合伊人 | 久久国产综合视频 | 91精品国产高清自在线观看 | 国产剧情一区 | 又紧又大又爽精品一区二区 | 亚洲无线视频 | 国产精品理论视频 | 亚洲精品午夜久久久久久久 | 天天玩天天操天天射 | 808电影 | 国产黄色av | 日本在线精品视频 | 国产精品mv| 我爱av激情网 | 国产亚洲va综合人人澡精品 | 亚洲干| 黄色影院在线免费观看 | 又湿又紧又大又爽a视频国产 | ,久久福利影视 | 日本特黄一级片 | 久久99久久99精品 | 狠狠狠狠干 | 久久99精品波多结衣一区 | av成人免费观看 | 欧美视频一区二 | a在线观看视频 | 久草www | 丁香婷婷激情网 | 永久黄网站色视频免费观看w | 国产原创在线视频 | 日韩欧美在线视频一区二区 | 欧美国产精品久久久久久免费 | 久久久久免费 | 亚洲视频在线免费看 | 亚洲天堂激情 | 黄色三级网站在线观看 | 天天操天天添天天吹 | 久久久麻豆精品一区二区 | 亚洲精品456在线播放第一页 | 国产精品久久久久久av | 天天操天天摸天天射 | 在线播放亚洲激情 | 午夜色站 | 成人综合日日夜夜 | 日本精品免费看 | 免费合欢视频成人app | 亚洲视频六区 | 国内外成人免费在线视频 | 日日操操| 超碰在线天天 | 亚洲欧美成aⅴ人在线观看 四虎在线观看 | 色在线国产 | 五月天欧美精品 | 超碰免费在线公开 | 国产 视频 久久 | 亚洲第一区在线播放 | 黄色片网站大全 | 色搞搞| 四虎在线永久免费观看 | 久久成人高清 | 国产99久久久国产精品免费看 | 国产精品久久毛片 | 欧美与欧洲交xxxx免费观看 | 久操综合| 久久精品免费电影 | 99免在线观看免费视频高清 | 国产一区二区日本 | 精品色综合 | 日日躁你夜夜躁你av蜜 | 九九九电影免费看 | 日韩a欧美 | 国产福利在线 | 欧美成人一二区 | 日韩综合视频在线观看 | 一区 二区电影免费在线观看 | 国产黄色一级片在线 | 久久成人精品电影 | 国产97色在线 | 九九综合九九 | 四虎精品成人免费网站 | 欧洲成人av| 国产精品久久久久久久久久直播 | 夜色资源站国产www在线视频 | avwww在线 | 日韩视| 色综合婷婷久久 | 国产亚洲精品久久久网站好莱 | 中文字幕在线观看完整 | 一区二区三区免费播放 | 五月天色中色 | 久久久久久久毛片 | 亚洲精品456在线播放第一页 | 国产成人免费观看 | 欧美日韩国产在线一区 | 久久精品综合视频 | 欧美色图88 | 久久99久久99精品免费看小说 | 五月开心六月伊人色婷婷 | 国产亚洲免费观看 | 91精品久久香蕉国产线看观看 | 一级黄色大片在线观看 | 中文字幕一区在线 | 激情av资源| 色狠狠久久av五月综合 | 91精品一区二区在线观看 | 国产日韩欧美中文 | 色网免费观看 | 国产精品一区二区你懂的 | 欧美性黑人 | 亚洲天堂网在线观看视频 | 亚洲欧美视频在线 | 狠狠躁夜夜av | 久久不见久久见免费影院 | 亚洲午夜剧场 | 很黄很黄的网站免费的 | 99精品国自产在线 | 国产精品美女久久久久久久 | 国产精品久久久久9999吃药 | 成人黄色在线 | 超碰在线免费97 | 欧美精品v国产精品v日韩精品 | 婷婷综合 | a黄色一级| 久久久香蕉视频 | 九九热视频在线免费观看 | 字幕网资源站中文字幕 | 福利一区在线 | 91麻豆精品国产自产在线 | 中文字幕日韩电影 | 欧美日韩视频在线 | 亚洲首页 | 亚洲男男gaygay无套 | 97成人精品视频在线观看 | 成人免费在线视频观看 | 岛国一区在线 | 亚洲狠狠婷婷 | 欧美日韩视频免费 | 日本黄网站| 欧美精品免费一区二区 | 99久久综合狠狠综合久久 | 久久久久久久久久国产精品 | 国产免费观看久久黄 | 国产aa免费视频 | 国产精品ⅴa有声小说 | 午夜天天操 | 免费福利小视频 | 国产精品久久久久av | 国产精品第52页 | 国内精品久久久久久久久 | 日日夜夜天天久久 | 超碰在线日本 | 99视频在线播放 | 日韩字幕在线观看 | 国产福利精品一区二区 | 麻豆视频在线免费看 | 成人黄色在线 | 成人app在线播放 | 在线观看成人福利 | 亚洲国产一区av | 久久99免费观看 | 黄色a视频免费 | 在线播放日韩av | 蜜臀久久99精品久久久无需会员 | 久久久蜜桃一区二区 | 美女国产免费 | 亚洲综合激情小说 | 在线亚洲人成电影网站色www | 东方av在线免费观看 | 97精品电影院 | 久久久久国产精品免费网站 | 日韩欧美在线中文字幕 | 最近2019中文免费高清视频观看www99 | 永久免费av在线播放 | 日本黄色一级电影 | 久久草草热国产精品直播 | 国产精品美女久久久久久久网站 | 久久久精品小视频 | 国产精品久久久777 成人手机在线视频 | 亚洲在线视频免费观看 | 狠狠色伊人亚洲综合网站野外 | 日日爱网址 | 草久久久 | 久久黄页 | 丰满少妇在线观看网站 | 国产精品 中文在线 | 97免费在线观看视频 | 97香蕉视频 | 国精产品999国精产品视频 | 超碰97中文 | 天天爽网站 | www久久久久 | 亚洲欧美日韩精品一区二区 | 久久国产精品系列 | 玖玖国产精品视频 | 欧美视频二区 | 免费手机黄色网址 | 欧美日韩国产在线观看 | 亚洲精品视频在线看 | 黄色av成人在线观看 | 精品国产99国产精品 | 久久精品一二区 | 国产精品永久久久久久久www | 成人h视频在线播放 | 亚洲一区黄色 | 狠狠色狠狠色综合日日92 | 激情丁香在线 | 探花视频在线观看+在线播放 | 特级西西444www大精品视频免费看 | 九九视频这里只有精品 | 日本精品中文字幕 | 日日日操 | 日韩在线视频网站 | 狠狠色丁香久久婷婷综合丁香 | 亚洲精品色视频 | 国产精品色在线 | 国产丝袜在线 | 中文字幕视频三区 | 久久久久久久久久久久国产精品 | 一区二区电影在线观看 | 日韩成人欧美 | a在线免费| 天天射日| 天天干天天射天天操 | 91av小视频 | 午夜精品一区二区三区免费视频 | 成人小电影在线看 | 久久黄色免费观看 | 欧美日韩aaaa| 成人在线视频一区 | 91亚洲精品在线观看 | 色婷婷电影网 | 99精品观看 | 国产一级在线视频 | av电影免费| 西西4444www大胆无视频 | av一级在线观看 | 中文字幕丝袜美腿 | 干天天 | 97精品视频在线播放 | 狠狠色狠狠色综合日日92 | 欧美人人爱 | www最近高清中文国语在线观看 | 99热在线观看免费 | 丝袜美腿亚洲综合 |