日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spark入门_入门必读 | Spark 论文导读

發(fā)布時間:2023/12/15 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark入门_入门必读 | Spark 论文导读 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Resilient Distributed Datasets: A fault-tolerant abstraction for in-Memory cluster computing, 是講述 Spark RDD 的基礎(chǔ)論文,通讀論文能給我們帶來全景的 Spark 知識面

摘要:RDD,全稱Resilient Distributed Dataset,可伸縮性數(shù)據(jù)集。使用它編程,可以有效利用大規(guī)模集群的內(nèi)存,并且兼顧容錯。RDD的流行,完美解決了兩類應(yīng)用難題:迭代算法(Iterative Algorithm)和交互性數(shù)據(jù)挖掘工具。在這兩類應(yīng)用中,RDD緩存中間結(jié)果集的辦法,使得程序運(yùn)行性能提高了一個量級。在容錯方面,RDD使用了粗放型的共享內(nèi)存轉(zhuǎn)換方法,而不是對其(共享內(nèi)存)做精控更新。RDD完全可以勝任迭代算法(此前這類任務(wù)都由Pregel這樣的編程模型完成),并且對新數(shù)據(jù)分析算法、應(yīng)用都提供更好的支持。通過大量的用戶應(yīng)用和壓力測試,最終Spark實(shí)現(xiàn)了RDD.

1 簡介:

像MapReduce和Dryad這樣的集群計(jì)算框架,已經(jīng)廣泛應(yīng)用于大規(guī)模數(shù)據(jù)分析。這類計(jì)算框架,最大的兩大優(yōu)點(diǎn),旨在幫助程序員專注業(yè)務(wù)編程,而非花精力分發(fā)計(jì)算任務(wù)和實(shí)現(xiàn)程序容錯。

當(dāng)今的計(jì)算框架雖然對利用集群中的計(jì)算資源做了各類抽象,但還沒有實(shí)現(xiàn)對集群內(nèi)存的抽象封裝。這樣對那些需要重復(fù)利用中間結(jié)果集的應(yīng)用就很不友好,比如機(jī)器學(xué)習(xí)和圖算法,PageRank,K-means 聚類以及邏輯回歸等等。另一類計(jì)算,比如交互式數(shù)據(jù)分析,因涉及大量的即席數(shù)據(jù)查詢,為確保下一次數(shù)據(jù)集可以被重用,需要借助存儲物化結(jié)果集,這引發(fā)大量寫入實(shí)體磁盤的操作,導(dǎo)致執(zhí)行時間拉長。

意識到這個問題的存在,專家們做了大量嘗試,比如 Pregel,把大量中間數(shù)據(jù)緩存起來,專為圖計(jì)算封裝了框架;HaLoop 則提供了實(shí)現(xiàn)迭代算法的MapReduce接口。但這些僅僅對個案有幫助,回到通用的計(jì)算上來,毫無優(yōu)勢。比如最常見的數(shù)據(jù)分析,裝載多樣化多源頭數(shù)據(jù),展開即席查詢。

RDD彌補(bǔ)了“專家型計(jì)算框架”的缺陷,支持通用型分布式并行計(jì)算。使用集群中所有節(jié)點(diǎn)內(nèi)存來裝載同一應(yīng)用所需的數(shù)據(jù),兼容并包圖形數(shù)據(jù),二維數(shù)據(jù),非結(jié)構(gòu)化數(shù)據(jù)。且提供容錯機(jī)制,控制并行數(shù)據(jù)結(jié)構(gòu)和持久化中間數(shù)據(jù)集。最神奇的地方是,RDD根據(jù)分區(qū)有效控制數(shù)據(jù)分布,利用高度抽象豐富的API去操作數(shù)據(jù)。

設(shè)計(jì)RDD遇到最大的難點(diǎn)是容錯。現(xiàn)存的對集群內(nèi)存的抽象,包括分布式共享內(nèi)存,鍵值對,數(shù)據(jù)庫和Piccolo,提供的接口都是針對穩(wěn)定狀態(tài)的精控更新,比如二維表中的單元格。利用這類接口,保證容錯的方法只能制作跨節(jié)點(diǎn)數(shù)據(jù)副本,或者異地日志備份。顯而易見,這些操作對于大數(shù)據(jù)量的支持不夠友好,既浪費(fèi)網(wǎng)絡(luò)流量還增加了存儲開銷。

與這些老式的設(shè)計(jì)相比,RDD的優(yōu)勢在于存儲計(jì)算方法而不是數(shù)據(jù)。數(shù)據(jù)經(jīng)過一系列計(jì)算得到最終的結(jié)果,如果要保存這些數(shù)據(jù)的中間狀態(tài)來完成容錯,那還不如保存如何得到這些數(shù)據(jù)的計(jì)算方法來的開銷少。就如前面所說,保存這些中間數(shù)據(jù)集好處是可以提高性能。與容錯機(jī)制并不矛盾。舉例:讀取數(shù)據(jù)源后,原始RDD就初始化成功,經(jīng)過map,filter,reduce得到一系列新的RDD,一旦RDD失效,只要重新按照RDD的生成路徑執(zhí)行,數(shù)據(jù)還能復(fù)原。RDD天然還有分區(qū)屬性,即他的數(shù)據(jù)是分區(qū)存儲于集群中某些節(jié)點(diǎn)上,同一時間點(diǎn)不會所有分區(qū)都失效,那么重新計(jì)算某一個或幾個失效分區(qū),需花費(fèi)的時間肯定比重新計(jì)算所有分區(qū)來的少。

