Apache Spark 的设计与实现(job逻辑执行图)
生活随笔
收集整理的這篇文章主要介紹了
Apache Spark 的设计与实现(job逻辑执行图)
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
2019獨角獸企業(yè)重金招聘Python工程師標準>>>
Job 邏輯執(zhí)行圖
General logical plan
典型的 Job 邏輯執(zhí)行圖如上所示,經(jīng)過下面四個步驟可以得到最終執(zhí)行結(jié)果:- 從數(shù)據(jù)源(可以是本地 file,內(nèi)存數(shù)據(jù)結(jié)構(gòu), HDFS,HBase 等)讀取數(shù)據(jù)創(chuàng)建最初的 RDD。上一章例子中的 parallelize() 相當于 createRDD()。
- 對 RDD 進行一系列的 transformation() 操作,每一個 transformation() 會產(chǎn)生一個或多個包含不同類型 T 的 RDD[T]。T 可以是 Scala 里面的基本類型或數(shù)據(jù)結(jié)構(gòu),不限于 (K, V)。但如果是 (K, V),K 不能是 Array 等復雜類型(因為難以在復雜類型上定義 partition 函數(shù))。
- 對最后的 final RDD 進行 action() 操作,每個 partition 計算后產(chǎn)生結(jié)果 result。
- 將 result 回送到 driver 端,進行最后的 f(list[result]) 計算。例子中的 count() 實際包含了action() 和 sum() 兩步計算。
邏輯執(zhí)行圖的生成
了解了 Job 的邏輯執(zhí)行圖后,寫程序時候會在腦中形成類似上面的數(shù)據(jù)依賴圖。然而,實際生成的 RDD 個數(shù)往往比我們想想的個數(shù)多。 要解決邏輯執(zhí)行圖生成問題,實際需要解決:- 如何產(chǎn)生 RDD,應(yīng)該產(chǎn)生哪些 RDD?
- 如何建立 RDD 之間的依賴關(guān)系?
1. 如何產(chǎn)生 RDD,應(yīng)該產(chǎn)生哪些 RDD?
解決這個問題的初步想法是讓每一個 transformation() 方法返回(new)一個 RDD。事實也基本如此,只是某些 transformation() 比較復雜,會包含多個子 transformation(),因而會生成多個 RDD。這就是 實際 RDD 個數(shù)比我們想象的多一些 的原因。 如何計算每個 RDD 中的數(shù)據(jù)?邏輯執(zhí)行圖實際上是 computing chain,那么 transformation() 的計算邏輯在哪里被 perform?每個 RDD 里有 compute() 方法,負責接收來自上一個 RDD 或者數(shù)據(jù)源的 input records,perform transformation() 的計算邏輯,然后輸出 records。 產(chǎn)生哪些 RDD 與 transformation() 的計算邏輯有關(guān),下面討論一些典型的 transformation() 及其創(chuàng)建的 RDD。官網(wǎng)上已經(jīng)解釋了每個 transformation 的含義。iterator(split) 的意思是 foreach record in the partition。這里空了很多,是因為那些 transformation() 較為復雜,會產(chǎn)生多個 RDD,具體會在下一節(jié)圖示出來。| map(func) | MappedRDD | iterator(split).map(f) | |
| filter(func) | FilteredRDD | iterator(split).filter(f) | |
| flatMap(func) | FlatMappedRDD | iterator(split).flatMap(f) | |
| mapPartitions(func) | MapPartitionsRDD | f(iterator(split)) | |
| mapPartitionsWithIndex(func) | MapPartitionsRDD | f(split.index, iterator(split)) | |
| sample(withReplacement, fraction, seed) | PartitionwiseSampledRDD | PoissonSampler.sample(iterator(split)) BernoulliSampler.sample(iterator(split)) | |
| pipe(command, [envVars]) | PipedRDD | ||
| union(otherDataset) | |||
| intersection(otherDataset) | |||
| distinct([numTasks])) | |||
| groupByKey([numTasks]) | |||
| reduceByKey(func, [numTasks]) | |||
| sortByKey([ascending], [numTasks]) | |||
| join(otherDataset, [numTasks]) | |||
| cogroup(otherDataset, [numTasks]) | |||
| cartesian(otherDataset) | |||
| coalesce(numPartitions) | |||
| repartition(numPartitions) |
2. 如何建立 RDD 之間的聯(lián)系?
RDD 之間的數(shù)據(jù)依賴問題實際包括三部分:- RDD 本身的依賴關(guān)系。要生成的 RDD(以后用 RDD x 表示)是依賴一個 parent RDD,還是多個 parent RDDs?
- RDD x 中會有多少個 partition ?
- RDD x 與其 parent RDDs 中 partition 之間是什么依賴關(guān)系?是依賴 parent RDD 中一個還是多個 partition?
- 第一種 1:1 的情況被稱為 OneToOneDependency。
- 第二種 N:1 的情況被稱為 N:1 NarrowDependency。
- 第三種 N:N 的情況被稱為 N:N NarrowDependency。不屬于前兩種情況的完全依賴都屬于這個類別。
- 第四種被稱為 ShuffleDependency。
- NarrowDependency (使用黑色實線或黑色虛線箭頭表示)
- OneToOneDependency (1:1)
- NarrowDependency (N:1)
- NarrowDependency (N:N)
- RangeDependency (只在 UnionRDD 中使用)
- OneToOneDependency (1:1)
- ShuffleDependency (使用紅色箭頭表示)
3. 給出一些典型的 transformation() 的計算過程及數(shù)據(jù)依賴圖
1) union(otherRDD) union() 將兩個 RDD 簡單合并在一起,不改變 partition 里面的數(shù)據(jù)。RangeDependency 實際上也是 1:1,只是為了訪問 union() 后的 RDD 中的 partition 方便,保留了原始 RDD 的 range 邊界。 2) groupByKey(numPartitions) 上一章已經(jīng)介紹了 groupByKey 的數(shù)據(jù)依賴,這里算是 溫故而知新 吧。 groupByKey() 只需要將 Key 相同的 records 聚合在一起,一個簡單的 shuffle 過程就可以完成。ShuffledRDD 中的 compute() 只負責將屬于每個 partition 的數(shù)據(jù) fetch 過來,之后使用 mapPartitions() 操作(前面的 OneToOneDependency 展示過)進行 aggregate,生成 MapPartitionsRDD,到這里 groupByKey() 已經(jīng)結(jié)束。最后為了統(tǒng)一返回值接口,將 value 中的 ArrayBuffer[] 數(shù)據(jù)結(jié)構(gòu)抽象化成 Iterable[]。 groupByKey() 沒有在 map 端進行 combine,因為 map 端 combine 只會省掉 partition 里面重復 key 占用的空間,當重復 key 特別多時,可以考慮開啟 combine。 這里的 ArrayBuffer 實際上應(yīng)該是 CompactBuffer,An append-only buffer similar to ArrayBuffer, but more memory-efficient for small buffers. ParallelCollectionRDD 是最基礎(chǔ)的 RDD,直接從 local 數(shù)據(jù)結(jié)構(gòu) create 出的 RDD 屬于這個類型,比如 val pairs = sc.parallelize(List(1, 2, 3, 4, 5), 3) ? 生成的 pairs 就是 ParallelCollectionRDD。 2) reduceyByKey(func, numPartitions) reduceyByKey() 相當于傳統(tǒng)的 MapReduce,整個數(shù)據(jù)流也與 Hadoop 中的數(shù)據(jù)流基本一樣。reduceyByKey() 默認在 map 端開啟 combine(),因此在 shuffle 之前先通過 mapPartitions 操作進行 combine,得到 MapPartitionsRDD,然后 shuffle 得到 ShuffledRDD,然后再進行 reduce(通過 aggregate + mapPartitions() 操作來實現(xiàn))得到 MapPartitionsRDD。 3) distinct(numPartitions) distinct() 功能是 deduplicate RDD 中的所有的重復數(shù)據(jù)。由于重復數(shù)據(jù)可能分散在不同的 partition 里面,因此需要 shuffle 來進行 aggregate 后再去重。然而,shuffle 要求數(shù)據(jù)類型是 <K, V>。如果原始數(shù)據(jù)只有 Key(比如例子中 record 只有一個整數(shù)),那么需要補充成 <K, null>。這個補充過程由 map() 操作完成,生成 MappedRDD。然后調(diào)用上面的 reduceByKey() 來進行 shuffle,在 map 端進行 combine,然后 reduce 進一步去重,生成 MapPartitionsRDD。最后,將 <K, null> 還原成 K,仍然由 map() 完成,生成 MappedRDD。藍色的部分就是調(diào)用的 reduceByKey()。 4) cogroup(otherRDD, numPartitions) 與 groupByKey() 不同,cogroup() 要 aggregate 兩個或兩個以上的 RDD。 那么 CoGroupedRDD 與 RDD a 和 RDD b 的關(guān)系都必須是 ShuffleDependency 么?是否存在 OneToOneDependency? 首先要明確的是 CoGroupedRDD 存在幾個 partition 可以由用戶直接設(shè)定,與 RDD a 和 RDD b 無關(guān)。然而,如果 CoGroupedRDD 中 partition 個數(shù)與 RDD a/b 中的 partition 個數(shù)不一樣,那么不可能存在 1:1 的關(guān)系。 再次,cogroup() 的計算結(jié)果放在 CoGroupedRDD 中哪個 partition 是由用戶設(shè)置的 partitioner 確定的(默認是 HashPartitioner)。那么可以推出:即使 RDD a/b 中的 partition 個數(shù)與 CoGroupedRDD 中的一樣,如果 RDD a/b 中的 partitioner 與 CoGroupedRDD 中的不一樣,也不可能存在 1:1 的關(guān)系。比如,在上圖的 example 里面,RDD a 是 RangePartitioner,b 是 HashPartitioner,CoGroupedRDD 也是 RangePartitioner 且 partition 個數(shù)與 a 的相同。那么很自然地,a 中的每個 partition 中 records 可以直接送到 CoGroupedRDD 中對應(yīng)的 partition。RDD b 中的 records 必須再次進行劃分與 shuffle 后才能進入對應(yīng)的 partition。 最后,經(jīng)過上面分析, 對于兩個或兩個以上的 RDD 聚合,當且僅當聚合后的 RDD 中 partitioner 類別及 partition 個數(shù)與前面的 RDD 都相同,才會與前面的 RDD 構(gòu)成 1:1 的關(guān)系。否則,只能是 ShuffleDependency。這個算法對應(yīng)的代碼可以在 CoGroupedRDD.getDependencies() 中找到,雖然比較難理解。 Spark 代碼中如何表示 CoGroupedRDD 中的 partition 依賴于多個 parent RDDs 中的 partitions? 首先,將 CoGroupedRDD 依賴的所有 RDD 放進數(shù)組 rdds[RDD] 中。再次,foreach i,如果 CoGroupedRDD 和 rdds(i) 對應(yīng)的 RDD 是 OneToOneDependency 關(guān)系,那么 Dependecy[i] = new OneToOneDependency(rdd),否則 = new ShuffleDependency(rdd)。最后,返回與每個 parent RDD 的依賴關(guān)系數(shù)組 deps[Dependency]。 Dependency 類中的 getParents(partition id) 負責給出某個 partition 按照該 dependency 所依賴的 parent RDD 中的 partitions: List[Int]。 getPartitions() 負責給出 RDD 中有多少個 partition,以及每個 partition 如何序列化。 5) intersection(otherRDD) intersection() 功能是抽取出 RDD a 和 RDD b 中的公共數(shù)據(jù)。先使用 map() 將 RDD[T] 轉(zhuǎn)變成 RDD[(T, null)],這里的 T 只要不是 Array 等集合類型即可。接著,進行 a.cogroup(b),藍色部分與前面的 cogroup() 一樣。之后再使用 filter() 過濾掉 [iter(groupA()), iter(groupB())] 中 groupA 或 groupB 為空的 records,得到 FilteredRDD。最后,使用 keys() 只保留 key 即可,得到 MappedRDD。 6) join(otherRDD, numPartitions) join() 將兩個 RDD[(K, V)] 按照 SQL 中的 join 方式聚合在一起。與 intersection() 類似,首先進行 cogroup(),得到 <K, (Iterable[V1], Iterable[V2])>類型的 MappedValuesRDD,然后對 Iterable[V1] 和 Iterable[V2] 做笛卡爾集,并將集合 flat() 化。 這里給出了兩個 example,第一個 example 的 RDD 1 和 RDD 2 使用 RangePartitioner 劃分,而 CoGroupedRDD 使用 HashPartitioner,與 RDD 1/2 都不一樣,因此是 ShuffleDependency。第二個 example 中, RDD 1 事先使用 HashPartitioner 對其 key 進行劃分,得到三個 partition,與 CoGroupedRDD 使用的 HashPartitioner(3) 一致,因此數(shù)據(jù)依賴是 1:1。如果 RDD 2 事先也使用 HashPartitioner 對其 key 進行劃分,得到三個 partition,那么 join() 就不存在 ShuffleDependency 了,這個 join() 也就變成了 hashjoin()。 7) sortByKey(ascending, numPartitions) sortByKey() 將 RDD[(K, V)] 中的 records 按 key 排序,ascending = true 表示升序,false 表示降序。目前 sortByKey() 的數(shù)據(jù)依賴很簡單,先使用 shuffle 將 records 聚集在一起(放到對應(yīng)的 partition 里面),然后將 partition 內(nèi)的所有 records 按 key 排序,最后得到的 MapPartitionsRDD 中的 records 就有序了。 目前 sortByKey() 先使用 Array 來保存 partition 中所有的 records,再排序。 8) cartesian(otherRDD) Cartesian 對兩個 RDD 做笛卡爾集,生成的 CartesianRDD 中 partition 個數(shù) = partitionNum(RDD a) * partitionNum(RDD b)。 這里的依賴關(guān)系與前面的不太一樣,CartesianRDD 中每個partition 依賴兩個 parent RDD,而且其中每個 partition 完全依賴 RDD a 中一個 partition,同時又完全依賴 RDD b 中另一個 partition。這里沒有紅色箭頭,因為所有依賴都是 NarrowDependency。 CartesianRDD.getDependencies() 返回 rdds[RDD a, RDD b]。CartesianRDD 中的 partiton i 依賴于 (RDD a).List(i / numPartitionsInRDDb) 和 (RDD b).List(i % numPartitionsInRDDb)。 9) coalesce(numPartitions, shuffle = false) coalesce() 可以將 parent RDD 的 partition 個數(shù)進行調(diào)整,比如從 5 個減少到 3 個,或者從 5 個增加到 10 個。需要注意的是當 shuffle = false 的時候,是不能增加 partition 個數(shù)的(不能從 5 個變?yōu)?10 個)。 coalesce() 的核心問題是 如何確立 CoalescedRDD 中 partition 和其 parent RDD 中 partition 的關(guān)系。- coalesce(shuffle = false) 時,由于不能進行 shuffle,問題變?yōu)?parent RDD 中哪些partition 可以合并在一起。合并因素除了要考慮 partition 中元素個數(shù)外,還要考慮 locality 及 balance 的問題。因此,Spark 設(shè)計了一個非常復雜的算法來解決該問題(算法部分我還沒有深究)。注意Example: a.coalesce(3, shuffle = false)展示了 N:1 的 NarrowDependency。
- coalesce(shuffle = true) 時,由于可以進行 shuffle,問題變?yōu)槿绾螌?RDD 中所有 records 平均劃分到 N 個 partition 中。很簡單,在每個 partition 中,給每個 record 附加一個 key,key 遞增,這樣經(jīng)過 hash(key) 后,key 可以被平均分配到不同的 partition 中,類似 Round-robin 算法。在第二個例子中,RDD a 中的每個元素,先被加上了遞增的 key(如 MapPartitionsRDD 第二個 partition 中 (1, 3) 中的 1)。在每個 partition 中,第一個元素 (Key, Value) 中的 key 由 (new Random(index)).nextInt(numPartitions) 計算得到,index 是該 partition 的索引,numPartitions 是 CoalescedRDD 中的 partition 個數(shù)。接下來元素的 key 是遞增的,然后 shuffle 后的 ShuffledRDD 可以得到均分的 records,然后經(jīng)過復雜算法來建立 ShuffledRDD 和 CoalescedRDD 之間的數(shù)據(jù)聯(lián)系,最后過濾掉 key,得到 coalesce 后的結(jié)果 MappedRDD。
Primitive transformation()
combineByKey() 分析了這么多 RDD 的邏輯執(zhí)行圖,它們之間有沒有共同之處?如果有,是怎么被設(shè)計和實現(xiàn)的? 仔細分析 RDD 的邏輯執(zhí)行圖會發(fā)現(xiàn),ShuffleDependency 左邊的 RDD 中的 record 要求是 <key, value> 型的,經(jīng)過 ShuffleDependency 后,包含相同 key 的 records 會被 aggregate 到一起,然后在 aggregated 的 records 上執(zhí)行不同的計算邏輯。實際執(zhí)行時(后面的章節(jié)會具體談到)很多 transformation() 如 groupByKey(),reduceByKey() 是邊 aggregate 數(shù)據(jù)邊執(zhí)行計算邏輯的,因此共同之處就是 aggregate 同時 compute()。Spark 使用 combineByKey() 來實現(xiàn)這個 aggregate + compute() 的基礎(chǔ)操作。 combineByKey() 的定義如下: def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,partitioner: Partitioner,mapSideCombine: Boolean = true,serializer: Serializer = null): RDD[(K, C)] ? 其中主要有三個參數(shù) createCombiner,mergeValue 和 mergeCombiners。簡單解釋下這三個函數(shù)及 combineByKey() 的意義,注意它們的類型: 假設(shè)一組具有相同 K 的 <K, V> records 正在一個個流向 combineByKey(),createCombiner 將第一個 record 的 value 初始化為 c (比如,c = value),然后從第二個 record 開始,來一個 record 就使用 mergeValue(c, record.value) 來更新 c,比如想要對這些 records 的所有 values 做 sum,那么使用 c = c + record.value。等到 records 全部被 mergeValue(),得到結(jié)果 c。假設(shè)還有一組 records(key 與前面那組的 key 均相同)一個個到來,combineByKey() 使用前面的方法不斷計算得到 c'。現(xiàn)在如果要求這兩組 records 總的 combineByKey() 后的結(jié)果,那么可以使用 final c = mergeCombiners(c, c') 來計算。Discussion
至此,我們討論了如何生成 job 的邏輯執(zhí)行圖,這些圖也是 Spark 看似簡單的 API 背后的復雜計算邏輯及數(shù)據(jù)依賴關(guān)系。 整個 job 會產(chǎn)生哪些 RDD 由 transformation() 語義決定。一些 transformation(), 比如 cogroup() 會被很多其他操作用到。 RDD 本身的依賴關(guān)系由 transformation() 生成的每一個 RDD 本身語義決定。如 CoGroupedRDD 依賴于所有參加 cogroup() 的 RDDs。 RDD 中 partition 依賴關(guān)系分為 NarrowDependency 和 ShuffleDependency。前者是完全依賴,后者是部分依賴。NarrowDependency 里面又包含多種情況,只有前后兩個 RDD 的 partition 個數(shù)以及 partitioner 都一樣,才會出現(xiàn) NarrowDependency。 從數(shù)據(jù)處理邏輯的角度來看,MapReduce 相當于 Spark 中的 map() + reduceByKey(),但嚴格來講 MapReduce 中的 reduce() 要比 reduceByKey() 的功能強大些,詳細差別會在 Shuffle details 一章中繼續(xù)討論。轉(zhuǎn)載于:https://my.oschina.net/u/559635/blog/749768
總結(jié)
以上是生活随笔為你收集整理的Apache Spark 的设计与实现(job逻辑执行图)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: wupdmgr.exe是什么
- 下一篇: 使用COSBench工具对ceph s3