[Data Lake Hudi 8] Hudi Integration with Flink: Getting Started
- Hudi Integration with Flink: Getting Started
- 1. Hudi and Flink version compatibility
- 2. Flink environment setup
- 3. Running jobs with the Flink SQL Client
- 1. Modify configuration
- 2. Create a table and insert data
- 3. Streaming insert
- 4. Running jobs from code in IDEA
- 1. Environment setup
- 2. Create a Maven project and write the code
- 3. Submit and run
- 5. Flink and Hudi type mapping
Hudi Integration with Flink: Getting Started
1. Hudi and Flink version compatibility
0.11.x is not recommended; if you need it, use the patched branch: https://github.com/apache/hudi/pull/6182
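For reference, the supported combinations per the Hudi documentation around the 0.12.0 release are roughly:

- Hudi 0.12.x: Flink 1.15.x, 1.14.x, 1.13.x
- Hudi 0.11.x: Flink 1.14.x, 1.13.x
- Hudi 0.10.x: Flink 1.13.x
- Hudi 0.9.0: Flink 1.12.2

This post uses Hudi 0.12.0 with Flink 1.13.6.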
2. Flink environment setup
1) Copy the compiled Hudi bundle jar into Flink's lib directory:

```
cp /opt/software/hudi-0.12.0/packaging/hudi-flink-bundle/target/hudi-flink1.13-bundle_2.12-0.12.0.jar /opt/module/flink-1.13.6/lib/
```

2) Copy the guava jar to resolve a dependency conflict:

```
cp /opt/module/hadoop-3.1.3/share/hadoop/common/lib/guava-27.0-jre.jar /opt/module/flink-1.13.6/lib/
```

3) Configure the Hadoop environment variables:

```
sudo vim /etc/profile.d/my_env.sh

export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

source /etc/profile
```

3. Running jobs with the Flink SQL Client
1. Modify configuration
1) Modify the flink-conf.yaml configuration (a sample follows).
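A minimal sketch of the settings typically adjusted for this setup, assuming RocksDB state and an HDFS checkpoint directory; the values below are illustrative, so adapt them to your cluster:

```
# flink-conf.yaml (illustrative values)
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: hdfs://hadoop1:8020/ckps
execution.checkpointing.interval: 30000
classloader.check-leaked-classloader: false
```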
2) yarn-session mode
(1) Resolve dependency issues
Note:
The dependency handling below addresses a problem that appears when Flink runs Hudi jobs: the job needs to perform compaction, but compaction fails, and the error is not reported to the central log server. You have to open the log of the individual Flink task to see it. The error (visible in the task's log) complains about a class that is in fact present in the hudi-flink bundle; the root cause is a dependency conflict.
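To pull the per-task logs from YARN and search for the failure, something like the following works; the application id and the grep pattern are placeholders, not values from this setup:

```
yarn logs -applicationId <application_id> | grep -i -A 5 'exception'
```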
(2) Start a yarn-session:

```
/opt/module/flink-1.13.6/bin/yarn-session.sh -d
```

(3) Start the sql-client:

```
/opt/module/flink-1.13.6/bin/sql-client.sh embedded -s yarn-session
```

2. Create a table and insert data
```
set sql-client.execution.result-mode=tableau;

-- create a Hudi table
CREATE TABLE t1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t1',
  'table.type' = 'MERGE_ON_READ' -- the default is COW (COPY_ON_WRITE)
);
```

Or, equivalently:

```
CREATE TABLE t1(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20),
  PRIMARY KEY(uuid) NOT ENFORCED
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t1',
  'table.type' = 'MERGE_ON_READ'
);
```

- Insert data (sample statements for all three steps follow this list)
- Query data
- Update data
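These steps follow the Hudi Flink quickstart; the row values below are illustrative:

```
-- insert data
INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

-- query data
SELECT * FROM t1;

-- update: writing a row with an existing primary key overwrites it
INSERT INTO t1 VALUES
  ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
```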
Note that the save mode is now Append. In general, always use append mode unless you are creating the table for the first time. Querying the data again will show the updated records. Each write operation generates a new commit, identified by a timestamp. Look for changes in the _hoodie_commit_time and age fields for the same _hoodie_record_keys from the previous commit.
3. Streaming insert
The three steps below are sketched after the list.
- 1) Create a test table
- 2) Execute the insert
- 3) Query the result
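A sketch mirroring the Java example later in this post: a datagen source streams rows into a Hudi MERGE_ON_READ table. The table names and the HDFS path are carried over from that example; treat them as assumptions:

```
-- 1) test tables: a datagen source and a Hudi sink
CREATE TABLE sourceT (
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

CREATE TABLE t2(
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20)
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t2',
  'table.type' = 'MERGE_ON_READ'
);

-- 2) run the continuous insert
INSERT INTO t2 SELECT * FROM sourceT;

-- 3) query the result
SELECT * FROM t2;
```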
4. Running jobs from code in IDEA
1. Environment setup
- 1. Manually install the dependency
In the directory containing hudi-flink1.13-bundle-0.12.0.jar, open a terminal and run the install command (a sketch follows). Then check whether the local repository directory configured under Maven in IDEA's settings matches the directory the command installed into; if they differ, move the installed jar into the directory IDEA is configured to use.
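A sketch of the manual install using Maven's install-file goal; the groupId/artifactId coordinates below are assumptions chosen to match the bundle jar, not values from the original post:

```
mvn install:install-file \
  -Dfile=./hudi-flink1.13-bundle-0.12.0.jar \
  -DgroupId=org.apache.hudi \
  -DartifactId=hudi-flink1.13-bundle \
  -Dversion=0.12.0 \
  -Dpackaging=jar
```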
2. Create a Maven project and write the code
The code is as follows:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.concurrent.TimeUnit;

public class HudiDemo {
    public static void main(String[] args) {
        // When running inside IDEA, this variant provides a local web UI:
        // StreamExecutionEnvironment env =
        //         StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // State backend: RocksDB with incremental checkpoints enabled
        EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
        // When running locally in IDEA, point RocksDB at a local storage path:
        // embeddedRocksDBStateBackend.setDbStoragePath("file:///E:/rocksdb");
        embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
        env.setStateBackend(embeddedRocksDBStateBackend);

        // Checkpoint configuration
        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5), CheckpointingMode.EXACTLY_ONCE);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/ckps");
        checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(2));
        checkpointConfig.setTolerableCheckpointFailureNumber(5);
        checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));
        checkpointConfig.setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        // Datagen source table
        tableEnvironment.executeSql("CREATE TABLE sourceT (\n" +
                "  uuid varchar(20),\n" +
                "  name varchar(10),\n" +
                "  age int,\n" +
                "  ts timestamp(3),\n" +
                "  `partition` varchar(20)\n" +
                ") WITH (\n" +
                "  'connector' = 'datagen',\n" +
                "  'rows-per-second' = '1'\n" +
                ")");

        // Hudi sink table (MERGE_ON_READ)
        tableEnvironment.executeSql("create table t2(\n" +
                "  uuid varchar(20),\n" +
                "  name varchar(10),\n" +
                "  age int,\n" +
                "  ts timestamp(3),\n" +
                "  `partition` varchar(20)\n" +
                ")\n" +
                "with (\n" +
                "  'connector' = 'hudi',\n" +
                "  'path' = 'hdfs://hadoop102:8020/tmp/hudi_flink/t2',\n" +
                "  'table.type' = 'MERGE_ON_READ'\n" +
                ")");

        // Continuous insert from the datagen source into the Hudi table
        tableEnvironment.executeSql("insert into t2 select * from sourceT");
    }
}
```

3. Submit and run
Package the code into a jar, upload it to the myjars directory, and submit it:

```
flink run -t yarn-per-job \
  -c com.yang.hudi.flink.HudiDemo \
  ./myjars/flink-hudi-demo-1.0-SNAPSHOT.jar
```

5. Flink and Hudi type mapping
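Hudi stores data with an Avro-derived schema, so Flink SQL types map to Hudi (Avro) types roughly as follows; treat this list as a reference summary rather than an exhaustive specification:

- CHAR / VARCHAR / STRING -> string
- BOOLEAN -> boolean
- BINARY / VARBINARY -> bytes
- DECIMAL -> decimal
- TINYINT / SMALLINT / INT -> int
- BIGINT -> long
- FLOAT -> float
- DOUBLE -> double
- DATE -> date
- TIMESTAMP(3) -> timestamp
- ARRAY -> array
- MAP (with string-like keys) -> map
- ROW -> record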