日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

flink sql udf jar包_FlinkSQL 动态加载 UDF 实现思路

發(fā)布時間:2025/3/20 数据库 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 flink sql udf jar包_FlinkSQL 动态加载 UDF 实现思路 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

導(dǎo)讀: 最近在對 Flink 進(jìn)行平臺化,基于 REST API 構(gòu)建一個平臺實(shí)現(xiàn)通過純 SQL 化編寫和管理 Job。盡管 Flink官方希望用戶將所有的依賴和業(yè)務(wù)邏輯打成一個fat jar,這樣方便提交。但我們在開發(fā)的過程中想對用戶自定義 UDF Jar 進(jìn)行管理,想將 UDF Jar 存儲管理在阿里云 OSS ,在 Job 中通過動態(tài)加載的方式將 UDF Jar 加載進(jìn)來,取代之前將 UDF 和 Job 打成一個 fat jar 的方式。下面將從幾點(diǎn)展開討論:

  • 將 UDF 寫到 Job 中并打成一個 fat jar 的實(shí)現(xiàn)方式
  • 動態(tài)加載 UDF Jar 代碼調(diào)整
  • 代碼調(diào)整后存在的問題
  • 解決 UDF Jar URL 分發(fā)的思路

環(huán)境

  • Flink 1.11.2
  • 部署方式:Flink on Kubernetes
  • 部署模式: Session Cluster

將 UDF 寫到 Job 中并打成一個 fat jar 的方式

下面是一個簡單采用 FlinkSQL 編寫 Job 的例子。使用 datagen 連接器作為 Source 生成數(shù)據(jù), print 作為 Sink 將結(jié)果打印到控制臺。自定義的一個簡單 UDF自定義函數(shù)(returnSelf)。

