线程池原理及创建(C++实现)
本文給出了一個通用的線程池框架,該框架將與線程執行相關的任務進行了高層次的抽象,使之與具體的執行任務無關。另外該線程池具有動態伸縮性,它能根據執行任務的輕重自動調整線程池中線程的數量。文章的最后,我們給出一個簡單示例程序,通過該示例程序,我們會發現,通過該線程池框架執行多線程任務是多么的簡單。
為什么需要線程池
目前的大多數網絡服務器,包括Web服務器、Email服務器以及數據庫服務器等都具有一個共同點,就是單位時間內必須處理數目巨大的連接請求,但處理時間卻相對較短。
傳統多線程方案中我們采用的服務器模型則是一旦接受到請求之后,即創建一個新的線程,由該線程執行任務。任務執行完畢后,線程退出,這就是是“即時創建,即時銷毀”的策略。盡管與創建進程相比,創建線程的時間已經大大的縮短,但是如果提交給線程的任務是執行時間較短,而且執行次數極其頻繁,那么服務器將處于不停的創建線程,銷毀線程的狀態。
我們將傳統方案中的線程執行過程分為三個過程:T1、T2、T3。
T1:線程創建時間
T2:線程執行時間,包括線程的同步等時間
T3:線程銷毀時間
那么我們可以看出,線程本身的開銷所占的比例為(T1+T3) / (T1+T2+T3)。如果線程執行的時間很短的話,這比開銷可能占到20%-50%左右。如果任務執行時間很頻繁的話,這筆開銷將是不可忽略的。
除此之外,線程池能夠減少創建的線程個數。通常線程池所允許的并發線程是有上界的,如果同時需要并發的線程數超過上界,那么一部分線程將會等待。而傳統方案中,如果同時請求數目為2000,那么最壞情況下,系統可能需要產生2000個線程。盡管這不是一個很大的數目,但是也有部分機器可能達不到這種要求。
因此線程池的出現正是著眼于減少線程池本身帶來的開銷。線程池采用預創建的技術,在應用程序啟動之后,將立即創建一定數量的線程(N1),放入空閑隊列中。這些線程都是處于阻塞(Suspended)狀態,不消耗CPU,但占用較小的內存空間。當任務到來后,緩沖池選擇一個空閑線程,把任務傳入此線程中運行。當N1個線程都在處理任務后,緩沖池自動創建一定數量的新線程,用于處理更多的任務。在任務執行完畢后線程也不退出,而是繼續保持在池中等待下一次的任務。當系統比較空閑時,大部分線程都一直處于暫停狀態,線程池自動銷毀一部分線程,回收系統資源。
基于這種預創建技術,線程池將線程創建和銷毀本身所帶來的開銷分攤到了各個具體的任務上,執行次數越多,每個任務所分擔到的線程本身開銷則越小,不過我們另外可能需要考慮進去線程之間同步所帶來的開銷。
?
構建線程池框架
一般線程池都必須具備下面幾個組成部分:
線程池管理器:用于創建并管理線程池
工作線程::線程池中實際執行的線程
任務接口::盡管線程池大多數情況下是用來支持網絡服務器,但是我們將線程執行的任務抽象出來,形成任務接口,從而是的線程池與具體的任務無關。
任務隊列:線程池的概念具體到實現則可能是隊列,鏈表之類的數據結構,其中保存執行線程。
我們實現的通用線程池框架由五個重要部分組成CThreadManage,CThreadPool,CThread,CJob,CWorkerThread,除此之外框架中還包括線程同步使用的類CThreadMutex和CCondition。
CJob是所有的任務的基類,其提供一個接口Run,所有的任務類都必須從該類繼承,同時實現Run方法。該方法中實現具體的任務邏輯。
CThread是Linux中線程的包裝,其封裝了Linux線程最經常使用的屬性和方法,它也是一個抽象類,是所有線程類的基類,具有一個接口Run。
CWorkerThread是實際被調度和執行的線程類,其從CThread繼承而來,實現了CThread中的Run方法。
CThreadPool是線程池類,其負責保存線程,釋放線程以及調度線程。
CThreadManage是線程池與用戶的直接接口,其屏蔽了內部的具體實現。
CThreadMutex用于線程之間的互斥。
CCondition則是條件變量的封裝,用于線程之間的同步。
它們的類的繼承關系如下圖所示:
?
線程池的時序很簡單,如下圖所示。CThreadManage直接跟客戶端打交道,其接受需要創建的線程初始個數,并接受客戶端提交的任務。這兒的任務是具體的非抽象的任務。CThreadManage的內部實際上調用的都是CThreadPool的相關操作。CThreadPool創建具體的線程,并把客戶端提交的任務分發給CWorkerThread,CWorkerThread實際執行具體的任務。
?(若有了線程池框架,我們所需要的做的就是派生CJob類,將需要完成的任務實現在Run方法中。然后將該Job交由CThreadManage去執行)
理解系統組件
下面我們分開來了解系統中的各個組件。
CThreadManage
CThreadManage的功能非常簡單,其提供最簡單的方法,其類定義如下:
class CThreadManage
{
private:
??? CThreadPool*??? m_Pool;
??? int????? ??? m_NumOfThread;
protected:
public:
??? void ??? SetParallelNum(int num);
??? CThreadManage();
??? CThreadManage(int num);
??? virtual ~CThreadManage();
?
??? void??? Run(CJob* job,void* jobdata);
??? void??? TerminateAll(void);
};
其中m_Pool指向實際的線程池;m_NumOfThread是初始創建時候允許創建的并發的線程個數。另外Run和TerminateAll方法也非常簡單,只是簡單的調用CThreadPool的一些相關方法而已。其具體的實現如下:
CThreadManage::CThreadManage(){
??? m_NumOfThread = 10;
??? m_Pool = new CThreadPool(m_NumOfThread);
?}
CThreadManage::CThreadManage(int num){
??? m_NumOfThread = num;
??? m_Pool = new CThreadPool(m_NumOfThread);
}
CThreadManage::~CThreadManage(){
??? if(NULL != m_Pool)
??? delete m_Pool;
}
void CThreadManage::SetParallelNum(int num){
??? m_NumOfThread = num;
}
void CThreadManage::Run(CJob* job,void* jobdata){
??? m_Pool->Run(job,jobdata);
}
void CThreadManage::TerminateAll(void){
??? m_Pool->TerminateAll();
}
CThread
CThread?類實現了對Linux中線程操作的封裝,它是所有線程的基類,也是一個抽象類,提供了一個抽象接口Run,所有的CThread都必須實現該Run方法。CThread的定義如下所示:
class CThread
{
private:
??? int????? ??? m_ErrCode;
??? Semaphore??? m_ThreadSemaphore;? //the inner semaphore, which is used to realize
??? unsigned ??? long m_ThreadID;???
??? bool ??????? m_Detach;?????? //The thread is detached
??? bool ??????? m_CreateSuspended;? //if suspend after creating
??? char*??? ??? m_ThreadName;
??? ThreadState m_ThreadState;????? //the state of the thread
protected:
??? void ??? SetErrcode(int errcode){m_ErrCode = errcode;}
??? static void* ThreadFunction(void*);
public:
??? CThread();
??? CThread(bool createsuspended,bool detach);
??? virtual ~CThread();
??? virtual void Run(void) = 0;
??? void ??? SetThreadState(ThreadState state){m_ThreadState = state;}
?
??? bool ??? Terminate(void);??? //Terminate the threa
??? bool ??? Start(void);??????? //Start to execute the thread
??? void ??? Exit(void);
??? bool ??? Wakeup(void);
???
??? ThreadState? GetThreadState(void){return m_ThreadState;}
??? int????? GetLastError(void){return m_ErrCode;}
??? void ??? SetThreadName(char* thrname){strcpy(m_ThreadName,thrname);}
??? char*??? GetThreadName(void){return m_ThreadName;}
??? int????? GetThreadID(void){return m_ThreadID;}
?
??? bool ??? SetPriority(int priority);
??? int????? GetPriority(void);
??? int????? GetConcurrency(void);
??? void ??? SetConcurrency(int num);
??? bool ??? Detach(void);
??? bool ??? Join(void);
??? bool ??? Yield(void);
??? int????? Self(void);
};
線程的狀態可以分為四種,空閑、忙碌、掛起、終止(包括正常退出和非正常退出)。由于目前Linux線程庫不支持掛起操作,因此,我們的此處的掛起操作類似于暫停。如果線程創建后不想立即執行任務,那么我們可以將其“暫停”,如果需要運行,則喚醒。有一點必須注意的是,一旦線程開始執行任務,將不能被掛起,其將一直執行任務至完畢。
線程類的相關操作均十分簡單。線程的執行入口是從Start()函數開始,其將調用函數ThreadFunction,ThreadFunction再調用實際的Run函數,執行實際的任務。
?
CThreadPool
CThreadPool是線程的承載容器,一般可以將其實現為堆棧、單向隊列或者雙向隊列。在我們的系統中我們使用STL Vector對線程進行保存。CThreadPool的實現代碼如下:
class CThreadPool
{
friend class CWorkerThread;
private:
??? unsigned int m_MaxNum;?? //the max thread num that can create at the same time
??? unsigned int m_AvailLow; //The min num of idle thread that shoule kept
??? unsigned int m_AvailHigh;??? //The max num of idle thread that kept at the same time
??? unsigned int m_AvailNum; //the normal thread num of idle num;
??? unsigned int m_InitNum;? //Normal thread num;
protected:
??? CWorkerThread* GetIdleThread(void);
?
??? void??? AppendToIdleList(CWorkerThread* jobthread);
??? void??? MoveToBusyList(CWorkerThread* idlethread);
??? void??? MoveToIdleList(CWorkerThread* busythread);
?
??? void??? DeleteIdleThread(int num);
??? void??? CreateIdleThread(int num);
public:
??? CThreadMutex m_BusyMutex;??? //when visit busy list,use m_BusyMutex to lock and unlock
??? CThreadMutex m_IdleMutex;??? //when visit idle list,use m_IdleMutex to lock and unlock
??? CThreadMutex m_JobMutex; //when visit job list,use m_JobMutex to lock and unlock
??? CThreadMutex m_VarMutex;
?
??? CCondition?????? m_BusyCond; //m_BusyCond is used to sync busy thread list
??? CCondition?????? m_IdleCond; //m_IdleCond is used to sync idle thread list
??? CCondition?????? m_IdleJobCond;? //m_JobCond is used to sync job list
??? CCondition?????? m_MaxNumCond;
?
??? vector<CWorkerThread*>?? m_ThreadList;
??? vector<CWorkerThread*>?? m_BusyList;???? //Thread List
??? vector<CWorkerThread*>?? m_IdleList; //Idle List
?
??? CThreadPool();
??? CThreadPool(int initnum);
??? virtual ~CThreadPool();
?
??? void??? SetMaxNum(int maxnum){m_MaxNum = maxnum;}
??? int???? GetMaxNum(void){return m_MaxNum;}
??? void??? SetAvailLowNum(int minnum){m_AvailLow = minnum;}
??? int???? GetAvailLowNum(void){return m_AvailLow;}
??? void??? SetAvailHighNum(int highnum){m_AvailHigh = highnum;}
??? int???? GetAvailHighNum(void){return m_AvailHigh;}
??? int???? GetActualAvailNum(void){return m_AvailNum;}
??? int???? GetAllNum(void){return m_ThreadList.size();}
??? int???? GetBusyNum(void){return m_BusyList.size();}
??? void??? SetInitNum(int initnum){m_InitNum = initnum;}
??? int???? GetInitNum(void){return m_InitNum;}
??
??? void??? TerminateAll(void);
??? void??? Run(CJob* job,void* jobdata);
};
CThreadPool::CThreadPool()
{
??? m_MaxNum = 50;
??? m_AvailLow = 5;
??? m_InitNum=m_AvailNum = 10 ;??
??? m_AvailHigh = 20;
?
??? m_BusyList.clear();
??? m_IdleList.clear();
??? for(int i=0;i<m_InitNum;i++){
??? CWorkerThread* thr = new CWorkerThread();
??? thr->SetThreadPool(this);
??? AppendToIdleList(thr);
??? thr->Start();
??? }
}
?
CThreadPool::CThreadPool(int initnum)
{
??? assert(initnum>0 && initnum<=30);
??? m_MaxNum?? = 30;
??? m_AvailLow = initnum-10>0?initnum-10:3;
??? m_InitNum=m_AvailNum = initnum ;??
??? m_AvailHigh = initnum+10;
?
??? m_BusyList.clear();
??? m_IdleList.clear();
??? for(int i=0;i<m_InitNum;i++){
??? CWorkerThread* thr = new CWorkerThread();
??? AppendToIdleList(thr);
??? thr->SetThreadPool(this);
??? thr->Start();?????? //begin the thread,the thread wait for job
??? }
}
?
CThreadPool::~CThreadPool()
{
?? TerminateAll();
}
?
void CThreadPool::TerminateAll()
{
??? for(int i=0;i < m_ThreadList.size();i++) {
??? CWorkerThread* thr = m_ThreadList[i];
??? thr->Join();
??? }
??? return;
}
?
CWorkerThread* CThreadPool::GetIdleThread(void)
{
??? while(m_IdleList.size() ==0 )
??? m_IdleCond.Wait();
???
??? m_IdleMutex.Lock();
??? if(m_IdleList.size() > 0 )
??? {
??? CWorkerThread* thr = (CWorkerThread*)m_IdleList.front();
??? printf("Get Idle thread %dn",thr->GetThreadID());
??? m_IdleMutex.Unlock();
??? return thr;
??? }
??? m_IdleMutex.Unlock();
?
??? return NULL;
}
?
//add an idle thread to idle list
void CThreadPool::AppendToIdleList(CWorkerThread* jobthread)
{
??? m_IdleMutex.Lock();
??? m_IdleList.push_back(jobthread);
??? m_ThreadList.push_back(jobthread);
??? m_IdleMutex.Unlock();
}
?
//move and idle thread to busy thread
void CThreadPool::MoveToBusyList(CWorkerThread* idlethread)
{
??? m_BusyMutex.Lock();
??? m_BusyList.push_back(idlethread);
??? m_AvailNum--;
??? m_BusyMutex.Unlock();
??
??? m_IdleMutex.Lock();
??? vector<CWorkerThread*>::iterator pos;
??? pos = find(m_IdleList.begin(),m_IdleList.end(),idlethread);
??? if(pos !=m_IdleList.end())
??? m_IdleList.erase(pos);
??? m_IdleMutex.Unlock();
}
?
void CThreadPool::MoveToIdleList(CWorkerThread* busythread)
{
??? m_IdleMutex.Lock();
??? m_IdleList.push_back(busythread);
??? m_AvailNum++;
??? m_IdleMutex.Unlock();
?
??? m_BusyMutex.Lock();
??? vector<CWorkerThread*>::iterator pos;
??? pos = find(m_BusyList.begin(),m_BusyList.end(),busythread);
??? if(pos!=m_BusyList.end())
??? m_BusyList.erase(pos);
??? m_BusyMutex.Unlock();
?
??? m_IdleCond.Signal();
??? m_MaxNumCond.Signal();
}
?
//create num idle thread and put them to idlelist
void CThreadPool::CreateIdleThread(int num)
{
??? for(int i=0;i<num;i++){
??? CWorkerThread* thr = new CWorkerThread();
??? thr->SetThreadPool(this);
??? AppendToIdleList(thr);
??? m_VarMutex.Lock();
??? m_AvailNum++;
??? m_VarMutex.Unlock();
??? thr->Start();?????? //begin the thread,the thread wait for job
??? }
}
?
void CThreadPool::DeleteIdleThread(int num)
{
??? printf("Enter into CThreadPool::DeleteIdleThreadn");
??? m_IdleMutex.Lock();
??? printf("Delete Num is %dn",num);
??? for(int i=0;i<num;i++){
??? CWorkerThread* thr;
??? if(m_IdleList.size() > 0 ){
??????????? thr = (CWorkerThread*)m_IdleList.front();
??????????? printf("Get Idle thread %dn",thr->GetThreadID());
??? }
?
??? vector<CWorkerThread*>::iterator pos;
??? pos = find(m_IdleList.begin(),m_IdleList.end(),thr);
??? if(pos!=m_IdleList.end())
??? ??? m_IdleList.erase(pos);
??? m_AvailNum--;
??? printf("The idle thread available num:%d n",m_AvailNum);
??? printf("The idlelist????????????? num:%d n",m_IdleList.size());
??? }
??? m_IdleMutex.Unlock();
}
void CThreadPool::Run(CJob* job,void* jobdata)
{
??? assert(job!=NULL);
???
??? //if the busy thread num adds to m_MaxNum,so we should wait
??? if(GetBusyNum() == m_MaxNum)
??? ??? m_MaxNumCond.Wait();
?
??? if(m_IdleList.size()<m_AvailLow)
??? {
??? if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum )
??? ??? CreateIdleThread(m_InitNum-m_IdleList.size());
??? else
??? ??? CreateIdleThread(m_MaxNum-GetAllNum());
??? }
?
??? CWorkerThread*? idlethr = GetIdleThread();
??? if(idlethr !=NULL)
??? {
??? idlethr->m_WorkMutex.Lock();
??? MoveToBusyList(idlethr);
??? idlethr->SetThreadPool(this);
??? job->SetWorkThread(idlethr);
??? printf("Job is set to thread %d n",idlethr->GetThreadID());
??? idlethr->SetJob(job,jobdata);
??? }
}
在CThreadPool中存在兩個鏈表,一個是空閑鏈表,一個是忙碌鏈表。Idle鏈表中存放所有的空閑進程,當線程執行任務時候,其狀態變為忙碌狀態,同時從空閑鏈表中刪除,并移至忙碌鏈表中。在CThreadPool的構造函數中,我們將執行下面的代碼:
for(int i=0;i<m_InitNum;i++)
??? {
??? CWorkerThread* thr = new CWorkerThread();
??? AppendToIdleList(thr);
??? thr->SetThreadPool(this);
??? thr->Start();?????? //begin the thread,the thread wait for job
??? }
在該代碼中,我們將創建m_InitNum個線程,創建之后即調用AppendToIdleList放入Idle鏈表中,由于目前沒有任務分發給這些線程,因此線程執行Start后將自己掛起。
事實上,線程池中容納的線程數目并不是一成不變的,其會根據執行負載進行自動伸縮。為此在CThreadPool中設定四個變量:
m_InitNum:處世創建時線程池中的線程的個數。
m_MaxNum:當前線程池中所允許并發存在的線程的最大數目。
m_AvailLow:當前線程池中所允許存在的空閑線程的最小數目,如果空閑數目低于該值,表明負載可能過重,此時有必要增加空閑線程池的數目。實現中我們總是將線程調整為m_InitNum個。
m_AvailHigh:當前線程池中所允許的空閑的線程的最大數目,如果空閑數目高于該值,表明當前負載可能較輕,此時將刪除多余的空閑線程,刪除后調整數也為m_InitNum個。
m_AvailNum:目前線程池中實際存在的線程的個數,其值介于m_AvailHigh和m_AvailLow之間。如果線程的個數始終維持在m_AvailLow和m_AvailHigh之間,則線程既不需要創建,也不需要刪除,保持平衡狀態。因此如何設定m_AvailLow和m_AvailHigh的值,使得線程池最大可能的保持平衡態,是線程池設計必須考慮的問題。
線程池在接受到新的任務之后,線程池首先要檢查是否有足夠的空閑池可用。檢查分為三個步驟:
(1)檢查當前處于忙碌狀態的線程是否達到了設定的最大值m_MaxNum,如果達到了,表明目前沒有空閑線程可用,而且也不能創建新的線程,因此必須等待直到有線程執行完畢返回到空閑隊列中。
(2)如果當前的空閑線程數目小于我們設定的最小的空閑數目m_AvailLow,則我們必須創建新的線程,默認情況下,創建后的線程數目應該為m_InitNum,因此創建的線程數目應該為(?當前空閑線程數與m_InitNum);但是有一種特殊情況必須考慮,就是現有的線程總數加上創建后的線程數可能超過m_MaxNum,因此我們必須對線程的創建區別對待。
??? if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum )
??? ??? CreateIdleThread(m_InitNum-m_IdleList.size());
??? else
??? ??? CreateIdleThread(m_MaxNum-GetAllNum());
如果創建后總數不超過m_MaxNum,則創建后的線程為m_InitNum;如果超過了,則只創建( m_MaxNum-當前線程總數)個。
(3)調用GetIdleThread方法查找空閑線程。如果當前沒有空閑線程,則掛起;否則將任務指派給該線程,同時將其移入忙碌隊列。
當線程執行完畢后,其會調用MoveToIdleList方法移入空閑鏈表中,其中還調用m_IdleCond.Signal()方法,喚醒GetIdleThread()中可能阻塞的線程。
?
CWorkerThread
CWorkerThread是CThread的派生類,是事實上的工作線程。在CThreadPool的構造函數中,我們創建了一定數量的CWorkerThread。一旦這些線程創建完畢,我們將調用Start()啟動該線程。Start方法最終會調用Run方法。Run方法是個無限循環的過程。在沒有接受到實際的任務的時候,m_Job為NULL,此時線程將調用Wait方法進行等待,從而處于掛起狀態。一旦線程池將具體的任務分發給該線程,其將被喚醒,從而通知線程從掛起的地方繼續執行。CWorkerThread的完整定義如下:
class CWorkerThread:public CThread
{
private:
??? CThreadPool*? m_ThreadPool;
??? CJob*??? m_Job;
??? void*??? m_JobData;
???
??? CThreadMutex m_VarMutex;
??? bool????? m_IsEnd;
protected:
public:
??? CCondition?? m_JobCond;
??? CThreadMutex m_WorkMutex;
??? CWorkerThread();
??? virtual ~CWorkerThread();
??? void Run();
??? void??? SetJob(CJob* job,void* jobdata);
??? CJob*?? GetJob(void){return m_Job;}
??? void??? SetThreadPool(CThreadPool* thrpool);
??? CThreadPool* GetThreadPool(void){return m_ThreadPool;}
};
CWorkerThread::CWorkerThread()
{
??? m_Job = NULL;
??? m_JobData = NULL;
??? m_ThreadPool = NULL;
??? m_IsEnd = false;
}
CWorkerThread::~CWorkerThread()
{
??? if(NULL != m_Job)
??? delete m_Job;
??? if(m_ThreadPool != NULL)
??? delete m_ThreadPool;
}
?
void CWorkerThread::Run()
{
??? SetThreadState(THREAD_RUNNING);
??? for(;;)
??? {
??? while(m_Job == NULL)
??????? m_JobCond.Wait();
?
??? m_Job->Run(m_JobData);
??? m_Job->SetWorkThread(NULL);
??? m_Job = NULL;
??? m_ThreadPool->MoveToIdleList(this);
??? if(m_ThreadPool->m_IdleList.size() > m_ThreadPool->GetAvailHighNum())
??? {
m_ThreadPool->DeleteIdleThread(m_ThreadPool->m_IdleList.size()-m_T
hreadPool->GetInitNum());
??? }
??? m_WorkMutex.Unlock();
??? }
}
void CWorkerThread::SetJob(CJob* job,void* jobdata)
{
??? m_VarMutex.Lock();
??? m_Job = job;
??? m_JobData = jobdata;
??? job->SetWorkThread(this);
??? m_VarMutex.Unlock();
??? m_JobCond.Signal();
}
void CWorkerThread::SetThreadPool(CThreadPool* thrpool)
{
??? m_VarMutex.Lock();
??? m_ThreadPool = thrpool;
??? m_VarMutex.Unlock();
}
當線程執行任務之前首先必須判斷空閑線程的數目是否低于m_AvailLow,如果低于,則必須創建足夠的空閑線程,使其數目達到m_InitNum個,然后將調用MoveToBusyList()移出空閑隊列,移入忙碌隊列。當任務執行完畢后,其又調用MoveToIdleList()移出忙碌隊列,移入空閑隊列,等待新的任務。
除了Run方法之外,CWorkerThread中另外一個重要的方法就是SetJob,該方法將實際的任務賦值給線程。當沒有任何執行任務即m_Job為NULL的時候,線程將調用m_JobCond.Wait進行等待。一旦Job被賦值給線程,其將調用m_JobCond.Signal方法喚醒該線程。由于m_JobCond屬于線程內部的變量,每個線程都維持一個m_JobCond,只有得到任務的線程才被喚醒,沒有得到任務的將繼續等待。無論一個線程何時被喚醒,其都將從等待的地方繼續執行m_Job->Run(m_JobData),這是線程執行實際任務的地方。
在線程執行給定Job期間,我們必須防止另外一個Job又賦給該線程,因此在賦值之前,通過m_VarMutex進行鎖定, Job執行期間,其于的Job將不能關聯到該線程;任務執行完畢,我們調用m_VarMutex.Unlock()進行解鎖,此時,線程又可以接受新的執行任務。
在線程執行任務結束后返回空閑隊列前,我們還需要判斷當前空閑隊列中的線程是否高于m_AvailHigh個。如果超過m_AvailHigh,則必須從其中刪除(m_ThreadPool->m_IdleList.size()-m_ThreadPool->GetInitNum())個線程,使線程數目保持在m_InitNum個。
?
CJob
CJob類相對簡單,其封裝了任務的基本的屬性和方法,其中最重要的是Run方法,代碼如下:
class CJob
{
private:
??? int????? m_JobNo;??????? //The num was assigned to the job
??? char*??? m_JobName;????? //The job name
??? CThread? *m_pWorkThread;???? //The thread associated with the job
public:
??? CJob( void );
??? virtual ~CJob();
???????
??? int????? GetJobNo(void) const { return m_JobNo; }
??? void???? SetJobNo(int jobno){ m_JobNo = jobno;}
??? char*??? GetJobName(void) const { return m_JobName; }
??? void???? SetJobName(char* jobname);
??? CThread *GetWorkThread(void){ return m_pWorkThread; }
??? void???? SetWorkThread ( CThread *pWorkThread ){
??????? m_pWorkThread = pWorkThread;
??? }
??? virtual void Run ( void *ptr ) = 0;
};
CJob::CJob(void)
:m_pWorkThread(NULL)
,m_JobNo(0)
,m_JobName(NULL)
{
}
CJob::~CJob(){
??? if(NULL != m_JobName)
??? free(m_JobName);
}
void CJob::SetJobName(char* jobname)
{
??? if(NULL !=m_JobName)??? {
??????? free(m_JobName);
??????? m_JobName = NULL;
??? }
??? if(NULL !=jobname)??? {
??????? m_JobName = (char*)malloc(strlen(jobname)+1);
??????? strcpy(m_JobName,jobname);
??? }
}
線程池使用示例
至此我們給出了一個簡單的與具體任務無關的線程池框架。使用該框架非常的簡單,我們所需要的做的就是派生CJob類,將需要完成的任務實現在Run方法中。然后將該Job交由CThreadManage去執行。下面我們給出一個簡單的示例程序
class CXJob:public CJob
{
public:
??? CXJob(){i=0;}
??? ~CXJob(){}
??? void Run(void* jobdata)??? {
??????? printf("The Job comes from CXJOB/n");
??????? sleep(2);
??? }
};
?
class CYJob:public CJob
{
public:
??? CYJob(){i=0;}
??? ~CYJob(){}
??? void Run(void* jobdata)??? {
??????? printf("The Job comes from CYJob/n");
??? }
};
?
main()
{
??? CThreadManage* manage = new CThreadManage(10);
??? for(int i=0;i<40;i++)
??? {
??????? CXJob*?? job = new CXJob();
??????? manage->Run(job,NULL);
??? }
??? sleep(2);
??? CYJob* job = new CYJob();
??? manage->Run(job,NULL);
??? manage->TerminateAll();
}
CXJob和CYJob都是從Job類繼承而來,其都實現了Run接口。CXJob只是簡單的打印一句”The Job comes from CXJob”,CYJob也只打印”The Job comes from CYJob”,然后均休眠2秒鐘。在主程序中我們初始創建10個工作線程。然后分別執行40次CXJob和一次CYJob。
線程池使用后記
線程池適合場合
事實上,線程池并不是萬能的。它有其特定的使用場合。線程池致力于減少線程本身的開銷對應用所產生的影響,這是有前提的,前提就是線程本身開銷與線程執行任務相比不可忽略。如果線程本身的開銷相對于線程任務執行開銷而言是可以忽略不計的,那么此時線程池所帶來的好處是不明顯的,比如對于FTP服務器以及Telnet服務器,通常傳送文件的時間較長,開銷較大,那么此時,我們采用線程池未必是理想的方法,我們可以選擇“即時創建,即時銷毀”的策略。
總之線程池通常適合下面的幾個場合:
(1)? 單位時間內處理任務頻繁而且任務處理時間短
(2)? 對實時性要求較高。如果接受到任務后在創建線程,可能滿足不了實時要求,因此必須采用線程池進行預創建。
(3)? 必須經常面對高突發性事件,比如Web服務器,如果有足球轉播,則服務器將產生巨大的沖擊。此時如果采取傳統方法,則必須不停的大量產生線程,銷毀線程。此時采用動態線程池可以避免這種情況的發生。
?
結束語
本文給出了一個簡單的通用的與任務無關的線程池的實現,通過該線程池能夠極大的簡化Linux下多線程的開發工作。該線程池的進一步完善開發工作還在進行中,希望能夠得到你的建議和支持。
參考資料
http://www-900.ibm.com/developerWorks/cn/java/j-jtp0730/index.shtml
POSIX多線程程序設計,David R.Butenhof??? 譯者:于磊 曾剛,中國電力出版社
C++面向對象多線程編程,CAMERON HUGHES等著 周良忠譯,人民郵電出版社
Java Pro,結合線程和分析器池,Edy Yu
關于作者
張中慶,西安交通大學軟件所,在讀碩士,目前研究方向為分布式網絡與移動中間件,對Linux極其愛好,可以通過flydish1234@sina.com.cn與我聯系。
總結
以上是生活随笔為你收集整理的线程池原理及创建(C++实现)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 什么代码才是线程安全的
- 下一篇: VC网络通信API概览