日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

hive内置函数_flink教程flink modules详解之使用hive函数

發布時間:2025/3/20 编程问答 46 豆豆
生活随笔 收集整理的這篇文章主要介紹了 hive内置函数_flink教程flink modules详解之使用hive函数 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
  • modules概念

  • 通過hive module使用hive函數

    • 內置函數

    • 自定義函數

  • sql 客戶端的使用

  • 原理分析和源碼解析

    • 實現

modules概念

flink 提供了一個module的概念,使用戶能擴展flink的內置對象,比如內置函數。這個功能是插件化的,用戶可以方便的加載或者移除相應的module。

flink內置了CoreModule,并且提供了一個hive module,允許用戶在加載了hive module之后使用hive的函數,包括內置函數、自定義hive函數等等。如果多個module里有重名的函數,則以先加載的函數為準。

用戶還可以自定義module,只需要實現Module接口即可。如果是在sql 客戶端使用,還需要實現ModuleFactory接口,因為加載的時候,flink會使用SPI機制去匹配獲取相應的ModuleFactory,然后實例化相應的moudule。

通過hive module使用hive函數

我們以hive module為例,講解一下如何使用flink提供的module功能,使用hive module的一些注意事項:

  • 通過 Hive Metastore 將帶有 UDF 的 HiveCatalog 設置為當前會話的 catalog。
  • 將帶有 UDF 的 jar 包放入 Flink classpath 中,并在代碼中引入。
  • 使用 Blink planner,flink 1.11默認就是,不用顯示指定

內置函數

  • 引入pom
???org.apache.flinkflink-connector-hive_${scala.binary.version}${flink.version}
  • 加載module
??String?name?=?"myhive";
??String?version?=?"3.1.2";
??tEnv.loadModule(name,?new?HiveModule(version));
  • 查看module
??System.out.println("list?modules?------------------?");
??String[]?modules?=?tEnv.listModules();
??Arrays.stream(modules).forEach(System.out::println);

運行結果我們看到有兩個module

list?modules?------------------?
core
myhive
  • 查看函數
?System.out.println("list?functions?(包含hive函數):------------------??");
??String[]?functions?=?tEnv.listFunctions();
??Arrays.stream(functions).forEach(System.out::println);

我們看到列出來大概300多個函數,包含flink和hive的內置函數。

  • hive函數的使用

在hive里有一個常用的解析json的函數get_json_object,這個可以把json字符串解析之后得到想要的字段,但是flink中沒有這個函數,所以我們可以通過這種方式來使用hive的函數,就不用我們自己開發UDF了。

?System.out.println("hive?函數的使用:??------------------??");
??String?sql?=?"SELECT?data,get_json_object(data,?'$.name')??FROM?(VALUES?('{\"name\":\"flink\"}'),?('{\"name\":\"hadoop\"}'))?AS?MyTable(data)";

??List?results?=?Lists.newArrayList(tEnv.sqlQuery(sql)
?????????????????????????????????????????????.execute()
?????????????????????????????????????????????.collect());
??results.stream().forEach(System.out::println);

輸出結果:

hive?函數的使用:??------------------??
{"name":"flink"},flink
{"name":"hadoop"},hadoop

自定義函數

前面我們講了如何使用hive的內置函數,這個比較簡單,接在了hive的module之后就可以用了,還有一種就是如何使用hive的udf函數呢?我們接下來簡單聊聊。

  • 自定義hive函數