public static void main(String[] args) throws Exception { //創(chuàng)建流運(yùn)行時環(huán)境 StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); //采用BlinkPlanner EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //創(chuàng)建StreamTable環(huán)境 StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); bsEnv.setParallelism(1); bsTableEnv.executeSql("CREATE FUNCTION returnSelf AS 'flinksql.function.udf.ReturnSelf'"); bsTableEnv.executeSql("CREATE TABLE sourceTable (" + " f_sequence INT," + " f_random INT," + " f_random_str STRING," + " ts AS localtimestamp," + " WATERMARK FOR ts AS ts" + " ) WITH (" + " 'connector' = 'datagen'," + " 'rows-per-second'='5'," + " 'fields.f_sequence.kind'='sequence'," + " 'fields.f_sequence.start'='1'," + " 'fields.f_sequence.end'='1000'," + " 'fields.f_random.min'='1'," + " 'fields.f_random.max'='1000'," + " 'fields.f_random_str.length'='10'" + ")"); bsTableEnv.executeSql("CREATE TABLE sinktable (" + " f_random_str STRING" + ") WITH (" + " 'connector' = 'print'" + ")"); bsTableEnv.executeSql("insert into sinktable select returnSelf(f_random_str) from sourceTable"); }

要將該 Job 提交給遠(yuǎn)程 Flink 集群時,我們需要將 Job(包括自定義 UDF) 打成一個 fat Jar。但這并不是我們期望的操作,由于打成 fat jar 會顯得比較臃腫,同時不方便管理 UDF Jar ,有些 UDF 具有通用性,可復(fù)用。所以我們希望將自定義的UDF Jar 獨(dú)立出來保存管理,并在 Job 中通過動態(tài)加載的方式使用,如下圖:

動態(tài)加載 UDF Jar 代碼調(diào)整

  • 將 returnSelf 并獨(dú)立打成一個 UDF Jar 上傳到阿里云OSS。
  • 在 Job 的 main() 方法中新增動態(tài)加載的代碼
public static void main(String[] args) throws Exception { //創(chuàng)建流運(yùn)行時環(huán)境 StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); //采用BlinkPlanner EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //創(chuàng)建StreamTable環(huán)境 StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); bsEnv.setParallelism(1); // 動態(tài)加載 String funJarPath = "UDF jar 在 OSS 中所在的 URL 路徑"; loadJar(new URL(funJarPath)); bsTableEnv.executeSql("CREATE FUNCTION returnSelf AS 'flinksql.function.udf.ReturnSelf'"); bsTableEnv.executeSql("CREATE TABLE sourceTable (" + " f_sequence INT," + " f_random INT," + " f_random_str STRING," + " ts AS localtimestamp," + " WATERMARK FOR ts AS ts" + " ) WITH (" + " 'connector' = 'datagen'," + " 'rows-per-second'='5'," + " 'fields.f_sequence.kind'='sequence'," + " 'fields.f_sequence.start'='1'," + " 'fields.f_sequence.end'='1000'," + " 'fields.f_random.min'='1'," + " 'fields.f_random.max'='1000'," + " 'fields.f_random_str.length'='10'" + ")"); bsTableEnv.executeSql("CREATE TABLE sinktable (" + " f_random_str STRING" + ") WITH (" + " 'connector' = 'print'" + ")"); bsTableEnv.executeSql("insert into sinktable select returnSelf(f_random_str) from sourceTable"); } //動態(tài)加載Jar public static void loadJar(URL jarUrl) { //從URLClassLoader類加載器中獲取類的addURL方法 Method method = null; try { method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class); } catch (NoSuchMethodException | SecurityException e1) { e1.printStackTrace(); } // 獲取方法的訪問權(quán)限 boolean accessible = method.isAccessible(); try { //修改訪問權(quán)限為可寫 if (accessible == false) { method.setAccessible(true); } // 獲取系統(tǒng)類加載器 URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader(); //jar路徑加入到系統(tǒng)url路徑里 method.invoke(classLoader, jarUrl); } catch (Exception e) { e.printStackTrace(); } finally { method.setAccessible(accessible); } }

修改后,我們將 UDF jar 存放到 OSS 中進(jìn)行管理。當(dāng) Job 需要依賴某個 UDF 時,只需要通過動態(tài)加載就可以完成。動態(tài)加載使用 URLClassLoader 實(shí)現(xiàn),使用被管理于 OSS 的 UDF Jar 的 URL 將 Jar 加載進(jìn) JVM 中,并取得 returnSelf 類。

代碼調(diào)整后存在的問題

運(yùn)行結(jié)果:代碼調(diào)整后,在本地 IDEA 運(yùn)行程序(即,啟動了 Mini Cluster集群)是可以成功運(yùn)行的。但是當(dāng)發(fā)布到遠(yuǎn)程 Flink 集群上時(采用 Flink on K8S , Session Cluster 部署模式),會出現(xiàn)找不到 UDF 異常,如下:

Caused by: java.lang.ClassNotFoundException: flinksql.function.udf.ReturnSelf

分析:這是由于 Flink 的部署方式有多種。在本地運(yùn)行的啟動的是 MiniCluster,即 JobManager 和 TaskManager 在同一個JVM 進(jìn)程中。而我們在遠(yuǎn)程部署 Flink on Kubernetes 的 Session Cluster 集群 JobManager 和 TaskManager 是不同的 JVM 進(jìn)程。

在 Session 模式下,客戶端在 main() 方法開始執(zhí)行直到 env.execute() 方法之前需要完成以下三件事情

  • 獲取作業(yè)所需的依賴項(xiàng)
  • 通過執(zhí)行環(huán)境分析并取得邏輯計(jì)劃,即StreamGraph→JobGraph
  • 將依賴項(xiàng)和JobGraph上傳到集群中

只有在這些都完成之后,才會通過env.execute() 方法觸發(fā) Flink 運(yùn)行時真正地開始執(zhí)行作業(yè)。所以在本地運(yùn)行的 Mini Cluster,因?yàn)槎继幱谕粋€ JVM 進(jìn)程,客戶端運(yùn)行 main() 方法進(jìn)行動態(tài)加載后將依賴項(xiàng)和 JobGraph 提交給 JobMananger 再由 TaskManager 執(zhí)行 Job。

而當(dāng)在遠(yuǎn)程集群時,客戶端實(shí)現(xiàn)動態(tài)加載 Jar 后將依賴項(xiàng)和 JobGraph 提交給 JobMananger,但是由于 JobMananger 和 TaskMananger 是處于不同的 JVM進(jìn)程中,且沒有對自定義 UDF Jar URL 進(jìn)行分發(fā),這會讓 TaskMananger 在運(yùn)行任務(wù)時出現(xiàn) Class Not Found 異常,這是因?yàn)?TaskMananger 沒有進(jìn)行類加載,JVM 中沒有 returnSelf 類所導(dǎo)致。

解決 UDF Jar 分發(fā)的思路

基于以上問題我們查閱了一些相關(guān)資料及閱讀源碼,以以下三點(diǎn)為條件

  • 基于采用 Session 模式部署
  • 基于 REST API 提交 Job 而不采用命令行方式
  • 不改動 Flink 源碼

分析:官網(wǎng)提供了一個 -C 參數(shù),大致用法就是把用戶自定義 Jar 放到一個 JobMananger 和 TaskMananger 都能訪問到的存儲地方,然后通過命令行方式啟動 Job 時使用 -C 參數(shù),后面加上自定義 Jar 的URLs 就可以實(shí)現(xiàn)分發(fā)。

