日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

Rocksdb 的优秀代码(三)-- 工业级 线程池实现分享

發布時間:2023/11/27 生活经验 56 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Rocksdb 的优秀代码(三)-- 工业级 线程池实现分享 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

    • 前言
    • 1. Rocksdb線程池概覽
    • 2. Rocksdb 線程池實現
      • 2.1 基本數據結構
      • 2.2 線程池創建
      • 2.3 線程池 調度線程執行
      • 2.4 線程池銷毀線程
      • 2.5 線程池優先級調度
      • 2.6 動態調整線程池 線程數目上限
    • 3. 總結

前言

Rocksdb 作為一個第三方庫的形態嵌入到各個存儲系統之中存儲元數據,當rocksdb被使用的時候其內部會自啟動一些線程,隨著需要處理的用戶數據越來越多,為了保證性能,rocksdb會讓這一些線程也會不斷增加。而在分布式存儲場景,往往一個機器節點會有很多rocksdb實例(64個實例,每一個實例都會有compaction/flush線程),這個時候在Rocksdb內部使用合理的線程管理方式會節省系統CPU調度資源。

所以Rocksdb自實現的Thread Pool就是為了更好得管理Rocksdb內部線程,除了一些基本的線程調度之外,還會有可控制的線程優先級的調度,因為大多數場景Rocksdb讓Flush線程的優先級高于Compaction線程,而有的場景則需要Compaction的優先級高于Flush,為了更快速的compaction清理掉舊數據。

接下來簡單看一下Rocksdb 線程池的基本實現,本人已經將該線程池代碼摘出來單獨維護,可作為一個獨立線程池去調度。

https://github.com/BaronStack/ThreadPool

線程池存在的目的 正如上面Rocksdb使用線程池的目的一樣, 能夠更加方便得管理我們應用中的線程,包括但不限于:線程創建,線程資源約束,線程優先級調度,線程銷毀 等。

1. Rocksdb線程池概覽

Rocksdb 實現的線程池支持的特性:

  • 創建/銷毀線程
  • 動態增加、減少線程池線程數目上限(線程池數目需要設置上限,因為Compaction/Flush占用的資源也不能無限增加,需根據實際的Rocksdb 寫入量來動態增加)
  • 支持動態調整 線程CPU 和 I/O優先級(為了暴露足夠的接口給用戶,來讓用戶選擇兩個功能調度的優先順序)

2. Rocksdb 線程池實現

2.1 基本數據結構

// 線程池核心的數據結構
struct Impl {private:bool low_io_priority_;  // I/O 優先級bool low_cpu_priority_; // CPU 優先級Env::Priority priority_; // 線程優先級Env*         env_;       // 獲取當前線程池的環境變量int total_threads_limit_; // 線程池線程總數std::atomic_uint queue_len_;  // 當前線程池中執行線程的排隊長度bool exit_all_threads_; // 清理線程池時會調度所有未執行的線程bool wait_for_jobs_to_complete_; // 等待所有線程池的線程執行完畢// Entry per Schedule()/Submit() callstruct BGItem {void* tag = nullptr;std::function<void()> function; // 執行函數std::function<void()> unschedFunction; // 不執行函數};using BGQueue = std::deque<BGItem>;BGQueue       queue_; // deque 保存線程池中調度的線程相關的信息:線程函數、函數參數std::mutex               mu_;std::condition_variable  bgsignal_; // 條件變量,喚醒正在睡眠的線程std::vector<port::Thread> bgthreads_; // 保存需要調度的線程
}

線程池類:

class ThreadPoolImpl : public ThreadPool {private:std::unique_ptr<Impl>   impl_;// 線程池核心數據結構
};

2.2 線程池創建

Rocksdb維護了一個Env 類,這個類再同一個進程中的多個rocksdb實例之間是能夠共享的。所以Rocksdb將這個類作為線程池的入口,從而讓Flush/Compaction 這樣的線程調度過程中,多個db可以只使用同一個線程池。

Rocksdb實現了多個環境變量:HdfsEnvPosixEnv等,方便Rocksdb的文件操作/線程操作 接口在不同的環境平臺下進行擴展,當然如果用戶變更了新的平臺,只需要支持Env基類的接口,就能擴展到用戶的新平臺。

Env默認實例是PosixEnv,為了保證多db實例間共享同一個環境變量,PosixEnv僅維護一個單例。

// 創建Env,初始化幾個類的單例
// 這里注意調用的順序,先調用ThreadLocalPtr實例的初始化,再調用PosixEnv的
// 這樣在Env析構的時候能夠反方向析構,從而保證ThreadLocal的信息最后一個被清理
Env* Env::Default() {ThreadLocalPtr::InitSingletons(); // Threadlocal 實例數據,用來訪問當前db實例運行的線程狀態信息CompressionContextCache::InitSingleton();INIT_SYNC_POINT_SINGLETONS();static PosixEnv default_env; // 創建posix env	return &default_env;
}