在RDD發(fā)明之前,很多特殊的計(jì)算需求只能靠不斷引入新的計(jì)算框架才能解決,比如 MapReduce, DryadLINQ,SQL, Pregel和HaLoop. 而RDD發(fā)明之后,對于這類在多個數(shù)據(jù)集上重復(fù)一組運(yùn)算的操作,變得簡單和通用了。乍看上去,RDD似乎有很多缺陷,但在解決實(shí)際問題上,RDD卻是把合適的利劍。

以RDD為編程核心的Spark,廣泛用于 UC Berkeley 和眾多公司。它以Scala為首要編程語言,提供方便的集成編程接口,有點(diǎn)類似DryadLINQ.除此之外,Spark提供的Scala解釋器,可以很容易讓編程人員完成大規(guī)模數(shù)據(jù)集的集成處理。大概Spark是第一個使用通用編程語言來達(dá)到在集群中完成交互式數(shù)據(jù)挖掘的工具。

通過壓力測試,Spark處理迭代計(jì)算的速度是Hadoop的20倍,完成一份數(shù)據(jù)報(bào)表的分析,總耗時比之前的技術(shù)快40倍。甚至在5-7秒的延遲內(nèi),可以處理1TB的數(shù)據(jù)掃描。從更底層的角度出發(fā),實(shí)際上在Spark中還繼承了Pregel和HaLoop編程模式,并采用了代碼優(yōu)化,使得編程庫只有200行代碼那么輕便。

2 可伸縮性分布式數(shù)據(jù)集(RDD-Resilient Distributed Datasets)

在本小節(jié),主要探討以下方面內(nèi)容:

1)RDD 的編程接口
2)RDD 與精控共享內(nèi)存的對比
3)RDD 的缺陷

2.1 RDD 抽象

首先,RDD 最明顯的兩個特征:1)只讀;2)分區(qū)。

只讀的屬性注定了RDD的產(chǎn)生方式只有新建,要么從其他數(shù)據(jù)源讀取而來,要么從已有的RDD修剪出來。

看到這里我有兩個問題:1)其他數(shù)據(jù)源讀取,如何分區(qū)并行讀取?比如讀一張數(shù)據(jù)庫二維表,如何并行地去讀? 2)RDD從另一個RDD派生出來,會造成大量數(shù)據(jù)重復(fù),占用大量內(nèi)容,如何優(yōu)化?

RDD 的這類生產(chǎn)方式,叫做轉(zhuǎn)換操作(Transformation).這類操作并沒有直接作用在RDD本身的數(shù)據(jù)結(jié)構(gòu)上,而是重新生成新的RDD.那么為什么不是直接在原RDD上做轉(zhuǎn)換呢,這是需要思考的問題。

常見的轉(zhuǎn)換操作有map(),filter(),join()等,后續(xù)詳解。

但,RDD并不總需要物化數(shù)據(jù)。它記錄了足夠多的繼承、轉(zhuǎn)換步驟等信息,即血統(tǒng),以便在必要的時候,實(shí)現(xiàn)自我修復(fù),從頭再生一個RDD。RDD的分區(qū)屬性,又幫助再生RDD的過程執(zhí)行得非常高效,僅再生丟失的RDD分區(qū)即可。如果RDD丟失了血統(tǒng)信息,它將不能被任何程序調(diào)用。那么RDD的血統(tǒng)數(shù)據(jù)結(jié)構(gòu)又是如何的呢?

用戶可控的兩個RDD屬性是分區(qū)和持久化。持久化提高RDD數(shù)據(jù)的可復(fù)用性,可以存儲在內(nèi)存,可以存在硬盤(當(dāng)然是在逼不得已的情況下)。分區(qū)是特別優(yōu)雅的屬性,它方便程序員靈活的部署數(shù)據(jù)分布,使得最終需要JOIN的兩個數(shù)據(jù)集,按照同一個鍵值做哈希分區(qū)(Hash-Partition),這樣在Join時加快了處理速度。

2.2 Spark編程接口

與 DryadLINQ,FlumeJava 一樣,Spark 操控RDD同樣使用語言集成化編程接口(language-integrated API), 即把RDD當(dāng)做對象,使用對象的方法來操作RDD.

在編寫 Spark 應(yīng)用程序時,程序員首先做的事情,便是通過轉(zhuǎn)換函數(shù),將源數(shù)據(jù)抽取過來,生成一組RDD;在RDD上執(zhí)行動作函數(shù)(Action),使得結(jié)算結(jié)果返回驅(qū)動程序(Spark程序發(fā)起點(diǎn)),或是單值,或是數(shù)據(jù)集,或裝載到其他存儲設(shè)備或文件。整個過程中,最有技巧的地方是,RDD的動作函數(shù)(Action)才是真正的程序起始點(diǎn),第一個動作函數(shù)開始執(zhí)行時,整個數(shù)據(jù)流和任務(wù)流才開始。這是RDD典型的惰性計(jì)算。

在復(fù)雜的Spark程序中,轉(zhuǎn)換函數(shù)在動作函數(shù)之前可能會有很多,每一步的轉(zhuǎn)換函數(shù)都能重生一個RDD,當(dāng)這些RDD需要在長鏈條的轉(zhuǎn)換函數(shù)中重復(fù)利用時,把特定的RDD固化下來,是提高性能的不二法門。Spark做得完美的地方在于,他允許我們將中間結(jié)果集(RDD)用persist方法暫存于內(nèi)存中。比如對于從Hive來的數(shù)據(jù),我們既需要做總計(jì),還需要分維度做分計(jì),那么計(jì)算整理出來的原始數(shù)據(jù),就最好存入內(nèi)存。除非內(nèi)存不夠大,則選擇存入硬盤,或復(fù)制到更遠(yuǎn)的遠(yuǎn)程服務(wù)器。甚至還可以控制RDD存盤的優(yōu)先級別。

