martin_pthread_pool
生活随笔
收集整理的這篇文章主要介紹了
martin_pthread_pool
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
這是我所用的線程池,主任務為復制文件.首先是功能函數.c和.h文件
cp_dir.h
#ifndef __CP_DIR_H__ #define __CP_DIR_H__#include "head.h" #include "pthread_pool.h"#define MAXNAME_LEN 4096 struct cp_file {//要復制的文件char src[MAXNAME_LEN];//目標文件char dest[MAXNAME_LEN]; }; //將dir_src 指向的目錄拷貝到dir_dest目錄下 void cp_dir( pthread_pool *pool, char *dir_src, char *dir_dest);//普通函數-->實現兩個文件之間的拷貝 void cp_file(void *arg);#endif這是 cp_dir.c 文件
#include "cp_dir.h"//普通函數--->實現兩個文件之間的拷貝 void cp_file(void *arg) {//設置本線程為分離屬性//pthread_detach(pthread_self());//將兩個文件名解析出來struct cp_file *p = (struct cp_file *)arg;//以只讀的方式去打開源文件int fd_src = open(p->src,O_RDONLY);if(fd_src == -1){printf("%s",p->src);fflush(stdout);perror("open src file failed");goto cp_return;}//以可讀可寫打開目標文件,如果目標文件不存在的話則創建,//如果目標文件存在的話則截短。int fd_dest = open(p->dest,O_RDWR | O_CREAT | O_TRUNC,0777);if(fd_dest == -1){perror("open dest file failed");goto cp_return; }int ret;char buf[1024];//讀取源文件的內容寫入到目標文件中去while(1){ret = read(fd_src,buf,sizeof(buf));if(ret == 0){break;}else if(ret > 0){int w = write(fd_dest,buf,ret);if(w != ret){perror("write failed");}}else{perror("read error");break;}}cp_return:close(fd_dest);close(fd_src);free(p); }//利用pool所指向的線程池將dir_src指向的目錄拷貝到dir_dest目錄下 void cp_dir(pthread_pool *pool,char *dir_src,char *dir_dest) {//獲取到要拷貝的目錄的目錄名(不帶路徑的)char *dirname = basename(dir_src);//dirname --> "20200907"//根據剛才獲取到的目錄名和目標路徑合成目標目錄(帶路徑)char newdirname[MAXNAME_LEN] = {0};sprintf(newdirname,"%s/%s",dir_dest,dirname);//newdirname -> "/mnt/hgfs/CS20201/20200907"printf("%s\n",newdirname);//先創建目標目錄mkdir(newdirname,0777);dir_dest = newdirname;//打開要復制的目錄DIR *dir = opendir(dir_src);if(dir == NULL){perror("open src dir failed");return ;}struct dirent *dirp = NULL;//讀取目錄項while(dirp = readdir(dir)){if(!strcmp(dirp->d_name,".") || !strcmp(dirp->d_name,"..")){continue;}char file_src[MAXNAME_LEN] = {0};//獲取到目錄項的名字(帶路徑)sprintf(file_src,"%s/%s",dir_src,dirp->d_name);printf("file_src : %s\n",file_src);//獲取目錄項的屬性struct stat st;lstat(file_src,&st);//如果是普通文件或者是鏈接文件就直接創建一個線程去拷貝即可if(S_ISREG(st.st_mode) || S_ISLNK(st.st_mode)){//開辟一個線程去對兩個文件實現直接拷貝char file_dest[MAXNAME_LEN] = {0};sprintf(file_dest,"%s/%s",dir_dest,dirp->d_name);//此時要把源文件和目標文件都傳遞給線程函數 //所以需要把這個目標文件和源文件都保存到一個結構體中//再把這個結構體傳遞給線程函數struct cp_file *cp = malloc(sizeof(*cp));strcpy(cp->src,file_src);strcpy(cp->dest,file_dest);printf("拷貝的兩個文件名為:\n");printf("src_file : %s\n",file_src);printf("des_file : %s\n",file_dest);printf("--------------------------------------\n");/*//創建一個線程去執行copy的任務pthread_t tid;int ret = pthread_create(&tid,NULL,cp_file,(void*)cp);if(ret != 0){perror("pthread create failed");}//pthread_join(tid,NULL);*//*int ret = pthread_create(&tids[tid_num++],NULL,cp_file,(void*)cp);if(ret != 0){perror("pthread create failed");}*/add_task(pool,cp_file,(void *)cp);}else if(S_ISDIR(st.st_mode)){//如果是目錄的話則遞歸調用字節printf("拷貝的兩個目錄名為:\n");printf("src_dir : %s\n",file_src);printf("des_dir : %s\n",dir_dest);printf("--------------------------------------\n");cp_dir(pool,file_src,dir_dest);}else{printf("%s is UNKOWN TYPE!\n",file_src);}}closedir(dir);return ; }?
?
線程池頭文件
pthread_pool_copy.h
#ifndef __PTHREAD_POOL_H__ #define __PTHREAD_POOL_H__#include "head.h"//線程池中線程的最大數量 #define MAX_THREADS_NUM 50//線程池中任務隊列最大的任務數量 #define MAX_TASKS 1000typedef struct pthread_pool {//線程池的實現按照項目的不同也有所不同,但大體應該要有如下成員://因為“任務隊列”是一種共享資源,所有我們需要互斥鎖//就是說我們需要一把互斥鎖去保存“任務隊列”pthread_mutex_t lock;//同時當我們的任務隊列中沒有任務的時候,線程池內的線程應該要休眠//以避免系統資源的浪費,所有需要線程條件變量//線程條件變量用來表示“任務隊列”中是否有任務pthread_cond_t cond;//任務隊列(鏈表),指向第一個需要執行的任務//所有的線程都從這個任務鏈表中獲取任務struct task *task_list;//指向線程ID的數組,用來保存線程池中所有線程的IDpthread_t *tids;//線程池中正在服役的線程數--->線程的個數unsigned int active_threads;//線程池中任務隊列最大的任務數量unsigned int max_waiting_tasks;//線程池中任務隊列當前的任務數量unsigned int cur_waiting_tasks;//表示是否退出程序bool shutdown;//bool-->true(1) false(0) }pthread_pool;//任務節點 struct task {//每一個任務結點保存一個任務.//所謂任務實際上就是把一個文件或目錄從源目錄下拷貝到目標目錄下//那么這個任務該如何去保存到任務節點上面去?//任務的完成是通過函數來實現的,所以我們如果要去保存一個任務的話//只需要保存完成任務的函數(cp_file)的指針既可以了。//也就是說如果我們要去完成一個任務,就是去節點保存的地址上去//執行一個函數就可以啦,函數只要執行完了,任務也就完成了。//那么我們就需要定義一個函數指針。保存任務函數(cp_file)的地址void (*do_task)(void *arg);//另外,我們可能需要給任務函數傳遞參數(比如:文件名)void *arg;//下一個任務struct task *next; };/*init_pool:線程池的初始化函數初始化指定的線程池的,線程池中有thread_num個初始化線程。@pool:線程池指針,指向你要初始化的線程池@thread_num:你要初始化的線程池中一開始線程的數量@返回值:成功返回0,失敗返回-1。 */ int init_pool(phtread_pool *pool,unsigned int thread_num);/*routine:任務調配函數--->線程函數所有線程開始都回去執行這個函數,此函數會不斷的從線程池的任務隊列中取任務然后交給線程去執行取任務--->arg表示的是線程池的指針,在線程池中有任務隊列,任務隊列中有任務節點,每一個任務節點中都包含了函數指針和函數參數。 */ void *routine(void *arg);/*銷毀線程池:銷毀線程池前要保證所有的任務都已經完成了 */ int destroy_pool(pthread_pool *pool);/*add_task:給任務隊列增加任務,把do_task指向的任務(函數指針)和arg指向的參數保存到一個任務節點中去,同時將任務節點添加到pool表示的線程池的任務隊列中去。@pool:指針,指向你要添加的任務的線程池@fun_task:你要添加的任務(cp_file)@fun_arg:你要執行的任務的參數(兩個文件的文件名)@返回值:成功返回0,失敗返回-1。 */ int add_task(pthread_pool *pool,void (*fun_task)(void *arg),void *fun_arg);//往線程池中添加線程 int add_threads(pthread_pool *pool,unsigned int add_threads_num);//刪除線程池中的線程 int remove_threads(pthread_pool *pool,unsigned int remove_threads_num);#endif線程池函數
#include "pthread_pool.h"/*init_pool:線程池的初始化函數初始化指定的線程池的,線程池中有thread_num個初始化線程。@pool:線程池指針,指向你要初始化的線程池@thread_num:你要初始化的線程池中一開始線程的數量@返回值:成功返回0,失敗返回-1。 */ int init_pool(pthread_pool *pool,unsigned int thread_num) {//初始化線程池結構體//初始化線程互斥鎖pthread_mutex_init(&pool->lock,NULL);//初始化線程條件變量pthread_cond_init(&pool->cond,NULL);//創建一個任務節點,并且使用task_list去指向它//需要注意的是第一個任務節點時沒有初始化的,即沒有值的//因為對線程池進行初始化的時候,此時還沒有任務pool->task_list = (struct task *)malloc(sizeof(struct task));pool->task_list->next = NULL;//指向線程ID的數組進行初始化,此數組中保存了線程池中所有線程的ID//用來保存線程ID的數組空間最好是用malloc進行開辟而且這塊要足夠大pool->tids = (pthread_t *)malloc(sizeof(pthread_t)*MAX_THREADS_NUM);if(pool->task_list == NULL || pool->tids == NULL){perror("malloc memory failed");return -1;}//線程池中正在服役的線程數量-->總的線程個數pool->active_threads = thread_num;//線程池中任務隊列最大的任務數量pool->max_waiting_tasks = MAX_TASKS;//線程池中任務隊列當前的任務數量pool->cur_waiting_tasks = 0;//不退出pool->shutdown = false;//創建thread_num個線程,同時記錄所有線程的ID,讓所有線程//一開始就執行任務調配函數去執行任務。int i;for(i = 0;i < thread_num;i++){if(pthread_create(&pool->tids[i],NULL,routine,(void *)pool) != 0){perror("create thread failed");return -1;}//打印調試信息printf("[%lu] create tids[%d] : [%lu] is success!\n",pthread_self(),i,pool->tids[i]);}return 0; }//清理函數--->防止某一個線程帶鎖退出 void handler(void *arg) {pthread_mutex_unlock((pthread_mutex_t *)arg); }/*routine:任務調配函數--->線程函數所有線程開始都回去執行這個函數,此函數會不斷的從線程池的任務隊列中取任務然后交給線程去執行取任務--->arg表示的是線程池的指針,在線程池中有任務隊列,任務隊列中有任務節點,每一個任務節點中都包含了函數指針和函數參數。 */ void *routine(void *arg) {struct task *p = NULL;//因為這個函數需要去訪問任務隊列,任務隊列是共享資源//所以需要上鎖//arg是你的線程池的指針.pthread_pool *pool = (pthread_pool *)arg;while(1){//在上鎖之前可以設置一個線程退出清理函數//當線程因為某種意外情況導致線程夭折的時候自動執行//我指定的清理函數(一般是解鎖為了防止線程帶鎖退出)//只能用在線程中pthread_cleanup_push(handler,(void *)&pool->lock);//獲取到線程互斥鎖,上鎖pthread_mutex_lock(&pool->lock);//任務隊列的情況分為很多種://1.任務隊列沒有任務并且線程池還沒有結束的時候//此時,在沒有任務的時候(條件不滿足的情況下),需要等待新任務的到來//此處不能用if,必須要用while,因為一個線程被喚醒之后,有可能"搶不到"任務while(pool->cur_waiting_tasks == 0 && !pool->shutdown){//當前線程陷入休眠pthread_cond_wait(&pool->cond,&pool->lock);//當前線程被喚醒的條件://1.當有新任務的時候 2.當我要銷毀線程池的時候}//2.任務隊列中沒有任務并且線程池要結束if(pool->cur_waiting_tasks == 0 && pool->shutdown){//解鎖pthread_mutex_unlock(&pool->lock);pthread_exit(NULL);}//3.任務隊列中有任務,將任務節點從任務鏈表中取下來//找到任務鏈表中的第二個節點,因為第一個節點上面是空節點,沒有任務p = pool->task_list->next;//摘除結點,更新鏈表pool->task_list->next = p->next;p->next = NULL;pool->cur_waiting_tasks--;//獲取到線程互斥鎖,解鎖pthread_mutex_unlock(&pool->lock);//清理線程退出函數pthread_cleanup_pop(0);//當條件滿足(任務隊列中有任務的)的時候,去執行任務.//執行拷貝任務的時候,拷貝任務是不能被打斷的,所以我們要設置//線程屬性為不可被取消(干掉)pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);//按任務節點中的任務去執行任務(去執行cp_file函數)(p->do_task)(p->arg);//取消線程的不可被取消的屬性pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);//釋放任務節點free(p);} }/*銷毀線程池:銷毀線程池前要保證所有的任務都已經完成了 */ int destroy_pool(pthread_pool *pool) {//釋放所有的空間等待任務執行完畢pool->shutdown = true;//線程池要結束了//喚醒所有的線程pthread_cond_broadcast(&pool->cond); //利用join函數回收每一個線程int i;for(i = 0;i < pool->active_threads;i++){//從第0個線程開始回收int r = pthread_join(pool->tids[i],NULL);if(r != 0){printf("Recover tids[%d] failed!\n",i);}else{printf("Recover tids[%d] success!\n",i); }}//銷毀線程互斥鎖pthread_mutex_destroy(&pool->lock);//銷毀線程條件變量pthread_cond_destroy(&pool->cond);//釋放任務節點if(pool->task_list){free(pool->task_list);//只需要釋放第一個節點即可}free(pool->tids);free(pool);return 0; }/*add_task:給任務隊列增加任務,把do_task指向的任務(函數指針)和arg指向的參數保存到一個任務節點中去,同時將任務節點添加到pool表示的線程池的任務隊列中去。@pool:指針,指向你要添加的任務的線程池@fun_task:你要添加的任務(cp_file)@fun_arg:你要執行的任務的參數(兩個文件的文件名)@返回值:成功返回0,失敗返回-1。 */ int add_task(pthread_pool *pool,void (*fun_task)(void *arg),void *fun_arg) {//在往任務隊列中添加任務的時候,需要注意解鎖和上鎖//加入任務后要喚醒等待的線程//創建一個新的節點去保存任務struct task *new_task = malloc(sizeof(*new_task));new_task->next = NULL;new_task->do_task = fun_task;new_task->arg = fun_arg;//把任務節點添加到任務鏈表中去pthread_mutex_lock(&pool->lock);//任務隊列中的任務數量已達上限if(pool->cur_waiting_tasks >= MAX_TASKS){pthread_mutex_unlock(&pool->lock);printf("Too many task,add task failed!\n");//釋放任務節點空間free(new_task->arg);free(new_task);return -1;}//任務節點采用尾插法插入任務鏈表(此處不能頭插)struct task *find = pool->task_list;while(find->next){find = find->next;}//已經找到尾結點直接添加任務到鏈表中即可find->next = new_task;pool->cur_waiting_tasks++;//訪問完任務鏈表完之后解鎖pthread_mutex_unlock(&pool->lock);//添加完之后,喚醒線程pthread_cond_signal(&pool->cond);return 0; }//往線程池中添加線程 int add_threads(pthread_pool *pool,unsigned int add_threads_num) {}//刪除線程池中的線程 int remove_threads(pthread_pool *pool,unsigned int remove_threads_num) {}主函數頭文件
#ifndef __HEAD_H__ #define __HEAD_H__#include <stdio.h> #include <stdlib.h> #include <string.h> #include <pthread.h> #include <sys/types.h> #include <unistd.h> #include <sys/stat.h> #include <fcntl.h> #include <dirent.h> #include <libgen.h>#endif主函數
#include "cp_dir.h"//./a.out dir_src dir_dest int main(int argc,char *argv[]) {if(argc != 3){printf("Usage : %s <dir_src> <dir_dest>!\n",argv[0]);return -1;}//創建一個線程池pthread_pool *pool = malloc(sizeof(*pool));//初始化線程池int ret = init_pool(pool);if(ret == -1){printf("init pthread pool failed!\n");return -1;}//利用線程池實現目錄之間的拷貝cp_dir(pool,argv[1],argv[2]);destroy_pool(pool);return 0; }?
總結
以上是生活随笔為你收集整理的martin_pthread_pool的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Mac osx系统中virtual bo
- 下一篇: 进程基本概念和练习