日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

数据湖架构Hudi(五)Hudi集成Flink案例详解

發布時間:2024/1/8 编程问答 61 豆豆
生活随笔 收集整理的這篇文章主要介紹了 数据湖架构Hudi(五)Hudi集成Flink案例详解 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

五、Hudi集成Flink案例詳解

5.1 hudi集成flink

flink的下載地址:

https://archive.apache.org/dist/flink/

HudiSupported Flink version
0.12.x1.15.x、1.14.x1.13.x
0.11.x1.14.x、1.13.x
0.10.x1.13.x
0.9.01.12.2
  • 將上述編譯好的安裝包拷貝到flink下的jars目錄中:
cp /opt/apps/hudi-0.12.0/packaging/hudi-flink-bundle/target/hudi-flink1.13-bundle-0.12.0.jar /opt/apps/flink-1.13.6/lib/
  • 拷貝guava包,解決依賴沖突
cp /opt/apps/hadoop-3.1.3/share/hadoop/common/lib/guava-27.0-jre.jar /opt/apps/flink-1.13.6/lib/
  • 配置Hadoop環境變量
vim /etc/profile.d/my_env.shexport HADOOP_CLASSPATH=`hadoop classpath` export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoopsource /etc/profile.d/my_env.sh

5.2 sql-client之yarn-session模式

配置hadoop調度器yarn

mapred-site.xml<configuration> <!-- 指定MapReduce作業執?時,使?YARN進?資源調度 --><property><name>mapreduce.framework.name</name><value>yarn</value></property><property><name>yarn.app.mapreduce.am.env</name><value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.3</value> </property><property><name>mapreduce.map.env</name><value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.3</value></property><property><name>mapreduce.reduce.env</name><value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.3</value></property> </configuration>yarn-site.xml<configuration> <!-- 設置ResourceManager --><property><name>yarn.resourcemanager.hostname</name><value>centos04</value> </property> <!--配置yarn的shuffle服務--><property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property> </configuration>hadoop-env.sh # 在最后面添加如下: export YARN_RESOURCEMANAGER_USER=root export YARN_NODEMANAGER_USER=root# 記得配置sql-client-defaults.yaml

5.2.1 啟動

# 1、修改配置文件 vim /opt/apps/flink-1.13.6/conf/flink-conf.yamlclassloader.check-leaked-classloader: false taskmanager.numberOfTaskSlots: 4state.backend: rocksdb execution.checkpointing.interval: 30000 # 開啟ck,才能快速從內存中flush出去 state.checkpoints.dir: hdfs://centos04:9000/ckps state.backend.incremental: true# 2、yarn-session模式啟動# 解決依賴問題 cp /opt/apps/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.3.jar /opt/apps/flink-1.13.6/lib/# 啟動yarn-session /opt/apps/flink-1.13.6/bin/yarn-session.sh -d # 啟動sql-client /opt/apps/flink-1.13.6/bin/sql-client.sh embedded -s yarn-session

5.2.2 插入數據

set sql-client.execution.result-mode=tableau;-- 創建hudi表 CREATE TABLE t1(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20) ) PARTITIONED BY (`partition`) WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t1','table.type' = 'MERGE_ON_READ' -- 默認是COW );或如下寫法 CREATE TABLE t1(uuid VARCHAR(20),name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20),PRIMARY KEY(uuid) NOT ENFORCED ) PARTITIONED BY (`partition`) WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t1','table.type' = 'MERGE_ON_READ' );-- 插入數據 INSERT INTO t1 VALUES('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');-- 查詢數據 select * from t1;

5.2.3 流式插入

-- 1、創建測試表 CREATE TABLE sourceT (uuid varchar(20),name varchar(10),age int,ts timestamp(3),`partition` varchar(20) ) WITH ('connector' = 'datagen','rows-per-second' = '1' );create table t2(uuid varchar(20),name varchar(10),age int,ts timestamp(3),`partition` varchar(20) ) with ('connector' = 'hudi','path' = '/tmp/hudi_flink/t2','table.type' = 'MERGE_ON_READ' );-- 2、執行插入 insert into t2 select * from sourceT;查詢結果 set sql-client.execution.result-mode=tableau; Flink SQL> select * from t2 limit 10; -- 會產生一個collect的flink任務,拉取10條數據,注意:不是流讀取 2023-03-06 22:45:10,403 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false 2023-03-06 22:45:12,897 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at centos04/192.168.42.104:8032 2023-03-06 22:45:12,899 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2023-03-06 22:45:12,918 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface centos04:45452 of application 'application_1678113536312_0001'. +----+--------------------------------+--------------------------------+-------------+-------------------------+--------------------------------+ | op | uuid | name | age | ts | partition | +----+--------------------------------+--------------------------------+-------------+-------------------------+--------------------------------+ | +I | d0523c31d3da5b8e2a8ff676dcf... | 327db70824413c5dcde0a7ac10c... | 1971040768 | 2023-03-06 14:40:58.780 | 42b45346672bf719b5393232763... | | +I | cfc07cbebf6890a04942ec88947... | 36fc7a58aab88835f11b3b51a40... | -12199364 | 2023-03-06 14:41:05.781 | e33c02173f4c744fb9c1c68e774... | | +I | 668b204a933494a89b829c76bc6... | aa9ff2109457fdcd5f099b8ce98... | 2061449955 | 2023-03-06 14:41:14.780 | 680514e53b196324423cd12cda5... | | +I | 95fe7878909a801c2726f1d05f5... | 1c86b29fe313e557688df0ba950... | 519997290 | 2023-03-06 14:41:11.781 | b9817c52301ab4614c3053c9ccc... | | +I | 8661c25c8c930f4660fbefa867e... | 01a2bee6b99064c7bca9513ca37... | -682830738 | 2023-03-06 14:41:32.781 | 16ab837502a31e208b06bb74efd... | | +I | 55ce03895e229b29546dbdd2ff3... | 77f2552de13337e8092c1445654... | 2011273584 | 2023-03-06 14:41:09.780 | 3fd688cfa17b2a3a6fd3ffac6bd... | | +I | 50c23f315d736c313b652b34fc5... | 4f9c84ff75466fba8e800daabd0... | -190184764 | 2023-03-06 14:42:26.780 | 7f2a07a1007b2fbfea8cbb2062e... | | +I | 8073e8c70a9bc0e79c2e69aa885... | 30bf89c80d9ab0f0a8f5f883ee6... | -1639873427 | 2023-03-06 14:41:24.781 | 15df7d527d6d7edae496e76d02f... | | +I | 29a61b7cd348d08498d2b089a5d... | 77a63ca7a2e77e6d167de20c673... | 71527378 | 2023-03-06 14:42:14.781 | 2842db44a691f4f1d597ac79086... | | +I | e5defc24191f60557644b7d14e2... | 56bdd04424b8f422d4075ade510... | 1054223989 | 2023-03-06 14:40:42.781 | e8d2d3c6fed90d37b15647d1ecd... | +----+--------------------------------+--------------------------------+-------------+-------------------------+--------------------------------+

5.3 使用IDEA開發

除了用sql-client,還可以自己編寫FlinkSQL程序,打包提交Flink作業。

1、首先,需要將hudi集成flink的jar包,裝載到本地的倉庫,命令如下:

D:\bigdata\hudi從入門到精通\apps>mvn install:install-file -DgroupId=org.apache.hudi -DartifactId=hudi-flink_2.12 -Dversion=0.12.0 -Dpackaging=jar -Dfile=./hudi-flink1.13-bundle-0.12.0.jar[INFO] Scanning for projects... [INFO] [INFO] ------------------< org.apache.maven:standalone-pom >------------------- [INFO] Building Maven Stub Project (No POM) 1 [INFO] --------------------------------[ pom ]--------------------------------- [INFO] [INFO] --- maven-install-plugin:2.4:install-file (default-cli) @ standalone-pom --- [INFO] Installing D:\bigdata\hudi從入門到精通\apps\hudi-flink1.13-bundle-0.12.0.jar to D:\doit\apps\repository\org\apache\hudi\hudi-flink_2.12\0.12.0\hudi-flink_2.12-0.12.0.jar [INFO] Installing C:\Users\Undo\AppData\Local\Temp\mvninstall50353756903805721.pom to D:\doit\apps\repository\org\apache\hudi\hudi-flink_2.12\0.12.0\hudi-flink_2.12-0.12.0.pom [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 1.111 s [INFO] Finished at: 2023-03-02T10:08:15+08:00 [INFO] ------------------------------------------------------------------------

2、導入pom文件

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>hudi-start</artifactId><groupId>com.yyds</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>hudi-flink</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.13.6</flink.version><hudi.version>0.12.0</hudi.version><java.version>1.8</java.version><scala.binary.version>2.12</scala.binary.version><slf4j.version>1.7.30</slf4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope> <!--不會打包到依賴中,只參與編譯,不參與運行 --></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!--idea運行時也有webui--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version><scope>provided</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version><scope>provided</scope></dependency><!--手動install到本地maven倉庫,要不然會報錯--><dependency><groupId>org.apache.hudi</groupId><artifactId>hudi-flink_2.12</artifactId><version>${hudi.version}</version><scope>provided</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude><exclude>org.apache.hadoop:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers combine.children="append"><transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer></transformers></configuration></execution></executions></plugin></plugins></build></project> package com.yyds.hudi.flink;import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.contrib.streaming.state.PredefinedOptions; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.util.concurrent.TimeUnit;public class HudiTest {public static void main(String[] args) {System.setProperty("HADOOP_USER_NAME","root");// 1、創建flinksql的執行環境Configuration conf = new Configuration();conf.setString(RestOptions.BIND_PORT, "8081-8089");StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);// 注意:需要設置check-point// 設置狀態后端RocksDBEmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);env.setStateBackend(embeddedRocksDBStateBackend);// checkpoint配置env.enableCheckpointing(TimeUnit.SECONDS.toMillis(30), CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig = env.getCheckpointConfig();checkpointConfig.setCheckpointStorage("hdfs://centos04:9000/ckps");checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(20));checkpointConfig.setTolerableCheckpointFailureNumber(5);checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 2、使用flink自帶connector模擬數據tabEnv.executeSql("CREATE TABLE sourceT (\n" +" uuid varchar(20),\n" +" name varchar(10),\n" +" age int,\n" +" ts timestamp(3),\n" +" `partition` varchar(20)\n" +") WITH (\n" +" 'connector' = 'datagen',\n" +" 'rows-per-second' = '1'\n" +")");// 3、創建hudi表tabEnv.executeSql("create table t2(\n" +" uuid varchar(20),\n" +" name varchar(10),\n" +" age int,\n" +" ts timestamp(3),\n" +" `partition` varchar(20)\n" +")\n" +"with (\n" +" 'connector' = 'hudi',\n" + // 指定connector為hudi" 'path' = 'hdfs://192.168.42.104:9000/datas/hudi_warehouse/hudi_flink/t2',\n" +" 'table.type' = 'MERGE_ON_READ'\n" + // MOR類型的表")");// 4、將模擬產生的數據,寫入到Hudi表中tabEnv.executeSql("insert into t2 select * from sourceT");} }

jar包運行

bin/flink run -t yarn-per-job \ -c com.yyds.hudi.flink.HudiTest \ ./myjars/hudi-flink-1.0-SNAPSHOT.jar

類型映射

Flink SQL TypeHudi TypeAvro logical type
CHAR / VARCHAR / STRINGstring
BOOLEANboolean
BINARY / VARBINARYbytes
DECIMALfixeddecimal
TINYINTint
SMALLINTint
INTint
BIGINTlong
FLOATfloat
DOUBLEdouble
DATEintdate
TIMEinttime-millis
TIMESTAMPlongtimestamp-millis
ARRAYarray
MAP(key must be string/char/varchar type)map
MULTISET(element must be string/char/varchar type)map
ROWrecord

5.4 hudi核心參數

5.4.1 去重參數

-- 通過如下語法設置主鍵: -- 設置單個主鍵 create table hoodie_table (f0 int primary key not enforced,f1 varchar(20),... ) with ('connector' = 'hudi',... )-- 設置聯合主鍵 create table hoodie_table (f0 int,f1 varchar(20),...primary key(f0, f1) not enforced ) with ('connector' = 'hudi',... ) 名稱說明默認值備注
hoodie.datasource.write.recordkey.field主鍵字段支持主鍵語法 PRIMARY KEY 設置,支持逗號分隔的多個字段
precombine.field(0.13.0 之前版本為 write.precombine.field)去重時間字段record 合并的時候會按照該字段排序,選值較大的 record 為合并結果;不指定則為處理序:選擇后到的 record

5.4.2 并發參數

名稱說明默認值備注
write.taskswriter 的并發,每個 writer 順序寫 1~N 個 buckets4增加并發對小文件個數沒影響
write.bucket_assign.tasksbucket assigner 的并發Flink的并行度增加并發同時增加了并發寫的 bucekt 數,也就變相增加了小文件(小 bucket) 數
write.index_bootstrap.tasksIndex bootstrap 算子的并發,增加并發可以加快 bootstrap 階段的效率,bootstrap 階段會阻塞 checkpoint,因此需要設置多一些的 checkpoint 失敗容忍次數Flink的并行度只在 index.bootstrap.enabled 為 true 時生效
read.tasks讀算子的并發(batch 和 stream)4
compaction.tasksonline compaction 算子的并發writer 的并發online compaction 比較耗費資源,建議走 offline compaction

可以flink建表時在with中指定,或Hints臨時指定參數的方式:在需要調整的表名后面加上 /*+ OPTIONS() */

案例如下:

insert into t2 /*+ OPTIONS('write.tasks'='2','write.bucket_assign.tasks'='3','compaction.tasks'='4') */ select * from sourceT;# 從下圖可以看出,writer 的并發變成了2,bucket assigner 的并發變成了3,compaction_task 變成了4


可以參考下面Hudi表讀取原理,看上圖。

5.4.3 壓縮參數

? 在線壓縮的參數,通過設置 compaction.async.enabled =false關閉在線壓縮執行,但是調度compaction.schedule.enabled 仍然建議開啟(即上圖的compact_plan_generate步驟),之后通過離線壓縮直接執行 在線壓縮任務 階段性調度的壓縮 plan。

名稱說明默認值備注
compaction.schedule.enabled是否階段性生成壓縮 plantrue建議開啟,即使compaction.async.enabled 關閉的情況下
compaction.async.enabled是否開啟異步壓縮true通過關閉此參數關閉在線壓縮
compaction.tasks壓縮 task 并發4
compaction.trigger.strategy壓縮策略num_commits支持四種策略:num_commits、time_elapsed、num_and_time、num_or_time
compaction.delta_commits默認策略,5 個 commits 壓縮一次5
compaction.delta_seconds3600
compaction.max_memory壓縮去重的 hash map 可用內存100(MB)資源夠用的話建議調整到 1GB
compaction.target_io每個壓縮 plan 的 IO 上限,默認 5GB500(GB)

案例如下:

CREATE TABLE t3(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20) ) WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t3','compaction.async.enabled' = 'true', -- 異步在線壓縮'compaction.tasks' = '1','compaction.schedule.enabled' = 'true', -- 生成壓縮 plan'compaction.trigger.strategy' = 'num_commits', -- 壓縮策略,安裝commit次數進行壓縮'compaction.delta_commits' = '2', -- 2次進行壓縮'table.type' = 'MERGE_ON_READ' );set table.dynamic-table-options.enabled=true; insert into t3 select * from sourceT/*+ OPTIONS('rows-per-second' = '5') */;-- 從hdfs上可以看到,flink發生兩次ck,delta_commit提交兩次后,將log文件進行壓縮,然后生成了parquet文件。

5.4.4 文件大小

? Hudi會自管理文件大小,避免向查詢引擎暴露小文件,其中自動處理文件大小起很大作用。在進行insert/upsert操作時,Hudi可以將文件大小維護在一個指定文件大小。

? 目前只有 log 文件的寫入大小可以做到精確控制,parquet 文件大小按照估算值。

名稱說明默認值備注
hoodie.parquet.max.file.size最大可寫入的 parquet 文件大小120 * 1024 * 1024默認 120MB(單位 byte)超過該大小切新的 file group
hoodie.logfile.to.parquet.compression.ratiolog文件大小轉 parquet 的比率0.35hoodie 統一依據 parquet 大小來評估小文件策略
hoodie.parquet.small.file.limit在寫入時,hudi 會嘗試先追加寫已存小文件,該參數設置了小文件的大小閾值,小于該參數的文件被認為是小文件104857600默認 100MB(單位 byte)大于 100MB,小于 120MB 的文件會被忽略,避免寫過度放大
hoodie.copyonwrite.record.size.estimate預估的 record 大小,hoodie 會依據歷史的 commits 動態估算 record 的大小,但是前提是之前有單次寫入超過 hoodie.parquet.small.file.limit 大小,在未達到這個大小時會使用這個參數1024默認 1KB(單位 byte)如果作業流量比較小,可以設置下這個參數
hoodie.logfile.max.sizeLogFile最大大小。這是在將Log滾轉到下一個版本之前允許的最大大小。1073741824默認1GB(單位 byte)

案例如下:

CREATE TABLE t4(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20) ) WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t4','compaction.tasks' = '1','hoodie.parquet.max.file.size'= '10000', -- 最大可寫入的 parquet 文件大小,設置為10 KB'hoodie.parquet.small.file.limit'='5000', -- 小文件的大小閾值,小于該參數的文件被認為是小文件 設置為5KB'table.type' = 'MERGE_ON_READ' );set table.dynamic-table-options.enabled=true; insert into t4 select * from sourceT /*+ OPTIONS('rows-per-second' = '5')*/;

5.4.5 hadoop參數

從 0.12.0 開始支持,如果有跨集群提交執行的需求,可以通過 sql 的 ddl 指定 per-job 級別的 hadoop 配置

名稱說明默認值備注
hadoop.${you option key}通過 hadoop.前綴指定 hadoop 配置項支持同時指定多個 hadoop 配置項

5.5 內存優化

5.5.1 內存參數

名稱說明默認值備注
write.task.max.size一個 write task 的最大可用內存1024當前預留給 write buffer 的內存為write.task.max.size -compaction.max_memory當 write task 的內存 buffer達到閾值后會將內存里最大的 buffer flush 出去
write.batch.sizeFlink 的寫 task 為了提高寫數據效率,會按照寫 bucket 提前 buffer 數據,每個 bucket 的數據在內存達到閾值之前會一直 cache 在內存中,當閾值達到會把數據 buffer 傳遞給 hoodie 的 writer 執行寫操作256一般不用設置,保持默認值就好
write.log_block.sizehoodie 的 log writer 在收到 write task 的數據后不會馬上 flush 數據,writer 是以 LogBlock 為單位往磁盤刷數據的,在 LogBlock 攢夠之前 records 會以序列化字節的形式 buffer 在 writer 內部128一般不用設置,保持默認值就好
write.merge.max_memoryhoodie 在 COW 寫操作的時候,會有增量數據和 base file 數據 merge 的過程,增量的數據會緩存在內存的 map 結構里,這個 map 是可 spill 的,這個參數控制了 map 可以使用的堆內存大小100一般不用設置,保持默認值就好
compaction.max_memory同 write.merge.max_memory: 100MB 類似,只是發生在壓縮時。100如果是 online compaction,資源充足時可以開大些,比如 1GB

5.5.2 MOR

(1)state backend 換成 rocksdb (默認的 in-memory state-backend 非常吃內存)(2)內存夠的話,compaction.max_memory 調大些 (默認是 100MB 可以調到 1GB)(3)關注 TM 分配給每個 write task 的內存,保證每個 write task 能夠分配到 write.task.max.size 所配置的大小,比如 TM 的內存是 4GB 跑了 2 個 StreamWriteFunction 那每個 write function 能分到 2GB,盡量預留一些 buffer,因為網絡 buffer,TM 上其他類型 task (比如 BucketAssignFunction 也會吃些內存)(4)需要關注 compaction 的內存變化,compaction.max_memory 控制了每個 compaction task 讀 log 時可以利用的內存大小,compaction.tasks 控制了 compaction task 的并發注意: write.task.max.size - compaction.max_memory 是預留給每個 write task 的內存 buffer

5.5.3 COW

(1)state backend 換成 rocksdb(默認的 in-memory state-backend 非常吃內存)。(2)write.task.max.size 和 write.merge.max_memory 同時調大(默認是 1GB 和 100MB 可以調到 2GB 和 1GB)。(3)關注 TM 分配給每個 write task 的內存,保證每個 write task 能夠分配到 write.task.max.size 所配置的大小,比如 TM 的內存是 4GB 跑了 2 個 StreamWriteFunction 那每個 write function 能分到 2GB,盡量預留一些 buffer,因為網絡 buffer,TM 上其他類型 task(比如 BucketAssignFunction 也會吃些內存)。注意:write.task.max.size - write.merge.max_memory 是預留給每個 write task 的內存 buffer。

5.6 讀取方式

5.6.1 流讀

? 當前表默認是快照讀取,即讀取最新的全量快照數據并一次性返回。通過參數read.streaming.enabled 參數開啟流讀模式,通過 read.start-commit 參數指定起始消費位置,支持指定 earliest 從最早消費。

名稱Required默認值說明
read.streaming.enabledfalsefalse設置 true 開啟流讀模式
read.start-commitfalse最新 commit指定 ‘yyyyMMddHHmmss’ 格式的起始 commit(閉區間)
read.streaming.skip_compactionfalsefalse流讀時是否跳過 compaction 的 commits,跳過 compaction 有兩個用途:1)避免 upsert 語義下重復消費 (compaction 的 instant 為重復數據,如果不跳過,有小概率會重復消費) 2) changelog 模式下保證語義正確性 0.11 開始,以上兩個問題已經通過保留 compaction 的 instant time 修復
clean.retain_commitsfalse10cleaner 最多保留的歷史 commits 數,大于此數量的歷史 commits 會被清理掉,changelog 模式下,這個參數可以控制 changelog 的保留時間,例如 checkpoint 周期為 5 分鐘一次,默認最少保留 50 分鐘的時間。
set sql-client.execution.result-mode=tableau;CREATE TABLE t5(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20) ) WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t5','table.type' = 'MERGE_ON_READ','read.streaming.enabled' = 'true','read.streaming.check-interval' = '4' -- 默認60s );insert into t5 select * from sourceT;select * from t5;-- 如下圖,能夠不斷的獲取數據

5.6.2 增量讀取

0.10.0 開始支持。如果有增量讀取 batch 數據的需求,增量讀取包含三種場景。 (1)Stream 增量消費,通過參數 read.start-commit 指定起始消費位置; (2)Batch 增量消費,通過參數 read.start-commit 指定起始消費位置,通過參數 read.end-commit 指定結束消費位置,區間為閉區間,即包含起始、結束的 commit (3)TimeTravel:Batch 消費某個時間點的數據:通過參數 read.end-commit 指定結束消費位置即可(由于起始位置默認從最新,所以無需重復聲明) 名稱Required默認值說明
read.start-commitfalse默認從最新 commit支持 earliest 從最早消費
read.end-commitfalse默認到最新 commit

5.6.3 限流

? 如果將全量數據(百億數量級) 和增量先同步到 kafka,再通過 flink 流式消費的方式將庫表數據直接導成 hoodie 表,因為直接消費全量部分數據:量大(吞吐高)、亂序嚴重(寫入的 partition 隨機),會導致寫入性能退化,出現吞吐毛刺,這時候可以開啟限速參數,保證流量平穩寫入。

名稱Required默認值說明
write.rate.limitfalse0默認關閉限速

5.7 寫入方式

5.7.1 通過flink-cdc進行寫入

CDC 數據保存了完整的數據庫變更,當前可通過兩種途徑將數據導入 hudi

第一種:通過 cdc-connector 直接對接 DB 的 binlog 將數據導入 hudi,優點是不依賴消息隊列,缺點是對 db server 造成壓力。第二種:對接 cdc format 消費 kafka 數據導入 hudi,優點是可擴展性強,缺點是依賴 kafka。注意:如果上游數據無法保證順序,需要指定 write.precombine.field 字段。

1)準備MySQL表

(1)MySQL開啟binlog

(2)建表

create database test; use test;create table stu3 (id int unsigned auto_increment primary key COMMENT '自增id',name varchar(20) not null comment '學生名字',school varchar(20) not null comment '學校名字',nickname varchar(20) not null comment '學生小名',age int not null comment '學生年齡',class_num int not null comment '班級人數',phone bigint not null comment '電話號碼',email varchar(64) comment '家庭網絡郵箱',ip varchar(32) comment 'IP地址') engine=InnoDB default charset=utf8;

2)flink讀取mysql binlog并寫入kafka

(1)創建MySQL表

create table stu3_binlog(id bigint not null,name string,school string,nickname string,age int not null,class_num int not null,phone bigint not null,email string,ip string,primary key (id) not enforced ) with ('connector' = 'mysql-cdc','hostname' = 'centos01','port' = '3306','username' = 'root','password' = 'root','database-name' = 'test','table-name' = 'stu3' );

(2)創建Kafka表

create table stu3_binlog_sink_kafka(id bigint not null,name string,school string,nickname string,age int not null,class_num int not null,phone bigint not null,email string,ip string,primary key (id) not enforced ) with ('connector' = 'upsert-kafka','topic' = 'cdc_mysql_stu3_sink','properties.zookeeper.connect' = 'centos01:2181','properties.bootstrap.servers' = 'centos01:9092','key.format' = 'json','value.format' = 'json');

(3)將mysql binlog日志寫入kafka

insert into stu3_binlog_sink_kafka select * from stu3_binlog;

3)flink讀取kafka數據并寫入hudi數據湖

(1)創建kafka源表

create table stu3_binlog_source_kafka(id bigint not null,name string,school string,nickname string,age int not null,class_num int not null,phone bigint not null,email string,ip string) with ('connector' = 'kafka','topic' = 'cdc_mysql_stu3_sink','properties.bootstrap.servers' = 'hadoop1:9092','format' = 'json','scan.startup.mode' = 'earliest-offset','properties.group.id' = 'testGroup');

(2)創建hudi目標表

create table stu3_binlog_sink_hudi(id bigint not null,name string,`school` string,nickname string,age int not null,class_num int not null,phone bigint not null,email string,ip string,primary key (id) not enforced ) partitioned by (`school`) with ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/stu3_binlog_sink_hudi','table.type' = 'MERGE_ON_READ','write.option' = 'insert','write.precombine.field' = 'school');

(3)將kafka數據寫入到hudi中

insert into stu3_binlog_sink_hudi select * from stu3_binlog_source_kafka;

5.7.2 離線批量導入

如果存量數據來源于其他數據源,可以使用批量導入功能,快速將存量數據導成 Hoodie 表格式。(1)批量導入省去了 avro 的序列化以及數據的 merge 過程,后續不會再有去重操作,數據的唯一性需要自己來保證。 (2)bulk_insert 需要在 Batch Execuiton Mode 下執行更高效,Batch 模式默認會按照 partition path 排序輸入消息再寫入 Hoodie,避免 file handle 頻繁切換導致性能下降。SET execution.runtime-mode = batch; SET execution.checkpointing.interval = 0;(3)bulk_insert write task 的并發通過參數 write.tasks 指定,并發的數量會影響到小文件的數量,理論上,bulk_insert write task 的并發數就是劃分的 bucket 數,當然每個 bucket 在寫到文件大小上限(parquet 120 MB)的時候會 roll over 到新的文件句柄,所以最后:寫文件數量 >= bulk_insert write task 數。 名稱Required默認值說明
write.operationTRUEupsert配置 bulk_insert 開啟該功能
write.tasksFALSE4bulk_insert 寫 task 的并發,最后的文件數 >=write.tasks
write.bulk_insert.shuffle_by_partitionwrite.bulk_insert.shuffle_input(從 0.11 開始)FALSETRUE是否將數據按照 partition 字段 shuffle 再通過 write task 寫入,開啟該參數將減少小文件的數量 但是可能有數據傾斜風險
write.bulk_insert.sort_by_partitionwrite.bulk_insert.sort_input(從 0.11 開始)FALSETRUE是否將數據線按照 partition 字段排序再寫入,當一個 write task 寫多個 partition,開啟可以減少小文件數量
write.sort.memory128sort 算子的可用 managed memory(單位 MB)

5.7.3 全量接增量

如果已經有全量的離線 Hoodie 表,需要接上實時寫入,并且保證數據不重復,可以開啟 index bootstrap 功能。

如果覺得流程冗長,可以在寫入全量數據的時候資源調大直接走流模式寫,全量走完接新數據再將資源調小(或者開啟限流功能)。

名稱Required默認值說明
index.bootstrap.enabledtruefalse開啟索引加載,會將已存表的最新數據一次性加載到 state 中
index.partition.regexfalse*設置正則表達式進行分區篩選,默認為加載全部分區
使用流程 (1) CREATE TABLE 創建和 Hoodie 表對應的語句,注意 table type 要正確 (2)設置 index.bootstrap.enabled = true開啟索引加載功能 (3)重啟任務將 index.bootstrap.enabled 關閉,參數配置到合適的大小,如果RowDataToHoodieFunction 和 BootstrapFunction 并發不同,可以重啟避免 shuffle說明: 索引加載為并發加載,根據數據量大小加載時間不同,可以在log中搜索 finish loading the index under partition 和 Load records from file 日志來觀察索引加載的進度

5.8 寫入模式

5.8.1 Changelog 模式

? 如果希望 Hoodie 保留消息的所有變更(I/-U/U/D),之后接上 Flink 引擎的有狀態計算實現全鏈路近實時數倉生產(增量計算),Hoodie 的 MOR 表通過行存原生支持保留消息的所有變更(format 層面的集成),通過流讀 MOR 表可以消費到所有的變更記錄。

1)WITH 參數

名稱Required默認值說明
changelog.enabledfalsefalse默認是關閉狀態,即 UPSERT 語義,所有的消息僅保證最后一條合并消息,中間的變更可能會被 merge 掉;改成 true 支持消費所有變更。

? 批(快照)讀仍然會合并所有的中間結果,不管 format 是否已存儲中間狀態。

