如何在 PyFlink 1.10 中自定义 Python UDF?
我們知道 PyFlink 是在 Apache Flink 1.9 版新增的,那么在 Apache Flink 1.10 中 Python UDF 功能支持的速度是否能夠滿足用戶的急切需求呢?
Python UDF 的發展趨勢
直觀的判斷,PyFlink Python UDF 的功能也可以如上圖一樣能夠迅速從幼苗變成大樹,為啥有此判斷,請繼續往下看…
Flink on Beam
我們都知道有 Beam on Flink 的場景,就是 Beam 支持多種 Runner,也就是說 Beam SDK 編寫的 Job 可以運行在 Flink 之上。如下圖所示:
上面這圖是 Beam Portability Framework 的架構圖,他描述了 Beam 如何支持多語言,如何支持多 Runner,單獨說 Apache Flink 的時候我們就可以說是 Beam on Flink,那么怎么解釋 Flink on Beam 呢?
在 Apache Flink 1.10 中我們所說的 Flink on Beam 更精確的說是 PyFlink on Beam Portability Framework。我們看一下簡單的架構圖,如下:
Beam Portability Framework 是一個成熟的多語言支持框架,框架高度抽象了語言之間的通信協議(gRPC),定義了數據的傳輸格式(Protobuf),并且根據通用流計算框架所需要的組件,抽象個各種服務,比如 DataService,StateService,MetricsService 等。在這樣一個成熟的框架下,PyFlink 可以快速的構建自己的 Python 算子,同時重用 Apache Beam Portability Framework 中現有 SDK harness 組件,可以支持多種 Python 運行模式,如:Process,Docker,etc.,這使得 PyFlink 對 Python UDF 的支持變得非常容易,在 Apache Flink 1.10 中的功能也非常的穩定和完整。那么為啥說是 Apache Flink 和 Apache Beam 共同打造呢,是因為我發現目前 Apache Beam Portability Framework 的框架也存在很多優化的空間,所以我在 Beam 社區進行了優化討論,并且在 Beam 社區也貢獻了?20+ 的優化補丁。
概要了解了 Apache Flink 1.10 中 Python UDF 的架構之后,我們還是切入的代碼部分,看看如何開發和使用 Python UDF。
如何定義 Python UDF
在 Apache Flink 1.10 中我們有多種方式進行 UDF 的定義,比如:
- Extend ScalarFunction, e.g.:
- Lambda Functio
- Named Function
- Callable Function
我們發現上面定義函數除了第一個擴展 ScalaFunction 的方式是 PyFlink 特有的,其他方式都是 Python 語言本身就支持的,也就是說,在 Apache Flink 1.10 中 PyFlink 允許以任何 Python 語言所支持的方式定義 UDF。
如何使用 Python UDF
那么定義完 UDF 我們應該怎樣使用呢?Apache Flink 1.10 中提供了 2 種 Decorators,如下:
- Decorators - udf(), e.g. :
- Decorators - @udf, e.g. :
然后在使用之前進行注冊,如下:
st_env.register_function("hash_code", hash_code_mean)接下來就可以在 Table API/SQL 中進行使用了,如下:
my_table.select("hash_code_mean(a, b)").insert_into("Results")目前為止,我們已經完成了 Python UDF 的定義,聲明和注冊了。接下來我們還是看一個完整的示例吧:)
案例描述
- 需求
假設蘋果公司要統計該公司產品在雙 11 期間各城市的銷售數量和銷售金額分布情況。
- 數據格式
每一筆訂單是一個字符串,字段用逗號分隔, 例如:
ItemName, OrderCount, Price, City ------------------------------------------- iPhone 11, 30, 5499, Beijing\n iPhone 11 Pro,20,8699,Guangzhou\n案例分析
根據案例的需求和數據結構分析,我們需要對原始字符串進行結構化解析,那么需要一個按“,”號分隔的 UDF(split) 和一個能夠將各個列信息展平的 DUF(get)。同時我們需要根據城市進行分組統計。
核心實現
UDF 定義
- Split UDF
- Get UDF
注冊 UDF
- 注冊 Split UDF
- 注冊 Get UDF
核心實現邏輯
如下代碼我們發現核心實現邏輯非常簡單,只需要對數據進行解析和對數據進行集合計算:
t_env.from_table_source(SocketTableSource(port=9999))\ .alias("line")\ .select("split(line) as str_array")\ .select("get(str_array, 3) as city, " "get(str_array, 1).cast(LONG) as count, " "get(str_array, 2).cast(LONG) as unit_price")\ .select("city, count, count * unit_price as total_price")\ .group_by("city")\ .select("city, sum(count) as sales_volume, sum(total_price) as sales")\.insert_into("sink") t_env.execute("Sales Statistic")上面的代碼我們假設是一個 Socket 的 Source,Sink 是一個 Chart Sink,那么最終運行效果圖,如下:
我總是認為在博客中只是文本描述而不能讓讀者真正的在自己的機器上運行起來的博客,不是好博客,所以接下來我們看看按照我們下面的操作,是否能在你的機器上也運行起來?:)
環境
因為目前 PyFlink 還沒有部署到 PyPI 上面,在 Apache Flink 1.10 發布之前,我們需要通過構建 Flink 的 master 分支源碼來構建運行我們 Python UDF 的 PyFlink 版本。
源代碼編譯
在進行編譯代碼之前,我們需要你已經安裝了?JDK8?和?Maven3x。
- 下載解壓
- 修改環境變量(~/.bashrc)
除了 JDK 和 MAVEN 完整的環境依賴性如下:
- JDK 1.8+ (1.8.0_211)
- Maven 3.x (3.2.5)
- Scala 2.11+ (2.12.0)
- Python 3.6+ (3.7.3)
- Git 2.20+ (2.20.1)
- Pip3 19+ (19.1.1)
我們看到基礎環境安裝比較簡單,我這里就不每一個都貼出來了。如果大家有問題歡迎郵件或者博客留言。
- 下載 Flink 源代碼:
- 編譯
- 構建 PyFlink 發布包
- 安裝 PyFlink(PyFlink 1.10 需要 Python3.6+)
也可以查看一下,我們核心需要 apache-beam 和 apache-flink,如下命令:
jincheng:flink-python jincheng.sunjc$ pip3 list Package Version ----------------------------- --------- alabaster 0.7.12 apache-beam 2.15.0 apache-flink 1.10.dev0 atomicwrites 1.3.0如上信息證明你我們所需的 Python 依賴已經沒問題了,接下來回過頭來在看看如何進行業務需求的開發。
PyFlinlk 的 Job 結構
一個完成的 PyFlink 的 Job 需要有外部數據源的定義,有業務邏輯的定義和最終計算結果輸出的定義。也就是 Source connector, Transformations, Sink connector,接下來我們根據這個三個部分進行介紹來完成我們的需求。
Source Connector
我們需要實現一個 Socket Connector,首先要實現一個 StreamTableSource, 核心代碼是實現 getDataStream,代碼如下:
@Overridepublic DataStream<Row> getDataStream(StreamExecutionEnvironment env) {return env.socketTextStream(hostname, port, lineDelimiter, MAX_RETRY).flatMap(new Spliter(fieldNames.length, fieldDelimiter, appendProctime)).returns(getReturnType());}上面代碼利用了 StreamExecutionEnvironment 中現有 socketTextStream 方法接收數據,然后將業務訂單數據傳個一個 FlatMapFunction, FlatMapFunction 主要實現將數據類型封裝為 Row,詳細代碼查閱?Spliter。
同時,我們還需要在 Python 封裝一個 SocketTableSource,詳情查閱?socket_table_source.py。
Sink Connector
我們預期要得到的一個效果是能夠將結果數據進行圖形化展示,簡單的思路是將數據寫到一個本地的文件,然后在寫一個 HTML 頁面,使其能夠自動更新結果文件,并展示結果。所以我們還需要自定義一個 Sink 來完成該功能,我們的需求計算結果是會不斷的更新的,也就是涉及到 Retraction(如果大家不理解這個概念,可以查閱我以前的博客),目前在 Flink 里面還沒有默認支持 Retract 的 Sink,所以我們需要自定義一個 RetractSink,比如我們實現一下 CsvRetractTableSink。
CsvRetractTableSink 的核心邏輯是緩沖計算結果,每次更新進行一次全量(這是個純 demo,不能用于生產環境)文件輸出。源代碼查閱?CsvRetractTableSink。
同時我們還需要利用 Python 進行封裝,詳見 chart_table_sink.py。
在 chart_table_sink.py 我們封裝了一個 http server,這樣我們可以在瀏覽器中查閱我們的統計結果。
業務邏輯
完成自定義的 Source 和 Sink 之后我們終于可以進行業務邏輯的開發了,其實整個過程自定義 Source 和 Sink 是最麻煩的,核心計算邏輯似乎要簡單的多。
- 設置 Python 版本(很重要)
如果你本地環境 python 命令版本是 2.x,那么需要對 Python 版本進行設置,如下:
t_env.get_config().set_python_executable("python3")PyFlink 1.10 之后支持 Python 3.6+ 版本。
- 讀取數據源
PyFlink 讀取數據源非常簡單,如下:
... ... t_env.from_table_source(SocketTableSource(port=9999)).alias("line")上面這一行代碼定義了監聽端口 9999 的數據源,同時結構化 Table 只有一個名為 line 的列。
- 解析原始數據
我們需要對上面列進行分析,為了演示 Python UDF,我們在 SocketTableSource中并沒有對數據進行預處理,所以我們利用上面 UDF 定義 一節定義的 UDF,來對原始數據進行預處理。
... ... .select("split(line) as str_array") .select("get(str_array, 3) as city, " "get(str_array, 1).cast(LONG) as count, " "get(str_array, 2).cast(LONG) as unit_price") .select("city, count, count * unit_price as total_price")- 統計分析
核心的統計邏輯是根據 city 進行分組,然后對 銷售數量和銷售金額進行求和,如下:
... ... .group_by("city") .select("city, sum(count) as sales_volume, sum(total_price) as sales")\- 計算結果輸出
計算結果寫入到我們自定義的 Sink 中,如下:
... ... .insert_into("sink")- 完整的代碼(blog_demo.py)
上面代碼中大家會發現一個陌生的部分,就是 from pyflink.demo import ChartConnector, SocketTableSource. 其中 pyflink.demo 是哪里來的呢?其實就是包含了上面我們介紹的 自定義 Source/Sink(Java&Python)。下面我們來介紹如何增加這個 pyflink.demo 模塊。
安裝 pyflink.demo
為了大家方便我把自定義 Source/Sink(Java&Python)的源代碼放到了這里 ,大家可以進行如下操作:
- 下載源碼
- 編譯源碼
- 構建發布包
- 安裝 Pyflink.demo
出現上面信息證明已經將 PyFlink.demo 模塊成功安裝。接下來我們可以運行我們的示例了 :)
運行示例
示例的代碼在上面下載的源代碼里面已經包含了,為了簡單,我們利用 PyCharm 打開enjoyment.code/myPyFlink。同時在 Terminal 啟動一個端口:
nc -l 6666啟動 blog_demo,如果一切順利,啟動之后,控制臺會輸出一個 web 地址,如下所示:
我們打開這個頁面,開始是一個空白頁面,如下:
我們嘗試將下面的數據,一條,一條的發送給 Source Connector:
iPhone 11,30,5499,Beijing iPhone 11 Pro,20,8699,Guangzhou MacBook Pro,10,9999,Beijing AirPods Pro,50,1999,Beijing MacBook Pro,10,11499,Shanghai iPhone 11,30,5999,Shanghai iPhone 11 Pro,20,9999,Shenzhen MacBook Pro,10,13899,Hangzhou iPhone 11,10,6799,Beijing MacBook Pro,10,18999,Beijing iPhone 11 Pro,10,11799,Shenzhen MacBook Pro,10,22199,Shanghai AirPods Pro,40,1999,Shanghai當輸入第一條訂單 iPhone 11,30,5499,Beijing,之后,頁面變化如下:
隨之訂單數據的不斷輸入,統計圖不斷變化。一個完整的 GIF 演示如下:
小結
本篇從架構到 UDF 接口定義,再到具體的實例,向大家介紹了在 Apache Flink 1.10 發布之后,如何利用 PyFlink 進行業務開發,其中 用戶自定義 Source 和 Sink部分比較復雜,這也是目前社區需要進行改進的部分(Java/Scala)。真正的核心邏輯部分其實比較簡單,為了大家按照本篇進行實戰操作有些成就感,所以我增加了自定義 Source/Sink 和圖形化部分。但如果大家想簡化實例的實現也可以利用 Kafka 作為 Source 和 Sink,這樣就可以省去自定義的部分,做起來也會簡單一些。
雙12來襲!500元淘寶紅包、iPhone11等你拿。
https://www.aliyun.com/1212/2019/home?utm_content=g_1000092611
原文鏈接
本文為阿里云原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的如何在 PyFlink 1.10 中自定义 Python UDF?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 用Flink取代Spark Stream
- 下一篇: websocket python爬虫_p