非常精简的Linux线程池实现(一)——使用互斥锁和条件变量
https://blog.csdn.net/kxcfzyk/article/details/31719687
線程池的含義跟它的名字一樣,就是一個(gè)由許多線程組成的池子。
有了線程池,在程序中使用多線程變得簡單。我們不用再自己去操心線程的創(chuàng)建、撤銷、管理問題,有什么要消耗大量CPU時(shí)間的任務(wù)通通直接扔到線程池里就好了,然后我們的主程序(主線程)可以繼續(xù)干自己的事去,線程池里面的線程會(huì)自動(dòng)去執(zhí)行這些任務(wù)。
另一方面,線程池提升了多線程程序的性能。我們不需要在大量任務(wù)需要執(zhí)行時(shí)現(xiàn)創(chuàng)建大量線程,然后在任務(wù)結(jié)束時(shí)又銷毀大量線程,因?yàn)榫€程池里面的線程都是現(xiàn)成的而且能夠重復(fù)使用。一個(gè)理想的線程池能夠合理地動(dòng)態(tài)調(diào)節(jié)池內(nèi)線程數(shù)量,既不會(huì)因?yàn)榫€程過少而導(dǎo)致大量任務(wù)堆積,也不會(huì)因?yàn)榫€程過多了而增加額外的系統(tǒng)開銷。
線程池看上去很神奇的樣子,那它是怎么實(shí)現(xiàn)的呢?線程這么虛渺在的東西也能像有形的物品一樣圈在一個(gè)池子里?在只知道線程池這個(gè)名字的時(shí)候,我心里的疑惑就是這樣的。
其實(shí)線程池的原理非常簡單,它就是一個(gè)非常典型的生產(chǎn)者消費(fèi)者同步問題。如果不知道我說的這個(gè)XXX問題也不要緊,我下面就解釋。
根據(jù)剛才描述的線程池的功能,可以看出線程池至少有兩個(gè)主要?jiǎng)幼?#xff0c;一個(gè)是主程序不定時(shí)地向線程池添加任務(wù),另一個(gè)是線程池里的線程領(lǐng)取任務(wù)去執(zhí)行。且不論任務(wù)和執(zhí)行任務(wù)是個(gè)什么概念,但是一個(gè)任務(wù)肯定只能分配給一個(gè)線程執(zhí)行。
這樣就可以簡單猜想線程池的一種可能的架構(gòu)了:主程序執(zhí)行入隊(duì)操作,把任務(wù)添加到一個(gè)隊(duì)列里面;池子里的多個(gè)工作線程共同對這個(gè)隊(duì)列試圖執(zhí)行出隊(duì)操作,這里要保證同一時(shí)刻只有一個(gè)線程出隊(duì)成功,搶奪到這個(gè)任務(wù),其他線程繼續(xù)共同試圖出隊(duì)搶奪下一個(gè)任務(wù)。所以在實(shí)現(xiàn)線程池之前,我們需要一個(gè)隊(duì)列,我為這個(gè)線程池配備的隊(duì)列單獨(dú)放到了另一篇博客一個(gè)通用純C隊(duì)列的實(shí)現(xiàn)中。
這里的生產(chǎn)者就是主程序,生產(chǎn)任務(wù)(增加任務(wù)),消費(fèi)者就是工作線程,消費(fèi)任務(wù)(執(zhí)行、減少任務(wù))。因?yàn)檫@里涉及到多個(gè)線程同時(shí)訪問一個(gè)隊(duì)列的問題,所以我們需要互斥鎖來保護(hù)隊(duì)列,同時(shí)還需要條件變量來處理主線程通知任務(wù)到達(dá)、工作線程搶奪任務(wù)的問題。如果不熟悉條件變量,我在另一篇博客Linux C語言多線程庫Pthread中條件變量的的正確用法逐步詳解中作了詳細(xì)說明。
準(zhǔn)備工作都差不多了,可以開始設(shè)計(jì)線程池了。一個(gè)最簡單線程池應(yīng)該有什么功能呢?對于使用者來說,除了創(chuàng)建和銷毀線程池,最簡單的情況下只需要一個(gè)功能——添加任務(wù)。對于線程池自己來說,最簡單的情況下不需要?jiǎng)討B(tài)調(diào)節(jié)線程數(shù)量,不需要考慮線程同步、線程死鎖等等一大堆麻煩的問題。所以最后的線程池API定義為:
?
//thread_pool.h
#ifndef THREAD_POOL_H_INCLUDED
#define THREAD_POOL_H_INCLUDED
typedef struct thread_pool *thread_pool_t;
thread_pool_t thread_pool_create(unsigned int thread_count);
void thread_pool_add_task(thread_pool_t pool, void* (*routine)(void *arg), void *arg);
void thread_pool_destroy(thread_pool_t pool);
#endif //THREAD_POOL_H_INCLUDED
創(chuàng)建線程池時(shí)指定線程池中應(yīng)該固定包含多少工作線程,添加任務(wù)就是向線程池添加一個(gè)任務(wù)函數(shù)指針和任務(wù)函數(shù)需要的參數(shù)——這跟Pthread線程庫中的普通線程創(chuàng)建函數(shù)pthread_create是一樣的。根據(jù)這套線程池API,我們使用線程池的應(yīng)用程序應(yīng)該是這個(gè)套路:
?
?
//test.c
#include "thread_pool.h"
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
void* test(void *arg) {
int i;
for(i=0; i<5; i++) {
printf("tid:%ld task:%ld\n", pthread_self(), (long)arg);
fflush(stdout);
sleep(2);
}
return NULL;
}
int main() {
long i=0;
thread_pool_t pool;
pool=thread_pool_create(2);
for(i=0; i<5; i++) {
thread_pool_add_task(pool, test, (void*)i);
}
puts("press enter to terminate ...");
getchar();
thread_pool_destroy(pool);
return 0;
}
上面這個(gè)測試程序向線程池添加了5個(gè)相同的任務(wù),每個(gè)任務(wù)耗時(shí)10秒,但是線程池中只有2個(gè)工作線程,所以程序的運(yùn)行結(jié)果是兩個(gè)工作線程輪流把5個(gè)任務(wù)挨個(gè)做完。顯示到屏幕上就是:前10秒兩個(gè)工作線程輪流輸出自己的線程ID和當(dāng)前任務(wù)的任務(wù)號0和1,各輸出5次;第二個(gè)10秒兩個(gè)工作線程輪流輸出自己的線程ID和當(dāng)前任務(wù)的任務(wù)號2和3……
?
在這期間,主程序輸出“press enter to terminate ...”并等待用戶輸入,任何時(shí)候都可以按回車讓主程序繼續(xù)往下,這樣會(huì)強(qiáng)制終止所有工作線程并銷毀線程池,最后程序退出。test程序運(yùn)行效果截圖如下:
?
最后就是線程池真正的實(shí)現(xiàn)了:
?
?
//thread_pool.c
#include "thread_pool.h"
#include "queue.h"
#include <stdlib.h>
#include <pthread.h>
struct thread_pool {
unsigned int thread_count;
pthread_t *threads;
queue_t tasks;
pthread_mutex_t lock;
pthread_cond_t task_ready;
};
struct task {
void* (*routine)(void *arg);
void *arg;
};
static void cleanup(pthread_mutex_t* lock) {
pthread_mutex_unlock(lock);
}
static void * worker(thread_pool_t pool) {
struct task *t;
while(1) {
pthread_mutex_lock(&pool->lock);
pthread_cleanup_push((void(*)(void*))cleanup, &pool->lock);
while(queue_isempty(pool->tasks)) {
pthread_cond_wait(&pool->task_ready, &pool->lock);
/*A condition wait (whether timed or not) is a cancellation point ... a side-effect of acting upon a cancellation request while in a condition wait is that the mutex is (in effect) re-acquired before calling the first cancellation cleanup handler.*/
}
t=(struct task*)queue_dequeue(pool->tasks);
pthread_cleanup_pop(0);
pthread_mutex_unlock(&pool->lock);
t->routine(t->arg);/*todo: report returned value*/
free(t);
}
return NULL;
}
thread_pool_t thread_pool_create(unsigned int thread_count) {
unsigned int i;
thread_pool_t pool=NULL;
pool=(thread_pool_t)malloc(sizeof(struct thread_pool));
pool->thread_count=thread_count;
pool->threads=(pthread_t*)malloc(sizeof(pthread_t)*thread_count);
pool->tasks=queue_create();
pthread_mutex_init(&pool->lock, NULL);
pthread_cond_init(&pool->task_ready, NULL);
for(i=0; i<thread_count; i++) {
pthread_create(pool->threads+i, NULL, (void*(*)(void*))worker, pool);
}
return pool;
}
void thread_pool_add_task(thread_pool_t pool, void* (*routine)(void *arg), void *arg) {
struct task *t;
pthread_mutex_lock(&pool->lock);
t=(struct task*)queue_enqueue(pool->tasks, sizeof(struct task));
t->routine=routine;
t->arg=arg;
pthread_cond_signal(&pool->task_ready);
pthread_mutex_unlock(&pool->lock);
}
void thread_pool_destroy(thread_pool_t pool) {
unsigned int i;
for(i=0; i<pool->thread_count; i++) {
pthread_cancel(pool->threads[i]);
}
for(i=0; i<pool->thread_count; i++) {
pthread_join(pool->threads[i], NULL);
}
pthread_mutex_destroy(&pool->lock);
pthread_cond_destroy(&pool->task_ready);
queue_destroy(pool->tasks);
free(pool->threads);
free(pool);
}
上面的worker函數(shù)就是工作線程函數(shù),所有的工作線程都在執(zhí)行著這個(gè)函數(shù)。它首先在互斥鎖和條件變量的保護(hù)下從任務(wù)隊(duì)列中取出一個(gè)任務(wù),這個(gè)任務(wù)實(shí)際上是一個(gè)函數(shù)指針和調(diào)用函數(shù)所需的參數(shù),所以執(zhí)行任務(wù)就很簡單了——用任務(wù)參數(shù)調(diào)用任務(wù)函數(shù)。函數(shù)返回以后,工作線程繼續(xù)去搶任務(wù)。
?
這里沒有處理任務(wù)函數(shù)的返回值問題,理論上任務(wù)函數(shù)返回以后線程池應(yīng)該用某種機(jī)制通知主程序,然后主程序獲取通過某種手段獲取返回值,但這明顯不是一個(gè)最簡單的線程池需要操心的事。實(shí)際上,應(yīng)用程序可以通過全局變量或傳入的參數(shù)指針,加上額外的線程同步代碼解決返回值的通知和獲取問題。
還有一點(diǎn)需要注意,最后線程池銷毀時(shí)會(huì)強(qiáng)制終止所有處于撤銷點(diǎn)(cacellation points)的工作線程,如果工作線程正在任務(wù)函數(shù)中沒返回而且任務(wù)函數(shù)中有非手動(dòng)創(chuàng)建的撤銷點(diǎn),那么任務(wù)函數(shù)就會(huì)在跑到撤銷點(diǎn)時(shí)戛然而止,這可能導(dǎo)致意外結(jié)果。而如果任務(wù)函數(shù)中沒有任何線程撤銷點(diǎn),那么線程池銷毀函數(shù)會(huì)一直阻塞等待直到任務(wù)函數(shù)完成后才能終止對應(yīng)的工作線程并返回。
要正確處理這個(gè)問題,線程池使用者必須通過自己的線程同步代碼保證調(diào)用thread_pool_destroy之前所有任務(wù)都已經(jīng)完成、終止或者取消。
?
總結(jié)
以上是生活随笔為你收集整理的非常精简的Linux线程池实现(一)——使用互斥锁和条件变量的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: LOL我不怎么会ADC 想买一个奥巴马去
- 下一篇: Linux C 实现一个简单的线程池