C++11学习笔记-----互斥量以及条件变量的使用
在多線程環境中,當多個線程同時訪問共享資源時,由于操作系統CPU調度的緣故,經常會出現一個線程執行到一半突然切換到另一個線程的情況。以多個線程同時對一個共享變量做加法運算為例,自增的匯編指令大致如下,先將變量值存放在某個寄存器中(eax),然后對寄存器進行加一,隨后將結果回寫到變量內存上
mov [#address#] eax; // 這里#address#簡要表示目標變量的地址 // 1 inc eax; // 2 mov eax [#address#]; // 3假設存在兩個線程同時對變量a進行加法操作,a初值為0,如果其中一個線程在第一步執行完后被切走,那么最終a的結果可能不是2而是1
由圖片可知,由于cpu調度的緣故,多線程下同時對共享變量進行操作,可能會導致最終的結果并不是期望值。所以,為了保護共享變量,保證同一時刻只能允許一個線程對共享變量進行操作,就需要借助互斥量的協助
Linux下的原生互斥量
Linux下提供了原生互斥量api,定義在頭文件<pthread.t>中。互斥量,形象點理解就是一把鎖,在對共享變量進行操作之前,先上鎖,只有獲得鎖的這個線程能夠繼續運行,而其他線程運行到上鎖語句時,會阻塞在那里直到獲得鎖的那個線程執行解鎖操作,隨后繼續爭搶鎖,搶到鎖的線程接著執行,沒有搶到鎖的線程繼續阻塞
示例:利用互斥鎖解決多線程共享變量問題
在上一篇中提到了創建10個線程同時對一個共享變量進行自增,發現結果和預期不同,接下來利用互斥量解決這一問題
#include <unistd.h> #include <pthread.h> #include <sys/types.h>#include <iostream> #include <vector>long long int total = 0; pthread_mutex_t m;void* thread_task(void* arg) {for(int i = 0; i < 10000; ++i){/* 對total進行加法之前先上鎖,保證同一時刻只能有一個線程執行++total */::pthread_mutex_lock(&m);++total;/* 解鎖 */::pthread_mutex_unlock(&m);}::pthread_exit(nullptr); }int main() {/* 初始化互斥量 */::pthread_mutex_init(&m, nullptr);std::vector<pthread_t> tids;for(int i = 0; i < 10; ++i){pthread_t tid;::pthread_create(&tid, nullptr, thread_task, nullptr);tids.emplace_back(tid);}for(auto& tid : tids)::pthread_join(tid, nullptr);/* 釋放互斥量 */::pthread_mutex_destroy(&m);std::cout << total << std::endl;return 0; }C++11下的互斥量和條件變量
互斥量
對比linux原生的庫函數,C++11提供的互斥量突出的特點有
- 無需考慮互斥量的初始化和銷毀,在類的構造和析構函數中管理,無需使用者操心
- 采用RAII對互斥量進行了不同封裝,提供了更加友好的上鎖機制
C++11提供的互斥量位于<mutex>頭文件中,提供的接口有
- lock,上鎖
- try_lock,嘗試上鎖,如果失敗則返回false
- unlock,解鎖
這三個函數和linux下的接口差不多,其實也沒什么不同嘛~。事實上,多數程序都不直接使用std::mutex,標準庫采用RAII(資源獲取時就進行初始化)對std::mutex進行了封裝,使用起來當然是方便得不得了
簡單的鎖機制lock_guard
最簡單的封裝是std::lock_guard,單純利用RAII,構造時上鎖,析構時解鎖,使用示例為
#include <iostream> #include <thread> #include <mutex> #include <vector>int main() {long long int total = 0;std::mutex m;std::vector<std::thread> threads; for(int i = 0; i < 10; ++i){threads.emplace_back([&m, &total]{for(int i = 0; i < 10000; ++i){{std::lock_guard<std::mutex> lock(m);++total;}}});}for(auto& th : threads)th.join();std::cout << total << std::endl;return 0; }想對于共享數據的提供保護,使用std::lock_guard是完全沒有問題的,進入共享區前上鎖,離開后解鎖
更靈活的鎖unique_lock
稍微復雜的封裝是std::unique_lock,它提供了更靈活的上鎖機制,即通過構造函數的參數進行設置,分別可以
- 直接上鎖
- 延遲上鎖,僅保存互斥量,不進行上鎖工作
- 嘗試上鎖
但是多數情況下采用默認的直接上鎖就可以了,而在std::unique_lock的生存期間,使用者也可以對其進行解鎖再上鎖等工作,這個作用體現在和條件變量的配合上
條件變量
標準庫中條件變量位于頭文件<condition_variable>中
其中有三個接口用于阻塞當前線程,常用的是wait
void wait(std::unique_lock<std::mutex>& lock); template <class Predicate> void wait(std::unique_lock<std::mutex>& lock, Predicate pred);原子操作釋放鎖lock,阻塞當前線程,并將當前線程添加到*this上的等待線程列表,等待notify_one或者notify_all調用時結束阻塞(第二個重載當pred返回true時也會結束阻塞)
此外,還有兩個接口用于通知一個或多個等待線程,將其從阻塞狀態變為非阻塞
void notify_one() noexcept; void notify_all() noexcept;示例,利用互斥量和條件變量實現線程池
線程池工作原理
線程池的工作原理是預先創建若干線程,同時維護一個任務隊列,每個線程不斷地從任務隊列中取出任務并執行,使用者可以隨時向任務隊列中添加新任務。當任務隊列為空,線程池中的線程要么執行自己那個沒有結束的任務,要么處于睡眠狀態
在這個問題模型中,任務隊列就相當于共享變量,同一時刻只能有一個線程訪問任務隊列并從中取出任務,而添加任務時也需要避免添加和取出同時進行,這就需要互斥量的協助,凡是涉及到對任務隊列的存和取,都需要事先上鎖。
另外,如果任務隊列為空,那么每個線程都不斷的上鎖,取任務(發現為空),解鎖,再上鎖,取任務(發現為空),解鎖…這樣的busy loop會極大消耗cpu,造成了不必要的開銷,所以需要引入條件變量,當任務隊列為空時,采用條件變量令線程睡眠
線程池定義
可以明確的是,線程池除了構造析構函數外,需要提供一個接口用于調用者添加任務,所以線程池的定義可以明確如下
#include <future> #include <thread> #include <mutex> #include <condition_variable> #include <vector> #include <queue>class ThreadPool {public:ThreadPool(std::size_t threadNums);~ThreadPool();void stop() { quit_ = true; }public:/* 用于添加任務,std::future<>用于保存函數f的執行結果 */template <class F, class... Args>auto enqueue(F&& f, Args... args)-> std::future<typename std::result_of<F(Args...)>::type>;private:std::vector<std::thread> threads_;std::queue<std::function<void()>> tasks_;std::atomic<bool> quit_;std::mutex mutex_;std::condition_variable cond_; };構造函數
當線程池構造時,創建threadNums個線程,每個線程都從任務隊列中取出任務然后執行
ThreadPool::ThreadPool(std::size_t threadNums): quit_(false) {for(std::size_t i = 0; i < threadNums; ++i){threads_.emplace_back([this]{while(!this->quit_){std::function<void()> task;{std::unique_lock<std::mutex> lock(this->mutex_);/* 利用條件變量,等待直到線程池退出或者任務隊列不為空 */cond_.wait(lock, [this]() { return this->quit_ || !this->tasks_.empty(); });if(this->quit_) return;task = this->tasks_.front();this->tasks_.pop();}task();}}); } }析構函數
析構函數用于回收線程資源
ThreadPool::~ThreadPool() {stop();cond_.notify_all();for(auto& th : threads_)th.join(); }添加任務
enqueue函數用于添加任務,涉及到了一些std::future的內容,這里先簡單看看
/* class... 表示不定長參數列表 */ template <class F, class... Args> /* * auto會根據->后的內容自動推導返回類型* std::future用于保存函數運行結果* std::result_of用于獲取函數運行結果* std::packaged_task<T>是一個函數包,類似std::function,用于包裝函數* std::packaged_task<T>::get_future用于返回函數運行結果* std::unique_lock<std::mutex> 上鎖(這里也可以用std::lock_guard */ auto ThreadPool::enqueue(F&& f, Args... args)-> std::future<typename std::result_of<F(Args...)>::type> {using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();std::unique_lock<std::mutex> lock(mutex_);tasks_.push([task]() { (*task)(); });return res; }測試代碼
int main() {ThreadPool pool(4);std::vector<std::future<int>> results;for(int i = 0; i < 10; ++i){results.emplace_back(pool.enqueue([i]{/* std::this_thread::sleep_for(std::chrono::seconds(1)); */return i * i;}));}for(auto&& result : results)std::cout << result.get() << std::endl;return 0; }小結
互斥鎖是多線程環境中不可缺少的重要部分,用于保護共享資源免受cpu調度的危害。另外,條件變量和互斥鎖配合使用可以避免busy loop帶來的不必要損耗
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的C++11学习笔记-----互斥量以及条件变量的使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 每天一道LeetCode-----以单词
- 下一篇: C++11学习笔记-----获取异步操作