實(shí)例:使用控制臺挖掘日志

當(dāng)運(yùn)營需要對上T的網(wǎng)絡(luò)日志做錯誤分析時,如果使用Hadoop平臺HDFS格式存儲,要分析日志,首先要編寫MapReduce程序,在程序中篩選錯誤日志,之后聚合匯總;也可以使用Hive來查詢,前提是搭建Hive環(huán)境,并設(shè)計(jì)好表結(jié)構(gòu)。

如果采用Spark查詢,會是下面的編程腳本,非常簡易:

image

圖解:圖中的方框代表RDD,箭頭則表示一個轉(zhuǎn)換函數(shù)。

lines=spark.textFile("hdfs://...")errors=lines.filter(_.startsWith("ERROR"))errors.persist()

這三行代碼就能解決查詢所有錯誤日志的信息。具體展開說明下:

lines=spark.textFile("hdfs://...")

lines 是RDD,作用是從 hdfs 讀取日志;

errors=lines.filter(_.startsWith("ERROR"))

errors是另一個新RDD,用來存儲含有ERROR的錯誤日志;

errors.persist()

是將errors的數(shù)據(jù)固化在內(nèi)存中,以供之后程序反復(fù)使用。但此時,spark并未開始執(zhí)行。

若要執(zhí)行這個Spark程序,需要執(zhí)行一個動作函數(shù),比如:

errors.count()

這是在計(jì)算總共有多少次錯誤發(fā)生,此時Spark程序就執(zhí)行了。這就是典型的“惰計(jì)算”,Spark獨(dú)有的特性。

再舉個具體的例子:比如MySQL數(shù)據(jù)庫的錯誤日志,歸檔之后放在了HDFS上面,那么用Spark計(jì)算總數(shù)就簡單了:

errors.filter(_.contains("MySQL")).count()

除了count()這個總計(jì)動作函數(shù)外,還有很多動作函數(shù)也可以使得Spark程序立即運(yùn)行起來:

errors.filter(_.contains("HDFS")).map(_.split('')(3)).collect()

這是取了包含HDFS錯誤信息的第三個字段的值,并返回前臺。

當(dāng)Spark的第一個動作函數(shù)執(zhí)行時,lines,errors就相繼建立, lines因?yàn)闆]有將其他非錯信息剔除,所以數(shù)據(jù)量巨大,全部裝載到內(nèi)存里就容易溢出。但errors就不一樣了,因數(shù)據(jù)量小,適合暫留在內(nèi)存中,為后續(xù)的復(fù)用提供準(zhǔn)備。

最后,RDD是如何做到容錯的呢?在開始的簡易計(jì)算譜系圖中,每一步轉(zhuǎn)換操作都會被記錄下來。一旦errors RDD其中的一個分區(qū)丟失,重新按照這份譜系圖執(zhí)行一遍,相當(dāng)丟失分區(qū)的數(shù)據(jù)就回來了。

2.3 RDD模型的優(yōu)點(diǎn)

image


(圖1)

分布式共享內(nèi)存的概念

Distributed Shared Memory, 分布式共享內(nèi)存

https://en.wikipedia.org/wiki/Distributed_shared_memory

分布式共享內(nèi)存,最大的優(yōu)點(diǎn)在于寫一次,多機(jī)同步。集群中的所有計(jì)算機(jī)節(jié)點(diǎn),在同一內(nèi)存位置存儲了同一份數(shù)據(jù)。

弊端也很明顯,一旦數(shù)據(jù)損壞,所有數(shù)據(jù)都要重新還原或重做;同步導(dǎo)致的延遲會很高,因?yàn)橄到y(tǒng)要保障數(shù)據(jù)的完整性。這在分布式數(shù)據(jù)庫中常見。

RDD 與 DSM 的區(qū)別在于,前者是粗放式寫入,通過轉(zhuǎn)換函數(shù)生成,而后者在內(nèi)存任意位置均可寫入。 RDD不能很好地支持大批量寫入,卻可以很好的支持分區(qū)容錯。前面也說道,譜系圖是RDD容錯的利器,丟失分區(qū)可重生。

RDD的第二大優(yōu)勢在于,備份節(jié)點(diǎn)可以迅速的被喚起,去代替那些緩慢節(jié)點(diǎn)執(zhí)行任務(wù)。即在緩慢節(jié)點(diǎn)執(zhí)行任務(wù)的同時,備份節(jié)點(diǎn)同時也執(zhí)行相同的任務(wù),哪個節(jié)點(diǎn)快就用那個節(jié)點(diǎn)的結(jié)果。而DSM則會被備份節(jié)點(diǎn)干擾,引起大家同時緩慢,因?yàn)楣蚕韮?nèi)存之間會同步狀態(tài),互相干擾。

RDD的另外兩大優(yōu)點(diǎn),基于數(shù)據(jù)存儲分發(fā)任務(wù)溢出緩存至硬盤。在大量寫入的操作中,比如生成RDD,會選擇離數(shù)據(jù)最近的節(jié)點(diǎn)開始任務(wù)(如下圖所示);而在只讀操作中,大量數(shù)據(jù)沒發(fā)存入內(nèi)存時,會自動存到硬盤上而不是報(bào)錯停止執(zhí)行。

image


(圖2)

上圖所示的,便是驅(qū)動器程序(Driver)將計(jì)算任務(wù)分發(fā)到數(shù)據(jù)分區(qū)所在節(jié)點(diǎn),執(zhí)行轉(zhuǎn)換操作。多節(jié)點(diǎn)并行執(zhí)行一個巨大數(shù)據(jù)量的操作得以完成。

