日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > python >内容正文

python

python分布式框架_高性能分布式执行框架——Ray

發(fā)布時(shí)間:2023/12/10 python 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python分布式框架_高性能分布式执行框架——Ray 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

Ray是UC Berkeley RISELab新推出的高性能分布式執(zhí)行框架,它使用了和傳統(tǒng)分布式計(jì)算系統(tǒng)不一樣的架構(gòu)和對(duì)分布式計(jì)算的抽象方式,具有比Spark更優(yōu)異的計(jì)算性能。

Ray目前還處于實(shí)驗(yàn)室階段,最新版本為0.2.2版本。雖然Ray自稱是面向AI應(yīng)用的分布式計(jì)算框架,但是它的架構(gòu)具有通用的分布式計(jì)算抽象。本文對(duì)Ray進(jìn)行簡(jiǎn)單的介紹,幫助大家更快地了解Ray是什么,如有描述不當(dāng)?shù)牡胤?#xff0c;歡迎不吝指正。

一、簡(jiǎn)單開(kāi)始

首先來(lái)看一下最簡(jiǎn)單的Ray程序是如何編寫的。

# 導(dǎo)入ray,并初始化執(zhí)行環(huán)境

import ray

ray.init()

# 定義ray remote函數(shù)

@ray.remote

def hello():

return "Hello world !"

# 異步執(zhí)行remote函數(shù),返回結(jié)果id

object_id = hello.remote()

# 同步獲取計(jì)算結(jié)果

hello = ray.get(object_id)

# 輸出計(jì)算結(jié)果

print hello

在Ray里,通過(guò)Python注解@ray.remote定義remote函數(shù)。使用此注解聲明的函數(shù)都會(huì)自帶一個(gè)默認(rèn)的方法remote,通過(guò)此方法發(fā)起的函數(shù)調(diào)用都是以提交分布式任務(wù)的方式異步執(zhí)行的,函數(shù)的返回值是一個(gè)對(duì)象id,使用ray.get內(nèi)置操作可以同步獲取該id對(duì)應(yīng)的對(duì)象。熟悉Java里的Future機(jī)制的話對(duì)此應(yīng)該并不陌生,或許會(huì)有人疑惑這和普通的異步函數(shù)調(diào)用沒(méi)什么大的區(qū)別,但是這里最大的差異是,函數(shù)hello是分布式異步執(zhí)行的。

remote函數(shù)是Ray分布式計(jì)算抽象中的核心概念,通過(guò)它開(kāi)發(fā)者擁有了動(dòng)態(tài)定制計(jì)算依賴(任務(wù)DAG)的能力。比如:

@ray.remote

def A():

return "A"

@ray.remote

def B():

return "B"

@ray.remote

def C(a, b):

return "C"

a_id = A.remote()

b_id = B.remote()

c_id = C.remote(a_id, b_id)

print ray.get(c_id)

例子代碼中,對(duì)函數(shù)A、B的調(diào)用是完全并行執(zhí)行的,但是對(duì)函數(shù)C的調(diào)用依賴于A、B函數(shù)的返回結(jié)果。Ray可以保證函數(shù)C需要等待A、B函數(shù)的結(jié)果真正計(jì)算出來(lái)后才會(huì)執(zhí)行。如果將函數(shù)A、B、C類比為DAG的節(jié)點(diǎn)的話,那么DAG的邊就是函數(shù)C參數(shù)對(duì)函數(shù)A、B計(jì)算結(jié)果的依賴,自由的函數(shù)調(diào)用方式允許Ray可以自由地定制DAG的結(jié)構(gòu)和計(jì)算依賴關(guān)系。另外,提及一點(diǎn)的是Python的函數(shù)可以定義函數(shù)具有多個(gè)返回值,這也使得Python的函數(shù)更天然具備了DAG節(jié)點(diǎn)多入和多出的特點(diǎn)。

二、系統(tǒng)架構(gòu)

Ray是使用什么樣的架構(gòu)對(duì)分布式計(jì)算做出如上抽象的呢,一下給出了Ray的系統(tǒng)架構(gòu)(來(lái)自Ray論文,參考文獻(xiàn)1)。

作為分布式計(jì)算系統(tǒng),Ray仍舊遵循了典型的Master-Slave的設(shè)計(jì):Master負(fù)責(zé)全局協(xié)調(diào)和狀態(tài)維護(hù),Slave執(zhí)行分布式計(jì)算任務(wù)。不過(guò)和傳統(tǒng)的分布式計(jì)算系統(tǒng)不同的是,Ray使用了混合任務(wù)調(diào)度的思路。在集群部署模式下,Ray啟動(dòng)了以下關(guān)鍵組件:

GlobalScheduler:Master上啟動(dòng)了一個(gè)全局調(diào)度器,用于接收本地調(diào)度器提交的任務(wù),并將任務(wù)分發(fā)給合適的本地任務(wù)調(diào)度器執(zhí)行。

RedisServer:Master上啟動(dòng)了一到多個(gè)RedisServer用于保存分布式任務(wù)的狀態(tài)信息(ControlState),包括對(duì)象機(jī)器的映射、任務(wù)描述、任務(wù)debug信息等。

LocalScheduler:每個(gè)Slave上啟動(dòng)了一個(gè)本地調(diào)度器,用于提交任務(wù)到全局調(diào)度器,以及分配任務(wù)給當(dāng)前機(jī)器的Worker進(jìn)程。

Worker:每個(gè)Slave上可以啟動(dòng)多個(gè)Worker進(jìn)程執(zhí)行分布式任務(wù),并將計(jì)算結(jié)果存儲(chǔ)到ObjectStore。

ObjectStore:每個(gè)Slave上啟動(dòng)了一個(gè)ObjectStore存儲(chǔ)只讀數(shù)據(jù)對(duì)象,Worker可以通過(guò)共享內(nèi)存的方式訪問(wèn)這些對(duì)象數(shù)據(jù),這樣可以有效地減少內(nèi)存拷貝和對(duì)象序列化成本。ObjectStore底層由Apache Arrow實(shí)現(xiàn)。

Plasma:每個(gè)Slave上的ObjectStore都由一個(gè)名為Plasma的對(duì)象管理器進(jìn)行管理,它可以在Worker訪問(wèn)本地ObjectStore上不存在的遠(yuǎn)程數(shù)據(jù)對(duì)象時(shí),主動(dòng)拉取其它Slave上的對(duì)象數(shù)據(jù)到當(dāng)前機(jī)器。

需要說(shuō)明的是,Ray的論文中提及,全局調(diào)度器可以啟動(dòng)一到多個(gè),而目前Ray的實(shí)現(xiàn)文檔里討論的內(nèi)容都是基于一個(gè)全局調(diào)度器的情況。我猜測(cè)可能是Ray尚在建設(shè)中,一些機(jī)制還未完善,后續(xù)讀者可以留意此處的細(xì)節(jié)變化。

Ray的任務(wù)也是通過(guò)類似Spark中Driver的概念的方式進(jìn)行提交的,有所不同的是:

Spark的Driver提交的是任務(wù)DAG,一旦提交則不可更改。

而Ray提交的是更細(xì)粒度的remote function,任務(wù)DAG依賴關(guān)系由函數(shù)依賴關(guān)系自由定制。

論文給出的架構(gòu)圖里并未畫出Driver的概念,因此我在其基礎(chǔ)上做了一些修改和擴(kuò)充。

Ray的Driver節(jié)點(diǎn)和和Slave節(jié)點(diǎn)啟動(dòng)的組件幾乎相同,不過(guò)卻有以下區(qū)別:

Driver上的工作進(jìn)程DriverProcess一般只有一個(gè),即用戶啟動(dòng)的PythonShell。Slave可以根據(jù)需要?jiǎng)?chuàng)建多個(gè)WorkerProcess。

Driver只能提交任務(wù),卻不能接收來(lái)自全局調(diào)度器分配的任務(wù)。Slave可以提交任務(wù),也可以接收全局調(diào)度器分配的任務(wù)。

Driver可以主動(dòng)繞過(guò)全局調(diào)度器給Slave發(fā)送Actor調(diào)用任務(wù)(此處設(shè)計(jì)是否合理尚不討論)。Slave只能接收全局調(diào)度器分配的計(jì)算任務(wù)。

三、核心操作

基于以上架構(gòu),我們簡(jiǎn)單討論一下Ray中關(guān)鍵的操作和流程。

1. ray.init()

在PythonShell中,使用ray.init()可以在本地啟動(dòng)ray,包括Driver、HeadNode(Master)和若干Slave。

import ray

ray.init()

如果是直連已有的Ray集群,只需要指定RedisServer的地址即可。

ray.init(redis_address="")

本地啟動(dòng)Ray得到的輸出如下:

>>> ray.init()

Waiting for redis server at 127.0.0.1:58807 to respond...

Waiting for redis server at 127.0.0.1:23148 to respond...

