日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > linux >内容正文

linux

非常精简的Linux线程池实现(一)——使用互斥锁和条件变量

發(fā)布時(shí)間:2023/11/30 linux 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 非常精简的Linux线程池实现(一)——使用互斥锁和条件变量 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

https://blog.csdn.net/kxcfzyk/article/details/31719687

線程池的含義跟它的名字一樣,就是一個(gè)由許多線程組成的池子。

有了線程池,在程序中使用多線程變得簡單。我們不用再自己去操心線程的創(chuàng)建、撤銷、管理問題,有什么要消耗大量CPU時(shí)間的任務(wù)通通直接扔到線程池里就好了,然后我們的主程序(主線程)可以繼續(xù)干自己的事去,線程池里面的線程會(huì)自動(dòng)去執(zhí)行這些任務(wù)。

另一方面,線程池提升了多線程程序的性能。我們不需要在大量任務(wù)需要執(zhí)行時(shí)現(xiàn)創(chuàng)建大量線程,然后在任務(wù)結(jié)束時(shí)又銷毀大量線程,因?yàn)榫€程池里面的線程都是現(xiàn)成的而且能夠重復(fù)使用。一個(gè)理想的線程池能夠合理地動(dòng)態(tài)調(diào)節(jié)池內(nèi)線程數(shù)量,既不會(huì)因?yàn)榫€程過少而導(dǎo)致大量任務(wù)堆積,也不會(huì)因?yàn)榫€程過多了而增加額外的系統(tǒng)開銷。

線程池看上去很神奇的樣子,那它是怎么實(shí)現(xiàn)的呢?線程這么虛渺在的東西也能像有形的物品一樣圈在一個(gè)池子里?在只知道線程池這個(gè)名字的時(shí)候,我心里的疑惑就是這樣的。

其實(shí)線程池的原理非常簡單,它就是一個(gè)非常典型的生產(chǎn)者消費(fèi)者同步問題。如果不知道我說的這個(gè)XXX問題也不要緊,我下面就解釋。

根據(jù)剛才描述的線程池的功能,可以看出線程池至少有兩個(gè)主要?jiǎng)幼?#xff0c;一個(gè)是主程序不定時(shí)地向線程池添加任務(wù),另一個(gè)是線程池里的線程領(lǐng)取任務(wù)去執(zhí)行。且不論任務(wù)和執(zhí)行任務(wù)是個(gè)什么概念,但是一個(gè)任務(wù)肯定只能分配給一個(gè)線程執(zhí)行。

這樣就可以簡單猜想線程池的一種可能的架構(gòu)了:主程序執(zhí)行入隊(duì)操作,把任務(wù)添加到一個(gè)隊(duì)列里面;池子里的多個(gè)工作線程共同對這個(gè)隊(duì)列試圖執(zhí)行出隊(duì)操作,這里要保證同一時(shí)刻只有一個(gè)線程出隊(duì)成功,搶奪到這個(gè)任務(wù),其他線程繼續(xù)共同試圖出隊(duì)搶奪下一個(gè)任務(wù)。所以在實(shí)現(xiàn)線程池之前,我們需要一個(gè)隊(duì)列,我為這個(gè)線程池配備的隊(duì)列單獨(dú)放到了另一篇博客一個(gè)通用純C隊(duì)列的實(shí)現(xiàn)中。

這里的生產(chǎn)者就是主程序,生產(chǎn)任務(wù)(增加任務(wù)),消費(fèi)者就是工作線程,消費(fèi)任務(wù)(執(zhí)行、減少任務(wù))。因?yàn)檫@里涉及到多個(gè)線程同時(shí)訪問一個(gè)隊(duì)列的問題,所以我們需要互斥鎖來保護(hù)隊(duì)列,同時(shí)還需要條件變量來處理主線程通知任務(wù)到達(dá)、工作線程搶奪任務(wù)的問題。如果不熟悉條件變量,我在另一篇博客Linux C語言多線程庫Pthread中條件變量的的正確用法逐步詳解中作了詳細(xì)說明。

準(zhǔn)備工作都差不多了,可以開始設(shè)計(jì)線程池了。一個(gè)最簡單線程池應(yīng)該有什么功能呢?對于使用者來說,除了創(chuàng)建和銷毀線程池,最簡單的情況下只需要一個(gè)功能——添加任務(wù)。對于線程池自己來說,最簡單的情況下不需要?jiǎng)討B(tài)調(diào)節(jié)線程數(shù)量,不需要考慮線程同步、線程死鎖等等一大堆麻煩的問題。所以最后的線程池API定義為:

