漫谈分布式计算框架
如果問(wèn) mapreduce 和 spark 什么關(guān)系,或者說(shuō)有什么共同屬性,你可能會(huì)回答他們都是大數(shù)據(jù)處理引擎。如果問(wèn) spark 與 tensorflow 呢,就可能有點(diǎn)迷糊,這倆關(guān)注的領(lǐng)域不太一樣啊。但是再問(wèn) spark 與 MPI 呢?這個(gè)就更遠(yuǎn)了。雖然這樣問(wèn)多少有些不嚴(yán)謹(jǐn),但是它們都有共同的一部分,這就是我們今天談?wù)摰囊粋€(gè)話題,一個(gè)比較大的話題:分布式計(jì)算框架。
不管是 mapreduce,還是 spark 亦或 tensorflow,它們都是利用分布式的能力,運(yùn)行某些計(jì)算,解決一些特定的問(wèn)題。從這個(gè) level 講,它們都定義了一種“分布式計(jì)算模型”,即提出了一種計(jì)算的方法,通過(guò)這種計(jì)算方法,就能夠解決大量數(shù)據(jù)的分布式計(jì)算問(wèn)題。它們的區(qū)別在于提出的分布式計(jì)算模型不同。Mapreduce 正如其名,是一個(gè)很基本的 map-reduce 式的計(jì)算模型(好像沒(méi)說(shuō)一樣)。Spark 定義了一套 RDD 模型,本質(zhì)上是一系列的 map/reduce 組成的一個(gè) DAG 圖。Tensorflow 的計(jì)算模型也是一張圖,但是 tensorflow 的圖比起 spark 來(lái),顯得更“復(fù)雜”一點(diǎn)。你需要為圖中的每個(gè)節(jié)點(diǎn)和邊作出定義。根據(jù)這些定義,可以指導(dǎo) tensorflow 如何計(jì)算這張圖。Tensorflow 的這種具體化的定義使它比較適合處理特定類型的的計(jì)算,對(duì) tensorflow 來(lái)講就是神經(jīng)網(wǎng)絡(luò)。而 spark 的 RDD 模型使它比較適合那種沒(méi)有相互關(guān)聯(lián)的的數(shù)據(jù)并行任務(wù)。那么有沒(méi)有一種通用的、簡(jiǎn)單的、性能還高的分布式計(jì)算模型?我覺(jué)著挺難。通用往往意味著性能不能針對(duì)具體情形作出優(yōu)化。而為專門任務(wù)寫的分布式任務(wù)又做不到通用,當(dāng)然也做不到簡(jiǎn)單。
插一句題外話,分布式計(jì)算模型有一塊伴隨的內(nèi)容,就是調(diào)度。雖然不怎么受關(guān)注,但這是分布式計(jì)算引擎必備的東西。mapreduce 的調(diào)度是 yarn,spark 的調(diào)度有自己內(nèi)嵌的調(diào)度器,tensorflow 也一樣。MPI 呢?它的調(diào)度就是幾乎沒(méi)有調(diào)度,一切假設(shè)集群有資源,靠 ssh 把所有任務(wù)拉起來(lái)。調(diào)度實(shí)際上應(yīng)當(dāng)分為資源調(diào)度器和任務(wù)調(diào)度器。前者用于向一些資源管理者申請(qǐng)一些硬件資源,后者用于將計(jì)算圖中的任務(wù)下發(fā)到這些遠(yuǎn)程資源進(jìn)行計(jì)算,其實(shí)也就是所謂的兩階段調(diào)度。近年來(lái)有一些 TensorflowOnSpark 之類的項(xiàng)目。這類項(xiàng)目的本質(zhì)實(shí)際上是用 spark 的資源調(diào)度,加上 tensorflow 的計(jì)算模型。
當(dāng)我們寫完一個(gè)單機(jī)程序,而面臨數(shù)據(jù)量上的問(wèn)題的時(shí)候,一個(gè)自然的想法就是,我能不能讓它運(yùn)行在分布式的環(huán)境中?如果能夠不加改動(dòng)或稍加改動(dòng)就能讓它分布式化,那就太好了。當(dāng)然現(xiàn)實(shí)是比較殘酷的。通常情況下,對(duì)于一個(gè)一般性的程序,用戶需要自己手動(dòng)編寫它的分布式版本,利用比如 MPI 之類的框架,自己控制數(shù)據(jù)的分發(fā)、匯總,自己對(duì)任務(wù)的失敗做容災(zāi)(通常沒(méi)有容災(zāi))。如果要處理的目標(biāo)是恰好是對(duì)一批數(shù)據(jù)進(jìn)行批量化處理,那么 可以用 mapreduce 或者 spark 預(yù)定義的 api。對(duì)于這一類任務(wù),計(jì)算框架已經(jīng)幫我們把業(yè)務(wù)之外的部分(腳手架代碼)做好了。同樣的,如果我們的任務(wù)是訓(xùn)練一個(gè)神經(jīng)網(wǎng)絡(luò),那么用 tensorflow pytorch 之類的框架就好了。這段話的意思是,如果你要處理的問(wèn)題已經(jīng)有了對(duì)應(yīng)框架,那么拿來(lái)用就好了。但是如果沒(méi)有呢?除了自己實(shí)現(xiàn)之外有沒(méi)有什么別的辦法呢?
今天注意到一個(gè)項(xiàng)目,Ray,聲稱你只需要稍微修改一下你的代碼,就能讓它變?yōu)榉植际降?#xff08;實(shí)際上這個(gè)項(xiàng)目早就發(fā)布了,只是一直沒(méi)有刻意關(guān)注它)。當(dāng)然這個(gè)代碼僅局限于 python,比如下面這個(gè)例子,
| **Basic Python** | **Distributed with Ray** | +------------------------------------------------+----------------------------------------------------+ | | | | # Execute f serially. | # Execute f in parallel. | | | | | | @ray.remote | | def f(): | def f(): | | time.sleep(1) | time.sleep(1) | | return 1 | return 1 | | | | | | | | | ray.init() | | results = [f() for i in range(4)] | results = ray.get([f.remote() for i in range(4)]) | +------------------------------------------------+----------------------------------------------------+這么簡(jiǎn)單?這樣筆者想到了 openmp(注意不是 openmpi)。來(lái)看看,
#include<iostream> #include"omp.h"using namespace std;void main() { #pragma omp parallel forfor(int i = 0; i < 10; ++i) {cout << "Test" << endl;}system("pause"); }把頭文件導(dǎo)入,添加一行預(yù)處理指令就可以了,這段代碼立馬變?yōu)椴⑿袌?zhí)行。當(dāng)然 openmp 不是分布式,只是借助編譯器將代碼中需要并行化的部分編譯為多線程運(yùn)行,本身還是一個(gè)進(jìn)程,因此其并行度收到 CPU 線程數(shù)量所限。如果 CPU 是雙線程,那只能 2 倍加速。在一些服務(wù)器上,CPU 可以是單核 32 線程,自然能夠享受到 32 倍加速(被并行化的部分)。不過(guò)這些都不重要,在用戶看來(lái),Ray 的這個(gè)做法和 openmp 是不是有幾分相似之處?你不需要做過(guò)多的代碼改動(dòng),就能將代碼變?yōu)榉植际綀?zhí)行(當(dāng)然 openmp 要更絕一點(diǎn),因?yàn)閷?duì)于不支持 openmp 的編譯器它就是一行注釋而已)。
那么 Ray 是怎么做到這一點(diǎn)的呢?其實(shí) Ray 的做法說(shuō)起來(lái)也比較簡(jiǎn)單,就是定義了一些 API,類似于 MPI 中的定義的通信原語(yǔ)。使用的時(shí)候,將這些 API “注入”到代碼合適的位置,那么代碼就變成了用戶代碼夾雜著一些 Ray 框架層的 API 調(diào)用,整個(gè)代碼實(shí)際上就形成了一張計(jì)算圖。接下來(lái)的事情就是等待 Ray 把這張計(jì)算圖完成返回就好了。Ray 的論文給了個(gè)例子:
@ray.remote def create_policy():# Initialize the policy randomly.return policy @ray.remote(num_gpus=1) class Simulator(object):def __init__(self):# Initialize the environment.self.env = Environment()def rollout(self, policy, num_steps):observations = []observation = self.env.current_state()for _ in range(num_steps):action = policy(observation)observation = self.env.step(action)observations.append(observation)return observations @ray.remote(num_gpus=2) def update_policy(policy, *rollouts):# Update the policy.return policy @ray.remote def train_policy():# Create a policy.policy_id = create_policy.remote()# Create 10 actors.simulators = [Simulator.remote() for _ in range(10)]# Do 100 steps of training.for _ in range(100):# Perform one rollout on each actor.rollout_ids = [s.rollout.remote(policy_id)for s in simulators]# Update the policy with the rollouts.policy_id = update_policy.remote(policy_id, *rollout_ids)return ray.get(policy_id)生成的計(jì)算圖為
所以,用戶要做的事情,就是在自己的代碼里加入適當(dāng)?shù)?Ray API 調(diào)用,然后自己的代碼就實(shí)際上變成了一張分布式計(jì)算圖了。作為對(duì)比,我們?cè)賮?lái)看看 tensorflow 對(duì)圖的定義,
import tensorflow as tf # 創(chuàng)建數(shù)據(jù)流圖:y = W * x + b,其中W和b為存儲(chǔ)節(jié)點(diǎn),x為數(shù)據(jù)節(jié)點(diǎn)。 x = tf.placeholder(tf.float32) W = tf.Variable(1.0) b = tf.Variable(1.0) y = W * x + b with tf.Session() as sess:tf.global_variables_initializer().run() # Operation.runfetch = y.eval(feed_dict={x: 3.0}) # Tensor.evalprint(fetch) # fetch = 1.0 * 3.0 + 1.0 ''' 輸出: 4.0 '''可以看出,tensorflow 中是自己需要自己顯式的、明確的定義出圖的節(jié)點(diǎn),placeholder Variable 等等(這些都是圖節(jié)點(diǎn)的具體類型),而 Ray 中圖是以一種隱式的方式定義的。我認(rèn)為后者是一種更自然的方式,站在開(kāi)發(fā)者的角度看問(wèn)題,而前者更像是為了使用 tensorflow 把自己代碼邏輯去適配這個(gè)輪子。
那么 ray 是不是就我們要尋找的那個(gè)即通用、又簡(jiǎn)單、還靈活的分布式計(jì)算框架呢?由于筆者沒(méi)有太多的 ray 的使用經(jīng)驗(yàn),這個(gè)問(wèn)題不太好說(shuō)。從官方介紹來(lái)看,有限的幾個(gè) API 確實(shí)是足夠簡(jiǎn)單的。僅靠這幾個(gè) API 能不能達(dá)成通用且靈活的目的還不好講。本質(zhì)上來(lái)說(shuō),Tensorflow 對(duì)圖的定義也足夠 General,但是它并不是一個(gè)通用的分布式計(jì)算框架。由于某些問(wèn)題不在于框架,而在于問(wèn)題本身的分布式化就存在困難,所以試圖尋求一種通用分布式計(jì)算框架解決單機(jī)問(wèn)題可能是個(gè)偽命題。
話扯遠(yuǎn)了。假設(shè) ray 能夠讓我們以一種比較容易的方式分布式地執(zhí)行程序,那么會(huì)怎么樣呢?前不久 Databricks 開(kāi)源了一個(gè)新項(xiàng)目,Koalas,試圖以 RDD 的框架并行化 pandas。由于 pandas 的場(chǎng)景是數(shù)據(jù)分析,和 spark 面對(duì)的場(chǎng)景類似,兩者的底層存儲(chǔ)結(jié)構(gòu)、概念也是很相似的,因此用 RDD 來(lái)分布式化 pandas 也是可行的。我想,如果 ray 足夠簡(jiǎn)單好用,在 pandas 里加一些 ray 的 api 調(diào)用花費(fèi)的時(shí)間精力可能會(huì)遠(yuǎn)遠(yuǎn)小于開(kāi)發(fā)一套 koalas。但是在 pandas 里加 ray 就把 pandas 綁定到了 ray 上,即便單機(jī)也是這樣,因?yàn)?ray 做不到像 openmp 那樣如果支持,很好,不支持也不影響代碼運(yùn)行。
啰嗦這么多,其實(shí)就想從這么多引擎的細(xì)節(jié)中跳出來(lái),思考一下到底什么是分布式計(jì)算框架,每種框架又是設(shè)計(jì)的,解決什么問(wèn)題,有什么優(yōu)缺點(diǎn)。最后拿大佬的一個(gè)觀點(diǎn)結(jié)束本文。David Patterson 在演講 “New Golden Age For Computer Architecture” 中提到,通用硬件越來(lái)越逼近極限,要想要達(dá)到更高的效率,我們需要設(shè)計(jì)面向領(lǐng)域的架構(gòu)(Domain Specific Architectures)。這是一個(gè)計(jì)算架構(gòu)層出不窮的時(shí)代,每種架構(gòu)都是為了解決其面對(duì)的領(lǐng)域問(wèn)題出現(xiàn)的,必然包含對(duì)其問(wèn)題的特殊優(yōu)化。通用性不是用戶解決問(wèn)題的出發(fā)點(diǎn),而更多的是框架設(shè)計(jì)者的“一廂情愿”,用戶關(guān)注的永遠(yuǎn)是領(lǐng)域問(wèn)題。從這個(gè)意義上講,面向領(lǐng)域的計(jì)算架構(gòu)應(yīng)該才是正確的方向。
原文鏈接
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
- 上一篇: 如何回答性能优化的问题,才能打动阿里面试
- 下一篇: 运维编排场景系列----给实例加到SLS