? 開啟 changelog.enabled 參數后,中間的變更也只是 Best Effort: 異步的壓縮任務會將中間變更合并成 1 條,所以如果流讀消費不夠及時,被壓縮后只能讀到最后一條記錄。當然,通過調整壓縮的 buffer 時間可以預留一定的時間 buffer 給 reader,比如調整壓縮的兩個參數:

? ? compaction.delta_commits:5

? ? compaction.delta_seconds: 3600。

說明:

Changelog 模式開啟流讀的話,要在 sql-client 里面設置參數:

set sql-client.execution.result-mode=tableau;

或者

set sql-client.execution.result-mode=changelog;

否則中間結果在讀的時候會被直接合并。

2)流讀 changelog

僅在 0.10.0 支持,本 feature 為實驗性。

開啟 changelog 模式后,hudi 會保留一段時間的 changelog 供下游 consumer 消費,我們可以通過流讀 ODS 層 changelog 接上 ETL 邏輯寫入到 DWD 層,如下圖的 pipeline:

? 流讀的時候我們要注意 changelog 有可能會被 compaction 合并掉,中間記錄會消除,可能會影響計算結果,需要關注sql-client的屬性(result-mode)同上。

3)案例演示

(1)使用changelog

set sql-client.execution.result-mode=tableau; CREATE TABLE t6(id int,ts int,primary key (id) not enforced ) WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t6','table.type' = 'MERGE_ON_READ','read.streaming.enabled' = 'true', 'read.streaming.check-interval' = '4','changelog.enabled' = 'true' );insert into t6 values (1,1); insert into t6 values (1,2);set table.dynamic-table-options.enabled=true; select * from t6/*+ OPTIONS('read.start-commit'='earliest')*/; select count(*) from t6/*+ OPTIONS('read.start-commit'='earliest')*/;

(2)不使用changelog

CREATE TABLE t6_v(id int,ts int,primary key (id) not enforced ) WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t6','table.type' = 'MERGE_ON_READ','read.streaming.enabled' = 'true','read.streaming.check-interval' = '4' );select * from t6_v/*+ OPTIONS('read.start-commit'='earliest')*/; select count(*) from t6_v/*+ OPTIONS('read.start-commit'='earliest')*/;

5.8.2 Append 模式

從 0.10 開始支持

對于 INSERT 模式:

? ? MOR 默認會 apply 小文件策略: 會追加寫 avro log 文件

? ? COW 每次直接寫新的 parquet 文件,沒有小文件策略

Hudi 支持豐富的 Clustering 策略,優化 INSERT 模式下的小文件問題:

1)Inline Clustering

只有 Copy On Write 表支持該模式

名稱Required默認值說明
write.insert.clusterfalsefalse是否在寫入時合并小文件,COW 表默認 insert 寫不合并小文件,開啟該參數后,每次寫入會優先合并之前的小文件(不會去重),吞吐會受影響

2) Async Clustering

? 從 0.12 開始支持

(1)WITH參數

名稱Required默認值說明
clustering.schedule.enabledfalsefalse是否在寫入時定時異步調度 clustering plan,默認關閉
clustering.delta_commitsfalse4調度 clsutering plan 的間隔 commits,clustering.schedule.enabled 為 true 時生效
clustering.async.enabledfalsefalse是否異步執行 clustering plan,默認關閉
clustering.tasksfalse4Clustering task 執行并發
clustering.plan.strategy.target.file.max.bytesfalse1024 * 1024 * 1024Clustering 單文件目標大小,默認 1GB
clustering.plan.strategy.small.file.limitfalse600小于該大小的文件才會參與 clustering,默認600MB
clustering.plan.strategy.sort.columnsfalseN/A支持指定特殊的排序字段
clustering.plan.partition.filter.modefalseNONE支持NONE:不做限制RECENT_DAYS:按時間(天)回溯SELECTED_PARTITIONS:指定固定的 partition
clustering.plan.strategy.daybased.lookback.partitionsfalse2RECENT_DAYS 生效,默認 2 天

(2)Clustering Plan Strategy

? 支持定制化的 clustering 策略。

名稱Required默認值說明
clustering.plan.partition.filter.modefalseNONE支持· NONE:不做限制· RECENT_DAYS:按時間(天)回溯· SELECTED_PARTITIONS:指定固定的 partition
clustering.plan.strategy.daybased.lookback.partitionsfalse2RECENT_DAYS 生效,默認 2 天
clustering.plan.strategy.cluster.begin.partitionfalseN/ASELECTED_PARTITIONS 生效,指定開始 partition(inclusive)
clustering.plan.strategy.cluster.end.partitionfalseN/ASELECTED_PARTITIONS 生效,指定結束 partition(incluseve)
clustering.plan.strategy.partition.regex.patternfalseN/A正則表達式過濾 partitions
clustering.plan.strategy.partition.selectedfalseN/A顯示指定目標 partitions,支持逗號 , 分割多個 partition

5.9 Bucket索引

? 默認的 flink 流式寫入使用 state 存儲索引信息:primary key 到 fileId 的映射關系。當數據量比較大的時候,state的存儲開銷可能成為瓶頸,bucket 索引通過固定的 hash 策略,將相同 key 的數據分配到同一個 fileGroup 中,避免了索引的存儲和查詢開銷。

名稱Required默認值說明
index.typefalseFLINK_STATE設置 BUCKET 開啟 Bucket 索引功能
hoodie.bucket.index.hash.fieldfalse主鍵可以設置成主鍵的子集
hoodie.bucket.index.num.bucketsfalse4默認每個 partition 的 bucket 數,當前設置后則不可再變更。
(1)bucket index 沒有 state 的存儲計算開銷,性能較好 (2)bucket index 無法擴 buckets,state index 則可以依據文件的大小動態擴容 (3)bucket index 不支持跨 partition 的變更(如果輸入是 cdc 流則沒有這個限制),state index 沒有限制

5.10 Hudi Catalog

? 從 0.12.0 開始支持,通過 catalog 可以管理 flink 創建的表,避免重復建表操作,另外 hms 模式的 catalog 支持自動補全 hive 同步參數。

-- DFS 模式 Catalog SQL樣例: CREATE CATALOG hoodie_catalogWITH ('type'='hudi','catalog.path' = '${catalog 的默認路徑}','mode'='dfs' );-- Hms 模式 Catalog SQL 樣例: CREATE CATALOG hoodie_catalogWITH ('type'='hudi','catalog.path' = '${catalog 的默認路徑}','hive.conf.dir' = '${hive-site.xml 所在的目錄}','mode'='hms' -- 支持 'dfs' 模式通過文件系統管理表屬性); 名稱Required默認值說明
catalog.pathtrue默認的 catalog 根路徑,用作表路徑的自動推導,默認的表路徑: c a t a l o g . p a t h / {catalog.path}/ catalog.path/{db_name}/${table_name}
default-databasefalsedefault默認的 database 名
hive.conf.dirfalsehive-site.xml 所在的目錄,只在 hms 模式下生效
modefalsedfs支持 hms模式通過 hive 管理元數據
table.externalfalsefalse是否創建外部表,只在 hms 模式下生效

案例如下:

--(1)創建sql-client初始化sql文件 vim /opt/apps/flink-1.13.6/conf/sql-client-init.sqlCREATE CATALOG hoodie_catalogWITH ('type'='hudi','catalog.path' = '/tmp/hudi_catalog','mode'='dfs' );USE CATALOG hoodie_catalog; --(2)指定sql-client啟動時加載sql文件 hadoop fs -mkdir /tmp/hudi_catalogbin/sql-client.sh embedded -i conf/sql-client-init.sql -s yarn-session --(3)建庫建表插入 create database test; use test;create table t2(uuid varchar(20),name varchar(10),age int,ts timestamp(3),`partition` varchar(20), primary key (uuid) not enforced ) with ('connector' = 'hudi','path' = '/tmp/hudi_catalog/default/t2','table.type' = 'MERGE_ON_READ' );insert into t2 values('1','zs',18,TIMESTAMP '1970-01-01 00:00:01','a'); --(4)退出sql-client,重新進入,表信息還在 use test; show tables; select * from t2;

5.11 離線壓縮

? MOR 表的 compaction 默認是自動打開的,策略是 5 個 commits 執行一次壓縮。 因為壓縮操作比較耗費內存,和寫流程放在同一個 pipeline,在數據量比較大的時候(10w+/s qps),容易干擾寫流程,此時采用離線定時任務的方式執行 compaction 任務更穩定。

5.11.1 設置參數

? compaction.async.enabled 為 false,關閉在線 compaction。

? compaction.schedule.enabled 仍然保持開啟,由寫任務階段性觸發壓縮 plan。

5.11.2 原理

一個 compaction 的任務的執行包括兩部分:

? schedule 壓縮 plan

該過程推薦由寫任務定時觸發,寫參數 compaction.schedule.enabled 默認開啟

? 執行對應的壓縮 plan

5.11.3 使用方式

1)執行命令

離線 compaction 需要手動執行 Java 程序,程序入口:

? hudi-flink1.13-bundle-0.12.0.jar

? org.apache.hudi.sink.compact.HoodieFlinkCompactor

# 命令行的方式 ./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://xxx:9000/table

2)參數配置

參數名required默認值備注
–pathtrue目標表的路徑
–compaction-tasksfalse-1壓縮 task 的并發,默認是待壓縮 file group 的數量
–compaction-max-memoryfalse100 (單位 MB)壓縮時 log 數據的索引 map,默認 100MB,內存足夠可以開大些
–schedulefalsefalse是否要執行 schedule compaction 的操作,當寫流程還在持續寫入表數據的時候,開啟這個參數有丟失查詢數據的風險,所以開啟該參數一定要保證當前沒有任務往表里寫數據, 寫任務的 compaction plan 默認是一直 schedule 的,除非手動關閉(默認 5 個 commits 一次壓縮)
–seqfalseLIFO執行壓縮任務的順序,默認是從最新的壓縮 plan 開始執行,可選值:LIFO: 從最新的 plan 開始執行;FIFO: 從最老的 plan 開始執行
–servicefalsefalse是否開啟 service 模式,service 模式會打開常駐進程,一直監聽壓縮任務并提交到集群執行(從 0.11 開始執行)
–min-compaction-interval-secondsfalse600 (單位 秒)service 模式下的執行間隔,默認 10 分鐘

