
Building a Streaming Data Lake with Flink Hudi

Published: 2024/8/23 · Posted by 豆豆

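This article describes how Flink Hudi uses stream computing to keep optimizing and evolving the original incremental computation model, which is based on mini-batches. Users can write CDC data into Hudi storage in real time with Flink SQL, and in the upcoming 0.9 release Hudi natively supports the CDC format. The main contents are: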

  • Background
  • Incremental ETL
  • Demo

    1. Background

    Near real-time

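    Since 2016, the Apache Hudi community has been exploring near-real-time use cases through Hudi's UPSERT capability [1]. With the MR/Spark batch model, users can ingest data into HDFS/OSS at hourly granularity. In purely real-time scenarios, an architecture of the Flink stream engine plus KV/OLAP storage delivers end-to-end real-time analytics at the second level (or, more loosely, the 5-minute level). Between that level and the hourly level, however, there are still a great many use cases. We call them NEAR-REAL-TIME.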

    In practice, many use cases fall into the near-real-time category:

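  • Minute-level dashboards;
  • Various kinds of BI analysis (OLAP);
  • Minute-level feature extraction for machine learning.

    Incremental computation

    The solution space for near-real-time workloads is still quite open: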

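    • Stream processing has low latency, but its SQL patterns are fairly fixed, and it lacks query-side capabilities (indexes, ad hoc queries);
    • Batch-processing data warehouses are feature-rich, but their data latency is high.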

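    The Hudi community therefore proposed an incremental computation model based on mini-batches:

    incremental dataset => incremental result, merged with the stored result => external storage

    This model uses the snapshots of the lake storage to pull the incremental dataset (the data committed between two commits). It then computes the incremental result (for example, a simple count) with a batch framework such as Spark or Hive, and merges that result into the stored result.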
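A minimal sketch of the mini-batch model above, assuming an in-memory list of commits stands in for the lake table. `incremental_pull` returns only the records committed between two commits, and a per-key count computed on that delta is merged into the stored result. The timeline, the `city` field and both helpers are hypothetical illustrations, not Hudi's API.

```python
from collections import Counter

# Hypothetical commit timeline standing in for a lake table: (commit_time, records).
TIMELINE = [
    (1, [{"city": "hz"}, {"city": "sh"}]),
    (2, [{"city": "hz"}]),
    (3, [{"city": "sh"}, {"city": "bj"}]),
]


def incremental_pull(begin_exclusive, end_inclusive):
    """Return only the records committed in (begin_exclusive, end_inclusive]."""
    return [
        record
        for commit_time, records in TIMELINE
        if begin_exclusive < commit_time <= end_inclusive
        for record in records
    ]


def merge_counts(stored, delta):
    """count per key is mergeable: new result = stored result + incremental result."""
    merged = Counter(stored)
    merged.update(delta)
    return merged


# The first run covers commits 1..2; the next run pulls only commit 3 and merges it.
stored = merge_counts(Counter(), Counter(r["city"] for r in incremental_pull(0, 2)))
delta = Counter(r["city"] for r in incremental_pull(2, 3))
stored = merge_counts(stored, delta)
print(dict(stored))  # {'hz': 2, 'sh': 2, 'bj': 1}
```

The second run never rereads commits 1 and 2. That is the cost saving the model aims for, and it works only because a count can be merged by addition.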

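    Core problems

    An incremental model must solve two core problems:

  • UPSERT capability: like Kudu and Hive ACID, Hudi provides minute-level update capability;
  • Incremental consumption: Hudi provides incremental pulls through the multiple snapshots of the lake storage.

    The mini-batch incremental model can reduce latency and save computing cost in some scenarios, but it has one major limitation: it constrains the SQL pattern. The computation runs as batch jobs, and batch jobs do not maintain state, so the computed metrics must be easy to merge. A simple count or sum works, but avg and count distinct still require pulling the full dataset and recomputing.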
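The limitation can be checked with a few illustrative numbers (not from the article). Sum and count merge exactly. Averaging the stored average with the new batch's average, or adding two distinct counts, gives the wrong answer. The exact result needs the underlying sum and count, or the full set of values, which is state that a stateless batch job does not keep.

```python
# Illustrative values only.
old_batch = [10, 20, 30]  # data already folded into the stored result
new_batch = [40]          # the incremental dataset

# sum and count: merging the stored result with the delta result is exact.
stored_sum, stored_cnt = sum(old_batch), len(old_batch)
merged_sum = stored_sum + sum(new_batch)
merged_cnt = stored_cnt + len(new_batch)

# avg: averaging the two averages is wrong when the batches differ in size.
naive_avg = (stored_sum / stored_cnt + sum(new_batch) / len(new_batch)) / 2
exact_avg = merged_sum / merged_cnt
print(naive_avg, exact_avg)  # 30.0 25.0

# count distinct: the stored number alone cannot tell whether a new value was
# already seen, so the full set of values (i.e. the full data) is needed.
old_users = {"a", "b"}
new_users = {"b", "c"}
naive_distinct = len(old_users) + len(new_users)  # double-counts "b"
exact_distinct = len(old_users | new_users)
print(naive_distinct, exact_distinct)  # 4 3
```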

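    As stream computing and real-time data warehouses have become widespread, the Hudi community has actively embraced the change. It keeps optimizing the original mini-batch incremental model through stream computing: streaming ingestion into the lake was introduced in 0.7, and native CDC format support arrives in 0.9.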

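    2. Incremental ETL

    Ingesting DB data into the lake

    As CDC technology matures, CDC tools such as Debezium are becoming increasingly popular. The Hudi community has successively integrated streaming write and streaming read capabilities, so users can write CDC data into Hudi storage in real time with Flink SQL:

    • either import DB data directly into Hudi through the Flink CDC connector;
    • or first load the CDC data into Kafka, then import it into Hudi through the Kafka connector.

    The second approach offers somewhat better fault tolerance and scalability.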

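    Data lake CDC

    In the upcoming 0.9 release, Hudi natively supports the CDC format, so every change to a record can be preserved. On this basis, Hudi integrates more completely with stream computing systems and can read CDC data as a stream [2].

    All message changes from the source CDC stream are preserved after they enter the lake and can be consumed as a stream. Flink's stateful computation accumulates results (state) in real time and syncs the resulting changes to Hudi lake storage through streaming writes. Flink then consumes the changelog stored in Hudi as a stream to drive the next layer of stateful computation. Together these steps form a near-real-time, end-to-end ETL pipeline.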

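    This architecture shortens end-to-end ETL latency to the minute level. The storage at each layer can also be compacted into a columnar format (Parquet, ORC) to support OLAP analysis. Because the data lake is open, the compacted files can be queried by a variety of engines: Flink, Spark, Presto, Hive, and so on.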
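To make the compaction step concrete, here is a simplified, hypothetical model of a MERGE_ON_READ file slice:

- streaming writes append to a log;
- snapshot reads merge the log over the base file;
- compaction folds the log into a new base file. In Hudi this base file would be columnar, such as Parquet.

The class and method names are illustrative only and do not correspond to Hudi classes.

```python
from dataclasses import dataclass, field


@dataclass
class FileSlice:
    """Hypothetical MERGE_ON_READ file slice: a base file plus appended logs."""

    base: dict = field(default_factory=dict)  # key -> row; stands in for a columnar base file
    logs: list = field(default_factory=list)  # appended changes as (op, key, row)

    def write(self, op, key, row=None):
        # Streaming writes only append to the log; the base file is not rewritten.
        self.logs.append((op, key, row))

    def read_snapshot(self):
        # Snapshot query: merge the pending log entries over the base file at read time.
        view = dict(self.base)
        for op, key, row in self.logs:
            if op == "delete":
                view.pop(key, None)
            else:  # "insert" or "update": the latest row for the key wins
                view[key] = row
        return view

    def compact(self):
        # Compaction: fold the logs into a new base file and drop the merged logs.
        self.base = self.read_snapshot()
        self.logs.clear()


fs = FileSlice(base={101: "scooter"})
fs.write("update", 101, "big scooter")
fs.write("insert", 102, "car battery")
fs.write("delete", 102)
before = fs.read_snapshot()
fs.compact()
print(before == fs.read_snapshot(), fs.logs)  # True []
```

Compaction does not change what a snapshot query returns. It only moves the merge cost from every read into a one-off rewrite, which is why the compacted columnar files suit OLAP engines.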

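    A Hudi data lake table has two forms:

    • Table form: queries return the latest snapshot, stored in an efficient columnar format;
    • Stream form: changes are consumed as a stream, and the changelog can be read from any specified position onward.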
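Both forms can be sketched over a single hypothetical changelog:

- the table form folds all changes into the latest value per key;
- the stream form replays every change committed after a chosen instant.

The `(instant, row_kind, key, value)` layout and the exclusive start instant are assumptions of this sketch. They are not Hudi's storage format or its exact read semantics.

```python
# Hypothetical changelog of one table: (commit_instant, row_kind, key, value).
CHANGELOG = [
    (1, "+I", "k1", 10),
    (1, "+I", "k2", 20),
    (2, "-U", "k1", 10),
    (2, "+U", "k1", 11),
    (3, "-D", "k2", 20),
]


def table_form(changelog):
    """Table form: the latest value per key; intermediate changes are merged away."""
    snapshot = {}
    for _instant, kind, key, value in changelog:
        if kind in ("+I", "+U"):
            snapshot[key] = value
        elif kind == "-D":
            snapshot.pop(key, None)
        # "-U" only retracts the old image; the "+U" that follows carries the new one.
    return snapshot


def stream_form(changelog, start_instant):
    """Stream form: every change committed after start_instant, in commit order."""
    return [(kind, key, value) for instant, kind, key, value in changelog if instant > start_instant]


print(table_form(CHANGELOG))      # {'k1': 11}
print(stream_form(CHANGELOG, 1))  # [('-U', 'k1', 10), ('+U', 'k1', 11), ('-D', 'k2', 20)]
```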

    3. Demo

    The following demo shows both forms of a Hudi table.

    Environment setup

    • Flink SQL Client
    • hudi-flink-bundle jar built from Hudi master
    • Flink 1.13.1

    First, prepare some CDC data in debezium-json format:

        {"before":null,"after":{"id":101,"ts":1000,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}
        {"before":null,"after":{"id":102,"ts":2000,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
        {"before":null,"after":{"id":103,"ts":3000,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.800000011920929},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
        {"before":null,"after":{"id":104,"ts":4000,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
        {"before":null,"after":{"id":105,"ts":5000,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
        {"before":null,"after":{"id":106,"ts":6000,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
        {"before":null,"after":{"id":107,"ts":7000,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
        {"before":null,"after":{"id":108,"ts":8000,"name":"jacket","description":"water resistent black wind breaker","weight":0.10000000149011612},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
        {"before":null,"after":{"id":109,"ts":9000,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
        {"before":{"id":106,"ts":6000,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"after":{"id":106,"ts":10000,"name":"hammer","description":"18oz carpenter hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589361987000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":362,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589361987936,"transaction":null}
        {"before":{"id":107,"ts":7000,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"after":{"id":107,"ts":11000,"name":"rocks","description":"box of assorted rocks","weight":5.099999904632568},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362099000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362099505,"transaction":null}
        {"before":null,"after":{"id":110,"ts":12000,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362210000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1068,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362210230,"transaction":null}
        {"before":null,"after":{"id":111,"ts":13000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362243000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1394,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362243428,"transaction":null}
        {"before":{"id":110,"ts":12000,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"after":{"id":110,"ts":14000,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362293000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1707,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362293539,"transaction":null}
        {"before":{"id":111,"ts":13000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"after":{"id":111,"ts":15000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362330000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2090,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362330904,"transaction":null}
        {"before":{"id":111,"ts":16000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"after":null,"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362344000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2443,"row":0,"thread":2,"query":null},"op":"d","ts_ms":1589362344455,"transaction":null}

    Create a table in the Flink SQL Client to read the CDC data file:

        Flink SQL> CREATE TABLE debezium_source(
        >   id INT NOT NULL,
        >   ts BIGINT,
        >   name STRING,
        >   description STRING,
        >   weight DOUBLE
        > ) WITH (
        >   'connector' = 'filesystem',
        >   'path' = '/Users/chenyuzhao/workspace/hudi-demo/source.data',
        >   'format' = 'debezium-json'
        > );
        [INFO] Execute statement succeed.

    Run a SELECT and inspect the result. There are 20 records in total, with several UPDATEs in the middle, and the last message is a DELETE.

        Flink SQL> select * from debezium_source;
        +----+-----+-------+--------------------+--------------------------------+---------------------+
        | op | id  | ts    | name               | description                    | weight              |
        +----+-----+-------+--------------------+--------------------------------+---------------------+
        | +I | 101 | 1000  | scooter            | Small 2-wheel scooter          | 3.140000104904175   |
        | +I | 102 | 2000  | car battery        | 12V car battery                | 8.100000381469727   |
        | +I | 103 | 3000  | 12-pack drill bits | 12-pack of drill bits with ... | 0.800000011920929   |
        | +I | 104 | 4000  | hammer             | 12oz carpenter's hammer        | 0.75                |
        | +I | 105 | 5000  | hammer             | 14oz carpenter's hammer        | 0.875               |
        | +I | 106 | 6000  | hammer             | 16oz carpenter's hammer        | 1.0                 |
        | +I | 107 | 7000  | rocks              | box of assorted rocks          | 5.300000190734863   |
        | +I | 108 | 8000  | jacket             | water resistent black wind ... | 0.10000000149011612 |
        | +I | 109 | 9000  | spare tire         | 24 inch spare tire             | 22.200000762939453  |
        | -U | 106 | 6000  | hammer             | 16oz carpenter's hammer        | 1.0                 |
        | +U | 106 | 10000 | hammer             | 18oz carpenter hammer          | 1.0                 |
        | -U | 107 | 7000  | rocks              | box of assorted rocks          | 5.300000190734863   |
        | +U | 107 | 11000 | rocks              | box of assorted rocks          | 5.099999904632568   |
        | +I | 110 | 12000 | jacket             | water resistent white wind ... | 0.20000000298023224 |
        | +I | 111 | 13000 | scooter            | Big 2-wheel scooter            | 5.179999828338623   |
        | -U | 110 | 12000 | jacket             | water resistent white wind ... | 0.20000000298023224 |
        | +U | 110 | 14000 | jacket             | new water resistent white w... | 0.5                 |
        | -U | 111 | 13000 | scooter            | Big 2-wheel scooter            | 5.179999828338623   |
        | +U | 111 | 15000 | scooter            | Big 2-wheel scooter            | 5.170000076293945   |
        | -D | 111 | 16000 | scooter            | Big 2-wheel scooter            | 5.170000076293945   |
        +----+-----+-------+--------------------+--------------------------------+---------------------+
        Received a total of 20 rows
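The 20 rows come from 16 Debezium messages: 11 creates ("op":"c"), 4 updates ("u") and 1 delete ("d"). Each update is split into an UPDATE_BEFORE (-U) row and an UPDATE_AFTER (+U) row, so the total is 11 + 2 × 4 + 1 = 20.

The sketch below reproduces that mapping with Python's `json` module on three messages abridged from the data above. It mirrors how the debezium-json format interprets the `op` field, as we understand it, and is not Flink's implementation. `to_changelog` is a hypothetical helper.

```python
import json

# Three messages abridged from the demo data (only "before", "after" and "op" are kept).
MESSAGES = [
    '{"before":null,"after":{"id":101,"ts":1000},"op":"c"}',
    '{"before":{"id":106,"ts":6000},"after":{"id":106,"ts":10000},"op":"u"}',
    '{"before":{"id":111,"ts":16000},"after":null,"op":"d"}',
]


def to_changelog(message):
    """Map one Debezium envelope to (row_kind, row) pairs."""
    event = json.loads(message)
    op = event["op"]
    if op in ("c", "r"):  # create, or "r" for rows read during a Debezium snapshot
        return [("+I", event["after"])]
    if op == "u":  # an update becomes the old image (-U) followed by the new image (+U)
        return [("-U", event["before"]), ("+U", event["after"])]
    if op == "d":  # a delete carries only the old image
        return [("-D", event["before"])]
    raise ValueError(f"unsupported op: {op!r}")


rows = [row for message in MESSAGES for row in to_changelog(message)]
for kind, image in rows:
    print(kind, image["id"], image["ts"])
# +I 101 1000
# -U 106 6000
# +U 106 10000
# -D 111 16000
```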

    Create the Hudi table, setting the table type to MERGE_ON_READ and enabling the changelog mode property changelog.enabled:

        Flink SQL> CREATE TABLE hoodie_table(
        >   id INT NOT NULL PRIMARY KEY NOT ENFORCED,
        >   ts BIGINT,
        >   name STRING,
        >   description STRING,
        >   weight DOUBLE
        > ) WITH (
        >   'connector' = 'hudi',
        >   'path' = '/Users/chenyuzhao/workspace/hudi-demo/t1',
        >   'table.type' = 'MERGE_ON_READ',
        >   'changelog.enabled' = 'true',
        >   'compaction.async.enabled' = 'false'
        > );
        [INFO] Execute statement succeed.

    Query

    Load the data into Hudi with an INSERT statement, enable streaming read mode, and run a query to observe the result:

        Flink SQL> select * from hoodie_table/*+ OPTIONS('read.streaming.enabled'='true')*/;
        +----+-----+-------+--------------------+--------------------------------+---------------------+
        | op | id  | ts    | name               | description                    | weight              |
        +----+-----+-------+--------------------+--------------------------------+---------------------+
        | +I | 101 | 1000  | scooter            | Small 2-wheel scooter          | 3.140000104904175   |
        | +I | 102 | 2000  | car battery        | 12V car battery                | 8.100000381469727   |
        | +I | 103 | 3000  | 12-pack drill bits | 12-pack of drill bits with ... | 0.800000011920929   |
        | +I | 104 | 4000  | hammer             | 12oz carpenter's hammer        | 0.75                |
        | +I | 105 | 5000  | hammer             | 14oz carpenter's hammer        | 0.875               |
        | +I | 106 | 6000  | hammer             | 16oz carpenter's hammer        | 1.0                 |
        | +I | 107 | 7000  | rocks              | box of assorted rocks          | 5.300000190734863   |
        | +I | 108 | 8000  | jacket             | water resistent black wind ... | 0.10000000149011612 |
        | +I | 109 | 9000  | spare tire         | 24 inch spare tire             | 22.200000762939453  |
        | -U | 106 | 6000  | hammer             | 16oz carpenter's hammer        | 1.0                 |
        | +U | 106 | 10000 | hammer             | 18oz carpenter hammer          | 1.0                 |
        | -U | 107 | 7000  | rocks              | box of assorted rocks          | 5.300000190734863   |
        | +U | 107 | 11000 | rocks              | box of assorted rocks          | 5.099999904632568   |
        | +I | 110 | 12000 | jacket             | water resistent white wind ... | 0.20000000298023224 |
        | +I | 111 | 13000 | scooter            | Big 2-wheel scooter            | 5.179999828338623   |
        | -U | 110 | 12000 | jacket             | water resistent white wind ... | 0.20000000298023224 |
        | +U | 110 | 14000 | jacket             | new water resistent white w... | 0.5                 |
        | -U | 111 | 13000 | scooter            | Big 2-wheel scooter            | 5.179999828338623   |
        | +U | 111 | 15000 | scooter            | Big 2-wheel scooter            | 5.170000076293945   |
        | -D | 111 | 16000 | scooter            | Big 2-wheel scooter            | 5.170000076293945   |

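    Hudi retains the change record for every row, including the changelog operation type. The query above uses TABLE HINTS (the OPTIONS hint) so that table parameters can be set dynamically for a single query.

    Next, query again in batch read mode. The output shows that the intermediate changes have been merged.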

        Flink SQL> select * from hoodie_table;
        2021-08-20 20:51:25,052 INFO org.apache.hadoop.conf.Configuration.deprecation [] - mapred.job.map.memory.mb is deprecated. Instead, use mapreduce.map.memory.mb
        +----+-----+-------+--------------------+--------------------------------+---------------------+
        | op | id  | ts    | name               | description                    | weight              |
        +----+-----+-------+--------------------+--------------------------------+---------------------+
        | +U | 110 | 14000 | jacket             | new water resistent white w... | 0.5                 |
        | +I | 101 | 1000  | scooter            | Small 2-wheel scooter          | 3.140000104904175   |
        | +I | 102 | 2000  | car battery        | 12V car battery                | 8.100000381469727   |
        | +I | 103 | 3000  | 12-pack drill bits | 12-pack of drill bits with ... | 0.800000011920929   |
        | +I | 104 | 4000  | hammer             | 12oz carpenter's hammer        | 0.75                |
        | +I | 105 | 5000  | hammer             | 14oz carpenter's hammer        | 0.875               |
        | +U | 106 | 10000 | hammer             | 18oz carpenter hammer          | 1.0                 |
        | +U | 107 | 11000 | rocks              | box of assorted rocks          | 5.099999904632568   |
        | +I | 108 | 8000  | jacket             | water resistent black wind ... | 0.10000000149011612 |
        | +I | 109 | 9000  | spare tire         | 24 inch spare tire             | 22.200000762939453  |
        +----+-----+-------+--------------------+--------------------------------+---------------------+
        Received a total of 10 rows

    Aggregation

    Computing count(*) in bounded-source read mode:

        Flink SQL> select count (*) from hoodie_table;
        +----+--------+
        | op | EXPR$0 |
        +----+--------+
        | +I | 1      |
        | -U | 1      |
        | +U | 2      |
        | -U | 2      |
        | +U | 3      |
        | -U | 3      |
        | +U | 4      |
        | -U | 4      |
        | +U | 5      |
        | -U | 5      |
        | +U | 6      |
        | -U | 6      |
        | +U | 7      |
        | -U | 7      |
        | +U | 8      |
        | -U | 8      |
        | +U | 9      |
        | -U | 9      |
        | +U | 10     |
        +----+--------+
        Received a total of 19 rows

    Computing count(*) in streaming read mode:

        Flink SQL> select count (*) from hoodie_table/*+OPTIONS('read.streaming.enabled'='true')*/;
        +----+--------+
        | op | EXPR$0 |
        +----+--------+
        | +I | 1      |
        | -U | 1      |
        | +U | 2      |
        | -U | 2      |
        | +U | 3      |
        | -U | 3      |
        | +U | 4      |
        | -U | 4      |
        | +U | 5      |
        | -U | 5      |
        | +U | 6      |
        | -U | 6      |
        | +U | 7      |
        | -U | 7      |
        | +U | 8      |
        | -U | 8      |
        | +U | 9      |
        | -U | 9      |
        | +U | 8      |
        | -U | 8      |
        | +U | 9      |
        | -U | 9      |
        | +U | 8      |
        | -U | 8      |
        | +U | 9      |
        | -U | 9      |
        | +U | 10     |
        | -U | 10     |
        | +U | 11     |
        | -U | 11     |
        | +U | 10     |
        | -U | 10     |
        | +U | 11     |
        | -U | 11     |
        | +U | 10     |
        | -U | 10     |
        | +U | 11     |
        | -U | 11     |
        | +U | 10     |

    The batch and streaming modes produce consistent results: both end with a count of 10.
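Both count(*) outputs can be reproduced with a small emulation of a non-windowed count(*) that emits an update stream. The rules, as we understand Flink's retraction semantics, are:

- +I and +U input rows add one;
- -U and -D input rows subtract one;
- the first result is emitted as +I;
- every later change emits -U for the old value, then +U for the new one.

The row kinds fed in are copied from the batch snapshot (10 rows) and the streaming changelog (20 rows) shown earlier. `count_star` is a hypothetical helper, not a Flink API.

```python
def count_star(row_kinds):
    """Emulate a non-windowed COUNT(*) that emits an update stream.

    +I/+U inputs add one row, -U/-D inputs retract one. The first result is
    emitted as +I; every later change emits -U(old value) then +U(new value).
    This sketch ignores the case where the count drops back to zero.
    """
    output, count = [], 0
    for kind in row_kinds:
        old = count
        count += 1 if kind in ("+I", "+U") else -1
        if not output:
            output.append(("+I", count))
        else:
            output.extend([("-U", old), ("+U", count)])
    return output


# Row kinds of the batch snapshot (10 rows) and of the streaming changelog (20 rows).
batch_kinds = ["+U", "+I", "+I", "+I", "+I", "+I", "+U", "+U", "+I", "+I"]
stream_kinds = ["+I"] * 9 + ["-U", "+U", "-U", "+U", "+I", "+I", "-U", "+U", "-U", "+U", "-D"]

batch_out = count_star(batch_kinds)
stream_out = count_star(stream_kinds)
print(len(batch_out), batch_out[-1])    # 19 ('+U', 10)
print(len(stream_out), stream_out[-1])  # 39 ('+U', 10)
```

The streaming run passes through intermediate values such as 8 and 11, because every -U retracts a row before the matching +U re-adds it. It still converges to the same final value as the batch run.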

    This article is original content from Alibaba Cloud and may not be reproduced without permission.
