获取作业函数的结果
獲取作業函數的結果
本文主要介紹如何在 OneFlow 中獲取作業函數的返回結果,主要包括:
? 如何同步方式獲取作業函數的結果
? 如何異步方式獲取作業函數的結果
在 OneFlow 中,被 @flow.global_function 裝飾器修飾的函數定義稱為作業函數,作業函數可以用于訓練或預測。通過指定作業函數的返回值類型,可以使用同步或者異步的方式獲取作業函數的運算結果。
同步/異步對比
同步
在同步訓練中,只有上一個 step 的工作完成后,才能開始下一個 step 的訓練。
異步
在異步訓練中,作業函數的執行是并發的,某個 step 不必等上一個 step 的作業結束,而是可以提前進行數據加載和預處理。
通過以上對比可知,在 OneFlow 中使用異步執行作業函數,有效利用了計算機資源,尤其是在數據集規模巨大的情況下,開啟異步執行能有效縮短數據的加載和準備時間,加快模型訓練。
接下來,將分別講解如何用同步、異步的方式獲取作業函數的計算結果,并在文章的最后提供完整代碼的鏈接。
要點在于:
? 定義作業函數時,通過注解返回值類型,告之 OneFlow 是同步還是異步模式
? 作業函數的返回值類型在 oneflow.typing 模塊中選擇
? 調用作業函數時,同步/異步調用作業函數的形式略有不同
同步獲取結果
定義作業函數時,通過注解指定作業函數的返回結果為 oneflow.typing.Numpy 時,作業函數為一個同步作業函數。
比如,如果定義了如下的作業函數:
@flow.global_function(type=“train”)
def train_job(
images: tp.Numpy.Placeholder((BATCH_SIZE, 1, 28, 28), dtype=flow.float),
labels: tp.Numpy.Placeholder((BATCH_SIZE,), dtype=flow.int32),
) -> tp.Numpy:
with flow.scope.placement(“gpu”, “0:0”):
logits = lenet(images, train=True)
loss = flow.nn.sparse_softmax_cross_entropy_with_logits(
labels, logits, name=“softmax_loss”
)
lr_scheduler = flow.optimizer.PiecewiseConstantScheduler([], [0.1])
flow.optimizer.SGD(lr_scheduler, momentum=0).minimize(loss)
return loss
以上代碼,通過 python 注解的方式告之 OneFlow 系統,返回的是 tp.Numpy ( tp 是 腳本中 oneflow.typing 的別名)類型,它對應了 NumPy 中的 ndarray。
那么,當調用作業函數時,作業函數會直接返回 ndarray 對象:
loss = train_job(images, labels)
if i % 20 == 0:
print(loss.mean())
從以上示例中,應該注意到:
? 定義作業函數時,作業函數返回的對象(上文中的 loss) 只是數據占位符,用于構建計算圖,并沒有真實數據。
? 通過指定作業函數的返回值類型為 oneflow.typing.Numpy,可以告之 OneFlow 此作業函數調用時,返回的真實數據類型為 NumPy ndarray 對象
? 通過調用作業函數 train_job(images, labels), 可以直接獲取作業函數的運行計算結果,類型為 oneflow.typing.Numpy 對應的 ndarray 對象。
oneflow.typing 中的數據類型
oneflow.typing 中包含了作業函數可以返回的數據類型,上文中出現的 oneflow.typing.Numpy 只是其中一種,現將其中常用的幾種類型及對應意義羅列如下:
? oneflow.typing.Numpy:對應了 numpy.ndarray,本文主要以 oneflow.typing.Numpy 舉例
? oneflow.typing.ListNumpy:對應了一個 list 容器,其中每個元素都是一個 numpy.ndarray 對象。與 OneFlow 進行分布式訓練的視角有關,將在分布式訓練的 consistent 與 mirrored 視角中看到其作用
? oneflow.typing.ListListNumpy:對應了一個 list 容器,其中每個元素都是一個 TensorList 對象,OneFlow 的某些接口需要處理或者返回多個 TensorList 對象。具體可以參閱 概念清單 及相關 API 文檔
? oneflow.typing.Callback:對應了一個回調函數,用于異步調用作業函數,下文會介紹
此外,OneFlow 還允許作業函數以字典的形式傳出數據,有關 ListNumpy、ListNumpy、ListListNumpy 以及如何用字典方式傳出數據的示例,可以參考 OneFlow 的測試案例。
異步獲取結果
一般而言,采用異步方式獲取訓練結果的效率高于同步方式。 以下介紹如何異步調用作業函數并處理訓練結果。
其基本步驟包括:
? 準備回調函數:需要通過注解的方式指定回調函數所接受的參數,在回調函數的內部,實現處理作業函數返回值結果的邏輯
? 實現作業函數:通過注解的方式,指定 flow.typing.Callback 為作業函數的返回類型。將在下文例子中看到,通過 Callback 可以指定回調函數的參數類型
? 調用作業函數:并注冊以上第一步準備的回調函數
以上工作三個步驟由 OneFlow 的用戶完成,在程序運行時,注冊的回調函數會被 OneFlow 調用,并將作業函數的返回值作為參數傳遞給回調函數。
編寫回調函數
回調函數的原型如下:
def cb_func(result: T):
#…
其中的 result ,需要通過注解,指定其類型 T,即上文中提到的 Numpy、ListNumpy等,也可以是它們的復合類型,下文將有對應的實例。
參數 result 對應了作業函數的返回值,因此必須與作業函數返回值所注解的一致。
比如,定義了一個作業函數:
@flow.global_function(type=“train”)
def train_job(
images: tp.Numpy.Placeholder((BATCH_SIZE, 1, 28, 28), dtype=flow.float),
labels: tp.Numpy.Placeholder((BATCH_SIZE,), dtype=flow.int32),
) -> tp.Callback[tp.Numpy]:
# mlp
initializer = flow.truncated_normal(0.1)
reshape = flow.reshape(images, [images.shape[0], -1])
hidden = flow.layers.dense(
reshape,
512,
activation=flow.nn.relu,
kernel_initializer=initializer,
name=“hidden”,
)
logits = flow.layers.dense(
hidden, 10, kernel_initializer=initializer, name=“output”
)
loss = flow.nn.sparse_softmax_cross_entropy_with_logits(labels, logits, name="softmax_loss"
)
lr_scheduler = flow.optimizer.PiecewiseConstantScheduler([], [0.1])
flow.optimizer.SGD(lr_scheduler, momentum=0).minimize(loss)
return loss
注解-> tp.Callback[tp.Numpy] 表示此作業函數,返回一個 tp.Numpy 類型的對象,并且需要異步調用。
那么,定義的回調函數,就應該接受一個 Numpy 類型的參數:
def cb_print_loss(result: tp.Numpy):
global g_i
if g_i % 20 == 0:
print(result.mean())
g_i += 1
類似的,如果作業函數的定義為:
@flow.global_function(type=“predict”)
def eval_job(
images: tp.Numpy.Placeholder((BATCH_SIZE, 1, 28, 28), dtype=flow.float),
labels: tp.Numpy.Placeholder((BATCH_SIZE,), dtype=flow.int32),
) -> tp.Callback[Tuple[tp.Numpy, tp.Numpy]]:
with flow.scope.placement(“cpu”, “0:0”):
logits = mlp(images)
loss = flow.nn.sparse_softmax_cross_entropy_with_logits(
labels, logits, name=“softmax_loss”
)
return (labels, logits)
其中 -> tp.Callback[Tuple[tp.Numpy, tp.Numpy]] 表示此作業函數,返回一個包含2個元素的 tuple,且每個元素都是 tp.Numpy 類型,并且作業函數需要異步調用。
那么,對應的回調函數的參數注解應該為:
g_total = 0
g_correct = 0
def acc(arguments: Tuple[tp.Numpy, tp.Numpy]):
global g_total
global g_correct
labels = arguments[0]
logits = arguments[1]
predictions = np.argmax(logits, 1)
right_count = np.sum(predictions == labels)
g_total += labels.shape[0]
g_correct += right_count
arguments 對應了以上作業函數的返回類型。
注冊回調函數
當異步調用作業函數時,返回一個 Callback 對象,將準備好的回調函數傳遞給它,就完成了注冊。
OneFlow 會在獲取到訓練結果時,自動調用注冊的回調。
callbacker = train_job(images, labels)
callbacker(cb_print_loss)
不過以上的寫法比較冗余,推薦使用:
train_job(images, labels)(cb_print_loss)
完整代碼
同步獲取一個結果
在本例中,使用一個 LeNet 網絡,通過同步方式獲取唯一的返回結果 loss ,并每隔20輪打印一次 loss。
代碼鏈接:synchronize_single_job.py
運行:
wget https://docs.oneflow.org/code/basics_topics/synchronize_single_job.py
python3 synchronize_single_job.py
會有類似輸出:
File mnist.npz already exist, path: ./mnist.npz
7.3258467
2.1435719
1.1712438
0.7531896
…
…
model saved
同步獲取多個返回結果
在本例中,作業函數返回一個 tuple ,通過同步方式獲取 tuple 中 labels 與 logits ,并對上例中訓練好的模型進行評估,輸出準確率。
代碼:synchronize_batch_job.py
其中,預訓練模型文件下載:lenet_models_1.zip
運行:
wget https://oneflow-public.oss-cn-beijing.aliyuncs.com/online_document/docs/quick_start/lenet_models_1.zip
unzip lenet_models_1.zip
wget https://docs.oneflow.org/code/basics_topics/synchronize_batch_job.py
python3 synchronize_batch_job.py
會有輸出:
accuracy: 99.3%
異步獲取一個返回結果
在本例中,使用 mlp 訓練,通過異步方式獲取唯一的返回結果 loss ,并每隔20輪打印一次 loss。
代碼下載:async_single_job.py
運行:
wget https://docs.oneflow.org/code/basics_topics/async_single_job.py
python3 async_single_job.py
會有類似輸出:
File mnist.npz already exist, path: ./mnist.npz
3.0865736
0.8949808
0.47858357
0.3486296
…
異步獲取多個返回結果
在以下的例子中,展示了如何異步方式獲取作業函數的多個返回結果,并對上例中訓練好的模型進行評估,輸出準確率。
代碼下載:async_batch_job.py
其中,預訓練模型文件可以點此處下載:mlp_models_1.zip
wget https://oneflow-public.oss-cn-beijing.aliyuncs.com/online_document/docs/basics_topics/mlp_models_1.zip
unzip mlp_models_1.zip
wget https://docs.oneflow.org/code/basics_topics/async_batch_job.py
python3 async_batch_job.py
輸出:
File mnist.npz already exist, path: ./mnist.npz
accuracy: 97.6%
總結