?

  • //thread_pool.h

  • ?
  • #ifndef THREAD_POOL_H_INCLUDED

  • #define THREAD_POOL_H_INCLUDED

  • ?
  • typedef struct thread_pool *thread_pool_t;

  • ?
  • thread_pool_t thread_pool_create(unsigned int thread_count);

  • ?
  • void thread_pool_add_task(thread_pool_t pool, void* (*routine)(void *arg), void *arg);

  • ?
  • void thread_pool_destroy(thread_pool_t pool);

  • ?
  • #endif //THREAD_POOL_H_INCLUDED

  • 創(chuàng)建線程池時(shí)指定線程池中應(yīng)該固定包含多少工作線程,添加任務(wù)就是向線程池添加一個(gè)任務(wù)函數(shù)指針和任務(wù)函數(shù)需要的參數(shù)——這跟Pthread線程庫中的普通線程創(chuàng)建函數(shù)pthread_create是一樣的。根據(jù)這套線程池API,我們使用線程池的應(yīng)用程序應(yīng)該是這個(gè)套路:

    ?

    ?

  • //test.c

  • ?
  • #include "thread_pool.h"

  • #include <stdio.h>

  • #include <unistd.h>

  • #include <pthread.h>

  • ?
  • void* test(void *arg) {

  • int i;

  • for(i=0; i<5; i++) {

  • printf("tid:%ld task:%ld\n", pthread_self(), (long)arg);

  • fflush(stdout);

  • sleep(2);

  • }

  • return NULL;

  • }

  • ?
  • int main() {

  • long i=0;

  • thread_pool_t pool;

  • ?
  • pool=thread_pool_create(2);

  • ?
  • for(i=0; i<5; i++) {

  • thread_pool_add_task(pool, test, (void*)i);

  • }

  • ?
  • puts("press enter to terminate ...");

  • getchar();

  • ?
  • thread_pool_destroy(pool);

  • return 0;

  • }

  • 上面這個(gè)測試程序向線程池添加了5個(gè)相同的任務(wù),每個(gè)任務(wù)耗時(shí)10秒,但是線程池中只有2個(gè)工作線程,所以程序的運(yùn)行結(jié)果是兩個(gè)工作線程輪流把5個(gè)任務(wù)挨個(gè)做完。顯示到屏幕上就是:前10秒兩個(gè)工作線程輪流輸出自己的線程ID和當(dāng)前任務(wù)的任務(wù)號0和1,各輸出5次;第二個(gè)10秒兩個(gè)工作線程輪流輸出自己的線程ID和當(dāng)前任務(wù)的任務(wù)號2和3……

    ?

    在這期間,主程序輸出“press enter to terminate ...”并等待用戶輸入,任何時(shí)候都可以按回車讓主程序繼續(xù)往下,這樣會(huì)強(qiáng)制終止所有工作線程并銷毀線程池,最后程序退出。test程序運(yùn)行效果截圖如下:

    ?

    最后就是線程池真正的實(shí)現(xiàn)了:

    ?

    ?

  • //thread_pool.c

  • ?
  • #include "thread_pool.h"

  • #include "queue.h"

  • #include <stdlib.h>

  • #include <pthread.h>

  • ?
  • struct thread_pool {

  • unsigned int thread_count;

  • pthread_t *threads;

  • queue_t tasks;

  • pthread_mutex_t lock;

  • pthread_cond_t task_ready;

  • };

  • ?
  • struct task {

  • void* (*routine)(void *arg);

  • void *arg;

  • };

  • ?
  • static void cleanup(pthread_mutex_t* lock) {

  • pthread_mutex_unlock(lock);

  • }

  • ?
  • static void * worker(thread_pool_t pool) {

  • struct task *t;

  • while(1) {

  • pthread_mutex_lock(&pool->lock);

  • pthread_cleanup_push((void(*)(void*))cleanup, &pool->lock);

  • while(queue_isempty(pool->tasks)) {

  • pthread_cond_wait(&pool->task_ready, &pool->lock);

  • /*A condition wait (whether timed or not) is a cancellation point ... a side-effect of acting upon a cancellation request while in a condition wait is that the mutex is (in effect) re-acquired before calling the first cancellation cleanup handler.*/

  • }

  • t=(struct task*)queue_dequeue(pool->tasks);

  • pthread_cleanup_pop(0);

  • pthread_mutex_unlock(&pool->lock);

  • t->routine(t->arg);/*todo: report returned value*/

  • free(t);

  • }

  • return NULL;

  • }

  • ?
  • thread_pool_t thread_pool_create(unsigned int thread_count) {

  • unsigned int i;

  • thread_pool_t pool=NULL;

  • pool=(thread_pool_t)malloc(sizeof(struct thread_pool));

  • pool->thread_count=thread_count;

  • pool->threads=(pthread_t*)malloc(sizeof(pthread_t)*thread_count);

  • ?
  • pool->tasks=queue_create();

  • ?
  • pthread_mutex_init(&pool->lock, NULL);

  • pthread_cond_init(&pool->task_ready, NULL);

  • ?
  • for(i=0; i<thread_count; i++) {

  • pthread_create(pool->threads+i, NULL, (void*(*)(void*))worker, pool);

  • }

  • return pool;

  • }

  • ?
  • void thread_pool_add_task(thread_pool_t pool, void* (*routine)(void *arg), void *arg) {

  • struct task *t;

  • pthread_mutex_lock(&pool->lock);

  • t=(struct task*)queue_enqueue(pool->tasks, sizeof(struct task));

  • t->routine=routine;

  • t->arg=arg;

  • pthread_cond_signal(&pool->task_ready);

  • pthread_mutex_unlock(&pool->lock);

  • }

  • ?
  • void thread_pool_destroy(thread_pool_t pool) {

  • unsigned int i;

  • for(i=0; i<pool->thread_count; i++) {

  • pthread_cancel(pool->threads[i]);

  • }

  • for(i=0; i<pool->thread_count; i++) {

  • pthread_join(pool->threads[i], NULL);

  • }

  • pthread_mutex_destroy(&pool->lock);

  • pthread_cond_destroy(&pool->task_ready);

  • queue_destroy(pool->tasks);

  • free(pool->threads);

  • free(pool);

  • }

  • 上面的worker函數(shù)就是工作線程函數(shù),所有的工作線程都在執(zhí)行著這個(gè)函數(shù)。它首先在互斥鎖和條件變量的保護(hù)下從任務(wù)隊(duì)列中取出一個(gè)任務(wù),這個(gè)任務(wù)實(shí)際上是一個(gè)函數(shù)指針和調(diào)用函數(shù)所需的參數(shù),所以執(zhí)行任務(wù)就很簡單了——用任務(wù)參數(shù)調(diào)用任務(wù)函數(shù)。函數(shù)返回以后,工作線程繼續(xù)去搶任務(wù)。

    ?

    這里沒有處理任務(wù)函數(shù)的返回值問題,理論上任務(wù)函數(shù)返回以后線程池應(yīng)該用某種機(jī)制通知主程序,然后主程序獲取通過某種手段獲取返回值,但這明顯不是一個(gè)最簡單的線程池需要操心的事。實(shí)際上,應(yīng)用程序可以通過全局變量或傳入的參數(shù)指針,加上額外的線程同步代碼解決返回值的通知和獲取問題。
    還有一點(diǎn)需要注意,最后線程池銷毀時(shí)會(huì)強(qiáng)制終止所有處于撤銷點(diǎn)(cacellation points)的工作線程,如果工作線程正在任務(wù)函數(shù)中沒返回而且任務(wù)函數(shù)中有非手動(dòng)創(chuàng)建的撤銷點(diǎn),那么任務(wù)函數(shù)就會(huì)在跑到撤銷點(diǎn)時(shí)戛然而止,這可能導(dǎo)致意外結(jié)果。而如果任務(wù)函數(shù)中沒有任何線程撤銷點(diǎn),那么線程池銷毀函數(shù)會(huì)一直阻塞等待直到任務(wù)函數(shù)完成后才能終止對應(yīng)的工作線程并返回。

    要正確處理這個(gè)問題,線程池使用者必須通過自己的線程同步代碼保證調(diào)用thread_pool_destroy之前所有任務(wù)都已經(jīng)完成、終止或者取消。
    ?

    總結(jié)

    以上是生活随笔為你收集整理的非常精简的Linux线程池实现(一)——使用互斥锁和条件变量的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。