本文是本身學習經驗總結,有不正確的地方,請批評指正。html
總結一下這一段時間來,有關網絡編程的學習。我是從csapp的最後章節的Tiny HTTP服務器開始,以它爲基礎,改用不一樣的方式實現併發,包括進程、線程、線程池、I/O多路複用。全部代碼見地址:https://github.com/xibaohe/tiny_serverlinux
關於進程和線程的網絡編程模型,在UNP卷1的第30章,有詳細的介紹。我這裏,在Tiny基礎上,實現瞭如下幾種:git
其中,fdbuffer是指主線程accept獲得已鏈接描述符後,存放進fdbuffer緩衝區,其餘線程再去處理。github
Signal(SIGPIPE,SIG_IGN);//忽略SIGPIPE,見UNP 5.13 Signal(SIGCHLD,sigchld_hander);//回收子進程 listenfd = Open_listenfd(port);//見csapp相關章節 while (1) { connfd = Accept(listenfd, (SA *)&clientaddr,&clientlen); if(Fork() == 0){//the children process Close(listenfd);//子進程關閉監聽描述符 doit(connfd);//子進程處理fd Close(connfd); exit(0); } Close(connfd); } void sigchld_hander(int sig) { while(waitpid(-1,0,WNOHANG) > 0) { if(verbose) printf("a child process gone!!\n"); } return; }
上述代碼是最簡單的併發模型,每個鏈接,都會新fork一個進程去處理,顯然這種方式併發程度低。對於初學者,仍是有幾個須要注意的地方。編程
注:doit函數來自於csapp的Tiny服務器,我添加了對HEAD、POST方法的簡單支持,詳細請參考所有源碼。segmentfault
while (1) { connfd = Malloc(sizeof(int));//avoid race condition *connfd = Accept(listenfd, (SA *)&clientaddr,&clientlen); Pthread_create(&tid,NULL,&thread,connfd); } void *thread(void *vargp) { int connfd = *((int *)vargp); Pthread_detach(pthread_self()); Free(vargp); doit(connfd); Close(connfd); return NULL; }
多線程與多進程基本一致,須要注意的地方:數組
爲每個客戶都建立一個新的線程,顯然不是高效的作法,咱們能夠預先建立線程,主線程和其它線程經過一個緩衝區傳遞描述符,或者能夠每一個線程本身accept。服務器
int i; for(i=0;i<NTHREADS;i++)/*create worker threads*/ Pthread_create(&tid,NULL,thread,NULL); while (1) { connfd = Accept(listenfd, (SA *)&clientaddr,&clientlen); sbuf_insert(&sbuf,connfd); } void *thread(void *vargp) { Pthread_detach(pthread_self()); while(1) { int connfd = sbuf_remove(&sbuf); doit(connfd); Close(connfd); } }
首先建立固定數量的線程,主線程將已鏈接描述符放如緩衝區中,其它線程再從緩衝區中取出fd,並處理。這是一個典型的生產者和消費者問題,在這個版本中,採用csapp中的信號量來解決同步問題,緩衝區同步的實現見csapp相關章節。網絡
void sbuf_insert(sbuf_t *sp, int item) { /*write to the buffer*/ Pthead_mutex_lock(&sp->buf_mutex); if(sp->nslots == 0) { Pthread_mutex_unlock(&sp->buf_mutex); return ; } sp->buf[(++sp->rear)%(sp->n)] = item; sp->nslots--; Pthread_mutex_unlock(&sp->buf_mutex); int dosignal = 0; Pthread_mutex_lock(&sp->nready_mutex); if(sp->nready == 0) dosignal = 1; sp->nready++; Pthread_mutex_unlock(&sp->nready_mutex) if(dosignal) Pthread_cond_signal(&sp->cond); } int sbuf_remove(sbuf_t *sp) { int item; Pthread_mutex_lock(&sp->nready_mutex); while(sp->nready == 0) Pthread_cond_wait(&sp->cond,&sp->nready_mutex); item = sp->buf[(++sp->front) % (sp->n)]; Pthread_mutex_unlock(&sp->nready_mutex); if(item == 0)fprintf(stderr, "error!!!!fd item%d\n", item); return item; }
這個版本,主函數與版本3一致,緩衝區的同步我改用了互斥鎖和條件變量。這裏貼出sbuf insert和remove操做的實現。其中sbuf_t結構體中,nready和nslots分別指準備好待消費的描述符和緩衝區剩餘的空位。多線程
在這裏,爲何在須要兩個同步變量nready和nslots對應兩個互斥鎖?任意使用其中一個,當nslots小於n,或者nready大於零的時候,喚醒等待在條件變量上的線程,這樣只需用一個同步變量,詳見源碼。我本身測試了一下,兩種方式效率是差很少的。
個人理解是,當使用兩個同步變量時,生產者在放入產品的時候,不阻塞消費者消費其餘產品,由於沒有對nready加鎖,因此若是第一個階段(放入產品)耗時比較多時,用兩個同步變量更合適一些。而這裏,放入產品並非耗時操做,所以效率差很少。
還有一個須要注意的地方是,我把Pthread_cond_signal放到了mutex外面,是爲了不上鎖衝突,見UNP卷2 7.5。
int i; for(i=0;i<NTHREADS;i++)/*create worker threads*/ Pthread_create(&tid,NULL,thread,NULL); while (1) { pause(); } } /* $end tinymain */ void *thread(void *vargp) { Pthread_detach(pthread_self()); int connfd; struct sockaddr_in clientaddr; int clientlen = sizeof(clientaddr); while(1) { Pthread_mutex_lock(&thead_lock); connfd = Accept(listenfd, (SA *)&clientaddr,&clientlen); Pthread_mutex_unlock(&thead_lock); doit(connfd); Close(connfd); } }
這個版本是預先建立固定數量的,可是由線程各自去accept, 對accept上鎖保護。這種方式顯然代碼實現上容易得多,在效率上,因爲只用到了Pthread_mutex_lock這一種系統調用,效率應該要稍微好一點。
以上就是我實現過的基於進程、線程,主要是線程的併發模式。進程另外還有幾種模式,我認爲那幾種模式和線程基本一致,代碼寫起來也比較相似。實際上,在Linux下,線程其實就是資源共享的進程,都有本身的task_struct結構(《Linux內核設計與實現》)。
在這一部分,主要介紹linux下面,select、poll和epoll的用法和示例。一共下面三個程序:
typedef struct { int maxfd; fd_set read_set; fd_set ready_set; int nready; int maxi; int clientfd[FD_SETSIZE]; } pool; static pool client_pool; init_pool(listenfd,&client_pool); while(1) { client_pool.ready_set = client_pool.read_set; while((client_pool.nready = Select(client_pool.maxfd+1,&client_pool.ready_set,NULL,NULL,NULL)) < 0) { if(errno == EINTR) printf("got a signal restart pselect!!! \n"); } /*mask SIGCHLD!!!!! but some signal will be abondoned */ //client_pool.nready = Pselect(client_pool.maxfd+1,&client_pool.ready_set,NULL,NULL,NULL,&sig_chld); if(FD_ISSET(listenfd,&client_pool.ready_set)){ connfd = Accept(listenfd,(SA *)&clientaddr,&clientlen); add_client(connfd,&client_pool); } check_clients(&client_pool); }
第一個版本,來自於csapp。在pool結構體中clientfd保存全部已鏈接的fd,read_set是須要select去檢測的fd集,ready是select返回已經準備好的fd集。須要注意的地方有:
再看一下這個client_pool的實現:
void init_pool(int listenfd, pool *p) { int i; p->maxi = -1; for (i = 0; i < FD_SETSIZE; i++) p->clientfd[i] = -1; p->maxfd = listenfd; FD_ZERO(&(p->read_set)); FD_SET(listenfd, &p->read_set); } void add_client(int connfd, pool *p) { int i; p->nready--; for (i = 0; i < FD_SETSIZE; i++) { if (p->clientfd[i] < 0) { p->clientfd[i] = connfd; FD_SET(connfd, &p->read_set); if (connfd > p->maxfd) p->maxfd = connfd; if (i > p->maxi) p->maxi = i; break; } } if (i == FD_SETSIZE) app_error("add_client error: Too many clients"); } void check_clients(pool *p) { int i, connfd, n; for (i = 0; (i <= p->maxi) && (p->nready > 0); i++) { connfd = p->clientfd[i]; if ((connfd > 0) && (FD_ISSET(connfd, &p->ready_set))) { p->nready--; doit(connfd); Close(connfd); FD_CLR(connfd, &p->read_set); p->clientfd[i] = -1; } } }
init_pool初始化,最開始的fd_set裏面只有listenfd,add_client將已鏈接描述符添加到clientfd,並將read_set置位。check_clients是循環依次檢查是哪個已鏈接fd,處理完畢後,將fd從clientfd和read_set中移除。
從上面的過程當中,能夠看出select有幾個明顯的缺點:
struct pollfd{ int fd; //fd to check short events; //events of interest on fd short revents; //events that occurred on fd } typedef struct { struct pollfd client[OPEN_MAX]; int maxi; int nready; } pool; static pool client_pool; init_pool(listenfd, &client_pool); while (1) { while ((client_pool.nready = Poll(client_pool.client, client_pool.maxi + 1, INFTIM)) < 0) { if (errno == EINTR) printf("got a signal restart poll!!! \n"); } if (client_pool.client[0].revents & POLLRDNORM) { connfd = Accept(listenfd, (SA *)&clientaddr, &clientlen); add_client(connfd, &client_pool); } check_clients(&client_pool); }
poll的代碼,基本與select模式一致。poll不一樣的地方在於它使用pollfd結構來表示fdset,而不是位圖。沒有大小限制,使用起來更爲方便。events和revents分別表示須要檢測的事件和發生的事件。poll傳入的client指針,底層應該是全部的pollfd構成的一個鏈表。
poll和select的缺點同樣,都須要拷貝和輪詢,隨着fd數量的增大,效率都會大大下降。
typedef struct request_buffer{ int fd;/*fd for the connection */ int epfd;/* fd for epoll */ char buf[MAXBUF]; /*the buffer for the current request*/ size_t pos,last; }request_b struct epoll_event event; // event to register event.data.ptr = (void *)request; event.events = EPOLLIN | EPOLLET; Epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &event); while (1) { int n; while ((n = Epoll_wait(epfd, events, MAXEVENTS, -1)) < 0) { if (errno == EINTR) printf("got a signal restart\n"); } for (int i = 0; i < n; i++) { request_b *rb = (request_b *)events[i].data.ptr; // int fd = rb->fd; if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN))) { fprintf(stderr, "epoll error fd %d", rb->fd); Close(rb->fd); continue; } else if (rb->fd == listenfd) { /* the new connection incoming */ int infd; while (1) { infd = accept(listenfd, (SA *)&clientaddr, &clientlen); if (infd < 0) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { /*we have processed all incoming connections*/ break; } else { unix_error("accept error!"); break; } } make_socket_non_blocking(infd); if (verbose) printf("the new connection fd :%d\n", infd); request_b *request = (request_b *)Malloc(sizeof(request_b)); request_init(request, infd, epfd); event.data.ptr = (void *)request; event.events = EPOLLIN | EPOLLET | EPOLLONESHOT; Epoll_ctl(epfd, EPOLL_CTL_ADD, infd, &event); } } else { if (verbose) printf("new data from fd %d\n", rb->fd); doit_nonblock((request_b *)events[i].data.ptr); /* where to close the fd and release the requst!!! */ } } }
這段代碼是典型的non-blocking IO + IO multiplexing的模式,因爲是non-blocking IO,就不能用前面csapp提供的Rio包,由於要處理數據分包到達的狀況。能夠注意到結構體request_b就是是用來記錄當前fd對應的請求狀態,buf是當前請求的緩衝區。doit_nonblock和前面的doit函數有了很大的變化,這裏我就不展開了,能夠看個人代碼,待完善。
epoll有ET和LT兩種模式,詳細定義見man手冊。ET模式可以減小事件觸發的次數,可是代碼複雜度會增長,IO操做必需要等到EAGAIN,容易漏掉事件。而LT模式,事件觸發的次數多一些,代碼實現上要簡單一點,不容易出錯。目前沒有明確的結論哪一種模式更高效,應該是看具體的場景。我這裏使用了ET模式,在個人doit_nonblock函數裏面,對於請求結束的判斷還有錯誤,不可以正確判斷一個完整的HTTP請求。
前面說到select的缺點,epoll是怎麼解決的呢?
所以,當存在有大量的鏈接,可是活躍鏈接數量較少的狀況下,epoll是十分高效的。更多的細節,能夠參考內核源碼
在實際使用中,確定不會單純的用上面的某一種模式,而是多進程+IO multiplexing 或者 多線程 + IO multiplexing。後面再結合具體的例子,我會再繼續研究。本來,我寫了一個小的測試程序,可是發現個人測試方法不是十分合理,沒有太大意義,就沒有放出來了,有興趣的能夠看一看源碼裏面。
我是從讀csapp後半部分開始,集中學習了一下網絡編程的內容,這些內容十分基礎,也許會對初學者有一些幫助。後續,我還會繼續深刻,準備閱讀陳碩的muduo,本身再動手寫一些代碼。
參考連接: