日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 综合教程 >内容正文

综合教程

实时数仓(二):DWD层-数据处理

發(fā)布時(shí)間:2024/6/21 综合教程 39 生活家
生活随笔 收集整理的這篇文章主要介紹了 实时数仓(二):DWD层-数据处理 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

目錄實(shí)時(shí)數(shù)倉(cāng)(二):DWD層-數(shù)據(jù)處理1.數(shù)據(jù)源2.用戶行為日志2.1開發(fā)環(huán)境搭建1)包結(jié)構(gòu)2)pom.xml3)MykafkaUtil.java4)log4j.properties2.2 實(shí)現(xiàn)功能1)代碼實(shí)現(xiàn)2)部署運(yùn)行3.業(yè)務(wù)數(shù)據(jù)3.1 實(shí)現(xiàn)功能3.2 動(dòng)態(tài)分流1)建配置表:create.sql2)配置類:TableProcess.java3)MysqlUtil.java4)常量類:GmallConfig.java5)主程序:BaseDBApp.java6)自定義分流函數(shù):TableProcessFunction.java7)HbaseSink:DimSink.java8)自定義序列化 kafka sink3.4 主程序:流程總結(jié)分析3.5 思考4.整體流程圖分析

實(shí)時(shí)數(shù)倉(cāng)(二):DWD層-數(shù)據(jù)處理

1.數(shù)據(jù)源

dwd的數(shù)據(jù)來(lái)自Kafka的ods層原始數(shù)據(jù):業(yè)務(wù)數(shù)據(jù)(ods_base_db) 、日志數(shù)據(jù)(ods_base_log)

從Kafka的ODS層讀取用戶行為日志以及業(yè)務(wù)數(shù)據(jù),并進(jìn)行簡(jiǎn)單處理,寫回到Kafka作為DWD層。

2.用戶行為日志

2.1開發(fā)環(huán)境搭建

1)包結(jié)構(gòu)

2)pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>03_gmall2021</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.version>1.12.0</flink.version>
        <scala.version>2.12</scala.version>
        <hadoop.version>3.1.3</hadoop.version>
    </properties>

    <dependencies>
        <!--  flink Web UI -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>

        <!--如果保存檢查點(diǎn)到hdfs上,需要引入此依賴-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!--Flink默認(rèn)使用的是slf4j記錄日志,相當(dāng)于一個(gè)日志的接口,我們這里使用log4j作為具體的日志實(shí)現(xiàn)-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.2.0</version>
        </dependency>

        <!--lomback插件依賴-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>5.0.0-HBase-2.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.glassfish</groupId>
                    <artifactId>javax.el</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!--commons-beanutils是Apache開源組織提供的用于操作JAVA BEAN的工具包。
使用commons-beanutils,我們可以很方便的對(duì)bean對(duì)象的屬性進(jìn)行操作-->
        <dependency>
            <groupId>commons-beanutils</groupId>
            <artifactId>commons-beanutils</artifactId>
            <version>1.9.3</version>
        </dependency>

        <!--Guava工程包含了若干被Google的Java項(xiàng)目廣泛依賴的核心庫(kù),方便開發(fā)-->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>29.0-jre</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.3.0</version>
        </dependency>

        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.2.4</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.janeluo</groupId>
            <artifactId>ikanalyzer</artifactId>
            <version>2012_u6</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
3)MykafkaUtil.java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class MyKafkaUtil {

    private static Properties properties = new Properties();

    private static String DEFAULT_TOPIC = "dwd_default_topic";

    static {
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
    }

    /**
     * todo kafka sink:自定義序列化,各種類型自定義傳輸
     *
     * @return
     */
    public static <T> FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema) {
        Properties props = new Properties();
        //kafka地址
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        //生產(chǎn)數(shù)據(jù)超時(shí)時(shí)間
        props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 15 * 60 * 1000 + "");
        return new FlinkKafkaProducer<T>(DEFAULT_TOPIC, kafkaSerializationSchema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }

    /**
     * 獲取生產(chǎn)者對(duì)象 ,只能傳輸String類型
     *
     * @param topic 主題
     */
    public static FlinkKafkaProducer<String> getFlinkKafkaProducer(String topic) {
        return new FlinkKafkaProducer<String>(topic,
                new SimpleStringSchema(),
                properties);
    }


    /**
     * todo 構(gòu)建消費(fèi)者 -> 優(yōu)化
     *
     * @param bootstrapServers:kafka地址
     * @param topic                    :topic可以用逗號(hào)分隔
     * @param groupId:消費(fèi)者組
     * @param isSecurity:是否kafka設(shè)置sasl
     * @param offsetStrategy:消費(fèi)策略:3種
     * @return
     */
    public static FlinkKafkaConsumer<String> getKafkaConsumer(String bootstrapServers, String topic, String groupId, String isSecurity, String offsetStrategy) {
        SimpleStringSchema simpleStringSchema = new SimpleStringSchema();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("flink.partition-discovery.interval-millis", "60000");
        //kafka開啟sasl認(rèn)證
        if ("true".equalsIgnoreCase(isSecurity)) {
            props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="" password="";");
            props.setProperty("security.protocol", "SASL_PLAINTEXT");
            props.setProperty("sasl.mechanism", "PLAIN");
        }
        //消費(fèi)多個(gè)topic
        String[] split = topic.split(",");
        List<String> topics = Arrays.asList(split);
        //kafka消費(fèi)者
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topics, simpleStringSchema, props);

        //消費(fèi)方式:earliest,latest,setStartFromTimestamp
        switch (offsetStrategy) {
            case "earliest":
                consumer.setStartFromEarliest();
                return consumer;
            case "latest":
                consumer.setStartFromLatest();
                return consumer;
            default:
                consumer.setStartFromTimestamp(System.currentTimeMillis() - Integer.valueOf(offsetStrategy) * 60 * 1000);
                return consumer;
        }
    }

    /**
     * 獲取消費(fèi)者對(duì)象
     *
     * @param topic   主題
     * @param groupId 消費(fèi)者組
     */
    public static FlinkKafkaConsumer<String> getFlinkKafkaConsumer(String topic, String groupId) {

        //添加消費(fèi)組屬性
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        return new FlinkKafkaConsumer<String>(topic,
                new SimpleStringSchema(),
                properties);
    }

    //拼接Kafka相關(guān)屬性到DDL
    public static String getKafkaDDL(String topic, String groupId) {
        return "'connector' = 'kafka', " +
                " 'topic' = '" + topic + "'," +
                " 'properties.bootstrap.servers' = 'hadoop102:9092', " +
                " 'properties.group.id' = '" + groupId + "', " +
                "  'format' = 'json', " +
                "  'scan.startup.mode' = 'latest-offset'";
    }
}
4)log4j.properties
log4j.rootLogger=info,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

2.2 實(shí)現(xiàn)功能

我們前面采集的日志數(shù)據(jù)已經(jīng)保存到Kafka中,作為日志數(shù)據(jù)的ODS層,從Kafka的ODS層讀取的日志數(shù)據(jù)分為3類, 頁(yè)面日志、啟動(dòng)日志和曝光日志。這三類數(shù)據(jù)雖然都是用戶行為數(shù)據(jù),但是有著完全不一樣的數(shù)據(jù)結(jié)構(gòu),所以要拆分處理。將拆分后的不同的日志寫回Kafka不同主題中,作為日志DWD層。

頁(yè)面日志輸出到主流,啟動(dòng)日志輸出到啟動(dòng)側(cè)輸出流,曝光日志輸出到曝光側(cè)輸出流

1)從kafka讀取ods數(shù)據(jù)
2)判斷新老用戶
3)分流
4)寫回到kafka的dwd層

1)代碼實(shí)現(xiàn)
package com.flink.realtime.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.flink.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @description: todo->準(zhǔn)備用戶行為日志dwd層
 * @author: HaoWu
 * @create: 2021年06月22日
 */
