日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flink 与 Hive 的磨合期

發(fā)布時間:2024/8/23 编程问答 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink 与 Hive 的磨合期 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

有不少讀者反饋,參考上篇文章《Hive 終于等來了 Flink》部署 Flink 并集成 Hive 時,出現(xiàn)一些 bug 以及兼容性等問題。雖已等來,卻未可用。所以筆者增加了這一篇文章,作為姊妹篇。 回顧 在上篇文章中,筆者使用的 CDH 版本為 5.16.2,其中 Hive 版本為 1.1.0(CDH 5.x 系列 Hive 版本都不高于 1.1.0,是不是不可理解),Flink 源代碼本身對 Hive 1.1.0 版本兼容性不好,存在不少問題。為了兼容目前版本,筆者基于 CDH 5.16.2 環(huán)境,對 Flink 代碼進行了修改,重新打包并部署。 其實經過很多開源項目的實戰(zhàn),比如 Apache Atlas,Apache Spark 等,Hive 1.2.x 和 Hive 1.1.x 在大部分情況下,替換一些 Jar 包,是可以解決兼容性的問題。對于筆者的環(huán)境來說,可以使用 Hive 1.2.1 版本的一些 Jar 包來代替 Hive 1.1.0 版本的 Jar 包。在本篇文章的開始部分,筆者會解決這個問題,然后再補充上篇文章缺少的實戰(zhàn)內容。 剪不斷理還亂的問題 根據讀者的反饋,筆者將所有的問題總結為三類:

  • Flink 如何連接 Hive 除了 API 外,有沒有類似 spark-sql 命令

  • 識別不到 Hadoop 環(huán)境或配置文件找不到

  • 依賴包、類或方法找不到

  • 1. Flink 如何連接 Hive 有的讀者不太清楚,如何配置 Flink 連接 Hive 的 Catalog,這里補充一個完整的 conf/sql-client-hive.yaml 示例: catalogs: - name: staginghive type: hive hive-conf-dir: /etc/hive/conf hive-version: 1.2.1 execution: planner: blink type: batch time-characteristic: event-time periodic-watermarks-interval: 200 result-mode: table max-table-result-rows: 1000000 parallelism: 1 max-parallelism: 128 min-idle-state-retention: 0 max-idle-state-retention: 0 current-catalog: staginghive current-database: ssb restart-strategy: type: fallback deployment: response-timeout: 5000 gateway-address: "" gateway-port: 0 m: yarn-cluster yn: 2 ys: 5 yjm: 1024 ytm: 2048 sql-client-hive.yaml 配置文件里面包含:

  • Hive 配置文件 catalogs 中配置了 Hive 的配置文件路徑。

  • Yarn 配置信息 deployment 中配置了 Yarn 的配置信息。

  • 執(zhí)行引擎信息 execution 配置了 blink planner,并且使用 batch 模式。batch 模式比較穩(wěn)定,適合傳統(tǒng)的批處理作業(yè),而且可以容錯,另外中間數(shù)據落盤,建議開啟壓縮功能。除了 batch,Flink 也支持 streaming 模式。

  • ■ Flink SQL CLI 工具 類似 spark-sql 命令,Flink 提供了 SQL CLI 工具,即 sql-client.sh 腳本。在 Flink 1.10 版本中,Flink SQL CLI 改進了很多功能,筆者后面講解。 sql-client.sh 使用方式如下: $ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml 2. 識別不到 Hadoop 環(huán)境或配置文件找不到 筆者在上篇文章中提到過,在部署 Flink 的環(huán)境上部署 CDH gateway,包括 Hadoop、Hive 客戶端,另外還需要配置一些環(huán)境變量,如下: export HADOOP_CONF_DIR=/etc/hadoop/conf export YARN_CONF_DIR=/etc/hadoop/conf export HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive export HIVE_CONF_DIR=/etc/hive/conf 3. 依賴包、類或方法找不到 先查看一下 Flink 家目錄下的 lib 目錄: $ tree lib lib ├── flink-connector-hive_2.11-1.10.0.jar ├── flink-dist_2.11-1.10.0.jar ├── flink-hadoop-compatibility_2.11-1.10.0.jar ├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar ├── flink-table_2.11-1.10.0.jar ├── flink-table-blink_2.11-1.10.0.jar ├── hive-exec-1.1.0-cdh5.16.2.jar ├── hive-metastore-1.1.0-cdh5.16.2.jar ├── libfb303-0.9.3.jar ├── log4j-1.2.17.jar └── slf4j-log4j12-1.7.15.jar 如果上面前兩個問題都解決后,執(zhí)行如下命令: $ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml 報錯,報錯,還是報錯: Caused by: java.lang.ClassNotFoundException: org.apache.commons.logging.LogFactory 其實在運行 sql-client.sh 腳本前,需要指定 Hadoop 環(huán)境的依賴包的路徑,建議不要報錯一個添加一個,除非有的讀者喜歡。這里筆者提示一個方便的方式,即設置 HADOOPCLASSPATH(可以添加到 ~/.bashprofile 中)環(huán)境變量: export HADOOP_CLASSPATH=`hadoop classpath` 再次執(zhí)行: $ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml 很抱歉,繼續(xù)報錯: Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context. at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:753) at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:228) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:98) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client 這里就是 Hive 1.1.0 版本的 Jar 包與 Flink 出現(xiàn)版本不兼容性的問題了,解決方法是:

  • 下載 apache-hive-1.2.1 版本

  • 替換 Flink lib 目錄下的 Hive Jar 包 刪除掉 hive-exec-1.1.0-cdh5.16.2.jar、 hive-metastore-1.1.0-cdh5.16.2.jar 和 libfb303-0.9.3.jar,然后添加 hive-exec-1.2.1.jar、 hive-metastore-1.2.1.jar 和 libfb303-0.9.2.jar,再次查看 lib 目錄:

  • $ tree lib lib ├── flink-connector-hive_2.11-1.10.0.jar ├── flink-dist_2.11-1.10.0.jar ├── flink-hadoop-compatibility_2.11-1.10.0.jar ├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar ├── flink-table_2.11-1.10.0.jar ├── flink-table-blink_2.11-1.10.0.jar ├── hive-exec-1.2.1.jar ├── hive-metastore-1.2.1.jar ├── libfb303-0.9.2.jar ├── log4j-1.2.17.jar └── slf4j-log4j12-1.7.15.jar 最后再執(zhí)行: $ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml 這時,讀者就可以看到手握栗子的可愛小松鼠了。

    Flink SQL CLI 實踐 在 Flink 1.10 版本(目前為 RC1 階段) 中,Flink 社區(qū)對 SQL CLI 做了大量的改動,比如支持 View、支持更多的數(shù)據類型和 DDL 語句、支持分區(qū)讀寫、支持 INSERT OVERWRITE 等,實現(xiàn)了更多的 TableEnvironment API 的功能,更加方便用戶使用。 接下來,筆者詳細講解 Flink SQL CLI。 0. Help 執(zhí)行下面命令,登錄 Flink SQL 客戶端: $ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml Flink SQL> 執(zhí)行 HELP,查看 Flink SQL 支持的命令,如下為大部分常用的:

    • CREATE TABLE

    • DROP TABLE

    • CREATE VIEW

    • DESCRIBE

    • DROP VIEW

    • EXPLAIN

    • INSERT INTO

    • INSERT OVERWRITE

    • SELECT

    • SHOW FUNCTIONS

    • USE CATALOG

    • SHOW TABLES

    • SHOW DATABASES

    • SOURCE

    • USE

    • SHOW CATALOGS

    1. Hive 操作 ■ 1.1 創(chuàng)建表和導入數(shù)據 為了方便讀者進行實驗,筆者使用 ssb-dbgen 生成測試數(shù)據,讀者也可以使用測試環(huán)境已有的數(shù)據來進行實驗。 具體如何在 Hive 中一鍵式創(chuàng)建表并插入數(shù)據,可以參考筆者早期的項目 https://github.com/MLikeWater/ssb-kylin。 ■ 1.2 Hive 表 查看上個步驟中創(chuàng)建的 Hive 表: 0: jdbc:hive2://xx.xxx.xxx.xxx:10000> show tables; +--------------+--+ | tab_name | +--------------+--+ | customer | | dates | | lineorder | | p_lineorder | | part | | supplier | +--------------+--+ 讀者可以對 Hive 進行各種查詢,對比后面 Flink SQL 查詢的結果。 2. Flink 操作 ■ 2.1 通過 HiveCatalog 訪問 Hive 數(shù)據庫 登錄 Flink SQL CLI,并查詢 catalogs: $ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml Flink SQL> show catalogs; default_catalog staginghive Flink SQL> use catalog staginghive; 通過 show catalogs 獲取配置的所有 catalog。由于筆者在 sql-client-hive.yaml 文件中設置了默認的 catalog,即為 staginghive。如果需要切換到其他 catalog,可以使用 usecatalog xxx。 ■ 2.2 查詢 Hive 元數(shù)據 通過 Flink SQL 查詢 Hive 數(shù)據庫和表: # 查詢數(shù)據庫 Flink SQL> show databases; ... ssb tmp ... Flink SQL> use ssb; # 查詢表 Flink SQL> show tables; customer dates lineorder p_lineorder part supplier # 查詢表結構 Flink SQL> DESCRIBE customer; root |-- c_custkey: INT |-- c_name: STRING |-- c_address: STRING |-- c_city: STRING |-- c_nation: STRING |-- c_region: STRING |-- c_phone: STRING |-- c_mktsegment: STRING 這里需要注意,Hive 的元數(shù)據在 Flink catalog 中都以小寫字母使用。 ■ 2.3 查詢 接下來,在 Flink SQL CLI 中查詢一些 SQL 語句,完整 SQL 參考 https://github.com/MLikeWater/ssb-kylin 的 README。 目前 Flink SQL 解析 Hive 視圖元數(shù)據時,會遇到一些 Bug,比如執(zhí)行 Q1.1 SQL: Flink SQL> select sum(v_revenue) as revenue > from p_lineorder > left join dates on lo_orderdate = d_datekey > where d_year = 1993 > and lo_discount between 1 and 3 > and lo_quantity < 25; [ERROR] Could not execute SQL statement. Reason: org.apache.calcite.sql.validate.SqlValidatorException: Tabeorder' not found; did you mean 'LINEORDER'? Flink SQL 找不到視圖中的實體表。 p_lineorder 表是 Hive 中的一張視圖,創(chuàng)建表的語句如下: CREATE VIEW P_LINEORDER AS SELECT LO_ORDERKEY, LO_LINENUMBER, LO_CUSTKEY, LO_PARTKEY, LO_SUPPKEY, LO_ORDERDATE, LO_ORDERPRIOTITY, LO_SHIPPRIOTITY, LO_QUANTITY, LO_EXTENDEDPRICE, LO_ORDTOTALPRICE, LO_DISCOUNT, LO_REVENUE, LO_SUPPLYCOST, LO_TAX, LO_COMMITDATE, LO_SHIPMODE, LO_EXTENDEDPRICE*LO_DISCOUNT AS V_REVENUE FROM ssb.LINEORDER; 但是對于 Hive 中視圖的定義,Flink SQL 并沒有很好地處理元數(shù)據。為了后面 SQL 的順利執(zhí)行,這里筆者在 Hive 中刪除并重建該視圖: 0: jdbc:hive2://xx.xxx.xxx.xxx:10000> create view p_lineorder as select lo_orderkey, lo_linenumber, lo_custkey, lo_partkey, lo_suppkey, lo_orderdate, lo_orderpriotity, lo_shippriotity, lo_quantity, lo_extendedprice, lo_ordtotalprice, lo_discount, lo_revenue, lo_supplycost, lo_tax, lo_commitdate, lo_shipmode, lo_extendedprice*lo_discount as v_revenue from ssb.lineorder; 然后繼續(xù)在 Flink SQL CLI 中查詢 Q1.1 SQL: Flink SQL> select sum(v_revenue) as revenue > from p_lineorder > left join dates on lo_orderdate = d_datekey > where d_year = 1993 > and lo_discount between 1 and 3 > and lo_quantity < 25; revenue 894280292647 繼續(xù)查詢 Q2.1 SQL: Flink SQL> select sum(lo_revenue) as lo_revenue, d_year, p_brand > from p_lineorder > left join dates on lo_orderdate = d_datekey > left join part on lo_partkey = p_partkey > left join supplier on lo_suppkey = s_suppkey > where p_category = 'MFGR#12' and s_region = 'AMERICA' > group by d_year, p_brand > order by d_year, p_brand; lo_revenue d_year p_brand 819634128 1998 MFGR#1206 877651232 1998 MFGR#1207 754489428 1998 MFGR#1208 816369488 1998 MFGR#1209 668482306 1998 MFGR#1210 660366608 1998 MFGR#1211 862902570 1998 MFGR#1212 ... 最后再查詢一個 Q4.3 SQL: Flink SQL> select d_year, s_city, p_brand, sum(lo_revenue) - sum(lo_supplycost) as profit > from p_lineorder > left join dates on lo_orderdate = d_datekey > left join customer on lo_custkey = c_custkey > left join supplier on lo_suppkey = s_suppkey > left join part on lo_partkey = p_partkey > where c_region = 'AMERICA'and s_nation = 'UNITED STATES' > and (d_year = 1997 or d_year = 1998) > and p_category = 'MFGR#14' > group by d_year, s_city, p_brand > order by d_year, s_city, p_brand; d_year s_city p_brand profit 1998 UNITED ST9 MFGR#1440 6665681 如果讀者感興趣的話,可以查詢剩余的 SQL,當然也可以和 Spark SQL 進行比較。另外 Flink SQL 也支持 EXPLAIN,查詢 SQL 的執(zhí)行計劃。 ■ 2.4 創(chuàng)建視圖 同樣,可以在 Flink SQL CLI 中創(chuàng)建和刪除視圖,如下: Flink SQL> create view p_lineorder2 as > select lo_orderkey, > lo_linenumber, > lo_custkey, > lo_partkey, > lo_suppkey, > lo_orderdate, > lo_orderpriotity, > lo_shippriotity, > lo_quantity, > lo_extendedprice, > lo_ordtotalprice, > lo_discount, > lo_revenue, > lo_supplycost, > lo_tax, > lo_commitdate, > lo_shipmode, > lo_extendedprice * lo_discount as v_revenue > from ssb.lineorder; [INFO] View has been created. 這里筆者需要特別強調的是,目前 Flink 無法刪除 Hive 中的視圖: Flink SQL> drop view p_lineorder; [ERROR] Could not execute SQL statement. Reason: The given view does not exist in the current CLI session. Only views created with a CREATE VIEW statement can be accessed. ■ 2.5 分區(qū)操作 Hive 數(shù)據庫中創(chuàng)建一張分區(qū)表: CREATE TABLE IF NOT EXISTS flink_partition_test ( id int, name string ) PARTITIONED BY (day string, type string) stored as textfile; 接著,通過 Flink SQL 插入和查詢數(shù)據: # 插入靜態(tài)分區(qū)的數(shù)據 Flink SQL> INSERT INTO flink_partition_test PARTITION (type='Flink', `day`='2020-02-01') SELECT 100001, 'Flink001'; # 查詢 Flink SQL> select * from flink_partition_test; id name day type 100001 Flink001 2020-02-01 Flink # 插入動態(tài)分區(qū) Flink SQL> INSERT INTO flink_partition_test SELECT 100002, 'Spark', '2020-02-02', 'SparkSQL'; # 查詢 Flink SQL> select * from flink_partition_test; id name day type 100002 Spark 2020-02-02 SparkSQL 100001 FlinkSQL 2020-02-01 Flink # 動態(tài)和靜態(tài)分區(qū)結合使用類似,不再演示 # 覆蓋插入數(shù)據 Flink SQL> INSERT OVERWRITE flink_partition_test PARTITION (type='Flink') SELECT 100002, 'Spark', '2020-02-08', 'SparkSQL-2.4'; id name day type 100002 Spark 2020-02-02 SparkSQL 100001 FlinkSQL 2020-02-01 Flink 字段 day 在 Flink 屬于關鍵字,要特殊處理。 ■ 2.6 其他功能

    • 2.6.1 函數(shù)

    Flink SQL 支持內置的函數(shù)和自定義函數(shù)。對于內置的函數(shù),可以執(zhí)行 show functions 進行查看,這一塊筆者以后會單獨介紹如何創(chuàng)建自定義函數(shù)。

    • 2.6.2 設置參數(shù)

    Flink SQL 支持設置環(huán)境參數(shù),可以使用 set 命令查看和設置參數(shù): Flink SQL> set; deployment.gateway-address= deployment.gateway-port=0 deployment.m=yarn-cluster deployment.response-timeout=5000 deployment.yjm=1024 deployment.yn=2 deployment.ys=5 deployment.ytm=2048 execution.current-catalog=staginghive execution.current-database=ssb execution.max-idle-state-retention=0 execution.max-parallelism=128 execution.max-table-result-rows=1000000 execution.min-idle-state-retention=0 execution.parallelism=1 execution.periodic-watermarks-interval=200 execution.planner=blink execution.restart-strategy.type=fallback execution.result-mode=table execution.time-characteristic=event-time execution.type=batch Flink SQL> set deployment.yjm = 2048; 總結 在本文中,筆者通過 Flink SQL 比較詳細地去操作 Hive 數(shù)據庫,以及 Flink SQL 提供的一些功能。 當然,目前 Flink SQL 操作 Hive 數(shù)據庫還是存在一些問題:

    • 目前只支持 TextFile 存儲格式,還無法指定其他存儲格式 只支持 Hive 數(shù)據庫中 TextFile 存儲格式的表,而且 row format serde 是 org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe。雖然實現(xiàn)了 RCFile、ORC、Parquet、Sequence 等存儲格式,但是無法自動識別 Hive 表的存儲格式。如果要使用其他存儲格式,需要修改源碼,重新編譯。不過社區(qū)已經對這些存儲格式進行了測試,相信不久以后就可以在 Flink SQL 中使用。

    • OpenCSVSerde 支持不完善 如果讀者使用 TextFile 的 row format serde 為 org.apache.hadoop.hive.serde2.OpenCSVSerde 時,無法正確識別字段類型,會把 Hive 表的字段全部映射為 String 類型。

    • 暫時不支持 Bucket 表

    • 暫時不支持 ACID 表

    • Flink SQL 優(yōu)化方面功能較少

    • 權限控制方面 這方面和 Spark SQL 類似,目前基于 HDFS ACL 控制,暫時還沒有實現(xiàn) Sentry 或 Ranger 控制權限,不過目前 Cloudera 正在開發(fā)基于 Ranger 設置 Spark SQL 和 Hive 共享訪問權限的策略,實現(xiàn)行/列級控制以及審計信息。

    Flink 社區(qū)發(fā)展很快,所有這些問題只是暫時的,隨著新版本的發(fā)布會被逐個解決。 如果 Flink SQL 目前不滿足的需求,建議使用 API 方式來解決問題。

    原文鏈接
    本文為云棲社區(qū)原創(chuàng)內容,未經允許不得轉載。

    總結

    以上是生活随笔為你收集整理的Flink 与 Hive 的磨合期的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。