flink sql udf jar包_flink教程flink 1.11 集成zeppelin实现简易实时计算平台
背景
zeppelin不提供per job模式
實時平臺開發周期長
基于zeppelin開發一個簡易實時平臺
開發zeppelin Interpreter
提交sql任務
提交jar任務
背景
隨著flink的蓬勃發展,zeppelin社區也大力推進flink與zeppelin的集成.zeppelin的定位是一種使用sql或者scala等語言的一個交互式的分析查詢分析工具。
Web-based?notebook?that?enables?data-driven,interactive?data?analytics?and?collaborative?documents?with?SQL,?Scala?and?more.
所以zeppelin與flink或者是其他的解釋器集成的時候,就會有這么一個架構的特點,我需要啟動一個處理數據的服務,相關的任務都提交到這個上面,拿flink來說,就是需要啟動一個flink的集群,比如local、remote、session模式的集群。當我們執行一些flink sql的時候,都是提交到這個集群來執行的。
zeppelin不提供per job模式
但是我們在生產環境中,對于一些flink的流式任務,我們一般會采用per job的模式提交任務,主要是為了任務資源的隔離,每個任務互不影響。目前zeppelin是不支持這種模式的。所以很多公司都會開發一個自己的實時流式任務計算平臺,可以實現使用sql或者jar的方式通過平臺來提交任務到集群,避免了底層一些復雜的操作,使一些只會sql的人也能開發flink任務。
實時平臺開發周期長
但是開發一個實時計算平臺其實是相對比較復雜的,它需要有前端的寫sql的頁面,后端的提交邏輯,以及前后端的交互等等。所以我的想法是既然zeppelin已經提供了我們做一個實時平臺的很多的功能,比如寫sql的頁面、前后端交互、提交任務、獲取任務的狀態等等,那么我們是不是可以用zeppelin來開發一個簡化版的實時計算平臺呢。
基于zeppelin開發一個簡易實時平臺
今天我們談談怎么通過zeppelin來實現一個簡易的實時平臺,目的是可以把flink的sql和jar的流式任務以per job的方式提交到yarn集群。
我們簡單的看下zeppelin中flink 解釋器的源碼,他底層是使用了flink scala shell,具體相關內容可以參考 Flink Scala REPL :https://ci.apache.org/projects/flink/flink-docs-stable/ops/scala_shell.html.
zeppelin在提交flink的任務的時候,會判斷下集群是否啟動,如果沒有啟動flink集群,會根據設置的模式(local、yarn)先啟動一個非隔離模式的flink集群(remote模式需要提前啟動好一個集群),然后客戶端保持著和服務器的連接,后續有用戶提交的任務,就把任務提交到剛起啟動的集群。我研究了一下代碼覺得在這個上面加一個per job模式的話可能會破壞原來的架構,改動還會比較大,所以后來想自己做一個zepplin的解釋器,功能就是通過sql或者jar的方式專門用來提交flink的流式任務。
開發zeppelin Interpreter
具體zeppelin的Interpreter的開發可以參考這篇文章。
https://zeppelin.apache.org/docs/0.9.0-preview1/development/writing_zeppelin_interpreter.html
核心的代碼就是繼承抽象類Interpreter,實現其中的幾個方法,我們簡單來講講。
public?abstract?class?Interpreter?{????
??/**
??*?初始化的時候調用,可以在這個里面加一些系統初始化的工作,這個方法只調用一次。
??*?寫過flink自定義source和sink的同學應該不會陌生。
???*/
??@ZeppelinApi
??public?abstract?void?open()?throws?InterpreterException;
??/**
???*?
???*?釋放Interpreter資源,也只會被調用一次。
???*/
??@ZeppelinApi
??public?abstract?void?close()?throws?InterpreterException;
????
????/**
???*?異步的運行輸入框里面的代碼并返回結果。.
???*
???*?@param?st?就是頁面那個框里你輸入的東西
???*/
??@ZeppelinApi
??public?abstract?InterpreterResult?interpret(String?st,
??????????????????????????????????????????????InterpreterContext?context)
??????throws?InterpreterException;????
????
}
除了上面列出來的這幾個,還有其他的幾個,我這里就不羅列代碼了,大家有興趣的可以自己看下。
底層我使用的是flink application模式來提交的任務,在open里面做一些提交flink初始化的工作,比如構造配置文件,啟動yarnClient等等。在interpret方法解析內容,執行提交任務的工作。
最終我們實現了可以通過jar包和sql的方式來提交任務到yarn集群。
提交sql任務
我們可以指定一些任務的參數,比如jobname,并行度、checkpoint間隔等等,頁面大概長這個樣子,提交任務之后,可以在yarn集群看到相關的任務。
在這里插入圖片描述提交jar任務
首先把相應的jar上傳到hdfs相關路徑,然后提交任務之前,指定jar的路徑,以及jobname、并行度等等,正文就不需要寫什么了,然后把這個任務提交到yarn集群。
在這里插入圖片描述目前只是實現了一些核心的功能,還有一些其他的功能需要后續完善。
更多內容,歡迎關注我的公眾號【大數據技術與應用實戰】
image總結
以上是生活随笔為你收集整理的flink sql udf jar包_flink教程flink 1.11 集成zeppelin实现简易实时计算平台的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: docker删除镜像命令_第三章 Doc
- 下一篇: 代码编程教学_少儿编程教学环境开发之代码