不適合使用RDD的場景

如前所述,RDD的最大優(yōu)點(diǎn)是,并行處理只讀數(shù)據(jù)。RDD之間有完整的血統(tǒng)關(guān)系,稱之為譜系圖。其中之一丟失后,可以憑借譜系圖恢復(fù)數(shù)據(jù)。但對于大量寫入的程序,比如爬蟲就不適合了。保障爬蟲數(shù)據(jù)的完整性,需要做及時的checkpoint,實(shí)現(xiàn)多重副本的建立。這種異步機(jī)制,只能靠傳統(tǒng)的日志型系統(tǒng)完成,比如數(shù)據(jù)庫, RAMCloud, Percolator, Piccolo.

3 Spark編程接口

Spark提供了Scala,一種類DryadLINQ的Java vm函數(shù)編程語言,用來封裝 RDD 的編程接口(Api). Scala有兩個好處,一是方便交互式操作;二是靜態(tài)類型的效率極高。

Scala 是靜態(tài)類型的語言,即在編譯時就已經(jīng)完成了數(shù)據(jù)類型的檢查,比起動態(tài)類型,是要提高不少效率

如圖2所示,Spark是由Driver程序啟動,分發(fā)任務(wù)到各節(jié)點(diǎn)上運(yùn)行,這些節(jié)點(diǎn)稱為worker程序,生成的RDD數(shù)據(jù)分區(qū)會在worker程序里面保存起來,直到程序結(jié)束。Driver還負(fù)責(zé)每個RDD分區(qū)的血統(tǒng)記錄,即每個RDD分區(qū)的父分區(qū)或者數(shù)據(jù)源是什么,以便丟失后恢復(fù)。

在Spark的編程接口里,有個很重要的特性是傳遞函數(shù)閉包(function closure).函數(shù)閉包被當(dāng)做變量可以傳遞到轉(zhuǎn)換函數(shù)或動作函數(shù)中去,而閉包中的變量,常量都可以被共享訪問。因此當(dāng)轉(zhuǎn)換函數(shù)與動作函數(shù)有閉包函數(shù)傳入時,事實(shí)上每個RDD分區(qū)都會接收到相同的一個閉包函數(shù)。

比如:

var?x?=?5;rdd.map(_?+?x)

就把 x 傳到了每個RDD分區(qū)的map函數(shù)中。

Scala是門靜態(tài)語言,RDD的元素類型需要首先定義好,但支持隱式轉(zhuǎn)換,比如RDD[Int]理論上需要存儲整型(int)元素,但事實(shí)上Int可以省卻,因?yàn)橐坏┐鎯梢噪[式轉(zhuǎn)換成int的字符串,也沒問題。

RDD及其操作非常簡單,但理解RDD的重點(diǎn)卻在于閉包函數(shù)。閉包函數(shù)在傳遞過程中,需要序列化,反射。這些都需要嚴(yán)肅處理。

3.1 RDD的操作

image

上圖給出的是Spark支持的轉(zhuǎn)換函數(shù)與動作函數(shù),方括號[]中的T代表元素類型。轉(zhuǎn)換函數(shù)用來生成RDD,而動作函數(shù)用來計(jì)算值或保存計(jì)算值到外部存儲。最大的特性是惰性執(zhí)行,即只有第一個動作函數(shù)的執(zhí)行,才會引起數(shù)據(jù)流真正的流動。

詳細(xì)解釋下這些函數(shù)。比如:

-Join: 必須兩個RDD都是鍵值對RDD;
-map:一對一匹配,輸入與輸出同數(shù)量,一條輸入產(chǎn)生一條輸出;
-flatMap:一對多匹配,輸入與輸出可不同數(shù)量,一條輸入產(chǎn)生多條輸出;
-groupByKey,reduceByKey,sort:自動產(chǎn)生一個哈希(hash)或范圍(range)分區(qū)

3.2 應(yīng)用一,邏輯回歸

很多機(jī)器學(xué)習(xí)的算法都采用了迭代處理,使得最終算法更加優(yōu)化。那么在迭代過程中,顯然能把之前的結(jié)果保留下來,重復(fù)使用,使得迭代時間更快。

比如,邏輯回歸,最常見的分類算法,用來計(jì)算最恰當(dāng)?shù)某矫娣指罹€(比如區(qū)分垃圾郵件)。算法使用了梯度下降,從隨機(jī)數(shù)開始,每一次迭代更優(yōu)化一次求值。

val?points?=?spark.textFile(...).map(parsePoint).persist()var?w?=?//random?initial?vectorfor(i?????p.x?*?(1/(1+exp(-p.y*(w?dot?p.x)))-1)*p.y????}.rduce((a,b)?=>?a+b)????w?-=gradient}

把 points 固化在內(nèi)存中,可以使得計(jì)算時間縮短 20倍左右。

3.2 應(yīng)用二,PageRank

PageRank是知名的網(wǎng)頁排名(網(wǎng)頁影響力)算法。一個網(wǎng)頁被指向的次數(shù)越多,在搜索引擎中的排名越高。除了計(jì)算網(wǎng)頁影響力之外,還可以用來計(jì)算社交網(wǎng)絡(luò)中的影響力。

在計(jì)算過程中,每一次迭代更新,增加的是被指向網(wǎng)頁的權(quán)重。每一個帶有出鏈的網(wǎng)頁,都將帶給其出鏈網(wǎng)頁r/n的貢獻(xiàn)值,這些貢獻(xiàn)值的總計(jì),就是出鏈網(wǎng)頁的排名。

