徒手打造基于Spark的数据工厂(Data Factory):从设计到实现
在大數(shù)據(jù)處理和人工智能時代,數(shù)據(jù)工廠(Data Factory)無疑是一個非常重要的大數(shù)據(jù)處理平臺。市面上也有成熟的相關(guān)產(chǎn)品,比如Azure Data Factory,不僅功能強大,而且依托微軟的云計算平臺Azure,為大數(shù)據(jù)處理提供了強大的計算能力,讓大數(shù)據(jù)處理變得更為穩(wěn)定高效。由于工作中我的項目也與大數(shù)據(jù)處理相關(guān),于是我就在思考,是否自己也可以設(shè)計打造一個數(shù)據(jù)工廠,以便尋求一些技術(shù)痛點的解決方案,并且引入一些有趣的新功能。因此,我利用業(yè)余時間,逐步打造了一個基于Spark的數(shù)據(jù)工廠,并取名為Abacuza(Abacus是中國的“算盤”的意思,隱喻它是一個專門做數(shù)據(jù)計算的平臺,使用“算盤”一詞的變體,也算是體現(xiàn)一點中國元素吧)。說是基于Spark,其實從整個架構(gòu)來看,Abacuza并不一定非要基于Spark,只需要為其定制某種數(shù)據(jù)處理引擎的插件即可,所以,Spark其實僅僅是Abacuza的一個插件,當然,Spark是目前主流的數(shù)據(jù)處理引擎,Abacuza將其作為默認的數(shù)據(jù)處理插件。?Abacuza是開源的,項目地址是:https://github.com/daxnet/abacuza。徒手打造?是的,沒錯,從前端界面都后端開發(fā),從代碼到持續(xù)集成,再到部署腳本和SDK與容器鏡像的發(fā)布,都是自己一步步弄出來的。項目主頁上有一個簡單的教程,后面我會詳細介紹一下。在介紹如何使用Abacuza之前,我們先了解一下它的整體架構(gòu)和設(shè)計思想。雖然目前Abacuza還有很多功能沒有完成,但并不影響整個數(shù)據(jù)處理流程的執(zhí)行。
整體架構(gòu)
Abacuza和其它的數(shù)據(jù)工廠平臺一樣,它的業(yè)務(wù)流程就是分三步走:數(shù)據(jù)讀入、數(shù)據(jù)處理、結(jié)果輸出。Abacuza的整體架構(gòu)圖就很清楚地體現(xiàn)了這個業(yè)務(wù)流程:?
(點擊查看大圖)
數(shù)據(jù)輸入部分
數(shù)據(jù)的輸入是由輸入端點(Input Endpoints)來定義的。Abacuza支持多種數(shù)據(jù)類型的輸入:CSV文件、JSON文件、TXT文本文件、Microsoft SQL Server(暫未完全實現(xiàn))以及S3的對象存儲路徑,今后還可以繼續(xù)擴展輸入端點,以支持基于管道(Pipeline)的數(shù)據(jù)處理流程,這樣一來,用戶就不需要自己使用C#或者Scala來編寫數(shù)據(jù)處理的邏輯代碼,只需要一套JSON文件進行Pipeline定義就可以了。
數(shù)據(jù)處理部分
當數(shù)據(jù)輸入已經(jīng)定義好以后,Abacuza會根據(jù)Input Endpoint的設(shè)置,將數(shù)據(jù)讀入,然后轉(zhuǎn)交給后端的數(shù)據(jù)處理集群(Cluster)進行處理。Abacuza可以以插件的形式支持不同類型的集群,如上文所說,Apache Spark是Abacuza所支持的一種數(shù)據(jù)處理集群,在上面的架構(gòu)圖中可以看到,Abacuza Cluster Service管理這些集群,工作任務(wù)調(diào)度器(Job Scheduler)會通過Abacuza Cluster Service將數(shù)據(jù)處理任務(wù)分配到指定類型的集群上進行處理。對于Spark而言,具體的數(shù)據(jù)處理邏輯是由用戶自己編寫代碼實現(xiàn)的。Spark原生支持Scala,也可以使用PySpark,Abacuza使用Microsoft .NET for Spark項目實現(xiàn)從.NET到Spark的綁定(Binding),用戶可以使用C#來編寫Spark的數(shù)據(jù)處理邏輯,后面的演練部分我會詳細介紹。那么與Scala相比,通過.NET for Spark使用C#編寫的數(shù)據(jù)處理程序會不會有性能問題?嗯,會有點性能問題,請看下圖(圖片來源:微軟.NET for Spark官方網(wǎng)站):?
在這個Benchmark中,處理相同總量的數(shù)據(jù),Scala使用了375秒,.NET花了406秒,Python使用433秒,雖然與Scala相比有些差距,但是比Python要好一些。但是不用擔心,如果在你的應(yīng)用場景中,性能是放在第一位的,那么Abacuza的Job Runner機制允許你使用Scala編寫數(shù)據(jù)處理程序,然后上傳到Spark集群執(zhí)行(也就是你不需要依賴于.NET和C#)。
數(shù)據(jù)輸出部分
與數(shù)據(jù)輸入部分類似,處理之后的數(shù)據(jù)輸出方式是由輸出端點(Output Endpoints)來定義的。Abacuza也支持多種數(shù)據(jù)輸出方式:將結(jié)果打印到日志、將結(jié)果輸出到外部文件系統(tǒng)以及將結(jié)果輸出到當前項目所在的S3對象存儲路徑。無論是數(shù)據(jù)輸入部分還是輸出部分,這些端點都是可以定制的,并且可以通過ASP.NET Core的插件系統(tǒng)以及docker-compose或者Kubernetes的volume/Block Storage來實現(xiàn)動態(tài)加載。
相關(guān)概念和運作機理
Abacuza有以下這些概念:
集群(Cluster):一個集群是一個完整的大數(shù)據(jù)處理平臺,比如Apache Spark
集群類型(Cluster Type):定義集群的類型,例如,運行在localhost的Spark集群和運行在云端的Spark集群都是Spark集群,那么它們的集群類型就是spark。
集群連接(Cluster Connection):定義了Abacuza數(shù)據(jù)工廠訪問集群的方式,類似于數(shù)據(jù)庫系統(tǒng)的連接字符串
任務(wù)執(zhí)行器(Job Runner):定義了數(shù)據(jù)處理任務(wù)應(yīng)該如何被提交到集群上執(zhí)行。它可以包含具體的數(shù)據(jù)處理業(yè)務(wù)邏輯
輸入端點(Input Endpoint):定義了原始數(shù)據(jù)(需要被處理的數(shù)據(jù))的來源
輸出端點(Output Endpoint):定義了處理完成后的數(shù)據(jù)的輸出方式
項目(Project):一種類型數(shù)據(jù)處理任務(wù)的邏輯定義,它包括多個輸入端點、一個輸出端點以及多個數(shù)據(jù)處理版本(Revision)的信息,同時它還定義了應(yīng)該使用哪個任務(wù)執(zhí)行器來執(zhí)行數(shù)據(jù)處理任務(wù)
數(shù)據(jù)處理版本(Revision):它歸屬于一個特定的項目,表示不同批次的數(shù)據(jù)處理結(jié)果
當一個用戶準備使用Abacuza完成一次大數(shù)據(jù)處理的任務(wù)時,一般會按照下面的步驟進行:
使用用戶名/密碼(暫時只支持用戶名密碼登錄)登錄Abacuza的管理界面
基于一個已經(jīng)安裝好的集群(比如Apache Spark),配置它的集群類型和集群連接,用來定義Abacuza與該集群的通信方式(集群和集群連接定義了數(shù)據(jù)應(yīng)該在哪里被處理(where))
定義任務(wù)執(zhí)行器,在任務(wù)執(zhí)行器中,設(shè)置運行數(shù)據(jù)處理任務(wù)的集群類型,當數(shù)據(jù)處理任務(wù)被提交時,Abacuza Cluster Service會基于所選的集群類型,根據(jù)一定的算法來選擇一個集群進行數(shù)據(jù)處理。任務(wù)執(zhí)行器中也定義了數(shù)據(jù)處理的邏輯,(比如,由Scala、C#或者Python編寫的應(yīng)用程序,可以上傳到spark類型的集群上運行)。簡單地說,任務(wù)執(zhí)行器定義了數(shù)據(jù)應(yīng)該如何被處理(how)
創(chuàng)建一個新的項目,在這個項目中,通過輸入端點來設(shè)置所需處理的數(shù)據(jù)來源,通過輸出端點來設(shè)置處理后的數(shù)據(jù)的存放地點,并設(shè)置該項目所用到的任務(wù)執(zhí)行器。之后,用戶點擊Submit按鈕,將數(shù)據(jù)提交到集群上進行處理。處理完成后,在數(shù)據(jù)處理版本列表中查看結(jié)果
技術(shù)選型
Abacuza采用微服務(wù)架構(gòu)風格,每個單獨的微服務(wù)都在容器中運行,目前實驗階段采用docker-compose進行容器編排,今后會加入Kubernetes支持。現(xiàn)將Abacuza所使用的框架與相關(guān)技術(shù)簡單羅列一下:
Spark執(zhí)行程序選擇Microsoft .NET for Spark,一方面自己對.NET技術(shù)棧比較熟悉,另一方面,.NET for Spark有著很好的流式數(shù)據(jù)處理的SDK API,并且可以很方便地整合ML.NET實現(xiàn)機器學習的業(yè)務(wù)場景
所有的微服務(wù)都是使用運行在.NET 5下的ASP.NET Core?Web API實現(xiàn),每個微服務(wù)的后端數(shù)據(jù)庫采用MongoDB
用于任務(wù)調(diào)度的Abacuza Job Service微服務(wù)使用Quartz.NET實現(xiàn)定期任務(wù)調(diào)度,用來提交數(shù)據(jù)處理任務(wù)以及更新任務(wù)狀態(tài)。后端同時采用了PostgreSQL數(shù)據(jù)庫
存儲層與服務(wù)層之間引入Redis做數(shù)據(jù)緩存,減少MongoDB的查詢負載
默認支持的Spark集群使用Apache Livy為其提供RESTful API接口
文件對象存儲采用MinIO?S3
API網(wǎng)關(guān)采用Ocelot框架
微服務(wù)的瞬態(tài)故障處理:Polly框架
身份認證與授權(quán)采用ASP.NET Core Identity集成的IdentityServer4解決方案
反向代理:nginx
前端頁面:Angular 12、Angular powered Bootstrap、Bootstrap、AdminLTE
弱弱補一句:本人前端技術(shù)沒有后端技術(shù)精湛,所以前端頁面會有不少問題,樣式也不是那么的專業(yè)美觀,前端高手請忽略這些細節(jié)。;)?Abacuza采用了插件化的設(shè)計,用戶可以根據(jù)需要擴展下面這些組件:
實現(xiàn)自己的數(shù)據(jù)處理集群以及集群連接:因此你不必拘泥于使用Apache Spark
實現(xiàn)自己的輸入端點和輸出端點:因此你可以自定義數(shù)據(jù)的輸入部分和輸出部分
實現(xiàn)自己的任務(wù)執(zhí)行器:因此你可以選擇不采用基于.NET for Spark的解決方案,你可以自己用Scala或者Python來編寫數(shù)據(jù)處理程序
在Abacuza的管理界面中,可以很方便地看到目前系統(tǒng)中已經(jīng)被加載的插件:??因此,Abacuza數(shù)據(jù)工廠應(yīng)該可以滿足絕大部分大數(shù)據(jù)處理的業(yè)務(wù)場景。本身整個平臺都是基于.NET開發(fā),并且通過NuGet分發(fā)了Abacuza SDK,因此擴展這些組件是非常簡單的,后面的演練部分可以看到詳細介紹。
部署拓撲
以下是Abacuza的部署拓撲:?
?
整個部署結(jié)構(gòu)還是比較簡單的:5個主要的微服務(wù)由基于Ocelot實現(xiàn)的API Gateway負責代理,Ocelot可以整合IdentityServer4,在Gateway的層面完成用戶的認證(Gateway層面的授權(quán)暫未實現(xiàn))。基于IdentityServer4實現(xiàn)的Identity Service并沒有部署在API Gateway的后端,因為在這個架構(gòu)中,它的認證授權(quán)策略與一般的微服務(wù)不同。API Gateway、Identity Service以及基于Angular實現(xiàn)的web app都由nginx反向代理,向外界(客戶端瀏覽器)提供統(tǒng)一的訪問端點。所有的后端服務(wù)都運行在docker里,并可以部署在Kubernetes中。
演練:在Abacuza上運行Word Count程序
Word Count是Spark官方推薦的第一個案例程序,它的任務(wù)是統(tǒng)計輸入文件中每個單詞的出現(xiàn)次數(shù)。.NET for Spark也有一個相同的Word Count案例。在此,我仍然使用Word Count案例,介紹如何在Abacuza上運行數(shù)據(jù)處理程序。
先決條件
你需要一臺Windows、MacOS或者Linux的計算機,上面裝有.NET 5 SDK、docker以及docker-compose(如果是Windows或者MacOS,則安裝docker的桌面版),同時確保安裝了git客戶端命令行。
創(chuàng)建Word Count數(shù)據(jù)處理程序
首先使用dotnet命令行創(chuàng)建一個控制臺應(yīng)用程序,然后添加相關(guān)的引用:
$ dotnet new console -f net5.0 -n WordCountApp $ cd WordCountApp $ dotnet add package Microsoft.Spark --version 1.0.0 $ dotnet add package Abacuza.JobRunners.Spark.SDK --prerelease |
然后在項目中新加入一個class文件,實現(xiàn)一個WordCountRunner類:
using?Abacuza.JobRunners.Spark.SDK; using?Microsoft.Spark.Sql; ? namespace?WordCountApp { ???public?class?WordCountRunner : SparkRunnerBase ???{ ??????public?WordCountRunner(string[] args) : base(args) ??????{ ??????} ? ??????protected?override?DataFrame RunInternal(SparkSession sparkSession, DataFrame dataFrame) ????????????=> dataFrame ???????????????.Select(Functions.Split(Functions.Col("value"), " ").Alias("words")) ???????????????.Select(Functions.Explode(Functions.Col("words")) ???????????????.Alias("word")) ???????????????.GroupBy("word") ???????????????.Count() ???????????????.OrderBy(Functions.Col("count").Desc()); ???} } |
接下來修改Program.cs文件,在Main函數(shù)中調(diào)用WordCountRunner:
static?void?Main(string[] args) { ???new?WordCountRunner(args).Run(); } |
然后,在命令行中,WordCountApp.csproj所在的目錄下,使用下面的命令來生成基于Linux x64平臺的編譯輸出:
$ dotnet publish -c Release -f net5.0 -r linux-x64 -o published |
最后,使用ZIP工具,將published下的所有文件(不包括published目錄本身)全部打包成一個ZIP壓縮包。例如,在Linux下,可以使用下面的命令將published目錄下的所有文件打成一個ZIP包:
$ zip -rj WordCountApp.zip published/. |
Word Count程序已經(jīng)寫好了,接下來我們就啟動Abacuza,并在其中運行這個WordCountApp。
運行Word Count程序
你可以使用git clone https://github.com/daxnet/abacuza.git命令,將Abacuza源代碼下載到本地,然后在Abacuza的根目錄下,使用下面的命令進行編譯:
1 | $ docker-compose -f docker-compose.build.yaml build |
編譯成功之后,用文本編輯器編輯template.env文件,在里面設(shè)置好本機的IP地址(不能使用localhost或者127.0.0.1,因為在容器環(huán)境中,localhost和127.0.0.1表示當前容器本身,而不是運行容器的主機),端口號可以默認:
然后,使用下面的命令啟動Abacuza:
1 | $ docker-compose --env-file template.env up |
啟動成功后,可以使用docker ps命令查看正在運行的容器:?
用瀏覽器訪問http://<你的IP地址>:9320,即可打開Abacuza登錄界面,輸入用戶名super,密碼P@ssw0rd完成登錄,進入Dashboard(目前Dashboard還未完成)。然后在左側(cè)菜單中,點擊Cluster Connections,然后點擊右上角的Add Connection按鈕:
在彈出的對話框中,輸入集群連接的名稱和描述,集群類型選擇spark,在設(shè)置欄中,輸入用于連接Spark集群的JSON配置信息。由于我們本地啟動的Spark在容器中,直接使用本機的IP地址即可,如果你的Spark集群部署在其它機器上,也可以使用其它的IP地址。在配置完這些信息后,點擊Save按鈕保存:
接下來就是創(chuàng)建任務(wù)執(zhí)行器。在Abacuza管理界面,點擊左邊的Job Runners菜單,然后點擊右上角的Add Job Runner按鈕:?
在彈出的對話框中,輸入任務(wù)執(zhí)行器的名稱和描述信息,集群類型選擇spark,之后當該任務(wù)執(zhí)行器開始執(zhí)行時,會挑選任意一個類型為spark的集群來處理數(shù)據(jù)。?
填入這些基本信息后,點擊Save按鈕,此時會進入任務(wù)執(zhí)行器的詳細頁面,用來進行進一步的設(shè)置。在Payload template中,輸入以下JSON文本:
1 2 3 4 5 6 7 8 9 10 11 | { ??"file": "${jr:binaries:microsoft-spark-3-0_2.12-1.0.0.jar}", ??"className": "org.apache.spark.deploy.dotnet.DotnetRunner", ??"args": [ ????"${jr:binaries:WordCountApp.zip}", ????"WordCountApp", ????"${proj:input-defs}", ????"${proj:output-defs}", ????"${proj:context}" ??] } |
大概介紹一下每個參數(shù):
file:指定了在Spark集群上需要運行的程序所在的JAR包,這里直接使用微軟的Spark JAR
className:指定了需要運行的程序在JAR包中的名稱,這里固定使用org.apache.spark.deploy.dotnet.DotnetRunner
${jr:binaries:WordCountApp.zip} 表示由className指定的DotnetRunner會調(diào)用當前任務(wù)執(zhí)行器中的二進制文件WordCountApp.zip中的程序來執(zhí)行數(shù)據(jù)處理任務(wù)
WordCountApp 為ZIP包中可執(zhí)行程序的名稱
${proj:input-defs} 表示輸入文件及其配置將引用當前執(zhí)行數(shù)據(jù)處理的項目中的輸入端點的定義
${proj:output-defs} 表示輸出文件及其配置將引用當前執(zhí)行數(shù)據(jù)處理的項目中的輸出端點的定義
${proj:context} 表示Spark會從當前項目讀入相關(guān)信息并將其傳遞給任務(wù)執(zhí)行器
在上面的配置中,引用了兩個binary文件:microsoft-spark-3-0_2.12-1.0.0.jar和WordCountApp.zip。于是,我們需要將這兩個文件上傳到任務(wù)執(zhí)行器中。仍然在任務(wù)執(zhí)行器的編輯界面,在Binaries列表中,點擊加號按鈕,將這兩個文件附加到任務(wù)執(zhí)行器上。注意:microsoft-spark-3-0_2.12-1.0.0.jar文件位于上文用到的published目錄中,而WordCountApp.zip則是在上文中生成的ZIP壓縮包。
配置完成后,點擊Save & Close按鈕,保存任務(wù)執(zhí)行器。接下來,創(chuàng)建一個數(shù)據(jù)處理項目,在左邊的菜單中,點擊Projects,然后在右上角點擊Add Project按鈕:?
在彈出的Add Project對話框中,輸入項目的名稱、描述,然后選擇輸入端點和輸出端點,以及負責處理該項目數(shù)據(jù)的任務(wù)執(zhí)行器:
在此,我們將輸入端點設(shè)置為文本文件(Text Files),輸出端點設(shè)置為控制臺(Console),也就是直接輸出到日志中。這些配置在后續(xù)的項目編輯頁面中也是可以更改的。一個項目可以包含多個輸入端點,但是只能有一個輸出端點。點擊Save按鈕保存設(shè)置,此時Abacuza會打開項目的詳細頁,在INPUT選項卡下,添加需要統(tǒng)計單詞出現(xiàn)次數(shù)的文本文件:?
在OUTPUT選項卡下,確認輸出端點設(shè)置為Console:
然后點擊右上角或者右下角的Submit按鈕,提交數(shù)據(jù)處理任務(wù),此時,選項卡會自動切換到REVISIONS,并且更新Job的狀態(tài):?
稍等片刻,如果數(shù)據(jù)處理成功,Job Status會從RUNNING變?yōu)镃OMPLETED:
點擊Actions欄中的文件按鈕,即可查看數(shù)據(jù)處理的日志輸出:
從日志文件中可以看到,Abacuza已經(jīng)根據(jù)我們寫的數(shù)據(jù)處理程序,統(tǒng)計出輸入文件input.txt中每個單詞的出現(xiàn)次數(shù)。通過容器的日志輸出也能看到同樣的信息:?
總結(jié)
本文介紹了自己純手工打造的數(shù)據(jù)工廠(Data Factory)的設(shè)計與實現(xiàn),并開發(fā)了一個案例來演示該數(shù)據(jù)工廠完成數(shù)據(jù)處理的整個過程。之后還有很多功能可以完善:Dashboard、認證授權(quán)的優(yōu)化、用戶與組的管理、第三方IdP的集成、Pipeline的實現(xiàn)等等,今后有空再慢慢弄吧。
總結(jié)
以上是生活随笔為你收集整理的徒手打造基于Spark的数据工厂(Data Factory):从设计到实现的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何用 Linq 进行多重 Orderb
- 下一篇: BeetleX.FastHttpApi之