2021年大数据Flink(三十八):Table与SQL 案例五 FlinkSQL整合Hive
目錄
案例五 FlinkSQL整合Hive
介紹
集成Hive的基本方式
準(zhǔn)備工作
1.添加hadoop_classpath
2.下載jar并上傳至flink/lib目錄
3.修改hive配置
4.啟動(dòng)hive元數(shù)據(jù)服務(wù)
SQL CLI
1.修改flinksql配置
2.啟動(dòng)flink集群
3.啟動(dòng)flink-sql客戶端
4.執(zhí)行sql:
代碼演示
案例五 FlinkSQL整合Hive
介紹
Apache Flink 1.12 Documentation: Hive
Flink集成Hive之快速入門--以Flink1.12為例 - 知乎
使用Hive構(gòu)建數(shù)據(jù)倉(cāng)庫(kù)已經(jīng)成為了比較普遍的一種解決方案。目前,一些比較常見的大數(shù)據(jù)處理引擎,都無一例外兼容Hive。Flink從1.9開始支持集成Hive,不過1.9版本為beta版,不推薦在生產(chǎn)環(huán)境中使用。在Flink1.10版本中,標(biāo)志著對(duì) Blink的整合宣告完成,對(duì) Hive 的集成也達(dá)到了生產(chǎn)級(jí)別的要求。值得注意的是,不同版本的Flink對(duì)于Hive的集成有所差異,接下來將以最新的Flink1.12版本為例,實(shí)現(xiàn)Flink集成Hive
集成Hive的基本方式
Flink 與 Hive 的集成主要體現(xiàn)在以下兩個(gè)方面:
- 持久化元數(shù)據(jù)
Flink利用 Hive 的 MetaStore 作為持久化的 Catalog,我們可通過HiveCatalog將不同會(huì)話中的 Flink 元數(shù)據(jù)存儲(chǔ)到 Hive Metastore 中。例如,我們可以使用HiveCatalog將其 Kafka的數(shù)據(jù)源表存儲(chǔ)在 Hive Metastore 中,這樣該表的元數(shù)據(jù)信息會(huì)被持久化到Hive的MetaStore對(duì)應(yīng)的元數(shù)據(jù)庫(kù)中,在后續(xù)的 SQL 查詢中,我們可以重復(fù)使用它們。
- 利用 Flink 來讀寫 Hive 的表
Flink打通了與Hive的集成,如同使用SparkSQL或者Impala操作Hive中的數(shù)據(jù)一樣,我們可以使用Flink直接讀寫Hive中的表。
HiveCatalog的設(shè)計(jì)提供了與 Hive 良好的兼容性,用戶可以”開箱即用”的訪問其已有的 Hive表。不需要修改現(xiàn)有的 Hive Metastore,也不需要更改表的數(shù)據(jù)位置或分區(qū)。
???????準(zhǔn)備工作
1.添加hadoop_classpath
vim /etc/profile
增加如下配置
export HADOOP_CLASSPATH=`hadoop classpath`
刷新配置
source /etc/profile
2.下載jar并上傳至flink/lib目錄
Apache Flink 1.12 Documentation: Hive
3.修改hive配置
vim /export/server/hive/conf/hive-site.xml
<property><name>hive.metastore.uris</name><value>thrift://node3:9083</value></property>
4.啟動(dòng)hive元數(shù)據(jù)服務(wù)
nohup /export/server/hive/bin/hive?--service metastore &
???????SQL CLI
1.修改flinksql配置
vim /export/server/flink/conf/sql-client-defaults.yaml
增加如下配置
catalogs:- name: myhivetype: hivehive-conf-dir: /export/server/hive/confdefault-database: default
2.啟動(dòng)flink集群
/export/server/flink/bin/start-cluster.sh
3.啟動(dòng)flink-sql客戶端
/export/server/flink/bin/sql-client.sh embedded
4.執(zhí)行sql:
show catalogs;use catalog myhive;show tables;select * from person;
???????代碼演示
package cn.it.extend;import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.hive.HiveCatalog;/*** Author lanson* Desc*/
public class HiveDemo {public static void main(String[] args){EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();TableEnvironment tableEnv = TableEnvironment.create(settings);String name ???????????= "myhive";String defaultDatabase = "default";String hiveConfDir = "./conf";HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);//注冊(cè)catalogtableEnv.registerCatalog("myhive", hive);//使用注冊(cè)的catalogtableEnv.useCatalog("myhive");//向Hive表中寫入數(shù)據(jù)String insertSQL = "insert into person select * from person";TableResult result = tableEnv.executeSql(insertSQL);System.out.println(result.getJobClient().get().getJobStatus());}
}
總結(jié)
以上是生活随笔為你收集整理的2021年大数据Flink(三十八):Table与SQL 案例五 FlinkSQL整合Hive的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Flink(三十七):
- 下一篇: 2021年大数据Flink(三十九):