但是我們平臺由于采用 REST API,而提交 Job 的 API 并沒有提供該參數(shù),所以在不改變 Flink 源碼的前提下進(jìn)行源碼研究,最后發(fā)現(xiàn)可以在 main 中將 UDF Jar 的 URL 加到配置項(xiàng) pipeline.classpaths 中,也就是曲線救國實(shí)現(xiàn)了 -C 的效果。在 main 中增加以下代碼片段:

Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration"); configurationField.setAccessible(true); Configuration o = (Configuration)configurationField.get(bsEnv); Field confData = Configuration.class.getDeclaredField("confData"); confData.setAccessible(true); Map temp = (Map)confData.get(o); List jarList = new ArrayList<>(); jarList.add(funJarPath); temp.put("pipeline.classpaths",jarList);

完整代碼

public static void main(String[] args) throws Exception { //創(chuàng)建流運(yùn)行時環(huán)境 StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); //采用BlinkPlanner EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //創(chuàng)建StreamTable環(huán)境 StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); bsEnv.setParallelism(1); // 動態(tài)加載 String funJarPath = "UDF jar 在 OSS 中所在的 URL 路徑"; loadJar(new URL(funJarPath)); Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration"); configurationField.setAccessible(true); Configuration o = (Configuration)configurationField.get(bsEnv); Field confData = Configuration.class.getDeclaredField("confData"); confData.setAccessible(true); Map temp = (Map)confData.get(o); List jarList = new ArrayList<>(); jarList.add(funJarPath); temp.put("pipeline.classpaths",jarList); bsTableEnv.executeSql("CREATE FUNCTION returnSelf AS 'flinksql.function.udf.ReturnSelf'"); bsTableEnv.executeSql("CREATE TABLE sourceTable (" + " f_sequence INT," + " f_random INT," + " f_random_str STRING," + " ts AS localtimestamp," + " WATERMARK FOR ts AS ts" + " ) WITH (" + " 'connector' = 'datagen'," + " 'rows-per-second'='5'," + " 'fields.f_sequence.kind'='sequence'," + " 'fields.f_sequence.start'='1'," + " 'fields.f_sequence.end'='1000'," + " 'fields.f_random.min'='1'," + " 'fields.f_random.max'='1000'," + " 'fields.f_random_str.length'='10'" + ")"); bsTableEnv.executeSql("CREATE TABLE sinktable (" + " f_random_str STRING" + ") WITH (" + " 'connector' = 'print'" + ")"); bsTableEnv.executeSql("insert into sinktable select returnSelf(f_random_str) from sourceTable"); } //動態(tài)加載Jar public static void loadJar(URL jarUrl) { //從URLClassLoader類加載器中獲取類的addURL方法 Method method = null; try { method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class); } catch (NoSuchMethodException | SecurityException e1) { e1.printStackTrace(); } // 獲取方法的訪問權(quán)限 boolean accessible = method.isAccessible(); try { //修改訪問權(quán)限為可寫 if (accessible == false) { method.setAccessible(true); } // 獲取系統(tǒng)類加載器 URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader(); //jar路徑加入到系統(tǒng)url路徑里 method.invoke(classLoader, jarUrl); } catch (Exception e) { e.printStackTrace(); } finally { method.setAccessible(accessible); } }

最后

以上就是在 Flink on K8S 集群 Session 模式下, FlinkSQL 動態(tài)加載 Jar 的解決方案。由于 REST API 沒有提供 -C 效果,自定義 Jar URL 沒有分發(fā)到 TaskMananger,導(dǎo)致 TaskMananger 沒有進(jìn)行類加載到其 JVM 中。通過在 Job 的 main 方法中增加動態(tài)加載方法及配置 pipeline.classpaths,可以達(dá)到不改動 Flink 源碼的情況下實(shí)現(xiàn) -C 效果。以上方案剛實(shí)現(xiàn)不久,還不保證是否有其他未知的問題,如果有更好的解決方案或者該方案中存在錯誤或者疏漏也歡迎提出共同討論。

感謝您的閱讀,如果喜歡本文歡迎關(guān)注和轉(zhuǎn)發(fā),本頭條號將堅(jiān)持持續(xù)分享IT技術(shù)知識。對于文章內(nèi)容有其他想法或意見建議等,歡迎提出共同討論共同進(jìn)步。

《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀

總結(jié)

以上是生活随笔為你收集整理的flink sql udf jar包_FlinkSQL 动态加载 UDF 实现思路的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。