Linux TCP server系列(4)-浅谈listen与大并发TCP连接
背景:
???服務器在調用listen和accept后,就會阻塞在accept函數上,accpet函數返回后循環調用accept函數等待客戶的TCP連接。如果這時候又大量的用戶并發發起connect連接,那么在listen有隊列上限(最大可接受TCP的連接數)的情況下,有多少個connect會成功了。試驗證明,當連接數遠遠高于listen的可連接數上限時,客戶端的大部分TCP請求會被拋棄,只有當listen監聽隊列空閑或者放棄某個連接時,才可以接收新的連接,那么我們應該如何來避免這種情況出現?
?
分析:
(一)客戶端
客戶端運行初期完成所設定的一定量的socket創建和相應的處理線程的創建,然后使用條件變量來完成線程同步,直到最后一個線程創建完成,才向所有線程發出廣播通知,讓所有線程并發調用connect,連接成功則關閉連接,失敗則返回,如下代碼所示。
socket創建和線程創建:
????????int testCount=300;????????//并發用戶數
/*
每個進程需要自己獨立的棧空間,linux下默認棧大小是10M,在32位的機子上一個進程需要4G的內存空間,去掉自己的棧空間全局程序段空間,一般只有3G內存可以用,創建線程時就需要從這3G的空間中分配10M出來,所以最多可以分配300個線程。當然這里還可以使用多個進程,每個進程300個線程的方式來進一步擴大并發量。
*/
????????int sockfd[testCount];
????????pthread_t ntid[testCount];
?
????????bzero(&servaddr,sizeof(servaddr));
????????servaddr.sin_family=AF_INET;
????????servaddr.sin_port=htons(SERVER_PORT);
????????inet_pton(AF_INET,argv[1],&servaddr.sin_addr);
?
?????int testCaseIndex=0;
?????for(testCaseIndex=0;testCaseIndex<testCount;testCaseIndex++)
?????{
????????sockfd[testCaseIndex]=socket(AF_INET,SOCK_STREAM,0);
?????????????????????//為每個并發客戶端創建一個socket
????????if(sockfd[testCaseIndex]==-1)
????????{
???????????printf("socket established error: %s\n",(char*)strerror(errno));
???????????return -1;
????????}
????????if( pthread_create(&ntid[testCaseIndex],NULL,handleFun,&sockfd[testCaseIndex])!=0)
?{
????????????????printf("create thread error :%s\n",strerror(errno));
????????????????return -1;
?}
//為每個并發客戶端創建一個線程來執行connect
}
?
?????printf("%d client has initiated\n",testCaseIndex);
?
???并發客戶端的線程實現:線程阻塞在條件變量上(只有條件滿足了并且發起喚醒動作,線程才開始執行)。
????????int sockfd=*((int*)arg);
????????{
????????????????pthread_cond_wait(&cond,&mut);
?????????????????????????????????????//在條件變量上等待條件滿足!
//阻塞返回后立即解鎖,防止互斥量加鎖帶來的阻塞
pthread_mutex_unlock(&mut);
int conRes=0;
????????????????conRes=connect(sockfd,(struct sockaddr *)&servaddr,sizeof(servaddr));
?????????????????????????????????????//線程執行connect連接,每個線程在接到喚醒信號后,才可以執行該語句,來模擬多個線程的并發調用。
????????????????if(conRes==-1)
????????????????{
??????????????????printf("connect error: %s\n",strerror(errno));
??????????????????return 0;
????????????????}
????????}
?
???當條件滿足時,喚醒阻塞在條件變量上的線程:
???while(1)
{
sleep(2);
pthread_cond_broadcast(&cond);??//在所有線程創建完成后才進行喚醒。
}
?
綜上,客戶端模擬并發過程中沒有存在不同步的情況導致上述性能問題。(注意,在廣播的時候,會出現廣播丟失的情況,所以需要多次執行廣播操作才會使得所有線程執行任務,所以某種程度上這里并不能模擬完完全全的并發)
?
???(二)通信中介
客戶端和服務器之間的連接是在同一臺機器上,使用Socket方式通信的話會經過127.0.0.1的回環線路,不會有網卡等硬件資源的訪問性能消耗,所以不存在網絡通信時延等問題。
?
???(三)服務器?
性能問題主要發生在服務器,可能是以下幾部分造成:
(1)服務器的監聽隊列?listen(listenfd,xxx),參數2指定隊列內所能容納的元素的最大值,當來不及從隊列中移除元素時(調用accept移除或者TCP自動放棄)就會造成隊列滿而使得一些請求丟失。
解決辦法:
a)增大隊列容量是一種辦法,但是注意等待隊列太會帶來效率的性能缺陷,而且listen函數對最大隊列容量有一個上限,大小為SOMAXCONN,當然必要時刻我們可以修改這個常量的大小。
??b)直接修改listen及相關函數的實現(比較麻煩,不建議),可以將listen所維護的隊列修改為linklist,支持隊列的動態增長。
?
(2)accept處理速度太慢,導致阻塞過長時間,使得隊列無法及時清空已經完成3次連接的socket。也就是任意兩個accept之間的時間間隔是關鍵因素(這里實驗了將accept后面的也刪了,那么10個可以達到153的數目),如下面代碼所示(listen之后在循環調用accept來將已完成3次握手的連接從listen所維護的隊列中移除。)
listen(listenfd,10);
for(;;)
{
??????chilen=sizeof(chiaddr);
connfd=accept(listenfd,(struct sockaddr*)&chiaddr,&chilen);
//其他操作
}
?
解決辦法:
a?兩個accept之間盡量不要又多余的操作,使得accept返回后可以立刻執行下一個accept。經過試驗,該方法可以較好的提高性能,減少connect的丟失數。
?
b
???本質上這是一種“生產者-消費者”的模式,listen維護“已連接”和“待連接”的隊列,當客戶發出連接請求并最終連接成功時,在“已連接”隊列中會生產一個“product”,然后這時候希望“消費者”也就是accept函數可以快速的從隊列中消費這個“product”,這樣就不會因為隊列滿而導致無法繼續生產(也就是客戶的connect會失效,導致上面隊列長度10,300個并發connect帶來的67個存活的情況),但是在本例情況下,我們無法控制生產者的瘋狂生產行為,因為連接是客戶發起的,這是不可預知的,所以我們如果想不修改listen函數來提高性能的話,那么就只能讓消費者更加快的把產品消耗掉,使得listen隊列可以容納更多的新生產的產品,而第一種加快消費者消耗產品的方法就是a,第二種加快消費者消耗產品的方法是我們可以增加多幾個消費者來幫忙消耗,但是這幾個消費者間也要好好協調。第三種方法是讓消費者把產品先移走為listen的隊列騰出空間,再自行處理產品,如d所示。
使用多線程策略,每個線程獨立調用accept。(花了一個上午的時候正glib的線程池,一直用不了,其他都正常,就是線程不啟動,不知道會不會是bug)
下面自己手動使用簡單的多線程來測試
線程數????????????????隊列容量???????????????????并發用戶數??????????????通過數
1????????????????????????????10?????????????????????????????????300??????????????????????????????65
2????????????????????????????10?????????????????????????????????300??????????????????????????????142
3????????????????????????????10?????????????????????????????????300??????????????????????????????122
6????????????????????????????10?????????????????????????????????300??????????????????????????????120
10??????????????????????????10?????????????????????????????????300??????????????????????????????196
?20??????????????????????????10?????????????????????????????????300??????????????????????????????174
可以看到線程間也會出現競爭現象,并不是說一味增大并發線程數就可以提高并發數的。
?
c
修改listen和accept的實現方式,讓listen所維護的隊列可以智能的判斷擁擠情況,從而對accept的調用做出調度,在隊列繁忙時,使用多線程的方式讓多個accept來移除隊列中的元素,在隊列空閑時,可以適當的調整accept的處理線程數,這也是一種線程池的實現。
?
?
??d
修改accept的實現方式,在accept中實現一個“消費緩沖區”,為的是及時將listen中的隊列元素移動到該緩沖區中,再由其他處理線程或者進程來對緩沖區中的元素進行處理,這個方法盡量讓listen隊列中已連接的socket可以被移除。
最后這個方法比較上述方法來說是比較好的一種,但是還是需要修改已有的代碼。
?server.cpp
?#include<sys/types.h>
?#include<sys/socket.h>
?#include<strings.h>
?#include<arpa/inet.h>
?#include<unistd.h>
?#include<stdlib.h>
?#include<stdio.h>
?#include<string.h>
?#include<errno.h>
?#include<signal.h>
?#include<sys/wait.h>
?#include<pthread.h>
?
?#define LISTEN_PORT 84
?void str_echo(int sockfd) ? ? ? ? ? ?// 服務器收到客戶端的消息后的響應
?{
? ? ?ssize_t n;
? ? ?char line[512];
?
? ? ?printf("ready to read\n");
?
? ? ?while( (n=read(sockfd,line,512))>0 )
? ? ?{
?
? ? ? ? ?line[n]='\0';
? ? ? ? ?printf("Client: %s\n",line);
?
? ? ? ? ?char msgBack[512];
? ? ? ? ?snprintf(msgBack,sizeof(msgBack),"recv: %s\n",line);
? ? ? ? ?write(sockfd,msgBack,strlen(msgBack));
? ? ? ? ?bzero(&line,sizeof(line));
?
? ? ?}
?
? ? ?printf("end read\n");
?
?}
?
?void sig_child(int signo) ? ? ? ? //父進程對子進程結束的信號處理
?{
? ? ?pid_t pid;
? ? ?int ? stat;
?
? ? ?while( (pid=waitpid(-1,&stat,WNOHANG))>0)
? ? ?printf("child %d terminated\n",pid);
?
? ? ?return;
?}
?
?void* acceptThreadFun(void *arg)
?{
? ? ?int listenfd=*((int*)arg);
? ? ?struct sockaddr_in chiaddr;
? ? ?socklen_t chilen;
? ? ?int connfd;
? ? ?for(;;)
? ? ?{
? ? ? ? ?chilen=sizeof(chiaddr);
? ? ? ? ?
? ? ? ? ?connfd=accept(listenfd, (struct sockaddr*)&chiaddr, &chilen);
? ? ? ? ?//accept會總listen所維護的已連接隊列中pop一個出來
? ? ? ? ?//阻塞在accept,直到三次握手成功了才返回
? ? ? ? ?if(connfd==-1)
? ? ? ? ?printf("accept client error: %s\n",strerror(errno));
? ? ? ? ?else ? ? ? ? ? ?
? ? ? ? ?printf("client connected :%d\n",1);
?
? ? ? ? ?close(connfd);
? ? ? ? ?
? ? ?}
?}
?int successCount=0;
?int main(int argc, char **argv)
?{
? ? ?int listenfd, connfd;
? ? ?pid_t childpid;
? ? ?socklen_t chilen;
?
? ? ?struct sockaddr_in chiaddr,servaddr;
?
? ? ?listenfd=socket(AF_INET,SOCK_STREAM,0);
? ? ?if(listenfd==-1)
? ? ?{
? ? ? ? ?printf("socket established error: %s\n",(char*)strerror(errno)); ? ? ? ? ? ? ? ? //后面需要采用日志到方式來記錄
? ? ? ? ?//socket創建失敗后可以讓用戶選擇重新連接
? ? ?}
?
? ? ?bzero(&servaddr,sizeof(servaddr));
? ? ?servaddr.sin_family=AF_INET;
? ? ?servaddr.sin_addr.s_addr=htonl(INADDR_ANY);
? ? ?servaddr.sin_port=htons(LISTEN_PORT);
?
? ? ?int bindc=bind(listenfd,(struct sockaddr *)&servaddr,sizeof(servaddr));
? ? ?if(bindc==-1)
? ? ?{
? ? ? ? ?printf("bind error: %s\n",strerror(errno));
? ? ? ? ?//綁定失敗,錯誤提示
? ? ?}
?
? ? ?listen(listenfd,10); ? ? ? ? ? ? ? //limit是SOMAXCONN
?
? ? ?signal(SIGCHLD,sig_child);
?
? ? ?pthread_t acceptThread[20];
? ? ?int threadCount=0;
? ? ?for(threadCount;threadCount<20;threadCount++)
? ? ?{
? ? ? ? ?err=pthread_create(&acceptThread[threadCount],NULL,acceptThreadFun,&listenfd); ? ?//創建線程來幫忙accept
? ? ? ? ?if(err!=0)
? ? ? ? ?printf("can not create thread : %s\n",strerror(errno));
? ? ?}
?
? ? ?for(;;)
? ? ?{
? ? ? ? ?chilen=sizeof(chiaddr);
? ? ? ? ?
? ? ? ? ?connfd=accept(listenfd,(struct sockaddr*)&chiaddr,&chilen);
? ? ? ? ?//accept會總listen所維護的已連接隊列中pop一個出來
? ? ? ? ?//阻塞在accept,直到三次握手成功了才返回
? ? ? ? ?if(connfd==-1)
? ? ? ? ?printf("accept client error: %s\n",strerror(errno));
? ? ? ? ?else ? ? ? ?
? ? ? ? ?printf("client connected :%d\n",++successCount);
? ? ? ? ?
? ? ? ? ?close(connfd);
? ? ? ? ?
? ? ?}
?
?}
#include<sys/types.h> 2 #include<stdlib.h> 3 #include<stdio.h> 4 #include<unistd.h> 5 #include<sys/socket.h> 6 #include<strings.h> 7 #include<string.h> 8 #include<arpa/inet.h> 9 #include<errno.h> 10 #include<stdio.h> 11 #include<pthread.h> 12 #include<algorithm> 13 14 #include<exception> 15 16 #define SERVER_PORT 84 17 18 pthread_mutex_t mut=PTHREAD_MUTEX_INITIALIZER; //互斥量 19 pthread_cond_t cond=PTHREAD_COND_INITIALIZER; //條件變量 20 21 int cond_value=1; 22 struct sockaddr_in servaddr; 23 24 void *handleFun(void *arg) 25 { 26 int sockfd=*((int*)arg); 27 28 { 29 pthread_cond_wait(&cond,&mut); 30 pthread_mutex_unlock(&mut); 31 //信號會丟失,使得這里永遠醒不了,所以需要重發信號. 32 33 int conRes=0; 34 conRes=connect(sockfd,(struct sockaddr *)&servaddr,sizeof(servaddr)); 35 36 printf("%d\n",sockfd); //如果不加connect,那么這里顯示正確 37 if(conRes==-1) 38 { 39 printf("connect error: %s\n",strerror(errno)); 40 return 0; 41 } 42 43 } 44 45 } 46 47 void *handleFun2(void *arg) 48 { 49 *((int*)arg)=2; 50 pthread_cond_broadcast(&cond); 51 } 52 53 int main(int argc, char **argv) 54 { 55 int testCount=300; 56 int sockfd[testCount]; 57 pthread_t ntid[testCount]; 58 59 //tcpcli <ipaddress> <data> 60 if(argc!=3) 61 return -1; 62 63 bzero(&servaddr,sizeof(servaddr)); 64 servaddr.sin_family=AF_INET; 65 servaddr.sin_port=htons(SERVER_PORT); 66 inet_pton(AF_INET,argv[1],&servaddr.sin_addr); 67 68 int testCaseIndex=0; 69 for(testCaseIndex=0;testCaseIndex<testCount;testCaseIndex++) 70 { 71 sockfd[testCaseIndex]=socket(AF_INET,SOCK_STREAM,0); 72 //為每個客戶端線程創建socket 73 if(sockfd[testCaseIndex]==-1) 74 { 75 printf("socket established error: %s\n",(char*)strerror(errno)); 76 77 return -1; 78 } 79 80 if(pthread_create(&ntid[testCaseIndex],NULL,handleFun,&sockfd[testCaseIndex])!=0) 81 //客戶端線程 82 { 83 printf("create thread error :%s\n",strerror(errno)); 84 return -1; 85 } 86 } 87 88 printf("%d client has initiated\n",testCaseIndex); 89 90 cond_value=2; 91 while(1) 92 { 93 sleep(2); 94 pthread_cond_broadcast(&cond); //條件滿足后發信號通知所有阻塞在條件變量上的線程! 95 } 96 exit(0); 97 }
總結
以上是生活随笔為你收集整理的Linux TCP server系列(4)-浅谈listen与大并发TCP连接的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 郭明錤:富士康获iPhone 14后置超
- 下一篇: Linux NULL定义