Rocksdb 通过ingestfile 来支持高效的离线数据导入
文章目錄
- 前言
- 使用方式
- 實現原理
- 總結
前言
很多時候,我們使用數據庫時會有離線向數據庫導入數據的需求。比如大量用戶在本地的一些離線數據,想要將這一些數據導入到已有的數據庫中;或者說NewSQL場景中部分機器離線,重新上線之后的數據增量/全量同步 等場景。這個時候 我們并不想要讓這一些數據占用過多的系統資源,更不希望他們對正常的線上業務有影響,所以盡可能高效得完成這一些數據的同步就需要深入設計一番。
而如果底層引擎使用的是rocksdb,那就非常省事了,只需要組織好你們的數據調用接口就完事了,剩下的導入過程由引擎完成。 tikv便是通過 rocksdb的這個功能完成集群異常恢復之后 region之間的全量增量同步的。回到今天我們要討論的主題,便是rocksdb的這個數據導入過程是如何盡可能快、盡可能高效得完成的。
使用方式
講解實現原理之前我們先看看如何使用這個功能,功能的易用性也很重要,用戶還是希望盡可能得少寫代碼來完成這個工作。使用上主要是兩部分:創建SST文件 和 導入SST文件。
-
創建sst文件:這一步主要是通過一個sst_filter_writer,將需要導入的 k/v 數據轉換成sst文件
需要注意的是:
- 用戶k/v 數據需要按照options.comparator 嚴格有序,默認是按照key的字典序
- 這里的options 建議和db寫入的options用一套(壓縮選項,sst文件相關選項等)
Options options;SstFileWriter sst_file_writer(EnvOptions(), options); // 指定形成的sst文件的路徑 std::string file_path = "/home/usr/file1.sst";// open file_path Status s = sst_file_writer.Open(file_path); for (...) {// 寫入sst,用戶保證k/v 的順序s = sst_file_writer.Put(key, value);if (!s.ok()) {printf("Error while adding Key: %s, Error: %s\n", key.c_str(),s.ToString().c_str());return 1;} }// 完成寫入 s = sst_file_writer.Finish(); -
導入sst文件:這個步驟就是將創建好的一個或者多個sst文件導入到db中,也允許向多個cf中導入
IngestExternalFileOptions ifo;
// Ingest the 2 passed SST files into the DB
// 導入數據
Status s = db_->IngestExternalFile({"/home/usr/file1.sst", "/home/usr/file2.sst"}, ifo);
使用還是比較簡單的,整體的使用過程如下:
#include <iostream>
#include <vector>#include <gflags/gflags.h>#include <rocksdb/db.h>
#include <rocksdb/env.h>
#include <rocksdb/sst_file_writer.h>#define DATA_SIZE 10
#define VALUE_SIZE 1024using namespace std;// 比較函數
bool cmp(pair<string, string> str1,pair<string, string> str2) {if(str1.first < str2.first) {return true;} else if (str1.first == str2.first && str1.second < str2.second) {return true;} else {return false;}
}// 隨機字符串
static string rand_data(long data_range) {char buff[30];unsigned long long num = 1;for (int i = 0;i < 4; ++i) {num *= (unsigned long long )rand();}sprintf(buff, "%llu", num % (unsigned long long)data_range );string data(buff);return data;
}// 構造有序數據
void construct_data(vector<pair<string,string>> &input) {int i;string key;string value;for (i = 0;i < DATA_SIZE; i++) {if(key == "0") {continue;}key = rand_data(VALUE_SIZE);value = rand_data(VALUE_SIZE);input.push_back(make_pair(key, value));}
}void traverse_data(vector<pair<string,string>> input) {int i;for(auto data : input) {cout << data.first << " " << data.second << endl;}
}// 創建sst文件
int create_sst(string file_path) {vector<pair<string,string>> input;vector<pair<string,string>>::iterator input_itr;rocksdb::Options option;/* open statistics and disable compression */option.create_if_missing = true;option.compression = rocksdb::CompressionType::kNoCompression;rocksdb::SstFileWriter sst_file_writer(rocksdb::EnvOptions(), option);rocksdb::Status s = sst_file_writer.Open(file_path);if (!s.ok()) {printf("Error while opening file %s, Error: %s\n", file_path.c_str(),s.ToString().c_str());return 1;}// 需要保證數據有序后再寫入construct_data(input);sort(input.begin(), input.end(), cmp);traverse_data(input);// Insert rows into the SST file, note that inserted keys must be // strictly increasing (based on options.comparator)for (input_itr = input.begin(); input_itr != input.end();input_itr ++) {rocksdb::Slice key(input_itr->first);rocksdb::Slice value(input_itr->second);s = sst_file_writer.Put(key, value);if (!s.ok()) {printf("Error while adding Key: %s, Error: %s\n",key.ToString().c_str(),s.ToString().c_str());return 1;}}// Close the files = sst_file_writer.Finish();if (!s.ok()) {printf("Error while finishing file %s, Error: %s\n", file_path.c_str(),s.ToString().c_str());return 1;}return 0;
}static rocksdb::DB *db;void create_db() {rocksdb::Options option;/* open statistics and disable compression */option.create_if_missing = true;option.compression = rocksdb::CompressionType::kNoCompression;rocksdb::Status s = rocksdb::DB::Open( option,"./db", &db);if (!s.ok()) {printf("Open db failed : %s\n", s.ToString().c_str());return;}
}void db_write(int num_keys) {rocksdb::WriteOptions write_option;write_option.sync = true;rocksdb::Slice key;rocksdb::Slice value;rocksdb::Status s;int i;printf("begin write \n");for (i = 0;i < num_keys; i++) {key = rand_data(VALUE_SIZE);value = rand_data(VALUE_SIZE);s = db->Put(write_option, key, value);if (!s.ok()) {printf("Put db failed : %s\n", s.ToString().c_str());return;}}db->Flush(rocksdb::FlushOptions());printf("finish write \n");
}int main() {// 先寫入一批數據create_db();db_write(100000);// 創建sst文件if (create_sst("./test.sst") == 0) {printf("creates sst success !\n");} else {printf("creates sst failed !\n");}// 導入數據rocksdb::IngestExternalFileOptions ifo;// Ingest the 2 passed SST files into the DBprintf("Ingest sst !\n");rocksdb::Status s = db->IngestExternalFile({"test.sst"}, ifo);if (!s.ok()) {printf("Error while adding file test.sst , Error %s\n",s.ToString().c_str());return 1;}return 0;
}
運行輸出如下:
begin write
finish write
# consturct data,需按照字典序,如果沒有按照字典序構造的話會報錯
1008 232
240 880
288 63
410 768
506 56
534 256
640 180
72 248
800 672
944 217
creates sst success !
通過db日志可以看到我們創建的sst文件test.sst被成功導入到db,形成了./db/000020.sst,且在db目錄中。
╰─$ cat db/LOG |grep ingested
[AddFile] External SST file test.sst was ingested in L0 with path ./db/000020.sst (global_seqno=200012)╰─$ ls db
000017.log 000020.sst IDENTITY LOG LOG.old.1618643738564935 OPTIONS-000008
000019.sst CURRENT LOCK LOG.old.1618123487361092 MANIFEST-000013 OPTIONS-000016
實現原理
從如何使用這個功能上我們能夠感覺到這一些數據并不是通過rocksdb正常的I/O流程寫入的。如果使用正常的接口,那我們用戶不需要排序,而是直接通過db->Put接口將k/v寫入,凡事都有但是,但是這樣來導入離線數據在rocksdb內部后續的flush/compaction 都會消耗大量的系統資源,而這并不是我們想要的高效。所以,rocksdb提供的ingest接口肯定不會讓這一些要導入的數據消耗過多的資源,接下來我們一起看看底層的詳細實現。
為了更形象得告訴大家在rocksdb作為存儲引擎的場景,如果通過傳統的put接口導入數據會多出哪一些I/O,如下圖
其中紅色的尖頭 是ingest file 相比于傳統的put接口 少的I/O部分,可以說ingest方式導入數據極大得節約了整個系統資源的開銷(包括但不限于I/O , CPU 資源的開銷)。
下面主要介紹的是有了sst文件,接下來如何導入到db中的過程。關于通過sst_file_writer創建具體的sst文件的過程就不多說了,也就是按照sst文件的格式(datablock,index block…footer)等將有序的數據一個個添加進去而已。
主要有如下幾步:
- 為待插入的sst文件創建file link到db目錄,或者直接拷貝進去
- 停止寫入,需要保證即將導入的sst文件在db中擁有一個安全合理的seqno,如果持續寫入,那這個seqno可能不會全局遞增了。
- 檢查導入的sst文件是否和memtable中的key-range有重疊,有的話需要flush memtable
- 為這個sst文件 按照其key-range挑選一個合適的level放進去
- 為這個問天添加一個全局的seqno
- 恢復db的寫入
其中停止寫入到恢復寫入這段時間對于用戶來說越小越好,所以ingest的性能很重要。
接下來看看詳細的源代碼實現:
導入數據的函數入口是DBImpl::IngestExternalFiles
導入的sst文件最后都需要形成一個db內部的sst文件,因為這個時候已經停止寫入了,所以會從最新的sst文件編號之后取一個文件編號,后續的其他要導入的sst文件會不斷追加。
Status DBImpl::IngestExternalFiles(const std::vector<IngestExternalFileArg>& args) {...// 構造文件編號到next_file_number中Status status = ReserveFileNumbersBeforeIngestion(static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd(), total,pending_output_elem, &next_file_number);if (!status.ok()) {InstrumentedMutexLock l(&mutex_);ReleaseFileNumberFromPendingOutputs(pending_output_elem);return status;}...
}
有了在db內部的合法文件編號,我們就可以進行文件遷移了,將待導入的sst文件遷移到db內部已經構造好的sst文件編號之中。
會為每一個cf構造一個ingest_job, 將待導入文件拷貝/移動到 db內部的sst文件中,這個過程是在接下來的Prepare函數中。
uint64_t start_file_number = next_file_number;for (size_t i = 1; i != num_cfs; ++i) {start_file_number += args[i - 1].external_files.size();auto* cfd =static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);// prepare 函數exec_results[i].second = ingestion_jobs[i].Prepare(args[i].external_files, start_file_number, super_version);exec_results[i].first = true;CleanupSuperVersion(super_version);}
看看Prepare的函數實現:
- 拿著輸入的多個sst文件,如果有多個,則需要檢查這一些文件之間是否有重疊key,有的話就不支持了(rocksdb除了l0,其他層不允許有重疊key)。
- 根據用戶指定的ingest option: move_files 是否為true,來將待導入文件move到db中, 如果move失敗了就拷貝文件。
Status ExternalSstFileIngestionJob::Prepare(const std::vector<std::string>& external_files_paths,uint64_t next_file_number, SuperVersion* sv) {// 解析文件信息for (const std::string& file_path : external_files_paths) {IngestedFileInfo file_to_ingest;status = GetIngestedFileInfo(file_path, &file_to_ingest, sv);if (!status.ok()) {return status;}files_to_ingest_.push_back(file_to_ingest);}// 確保導入的多個sst文件之間沒有重疊......} else if (num_files > 1) {// Verify that passed files dont have overlapping rangesautovector<const IngestedFileInfo*> sorted_files;for (size_t i = 0; i < num_files; i++) {sorted_files.push_back(&files_to_ingest_[i]);}std::sort(sorted_files.begin(), sorted_files.end(),[&ucmp](const IngestedFileInfo* info1, const IngestedFileInfo* info2) {return sstableKeyCompare(ucmp, info1->smallest_internal_key,info2->smallest_internal_key) < 0;});// 如果有重疊的話,ingest也無法支持,因為在db中大于level0的更高層level內部的// sst文件之間是不允許有重疊的,加速更高層的二分查找。for (size_t i = 0; i < num_files - 1; i++) {if (sstableKeyCompare(ucmp, sorted_files[i]->largest_internal_key,sorted_files[i + 1]->smallest_internal_key) >= 0) {files_overlap_ = true;break;}}}......// 根據用戶參數move文件if (ingestion_options_.move_files) {status = env_->LinkFile(path_outside_db, path_inside_db);...} else { // 否則就拷貝文件f.copy_file = true;}if (f.copy_file) {TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile",nullptr);// CopyFile also sync the new file.status = CopyFile(env_, path_outside_db, path_inside_db, 0,db_options_.use_fsync);}...
}
到此,文件就已經進入到了rocksdb 之中,ingest_job的prepare流程就結束了。
接下來 就到了我們前面介紹總步驟的第二步,停止用戶對當前db的寫入:
DBImpl::IngestExternalFilesWriteThread::EnterUnbatched
其中WriteThread::EnterUnbatched函數會讓當前db的寫入線程都處于wait狀態。
接下來就是檢查當前要導入的文件是否和memtable中的key-range有重疊,函數調用如下:
DBImpl::IngestExternalFilesExternalSstFileIngestionJob::NeedsFlushColumnFamilyData::RangesOverlapWithMemtables
這個函數ColumnFamilyData::RangesOverlapWithMemtables會拿著從ingest files中構造好的key-range和memtable中的 key-range 進行對比,如果有重疊key,則會將memtable flush置為true
Status ColumnFamilyData::RangesOverlapWithMemtables(const autovector<Range>& ranges, SuperVersion* super_version,bool* overlap) {...Status status;// 拿著ingest files的range中的每一個key,看是否能夠從memtable中找到for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {auto* vstorage = super_version->current->storage_info();auto* ucmp = vstorage->InternalComparator()->user_comparator();InternalKey range_start(ranges[i].start, kMaxSequenceNumber,kValueTypeForSeek);// 從memtable中找memtable_iter->Seek(range_start.Encode());status = memtable_iter->status();ParsedInternalKey seek_result;if (status.ok()) {if (memtable_iter->Valid() &&!ParseInternalKey(memtable_iter->key(), &seek_result)) {status = Status::Corruption("DB have corrupted keys");}}// 找到了,則置overlap為trueif (status.ok()) {if (memtable_iter->Valid() &&ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {*overlap = true;} else if (range_del_agg.IsRangeOverlapped(ranges[i].start,ranges[i].limit)) {*overlap = true;}}}...
}
在后續的DBImpl::FlushMemTable函數中會flush memtable,不同的cf是分開進行的
DBImpl::IngestExternalFilesDBImpl::FlushMemTable
接下來就開始了第四步和第五步的處理邏輯,需要為每一個落到db中的sst文件挑選合適的level以及分配全局seqno,處理邏輯在Run函數中:
DBImpl::IngestExternalFilesExternalSstFileIngestionJob::Run
主要處理邏輯如下:
一個一個ingest file進行處理
-
選擇一個合適的level,將ingest file插入進去
如果user配置了allow_ingest_behind=true,即允許導入的數據直接插入到最后一層的文件位置,且ingest的時候配置的ingest option中ingest_behind=true,則會先嘗試插入到bottomest level,如果最后一層的文件和待插入的文件有重疊,則插入失敗。處理邏輯在CheckLevelForIngestedBehindFile函數之中。否則逐層遍歷,找到第一個和這一些key-range有重疊的level即可。函數
AssignLevelAndSeqnoForIngestedFile -
找到了合適的level的同時會記錄一個
assigned_seqno,是在當前last_sequence的基礎上+1得到的。函數AssignLevelAndSeqnoForIngestedFile之中。 -
為當前ingest_file 寫入一個global seq no, 并執行fsync/sync。函數
AssignGlobalSeqnoForIngestedFile之中。 -
最后就是將當完成更新的ingest file的元信息更新到
VersionEdit之中。
接下來就進入尾聲了:
- 將更新的
VersionEdit寫入到MANIFEST文件之中 - 更新每個ingest file對應的cf信息,并且調度compaction/flush, 因為之前ingest file時找的是有重疊key的一層。
- 恢復db的寫入
// 將`VersionEdit`寫入到MANIFEST文件之中status =versions_->LogAndApply(cfds_to_commit, mutable_cf_options_list,edit_lists, &mutex_, directories_.GetDbDir());}if (status.ok()) {for (size_t i = 0; i != num_cfs; ++i) {auto* cfd =static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();if (!cfd->IsDropped()) {//更新每個ingest file對應的cf信息,并且調度compaction/flush, 因為之前ingest file時找的是有重疊key的一層InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i],*cfd->GetLatestMutableCFOptions());...}}}// 恢復db的寫入,喚醒db的其他所有的writerwrite_thread_.ExitUnbatched(&w);
到此,整個ingest就算是結束了。
總結
通過ingest的實現,我們能夠看到rocksdb通過ingest的方式支持離線數據導入確實能夠極大得降低系統資源的開銷。不需要一個key在LSM中被反復的寫入、讀取。
總結
以上是生活随笔為你收集整理的Rocksdb 通过ingestfile 来支持高效的离线数据导入的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: “非惜年芳绝”下一句是什么
- 下一篇: 关于Titandb Ratelimite