hive内置函数_flink教程flink modules详解之使用hive函数
modules概念
通過hive module使用hive函數
內置函數
自定義函數
sql 客戶端的使用
原理分析和源碼解析
實現
modules概念
flink 提供了一個module的概念,使用戶能擴展flink的內置對象,比如內置函數。這個功能是插件化的,用戶可以方便的加載或者移除相應的module。
flink內置了CoreModule,并且提供了一個hive module,允許用戶在加載了hive module之后使用hive的函數,包括內置函數、自定義hive函數等等。如果多個module里有重名的函數,則以先加載的函數為準。
用戶還可以自定義module,只需要實現Module接口即可。如果是在sql 客戶端使用,還需要實現ModuleFactory接口,因為加載的時候,flink會使用SPI機制去匹配獲取相應的ModuleFactory,然后實例化相應的moudule。
通過hive module使用hive函數
我們以hive module為例,講解一下如何使用flink提供的module功能,使用hive module的一些注意事項:
- 通過 Hive Metastore 將帶有 UDF 的 HiveCatalog 設置為當前會話的 catalog。
- 將帶有 UDF 的 jar 包放入 Flink classpath 中,并在代碼中引入。
- 使用 Blink planner,flink 1.11默認就是,不用顯示指定
內置函數
- 引入pom
- 加載module
??String?version?=?"3.1.2";
??tEnv.loadModule(name,?new?HiveModule(version));
- 查看module
??String[]?modules?=?tEnv.listModules();
??Arrays.stream(modules).forEach(System.out::println);
運行結果我們看到有兩個module
list?modules?------------------?core
myhive
- 查看函數
??String[]?functions?=?tEnv.listFunctions();
??Arrays.stream(functions).forEach(System.out::println);
我們看到列出來大概300多個函數,包含flink和hive的內置函數。
- hive函數的使用
在hive里有一個常用的解析json的函數get_json_object,這個可以把json字符串解析之后得到想要的字段,但是flink中沒有這個函數,所以我們可以通過這種方式來使用hive的函數,就不用我們自己開發UDF了。
?System.out.println("hive?函數的使用:??------------------??");??String?sql?=?"SELECT?data,get_json_object(data,?'$.name')??FROM?(VALUES?('{\"name\":\"flink\"}'),?('{\"name\":\"hadoop\"}'))?AS?MyTable(data)";
??List?results?=?Lists.newArrayList(tEnv.sqlQuery(sql)
?????????????????????????????????????????????.execute()
?????????????????????????????????????????????.collect());
??results.stream().forEach(System.out::println);
輸出結果:
hive?函數的使用:??------------------??{"name":"flink"},flink
{"name":"hadoop"},hadoop
自定義函數
前面我們講了如何使用hive的內置函數,這個比較簡單,接在了hive的module之后就可以用了,還有一種就是如何使用hive的udf函數呢?我們接下來簡單聊聊。
- 自定義hive函數
首先我們來自定義一個hive的udf函數
實現一個自定義函數,就是實現兩個int類型數字的加和操作
?public?IntWritable?evaluate(IntWritable?i,IntWritable?j){
??return?new?IntWritable(i.get()?+?j.get());
?}
}
完整代碼:
https://github.com/zhangjun0x01/bigdata-examples/blob/master/hive/src/main/java/com/test/TestHiveUDF.java
把相應的jar放到hive的classpath下面
定義函數
add?jar?/home/work/work/hive/lib/hive-1.0-SNAPSHOT.jar;?CREATE??FUNCTION?mysum?AS?"com.test.TestHiveUDF";?
??System.out.println("是否包含自定義函數:?"?+?b);
??String?sqlUdf?=?"select?mysum(1,2)";
??List?results1?=?Lists.newArrayList(tEnv.sqlQuery(sqlUdf)
?????????????????????????????????????????.execute()
?????????????????????????????????????????.collect());
??System.out.println("使用自定義函數處理結果:?");
??results1.stream().forEach(System.out::println);
完整的代碼請參考:
https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/modules/HiveModulesTest.java
sql 客戶端的使用
在sql-client-defaults.yaml里配置相關的模塊,然后就可以使用了.
#?Define?modules?here.modules:?#?note?the?following?modules?will?be?of?the?order?they?are?specified
??-?name:?core
????type:?core
??-?name:?hive
????type:?hive
原理分析和源碼解析
其實相關的源碼實現也不難,就是將hive的相關函數轉成了flink的函數,我們簡單的來看下,主要是在HiveModule類里面。
public?class?HiveModule?implements?Module?{????.............
?private?final?HiveFunctionDefinitionFactory?factory;
?private?final?String?hiveVersion;
?private?final?HiveShim?hiveShim;
這個里面有三個主要的變量,用于構造函數的factory,hive的版本hiveVersion,以及用于處理不同版本hive的處理類hiveShim。
實現
具體轉換函數的方法是getFunctionDefinition,這個方法調用了工廠類的createFunctionDefinitionFromHiveFunction方法,
我們進入 HiveFunctionDefinitionFactory#createFunctionDefinitionFromHiveFunction。
public?FunctionDefinition?createFunctionDefinitionFromHiveFunction(String?name,?String?functionClassName)?{??Class?clazz;
??try?{
???clazz?=?Thread.currentThread().getContextClassLoader().loadClass(functionClassName);
???LOG.info("Successfully?loaded?Hive?udf?'{}'?with?class?'{}'",?name,?functionClassName);
??}?catch?(ClassNotFoundException?e)?{
???throw?new?TableException(
????String.format("Failed?to?initiate?an?instance?of?class?%s.",?functionClassName),?e);
??}
??if?(UDF.class.isAssignableFrom(clazz))?{
???LOG.info("Transforming?Hive?function?'{}'?into?a?HiveSimpleUDF",?name);
???return?new?ScalarFunctionDefinition(
????name,
????new?HiveSimpleUDF(new?HiveFunctionWrapper<>(functionClassName),?hiveShim)
???);
??}
??..........
我們看到首先會加載相關函數,這個也就是為什么要求我們把hive的udf jar放到flink的classpath的原因。之后是一堆if else判斷,Hive UDF 和 GenericUDF 函數會自動轉換成 Flink 中的 ScalarFunction,GenericUDTF 會被自動轉換成 Flink 中的 TableFunction,UDAF 和 GenericUDAFResolver2 則轉換成 Flink 聚合函數(AggregateFunction).這樣當我們就可以在flink中使用相應的hive函數了。
參考資料:
[1].https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_functions.html
更多內容,歡迎關注我的公眾號【大數據技術與應用實戰】
image總結
以上是生活随笔為你收集整理的hive内置函数_flink教程flink modules详解之使用hive函数的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: hive 指定字段插入数据_Hive 表
- 下一篇: 存在就不插入_动画:面试官问我插入排序和