日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

徒手打造基于Spark的数据工厂(Data Factory):从设计到实现

發(fā)布時間:2023/12/4 编程问答 50 豆豆
生活随笔 收集整理的這篇文章主要介紹了 徒手打造基于Spark的数据工厂(Data Factory):从设计到实现 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

在大數(shù)據(jù)處理和人工智能時代,數(shù)據(jù)工廠(Data Factory)無疑是一個非常重要的大數(shù)據(jù)處理平臺。市面上也有成熟的相關(guān)產(chǎn)品,比如Azure Data Factory,不僅功能強大,而且依托微軟的云計算平臺Azure,為大數(shù)據(jù)處理提供了強大的計算能力,讓大數(shù)據(jù)處理變得更為穩(wěn)定高效。由于工作中我的項目也與大數(shù)據(jù)處理相關(guān),于是我就在思考,是否自己也可以設(shè)計打造一個數(shù)據(jù)工廠,以便尋求一些技術(shù)痛點的解決方案,并且引入一些有趣的新功能。因此,我利用業(yè)余時間,逐步打造了一個基于Spark的數(shù)據(jù)工廠,并取名為Abacuza(Abacus是中國的“算盤”的意思,隱喻它是一個專門做數(shù)據(jù)計算的平臺,使用“算盤”一詞的變體,也算是體現(xiàn)一點中國元素吧)。說是基于Spark,其實從整個架構(gòu)來看,Abacuza并不一定非要基于Spark,只需要為其定制某種數(shù)據(jù)處理引擎的插件即可,所以,Spark其實僅僅是Abacuza的一個插件,當然,Spark是目前主流的數(shù)據(jù)處理引擎,Abacuza將其作為默認的數(shù)據(jù)處理插件。?Abacuza是開源的,項目地址是:https://github.com/daxnet/abacuza。徒手打造?是的,沒錯,從前端界面都后端開發(fā),從代碼到持續(xù)集成,再到部署腳本和SDK與容器鏡像的發(fā)布,都是自己一步步弄出來的。項目主頁上有一個簡單的教程,后面我會詳細介紹一下。在介紹如何使用Abacuza之前,我們先了解一下它的整體架構(gòu)和設(shè)計思想。雖然目前Abacuza還有很多功能沒有完成,但并不影響整個數(shù)據(jù)處理流程的執(zhí)行。

整體架構(gòu)

Abacuza和其它的數(shù)據(jù)工廠平臺一樣,它的業(yè)務(wù)流程就是分三步走:數(shù)據(jù)讀入、數(shù)據(jù)處理、結(jié)果輸出。Abacuza的整體架構(gòu)圖就很清楚地體現(xiàn)了這個業(yè)務(wù)流程:?

(點擊查看大圖)

數(shù)據(jù)輸入部分

數(shù)據(jù)的輸入是由輸入端點(Input Endpoints)來定義的。Abacuza支持多種數(shù)據(jù)類型的輸入:CSV文件、JSON文件、TXT文本文件、Microsoft SQL Server(暫未完全實現(xiàn))以及S3的對象存儲路徑,今后還可以繼續(xù)擴展輸入端點,以支持基于管道(Pipeline)的數(shù)據(jù)處理流程,這樣一來,用戶就不需要自己使用C#或者Scala來編寫數(shù)據(jù)處理的邏輯代碼,只需要一套JSON文件進行Pipeline定義就可以了。

數(shù)據(jù)處理部分

當數(shù)據(jù)輸入已經(jīng)定義好以后,Abacuza會根據(jù)Input Endpoint的設(shè)置,將數(shù)據(jù)讀入,然后轉(zhuǎn)交給后端的數(shù)據(jù)處理集群(Cluster)進行處理。Abacuza可以以插件的形式支持不同類型的集群,如上文所說,Apache Spark是Abacuza所支持的一種數(shù)據(jù)處理集群,在上面的架構(gòu)圖中可以看到,Abacuza Cluster Service管理這些集群,工作任務(wù)調(diào)度器(Job Scheduler)會通過Abacuza Cluster Service將數(shù)據(jù)處理任務(wù)分配到指定類型的集群上進行處理。對于Spark而言,具體的數(shù)據(jù)處理邏輯是由用戶自己編寫代碼實現(xiàn)的。Spark原生支持Scala,也可以使用PySpark,Abacuza使用Microsoft .NET for Spark項目實現(xiàn)從.NET到Spark的綁定(Binding),用戶可以使用C#來編寫Spark的數(shù)據(jù)處理邏輯,后面的演練部分我會詳細介紹。那么與Scala相比,通過.NET for Spark使用C#編寫的數(shù)據(jù)處理程序會不會有性能問題?嗯,會有點性能問題,請看下圖(圖片來源:微軟.NET for Spark官方網(wǎng)站):?