a/N?+?(1-a)∑Ci

PageRank算法詳細(xì)解答,可看這里 https://www.cnblogs.com/jpcflyer/p/11180263.html

用Spark來計(jì)算PageRank,可以這么寫:

//?從源文件抽取RDD[URL,outlinks]val?links?=?spark.textFile(...).map(...).persist()var?ranks?=?//?RDD[URL,?rank]for(i?????????????links.map(dest?=>(dest,rank/links.size))????}????ranks?=?contribs.reduceByKey((x,y)?=>?x+y).mapValues(sum?=>?a/N?+?(1-a)*sum)}

下圖是對這段代碼的譜系圖,每一次的迭代都會重新計(jì)算并生成ranks RDD.

image

從圖中很明顯的可以看出,ranks RDD的數(shù)量隨著link的增加而長度變得越來越長,當(dāng) ranks RDD 有一次失效(丟失或者故障)時,重新計(jì)算會耗時很多。因此,需要將這些中間步驟的ranks RDD保存或者另存副本,執(zhí)行這個操作,可以使用 persist函數(shù)的 RELIABLE 開關(guān)。

計(jì)算中有一處Join,如果links, ranks的分區(qū)都在同一個節(jié)點(diǎn)上,那么計(jì)算并不需要通信節(jié)點(diǎn),假如不巧的是同一URL,links,ranks的分區(qū)卻在不同的分區(qū)上,那通信成本就高了。所以控制links,ranks的分區(qū)就很講究,盡量(使用相同分區(qū)方式,比如hash分區(qū))使得參加join的兩個分區(qū)都分配在同一個節(jié)點(diǎn)上。

控制分區(qū)的分配,也可以通過自定義分區(qū)類Partitioner,來完成:

links?=?spark.textFile(...).map(...).partitionBy(myPartFunc).persist()

如果源文件在分布式系統(tǒng)比如hdfs上的分區(qū),與 Spark 的分區(qū)不一致,在使用轉(zhuǎn)換函數(shù)前,一定會經(jīng)過混洗(shuffle),這是最大的耗時。

4 RDDs的表達(dá)手法

在長串的轉(zhuǎn)換函數(shù)鏈條中,抽象地表現(xiàn)RDD的譜系,是非常困難的。從完美的角度來講,一個實(shí)現(xiàn)了RDD的系統(tǒng),必須能提供一系列豐富的轉(zhuǎn)換函數(shù),而且還要讓用戶自由的重組這些函數(shù)。Spark提供了圖化的RDD表現(xiàn)形式,達(dá)到了這些目的。

總之,RDD的表現(xiàn)方式,在Spark中是常用接口,涵蓋了5個方面的信息:

分區(qū)集合:
每個分區(qū)是最小的原子單位;

父RDD依賴:
每個子分區(qū)都依賴父分區(qū);

轉(zhuǎn)換函數(shù):
每個父分區(qū)只有通過轉(zhuǎn)換函數(shù),才能生成子分區(qū);

分區(qū)形式和分區(qū)數(shù)據(jù)地址:
分區(qū)形式(partitioning schema),即分區(qū)標(biāo)準(zhǔn)。比如按照銷售區(qū)域(華東,華北,華西,華南,華中)分區(qū));分區(qū)數(shù)據(jù)地址(partition data placement),按照標(biāo)準(zhǔn)分好的區(qū),數(shù)據(jù)應(yīng)該保存到哪些節(jié)點(diǎn)上。比如以HDFS文件為數(shù)據(jù)源,并要以HDFS文件數(shù)據(jù)塊為分區(qū),那么Spark創(chuàng)建RDD的時候,會從當(dāng)前含有這些數(shù)據(jù)塊的節(jié)點(diǎn)上,直接創(chuàng)建RDD分區(qū)。倘若要在RDD上應(yīng)用轉(zhuǎn)換函數(shù),直接操作數(shù)據(jù)所在節(jié)點(diǎn)的本地內(nèi)存即可,無需通過網(wǎng)絡(luò)傳輸,非常高效。

image

partitions():
查詢分區(qū)集合包含的所有分區(qū);

preferredLocations(p):
根據(jù)數(shù)據(jù)歸屬地,查詢能迅速找到數(shù)據(jù)分區(qū)的所在節(jié)點(diǎn)地址;

dependencies():
查詢RDD的譜系圖;

iterator(p,parentIters):
基于給定的父RDD,查找對應(yīng)子分區(qū)所有對象;

partitioner():
確定分區(qū)方法是hash還是range分區(qū)

設(shè)計(jì)RDD接口的有趣之處,在于如何去表達(dá)依賴關(guān)系。最終,獲得認(rèn)可的有效方法是定義為兩類,一是窄依賴(narrow dependencies),二是寬依賴(wide dependencies) 。窄依賴是指父RDD頂多能產(chǎn)生一個子RDD,比如map;寬依賴指父RDD能產(chǎn)生多個子RDD,比如Join.

之所以這么區(qū)分寬窄依賴關(guān)系,有兩個原因:

1)窄依賴關(guān)系,使得父子分區(qū)可以在同一個節(jié)點(diǎn)上完成轉(zhuǎn)換,比如map,filter;而寬依賴關(guān)系,則需要所有上層分區(qū)都同時存在,且大概率是要從不同的數(shù)據(jù)分區(qū),抽取數(shù)據(jù)到一個分區(qū)或多個分區(qū)進(jìn)行計(jì)算,這個過程稱之為 shuffle, shuffle是 Spark 最具有破壞性能的操作。

