日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > linux >内容正文

linux

Linux C++ 实现线程池

發(fā)布時間:2023/11/30 linux 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Linux C++ 实现线程池 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

http://blog.csdn.net/qq_25425023/article/details/53914609

線程池中的線程,在任務隊列為空的時候,等待任務的到來,任務隊列中有任務時,則依次獲取任務來執(zhí)行,任務隊列需要同步。

? Linux線程同步有多種方法:互斥量、信號量、條件變量等。


? 下面是根據互斥量、信號量、條件變量封裝的三個類。

? 線程池中用到了互斥量和信號量。

?

[cpp]?view plain?copy
  • #ifndef?_LOCKER_H_??
  • #define?_LOCKER_H_??
  • ??
  • #include?<pthread.h>??
  • #include?<stdio.h>??
  • #include?<semaphore.h>??
  • ??
  • /*信號量的類*/??
  • class?sem_locker??
  • {??
  • private:??
  • ????sem_t?m_sem;??
  • ??
  • public:??
  • ????//初始化信號量??
  • ????sem_locker()??
  • ????{??
  • ????if(sem_init(&m_sem,?0,?0)?!=?0)??
  • ????????printf("sem?init?error\n");??
  • ????}??
  • ????//銷毀信號量??
  • ????~sem_locker()??
  • ????{??
  • ????sem_destroy(&m_sem);??
  • ????}??
  • ??
  • ????//等待信號量??
  • ????bool?wait()??
  • ????{??
  • ????return?sem_wait(&m_sem)?==?0;??
  • ????}??
  • ????//添加信號量??
  • ????bool?add()??
  • ????{??
  • ????return?sem_post(&m_sem)?==?0;??
  • ????}??
  • };??
  • ??
  • ??
  • /*互斥?locker*/??
  • class?mutex_locker??
  • {??
  • private:??
  • ????pthread_mutex_t?m_mutex;??
  • ??
  • public:??
  • ????mutex_locker()??
  • ????{??
  • ????????if(pthread_mutex_init(&m_mutex,?NULL)?!=?0)??
  • ????????printf("mutex?init?error!");??
  • ????}??
  • ????~mutex_locker()??
  • ????{??
  • ????pthread_mutex_destroy(&m_mutex);??
  • ????}??
  • ??
  • ????bool?mutex_lock()??//lock?mutex??
  • ????{??
  • ????return?pthread_mutex_lock(&m_mutex)?==?0;??
  • ????}??
  • ????bool?mutex_unlock()???//unlock??
  • ????{??
  • ????return?pthread_mutex_unlock(&m_mutex)?==?0;??
  • ????}??
  • };??
  • ??
  • /*條件變量?locker*/??
  • class?cond_locker??
  • {??
  • private:??
  • ????pthread_mutex_t?m_mutex;??
  • ????pthread_cond_t?m_cond;??
  • ??
  • public:??
  • ????//?初始化?m_mutex?and?m_cond??
  • ????cond_locker()??
  • ????{??
  • ????if(pthread_mutex_init(&m_mutex,?NULL)?!=?0)??
  • ????????printf("mutex?init?error");??
  • ????if(pthread_cond_init(&m_cond,?NULL)?!=?0)??
  • ????{???//條件變量初始化是被,釋放初始化成功的mutex??
  • ????????pthread_mutex_destroy(&m_mutex);??
  • ????????printf("cond?init?error");??
  • ????}??
  • ????}??
  • ????//?destroy?mutex?and?cond??
  • ????~cond_locker()??
  • ????{??
  • ????pthread_mutex_destroy(&m_mutex);??
  • ????pthread_cond_destroy(&m_cond);??
  • ????}??
  • ????//等待條件變量??
  • ????bool?wait()??
  • ????{??
  • ????int?ans?=?0;??
  • ????pthread_mutex_lock(&m_mutex);??
  • ????ans?=?pthread_cond_wait(&m_cond,?&m_mutex);??
  • ????pthread_mutex_unlock(&m_mutex);??
  • ????return?ans?==?0;??
  • ????}??
  • ????//喚醒等待條件變量的線程??
  • ????bool?signal()??
  • ????{??
  • ????return?pthread_cond_signal(&m_cond)?==?0;??
  • ????}??
  • ??
  • };??
  • ??
  • #endif??

  • 下面的是線程池類,是一個模版類:

    [cpp]?view plain?copy
  • #ifndef?_PTHREAD_POOL_??
  • #define?_PTHREAD_POOL_??
  • ??
  • #include?"locker.h"??
  • #include?<list>??
  • #include?<stdio.h>??
  • #include?<exception>??
  • #include?<errno.h>??
  • #include?<pthread.h>??
  • #include?<iostream>??
  • ??
  • template<class?T>??
  • class?threadpool??
  • {??
  • private:??
  • ????int?thread_number;??//線程池的線程數??
  • ????int?max_task_number;??//任務隊列中的最大任務數??
  • ????pthread_t?*all_threads;???//線程數組??
  • ????std::list<T?*>?task_queue;?//任務隊列??
  • ????mutex_locker?queue_mutex_locker;??//互斥鎖??
  • ????sem_locker?queue_sem_locker;???//信號量??
  • ????bool?is_stop;?//是否結束線程??
  • public:??
  • ????threadpool(int?thread_num?=?20,?int?max_task_num?=?30);??
  • ????~threadpool();??
  • ????bool?append_task(T?*task);??
  • ????void?start();??
  • ????void?stop();??
  • private:??
  • ????//線程運行的函數。執(zhí)行run()函數??
  • ????static?void?*worker(void?*arg);??
  • ????void?run();??
  • };??
  • ??
  • template?<class?T>??
  • threadpool<T>::threadpool(int?thread_num,?int?max_task_num):??
  • ????thread_number(thread_num),?max_task_number(max_task_num),??
  • ????is_stop(false),?all_threads(NULL)??
  • {??
  • ????if((thread_num?<=?0)?||?max_task_num?<=?0)??
  • ????printf("threadpool?can't?init?because?thread_number?=?0"??
  • ????????"?or?max_task_number?=?0");??
  • ??
  • ????all_threads?=?new?pthread_t[thread_number];??
  • ????if(!all_threads)??
  • ????????printf("can't?init?threadpool?because?thread?array?can't?new");??
  • }??
  • ??
  • template?<class?T>??
  • threadpool<T>::~threadpool()??
  • {??
  • ????delete?[]all_threads;??
  • ????is_stop?=?true;??
  • }??
  • ??
  • template?<class?T>??
  • void?threadpool<T>::stop()??
  • {??
  • ????is_stop?=?true;??
  • ????//queue_sem_locker.add();??
  • }??
  • ??
  • template?<class?T>??
  • void?threadpool<T>::start()??
  • {??
  • ????for(int?i?=?0;?i?<?thread_number;?++i)??
  • ????{??
  • ????printf("create?the?%dth?pthread\n",?i);??
  • ????if(pthread_create(all_threads?+?i,?NULL,?worker,?this)?!=?0)??
  • ????{//創(chuàng)建線程失敗,清除成功申請的資源并拋出異常??
  • ????????delete?[]all_threads;??
  • ????????throw?std::exception();??
  • ????}??
  • ????if(pthread_detach(all_threads[i]))??
  • ????{//將線程設置為脫離線程,失敗則清除成功申請的資源并拋出異常??
  • ????????delete?[]all_threads;??
  • ????????throw?std::exception();??
  • ????}??
  • ????}??
  • }??
  • //添加任務進入任務隊列??
  • template?<class?T>??
  • bool?threadpool<T>::append_task(T?*task)??
  • {???//獲取互斥鎖??
  • ????queue_mutex_locker.mutex_lock();??
  • ????//判斷隊列中任務數是否大于最大任務數??
  • ????if(task_queue.size()?>?max_task_number)??
  • ????{//是則釋放互斥鎖??
  • ????queue_mutex_locker.mutex_unlock();??
  • ????return?false;??
  • ????}??
  • ????//添加進入隊列??
  • ????task_queue.push_back(task);??
  • ????queue_mutex_locker.mutex_unlock();??
  • ????//喚醒等待任務的線程??
  • ????queue_sem_locker.add();??
  • ????return?true;??
  • }??
  • ??
  • template?<class?T>??
  • void?*threadpool<T>::worker(void?*arg)??
  • {??
  • ????threadpool?*pool?=?(threadpool?*)arg;??
  • ????pool->run();??
  • ????return?pool;??
  • }??
  • ??
  • template?<class?T>??
  • void?threadpool<T>::run()??
  • {??
  • ????while(!is_stop)??
  • ????{???//等待任務??
  • ????queue_sem_locker.wait();??????
  • ????if(errno?==?EINTR)??
  • ????{??
  • ????????printf("errno");??
  • ????????continue;??
  • ????}??
  • ????//獲取互斥鎖??
  • ????queue_mutex_locker.mutex_lock();??
  • ????//判斷任務隊列是否為空??
  • ????if(task_queue.empty())??
  • ????{??
  • ????????queue_mutex_locker.mutex_unlock();??
  • ????????continue;??
  • ????}??
  • ????//獲取隊頭任務并執(zhí)行??
  • ????T?*task?=?task_queue.front();??
  • ????task_queue.pop_front();??
  • ????queue_mutex_locker.mutex_unlock();??
  • ????if(!task)??
  • ????????continue;??
  • //??printf("pthreadId?=?%ld\n",?(unsigned?long)pthread_self());???
  • ????task->doit();??//doit是T對象中的方法??
  • ????}??
  • ????//測試用??
  • ????printf("close?%ld\n",?(unsigned?long)pthread_self());??
  • }??
  • ??
  • #endif??

  • 以上參考《Linux高性能服務器編程》


    寫個程序對線程池進行測試:

    [cpp]?view plain?copy
  • #include?<stdio.h>??
  • #include?<iostream>??
  • #include?<unistd.h>??
  • ??
  • #include?"thread_pool.h"??
  • ??
  • class?task??
  • {??
  • private:??
  • ????int?number;??
  • ??
  • public:??
  • ????task(int?num)?:?number(num)??
  • ????{??
  • ????}??
  • ????~task()??
  • ????{??
  • ????}??
  • ??
  • ????void?doit()??
  • ????{??
  • ????printf("this?is?the?%dth?task\n",?number);??
  • ????}??
  • };??
  • ??
  • int?main()??
  • {??
  • ????task?*ta;??
  • ????threadpool<task>?pool(10,?15);??
  • //????pool.start();??
  • ????for(int?i?=?0;?i?<?20;?++i)??
  • ????{??
  • ????ta?=?new?task(i);??
  • //??sleep(2);??
  • ????pool.append_task(ta);??
  • ????}??
  • ????pool.start();??
  • ????sleep(10);??
  • ????printf("close?the?thread?pool\n");??
  • ????pool.stop();??
  • ????pause();??
  • ????return?0;??
  • }??

  • 經測試,線程池可以正常使用。

    下一篇博客,使用線程池來實現回射服務器,測試可以達到多大的并發(fā)量。


    總結

    以上是生活随笔為你收集整理的Linux C++ 实现线程池的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。