数据湖架构Hudi(五)Hudi集成Flink案例详解
五、Hudi集成Flink案例詳解
5.1 hudi集成flink
flink的下載地址:
https://archive.apache.org/dist/flink/
| 0.12.x | 1.15.x、1.14.x、1.13.x |
| 0.11.x | 1.14.x、1.13.x |
| 0.10.x | 1.13.x |
| 0.9.0 | 1.12.2 |
- 將上述編譯好的安裝包拷貝到flink下的jars目錄中:
- 拷貝guava包,解決依賴沖突
- 配置Hadoop環境變量
5.2 sql-client之yarn-session模式
配置hadoop調度器yarn
mapred-site.xml<configuration> <!-- 指定MapReduce作業執?時,使?YARN進?資源調度 --><property><name>mapreduce.framework.name</name><value>yarn</value></property><property><name>yarn.app.mapreduce.am.env</name><value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.3</value> </property><property><name>mapreduce.map.env</name><value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.3</value></property><property><name>mapreduce.reduce.env</name><value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.3</value></property> </configuration>yarn-site.xml<configuration> <!-- 設置ResourceManager --><property><name>yarn.resourcemanager.hostname</name><value>centos04</value> </property> <!--配置yarn的shuffle服務--><property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property> </configuration>hadoop-env.sh # 在最后面添加如下: export YARN_RESOURCEMANAGER_USER=root export YARN_NODEMANAGER_USER=root# 記得配置sql-client-defaults.yaml5.2.1 啟動
# 1、修改配置文件 vim /opt/apps/flink-1.13.6/conf/flink-conf.yamlclassloader.check-leaked-classloader: false taskmanager.numberOfTaskSlots: 4state.backend: rocksdb execution.checkpointing.interval: 30000 # 開啟ck,才能快速從內存中flush出去 state.checkpoints.dir: hdfs://centos04:9000/ckps state.backend.incremental: true# 2、yarn-session模式啟動# 解決依賴問題 cp /opt/apps/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.3.jar /opt/apps/flink-1.13.6/lib/# 啟動yarn-session /opt/apps/flink-1.13.6/bin/yarn-session.sh -d # 啟動sql-client /opt/apps/flink-1.13.6/bin/sql-client.sh embedded -s yarn-session5.2.2 插入數據
set sql-client.execution.result-mode=tableau;-- 創建hudi表 CREATE TABLE t1(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20) ) PARTITIONED BY (`partition`) WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t1','table.type' = 'MERGE_ON_READ' -- 默認是COW );或如下寫法 CREATE TABLE t1(uuid VARCHAR(20),name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20),PRIMARY KEY(uuid) NOT ENFORCED ) PARTITIONED BY (`partition`) WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t1','table.type' = 'MERGE_ON_READ' );-- 插入數據 INSERT INTO t1 VALUES('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');-- 查詢數據 select * from t1;5.2.3 流式插入
-- 1、創建測試表 CREATE TABLE sourceT (uuid varchar(20),name varchar(10),age int,ts timestamp(3),`partition` varchar(20) ) WITH ('connector' = 'datagen','rows-per-second' = '1' );create table t2(uuid varchar(20),name varchar(10),age int,ts timestamp(3),`partition` varchar(20) ) with ('connector' = 'hudi','path' = '/tmp/hudi_flink/t2','table.type' = 'MERGE_ON_READ' );-- 2、執行插入 insert into t2 select * from sourceT;查詢結果 set sql-client.execution.result-mode=tableau; Flink SQL> select * from t2 limit 10; -- 會產生一個collect的flink任務,拉取10條數據,注意:不是流讀取 2023-03-06 22:45:10,403 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false 2023-03-06 22:45:12,897 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at centos04/192.168.42.104:8032 2023-03-06 22:45:12,899 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2023-03-06 22:45:12,918 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface centos04:45452 of application 'application_1678113536312_0001'. +----+--------------------------------+--------------------------------+-------------+-------------------------+--------------------------------+ | op | uuid | name | age | ts | partition | +----+--------------------------------+--------------------------------+-------------+-------------------------+--------------------------------+ | +I | d0523c31d3da5b8e2a8ff676dcf... | 327db70824413c5dcde0a7ac10c... | 1971040768 | 2023-03-06 14:40:58.780 | 42b45346672bf719b5393232763... | | +I | cfc07cbebf6890a04942ec88947... | 36fc7a58aab88835f11b3b51a40... | -12199364 | 2023-03-06 14:41:05.781 | e33c02173f4c744fb9c1c68e774... | | +I | 668b204a933494a89b829c76bc6... | aa9ff2109457fdcd5f099b8ce98... | 2061449955 | 2023-03-06 14:41:14.780 | 680514e53b196324423cd12cda5... | | +I | 95fe7878909a801c2726f1d05f5... | 1c86b29fe313e557688df0ba950... | 519997290 | 2023-03-06 14:41:11.781 | b9817c52301ab4614c3053c9ccc... | | +I | 8661c25c8c930f4660fbefa867e... | 01a2bee6b99064c7bca9513ca37... | -682830738 | 2023-03-06 14:41:32.781 | 16ab837502a31e208b06bb74efd... | | +I | 55ce03895e229b29546dbdd2ff3... | 77f2552de13337e8092c1445654... | 2011273584 | 2023-03-06 14:41:09.780 | 3fd688cfa17b2a3a6fd3ffac6bd... | | +I | 50c23f315d736c313b652b34fc5... | 4f9c84ff75466fba8e800daabd0... | -190184764 | 2023-03-06 14:42:26.780 | 7f2a07a1007b2fbfea8cbb2062e... | | +I | 8073e8c70a9bc0e79c2e69aa885... | 30bf89c80d9ab0f0a8f5f883ee6... | -1639873427 | 2023-03-06 14:41:24.781 | 15df7d527d6d7edae496e76d02f... | | +I | 29a61b7cd348d08498d2b089a5d... | 77a63ca7a2e77e6d167de20c673... | 71527378 | 2023-03-06 14:42:14.781 | 2842db44a691f4f1d597ac79086... | | +I | e5defc24191f60557644b7d14e2... | 56bdd04424b8f422d4075ade510... | 1054223989 | 2023-03-06 14:40:42.781 | e8d2d3c6fed90d37b15647d1ecd... | +----+--------------------------------+--------------------------------+-------------+-------------------------+--------------------------------+5.3 使用IDEA開發
除了用sql-client,還可以自己編寫FlinkSQL程序,打包提交Flink作業。
1、首先,需要將hudi集成flink的jar包,裝載到本地的倉庫,命令如下:
D:\bigdata\hudi從入門到精通\apps>mvn install:install-file -DgroupId=org.apache.hudi -DartifactId=hudi-flink_2.12 -Dversion=0.12.0 -Dpackaging=jar -Dfile=./hudi-flink1.13-bundle-0.12.0.jar[INFO] Scanning for projects... [INFO] [INFO] ------------------< org.apache.maven:standalone-pom >------------------- [INFO] Building Maven Stub Project (No POM) 1 [INFO] --------------------------------[ pom ]--------------------------------- [INFO] [INFO] --- maven-install-plugin:2.4:install-file (default-cli) @ standalone-pom --- [INFO] Installing D:\bigdata\hudi從入門到精通\apps\hudi-flink1.13-bundle-0.12.0.jar to D:\doit\apps\repository\org\apache\hudi\hudi-flink_2.12\0.12.0\hudi-flink_2.12-0.12.0.jar [INFO] Installing C:\Users\Undo\AppData\Local\Temp\mvninstall50353756903805721.pom to D:\doit\apps\repository\org\apache\hudi\hudi-flink_2.12\0.12.0\hudi-flink_2.12-0.12.0.pom [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 1.111 s [INFO] Finished at: 2023-03-02T10:08:15+08:00 [INFO] ------------------------------------------------------------------------2、導入pom文件
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>hudi-start</artifactId><groupId>com.yyds</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>hudi-flink</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.13.6</flink.version><hudi.version>0.12.0</hudi.version><java.version>1.8</java.version><scala.binary.version>2.12</scala.binary.version><slf4j.version>1.7.30</slf4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope> <!--不會打包到依賴中,只參與編譯,不參與運行 --></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!--idea運行時也有webui--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version><scope>provided</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version><scope>provided</scope></dependency><!--手動install到本地maven倉庫,要不然會報錯--><dependency><groupId>org.apache.hudi</groupId><artifactId>hudi-flink_2.12</artifactId><version>${hudi.version}</version><scope>provided</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude><exclude>org.apache.hadoop:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers combine.children="append"><transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer></transformers></configuration></execution></executions></plugin></plugins></build></project> package com.yyds.hudi.flink;import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.contrib.streaming.state.PredefinedOptions; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.util.concurrent.TimeUnit;public class HudiTest {public static void main(String[] args) {System.setProperty("HADOOP_USER_NAME","root");// 1、創建flinksql的執行環境Configuration conf = new Configuration();conf.setString(RestOptions.BIND_PORT, "8081-8089");StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);// 注意:需要設置check-point// 設置狀態后端RocksDBEmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);env.setStateBackend(embeddedRocksDBStateBackend);// checkpoint配置env.enableCheckpointing(TimeUnit.SECONDS.toMillis(30), CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig = env.getCheckpointConfig();checkpointConfig.setCheckpointStorage("hdfs://centos04:9000/ckps");checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(20));checkpointConfig.setTolerableCheckpointFailureNumber(5);checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 2、使用flink自帶connector模擬數據tabEnv.executeSql("CREATE TABLE sourceT (\n" +" uuid varchar(20),\n" +" name varchar(10),\n" +" age int,\n" +" ts timestamp(3),\n" +" `partition` varchar(20)\n" +") WITH (\n" +" 'connector' = 'datagen',\n" +" 'rows-per-second' = '1'\n" +")");// 3、創建hudi表tabEnv.executeSql("create table t2(\n" +" uuid varchar(20),\n" +" name varchar(10),\n" +" age int,\n" +" ts timestamp(3),\n" +" `partition` varchar(20)\n" +")\n" +"with (\n" +" 'connector' = 'hudi',\n" + // 指定connector為hudi" 'path' = 'hdfs://192.168.42.104:9000/datas/hudi_warehouse/hudi_flink/t2',\n" +" 'table.type' = 'MERGE_ON_READ'\n" + // MOR類型的表")");// 4、將模擬產生的數據,寫入到Hudi表中tabEnv.executeSql("insert into t2 select * from sourceT");} }jar包運行
bin/flink run -t yarn-per-job \ -c com.yyds.hudi.flink.HudiTest \ ./myjars/hudi-flink-1.0-SNAPSHOT.jar類型映射
| CHAR / VARCHAR / STRING | string | |
| BOOLEAN | boolean | |
| BINARY / VARBINARY | bytes | |
| DECIMAL | fixed | decimal |
| TINYINT | int | |
| SMALLINT | int | |
| INT | int | |
| BIGINT | long | |
| FLOAT | float | |
| DOUBLE | double | |
| DATE | int | date |
| TIME | int | time-millis |
| TIMESTAMP | long | timestamp-millis |
| ARRAY | array | |
| MAP(key must be string/char/varchar type) | map | |
| MULTISET(element must be string/char/varchar type) | map | |
| ROW | record |
5.4 hudi核心參數
5.4.1 去重參數
-- 通過如下語法設置主鍵: -- 設置單個主鍵 create table hoodie_table (f0 int primary key not enforced,f1 varchar(20),... ) with ('connector' = 'hudi',... )-- 設置聯合主鍵 create table hoodie_table (f0 int,f1 varchar(20),...primary key(f0, f1) not enforced ) with ('connector' = 'hudi',... )| hoodie.datasource.write.recordkey.field | 主鍵字段 | – | 支持主鍵語法 PRIMARY KEY 設置,支持逗號分隔的多個字段 |
| precombine.field(0.13.0 之前版本為 write.precombine.field) | 去重時間字段 | – | record 合并的時候會按照該字段排序,選值較大的 record 為合并結果;不指定則為處理序:選擇后到的 record |
5.4.2 并發參數
| write.tasks | writer 的并發,每個 writer 順序寫 1~N 個 buckets | 4 | 增加并發對小文件個數沒影響 |
| write.bucket_assign.tasks | bucket assigner 的并發 | Flink的并行度 | 增加并發同時增加了并發寫的 bucekt 數,也就變相增加了小文件(小 bucket) 數 |
| write.index_bootstrap.tasks | Index bootstrap 算子的并發,增加并發可以加快 bootstrap 階段的效率,bootstrap 階段會阻塞 checkpoint,因此需要設置多一些的 checkpoint 失敗容忍次數 | Flink的并行度 | 只在 index.bootstrap.enabled 為 true 時生效 |
| read.tasks | 讀算子的并發(batch 和 stream) | 4 | |
| compaction.tasks | online compaction 算子的并發 | writer 的并發 | online compaction 比較耗費資源,建議走 offline compaction |
可以flink建表時在with中指定,或Hints臨時指定參數的方式:在需要調整的表名后面加上 /*+ OPTIONS() */
案例如下:
insert into t2 /*+ OPTIONS('write.tasks'='2','write.bucket_assign.tasks'='3','compaction.tasks'='4') */ select * from sourceT;# 從下圖可以看出,writer 的并發變成了2,bucket assigner 的并發變成了3,compaction_task 變成了4
可以參考下面Hudi表讀取原理,看上圖。
5.4.3 壓縮參數
? 在線壓縮的參數,通過設置 compaction.async.enabled =false關閉在線壓縮執行,但是調度compaction.schedule.enabled 仍然建議開啟(即上圖的compact_plan_generate步驟),之后通過離線壓縮直接執行 在線壓縮任務 階段性調度的壓縮 plan。
| compaction.schedule.enabled | 是否階段性生成壓縮 plan | true | 建議開啟,即使compaction.async.enabled 關閉的情況下 |
| compaction.async.enabled | 是否開啟異步壓縮 | true | 通過關閉此參數關閉在線壓縮 |
| compaction.tasks | 壓縮 task 并發 | 4 | |
| compaction.trigger.strategy | 壓縮策略 | num_commits | 支持四種策略:num_commits、time_elapsed、num_and_time、num_or_time |
| compaction.delta_commits | 默認策略,5 個 commits 壓縮一次 | 5 | |
| compaction.delta_seconds | 3600 | ||
| compaction.max_memory | 壓縮去重的 hash map 可用內存 | 100(MB) | 資源夠用的話建議調整到 1GB |
| compaction.target_io | 每個壓縮 plan 的 IO 上限,默認 5GB | 500(GB) |
案例如下:
CREATE TABLE t3(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20) ) WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t3','compaction.async.enabled' = 'true', -- 異步在線壓縮'compaction.tasks' = '1','compaction.schedule.enabled' = 'true', -- 生成壓縮 plan'compaction.trigger.strategy' = 'num_commits', -- 壓縮策略,安裝commit次數進行壓縮'compaction.delta_commits' = '2', -- 2次進行壓縮'table.type' = 'MERGE_ON_READ' );set table.dynamic-table-options.enabled=true; insert into t3 select * from sourceT/*+ OPTIONS('rows-per-second' = '5') */;-- 從hdfs上可以看到,flink發生兩次ck,delta_commit提交兩次后,將log文件進行壓縮,然后生成了parquet文件。5.4.4 文件大小
? Hudi會自管理文件大小,避免向查詢引擎暴露小文件,其中自動處理文件大小起很大作用。在進行insert/upsert操作時,Hudi可以將文件大小維護在一個指定文件大小。
? 目前只有 log 文件的寫入大小可以做到精確控制,parquet 文件大小按照估算值。
| hoodie.parquet.max.file.size | 最大可寫入的 parquet 文件大小 | 120 * 1024 * 1024默認 120MB(單位 byte) | 超過該大小切新的 file group |
| hoodie.logfile.to.parquet.compression.ratio | log文件大小轉 parquet 的比率 | 0.35 | hoodie 統一依據 parquet 大小來評估小文件策略 |
| hoodie.parquet.small.file.limit | 在寫入時,hudi 會嘗試先追加寫已存小文件,該參數設置了小文件的大小閾值,小于該參數的文件被認為是小文件 | 104857600默認 100MB(單位 byte) | 大于 100MB,小于 120MB 的文件會被忽略,避免寫過度放大 |
| hoodie.copyonwrite.record.size.estimate | 預估的 record 大小,hoodie 會依據歷史的 commits 動態估算 record 的大小,但是前提是之前有單次寫入超過 hoodie.parquet.small.file.limit 大小,在未達到這個大小時會使用這個參數 | 1024默認 1KB(單位 byte) | 如果作業流量比較小,可以設置下這個參數 |
| hoodie.logfile.max.size | LogFile最大大小。這是在將Log滾轉到下一個版本之前允許的最大大小。 | 1073741824默認1GB(單位 byte) |
案例如下:
CREATE TABLE t4(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20) ) WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t4','compaction.tasks' = '1','hoodie.parquet.max.file.size'= '10000', -- 最大可寫入的 parquet 文件大小,設置為10 KB'hoodie.parquet.small.file.limit'='5000', -- 小文件的大小閾值,小于該參數的文件被認為是小文件 設置為5KB'table.type' = 'MERGE_ON_READ' );set table.dynamic-table-options.enabled=true; insert into t4 select * from sourceT /*+ OPTIONS('rows-per-second' = '5')*/;5.4.5 hadoop參數
從 0.12.0 開始支持,如果有跨集群提交執行的需求,可以通過 sql 的 ddl 指定 per-job 級別的 hadoop 配置
| hadoop.${you option key} | 通過 hadoop.前綴指定 hadoop 配置項 | – | 支持同時指定多個 hadoop 配置項 |
5.5 內存優化
5.5.1 內存參數
| write.task.max.size | 一個 write task 的最大可用內存 | 1024 | 當前預留給 write buffer 的內存為write.task.max.size -compaction.max_memory當 write task 的內存 buffer達到閾值后會將內存里最大的 buffer flush 出去 |
| write.batch.size | Flink 的寫 task 為了提高寫數據效率,會按照寫 bucket 提前 buffer 數據,每個 bucket 的數據在內存達到閾值之前會一直 cache 在內存中,當閾值達到會把數據 buffer 傳遞給 hoodie 的 writer 執行寫操作 | 256 | 一般不用設置,保持默認值就好 |
| write.log_block.size | hoodie 的 log writer 在收到 write task 的數據后不會馬上 flush 數據,writer 是以 LogBlock 為單位往磁盤刷數據的,在 LogBlock 攢夠之前 records 會以序列化字節的形式 buffer 在 writer 內部 | 128 | 一般不用設置,保持默認值就好 |
| write.merge.max_memory | hoodie 在 COW 寫操作的時候,會有增量數據和 base file 數據 merge 的過程,增量的數據會緩存在內存的 map 結構里,這個 map 是可 spill 的,這個參數控制了 map 可以使用的堆內存大小 | 100 | 一般不用設置,保持默認值就好 |
| compaction.max_memory | 同 write.merge.max_memory: 100MB 類似,只是發生在壓縮時。 | 100 | 如果是 online compaction,資源充足時可以開大些,比如 1GB |
5.5.2 MOR
(1)state backend 換成 rocksdb (默認的 in-memory state-backend 非常吃內存)(2)內存夠的話,compaction.max_memory 調大些 (默認是 100MB 可以調到 1GB)(3)關注 TM 分配給每個 write task 的內存,保證每個 write task 能夠分配到 write.task.max.size 所配置的大小,比如 TM 的內存是 4GB 跑了 2 個 StreamWriteFunction 那每個 write function 能分到 2GB,盡量預留一些 buffer,因為網絡 buffer,TM 上其他類型 task (比如 BucketAssignFunction 也會吃些內存)(4)需要關注 compaction 的內存變化,compaction.max_memory 控制了每個 compaction task 讀 log 時可以利用的內存大小,compaction.tasks 控制了 compaction task 的并發注意: write.task.max.size - compaction.max_memory 是預留給每個 write task 的內存 buffer5.5.3 COW
(1)state backend 換成 rocksdb(默認的 in-memory state-backend 非常吃內存)。(2)write.task.max.size 和 write.merge.max_memory 同時調大(默認是 1GB 和 100MB 可以調到 2GB 和 1GB)。(3)關注 TM 分配給每個 write task 的內存,保證每個 write task 能夠分配到 write.task.max.size 所配置的大小,比如 TM 的內存是 4GB 跑了 2 個 StreamWriteFunction 那每個 write function 能分到 2GB,盡量預留一些 buffer,因為網絡 buffer,TM 上其他類型 task(比如 BucketAssignFunction 也會吃些內存)。注意:write.task.max.size - write.merge.max_memory 是預留給每個 write task 的內存 buffer。5.6 讀取方式
5.6.1 流讀
? 當前表默認是快照讀取,即讀取最新的全量快照數據并一次性返回。通過參數read.streaming.enabled 參數開啟流讀模式,通過 read.start-commit 參數指定起始消費位置,支持指定 earliest 從最早消費。
| read.streaming.enabled | false | false | 設置 true 開啟流讀模式 |
| read.start-commit | false | 最新 commit | 指定 ‘yyyyMMddHHmmss’ 格式的起始 commit(閉區間) |
| read.streaming.skip_compaction | false | false | 流讀時是否跳過 compaction 的 commits,跳過 compaction 有兩個用途:1)避免 upsert 語義下重復消費 (compaction 的 instant 為重復數據,如果不跳過,有小概率會重復消費) 2) changelog 模式下保證語義正確性 0.11 開始,以上兩個問題已經通過保留 compaction 的 instant time 修復 |
| clean.retain_commits | false | 10 | cleaner 最多保留的歷史 commits 數,大于此數量的歷史 commits 會被清理掉,changelog 模式下,這個參數可以控制 changelog 的保留時間,例如 checkpoint 周期為 5 分鐘一次,默認最少保留 50 分鐘的時間。 |
5.6.2 增量讀取
從 0.10.0 開始支持。如果有增量讀取 batch 數據的需求,增量讀取包含三種場景。 (1)Stream 增量消費,通過參數 read.start-commit 指定起始消費位置; (2)Batch 增量消費,通過參數 read.start-commit 指定起始消費位置,通過參數 read.end-commit 指定結束消費位置,區間為閉區間,即包含起始、結束的 commit (3)TimeTravel:Batch 消費某個時間點的數據:通過參數 read.end-commit 指定結束消費位置即可(由于起始位置默認從最新,所以無需重復聲明)| read.start-commit | false | 默認從最新 commit | 支持 earliest 從最早消費 |
| read.end-commit | false | 默認到最新 commit |
5.6.3 限流
? 如果將全量數據(百億數量級) 和增量先同步到 kafka,再通過 flink 流式消費的方式將庫表數據直接導成 hoodie 表,因為直接消費全量部分數據:量大(吞吐高)、亂序嚴重(寫入的 partition 隨機),會導致寫入性能退化,出現吞吐毛刺,這時候可以開啟限速參數,保證流量平穩寫入。
| write.rate.limit | false | 0 | 默認關閉限速 |
5.7 寫入方式
5.7.1 通過flink-cdc進行寫入
CDC 數據保存了完整的數據庫變更,當前可通過兩種途徑將數據導入 hudi
第一種:通過 cdc-connector 直接對接 DB 的 binlog 將數據導入 hudi,優點是不依賴消息隊列,缺點是對 db server 造成壓力。第二種:對接 cdc format 消費 kafka 數據導入 hudi,優點是可擴展性強,缺點是依賴 kafka。注意:如果上游數據無法保證順序,需要指定 write.precombine.field 字段。1)準備MySQL表
(1)MySQL開啟binlog
(2)建表
create database test; use test;create table stu3 (id int unsigned auto_increment primary key COMMENT '自增id',name varchar(20) not null comment '學生名字',school varchar(20) not null comment '學校名字',nickname varchar(20) not null comment '學生小名',age int not null comment '學生年齡',class_num int not null comment '班級人數',phone bigint not null comment '電話號碼',email varchar(64) comment '家庭網絡郵箱',ip varchar(32) comment 'IP地址') engine=InnoDB default charset=utf8;2)flink讀取mysql binlog并寫入kafka
(1)創建MySQL表
create table stu3_binlog(id bigint not null,name string,school string,nickname string,age int not null,class_num int not null,phone bigint not null,email string,ip string,primary key (id) not enforced ) with ('connector' = 'mysql-cdc','hostname' = 'centos01','port' = '3306','username' = 'root','password' = 'root','database-name' = 'test','table-name' = 'stu3' );(2)創建Kafka表
create table stu3_binlog_sink_kafka(id bigint not null,name string,school string,nickname string,age int not null,class_num int not null,phone bigint not null,email string,ip string,primary key (id) not enforced ) with ('connector' = 'upsert-kafka','topic' = 'cdc_mysql_stu3_sink','properties.zookeeper.connect' = 'centos01:2181','properties.bootstrap.servers' = 'centos01:9092','key.format' = 'json','value.format' = 'json');(3)將mysql binlog日志寫入kafka
insert into stu3_binlog_sink_kafka select * from stu3_binlog;3)flink讀取kafka數據并寫入hudi數據湖
(1)創建kafka源表
create table stu3_binlog_source_kafka(id bigint not null,name string,school string,nickname string,age int not null,class_num int not null,phone bigint not null,email string,ip string) with ('connector' = 'kafka','topic' = 'cdc_mysql_stu3_sink','properties.bootstrap.servers' = 'hadoop1:9092','format' = 'json','scan.startup.mode' = 'earliest-offset','properties.group.id' = 'testGroup');(2)創建hudi目標表
create table stu3_binlog_sink_hudi(id bigint not null,name string,`school` string,nickname string,age int not null,class_num int not null,phone bigint not null,email string,ip string,primary key (id) not enforced ) partitioned by (`school`) with ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/stu3_binlog_sink_hudi','table.type' = 'MERGE_ON_READ','write.option' = 'insert','write.precombine.field' = 'school');(3)將kafka數據寫入到hudi中
insert into stu3_binlog_sink_hudi select * from stu3_binlog_source_kafka;5.7.2 離線批量導入
如果存量數據來源于其他數據源,可以使用批量導入功能,快速將存量數據導成 Hoodie 表格式。(1)批量導入省去了 avro 的序列化以及數據的 merge 過程,后續不會再有去重操作,數據的唯一性需要自己來保證。 (2)bulk_insert 需要在 Batch Execuiton Mode 下執行更高效,Batch 模式默認會按照 partition path 排序輸入消息再寫入 Hoodie,避免 file handle 頻繁切換導致性能下降。SET execution.runtime-mode = batch; SET execution.checkpointing.interval = 0;(3)bulk_insert write task 的并發通過參數 write.tasks 指定,并發的數量會影響到小文件的數量,理論上,bulk_insert write task 的并發數就是劃分的 bucket 數,當然每個 bucket 在寫到文件大小上限(parquet 120 MB)的時候會 roll over 到新的文件句柄,所以最后:寫文件數量 >= bulk_insert write task 數。| write.operation | TRUE | upsert | 配置 bulk_insert 開啟該功能 |
| write.tasks | FALSE | 4 | bulk_insert 寫 task 的并發,最后的文件數 >=write.tasks |
| write.bulk_insert.shuffle_by_partitionwrite.bulk_insert.shuffle_input(從 0.11 開始) | FALSE | TRUE | 是否將數據按照 partition 字段 shuffle 再通過 write task 寫入,開啟該參數將減少小文件的數量 但是可能有數據傾斜風險 |
| write.bulk_insert.sort_by_partitionwrite.bulk_insert.sort_input(從 0.11 開始) | FALSE | TRUE | 是否將數據線按照 partition 字段排序再寫入,當一個 write task 寫多個 partition,開啟可以減少小文件數量 |
| write.sort.memory | 128 | sort 算子的可用 managed memory(單位 MB) |
5.7.3 全量接增量
如果已經有全量的離線 Hoodie 表,需要接上實時寫入,并且保證數據不重復,可以開啟 index bootstrap 功能。
如果覺得流程冗長,可以在寫入全量數據的時候資源調大直接走流模式寫,全量走完接新數據再將資源調小(或者開啟限流功能)。
| index.bootstrap.enabled | true | false | 開啟索引加載,會將已存表的最新數據一次性加載到 state 中 |
| index.partition.regex | false | * | 設置正則表達式進行分區篩選,默認為加載全部分區 |
5.8 寫入模式
5.8.1 Changelog 模式
? 如果希望 Hoodie 保留消息的所有變更(I/-U/U/D),之后接上 Flink 引擎的有狀態計算實現全鏈路近實時數倉生產(增量計算),Hoodie 的 MOR 表通過行存原生支持保留消息的所有變更(format 層面的集成),通過流讀 MOR 表可以消費到所有的變更記錄。
1)WITH 參數
| changelog.enabled | false | false | 默認是關閉狀態,即 UPSERT 語義,所有的消息僅保證最后一條合并消息,中間的變更可能會被 merge 掉;改成 true 支持消費所有變更。 |
? 批(快照)讀仍然會合并所有的中間結果,不管 format 是否已存儲中間狀態。
? 開啟 changelog.enabled 參數后,中間的變更也只是 Best Effort: 異步的壓縮任務會將中間變更合并成 1 條,所以如果流讀消費不夠及時,被壓縮后只能讀到最后一條記錄。當然,通過調整壓縮的 buffer 時間可以預留一定的時間 buffer 給 reader,比如調整壓縮的兩個參數:
? ? compaction.delta_commits:5
? ? compaction.delta_seconds: 3600。
說明:
Changelog 模式開啟流讀的話,要在 sql-client 里面設置參數:
set sql-client.execution.result-mode=tableau;
或者
set sql-client.execution.result-mode=changelog;
否則中間結果在讀的時候會被直接合并。
2)流讀 changelog
僅在 0.10.0 支持,本 feature 為實驗性。
開啟 changelog 模式后,hudi 會保留一段時間的 changelog 供下游 consumer 消費,我們可以通過流讀 ODS 層 changelog 接上 ETL 邏輯寫入到 DWD 層,如下圖的 pipeline:
? 流讀的時候我們要注意 changelog 有可能會被 compaction 合并掉,中間記錄會消除,可能會影響計算結果,需要關注sql-client的屬性(result-mode)同上。
3)案例演示
(1)使用changelog
set sql-client.execution.result-mode=tableau; CREATE TABLE t6(id int,ts int,primary key (id) not enforced ) WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t6','table.type' = 'MERGE_ON_READ','read.streaming.enabled' = 'true', 'read.streaming.check-interval' = '4','changelog.enabled' = 'true' );insert into t6 values (1,1); insert into t6 values (1,2);set table.dynamic-table-options.enabled=true; select * from t6/*+ OPTIONS('read.start-commit'='earliest')*/; select count(*) from t6/*+ OPTIONS('read.start-commit'='earliest')*/;(2)不使用changelog
CREATE TABLE t6_v(id int,ts int,primary key (id) not enforced ) WITH ('connector' = 'hudi','path' = 'hdfs://centos04:9000/tmp/hudi_flink/t6','table.type' = 'MERGE_ON_READ','read.streaming.enabled' = 'true','read.streaming.check-interval' = '4' );select * from t6_v/*+ OPTIONS('read.start-commit'='earliest')*/; select count(*) from t6_v/*+ OPTIONS('read.start-commit'='earliest')*/;5.8.2 Append 模式
從 0.10 開始支持
對于 INSERT 模式:
? ? MOR 默認會 apply 小文件策略: 會追加寫 avro log 文件
? ? COW 每次直接寫新的 parquet 文件,沒有小文件策略
Hudi 支持豐富的 Clustering 策略,優化 INSERT 模式下的小文件問題:
1)Inline Clustering
只有 Copy On Write 表支持該模式
| write.insert.cluster | false | false | 是否在寫入時合并小文件,COW 表默認 insert 寫不合并小文件,開啟該參數后,每次寫入會優先合并之前的小文件(不會去重),吞吐會受影響 |
2) Async Clustering
? 從 0.12 開始支持
(1)WITH參數
| clustering.schedule.enabled | false | false | 是否在寫入時定時異步調度 clustering plan,默認關閉 |
| clustering.delta_commits | false | 4 | 調度 clsutering plan 的間隔 commits,clustering.schedule.enabled 為 true 時生效 |
| clustering.async.enabled | false | false | 是否異步執行 clustering plan,默認關閉 |
| clustering.tasks | false | 4 | Clustering task 執行并發 |
| clustering.plan.strategy.target.file.max.bytes | false | 1024 * 1024 * 1024 | Clustering 單文件目標大小,默認 1GB |
| clustering.plan.strategy.small.file.limit | false | 600 | 小于該大小的文件才會參與 clustering,默認600MB |
| clustering.plan.strategy.sort.columns | false | N/A | 支持指定特殊的排序字段 |
| clustering.plan.partition.filter.mode | false | NONE | 支持NONE:不做限制RECENT_DAYS:按時間(天)回溯SELECTED_PARTITIONS:指定固定的 partition |
| clustering.plan.strategy.daybased.lookback.partitions | false | 2 | RECENT_DAYS 生效,默認 2 天 |
(2)Clustering Plan Strategy
? 支持定制化的 clustering 策略。
| clustering.plan.partition.filter.mode | false | NONE | 支持· NONE:不做限制· RECENT_DAYS:按時間(天)回溯· SELECTED_PARTITIONS:指定固定的 partition |
| clustering.plan.strategy.daybased.lookback.partitions | false | 2 | RECENT_DAYS 生效,默認 2 天 |
| clustering.plan.strategy.cluster.begin.partition | false | N/A | SELECTED_PARTITIONS 生效,指定開始 partition(inclusive) |
| clustering.plan.strategy.cluster.end.partition | false | N/A | SELECTED_PARTITIONS 生效,指定結束 partition(incluseve) |
| clustering.plan.strategy.partition.regex.pattern | false | N/A | 正則表達式過濾 partitions |
| clustering.plan.strategy.partition.selected | false | N/A | 顯示指定目標 partitions,支持逗號 , 分割多個 partition |
5.9 Bucket索引
? 默認的 flink 流式寫入使用 state 存儲索引信息:primary key 到 fileId 的映射關系。當數據量比較大的時候,state的存儲開銷可能成為瓶頸,bucket 索引通過固定的 hash 策略,將相同 key 的數據分配到同一個 fileGroup 中,避免了索引的存儲和查詢開銷。
| index.type | false | FLINK_STATE | 設置 BUCKET 開啟 Bucket 索引功能 |
| hoodie.bucket.index.hash.field | false | 主鍵 | 可以設置成主鍵的子集 |
| hoodie.bucket.index.num.buckets | false | 4 | 默認每個 partition 的 bucket 數,當前設置后則不可再變更。 |
5.10 Hudi Catalog
? 從 0.12.0 開始支持,通過 catalog 可以管理 flink 創建的表,避免重復建表操作,另外 hms 模式的 catalog 支持自動補全 hive 同步參數。
-- DFS 模式 Catalog SQL樣例: CREATE CATALOG hoodie_catalogWITH ('type'='hudi','catalog.path' = '${catalog 的默認路徑}','mode'='dfs' );-- Hms 模式 Catalog SQL 樣例: CREATE CATALOG hoodie_catalogWITH ('type'='hudi','catalog.path' = '${catalog 的默認路徑}','hive.conf.dir' = '${hive-site.xml 所在的目錄}','mode'='hms' -- 支持 'dfs' 模式通過文件系統管理表屬性);| catalog.path | true | – | 默認的 catalog 根路徑,用作表路徑的自動推導,默認的表路徑: c a t a l o g . p a t h / {catalog.path}/ catalog.path/{db_name}/${table_name} |
| default-database | false | default | 默認的 database 名 |
| hive.conf.dir | false | – | hive-site.xml 所在的目錄,只在 hms 模式下生效 |
| mode | false | dfs | 支持 hms模式通過 hive 管理元數據 |
| table.external | false | false | 是否創建外部表,只在 hms 模式下生效 |
案例如下:
--(1)創建sql-client初始化sql文件 vim /opt/apps/flink-1.13.6/conf/sql-client-init.sqlCREATE CATALOG hoodie_catalogWITH ('type'='hudi','catalog.path' = '/tmp/hudi_catalog','mode'='dfs' );USE CATALOG hoodie_catalog; --(2)指定sql-client啟動時加載sql文件 hadoop fs -mkdir /tmp/hudi_catalogbin/sql-client.sh embedded -i conf/sql-client-init.sql -s yarn-session --(3)建庫建表插入 create database test; use test;create table t2(uuid varchar(20),name varchar(10),age int,ts timestamp(3),`partition` varchar(20), primary key (uuid) not enforced ) with ('connector' = 'hudi','path' = '/tmp/hudi_catalog/default/t2','table.type' = 'MERGE_ON_READ' );insert into t2 values('1','zs',18,TIMESTAMP '1970-01-01 00:00:01','a'); --(4)退出sql-client,重新進入,表信息還在 use test; show tables; select * from t2;5.11 離線壓縮
? MOR 表的 compaction 默認是自動打開的,策略是 5 個 commits 執行一次壓縮。 因為壓縮操作比較耗費內存,和寫流程放在同一個 pipeline,在數據量比較大的時候(10w+/s qps),容易干擾寫流程,此時采用離線定時任務的方式執行 compaction 任務更穩定。
5.11.1 設置參數
? compaction.async.enabled 為 false,關閉在線 compaction。
? compaction.schedule.enabled 仍然保持開啟,由寫任務階段性觸發壓縮 plan。
5.11.2 原理
一個 compaction 的任務的執行包括兩部分:
? schedule 壓縮 plan
該過程推薦由寫任務定時觸發,寫參數 compaction.schedule.enabled 默認開啟
? 執行對應的壓縮 plan
5.11.3 使用方式
1)執行命令
離線 compaction 需要手動執行 Java 程序,程序入口:
? hudi-flink1.13-bundle-0.12.0.jar
? org.apache.hudi.sink.compact.HoodieFlinkCompactor
# 命令行的方式 ./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://xxx:9000/table2)參數配置
| –path | true | – | 目標表的路徑 |
| –compaction-tasks | false | -1 | 壓縮 task 的并發,默認是待壓縮 file group 的數量 |
| –compaction-max-memory | false | 100 (單位 MB) | 壓縮時 log 數據的索引 map,默認 100MB,內存足夠可以開大些 |
| –schedule | false | false | 是否要執行 schedule compaction 的操作,當寫流程還在持續寫入表數據的時候,開啟這個參數有丟失查詢數據的風險,所以開啟該參數一定要保證當前沒有任務往表里寫數據, 寫任務的 compaction plan 默認是一直 schedule 的,除非手動關閉(默認 5 個 commits 一次壓縮) |
| –seq | false | LIFO | 執行壓縮任務的順序,默認是從最新的壓縮 plan 開始執行,可選值:LIFO: 從最新的 plan 開始執行;FIFO: 從最老的 plan 開始執行 |
| –service | false | false | 是否開啟 service 模式,service 模式會打開常駐進程,一直監聽壓縮任務并提交到集群執行(從 0.11 開始執行) |
| –min-compaction-interval-seconds | false | 600 (單位 秒) | service 模式下的執行間隔,默認 10 分鐘 |
案例如下:
create table t7(id int,ts int,primary key (id) not enforced ) with ('connector' = 'hudi','path' = '/tmp/hudi_catalog/default/t7','compaction.async.enabled' = 'false', -- 關閉自動壓縮'compaction.schedule.enabled' = 'true', -- 由寫任務階段性觸發壓縮 plan'table.type' = 'MERGE_ON_READ' );insert into t7 values(1,1); insert into t7 values(2,2); insert into t7 values(3,3); insert into t7 values(4,4); insert into t7 values(5,5);// 命令行的方式 ./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://centos04:9000/tmp/hudi_catalog/default/t75.12 離線 Clustering
? 異步的 clustering 相對于 online 的 async clustering 資源隔離,從而更加穩定。
5.12.1 設置參數
? clustering.async.enabled 為 false,關閉在線 clustering。
? clustering.schedule.enabled 仍然保持開啟,由寫任務階段性觸發 clustering plan。
5.12.2 原理
一個 clustering 的任務的執行包括兩部分:
? schedule plan
推薦由寫任務定時觸發,寫參數 clustering.schedule.enabled 默認開啟。
? 執行對應的 plan
5.12.3 使用方式
1)執行命令
離線 clustering 需要手動執行 Java 程序,程序入口:
? hudi-flink1.13-bundle-0.12.0.jar
? org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob
注意:必須是分區表,否則報錯空指針異常。
# 命令行的方式./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://centos04:9000/table2)參數配置
| –path | true | – | 目標表的路徑。 |
| –clustering-tasks | false | -1 | Clustering task 的并發,默認是待壓縮 file group 的數量。 |
| –schedule | false | false | 是否要執行 schedule clustering plan 的操作,當寫流程還在持續寫入表數據的時候,開啟這個參數有丟失查詢數據的風險,所以開啟該參數一定要保證當前沒有任務往表里寫數據, 寫任務的 clustering plan 默認是一直 schedule 的,除非手動關閉(默認 4 個 commits 一次 clustering)。 |
| –seq | false | FIFO | 執行壓縮任務的順序,默認是從最老的 clustering plan 開始執行,可選值:LIFO: 從最新的 plan 開始執行;FIFO: 從最老的 plan 開始執行 |
| –target-file-max-bytes | false | 1024 * 1024 * 1024 | 最大目標文件,默認 1GB。 |
| –small-file-limit | false | 600 | 小于該大小的文件會參與 clustering,默認 600MB。 |
| –sort-columns | false | N/A | Clustering 可選排序列。 |
| –service | false | false | 是否開啟 service 模式,service 模式會打開常駐進程,一直監聽壓縮任務并提交到集群執行(從 0.11 開始執行)。 |
| –min-compaction-interval-seconds | false | 600 (單位 秒) | service 模式下的執行間隔,默認 10 分鐘。 |
3)案例演示
create table t8(id int,age int,ts int,primary key (id) not enforced ) partitioned by (age) with ('connector' = 'hudi','path' = '/tmp/hudi_catalog/default/t8','clustering.async.enabled' = 'false','clustering.schedule.enabled' = 'true','table.type' = 'COPY_ON_WRITE' );insert into t8 values(1,18,1); insert into t8 values(2,18,2); insert into t8 values(3,18,3); insert into t8 values(4,18,4); insert into t8 values(5,18,5);-- 命令行的方式 ./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://centos04:9000/tmp/hudi_catalog/default/t85.12.4 常見問題
# 存儲一直看不到數據如果是 streaming 寫,請確保開啟 checkpoint,Flink 的 writer 有 3 種刷數據到磁盤的策略: ?當某個 bucket 在內存積攢到一定大小 (可配,默認 64MB) ?當總的 buffer 大小積攢到一定大小(可配,默認 1GB) ?當 checkpoint 觸發,將內存里的數據全部 flush 出去# 數據有重復如果是 COW 寫,需要開啟參數 write.insert.drop.duplicates,COW 寫每個 bucket 的第一個文件默認是不去重的,只有增量的數據會去重,全局去重需要開啟該參數;MOR 寫不需要開啟任何參數,定義好 primary key 后默認全局去重。(注意:從 0.10 版本開始,該屬性改名 write.precombine 并且默認為 true。)如果需要多 partition 去重,需要開啟參數: index.global.enabled 為 true。(注意:從 0.10 版本開始,該屬性默認true。)索引 index 是判斷數據重復的核心數據結構,index.state.ttl 設置了索引保存的時間,默認為 1.5 天,對于長時間周期的更新,比如更新一個月前的數據,需要將 index.state.ttl 調大(單位天),設置小于 0 代表永久保存。(注意:從 0.10 版本開始,該屬性默認為 0。)# Merge On Read 寫只有 log 文件Merge On Read 默認開啟了異步的 compaction,策略是 5 個 commits 壓縮一次,當條件滿足參會觸發壓縮任務,另外,壓縮本身因為耗費資源,所以不一定能跟上寫入效率,可能會有滯后。5.13 Hudi核心原理
5.13.1 Hudi數據去重原理
Hoodie 的數據去重分兩步:
(1)寫入前攢 buffer 階段去重,核心接口HoodieRecordPayload#preCombine
(2)寫入過程中去重,核心接口HoodieRecordPayload#combineAndGetUpdateValue。
1)消息版本新舊
? 相同 record key (主鍵)的數據通過write.precombine.field指定的字段來判斷哪個更新,即 precombine 字段更大的 record 更新,如果是相等的 precombine 字段,則后來的數據更新。
? 從 0.10 版本開始,write.precombine.field 字段為可選,如果沒有指定,會看 schema 中是否有 ts 字段,如果有,ts 字段被選為 precombine 字段;如果沒有指定,schema 中也沒有 ts 字段,則為處理順序:后來的消息默認較新。
2)攢消息階段的去重
? Hoodie 將 buffer 消息發給 write handle 之前可以執行一次去重操作,通過HoodieRecordPayload#preCombine 接口,保留 precombine 字段較大的消息,此操作為純內存的計算,在同一個 write task 中為單并發執行。
? 注意:write.precombine 選項控制了攢消息的去重。
3)寫 parquet 增量消息的去重
? 在Hoodie 寫入流程中,Hoodie 每寫一個 parquet 都會有 base + 增量 merge 的過程,增量的消息會先放到一個 spillable map 的數據結構構建內存 index,這里的增量數據如果沒有提前去重,那么同 key 的后來消息會直接覆蓋先來的消息。
? Writer 接著掃 base 文件,過程中會不斷查看內存 index 是否有同 key 的新消息,如果有,會走 HoodieRecordPayload#combineAndGetUpdateValue 接口判斷保留哪個消息。
? 注意: MOR 表的 compaction 階段和 COW 表的寫入流程都會有 parquet 增量消息去重的邏輯。
4)跨 partition 的消息去重
? 默認情況下,不同的 partition 的消息是不去重的,即相同的 key 消息,如果新消息換了 partition,那么老的 partiiton 消息仍然保留。
? 開啟 index.global.enabled 選項開啟跨 partition 去重,原理是先往老的 partiton 發一條刪除消息,再寫新 partition。
5.13.2 Hudi表寫入原理
數據寫入、數據壓縮與數據清理
1)數據寫入分析 (1)基礎數據封裝:將數據流中flink的RowData封裝成Hoodie實體; (2)BucketAssigner:桶分配器,主要是給數據分配寫入的文件地址:若為插入操作,則取大小最小的FileGroup對應的FileId文件內進行插入;在此文件的后續寫入中文件 ID 保持不變,并且提交時間會更新以顯示最新版本。這也意味著記錄的任何特定版本,給定其分區路徑,都可以使用文件 ID 和 instantTime進行唯一定位;若為更新操作,則直接在當前location進行數據更新; (3)Hoodie Stream Writer: 數據寫入,將數據緩存起來,在超過設置的最大flushSize或是做checkpoint時進行刷新到文件中; (4)Oprator Coordinator:主要與Hoodie Stream Writer進行交互,處理checkpoint等事件,在做checkpoint時,提交instant到timeLine上,并生成下一個instant的時間,算法為取當前最新的commi時間,比對當前時間與commit時間,若當前時間大于commit時間,則返回,否則一直循環等待生成。2)數據壓縮壓縮(compaction)用于在 MergeOnRead存儲類型時將基于行的log日志文件轉化為parquet列式數據文件,用于加快記錄的查找。compaction首先會遍歷各分區下最新的parquet數據文件和其對應的log日志文件進行合并,并生成新的FileSlice,在TimeLine 上提交新的Instance:具體策略分為4種,具體見官網說明: compaction.trigger.strategy: Strategy to trigger compaction, options are 1.'num_commits': trigger compaction when reach N delta commits; 2.'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction; 3.'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied; 4.'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied. Default is 'num_commits' Default Value: num_commits (Optional)在項目實踐中需要注意參數'read.streaming.skip_compaction' 參數的配置,其表示在流式讀取該表是否跳過壓縮后的數據,若該表用于后續聚合操作表的輸入表,則需要配置值為true,表示聚合操作表不再消費讀取壓縮數據。若不配置或配置為false,則該表中的數據在未被壓縮之前被聚合操作表讀取了一次,在壓縮后數據又被讀取一次,會導致聚合表的sum、count等算子結果出現雙倍情況。3)數據清理隨著用戶向表中寫入更多數據,對于每次更新,Hudi會生成一個新版本的數據文件用于保存更新后的記錄(COPY_ON_WRITE)或將這些增量更新寫入日志文件以避免重寫更新版本的數據文件(MERGE_ON_READ)。在這種情況下,根據更新頻率,文件版本數可能會無限增長,但如果不需要保留無限的歷史記錄,則必須有一個流程(服務)來回收舊版本的數據,這就是 Hudi 的清理服務。具體清理策略可參考官網,一般使用的清理策略為:KEEP_LATEST_FILE_VERSIONS:此策略具有保持 N 個文件版本而不受時間限制的效果。會刪除N之外的FileSlice。5.13.3 Hudi表讀取原理
(1)開啟split_monitor算子,每隔N秒(可配置)監聽TimeLine上變化,并將變更的Instance封裝為FileSlice。
(2)分發log文件時候,按照fileId值進行keyBy,保證同一file group下數據文件都給一個Task進行處理,從而保證數據處理的有序性。
(3)split_reader根據FileSlice信息進行數據讀取。
總結
以上是生活随笔為你收集整理的数据湖架构Hudi(五)Hudi集成Flink案例详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Mall商城的高级篇的开发(三)缓存与分
- 下一篇: struts2 数据校验