2)故障恢復(fù):窄依賴的數(shù)據(jù)分區(qū)如果故障了,只要從上層的RDD分區(qū)重新生成,而且就在本地即可高效完成,就算是多個分區(qū)損壞,也可以并行完成恢復(fù);但寬依賴關(guān)系就需要多個RDD分區(qū)聯(lián)合執(zhí)行恢復(fù),不亞于重新執(zhí)行Spark程序。

image

最有意思的地方是Join操作。父RDD分區(qū)的方法決定了子RDD生成的方式,比如父RDD按照hash來分區(qū),Join的時候,就不需要shuffle了。

5 Spark系統(tǒng)實(shí)現(xiàn)

Spark是以Scala寫就的,總共有14000行代碼(初始化版本,現(xiàn)在不止)。Spark程序運(yùn)行在 Mesos 集群管理器上,但也可與 Hadoop, MAPI等做互連,利用Hadoop提供的輸入接口插件,讀取HDFS,HBase的數(shù)據(jù)。每個Spark程序作為一個單獨(dú)應(yīng)用運(yùn)行在Mesos上,程序間的交互由Mesos處理。一個完整的Spark程序由Driver和Worker組成,Driver是主程,用來協(xié)調(diào)和收集各個Worker的工作。

接下來,主要闡述系統(tǒng)調(diào)度器,交互式程序解釋器,內(nèi)存管理和checkpointing技術(shù)。

5.1 任務(wù)調(diào)度

image

總體來說,任務(wù)調(diào)度器(scheduler)按照 driver, workder 中的程序,在集群中分配任務(wù)。上圖是經(jīng)典的有向無環(huán)圖(DAG),每一步都是在生成一個新的RDD,只有第一個作用在RDD上的動作函數(shù)開始時,正式的數(shù)據(jù)流才開啟。圖中矩形框代表一個RDD,有背景色(不管藍(lán)黑)的矩形代表一個分區(qū),黑色代表該分區(qū)是持久化駐留在內(nèi)存中的。

持久化駐留,只在當(dāng)前程序中生效,一旦程序執(zhí)行完畢,還是銷毀,其他程序不能訪問。

任務(wù)調(diào)度器最有特點(diǎn)的功能在于它對數(shù)據(jù)歸屬非常敏感。如果程序需要的RDD分區(qū)數(shù)據(jù)在某臺節(jié)點(diǎn)的內(nèi)存里,任務(wù)就優(yōu)先分發(fā)到那臺節(jié)點(diǎn)上;如果集群中所有內(nèi)存都沒有需要的分區(qū)數(shù)據(jù),任務(wù)調(diào)取器則會根據(jù)RDD提供的優(yōu)選地址,將任務(wù)分配到那些節(jié)點(diǎn)上。

窄依賴的RDD譜系比較簡單,每次分區(qū)失效都可以高效重生,但寬依賴的RDD在恢復(fù)時就比較復(fù)雜,需要所有父RDD都存在,若父RDD也失效了,則需要更上層的RDD,依次類推,直到源RDD全部重生,才能恢復(fù)當(dāng)前RDD,程序才能進(jìn)行下去。所以寬依賴RDD通常會在產(chǎn)生時,將其所有父RDD都物化下來,以使得恢復(fù)時更快。

如果任務(wù)執(zhí)行失敗,原因有很多,內(nèi)存不夠,機(jī)器故障等等,任務(wù)調(diào)度器會安排另一臺節(jié)點(diǎn)來繼續(xù)執(zhí)行失敗的任務(wù),只要父RDD都還存在。若父RDD失效了,也沒關(guān)系,根據(jù)圖譜自動再生成這些父RDD即可。但若任務(wù)調(diào)度器失敗,則整個程序就是失敗,并不會重新自動跑起來。

目前Spark的程序設(shè)計(jì),都是在針對RDD的動作做響應(yīng)式啟動執(zhí)行,當(dāng)然另一種嘗試也是有意義的,那就是針對動作中涉及的RDD,一步步往前推,少了什么RDD,根據(jù)圖譜去生成。這種想法暫時還只是處于試驗(yàn)階段。

5.2 集成的解釋器(Interpreter Integration)

Spark計(jì)算框架允許用戶在Scala提供的解釋器窗口(與Python,Ruby類似的解釋器窗口),交互式的利用大數(shù)據(jù)集群提供的算力,查詢和操控大規(guī)模數(shù)據(jù)庫集。交互式操作,即一次運(yùn)算表達(dá)式,可以操作數(shù)千臺計(jì)算機(jī)的計(jì)算資源,并且得益于集群內(nèi)存計(jì)算模式,而非MapReduce借助硬盤的低效模式,以低延遲的方式得到該步計(jì)算的結(jié)果。

看以下簡單代碼,一窺Scala編程的不同:

var?x?=?5?;println(x);

每一行Scala代碼,會被解釋為單行類,執(zhí)行時,實(shí)際上運(yùn)行的便是這單行類的賦值或者函數(shù)調(diào)用。

因此上面這兩行代碼,可以解釋為:

println(Line1.getInstance().x)

Line1 就是將單行代碼抽象為一個類并實(shí)例化后的結(jié)果對象。

實(shí)際上,我覺得更確切的說,應(yīng)該是 Line2.getInstance().println(Line1.getInstance().x).但原論文并沒有這么解釋

最神秘的事情,并不是scala獨(dú)特的解釋器特性,而是Spark如何分發(fā)scala程序。就拿上面兩行代碼來說,Spark把這兩行代碼,分發(fā)到了1000臺計(jì)算機(jī)上,并行地跑了一次批處理,得到最終結(jié)果,且中間有任何機(jī)器故障,都沒有影響到程序的執(zhí)行和結(jié)果的正確。