首先我們來自定義一個hive的udf函數

  • 引入pom
  • ????org.apache.hivehive-exec3.1.2

    實現一個自定義函數,就是實現兩個int類型數字的加和操作

  • 定義函數
  • public?class?TestHiveUDF?extends?UDF{

    ?public?IntWritable?evaluate(IntWritable?i,IntWritable?j){
    ??return?new?IntWritable(i.get()?+?j.get());
    ?}

    }

    完整代碼:
    https://github.com/zhangjun0x01/bigdata-examples/blob/master/hive/src/main/java/com/test/TestHiveUDF.java

  • 導入
  • 把相應的jar放到hive的classpath下面

    定義函數

    add?jar?/home/work/work/hive/lib/hive-1.0-SNAPSHOT.jar;?
    CREATE??FUNCTION?mysum?AS?"com.test.TestHiveUDF";?
  • 測試
  • ??boolean?b?=?Arrays.asList(functions1).contains("mysum");
    ??System.out.println("是否包含自定義函數:?"?+?b);

    ??String?sqlUdf?=?"select?mysum(1,2)";
    ??List?results1?=?Lists.newArrayList(tEnv.sqlQuery(sqlUdf)
    ?????????????????????????????????????????.execute()
    ?????????????????????????????????????????.collect());
    ??System.out.println("使用自定義函數處理結果:?");
    ??results1.stream().forEach(System.out::println);

    完整的代碼請參考:

    https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/modules/HiveModulesTest.java

    sql 客戶端的使用

    在sql-client-defaults.yaml里配置相關的模塊,然后就可以使用了.

    #?Define?modules?here.

    modules:?#?note?the?following?modules?will?be?of?the?order?they?are?specified
    ??-?name:?core
    ????type:?core
    ??-?name:?hive
    ????type:?hive

    原理分析和源碼解析

    其實相關的源碼實現也不難,就是將hive的相關函數轉成了flink的函數,我們簡單的來看下,主要是在HiveModule類里面。

    public?class?HiveModule?implements?Module?{
    ????.............
    ?private?final?HiveFunctionDefinitionFactory?factory;
    ?private?final?String?hiveVersion;
    ?private?final?HiveShim?hiveShim;

    這個里面有三個主要的變量,用于構造函數的factory,hive的版本hiveVersion,以及用于處理不同版本hive的處理類hiveShim。

    實現

    具體轉換函數的方法是getFunctionDefinition,這個方法調用了工廠類的createFunctionDefinitionFromHiveFunction方法,

    我們進入 HiveFunctionDefinitionFactory#createFunctionDefinitionFromHiveFunction。

    public?FunctionDefinition?createFunctionDefinitionFromHiveFunction(String?name,?String?functionClassName)?{
    ??Class?clazz;
    ??try?{
    ???clazz?=?Thread.currentThread().getContextClassLoader().loadClass(functionClassName);

    ???LOG.info("Successfully?loaded?Hive?udf?'{}'?with?class?'{}'",?name,?functionClassName);
    ??}?catch?(ClassNotFoundException?e)?{
    ???throw?new?TableException(
    ????String.format("Failed?to?initiate?an?instance?of?class?%s.",?functionClassName),?e);
    ??}

    ??if?(UDF.class.isAssignableFrom(clazz))?{
    ???LOG.info("Transforming?Hive?function?'{}'?into?a?HiveSimpleUDF",?name);

    ???return?new?ScalarFunctionDefinition(
    ????name,
    ????new?HiveSimpleUDF(new?HiveFunctionWrapper<>(functionClassName),?hiveShim)
    ???);
    ??}
    ??..........

    我們看到首先會加載相關函數,這個也就是為什么要求我們把hive的udf jar放到flink的classpath的原因。之后是一堆if else判斷,Hive UDF 和 GenericUDF 函數會自動轉換成 Flink 中的 ScalarFunction,GenericUDTF 會被自動轉換成 Flink 中的 TableFunction,UDAF 和 GenericUDAFResolver2 則轉換成 Flink 聚合函數(AggregateFunction).這樣當我們就可以在flink中使用相應的hive函數了。

    參考資料:
    [1].https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_functions.html

    更多內容,歡迎關注我的公眾號【大數據技術與應用實戰】

    image

    總結

    以上是生活随笔為你收集整理的hive内置函数_flink教程flink modules详解之使用hive函数的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。