日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

Linux下设计一个简单的线程池

發布時間:2025/3/15 18 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Linux下设计一个简单的线程池 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

定義

???????? 什么是線程池?簡單點說,線程池就是有一堆已經創建好了的線程,初始它們都處于空閑等待狀態,當有新的任務需要處理的時候,就從這個池子里面取一個空閑等待的線程來處理該任務,當處理完成了就再次把該線程放回池中,以供后面的任務使用。當池子里的線程全都處理忙碌狀態時,線程池中沒有可用的空閑等待線程,此時,根據需要選擇創建一個新的線程并置入池中,或者通知任務線程池忙,稍后再試。

?

為什么要用線程池?

???????? 我們說,線程的創建和銷毀比之進程的創建和銷毀是輕量級的,但是當我們的任務需要大量進行大量線程的創建和銷毀操作時,這個消耗就會變成的相當大。比如,當你設計一個壓力性能測試框架的時候,需要連續產生大量的并發操作,這個是時候,線程池就可以很好的幫上你的忙。線程池的好處就在于線程復用,一個任務處理完成后,當前線程可以直接處理下一個任務,而不是銷毀后再創建,非常適用于連續產生大量并發任務的場合。

?

線程池工作原理

???????? 線程池中每一個線程的工作過程如下:


圖 1: 線程的工作流程

???????? 線程池的任務就在于負責這些線程的創建,銷毀和任務處理參數傳遞、喚醒和等待。

1.??????創建若干線程,置入線程池

2.??????任務達到時,從線程池取空閑線程

3.??????取得了空閑線程,立即進行任務處理

4.??????否則新建一個線程,并置入線程池,執行3

5.??????如果創建失敗或者線程池已滿,根據設計策略選擇返回錯誤或將任務置入處理隊列,等待處理

6.??????銷毀線程池

?

圖 2:線程池的工作原理

?

線程池設計


數據結構設計


任務設計