因此,探索Spark如何完成這整個執(zhí)行過程就變得非常有意義。事實(shí)上,Spark解釋器就暗藏了答案:

1)類運(yùn)送(class shipping): 為了讓每個工作節(jié)點(diǎn)(workder node)都能得到可執(zhí)行代碼字節(jié)(bytecode),scala提供的解釋器,就負(fù)責(zé)為這些節(jié)點(diǎn)提供類運(yùn)送,且是通過http傳送的方式.

為什么 http 傳送方式在這里會被指定為傳送協(xié)議,值得思考!

2)改變代碼的產(chǎn)生方式(modified code generation):讓所有的工作節(jié)點(diǎn)(worker node)都得到相同的程序代碼,最大的問題是同時傳送閉包引用的上下文,包括閉包中引用的變量。如果變量是在閉包之前定義的,工作節(jié)點(diǎn)上的Java就無法定位閉包之前的變量。所以改變代碼的產(chǎn)生方式就解決了這一點(diǎn),也就是為什么每一行 Scala代碼要被解釋為當(dāng)行類,這行里定義的變量或方法,在閉包中引用時,會被追溯到變量或方法定義的單行類,從而這些單行類會被遺棄運(yùn)送到工作節(jié)點(diǎn)上。

在實(shí)際的業(yè)務(wù)應(yīng)用場景里,在交互式解釋器中查詢大規(guī)模數(shù)據(jù)集,比如從HDFS上分析日志文件,非常實(shí)用。后期加入的 Spark SQL 更是將 Spark 的分布式計(jì)算能力擴(kuò)大化到極致,普惠了每個數(shù)據(jù)分析師。

image

上面的示意圖,很好地解釋了單行類的同步運(yùn)送,對于工作節(jié)點(diǎn)的意義。當(dāng)閉包中引用了上行的變量,則需要將上行封裝成一個類實(shí)例,同時運(yùn)送到其他節(jié)點(diǎn)。

5.3 內(nèi)存管理

