日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

自定义 DataLoader

發(fā)布時間:2023/11/28 生活经验 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 自定义 DataLoader 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

自定義 DataLoader
如 數(shù)據(jù)輸入 一文所介紹,OneFlow 支持兩種數(shù)據(jù)加載方式:直接使用 NumPy 數(shù)據(jù)或者使用 DataLoader 及其相關(guān)算子。
在大型工業(yè)場景下,數(shù)據(jù)加載容易成為訓練的瓶頸。在其它的框架中,數(shù)據(jù)加載流水線往往作為單獨的模塊存在,需要針對不同場景進行調(diào)整,通用性不高。在 OneFlow 中,DataLoader 及相關(guān)預(yù)處理算子,與其它普通算子地位等同,可以享受與其它算子一樣的流水加速效果,輕松解決大規(guī)模數(shù)據(jù)加載的痛點。
在 OneFlow 中使用 DataLoader,一般通過調(diào)用 XXXReader 加載文件中的數(shù)據(jù),調(diào)用 XXXDeocde 等對數(shù)據(jù)進行解碼或其它預(yù)處理,一起協(xié)同完成 Dataloader 的功能。
OneFlow 目前內(nèi)置了一些文件格式的 DataLoader。如果想使用 DataLoader 提高數(shù)據(jù)加載的效率,但是加載的數(shù)據(jù)格式暫時沒有內(nèi)置在 OneFlow 中,此時,可以自己實現(xiàn) DataLoader,加載自定義的數(shù)據(jù)格式。
本文實現(xiàn)了一個 Mini Dataloader,在倉庫中可查看完整代碼。
作為示例,Mini Dataloader 支持的文件格式為:以逗號分隔的每行兩列數(shù)字的純文本文件(見代碼中的 part-000 及 part-001 文件):
1.01,2.02
2.01,4.02
3.0,6.05
4.1,8.205
5,10
6.0,12.0
7.0,14.2
8.0,16.3
9.1,18.03
本文將以 Mini Dataloader 為例,對自定義格式的 DataLoader 的實現(xiàn)要點,進行講解。
Dataloader 的組成
完整的 Dataloader 一般包括兩類 Op:
? Data Reader:負責將文件系統(tǒng)中的數(shù)據(jù),加載到內(nèi)存中的輸入流,并最終將數(shù)據(jù)設(shè)置到 Op 的輸出中。它又可以細分為 Loader 與 Parser 兩部分,Loader 負責從文件系統(tǒng)中讀取原始數(shù)據(jù),Parser 負責將原始數(shù)據(jù)組織為 Data Reader Op 的輸出
? Data Preprocessor:將 Data Reader Op 的輸出的數(shù)據(jù)進行預(yù)處理,常見的預(yù)處理有圖片解碼、剪裁圖片、解碼等
對于一些簡單的數(shù)據(jù)格式,不需要預(yù)處理,可以省略掉 Data Preprocessor,只使用 Data Reader 即可。
作為示例, Mini Dataloader 處理的數(shù)據(jù)格式雖然簡單,但是依然實現(xiàn)了 DataReader 及 Data Preprocessor 兩類 op,其中:
? MiniReader 負責從文件中讀取數(shù)據(jù),并按逗號分隔字符串,將文本轉(zhuǎn)為浮點數(shù)據(jù)后,設(shè)置到 Op 的輸出中,輸出形狀為每行兩列
? MiniDecoder 負責將以上每行兩列的輸出進行分割,得到2個每行1列的輸出 x 與 y
在 test_mini_dataloader.py 中可以看到 Python 層次兩者的使用:
miniRecord = MiniReader(
“./”,
batch_size=batch_size,
data_part_num=2,
part_name_suffix_length=3,
random_shuffle=True,
shuffle_after_epoch=True,
)

x, y = MiniDecoder(miniRecord, name="d1")