Allowing the Plasma store to use up to 13.7439GB of memory.

Starting object store with directory /tmp and huge page support disabled

Starting local scheduler with 8 CPUs, 0 GPUs

======================================================================

View the web UI at http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5

======================================================================

{'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store73540254', manager_name='/tmp/plasma_manager78072648', manager_port=39874)], 'redis_address': '127.0.0.1:58807', 'local_scheduler_socket_names': ['/tmp/scheduler98624129'], 'webui_url': 'http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5', 'node_ip_address': '127.0.0.1'}

>>>

本地啟動(dòng)Ray時(shí),可以看到Ray的WebUI的訪問(wèn)地址。

2. ray.put()

使用ray.put()可以將Python對(duì)象存入本地ObjectStore,并且異步返回一個(gè)唯一的ObjectID。通過(guò)該ID,Ray可以訪問(wèn)集群中任一個(gè)節(jié)點(diǎn)上的對(duì)象(遠(yuǎn)程對(duì)象通過(guò)查閱Master的對(duì)象表獲得)。

對(duì)象一旦存入ObjectStore便不可更改,Ray的remote函數(shù)可以將直接將該對(duì)象的ID作為參數(shù)傳入。使用ObjectID作為remote函數(shù)參數(shù),可以有效地減少函數(shù)參數(shù)的寫ObjectStore的次數(shù)。

@ray.remote

def f(x):

pass

x = "hello"

# 對(duì)象x往ObjectStore拷貝里10次

[f.remote(x) for _ in range(10)]

# 對(duì)象x僅往ObjectStore拷貝1次

x_id = ray.put(x)

[f.remote(x_id) for _ in range(10)]

3. ray.get()

使用ray.get()可以通過(guò)ObjectID獲取ObjectStore內(nèi)的對(duì)象并將之轉(zhuǎn)換為Python對(duì)象。對(duì)于數(shù)組類型的對(duì)象,Ray使用共享內(nèi)存機(jī)制減少數(shù)據(jù)的拷貝成本。而對(duì)于其它對(duì)象則需要將數(shù)據(jù)從ObjectStore拷貝到進(jìn)程的堆內(nèi)存中。

如果調(diào)用ray.get()操作時(shí),對(duì)象尚未創(chuàng)建好,則get操作會(huì)阻塞,直到對(duì)象創(chuàng)建完成后返回。get操作的關(guān)鍵流程如下:

Driver或者Worker進(jìn)程首先到ObjectStore內(nèi)請(qǐng)求ObjectID對(duì)應(yīng)的對(duì)象數(shù)據(jù)。

如果本地ObjectStore沒(méi)有對(duì)應(yīng)的對(duì)象數(shù)據(jù),本地對(duì)象管理器Plasma會(huì)檢查Master上的對(duì)象表查看對(duì)象是否存儲(chǔ)其它節(jié)點(diǎn)的ObjectStore。

如果對(duì)象數(shù)據(jù)在其它節(jié)點(diǎn)的ObjectStore內(nèi),Plasma會(huì)發(fā)送網(wǎng)絡(luò)請(qǐng)求將對(duì)象數(shù)據(jù)拉到本地ObjectStore。

如果對(duì)象數(shù)據(jù)還沒(méi)有創(chuàng)建好,Master會(huì)在對(duì)象創(chuàng)建完成后通知請(qǐng)求的Plasma讀取。

如果對(duì)象數(shù)據(jù)已經(jīng)被所有的ObjectStore移除(被LRU策略刪除),本地調(diào)度器會(huì)根據(jù)任務(wù)血緣關(guān)系執(zhí)行對(duì)象的重新創(chuàng)建工作。

一旦對(duì)象數(shù)據(jù)在本地ObjectStore可用,Driver或者Worker進(jìn)程會(huì)通過(guò)共享內(nèi)存的方式直接將對(duì)象內(nèi)存區(qū)域映射到自己的進(jìn)程地址空間中,并反序列化為Python對(duì)象。

另外,ray.get()可以一次性讀取多個(gè)對(duì)象的數(shù)據(jù):

result_ids = [ray.put(i) for i in range(10)]

ray.get(result_ids) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

4. @ray.remote

Ray中使用注解@ray.remote可以聲明一個(gè)remote function。remote函數(shù)時(shí)Ray的基本任務(wù)調(diào)度單元,remote函數(shù)定義后會(huì)立即被序列化存儲(chǔ)到RedisServer中,并且分配了一個(gè)唯一的ID,這樣就保證了集群的所有節(jié)點(diǎn)都可以看到這個(gè)函數(shù)的定義。

