flink sql udf jar包_Flink 生态:一个案例快速上手 PyFlink
簡介: Flink 從 1.9.0 版本開始增加了對 Python 的支持(PyFlink),在剛剛發(fā)布的 Flink 1.10 中,PyFlink 添加了對 Python UDFs 的支持,現(xiàn)在可以在 Table API/SQL 中注冊并使用自定義函數(shù)。PyFlink 的架構(gòu)如何,適用于哪些場景?本文將詳細(xì)解析并進(jìn)行 CDN 日志分析的案例演示。
作者:孫金城(金竹)
Flink 從 1.9.0 版本開始增加了對 Python 的支持(PyFlink),在剛剛發(fā)布的 Flink 1.10 中,PyFlink 添加了對 Python UDFs 的支持,現(xiàn)在可以在 Table API/SQL 中注冊并使用自定義函數(shù)。PyFlink 的架構(gòu)如何,適用于哪些場景?本文將詳細(xì)解析并進(jìn)行 CDN 日志分析的案例演示。
PyFlink 的必要性
Flink on Python and Python on Flink
PyFlink 是什么?這個(gè)問題也許會讓人感覺問題的答案太明顯了,那就是 Flink + Python,也就是 Flink on Python。那么到底 Flink on Python 意味著這什么呢?那么一個(gè)非常容易想到的方面就是能夠讓 Python 用享受到 Flink 的所有功能。其實(shí)不僅如此,PyFlink 的存在還有另外一個(gè)非常重要的意義就是,Python on Flink,我們可以將 Python 豐富的生態(tài)計(jì)算能力運(yùn)行在 Flink 框架之上,這將極大的推動(dòng) Python 生態(tài)的發(fā)展。其實(shí),如果你再仔細(xì)深究一下,你會發(fā)現(xiàn)這個(gè)結(jié)合并非偶然。
Python 生態(tài)和大數(shù)據(jù)生態(tài)
Pythoh 生態(tài)與大數(shù)據(jù)生態(tài)有密不可分的關(guān)系,我們先看看大家都在用 Python 解決什么實(shí)際問題?通過一份用戶調(diào)查我們發(fā)現(xiàn),大多數(shù) Python 用戶正在解決 ”數(shù)據(jù)分析“,”機(jī)器學(xué)習(xí)“的問題,那么這些問題場景在大數(shù)據(jù)領(lǐng)域也有很好的解決方案。那么 Python 生態(tài)和大數(shù)據(jù)生態(tài)結(jié)合,拋開擴(kuò)大大數(shù)據(jù)產(chǎn)品的受眾用戶之外,對 Python 生態(tài)一個(gè)特別重要到意義就是單機(jī)到分布式的能力增強(qiáng),我想,這也是大數(shù)據(jù)時(shí)代海量數(shù)據(jù)分析對 Python 生態(tài)的強(qiáng)需求。
Why Flink and Why Python
好了, Python 生態(tài)和大數(shù)據(jù)的結(jié)合是時(shí)代的要求,那么 Flink 為啥選擇 Python 生態(tài)作為多語言支持的切入點(diǎn),而不是 Go 或者 R 呢?作為用戶的你,為啥選擇 PyFlink 而不是 PySpark 或者 PyHive 呢?
首先我們說說選擇 Flink 的理由:
- 第一,最主要的是架構(gòu)優(yōu)勢, Flink 是純流架構(gòu)的流批統(tǒng)一的計(jì)算引擎;
- 第二,從 ASF 的客觀統(tǒng)計(jì)看, Flink 是 2019 年度最活躍的開源項(xiàng)目,這意味著 Flink 鮮活的生命力;
- 第三, Flink 不僅僅是開源項(xiàng)目而且也經(jīng)歷過無數(shù)次,各個(gè)大數(shù)據(jù)公司的生產(chǎn)環(huán)境的歷練,值得信賴。
那么我們再來看看 Flink 在選擇多語言支持時(shí)候,為啥選擇了 Python 而不是其他語言呢?我們還是看一下數(shù)據(jù)統(tǒng)計(jì),如下: Python 語言流行程度僅次于 Java 和 C,其實(shí)我們發(fā)現(xiàn)自18年開始 Python 的發(fā)展非常迅速,并且還在持續(xù)。那么 Java/Scala 是 Flink 的默認(rèn)語言,所以選擇 Python 來進(jìn)行 Flink 多語言的支持似乎很合理。這些權(quán)威的統(tǒng)計(jì)信息,大家可以在我提供的鏈接進(jìn)行查看更詳細(xì)的信息。
目前看 PyFlink 的產(chǎn)生是時(shí)代的必然,但僅僅想清楚 PyFlink 存在的意義還遠(yuǎn)遠(yuǎn)不夠,因?yàn)槲覀冏罱K的目標(biāo)是讓 Flink 和 Python 用戶受益,真真正正的解決實(shí)際的現(xiàn)實(shí)問題。所以,我們還需要繼續(xù)深入,一起探究 PyFlink 該如何落地?
PyFlink 架構(gòu)
任何事情在想清楚之后,還要做明白,要將 PyFlink 落地,首要解決的是分析清楚要達(dá)成的核心目標(biāo)和要達(dá)成目標(biāo)解決的核心問題。那么 PyFlink 的核心目標(biāo)到底是什么呢?
PyFlink 的核心目標(biāo)
我們在前面的分析過程中已經(jīng)提到過,這里我們再具體化一下,PyFlink 的核心目標(biāo):
圍繞這 2 個(gè)核心的目標(biāo),我們再來分析,要達(dá)成這樣的目標(biāo),需要解決的核心問題是什么?
Flink 功能 Python 化
為了 PyFlink 落地,我們需要在 Flink 上開發(fā)一套和現(xiàn)有 Java 一樣的 Python 的引擎嗎?答案是 NO,這在 Flink 1.8 之前已經(jīng)嘗試過。我們做設(shè)計(jì)有一個(gè)很好的原則就是追求以最小的代價(jià)完成既定的目標(biāo),所以最好的方式是僅僅提供一層 Python API 復(fù)用現(xiàn)有的計(jì)算引擎。
那么對于 Flink 而言我們要提供怎樣的 Python API 呢?那就是我們熟知的: High-level 的 TableAPI/SQL 和有狀態(tài)的 DataStream API。好,我們現(xiàn)在的思考越來越切近 Flink 內(nèi)部了,接踵而來的問題就是,我們?nèi)绾翁峁┮惶?Python 的 Table API 和 DataStream API 呢?核心要解決的問題是什么呢?
■ Flink 功能 Python 化的核心問題
核心問題顯而易見是 Python VM 和 Java VM 的握手,他們之間要建立通訊,這是 Flink 多語言支持的核心問題。好,面對核心問題我們要進(jìn)行技術(shù)選型. Here we go…
■ Flink 功能 Python 化的 VM 通訊技術(shù)選型
就當(dāng)前的 Java VM 和 Python VM 通訊的問題而言,目前比較顯著的解決方案有 Apache Beam,一個(gè)著名的多語言多引擎支持項(xiàng)目,另外一個(gè)專門解決 Java VM 和 Python VM 通訊問題的 Py4J。我們從不同視角進(jìn)行分析對比, 首先, Py4J 和 Beam 對比,就好像有穿山功能的穿山甲和一個(gè)力量強(qiáng)大的大象,要穿越一道墻,我們可以打個(gè)洞,也可以推到整面墻。所以在當(dāng)前 VM 通訊的場景, Beam 顯得有些復(fù)雜。因?yàn)?Beam 在通用性上做了很多的努力,在極端情況會喪失一定程度的靈活性。
從另一個(gè)視角來看, Flink 本身有交互式編程的需求,比如 FLIP-36 ,同時(shí)還要在多語言支持的同時(shí),確保各種語言的接口設(shè)計(jì)語義一致性,這些在 Beam 現(xiàn)有的架構(gòu)下很難滿足。所以在這樣一種思考下,我們選擇 Py4J 作為 Java VM 和 Python VM 之間通訊的橋梁。
■ Flink 功能 Python 化的技術(shù)架構(gòu)
其實(shí)如果我們解決了 Python VM 和 Java VM 通訊的問題,本質(zhì)上是在努力達(dá)成我們第一個(gè)目標(biāo),就是將現(xiàn)有 Flink 功能輸出給 Python 用戶,也就是我們 Flink 1.9 所完成的工作,接下來我們看看 Flink 1.9 PyFlink API 的架構(gòu),如下:
我們利用Py4J解決通訊問題,在 PythonVM 啟動(dòng)一個(gè) Gateway,并且 Java VM 啟動(dòng)一個(gè) Gateway Server 用于接受 Python 的請求,同時(shí)在 Python API 里面提供和 Java API 一樣的對象,比如 TableENV, Table,等等。這樣 Python 在寫 Python API 的時(shí)候本質(zhì)是在調(diào)用 Java API。當(dāng)然,在 Flink 1.9 中還解決了作業(yè)部署問題,我們可以用 Python 命令,Python shell 和 CLI 等多種方式進(jìn)行作業(yè)提交。
那么基于這樣的架構(gòu)有怎樣的優(yōu)勢呢?第一個(gè)就是簡單,并確保 Python API 語義和 Java API 的一致性,第二點(diǎn), Python 作業(yè)可以達(dá)到和 Java 一樣的極致性能,那么 Java 的性能怎樣呢?我想大家已經(jīng)熟知,在去年雙 11 Flink Java API 已經(jīng)具備了每秒25.51億次的數(shù)據(jù)處理的能力。
Python 生態(tài)分布化
OK,在完成了現(xiàn)有 Flink 功能向 Python 用戶的輸出之后,接下來我們繼續(xù)探討,如何將 Python 生態(tài)功能引入 Flink 中,進(jìn)而將 Python 功能分布式化。如何達(dá)成?通常我們可以有如下2種做法:
好,我們針對這個(gè)核心問題進(jìn)行技術(shù)選型吧,Here we go…
■ Python 生態(tài)分布化的 UDF 執(zhí)行技術(shù)選型
解決 Python UDF 執(zhí)行問題可不僅僅是 VM 之間通訊的問題了,它涉及到 Python 執(zhí)行環(huán)境的管理,業(yè)務(wù)數(shù)據(jù)在 Java 和 Python 之間的解析, Flink State Backend 能力向 Python 的輸出, Python UDF 執(zhí)行的監(jiān)控等等,是一個(gè)非常復(fù)雜的問題。面對這樣復(fù)雜的問題,前面我們介紹過 Apache Beam ,支持多引擎多語言,無所不能的大象可以出場了,我們來看一下 Beam 是怎么解決 Python UDF 執(zhí)行問題的 :)
Beam 為了解決多語言和多引擎支持問題高度抽象了一個(gè)叫 Portability Framework 的架構(gòu),如下圖,Beam 目前可以支持 Java/Go/Python 等多種語言,其中圖下方 Beam Fu Runners 和 Execution 之間就解決了 引擎和 UDF 執(zhí)行環(huán)境的問題。其核心是對利用 Protobuf 進(jìn)行數(shù)據(jù)結(jié)構(gòu)抽象,利用 gRPC 協(xié)議進(jìn)行通訊,同時(shí)封裝了核心的 gRPC 服務(wù)。所以這時(shí)候 Beam 更像是一只螢火蟲,照亮了 PyFlink 解決 UDF 執(zhí)行問題之路。:)(多說一嘴,螢火蟲已經(jīng)成為了 Aapche Beam 的吉祥物)。
我們接下來看看 Beam 到底提供了哪些 gRPC 服務(wù)。
如圖 Runner部分是 Java 的算子執(zhí)行, SDK Worker部分是 Python 的執(zhí)行環(huán)境, Beam已經(jīng)抽象 Control/Data/State/Logging 等服務(wù)。并這些服務(wù)已經(jīng)在 Beam 的 Flink runner 上穩(wěn)定高效的運(yùn)行了很久了。所以在 PyFlink UDF 執(zhí)行上面我們可以站在巨人的肩膀上了:),這里我們發(fā)現(xiàn) Apache Beam 在 API 層面和在 UDF 的執(zhí)行層面都有解決方案,而 PyFlink 在 API 層面采用了 Py4J 解決 VM 通訊問題,在 UDF 執(zhí)行需求上采用了 Beam 的 Protability Framework 解決 UDF 執(zhí)行環(huán)境問題。
這也表明了 PyFlink 在技術(shù)選型上嚴(yán)格遵循以最小的代價(jià)達(dá)成既定目標(biāo)的原則,在技術(shù)選型上永遠(yuǎn)會選擇最合適的,最符合 PyFlink 長期發(fā)展的技術(shù)架構(gòu)。(BTW,與 Beam 的合作過程中,我也向 Beam 社區(qū)提交了20+的優(yōu)化 patch)。
■ Python 生態(tài)分布化的 UDF 技術(shù)架構(gòu)
在 UDF 的架構(gòu)中我們我既要考慮 Java VM 和 Python VM 的通訊問題,又要考慮在編譯階段和在運(yùn)行階段的不同需求。圖中我們以綠色表示 Java VM 的行為,藍(lán)色表示 Python VM 的行為。首先我們看看編譯階段,也就是local的設(shè)計(jì),在local的設(shè)計(jì)是純 API 的 mapping 調(diào)用,我們?nèi)匀灰^ Py4J 來解決通訊問題,也就是如圖 Python 每執(zhí)行一個(gè) API 就會同步的調(diào)用 Java 所對應(yīng)的 API 。
對 UDF 的支持上,需要添加 UDF 注冊的 API , register_function,但僅僅是注冊還不夠,用戶在自定義 Python UDF 的時(shí)候往往會依賴一些三方庫,所以我們還需要增加添加依賴的方法,那就是一系列的 add 方法,比如 add_Python_file()。在編寫 Python 作業(yè)的同時(shí), Java API 也會同時(shí)被調(diào)用在提交作業(yè)之前,Java 端會構(gòu)建JobGraph。然后通過 CLI 等多種方式將作業(yè)提交到集群進(jìn)行運(yùn)行。
我們再來看看運(yùn)行時(shí) Python 和 Java 的不同分工情況,首先在 Java 端與普通 Java 作業(yè)一樣, JobMaster 將作業(yè)分配給 TaskManger , TaskManager 會執(zhí)行一個(gè)個(gè) Task ,task里面就涉及到了Java和Python的算子執(zhí)行。在Python UDF的算子中我們會設(shè)計(jì)各種 gRPC 服務(wù)來完成 Java VM 和 Python VM 的各種通訊,比如 DataService 完成業(yè)務(wù)數(shù)據(jù)通訊, StateService 完成 Python UDF 對 Java Statebackend 的調(diào)用,當(dāng)然還有 Logging 和 Metrics 等其他服務(wù)。
這些服務(wù)都是基于 Beam 的 Fn API 來構(gòu)建的,最終在Python的 Worker 里面運(yùn)行用戶的 UDF,運(yùn)行結(jié)束之后再利用對應(yīng)的 gRPC 服務(wù)將結(jié)果返回給 Java 端的 PythonUDF 算子。當(dāng)然 Python 的 worker 不僅僅是 Process 模式,可以是 Docker 模式甚至是 External 的服務(wù)集群。這種擴(kuò)展機(jī)制,為后面 PyFlink 與 Python 生態(tài)的其他框架集成打下了堅(jiān)實(shí)的基礎(chǔ),在后面我們介紹 PyFlink 大圖的時(shí)候,我們會介紹這一點(diǎn):)。好,這就是 PyFlink 在 1.10 中引入 Python UDF 支持的架構(gòu)。那么這樣的架構(gòu)有怎樣的優(yōu)勢呢?
首先,Beam 是一個(gè)成熟的多語言支持框架,基于 Beam 進(jìn)行架構(gòu)我們后面可以很容易進(jìn)行其他語言的支持?jǐn)U展。 同時(shí) Beam 對 State 的服務(wù)抽象也方便 PyFlink 增加對 Stateful UDF 的支持。還有一個(gè)方面是方便維護(hù),同一套框架由 Apache Beam 和 Apache Flink 兩個(gè)非常活躍的社區(qū)共同維護(hù)和優(yōu)化 …
PyFlink 場景,怎么用?
好了解了這么多關(guān)于 PyFlink 的架構(gòu)和架構(gòu)背后的思考,我們還是以一個(gè)具體場景案例,來增加一些對 PyFlink 的體感吧!
PyFlink 適用的場景
在具體的案例之前我們先簡單分享一些 PyFlink 所能適用的業(yè)務(wù)場景。首先 PyFlink 既然是 Python+Flink,那其適用場景也可以從 java 和 Python 兩方面去分析,第一個(gè) Java 所適用的場景 PyFlink 都適用。
- 第一個(gè),事件驅(qū)動(dòng)型,比如:刷單,監(jiān)控等;
- 第二個(gè),數(shù)據(jù)分析型的,比如:庫存,雙11大屏等;
- 第三個(gè)適用的場景是數(shù)據(jù)管道,也就是ETL場景,比如一些日志的解析等;
- 第四個(gè)場景,機(jī)器學(xué)習(xí),比如個(gè)性推薦等。
這些都可以嘗試使用 PyFlink 。除此之外還有 Python 生態(tài)特有的場景,比如科學(xué)計(jì)算等。那么這么多的應(yīng)用場景,PyFlink 到底有哪些可用的 API 呢?
PyFlink 的安裝
使用具體的 API 開發(fā)之前,首先要安裝 PyFlink,目前 PyFlink 支持 pip install 進(jìn)行安裝,這里特別提醒一下具體命令是:pip install apache-Flink。
PyFlink 的 APIs
目前 PyFlink API 完全與 Java Table API 對齊,各種關(guān)系操作都支持,同時(shí)對 window 也有很好的支持,并且這里稍微提一下就是 PyFlink 里面有些易用性 API 比 SQL 還要強(qiáng)大,比如:這些對 columns 進(jìn)行操作的 APIs。除了這些 APIs,PyFlink還提供多種定義 Python UDF 的方式。
PyFlink 的 UDF 定義
首先,可以擴(kuò)展 ScalarFunction,這種方式可以提供更多的輔助功能,比如添加 Metrics 。除此之外 Python 語言所支持的任何方式的方法定義,在 PyFlink UDF 中都是支持的,比如:Lambda Function,Named Function 和 CallableFunction等。
當(dāng)定義完方法后,我們用 PyFlink 所提供的 Decorators 進(jìn)行打標(biāo),并描述 input 和 output 的數(shù)據(jù)類型就可以了。當(dāng)然后面版本我們也可以根據(jù) Python 語言的 type hint 特性再進(jìn)一步簡化,進(jìn)行類型推導(dǎo)。為了直觀,我們看一個(gè)具體的 UDF 定義的例子:
Python UDF 定義示例
我們定義兩個(gè)數(shù)相加的例子,首先導(dǎo)入必須的類,然后是剛才我們提到的幾種定義方式。這個(gè)簡單直接,我們閑話少敘,開始看看實(shí)際的案例吧:)
PyFlink 案例-阿里云 CDN 實(shí)時(shí)日志分析
我們這里以一個(gè)阿里云 CDN 實(shí)時(shí)日志分析的例子來介紹如何用 PyFlink 解決實(shí)際的業(yè)務(wù)問題。CDN 我們都很熟悉,為了進(jìn)行資源的下載加速。那么 CDN 日志的解析一般有一個(gè)通用的架構(gòu)模式,就是首先要將各個(gè)邊緣節(jié)點(diǎn)的日志數(shù)據(jù)進(jìn)行采集,一般會采集到消息隊(duì)列,然后將消息隊(duì)列和實(shí)時(shí)計(jì)算集群進(jìn)行集成進(jìn)行實(shí)時(shí)的日志分析,最后將分析的結(jié)果寫到存儲系統(tǒng)里面。那么我今天的案例將架構(gòu)實(shí)例化,消息隊(duì)列采用 Kafka,實(shí)時(shí)計(jì)算采用Flink,最終將數(shù)據(jù)存儲到 MySQL 中。
■ 阿里云 CDN 實(shí)時(shí)日志分析需求說明
我們在來看看業(yè)務(wù)統(tǒng)計(jì)的需求,為了介紹方便,我們將實(shí)際的統(tǒng)計(jì)需求進(jìn)行簡化,示例中只進(jìn)行按地區(qū)分組,進(jìn)行資源訪問量,下載量和下載速度的統(tǒng)計(jì)。數(shù)據(jù)格式我們只選取核心的字段,比如:uuid,表示唯一的日志標(biāo)示,client_ip 表示訪問來源,request_time 表示資源下載耗時(shí), response_size 表示資源數(shù)據(jù)大小。其中我們發(fā)現(xiàn)我們需求是按地區(qū)分組,但是原始日志里面并沒有地區(qū)的字段信息,所以我們需要定義一個(gè) Python UDF 根據(jù) client_ip 來查詢對應(yīng)的地區(qū)。好,我們首先看如何定義這個(gè) UDF。■ 阿里云 CDN 實(shí)時(shí)日志分析 UDF 定義
這里我們用了剛才提到的 named function 的方式定義一個(gè) ip_to_province() 的UDF,輸入是 ip 地址,輸出是地區(qū)名字字符串。我們這里描述了輸入類型是一個(gè)字符串,輸出類型也是一個(gè)字符串。當(dāng)然這里面的查詢服務(wù)僅供演示,大家在自己的生產(chǎn)環(huán)境要替換為可靠的地域查詢服務(wù)。
import re import json from pyFlink.table import DataTypes from pyFlink.table.udf import udf from urllib.parse import quote_plus from urllib.request import urlopen @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING()) def ip_to_province(ip): """ format: { 'ip': '27.184.139.25', 'pro': '河北省', 'proCode': '130000', 'city': '石家莊市', 'cityCode': '130100', 'region': '靈壽縣', 'regionCode': '130126', 'addr': '河北省石家莊市靈壽縣 電信', 'regionNames': '', 'err': '' } """ try: urlobj = urlopen( 'http://whois.pconline.com.cn/ipJson.jsp?ip=%s' % quote_plus(ip)) data = str(urlobj.read(), "gbk") pos = re.search("{[^{}]+}", data).span() geo_data = json.loads(data[pos[0]:pos[1]]) if geo_data['pro']: return geo_data['pro'] else: return geo_data['err'] except: return "UnKnow"■ 阿里云 CDN 實(shí)時(shí)日志分析 Connector 定義
我們完成了需求分析和 UDF 的定義,我們開始進(jìn)行作業(yè)的開發(fā)了,按照通用的作業(yè)結(jié)構(gòu),需要定義 Source connector 來讀取 Kafka 數(shù)據(jù),定義 Sink connector 來將計(jì)算結(jié)果存儲到 MySQL。最后是編寫統(tǒng)計(jì)邏輯。
在這特別說明一下,在 PyFlink 中也支持 SQL DDL 的編寫,我們用一個(gè)簡單的 DDL 描述,就完成了 Source Connector的開發(fā)。其中 connector.type 填寫 kafka。SinkConnector 也一樣,用一行DDL描述即可,其中 connector.type 填寫 jdbc。描述 connector 的邏輯非常簡單,我們再看看核心統(tǒng)計(jì)邏輯是否也一樣簡單:)
kafka_source_ddl = """ CREATE TABLE cdn_access_log ( uuid VARCHAR, client_ip VARCHAR, request_time BIGINT, response_size BIGINT, uri VARCHAR ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'access_log', 'connector.properties.zookeeper.connect' = 'localhost:2181', 'connector.properties.bootstrap.servers' = 'localhost:9092', 'format.type' = 'csv', 'format.ignore-parse-errors' = 'true' ) """ mysql_sink_ddl = """ CREATE TABLE cdn_access_statistic ( province VARCHAR, access_count BIGINT, total_download BIGINT, download_speed DOUBLE ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://localhost:3306/Flink', 'connector.table' = 'access_statistic', 'connector.username' = 'root', 'connector.password' = 'root', 'connector.write.flush.interval' = '1s' ) """■ 阿里云 CDN 實(shí)時(shí)日志分析核心統(tǒng)計(jì)邏輯
首先從數(shù)據(jù)源讀取數(shù)據(jù),然后需要先將 clien_ip 利用我們剛才定義的 ip_to_province(ip) 轉(zhuǎn)換為具體的地區(qū)。之后,在進(jìn)行按地區(qū)分組,統(tǒng)計(jì)訪問量,下載量和資源下載速度。最后將統(tǒng)計(jì)結(jié)果存儲到結(jié)果表中。這個(gè)統(tǒng)計(jì)邏輯中,我們不僅使用了Python UDF,而且還使用了 Flink 內(nèi)置的 Java AGG 函數(shù),sum 和 count。
# 核心的統(tǒng)計(jì)邏輯 t_env.from_path("cdn_access_log") .select("uuid, " "ip_to_province(client_ip) as province, " # IP 轉(zhuǎn)換為地區(qū)名稱 "response_size, request_time") .group_by("province") .select( # 計(jì)算訪問量 "province, count(uuid) as access_count, " # 計(jì)算下載總量 "sum(response_size) as total_download, " # 計(jì)算下載速度 "sum(response_size) * 1.0 / sum(request_time) as download_speed") .insert_into("cdn_access_statistic")■ 阿里云 CDN 實(shí)時(shí)日志分析完整代碼
我們在整體看一遍完整代碼,首先是核心依賴的導(dǎo)入,然后是我們需要?jiǎng)?chuàng)建一個(gè)ENV,并設(shè)置采用的 planner(目前Flink支持Flink和blink兩套 planner)建議大家采用 blink planner。
接下來將我們剛才描述的 kafka 和 mysql 的 ddl 進(jìn)行表的注冊。再將 Python UDF 進(jìn)行注冊,這里特別提醒一點(diǎn),UDF所依賴的其他文件也可以在API里面進(jìn)行制定,這樣在job提交時(shí)候會一起提交到集群。然后是核心的統(tǒng)計(jì)邏輯,最后調(diào)用 executre 提交作業(yè)。這樣一個(gè)實(shí)際的CDN日志實(shí)時(shí)分析的作業(yè)就開發(fā)完成了。我們再看一下實(shí)際的統(tǒng)計(jì)效果。
import os from pyFlink.datastream import StreamExecutionEnvironment from pyFlink.table import StreamTableEnvironment, EnvironmentSettings from enjoyment.cdn.cdn_udf import ip_to_province from enjoyment.cdn.cdn_connector_ddl import kafka_source_ddl, mysql_sink_ddl # 創(chuàng)建Table Environment, 并選擇使用的Planner env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create( env, environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()) # 創(chuàng)建Kafka數(shù)據(jù)源表 t_env.sql_update(kafka_source_ddl) # 創(chuàng)建MySql結(jié)果表 t_env.sql_update(mysql_sink_ddl) # 注冊IP轉(zhuǎn)換地區(qū)名稱的UDF t_env.register_function("ip_to_province", ip_to_province) # 添加依賴的Python文件 t_env.add_Python_file( os.path.dirname(os.path.abspath(__file__)) + "/enjoyment/cdn/cdn_udf.py") t_env.add_Python_file(os.path.dirname( os.path.abspath(__file__)) + "/enjoyment/cdn/cdn_connector_ddl.py") # 核心的統(tǒng)計(jì)邏輯 t_env.from_path("cdn_access_log") .select("uuid, " "ip_to_province(client_ip) as province, " # IP 轉(zhuǎn)換為地區(qū)名稱 "response_size, request_time") .group_by("province") .select( # 計(jì)算訪問量 "province, count(uuid) as access_count, " # 計(jì)算下載總量 "sum(response_size) as total_download, " # 計(jì)算下載速度 "sum(response_size) * 1.0 / sum(request_time) as download_speed") .insert_into("cdn_access_statistic") # 執(zhí)行作業(yè) t_env.execute("pyFlink_parse_cdn_log")■ 阿里云 CDN 實(shí)時(shí)日志分析運(yùn)行效果
我們采用 mock 的數(shù)據(jù)向 kafka 發(fā)送 CDN 日志數(shù)據(jù),右邊實(shí)時(shí)的按地區(qū)統(tǒng)計(jì)資源的訪問量,下載量和下載速度。這個(gè)示例的 mock 數(shù)據(jù)工具,源代碼和操作過程,在今天的直播后,會更新到我的博客當(dāng)中。方便大家在自己的環(huán)境中進(jìn)行體驗(yàn)。
PyFlink 未來,會怎樣?
總體來看 PyFlink 的業(yè)務(wù)開發(fā)還是非常簡潔的,不用關(guān)心底層的實(shí)現(xiàn)細(xì)節(jié),只需要按照SQL或者Table API的方式描述業(yè)務(wù)邏輯就行。那么,我們再整體看看PyFlink的未來會怎樣呢?
PyFlink 本心驅(qū)動(dòng) Roadmap
PyFlink 的發(fā)展始終要以本心驅(qū)動(dòng),我們要圍繞將現(xiàn)有 Flink 功能輸出到 Python 用戶,將 Python 生態(tài)功能集成到Flink當(dāng)中為目標(biāo)。PyFlink的 Roadmap 如圖所示:首先解決 Python VM 和 Java VM 的通訊問題,然后將現(xiàn)有的 Table API 功能暴露給 Python 用戶,提供 Python Table API,這也就是 Flink 1.9 中所進(jìn)行的工作,接下來我們要為將Python功能集成到Flink做準(zhǔn)備就是集成 Apache Beam,提供 Python UDF 的執(zhí)行環(huán)境,并增加Python 對其他類庫依賴的管理功能,為用戶提供 User-defined-Funciton 的接口定義,支持 Python UDF,這就是 Flink 1.10 所做的工作。
為了進(jìn)一步擴(kuò)大Python生態(tài)的分布式功能,PyFlink 將提供 Pandas 的 Series 和 DataFram 的支持,也就是用戶可以在 PyFlink 中直接使用 Pandas 的UDF。同時(shí)為增強(qiáng)用戶的易用性,讓用戶有更多的方式使用 PyFlink,后續(xù)增加在 Sql Client 中使用 Python UDF。面對 Python 用戶的機(jī)器學(xué)習(xí)問題,增加 Python 的 ML pipeline API。監(jiān)控 Python UDF 的執(zhí)行情況對,對實(shí)際的生產(chǎn)業(yè)務(wù)非常關(guān)鍵,所以 PyFlink 會增加 Python UDF 的 Metric 管理。這些點(diǎn)將在 Flink 1.11 中將與用戶見面。
但這些功能只是 PyFlink 規(guī)劃的冰山一角,后續(xù)我們還要進(jìn)行性能優(yōu)化,圖計(jì)算API,Pandas on Flink 的 Pandas 原生 API 等等。。。進(jìn)而完成不斷將 Flink 現(xiàn)有功能推向 Python 生態(tài),將 Python 生態(tài)的強(qiáng)大功能不斷集成到 Flink 當(dāng)中,進(jìn)而完成 Python 生態(tài)分布化的初衷。
PyFlink 1.11 預(yù)覽
我們快速的預(yù)覽一下即將與大家見面的 Flink 1.11 中的 PyFlink 的重點(diǎn)內(nèi)容。■ 功能
我們將視角由遠(yuǎn)方拉近到 Flink 1.11 版本 PyFlink 的核心功能,PyFlink 會圍繞著 功能,性能和易用性不斷努力,在 1.11 在功能上會增加 Pandas UDF 的支持,這樣Pandas 生態(tài)的實(shí)用類庫功能可以在 PyFlink 中直接使用,比如累積分布函數(shù), CDF 等。
還會增加 ML Pipeline API 的支持,這樣大家可以利用 PyFlink 完成一些機(jī)器學(xué)習(xí)場景的業(yè)務(wù)需求,我這里是一個(gè)使用 pyFlink 完成 KMeans 的示例。
■ 性能
在性能上 PyFlink 也會有更多的投入,我們利用 Codegen,CPython,優(yōu)化序列化和反序列化的方式提高 PythonUDF 的執(zhí)行性能,目前我們初步對 1.10 和 1.11 進(jìn)行性能對比來看,1.11 將比 1.10 有近 15 倍的性能提升。
■ 易用性
在用戶的易用性上 PyFlink 會在 SQL DDL 和 SQL Client 中增加對 Python UDF 的支持。讓用戶有更多的方式選擇來使用 PyFlink。
PyFlink 大圖(使命愿景)
今天已經(jīng)介紹了很多,比如什么是 PyFlink,PyFlink 的存在的意義,PyFlink API 架構(gòu),UDF 架構(gòu),以及架構(gòu)背后的取舍和現(xiàn)有架構(gòu)的優(yōu)勢,并介紹了 CDN 的案例,介紹了 PyFlink 的 Roadmap,預(yù)覽了 Flink 1.11 版本中 PyFlink 的重點(diǎn),那么接下來還有什么呢?
那么最后我們再來看看 PyFlink 的未來會怎樣?在以 “Flink 功能 Python 化,Python 生態(tài)分布化” 的使命驅(qū)動(dòng)下,PyFlink 會有怎樣的布局?我們快速分享一下:PyFlink 是 Apache Flink 的一部分,涉及到 Runtime 層面和 API 層面。
在這兩個(gè)層面 PyFlink 會有怎樣的發(fā)展? Runtime 層面,PyFlink 會構(gòu)建解決 Java VM 和 Python VM 的通訊問題的 gRPC 通用服務(wù),比如(Control/Data/State等)在這套框架之上會抽象出 Java 的 Python UDF 算子,Python 的執(zhí)行容器構(gòu)建,支持多種 Python 的 Execution,比如 Process,Docker 和 External,尤其值得強(qiáng)調(diào)的是 External 以Socket 的方式提供了無限的擴(kuò)展能力,在后續(xù)的 Python 生態(tài)集成上至關(guān)重要。
API 層面,我們會使命驅(qū)動(dòng),將 Flink 上所以的 API 進(jìn)行 Python 化,當(dāng)然這也依托于引入 Py4J 的 VM 通訊框架之上,PyFlink 會逐漸增加各種 API 的支持,Python Table API,UDX 的接口 API,ML Pipeline,DataStream,CEP,Gelly,State,等Flink所具備的 Java APIs 和 Python 生態(tài)用戶的最愛 Pandas APIs 等。在這些 API 的基礎(chǔ)之上,PyFlink 還會不斷的進(jìn)行生態(tài)系統(tǒng)的集成,比如 方便用戶開發(fā)的 Notebook 的集成,Zeppelin,Jupyter,并與阿里開源的 Alink 進(jìn)行集成,目前 PyAlink 已經(jīng)完全應(yīng)用了 PyFlink 所提供的功能,后面還會和現(xiàn)有的 AI 系統(tǒng)平臺進(jìn)行集成,比如大家熟知的 TensorFlow 等等。
所以此時(shí)我會發(fā)現(xiàn)使命驅(qū)動(dòng)的力量會讓 PyFlink 的生命線不斷延續(xù)…當(dāng)然這種生命的延續(xù)更需要更多的血液融入。這里再次強(qiáng)調(diào)一下 PyFlink 的使命:“Flink 能力 Python 化,Python 生態(tài)分布化”。目前 PyFlink 的核心貢獻(xiàn)者們正以這樣的使命而持續(xù)活躍在社區(qū)。
PyFlink 核心貢獻(xiàn)者及問題支持
在分享的最后,我想介紹一下目前 PyFlink 的核心貢獻(xiàn)者。
首先是付典,目前付典是 Flink 以及另外兩個(gè) Apache 頂級項(xiàng)目的 Committer,在PyFlink 模塊做了巨大的貢獻(xiàn)。
第二位是黃興勃,目前專注 PyFlink 的 UDF 性能優(yōu)化,曾經(jīng)是阿里與安全算法挑戰(zhàn)賽的冠軍,在 AI 和中間件性能比賽中也有很好的成績。
第三位就是大家熟知的程鶴群,為大家做過多次分享,相信大家還記得他為大家?guī)淼摹禙link 知識圖譜》分享。
第四位是鐘葳,關(guān)注 PyFlink 的 UDF 依賴管理和易用性工作,目前已經(jīng)有很多的代碼貢獻(xiàn)。最后一個(gè)是我自己。大家后續(xù)在使用 PyFlink 的時(shí)候,如果有什么問題都可以聯(lián)系我們中的任何一位尋求支持。
當(dāng)然遇到通用性問題還是建議大家郵件到 Flink 的用戶列表和中文用戶列表,這樣能問題共享。當(dāng)然如果你遇到特別急的個(gè)別問題,也非常歡迎您郵件到剛才介紹的小伙伴郵箱,同時(shí),為了問題的積累和有效的分享,更期望大家遇到問題可以在 Stackoverflow 進(jìn)行提問題。首先搜索你遇到問題是否已經(jīng)被解答過,如果沒有,請描述清楚,最后提醒大家要為問題打上 PyFlink 的 tags。這樣我們及時(shí)訂閱回復(fù)您問題。
總結(jié)
今天深入剖析了 PyFlink 深層含義;介紹了 PyFlink API 架構(gòu)是核心采用 Py4J 框架進(jìn)行 VM 之間的通訊,API 的設(shè)計(jì)上 Python API 和 Java API 保持語義的一致;還介紹了 Python UDF 架構(gòu),以集成 Apache Beam 的 Portability Framework 的方式獲取高效穩(wěn)定的 Python UDF 的支持,并且細(xì)致分析了架構(gòu)背后思考,對技術(shù)選型的取舍和現(xiàn)有架構(gòu)的優(yōu)勢;
然后介紹了 PyFlink 所適用的業(yè)務(wù)場景,并以阿里云 CDN 日志實(shí)時(shí)分析的案例讓大家對 PyFlink 的使用有一定的體感;
最后介紹了 PyFlink 的 Roadmap 和預(yù)覽了 Flink 1.11 版本中 PyFlink 的重點(diǎn),預(yù)期 PyFlink 1.11 相對于1.10會有15倍以上的性能提升,最后和大家一起分享了 PyFlink 的使命,PyFlink 的使命是 ”Flink能力Python化,Python生態(tài)分布化”。
留在最后的是提供給大家一種更有效的問題求助的方式,大家有什么問題可以隨時(shí)拋給剛才向大家介紹的 PyFlink 小伙伴,那么這些小伙伴已經(jīng)在直播群里了,接下來有什么問題,我們可以一起探討。:)
https://enjoyment.cool/ (二維碼自動(dòng)識別)
作者介紹:
孫金城(金竹),2011 年加入阿里,9 年的阿里工作中,主導(dǎo)過很多內(nèi)部核心系統(tǒng),如,阿里集團(tuán)行為日志,阿里郎,云轉(zhuǎn)碼,文檔轉(zhuǎn)換等。在 2016 年初開始了解 Apache Flink 社區(qū),由初期的參與社區(qū)開發(fā)到后來逐漸主導(dǎo)具體模塊的開發(fā),到負(fù)責(zé) Apache Flink Python API(PyFlink) 的建設(shè)。 目前是 PMC member of Apache Flink and ALC(Beijing), 以及 Committer for Apache Flink, Apache Beam and Apache IoTDB。
SQL 消息中間件 監(jiān)控 Java 大數(shù)據(jù) API Apache 流計(jì)算 Python CDN
總結(jié)
以上是生活随笔為你收集整理的flink sql udf jar包_Flink 生态:一个案例快速上手 PyFlink的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: leetcode13
- 下一篇: 分类与聚类的本质区别