以下,將介紹 C++ 層次如何實現(xiàn) Data Reader 算子與 Data Preprocessor 算子。
Data Reader 算子
Data Reader 的類關(guān)系
需要實現(xiàn)一個繼承自 DataReader 的類,該類包含了兩個重要對象 loader_ 與 parser_,分別繼承自 Dataset 與 Parser。
? loader_ 的工作是從文件系統(tǒng)中讀取數(shù)據(jù)至緩沖區(qū),Op 作者通過重寫 Next 方法編寫這部分的邏輯
? parser_ 的工作是將緩沖區(qū)中的數(shù)據(jù),設(shè)置到 Op 的輸出中,Op 作者通過重寫 Parser 方法編寫這部分的邏輯
當 Data Reader Op 工作時,會調(diào)用 loader_ 中的相關(guān)方法打開文件系統(tǒng)中的文件,并調(diào)用 loader_ 的 Next 方法按照 Op 作者預(yù)定的邏輯從文件系統(tǒng)讀取數(shù)據(jù),然后,再調(diào)用 parser_ 的 Parser 方法,將數(shù)據(jù)設(shè)置到 Op 的輸出中。
以下的偽代碼展示了以上類關(guān)系和調(diào)用過程,實際代碼比偽代碼要復(fù)雜,并不是一一對應(yīng)的關(guān)系:
class DataReader{
void Read(user_op::KernelComputeContext* ctx) {
// 運行到此處,已經(jīng)啟動了多線程加速數(shù)據(jù)處理
loader->next();
parser_->Parse();
}
Dataset* loader_;
Parser* parser_;
};

class MiniDataReader : DataReader{
loader_ = new MiniDataSet;
parser_ = new MiniParser;
};

class MiniDataset: Dataset {
MiniDataset() {
//在文件系統(tǒng)中找到數(shù)據(jù)集,并打開文件,初始化輸入流
//…
}

Next() {
// 從輸入流中讀取數(shù)據(jù)的邏輯
}
};

class MiniParser: Parser {
void Parse(){
// 將 DataSet 中的數(shù)據(jù) 設(shè)置到 Op 的輸出中
}
};
在 Data Reader Op 的 Kernel 中,會觸發(fā) DataReader 的 Read 方法,進而完成以上偽代碼所展示的一連串操作。
以下針對 MiniReader 算子的真實代碼進行解析。
Op 及 Kernel 注冊
通過以下代碼,注冊了 MiniReader 的 Op:
REGISTER_CPU_ONLY_USER_OP(“MiniReader”)
.Output(“out”)
.Attrstd::string(“data_dir”)
.Attrstd::int32_t(“data_part_num”)
.Attrstd::string(“part_name_prefix”, “part-”)
.Attr<int32_t>(“part_name_suffix_length”, -1)
.Attr<int32_t>(“batch_size”)
.Attr(“random_shuffle”, false)
.Attr(“shuffle_after_epoch”, false)
.Attr<int64_t>(“seed”, -1)
.Attr<int32_t>(“shuffle_buffer_size”, 1024)
.SetTensorDescInferFn([](user_op::InferContext* ctx) -> Maybe {
//…
*out_tensor->mut_shape() = Shape({local_batch_size, 2});
out_tensor->mut_data_type() = DataType::kDouble;
//…
})
.SetGetSbpFn([](user_op::SbpContext
ctx) -> Maybe {
ctx->NewBuilder().Split(ctx->outputs(), 0).Build();
//…
});
可以看到,因為 Data Reader 是比較特殊的 Op,只有輸出,沒有輸入(數(shù)據(jù)來自文件系統(tǒng),而不是神經(jīng)網(wǎng)絡(luò)中的某個上游節(jié)點),只通過 Out 方法設(shè)置了輸出,并在 SetTensorDescInferFn 設(shè)置了輸出的性質(zhì)為每行2列,數(shù)據(jù)類為 DataType::kDouble。同理,在設(shè)置 SetGetSbpFn 中設(shè)置 SBP Signature 時,只需要設(shè)置輸出的 SBP 屬性,將其設(shè)置為 Split(0)。
而設(shè)置的各種屬性(data_dir、data_part_num 等),沿用了 OFRecord 數(shù)據(jù)集 中關(guān)于文件命名規(guī)范的要求,這使得可以復(fù)用 OneFlow 中已有的相關(guān)代碼,像 加載 OFRecord 數(shù)據(jù)集 那樣,加載自定義格式的文件。
接著看這個 Op 的 Kernel 實現(xiàn):
class MiniReaderKernel final : public user_op::OpKernel {
public:
//…

std::shared_ptr<user_op::OpKernelState>
CreateOpKernelState(user_op::KernelInitContext* ctx) override{
std::shared_ptr reader(new MiniReaderWrapper(ctx));
return reader;
}

void Compute(user_op::KernelComputeContext* ctx,
user_op::OpKernelState* state) override {
auto* reader = dynamic_cast<MiniReaderWrapper*>(state);
reader->Read(ctx);
}
//…
};

