日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

linux——select、poll、epoll

發布時間:2024/7/19 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 linux——select、poll、epoll 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

    • 1.多路I/O轉接服務器
    • 2.select
    • 3.select代碼
    • 4.poll
    • 5.epoll
      • 5.1 基礎API
      • 5.3 epoll代碼
      • 5.4 邊沿觸發和水平觸發
        • 5.4.1 水平出發LT
        • 5.4.2 邊緣觸發
        • 5.4.3 服務器的邊緣觸發和水平觸發
      • 5.4 邊緣觸發但是能一次讀完
    • 6.epoll反應堆模型
      • 6.1 反應堆模型
      • 6.2 epoll反應堆代碼
    • 7.心跳包
    • 8.線程池
      • 8.1 線程池代碼
      • 8.2 請問怎么實現線程池

1.多路I/O轉接服務器

多路IO轉接服務器也叫做多任務IO服務器。該類服務器實現的主旨思想是,不再由應用程序自己監視客戶端連接,取而代之由內核替應用程序監視文件。

2.select

#include <sys/select.h> #include <sys/time.h> #include <sys/types.h> #include <unistd.h> int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);nfds: 監控的文件描述符集里最大文件描述符加1,因為此參數會告訴內核檢測前多少個文件描述符的狀態readfds: 監控有讀數據到達文件描述符集合,傳入傳出參數writefds: 監控寫數據到達文件描述符集合,傳入傳出參數exceptfds: 監控異常發生達文件描述符集合,如帶外數據到達異常,傳入傳出參數timeout: 定時阻塞監控時間,3種情況1.NULL,永遠等下去2.設置timeval,等待固定時間3.設置timeval里時間均為0,檢查描述字后立即返回,輪詢struct timeval {long tv_sec; /* seconds */long tv_usec; /* microseconds */};void FD_CLR(int fd, fd_set *set); //把文件描述符集合里fd清0int FD_ISSET(int fd, fd_set *set); //測試文件描述符集合里fd是否置1void FD_SET(int fd, fd_set *set); //把文件描述符集合里fd位置1void FD_ZERO(fd_set *set); //把文件描述符集合里所有位清0返回值:成功:所監聽的所有監聽集合中滿足條件的總數;失敗:返回-1

