日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

Mars 如何分布式地执行

發(fā)布時間:2024/8/23 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Mars 如何分布式地执行 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

先前,我們已經(jīng)介紹過?Mars 是什么。如今?Mars 已在 Github 開源并對內(nèi)上線試用,本文將介紹 Mars 已實現(xiàn)的分布式執(zhí)行架構(gòu),歡迎大家提出意見。

架構(gòu)

Mars 提供了一套分布式執(zhí)行 Tensor 的庫。該庫使用?mars.actors?實現(xiàn)的 Actor 模型編寫,包含 Scheduler、Worker 和 Web 服務。

用戶向 Mars Web Service 提交的是由 Tensor 組成的 Graph。Web Service 接收這些圖并提交到一臺 Scheduler。在提交作業(yè)到各個 Worker 之前,Mars Scheduler 先將 Tensor 圖編譯成一張由 Chunk 和 Operand 組成的圖,此后對圖進行分析和切分。此后,Scheduler 在所有 Scheduler 中根據(jù)一致性哈希創(chuàng)建一系列控制單個 Operand 執(zhí)行的 OperandActor。Operand 以符合拓撲序的順序進行調(diào)度,當所有 Operand 完成執(zhí)行,整張圖將被標記為已完成,客戶端能夠從 Web 中拉取結(jié)果。整個執(zhí)行過程如下圖所述。

作業(yè)提交

用戶端通過 RESTful API 向 Mars 服務提交作業(yè)。用戶通過編寫 Tensor 上的代碼,此后通過?session.run(tensor)?將 Tensor 操作轉(zhuǎn)換為 Tensor 構(gòu)成的 Graph 并提交到 Web API。此后,Web API 將作業(yè)提交到 SessionActor 并在集群中創(chuàng)建一個 GraphActor 用于圖的分析和管理。用戶端則開始查詢圖的執(zhí)行狀態(tài),直至執(zhí)行結(jié)束。

在 GraphActor 中,我們首先根據(jù) chunks 設置將 Tensor 圖轉(zhuǎn)換為 Operand 和 Chunk 組成的圖,這一過程使得圖可以被進一步拆分并能夠并行執(zhí)行。此后,我們在圖上進行一系列的分析以獲得 Operand 的優(yōu)先級,同時向起始 Operand 指派 Worker,關(guān)于這一部分的細節(jié)可以參考 準備執(zhí)行圖 章節(jié)。此后,每個 Operand 均建立一個 OperandActor 用于控制該 Operand 的具體執(zhí)行。當 Operand 處于?READY狀態(tài)(如同在?Operand 狀態(tài)?章節(jié)描述的那樣),Scheduler 將會為 Operand 選擇目標 Worker,隨后作業(yè)被提交 Worker 進行實際的執(zhí)行。

執(zhí)行控制

當一個 Operand 被提交到 Worker,OperandActor 等待 Worker 上的回調(diào)。如果 Operand 執(zhí)行成功,Operand 的后繼將被調(diào)度。如果 Operand 執(zhí)行失敗,OperandActor 將會嘗試數(shù)次,如果仍失敗則將此次執(zhí)行標記為失敗。

取消作業(yè)

用戶端可以使用 RESTful API 取消運行中的作業(yè)。取消請求將被寫入 Graph 的狀態(tài)存儲中,同時 GraphActor 上的取消接口將被調(diào)用。如果作業(yè)在準備階段,它將在檢測到停止請求后立即結(jié)束,否則請求將被下發(fā)到每個 OperandActor,并設置狀態(tài)為 CANCELLING。如果此時 Operand 沒有運行,Operand 狀態(tài)將被直接置為 CANCELLED。如果 Operand 正在運行,停止請求將被下發(fā)到 Worker 中并導致一個 ExecutionInterrupted 錯誤,該錯誤將返回給 OperandActor,此時 Operand 的狀態(tài)將被標記為 CANCELLED。

準備執(zhí)行圖

當一個 Tensor 圖被提交到 Mars Scheduler,一張包含更細粒度的,由 Operand 和 Chunk 構(gòu)成的圖將根據(jù)數(shù)據(jù)源中包含的 chunks 參數(shù)被生成。

圖壓縮

當完成 Chunk 圖的生成后,我們將會通過合并圖中相鄰的節(jié)點來減小圖的規(guī)模,這一合并也能讓我們充分利用 numexpr 這樣的加速庫來加速計算過程。目前 Mars 僅會合并形成單條鏈的 Operand。例如,當執(zhí)行下面的代碼