Spark 為 RDD 提供了三種存儲格式:

  • 內(nèi)存中反序列化的Java對象;
  • 內(nèi)存中序列化的Java對象;
  • 以及硬盤存儲
  • 訪問速度從快到慢,即第一種方式最快,無需任何轉(zhuǎn)換就可以被自由訪問。最后一種最慢,因每次使用,需從硬盤抽取數(shù)據(jù),有不必要的IO開銷

    當(dāng)內(nèi)存吃緊,新建的RDD分區(qū)沒有足夠內(nèi)存存儲時,Spark會采用回收分區(qū)方式,以給新分區(qū)提供空間。除非新的分區(qū)和要回收的老分區(qū)在同一個RDD。回收機(jī)制采用的是常規(guī)LRU(Least Recently Used)算法,即最近最少使用的算法。這套回收機(jī)制很有用,至少目前來說是。但分權(quán)機(jī)制也很有用,比如設(shè)定RDD的權(quán)限等級,控制RDD分區(qū)被回收的可能性。

    5.4 支持 checkpointing

    checkpointing的技術(shù)本質(zhì)是為長鏈操作尤其是依賴寬關(guān)系的計(jì)算做結(jié)果緩存。

    長鏈操作:經(jīng)由一系列轉(zhuǎn)換操作得來的RDD,在故障之后,恢復(fù)需要經(jīng)歷同樣多步驟,會導(dǎo)致時間過多的消耗,這就是長鏈操作。

    實(shí)現(xiàn)checkpointing的api是persist的replicate開關(guān),即:

    rdd.persist(REPLICATE)

    通過將數(shù)據(jù)暫存至穩(wěn)定的存儲設(shè)備,以防備RDD失效后的重算。

    checkpointing的決策是留給用戶的,但也可以做成自動化。在保障數(shù)據(jù)一致性角度看,自動在RDD創(chuàng)建成功后保留一份副本,不會引起數(shù)據(jù)不一致的尷尬,看起來是件一勞永逸的事情。為什么不這么做呢?我想這其中涉及的一個判斷是,是否有足夠的必要去消耗原本應(yīng)該留給其他Spark程序的資源,來保障僅有百萬分之一的可能會丟掉的分區(qū)。

    6 性能評估

    Spark 在性能方面的出眾,對標(biāo)物是Hadop,以下是基于 Amazon EC2做出的4相對比數(shù)據(jù):

    1)在圖運(yùn)算和迭代機(jī)器學(xué)習(xí)方面,優(yōu)先Hadoop 20倍速度。性能的提高得益于無需硬盤I/O,且在內(nèi)存中的Java對象計(jì)算,沒有序列化和反序列化的開銷

    2)性能與擴(kuò)展性都很好。單測一張分析報(bào)表,就比Hadoop提高了40倍性能

    3)當(dāng)有節(jié)點(diǎn)故障時,Spark能自動恢復(fù)已丟失的分區(qū)

    4)查詢1TB的數(shù)據(jù),延遲僅在5-7秒

    image

    7 一些討論

    學(xué)習(xí)一門技術(shù),就要徹底了解其歷史,知其應(yīng)用。從這些應(yīng)用著手,由點(diǎn)到面的知悉這門技術(shù)的優(yōu)勢。而不至于學(xué)得茫然而不知所措。

    7.1 囊括眾多集群編程模式

    當(dāng)年Spark發(fā)明的時候,市面上有很多獨(dú)立的軟件解決方案,來完成大規(guī)模數(shù)據(jù)應(yīng)用。這些獨(dú)立的解決方案僅僅是某類應(yīng)用中的佼佼者,換個場景,效果就沒那么突出了。Spark的出現(xiàn),統(tǒng)一了這些獨(dú)立的軟件解決方案,使得用戶只需Spark一個框架,即可完成原本需要4-5個獨(dú)立解決方案才能解決的問題。

    因此,首先就要討論Spark出現(xiàn)之前,市面上有哪些應(yīng)用:

    1) MapReduce
    2) DryadLINQ
    3) SQL
    4) Pregel
    5) Iterative MapReduce
    6) Batched Stream Processing

    這些應(yīng)用就不再過多闡述了,Spark 將他們集成起來,提供方便的api供使用,原本這些技術(shù)的細(xì)節(jié)就不用深究了。

    ***7.2 RDD調(diào)試 ***

    在分區(qū)故障時,如何快速恢復(fù)是個痛點(diǎn)。依賴RDD的譜系圖,可以保障分區(qū)故障后的數(shù)據(jù)一致性。記錄RDD的譜系圖,對于程序的健壯性變得非常重要。與先前的分布式系統(tǒng)調(diào)試器,最大的優(yōu)勢在于,不需要記錄每個事件在不同節(jié)點(diǎn)上的執(zhí)行順序。

    8 其他相關(guān)工作進(jìn)展

    集群編程模式:在Spark出現(xiàn)之前,大規(guī)模利用集群計(jì)算資源處理數(shù)據(jù)應(yīng)用已經(jīng)有成熟的方案了,比如MapReduce,Dryad和Ciel. 這些方案靠的是移動硬盤數(shù)據(jù)來實(shí)現(xiàn)分布式進(jìn)程之間的數(shù)據(jù)共享。Spark出現(xiàn)之后,數(shù)據(jù)共享有了新的突破,雖然穩(wěn)定的存儲依舊可以使用,但更多利用了高效的存儲,實(shí)現(xiàn)了無盤(不需要借助硬盤)計(jì)算,之前借盤運(yùn)算的開銷,比如序列化,反序列化和刻錄副本都可以去掉。

    第二種高級編程語言的集群編程模式,就像 DryadLINQ 和 FlumeJava, 提供了語言集成的編程接口(API),用戶需要調(diào)用集群處理大規(guī)模數(shù)據(jù)時,只要使用這些高級語言提供的編程接口,比如map, join 即可。這些系統(tǒng)唯一的缺點(diǎn)在于,他們無法把數(shù)據(jù)高效方便地共享到下一個查詢中去,只能在同一個查詢中,比如map接著一個map中,共享數(shù)據(jù)流。Spark 實(shí)現(xiàn)的 RDD,借用了同樣的編程語言集成接口,僅僅是完成一次分布式數(shù)據(jù)的抽象,就完美的實(shí)現(xiàn)了在多個查詢中共享數(shù)據(jù)流。

    第三種集群編程模式,采用的是特殊高級接口定制,采用這種定制支持特定的應(yīng)用,比如圖運(yùn)算和迭代計(jì)算。Pregel 系統(tǒng)支持迭代圖計(jì)算,而 Twister 和 HaLoop 則是迭代的MapReduce計(jì)算運(yùn)行時刻庫。他們都不支持通用計(jì)算,比如建立數(shù)據(jù)集,裝載到內(nèi)存中,使用任何方式去查詢這份數(shù)據(jù)集。而Spark使用的是分布式數(shù)據(jù)抽象,基于抽象做出靈活的操作標(biāo)準(zhǔn),因此類似及時分析這樣的操作,完全受到Spark的支持。

    最后,有些分布式系統(tǒng),比如Piccolo, 分布式共享內(nèi)存(DSM)系統(tǒng)和鍵值對系統(tǒng)都采取的是共享可變狀態(tài)集。用戶既可以讀也可以寫入這些共享內(nèi)存。由于系統(tǒng)狀態(tài)可變,可被更新,只有依靠checkpoint技術(shù)才能保障數(shù)據(jù)完整性,一致性,因此開銷會比Spark多很多。

    緩存系統(tǒng):Nectar 系統(tǒng)可以在任意的 DryadLINQ應(yīng)用程序之間共享中間數(shù)據(jù)集,實(shí)現(xiàn)的方法是將數(shù)據(jù)集輸出到穩(wěn)定的存儲設(shè)備上,而不是內(nèi)存。并且Nectar也不允許用戶傾倒指定的分區(qū),連分區(qū)方法也不受用戶控制。Ciel和FlumeJava提供結(jié)果緩存,但不支持用戶自定義緩存內(nèi)容。

    譜系圖: 在科學(xué)計(jì)算和數(shù)據(jù)庫領(lǐng)域,譜系圖或源數(shù)據(jù)管理一直是重點(diǎn)研究對象。一旦數(shù)據(jù)丟失,從從源頭開始重新計(jì)算是最慢的一項(xiàng)恢復(fù)操作,如果自動修復(fù)能從丟失的上一級開始追溯,那是最快的。很多系統(tǒng)能保障斷點(diǎn)恢復(fù),但所用的措施卻是耗時耗資源最多的構(gòu)建副本方法。而譜系圖在單個MapReduce任務(wù)之后,被丟失的無影無蹤。

    關(guān)系型數(shù)據(jù)庫: 在數(shù)據(jù)庫中,視圖就像是RDD,物化視圖就像是持久化的RDD,但數(shù)據(jù)庫在更新這些對象時,都需要做日志登記的操作,有些類似構(gòu)建副本的方法,開銷巨大。

    總結(jié)

    以上是生活随笔為你收集整理的spark入门_入门必读 | Spark 论文导读的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。