不過(guò),這樣對(duì)remote函數(shù)定義有了一個(gè)潛在的要求,即remote函數(shù)內(nèi)如果調(diào)用了其它的用戶函數(shù),則必須提前定義,否則remote函數(shù)無(wú)法找到對(duì)應(yīng)的函數(shù)定義內(nèi)容。

remote函數(shù)內(nèi)也可以調(diào)用其它的remote函數(shù),Driver和Slave每次調(diào)用remote函數(shù)時(shí),其實(shí)都是向集群提交了一個(gè)計(jì)算任務(wù),從這里也可以看到Ray的分布式計(jì)算的自由性。

Ray中調(diào)用remote函數(shù)的關(guān)鍵流程如下:

調(diào)用remote函數(shù)時(shí),首先會(huì)創(chuàng)建一個(gè)任務(wù)對(duì)象,它包含了函數(shù)的ID、參數(shù)的ID或者值(Python的基本對(duì)象直接傳值,復(fù)雜對(duì)象會(huì)先通過(guò)ray.put()操作存入ObjectStore然后返回ObjectID)、函數(shù)返回值對(duì)象的ID。

任務(wù)對(duì)象被發(fā)送到本地調(diào)度器。

本地調(diào)度器決定任務(wù)對(duì)象是在本地調(diào)度還是發(fā)送給全局調(diào)度器。如果任務(wù)對(duì)象的依賴(參數(shù))在本地的ObejctStore已經(jīng)存在且本地的CPU和GPU計(jì)算資源充足,那么本地調(diào)度器將任務(wù)分配給本地的WorkerProcess執(zhí)行。否則,任務(wù)對(duì)象被發(fā)送給全局調(diào)度器并存儲(chǔ)到任務(wù)表(TaskTable)中,全局調(diào)度器根據(jù)當(dāng)前的任務(wù)狀態(tài)信息決定將任務(wù)發(fā)給集群中的某一個(gè)本地調(diào)度器。

本地調(diào)度器收到任務(wù)對(duì)象后(來(lái)自本地的任務(wù)或者全局調(diào)度分配的任務(wù)),會(huì)將其放入一個(gè)任務(wù)隊(duì)列中,等待計(jì)算資源和本地依賴滿足后分配給WorkerProcess執(zhí)行。

Worker收到任務(wù)對(duì)象后執(zhí)行該任務(wù),并將函數(shù)返回值存入ObjectStore,并更新Master的對(duì)象表(ObjectTable)信息。

@ray.remote注解有一個(gè)參數(shù)num_return_vals用于聲明remote函數(shù)的返回值個(gè)數(shù),基于此實(shí)現(xiàn)remote函數(shù)的多返回值機(jī)制。

@ray.remote(num_return_vals=2)

def f():

return 1, 2

x_id, y_id = f.remote()

ray.get(x_id) # 1

ray.get(y_id) # 2

@ray.remote注解的另一個(gè)參數(shù)num_gpus可以為任務(wù)指定GPU的資源。使用內(nèi)置函數(shù)ray.get_gpu_ids()可以獲取當(dāng)前任務(wù)可以使用的GPU信息。

@ray.remote(num_gpus=1)

def gpu_method():

return "This function is allowed to use GPUs {}.".format(ray.get_gpu_ids())

5. ray.wait()

ray.wait()操作支持批量的任務(wù)等待,基于此可以實(shí)現(xiàn)一次性獲取多個(gè)ObjectID對(duì)應(yīng)的數(shù)據(jù)。

# 啟動(dòng)5個(gè)remote函數(shù)調(diào)用任務(wù)

results = [f.remote(i) for i in range(5)]

# 阻塞等待4個(gè)任務(wù)完成,超時(shí)時(shí)間為2.5s

ready_ids, remaining_ids = ray.wait(results, num_returns=4, timeout=2500)

上述例子中,results包含了5個(gè)ObjectID,使用ray.wait操作可以一直等待有4個(gè)任務(wù)完成后返回,并將完成的數(shù)據(jù)對(duì)象放在第一個(gè)list類型返回值內(nèi),未完成的ObjectID放在第二個(gè)list返回值內(nèi)。如果設(shè)置了超時(shí)時(shí)間,那么在超時(shí)時(shí)間結(jié)束后仍未等到預(yù)期的返回值個(gè)數(shù),則已超時(shí)完成時(shí)的返回值為準(zhǔn)。

6. ray.error_info()

