日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

数据湖架构Hudi(五)Hudi集成Flink案例详解

發布時間:2024/1/8 编程问答 61 豆豆
生活随笔 收集整理的這篇文章主要介紹了 数据湖架构Hudi(五)Hudi集成Flink案例详解 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

五、Hudi集成Flink案例詳解

5.1 hudi集成flink

flink的下載地址:

https://archive.apache.org/dist/flink/

HudiSupported Flink version
0.12.x1.15.x、1.14.x1.13.x
0.11.x1.14.x、1.13.x
0.10.x1.13.x
0.9.01.12.2
  • 將上述編譯好的安裝包拷貝到flink下的jars目錄中:
cp /opt/apps/hudi-0.12.0/packaging/hudi-flink-bundle/target/hudi-flink1.13-bundle-0.12.0.jar /opt/apps/flink-1.13.6/lib/
  • 拷貝guava包,解決依賴沖突
cp /opt/apps/hadoop-3.1.3/share/hadoop/common/lib/guava-27.0-jre.jar /opt/apps/flink-1.13.6/lib/
  • 配置Hadoop環境變量
vim /etc/profile.d/my_env.shexport HADOOP_CLASSPATH=`hadoop classpath` export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoopsource /etc/profile.d/my_env.sh

5.2 sql-client之yarn-session模式

配置hadoop調度器yarn

mapred-site.xml<configuration> <!-- 指定MapReduce作業執?時,使?YARN進?資源調度 --><property><name>mapreduce.framework.name</name><value>yarn</value></property><property><name>yarn.app.mapreduce.am.env</name><value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.3</value> </property><property><name>mapreduce.map.env</name><value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.3</value></property><property><name>mapreduce.reduce.env</name><value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.3</value></property> </configuration>yarn-site.xml<configuration> <!-- 設置ResourceManager --><property><name>yarn.resourcemanager.hostname</name><value>centos04</value> </property> <!--配置yarn的shuffle服務--><property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property> </configuration>hadoop-env.sh # 在最后面添加如下: export YARN_RESOURCEMANAGER_USER=root export YARN_NODEMANAGER_USER=root# 記得配置sql-client-defaults.yaml

5.2.1 啟動

# 1、修改配置文件 vim /opt/apps/flink-1.13.6/conf/flink-conf.yamlclassloader.check-leaked-classloader: false taskmanager.numberOfTaskSlots: 4state.backend: rocksdb execution.checkpointing.interval: 30000 # 開啟ck,才能快速從內存中flush出去 state.checkpoints.dir: hdfs://centos04:9000/ckps state.backend.incremental: true# 2、yarn-session模式啟動# 解決依賴問題 cp /opt/apps/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.3.jar /opt/apps/flink-1.13.6/lib/# 啟動yarn-session /opt/apps/flink-1.13.6/bin/yarn-session.sh -d # 啟動sql-client /opt/apps/flink-1.13.6/bin/sql-client.sh embedded -s yarn-session

5.2.2 插入數據

set sql-client.execution.result-mode=tableau;-- 創建hudi表 CREATE TABLE t1(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20) ) PARTITIONED BY (`partition`) WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t1','table.type' = 'MERGE_ON_READ' -- 默認是COW );或如下寫法 CREATE TABLE t1(uuid VARCHAR(20),name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20),PRIMARY KEY(uuid) NOT ENFORCED ) PARTITIONED BY (`partition`) WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t1','table.type' = 'MERGE_ON_READ' );-- 插入數據 INSERT INTO t1 VALUES('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');-- 查詢數據 select * from t1;

5.2.3 流式插入

-- 1、創建測試表 CREATE TABLE sourceT (uuid varchar(20),name varchar(10),age int,ts timestamp(3),`partition` varchar(20) ) WITH ('connector' = 'datagen','rows-per-second' = '1' );create table t2(uuid varchar(20),name varchar(10),age int,ts timestamp(3),`partition` varchar(20) ) with ('connector' = 'hudi','path' = '/tmp/hudi_flink/t2','table.type' = 'MERGE_ON_READ' );-- 2、執行插入 insert into t2 select * from sourceT;查詢結果 set sql-client.execution.result-mode=tableau; Flink SQL> select * from t2 limit 10; -- 會產生一個collect的flink任務,拉取10條數據,注意:不是流讀取 2023-03-06 22:45:10,403 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false 2023-03-06 22:45:12,897 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at centos04/192.168.42.104:8032 2023-03-06 22:45:12,899 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2023-03-06 22:45:12,918 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface centos04:45452 of application 'application_1678113536312_0001'. +----+--------------------------------+--------------------------------+-------------+-------------------------+--------------------------------+ | op | uuid | name | age | ts | partition | +----+--------------------------------+--------------------------------+-------------+-------------------------+--------------------------------+ | +I | d0523c31d3da5b8e2a8ff676dcf... | 327db70824413c5dcde0a7ac10c... | 1971040768 | 2023-03-06 14:40:58.780 | 42b45346672bf719b5393232763... | | +I | cfc07cbebf6890a04942ec88947... | 36fc7a58aab88835f11b3b51a40... | -12199364 | 2023-03-06 14:41:05.781 | e33c02173f4c744fb9c1c68e774... | | +I | 668b204a933494a89b829c76bc6... | aa9ff2109457fdcd5f099b8ce98... | 2061449955 | 2023-03-06 14:41:14.780 | 680514e53b196324423cd12cda5... | | +I | 95fe7878909a801c2726f1d05f5... | 1c86b29fe313e557688df0ba950... | 519997290 | 2023-03-06 14:41:11.781 | b9817c52301ab4614c3053c9ccc... | | +I | 8661c25c8c930f4660fbefa867e... | 01a2bee6b99064c7bca9513ca37... | -682830738 | 2023-03-06 14:41:32.781 | 16ab837502a31e208b06bb74efd... | | +I | 55ce03895e229b29546dbdd2ff3... | 77f2552de13337e8092c1445654... | 2011273584 | 2023-03-06 14:41:09.780 | 3fd688cfa17b2a3a6fd3ffac6bd... | | +I | 50c23f315d736c313b652b34fc5... | 4f9c84ff75466fba8e800daabd0... | -190184764 | 2023-03-06 14:42:26.780 | 7f2a07a1007b2fbfea8cbb2062e... | | +I | 8073e8c70a9bc0e79c2e69aa885... | 30bf89c80d9ab0f0a8f5f883ee6... | -1639873427 | 2023-03-06 14:41:24.781 | 15df7d527d6d7edae496e76d02f... | | +I | 29a61b7cd348d08498d2b089a5d... | 77a63ca7a2e77e6d167de20c673... | 71527378 | 2023-03-06 14:42:14.781 | 2842db44a691f4f1d597ac79086... | | +I | e5defc24191f60557644b7d14e2... | 56bdd04424b8f422d4075ade510... | 1054223989 | 2023-03-06 14:40:42.781 | e8d2d3c6fed90d37b15647d1ecd... | +----+--------------------------------+--------------------------------+-------------+-------------------------+--------------------------------+

5.3 使用IDEA開發

除了用sql-client,還可以自己編寫FlinkSQL程序,打包提交Flink作業。

1、首先,需要將hudi集成flink的jar包,裝載到本地的倉庫,命令如下:

D:\bigdata\hudi從入門到精通\apps>mvn install:install-file -DgroupId=org.apache.hudi -DartifactId=hudi-flink_2.12 -Dversion=0.12.0 -Dpackaging=jar -Dfile=./hudi-flink1.13-bundle-0.12.0.jar[INFO] Scanning for projects... [INFO] [INFO] ------------------< org.apache.maven:standalone-pom >------------------- [INFO] Building Maven Stub Project (No POM) 1 [INFO] --------------------------------[ pom ]--------------------------------- [INFO] [INFO] --- maven-install-plugin:2.4:install-file (default-cli) @ standalone-pom --- [INFO] Installing D:\bigdata\hudi從入門到精通\apps\hudi-flink1.13-bundle-0.12.0.jar to D:\doit\apps\repository\org\apache\hudi\hudi-flink_2.12\0.12.0\hudi-flink_2.12-0.12.0.jar [INFO] Installing C:\Users\Undo\AppData\Local\Temp\mvninstall50353756903805721.pom to D:\doit\apps\repository\org\apache\hudi\hudi-flink_2.12\0.12.0\hudi-flink_2.12-0.12.0.pom [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 1.111 s [INFO] Finished at: 2023-03-02T10:08:15+08:00 [INFO] ------------------------------------------------------------------------

2、導入pom文件

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>hudi-start</artifactId><groupId>com.yyds</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>hudi-flink</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.13.6</flink.version><hudi.version>0.12.0</hudi.version><java.version>1.8</java.version><scala.binary.version>2.12</scala.binary.version><slf4j.version>1.7.30</slf4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope> <!--不會打包到依賴中,只參與編譯,不參與運行 --></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!--idea運行時也有webui--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version><scope>provided</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version><scope>provided</scope></dependency><!--手動install到本地maven倉庫,要不然會報錯--><dependency><groupId>org.apache.hudi</groupId><artifactId>hudi-flink_2.12</artifactId><version>${hudi.version}</version><scope>provided</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude><exclude>org.apache.hadoop:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers combine.children="append"><transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer></transformers></configuration></execution></executions></plugin></plugins></build></project> package com.yyds.hudi.flink;import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.contrib.streaming.state.PredefinedOptions; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.util.concurrent.TimeUnit;public class HudiTest {public static void main(String[] args) {System.setProperty("HADOOP_USER_NAME","root");// 1、創建flinksql的執行環境Configuration conf = new Configuration();conf.setString(RestOptions.BIND_PORT, "8081-8089");StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);// 注意:需要設置check-point// 設置狀態后端RocksDBEmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);env.setStateBackend(embeddedRocksDBStateBackend);// checkpoint配置env.enableCheckpointing(TimeUnit.SECONDS.toMillis(30), CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig = env.getCheckpointConfig();checkpointConfig.setCheckpointStorage("hdfs://centos04:9000/ckps");checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(20));checkpointConfig.setTolerableCheckpointFailureNumber(5);checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 2、使用flink自帶connector模擬數據tabEnv.executeSql("CREATE TABLE sourceT (\n" +" uuid varchar(20),\n" +" name varchar(10),\n" +" age int,\n" +" ts timestamp(3),\n" +" `partition` varchar(20)\n" +") WITH (\n" +" 'connector' = 'datagen',\n" +" 'rows-per-second' = '1'\n" +")");// 3、創建hudi表tabEnv.executeSql("create table t2(\n" +" uuid varchar(20),\n" +" name varchar(10),\n" +" age int,\n" +" ts timestamp(3),\n" +" `partition` varchar(20)\n" +")\n" +"with (\n" +" 'connector' = 'hudi',\n" + // 指定connector為hudi" 'path' = 'hdfs://192.168.42.104:9000/datas/hudi_warehouse/hudi_flink/t2',\n" +" 'table.type' = 'MERGE_ON_READ'\n" + // MOR類型的表")");// 4、將模擬產生的數據,寫入到Hudi表中tabEnv.executeSql("insert into t2 select * from sourceT");} }

jar包運行

bin/flink run -t yarn-per-job \ -c com.yyds.hudi.flink.HudiTest \ ./myjars/hudi-flink-1.0-SNAPSHOT.jar

類型映射

Flink SQL TypeHudi TypeAvro logical type
CHAR / VARCHAR / STRINGstring
BOOLEANboolean
BINARY / VARBINARYbytes
DECIMALfixeddecimal
TINYINTint
SMALLINTint
INTint
BIGINTlong
FLOATfloat
DOUBLEdouble
DATEintdate
TIMEinttime-millis
TIMESTAMPlongtimestamp-millis
ARRAYarray
MAP(key must be string/char/varchar type)map
MULTISET(element must be string/char/varchar type)map
ROWrecord

5.4 hudi核心參數

5.4.1 去重參數

-- 通過如下語法設置主鍵: -- 設置單個主鍵 create table hoodie_table (f0 int primary key not enforced,f1 varchar(20),... ) with ('connector' = 'hudi',... )-- 設置聯合主鍵 create table hoodie_table (f0 int,f1 varchar(20),...primary key(f0, f1) not enforced ) with ('connector' = 'hudi',... ) 名稱說明默認值備注
hoodie.datasource.write.recordkey.field主鍵字段支持主鍵語法 PRIMARY KEY 設置,支持逗號分隔的多個字段
precombine.field(0.13.0 之前版本為 write.precombine.field)去重時間字段record 合并的時候會按照該字段排序,選值較大的 record 為合并結果;不指定則為處理序:選擇后到的 record

5.4.2 并發參數

名稱說明默認值備注
write.taskswriter 的并發,每個 writer 順序寫 1~N 個 buckets4增加并發對小文件個數沒影響
write.bucket_assign.tasksbucket assigner 的并發Flink的并行度增加并發同時增加了并發寫的 bucekt 數,也就變相增加了小文件(小 bucket) 數
write.index_bootstrap.tasksIndex bootstrap 算子的并發,增加并發可以加快 bootstrap 階段的效率,bootstrap 階段會阻塞 checkpoint,因此需要設置多一些的 checkpoint 失敗容忍次數Flink的并行度只在 index.bootstrap.enabled 為 true 時生效
read.tasks讀算子的并發(batch 和 stream)4
compaction.tasksonline compaction 算子的并發writer 的并發online compaction 比較耗費資源,建議走 offline compaction

可以flink建表時在with中指定,或Hints臨時指定參數的方式:在需要調整的表名后面加上 /*+ OPTIONS() */

案例如下:

insert into t2 /*+ OPTIONS('write.tasks'='2','write.bucket_assign.tasks'='3','compaction.tasks'='4') */ select * from sourceT;# 從下圖可以看出,writer 的并發變成了2,bucket assigner 的并發變成了3,compaction_task 變成了4


可以參考下面Hudi表讀取原理,看上圖。

5.4.3 壓縮參數

? 在線壓縮的參數,通過設置 compaction.async.enabled =false關閉在線壓縮執行,但是調度compaction.schedule.enabled 仍然建議開啟(即上圖的compact_plan_generate步驟),之后通過離線壓縮直接執行 在線壓縮任務 階段性調度的壓縮 plan。

名稱說明默認值備注
compaction.schedule.enabled是否階段性生成壓縮 plantrue建議開啟,即使compaction.async.enabled 關閉的情況下
compaction.async.enabled是否開啟異步壓縮true通過關閉此參數關閉在線壓縮
compaction.tasks壓縮 task 并發4
compaction.trigger.strategy壓縮策略num_commits支持四種策略:num_commits、time_elapsed、num_and_time、num_or_time
compaction.delta_commits默認策略,5 個 commits 壓縮一次5
compaction.delta_seconds3600
compaction.max_memory壓縮去重的 hash map 可用內存100(MB)資源夠用的話建議調整到 1GB
compaction.target_io每個壓縮 plan 的 IO 上限,默認 5GB500(GB)

案例如下:

CREATE TABLE t3(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20) ) WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t3','compaction.async.enabled' = 'true', -- 異步在線壓縮'compaction.tasks' = '1','compaction.schedule.enabled' = 'true', -- 生成壓縮 plan'compaction.trigger.strategy' = 'num_commits', -- 壓縮策略,安裝commit次數進行壓縮'compaction.delta_commits' = '2', -- 2次進行壓縮'table.type' = 'MERGE_ON_READ' );set table.dynamic-table-options.enabled=true; insert into t3 select * from sourceT/*+ OPTIONS('rows-per-second' = '5') */;-- 從hdfs上可以看到,flink發生兩次ck,delta_commit提交兩次后,將log文件進行壓縮,然后生成了parquet文件。

5.4.4 文件大小

? Hudi會自管理文件大小,避免向查詢引擎暴露小文件,其中自動處理文件大小起很大作用。在進行insert/upsert操作時,Hudi可以將文件大小維護在一個指定文件大小。

? 目前只有 log 文件的寫入大小可以做到精確控制,parquet 文件大小按照估算值。

名稱說明默認值備注
hoodie.parquet.max.file.size最大可寫入的 parquet 文件大小120 * 1024 * 1024默認 120MB(單位 byte)超過該大小切新的 file group
hoodie.logfile.to.parquet.compression.ratiolog文件大小轉 parquet 的比率0.35hoodie 統一依據 parquet 大小來評估小文件策略
hoodie.parquet.small.file.limit在寫入時,hudi 會嘗試先追加寫已存小文件,該參數設置了小文件的大小閾值,小于該參數的文件被認為是小文件104857600默認 100MB(單位 byte)大于 100MB,小于 120MB 的文件會被忽略,避免寫過度放大
hoodie.copyonwrite.record.size.estimate預估的 record 大小,hoodie 會依據歷史的 commits 動態估算 record 的大小,但是前提是之前有單次寫入超過 hoodie.parquet.small.file.limit 大小,在未達到這個大小時會使用這個參數1024默認 1KB(單位 byte)如果作業流量比較小,可以設置下這個參數
hoodie.logfile.max.sizeLogFile最大大小。這是在將Log滾轉到下一個版本之前允許的最大大小。1073741824默認1GB(單位 byte)

案例如下:

CREATE TABLE t4(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20) ) WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t4','compaction.tasks' = '1','hoodie.parquet.max.file.size'= '10000', -- 最大可寫入的 parquet 文件大小,設置為10 KB'hoodie.parquet.small.file.limit'='5000', -- 小文件的大小閾值,小于該參數的文件被認為是小文件 設置為5KB'table.type' = 'MERGE_ON_READ' );set table.dynamic-table-options.enabled=true; insert into t4 select * from sourceT /*+ OPTIONS('rows-per-second' = '5')*/;

5.4.5 hadoop參數

從 0.12.0 開始支持,如果有跨集群提交執行的需求,可以通過 sql 的 ddl 指定 per-job 級別的 hadoop 配置

名稱說明默認值備注
hadoop.${you option key}通過 hadoop.前綴指定 hadoop 配置項支持同時指定多個 hadoop 配置項

5.5 內存優化

5.5.1 內存參數

名稱說明默認值備注
write.task.max.size一個 write task 的最大可用內存1024當前預留給 write buffer 的內存為write.task.max.size -compaction.max_memory當 write task 的內存 buffer達到閾值后會將內存里最大的 buffer flush 出去
write.batch.sizeFlink 的寫 task 為了提高寫數據效率,會按照寫 bucket 提前 buffer 數據,每個 bucket 的數據在內存達到閾值之前會一直 cache 在內存中,當閾值達到會把數據 buffer 傳遞給 hoodie 的 writer 執行寫操作256一般不用設置,保持默認值就好
write.log_block.sizehoodie 的 log writer 在收到 write task 的數據后不會馬上 flush 數據,writer 是以 LogBlock 為單位往磁盤刷數據的,在 LogBlock 攢夠之前 records 會以序列化字節的形式 buffer 在 writer 內部128一般不用設置,保持默認值就好
write.merge.max_memoryhoodie 在 COW 寫操作的時候,會有增量數據和 base file 數據 merge 的過程,增量的數據會緩存在內存的 map 結構里,這個 map 是可 spill 的,這個參數控制了 map 可以使用的堆內存大小100一般不用設置,保持默認值就好
compaction.max_memory同 write.merge.max_memory: 100MB 類似,只是發生在壓縮時。100如果是 online compaction,資源充足時可以開大些,比如 1GB

5.5.2 MOR

(1)state backend 換成 rocksdb (默認的 in-memory state-backend 非常吃內存)(2)內存夠的話,compaction.max_memory 調大些 (默認是 100MB 可以調到 1GB)(3)關注 TM 分配給每個 write task 的內存,保證每個 write task 能夠分配到 write.task.max.size 所配置的大小,比如 TM 的內存是 4GB 跑了 2 個 StreamWriteFunction 那每個 write function 能分到 2GB,盡量預留一些 buffer,因為網絡 buffer,TM 上其他類型 task (比如 BucketAssignFunction 也會吃些內存)(4)需要關注 compaction 的內存變化,compaction.max_memory 控制了每個 compaction task 讀 log 時可以利用的內存大小,compaction.tasks 控制了 compaction task 的并發注意: write.task.max.size - compaction.max_memory 是預留給每個 write task 的內存 buffer

5.5.3 COW

(1)state backend 換成 rocksdb(默認的 in-memory state-backend 非常吃內存)。(2)write.task.max.size 和 write.merge.max_memory 同時調大(默認是 1GB 和 100MB 可以調到 2GB 和 1GB)。(3)關注 TM 分配給每個 write task 的內存,保證每個 write task 能夠分配到 write.task.max.size 所配置的大小,比如 TM 的內存是 4GB 跑了 2 個 StreamWriteFunction 那每個 write function 能分到 2GB,盡量預留一些 buffer,因為網絡 buffer,TM 上其他類型 task(比如 BucketAssignFunction 也會吃些內存)。注意:write.task.max.size - write.merge.max_memory 是預留給每個 write task 的內存 buffer。

5.6 讀取方式

5.6.1 流讀

? 當前表默認是快照讀取,即讀取最新的全量快照數據并一次性返回。通過參數read.streaming.enabled 參數開啟流讀模式,通過 read.start-commit 參數指定起始消費位置,支持指定 earliest 從最早消費。

名稱Required默認值說明
read.streaming.enabledfalsefalse設置 true 開啟流讀模式
read.start-commitfalse最新 commit指定 ‘yyyyMMddHHmmss’ 格式的起始 commit(閉區間)
read.streaming.skip_compactionfalsefalse流讀時是否跳過 compaction 的 commits,跳過 compaction 有兩個用途:1)避免 upsert 語義下重復消費 (compaction 的 instant 為重復數據,如果不跳過,有小概率會重復消費) 2) changelog 模式下保證語義正確性 0.11 開始,以上兩個問題已經通過保留 compaction 的 instant time 修復
clean.retain_commitsfalse10cleaner 最多保留的歷史 commits 數,大于此數量的歷史 commits 會被清理掉,changelog 模式下,這個參數可以控制 changelog 的保留時間,例如 checkpoint 周期為 5 分鐘一次,默認最少保留 50 分鐘的時間。
set sql-client.execution.result-mode=tableau;CREATE TABLE t5(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20) ) WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t5','table.type' = 'MERGE_ON_READ','read.streaming.enabled' = 'true','read.streaming.check-interval' = '4' -- 默認60s );insert into t5 select * from sourceT;select * from t5;-- 如下圖,能夠不斷的獲取數據