REGISTER_USER_KERNEL(“MiniReader”)
.SetCreateFn()
.SetIsMatchedHob((user_op::HobDeviceTag() == “cpu”)
& (user_op::HobDataType(“out”, 0) == DataType::kDouble));
依據(jù) 自定義 Op 一文中的知識,知道 MiniReaderKernel::Compute 負責 Op 的運算邏輯。不過此處使用使用的 Compute 是包含2個參數(shù)的重載,有必要介紹下它的第二個參數(shù) OpKernelState。
當進行 Compute 時,有時除了從 ctx 獲取的信息外,還需要維護一些其他的對象,這種對象不需要反復(fù)創(chuàng)建,其中的信息狀態(tài)可能隨著 Compute 多次調(diào)用而改變。為了應(yīng)對這種需求,OneFlow 提供了2個參數(shù)的 Compute 的重載,為了使用它,必須同時重寫 CreateOpKernelState,CreateOpKernelState 的作用是返回一個 user_op::OpKernelState 派生類對象,這個對象,將在 Compute 調(diào)用時,作為第二個參數(shù)傳遞。
為此,只需要將除 ctx 外想要維護的信息,封裝為 user_op::OpKernelState 的派生類,并在 CreateOpKernelState 實例化并返回即可。
具體到實現(xiàn)的 Mini Reader 的 Kernel,先實現(xiàn)了一個繼承自 user_op::OpKernelState 的類 MiniReaderWrapper, 它是對 MiniDataReader 的簡單封裝,之所以封裝一層 MiniReaderWrapper 而不直接使用 MiniDataReader,僅僅是為了符合以上所述的 OneFlow 框架要求。
class MiniReaderWrapper final : public user_op::OpKernelState {
public:
explicit MiniReaderWrapper(user_op::KernelInitContext* ctx) : reader_(ctx) {}
~MiniReaderWrapper() = default;

void Read(user_op::KernelComputeContext* ctx) { reader_.Read(ctx); }

private:
data::MiniDataReader reader_;
};
然后,重寫 CreateOpKernelState,在其內(nèi)部創(chuàng)建一個 MiniReaderWrapper 對象:
std::shared_ptr<user_op::OpKernelState>
CreateOpKernelState(user_op::KernelInitContext* ctx) override{
std::shared_ptr reader(new MiniReaderWrapper(ctx));
return reader;
}
這樣,在適當?shù)臅r機,OneFlow 就會自動調(diào)用 CreateOpKernelState 創(chuàng)建對象,并將其作為第2個參數(shù)傳遞給 Compute??梢栽?Compute 中拿到這個對象,并使用:
auto* reader = dynamic_cast<MiniReaderWrapper*>(state);
reader->Read(ctx);
可以看到,在 MiniReader 的 Kernel 中,僅僅是簡單調(diào)用了 MiniReaderWrapper::Reader,這會觸發(fā)上文偽代碼中所提及的 DataReader::Read 流程。
MiniDataReader
上文偽代碼中已經(jīng)提及,在 MiniDataReader 內(nèi)部,會實例化一個 MiniDataset 并賦值給 loader_ 指針。 以下是真實代碼:
class MiniDataReader final : public DataReader {
public:
MiniDataReader(user_op::KernelInitContext* ctx) : DataReader(ctx) {
loader_.reset(new MiniDataset(ctx));
parser_.reset(new MiniParser());
if (ctx->Attr(“random_shuffle”)) {
loader_.reset(new RandomShuffleDataset(ctx, std::move(loader_)));
}
int32_t batch_size = ctx->TensorDesc4ArgNameAndIndex(“out”, 0)->shape().elem_cnt();
loader_.reset(new BatchDataset(batch_size, std::move(loader_)));
StartLoadThread();
}
};
可以看到,除了自己繼承自 DataSet 的 MiniDataset 類之外,OneFlow 還內(nèi)置了其他的 XXXDataSet,稱為 修飾器 。
修飾器可以在已有的 DataSet 基礎(chǔ)上增加額外功能,如以上的 BatchDataset 用于批量讀取數(shù)據(jù)。DataSet 修飾器均在 user/data 目錄,常見的修飾器有:
? BatchDataset:用于批量讀取數(shù)據(jù)
? RandomShuffleDataset:用于將數(shù)據(jù)的順序隨機化
? GroupBatchDataset:用于更定制化地組 batch,會把相同 group id 的數(shù)據(jù)實例放在同一個 batch 內(nèi)
? DistributedTrainingDataset:用于分布式的情況下,把一個 epoch 內(nèi)的數(shù)據(jù)平均分配到不同節(jié)點讀取
? 一切完成后,最后調(diào)用 StartLoadThread,顧名思義,啟動加載線程,在 StartLoadThread 中,最終會觸發(fā)重寫的 MiniDataset::Next 方法。
以上 MiniDataReader 的構(gòu)造,可以作為模板,沒有特殊要求,在實現(xiàn)自定義的 DataLoader 過程中,不需要修改。
MiniDataset
對于 MiniDataSet,只需要關(guān)心它的構(gòu)造函數(shù)以及重寫的 Next 方法。
構(gòu)造函數(shù)主要是通過 Attr 獲取用戶的配置,然后根據(jù)用戶配置,初始化輸入流。以下代碼中的 JoinDirPath 內(nèi)部,主要根據(jù)數(shù)據(jù)集文件名的約定(前綴、文件數(shù)目,文件名編號是否補齊等),獲取所有的文件名稱;而 InitInStream 是將數(shù)據(jù)集中的文件,都初始化為 OneFlow 封裝的輸入流(in_stream_ 成員),這在后續(xù)的 Next 方法中會使用。
MiniDataset(user_op::KernelInitContext* ctx) {
current_epoch_ = 0;
shuffle_after_epoch_ = ctx->Attr(“shuffle_after_epoch”);

//Join Dir Path
JoinDirPath(ctx);// in stream
InitInStream(ctx);

}
從文件中加載的邏輯,寫在 Next 虛函數(shù)中:
LoadTargetPtrList Next() override {
LoadTargetPtrList ret;
LoadTargetPtr sample_ptr(new TensorBuffer());

std::string sampleline;
if (in_stream_->ReadLine(&sampleline) != 0) {ShuffleAfterEpoch();in_stream_->ReadLine(&sampleline);
}auto numbers = CommaSplit(sampleline);
sample_ptr->Resize(Shape({2}), DataType::kDouble);
auto pNums = sample_ptr->mut_data<double>();
pNums[0] = std::stod(numbers[0]);
pNums[1] = std::stod(numbers[1]);
ret.push_back(std::move(sample_ptr));return ret;

}
在以上代碼中,通過調(diào)用 in_stream_ 的 ReadLine 方法,將文件中的數(shù)據(jù),讀取至 string 對象 sampleline 中。然后通過 CommaSplit 等操作,將字符串按逗號分隔,并轉(zhuǎn)為浮點數(shù),放置到 TensorBuffer 對象中。
值得一提的是,in_stream_ 有2種方法從文件中讀取數(shù)據(jù),分別是:
int32_t PersistentInStream::ReadLine(std::string* l);
int32_t PersistentInStream::ReadFully(char* s, size_t n);
ReadLine 讀取文件中的一行,至 l 對象;ReadFully 讀取 n 個字節(jié)的數(shù)據(jù),至 s 所指向的內(nèi)存。均以0作為成功時的返回值。
MiniDataSet 完成從文件到內(nèi)存緩沖區(qū)的工作,接著,將使用 MiniParser,將緩沖區(qū)中的內(nèi)容,設(shè)置到 Op 的輸出中。
MiniParser
MiniParser 繼承自 Parser,只需要重寫其中的 Parser 方法即可:
class MiniParser final : public Parser {
public:
using LoadTargetPtr = std::shared_ptr;
using LoadTargetPtrList = std::vector;

void Parse(std::shared_ptr batch_data,
user_op::KernelComputeContext* ctx) override {
user_op::Tensor* out_tensor = ctx->Tensor4ArgNameAndIndex(“out”, 0);
double* dptr = out_tensor->mut_dptr();

MultiThreadLoop(batch_data->size(), [&](size_t i) {TensorBuffer* buffer = batch_data->at(i).get();dptr[i*2]= *(buffer->data<double>());dptr[i*2+1]= *(buffer->data<double>()+1);
});

}
};
Parser 包含2個參數(shù),其中 batch_data 其實是一個封裝了的 vecotr,這個容器內(nèi)的每個元素,就是之前 MiniDataSet 通過 Next 讀取的數(shù)據(jù)。 參數(shù) ctx 使得可以獲取 Op 的信息,在這里,主要通過 ctx 獲取輸出,并獲取指向輸出緩沖區(qū)的指針 dptr。
注意,將 batch_data 中的數(shù)據(jù)設(shè)置到 Op 的輸出 dptr 的過程中,使用了宏 MultiThreadLoop。MultiThreadLoop 可以讓的循環(huán)邏輯在多線程中執(zhí)行,它接受2個參數(shù),第一個參數(shù)為循環(huán)的總次數(shù);第二個參數(shù)是一個回調(diào)函數(shù),原型為 void callback(size_t i),OneFlow 會創(chuàng)建多個線程,然后并發(fā)調(diào)用這個回調(diào)函數(shù)。回調(diào)函數(shù)的參數(shù) i 表明了當前循環(huán)的序號,使得可以根據(jù) i 來劃分數(shù)據(jù),完成自己的業(yè)務(wù)邏輯。
在以上的代碼中,通過 batch_data->at(i).get() 獲取了緩沖區(qū)的第 i 個的數(shù)據(jù),然后將其設(shè)置到輸出的內(nèi)存區(qū)的第 i 行的位置,一共2列。
Data Preprocessor 算子
Data Preprocessor 算子,其實就是一種普通的算子,接受 DataReader 的輸出作為自己的輸入,然后通過運算后,輸出一個或者多個 Blob。
在 ofrecord_decoder_ops.cpp 可以看到針對 OFRecord 數(shù)據(jù)的各種預(yù)處理操作(以解碼為主)。
的 Mini Dataloader 處理的數(shù)據(jù)比較簡單,因此 MiniDecoder 所做的工作也很簡單,僅僅是將 DataReader 所輸出的每行2列的數(shù)據(jù),拆分為2個每行1列的輸出 x 與 y。
Mini Decoder 的 Op 注冊為:
REGISTER_CPU_ONLY_USER_OP(“mini_decoder”)
.Input(“in”)
.Output(“x”)
.Output(“y”)
.SetTensorDescInferFn([](user_op::InferContext* ctx) -> Maybe {
user_op::TensorDesc* in_tensor = ctx->TensorDesc4ArgNameAndIndex(“in”, 0);
user_op::TensorDesc* out_tensor_x = ctx->TensorDesc4ArgNameAndIndex(“x”, 0);
user_op::TensorDesc* out_tensor_y = ctx->TensorDesc4ArgNameAndIndex(“y”, 0);
// 設(shè)置輸入輸出 Blob 的屬性
// …
})
.SetGetSbpFn([](user_op::SbpContext* ctx) -> Maybe {
ctx->NewBuilder()
.Split(user_op::OpArg(“in”, 0), 0)
.Split(user_op::OpArg(“x”, 0), 0)
.Split(user_op::OpArg(“y”, 0), 0)
.Build();
//…
});
Mini Decoder 的 Kernel 的實現(xiàn):
class MiniDecoderKernel final : public user_op::OpKernel {
//…
void Compute(user_op::KernelComputeContext* ctx) const override {
user_op::Tensor* in_blob = ctx->Tensor4ArgNameAndIndex(“in”, 0);
user_op::Tensor* out_blob_x = ctx->Tensor4ArgNameAndIndex(“x”, 0);
user_op::Tensor* out_blob_y = ctx->Tensor4ArgNameAndIndex(“y”, 0);

int64_t record_num = in_blob->shape().At(0);const double* input = in_blob->dptr<double>();
double* out_dptr_x = out_blob_x->mut_dptr<double>();
double* out_dptr_y = out_blob_y->mut_dptr<double>();MultiThreadLoop(record_num, [&](size_t i){*(out_dptr_x + i) = *(input+i*2);*(out_dptr_y + i) = *(input+i*2 + 1);
});

}
//…
};
可見,在 MiniDecoderKernel::Compute 中主要是獲取到輸入 in_blob, 然后在多線程循環(huán) MultiThreadLoop 中,將輸入的數(shù)據(jù)拆分到 out_dptr_x 與 out_dptr_y 中,分別對應(yīng)了輸出 x 與 y。
自定義 DataLoader 的使用
如 自定義 Op 一文中所描述,要使用 C++ 層編寫的 Op,還需要在 Python 層封裝一個 Python Wrapper。這些工作放到了 test_mini_dataloader.py中:
def MiniDecoder(
input_blob,
name = None,
):
if name is None:
name = “Mini_Decoder_uniqueID”
return (
flow.user_op_builder(name)
.Op(“mini_decoder”)
.Input(“in”, [input_blob])
.Output(“x”)
.Output(“y”)
.Build()
.InferAndTryRun()
.RemoteBlobList()
)