在這個Benchmark中,處理相同總量的數(shù)據(jù),Scala使用了375秒,.NET花了406秒,Python使用433秒,雖然與Scala相比有些差距,但是比Python要好一些。但是不用擔心,如果在你的應(yīng)用場景中,性能是放在第一位的,那么Abacuza的Job Runner機制允許你使用Scala編寫數(shù)據(jù)處理程序,然后上傳到Spark集群執(zhí)行(也就是你不需要依賴于.NET和C#)。

數(shù)據(jù)輸出部分

與數(shù)據(jù)輸入部分類似,處理之后的數(shù)據(jù)輸出方式是由輸出端點(Output Endpoints)來定義的。Abacuza也支持多種數(shù)據(jù)輸出方式:將結(jié)果打印到日志、將結(jié)果輸出到外部文件系統(tǒng)以及將結(jié)果輸出到當前項目所在的S3對象存儲路徑。無論是數(shù)據(jù)輸入部分還是輸出部分,這些端點都是可以定制的,并且可以通過ASP.NET Core的插件系統(tǒng)以及docker-compose或者Kubernetes的volume/Block Storage來實現(xiàn)動態(tài)加載。

相關(guān)概念和運作機理

Abacuza有以下這些概念:

  • 集群(Cluster):一個集群是一個完整的大數(shù)據(jù)處理平臺,比如Apache Spark

  • 集群類型(Cluster Type):定義集群的類型,例如,運行在localhost的Spark集群和運行在云端的Spark集群都是Spark集群,那么它們的集群類型就是spark。

  • 集群連接(Cluster Connection):定義了Abacuza數(shù)據(jù)工廠訪問集群的方式,類似于數(shù)據(jù)庫系統(tǒng)的連接字符串

  • 任務(wù)執(zhí)行器(Job Runner):定義了數(shù)據(jù)處理任務(wù)應(yīng)該如何被提交到集群上執(zhí)行。它可以包含具體的數(shù)據(jù)處理業(yè)務(wù)邏輯

  • 輸入端點(Input Endpoint):定義了原始數(shù)據(jù)(需要被處理的數(shù)據(jù))的來源

  • 輸出端點(Output Endpoint):定義了處理完成后的數(shù)據(jù)的輸出方式

  • 項目(Project):一種類型數(shù)據(jù)處理任務(wù)的邏輯定義,它包括多個輸入端點、一個輸出端點以及多個數(shù)據(jù)處理版本(Revision)的信息,同時它還定義了應(yīng)該使用哪個任務(wù)執(zhí)行器來執(zhí)行數(shù)據(jù)處理任務(wù)

  • 數(shù)據(jù)處理版本(Revision):它歸屬于一個特定的項目,表示不同批次的數(shù)據(jù)處理結(jié)果

  • 當一個用戶準備使用Abacuza完成一次大數(shù)據(jù)處理的任務(wù)時,一般會按照下面的步驟進行:

  • 使用用戶名/密碼(暫時只支持用戶名密碼登錄)登錄Abacuza的管理界面

  • 基于一個已經(jīng)安裝好的集群(比如Apache Spark),配置它的集群類型集群連接,用來定義Abacuza與該集群的通信方式(集群和集群連接定義了數(shù)據(jù)應(yīng)該在哪里被處理(where))

  • 定義任務(wù)執(zhí)行器,在任務(wù)執(zhí)行器中,設(shè)置運行數(shù)據(jù)處理任務(wù)的集群類型,當數(shù)據(jù)處理任務(wù)被提交時,Abacuza Cluster Service會基于所選的集群類型,根據(jù)一定的算法來選擇一個集群進行數(shù)據(jù)處理。任務(wù)執(zhí)行器中也定義了數(shù)據(jù)處理的邏輯,(比如,由Scala、C#或者Python編寫的應(yīng)用程序,可以上傳到spark類型的集群上運行)。簡單地說,任務(wù)執(zhí)行器定義了數(shù)據(jù)應(yīng)該如何被處理(how

  • 創(chuàng)建一個新的項目,在這個項目中,通過輸入端點來設(shè)置所需處理的數(shù)據(jù)來源,通過輸出端點來設(shè)置處理后的數(shù)據(jù)的存放地點,并設(shè)置該項目所用到的任務(wù)執(zhí)行器。之后,用戶點擊Submit按鈕,將數(shù)據(jù)提交到集群上進行處理。處理完成后,在數(shù)據(jù)處理版本列表中查看結(jié)果

  • 技術(shù)選型

    Abacuza采用微服務(wù)架構(gòu)風格,每個單獨的微服務(wù)都在容器中運行,目前實驗階段采用docker-compose進行容器編排,今后會加入Kubernetes支持。現(xiàn)將Abacuza所使用的框架與相關(guān)技術(shù)簡單羅列一下:

  • Spark執(zhí)行程序選擇Microsoft .NET for Spark,一方面自己對.NET技術(shù)棧比較熟悉,另一方面,.NET for Spark有著很好的流式數(shù)據(jù)處理的SDK API,并且可以很方便地整合ML.NET實現(xiàn)機器學習的業(yè)務(wù)場景

  • 所有的微服務(wù)都是使用運行在.NET 5下的ASP.NET Core?Web API實現(xiàn),每個微服務(wù)的后端數(shù)據(jù)庫采用MongoDB

  • 用于任務(wù)調(diào)度的Abacuza Job Service微服務(wù)使用Quartz.NET實現(xiàn)定期任務(wù)調(diào)度,用來提交數(shù)據(jù)處理任務(wù)以及更新任務(wù)狀態(tài)。后端同時采用了PostgreSQL數(shù)據(jù)庫

  • 存儲層與服務(wù)層之間引入Redis做數(shù)據(jù)緩存,減少MongoDB的查詢負載

  • 默認支持的Spark集群使用Apache Livy為其提供RESTful API接口

  • 文件對象存儲采用MinIO?S3

  • API網(wǎng)關(guān)采用Ocelot框架

  • 微服務(wù)的瞬態(tài)故障處理:Polly框架

  • 身份認證與授權(quán)采用ASP.NET Core Identity集成的IdentityServer4解決方案

  • 反向代理:nginx

  • 前端頁面:Angular 12、Angular powered Bootstrap、Bootstrap、AdminLTE

  • 弱弱補一句:本人前端技術(shù)沒有后端技術(shù)精湛,所以前端頁面會有不少問題,樣式也不是那么的專業(yè)美觀,前端高手請忽略這些細節(jié)。;)?Abacuza采用了插件化的設(shè)計,用戶可以根據(jù)需要擴展下面這些組件:

    • 實現(xiàn)自己的數(shù)據(jù)處理集群以及集群連接:因此你不必拘泥于使用Apache Spark

    • 實現(xiàn)自己的輸入端點輸出端點:因此你可以自定義數(shù)據(jù)的輸入部分和輸出部分

    • 實現(xiàn)自己的任務(wù)執(zhí)行器:因此你可以選擇不采用基于.NET for Spark的解決方案,你可以自己用Scala或者Python來編寫數(shù)據(jù)處理程序

    在Abacuza的管理界面中,可以很方便地看到目前系統(tǒng)中已經(jīng)被加載的插件:??因此,Abacuza數(shù)據(jù)工廠應(yīng)該可以滿足絕大部分大數(shù)據(jù)處理的業(yè)務(wù)場景。本身整個平臺都是基于.NET開發(fā),并且通過NuGet分發(fā)了Abacuza SDK,因此擴展這些組件是非常簡單的,后面的演練部分可以看到詳細介紹。

    部署拓撲

    以下是Abacuza的部署拓撲:?

    ?

    整個部署結(jié)構(gòu)還是比較簡單的:5個主要的微服務(wù)由基于Ocelot實現(xiàn)的API Gateway負責代理,Ocelot可以整合IdentityServer4,在Gateway的層面完成用戶的認證(Gateway層面的授權(quán)暫未實現(xiàn))。基于IdentityServer4實現(xiàn)的Identity Service并沒有部署在API Gateway的后端,因為在這個架構(gòu)中,它的認證授權(quán)策略與一般的微服務(wù)不同。API Gateway、Identity Service以及基于Angular實現(xiàn)的web app都由nginx反向代理,向外界(客戶端瀏覽器)提供統(tǒng)一的訪問端點。所有的后端服務(wù)都運行在docker里,并可以部署在Kubernetes中。

    演練:在Abacuza上運行Word Count程序

    Word Count是Spark官方推薦的第一個案例程序,它的任務(wù)是統(tǒng)計輸入文件中每個單詞的出現(xiàn)次數(shù)。.NET for Spark也有一個相同的Word Count案例。在此,我仍然使用Word Count案例,介紹如何在Abacuza上運行數(shù)據(jù)處理程序。

    先決條件

    你需要一臺Windows、MacOS或者Linux的計算機,上面裝有.NET 5 SDK、docker以及docker-compose(如果是Windows或者MacOS,則安裝docker的桌面版),同時確保安裝了git客戶端命令行。

    創(chuàng)建Word Count數(shù)據(jù)處理程序

    首先使用dotnet命令行創(chuàng)建一個控制臺應(yīng)用程序,然后添加相關(guān)的引用:


    $ dotnet new console -f net5.0 -n WordCountApp

    $ cd WordCountApp

    $ dotnet add package Microsoft.Spark --version 1.0.0

    $ dotnet add package Abacuza.JobRunners.Spark.SDK --prerelease

    然后在項目中新加入一個class文件,實現(xiàn)一個WordCountRunner類:


    using?Abacuza.JobRunners.Spark.SDK;

    using?Microsoft.Spark.Sql;

    ?

    namespace?WordCountApp

    {

    ???public?class?WordCountRunner : SparkRunnerBase

    ???{

    ??????public?WordCountRunner(string[] args) : base(args)

    ??????{

    ??????}

    ?

    ??????protected?override?DataFrame RunInternal(SparkSession sparkSession, DataFrame dataFrame)

    ????????????=> dataFrame

    ???????????????.Select(Functions.Split(Functions.Col("value"), " ").Alias("words"))

    ???????????????.Select(Functions.Explode(Functions.Col("words"))

    ???????????????.Alias("word"))

    ???????????????.GroupBy("word")

    ???????????????.Count()

    ???????????????.OrderBy(Functions.Col("count").Desc());

    ???}

    }

    接下來修改Program.cs文件,在Main函數(shù)中調(diào)用WordCountRunner:


    static?void?Main(string[] args)

    {

    ???new?WordCountRunner(args).Run();

    }

    然后,在命令行中,WordCountApp.csproj所在的目錄下,使用下面的命令來生成基于Linux x64平臺的編譯輸出:


    $ dotnet publish -c Release -f net5.0 -r linux-x64 -o published

    最后,使用ZIP工具,將published下的所有文件(不包括published目錄本身)全部打包成一個ZIP壓縮包。例如,在Linux下,可以使用下面的命令將published目錄下的所有文件打成一個ZIP包:


    $ zip -rj WordCountApp.zip published/.

    Word Count程序已經(jīng)寫好了,接下來我們就啟動Abacuza,并在其中運行這個WordCountApp。

    運行Word Count程序

    你可以使用git clone https://github.com/daxnet/abacuza.git命令,將Abacuza源代碼下載到本地,然后在Abacuza的根目錄下,使用下面的命令進行編譯:

    1

    $ docker-compose -f docker-compose.build.yaml build

    編譯成功之后,用文本編輯器編輯template.env文件,在里面設(shè)置好本機的IP地址(不能使用localhost或者127.0.0.1,因為在容器環(huán)境中,localhost和127.0.0.1表示當前容器本身,而不是運行容器的主機),端口號可以默認:

    然后,使用下面的命令啟動Abacuza:

    1

    $ docker-compose --env-file template.env up

    啟動成功后,可以使用docker ps命令查看正在運行的容器:?

    用瀏覽器訪問http://<你的IP地址>:9320,即可打開Abacuza登錄界面,輸入用戶名super,密碼P@ssw0rd完成登錄,進入Dashboard(目前Dashboard還未完成)。然后在左側(cè)菜單中,點擊Cluster Connections,然后點擊右上角的Add Connection按鈕:

    在彈出的對話框中,輸入集群連接的名稱和描述,集群類型選擇spark,在設(shè)置欄中,輸入用于連接Spark集群的JSON配置信息。由于我們本地啟動的Spark在容器中,直接使用本機的IP地址即可,如果你的Spark集群部署在其它機器上,也可以使用其它的IP地址。在配置完這些信息后,點擊Save按鈕保存:

    接下來就是創(chuàng)建任務(wù)執(zhí)行器。在Abacuza管理界面,點擊左邊的Job Runners菜單,然后點擊右上角的Add Job Runner按鈕:?

    在彈出的對話框中,輸入任務(wù)執(zhí)行器的名稱和描述信息,集群類型選擇spark,之后當該任務(wù)執(zhí)行器開始執(zhí)行時,會挑選任意一個類型為spark的集群來處理數(shù)據(jù)。?

    填入這些基本信息后,點擊Save按鈕,此時會進入任務(wù)執(zhí)行器的詳細頁面,用來進行進一步的設(shè)置。在Payload template中,輸入以下JSON文本:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    {

    ??"file": "${jr:binaries:microsoft-spark-3-0_2.12-1.0.0.jar}",

    ??"className": "org.apache.spark.deploy.dotnet.DotnetRunner",

    ??"args": [

    ????"${jr:binaries:WordCountApp.zip}",

    ????"WordCountApp",

    ????"${proj:input-defs}",

    ????"${proj:output-defs}",

    ????"${proj:context}"

    ??]

    }

    大概介紹一下每個參數(shù):

    • file:指定了在Spark集群上需要運行的程序所在的JAR包,這里直接使用微軟的Spark JAR

    • className:指定了需要運行的程序在JAR包中的名稱,這里固定使用org.apache.spark.deploy.dotnet.DotnetRunner

    • ${jr:binaries:WordCountApp.zip} 表示由className指定的DotnetRunner會調(diào)用當前任務(wù)執(zhí)行器中的二進制文件WordCountApp.zip中的程序來執(zhí)行數(shù)據(jù)處理任務(wù)

    • WordCountApp 為ZIP包中可執(zhí)行程序的名稱

    • ${proj:input-defs} 表示輸入文件及其配置將引用當前執(zhí)行數(shù)據(jù)處理的項目中的輸入端點的定義

    • ${proj:output-defs} 表示輸出文件及其配置將引用當前執(zhí)行數(shù)據(jù)處理的項目中的輸出端點的定義

    • ${proj:context} 表示Spark會從當前項目讀入相關(guān)信息并將其傳遞給任務(wù)執(zhí)行器

    在上面的配置中,引用了兩個binary文件:microsoft-spark-3-0_2.12-1.0.0.jar和WordCountApp.zip。于是,我們需要將這兩個文件上傳到任務(wù)執(zhí)行器中。仍然在任務(wù)執(zhí)行器的編輯界面,在Binaries列表中,點擊加號按鈕,將這兩個文件附加到任務(wù)執(zhí)行器上。注意:microsoft-spark-3-0_2.12-1.0.0.jar文件位于上文用到的published目錄中,而WordCountApp.zip則是在上文中生成的ZIP壓縮包。

    配置完成后,點擊Save & Close按鈕,保存任務(wù)執(zhí)行器。接下來,創(chuàng)建一個數(shù)據(jù)處理項目,在左邊的菜單中,點擊Projects,然后在右上角點擊Add Project按鈕:?

    在彈出的Add Project對話框中,輸入項目的名稱、描述,然后選擇輸入端點和輸出端點,以及負責處理該項目數(shù)據(jù)的任務(wù)執(zhí)行器:

    在此,我們將輸入端點設(shè)置為文本文件(Text Files),輸出端點設(shè)置為控制臺(Console),也就是直接輸出到日志中。這些配置在后續(xù)的項目編輯頁面中也是可以更改的。一個項目可以包含多個輸入端點,但是只能有一個輸出端點。點擊Save按鈕保存設(shè)置,此時Abacuza會打開項目的詳細頁,在INPUT選項卡下,添加需要統(tǒng)計單詞出現(xiàn)次數(shù)的文本文件:?

    在OUTPUT選項卡下,確認輸出端點設(shè)置為Console:

    然后點擊右上角或者右下角的Submit按鈕,提交數(shù)據(jù)處理任務(wù),此時,選項卡會自動切換到REVISIONS,并且更新Job的狀態(tài):?

    稍等片刻,如果數(shù)據(jù)處理成功,Job Status會從RUNNING變?yōu)镃OMPLETED:

    點擊Actions欄中的文件按鈕,即可查看數(shù)據(jù)處理的日志輸出:

    從日志文件中可以看到,Abacuza已經(jīng)根據(jù)我們寫的數(shù)據(jù)處理程序,統(tǒng)計出輸入文件input.txt中每個單詞的出現(xiàn)次數(shù)。通過容器的日志輸出也能看到同樣的信息:?

    總結(jié)

    本文介紹了自己純手工打造的數(shù)據(jù)工廠(Data Factory)的設(shè)計與實現(xiàn),并開發(fā)了一個案例來演示該數(shù)據(jù)工廠完成數(shù)據(jù)處理的整個過程。之后還有很多功能可以完善:Dashboard、認證授權(quán)的優(yōu)化、用戶與組的管理、第三方IdP的集成、Pipeline的實現(xiàn)等等,今后有空再慢慢弄吧。

    總結(jié)

    以上是生活随笔為你收集整理的徒手打造基于Spark的数据工厂(Data Factory):从设计到实现的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。