5.6.2 增量讀取

0.10.0 開始支持。如果有增量讀取 batch 數據的需求,增量讀取包含三種場景。 (1)Stream 增量消費,通過參數 read.start-commit 指定起始消費位置; (2)Batch 增量消費,通過參數 read.start-commit 指定起始消費位置,通過參數 read.end-commit 指定結束消費位置,區間為閉區間,即包含起始、結束的 commit (3)TimeTravel:Batch 消費某個時間點的數據:通過參數 read.end-commit 指定結束消費位置即可(由于起始位置默認從最新,所以無需重復聲明) 名稱Required默認值說明
read.start-commitfalse默認從最新 commit支持 earliest 從最早消費
read.end-commitfalse默認到最新 commit

5.6.3 限流

? 如果將全量數據(百億數量級) 和增量先同步到 kafka,再通過 flink 流式消費的方式將庫表數據直接導成 hoodie 表,因為直接消費全量部分數據:量大(吞吐高)、亂序嚴重(寫入的 partition 隨機),會導致寫入性能退化,出現吞吐毛刺,這時候可以開啟限速參數,保證流量平穩寫入。

名稱Required默認值說明
write.rate.limitfalse0默認關閉限速

5.7 寫入方式

5.7.1 通過flink-cdc進行寫入

CDC 數據保存了完整的數據庫變更,當前可通過兩種途徑將數據導入 hudi

第一種:通過 cdc-connector 直接對接 DB 的 binlog 將數據導入 hudi,優點是不依賴消息隊列,缺點是對 db server 造成壓力。第二種:對接 cdc format 消費 kafka 數據導入 hudi,優點是可擴展性強,缺點是依賴 kafka。注意:如果上游數據無法保證順序,需要指定 write.precombine.field 字段。

1)準備MySQL表

(1)MySQL開啟binlog

(2)建表

create database test; use test;create table stu3 (id int unsigned auto_increment primary key COMMENT '自增id',name varchar(20) not null comment '學生名字',school varchar(20) not null comment '學校名字',nickname varchar(20) not null comment '學生小名',age int not null comment '學生年齡',class_num int not null comment '班級人數',phone bigint not null comment '電話號碼',email varchar(64) comment '家庭網絡郵箱',ip varchar(32) comment 'IP地址') engine=InnoDB default charset=utf8;

2)flink讀取mysql binlog并寫入kafka

(1)創建MySQL表

create table stu3_binlog(id bigint not null,name string,school string,nickname string,age int not null,class_num int not null,phone bigint not null,email string,ip string,primary key (id) not enforced ) with ('connector' = 'mysql-cdc','hostname' = 'centos01','port' = '3306','username' = 'root','password' = 'root','database-name' = 'test','table-name' = 'stu3' );

(2)創建Kafka表

create table stu3_binlog_sink_kafka(id bigint not null,name string,school string,nickname string,age int not null,class_num int not null,phone bigint not null,email string,ip string,primary key (id) not enforced ) with ('connector' = 'upsert-kafka','topic' = 'cdc_mysql_stu3_sink','properties.zookeeper.connect' = 'centos01:2181','properties.bootstrap.servers' = 'centos01:9092','key.format' = 'json','value.format' = 'json');

(3)將mysql binlog日志寫入kafka

insert into stu3_binlog_sink_kafka select * from stu3_binlog;

3)flink讀取kafka數據并寫入hudi數據湖

(1)創建kafka源表

create table stu3_binlog_source_kafka(id bigint not null,name string,school string,nickname string,age int not null,class_num int not null,phone bigint not null,email string,ip string) with ('connector' = 'kafka','topic' = 'cdc_mysql_stu3_sink','properties.bootstrap.servers' = 'hadoop1:9092','format' = 'json','scan.startup.mode' = 'earliest-offset','properties.group.id' = 'testGroup');

(2)創建hudi目標表

create table stu3_binlog_sink_hudi(id bigint not null,name string,`school` string,nickname string,age int not null,class_num int not null,phone bigint not null,email string,ip string,primary key (id) not enforced ) partitioned by (`school`) with ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/stu3_binlog_sink_hudi','table.type' = 'MERGE_ON_READ','write.option' = 'insert','write.precombine.field' = 'school');

(3)將kafka數據寫入到hudi中

insert into stu3_binlog_sink_hudi select * from stu3_binlog_source_kafka;

5.7.2 離線批量導入

如果存量數據來源于其他數據源,可以使用批量導入功能,快速將存量數據導成 Hoodie 表格式。(1)批量導入省去了 avro 的序列化以及數據的 merge 過程,后續不會再有去重操作,數據的唯一性需要自己來保證。 (2)bulk_insert 需要在 Batch Execuiton Mode 下執行更高效,Batch 模式默認會按照 partition path 排序輸入消息再寫入 Hoodie,避免 file handle 頻繁切換導致性能下降。SET execution.runtime-mode = batch; SET execution.checkpointing.interval = 0;(3)bulk_insert write task 的并發通過參數 write.tasks 指定,并發的數量會影響到小文件的數量,理論上,bulk_insert write task 的并發數就是劃分的 bucket 數,當然每個 bucket 在寫到文件大小上限(parquet 120 MB)的時候會 roll over 到新的文件句柄,所以最后:寫文件數量 >= bulk_insert write task 數。 名稱Required默認值說明
write.operationTRUEupsert配置 bulk_insert 開啟該功能
write.tasksFALSE4bulk_insert 寫 task 的并發,最后的文件數 >=write.tasks
write.bulk_insert.shuffle_by_partitionwrite.bulk_insert.shuffle_input(從 0.11 開始)FALSETRUE是否將數據按照 partition 字段 shuffle 再通過 write task 寫入,開啟該參數將減少小文件的數量 但是可能有數據傾斜風險
write.bulk_insert.sort_by_partitionwrite.bulk_insert.sort_input(從 0.11 開始)FALSETRUE是否將數據線按照 partition 字段排序再寫入,當一個 write task 寫多個 partition,開啟可以減少小文件數量
write.sort.memory128sort 算子的可用 managed memory(單位 MB)

5.7.3 全量接增量

如果已經有全量的離線 Hoodie 表,需要接上實時寫入,并且保證數據不重復,可以開啟 index bootstrap 功能。

如果覺得流程冗長,可以在寫入全量數據的時候資源調大直接走流模式寫,全量走完接新數據再將資源調小(或者開啟限流功能)。

名稱Required默認值說明
index.bootstrap.enabledtruefalse開啟索引加載,會將已存表的最新數據一次性加載到 state 中
index.partition.regexfalse*設置正則表達式進行分區篩選,默認為加載全部分區
使用流程 (1) CREATE TABLE 創建和 Hoodie 表對應的語句,注意 table type 要正確 (2)設置 index.bootstrap.enabled = true開啟索引加載功能 (3)重啟任務將 index.bootstrap.enabled 關閉,參數配置到合適的大小,如果RowDataToHoodieFunction 和 BootstrapFunction 并發不同,可以重啟避免 shuffle說明: 索引加載為并發加載,根據數據量大小加載時間不同,可以在log中搜索 finish loading the index under partition 和 Load records from file 日志來觀察索引加載的進度

5.8 寫入模式

5.8.1 Changelog 模式

? 如果希望 Hoodie 保留消息的所有變更(I/-U/U/D),之后接上 Flink 引擎的有狀態計算實現全鏈路近實時數倉生產(增量計算),Hoodie 的 MOR 表通過行存原生支持保留消息的所有變更(format 層面的集成),通過流讀 MOR 表可以消費到所有的變更記錄。

1)WITH 參數

名稱Required默認值說明
changelog.enabledfalsefalse默認是關閉狀態,即 UPSERT 語義,所有的消息僅保證最后一條合并消息,中間的變更可能會被 merge 掉;改成 true 支持消費所有變更。

? 批(快照)讀仍然會合并所有的中間結果,不管 format 是否已存儲中間狀態。

