Flink 与 Hive 的磨合期
有不少讀者反饋,參考上篇文章《Hive 終于等來(lái)了 Flink》部署 Flink 并集成 Hive 時(shí),出現(xiàn)一些 bug 以及兼容性等問(wèn)題。雖已等來(lái),卻未可用。所以筆者增加了這一篇文章,作為姊妹篇。 回顧 在上篇文章中,筆者使用的 CDH 版本為 5.16.2,其中 Hive 版本為 1.1.0(CDH 5.x 系列 Hive 版本都不高于 1.1.0,是不是不可理解),Flink 源代碼本身對(duì) Hive 1.1.0 版本兼容性不好,存在不少問(wèn)題。為了兼容目前版本,筆者基于 CDH 5.16.2 環(huán)境,對(duì) Flink 代碼進(jìn)行了修改,重新打包并部署。 其實(shí)經(jīng)過(guò)很多開(kāi)源項(xiàng)目的實(shí)戰(zhàn),比如 Apache Atlas,Apache Spark 等,Hive 1.2.x 和 Hive 1.1.x 在大部分情況下,替換一些 Jar 包,是可以解決兼容性的問(wèn)題。對(duì)于筆者的環(huán)境來(lái)說(shuō),可以使用 Hive 1.2.1 版本的一些 Jar 包來(lái)代替 Hive 1.1.0 版本的 Jar 包。在本篇文章的開(kāi)始部分,筆者會(huì)解決這個(gè)問(wèn)題,然后再補(bǔ)充上篇文章缺少的實(shí)戰(zhàn)內(nèi)容。 剪不斷理還亂的問(wèn)題 根據(jù)讀者的反饋,筆者將所有的問(wèn)題總結(jié)為三類:
Flink 如何連接 Hive 除了 API 外,有沒(méi)有類似 spark-sql 命令
識(shí)別不到 Hadoop 環(huán)境或配置文件找不到
依賴包、類或方法找不到
1. Flink 如何連接 Hive 有的讀者不太清楚,如何配置 Flink 連接 Hive 的 Catalog,這里補(bǔ)充一個(gè)完整的 conf/sql-client-hive.yaml 示例: catalogs: - name: staginghive type: hive hive-conf-dir: /etc/hive/conf hive-version: 1.2.1 execution: planner: blink type: batch time-characteristic: event-time periodic-watermarks-interval: 200 result-mode: table max-table-result-rows: 1000000 parallelism: 1 max-parallelism: 128 min-idle-state-retention: 0 max-idle-state-retention: 0 current-catalog: staginghive current-database: ssb restart-strategy: type: fallback deployment: response-timeout: 5000 gateway-address: "" gateway-port: 0 m: yarn-cluster yn: 2 ys: 5 yjm: 1024 ytm: 2048 sql-client-hive.yaml 配置文件里面包含:
Hive 配置文件 catalogs 中配置了 Hive 的配置文件路徑。
Yarn 配置信息 deployment 中配置了 Yarn 的配置信息。
執(zhí)行引擎信息 execution 配置了 blink planner,并且使用 batch 模式。batch 模式比較穩(wěn)定,適合傳統(tǒng)的批處理作業(yè),而且可以容錯(cuò),另外中間數(shù)據(jù)落盤(pán),建議開(kāi)啟壓縮功能。除了 batch,Flink 也支持 streaming 模式。
■ Flink SQL CLI 工具 類似 spark-sql 命令,Flink 提供了 SQL CLI 工具,即 sql-client.sh 腳本。在 Flink 1.10 版本中,Flink SQL CLI 改進(jìn)了很多功能,筆者后面講解。 sql-client.sh 使用方式如下: $ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml 2. 識(shí)別不到 Hadoop 環(huán)境或配置文件找不到 筆者在上篇文章中提到過(guò),在部署 Flink 的環(huán)境上部署 CDH gateway,包括 Hadoop、Hive 客戶端,另外還需要配置一些環(huán)境變量,如下: export HADOOP_CONF_DIR=/etc/hadoop/conf export YARN_CONF_DIR=/etc/hadoop/conf export HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive export HIVE_CONF_DIR=/etc/hive/conf 3. 依賴包、類或方法找不到 先查看一下 Flink 家目錄下的 lib 目錄: $ tree lib lib ├── flink-connector-hive_2.11-1.10.0.jar ├── flink-dist_2.11-1.10.0.jar ├── flink-hadoop-compatibility_2.11-1.10.0.jar ├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar ├── flink-table_2.11-1.10.0.jar ├── flink-table-blink_2.11-1.10.0.jar ├── hive-exec-1.1.0-cdh5.16.2.jar ├── hive-metastore-1.1.0-cdh5.16.2.jar ├── libfb303-0.9.3.jar ├── log4j-1.2.17.jar └── slf4j-log4j12-1.7.15.jar 如果上面前兩個(gè)問(wèn)題都解決后,執(zhí)行如下命令: $ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml 報(bào)錯(cuò),報(bào)錯(cuò),還是報(bào)錯(cuò): Caused by: java.lang.ClassNotFoundException: org.apache.commons.logging.LogFactory 其實(shí)在運(yùn)行 sql-client.sh 腳本前,需要指定 Hadoop 環(huán)境的依賴包的路徑,建議不要報(bào)錯(cuò)一個(gè)添加一個(gè),除非有的讀者喜歡。這里筆者提示一個(gè)方便的方式,即設(shè)置 HADOOPCLASSPATH(可以添加到 ~/.bashprofile 中)環(huán)境變量: export HADOOP_CLASSPATH=`hadoop classpath` 再次執(zhí)行: $ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml 很抱歉,繼續(xù)報(bào)錯(cuò): Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context. at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:753) at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:228) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:98) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client 這里就是 Hive 1.1.0 版本的 Jar 包與 Flink 出現(xiàn)版本不兼容性的問(wèn)題了,解決方法是:
下載 apache-hive-1.2.1 版本
替換 Flink lib 目錄下的 Hive Jar 包 刪除掉 hive-exec-1.1.0-cdh5.16.2.jar、 hive-metastore-1.1.0-cdh5.16.2.jar 和 libfb303-0.9.3.jar,然后添加 hive-exec-1.2.1.jar、 hive-metastore-1.2.1.jar 和 libfb303-0.9.2.jar,再次查看 lib 目錄:
$ tree lib lib ├── flink-connector-hive_2.11-1.10.0.jar ├── flink-dist_2.11-1.10.0.jar ├── flink-hadoop-compatibility_2.11-1.10.0.jar ├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar ├── flink-table_2.11-1.10.0.jar ├── flink-table-blink_2.11-1.10.0.jar ├── hive-exec-1.2.1.jar ├── hive-metastore-1.2.1.jar ├── libfb303-0.9.2.jar ├── log4j-1.2.17.jar └── slf4j-log4j12-1.7.15.jar 最后再執(zhí)行: $ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml 這時(shí),讀者就可以看到手握栗子的可愛(ài)小松鼠了。
Flink SQL CLI 實(shí)踐 在 Flink 1.10 版本(目前為 RC1 階段) 中,Flink 社區(qū)對(duì) SQL CLI 做了大量的改動(dòng),比如支持 View、支持更多的數(shù)據(jù)類型和 DDL 語(yǔ)句、支持分區(qū)讀寫(xiě)、支持 INSERT OVERWRITE 等,實(shí)現(xiàn)了更多的 TableEnvironment API 的功能,更加方便用戶使用。 接下來(lái),筆者詳細(xì)講解 Flink SQL CLI。 0. Help 執(zhí)行下面命令,登錄 Flink SQL 客戶端: $ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml Flink SQL> 執(zhí)行 HELP,查看 Flink SQL 支持的命令,如下為大部分常用的:
-
CREATE TABLE
-
DROP TABLE
-
CREATE VIEW
-
DESCRIBE
-
DROP VIEW
-
EXPLAIN
-
INSERT INTO
-
INSERT OVERWRITE
-
SELECT
-
SHOW FUNCTIONS
-
USE CATALOG
-
SHOW TABLES
-
SHOW DATABASES
-
SOURCE
-
USE
-
SHOW CATALOGS
1. Hive 操作 ■ 1.1 創(chuàng)建表和導(dǎo)入數(shù)據(jù) 為了方便讀者進(jìn)行實(shí)驗(yàn),筆者使用 ssb-dbgen 生成測(cè)試數(shù)據(jù),讀者也可以使用測(cè)試環(huán)境已有的數(shù)據(jù)來(lái)進(jìn)行實(shí)驗(yàn)。 具體如何在 Hive 中一鍵式創(chuàng)建表并插入數(shù)據(jù),可以參考筆者早期的項(xiàng)目 https://github.com/MLikeWater/ssb-kylin。 ■ 1.2 Hive 表 查看上個(gè)步驟中創(chuàng)建的 Hive 表: 0: jdbc:hive2://xx.xxx.xxx.xxx:10000> show tables; +--------------+--+ | tab_name | +--------------+--+ | customer | | dates | | lineorder | | p_lineorder | | part | | supplier | +--------------+--+ 讀者可以對(duì) Hive 進(jìn)行各種查詢,對(duì)比后面 Flink SQL 查詢的結(jié)果。 2. Flink 操作 ■ 2.1 通過(guò) HiveCatalog 訪問(wèn) Hive 數(shù)據(jù)庫(kù) 登錄 Flink SQL CLI,并查詢 catalogs: $ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml Flink SQL> show catalogs; default_catalog staginghive Flink SQL> use catalog staginghive; 通過(guò) show catalogs 獲取配置的所有 catalog。由于筆者在 sql-client-hive.yaml 文件中設(shè)置了默認(rèn)的 catalog,即為 staginghive。如果需要切換到其他 catalog,可以使用 usecatalog xxx。 ■ 2.2 查詢 Hive 元數(shù)據(jù) 通過(guò) Flink SQL 查詢 Hive 數(shù)據(jù)庫(kù)和表: # 查詢數(shù)據(jù)庫(kù) Flink SQL> show databases; ... ssb tmp ... Flink SQL> use ssb; # 查詢表 Flink SQL> show tables; customer dates lineorder p_lineorder part supplier # 查詢表結(jié)構(gòu) Flink SQL> DESCRIBE customer; root |-- c_custkey: INT |-- c_name: STRING |-- c_address: STRING |-- c_city: STRING |-- c_nation: STRING |-- c_region: STRING |-- c_phone: STRING |-- c_mktsegment: STRING 這里需要注意,Hive 的元數(shù)據(jù)在 Flink catalog 中都以小寫(xiě)字母使用。 ■ 2.3 查詢 接下來(lái),在 Flink SQL CLI 中查詢一些 SQL 語(yǔ)句,完整 SQL 參考 https://github.com/MLikeWater/ssb-kylin 的 README。 目前 Flink SQL 解析 Hive 視圖元數(shù)據(jù)時(shí),會(huì)遇到一些 Bug,比如執(zhí)行 Q1.1 SQL: Flink SQL> select sum(v_revenue) as revenue > from p_lineorder > left join dates on lo_orderdate = d_datekey > where d_year = 1993 > and lo_discount between 1 and 3 > and lo_quantity < 25; [ERROR] Could not execute SQL statement. Reason: org.apache.calcite.sql.validate.SqlValidatorException: Tabeorder' not found; did you mean 'LINEORDER'? Flink SQL 找不到視圖中的實(shí)體表。 p_lineorder 表是 Hive 中的一張視圖,創(chuàng)建表的語(yǔ)句如下: CREATE VIEW P_LINEORDER AS SELECT LO_ORDERKEY, LO_LINENUMBER, LO_CUSTKEY, LO_PARTKEY, LO_SUPPKEY, LO_ORDERDATE, LO_ORDERPRIOTITY, LO_SHIPPRIOTITY, LO_QUANTITY, LO_EXTENDEDPRICE, LO_ORDTOTALPRICE, LO_DISCOUNT, LO_REVENUE, LO_SUPPLYCOST, LO_TAX, LO_COMMITDATE, LO_SHIPMODE, LO_EXTENDEDPRICE*LO_DISCOUNT AS V_REVENUE FROM ssb.LINEORDER; 但是對(duì)于 Hive 中視圖的定義,Flink SQL 并沒(méi)有很好地處理元數(shù)據(jù)。為了后面 SQL 的順利執(zhí)行,這里筆者在 Hive 中刪除并重建該視圖: 0: jdbc:hive2://xx.xxx.xxx.xxx:10000> create view p_lineorder as select lo_orderkey, lo_linenumber, lo_custkey, lo_partkey, lo_suppkey, lo_orderdate, lo_orderpriotity, lo_shippriotity, lo_quantity, lo_extendedprice, lo_ordtotalprice, lo_discount, lo_revenue, lo_supplycost, lo_tax, lo_commitdate, lo_shipmode, lo_extendedprice*lo_discount as v_revenue from ssb.lineorder; 然后繼續(xù)在 Flink SQL CLI 中查詢 Q1.1 SQL: Flink SQL> select sum(v_revenue) as revenue > from p_lineorder > left join dates on lo_orderdate = d_datekey > where d_year = 1993 > and lo_discount between 1 and 3 > and lo_quantity < 25; revenue 894280292647 繼續(xù)查詢 Q2.1 SQL: Flink SQL> select sum(lo_revenue) as lo_revenue, d_year, p_brand > from p_lineorder > left join dates on lo_orderdate = d_datekey > left join part on lo_partkey = p_partkey > left join supplier on lo_suppkey = s_suppkey > where p_category = 'MFGR#12' and s_region = 'AMERICA' > group by d_year, p_brand > order by d_year, p_brand; lo_revenue d_year p_brand 819634128 1998 MFGR#1206 877651232 1998 MFGR#1207 754489428 1998 MFGR#1208 816369488 1998 MFGR#1209 668482306 1998 MFGR#1210 660366608 1998 MFGR#1211 862902570 1998 MFGR#1212 ... 最后再查詢一個(gè) Q4.3 SQL: Flink SQL> select d_year, s_city, p_brand, sum(lo_revenue) - sum(lo_supplycost) as profit > from p_lineorder > left join dates on lo_orderdate = d_datekey > left join customer on lo_custkey = c_custkey > left join supplier on lo_suppkey = s_suppkey > left join part on lo_partkey = p_partkey > where c_region = 'AMERICA'and s_nation = 'UNITED STATES' > and (d_year = 1997 or d_year = 1998) > and p_category = 'MFGR#14' > group by d_year, s_city, p_brand > order by d_year, s_city, p_brand; d_year s_city p_brand profit 1998 UNITED ST9 MFGR#1440 6665681 如果讀者感興趣的話,可以查詢剩余的 SQL,當(dāng)然也可以和 Spark SQL 進(jìn)行比較。另外 Flink SQL 也支持 EXPLAIN,查詢 SQL 的執(zhí)行計(jì)劃。 ■ 2.4 創(chuàng)建視圖 同樣,可以在 Flink SQL CLI 中創(chuàng)建和刪除視圖,如下: Flink SQL> create view p_lineorder2 as > select lo_orderkey, > lo_linenumber, > lo_custkey, > lo_partkey, > lo_suppkey, > lo_orderdate, > lo_orderpriotity, > lo_shippriotity, > lo_quantity, > lo_extendedprice, > lo_ordtotalprice, > lo_discount, > lo_revenue, > lo_supplycost, > lo_tax, > lo_commitdate, > lo_shipmode, > lo_extendedprice * lo_discount as v_revenue > from ssb.lineorder; [INFO] View has been created. 這里筆者需要特別強(qiáng)調(diào)的是,目前 Flink 無(wú)法刪除 Hive 中的視圖: Flink SQL> drop view p_lineorder; [ERROR] Could not execute SQL statement. Reason: The given view does not exist in the current CLI session. Only views created with a CREATE VIEW statement can be accessed. ■ 2.5 分區(qū)操作 Hive 數(shù)據(jù)庫(kù)中創(chuàng)建一張分區(qū)表: CREATE TABLE IF NOT EXISTS flink_partition_test ( id int, name string ) PARTITIONED BY (day string, type string) stored as textfile; 接著,通過(guò) Flink SQL 插入和查詢數(shù)據(jù): # 插入靜態(tài)分區(qū)的數(shù)據(jù) Flink SQL> INSERT INTO flink_partition_test PARTITION (type='Flink', `day`='2020-02-01') SELECT 100001, 'Flink001'; # 查詢 Flink SQL> select * from flink_partition_test; id name day type 100001 Flink001 2020-02-01 Flink # 插入動(dòng)態(tài)分區(qū) Flink SQL> INSERT INTO flink_partition_test SELECT 100002, 'Spark', '2020-02-02', 'SparkSQL'; # 查詢 Flink SQL> select * from flink_partition_test; id name day type 100002 Spark 2020-02-02 SparkSQL 100001 FlinkSQL 2020-02-01 Flink # 動(dòng)態(tài)和靜態(tài)分區(qū)結(jié)合使用類似,不再演示 # 覆蓋插入數(shù)據(jù) Flink SQL> INSERT OVERWRITE flink_partition_test PARTITION (type='Flink') SELECT 100002, 'Spark', '2020-02-08', 'SparkSQL-2.4'; id name day type 100002 Spark 2020-02-02 SparkSQL 100001 FlinkSQL 2020-02-01 Flink 字段 day 在 Flink 屬于關(guān)鍵字,要特殊處理。 ■ 2.6 其他功能
-
2.6.1 函數(shù)
Flink SQL 支持內(nèi)置的函數(shù)和自定義函數(shù)。對(duì)于內(nèi)置的函數(shù),可以執(zhí)行 show functions 進(jìn)行查看,這一塊筆者以后會(huì)單獨(dú)介紹如何創(chuàng)建自定義函數(shù)。
-
2.6.2 設(shè)置參數(shù)
Flink SQL 支持設(shè)置環(huán)境參數(shù),可以使用 set 命令查看和設(shè)置參數(shù): Flink SQL> set; deployment.gateway-address= deployment.gateway-port=0 deployment.m=yarn-cluster deployment.response-timeout=5000 deployment.yjm=1024 deployment.yn=2 deployment.ys=5 deployment.ytm=2048 execution.current-catalog=staginghive execution.current-database=ssb execution.max-idle-state-retention=0 execution.max-parallelism=128 execution.max-table-result-rows=1000000 execution.min-idle-state-retention=0 execution.parallelism=1 execution.periodic-watermarks-interval=200 execution.planner=blink execution.restart-strategy.type=fallback execution.result-mode=table execution.time-characteristic=event-time execution.type=batch Flink SQL> set deployment.yjm = 2048; 總結(jié) 在本文中,筆者通過(guò) Flink SQL 比較詳細(xì)地去操作 Hive 數(shù)據(jù)庫(kù),以及 Flink SQL 提供的一些功能。 當(dāng)然,目前 Flink SQL 操作 Hive 數(shù)據(jù)庫(kù)還是存在一些問(wèn)題:
-
目前只支持 TextFile 存儲(chǔ)格式,還無(wú)法指定其他存儲(chǔ)格式 只支持 Hive 數(shù)據(jù)庫(kù)中 TextFile 存儲(chǔ)格式的表,而且 row format serde 是 org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe。雖然實(shí)現(xiàn)了 RCFile、ORC、Parquet、Sequence 等存儲(chǔ)格式,但是無(wú)法自動(dòng)識(shí)別 Hive 表的存儲(chǔ)格式。如果要使用其他存儲(chǔ)格式,需要修改源碼,重新編譯。不過(guò)社區(qū)已經(jīng)對(duì)這些存儲(chǔ)格式進(jìn)行了測(cè)試,相信不久以后就可以在 Flink SQL 中使用。
-
OpenCSVSerde 支持不完善 如果讀者使用 TextFile 的 row format serde 為 org.apache.hadoop.hive.serde2.OpenCSVSerde 時(shí),無(wú)法正確識(shí)別字段類型,會(huì)把 Hive 表的字段全部映射為 String 類型。
-
暫時(shí)不支持 Bucket 表
-
暫時(shí)不支持 ACID 表
-
Flink SQL 優(yōu)化方面功能較少
-
權(quán)限控制方面 這方面和 Spark SQL 類似,目前基于 HDFS ACL 控制,暫時(shí)還沒(méi)有實(shí)現(xiàn) Sentry 或 Ranger 控制權(quán)限,不過(guò)目前 Cloudera 正在開(kāi)發(fā)基于 Ranger 設(shè)置 Spark SQL 和 Hive 共享訪問(wèn)權(quán)限的策略,實(shí)現(xiàn)行/列級(jí)控制以及審計(jì)信息。
Flink 社區(qū)發(fā)展很快,所有這些問(wèn)題只是暫時(shí)的,隨著新版本的發(fā)布會(huì)被逐個(gè)解決。 如果 Flink SQL 目前不滿足的需求,建議使用 API 方式來(lái)解決問(wèn)題。
原文鏈接
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的Flink 与 Hive 的磨合期的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: F1 Query: Declarativ
- 下一篇: Nexus协议,闲鱼一体化开发的幕后玩家