线程池的分析与实现
在需要頻繁開線程時,創建和銷毀線程會話費大量時間,為了提高效率,我們可以在任務開始前,先創建一定數量的線程。這樣在接收到任務時,就可以直接使用線程池中處于wait狀態的線程,在任務結束后線程回到wait狀態,等待新任務的到來,這就避免了線程的創建與銷毀,從而提高程序執行效率。
所需數據
- 需要存儲有多少線程( int thread_number )
- 需要開辟對應的數組,存儲線程號( pthread_t *threads )
- 需要一個任務隊列來存儲未執行的任務,便于線程競爭任務并執行( task_queue )
- 需要一個flag來標記線程池是否結束,該標記可以在線程池結束后喚醒所有處于等待線程的線程,讓它們可以正常退出(其中,所有線程處于脫離(detach)狀態)
- 互斥鎖與條件變量,用于避免在獲取與添加任務時發生錯誤(同步與互斥)
運行流程
- 執行ThreadPool的構造函數,初始化有關數據,進行線程的創建,并將線程進行脫離(使線程在運行完后可以自動回收)
- 創建的線程會去執行工作線程,如果任務隊列為空,則一直while循環等待,直到被喚醒(signal)去競爭任務。若競爭到任務,則去執行,執行完后操作與前面相同;若沒有競爭到任務,則回到wait狀態
- 有新任務到來,接收到信號的程序會給線程池的任務隊列添加信息,線程池便會喚醒處于wait的線程執行任務,或直接被剛執行完任務的線程繼續執行(由于是任務隊列,所以只有處于隊首的任務會被執行)
代碼實現(c++,后面會有對代碼的分析)
源代碼請看我的Github
封裝線程池的類
// threadPool.h#ifndef _THREADPOOL_H_ #define _THREADPOOL_H_#include "locker.h" // 該頭文件見文章最后 或 Github #include <queue> using namespace std;template< typename T > class ThreadPool { private:int thread_number; // 線程池的線程數pthread_t *threads; // 線程數組queue<T *> task_queue; // 任務隊列MutexLocker queue_mutex_locker; // 任務隊列的互斥鎖Cond queue_cond_locker; // 任務隊列的條件變量bool m_stop; // 是否結束線程 public:ThreadPool( int thread_num = 20 );~ThreadPool();bool append( T *task ); // 向任務隊列添加任務 private:static void *worker( void * ); // 工作線程void run(); // 線程池中線程開始運行的函數T *getTask(); // 從任務隊列中獲取隊首的任務 };template< typename T > ThreadPool<T>::ThreadPool( int thread_num ) :thread_number(thread_num), threads(NULL), m_stop(false) {if( thread_number < 0 ) {cout << "thread_number < 0\n";throw exception();}// 創建數組存放線程號threads = new pthread_t[ thread_number ];if( !threads ) {cout << "threads is NULL\n";throw exception();}// 創建規定數量的線程for( int i = 0; i < thread_number; i++ ) {// 由于pthread_create第三個參數必須指向靜態函數,要使用類成員函數和變量,只能通過:// 1) 類的靜態對象// 2) 將類的對象作為參數傳給靜態函數// 這里通過第二種方法實現if( pthread_create( &threads[i], NULL, worker, this ) ) { // 成功返回0delete[] threads; // 創建失敗則釋放所有已分配的空間cout << "pthread_create error\n";throw exception();}// 將線程進行脫離,線程運行完后自動回收,避免使用主線程進行join等待其結束if( pthread_detach( threads[i] ) ) {delete[] threads;cout << "pthread_detach error\n";throw exception();}} }// 析構函數中,將m_stop置true,此時將阻塞中的所有線程喚醒 // 由于 !m_stop 為false,線程會退出循環,線程結束被回收( 詳見函數run() ) // 若不喚醒線程,則在程序退出后,線程非正常結束,同時會導致 template< typename T > ThreadPool<T>::~ThreadPool() {delete[] threads;m_stop = true;queue_cond_locker.broadcast(); }/* 添加任務時需要先上鎖,并判斷隊列是否為空 */ template< typename T > bool ThreadPool<T>::append( T *task ) {queue_mutex_locker.mutex_lock();bool need_signal = task_queue.empty(); // 記錄添加任務之前隊列是否為空task_queue.push( task );queue_mutex_locker.mutex_unlock();// 如果添加任務之前隊列為空,即所有線程都在wait,所以需要喚醒某個線程if( need_signal ) {queue_cond_locker.signal();}return true; }// 線程函數,調用run()來使線程開始工作 template< typename T > void * ThreadPool<T>::worker( void *arg ) {ThreadPool *pool = ( ThreadPool * )arg;pool->run();return pool; }// 獲取處于隊首的任務,獲取時需要加鎖,避免發生錯誤 // 若隊列為空,則返回NULL,該線程成為等待狀態(詳見函數run()) template< typename T > T* ThreadPool<T>::getTask() {T *task = NULL;queue_mutex_locker.mutex_lock();if( !task_queue.empty() ) {task = task_queue.front();task_queue.pop();}queue_mutex_locker.mutex_unlock();return task; }template< typename T > void ThreadPool<T>::run() {while( !m_stop ) { // 當線程池沒有結束時,線程循環獲取任務進行執行T *task = getTask();if( !task ) {queue_cond_locker.wait(); // 隊列為空,線程開始等待} else {task->doit(); // 開始執行任務delete task; //task指向的對象在WebServer中new出來,因此需要手動delete}} }#endif代碼分析
首先需要注意,threadpool的實現使用了模板類,這樣就需要把類的定義與成員函數的實現放在一個文件里,因為成員函數不能單獨編譯。
關于m_stop的作用:可以發現m_stop的初值為false,這樣線程就可以一直循環等待執行任務( 見ThreadPool::run()成員函數 ),但是m_stop的值改變的位置在析構函數中,線程池都退出了,改變m_stop的值還有什么用呢?其實,在將m_stop的值改變后,又調用了broadcast()來喚醒所有線程,這樣所有被阻塞的線程就會開始執行,run()方法的循環條件是while( !m_stop ),這樣其循環會被破壞,進而線程結束,被自動回收。// PS:然而只要主線程(暫且就這樣叫了)退出( 不是通過pthread_exit()退出 ),那么所有線程就會被強制終止,同樣也不會正常退出,所以線程池析構函數中喚醒所有線程并不會有什么作用,可能被喚醒的線程仍會在主線程結束前未執行完,仍為非正常退出。可以通過sleep()來環節,不過這樣仍不會對正在執行任務的線程有很好的作用,因為任務執行時間未知。(可以考慮在析構函數中使用join()等待線程結束 )
在任務隊列進行添加任務、取任務、刪除任務時,為了避免多個線程對任務隊列同時操作,需要在進行修改時將任務隊列加鎖
由于pthread_create第三個參數必須指向靜態函數,要使用類成員函數和變量,只能通過:
1) 類的靜態對象
2) 將類的對象作為參數傳給靜態函數
這里通過第二種方法實現,將對象本身作為參數傳到線程工作函數,在線程函數中進行類型強轉獲取調用對象中存儲的數據。在從任務隊列獲取任務時:如果任務隊列非空,就加鎖獲得隊首任務,再將隊首出隊,將任務返回即可;如果任務隊列為空,即無法獲取任務,就會返回NULL,那么run()函數對應就會進入else后的語句,將線程進行阻塞,等待下一個任務的到來。
關于在run() 中 delete task;的說明:由于所有任務都是webServer類中使用new創建的(詳見我的Github),因為如果不適用new的話,在將任務加入隊列后,webServer.cpp中任務會出作用域,進行析構,那么任務可能會在執行完之前被析構,造成錯誤,所以使用new創建。然而,new創建的對象不會自動釋放,只能手動delete,因此在任務結束后進行delete,這樣才能使其資源釋放,同事可以調用Task的析構函數,關閉該任務對應的連接。
附:頭文件locker.h
封裝互斥鎖與條件變量的類
// locker.h #ifndef _LOCKER_H_ #define _LOCKER_H_#include <iostream> #include <exception> #include <pthread.h> using namespace std;/* 線程鎖 */ class MutexLocker { private:pthread_mutex_t m_mutex; public:MutexLocker() { //初始化if( pthread_mutex_init( &m_mutex, NULL ) ) {cout << "mutex init errror __ 1\n";throw exception();}}~MutexLocker() {pthread_mutex_destroy( &m_mutex );}bool mutex_lock() {return pthread_mutex_lock( &m_mutex ) == 0;}bool mutex_unlock() {return pthread_mutex_unlock( &m_mutex );} };/* 條件變量 */ class Cond { private:pthread_mutex_t m_mutex;pthread_cond_t m_cond; public:Cond() {if( pthread_mutex_init( &m_mutex, NULL ) ) {throw exception();}if( pthread_cond_init( &m_cond, NULL ) ) {pthread_cond_destroy( &m_cond );throw exception();}}~Cond() {pthread_mutex_destroy( &m_mutex );pthread_cond_destroy( &m_cond );}// 等待條件變量,cond與mutex搭配使用,避免造成共享數據的混亂bool wait() {pthread_mutex_lock( &m_mutex );int ret = pthread_cond_wait( &m_cond, &m_mutex );pthread_mutex_unlock( &m_mutex );return ret == 0;}// 喚醒等待該條件變量的某個線程bool signal() {return pthread_cond_signal( &m_cond ) == 0;}// 喚醒所有等待該條件變量的線程bool broadcast() {return pthread_cond_broadcast( &m_cond ) == 0;} };#endif總結
- 上一篇: 移动单页面中有哪些好的办法判断所有图片加
- 下一篇: 基于epoll+threadpool的w