? 開啟 changelog.enabled 參數后,中間的變更也只是 Best Effort: 異步的壓縮任務會將中間變更合并成 1 條,所以如果流讀消費不夠及時,被壓縮后只能讀到最后一條記錄。當然,通過調整壓縮的 buffer 時間可以預留一定的時間 buffer 給 reader,比如調整壓縮的兩個參數:

? ? compaction.delta_commits:5

? ? compaction.delta_seconds: 3600。

說明:

Changelog 模式開啟流讀的話,要在 sql-client 里面設置參數:

set sql-client.execution.result-mode=tableau;

或者

set sql-client.execution.result-mode=changelog;

否則中間結果在讀的時候會被直接合并。

2)流讀 changelog

僅在 0.10.0 支持,本 feature 為實驗性。

開啟 changelog 模式后,hudi 會保留一段時間的 changelog 供下游 consumer 消費,我們可以通過流讀 ODS 層 changelog 接上 ETL 邏輯寫入到 DWD 層,如下圖的 pipeline:

? 流讀的時候我們要注意 changelog 有可能會被 compaction 合并掉,中間記錄會消除,可能會影響計算結果,需要關注sql-client的屬性(result-mode)同上。

3)案例演示

(1)使用changelog

set sql-client.execution.result-mode=tableau; CREATE TABLE t6(id int,ts int,primary key (id) not enforced ) WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t6','table.type' = 'MERGE_ON_READ','read.streaming.enabled' = 'true', 'read.streaming.check-interval' = '4','changelog.enabled' = 'true' );insert into t6 values (1,1); insert into t6 values (1,2);set table.dynamic-table-options.enabled=true; select * from t6/*+ OPTIONS('read.start-commit'='earliest')*/; select count(*) from t6/*+ OPTIONS('read.start-commit'='earliest')*/;

(2)不使用changelog

CREATE TABLE t6_v(id int,ts int,primary key (id) not enforced ) WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t6','table.type' = 'MERGE_ON_READ','read.streaming.enabled' = 'true','read.streaming.check-interval' = '4' );select * from t6_v/*+ OPTIONS('read.start-commit'='earliest')*/; select count(*) from t6_v/*+ OPTIONS('read.start-commit'='earliest')*/;

5.8.2 Append 模式

從 0.10 開始支持

對于 INSERT 模式:

? ? MOR 默認會 apply 小文件策略: 會追加寫 avro log 文件

? ? COW 每次直接寫新的 parquet 文件,沒有小文件策略

Hudi 支持豐富的 Clustering 策略,優化 INSERT 模式下的小文件問題:

1)Inline Clustering

只有 Copy On Write 表支持該模式

名稱Required默認值說明
write.insert.clusterfalsefalse是否在寫入時合并小文件,COW 表默認 insert 寫不合并小文件,開啟該參數后,每次寫入會優先合并之前的小文件(不會去重),吞吐會受影響

2) Async Clustering

? 從 0.12 開始支持

(1)WITH參數

名稱Required默認值說明
clustering.schedule.enabledfalsefalse是否在寫入時定時異步調度 clustering plan,默認關閉
clustering.delta_commitsfalse4調度 clsutering plan 的間隔 commits,clustering.schedule.enabled 為 true 時生效
clustering.async.enabledfalsefalse是否異步執行 clustering plan,默認關閉
clustering.tasksfalse4Clustering task 執行并發
clustering.plan.strategy.target.file.max.bytesfalse1024 * 1024 * 1024Clustering 單文件目標大小,默認 1GB
clustering.plan.strategy.small.file.limitfalse600小于該大小的文件才會參與 clustering,默認600MB
clustering.plan.strategy.sort.columnsfalseN/A支持指定特殊的排序字段
clustering.plan.partition.filter.modefalseNONE支持NONE:不做限制RECENT_DAYS:按時間(天)回溯SELECTED_PARTITIONS:指定固定的 partition
clustering.plan.strategy.daybased.lookback.partitionsfalse2RECENT_DAYS 生效,默認 2 天

(2)Clustering Plan Strategy

? 支持定制化的 clustering 策略。

名稱Required默認值說明
clustering.plan.partition.filter.modefalseNONE支持· NONE:不做限制· RECENT_DAYS:按時間(天)回溯· SELECTED_PARTITIONS:指定固定的 partition
clustering.plan.strategy.daybased.lookback.partitionsfalse2RECENT_DAYS 生效,默認 2 天
clustering.plan.strategy.cluster.begin.partitionfalseN/ASELECTED_PARTITIONS 生效,指定開始 partition(inclusive)
clustering.plan.strategy.cluster.end.partitionfalseN/ASELECTED_PARTITIONS 生效,指定結束 partition(incluseve)
clustering.plan.strategy.partition.regex.patternfalseN/A正則表達式過濾 partitions
clustering.plan.strategy.partition.selectedfalseN/A顯示指定目標 partitions,支持逗號 , 分割多個 partition

5.9 Bucket索引

? 默認的 flink 流式寫入使用 state 存儲索引信息:primary key 到 fileId 的映射關系。當數據量比較大的時候,state的存儲開銷可能成為瓶頸,bucket 索引通過固定的 hash 策略,將相同 key 的數據分配到同一個 fileGroup 中,避免了索引的存儲和查詢開銷。

名稱Required默認值說明
index.typefalseFLINK_STATE設置 BUCKET 開啟 Bucket 索引功能
hoodie.bucket.index.hash.fieldfalse主鍵可以設置成主鍵的子集
hoodie.bucket.index.num.bucketsfalse4默認每個 partition 的 bucket 數,當前設置后則不可再變更。
(1)bucket index 沒有 state 的存儲計算開銷,性能較好 (2)bucket index 無法擴 buckets,state index 則可以依據文件的大小動態擴容 (3)bucket index 不支持跨 partition 的變更(如果輸入是 cdc 流則沒有這個限制),state index 沒有限制

5.10 Hudi Catalog

? 從 0.12.0 開始支持,通過 catalog 可以管理 flink 創建的表,避免重復建表操作,另外 hms 模式的 catalog 支持自動補全 hive 同步參數。

-- DFS 模式 Catalog SQL樣例: CREATE CATALOG hoodie_catalogWITH ('type'='hudi','catalog.path' = '${catalog 的默認路徑}','mode'='dfs' );-- Hms 模式 Catalog SQL 樣例: CREATE CATALOG hoodie_catalogWITH ('type'='hudi','catalog.path' = '${catalog 的默認路徑}','hive.conf.dir' = '${hive-site.xml 所在的目錄}','mode'='hms' -- 支持 'dfs' 模式通過文件系統管理表屬性); 名稱Required默認值說明
catalog.pathtrue默認的 catalog 根路徑,用作表路徑的自動推導,默認的表路徑: c a t a l o g . p a t h / {catalog.path}/ catalog.path/{db_name}/${table_name}
default-databasefalsedefault默認的 database 名
hive.conf.dirfalsehive-site.xml 所在的目錄,只在 hms 模式下生效
modefalsedfs支持 hms模式通過 hive 管理元數據
table.externalfalsefalse是否創建外部表,只在 hms 模式下生效

案例如下:

--(1)創建sql-client初始化sql文件 vim /opt/apps/flink-1.13.6/conf/sql-client-init.sqlCREATE CATALOG hoodie_catalogWITH ('type'='hudi','catalog.path' = '/tmp/hudi_catalog','mode'='dfs' );USE CATALOG hoodie_catalog; --(2)指定sql-client啟動時加載sql文件 hadoop fs -mkdir /tmp/hudi_catalogbin/sql-client.sh embedded -i conf/sql-client-init.sql -s yarn-session --(3)建庫建表插入 create database test; use test;create table t2(uuid varchar(20),name varchar(10),age int,ts timestamp(3),`partition` varchar(20), primary key (uuid) not enforced ) with ('connector' = 'hudi','path' = '/tmp/hudi_catalog/default/t2','table.type' = 'MERGE_ON_READ' );insert into t2 values('1','zs',18,TIMESTAMP '1970-01-01 00:00:01','a'); --(4)退出sql-client,重新進入,表信息還在 use test; show tables; select * from t2;

5.11 離線壓縮

? MOR 表的 compaction 默認是自動打開的,策略是 5 個 commits 執行一次壓縮。 因為壓縮操作比較耗費內存,和寫流程放在同一個 pipeline,在數據量比較大的時候(10w+/s qps),容易干擾寫流程,此時采用離線定時任務的方式執行 compaction 任務更穩定。

5.11.1 設置參數

? compaction.async.enabled 為 false,關閉在線 compaction。

? compaction.schedule.enabled 仍然保持開啟,由寫任務階段性觸發壓縮 plan。

5.11.2 原理

一個 compaction 的任務的執行包括兩部分:

? schedule 壓縮 plan

該過程推薦由寫任務定時觸發,寫參數 compaction.schedule.enabled 默認開啟

? 執行對應的壓縮 plan

5.11.3 使用方式

1)執行命令

離線 compaction 需要手動執行 Java 程序,程序入口:

? hudi-flink1.13-bundle-0.12.0.jar

? org.apache.hudi.sink.compact.HoodieFlinkCompactor

# 命令行的方式 ./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://xxx:9000/table

2)參數配置

參數名required默認值備注
–pathtrue目標表的路徑
–compaction-tasksfalse-1壓縮 task 的并發,默認是待壓縮 file group 的數量
–compaction-max-memoryfalse100 (單位 MB)壓縮時 log 數據的索引 map,默認 100MB,內存足夠可以開大些
–schedulefalsefalse是否要執行 schedule compaction 的操作,當寫流程還在持續寫入表數據的時候,開啟這個參數有丟失查詢數據的風險,所以開啟該參數一定要保證當前沒有任務往表里寫數據, 寫任務的 compaction plan 默認是一直 schedule 的,除非手動關閉(默認 5 個 commits 一次壓縮)
–seqfalseLIFO執行壓縮任務的順序,默認是從最新的壓縮 plan 開始執行,可選值:LIFO: 從最新的 plan 開始執行;FIFO: 從最老的 plan 開始執行
–servicefalsefalse是否開啟 service 模式,service 模式會打開常駐進程,一直監聽壓縮任務并提交到集群執行(從 0.11 開始執行)
–min-compaction-interval-secondsfalse600 (單位 秒)service 模式下的執行間隔,默認 10 分鐘