3.select代碼

  • 服務器
  • /* server.c */ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <netinet/in.h> #include <arpa/inet.h> #include "wrap.h"#define MAXLINE 80 #define SERV_PORT 6666int main(int argc, char *argv[]) {int i, maxi, maxfd, listenfd, connfd, sockfd;int nready, client[FD_SETSIZE]; /* FD_SETSIZE 默認為 1024 */ssize_t n;fd_set rset, allset;char buf[MAXLINE];char str[INET_ADDRSTRLEN]; /* #define INET_ADDRSTRLEN 16 */socklen_t cliaddr_len;struct sockaddr_in cliaddr, servaddr;listenfd = Socket(AF_INET, SOCK_STREAM, 0);bzero(&servaddr, sizeof(servaddr));servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY);servaddr.sin_port = htons(SERV_PORT);Bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));Listen(listenfd, 20); /* 默認最大128 */maxfd = listenfd; /* 初始化 */maxi = -1; /* client[]的下標 */for (i = 0; i < FD_SETSIZE; i++)client[i] = -1; /* 用-1初始化client[] */FD_ZERO(&allset);FD_SET(listenfd, &allset); /* 構造select監控文件描述符集 */for ( ; ; ) {rset = allset; /* 每次循環時都從新設置select監控信號集 */nready = select(maxfd+1, &rset, NULL, NULL, NULL);if (nready < 0)perr_exit("select error");if (FD_ISSET(listenfd, &rset)) { /* new client connection */cliaddr_len = sizeof(cliaddr);connfd = Accept(listenfd, (struct sockaddr *)&cliaddr, &cliaddr_len);printf("received from %s at PORT %d\n",inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),ntohs(cliaddr.sin_port));for (i = 0; i < FD_SETSIZE; i++) {if (client[i] < 0) {client[i] = connfd; /* 保存accept返回的文件描述符到client[]里 */break;}}/* 達到select能監控的文件個數上限 1024 */if (i == FD_SETSIZE) {fputs("too many clients\n", stderr);exit(1);}FD_SET(connfd, &allset); /* 添加一個新的文件描述符到監控信號集里 */if (connfd > maxfd)maxfd = connfd; /* select第一個參數需要 */if (i > maxi)maxi = i; /* 更新client[]最大下標值 */if (--nready == 0)continue; /* 如果沒有更多的就緒文件描述符繼續回到上面select阻塞監聽,負責處理未處理完的就緒文件描述符 */}for (i = 0; i <= maxi; i++) { /* 檢測哪個clients 有數據就緒 */if ( (sockfd = client[i]) < 0)continue;if (FD_ISSET(sockfd, &rset)) {if ( (n = Read(sockfd, buf, MAXLINE)) == 0) {Close(sockfd); /* 當client關閉鏈接時,服務器端也關閉對應鏈接 */FD_CLR(sockfd, &allset); /* 解除select監控此文件描述符 */client[i] = -1;} else {int j;for (j = 0; j < n; j++)buf[j] = toupper(buf[j]);Write(sockfd, buf, n);}if (--nready == 0)break;}}}close(listenfd);return 0; }
  • 客戶端
  • /* client.c */ #include <stdio.h> #include <string.h> #include <unistd.h> #include <netinet/in.h> #include "wrap.h"#define MAXLINE 80 #define SERV_PORT 6666int main(int argc, char *argv[]) {struct sockaddr_in servaddr;char buf[MAXLINE];int sockfd, n;sockfd = Socket(AF_INET, SOCK_STREAM, 0);bzero(&servaddr, sizeof(servaddr));servaddr.sin_family = AF_INET;inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr);servaddr.sin_port = htons(SERV_PORT);Connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));while (fgets(buf, MAXLINE, stdin) != NULL) {Write(sockfd, buf, strlen(buf));n = Read(sockfd, buf, MAXLINE);if (n == 0)printf("the other side has been closed.\n");elseWrite(STDOUT_FILENO, buf, n);}Close(sockfd);return 0; }

    4.poll

    #include <poll.h> int poll(struct pollfd *fds, nfds_t nfds, int timeout);struct pollfd {int fd; /* 文件描述符 */short events; /* 監控的事件 */short revents; /* 監控事件中滿足條件返回的事件 */};POLLIN 普通或帶外優先數據可讀,即POLLRDNORM | POLLRDBANDPOLLRDNORM 數據可讀POLLRDBAND 優先級帶數據可讀POLLPRI 高優先級可讀數據POLLOUT 普通或帶外數據可寫POLLWRNORM 數據可寫POLLWRBAND 優先級帶數據可寫POLLERR 發生錯誤POLLHUP 發生掛起POLLNVAL 描述字不是一個打開的文件nfds 監控數組中有多少文件描述符需要被監控timeout 毫秒級等待-1:阻塞等,#define INFTIM -1 Linux中沒有定義此宏0:立即返回,不阻塞進程>0:等待指定毫秒數,如當前系統時間精度不夠毫秒,向上取值如果不再監控某個文件描述符時,可以把pollfd中,fd設置為-1,poll不再監控此pollfd, 下次返回時,把revents設置為0#include<stdio.h> #include<stdlib.h> #include<string.h> #include<netinet/in.h> #include<arpa/inet.h> #include<poll.h> #include<errno.h> #include<ctype.h> #include<unistd.h>#define MAXLINE 80 #define SERV_PORT 8000 #define OPEN_MAX 1024int main() {int i,j,maxi,listenfd,connfd,sockfd;int nready;ssize_t n;char buf[MAXLINE],str[INET_ADDRSTRLEN];socklen_t clilen;struct pollfd client[OPEN_MAX];struct sockaddr_in cliaddr,servaddr;listenfd=socket(AF_INET,SOCK_STREAM,0);int opt=1;setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));bzero(&servaddr,sizeof(servaddr));servaddr.sin_family=AF_INET;servaddr.sin_addr.s_addr=htonl(INADDR_ANY);servaddr.sin_port=htons(SERV_PORT);bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr));listen(listenfd,120);client[0].fd=listenfd;client[0].events=POLLIN;for(int i=1;i<OPEN_MAX;i++){client[i].fd=-1;}maxi=0;for( ; ;){nready=poll(client,maxi+1,-1);/*這個if語句監聽listenfd是否有讀事件,也就是是否有客戶端連接請求,有的話accept連接,并將客戶端的文件描述符添加到監聽隊列中,進行監聽*/if(client[0].revents & POLLIN){clilen=sizeof(cliaddr);connfd=accept(listenfd,(struct sockaddr*)&cliaddr,&clilen);printf("received from %s at PORT %d\n",inet_ntop(AF_INET,&cliaddr.sin_addr,str,sizeof(str)),ntohs(cliaddr.sin_port));for(i=1;i<OPEN_MAX;i++){if(client[i].fd<0){client[i].fd=connfd;break;}}if(i==OPEN_MAX){perror("too many clients");}client[i].events=POLLIN;if(i>maxi){maxi=i;}if(--nready<=0){continue;}}for(int i=1;i<=maxi;i++){if((sockfd=client[i].fd)<0){continue;}if(client[i].revents & POLLIN){if((n=read(sockfd,buf,MAXLINE))<0){if(errno==ECONNRESET){close(sockfd);client[i].fd=-1;}else{perror("read error");}}else if(n==0){printf("client[%d] closed connection\n",i);close(sockfd);client[i].fd=-1;}else{for(j=0;j<n;j++){buf[j]=toupper(buf[j]);}printf("read 執行\n");write(sockfd,buf,n);}if(--nready<=0){break;}}}}return 0; }

    5.epoll

    epoll是Linux下多路復用IO接口select/poll的增強版本,它能顯著提高程序在大量并發連接中只有少量活躍的情況下的系統CPU利用率,因為它會復用文件描述符集合來傳遞結果而不用迫使開發者每次等待事件之前都必須重新準備要被偵聽的文件描述符集合,另一點原因就是獲取事件的時候,它無須遍歷整個被偵聽的描述符集,只要遍歷那些被內核IO事件異步喚醒而加入Ready隊列的描述符集合就行了。
    目前epell是linux大規模并發網絡程序中的熱門首選模型。
    epoll除了提供select/poll那種IO事件的電平觸發(Level Triggered)外,還提供了邊沿觸發(Edge Triggered),這就使得用戶空間程序有可能緩存IO狀態,減少epoll_wait/epoll_pwait的調用,提高應用程序效率。
    可以使用cat命令查看一個進程可以打開的socket描述符上限。

    cat /proc/sys/fs/file-max

    如有需要,可以通過修改配置文件的方式修改該上限值

    sudo vi /etc/security/limits.conf在文件尾部寫入以下配置,soft軟限制,hard硬限制。如下圖所示。* soft nofile 65536* hard nofile 100000

    5.1 基礎API

  • 創建一個epoll句柄,參數size用來告訴內核監聽的文件描述符的個數,跟內存大小有關。
  • #include <sys/epoll.h>int epoll_create(int size) size:監聽數目
  • 控制某個epoll監控的文件描述符上的事件:注冊、修改、刪除。
  • #include <sys/epoll.h>int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)epfd: 為epoll_creat的句柄op: 表示動作,用3個宏來表示:EPOLL_CTL_ADD (注冊新的fd到epfd)EPOLL_CTL_MOD (修改已經注冊的fd的監聽事件)EPOLL_CTL_DEL (從epfd刪除一個fd);event: 告訴內核需要監聽的事件struct epoll_event {__uint32_t events; /* Epoll events */epoll_data_t data; /* User data variable */};typedef union epoll_data {void *ptr;int fd;uint32_t u32;uint64_t u64;} epoll_data_t;EPOLLIN : 表示對應的文件描述符可以讀(包括對端SOCKET正常關閉)EPOLLOUT: 表示對應的文件描述符可以寫EPOLLPRI: 表示對應的文件描述符有緊急的數據可讀(這里應該表示有帶外數據到來)EPOLLERR: 表示對應的文件描述符發生錯誤EPOLLHUP: 表示對應的文件描述符被掛斷;EPOLLET: 將EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對于水平觸發(Level Triggered)而言的EPOLLONESHOT:只監聽一次事件,當監聽完這次事件之后,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL隊列里
  • 等待所監控文件描述符上有事件的產生,類似于select()調用
  • #include <sys/epoll.h>int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)events: 用來存內核得到事件的集合,maxevents: 告之內核這個events有多大,這個maxevents的值不能大于創建epoll_create()時的size,timeout: 是超時時間-1: 阻塞0: 立即返回,非阻塞>0: 指定毫秒返回值: 成功返回有多少文件描述符就緒,時間到時返回0,出錯返回-1

    5.3 epoll代碼

    #include<stdio.h> #include<unistd.h> #include<stdlib.h> #include<string.h> #include<arpa/inet.h> #include<sys/epoll.h> #include<errno.h> #include<ctype.h>#define MAXLINE 8192 #define SERV_PORT 8000 #define OPEN_MAX 5000int main() {int i,listenfd,connfd,sockfd;int n,num=0;ssize_t nready,efd,res;char buf[MAXLINE],str[INET_ADDRSTRLEN];socklen_t clilen;struct sockaddr_in cliaddr,servaddr;struct epoll_event tep,ep[OPEN_MAX]; //tep:epoll_ctl參數;ep[]:epoll_wait參數listenfd=socket(AF_INET,SOCK_STREAM,0);int opt=1;setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));bzero(&servaddr,sizeof(servaddr));servaddr.sin_family=AF_INET;servaddr.sin_addr.s_addr=htonl(INADDR_ANY);servaddr.sin_port=htons(SERV_PORT);bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr));listen(listenfd,20);efd=epoll_create(OPEN_MAX); //創建epoll模型,efd指向紅黑樹根節點if(efd==-1){perror("epoll_create error");}tep.events=EPOLLIN;tep.data.fd=listenfd; //指定lfd的監聽事件為讀res=epoll_ctl(efd,EPOLL_CTL_ADD,listenfd,&tep); //將lfd及對應的結構體添加到紅黑樹上,efd可以找到該樹if(res==-1){perror("epoll_ctl error");}while(1){/*對紅黑樹上文件描述符監聽,ep為struct epoll_event類型數組,OPEN_MAX為數組容量-1永久阻塞*/nready=epoll_wait(efd,ep,OPEN_MAX,-1);if(nready==-1){perror("epoll_wait error");}for(int i=0;i<nready;i++){if(!(ep[i].events&EPOLLIN))continue;if(ep[i].data.fd==listenfd){clilen=sizeof(cliaddr);connfd=accept(listenfd,(struct sockaddr*)&cliaddr,&clilen);//listenfd可讀,說明有客戶端連接printf("received from %s at PORT %d\n",inet_ntop(AF_INET,&cliaddr.sin_addr,str,sizeof(str)),ntohs(cliaddr.sin_port));printf("cfd %d---client %d\n",connfd,++num);tep.events=EPOLLIN;tep.data.fd=connfd;res=epoll_ctl(efd,EPOLL_CTL_ADD,connfd,&tep); //將客戶端文件描述符添加到紅黑樹上if(res==-1){perror("epoll_ctl error");}}else{sockfd=ep[i].data.fd;n=read(sockfd,buf,MAXLINE);if(n==0){ //讀到0,說明客戶端關閉連接res=epoll_ctl(efd,EPOLL_CTL_DEL,sockfd,NULL); //將該文件描述符從紅黑樹摘除if(res==-1){perror("epoll_ctl error");}close(sockfd);printf("client[%d] closed connection\n",sockfd);}else if(n<0){ //出錯perror("read n<0 error:");res=epoll_ctl(efd,EPOLL_CTL_DEL,sockfd,NULL);close(sockfd);}else{for(int i=0;i<n;i++){buf[i]=toupper(buf[i]);}write(sockfd,buf,n);}}}}close(listenfd);close(efd);return 0; }

    5.4 邊沿觸發和水平觸發

    5.4.1 水平出發LT

    #include<stdio.h> #include<unistd.h> #include<stdlib.h> #include<sys/epoll.h> #include<errno.h>#define MAXLINE 10int main() {int efd,i;int pfd[2];pid_t pid;char buf[MAXLINE],ch='a';pipe(pfd);pid=fork();if(pid==0){close(pfd[0]);while(1){for(i=0;i<MAXLINE/2;i++){buf[i]=ch;}buf[i-1]='\n';ch++;for(;i<MAXLINE;i++){buf[i]=ch;}buf[i-1]='\n';ch++;write(pfd[1],buf,sizeof(buf));sleep(5);}close(pfd[1]);}else if(pid>0){struct epoll_event event;struct epoll_event resevent[10];int res,len;close(pfd[1]);efd=epoll_create(10);//event.events=EPOLLIN|EPOLLET; //邊緣觸發event.events=EPOLLIN; //水平觸發,默認水平觸發event.data.fd=pfd[0];epoll_ctl(efd,EPOLL_CTL_ADD,pfd[0],&event);while(1){res=epoll_wait(efd,resevent,10,-1);printf("res %d\n",res);if(resevent[0].data.fd==pfd[0]){len=read(pfd[0],buf,MAXLINE/2);write(STDOUT_FILENO,buf,len);}}close(pfd[0]);}} zhaoxr@zhaoxr-ThinkPad-E450:~/select$ ./epoll_trigger res 1 aaaa res 1 bbbb res 1 cccc res 1 dddd res 1 eeee res 1 ffff ^C

    5.4.2 邊緣觸發

    #include<stdio.h> #include<unistd.h> #include<stdlib.h> #include<sys/epoll.h> #include<errno.h>#define MAXLINE 10int main() {int efd,i;int pfd[2];pid_t pid;char buf[MAXLINE],ch='a';pipe(pfd);pid=fork();if(pid==0){close(pfd[0]);while(1){for(i=0;i<MAXLINE/2;i++){buf[i]=ch;}buf[i-1]='\n';ch++;for(;i<MAXLINE;i++){buf[i]=ch;}buf[i-1]='\n';ch++;write(pfd[1],buf,sizeof(buf));sleep(5);}close(pfd[1]);}else if(pid>0){struct epoll_event event;struct epoll_event resevent[10];int res,len;close(pfd[1]);efd=epoll_create(10);event.events=EPOLLIN|EPOLLET; //ET 邊緣觸發//event.events=EPOLLIN; //LT 水平觸發(默認)event.data.fd=pfd[0];epoll_ctl(efd,EPOLL_CTL_ADD,pfd[0],&event);while(1){res=epoll_wait(efd,resevent,10,-1);printf("res %d\n",res);if(resevent[0].data.fd==pfd[0]){len=read(pfd[0],buf,MAXLINE/2);write(STDOUT_FILENO,buf,len);}}close(pfd[0]);}}

    5.4.3 服務器的邊緣觸發和水平觸發

    邊緣觸發(非阻塞模式):

    #include <stdio.h> #include <string.h> #include <netinet/in.h> #include <arpa/inet.h> #include <signal.h> #include <sys/wait.h> #include <sys/types.h> #include <sys/epoll.h> #include <unistd.h>#define MAXLINE 10 #define SERV_PORT 8080int main(void) {struct sockaddr_in servaddr, cliaddr;socklen_t cliaddr_len;int listenfd, connfd;char buf[MAXLINE];char str[INET_ADDRSTRLEN];int i, efd;listenfd = socket(AF_INET, SOCK_STREAM, 0);bzero(&servaddr, sizeof(servaddr));servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY);servaddr.sin_port = htons(SERV_PORT);bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));listen(listenfd, 20);struct epoll_event event;struct epoll_event resevent[10];int res, len;efd = epoll_create(10);event.events = EPOLLIN | EPOLLET; /* ET 邊沿觸發 ,默認是水平觸發 *///event.events=EPOLLIN;printf("Accepting connections ...\n");cliaddr_len = sizeof(cliaddr);connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &cliaddr_len);printf("received from %s at PORT %d\n",inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),ntohs(cliaddr.sin_port));event.data.fd = connfd;epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event);while (1) {res = epoll_wait(efd, resevent, 10, -1);printf("res %d\n", res);if (resevent[0].data.fd == connfd) {len = read(connfd, buf, MAXLINE/2);write(STDOUT_FILENO, buf, len);}}return 0; }

    水平觸發:

    #include <stdio.h> #include <string.h> #include <netinet/in.h> #include <arpa/inet.h> #include <signal.h> #include <sys/wait.h> #include <sys/types.h> #include <sys/epoll.h> #include <unistd.h>#define MAXLINE 10 #define SERV_PORT 8080int main(void) {struct sockaddr_in servaddr, cliaddr;socklen_t cliaddr_len;int listenfd, connfd;char buf[MAXLINE];char str[INET_ADDRSTRLEN];int i, efd;listenfd = socket(AF_INET, SOCK_STREAM, 0);bzero(&servaddr, sizeof(servaddr));servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY);servaddr.sin_port = htons(SERV_PORT);bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));listen(listenfd, 20);struct epoll_event event;struct epoll_event resevent[10];int res, len;efd = epoll_create(10);//event.events = EPOLLIN | EPOLLET; /* ET 邊沿觸發 ,默認是水平觸發 */event.events=EPOLLIN;printf("Accepting connections ...\n");cliaddr_len = sizeof(cliaddr);connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &cliaddr_len);printf("received from %s at PORT %d\n",inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),ntohs(cliaddr.sin_port));event.data.fd = connfd;epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event);while (1) {res = epoll_wait(efd, resevent, 10, -1);printf("res %d\n", res);if (resevent[0].data.fd == connfd) {len = read(connfd, buf, MAXLINE/2);write(STDOUT_FILENO, buf, len);}}return 0; }

    客戶端程序:

    #include <stdio.h> #include <string.h> #include <unistd.h> #include <netinet/in.h> #include<arpa/inet.h> #define MAXLINE 10 #define SERV_PORT 8080int main(int argc, char *argv[]) {struct sockaddr_in servaddr;char buf[MAXLINE];int sockfd, i;char ch = 'a';sockfd = socket(AF_INET, SOCK_STREAM, 0);bzero(&servaddr, sizeof(servaddr));servaddr.sin_family = AF_INET;inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr);servaddr.sin_port = htons(SERV_PORT);connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));while (1) {for (i = 0; i < MAXLINE/2; i++)buf[i] = ch;buf[i-1] = '\n';ch++;for (; i < MAXLINE; i++)buf[i] = ch;buf[i-1] = '\n';ch++;write(sockfd, buf, sizeof(buf));sleep(5);}close(sockfd);return 0; }

    5.4 邊緣觸發但是能一次讀完

    邊緣觸發但是能一次讀完,不通過使用epoll_wait的觸發,可以一次讀完所有的數據,這其中的原理是:將文件描述符設置為非阻塞,通過while ((len = read(connfd, buf, MAXLINE/2)) > 0) write(STDOUT_FILENO, buf, len); ,可以一次將connfd中的所有數據讀完所有數據。

    ET的非阻塞模式比LT模式效率要高,因為ET減少了epoll_wait()的使用。

    結論:epoll 的 ET模式, 高效模式,但是只支持 非阻塞模式。 --- 忙輪詢。struct epoll_event event;event.events = EPOLLIN | EPOLLET;epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &event); int flg = fcntl(cfd, F_GETFL); flg |= O_NONBLOCK;fcntl(cfd, F_SETFL, flg);優點:高效。突破1024文件描述符。缺點:不能跨平臺。 Linux。

    6.epoll反應堆模型

    6.1 反應堆模型

    epoll 反應堆模型:epoll ET模式 + 非阻塞、輪詢 + void *ptr。原來: socket、bind、listen -- epoll_create 創建監聽 紅黑樹 -- 返回 epfd -- epoll_ctl() 向樹上添加一個監聽fd -- while1---- epoll_wait 監聽 -- 對應監聽fd有事件產生 -- 返回 監聽滿足數組。 -- 判斷返回數組元素 -- lfd滿足 -- Accept -- cfd 滿足 -- read() ---->-- write回去。反應堆:不但要監聽 cfd 的讀事件、還要監聽cfd的寫事件。socket、bind、listen -- epoll_create 創建監聽 紅黑樹 -- 返回 epfd -- epoll_ctl() 向樹上添加一個監聽fd -- while1---- epoll_wait 監聽 -- 對應監聽fd有事件產生 -- 返回 監聽滿足數組。 -- 判斷返回數組元素 -- lfd滿足 -- Accept -- cfd 滿足 -- read() ---->-- cfd從監聽紅黑樹上摘下 -- EPOLLOUT -- 回調函數 -- epoll_ctl() -- EPOLL_CTL_ADD 重新放到紅黑上監聽寫事件-- 等待 epoll_wait 返回 -- 說明 cfd 可寫 -- write回去 -- cfd從監聽紅黑樹上摘下 -- EPOLLIN -- epoll_ctl() -- EPOLL_CTL_ADD 重新放到紅黑上監聽讀事件 -- epoll_wait 監聽

    反應堆的理解:加入IO轉接之后,有了事件,server才去處理,這里反應堆也是這樣,由于網絡環境復雜,服務器處理數據之后,可能并不能直接寫回去,比如遇到網絡繁忙或者對方緩沖區已經滿了這種情況,就不能直接寫回給客戶端。反應堆就是在處理數據之后,監聽寫事件,能寫會客戶端了,才去做寫回操作。寫回之后,再改為監聽讀事件。如此循環。

    6.2 epoll反應堆代碼

    /* *epoll基于非阻塞I/O事件驅動 */ #include <stdio.h> #include <sys/socket.h> #include <sys/epoll.h> #include <arpa/inet.h> #include <fcntl.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <stdlib.h> #include <time.h> #define MAX_EVENTS 1024 //監聽上限數 #define BUFLEN 4096 #define SERV_PORT 8080 void recvdata(int fd, int events, void *arg); void senddata(int fd, int events, void *arg); /* 描述就緒文件描述符相關信息 */ struct myevent_s { int fd; //要監聽的文件描述符 int events; //對應的監聽事件 void *arg; //泛型參數 void (*call_back)(int fd, int events, void *arg); //回調函數 int status; //是否在監聽:1->在紅黑樹上(監聽), 0->不在(不監聽) char buf[BUFLEN]; int len; long last_active; //記錄每次加入紅黑樹 g_efd 的時間值 }; int g_efd; //全局變量, 保存epoll_create返回的文件描述符 struct myevent_s g_events[MAX_EVENTS+1]; //自定義結構體類型數組. +1-->listen fd /*將結構體 myevent_s 成員變量 初始化*/ void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg) { ev->fd = fd; ev->call_back = call_back; ev->events = 0; ev->arg = arg; ev->status = 0; memset(ev->buf, 0, sizeof(ev->buf)); ev->len = 0; ev->last_active = time(NULL); //調用eventset函數的時間 return; } /* 向 epoll監聽的紅黑樹 添加一個 文件描述符 */ //eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]); void eventadd(int efd, int events, struct myevent_s *ev) { struct epoll_event epv = {0, {0}}; int op; epv.data.ptr = ev; epv.events = ev->events = events; //EPOLLIN 或 EPOLLOUT if (ev->status == 0) { //已經在紅黑樹 g_efd 里 op = EPOLL_CTL_ADD; //將其加入紅黑樹 g_efd, 并將status置1 ev->status = 1; } if (epoll_ctl(efd, op, ev->fd, &epv) < 0) //實際添加/修改 printf("event add failed [fd=%d], events[%d]\n", ev->fd, events); else printf("event add OK [fd=%d], op=%d, events[%0X]\n", ev->fd, op, events); return ; } /* 從epoll 監聽的 紅黑樹中刪除一個 文件描述符*/ void eventdel(int efd, struct myevent_s *ev) { struct epoll_event epv = {0, {0}}; if (ev->status != 1) //不在紅黑樹上 return ; //epv.data.ptr = ev; epv.data.ptr = NULL; ev->status = 0; //修改狀態 epoll_ctl(efd, EPOLL_CTL_DEL, ev->fd, &epv); //從紅黑樹 efd 上將 ev->fd 摘除 return ; } /* 當有文件描述符就緒, epoll返回, 調用該函數 與客戶端建立鏈接 */ void acceptconn(int lfd, int events, void *arg) { struct sockaddr_in cin; socklen_t len = sizeof(cin); int cfd, i; if ((cfd = accept(lfd, (struct sockaddr *)&cin, &len)) == -1) { if (errno != EAGAIN && errno != EINTR) { /* 暫時不做出錯處理 */ } printf("%s: accept, %s\n", __func__, strerror(errno)); return ; } do { for (i = 0; i < MAX_EVENTS; i++) //從全局數組g_events中找一個空閑元素 if (g_events[i].status == 0) //類似于select中找值為-1的元素 break; //跳出 for if (i == MAX_EVENTS) { printf("%s: max connect limit[%d]\n", __func__, MAX_EVENTS); break; //跳出do while(0) 不執行后續代碼 } int flag = 0; if ((flag = fcntl(cfd, F_SETFL, O_NONBLOCK)) < 0) { //將cfd也設置為非阻塞 printf("%s: fcntl nonblocking failed, %s\n", __func__, strerror(errno)); break; } /* 給cfd設置一個 myevent_s 結構體, 回調函數 設置為 recvdata */ eventset(&g_events[i], cfd, recvdata, &g_events[i]); eventadd(g_efd, EPOLLIN, &g_events[i]); //將cfd添加到紅黑樹g_efd中,監聽讀事件 } while(0); printf("new connect [%s:%d][time:%ld], pos[%d]\n", inet_ntoa(cin.sin_addr), ntohs(cin.sin_port), g_events[i].last_active, i); return ; } void recvdata(int fd, int events, void *arg) { struct myevent_s *ev = (struct myevent_s *)arg; int len; len = recv(fd, ev->buf, sizeof(ev->buf), 0); //讀文件描述符, 數據存入myevent_s成員buf中 eventdel(g_efd, ev); //將該節點從紅黑樹上摘除 if (len > 0) { ev->len = len; ev->buf[len] = '\0'; //手動添加字符串結束標記 printf("C[%d]:%s\n", fd, ev->buf); eventset(ev, fd, senddata, ev); //設置該 fd 對應的回調函數為 senddata eventadd(g_efd, EPOLLOUT, ev); //將fd加入紅黑樹g_efd中,監聽其寫事件 } else if (len == 0) { close(ev->fd); /* ev-g_events 地址相減得到偏移元素位置 */ printf("[fd=%d] pos[%ld], closed\n", fd, ev-g_events); } else { close(ev->fd); printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno)); } return; } void senddata(int fd, int events, void *arg) { struct myevent_s *ev = (struct myevent_s *)arg; int len; len = send(fd, ev->buf, ev->len, 0); //直接將數據 回寫給客戶端。未作處理 eventdel(g_efd, ev); //從紅黑樹g_efd中移除 if (len > 0) { printf("send[fd=%d], [%d]%s\n", fd, len, ev->buf); eventset(ev, fd, recvdata, ev); //將該fd的 回調函數改為 recvdata eventadd(g_efd, EPOLLIN, ev); //從新添加到紅黑樹上, 設為監聽讀事件 } else { close(ev->fd); //關閉鏈接 printf("send[fd=%d] error %s\n", fd, strerror(errno)); } return ; } /*創建 socket, 初始化lfd */ void initlistensocket(int efd, short port) { struct sockaddr_in sin; int lfd = socket(AF_INET, SOCK_STREAM, 0); fcntl(lfd, F_SETFL, O_NONBLOCK); //將socket設為非阻塞 memset(&sin, 0, sizeof(sin)); //bzero(&sin, sizeof(sin)) sin.sin_family = AF_INET; sin.sin_addr.s_addr = INADDR_ANY; sin.sin_port = htons(port); bind(lfd, (struct sockaddr *)&sin, sizeof(sin)); listen(lfd, 20); /* void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg); */ eventset(&g_events[MAX_EVENTS], lfd, acceptconn, &g_events[MAX_EVENTS]); /* void eventadd(int efd, int events, struct myevent_s *ev) */ eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]); return ; } int main(int argc, char *argv[]) { unsigned short port = SERV_PORT; if (argc == 2) port = atoi(argv[1]); //使用用戶指定端口.如未指定,用默認端口 g_efd = epoll_create(MAX_EVENTS+1); //創建紅黑樹,返回給全局 g_efd if (g_efd <= 0) printf("create efd in %s err %s\n", __func__, strerror(errno)); initlistensocket(g_efd, port); //初始化監聽socket struct epoll_event events[MAX_EVENTS+1]; //保存已經滿足就緒事件的文件描述符數組 printf("server running:port[%d]\n", port); int checkpos = 0, i; while (1) { /* 超時驗證,每次測試100個鏈接,不測試listenfd 當客戶端60秒內沒有和服務器通信,則關閉此客戶端鏈接 */ long now = time(NULL); //當前時間 for (i = 0; i < 100; i++, checkpos++) { //一次循環檢測100個。 使用checkpos控制檢測對象 if (checkpos == MAX_EVENTS) checkpos = 0; if (g_events[checkpos].status != 1) //不在紅黑樹 g_efd 上 continue; long duration = now - g_events[checkpos].last_active; //客戶端不活躍的世間 if (duration >= 60) { close(g_events[checkpos].fd); //關閉與該客戶端鏈接 printf("[fd=%d] timeout\n", g_events[checkpos].fd); eventdel(g_efd, &g_events[checkpos]); //將該客戶端 從紅黑樹 g_efd移除 } } /*監聽紅黑樹g_efd, 將滿足的事件的文件描述符加至events數組中, 1秒沒有事件滿足, 返回 0*/ int nfd = epoll_wait(g_efd, events, MAX_EVENTS+1, 1000); if (nfd < 0) { printf("epoll_wait error, exit\n"); break; } for (i = 0; i < nfd; i++) { /*使用自定義結構體myevent_s類型指針, 接收 聯合體data的void *ptr成員*/ struct myevent_s *ev = (struct myevent_s *)events[i].data.ptr; if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) { //讀就緒事件 ev->call_back(ev->fd, events[i].events, ev->arg); //lfd EPOLLIN } if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) { //寫就緒事件 ev->call_back(ev->fd, events[i].events, ev->arg); } } } /* 退出前釋放所有資源 */ return 0; }

    7.心跳包

    TCP保活機制

  • 心跳包
  • 由應用程序自己發送心跳包來檢測連接是否正常,大致的方法是:服務器在一個 Timer事件中定時向客戶端發送一個短小精悍的數據包,然后啟動一個低級別的線程,在該線程中不斷檢測客戶端的回應, 如果在一定時間內沒有收到客戶端的回應,即認為客戶端已經掉線;同樣,如果客戶端在一定時間內沒有收到服務器的心跳包,則認為連接不可用。

    心跳檢測機制
    在TCP網絡通信中,經常會出現客戶端和服務器之間的非正常斷開,需要實時檢測查詢鏈接狀態。常用的解決方法就是在程序中加入心跳機制。

    Heart-Beat線程
    這個是最常用的簡單方法。在接收和發送數據時個人設計一個守護進程(線程),定時發送Heart-Beat包,客戶端/服務器收到該小包后,立刻返回相應的包即可檢測對方是否實時在線。

    該方法的好處是通用,但缺點就是會改變現有的通訊協議!大家一般都是使用業務層心跳來處理,主要是靈活可控。

    UNIX網絡編程不推薦使用SO_KEEPALIVE來做心跳檢測,還是在業務層以心跳包做檢測比較好,也方便控制。

  • 乒乓包
  • 舉例:微信朋友圈有人評論,客戶端怎么知道有人評論?服務器怎么將評論發給客戶端的?

    微信客戶端每隔一段時間就向服務器詢問,是否有人評論?
    當服務器檢查到有人給評論時,服務器發送一個乒乓包給客戶端,該乒乓包中攜帶的數據是[此時有人評論的標志位]
    注:步驟1和2,服務器和客戶端不需要建立連接,只是發送簡單的乒乓包。
    當客戶端接收到服務器回復的帶有評論標志位的乒乓包后,才真正的去和服務器通過三次握手建立連接;建立連接后,服務器將評論的數據發送給客戶端。

    注意:乒乓包是攜帶很簡單的數據的包

  • 設置TCP屬性: SO_KEEPALIVE
  • 1.因為要考慮到一個服務器通常會連接多個客戶端,因此由用戶在應用層自己實現心跳包,代碼較多 且稍顯復雜,而利用TCP/IP協議層為內置的KeepAlive功能來實現心跳功能則簡單得多。

    2.不論是服務端還是客戶端,一方開啟KeepAlive功能后,就會自動在規定時間內向對方發送心跳包, 而另一方在收到心跳包后就會自動回復,以告訴對方我仍然在線。

    3.因為開啟KeepAlive功能需要消耗額外的寬帶和流量,所以TCP協議層默認并不開啟KeepAlive功 能,盡管這微不足道,但在按流量計費的環境下增加了費用,另一方面,KeepAlive設置不合理時可能會 因為短暫的網絡波動而斷開健康的TCP連接。并且,默認的KeepAlive超時需要7,200,000 MilliSeconds, 即2小時,探測次數為5次。對于很多服務端應用程序來說,2小時的空閑時間太長。

    4.因此,我們需要手工開啟KeepAlive功能并設置合理的KeepAlive參數。

    在《UNIX網絡編程第1卷》中也有詳細的闡述:
    SO_KEEPALIVE:保持連接,檢測對方主機是否崩潰,避免(服務器)永遠阻塞于TCP連接的輸入。設置該選項后,如果2小時內在此套接口的任一方向都沒有數據交換,TCP就自動給對方 發一個保持存活探測分節(keepalive
    probe)。這是一個對方必須響應的TCP分節.它會導致以下三種情況:

    對方接收一切正常:以期望的ACK響應。2小時后,TCP將發出另一個探測分節。 對方已崩潰且已重新啟動:以RST響應。套接口的待處理錯誤被置為ECONNRESET,套接口本身則被關閉。 對方無任何響應:源自berkeley的TCP發送另外8個探測分節,相隔75秒一個,試圖得到一個響應。在發出第一個探測分節11分鐘15秒后若仍無響應就放棄。套接口的待處理錯誤被置為ETIMEOUT,套接口本身則被關閉。如ICMP錯誤是“host unreachable(主機不可達)”,說明對方主機并沒有崩潰,但是不可達,這種情況下待處理錯誤被置為EHOSTUNREACH。

    根據上面的介紹可以知道對端以一種非優雅的方式斷開連接的時候,可以設置SO_KEEPALIVE屬性使得在2小時以后發現對方的TCP連接是否依然存在。如果不能接受如此之長的等待時間,從TCP-Keepalive-HOWTO上可以知道一共有兩種方式可以設置:

    修改內核關于網絡方面的配置參數 SOL_TCP字段的TCP_KEEPIDLE,TCP_KEEPINTVL,TCP_KEEPCNT三個選項 int keepIdle = 6; /*開始首次KeepAlive探測前的TCP空閉時間 */ int keepInterval = 5; /* 兩次KeepAlive探測間的時間間隔 */ int keepCount = 3; /* 判定斷開前的KeepAlive探測次數 */ Setsockopt(listenfd, SOL_TCP, TCP_KEEPIDLE, (void *)&keepIdle, sizeof(keepIdle)); Setsockopt(listenfd, SOL_TCP,TCP_KEEPINTVL, (void *)&keepInterval, sizeof(keepInterval)); Setsockopt(listenfd,SOL_TCP, TCP_KEEPCNT, (void *)&keepCount, sizeof(keepCount));

    8.線程池

    8.1 線程池代碼

    #ifndef __THREADPOOL_H_ #define __THREADPOOL_H_typedef struct threadpool_t threadpool_t;/*** @function threadpool_create* @descCreates a threadpool_t object.* @param thr_num thread num* @param max_thr_num max thread size* @param queue_max_size size of the queue.* @return a newly created thread pool or NULL*/ threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);/*** @function threadpool_add* @desc add a new task in the queue of a thread pool* @param pool Thread pool to which add the task.* @param function Pointer to the function that will perform the task.* @param argument Argument to be passed to the function.* @return 0 if all goes well,else -1*/ int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg);/*** @function threadpool_destroy* @desc Stops and destroys a thread pool.* @param pool Thread pool to destroy.* @return 0 if destory success else -1*/ int threadpool_destroy(threadpool_t *pool);/*** @desc get the thread num* @pool pool threadpool* @return # of the thread*/ int threadpool_all_threadnum(threadpool_t *pool);/*** desc get the busy thread num* @param pool threadpool* return # of the busy thread*/ int threadpool_busy_threadnum(threadpool_t *pool);#endif #include <stdlib.h> #include <pthread.h> #include <unistd.h> #include <assert.h> #include <stdio.h> #include <string.h> #include <signal.h> #include <errno.h> #include "threadpool.h"#define DEFAULT_TIME 10 /*10s檢測一次*/ #define MIN_WAIT_TASK_NUM 10 /*如果queue_size > MIN_WAIT_TASK_NUM 添加新的線程到線程池*/ #define DEFAULT_THREAD_VARY 10 /*每次創建和銷毀線程的個數*/ #define true 1 #define false 0typedef struct {void *(*function)(void *); /* 函數指針,回調函數 */void *arg; /* 上面函數的參數 */ } threadpool_task_t; /* 各子線程任務結構體 *//* 描述線程池相關信息 */struct threadpool_t {pthread_mutex_t lock; /* 用于鎖住本結構體 */ pthread_mutex_t thread_counter; /* 記錄忙狀態線程個數de瑣 -- busy_thr_num */pthread_cond_t queue_not_full; /* 當任務隊列滿時,添加任務的線程阻塞,等待此條件變量 */pthread_cond_t queue_not_empty; /* 任務隊列里不為空時,通知等待任務的線程 */pthread_t *threads; /* 存放線程池中每個線程的tid。數組 */pthread_t adjust_tid; /* 存管理線程tid */threadpool_task_t *task_queue; /* 任務隊列(數組首地址) */int min_thr_num; /* 線程池最小線程數 */int max_thr_num; /* 線程池最大線程數 */int live_thr_num; /* 當前存活線程個數 */int busy_thr_num; /* 忙狀態線程個數 */int wait_exit_thr_num; /* 要銷毀的線程個數 */int queue_front; /* task_queue隊頭下標 */int queue_rear; /* task_queue隊尾下標 */int queue_size; /* task_queue隊中實際任務數 */int queue_max_size; /* task_queue隊列可容納任務數上限 */int shutdown; /* 標志位,線程池使用狀態,true或false */ };void *threadpool_thread(void *threadpool);void *adjust_thread(void *threadpool);int is_thread_alive(pthread_t tid); int threadpool_free(threadpool_t *pool);//threadpool_create(3,100,100); threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size) {int i;threadpool_t *pool = NULL; /* 線程池 結構體 */do {if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) { printf("malloc threadpool fail");break; /*跳出do while*/}pool->min_thr_num = min_thr_num;pool->max_thr_num = max_thr_num;pool->busy_thr_num = 0;pool->live_thr_num = min_thr_num; /* 活著的線程數 初值=最小線程數 */pool->wait_exit_thr_num = 0;pool->queue_size = 0; /* 有0個產品 */pool->queue_max_size = queue_max_size; /* 最大任務隊列數 */pool->queue_front = 0;pool->queue_rear = 0;pool->shutdown = false; /* 不關閉線程池 *//* 根據最大線程上限數, 給工作線程數組開辟空間, 并清零 */pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); if (pool->threads == NULL) {printf("malloc threads fail");break;}memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);/* 給 任務隊列 開辟空間 */pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);if (pool->task_queue == NULL) {printf("malloc task_queue fail");break;}/* 初始化互斥瑣、條件變量 */if (pthread_mutex_init(&(pool->lock), NULL) != 0|| pthread_mutex_init(&(pool->thread_counter), NULL) != 0|| pthread_cond_init(&(pool->queue_not_empty), NULL) != 0|| pthread_cond_init(&(pool->queue_not_full), NULL) != 0){printf("init the lock or cond fail");break;}/* 啟動 min_thr_num 個 work thread */for (i = 0; i < min_thr_num; i++) {pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool); /*pool指向當前線程池*/printf("start thread 0x%x...\n", (unsigned int)pool->threads[i]);}pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool); /* 創建管理者線程 */return pool;} while (0);threadpool_free(pool); /* 前面代碼調用失敗時,釋放poll存儲空間 */return NULL; }/* 向線程池中 添加一個任務 */ //threadpool_add(thp, process, (void*)&num[i]); /* 向線程池中添加任務 process: 小寫---->大寫*/int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg) {pthread_mutex_lock(&(pool->lock));/* ==為真,隊列已經滿, 調wait阻塞 */while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));}if (pool->shutdown) {pthread_cond_broadcast(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;}/* 清空 工作線程 調用的回調函數 的參數arg */if (pool->task_queue[pool->queue_rear].arg != NULL) {pool->task_queue[pool->queue_rear].arg = NULL;}/*添加任務到任務隊列里*/pool->task_queue[pool->queue_rear].function = function;pool->task_queue[pool->queue_rear].arg = arg;pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; /* 隊尾指針移動, 模擬環形 */pool->queue_size++;/*添加完任務后,隊列不為空,喚醒線程池中 等待處理任務的線程*/pthread_cond_signal(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0; }/* 線程池中各個工作線程 */ void *threadpool_thread(void *threadpool) {threadpool_t *pool = (threadpool_t *)threadpool;threadpool_task_t task;while (true) {/* Lock must be taken to wait on conditional variable *//*剛創建出線程,等待任務隊列里有任務,否則阻塞等待任務隊列里有任務后再喚醒接收任務*/pthread_mutex_lock(&(pool->lock));/*queue_size == 0 說明沒有任務,調 wait 阻塞在條件變量上, 若有任務,跳過該while*/while ((pool->queue_size == 0) && (!pool->shutdown)) { printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));/*清除指定數目的空閑線程,如果要結束的線程個數大于0,結束線程*/if (pool->wait_exit_thr_num > 0) {pool->wait_exit_thr_num--;/*如果線程池里線程個數大于最小值時可以結束當前線程*/if (pool->live_thr_num > pool->min_thr_num) {printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());pool->live_thr_num--;pthread_mutex_unlock(&(pool->lock));pthread_exit(NULL);}}}/*如果指定了true,要關閉線程池里的每個線程,自行退出處理---銷毀線程池*/if (pool->shutdown) {pthread_mutex_unlock(&(pool->lock));printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());pthread_detach(pthread_self());pthread_exit(NULL); /* 線程自行結束 */}/*從任務隊列里獲取任務, 是一個出隊操作*/task.function = pool->task_queue[pool->queue_front].function;task.arg = pool->task_queue[pool->queue_front].arg;pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; /* 出隊,模擬環形隊列 */pool->queue_size--;/*通知可以有新的任務添加進來*/pthread_cond_broadcast(&(pool->queue_not_full));/*任務取出后,立即將 線程池瑣 釋放*/pthread_mutex_unlock(&(pool->lock));/*執行任務*/ printf("thread 0x%x start working\n", (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter)); /*忙狀態線程數變量瑣*/pool->busy_thr_num++; /*忙狀態線程數+1*/pthread_mutex_unlock(&(pool->thread_counter));(*(task.function))(task.arg); /*執行回調函數任務*///task.function(task.arg); /*執行回調函數任務*//*任務結束處理*/ printf("thread 0x%x end working\n", (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter));pool->busy_thr_num--; /*處理掉一個任務,忙狀態數線程數-1*/pthread_mutex_unlock(&(pool->thread_counter));}pthread_exit(NULL); }/* 管理線程 */ void *adjust_thread(void *threadpool) {int i;threadpool_t *pool = (threadpool_t *)threadpool;while (!pool->shutdown) {sleep(DEFAULT_TIME); /*定時 對線程池管理*/pthread_mutex_lock(&(pool->lock));int queue_size = pool->queue_size; /* 關注 任務數 */int live_thr_num = pool->live_thr_num; /* 存活 線程數 */pthread_mutex_unlock(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));int busy_thr_num = pool->busy_thr_num; /* 忙著的線程數 */pthread_mutex_unlock(&(pool->thread_counter));/* 創建新線程 算法: 任務數大于最小線程池個數, 且存活的線程數少于最大線程個數時 如:30>=10 && 40<100*/if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {pthread_mutex_lock(&(pool->lock)); int add = 0;/*一次增加 DEFAULT_THREAD 個線程*/for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY&& pool->live_thr_num < pool->max_thr_num; i++) {if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) {pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);add++;pool->live_thr_num++;}}pthread_mutex_unlock(&(pool->lock));}/* 銷毀多余的空閑線程 算法:忙線程X2 小于 存活的線程數 且 存活的線程數 大于 最小線程數時*/if ((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num) {/* 一次銷毀DEFAULT_THREAD個線程, 隨機10個即可 */pthread_mutex_lock(&(pool->lock));pool->wait_exit_thr_num = DEFAULT_THREAD_VARY; /* 要銷毀的線程數 設置為10 */pthread_mutex_unlock(&(pool->lock));for (i = 0; i < DEFAULT_THREAD_VARY; i++) {/* 通知處在空閑狀態的線程, 他們會自行終止*/pthread_cond_signal(&(pool->queue_not_empty));}}}return NULL; }int threadpool_destroy(threadpool_t *pool) {int i;if (pool == NULL) {return -1;}pool->shutdown = true;/*先銷毀管理線程*/pthread_join(pool->adjust_tid, NULL);for (i = 0; i < pool->live_thr_num; i++) {/*通知所有的空閑線程*/pthread_cond_broadcast(&(pool->queue_not_empty));}for (i = 0; i < pool->live_thr_num; i++) {pthread_join(pool->threads[i], NULL);}threadpool_free(pool);return 0; }int threadpool_free(threadpool_t *pool) {if (pool == NULL) {return -1;}if (pool->task_queue) {free(pool->task_queue);}if (pool->threads) {free(pool->threads);pthread_mutex_lock(&(pool->lock));pthread_mutex_destroy(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));pthread_mutex_destroy(&(pool->thread_counter));pthread_cond_destroy(&(pool->queue_not_empty));pthread_cond_destroy(&(pool->queue_not_full));}free(pool);pool = NULL;return 0; }int threadpool_all_threadnum(threadpool_t *pool) {int all_threadnum = -1; // 總線程數pthread_mutex_lock(&(pool->lock));all_threadnum = pool->live_thr_num; // 存活線程數pthread_mutex_unlock(&(pool->lock));return all_threadnum; }int threadpool_busy_threadnum(threadpool_t *pool) {int busy_threadnum = -1; // 忙線程數pthread_mutex_lock(&(pool->thread_counter));busy_threadnum = pool->busy_thr_num; pthread_mutex_unlock(&(pool->thread_counter));return busy_threadnum; }int is_thread_alive(pthread_t tid) {int kill_rc = pthread_kill(tid, 0); //發0號信號,測試線程是否存活if (kill_rc == ESRCH) {return false;}return true; }/*測試*/ #if 1/* 線程池中的線程,模擬處理業務 */ void *process(void *arg) {printf("thread 0x%x working on task %d\n ",(unsigned int)pthread_self(),(int)arg);sleep(1); //模擬 小---大寫printf("task %d is end\n",(int)arg);return NULL; }int main(void) {/*threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);*/threadpool_t *thp = threadpool_create(3,100,100); /*創建線程池,池里最小3個線程,最大100,隊列最大100*/printf("pool inited");//int *num = (int *)malloc(sizeof(int)*20);int num[20], i;for (i = 0; i < 20; i++) {num[i] = i;printf("add task %d\n",i);/*int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg) */threadpool_add(thp, process, (void*)&num[i]); /* 向線程池中添加任務 */}sleep(10); /* 等子線程完成任務 */threadpool_destroy(thp);return 0; }#endif

    8.2 請問怎么實現線程池

    參考回答:

  • 設置一個生產者消費者隊列,作為臨界資源
  • 初始化n個線程,并讓其運行起來,加鎖去隊列取任務運行
  • 當任務隊列為空的時候,所有線程阻塞
  • 當生產者隊列來了一個任務后,先對隊列加鎖,把任務掛在到隊列上,然后使用條件變量去通知阻塞中的一個線程
  • 總結

    以上是生活随笔為你收集整理的linux——select、poll、epoll的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。