import mars.tensor as mt a = mt.random.rand(100, chunks=100) b = mt.random.rand(100, chunks=100) c = (a + b).sum()

Mars 將會合并 Operand ADD 和 SUM 成為 FUSE 節(jié)點。RAND Operand 不會被合并,因為它們并沒有和 ADD 及 SUM 組成一條簡單的直線。

初始 Worker 分配

為 Operand 分配 Worker 對于圖執(zhí)行的性能而言至關(guān)重要。隨機分配初始 Operand 可能導致巨大的網(wǎng)絡開銷,并有可能導致不同 Worker 間作業(yè)分配的不平衡。因為非初始節(jié)點的分配能夠根據(jù)其前驅(qū)生成數(shù)據(jù)的物理分布及各個 Worker 的空閑情況方便地確定,在執(zhí)行圖準備階段,我們只考慮初始 Operand 的分配問題。

初始 Worker 分配需要遵循幾個準則。首先,分配給每個 Worker 執(zhí)行的 Operand 需要盡量保持平衡滿,這能夠使計算集群在整個執(zhí)行階段都有較高的利用率,這在執(zhí)行的最后階段顯得尤其重要。其次,初始節(jié)點分配需要使后續(xù)節(jié)點執(zhí)行時的網(wǎng)絡”傳輸盡量小。也就是說,初始點分配需要充分遵循局部性原則。