案例如下:

create table t7(id int,ts int,primary key (id) not enforced ) with ('connector' = 'hudi','path' = '/tmp/hudi_catalog/default/t7','compaction.async.enabled' = 'false', -- 關閉自動壓縮'compaction.schedule.enabled' = 'true', -- 由寫任務階段性觸發壓縮 plan'table.type' = 'MERGE_ON_READ' );insert into t7 values(1,1); insert into t7 values(2,2); insert into t7 values(3,3); insert into t7 values(4,4); insert into t7 values(5,5);// 命令行的方式 ./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://centos04:9000/tmp/hudi_catalog/default/t7

5.12 離線 Clustering

? 異步的 clustering 相對于 online 的 async clustering 資源隔離,從而更加穩定。

5.12.1 設置參數

? clustering.async.enabled 為 false,關閉在線 clustering。

? clustering.schedule.enabled 仍然保持開啟,由寫任務階段性觸發 clustering plan。

5.12.2 原理

一個 clustering 的任務的執行包括兩部分:

? schedule plan

推薦由寫任務定時觸發,寫參數 clustering.schedule.enabled 默認開啟。

? 執行對應的 plan

5.12.3 使用方式

1)執行命令

離線 clustering 需要手動執行 Java 程序,程序入口:

? hudi-flink1.13-bundle-0.12.0.jar

? org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob

注意:必須是分區表,否則報錯空指針異常。

# 命令行的方式./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://centos04:9000/table

2)參數配置

參數名required默認值備注
–pathtrue目標表的路徑。
–clustering-tasksfalse-1Clustering task 的并發,默認是待壓縮 file group 的數量。
–schedulefalsefalse是否要執行 schedule clustering plan 的操作,當寫流程還在持續寫入表數據的時候,開啟這個參數有丟失查詢數據的風險,所以開啟該參數一定要保證當前沒有任務往表里寫數據, 寫任務的 clustering plan 默認是一直 schedule 的,除非手動關閉(默認 4 個 commits 一次 clustering)。
–seqfalseFIFO執行壓縮任務的順序,默認是從最老的 clustering plan 開始執行,可選值:LIFO: 從最新的 plan 開始執行;FIFO: 從最老的 plan 開始執行
–target-file-max-bytesfalse1024 * 1024 * 1024最大目標文件,默認 1GB。
–small-file-limitfalse600小于該大小的文件會參與 clustering,默認 600MB。
–sort-columnsfalseN/AClustering 可選排序列。
–servicefalsefalse是否開啟 service 模式,service 模式會打開常駐進程,一直監聽壓縮任務并提交到集群執行(從 0.11 開始執行)。
–min-compaction-interval-secondsfalse600 (單位 秒)service 模式下的執行間隔,默認 10 分鐘。

3)案例演示

create table t8(id int,age int,ts int,primary key (id) not enforced ) partitioned by (age) with ('connector' = 'hudi','path' = '/tmp/hudi_catalog/default/t8','clustering.async.enabled' = 'false','clustering.schedule.enabled' = 'true','table.type' = 'COPY_ON_WRITE' );insert into t8 values(1,18,1); insert into t8 values(2,18,2); insert into t8 values(3,18,3); insert into t8 values(4,18,4); insert into t8 values(5,18,5);-- 命令行的方式 ./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://centos04:9000/tmp/hudi_catalog/default/t8

5.12.4 常見問題

# 存儲一直看不到數據如果是 streaming 寫,請確保開啟 checkpoint,Flink 的 writer 有 3 種刷數據到磁盤的策略: ?當某個 bucket 在內存積攢到一定大小 (可配,默認 64MB) ?當總的 buffer 大小積攢到一定大小(可配,默認 1GB) ?當 checkpoint 觸發,將內存里的數據全部 flush 出去# 數據有重復如果是 COW 寫,需要開啟參數 write.insert.drop.duplicates,COW 寫每個 bucket 的第一個文件默認是不去重的,只有增量的數據會去重,全局去重需要開啟該參數;MOR 寫不需要開啟任何參數,定義好 primary key 后默認全局去重。(注意:從 0.10 版本開始,該屬性改名 write.precombine 并且默認為 true。)如果需要多 partition 去重,需要開啟參數: index.global.enabled 為 true。(注意:從 0.10 版本開始,該屬性默認true。)索引 index 是判斷數據重復的核心數據結構,index.state.ttl 設置了索引保存的時間,默認為 1.5 天,對于長時間周期的更新,比如更新一個月前的數據,需要將 index.state.ttl 調大(單位天),設置小于 0 代表永久保存。(注意:從 0.10 版本開始,該屬性默認為 0。)# Merge On Read 寫只有 log 文件Merge On Read 默認開啟了異步的 compaction,策略是 5 個 commits 壓縮一次,當條件滿足參會觸發壓縮任務,另外,壓縮本身因為耗費資源,所以不一定能跟上寫入效率,可能會有滯后。

5.13 Hudi核心原理

5.13.1 Hudi數據去重原理

Hoodie 的數據去重分兩步:

(1)寫入前攢 buffer 階段去重,核心接口HoodieRecordPayload#preCombine

(2)寫入過程中去重,核心接口HoodieRecordPayload#combineAndGetUpdateValue。

1)消息版本新舊

? 相同 record key (主鍵)的數據通過write.precombine.field指定的字段來判斷哪個更新,即 precombine 字段更大的 record 更新,如果是相等的 precombine 字段,則后來的數據更新。

? 從 0.10 版本開始,write.precombine.field 字段為可選,如果沒有指定,會看 schema 中是否有 ts 字段,如果有,ts 字段被選為 precombine 字段;如果沒有指定,schema 中也沒有 ts 字段,則為處理順序:后來的消息默認較新。

2)攢消息階段的去重

? Hoodie 將 buffer 消息發給 write handle 之前可以執行一次去重操作,通過HoodieRecordPayload#preCombine 接口,保留 precombine 字段較大的消息,此操作為純內存的計算,在同一個 write task 中為單并發執行。

? 注意:write.precombine 選項控制了攢消息的去重。

3)寫 parquet 增量消息的去重

? 在Hoodie 寫入流程中,Hoodie 每寫一個 parquet 都會有 base + 增量 merge 的過程,增量的消息會先放到一個 spillable map 的數據結構構建內存 index,這里的增量數據如果沒有提前去重,那么同 key 的后來消息會直接覆蓋先來的消息。

? Writer 接著掃 base 文件,過程中會不斷查看內存 index 是否有同 key 的新消息,如果有,會走 HoodieRecordPayload#combineAndGetUpdateValue 接口判斷保留哪個消息。

? 注意: MOR 表的 compaction 階段和 COW 表的寫入流程都會有 parquet 增量消息去重的邏輯。

4)跨 partition 的消息去重

? 默認情況下,不同的 partition 的消息是不去重的,即相同的 key 消息,如果新消息換了 partition,那么老的 partiiton 消息仍然保留。

? 開啟 index.global.enabled 選項開啟跨 partition 去重,原理是先往老的 partiton 發一條刪除消息,再寫新 partition。

5.13.2 Hudi表寫入原理

數據寫入、數據壓縮與數據清理

1)數據寫入分析 (1)基礎數據封裝:將數據流中flink的RowData封裝成Hoodie實體; (2)BucketAssigner:桶分配器,主要是給數據分配寫入的文件地址:若為插入操作,則取大小最小的FileGroup對應的FileId文件內進行插入;在此文件的后續寫入中文件 ID 保持不變,并且提交時間會更新以顯示最新版本。這也意味著記錄的任何特定版本,給定其分區路徑,都可以使用文件 ID 和 instantTime進行唯一定位;若為更新操作,則直接在當前location進行數據更新; (3)Hoodie Stream Writer: 數據寫入,將數據緩存起來,在超過設置的最大flushSize或是做checkpoint時進行刷新到文件中; (4)Oprator Coordinator:主要與Hoodie Stream Writer進行交互,處理checkpoint等事件,在做checkpoint時,提交instant到timeLine上,并生成下一個instant的時間,算法為取當前最新的commi時間,比對當前時間與commit時間,若當前時間大于commit時間,則返回,否則一直循環等待生成。2)數據壓縮壓縮(compaction)用于在 MergeOnRead存儲類型時將基于行的log日志文件轉化為parquet列式數據文件,用于加快記錄的查找。compaction首先會遍歷各分區下最新的parquet數據文件和其對應的log日志文件進行合并,并生成新的FileSlice,在TimeLine 上提交新的Instance:具體策略分為4種,具體見官網說明: compaction.trigger.strategy: Strategy to trigger compaction, options are 1.'num_commits': trigger compaction when reach N delta commits; 2.'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction; 3.'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied; 4.'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied. Default is 'num_commits' Default Value: num_commits (Optional)在項目實踐中需要注意參數'read.streaming.skip_compaction' 參數的配置,其表示在流式讀取該表是否跳過壓縮后的數據,若該表用于后續聚合操作表的輸入表,則需要配置值為true,表示聚合操作表不再消費讀取壓縮數據。若不配置或配置為false,則該表中的數據在未被壓縮之前被聚合操作表讀取了一次,在壓縮后數據又被讀取一次,會導致聚合表的sum、count等算子結果出現雙倍情況。3)數據清理隨著用戶向表中寫入更多數據,對于每次更新,Hudi會生成一個新版本的數據文件用于保存更新后的記錄(COPY_ON_WRITE)或將這些增量更新寫入日志文件以避免重寫更新版本的數據文件(MERGE_ON_READ)。在這種情況下,根據更新頻率,文件版本數可能會無限增長,但如果不需要保留無限的歷史記錄,則必須有一個流程(服務)來回收舊版本的數據,這就是 Hudi 的清理服務。具體清理策略可參考官網,一般使用的清理策略為:KEEP_LATEST_FILE_VERSIONS:此策略具有保持 N 個文件版本而不受時間限制的效果。會刪除N之外的FileSlice。

5.13.3 Hudi表讀取原理

(1)開啟split_monitor算子,每隔N秒(可配置)監聽TimeLine上變化,并將變更的Instance封裝為FileSlice。