使用ray.error_info()可以獲取任務(wù)執(zhí)行時(shí)產(chǎn)生的錯(cuò)誤信息。

>>> import time

>>> @ray.remote

>>> def f():

>>> time.sleep(5)

>>> raise Exception("This task failed!!")

>>> f.remote()

Remote function __main__.f failed with:

Traceback (most recent call last):

File "", line 4, in f

Exception: This task failed!!

You can inspect errors by running

ray.error_info()

If this driver is hanging, start a new one with

ray.init(redis_address="127.0.0.1:65452")

>>> ray.error_info()

[{'type': 'task', 'message': 'Remote function \x1b[31m__main__.f\x1b[39m failed with:\n\nTraceback (most recent call last):\n File "", line 4, in f\nException: This task failed!!\n', 'data': '{\'function_id\': "Hm\\xde\\x93\'\\x91\\xce\\x13ld\\xf4O\\xd7\\xce\\xc2\\xe1\\x151\\x1e3", \'function_name\': u\'__main__.f\'}'}]

7. Actor

Ray的remote函數(shù)只能處理無(wú)狀態(tài)的計(jì)算需求,有狀態(tài)的計(jì)算需求需要使用Ray的Actor實(shí)現(xiàn)。在Python的class定義前使用@ray.remote可以聲明Actor。

@ray.remote

class Counter(object):

def __init__(self):

self.value = 0

def increment(self):

self.value += 1

return self.value

使用如下方式創(chuàng)建Actor對(duì)象。

a1 = Counter.remote()

a2 = Counter.remote()

Ray創(chuàng)建Actor的流程為:

Master選取一個(gè)Slave,并將Actor創(chuàng)建任務(wù)分發(fā)給它的本地調(diào)度器。

創(chuàng)建Actor對(duì)象,并執(zhí)行它的構(gòu)造函數(shù)。

從流程可以看出,Actor對(duì)象的創(chuàng)建時(shí)并行的。

通過(guò)調(diào)用Actor對(duì)象的方法使用Actor。

a1.increment.remote() # ray.get returns 1

a2.increment.remote() # ray.get returns 1

調(diào)用Actor對(duì)象的方法的流程為:

首先創(chuàng)建一個(gè)任務(wù)。

該任務(wù)被Driver直接分配到創(chuàng)建該Actor對(duì)應(yīng)的本地執(zhí)行器執(zhí)行,這個(gè)操作繞開(kāi)了全局調(diào)度器(Worker是否也可以使用Actor直接分配任務(wù)尚存疑問(wèn))。

返回Actor方法調(diào)用結(jié)果的ObjectID。

為了保證Actor狀態(tài)的一致性,對(duì)同一個(gè)Actor的方法調(diào)用是串行執(zhí)行的。

四、安裝Ray

如果只是使用Ray,可以使用如下命令直接安裝。

pip intall ray

如果需要編譯Ray的最新源碼進(jìn)行安裝,按照如下步驟進(jìn)行(MaxOS):

# 更新編譯依賴包

brew update

brew install cmake pkg-config automake autoconf libtool boost wget

pip install numpy cloudpickle funcsigs click colorama psutil redis flatbuffers cython --ignore-installed six

# 下載源碼編譯安裝

git clone https://github.com/ray-project/ray.git

cd ray/python

python setup.py install

# 測(cè)試

python test/runtest.py

# 安裝WebUI需要的庫(kù)[可選]

pip install jupyter ipywidgets bokeh

# 編譯Ray文檔[可選]

cd ray/doc

pip install -r requirements-doc.txt

make html

open _build/html/index.html

我在MacOS上安裝jupyter時(shí),遇到了Python的setuptools庫(kù)無(wú)法升級(jí)的情況,原因是MacOS的安全性設(shè)置問(wèn)題,可以使用如下方式解決:

重啟電腦,啟動(dòng)時(shí)按住Command+R進(jìn)入Mac保護(hù)模式。

打開(kāi)命令行,輸入命令csrutils disable關(guān)閉系統(tǒng)安全策略。

重啟電腦,繼續(xù)安裝jupyter。

安裝完成后,重復(fù)如上的方式執(zhí)行csrutils enable,再次重啟即可。

進(jìn)入PythonShell,輸入代碼本地啟動(dòng)Ray:

import ray

ray.init()

瀏覽器內(nèi)打開(kāi)WebUI界面如下:

參考資料

總結(jié)

以上是生活随笔為你收集整理的python分布式框架_高性能分布式执行框架——Ray的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。