案例如下:

create table t7(id int,ts int,primary key (id) not enforced ) with ('connector' = 'hudi','path' = '/tmp/hudi_catalog/default/t7','compaction.async.enabled' = 'false', -- 關閉自動壓縮'compaction.schedule.enabled' = 'true', -- 由寫任務階段性觸發壓縮 plan'table.type' = 'MERGE_ON_READ' );insert into t7 values(1,1); insert into t7 values(2,2); insert into t7 values(3,3); insert into t7 values(4,4); insert into t7 values(5,5);// 命令行的方式 ./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://centos04:9000/tmp/hudi_catalog/default/t7

5.12 離線 Clustering

? 異步的 clustering 相對于 online 的 async clustering 資源隔離,從而更加穩定。

5.12.1 設置參數

? clustering.async.enabled 為 false,關閉在線 clustering。

? clustering.schedule.enabled 仍然保持開啟,由寫任務階段性觸發 clustering plan。

5.12.2 原理

一個 clustering 的任務的執行包括兩部分:

? schedule plan

推薦由寫任務定時觸發,寫參數 clustering.schedule.enabled 默認開啟。

? 執行對應的 plan

5.12.3 使用方式

1)執行命令

離線 clustering 需要手動執行 Java 程序,程序入口:

? hudi-flink1.13-bundle-0.12.0.jar

? org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob

注意:必須是分區表,否則報錯空指針異常。

# 命令行的方式./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://centos04:9000/table

2)參數配置

參數名required默認值備注
–pathtrue目標表的路徑。
–clustering-tasksfalse-1Clustering task 的并發,默認是待壓縮 file group 的數量。
–schedulefalsefalse是否要執行 schedule clustering plan 的操作,當寫流程還在持續寫入表數據的時候,開啟這個參數有丟失查詢數據的風險,所以開啟該參數一定要保證當前沒有任務往表里寫數據, 寫任務的 clustering plan 默認是一直 schedule 的,除非手動關閉(默認 4 個 commits 一次 clustering)。
–seqfalseFIFO執行壓縮任務的順序,默認是從最老的 clustering plan 開始執行,可選值:LIFO: 從最新的 plan 開始執行;FIFO: 從最老的 plan 開始執行
–target-file-max-bytesfalse1024 * 1024 * 1024最大目標文件,默認 1GB。
–small-file-limitfalse600小于該大小的文件會參與 clustering,默認 600MB。
–sort-columnsfalseN/AClustering 可選排序列。
–servicefalsefalse是否開啟 service 模式,service 模式會打開常駐進程,一直監聽壓縮任務并提交到集群執行(從 0.11 開始執行)。
–min-compaction-interval-secondsfalse600 (單位 秒)service 模式下的執行間隔,默認 10 分鐘。

3)案例演示

create table t8(id int,age int,ts int,primary key (id) not enforced ) partitioned by (age) with ('connector' = 'hudi','path' = '/tmp/hudi_catalog/default/t8','clustering.async.enabled' = 'false','clustering.schedule.enabled' = 'true','table.type' = 'COPY_ON_WRITE' );insert into t8 values(1,18,1); insert into t8 values(2,18,2); insert into t8 values(3,18,3); insert into t8 values(4,18,4); insert into t8 values(5,18,5);-- 命令行的方式 ./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://centos04:9000/tmp/hudi_catalog/default/t8

5.12.4 常見問題

# 存儲一直看不到數據如果是 streaming 寫,請確保開啟 checkpoint,Flink 的 writer 有 3 種刷數據到磁盤的策略: ?當某個 bucket 在內存積攢到一定大小 (可配,默認 64MB) ?當總的 buffer 大小積攢到一定大小(可配,默認 1GB) ?當 checkpoint 觸發,將內存里的數據全部 flush 出去# 數據有重復如果是 COW 寫,需要開啟參數 write.insert.drop.duplicates,COW 寫每個 bucket 的第一個文件默認是不去重的,只有增量的數據會去重,全局去重需要開啟該參數;MOR 寫不需要開啟任何參數,定義好 primary key 后默認全局去重。(注意:從 0.10 版本開始,該屬性改名 write.precombine 并且默認為 true。)如果需要多 partition 去重,需要開啟參數: index.global.enabled 為 true。(注意:從 0.10 版本開始,該屬性默認true。)索引 index 是判斷數據重復的核心數據結構,index.state.ttl 設置了索引保存的時間,默認為 1.5 天,對于長時間周期的更新,比如更新一個月前的數據,需要將 index.state.ttl 調大(單位天),設置小于 0 代表永久保存。(注意:從 0.10 版本開始,該屬性默認為 0。)# Merge On Read 寫只有 log 文件Merge On Read 默認開啟了異步的 compaction,策略是 5 個 commits 壓縮一次,當條件滿足參會觸發壓縮任務,另外,壓縮本身因為耗費資源,所以不一定能跟上寫入效率,可能會有滯后。

5.13 Hudi核心原理

5.13.1 Hudi數據去重原理

Hoodie 的數據去重分兩步:

(1)寫入前攢 buffer 階段去重,核心接口HoodieRecordPayload#preCombine

(2)寫入過程中去重,核心接口HoodieRecordPayload#combineAndGetUpdateValue。

1)消息版本新舊

? 相同 record key (主鍵)的數據通過write.precombine.field指定的字段來判斷哪個更新,即 precombine 字段更大的 record 更新,如果是相等的 precombine 字段,則后來的數據更新。

? 從 0.10 版本開始,write.precombine.field 字段為可選,如果沒有指定,會看 schema 中是否有 ts 字段,如果有,ts 字段被選為 precombine 字段;如果沒有指定,schema 中也沒有 ts 字段,則為處理順序:后來的消息默認較新。

2)攢消息階段的去重

? Hoodie 將 buffer 消息發給 write handle 之前可以執行一次去重操作,通過HoodieRecordPayload#preCombine 接口,保留 precombine 字段較大的消息,此操作為純內存的計算,在同一個 write task 中為單并發執行。

? 注意:write.precombine 選項控制了攢消息的去重。

3)寫 parquet 增量消息的去重

? 在Hoodie 寫入流程中,Hoodie 每寫一個 parquet 都會有 base + 增量 merge 的過程,增量的消息會先放到一個 spillable map 的數據結構構建內存 index,這里的增量數據如果沒有提前去重,那么同 key 的后來消息會直接覆蓋先來的消息。

? Writer 接著掃 base 文件,過程中會不斷查看內存 index 是否有同 key 的新消息,如果有,會走 HoodieRecordPayload#combineAndGetUpdateValue 接口判斷保留哪個消息。

? 注意: MOR 表的 compaction 階段和 COW 表的寫入流程都會有 parquet 增量消息去重的邏輯。

4)跨 partition 的消息去重

? 默認情況下,不同的 partition 的消息是不去重的,即相同的 key 消息,如果新消息換了 partition,那么老的 partiiton 消息仍然保留。

? 開啟 index.global.enabled 選項開啟跨 partition 去重,原理是先往老的 partiton 發一條刪除消息,再寫新 partition。

5.13.2 Hudi表寫入原理

數據寫入、數據壓縮與數據清理

1)數據寫入分析 (1)基礎數據封裝:將數據流中flink的RowData封裝成Hoodie實體; (2)BucketAssigner:桶分配器,主要是給數據分配寫入的文件地址:若為插入操作,則取大小最小的FileGroup對應的FileId文件內進行插入;在此文件的后續寫入中文件 ID 保持不變,并且提交時間會更新以顯示最新版本。這也意味著記錄的任何特定版本,給定其分區路徑,都可以使用文件 ID 和 instantTime進行唯一定位;若為更新操作,則直接在當前location進行數據更新; (3)Hoodie Stream Writer: 數據寫入,將數據緩存起來,在超過設置的最大flushSize或是做checkpoint時進行刷新到文件中; (4)Oprator Coordinator:主要與Hoodie Stream Writer進行交互,處理checkpoint等事件,在做checkpoint時,提交instant到timeLine上,并生成下一個instant的時間,算法為取當前最新的commi時間,比對當前時間與commit時間,若當前時間大于commit時間,則返回,否則一直循環等待生成。2)數據壓縮壓縮(compaction)用于在 MergeOnRead存儲類型時將基于行的log日志文件轉化為parquet列式數據文件,用于加快記錄的查找。compaction首先會遍歷各分區下最新的parquet數據文件和其對應的log日志文件進行合并,并生成新的FileSlice,在TimeLine 上提交新的Instance:具體策略分為4種,具體見官網說明: compaction.trigger.strategy: Strategy to trigger compaction, options are 1.'num_commits': trigger compaction when reach N delta commits; 2.'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction; 3.'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied; 4.'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied. Default is 'num_commits' Default Value: num_commits (Optional)在項目實踐中需要注意參數'read.streaming.skip_compaction' 參數的配置,其表示在流式讀取該表是否跳過壓縮后的數據,若該表用于后續聚合操作表的輸入表,則需要配置值為true,表示聚合操作表不再消費讀取壓縮數據。若不配置或配置為false,則該表中的數據在未被壓縮之前被聚合操作表讀取了一次,在壓縮后數據又被讀取一次,會導致聚合表的sum、count等算子結果出現雙倍情況。3)數據清理隨著用戶向表中寫入更多數據,對于每次更新,Hudi會生成一個新版本的數據文件用于保存更新后的記錄(COPY_ON_WRITE)或將這些增量更新寫入日志文件以避免重寫更新版本的數據文件(MERGE_ON_READ)。在這種情況下,根據更新頻率,文件版本數可能會無限增長,但如果不需要保留無限的歷史記錄,則必須有一個流程(服務)來回收舊版本的數據,這就是 Hudi 的清理服務。具體清理策略可參考官網,一般使用的清理策略為:KEEP_LATEST_FILE_VERSIONS:此策略具有保持 N 個文件版本而不受時間限制的效果。會刪除N之外的FileSlice。

5.13.3 Hudi表讀取原理

(1)開啟split_monitor算子,每隔N秒(可配置)監聽TimeLine上變化,并將變更的Instance封裝為FileSlice。