緊接著通過 PosixEnv的構造函數創建線程池

// 根據Env設置的線程優先級,為每一個優先級創建一個線程池(方便優先級線程池的調度)
// 創建多個線程池: enum Priority { BOTTOM, LOW, HIGH, USER, TOTAL };
std::vector<ThreadPoolImpl> thread_pools_;PosixEnv::PosixEnv(): checkedDiskForMmap_(false),forceMmapOff_(false),page_size_(getpagesize()),thread_pools_(Priority::TOTAL),allow_non_owner_access_(true) {ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));// 根據優先級創建線程池,默認創建四個線程池,但一般只會用到兩個(LOW,HIGH)for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {thread_pools_[pool_id].SetThreadPriority(static_cast<Env::Priority>(pool_id));// This allows later initializing the thread-local-env of each thread.thread_pools_[pool_id].SetHostEnv(this);}thread_status_updater_ = CreateThreadStatusUpdater();
}

2.3 線程池 調度線程執行

線程池調度棧如下:從入口到具體的線程函數的執行

Env::Schedule() // Env對外接口PosixEnv::Schedule()ThreadPoolImpl::Schedule() // 線程池的調度入口ThreadPoolImpl::Impl::Submit() // 將線程函數、參數、線程回收函數封裝,添加到待調度隊列queue_ThreadPoolImpl::Impl::StartBGThreads() ThreadPoolImpl::Impl::BGThreadWrapper() // 更新當前執行的線程狀態并啟動一個調度隊列中的線程ThreadPoolImpl::Impl::BGThread()// 從待調度隊列queue_中調度線程func() // 執行線程函數

Env的實例調用Schedule接口,接收待調度的線程執行函數,參數,所屬優先級線程池,以及線程銷毀函數及其參數。

  virtual void Schedule(void (*function)(void* arg), void* arg,Priority pri = LOW, void* tag = nullptr,void (*unschedFunction)(void* arg) = nullptr) = 0;

后續會執行到ThreadPoolImpl::Impl::Submit()

void ThreadPoolImpl::Impl::Submit(std::function<void()>&& schedule,std::function<void()>&& unschedule, void* tag) {// 后續需要更新當前線程池的線程調度隊列,需要保證更新過程的原子性std::lock_guard<std::mutex> lock(mu_);// 需要銷毀線程池了,不接受新的線程加入if (exit_all_threads_) {return;}// 啟動線程StartBGThreads();// 更新線程函數相關的信息 到線程調度隊列尾部(雙端隊列)queue_.push_back(BGItem());// 更新auto& item = queue_.back();item.tag = tag;item.function = std::move(schedule);item.unschedFunction = std::move(unschedule);queue_len_.store(static_cast<unsigned int>(queue_.size()),std::memory_order_relaxed);// 如果正在執行的線程沒有超過線程池線程數限制,則喚醒一個正在休眠的線程if (!HasExcessiveThread()) {// Wake up at least one waiting thread.bgsignal_.notify_one();} else { // 。。。這個邏輯不太懂,超過限制之后 不應該就不喚醒了嗎?// Need to wake up all threads to make sure the one woken// up is not the one to terminate.WakeUpAllThreads();}
}

后續的執行就是按照以上調用棧進行的,從線程調度隊列頭部取線程函執行。

2.4 線程池銷毀線程

線程池的銷毀也就是Env變量的析構函數,db被destory或者close,則會進入該邏輯,Env的默認環境變量是PosixEnv,即Env的子類。則會先調用PosixEnv 的析構函數,其中線程池相關的清理邏輯:

整體的調用棧如下:

~PosixEnv()ThreadPoolImpl::JoinAllThreads() ThreadPoolImpl::Impl::JoinThreads()

在析構函數中調用相關的線程清理工作:

~PosixEnv() override {// 通過Posix startthread 的接口調度的線程函數并發執行完畢for (const auto tid : threads_to_join_) {pthread_join(tid, nullptr);}// 讓不同優先級線程池中待執行線程執行完for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {thread_pools_[pool_id].JoinAllThreads();}// 放置Posix析構過程中不應該thread_status_updater_ ,防止一些子線程更新線程狀態出錯// Delete the thread_status_updater_ only when the current Env is not// Env::Default().  This is to avoid the free-after-use error when// Env::Default() is destructed while some other child threads are// still trying to update thread status.if (this != Env::Default()) {delete thread_status_updater_;}
}

其中JoinAllThreads函數用來喚醒所有子線程的執行,并設置標記防止接收新的線程