def MiniReader(
minidata_dir: str,
batch_size: int = 1,
data_part_num: int = 2,
part_name_prefix: str = “part-”,
part_name_suffix_length: int = -1,
random_shuffle: bool = False,
shuffle_after_epoch: bool = False,
shuffle_buffer_size: int = 1024,
name = None,
):
if name is None:
name = “Mini_Reader_uniqueID”

return (flow.user_op_builder(name).Op("MiniReader").Output("out").Attr("data_dir", minidata_dir).Attr("data_part_num", data_part_num).Attr("batch_size", batch_size).Attr("part_name_prefix", part_name_prefix).Attr("random_shuffle", random_shuffle).Attr("shuffle_after_epoch", shuffle_after_epoch).Attr("part_name_suffix_length", part_name_suffix_length).Attr("shuffle_buffer_size", shuffle_buffer_size).Build().InferAndTryRun().RemoteBlobList()[0]
)

在 test_mini_dataloader.py 中,使用自己實現(xiàn)的 MiniReader 與 MiniDecoder 加載并解碼了數(shù)據(jù)集(part-000 與 part-001)中的數(shù)據(jù),完成了一次訓練。
Mini Dataloader 的編譯與測試?
進入到本文對應(yīng)的 data_loader 目錄。 修改 Makefile 文件中的 ONEFLOW_ROOT 變量為 OneFlow 源碼路徑。 然后通過
make
可生成 miniloader.so 文件。
然后運行 test_mini_dataloader.py 腳本,可以使用 Mini Dataloader 加載數(shù)據(jù)并完成訓練。
python test_mini_dataloader.py

總結(jié)

以上是生活随笔為你收集整理的自定义 DataLoader的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。