public class BaseLogApp {
    public static void main(String[] args) throws Exception {
        // TODO 1.獲取執(zhí)行環(huán)境
        // 1.1 創(chuàng)建執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.2 并行度設(shè)置為Kafka的分區(qū)數(shù)
        env.setParallelism(4);

        /*
        // 1.3 設(shè)置checkpoint
        env.enableCheckpointing(5000L); //每5000ms做一次ck
        env.getCheckpointConfig().setCheckpointTimeout(60000L); // ck超時(shí)時(shí)間:1min
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //ck模式,默認(rèn):exactly_once
        //正常Cancel任務(wù)時(shí),保留最后一次CK
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //重啟策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
        //狀態(tài)后端:
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall/checkpoint/base_log_app"));
        // 訪問(wèn)hdfs訪問(wèn)權(quán)限問(wèn)題
        // 報(bào)錯(cuò)異常:Permission denied: user=haowu, access=WRITE, inode="/":atguigu:supergroup:drwxr-xr-x
        // 解決:/根目錄沒有寫權(quán)限 解決方案1.hadoop fs -chown 777 /   2.System.setProperty("HADOOP_USER_NAME", "atguigu");
        System.setProperty("HADOOP_USER_NAME", "atguigu");
        */

        //TODO 2.獲取kafka ods_base_log 主題數(shù)據(jù)
        String sourceTopic = "ods_base_log";
        String groupId = "base_log_app_group";
        //FlinkKafkaConsumer<String> consumer = MyKafkaUtil.getFlinkKafkaConsumer(sourceTopic, groupId);
        FlinkKafkaConsumer<String> consumer = MyKafkaUtil.getKafkaConsumer("hadoop102:9092", sourceTopic, groupId, "false", "earliest");
        DataStreamSource<String> kafkaDS = env.addSource(consumer);


       //TODO 3.將每行數(shù)據(jù)轉(zhuǎn)換為JSONObject
        // 處理臟數(shù)據(jù)
        OutputTag<String> dirty = new OutputTag<String>("DirtyData") {};
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, Context context, Collector<JSONObject> collector) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    //轉(zhuǎn)JSON對(duì)象
                    collector.collect(jsonObject);
                } catch (Exception e) {
                    //JSON解析異常輸出臟數(shù)據(jù)
                    context.output(dirty, value);
                }
            }
        });

        //jsonObjDS.print("json>>>>>>>>");


        //TODO 4.按照設(shè)備ID分組、使用狀態(tài)編程做新老用戶校驗(yàn)
        //4.1 根據(jù)mid對(duì)日志進(jìn)行分組
        SingleOutputStreamOperator<JSONObject> jsonObjWithNewFlag = jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid"))
                .process(new KeyedProcessFunction<String, JSONObject, JSONObject>() {
                    //聲明狀態(tài)用于表示當(dāng)前Mid是否已經(jīng)訪問(wèn)過(guò)
                    private ValueState<String> firstVisitDateState;
                    private SimpleDateFormat simpleDateFormat;

                    //初始化狀態(tài)
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        firstVisitDateState = getRuntimeContext().getState(new ValueStateDescriptor<String>("new-mid", String.class));
                        simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
                    }

                    @Override
                    public void processElement(JSONObject value, Context ctx, Collector<JSONObject> out) throws Exception {
                        //取出新用戶標(biāo)記 :is_new:1->新用戶 ,0->老用戶
                        String isNew = value.getJSONObject("common").getString("is_new");
                        //如果當(dāng)前前端傳輸數(shù)據(jù)表示為新用戶,則進(jìn)行校驗(yàn)
                        if ("1".equals(isNew)) {
                            //取出狀態(tài)數(shù)據(jù)并取出當(dāng)前訪問(wèn)時(shí)間
                            String firstDate = firstVisitDateState.value();
                            Long ts = value.getLong("ts");
                            //判斷狀態(tài)數(shù)據(jù)是否為Null
                            if (firstDate != null) {
                                //修復(fù)
                                value.getJSONObject("common").put("is_new", "0");
                            } else {
                                //更新狀態(tài)
                                firstVisitDateState.update(simpleDateFormat.format(ts));
                            }
                        }
                        out.collect(value);
                    }

                });
        //測(cè)試打印
        //jsonObjWithNewFlag.print();

        //TODO 5.使用側(cè)輸出流將 啟動(dòng)、曝光、頁(yè)面數(shù)據(jù)分流
        OutputTag<String> startOutPutTag = new OutputTag<String>("start"){}; //啟動(dòng)
        OutputTag<String> displayOutputTag = new OutputTag<String>("display"){}; //曝光
        SingleOutputStreamOperator<String> pageDS = jsonObjWithNewFlag.process(new ProcessFunction<JSONObject, String>() {
            @Override
            public void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {
                //判斷是否為啟動(dòng)數(shù)據(jù)
                String start = value.getString("start");
                if (start != null && start.length() > 0) {
                    //啟動(dòng)數(shù)據(jù)
                    ctx.output(startOutPutTag, value.toJSONString());
                } else {
                    //不是啟動(dòng)數(shù)據(jù)一定是頁(yè)面數(shù)據(jù)
                    out.collect(value.toJSONString());

                    //抽取公共字段、頁(yè)面信息、時(shí)間戳
                    JSONObject common = value.getJSONObject("common");
                    JSONObject page = value.getJSONObject("page");
                    Long ts = value.getLong("ts");

                    //獲取曝光數(shù)據(jù)
                    JSONArray displayArr = value.getJSONArray("displays");

                    if (displayArr != null && displayArr.size() > 0) {
                        JSONObject displayObj = new JSONObject();
                        displayObj.put("common", common);
                        displayObj.put("page", page);
                        displayObj.put("ts", ts);
                        //遍歷曝光信息
                        for (Object display : displayArr) {
                            displayObj.put("display", display);
                            //輸出曝光數(shù)據(jù)到側(cè)輸出流
                            ctx.output(displayOutputTag, displayObj.toJSONString());
                        }

                    }

                }
            }
        });

        //TODO 6.將三個(gè)流的數(shù)據(jù)分別寫入Kafka
        //打印
        jsonObjDS.getSideOutput(dirty).print("Dirty>>>>>>>>>>>");
        //主流:頁(yè)面
        pageDS.print("Page>>>>>>>>>>>");
        //側(cè)流:?jiǎn)?dòng)
        pageDS.getSideOutput(startOutPutTag).print("Start>>>>>>>>>>>>");
        //側(cè)流:曝光
        pageDS.getSideOutput(displayOutputTag).print("Display>>>>>>>>>>>>>");

        //輸出到kafka
        String pageSinkTopic = "dwd_page_log";
        String startSinkTopic = "dwd_start_log";
        String displaySinkTopic = "dwd_display_log";
        pageDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(pageSinkTopic));
        pageDS.getSideOutput(startOutPutTag).addSink(MyKafkaUtil.getFlinkKafkaProducer(startSinkTopic));
        pageDS.getSideOutput(displayOutputTag).addSink(MyKafkaUtil.getFlinkKafkaProducer(displaySinkTopic));

        env.execute();
    }
}
2)部署運(yùn)行

BaseLogApp.sh

#!/bin/bash

source  ~/.bashrc

cd $(dirname $0)
day=$(date +%Y%m%d%H%M)

#flink
job_name=02_dwd_BaseLogApp
clazz=com.flink.realtime.app.dwd.BaseLogApp
jar_path=/opt/module/gmall-flink/03_gmall2021-1.0-SNAPSHOT-jar-with-dependencies.jar

#-----------------------run----------------------------------------------
#yarn模式:per-job
/opt/module/flink-1.12.0/bin/flink run 
-t yarn-per-job 
-Dyarn.application.name=${job_name} 
-Dyarn.application.queue=default 
-Djobmanager.memory.process.size=1024m 
-Dtaskmanager.memory.process.size=1024m 
-Dtaskmanager.numberOfTaskSlots=2 
-c ${clazz} ${jar_path}

3.業(yè)務(wù)數(shù)據(jù)

3.1 實(shí)現(xiàn)功能

業(yè)務(wù)數(shù)據(jù)的變化,我們可以通過(guò)FlinkCDC采集到,但是FlinkCDC是把全部數(shù)據(jù)統(tǒng)一寫入一個(gè)Topic中, 這些數(shù)據(jù)包括事實(shí)數(shù)據(jù),也包含維度數(shù)據(jù),這樣顯然不利于日后的數(shù)據(jù)處理,所以這個(gè)功能是從Kafka的業(yè)務(wù)數(shù)據(jù)ODS層讀取數(shù)據(jù),經(jīng)過(guò)處理后,將維度數(shù)據(jù)保存到HBase,將事實(shí)數(shù)據(jù)寫回Kafka作為業(yè)務(wù)數(shù)據(jù)的DWD層。

3.2 動(dòng)態(tài)分流

由于FlinkCDC是把全部數(shù)據(jù)統(tǒng)一寫入一個(gè)Topic中, 這樣顯然不利于日后的數(shù)據(jù)處理。所以需要把各個(gè)表拆開處理。但是由于每個(gè)表有不同的特點(diǎn),有些表是維度表,有些表是事實(shí)表。

在實(shí)時(shí)計(jì)算中一般把維度數(shù)據(jù)寫入存儲(chǔ)容器,一般是方便通過(guò)主鍵查詢的數(shù)據(jù)庫(kù)比如HBase,Redis,MySQL等。一般把事實(shí)數(shù)據(jù)寫入流中,進(jìn)行進(jìn)一步處理,最終形成寬表。

這樣的配置不適合寫在配置文件中,因?yàn)檫@樣的話,業(yè)務(wù)端隨著需求變化每增加一張表,就要修改配置重啟計(jì)算程序。所以這里需要一種動(dòng)態(tài)配置方案,把這種配置長(zhǎng)期保存起來(lái),一旦配置有變化,實(shí)時(shí)計(jì)算可以自動(dòng)感知。

這種可以有兩個(gè)方案實(shí)現(xiàn)

一種是用Zookeeper存儲(chǔ),通過(guò)Watch感知數(shù)據(jù)變化。

另一種是用mysql數(shù)據(jù)庫(kù)存儲(chǔ),周期性的同步。