需要注意的是,上述準則在某些情況下會彼此沖突。一個網(wǎng)絡傳輸量最小的分配方案可能會非常偏斜。我們開發(fā)了一套啟發(fā)式算法來獲取兩個目標的平衡,該算法描述如下:

  • 選擇列表中的第一個初始節(jié)點和第一臺機器;
  • 從 Operand 圖轉(zhuǎn)換出的無向圖中自該點開始進行深度優(yōu)先搜索;
  • 如果另一個未被分配的初始節(jié)點被訪問到,我們將其分配給步驟1中選擇的機器;
  • 當訪問到的 Operand 總數(shù)大于平均每個 Worker 接受的 Operand 個數(shù)時,停止分配;
  • 前往步驟1,如果仍有 Worker 未被分配 Operand,否則結(jié)束。
  • 調(diào)度策略

    當一個 Operand 組成的 Graph 執(zhí)行時,合適的執(zhí)行順序會減少集群中暫存的數(shù)據(jù)總量,從而減小數(shù)據(jù)被 Spill 到磁盤的可能性。合適的 Worker 能夠減少執(zhí)行時網(wǎng)絡傳輸?shù)目偭俊?/p>

    Operand 選擇策略

    合適的執(zhí)行順序能夠顯著減小集群中暫存的數(shù)據(jù)總量。下圖中展示了 Tree Reduction 的例子,圓形代表 Operand,方形代表 Chunk,紅色代表 Operand 正在執(zhí)行,藍色代表 Operand 可被執(zhí)行,綠色代表 Operand 產(chǎn)生的 Chunk 已被存儲,灰色代表 Operand 及其相關(guān)數(shù)據(jù)已被釋放。假設我們有兩臺 Worker,并且每個 Operand 的資源使用量均相等,每張圖展示的是不同策略下經(jīng)過5個時間單元的執(zhí)行后的狀態(tài)。左圖展示的是節(jié)點依照層次分別執(zhí)行,而右圖展示的是依照接近深度優(yōu)先的順序執(zhí)行。左圖中,有6個 Chunk 的數(shù)據(jù)需要暫存,右圖只有2個。

    因為我們的目標是減少存儲在集群中的數(shù)據(jù)總數(shù),我們?yōu)檫M入 READY 狀態(tài)的 Operand 設定了一套優(yōu)先級策略:

  • 深度更大的 Operand 需要被優(yōu)先執(zhí)行;
  • 被更深的 Operand 依賴的 Operand 需要被優(yōu)先執(zhí)行;
  • 輸出規(guī)模更小的節(jié)點需要被優(yōu)先執(zhí)行。
  • Worker 選擇策略

    當 Scheduler 準備執(zhí)行圖時,初始 Operand 的 Worker 已被確定。我們選擇后續(xù) Operand 分配 Worker 的依據(jù)是輸入數(shù)據(jù)所在的 Worker。如果某個 Worker 擁有的輸入數(shù)據(jù)大小最大,則該 Worker 將被選擇用于執(zhí)行后續(xù) Operand。如果這樣的 Worker 有多個,則各個候選 Worker 的資源狀況將起到?jīng)Q定作用。

    Operand 狀態(tài)

    Mars 中的每一個操作符都被一個 OperandActor 單獨調(diào)度。執(zhí)行的過程是一個狀態(tài)轉(zhuǎn)移的過程。在 OperandActor 中,我們?yōu)槊恳粋€狀態(tài)的進入過程定義一個狀態(tài)轉(zhuǎn)移函數(shù)。起始 Operand 在初始化時位于 READY 狀態(tài),非起始 Operand 在初始化時則位于 UNSCHEDULED 狀態(tài)。當給定的條件滿足,Operand 將轉(zhuǎn)移到另一個狀態(tài)并執(zhí)行相應的操作。狀態(tài)轉(zhuǎn)移的流程可以參考下圖:

    我們在下面描述每個狀態(tài)的含義及 Mats 在這些狀態(tài)下執(zhí)行的操作。

    • UNSCHEDUED:一個 Operand 位于此狀態(tài),當它的上游數(shù)據(jù)沒有準備好。
    • READY:一個 Operand 位于此狀態(tài),當所有上游輸入數(shù)據(jù)均已準備完畢。在進入這一狀態(tài)時,OperandActor 向 AssignerActor 中選擇的所有 Worker 提交作業(yè)。如果某一 Worker 準備運行作業(yè),它將向 Scheduler 發(fā)送消息,Scheduler 將向其他 Worker 發(fā)送停止運行的消息,此后向該 Worker 發(fā)送消息以啟動作業(yè)執(zhí)行。
    • RUNNING:一個 Operand 位于此狀態(tài),當它的執(zhí)行已經(jīng)啟動。在進入此狀態(tài)時,OperandActor 會檢查作業(yè)是否已經(jīng)提交。如果尚未提交,OperandActor 將構(gòu)造一個由 FetchChunk Operand 和當前 Operand 組成的圖,并將其提交到 Worker 中。此后,OperandActor 會在 Worker 中注冊一個回調(diào)來獲取作業(yè)執(zhí)行完成的消息。
    • FINISHED:一個 Operand 位于此狀態(tài),當作業(yè)執(zhí)行已完成。當 Operand 進入此狀態(tài),且 Operand 無后繼,一個消息將被發(fā)送到 GraphActor 以決定是否整個 Graph 的執(zhí)行都已結(jié)束。與此同時,OperandActor 向它的前驅(qū)和后繼發(fā)送執(zhí)行完成的消息。如果一個前驅(qū)收到此消息,它將檢查是否所有的后繼都已執(zhí)行完成。如是,當前 Operand 上的數(shù)據(jù)可以被釋放。如果一個后繼收到此消息,它將檢查是否所有的前驅(qū)已完成。如是,該后繼的狀態(tài)可以轉(zhuǎn)移到 READY。
    • FREED:一個 Operand 位于此狀態(tài),當其上所有數(shù)據(jù)都已被釋放。
    • CANCELLED:一個 Operand 位于此狀態(tài),當所有重新執(zhí)行的嘗試均告失敗。當 Operand 進入此狀態(tài),它將把相同狀態(tài)傳遞到后繼節(jié)點。
    • CANCELLING:一個 Operand 位于此狀態(tài),當它正在被取消執(zhí)行。如果此前作業(yè)正在執(zhí)行,一個取消執(zhí)行的請求會被發(fā)送到 Worker 上。
    • CANCELLED:一個 Operand 位于此狀態(tài),當執(zhí)行已被取消并停止運行。如果執(zhí)行進入這一狀態(tài),OperandActor 會嘗試將書友的后繼都轉(zhuǎn)為 CANCELLING。

    Worker 中的執(zhí)行細節(jié)

    一個 Mars Worker 包含多個進程,以減少全局解釋器鎖(GIL)對執(zhí)行的影響。具體的執(zhí)行在獨立的進程中完成。為減少不必要的內(nèi)存拷貝和進程間通訊,Mars Worker 使用共享內(nèi)存來存儲執(zhí)行結(jié)果。

    當一個作業(yè)被提交到 Worker,它將首先被置于隊列中等待分配內(nèi)存。當內(nèi)存被分配后,其他 Worker 上的數(shù)據(jù),或者當前 Worker 上已被 spill 到磁盤的數(shù)據(jù)將會被重新載入內(nèi)存中。此時,所有計算需要的數(shù)據(jù)已經(jīng)都在內(nèi)存中,真正的計算過程將啟動。當計算完成,Worker 將會把作業(yè)放到共享存儲空間中。這四種執(zhí)行狀態(tài)的轉(zhuǎn)換關(guān)系見下圖。

    執(zhí)行控制

    Mars Worker 通過 ExecutionActor 控制所有?Operand 在 Worker 中的執(zhí)行。該 Actor 本身并不參與實際運算或者數(shù)據(jù)傳輸,只是向其他 Actor 提交任務。

    Scheduler 中的 OperandActor 通過 ExecutionActor 上的?enqueue_graph?調(diào)用向 Worker 提交作業(yè)。Worker 接受 Operand 提交并且將其換存在隊列中。當作業(yè)可以執(zhí)行時,ExecutionActor 將會向 Scheduler 發(fā)送消息,Scheduler 將確定是否將執(zhí)行該操作。當 Scheduler 確定在當前 Worker 上執(zhí)行 Operand,它將調(diào)用?start_execution?方法,并通過?add_finish_callback注冊一個回調(diào)。這一設計允許執(zhí)行結(jié)果被多個位置接收,這對故障恢復有價值。

    ExecutionActor 使用?mars.promise?模塊來同時處理多個 Operand 的執(zhí)行請求。具體的執(zhí)行步驟通過 Promise 類的 then 方法相串聯(lián)。當最終的執(zhí)行結(jié)果被存儲,之前注冊的回調(diào)將被觸發(fā)。如果在之前的任意執(zhí)行步驟中發(fā)生錯誤,該錯誤會被傳導到最后 catch 方法注冊的處理函數(shù)中并得到處理。

    Operand 的排序

    所有在 READY 狀態(tài)的 Operand 都被提交到 Scheduler 選擇的 Worker 中。因此,在執(zhí)行的絕大多數(shù)時間里,提交到 Operand 的 Worker 個數(shù)通常都高于單個 Worker 能夠處理的 Operand 總數(shù)。因此,Worker 需要對 Operand 進行排序,此后選擇一部分 Worker 來執(zhí)行。這一排序過程在 TaskQueueActor 中進行,該 Actor 中維護一個優(yōu)先隊列,其中存儲 Operand 的相關(guān)信息。與此同時,TaskQueueActor 定時運行一個作業(yè)分配任務,對處于優(yōu)先隊列頭部的 Operand 分配執(zhí)行資源直至沒有多余的資源來運行 Operand,這一分配過程也會在新 Operand 提交或者 Operand 執(zhí)行完成時觸發(fā)。

    內(nèi)存管理

    Mars Worker 管理兩部分內(nèi)存。第一部分是每個 Worker 進程私有的內(nèi)存空間,由每個進程自己持有。第二部分是所有進程共享的內(nèi)存空間,由?Apache Arrow 中的 plasma_store?持有。

    為了避免進程內(nèi)存溢出,我們引入了 Worker 級別的 QuotaActor,用于分配進程內(nèi)存。當一個 Operand 開始執(zhí)行前,它將為輸入和輸出 Chunk 向 QuotaActor 發(fā)送批量內(nèi)存請求。如果剩余的內(nèi)存空間可以滿足請求,該請求會被 QuotaActor 接受。否則,請求將排隊等待空閑資源。當相關(guān)內(nèi)存使用被釋放,請求的資源會被釋放,此時,QuotaActor 能夠為其他 Operand 分配資源。

    共享內(nèi)存由 plasma_store 管理,通常會占據(jù)整個內(nèi)存的 50%。由于不存在溢出的可能,這部分內(nèi)存無需經(jīng)過 QuotaActor 而是直接通過 plasma_store 的相關(guān)方法進行分配。當共享內(nèi)存使用殆盡,Mars Worker 會嘗試將一部分不在使用的 Chunk spill 到磁盤中,以騰出空間容納新的 Chunk。

    從共享內(nèi)存 spill 到磁盤的 Chunk 數(shù)據(jù)可能會被未來的 Operand 重新使用,而從磁盤重新載入共享內(nèi)存的操作可能會非常耗費 IO 資源,尤其在共享內(nèi)存已經(jīng)耗盡,需要 spill 其他 Chunk 到磁盤以容納載入的 Chunk 時。因此,當數(shù)據(jù)共享并不需要時,例如該 Chunk 只會被一個 Operand 使用,我們會將 Chunk 直接載入進程私有內(nèi)存中,而不是共享內(nèi)存,這可以顯著減少作業(yè)總執(zhí)行時間。

    未來工作

    Mars 目前正在快速迭代,近期將考慮實現(xiàn) Worker 級別的 failover 及 shuffle 支持,Scheduler 級別的 failover 也在計劃中。

    ?


    原文鏈接
    本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。

    總結(jié)

    以上是生活随笔為你收集整理的Mars 如何分布式地执行的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。