[cpp] view plaincopy print?
  • typedef?struct?tp_work_desc_s?TpWorkDesc;??
  • typedef?void?(*process_job)(TpWorkDesc*job);??
  • struct?tp_work_desc_s?{??
  • ?????????void?*ret;?//call?in,?that?is?arguments??
  • ?????????void?*arg;?//call?out,?that?is?return?value??
  • };??
  • typedef struct tp_work_desc_s TpWorkDesc; typedef void (*process_job)(TpWorkDesc*job); struct tp_work_desc_s {void *ret; //call in, that is argumentsvoid *arg; //call out, that is return value };
    其中,TpWorkDesc是任務參數描述,arg是傳遞給任務的參數,ret則是任務處理完成后的返回值;

    process_job函數是任務處理函數原型,每個任務處理函數都應該這樣定義,然后將它作為參數傳給線程池處理,線程池將會選擇一個空閑線程通過調用該函數來進行任務處理;

    ?

    線程設計

    [cpp] view plaincopy print?
  • typedef?struct?tp_thread_info_s?TpThreadInfo;??
  • struct?tp_thread_info_s?{??
  • ?????????pthread_t?thread_id;?//thread?id?num??
  • ?????????TPBOOL?is_busy;?//thread?status:true-busy;flase-idle??
  • ?????????pthread_cond_t?thread_cond;??
  • ?????????pthread_mutex_t?thread_lock;??
  • ?????????process_job?proc_fun;??
  • ?????????TpWorkDesc*?th_job;??
  • ?????????TpThreadPool*?tp_pool;??
  • };??
  • typedef struct tp_thread_info_s TpThreadInfo; struct tp_thread_info_s {pthread_t thread_id; //thread id numTPBOOL is_busy; //thread status:true-busy;flase-idlepthread_cond_t thread_cond;pthread_mutex_t thread_lock;process_job proc_fun;TpWorkDesc* th_job;TpThreadPool* tp_pool; };
    TpThreadInfo是對一個線程的描述。

    thread_id是該線程的ID;

    is_busy用于標識該線程是否正處理忙碌狀態;

    thread_cond用于任務處理時的喚醒和等待;

    thread_lock,用于任務加鎖,用于條件變量等待加鎖;

    proc_fun是當前任務的回調函數地址;

    th_job是任務的參數信息;

    tp_pool是所在線程池的指針;

    ?

    線程池設計

    [cpp] view plaincopy print?
  • typedef?struct?tp_thread_pool_s?TpThreadPool;??
  • struct?tp_thread_pool_s?{??
  • ?????????unsigned?min_th_num;?//min?thread?number?in?the?pool??
  • ?????????unsigned?cur_th_num;?//current?thread?number?in?the?pool??
  • ?????????unsigned?max_th_num;?//max?thread?number?in?the?pool??
  • ?????????pthread_mutex_t?tp_lock;??
  • ?????????pthread_t?manage_thread_id;?//manage?thread?id?num??
  • ?????????TpThreadInfo*?thread_info;??
  • ?????????Queue?idle_q;??
  • ?????????TPBOOL?stop_flag;??
  • };??
  • typedef struct tp_thread_pool_s TpThreadPool; struct tp_thread_pool_s {unsigned min_th_num; //min thread number in the poolunsigned cur_th_num; //current thread number in the poolunsigned max_th_num; //max thread number in the poolpthread_mutex_t tp_lock;pthread_t manage_thread_id; //manage thread id numTpThreadInfo* thread_info;Queue idle_q;TPBOOL stop_flag; };
    TpThreadPool是對線程池的描述。

    min_th_num是線程池中至少存在的線程數,線程池初始化的過程中會創建min_th_num數量的線程;

    cur_th_num是線程池當前存在的線程數量;

    max_th_num則是線程池最多可以存在的線程數量;

    tp_lock用于線程池管理時的互斥;

    manage_thread_id是線程池的管理線程ID;

    thread_info則是指向線程池數據,這里使用一個數組來存儲線程池中線程的信息,該數組的大小為max_th_num;

    idle_q是存儲線程池空閑線程指針的隊列,用于從線程池快速取得空閑線程;

    stop_flag用于線程池的銷毀,當stop_flag為FALSE時,表明當前線程池需要銷毀,所有忙碌線程在處理完當前任務后會退出;


    算法設計


    線程池的創建和初始化

    線程創建

    創建伊始,線程池線程容量大小上限為max_th_num,初始容量為min_th_num;

    [cpp] view plaincopy print?
  • TpThreadPool?*tp_create(unsigned?min_num,?unsigned?max_num)?{??
  • ????TpThreadPool?*pTp;??
  • ????pTp?=?(TpThreadPool*)?malloc(sizeof(TpThreadPool));??
  • ??
  • ????memset(pTp,?0,?sizeof(TpThreadPool));??
  • ??
  • ????//init?member?var??
  • ????pTp->min_th_num?=?min_num;??
  • ????pTp->cur_th_num?=?min_num;??
  • ????pTp->max_th_num?=?max_num;??
  • ????pthread_mutex_init(&pTp->tp_lock,?NULL);??
  • ??
  • ????//malloc?mem?for?num?thread?info?struct??
  • ????if?(NULL?!=?pTp->thread_info)??
  • ????????free(pTp->thread_info);??
  • ????pTp->thread_info?=?(TpThreadInfo*)?malloc(sizeof(TpThreadInfo)?*?pTp->max_th_num);??
  • ????memset(pTp->thread_info,?0,?sizeof(TpThreadInfo)?*?pTp->max_th_num);??
  • ??
  • ????return?pTp;??
  • }??
  • TpThreadPool *tp_create(unsigned min_num, unsigned max_num) {TpThreadPool *pTp;pTp = (TpThreadPool*) malloc(sizeof(TpThreadPool));memset(pTp, 0, sizeof(TpThreadPool));//init member varpTp->min_th_num = min_num;pTp->cur_th_num = min_num;pTp->max_th_num = max_num;pthread_mutex_init(&pTp->tp_lock, NULL);//malloc mem for num thread info structif (NULL != pTp->thread_info)free(pTp->thread_info);pTp->thread_info = (TpThreadInfo*) malloc(sizeof(TpThreadInfo) * pTp->max_th_num);memset(pTp->thread_info, 0, sizeof(TpThreadInfo) * pTp->max_th_num);return pTp; }

    線程初始化

    [cpp] view plaincopy print?
  • TPBOOL?tp_init(TpThreadPool?*pTp)?{??
  • ????int?i;??
  • ????int?err;??
  • ????TpThreadInfo?*pThi;??
  • ??
  • ????initQueue(&pTp->idle_q);??
  • ????pTp->stop_flag?=?FALSE;??
  • ??
  • ????//create?work?thread?and?init?work?thread?info??
  • ????for?(i?=?0;?i?<?pTp->min_th_num;?i++)?{??
  • ????????pThi?=?pTp->thread_info?+i;??
  • ????????pThi->tp_pool?=?pTp;??
  • ????????pThi->is_busy?=?FALSE;??
  • ????????pthread_cond_init(&pThi->thread_cond,?NULL);??
  • ????????pthread_mutex_init(&pThi->thread_lock,?NULL);??
  • ????????pThi->proc_fun?=?def_proc_fun;??
  • ????????pThi->th_job?=?NULL;??
  • ????????enQueue(&pTp->idle_q,?pThi);??
  • ??
  • ????????err?=?pthread_create(&pThi->thread_id,?NULL,?tp_work_thread,?pThi);??
  • ????????if?(0?!=?err)?{??
  • ????????????perror("tp_init:?create?work?thread?failed.");??
  • ????????????clearQueue(&pTp->idle_q);??
  • ????????????return?FALSE;??
  • ????????}??
  • ????}??
  • ??
  • ????//create?manage?thread??
  • ????err?=?pthread_create(&pTp->manage_thread_id,?NULL,?tp_manage_thread,?pTp);??
  • ????if?(0?!=?err)?{??
  • ????????clearQueue(&pTp->idle_q);??
  • ????????printf("tp_init:?creat?manage?thread?failed\n");??
  • ????????return?FALSE;??
  • ????}??
  • ??
  • ????return?TRUE;??
  • }??
  • TPBOOL tp_init(TpThreadPool *pTp) {int i;int err;TpThreadInfo *pThi;initQueue(&pTp->idle_q);pTp->stop_flag = FALSE;//create work thread and init work thread infofor (i = 0; i < pTp->min_th_num; i++) {pThi = pTp->thread_info +i;pThi->tp_pool = pTp;pThi->is_busy = FALSE;pthread_cond_init(&pThi->thread_cond, NULL);pthread_mutex_init(&pThi->thread_lock, NULL);pThi->proc_fun = def_proc_fun;pThi->th_job = NULL;enQueue(&pTp->idle_q, pThi);err = pthread_create(&pThi->thread_id, NULL, tp_work_thread, pThi);if (0 != err) {perror("tp_init: create work thread failed.");clearQueue(&pTp->idle_q);return FALSE;}}//create manage threaderr = pthread_create(&pTp->manage_thread_id, NULL, tp_manage_thread, pTp);if (0 != err) {clearQueue(&pTp->idle_q);printf("tp_init: creat manage thread failed\n");return FALSE;}return TRUE; }

    初始線程池中線程數量為min_th_num,對這些線程一一進行初始化;

    將這些初始化的空閑線程一一置入空閑隊列;

    創建管理線程,用于監控線程池的狀態,并適當回收多余的線程資源;

    ?

    線程池的關閉和銷毀

    [cpp] view plaincopy print?
  • void?tp_close(TpThreadPool?*pTp,?TPBOOL?wait)?{??
  • ????unsigned?i;??
  • ??
  • ????pTp->stop_flag?=?TRUE;??
  • ????if?(wait)?{??
  • ????????for?(i?=?0;?i?<?pTp->cur_th_num;?i++)?{??
  • ????????????pthread_cond_signal(&pTp->thread_info[i].thread_cond);??
  • ????????}??
  • ????????for?(i?=?0;?i?<?pTp->cur_th_num;?i++)?{??
  • ????????????pthread_join(pTp->thread_info[i].thread_id,?NULL);??
  • ????????????pthread_mutex_destroy(&pTp->thread_info[i].thread_lock);??
  • ????????????pthread_cond_destroy(&pTp->thread_info[i].thread_cond);??
  • ????????}??
  • ????}?else?{??
  • ????????//close?work?thread??
  • ????????for?(i?=?0;?i?<?pTp->cur_th_num;?i++)?{??
  • ????????????kill((pid_t)pTp->thread_info[i].thread_id,?SIGKILL);??
  • ????????????pthread_mutex_destroy(&pTp->thread_info[i].thread_lock);??
  • ????????????pthread_cond_destroy(&pTp->thread_info[i].thread_cond);??
  • ????????}??
  • ????}??
  • ????//close?manage?thread??
  • ????kill((pid_t)pTp->manage_thread_id,?SIGKILL);??
  • ????pthread_mutex_destroy(&pTp->tp_lock);??
  • ??
  • ????//free?thread?struct??
  • ????free(pTp->thread_info);??
  • ????pTp->thread_info?=?NULL;??
  • }??
  • void tp_close(TpThreadPool *pTp, TPBOOL wait) {unsigned i;pTp->stop_flag = TRUE;if (wait) {for (i = 0; i < pTp->cur_th_num; i++) {pthread_cond_signal(&pTp->thread_info[i].thread_cond);}for (i = 0; i < pTp->cur_th_num; i++) {pthread_join(pTp->thread_info[i].thread_id, NULL);pthread_mutex_destroy(&pTp->thread_info[i].thread_lock);pthread_cond_destroy(&pTp->thread_info[i].thread_cond);}} else {//close work threadfor (i = 0; i < pTp->cur_th_num; i++) {kill((pid_t)pTp->thread_info[i].thread_id, SIGKILL);pthread_mutex_destroy(&pTp->thread_info[i].thread_lock);pthread_cond_destroy(&pTp->thread_info[i].thread_cond);}}//close manage threadkill((pid_t)pTp->manage_thread_id, SIGKILL);pthread_mutex_destroy(&pTp->tp_lock);//free thread structfree(pTp->thread_info);pTp->thread_info = NULL; }
    線程池關閉的過程中,可以選擇是否對正在處理的任務進行等待,如果是,則會喚醒所有任務,然后等待所有任務執行完成,然后返回;如果不是,則將立即殺死所有線程,然后返回, 注意:這可能會導致任務的處理中斷而產生錯誤!

    ?

    任務處理

    [cpp] view plaincopy print?
  • TPBOOL?tp_process_job(TpThreadPool?*pTp,?process_job?proc_fun,?TpWorkDesc?*job)?{??
  • ????TpThreadInfo?*pThi?;??
  • ????//fill?pTp->thread_info's?relative?work?key??
  • ????pthread_mutex_lock(&pTp->tp_lock);??
  • ????pThi?=?(TpThreadInfo?*)?deQueue(&pTp->idle_q);??
  • ????pthread_mutex_unlock(&pTp->tp_lock);??
  • ????if(pThi){??
  • ????????pThi->is_busy?=TRUE;??
  • ????????pThi->proc_fun?=?proc_fun;??
  • ????????pThi->th_job?=?job;??
  • ????????pthread_cond_signal(&pThi->thread_cond);??
  • ????????DEBUG("Fetch?a?thread?from?pool.\n");??
  • ????????return?TRUE;??
  • ????}??
  • ????//if?all?current?thread?are?busy,?new?thread?is?created?here??
  • ????pthread_mutex_lock(&pTp->tp_lock);??
  • ????pThi?=?tp_add_thread(pTp);??
  • ????pthread_mutex_unlock(&pTp->tp_lock);??
  • ??
  • ????if(!pThi){??
  • ????????DEBUG("The?thread?pool?is?full,?no?more?thread?available.\n");??
  • ????????return?FALSE;??
  • ????}??
  • ????DEBUG("No?more?idle?thread,?created?a?new?one.\n");??
  • ????pThi->proc_fun?=?proc_fun;??
  • ????pThi->th_job?=?job;??
  • ??
  • ????//send?cond?to?work?thread??
  • ????pthread_cond_signal(&pThi->thread_cond);??
  • ????return?TRUE;??
  • }??
  • TPBOOL tp_process_job(TpThreadPool *pTp, process_job proc_fun, TpWorkDesc *job) {TpThreadInfo *pThi ;//fill pTp->thread_info's relative work keypthread_mutex_lock(&pTp->tp_lock);pThi = (TpThreadInfo *) deQueue(&pTp->idle_q);pthread_mutex_unlock(&pTp->tp_lock);if(pThi){pThi->is_busy =TRUE;pThi->proc_fun = proc_fun;pThi->th_job = job;pthread_cond_signal(&pThi->thread_cond);DEBUG("Fetch a thread from pool.\n");return TRUE;}//if all current thread are busy, new thread is created herepthread_mutex_lock(&pTp->tp_lock);pThi = tp_add_thread(pTp);pthread_mutex_unlock(&pTp->tp_lock);if(!pThi){DEBUG("The thread pool is full, no more thread available.\n");return FALSE;}DEBUG("No more idle thread, created a new one.\n");pThi->proc_fun = proc_fun;pThi->th_job = job;//send cond to work threadpthread_cond_signal(&pThi->thread_cond);return TRUE; }
    當一個新任務到達是,線程池首先會檢查是否有可用的空閑線程,如果是,則采用才空閑線程進行任務處理并返回TRUE,如果不是,則嘗試新建一個線程,并使用該線程對任務進行處理,如果失敗則返回FALSE,說明線程池忙碌或者出錯。

    [cpp] view plaincopy print?
  • static?void?*tp_work_thread(void?*arg)?{??
  • ????pthread_t?curid;//current?thread?id??
  • ????TpThreadInfo?*pTinfo?=?(TpThreadInfo?*)?arg;??
  • ??
  • ????//wait?cond?for?processing?real?job.??
  • ????while?(!(pTinfo->tp_pool->stop_flag))?{??
  • ????????pthread_mutex_lock(&pTinfo->thread_lock);??
  • ????????pthread_cond_wait(&pTinfo->thread_cond,?&pTinfo->thread_lock);??
  • ????????pthread_mutex_unlock(&pTinfo->thread_lock);??
  • ??
  • ????????//process??
  • ????????pTinfo->proc_fun(pTinfo->th_job);??
  • ??
  • ????????//thread?state?be?set?idle?after?work??
  • ????????//pthread_mutex_lock(&pTinfo->thread_lock);??
  • ????????pTinfo->is_busy?=?FALSE;??
  • ????????enQueue(&pTinfo->tp_pool->idle_q,?pTinfo);??
  • ????????//pthread_mutex_unlock(&pTinfo->thread_lock);??
  • ????????DEBUG("Job?done,?I?am?idle?now.\n");??
  • ????}??
  • }??
  • static void *tp_work_thread(void *arg) {pthread_t curid;//current thread idTpThreadInfo *pTinfo = (TpThreadInfo *) arg;//wait cond for processing real job.while (!(pTinfo->tp_pool->stop_flag)) {pthread_mutex_lock(&pTinfo->thread_lock);pthread_cond_wait(&pTinfo->thread_cond, &pTinfo->thread_lock);pthread_mutex_unlock(&pTinfo->thread_lock);//processpTinfo->proc_fun(pTinfo->th_job);//thread state be set idle after work//pthread_mutex_lock(&pTinfo->thread_lock);pTinfo->is_busy = FALSE;enQueue(&pTinfo->tp_pool->idle_q, pTinfo);//pthread_mutex_unlock(&pTinfo->thread_lock);DEBUG("Job done, I am idle now.\n");} }
    上面這個函數是任務處理函數,該函數將始終處理等待喚醒狀態,直到新任務到達或者線程銷毀時被喚醒,然后調用任務處理回調函數對任務進行處理;當任務處理完成時,則將自己置入空閑隊列中,以供下一個任務處理。

    [cpp] view plaincopy print?
  • TpThreadInfo?*tp_add_thread(TpThreadPool?*pTp)?{??
  • ????int?err;??
  • ????TpThreadInfo?*new_thread;??
  • ??
  • ????if?(pTp->max_th_num?<=?pTp->cur_th_num)??
  • ????????return?NULL;??
  • ??
  • ????//malloc?new?thread?info?struct??
  • ????new_thread?=?pTp->thread_info?+?pTp->cur_th_num;???
  • ??
  • ????new_thread->tp_pool?=?pTp;??
  • ????//init?new?thread's?cond?&?mutex??
  • ????pthread_cond_init(&new_thread->thread_cond,?NULL);??
  • ????pthread_mutex_init(&new_thread->thread_lock,?NULL);??
  • ??
  • ????//init?status?is?busy,?only?new?process?job?will?call?this?function??
  • ????new_thread->is_busy?=?TRUE;??
  • ????err?=?pthread_create(&new_thread->thread_id,?NULL,?tp_work_thread,?new_thread);??
  • ????if?(0?!=?err)?{??
  • ????????free(new_thread);??
  • ????????return?NULL;??
  • ????}??
  • ????//add?current?thread?number?in?the?pool.??
  • ????pTp->cur_th_num++;??
  • ??
  • ????return?new_thread;??
  • }??
  • TpThreadInfo *tp_add_thread(TpThreadPool *pTp) {int err;TpThreadInfo *new_thread;if (pTp->max_th_num <= pTp->cur_th_num)return NULL;//malloc new thread info structnew_thread = pTp->thread_info + pTp->cur_th_num; new_thread->tp_pool = pTp;//init new thread's cond & mutexpthread_cond_init(&new_thread->thread_cond, NULL);pthread_mutex_init(&new_thread->thread_lock, NULL);//init status is busy, only new process job will call this functionnew_thread->is_busy = TRUE;err = pthread_create(&new_thread->thread_id, NULL, tp_work_thread, new_thread);if (0 != err) {free(new_thread);return NULL;}//add current thread number in the pool.pTp->cur_th_num++;return new_thread; }
    上面這個函數用于向線程池中添加新的線程,該函數將會在當線程池沒有空閑線程可用時被調用。

    函數將會新建一個線程,并設置自己的狀態為busy(立即就要被用于執行任務)。

    線程池管理

    線程池的管理主要是監控線程池的整體忙碌狀態,當線程池大部分線程處于空閑狀態時,管理線程將適當的銷毀一定數量的空閑線程,以便減少線程池對系統資源的消耗。

    ?

    這里設計認為,當空閑線程的數量超過線程池線程數量的1/2時,線程池總體處理空閑狀態,可以適當銷毀部分線程池的線程,以減少線程池對系統資源的開銷。

    ?

    線程池狀態計算

    這里的BUSY_THRESHOLD的值是0.5,也即是當空閑線程數量超過一半時,返回0,說明線程池整體狀態為閑,否則返回1,說明為忙。

    [cpp] view plaincopy print?
  • int?tp_get_tp_status(TpThreadPool?*pTp)?{??
  • ????float?busy_num?=?0.0;??
  • ????int?i;??
  • ??
  • ????//get?busy?thread?number??
  • ????busy_num?=?pTp->cur_th_num?-?pTp->idle_q.count;?????
  • ??
  • ????DEBUG("Current?thread?pool?status,?current?num:?%u,?busy?num:?%u,?idle?num:?%u\n",?pTp->cur_th_num,?(unsigned)busy_num,?pTp->idle_q.count);??
  • ????//0.2??or?other?num???
  • ????if?(busy_num?/?(pTp->cur_th_num)?<?BUSY_THRESHOLD)??
  • ????????return?0;//idle?status??
  • ????else??
  • ????????return?1;//busy?or?normal?status??????
  • }??
  • int tp_get_tp_status(TpThreadPool *pTp) {float busy_num = 0.0;int i;//get busy thread numberbusy_num = pTp->cur_th_num - pTp->idle_q.count; DEBUG("Current thread pool status, current num: %u, busy num: %u, idle num: %u\n", pTp->cur_th_num, (unsigned)busy_num, pTp->idle_q.count);//0.2? or other num?if (busy_num / (pTp->cur_th_num) < BUSY_THRESHOLD)return 0;//idle statuselsereturn 1;//busy or normal status }
    線程的銷毀算法

    1.??????從空閑隊列中dequeue一個空閑線程指針,該指針指向線程信息數組的某項,例如這里是p;

    2.??????銷毀該線程

    3.??????把線程信息數組的最后一項拷貝至位置p

    4.??????線程池數量減少一,即cur_th_num--


    圖 3:線程銷毀

    ?

    [cpp] view plaincopy print?
  • TPBOOL?tp_delete_thread(TpThreadPool?*pTp)?{??
  • ????unsigned?idx;??
  • ????TpThreadInfo?*pThi;??
  • ????TpThreadInfo?tT;??
  • ??
  • ????//current?thread?num?can't?<?min?thread?num??
  • ????if?(pTp->cur_th_num?<=?pTp->min_th_num)??
  • ????????return?FALSE;??
  • ????//pthread_mutex_lock(&pTp->tp_lock);??
  • ????pThi?=?deQueue(&pTp->idle_q);??
  • ????//pthread_mutex_unlock(&pTp->tp_lock);??
  • ????if(!pThi)??
  • ??????return?FALSE;??
  • ??????
  • ????//after?deleting?idle?thread,?current?thread?num?-1??
  • ????pTp->cur_th_num--;??
  • ????memcpy(&tT,?pThi,?sizeof(TpThreadInfo));??
  • ????memcpy(pThi,?pTp->thread_info?+?pTp->cur_th_num,?sizeof(TpThreadInfo));??
  • ??
  • ????//kill?the?idle?thread?and?free?info?struct??
  • ????kill((pid_t)tT.thread_id,?SIGKILL);??
  • ????pthread_mutex_destroy(&tT.thread_lock);??
  • ????pthread_cond_destroy(&tT.thread_cond);??
  • ??
  • ????return?TRUE;??
  • }??
  • TPBOOL tp_delete_thread(TpThreadPool *pTp) {unsigned idx;TpThreadInfo *pThi;TpThreadInfo tT;//current thread num can't < min thread numif (pTp->cur_th_num <= pTp->min_th_num)return FALSE;//pthread_mutex_lock(&pTp->tp_lock);pThi = deQueue(&pTp->idle_q);//pthread_mutex_unlock(&pTp->tp_lock);if(!pThi)return FALSE;//after deleting idle thread, current thread num -1pTp->cur_th_num--;memcpy(&tT, pThi, sizeof(TpThreadInfo));memcpy(pThi, pTp->thread_info + pTp->cur_th_num, sizeof(TpThreadInfo));//kill the idle thread and free info structkill((pid_t)tT.thread_id, SIGKILL);pthread_mutex_destroy(&tT.thread_lock);pthread_cond_destroy(&tT.thread_cond);return TRUE; }
    線程池監控

    線程池通過一個管理線程來進行監控,管理線程將會每隔一段時間對線程池的狀態進行計算,根據線程池的狀態適當的銷毀部分線程,減少對系統資源的消耗。

    ?

    [cpp] view plaincopy print?
  • static?void?*tp_manage_thread(void?*arg)?{??
  • ????TpThreadPool?*pTp?=?(TpThreadPool*)?arg;//main?thread?pool?struct?instance??
  • ??
  • ????//1???
  • ????sleep(MANAGE_INTERVAL);??
  • ??
  • ????do?{??
  • ????????if?(tp_get_tp_status(pTp)?==?0)?{??
  • ????????????do?{??
  • ????????????????if?(!tp_delete_thread(pTp))??
  • ????????????????????break;??
  • ????????????}?while?(TRUE);??
  • ????????}//end?for?if??
  • ??
  • ????????//1???
  • ????????sleep(MANAGE_INTERVAL);??
  • ????}?while?(!pTp->stop_flag);??
  • ????return?NULL;??
  • }??
  • static void *tp_manage_thread(void *arg) {TpThreadPool *pTp = (TpThreadPool*) arg;//main thread pool struct instance//1?sleep(MANAGE_INTERVAL);do {if (tp_get_tp_status(pTp) == 0) {do {if (!tp_delete_thread(pTp))break;} while (TRUE);}//end for if//1?sleep(MANAGE_INTERVAL);} while (!pTp->stop_flag);return NULL; }

    程序測試

    至此,我們的設計需要使用一個測試程序來進行驗證。于是,我們寫下這樣一段代碼。

    [cpp] view plaincopy print?
  • #include?<stdio.h>??
  • #include?<unistd.h>??
  • #include?"thread_pool.h"??
  • ??
  • #define?THD_NUM?10???
  • void?proc_fun(TpWorkDesc?*job){??
  • ????int?i;??
  • ????int?idx=*(int?*)job->arg;??
  • ????printf("Begin:?thread?%d\n",?idx);??
  • ????sleep(3);??
  • ????printf("End:???thread?%d\n",?idx);??
  • }??
  • ??
  • int?main(int?argc,?char?**argv){??
  • ????TpThreadPool?*pTp=?tp_create(5,10);??
  • ????TpWorkDesc?pWd[THD_NUM];??
  • ????int?i,?*idx;??
  • ??
  • ????tp_init(pTp);??
  • ????for(i=0;?i?<?THD_NUM;?i++){??
  • ????????idx=(int?*)?malloc(sizeof(int));??
  • ????????*idx=i;??
  • ????????pWd[i].arg=idx;??
  • ????????tp_process_job(pTp,?proc_fun,?pWd+i);??
  • ????????usleep(400000);??
  • ????}??
  • ????//sleep(1);??
  • ????tp_close(pTp,?TRUE);??
  • ????free(pTp);??
  • ????printf("All?jobs?done!\n");??
  • ????return?0;??
  • }??
  • #include <stdio.h> #include <unistd.h> #include "thread_pool.h"#define THD_NUM 10 void proc_fun(TpWorkDesc *job){int i;int idx=*(int *)job->arg;printf("Begin: thread %d\n", idx);sleep(3);printf("End: thread %d\n", idx); }int main(int argc, char **argv){TpThreadPool *pTp= tp_create(5,10);TpWorkDesc pWd[THD_NUM];int i, *idx;tp_init(pTp);for(i=0; i < THD_NUM; i++){idx=(int *) malloc(sizeof(int));*idx=i;pWd[i].arg=idx;tp_process_job(pTp, proc_fun, pWd+i);usleep(400000);}//sleep(1);tp_close(pTp, TRUE);free(pTp);printf("All jobs done!\n");return 0; }
    執行結果:

    ?

    源碼下載

    地址:https://sourceforge.net/projects/thd-pool-linux/

    備注

    該線程池設計比較簡單,尚存在不少BUG,歡迎各位提出改進意見。

    ?


    修正:

    2011/08/04:

    tp_close函數增加隊列清空操作,參見源碼注釋部分。

    [cpp] view plaincopy print?
  • void?tp_close(TpThreadPool?*pTp,?TPBOOL?wait)?{??
  • ????unsigned?i;??
  • ??
  • ??
  • ????pTp->stop_flag?=?TRUE;??
  • ????if?(wait)?{??
  • ????????for?(i?=?0;?i?<?pTp->cur_th_num;?i++)?{??
  • ????????????pthread_cond_signal(&pTp->thread_info[i].thread_cond);??
  • ????????}??
  • ????????for?(i?=?0;?i?<?pTp->cur_th_num;?i++)?{??
  • ????????????pthread_join(pTp->thread_info[i].thread_id,?NULL);??
  • ????????????pthread_mutex_destroy(&pTp->thread_info[i].thread_lock);??
  • ????????????pthread_cond_destroy(&pTp->thread_info[i].thread_cond);??
  • ????????}??
  • ????}?else?{??
  • ????????//close?work?thread??
  • ????????for?(i?=?0;?i?<?pTp->cur_th_num;?i++)?{??
  • ????????????kill((pid_t)pTp->thread_info[i].thread_id,?SIGKILL);??
  • ????????????pthread_mutex_destroy(&pTp->thread_info[i].thread_lock);??
  • ????????????pthread_cond_destroy(&pTp->thread_info[i].thread_cond);??
  • ????????}??
  • ????}??
  • ????//close?manage?thread??
  • ????kill((pid_t)pTp->manage_thread_id,?SIGKILL);??
  • ????pthread_mutex_destroy(&pTp->tp_lock);??
  • ??
  • ??
  • ????clearQueue(&pTp->idle_q);?/**?這里添加隊列清空?**/??
  • ????//free?thread?struct??
  • ????free(pTp->thread_info);??
  • ????pTp->thread_info?=?NULL;??
  • }??
  • void tp_close(TpThreadPool *pTp, TPBOOL wait) {unsigned i;pTp->stop_flag = TRUE;if (wait) {for (i = 0; i < pTp->cur_th_num; i++) {pthread_cond_signal(&pTp->thread_info[i].thread_cond);}for (i = 0; i < pTp->cur_th_num; i++) {pthread_join(pTp->thread_info[i].thread_id, NULL);pthread_mutex_destroy(&pTp->thread_info[i].thread_lock);pthread_cond_destroy(&pTp->thread_info[i].thread_cond);}} else {//close work threadfor (i = 0; i < pTp->cur_th_num; i++) {kill((pid_t)pTp->thread_info[i].thread_id, SIGKILL);pthread_mutex_destroy(&pTp->thread_info[i].thread_lock);pthread_cond_destroy(&pTp->thread_info[i].thread_cond);}}//close manage threadkill((pid_t)pTp->manage_thread_id, SIGKILL);pthread_mutex_destroy(&pTp->tp_lock);clearQueue(&pTp->idle_q); /** 這里添加隊列清空 **///free thread structfree(pTp->thread_info);pTp->thread_info = NULL; }

    上述操作將導致段錯誤,原因是隊列在刪除元素的時候,對元素進行了free。而我們的元素其實是數組中某個元素的地址,這里將導致段錯誤的發生。源碼中隊列部分增加了元素釋放函數回調,設置該函數為NULL或者空函數(什么都不做),在刪除元素時將不會進行free操作。完整源碼請到上面的地址下載。


    在線程池初始化時,需要設置元素釋放函數為NULL,參見源碼注釋部分。

    [cpp] view plaincopy print?
  • TPBOOL?tp_init(TpThreadPool?*pTp)?{??
  • ????int?i;??
  • ????int?err;??
  • ????TpThreadInfo?*pThi;??
  • ??
  • ????initQueue(&pTp->idle_q,?NULL);?/**?初始化時設置元素釋放函數為NULL?**/??
  • ????pTp->stop_flag?=?FALSE;??
  • ??
  • ????//create?work?thread?and?init?work?thread?info??
  • ????for?(i?=?0;?i?<?pTp->min_th_num;?i++)?{??
  • ????????pThi?=?pTp->thread_info?+i;??
  • ????????pThi->tp_pool?=?pTp;??
  • ????????pThi->is_busy?=?FALSE;??
  • ????????pthread_cond_init(&pThi->thread_cond,?NULL);??
  • ????????pthread_mutex_init(&pThi->thread_lock,?NULL);??
  • ????????pThi->proc_fun?=?def_proc_fun;??
  • ????????pThi->th_job?=?NULL;??
  • ????????enQueue(&pTp->idle_q,?pThi);??
  • ??
  • ????????err?=?pthread_create(&pThi->thread_id,?NULL,?tp_work_thread,?pThi);??
  • ????????if?(0?!=?err)?{??
  • ????????????perror("tp_init:?create?work?thread?failed.");??
  • ????????????clearQueue(&pTp->idle_q);??
  • ????????????return?FALSE;??
  • ????????}??
  • ????}??
  • ??
  • ????//create?manage?thread??
  • ????err?=?pthread_create(&pTp->manage_thread_id,?NULL,?tp_manage_thread,?pTp);??
  • ????if?(0?!=?err)?{??
  • ????????clearQueue(&pTp->idle_q);??
  • ????????printf("tp_init:?creat?manage?thread?failed\n");??
  • ????????return?FALSE;??
  • ????}??
  • ??
  • ????return?TRUE;??
  • }??
  • TPBOOL tp_init(TpThreadPool *pTp) {int i;int err;TpThreadInfo *pThi;initQueue(&pTp->idle_q, NULL); /** 初始化時設置元素釋放函數為NULL **/pTp->stop_flag = FALSE;//create work thread and init work thread infofor (i = 0; i < pTp->min_th_num; i++) {pThi = pTp->thread_info +i;pThi->tp_pool = pTp;pThi->is_busy = FALSE;pthread_cond_init(&pThi->thread_cond, NULL);pthread_mutex_init(&pThi->thread_lock, NULL);pThi->proc_fun = def_proc_fun;pThi->th_job = NULL;enQueue(&pTp->idle_q, pThi);err = pthread_create(&pThi->thread_id, NULL, tp_work_thread, pThi);if (0 != err) {perror("tp_init: create work thread failed.");clearQueue(&pTp->idle_q);return FALSE;}}//create manage threaderr = pthread_create(&pTp->manage_thread_id, NULL, tp_manage_thread, pTp);if (0 != err) {clearQueue(&pTp->idle_q);printf("tp_init: creat manage thread failed\n");return FALSE;}return TRUE; }


    這里順便附上隊列頭文件部分源碼:

    [cpp] view plaincopy print?
  • #ifndef?__QUEUE_H_??
  • #define?__QUEUE_H_??
  • ??
  • #include?<pthread.h>??
  • ??
  • typedef?struct?sNode?QNode;??
  • typedef?struct?queueLK?Queue;??
  • ??
  • typedef?void?*?EType;??
  • ??
  • typedef?void?(*free_data_fun)(void?*data);??
  • ??
  • struct?sNode?{??
  • ????EType?*?data;??
  • ????struct?sNode?*next;??
  • };??
  • ??
  • struct?queueLK?{??
  • ????struct?sNode?*front;??
  • ????struct?sNode?*rear;??
  • ????free_data_fun?free_fun;??
  • ????unsigned?count;??
  • ????pthread_mutex_t?lock;??
  • };??
  • ??
  • void?initQueue(Queue?*hq,?free_data_fun?pff);??
  • int?enQueue(Queue?*hq,?EType?x);??
  • EType?deQueue(Queue?*hq);??
  • EType?peekQueue(Queue?*hq);??
  • int?isEmptyQueue(Queue?*hq);??
  • void?clearQueue(Queue?*hq);??
  • 總結

    以上是生活随笔為你收集整理的Linux下设计一个简单的线程池的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。