(2)分發log文件時候,按照fileId值進行keyBy,保證同一file group下數據文件都給一個Task進行處理,從而保證數據處理的有序性。

(3)split_reader根據FileSlice信息進行數據讀取。

總結

以上是生活随笔為你收集整理的数据湖架构Hudi(五)Hudi集成Flink案例详解的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

国产精品乱码久久久 | 免费色视频在线 | 国产精品美女网站 | 黄色精品网站 | 色综合在| 日本中文字幕视频 | 97国产在线视频 | 成人av免费电影 | 操操操影院| va视频在线观看 | 亚洲精品国 | 亚洲欧洲一级 | 2021久久 | 一本—道久久a久久精品蜜桃 | 亚洲精品美女久久久 | 亚洲综合色站 | 久久激情小视频 | 91香蕉视频黄| 996久久国产精品线观看 | 91麻豆国产 | 亚洲午夜小视频 | 久久天天躁狠狠躁亚洲综合公司 | 97在线公开视频 | 日韩精品国产一区 | 天天色宗合 | 国产精品mv| 久99视频 | 四虎在线免费观看视频 | www国产亚洲精品久久麻豆 | 精品亚洲国产视频 | 日韩在线观看电影 | 人人射人人澡 | 天天做日日做天天爽视频免费 | 99re8这里有精品热视频免费 | 国产精品毛片 | 国产亚洲精品美女 | 91精品久久久久久久91蜜桃 | 国产精品xxxx18a99| 国产成人精品亚洲日本在线观看 | 国产一级二级视频 | 99国产精品一区二区 | 九九交易行官网 | 黄色软件在线观看免费 | 精品国产成人在线影院 | 91色在线观看 | 国产专区视频在线观看 | av一区在线播放 | 天天干夜夜干 | 久久精品欧美视频 | 久久久精品国产免费观看一区二区 | 97成人精品视频在线播放 | 欧美日韩一区二区免费在线观看 | 国产精品久久久久四虎 | 在线观看岛国av | 九九热在线精品视频 | 久久久久国产一区二区三区四区 | 欧美日韩在线观看一区 | 国产福利在线免费 | av一级片网站 | 久艹视频在线观看 | 人人玩人人添人人澡超碰 | 欧美少妇18p| 久久99最新地址 | 欧美久久久久久久久久久久久 | 99在线看| 最新av在线播放 | 亚洲国产成人久久综合 | 超碰av免费| 五月综合色 | 国产精品免费观看久久 | 99久久精品免费一区 | 一区在线观看 | 日本免费久久高清视频 | www.xxxx变态.com | av电影 一区二区 | 亚洲影院一区 | 成人免费看视频 | 久久久久看片 | 久99久精品 | 欧美精品一区在线发布 | 在线免费观看国产 | 久久女同性恋中文字幕 | www久久久久 | 婷婷爱五月天 | 日韩在线观看视频在线 | 久草视频免费在线播放 | 看v片| 日韩视频1区 | 日韩中文字幕第一页 | 日本天天色| 热99在线视频 | 国产色道| 色综合久久综合中文综合网 | 91麻豆精品国产自产在线游戏 | 天天综合网天天综合色 | 久草视频资源 | 人人看人人艹 | 丁香六月综合网 | 久久精彩免费视频 | 99视屏 | 精品日韩中文字幕 | 三上悠亚在线免费 | 天天做夜夜做 | 日日干,天天干 | 国产精品第三页 | 日本黄色a级大片 | 国产剧情一区二区 | 中文字幕黄色 | 久久久午夜精品福利内容 | 亚洲国产成人在线观看 | 在线观看你懂的网址 | 亚洲夜夜网 | 色婷婷综合久久久 | 韩日电影在线免费看 | 亚洲一区 影院 | 天天操天天干天天爱 | 久久久久久久久久久网 | 丁香六月婷婷开心婷婷网 | 亚洲 欧洲 国产 精品 | 久久久精品一区二区 | 欧美一级片在线免费观看 | 米奇影视7777 | 日韩资源在线观看 | 在线观看不卡视频 | 色吊丝在线永久观看最新版本 | 国产精品mv在线观看 | 一区二区精品视频 | 99视频精品免费视频 | 91精品婷婷国产综合久久蝌蚪 | 激情五月婷婷综合 | 2021国产精品 | 天堂va在线观看 | 日本久久高清视频 | 999国内精品永久免费视频 | 夜色成人av | 三级在线国产 | 蜜桃av人人夜夜澡人人爽 | 欧美大片aaa | 国产激情久久久 | 欧美日韩视频在线观看免费 | 亚洲高清在线观看视频 | 成人91在线| av网站免费在线 | 成人a在线| 久久精品视频网址 | 国产精品免费在线观看视频 | 黄色一级在线观看 | 国产九色91 | 国产视频2 | 在线观看一区 | 成人av资源网站 | 久久久www成人免费毛片麻豆 | 久久美女高清视频 | 欧美日韩高清在线观看 | 久久久www成人免费精品张筱雨 | 亚洲精品国产精品国自产 | 国产毛片aaa | 亚洲国产大片 | 亚洲欧洲国产视频 | 又色又爽又激情的59视频 | 日韩手机在线观看 | 成人免费在线视频 | 天天看天天干 | 天天综合久久 | 亚洲欧美激情精品一区二区 | 一区二区欧美激情 | 亚洲成年人av | 日日夜夜狠狠干 | 中文字幕在线中文 | 99视频在线免费播放 | 中文字幕在线播放视频 | 欧美日韩一区二区视频在线观看 | 中文字幕欧美日韩va免费视频 | 国产手机视频在线 | 成人影片在线免费观看 | 日韩欧美精品一区二区三区经典 | 亚洲丝袜中文 | 五月婷婷亚洲 | 国产精品美女久久久 | 色婷婷精品大在线视频 | 精品视频国产一区 | 黄色一级大片在线免费看产 | 九九热在线视频 | 久久这里只有精品久久 | 欧美性色黄 | 日本久久久亚洲精品 | 97综合在线 | 久久精品国产一区二区三区 | 十八岁以下禁止观看的1000个网站 | 99久热在线精品视频成人一区 | 国产成人av福利 | 精品一二 | 天天干天天操av | 国产精品久久久久久久久免费看 | 中文字幕在线观看播放 | 美女国产| 亚洲欧美日韩精品久久久 | 日韩一二区在线 | 麻豆久久一区二区 | 国产精品精品国产色婷婷 | 亚洲第一区在线观看 | 综合久久久久久久 | 亚洲综合在线一区二区三区 | 久久成人国产精品免费软件 | 日本性高潮视频 | 欧美激情视频一区二区三区 | 成人影音在线 | 中文字幕美女免费在线 | 国产精品99在线播放 | 国产精品1区2区3区在线观看 | 一区二区三区在线影院 | 日韩av电影手机在线观看 | 精品v亚洲v欧美v高清v | 久久免费公开视频 | 国产精品中文在线 | 日韩影片在线观看 | 免费在线黄网 | 日韩精品无 | 国产欧美精品一区二区三区四区 | 欧美激情第十页 | 亚洲综合在线播放 | 欧美夫妻性生活电影 | 日韩丝袜| 91完整版 | 精品久久久久久国产 | 亚洲精品免费在线观看视频 | 性色av一区二区三区在线观看 | www.黄色片网站 | 六月婷婷久香在线视频 | 国产美女在线精品免费观看 | 亚洲国产中文在线观看 | 青青草国产精品视频 | 天天操天天射天天爽 | 国内精品久久久久影院优 | 91最新视频| a在线观看免费视频 | 日本黄色免费观看 | 国产黑丝袜在线 | 免费高清在线视频一区· | 日韩免费不卡av | 99精品视频在线看 | 特级毛片网 | www.婷婷com| 国产一级免费在线观看 | 国产精品成久久久久 | 久久美女免费视频 | av中文字幕第一页 | av三级在线播放 | 亚洲精品一区二区三区四区高清 | 日韩精品一区二区在线观看视频 | 激情图片久久 | 99国产成+人+综合+亚洲 欧美 | 国产一区二区三区免费在线 | 久草在线精品观看 | 97免费在线观看视频 | 天天躁日日躁狠狠 | 久久久久久久av麻豆果冻 | 99久久久久久国产精品 | 日韩欧美一区二区三区在线观看 | 国产69熟 | 久久精品视频网 | 欧美va日韩va | 深爱激情站| 久久久久亚洲a | 国产成人精品av在线 | 精品国产亚洲一区二区麻豆 | 国内精品视频在线 | 国产亚洲情侣一区二区无 | 亚洲精品动漫在线 | 日韩欧美在线影院 | 国产成人精品免费在线观看 | 中文字幕最新精品 | 激情综合网天天干 | 免费h视频 | 色综合久久精品 | 久久撸在线视频 | 午夜精品99久久免费 | 国产成人av一区二区三区在线观看 | www.香蕉视频在线观看 | 在线免费观看欧美日韩 | 免费毛片一区二区三区久久久 | 综合国产在线观看 | 超碰在线免费福利 | 麻豆91在线观看 | av在线成人 | 黄色精品国产 | 天天综合亚洲 | 国产精品免费视频一区二区 | 国产美女网站视频 | 日本三级香港三级人妇99 | 久草网在线观看 | 日韩理论电影网 | 成人小视频在线观看免费 | 成 人 黄 色视频免费播放 | 福利网址在线观看 | 天天操天天添 | 日本最新中文字幕 | 免费看的黄色片 | 国产精品18久久久久白浆 | 中文字幕在线观看完整版电影 | 91丨九色丨蝌蚪丨老版 | 欧美小视频在线观看 | 国产精品毛片一区二区 | 亚洲专区免费观看 | 婷婷综合五月天 | 一级欧美黄| 成年人视频在线免费播放 | 欧美日韩在线免费观看视频 | 日韩字幕 | 麻豆视频网址 | 91中文字幕在线视频 | 91精品看片 | 国产我不卡 | 在线观看日韩视频 | 色 免费观看 | 久久精品这里都是精品 | 97超碰在线免费 | 欧美成年人在线观看 | 亚洲国产色一区 | 亚洲精品视频在线观看免费视频 | 国产精品一区二区三区在线播放 | 三级黄色免费片 | 久久黄色小说视频 | 亚洲欧洲在线视频 | 性色av免费在线观看 | 天天看天天干 | 在线日韩视频 | 国产一区视频免费在线观看 | 伊人宗合网 | 亚洲色图av | 久久精品视频2 | 2018亚洲男人天堂 | 又黄又爽免费视频 | 欧美整片sss | 黄色视屏在线免费观看 | 91精品视频观看 | 亚洲精品美女在线 | 天天鲁一鲁摸一摸爽一爽 | 国产97视频在线 | 欧美激情综合色综合啪啪五月 | 国产一级电影在线 | 久久99久久久久久 | 99久高清在线观看视频99精品热在线观看视频 | 五月激情电影 | 精品成人在线 | 在线免费黄色av | 色婷婷成人网 | 深爱激情站| 免费av网站在线看 | 久久综合婷婷综合 | 99综合电影在线视频 | 日韩一区二区三区免费电影 | 亚洲一级黄色大片 | 国产精品免费人成网站 | 婷婷国产在线 | 国产片免费在线观看视频 | 免费一级片在线观看 | 亚洲精品久久久久久久不卡四虎 | 精品久久久久免费极品大片 | 久久国产精品免费视频 | 色婷婷亚洲综合 | 在线国产能看的 | 在线观看国产高清视频 | 亚洲人人爱 | 久久免费视频在线观看6 | 在线日韩| 91漂亮少妇露脸在线播放 | 免费在线观看成年人视频 | 欧美成年网站 | 久久久精品国产一区二区电影四季 | 日本aaa在线观看 | 黄色片网站av | 午夜精品婷婷 | 久久亚洲欧美 | 丝袜美腿在线 | 亚洲色图22p | 国产成人精品免费在线观看 | 日本精品免费看 | 国产精品国产三级国产不产一地 | 国产99久久九九精品 | 天堂av在线免费观看 | 国产小视频免费在线观看 | 久久午夜网 | 欧美成人精品欧美一级乱 | 808电影免费观看三年 | 免费观看一级特黄欧美大片 | a级国产乱理论片在线观看 特级毛片在线观看 | 日韩性色 | 欧美一级电影 | 欧美精品一区二区在线观看 | 亚洲电影影音先锋 | 国产精品亚 | 国产一区在线不卡 | 五月天天在线 | 久久久999免费视频 日韩网站在线 | 国色天香av | 91精品一区二区在线观看 | 久精品视频 | 欧美日韩高清在线一区 | 最近中文字幕国语免费av | av软件在线观看 | 亚洲精品99久久久久中文字幕 | 国产看片免费 | 亚洲精品小视频 | 久草免费在线观看 | 婷婷香蕉 | 亚洲国产免费网站 | 99热免费在线 | 五月天精品视频 | 亚洲做受高潮欧美裸体 | 911亚洲精品第一 | 日韩资源在线 | 免费a现在观看 | 天天操天天干天天综合网 | 国产精品一区二区三区免费视频 | 国产精品综合久久久久久 | 视频在线国产 | 久久久精品免费看 | 99 久久久久 | 天天干天天天天 | 六月色丁 | 99视屏| 麻豆91精品91久久久 | 国产夫妻自拍av | 国产中文字幕网 | 欧美成人精品欧美一级乱黄 | 又色又爽又激情的59视频 | 婷婷丁香色 | 91毛片在线观看 | 久草在线免费资源 | 蜜桃视频色| 波多野结依在线观看 | 黄色特级一级片 | 日韩在线视频观看 | 日韩三级视频在线看 | 国产色婷婷精品综合在线手机播放 | 中文字幕在线观看完整版电影 | 国产精品久久久久久久妇 | 日韩黄色网络 | 久久综合五月天婷婷伊人 | 99久久这里有精品 | 在线观看视频你懂得 | 久久伦理网 | 国产日韩精品在线观看 | 99久久99久久免费精品蜜臀 | 91一区二区三区久久久久国产乱 | 欧美一区二视频在线免费观看 | 天天色欧美 | 国产成人一二三 | 久久尤物电影视频在线观看 | a资源在线 | 久久久精品网 | 久黄色| 成人在线黄色 | 久久综合久色欧美综合狠狠 | 亚洲成人免费观看 | 五月天六月婷婷 | 精品国产成人av在线免 | 久久久精品网站 | 欧美日韩在线观看一区二区三区 | 国产一级一片免费播放放a 一区二区三区国产欧美 | 日韩美女一级片 | 麻豆 videos | 天天综合网入口 | 国产精品日韩在线播放 | 最近能播放的中文字幕 | 91天堂素人约啪 | 深爱五月激情五月 | 天天激情天天干 | 欧美激情视频一区二区三区免费 | 人人要人人澡人人爽人人dvd | 国产99久久九九精品免费 | 国产一级二级在线播放 | 中文字幕久久久精品 | 美女一二三区 | 九九视频一区 | 欧美精品中文字幕亚洲专区 | 中文有码在线视频 | 国产69久久久欧美一级 | 亚洲欧美日韩在线一区二区 | 中文字幕 国产专区 | 三级免费黄 | 麻豆精品在线 | 97色国产 | 久久久久中文 | 色九色| wwwwwww黄| 日本久久精品 | 国产精品中文字幕在线观看 | 99热这里只有精品国产首页 | 男女啪啪免费网站 | 日韩特级毛片 | 天天干天天色2020 | 亚洲v欧美v国产v在线观看 | 久草在线一免费新视频 | 日日干夜夜爱 | 国产精品免费在线播放 | 国产九九精品视频 | 精品电影一区 | 免费av网站在线看 | 黄色电影在线免费观看 | 四虎海外影库www4hu | av电影在线观看完整版一区二区 | 欧美日韩在线精品 | 永久免费av在线播放 | 四虎永久精品在线 | 97精品在线 | 国产美女精品在线 | 国产精品6999成人免费视频 | 97人人模人人爽人人喊中文字 | 日韩二区在线观看 | 在线看一级片 | 91中文字幕在线视频 | 欧美在线1区| 日韩精品一区二区在线 | 中文字幕乱码亚洲精品一区 | 99久久精品免费一区 | 成人国产精品一区二区 | 又紧又大又爽精品一区二区 | 亚洲一级片av | 国产99色| 丝袜美腿亚洲 | 91大神电影| 91中文字幕一区 | 国产成人一区二区在线观看 | 黄色免费在线看 | 中文免费观看 | 精品在线小视频 | 黄色国产区 | 国产精品一区二区三区在线看 | 免费观看mv大片高清 | 国产中文字幕在线免费观看 | 久久伦理电影 | 日韩精品一区二区在线视频 | 激情五月av | 日韩欧美一区二区在线观看 | 91桃色免费视频 | 91黄色小网站 | 亚洲免费av网站 | 久久国色夜色精品国产 | 2000xxx影视 | 亚洲高清在线视频 | 国产福利免费在线观看 | 亚洲国产网址 | 亚洲精品成人网 | 视频一区二区视频 | 少妇视频一区 | 日韩高清一 | 久久午夜电影院 | 九七视频在线观看 | 久久网页 | 日韩a级黄色 | 久久精品中文 | 91手机电视| 欧美做受高潮 | 国产黄色精品 | 精品国产乱码 | 国产精品一区一区三区 | 婷婷综合导航 | 国产精品一区二区三区四区在线观看 | 超碰97免费在线 | 久久久久久久久久久成人 | 深爱五月激情网 | 国产原创在线视频 | 国产99一区二区 | 亚洲精品国产视频 | 国产视频精品久久 | 五月激情丁香 | 免费毛片一区二区三区久久久 | 日韩伦理片hd | 91视频在线免费观看 | 99久久久免费视频 | 免费aa大片 | 九九精品视频在线 | 81国产精品久久久久久久久久 | 99精品视频免费观看 | 91大神精品视频在线观看 | 国产v欧美 | 久久一线| 91综合视频在线观看 | 欧美性春潮 | 免费观看第二部31集 | 欧美精品v国产精品v日韩精品 | 日韩一区精品 | 国产精品免费久久久久久久久久中文 | 色婷婷综合久色 | 国产精品一区二区在线免费观看 | 成年人视频在线 | 丁香六月激情婷婷 | 狠狠色丁香九九婷婷综合五月 | 激情av一区二区 | 香蕉免费| 色综合久久久 | 青青射| 天天综合操 | 中文字幕在线观看完整版 | 国产亚洲精品中文字幕 | av免费观看高清 | 97在线观看免费高清 | 国产精品 日韩精品 | 日本在线中文 | 日韩特黄av | 99热在 | 香蕉影视在线观看 | 亚洲免费在线观看视频 | 成人在线免费小视频 | 超碰人人舔 | 国产精品永久免费观看 | 日韩精品最新在线观看 | 色婷婷中文 | 亚洲情婷婷 | 国产人成一区二区三区影院 | 国产精品久久久久一区二区三区共 | 91九色成人 | 91视频免费观看 | 一级一片免费视频 | 激情文学丁香 | 亚洲三级性片 | 在线观看欧美成人 | 国产在线一区二区 | 国产精品99免视看9 国产精品毛片一区视频 | 黄色av成人在线 | 激情婷婷六月 | 久久精品视 | 欧美一级片免费在线观看 | 99视频在线免费看 | 91麻豆精品国产 | 亚洲日本va中文字幕 | 99 久久久久| 国产91国语对白在线 | 久草在线视频中文 | 五月婷丁香 | 视频99爱 | 亚洲成人黄色在线观看 | 97国产超碰 | 黄色字幕网 | 欧美午夜a| 日日婷婷夜日日天干 | 激情网五月婷婷 | 麻豆av一区二区三区在线观看 | 91x色| 激情在线免费视频 | av免费在线观 | 在线国产91| 91在线观看视频网站 | 欧美一二三专区 | 国产色婷婷 | 久久网站av | 又粗又长又大又爽又黄少妇毛片 | 黄色亚洲| 国产69熟 | 看av在线| 中文成人字幕 | 国产在线第三页 | 国产视频在线观看一区二区 | 久久人人添人人爽添人人88v | 国产高清在线免费视频 | 国内成人精品2018免费看 | 视频二区| 亚洲无人区小视频 | 亚洲高清av | 激情黄色一级片 | 免费av福利| 在线三级播放 | 麻豆视频大全 | 狠狠狠色丁香婷婷综合久久五月 | 久久久久久久久久久电影 | 天天摸夜夜添 | 久久免费99精品久久久久久 | 久久综合网色—综合色88 | 日日操操操 | 日日干综合 | 麻豆视频成人 | 日日夜夜天天人人 | 亚洲高清资源 | 天天综合网天天 | 欧美日韩中文在线视频 | 欧美a级一区二区 | 一区二区三区中文字幕在线 | 黄色免费观看网址 | 粉嫩一区二区三区粉嫩91 | 在线v片免费观看视频 | 一区二区三区观看 | 精品91久久久久 | 偷拍精品一区二区三区 | 亚洲日本va中文字幕 | 在线看污网站 | 久久免费a| 国产高清久久久久 | 国产麻豆精品一区二区 | 五月婷婷六月丁香 | 99热在线观看免费 | 亚洲1区 在线| 天天看天天操 | 天天综合入口 | 粉嫩av一区二区三区四区在线观看 | 91精品国产乱码在线观看 | 国产高清黄色 | 日韩欧美在线一区 | 99精品国产一区二区三区不卡 | 亚洲欧美激情插 | 国产91在| 天天玩天天操天天射 | 亚洲精品小视频在线观看 | 亚洲情婷婷 | 一区三区在线欧 | av电影免费在线看 | 九七人人干 | 久久久久婷 | 91视频在线国产 | 久久不卡免费视频 | 99久久免费看 | 欧美日韩二区在线 | 在线观看亚洲精品 | 欧美日韩国产色综合一二三四 | 美女黄频在线观看 | 欧洲激情在线 | 欧美午夜a | 国产真实精品久久二三区 | a资源在线| 午夜私人影院 | 国内精品久久久久影院男同志 | 成人黄色在线视频 | 国产成人黄色网址 | 狠狠躁夜夜躁人人爽超碰97香蕉 | 99精品久久久久 | 久久久国产精品一区二区中文 | 国产激情电影综合在线看 | 免费a级毛片在线看 | 成片视频在线观看 | 一区二区三区在线免费观看视频 | 色综合久久88 | 久久久www免费电影网 | 婷婷在线色 | 高清av在线免费观看 | 欧美日韩裸体免费视频 | 婷婷 中文字幕 | 久久精品视频一 | 国产精品99久久久久久久久 | 夜夜看av| 欧美精品久久久久久久久久久 | 91精品国产99久久久久久红楼 | 97成人精品视频在线播放 | 国产精品v欧美精品 | 91色一区二区三区 | 久久男人中文字幕资源站 | 99精品视频一区二区 | 91九色网站 | 99久热在线精品视频观看 | 麻豆精品视频在线观看免费 | 国产精品一区二区中文字幕 | 亚洲黄色激情小说 | 成人av免费在线 | 久久精品在线 | 国产一区欧美在线 | 久久综合免费视频影院 | 亚洲 综合 国产 精品 | 国产精品一区二区在线 | 色综合网在线 | 国产精品亚州 | 91av视屏 | 成人av中文字幕在线观看 | 91一区啪爱嗯打偷拍欧美 | 色av色av色av | 伊人网综合在线观看 | 欧美 国产 视频 | 久久久伊人网 | 高清不卡一区二区三区 | 性日韩欧美在线视频 | 精品久久久久久久久久国产 | 久久久久久久久久影视 | 国产涩图 | 热久久国产精品 | 免费看黄色毛片 | 黄色片亚洲 | 很污的网站 | 国产资源网站 | 久久精品国产免费观看 | 国产日韩欧美在线免费观看 | 97精品视频在线播放 | 国产在线观看高清视频 | 免费在线看v | 天天操操操操操 | 欧美日本不卡 | 在线观看视频你懂的 | 伊人天堂网 | 91超级碰 | 欧美日韩aa | 国产综合在线视频 | 在线观看www91 | 欧美成人视 | 亚洲在线精品视频 | 中文字幕在线久一本久 | 成人午夜免费福利 | 性色xxxxhd| 国产精品女教师 | 国产精品黄色影片导航在线观看 | 免费在线观看国产精品 | 一区二区三区手机在线观看 | 日本不卡一区二区三区在线观看 | 国产午夜精品一区二区三区嫩草 | 国产丝袜 | 一区二区三区www | 精品一二三四五区 | 久久婷婷国产色一区二区三区 | 成人在线观看免费视频 | 欧美日韩免费在线观看视频 | 天天综合网天天综合色 | 在线观看视频h | 国外成人在线视频网站 | 久久久久成人精品 | 91桃色在线播放 | www.久久com| 永久免费av在线播放 | 亚洲三级黄色 | 深爱激情婷婷网 | 又黄又刺激的视频 | 国产精品6| 看黄色.com | 久久国产精品免费观看 | 色婷婷狠狠五月综合天色拍 | 久草视频播放 | 精品国产免费人成在线观看 | 国内精品小视频 | 一本一道久久a久久精品 | 亚洲精品中文字幕在线观看 | 色综合婷婷 | 日本大片免费观看在线 | 五月婷丁香 | 美女免费电影 | 久久在视频 | 午夜精品一区二区三区免费 | 一级片免费观看 | 国产精品免费在线 | 五月激情五月激情 | 成人国产亚洲 | 欧美精品小视频 | 久久黄色片子 | 国产理论片在线观看 | 欧美福利网址 | 97视频在线免费播放 | 天天草夜夜 | 人人看97 | 国产一二区在线观看 | 亚洲黄色网络 | 亚洲日本精品视频 | 亚洲国产精选 | 在线观看国产麻豆 | 亚洲涩涩色 | 欧美久久影院 | 久草网在线视频 | 色网站在线观看 | 久久国产电影 | 久久躁日日躁aaaaxxxx | 丁香婷婷综合五月 | 国产看片网站 | 久久伦理电影 | 公与妇乱理三级xxx 在线观看视频在线观看 | 午夜精品久久久久久久99无限制 | 国产精品手机看片 | 国产精品中文在线 | 日韩欧美视频免费观看 | 色婷婷97 | 综合婷婷| 欧美在线一二 | 婷婷伊人综合 | 天天干天天操天天射 | 久久综合久久久久88 | 欧美性大胆 | 麻豆国产视频下载 | 亚洲黄色成人av | 国产精品久久久久久久久久久久午 | 日韩二区三区在线 | 超碰97免费 | 日韩网站在线 | 日韩sese | 亚洲精品影院在线观看 | 国产在线播放一区二区三区 | 日本午夜免费福利视频 | 99久高清在线观看视频99精品热在线观看视频 | 黄色网中文字幕 | 欧美91视频| 国产午夜精品久久久久久久久久 | 国产精品白浆 | 久久久在线观看 | 免费在线观看国产精品 | 国产资源精品 | a一片一级 | 亚洲电影在线看 | av片中文| 国产福利av在线 | 日韩中文字幕国产精品 | 欧美日韩免费视频 | 国产精品永久免费视频 | 日韩成人在线免费观看 | 中文字幕专区高清在线观看 | 中文字幕av电影下载 | 国产精品尤物视频 | 亚洲国产视频在线 | 国产亚洲欧美日韩高清 | 婷婷伊人网| 免费日韩电影 | 亚洲色图色 | 欧美日韩一区二区免费在线观看 | 亚洲国产av精品毛片鲁大师 | 国产精品第10页 | 日韩久久网站 | 国产精品夜夜夜一区二区三区尤 | 国产一区二区三区在线 | 日韩成人黄色 | 欧美日韩亚洲在线 | 激情网站五月天 | 视频一区二区在线观看 | 国产在线精品区 | 最近免费中文字幕mv在线视频3 | 日韩欧美精品免费 | 亚洲国产三级 | 五月激情五月激情 | 日韩婷婷 | 亚洲欧洲精品视频 | 国产精品日韩高清 | 免费日韩视 | 中文字幕成人在线观看 | 精品在线亚洲视频 | 日日爽夜夜爽 | 国产精品手机在线观看 | 成人免费观看视频大全 | 91成品视频 | 91网在线 | 国产精品男女 | 久久黄色小说 | 国产精品一区二区久久 | 性色视频在线 | 久久视频99 | 国产精品福利在线观看 | 国产精品四虎 | 黄色特级一级片 | 久久99视频免费观看 | 国产精品国产三级在线专区 | 在线观看午夜av | 国产网站av | 91免费网址| 亚洲精品免费在线 | 亚洲女同ⅹxx女同tv | 色噜噜狠狠狠狠色综合 | 精品一区二区综合 | 免费观看的黄色片 | 亚洲国产成人精品在线观看 | 欧美做受69| 色中色亚洲 | 免费看久久 | 亚洲成人av片 | 免费观看一级视频 | 伊人欧美| 五月婷婷爱 | 97人人超| 天堂黄色片 | 公与妇乱理三级xxx 在线观看视频在线观看 | 精品久久久久久久久久久久 | 亚洲精品视频在线观看免费 | 天天天综合 | 国产精品毛片久久久久久 | 黄网站app在线观看免费视频 | 日韩av免费大片 | 视频在线观看入口黄最新永久免费国产 | 欧美精品你懂的 | 午夜少妇 | 欧美va天堂va视频va在线 | 日韩欧美区 | 天堂av在线免费观看 | av高清在线 | 亚洲国产成人精品久久 | 日韩精品视频免费专区在线播放 | 91在线网址 | 亚洲伊人第一页 | 在线国产日本 | 9797在线看片亚洲精品 | 成人午夜精品 | 中文在线www| 日本激情动作片免费看 | 欧美日韩高清在线一区 | 最新国产精品拍自在线播放 | 黄色网免费 | 亚洲国产999| 91在线操 | 69国产盗摄一区二区三区五区 | 99精品在线免费视频 | 日日夜夜天天综合 | 日韩动漫免费观看高清完整版在线观看 | 九九热精 | 欧美精品久久久 | 亚洲国产精品一区二区尤物区 | 免费下载高清毛片 | 国产专区在线视频 | 国产小视频免费在线网址 | 国产a精品 | 丰满少妇高潮在线观看 | 久久久天堂 |