(2)分發log文件時候,按照fileId值進行keyBy,保證同一file group下數據文件都給一個Task進行處理,從而保證數據處理的有序性。

(3)split_reader根據FileSlice信息進行數據讀取。

總結

以上是生活随笔為你收集整理的数据湖架构Hudi(五)Hudi集成Flink案例详解的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

国产精品一区二区中文字幕 | 久久精品免视看 | av性在线| 国产成人免费在线 | 在线观看免费福利 | 久久免视频 | 国色天香永久免费 | 国产一区视频导航 | wwwwww黄| 超碰九九 | 日本久久综合网 | 91精品无人成人www | 91日韩精品| 久久久久久久久久影视 | 久久精品资源 | 69国产盗摄一区二区三区五区 | 国产高清视频免费观看 | 国产精品嫩草在线 | 九九免费观看视频 | 人人舔人人 | 久久久久久久99精品免费观看 | 国产不卡一区二区视频 | 91人人澡人人爽人人精品 | 国产福利91精品一区二区三区 | 黄色网在线免费观看 | 色九九影院 | 日韩毛片在线免费观看 | 国产第页 | 97手机电影网 | 婷婷亚洲激情 | 国产精品入口传媒 | 婷婷色在线 | av三区在线 | 亚洲九九九在线观看 | 摸bbb搡bbb搡bbbb | 婷婷丁香国产 | 亚洲无线视频 | 婷婷深爱五月 | 久久免费精品视频 | 91中文视频 | 精品你懂的 | 国产又黄又猛又粗 | 免费在线日韩 | 日本黄色片一区二区 | 丁香五婷 | 久草综合视频 | 久久99国产综合精品免费 | 久久综合久久久 | 国产午夜一区二区 | 高清av中文字幕 | 丝袜护士aⅴ在线白丝护士 天天综合精品 | 99精品视频播放 | 91九色精品女同系列 | 成人av中文字幕在线观看 | 国产精品久久久一区二区三区网站 | 热久久最新地址 | 色综合天天狠狠 | 13日本xxxxxⅹxxx20 | 久久少妇 | 日本三级久久久 | 色婷婷综合五月 | 久草精品视频在线观看 | 日韩 在线a| 久久成人欧美 | 精品国产aⅴ一区二区三区 在线直播av | 天天曰天天干 | 国产黄色片久久久 | 欧美日韩1区2区 | 国产九九热视频 | 黄色电影小说 | 欧美成人h版在线观看 | 国产精品99蜜臀久久不卡二区 | av在线免费不卡 | 久草视频中文在线 | 久久久久二区 | 99久久久国产免费 | 国产首页 | 精品电影一区 | 中文字幕123区 | 色99色| 奇米网网址 | 99精品视频播放 | 超碰97成人 | 中文字幕日本在线观看 | 日韩精品中文字幕在线 | 国内成人精品视频 | 国产成人av一区二区三区在线观看 | 又黄又爽又无遮挡免费的网站 | 中文国产成人精品久久一 | 亚洲午夜剧场 | 亚洲欧美日韩不卡 | 91人网站 | 久久成人福利 | 18+视频网站链接 | 亚洲在线黄色 | 欧美日韩在线观看一区二区三区 | av资源免费看 | 久久久久久久久毛片 | 免费国产一区二区 | 人人插人人干 | 日韩不卡高清 | 久久一区二区三区超碰国产精品 | 中文免费观看 | 久久影视网 | 午夜av一区 | 国产精品国产亚洲精品看不卡 | 最近免费中文视频 | 亚洲精品视频在线观看网站 | 视频国产精品 | 美女视频黄免费网站 | 97精品在线视频 | 亚洲自拍偷拍色图 | 成年人视频在线免费观看 | 日本字幕网 | 午夜电影一区 | 成人黄色小说视频 | 日产乱码一二三区别免费 | 久久久久久久免费观看 | 久热免费 | 国产成人一区二区三区久久精品 | 亚洲成年人在线播放 | 亚洲天堂网站视频 | 999精品视频 | 九九99视频 | 啪一啪在线 | 综合伊人av | 午夜男人影院 | www.久久久精品 | 久久人人爽人人爽人人片 | 91av蜜桃| 91在线操| 五月天亚洲激情 | 99久久国产免费看 | 日韩欧美精品在线观看视频 | 日韩在线视频观看 | 国产精品不卡一区 | 人人澡人人添人人爽一区二区 | 欧美精品久久99 | 亚洲一二区视频 | 日韩在线观看a | 免费精品在线视频 | 久草在线资源视频 | 久久最新网址 | 中文在线亚洲 | 欧美在线视频一区二区三区 | 日韩国产精品久久久久久亚洲 | 99精品国产免费久久久久久下载 | 在线观看片| 91精品啪在线观看国产线免费 | 成人影视免费看 | 日韩三区在线 | 高清av免费观看 | 六月丁香久久 | 欧美日韩精品影院 | 久久九九久久九九 | 亚洲永久国产精品 | 九九精品无码 | 成人av网址大全 | 日日插日日干 | 日韩电影在线观看一区二区三区 | 91麻豆产精品久久久久久 | 免费能看的av | 中日韩欧美精彩视频 | 国产91精品一区二区 | 中文字幕在线观看日本 | 麻豆精品视频 | 欧美日韩在线观看一区二区三区 | 久久精品福利 | 99中文在线| 四虎在线免费视频 | 欧美成人精品三级在线观看播放 | 国产精品欧美激情在线观看 | 夜夜夜夜爽| 婷婷开心久久网 | 在线视频99 | 成年人三级网站 | 亚洲伦理一区 | 国产一区二区在线免费 | 91九色精品 | 久久久精品 一区二区三区 国产99视频在线观看 | 午夜av不卡 | www色| a在线观看视频 | 国产精品午夜在线 | 日韩欧美高清一区二区 | 亚洲黄色av网址 | 99精彩视频 | 欧美最新另类人妖 | 亚洲高清精品在线 | 97av影院| 国产麻豆精品免费视频 | 午夜精品一区二区三区四区 | 97成人在线 | 超碰免费成人 | 色婷婷综合久久久中文字幕 | 精品美女在线视频 | 操操日| 国产免费作爱视频 | 天天色宗合 | 亚洲 欧洲av | 免费毛片一区二区三区久久久 | 成人三级网址 | 最近日韩中文字幕中文 | 又黄又刺激又爽的视频 | 日韩视频中文字幕在线观看 | 狠狠干.com| 狠狠狠操 | 手机av在线不卡 | 欧美性生活免费看 | 久草网免费 | 99精品视频免费观看 | 中文区中文字幕免费看 | 超碰日韩 | 在线直播av | 久久av影院 | 天天色草| 美女在线观看网站 | 91精品视频在线观看免费 | 成人精品影视 | 日韩一二区在线 | 亚洲国产日韩欧美在线 | 婷婷开心久久网 | 国产一在线精品一区在线观看 | 国产美腿白丝袜足在线av | 欧美激情视频一区二区三区 | 亚洲激情视频在线观看 | 国产精品专区h在线观看 | 开心丁香婷婷深爱五月 | 深爱激情开心 | 亚洲精品午夜一区人人爽 | 国产精品第十页 | 亚洲jizzjizz日本少妇 | 中文字幕乱视频 | 在线亚洲成人 | 美女视频黄免费的久久 | 国产精品成人av在线 | 久久亚洲人 | 色偷偷88888欧美精品久久久 | 九色最新网址 | 欧美 激情 国产 91 在线 | 国产免费观看高清完整版 | 成人精品福利 | 国产在线观看一 | 99视频偷窥在线精品国自产拍 | 91大神在线看 | 毛片网在线观看 | 欧美影片 | 在线a人片免费观看视频 | 夜夜躁狠狠躁日日躁视频黑人 | 久久人人爽人人爽人人片av免费 | 国产精品一区二区久久久 | 亚洲成av人片在线观看香蕉 | 日批网站免费观看 | 国产一区二区三区高清播放 | 91麻豆国产 | 欧美日韩视频精品 | 国产精品久久久久影院 | 狠狠干.com| 亚洲mv大片欧洲mv大片免费 | av先锋影音少妇 | 日日干夜夜操视频 | 日韩免| 人人草网站| 99精品国产免费久久 | 999久久国产精品免费观看网站 | 欧美国产日韩在线观看 | 国产香蕉久久精品综合网 | av超碰在线 | 最新av在线网址 | 999久久久 | 一级c片| 国产欧美日韩精品一区二区免费 | 国产丝袜美腿在线 | 久久久国产精品网站 | 日韩久久精品一区二区三区下载 | 欧美一区二区在线 | 国产流白浆高潮在线观看 | 狠狠久久 | 中文字幕在线看 | 亚洲欧美日韩在线看 | 中文字幕一区二区三区四区 | 中文字幕免费中文 | 国产黄在线| 亚洲黄色av网址 | 日韩欧美在线视频一区二区三区 | 欧美日韩国产一二三区 | jizzjizzjizz亚洲 | av综合 日韩| 五月天中文在线 | 在线三级av | 少妇高潮流白浆在线观看 | 91日本在线播放 | av在线com | 国产在线看一区 | 天天干天天摸 | 伊人久在线 | 亚洲成人高清在线 | 国产精品久久久久久久婷婷 | 精品国产一区二区三区不卡 | 又黄又爽的视频在线观看网站 | 人人插人人做 | 超级碰碰碰视频 | 91香蕉视频 | av在线网站观看 | 手机成人在线电影 | 日韩激情在线视频 | 在线免费观看黄色小说 | 国产成人一二三 | 久久久久久国产精品999 | 精品国产一区二区三区久久久蜜臀 | 欧美一区二区免费在线观看 | 高清av影院| av片在线观看 | 一区二区电影网 | 欧美a免费 | 欧美精品一区二区在线观看 | 91av视频免费观看 | 国产在线精品一区二区不卡了 | 98久久| 久久国产精品电影 | 日本久久精品 | 国产精品久久久久久影院 | 久久久午夜精品理论片中文字幕 | 深夜免费小视频 | 国产精品一区在线观看 | 国产成人精品aaa | 日韩欧美在线免费 | 精品国产一区二区三区蜜臀 | 日日草天天草 | 中文字幕在线观看第一区 | 99国产精品一区 | 午夜精品久久久久久久99 | 欧美日本高清视频 | 在线国产91 | 久久成人高清 | 国产综合91 | 欧洲一区二区三区精品 | 欧美激情在线看 | 五月综合激情网 | 亚洲综合少妇 | 免费看网站在线 | 国产一区免费看 | 激情小说 五月 | 亚洲欧洲xxxx | 国产福利在线不卡 | 高清国产在线一区 | avcom在线 | 成人免费观看av | 亚洲精品在线视频播放 | 丁香六月婷婷激情 | 爱爱av网 | 国产精品99久久久久久有的能看 | 久久国产经典视频 | 色婷婷久久久综合中文字幕 | 日日夜夜噜噜噜 | 九九久久久久久久久激情 | 九九热在线视频免费观看 | 亚洲精品在线观看网站 | 激情黄色av | 91在线视频免费播放 | 久草在线免费资源站 | 成人av午夜 | 国产精品区一区 | 91免费黄视频 | 日韩欧美在线视频一区二区 | 91mv.cool在线观看 | 天天鲁一鲁摸一摸爽一爽 | 成人a视频片观看免费 | 超碰国产在线观看 | 国产一级二级三级视频 | 日韩av高清在线观看 | 日本婷婷色 | 免费av大全 | 国产精品99久久免费黑人 | 婷婷久久综合九色综合 | 天天躁日日躁狠狠躁av麻豆 | 久久久久久欧美二区电影网 | 久久国产电影 | 精品1区2区 | 天天操综合网站 | 成人午夜精品福利免费 | 亚洲小视频在线观看 | 日韩在线色视频 | 国产99色| 久久精品影视 | 丁香五月网久久综合 | 欧美日韩国产区 | 最新日韩在线观看视频 | 成 人 黄 色 片 在线播放 | 日本大片免费观看在线 | 天堂在线v| 五月婷婷在线视频观看 | 亚洲精品小视频在线观看 | 国产精品一区二区av影院萌芽 | 日韩精品一区电影 | 91精品入口 | 精品国产_亚洲人成在线 | 成年人在线观看免费视频 | av黄色免费看 | 婷婷丁香色综合狠狠色 | 亚洲 欧美 综合 在线 精品 | 九九热精| 波多野结衣电影一区 | 国产色在线 | 五月天中文在线 | 免费在线观看国产黄 | 天天搞夜夜骑 | 国产精品s色 | 亚洲视频久久久久 | 日韩美在线 | 公开超碰在线 | 在线观看成人福利 | 美女中文字幕 | 中字幕视频在线永久在线观看免费 | 91在线免费视频观看 | 人人狠狠综合久久亚洲 | 狠狠躁夜夜躁人人爽视频 | 91精品日韩 | 特级黄录像视频 | 国产人成一区二区三区影院 | 亚洲黄色在线观看 | 超碰人人舔| 91精品视频免费看 | 久久久精品成人 | 天天射天天射天天射 | 亚洲区色 | 成人av高清在线 | av片子在线观看 | 国产系列在线观看 | 天天操夜操 | 天堂久色| 色综合 久久精品 | 国内揄拍国产精品 | 中文字幕乱在线伦视频中文字幕乱码在线 | 99久久精品视频免费 | 中文字幕有码在线观看 | 日本中文在线观看 | 在线成人一区 | aaa亚洲精品一二三区 | 免费日韩 | 日本中文字幕在线视频 | 亚洲精品小视频 | 日韩免费一区二区在线观看 | 国产爽视频 | 在线观看中文字幕亚洲 | 久久久国产影视 | 久久久久免费 | 最新成人在线 | 黄色不卡av | www免费看 | 日韩3区 | 精品亚洲男同gayvideo网站 | 在线免费观看黄网站 | 国产精品久久久久久欧美 | 日韩欧美高清视频在线观看 | 丝袜美腿av | 国产视频在线一区二区 | 日韩69av | 99色精品视频 | 热99在线 | 国产一区视频在线观看免费 | 日韩免| 一级黄毛片 | 日韩va欧美va亚洲va久久 | 中文永久免费观看 | 欧美日韩国产精品一区二区三区 | 免费在线观看av不卡 | www.av在线.com| www.色com| 久草网在线观看 | 正在播放国产一区二区 | 久久久久国产精品一区二区 | 狠狠操影视| 五月婷婷在线视频观看 | 2021国产视频 | 日韩大陆欧美高清视频区 | 色婷婷亚洲综合 | 久久这里只有精品23 | 国产又粗又猛又爽又黄的视频免费 | 在线你懂| 人人插人人费 | 精品在线不卡 | 激情综合一区 | 欧美激情精品久久久久 | 亚洲区另类春色综合小说校园片 | 成人国产精品av | 99久久www| 一本色道久久综合亚洲二区三区 | 日本在线观看黄色 | 亚洲aⅴ在线观看 | 麻豆视频免费入口 | 精品网站999www | 国产特级毛片aaaaaaa高清 | 欧美资源 | 国产免费又爽又刺激在线观看 | 成人免费在线播放视频 | 在线观看av免费观看 | 99热这里只有精品免费 | 91在线资源| 超级碰碰碰碰 | 色婷婷狠狠操 | 久久a级片 | 91成人亚洲 | 久久夜av | 日韩肉感妇bbwbbwbbw | 久久99九九99精品 | 久久久91精品国产一区二区精品 | 亚洲黄色片 | 亚洲精品字幕在线 | 亚洲天天做 | 日韩视频免费 | 亚洲综合视频在线观看 | 亚洲国产精品免费 | 国产精品a久久久久 | 国产日韩视频在线观看 | 不卡精品视频 | 免费h精品视频在线播放 | 国内精品视频一区二区三区八戒 | 欧美在线观看视频免费 | 久久超碰在线 | 亚洲九九爱| 久久国产剧场电影 | 在线观看一区视频 | 啪啪动态视频 | 免费午夜视频在线观看 | 久久午夜鲁丝片 | 精品99999| 亚洲人视频在线 | 高清不卡一区二区在线 | 亚洲国产资源 | 在线看污网站 | 黄色av电影免费观看 | 日日夜夜av| 精品一区二区久久久久久久网站 | 午夜色站 | 亚洲黄色免费在线看 | av高清网站在线观看 | 日韩av一区二区在线 | www.操.com | 成人a毛片 | 婷婷色影院| 天天玩天天操天天射 | 蜜臀av性久久久久蜜臀aⅴ涩爱 | 免费黄在线观看 | 一本一本久久a久久精品综合 | 中文字幕美女免费在线 | 最近日韩中文字幕中文 | 亚洲女人天堂成人av在线 | 免费福利视频网 | 久久综合精品国产一区二区三区 | 欧美贵妇性狂欢 | 日韩中文免费视频 | 久草在线精品观看 | 91看片淫黄大片一级在线观看 | 成人av免费 | 91大神视频网站 | 一本一本久久a久久精品牛牛影视 | 成人高清av在线 | 久久久www成人免费精品 | 人人爽人人av | 超碰成人网 | 天天色天天操天天爽 | 亚洲精品久久久久中文字幕二区 | av一级片在线观看 | 高清不卡毛片 | 91在线精品播放 | 精品免费久久 | 国模精品在线 | 一本一道波多野毛片中文在线 | 成人av网站在线 | japanese黑人亚洲人4k | 久草在线免费新视频 | 亚洲在线视频免费观看 | 在线免费观看视频a | 91大神一区二区三区 | 亚洲午夜精品久久久 | 中文字幕一区二区三 | 中文字幕亚洲五码 | 国产精品午夜久久久久久99热 | 九九欧美| 天天操天天干天天插 | 黄在线免费看 | 在线免费观看黄色大片 | 欧美日韩视频 | 国产成人精品一区二区在线观看 | 日韩免费视频一区二区 | 欧美日韩调教 | 超碰大片 | 日韩欧美国产精品 | 亚洲午夜久久久久久久久久久 | 国产精品一区在线观看你懂的 | 精品国产伦一区二区三区 | 免费av福利 | 亚洲91中文字幕无线码三区 | 91免费黄视频 | 色综合久久悠悠 | 婷婷色亚洲 | 亚洲精品视频免费在线观看 | 在线观看一级 | 国产一二区视频 | 天天天色综合 | 99久久这里有精品 | 国产一区视频在线播放 | 九九久久精品 | 亚洲高清av在线 | 在线91网| 夜夜躁狠狠躁 | 狠狠干综合 | 欧美乱熟臀69xxxxxx | 成人性生活大片 | 欧美日韩视频在线观看免费 | 亚洲日韩中文字幕 | 婷婷狠狠操 | 久久在线 | 日日夜夜天天操 | 一区二区av| 天天操夜夜曰 | a亚洲视频 | 成年人在线看片 | 手机在线永久免费观看av片 | 丁香花五月 | 91大神在线观看视频 | 国产精品久久久久久久7电影 | 久久免费av电影 | 久久国产精品免费一区二区三区 | 亚洲国产日韩在线 | 最新精品视频在线 | 亚洲成人黄色av | 97操碰 | 三级av在线免费观看 | 亚洲免费在线看 | 美女免费视频观看网站 | 日韩黄色在线观看 | 免费a网址 | 黄污视频网站 | 97在线影院| 91网免费看 | 国产中文在线视频 | 国产不卡在线观看视频 | 亚洲精品在 | 国产高清区 | 安徽妇搡bbbb搡bbbb | 99视频网站| 91精品夜夜 | 免费黄色特级片 | 美女黄频视频大全 | 国语精品免费视频 | 久色婷婷| 在线观看视频 | 在线色吧 | 伊人影院99 | 香蕉久久久久 | 精品一区二区视频 | 国产黄色精品在线 | 在线亚洲激情 | 久久综合婷婷综合 | 久久区二区 | 激情丁香综合 | www日韩| 天天干夜夜 | 一区二区 久久 | 久久免费中文视频 | 911精品美国片911久久久 | 九色琪琪久久综合网天天 | 国产乱码精品一区二区蜜臀 | 午夜精品久久久久久久99无限制 | 成人性生交大片免费观看网站 | 国产成人精品一区二区三区福利 | 亚洲欧美在线视频免费 | www.夜色321.com | 免费在线观看一区 | 五月婷婷开心 | 亚洲欧洲成人精品av97 | av资源免费在线观看 | 一区二区三区免费在线观看视频 | 99久久久国产精品免费观看 | 欧美色久| 欧美激情精品久久久久久 | 天天干天天射天天爽 | 国内成人精品2018免费看 | 免费在线黄网 | av网站在线观看免费 | 亚洲国产欧美在线人成大黄瓜 | 欧美性色综合 | 激情影音 | 91精品视频在线 | 国产99久久久欧美黑人 | 国产精品久久久久久久久久久久冷 | 日韩在线一二三区 | 日韩二级毛片 | 欧美一级电影免费观看 | 粉嫩av一区二区三区入口 | 又黄又刺激的网站 | 男女视频国产 | 在线观看91 | 久久国产网站 | 久久亚洲私人国产精品 | 国产精品视频久久久 | 毛片随便看 | 亚洲精品麻豆 | 国产剧情在线一区 | 美女在线黄 | 欧美久久影院 | 亚洲va欧美va人人爽 | 国产在线色站 | 成人动漫精品一区二区 | 久久免费视频5 | 免费观看一区二区三区视频 | 精品夜夜嗨av一区二区三区 | 久久精品伊人 | 怡红院成人在线 | 中文字幕中文字幕 | www黄色大片 | 久久午夜网| 国语自产偷拍精品视频偷 | 婷婷亚洲激情 | 国产在线精 | 日韩视频一 | 黄色国产高清 | 色综合色综合久久综合频道88 | 欧美日韩一区二区三区在线免费观看 | 免费情趣视频 | 97国产大学生情侣白嫩酒店 | 美女视频网站久久 | 天天曰夜夜操 | 麻豆传媒在线视频 | 伊人电影天堂 | 99精品视频免费全部在线 | 中日韩在线视频 | 欧美精品中文在线免费观看 | 五月婷婷欧美视频 | 人人爱人人添 | 久在线 | 精品视频123区在线观看 | 亚洲高清在线精品 | 91精品国产一区二区三区 | 91中文在线视频 | 中字幕视频在线永久在线观看免费 | 婷婷亚洲五月 | 日本精品一区二区三区在线观看 | 黄色精品久久 | 伊人五月天av | 一区二区三区高清不卡 | 中文字幕一区二区三区四区 | 久久久私人影院 | 日韩av中文字幕在线 | 999国产| 国产精品剧情在线亚洲 | 亚洲精品国产精品乱码不99热 | 天天干天天射天天爽 | 日韩精品视频网站 | 91看片一区二区三区 | 久久激情综合网 | 色橹橹欧美在线观看视频高清 | 国产一级淫片在线观看 | 国产精品原创av片国产免费 | 日韩欧美亚州 | 日韩久久久久久久久久久久 | 日本精品在线看 | 色婷婷电影 | av丝袜在线 | 亚洲综合成人婷婷小说 | 91精品999 | 在线免费观看黄网站 | 五月开心六月婷婷 | 久久国产美女视频 | www.香蕉视频在线观看 | 日本性xxxxx 亚洲精品午夜久久久 | 国产视频日韩视频欧美视频 | 久久久久国产精品一区二区 | 亚洲综合在线播放 | 欧美日韩视频免费看 | 91porny九色91啦中文 | 中文在线资源 | 热久久最新地址 | 国产不卡片 | 久久久久国产精品免费网站 | 九九有精品| 日本在线精品视频 | 国产明星视频三级a三级点| 69精品在线| 91精品国产综合久久久久久久 | 久久成视频 | 国产99一区二区 | av成人在线网站 | 欧美日韩国产页 | 欧美国产日韩一区二区三区 | 久久九九国产视频 | 久久av网 | 国产高清绿奴videos | 久久久久久久久久网 | 美女视频黄是免费的 | www久久精品 | 美女视频黄在线观看 | 国产精品自产拍在线观看网站 | 久久亚洲人 | 亚洲欧美视频在线观看 | 日日躁夜夜躁aaaaxxxx | 美女在线观看网站 | 911久久香蕉国产线看观看 | 日韩中文在线电影 | 久久不卡av | 国产精品久久久久aaaa九色 | 9999精品| 色鬼综合网 | 99久久精品免费看国产免费软件 | 久在线观看视频 | 久久久久久精 | 久久成人资源 | 日韩欧美99| 成人高清在线观看 | 日韩肉感妇bbwbbwbbw | 亚洲精品日韩一区二区电影 | 天堂网av 在线| 国产精品久久久久久五月尺 | 在线观看91精品视频 | 中文字幕91视频 | 99久久国产免费免费 | 啪啪激情网| 欧美 日韩精品 | 毛片精品免费在线观看 | 国产精品美女免费视频 | 午夜在线国产 | 日本午夜在线观看 | 国产精品美女www爽爽爽视频 | 天天做天天爱天天爽综合网 | 日日干综合| 深夜福利视频一区二区 | 亚洲成人午夜av | 91av在线播放视频 | 天天草天天爽 | 91在线观| 精品国产免费一区二区三区五区 | 色综合久久久久综合体桃花网 | 欧美疯狂性受xxxxx另类 | 亚洲精品久久久久中文字幕m男 | 中文字幕免费国产精品 | 久草在线观看视频免费 | 久久激情五月婷婷 | 超碰国产在线播放 | 一区二区三区电影大全 | 视频成人 | 天天色中文 | 激情网色 | 久久久91精品国产一区二区三区 | 日韩亚洲欧美中文字幕 | 亚洲成a人片在线观看中文 中文字幕在线视频第一页 狠狠色丁香婷婷综合 | 国产黄色精品网站 | 亚洲午夜精品电影 | 亚洲在线精品视频 | 国产偷国产偷亚洲清高 | 91精品国产一区 | 亚洲成人av一区 | 欧美日bb| 国外av在线 | 这里有精品在线视频 | 国产高清视频在线免费观看 | av网站地址| 国产精品www | 亚洲天堂视频在线 | zzijzzij日本成熟少妇 | 99国产免费网址 | 国产又粗又长的视频 | 日韩精品一区二区三区高清免费 | 色综合色综合久久综合频道88 | 国产精品毛片完整版 | www.色国产 | 国产福利a | www免费在线观看 | 国产黄av | 日韩大片免费观看 | 国产精品一区二区美女视频免费看 | 亚洲黄色小说网址 | 国产视频在线观看一区 | 中文字幕丝袜美腿 | 91精品91| 国产精品久久久久四虎 | 999超碰 | 日韩综合精品 | 国产午夜三级一区二区三桃花影视 | 日韩丝袜 | 欧美夫妻生活视频 | 一区二区三区免费网站 | 午夜视频一区二区三区 | 色干干 | 欧美乱淫视频 | 亚洲精品在线观看视频 | 色综合天天色综合 | 99爱视频 | 欧美午夜寂寞影院 | 超碰公开在线 | 激情综合六月 | 丁香国产视频 | 夜色在线资源 | 中文资源在线观看 | 午夜精品久久久久久久99水蜜桃 | 天天干天天干天天干天天干天天干天天干 | 亚洲精品国偷拍自产在线观看蜜桃 | 天天综合区 | 欧美专区国产专区 | 日韩av中文 | 91av大全 | 色狠狠婷婷| 国产成人三级在线观看 | 97视频人人免费看 | 久久艹国产 | 久久不射电影院 | 成人免费视频网址 | 伊人伊成久久人综合网小说 | 青草视频在线 | 免费一级片久久 | 精品国产精品一区二区夜夜嗨 | 中文字幕日韩无 | 成人黄在线观看 | 99精品久久精品一区二区 | 亚洲免费公开视频 | 正在播放 久久 | 91网在线观看 | 黄污网站在线 | 在线国产片| 精品福利国产 | 97精品国产97久久久久久免费 | 91在线视频网址 | www.午夜视频| 91看片网址 | 欧美黄在线 | 欧美日韩有码 | 日日日操操 | 一区二区三区日韩视频在线观看 | 2023天天干 | www.天天操.com| 国产视频精品视频 | 97精品国产97久久久久久 | a视频在线 | 天天干天天做 | 欧美黄污视频 | 国产日韩中文字幕在线 | www.黄色网.com| 天天摸天天操天天爽 | 日韩久久精品一区二区 | 日韩伦理一区二区三区av在线 | 久久国产手机看片 | 91资源在线免费观看 | 天天色视频 | 久碰视频在线观看 | 欧美日韩国产二区三区 | 色婷婷狠狠五月综合天色拍 | 久久人人爽av | 五月天六月丁香 | 伊人天堂久久 | 一区二区三区四区精品 | 欧美日韩xxxxx | 欧美精品久久久久久久亚洲调教 | 欧美成人高清 | 蜜臀av性久久久久蜜臀av | 综合网欧美 | 亚洲国产精品成人综合 | 欧美日韩一级久久久久久免费看 | 国产剧情一区二区在线观看 | 一区二区三区在线视频111 | 911久久香蕉国产线看观看 | 国产午夜麻豆影院在线观看 | 欧美日韩另类在线 | 视频国产一区二区三区 | av天天澡天天爽天天av | 免费国产一区二区 | 欧美性猛片, | 久久99精品久久久久久 | 亚洲亚洲精品在线观看 | 久久国产精品一二三区 | 丰满少妇麻豆av | 亚洲精品乱码久久久久久高潮 | 欧美一区二区三区在线看 | 麻豆一区在线观看 | 亚洲国产成人精品电影在线观看 | 天天爱天天插 | 六月丁香婷婷久久 | 欧美 日韩精品 | 日日日视频 | 视频在线观看91 | 国产精品2区 | 中文字幕最新精品 | 久久精品国亚洲 | 国产美女主播精品一区二区三区 | 99这里只有 | 亚洲经典在线 | 国产精品人成电影在线观看 | 色在线国产| 天天插天天 | av综合av| 欧美成人精品三级在线观看播放 | 国产一在线精品一区在线观看 | 欧美黑人猛交 | 国产一二三精品 | 国产91学生| 91视频成人免费 | 精品一二三区 | 国产精品粉嫩 |