void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) {std::unique_lock<std::mutex> lock(mu_);assert(!exit_all_threads_);wait_for_jobs_to_complete_ = wait_for_jobs_to_complete;// 原子(加鎖)方式更新如下變量,用作在submit函數中屏蔽接收新的線程exit_all_threads_ = true;// prevent threads from being recreated right after they're joined, in case// the user is concurrently submitting jobs.// 重置線程池的線程上限,防止用戶并發調用submit添加待調度線程total_threads_limit_ = 0;lock.unlock();bgsignal_.notify_all(); //喚醒所有等待在bgsignal_的線程for (auto& th : bgthreads_) {// join 執行,直到執行完。th.join();}bgthreads_.clear();exit_all_threads_ = false;wait_for_jobs_to_complete_ = false;
}

2.5 線程池優先級調度

之前說過Rocksdb線程池支持 用戶針對不同LOW/HIGH 線程池的I/O或者CPU的優先級設置。

比如 設置LOW線程池具有更低的I/O優先級和CPU優先級

target_->LowerThreadPoolIOPriority(Env::Priority::LOW);
target_->LowerThreadPoolCPUPriority(Env::Priority::LOW);

具體底層的設置方式是針對之前提到的線程數據結構中的兩個參數Impl::low_io_priority_Impl::low_c pu_priority_進行置位true。在ThreadPoolImpl::Impl::BGThread調度函數執行之前,會通過系統調用setprioritysyscall(SYS_ioprio_set,,,)設置當前線程的I/O和CPU優先級。

void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {bool low_io_priority = false;bool low_cpu_priority = false;while (true) {// Wait until there is an item that is ready to runstd::unique_lock<std::mutex> lock(mu_);...bool decrease_io_priority = (low_io_priority != low_io_priority_);bool decrease_cpu_priority = (low_cpu_priority != low_cpu_priority_);lock.unlock();#ifdef OS_LINUX// Linux 系統支持 設置CPU優先級if (decrease_cpu_priority) {setpriority(PRIO_PROCESS,// Current thread.0,// Lowest priority possible.19);low_cpu_priority = true;}if (decrease_io_priority) {
#define IOPRIO_CLASS_SHIFT (13)
#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)// Put schedule into IOPRIO_CLASS_IDLE class (lowest)// These system calls only have an effect when used in conjunction// with an I/O scheduler that supports I/O priorities. As at// kernel 2.6.17 the only such scheduler is the Completely// Fair Queuing (CFQ) I/O scheduler.// To change scheduler://  echo cfq > /sys/block/<device_name>/queue/schedule// Tunables to consider://  /sys/block/<device_name>/queue/slice_idle//  /sys/block/<device_name>/queue/slice_sync// 設置I/O優先級syscall(SYS_ioprio_set, 1,  // IOPRIO_WHO_PROCESS0,                  // current threadIOPRIO_PRIO_VALUE(3, 0));low_io_priority = true;}
#else// 非Linux系統的話就不做任何處理了,僅僅保證變量被使用而已,防止編譯warning (void)decrease_io_priority;  // avoid 'unused variable' error(void)decrease_cpu_priority;
#endiffunc();}
}

2.6 動態調整線程池 線程數目上限

支持動態調整線程池可調度的線程數目上限,這個能夠限制線程池資源的占用,主要用作Rocksdb 中調整Flush和Compaction的各自所處的HIGH和LOW線程池中的線程數目上限。能夠根據db的工作負載,動態增加或者減少線程池中可調度的線程數目。

void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) {impl_->SetBackgroundThreadsInternal(num, false);
}void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num,bool allow_reduce) {std::unique_lock<std::mutex> lock(mu_);// 如果線程池已經要被銷毀了,就不用增加線程池的調度線程數目上限了if (exit_all_threads_) {lock.unlock();return;}// 增加線程數目或者減少線程數目// 喚醒休眠的線程并調度后臺線程繼續執行。if (num > total_threads_limit_ ||(num < total_threads_limit_ && allow_reduce)) {total_threads_limit_ = std::max(0, num);WakeUpAllThreads();StartBGThreads();}
}

3. 總結

到此整個線程池的基本實現就描述完成了,這是一個非常成熟的線程池(經歷過接近十年的工業級考驗,2012年facebook開始開發rocksdb),規模雖小,但五臟俱全。其能夠支撐引擎級別的線程調度壓力,保證引擎的核心邏輯flush和compaction的高效調度。

目前該線程池的獨立實現已經放在了https://github.com/BaronStack/ThreadPool 中,擁有完備的線程池調度/銷毀,優先級配置,歡迎star。

總結

以上是生活随笔為你收集整理的Rocksdb 的优秀代码(三)-- 工业级 线程池实现分享的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。