這里選擇第二種方案,主要是MySQL對(duì)于配置數(shù)據(jù)初始化和維護(hù)管理,使用FlinkCDC讀取配置信息表,將配置流作為廣播流與主流進(jìn)行連接。

1)建配置表:create.sql
--配置表
CREATE TABLE `table_process` (
  `source_table` varchar(200) NOT NULL COMMENT '來(lái)源表',
  `operate_type` varchar(200) NOT NULL COMMENT '操作類型 insert,update,delete',
  `sink_type` varchar(200) DEFAULT NULL COMMENT '輸出類型 hbase kafka',
  `sink_table` varchar(200) DEFAULT NULL COMMENT '輸出表(主題)',
  `sink_columns` varchar(2000) DEFAULT NULL COMMENT '輸出字段',
  `sink_pk` varchar(200) DEFAULT NULL COMMENT '主鍵字段',
  `sink_extend` varchar(200) DEFAULT NULL COMMENT '建表擴(kuò)展',
  PRIMARY KEY (`source_table`,`operate_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

品牌維表

 CREATE TABLE `base_trademark` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '編號(hào)',
  `tm_name` varchar(100) NOT NULL COMMENT '屬性值',
  `logo_url` varchar(200) DEFAULT NULL COMMENT '品牌logo的圖片路徑',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='品牌表' 
--品牌表:base_trademark表 insert 操作,插入hbase的dim_base_trademark只要 `id`,`name` 字段, `id`作為主鍵
insert into table_process values ('base_trademark','insert','hbase','dim_base_trademark','id,name','id','');
mysql> select * from table_process;
+----------------+--------------+-----------+--------------------+--------------+---------+-------------+
| source_table   | operate_type | sink_type | sink_table         | sink_columns | sink_pk | sink_extend |
+----------------+--------------+-----------+--------------------+--------------+---------+-------------+
| base_trademark | insert       | hbase     | dim_base_trademark | id,name      | id      |             |
+----------------+--------------+-----------+--------------------+--------------+---------+-------------+
1 row in set (0.00 sec)
2)配置類:TableProcess.java
import lombok.Data;

/**
 * @description: TODO 配置表實(shí)體類
 * @author: HaoWu
 * @create: 2021年06月25日
 */

@Data
public class TableProcess {
    //動(dòng)態(tài)分流Sink常量
    public static final String SINK_TYPE_HBASE = "hbase";
    public static final String SINK_TYPE_KAFKA = "kafka";
    public static final String SINK_TYPE_CK = "clickhouse";
    //來(lái)源表
    String sourceTable;
    //操作類型 insert,update,delete
    String operateType;
    //輸出類型 hbase kafka
    String sinkType;
    //輸出表(主題)
    String sinkTable;
    //輸出字段
    String sinkColumns;
    //主鍵字段
    String sinkPk;
    //建表擴(kuò)展
    String sinkExtend;
}

3)MysqlUtil.java
import com.flink.realtime.bean.TableProcess;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.flink.shaded.curator4.org.apache.curator.shaded.com.google.common.base.CaseFormat;
import java.lang.reflect.InvocationTargetException;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;

/**
 * @description: TODO Mysql工具類
 * @author: HaoWu
 * @create: 2021年07月23日
 * 完成ORM,對(duì)象關(guān)系映射
 * O:Object對(duì)象       Java中對(duì)象
 * R:Relation關(guān)系     關(guān)系型數(shù)據(jù)庫(kù)
 * M:Mapping映射      將Java中的對(duì)象和關(guān)系型數(shù)據(jù)庫(kù)的表中的記錄建立起映射關(guān)系
 */
public class MysqlUtil {

    /**
     * @param sql               執(zhí)行sql語(yǔ)句
     * @param clazz             封裝bean類型
     * @param underScoreToCamel 是否列名轉(zhuǎn)駝峰命名
     * @param <T>
     * @return
     */
    public static <T> List<T> queryList(String sql, Class<T> clazz, Boolean underScoreToCamel) {
        Connection con = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            // 注冊(cè)驅(qū)動(dòng)
            Class.forName("com.mysql.jdbc.Driver");
            // 獲取連接
            con = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/gmall-realtime?characterEncoding=utf-8&useSSL=false", "root", "root");
            // 獲取數(shù)據(jù)庫(kù)操作對(duì)象
            ps = con.prepareStatement(sql);
            // 執(zhí)行sql
            rs = ps.executeQuery();
            // 處理結(jié)果集,封裝list對(duì)象
            ResultSetMetaData metaData = rs.getMetaData(); //獲取結(jié)果集元數(shù)據(jù)
            ArrayList<T> resultList = new ArrayList<>();
            while (rs.next()) {
                // 將單條記錄封裝對(duì)象
                T obj = clazz.newInstance();
                // 遍歷所有列,轉(zhuǎn)駝峰,對(duì)象屬性賦值
                for (int i = 1; i < metaData.getColumnCount(); i++) {
                    String columnName = metaData.getColumnName(i);
                    String propertyName = "";
                    if (underScoreToCamel) {
                        // 通過(guò)guava工具類,將表中的列轉(zhuǎn)換為類屬性的駝峰命名
                        propertyName = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, columnName);
                    }
                    // 給屬性賦值
                    BeanUtils.setProperty(obj,propertyName,rs.getObject(i));
                }
                resultList.add(obj);
            }
            return resultList;
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("從Mysql查詢數(shù)據(jù)失敗");
        } finally {
            // 釋放資源
            if (rs != null) {
                try {
                    rs.close();
                } catch (SQLException throwables) {
                    throwables.printStackTrace();
                }
            }
            if (ps != null) {
                try {
                    ps.close();
                } catch (SQLException throwables) {
                    throwables.printStackTrace();
                }
            }
            if (con != null) {
                try {
                    con.close();
                } catch (SQLException throwables) {
                    throwables.printStackTrace();
                }
            }
        }
    }


    public static void main(String[] args) throws InvocationTargetException, IllegalAccessException {
        String sql="select * from table_process";
        List<TableProcess> list = MysqlUtil.queryList(sql, TableProcess.class, true);
        System.out.println(list);
        TableProcess tableProcess = new TableProcess();
        BeanUtils.setProperty(tableProcess,"sourceTable","redis");
        System.out.println(tableProcess);
    }
}
4)常量類:GmallConfig.java
package com.flink.realtime.common;

/**
 * @description: TODO 常量配置類
 * @author: HaoWu
 * @create: 2021年06月25日
 */
public class GmallConfig {
    //Phoenix庫(kù)名
    public static final String HBASE_SCHEMA = "bigdata";

    //Phoenix驅(qū)動(dòng)
    public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";

    //Phoenix連接參數(shù)
    public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181";

    public static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";

    public static final String CLICKHOUSE_URL = "jdbc:clickhouse://hadoop102:8123/default";
}
5)主程序:BaseDBApp.java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.flink.realtime.app.func.DimSink;
import com.flink.realtime.app.func.TableProcessFunction;
import com.flink.realtime.bean.TableProcess;
import com.flink.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;

/**
 * @description: todo->準(zhǔn)備業(yè)務(wù)數(shù)據(jù)dwd層
 * @author: HaoWu
 * @create: 2021年06月22日
 */
public class BaseDbApp {
    public static void main(String[] args) throws Exception {
        // TODO 1.創(chuàng)建執(zhí)行環(huán)境
        // 1.1 創(chuàng)建stream執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.2 設(shè)置并行度
        env.setParallelism(1);
        /*
        // 1.3 設(shè)置checkpoint參數(shù)
        env.enableCheckpointing(5000L); //每5000ms做一次ck
        env.getCheckpointConfig().setCheckpointTimeout(60000L); // ck超時(shí)時(shí)間:1min
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //ck模式,默認(rèn):exactly_once
        //正常Cancel任務(wù)時(shí),保留最后一次CK
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //重啟策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
        //狀態(tài)后端:
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall/checkpoint/base_db_app"));
        // 訪問(wèn)hdfs訪問(wèn)權(quán)限問(wèn)題
        // 報(bào)錯(cuò)異常:Permission denied: user=haowu, access=WRITE, inode="/":atguigu:supergroup:drwxr-xr-x
        // 解決:/根目錄沒有寫權(quán)限 解決方案1.hadoop fs -chown 777 /   2.System.setProperty("HADOOP_USER_NAME", "atguigu");
        System.setProperty("HADOOP_USER_NAME", "atguigu");
        */

        // TODO 2.獲取kafka的ods層業(yè)務(wù)數(shù)據(jù):ods_basic_db
        String ods_db_topic = "ods_base_db";
        FlinkKafkaConsumer<String> kafkaConsumer = MyKafkaUtil.getKafkaConsumer("hadoop102:9092", ods_db_topic, "ods_base_db_consumer1", "false", "latest");
        DataStreamSource<String> jsonStrDS = env.addSource(kafkaConsumer);
        //jsonStrDS.print();
        // TODO 3.對(duì)jsonStrDS結(jié)構(gòu)轉(zhuǎn)換
        SingleOutputStreamOperator<JSONObject> jsonDS = jsonStrDS.map(jsonStr -> JSON.parseObject(jsonStr));

        // TODO 4.對(duì)數(shù)據(jù)ETL
        SingleOutputStreamOperator<JSONObject> filterDS = jsonDS.filter(
                json -> {
                    boolean flag = json.getString("table") != null //表名不為null
                            && json.getString("data") != null  //數(shù)據(jù)不為null
                            && json.getString("data").length() >= 3; //數(shù)據(jù)長(zhǎng)度大于3
                    return flag;
                }
        );
        //filterDS.print("filterDS>>>>>>>>>>");
        // TODO 5. 動(dòng)態(tài)分流:事實(shí)表放-主流 -> kafka dwd層 ,維度表-側(cè)輸出流 -> hbase
        // 5.1 定義輸出到Hbase的側(cè)輸出流標(biāo)簽
        OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>(TableProcess.SINK_TYPE_HBASE) {
        };
        // 5.2 主流輸出到kafka
        SingleOutputStreamOperator<JSONObject> kafkaDS = filterDS.process(new TableProcessFunction(hbaseTag));
        // 5.3 獲取側(cè)輸出流到hbase
        DataStream<JSONObject> hbaseDS = kafkaDS.getSideOutput(hbaseTag);

        kafkaDS.print("實(shí)時(shí):kafkaDS>>>>>>>>");
        hbaseDS.print("維度:hbaseDS>>>>>>>>");

        // TODO 6.維度數(shù)據(jù)保存到Hbase中
        hbaseDS.addSink(new DimSink());
        // TODO 7.實(shí)時(shí)數(shù)據(jù)保存到Kafka中,自定義序列化
        FlinkKafkaProducer<JSONObject> kafkaSink = MyKafkaUtil.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {
            @Override
            public void open(SerializationSchema.InitializationContext context) throws Exception {
                System.out.println("kafka序列化");
            }

            @Override
            public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObject, @Nullable Long aLong) {
                String sink_topic = jsonObject.getString("sink_table");
                JSONObject data = jsonObject.getJSONObject("data");
                return new ProducerRecord<>(sink_topic, data.toString().getBytes());
            }
        });
        kafkaDS.addSink(kafkaSink);
        // TODO 8.執(zhí)行
        env.execute();
    }
}
6)自定義分流函數(shù):TableProcessFunction.java
import com.alibaba.fastjson.JSONObject;
import com.flink.realtime.bean.TableProcess;
import com.flink.realtime.common.GmallConfig;
import com.flink.realtime.utils.MysqlUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
import java.util.List;

/**
 * @description: TODO 業(yè)務(wù)數(shù)據(jù)分流自定義Process函數(shù)
 * @author: HaoWu
 * @create: 2021年07月26日
 */
public class TableProcessFunction extends ProcessFunction<JSONObject, JSONObject> {

    //維表側(cè)輸出標(biāo)簽
    private OutputTag<JSONObject> outputTag;

    //內(nèi)存中存儲(chǔ)表配置對(duì)象{表名,表配置信息}
    private Map<String, TableProcess> tableProcessMap = new HashMap<>();

    //內(nèi)存中判斷是否已經(jīng)存在Hbase表
    private Set<String> existsTables = new HashSet<>();

    //定義Phoenix連接
    private Connection connection;

    public TableProcessFunction() {
    }

    public TableProcessFunction(OutputTag<JSONObject> outputTag) {
        this.outputTag = outputTag;
    }


    @Override
    public void open(Configuration parameters) throws Exception {
        //初始化phoenix連接
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);

        //初始化配置表信息
        initTableProcessMap();
        //配置表的信息可能會(huì)發(fā)生表更,需要開啟定時(shí)任務(wù)從現(xiàn)在起5000ms后,每隔5000ms更新一次
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                initTableProcessMap();
            }
        }, 5000, 5000);
    }


    @Override
    public void processElement(JSONObject jsonObj, Context ctx, Collector<JSONObject> out) throws Exception {
        //表名
        String tableName = jsonObj.getString("table");
        //操作類型
        String type = jsonObj.getString("type");
        //注意:?jiǎn)栴}修復(fù) 如果使用maxwell同步歷史數(shù)據(jù),他的操作類型是bootstrap-insert
        if ("bootstrap-insert".equals(type)) {
            type = "insert";
            jsonObj.put("type", type);
        }

        if (tableProcessMap != null && tableProcessMap.size() > 0) {
            //根據(jù)key取出配置信息
            String key = tableName + ":" + type;
            TableProcess tableProcess = tableProcessMap.get(key);
            //判斷是否獲取到配置對(duì)象
            if (tableProcess != null) {
                //獲取sinkTable,指明數(shù)據(jù)發(fā)往何處。 維度數(shù)據(jù)->hbase , 事實(shí)數(shù)據(jù)->kafka ,給這條數(shù)據(jù)打上一個(gè)標(biāo)記。
                jsonObj.put("sink_table", tableProcess.getSinkTable());
                //指定了sinkcolumn,對(duì)需要保留的字段進(jìn)行過(guò)濾
                String sinkColumns = tableProcess.getSinkColumns();
                if (sinkColumns != null && sinkColumns.length() > 0) {
                    filterColumn(jsonObj.getJSONObject("data"), sinkColumns);
                }
            } else {
                System.out.println("No this Key <<<< " + key + ">>>> in MySQL");
            }

            //根據(jù)sinkType輸出到不同的流
            if (tableProcess != null && tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_HBASE)) {
                //sinkType=hbase 輸出到側(cè)輸出流
                ctx.output(outputTag, jsonObj);
            } else if (tableProcess != null && tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_KAFKA)) {
                //sinkType=kafka 輸出到主流
                out.collect(jsonObj);
            }

        }
    }


    /**
     * 從mysql查詢配置信息,保存到map內(nèi)存中
     */
    private void initTableProcessMap() {
        System.out.println("查詢配置表信息");
        //1.從mysql中查詢配置信息
        List<TableProcess> tableProcesses = MysqlUtil.queryList("select * from table_process", TableProcess.class, true);
        for (TableProcess tableProcess : tableProcesses) {
            String sourceTable = tableProcess.getSourceTable(); //源表
            String operateType = tableProcess.getOperateType(); //操作類型
            String sinkType = tableProcess.getSinkType(); //目標(biāo)表類型
            String sinkTable = tableProcess.getSinkTable(); //目標(biāo)表名
            String sinkPk = tableProcess.getSinkPk(); //目標(biāo)表主鍵
            String sinkColumns = tableProcess.getSinkColumns(); //目標(biāo)表字段
            String sinkExtend = tableProcess.getSinkExtend(); //擴(kuò)展字段
            //2.將配置信息封裝成map集合
            tableProcessMap.put(sourceTable + ":" + operateType, tableProcess);
            //3.檢查是表是否內(nèi)存中存在
            //如果向Hbase保存的表,那么判斷內(nèi)存中set是否存在過(guò)。
            if ("insert".equals(operateType) && "hbase".equals(sinkType)) {
                boolean isExist = existsTables.add(sinkTable);
                //4.如果內(nèi)存中不存在表數(shù)據(jù)信息,則創(chuàng)建新Hbase表
                if (isExist) {
                    checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
                }
            }
        }
    }

    /**
     * 通過(guò)Phoenix創(chuàng)建Hbase表
     *
     * @param tableName 表名
     * @param columns   列屬性
     * @param pk        主鍵
     * @param extend    擴(kuò)展字段
     */
    private void checkTable(String tableName, String columns, String pk, String extend) {
        //主鍵不存在給默認(rèn)值
        if (pk == null) {
            pk = "id";
        }
        //擴(kuò)展字段給默認(rèn)值
        if (extend == null) {
            extend = "";
        }
        //拼接sql建表語(yǔ)句
        StringBuilder createSql = new StringBuilder("create table if not exists " + GmallConfig.HBASE_SCHEMA + "." + tableName + "(");
        //拼接列屬性
        String[] fieldArr = columns.split(",");
        for (int i = 0; i < fieldArr.length; i++) {
            String field = fieldArr[i];
            //判斷是否為主鍵
            if (field.equals(pk)) {
                createSql.append(field).append(" varchar primary key ");
            } else {
                createSql.append("info.").append(field).append(" varchar");
            }
            //非最后一個(gè)字段需要添加逗號(hào)
            if (i < fieldArr.length - 1) {
                createSql.append(",");
            }
        }
        createSql.append(")");
        createSql.append(extend);
        System.out.println("建表sql:" + createSql);
        //通過(guò)Phoenix創(chuàng)建hbase表
        PreparedStatement ps = null;
        try {
            ps = connection.prepareStatement(createSql.toString());
            ps.execute();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
            throw new RuntimeException("建表失敗!!!!!" + tableName);
        } finally {
            if (ps != null) {
                try {
                    ps.close();
                } catch (SQLException throwables) {
                    throwables.printStackTrace();
                }
            }
        }
    }

    /**
     * 篩選配置表中保留的字段
     *
     * @param data        每行數(shù)據(jù)記錄
     * @param sinkColumns 配置表保留字段
     */
    private void filterColumn(JSONObject data, String sinkColumns) {
        //需要保留的字段
        String[] columns = sinkColumns.split(",");
        //數(shù)組轉(zhuǎn)集合,判斷集合中是否包含某個(gè)元素
        List<String> columnList = Arrays.asList(columns);
        //獲取json中封裝的鍵值對(duì),每個(gè)鍵值對(duì)封裝為一個(gè)Entry類型
        Set<Map.Entry<String, Object>> entrySet = data.entrySet();
        /*for (Map.Entry<String, Object> entry : entrySet) {
            if (!columnList.contains(entry.getKey())) {
                entrySet.remove();
            }  遍歷集合刪除元素使用迭代器,for循環(huán)刪除會(huì)報(bào)錯(cuò)
        }*/
        Iterator<Map.Entry<String, Object>> iterator = entrySet.iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, Object> entry = iterator.next();
            if (!columnList.contains(entry.getKey())) {
                iterator.remove();
            }
        }
    }
}
7)HbaseSink:DimSink.java
import com.alibaba.fastjson.JSONObject;
import com.flink.realtime.common.GmallConfig;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Set;

/**
 * @description: TODO Hbase sink 通過(guò)Phoenix向Hbase表中寫數(shù)據(jù)
 * @author: HaoWu
 * @create: 2021年07月30日
 */
public class DimSink extends RichSinkFunction<JSONObject> {

    //定義Phoenix連接
    Connection connection = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);

    }

    /**
     * 生成語(yǔ)句提交hbase
     *
     * @param jsonObj
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(JSONObject jsonObj, Context context) {
        String sinkTableName = jsonObj.getString("sink_table");
        JSONObject dataObj = jsonObj.getJSONObject("data");
        if (dataObj != null && dataObj.size() > 0) {
            String upsertSql = genUpdateSql(sinkTableName.toUpperCase(), jsonObj.getJSONObject("data"));
            System.out.println(upsertSql);
            try {
                PreparedStatement ps = connection.prepareStatement(upsertSql);
                ps.executeUpdate();
                connection.commit();
            } catch (SQLException throwables) {
                throwables.printStackTrace();
                throw new RuntimeException("執(zhí)行upsert語(yǔ)句失敗!!!");
            }
        }
    }

    /**
     * 生成upsert語(yǔ)句
     *
     * @param sinkTableName
     * @param data
     * @return
     */
    private String genUpdateSql(String sinkTableName, JSONObject data) {
        Set<String> fields = data.keySet();
        String upsertSql = "upsert into " + GmallConfig.HBASE_SCHEMA + "." + sinkTableName + " (" + StringUtils.join(fields, ",") + ")";
        String valuesSql = " values ('" + StringUtils.join(data.values(), "','") + "')";
        return upsertSql + valuesSql;
    }
}
8)自定義序列化 kafka sink
    /**
     * todo kafka sink 自定義序列化,各種類型自定義傳輸
     *
     * @return
     */
    public static <T> FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema) {
        Properties props = new Properties();
        //kafka地址
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        //生產(chǎn)數(shù)據(jù)超時(shí)時(shí)間
        props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 15 * 60 * 1000 + "");
        return new FlinkKafkaProducer<T>(DEFAULT_TOPIC, kafkaSerializationSchema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }


        // TODO 7.實(shí)時(shí)數(shù)據(jù)保存到Kafka中,自定義序列化
        FlinkKafkaProducer<JSONObject> kafkaSink = MyKafkaUtil.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {
            @Override
            public void open(SerializationSchema.InitializationContext context) throws Exception {
                System.out.println("kafka序列化");
            }

            @Override
            public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObject, @Nullable Long aLong) {
                String sink_topic = jsonObject.getString("sink_table");
                JSONObject data = jsonObject.getJSONObject("data");
                return new ProducerRecord<>(sink_topic, data.toString().getBytes());
            }
        });

3.4 主程序:流程總結(jié)分析

TableProcessFunction是一個(gè)自定義算子,主要包括三條時(shí)間線任務(wù)

圖中紫線,這個(gè)時(shí)間線與數(shù)據(jù)流入無(wú)關(guān),只要任務(wù)啟動(dòng)就會(huì)執(zhí)行。主要的任務(wù)方法是open()這個(gè)方法在任務(wù)啟動(dòng)時(shí)就會(huì)執(zhí)行。他的主要工作就是初始化一些連接,開啟周期調(diào)度。

圖中綠線,這個(gè)時(shí)間線也與數(shù)據(jù)流入無(wú)關(guān),只要周期調(diào)度啟動(dòng),會(huì)自動(dòng)周期性執(zhí)行。主要的任務(wù)是同步配置表(tableProcessMap)。通過(guò)在open()方法中加入timer實(shí)現(xiàn)。同時(shí)還有個(gè)附帶任務(wù)就是如果發(fā)現(xiàn)不存在數(shù)據(jù)表,要根據(jù)配置自動(dòng)創(chuàng)建數(shù)據(jù)庫(kù)表。

圖中黑線,這個(gè)時(shí)間線就是隨著數(shù)據(jù)的流入持續(xù)發(fā)生,這部分的任務(wù)就是根據(jù)同步到內(nèi)存的tableProcessMap,來(lái)為流入的數(shù)據(jù)進(jìn)行標(biāo)識(shí),同時(shí)清理掉沒用的字段。

3.5 思考

1.目前的配置表只能識(shí)別新增的配置項(xiàng),不支持修改原有的配置項(xiàng) ?

4.整體流程圖分析

總結(jié)

以上是生活随笔為你收集整理的实时数仓(二):DWD层-数据处理的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

中文字幕av一区二区三区四区 | 亚洲一二三久久 | 亚洲无吗视频在线 | 久久无码精品一区二区三区 | 日韩91在线 | 亚洲欧洲xxxx | 日韩毛片精品 | 日本久久片 | 欧美日韩精品二区第二页 | 久草精品视频在线观看 | 最新日韩视频 | 中文字幕在线观看完整版 | 中文字幕影片免费在线观看 | 久草在线在线精品观看 | 国产一区二区精品久久91 | 97品白浆高清久久久久久 | 黄色视屏av| 婷婷日 | 久久国产美女视频 | 超碰97免费观看 | 国产亚洲激情视频在线 | 日韩av在线看 | 在线日本看片免费人成视久网 | 91av手机在线 | 成人毛片在线观看视频 | 99久高清在线观看视频99精品热在线观看视频 | 成人午夜剧场在线观看 | 视频福利在线观看 | 日韩av免费观看网站 | 欧美国产三区 | 国产原厂视频在线观看 | 又长又大又黑又粗欧美 | 天天操天天干天天摸 | 国产精品色在线 | 色播激情五月 | 成年人毛片在线观看 | 久久精品一级片 | 日韩最新在线视频 | 91中文字幕网 | 久久精品国产免费看久久精品 | 国产精品永久久久久久久www | 婷婷久久五月 | 精品国产亚洲日本 | 国产精品一区二区无线 | 成人动图 | 人人爽影院| 天天艹日日干 | 亚洲一区二区高潮无套美女 | 国产亚洲综合精品 | 日本黄色免费播放 | 不卡的av | 爱av在线网 | 成人一区二区三区在线观看 | 国产资源在线免费观看 | 欧美va天堂va视频va在线 | 亚洲一区二区三区四区在线视频 | 88av视频| 亚洲激情校园春色 | av在线免费在线观看 | 在线播放精品一区二区三区 | 亚洲欧美国产精品18p | 一本一道久久a久久综合蜜桃 | 九九精品久久久 | 色婷婷综合五月 | 91精品国产99久久久久久久 | 日韩在线欧美在线 | 亚洲欧美日韩精品久久奇米一区 | 91免费视频网站在线观看 | 久久99国产精品久久99 | 免费视频a | 久久久久久久久久影院 | 色91av| 亚洲无吗av | 三级a毛片 | 婷婷色中文字幕 | 96精品高清视频在线观看软件特色 | 日韩动态视频 | 亚洲综合激情五月 | 亚洲免费av一区二区 | 黄色日本片 | 久草爱| 最近中文字幕免费视频 | 成人免费大片黄在线播放 | 亚洲精品欧洲精品 | 成人在线一区二区三区 | 国产精品破处视频 | 久久久精品免费看 | 国产在线精品国自产拍影院 | 9999毛片| 久久久伊人网 | 国产小视频在线观看 | 国产精品区在线观看 | 综合网久久 | 国产精品久久久久毛片大屁完整版 | 九九久久成人 | 免费福利在线视频 | 国产专区精品视频 | 成人黄色在线 | 丁香激情综合国产 | 成人永久视频 | 亚洲黄色影院 | 丁香九月婷婷 | 午夜精品一区二区三区可下载 | 免费久久99精品国产婷婷六月 | 婷婷成人亚洲综合国产xv88 | 久久久国产精品网站 | 日本视频网| 日韩成人免费电影 | 中文字幕精品一区二区精品 | 亚洲精品一区二区网址 | 六月婷色 | 国产成人精品一区二区三区福利 | 日操操| 久久久www成人免费毛片 | 日韩欧美精选 | 国产精品成人自拍 | 成人资源站 | 久久99国产精品二区护士 | 又黄又刺激视频 | 色偷偷av男人天堂 | 久久久精品一区二区 | 国产无遮挡又黄又爽馒头漫画 | 国产欧美三级 | 久插视频 | 欧美夫妻性生活电影 | 九九有精品 | 欧美久久久久久久久 | 天天干干| 国产第一页福利影院 | 久久99国产精品久久99 | 日韩电影中文,亚洲精品乱码 | www夜夜操 | 丁香婷婷激情国产高清秒播 | 色婷婷亚洲综合 | 欧美孕妇与黑人孕交 | 精品国产一区二区三区久久久蜜臀 | 99热日本 | 免费一级特黄毛大片 | 日韩在线视频网 | 欧美成人精品在线 | 国产 色 | 国产精品6 | 涩av在线| 黄色亚洲在线 | 天天操网站 | 久久影视网 | 亚洲电影图片小说 | 久久高清毛片 | av电影免费| 中文字幕一区二区三区四区在线视频 | 97超碰人人澡人人 | 国产一级特黄电影 | 在线视频观看国产 | 狠狠色狠狠色综合系列 | 国产一卡二卡四卡国 | av在线官网 | 天堂中文在线视频 | 亚洲成av人片一区二区梦乃 | 欧美激情综合五月色丁香小说 | 国产一二区视频 | 日韩在观看线 | 国产精品久久久久久久久婷婷 | 天天综合区 | 日本精品中文字幕 | 日日操日日插 | 人人干人人做 | 成年人在线看片 | 国产原创在线视频 | 91.dizhi永久地址最新 | 国产精品一区二区三区在线免费观看 | 婷婷去俺也去六月色 | 久久99婷婷 | 国产一区二区在线免费播放 | 国产福利午夜 | 91九色网址 | 国产精品免费久久久久久久久久中文 | 亚洲精品国偷自产在线91正片 | 免费a级毛片在线看 | 国产精品久久久久影视 | 天天摸天天操天天舔 | 精品国产一区二区三区四区vr | 久久精品国产免费看久久精品 | 久久国产精品视频观看 | 激情xxxx | 成人在线视频免费观看 | 91av资源网| 国产成人一区二区三区在线观看 | 91麻豆精品久久久久久 | 激情丁香综合五月 | 国产资源 | 美女黄频在线观看 | 久久精品在线免费观看 | 国产又粗又硬又长又爽的视频 | 99视频免费观看 | 国产又粗又猛又爽又黄的视频先 | 亚洲最大激情中文字幕 | 亚洲国产精品一区二区久久hs | 日韩一区二区三区高清在线观看 | 综合天天网 | 国产成人黄色av | 欧美成人h版电影 | 99久久精品免费看 | 国产理论一区二区三区 | 国产高清视频色在线www | 一本一本久久a久久精品综合 | 91精品视频在线看 | 国产日韩欧美在线观看视频 | 日本成人中文字幕在线观看 | 在线成人免费av | 99国产在线| 91激情视频在线观看 | 在线免费观看视频你懂的 | 久久亚洲精品国产亚洲老地址 | 亚州精品国产 | 91福利影院在线观看 | 中文字字幕在线 | 丁香色婷| 国产精品视频在线观看 | 婷婷综合视频 | av在线小说 | 国产视频中文字幕在线观看 | 国产v欧美 | 福利二区视频 | 亚洲视频999| 一区二区三区在线免费观看视频 | 黄色成品视频 | 中文字幕永久在线 | 日本精品一区二区三区在线播放视频 | 经典三级一区 | 黄色免费观看网址 | 超碰999| 波多野结衣视频一区二区 | av性网站 | 国产99在线免费 | 毛片网在线观看 | 欧美精品一区二区在线播放 | 波多野结衣一区二区三区中文字幕 | 欧美极品在线播放 | 97色婷婷人人爽人人 | 黄av免费在线观看 | 精品国产免费一区二区三区五区 | 丁香婷五月 | 在线观看日韩一区 | 黄色小网站免费看 | 日韩成人精品一区二区 | 久久国产综合视频 | 国产在线一区二区三区播放 | 国产精品久久久久久久毛片 | 伊人伊成久久人综合网站 | 国产这里只有精品 | av网址最新 | 色吊丝在线永久观看最新版本 | 99亚洲国产 | 正在播放 国产精品 | 永久免费的av电影 | 日日操天天射 | 99久久www | 婷婷干五月 | 久久一区二区三区国产精品 | 久久精品这里热有精品 | 国产视频一区二区在线 | 婷婷精品国产欧美精品亚洲人人爽 | 国产精品视频线看 | 在线观看福利网站 | 欧美精品乱码久久久久久按摩 | 99视频在线免费看 | 激情图片区| 91精品一区二区三区久久久久久 | 国产精品久久久av | 五月婷在线播放 | 激情久久一区二区三区 | 九九av| 国产亚洲精品成人av久久影院 | 91亚洲激情 | 日韩高清一| 成人免费大片黄在线播放 | a级国产片 | 久久免费视频在线观看 | 精品亚洲视频在线 | 日韩精品免费一线在线观看 | 欧美一级性生活视频 | 亚洲一区二区三区四区精品 | www色 | 在线观看精品视频 | 久久专区 | 中文字幕在线观看视频网站 | 日韩xxx视频 | 欧美一级黄色网 | 天天色天| 久久国产a| 日韩在线| 国产精品你懂的在线观看 | 97碰在线 | 久久天堂网站 | 国产精品二区在线 | 在线精品在线 | 日韩大片在线观看 | 91av手机在线观看 | 中文字幕首页 | 久久香蕉电影网 | 国产乱码精品一区二区三区介绍 | 久久激情综合网 | 天天色综合久久 | 国产xxxx| 欧美日韩在线视频一区二区 | 九九九九热精品免费视频点播观看 | 日韩亚洲国产中文字幕 | 日韩av电影中文字幕 | 91亚洲精品视频 | 91成人亚洲 | 日韩一区二区三免费高清在线观看 | 欧美在线1| 超碰97在线看| 久久久久亚洲精品 | 国产在线第三页 | 亚洲免费精品一区二区 | 国产精品资源在线 | 久久精品视频国产 | 一级免费黄视频 | 九九色综合 | 一区二区三区免费 | 在线播放精品一区二区三区 | 66av99精品福利视频在线 | 国产原创在线 | 亚洲综合在线观看视频 | 在线91观看 | 国产h在线播放 | 久草视频在线资源站 | 正在播放国产一区 | 久久久国产一区二区三区 | 久久国产99| 婷婷六月综合亚洲 | 国产精成人品免费观看 | 成人va在线观看 | 综合色中文 | 丁香五月亚洲综合在线 | av资源免费在线观看 | 国产精品女人久久久 | av免费网页 | 天天在线免费视频 | 日韩av网站在线播放 | 91视频亚洲 | 亚洲精品欧美专区 | 久久久香蕉视频 | 色av男人的天堂免费在线 | 亚洲精品在线视频观看 | 久久综合狠狠综合久久狠狠色综合 | 日本黄区免费视频观看 | 99国产在线视频 | 久久婷婷国产 | 日日爱av | 激情久久一区二区三区 | 亚洲二区精品 | 亚洲91精品在线观看 | 免费视频三区 | 高清有码中文字幕 | 久久精品毛片 | 99精品国产99久久久久久97 | 国产精品18久久久久久久久 | 欧美精品一区二区蜜臀亚洲 | 激情综合网五月激情 | 亚洲一级国产 | www免费黄色| 久久久精品久久 | 国产精品久久久久久久电影 | 国产视频一区在线播放 | 成人av电影免费在线播放 | 国产精品乱码久久久久久1区2区 | 日本天天色 | 午夜精品视频免费在线观看 | 国产偷国产偷亚洲清高 | 成人免费中文字幕 | 麻豆视频免费入口 | 久久久久久久久免费视频 | 精品综合久久久 | 国产高清av免费在线观看 | 国产麻豆精品免费视频 | av在线免费不卡 | 亚洲国产一区二区精品专区 | 麻豆av一区二区三区在线观看 | 国产一区在线免费观看 | 免费在线观看黄网站 | 国产日韩精品视频 | 奇米先锋| 九九免费观看视频 | 天天色草 | 激情 一区二区 | 正在播放亚洲精品 | 中文字幕av全部资源www中文字幕在线观看 | 狠狠操在线 | 中文字幕制服丝袜av久久 | 欧美一级免费高清 | 精品婷婷 | 人人澡视频 | 夜夜躁日日躁狠狠久久av | 亚洲国产精品第一区二区 | 黄色三级在线观看 | 久久超 | 狠狠躁夜夜躁人人爽超碰91 | 亚洲欧美综合精品久久成人 | 久久久免费电影 | 一区二区三区高清 | 国产一级片免费视频 | 婷婷丁香激情五月 | 美女视频黄是免费的 | 人人爽人人干 | 成人久久影院 | 欧美色图亚洲图片 | 久久99久久99免费视频 | 亚洲国内在线 | 美女天天操 | 精品人妖videos欧美人妖 | 香蕉视频免费看 | 99在线免费观看视频 | 久久在线免费观看视频 | 99久高清在线观看视频99精品热在线观看视频 | 欧美a级片免费看 | 国产高清免费 | 97成人资源 | 亚洲精品一区二区三区四区高清 | av在线免费观看网站 | 欧美日韩天堂 | 草久在线观看 | 国产成人91 | 国产精品一区二区在线 | 一区二区在线电影 | 国产精品theporn| 欧美精品久久久久久久久久久 | 日韩中文字幕网站 | 日本久久久久久 | 亚洲免费一级电影 | a视频免费在线观看 | 婷婷综合电影 | 日韩在线视频免费观看 | 极品久久久久 | 字幕网资源站中文字幕 | 久久伊人色综合 | adc在线观看 | 一区二区三区日韩在线 | 九色视频网站 | 久久国产精品久久久 | 国产在线观看91 | 超碰av在线免费观看 | 9在线观看免费高清完整版 玖玖爱免费视频 | 涩涩网站在线 | 免费看短| 国产专区欧美专区 | 亚洲一级特黄 | 午夜精品福利一区二区三区蜜桃 | 在线91av| 亚洲影院一区 | 狠狠综合久久av | 国产精品久久久一区二区三区网站 | 日本久久片 | 色偷偷88欧美精品久久久 | 黄色1级毛片| 中文av在线播放 | 免费十分钟 | 毛片在线播放网址 | 在线精品观看国产 | 久久精久久精 | 黄色1级大片 | 国产精品免费久久久久久久久久中文 | 精品成人网 | 亚洲欧美999 | 欧美亚洲三级 | 97精品国产97久久久久久春色 | 日韩激情视频在线 | 999久久国精品免费观看网站 | 成人小视频在线观看免费 | 欧美亚洲国产精品久久高清浪潮 | 波多野结衣在线观看一区二区三区 | 在线免费高清视频 | 久艹视频在线免费观看 | 中文字幕a∨在线乱码免费看 | avcom在线 | 9999在线视频 | 天天射天天射天天 | 久久精品老司机 | 天天色综合三 | 青草草在线视频 | 久草在线久 | 日韩黄色免费电影 | 国产69精品久久久久99尤 | 在线成人高清电影 | 九九视频精品在线 | 亚洲区另类春色综合小说 | 99视频在线观看免费 | 国产一区视频在线 | 成人国产电影在线观看 | 国产精品一区一区三区 | 国产精品中文字幕在线 | 香蕉视频免费看 | 射九九 | www.夜夜干.com | 久久国产精品免费 | 玖草在线观看 | av久久在线 | 999久久久久久久久久久 | 免费av影视| 91麻豆国产 | av电影免费在线 | 999电影免费在线观看2020 | 91成人免费看 | 国产精品一区二区三区在线看 | 999久久国精品免费观看网站 | 天天综合网 天天综合色 | 懂色av一区二区三区蜜臀 | 超碰成人免费电影 | 久久视频在线观看免费 | 色多视频在线观看 | 免费在线视频一区二区 | 午夜视频亚洲 | 亚洲成aⅴ人片久久青草影院 | 在线中文字幕电影 | 国产精品一区二区三区在线播放 | 精品国产乱码久久久久久1区2匹 | 国产精美视频 | 中文字幕免费高 | 日韩免费在线观看网站 | 色综合五月天 | 狠狠干成人综合网 | 天天草天天干天天 | 国产欧美最新羞羞视频在线观看 | 天天射色综合 | 久久8精品 | 国产精品免费在线观看视频 | 又污又黄的网站 | 狠狠干 狠狠操 | 亚洲欧美综合精品久久成人 | 超碰97人 | 久久视频网址 | 色av男人的天堂免费在线 | 2021av在线 | 91精品视频导航 | 亚洲综合欧美精品电影 | a在线v| 日韩av在线免费看 | 成人黄色资源 | av一级片在线观看 | 日本久久久亚洲精品 | 国产午夜精品视频 | 三级在线播放视频 | 免费黄色a网站 | 免费人成在线观看网站 | 99久久99久久综合 | 91视频在线播放视频 | 色狠狠综合天天综合综合 | 国产不卡精品 | 国产精品综合久久 | 国产精品婷婷午夜在线观看 | 久草在线视频免赞 | 91免费看黄色| 久久一二三四 | 久久亚洲欧美日韩精品专区 | 亚洲高清在线 | 国产精品美女免费看 | 久久久久久久久久久影院 | 欧美黄色特级片 | 国产福利在线免费 | 成人国产电影在线观看 | 国产精品久久久久久久久久ktv | 亚洲午夜精品在线观看 | 高清精品在线 | 久久精品国产免费看久久精品 | 久久精品人人做人人综合老师 | 日韩午夜精品福利 | 欧美精品一区二区在线观看 | 日本3级在线观看 | av播放在线 | 亚洲欧美日韩国产一区二区三区 | 51久久夜色精品国产麻豆 | 狠狠干成人综合网 | 九九热精品视频在线播放 | 亚洲午夜av电影 | 六月丁香伊人 | 久久久久亚洲精品国产 | 国产女人18毛片水真多18精品 | 精品国产一区二区三区日日嗨 | 免费电影播放 | 337p西西人体大胆瓣开下部 | 黄色电影网站在线观看 | 亚洲精品国产成人 | 欧美色久 | 日韩中文字幕免费在线播放 | 99国产视频 | 国产精品入口麻豆www | 欧美福利精品 | 成人9ⅰ免费影视网站 | 国产精品免费麻豆入口 | 91一区二区在线 | 在线影视 一区 二区 三区 | 波多野结衣视频一区 | 国产一区二区电影在线观看 | 久久久人人爽 | 国产亚洲精品女人久久久久久 | 久久天堂网站 | 精品久久久久久一区二区里番 | 一区二区三区手机在线观看 | 欧美精品亚洲二区 | www.888av | 日韩一区二区三区在线观看 | 免费av片在线 | 一级欧美一级日韩 | 欧洲精品二区 | 九九热只有精品 | av中文在线播放 | 美女在线免费视频 | 一本一本久久a久久精品综合妖精 | 欧美视频日韩 | 一区二区中文字幕在线播放 | 四虎免费av | 国产精品网址在线观看 | 免费黄色在线网站 | 免费成人黄色片 | 97精品国产91久久久久久 | 五月婷综合 | 日韩毛片在线播放 | 亚洲成人高清在线 | 久久久精品国产免费观看一区二区 | 狠狠狠狠狠干 | 久久免费国产电影 | 精品国产一区二区三区久久久蜜月 | 狠狠狠狠干 | 国产美女在线精品免费观看 | 一区中文字幕电影 | 天堂在线v | 日韩欧美高清在线观看 | 日韩精品你懂的 | 色爱成人网 | 四虎成人精品永久免费av | 久久久九色精品国产一区二区三区 | 久久av免费观看 | 一区二区三区高清不卡 | 中文字幕在线观看免费高清完整版 | 成人精品视频久久久久 | 久久久久久久久久久高潮一区二区 | 国产最新91| 在线观看免费成人av | 韩日在线一区 | 日韩欧美高清 | 亚洲激色 | 欧美一区二区视频97 | 免费日韩精品 | 久久精品一区二区 | av看片在线观看 | 欧美91片| 丁香六月婷婷激情 | 久久国产精品久久久 | 久久视频精品在线观看 | 国产九九热视频 | 国产精品久久网站 | 久久国产一区二区三区 | 国产欧美久久久精品影院 | 成人啪啪18免费游戏链接 | 久久久九九 | 日本天天色 | 国产成人综合在线观看 | 色狠狠狠 | 久久综合久久久久88 | 亚洲四虎在线 | 中文字幕在线免费观看视频 | 欧美日韩后| 亚洲成 人精品 | 天天草天天色 | 欧美日韩精品影院 | 一区二区三区在线电影 | 国产午夜精品一区二区三区 | 国产免费小视频 | 欧美精品亚洲二区 | 国产九九九九九 | 日韩伦理一区二区三区av在线 | 免费一级特黄录像 | 日日躁夜夜躁xxxxaaaa | 欧美a视频在线观看 | 日本韩国在线不卡 | 久草在线免费资源 | 日韩精品免费在线观看 | 麻豆视频免费入口 | 久久精品8 | 国内一级片在线观看 | 久久艹免费 | 欧美激情精品久久久久久免费印度 | 日韩精品一区二区三区免费视频观看 | 成人97视频一区二区 | 天堂在线一区二区 | 国产午夜精品一区二区三区在线观看 | 超碰在线99 | 91丨九色丨国产在线 | 日韩一二三 | 亚洲精品视频在线观看视频 | 久久tv| 视频国产在线 | 欧美一级性生活片 | 丁香五月网久久综合 | 国内久久看 | 日韩精品免费一区二区三区 | 国产激情电影综合在线看 | 精品国产1区 | 久久久国产精品一区二区三区 | 欧美日韩一区二区三区不卡 | 成年人免费看的视频 | 国产精在线 | 色婷婷免费视频 | 91最新视频 | 久久人人艹| 国产黑丝一区二区三区 | 日韩在线观看不卡 | 激情欧美日韩一区二区 | 久久久国产电影 | 91av小视频| 特黄一级毛片 | 国产精品男女视频 | 久草精品视频 | 中文高清av| 天天看天天干天天操 | 99视频一区二区 | 三级免费黄色 | 超碰97.com | 91九色视频国产 | 免费性网站 | 91网页版免费观看 | 国产高清精 | 欧美成人999 | 欧美一级片免费在线观看 | 日韩精品中文字幕在线观看 | 国偷自产中文字幕亚洲手机在线 | 五月婷丁香网 | 在线观看深夜视频 | 久久综合影院 | a级国产乱理伦片在线播放 久久久久国产精品一区 | 色婷婷av一区 | 1024手机基地在线观看 | 91探花国产综合在线精品 | 亚洲伊人婷婷 | 午夜色婷婷 | 久久久一本精品99久久精品 | 日韩一区二区三区免费电影 | 91九色在线观看视频 | 久久综合狠狠综合久久狠狠色综合 | 国内精品久久久久 | 毛片无卡免费无播放器 | 麻豆传媒电影在线观看 | 精品美女久久久久 | 久久久999 | 人人揉人人揉人人揉人人揉97 | 免费观看高清 | 国产一级做a爱片久久毛片a | 人人澡澡人人 | 久色免费视频 | 久草a在线 | 91视频在线免费看 | 免费在线一区二区 | 奇米导航| 色综合中文综合网 | 99热精品国产 | 久久看片网 | 黄色1级毛片 | 中文字幕一区二区三区精华液 | 日韩精品一区电影 | 中文十次啦 | 免费人做人爱www的视 | 成人毛片一区二区三区 | 国产精品久久久久久久免费 | 久久久精品免费看 | 免费成人在线电影 | av高清免费 | 波多野结衣一区二区三区中文字幕 | 国产高清专区 | 日韩黄色在线电影 | 久久艹艹 | 最新婷婷色 | 欧美久久久久 | 日韩免费网址 | 黄色www在线观看 | 精品久久一区 | 国产精品成人国产乱一区 | 国产精品女人久久久 | av资源在线看 | www.色午夜 | 久久综合婷婷综合 | 99热精品国产一区二区在线观看 | 国内精品久久久久久久影视简单 | 在线 视频 亚洲 | 丁香 久久 综合 | 狠狠色丁香婷婷 | 国产成人一区二区三区久久精品 | 这里只有精彩视频 | 久久久精品国产免费观看一区二区 | 久久免费国产精品 | 日日摸日日爽 | 亚洲激情视频在线观看 | 98超碰在线 | 国产精品一区二区久久精品爱涩 | av中文字幕网 | 日韩在线三级 | 99九九免费视频 | 国产精品 国内视频 | 午夜精品久久久久久久久久久 | 黄色的网站免费看 | 欧美电影黄色 | 日日爽天天 | 在线精品观看 | 久久精品国产亚洲精品2020 | 一级黄色在线免费观看 | 久久久久在线 | 国产精品色婷婷视频 | 国内成人综合 | 国产亚洲精品女人久久久久久 | 国产精品久久久区三区天天噜 | 久久综合一本 | 欧美国产在线看 | 中文字幕亚洲不卡 | 久久伊99综合婷婷久久伊 | 在线 国产 亚洲 欧美 | 视频高清 | 国产精品国产三级国产专区53 | 天天躁日日躁狠狠 | 国产中文字幕网 | 久草在线高清视频 | 国外av在线 | 免费视频成人 | 久热久草在线 | 在线欧美最极品的av | 夜夜看av| 天天操导航 | www.com.日本一级 | 日韩av高潮| 97色综合| 高清一区二区三区av | av在线一级 | 久久国产视频网站 | 国产高清视频免费观看 | 久久久久国产视频 | 综合网中文字幕 | 在线观看不卡的av | 天天操天天操 | 日本成人黄色片 | 精品久久综合 | 黄色一级在线免费观看 | 久久资源总站 | 黄色一级免费 | 欧美男同网站 | 色综合五月天 | 精品国产免费一区二区三区五区 | 国产成人精品一区二区三区在线观看 | 五月婷婷激情综合 | 久久精品欧美一 | 成人午夜电影在线观看 | av成人免费网站 | 97超碰在线资源 | 久亚洲 | 久久久久www | 九九热精品视频在线播放 | 国产最新在线观看 | 国产精品网站一区二区三区 | 精品久久一二三区 | 日韩美一区二区三区 | 国产精品入口a级 | 国产 日韩 欧美 中文 在线播放 | 狠狠干成人 | 久久1区 | 色视频国产直接看 | 四虎影视www | 六月激情丁香 | 国产精品美女久久久久久久 | 精品人人人人 | 精品视频专区 | 日韩在线大片 | 国产黄色片久久 | 三级av中文字幕 | 成人午夜免费福利 | 亚洲精品99久久久久久 | 国内精品久久久久 | 96视频免费在线观看 | 99精品久久99久久久久 | 日韩在线观看你懂得 | 黄色一级性片 | 亚洲一区二区精品视频 | 国产精品视频免费看 | 国内揄拍国产精品 | 国产精品18久久久久久首页狼 | 色网站在线免费观看 | 国产精品久久久久久久久久久久午夜 | 中文av不卡 | 午夜视频99| 婷婷丁香色综合狠狠色 | 99久国产| www好男人| 亚洲成人免费在线 | 久久久久久久久影院 | 日韩久久电影 | 久草在线最新免费 | 欧美午夜视频在线 | 国产网站色 | 精品视频免费 | 中文字幕免费国产精品 | 美女久久一区 | 久久草在线免费 | 免费网站看v片在线a | 亚洲禁18久人片 | 国产精品18久久久久久不卡孕妇 | 中文字幕 在线看 | 狠狠干夜夜操天天爽 | 久久tv| 日本大片免费观看在线 | 激情电影在线观看 | 欧美日韩高清一区二区三区 | 不卡av在线 | 婷婷丁香导航 | www.av中文字幕.com | 欧美日韩在线免费观看 | 日韩激情免费视频 | 日韩精品极品视频 | 日韩av一区二区在线播放 | 91亚洲精品久久久蜜桃借种 | 最近最新最好看中文视频 | 中文字幕麻豆 | 国际精品久久久 | 日韩在线高清免费视频 | 欧美日韩一区二区三区在线免费观看 | 黄色网中文字幕 | 久久精品伊人 | 中文字幕日韩一区二区三区不卡 | 日韩av一区二区三区在线观看 | 午夜视频在线观看一区二区三区 | av福利在线播放 | 久久免费在线视频 | 亚洲理论在线 | 美女网站色免费 | 日本久久电影 | 福利视频第一页 | 黄色a视频免费 | 激情亚洲综合在线 | 久久精品国产亚洲精品 | 亚洲激情av | 久久免费a | 日韩av电影免费观看 | 欧美日韩不卡在线 | 成人久久久电影 | 国产精品一区二区三区在线看 | a爱爱视频| 免费一级片在线观看 | 国产精品一区二区三区99 | 欧美激情视频一二区 | 精品国模一区二区三区 | 久久电影中文字幕视频 | 久久久久久久久久久久久影院 | 久久九精品 | 亚洲国产日韩一区 | 欧美午夜久久 | 久草网视频 | 日韩欧美成| 99理论片 | 国产一区二三区好的 | 在线网址你懂得 | 亚洲激情免费 | 日韩影视在线 | 国产免费不卡 | 久久国产精品成人免费浪潮 | 欧美一级乱黄 | 特黄特黄的视频 | 日韩av成人在线观看 | 91综合久久一区二区 | 国产日产精品久久久久快鸭 | 色婷婷久久久 | 久久免费视频网 | 欧美色图p | 中文资源在线播放 | 亚洲精品一区二区三区新线路 | 中文字幕一区二区三区视频 | 欧美日韩视频在线一区 | 国产二级视频 | 黄色在线看网站 | 中文av不卡 | 菠萝菠萝在线精品视频 | 日韩欧美大片免费观看 | 色免费在线 | 亚洲精品乱码久久久久久 | 丁香花中文在线免费观看 | 天天干夜夜擦 | 日本狠狠干 | 成人少妇影院yyyy | 国产一区二区免费看 | 国产美女被啪进深处喷白浆视频 | 日韩一二区在线 | 成人免费观看大片 | 一区二区三区在线视频111 | 超碰在97 | 日本中文字幕网站 | 久99视频 | 一本到在线 | 日韩黄色免费看 | 午夜免费久久看 | 欧美在线视频一区二区三区 | 韩国精品一区二区三区六区色诱 | 亚洲最快最全在线视频 | 美女视频黄是免费的 |