日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

用 Mars Remote API 轻松分布式执行 Python 函数

發(fā)布時間:2024/8/23 python 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 用 Mars Remote API 轻松分布式执行 Python 函数 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Mars 是一個并行和分布式 Python 框架,能輕松把單機大家耳熟能詳?shù)牡?numpy、pandas、scikit-learn 等庫,以及 Python 函數(shù)利用多核或者多機加速。這其中,并行和分布式 Python 函數(shù)主要利用 Mars Remote API。

啟動 Mars 分布式環(huán)境可以參考:

  • 命令行方式在集群中部署。
  • Kubernetes 中部署。
  • MaxCompute 開箱即用的環(huán)境,購買了 MaxCompute 服務(wù)的可以直接使用。
  • 如何使用 Mars Remote API

    使用 Mars Remote API 非常簡單,只需要對原有的代碼做少許改動,就可以分布式執(zhí)行。

    拿用蒙特卡洛方法計算 π 為例。代碼如下,我們編寫了兩個函數(shù),calc_chunk?用來計算每個分片內(nèi)落在圓內(nèi)的點的個數(shù),calc_pi?用來把多個分片?calc_chunk?計算的結(jié)果匯總最后得出 π 值。

    from typing import List import numpy as npdef calc_chunk(n: int, i: int):# 計算n個隨機點(x和y軸落在-1到1之間)到原點距離小于1的點的個數(shù)rs = np.random.RandomState(i)a = rs.uniform(-1, 1, size=(n, 2))d = np.linalg.norm(a, axis=1)return (d < 1).sum()def calc_pi(fs: List[int], N: int):# 將若干次 calc_chunk 計算的結(jié)果匯總,計算 pi 的值return sum(fs) * 4 / NN = 200_000_000 n = 10_000_000fs = [calc_chunk(n, i)for i in range(N // n)] pi = calc_pi(fs, N) print(pi)

    %%time?下可以看到結(jié)果:

    3.1416312 CPU times: user 9.47 s, sys: 2.62 s, total: 12.1 s Wall time: 12.3 s

    在單機需要 12.3 s。

    要讓這個計算使用 Mars Remote API 并行起來,我們不需要對函數(shù)做任何改動,需要變動的僅僅是最后部分。

    import mars.remote as mr# 函數(shù)調(diào)用改成 mars.remote.spawn fs = [mr.spawn(calc_chunk, args=(n, i))for i in range(N // n)] # 把 spawn 的列表傳入作為參數(shù),再 spawn 新的函數(shù) pi = mr.spawn(calc_pi, args=(fs, N)) # 通過 execute() 觸發(fā)執(zhí)行,fetch() 獲取結(jié)果 print(pi.execute().fetch())

    %%time?下看到結(jié)果:

    3.1416312 CPU times: user 29.6 ms, sys: 4.23 ms, total: 33.8 ms Wall time: 2.85 s

    結(jié)果一模一樣,但是卻有數(shù)倍的性能提升。

    可以看到,對已有的 Python 代碼,Mars remote API 幾乎不需要做多少改動,就能有效并行和分布式來加速執(zhí)行過程。

    一個例子

    為了讓讀者理解 Mars Remote API 的作用,我們從另一個例子開始。現(xiàn)在我們有一個數(shù)據(jù)集,我們希望對它們做一個分類任務(wù)。要做分類,我們有很多算法和庫可以選擇,這里我們用 RandomForest、LogisticRegression,以及 XGBoost。

    困難的地方是,除了有多個模型選擇,這些模型也會包含多個超參,那哪個超參效果最好呢?對于調(diào)參不那么有經(jīng)驗的同學,跑過了才知道。所以,我們希望能生成一堆可選的超參,然后把他們都跑一遍,看看效果。

    準備數(shù)據(jù)

    這個例子里我們使用?otto 數(shù)據(jù)集。

    首先,我們準備數(shù)據(jù)。讀取數(shù)據(jù)后,我們按 2:1 的比例把數(shù)據(jù)分成訓練集和測試集。

    import pandas as pd from sklearn.preprocessing import LabelEncoder from sklearn.model_selection import train_test_splitdef gen_data():df = pd.read_csv('otto/train.csv')X = df.drop(['target', 'id'], axis=1)y = df['target']label_encoder = LabelEncoder()label_encoder.fit(y)y = label_encoder.transform(y)return train_test_split(X, y, test_size=0.33, random_state=123)X_train, X_test, y_train, y_test = gen_data()

    模型

    接著,我們使用 scikit-learn 的 RandomForest 和 LogisticRegression 來處理分類。

    RandomForest:

    from sklearn.ensemble import RandomForestClassifierdef random_forest(X_train: pd.DataFrame, y_train: pd.Series, verbose: bool = False,**kw):model = RandomForestClassifier(verbose=verbose, **kw)model.fit(X_train, y_train)return model

    接著,我們生成供 RandomForest 使用的超參,我們用 yield 的方式來迭代返回。

    def gen_random_forest_parameters():for n_estimators in [50, 100, 600]:for max_depth in [None, 3, 15]:for criterion in ['gini', 'entropy']:yield {'n_estimators': n_estimators,'max_depth': max_depth,'criterion': criterion}

    LogisticRegression 也是這個過程。我們先定義模型。

    from sklearn.linear_model import LogisticRegressiondef logistic_regression(X_train: pd.DataFrame,y_train: pd.Series,verbose: bool = False,**kw):model = LogisticRegression(verbose=verbose, **kw)model.fit(X_train, y_train)return model

    接著生成供 LogisticRegression 使用的超參。

    def gen_lr_parameters():for penalty in ['l2', 'none']:for tol in [0.1, 0.01, 1e-4]:yield {'penalty': penalty,'tol': tol}

    XGBoost 也是一樣,我們用?XGBClassifier?來執(zhí)行分類任務(wù)。

    from xgboost import XGBClassifierdef xgb(X_train: pd.DataFrame,y_train: pd.Series,verbose: bool = False,**kw):model = XGBClassifier(verbosity=int(verbose), **kw)model.fit(X_train, y_train)return model

    生成一系列超參。

    def gen_xgb_parameters():for n_estimators in [100, 600]:for criterion in ['gini', 'entropy']:for learning_rate in [0.001, 0.1, 0.5]:yield {'n_estimators': n_estimators,'criterion': criterion,'learning_rate': learning_rate}

    驗證

    接著我們編寫驗證邏輯,這里我們使用?log_loss?來作為評價函數(shù)。

    from sklearn.metrics import log_lossdef metric_model(model, X_test: pd.DataFrame,y_test: pd.Series) -> float:if isinstance(model, bytes):model = pickle.loads(model)y_pred = model.predict_proba(X_test)return log_loss(y_test, y_pred)def train_and_metric(train_func,train_params: dict,X_train: pd.DataFrame, y_train: pd.Series, X_test: pd.DataFrame, y_test: pd.Series,verbose: bool = False):# 把訓練和驗證封裝到一起model = train_func(X_train, y_train, verbose=verbose, **train_params)metric = metric_model(model, X_test, y_test)return model, metric

    找出最好的模型

    做好準備工作后,我們就開始來跑模型了。針對每個模型,我們把每次生成的超參們送進去訓練,除了這些超參,我們還把?n_jobs?設(shè)成 -1,這樣能更好利用單機的多核。

    results = []# ------------- # Random Forest # -------------for params in gen_random_forest_parameters():print(f'calculating on {params}')# fixed random_stateparams['random_state'] = 123# use all CPU coresparams['n_jobs'] = -1model, metric = train_and_metric(random_forest, params,X_train, y_train,X_test, y_test)print(f'metric: {metric}')results.append({'model': model, 'metric': metric})# ------------------- # Logistic Regression # -------------------for params in gen_lr_parameters():print(f'calculating on {params}')# fixed random_stateparams['random_state'] = 123# use all CPU coresparams['n_jobs'] = -1model, metric = train_and_metric(logistic_regression, params,X_train, y_train,X_test, y_test)print(f'metric: {metric}')results.append({'model': model, 'metric': metric})# ------- # XGBoost # -------for params in gen_xgb_parameters():print(f'calculating on {params}')# fixed random_stateparams['random_state'] = 123# use all CPU coresparams['n_jobs'] = -1model, metric = train_and_metric(xgb, params,X_train, y_train,X_test, y_test)print(f'metric: {metric}')results.append({'model': model, 'metric': metric})

    運行一下,需要相當長時間,我們省略掉一部分輸出內(nèi)容。

    calculating on {'n_estimators': 50, 'max_depth': None, 'criterion': 'gini'} metric: 0.6964123781828575 calculating on {'n_estimators': 50, 'max_depth': None, 'criterion': 'entropy'} metric: 0.6912312790832288 # 省略其他模型的輸出結(jié)果 CPU times: user 3h 41min 53s, sys: 2min 34s, total: 3h 44min 28s Wall time: 31min 44s

    從 CPU 時間和 Wall 時間,能看出來這些訓練還是充分利用了多核的性能。但整個過程還是花費了 31 分鐘。

    使用 Remote API 分布式加速

    現(xiàn)在我們嘗試使用 Remote API 通過分布式方式加速整個過程。

    集群方面,我們使用最開始說的第三種方式,直接在 MaxCompute 上拉起一個集群。大家可以選擇其他方式,效果是一樣的。

    n_cores = 8 mem = 2 * n_cores # 16G # o 是 MaxCompute 入口,這里創(chuàng)建 10 個 worker 的集群,每個 worker 8核16G cluster = o.create_mars_cluster(10, n_cores, mem, image='extended')

    為了方便在分布式讀取數(shù)據(jù),我們對數(shù)據(jù)處理稍作改動,把數(shù)據(jù)上傳到 MaxCompute 資源。對于其他環(huán)境,用戶可以考慮 HDFS、Aliyun OSS 或者 Amazon S3 等存儲。

    if not o.exist_resource('otto_train.csv'):with open('otto/train.csv') as f:# 上傳資源o.create_resource('otto_train.csv', 'file', fileobj=f)def gen_data():# 改成從資源讀取df = pd.read_csv(o.open_resource('otto_train.csv'))X = df.drop(['target', 'id'], axis=1)y = df['target']label_encoder = LabelEncoder()label_encoder.fit(y)y = label_encoder.transform(y)return train_test_split(X, y, test_size=0.33, random_state=123)

    稍作改動之后,我們使用?mars.remote.spawn?方法來讓?gen_data?調(diào)度到集群上運行。

    import mars.remote as mr# n_output 說明是 4 輸出 # execute() 執(zhí)行后,數(shù)據(jù)會讀取到 Mars 集群內(nèi)部 data = mr.ExecutableTuple(mr.spawn(gen_data, n_output=4)).execute() # remote_ 開頭的都是 Mars 對象,這時候數(shù)據(jù)在集群內(nèi),這些對象只是引用 remote_X_train, remote_X_test, remote_y_train, remote_y_test = data

    目前 Mars 能正確序列化 numpy ndarray、pandas DataFrame 等,還不能序列化模型,所以,我們要對?train_and_metric?稍作改動,把模型 pickle 了之后再返回。

    def distributed_train_and_metric(train_func,train_params: dict,X_train: pd.DataFrame, y_train: pd.Series, X_test: pd.DataFrame, y_test: pd.Series,verbose: bool = False):model, metric = train_and_metric(train_func, train_params,X_train, y_train, X_test, y_test, verbose=verbose)return pickle.dumps(model), metric

    后續(xù) Mars 支持了序列化模型后可以直接 spawn 原本的函數(shù)。

    接著我們就對前面的執(zhí)行過程稍作改動,把函數(shù)調(diào)用全部都用?mars.remote.spawn?來改寫。

    import numpy as nptasks = [] models = [] metrics = []# ------------- # Random Forest # -------------for params in gen_random_forest_parameters():# fixed random_stateparams['random_state'] = 123task = mr.spawn(distributed_train_and_metric,args=(random_forest, params,remote_X_train, remote_y_train,remote_X_test, remote_y_test), kwargs={'verbose': 2},n_output=2)tasks.extend(task)# 把模型和評價分別存儲models.append(task[0])metrics.append(task[1])# ------------------- # Logistic Regression # -------------------for params in gen_lr_parameters():# fixed random_stateparams['random_state'] = 123task = mr.spawn(distributed_train_and_metric,args=(logistic_regression, params,remote_X_train, remote_y_train,remote_X_test, remote_y_test), kwargs={'verbose': 2},n_output=2)tasks.extend(task)# 把模型和評價分別存儲models.append(task[0])metrics.append(task[1])# ------- # XGBoost # -------for params in gen_xgb_parameters():# fixed random_stateparams['random_state'] = 123# 再指定并發(fā)為核的個數(shù)params['n_jobs'] = n_corestask = mr.spawn(distributed_train_and_metric,args=(xgb, params,remote_X_train, remote_y_train,remote_X_test, remote_y_test), kwargs={'verbose': 2},n_output=2)tasks.extend(task)# 把模型和評價分別存儲models.append(task[0])metrics.append(task[1])# 把順序打亂,目的是能分散到 worker 上平均一點 shuffled_tasks = np.random.permutation(tasks) _ = mr.ExecutableTuple(shuffled_tasks).execute()

    可以看到代碼幾乎一致。

    運行查看結(jié)果:

    CPU times: user 69.1 ms, sys: 10.9 ms, total: 80 ms Wall time: 1min 59s

    時間一下子從 31 分鐘多來到了 2 分鐘,提升 15x+。但代碼修改的代價可以忽略不計。

    細心的讀者可能注意到了,分布式運行的代碼中,我們把模型的 verbose 給打開了,在分布式環(huán)境下,因為這些函數(shù)遠程執(zhí)行,打印的內(nèi)容只會輸出到 worker 的標準輸出流,我們在客戶端不會看到打印的結(jié)果,但 Mars 提供了一個非常有用的接口來讓我們查看每個模型運行時的輸出。

    以第0個模型為例,我們可以在 Mars 對象上直接調(diào)用?fetch_log?方法。

    print(models[0].fetch_log())

    輸出我們簡略一部分。

    building tree 1 of 50 building tree 2 of 50 building tree 3 of 50 building tree 4 of 50 building tree 5 of 50 building tree 6 of 50 # 中間省略 building tree 49 of 50 building tree 50 of 50

    要看哪個模型都可以通過這種方式。試想下,如果沒有?fetch_log?API,你確想看中間過程的輸出有多麻煩。首先這個函數(shù)在哪個 worker 上執(zhí)行,不得而知;然后,即便知道是哪個 worker,因為每個 worker 上可能有多個函數(shù)執(zhí)行,這些輸出就可能混雜在一起,甚至被龐大日志淹沒了。fetch_log?接口讓用戶不需要關(guān)心在哪個 worker 上執(zhí)行,也不用擔心日志混合在一起。

    想要了解?fetch_log?接口,可以查看?文檔。

    還有更多

    Mars Remote API 的能力其實不止這些,舉個例子,在 remote 內(nèi)部可以 spawn 新的函數(shù);也可以調(diào)用 Mars tensor、DataFrame 或者 learn 的算法。這些內(nèi)容,讀者們可以先行探索,后續(xù)我們再寫別的文章介紹。

    總結(jié)

    Mars Remote API 通過并行和分布式 Python 函數(shù),用很小的修改代價,極大提升了執(zhí)行效率。

    原文鏈接
    本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。

    總結(jié)

    以上是生活随笔為你收集整理的用 Mars Remote API 轻松分布式执行 Python 函数的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。