基本網絡編程範式

本文是本身學習經驗總結,有不正確的地方,請批評指正。html

總結一下這一段時間來,有關網絡編程的學習。我是從csapp的最後章節的Tiny HTTP服務器開始,以它爲基礎,改用不一樣的方式實現併發,包括進程、線程、線程池、I/O多路複用。全部代碼見地址:https://github.com/xibaohe/tiny_serverlinux

1、基於進程、線程的併發

關於進程和線程的網絡編程模型,在UNP卷1的第30章,有詳細的介紹。我這裏,在Tiny基礎上,實現瞭如下幾種:git

  • tiny_process:每一個鏈接開一個進程
  • tiny_thread:每一個鏈接開一個線程
  • tiny_thread_pre:事先建立線程池,由主線程同一accept,fdbuffer採用信號量同步(同csapp第12章)
  • tiny_thread_mutex:同上,fdbuffer採用互斥鎖和條件變量實現
  • tiny_thread_pre2:事先建立線程池,每一個線程各自accept。

其中,fdbuffer是指主線程accept獲得已鏈接描述符後,存放進fdbuffer緩衝區,其餘線程再去處理。github

多進程

Signal(SIGPIPE,SIG_IGN);//忽略SIGPIPE,見UNP 5.13
  Signal(SIGCHLD,sigchld_hander);//回收子進程
  listenfd = Open_listenfd(port);//見csapp相關章節
  while (1) {
    connfd = Accept(listenfd, (SA *)&clientaddr,&clientlen);
    if(Fork() == 0){//the children process
      Close(listenfd);//子進程關閉監聽描述符
      doit(connfd);//子進程處理fd
      Close(connfd);
      exit(0);
    }
    Close(connfd);               
  }
void sigchld_hander(int sig)
{
  while(waitpid(-1,0,WNOHANG) > 0)
  {
    if(verbose)
      printf("a child process gone!!\n");
  }
  return;
}

上述代碼是最簡單的併發模型,每個鏈接,都會新fork一個進程去處理,顯然這種方式併發程度低。對於初學者,仍是有幾個須要注意的地方。編程

  1. 信號處理,SIGPIPE(向已經關閉的fd寫數據)默認會終止進程,這裏忽略它。sigchld_hander用於函數回收子進程(注意信號不排隊問題)。
  2. 注意父子進程都須要關閉已鏈接描述符connfd,到客戶端的鏈接纔會最終關閉

注:doit函數來自於csapp的Tiny服務器,我添加了對HEAD、POST方法的簡單支持,詳細請參考所有源碼。segmentfault

多線程

while (1) {
   connfd = Malloc(sizeof(int));//avoid race condition
   *connfd = Accept(listenfd, (SA *)&clientaddr,&clientlen);
   Pthread_create(&tid,NULL,&thread,connfd);             
}
void *thread(void *vargp)
{
  int connfd = *((int *)vargp);
  Pthread_detach(pthread_self());
  Free(vargp);
  doit(connfd);
  Close(connfd);
  return NULL;
}

多線程與多進程基本一致,須要注意的地方:數組

  1. 向線程傳遞connfd的race condition:若是咱們不申請內存,直接傳遞&connfd給進程,在線程從vargp獲取connfd時, connfd的值可能被主線程新accept的值替換了。
  2. 線程是可結合或者是分離的,區別在於分離式線程的存儲器資源在它本身終止的時候由系統自動釋放,而可結合線程須要其餘線程回收,此處的Pthread_detach是將當前線程分離。

預先建立線程

爲每個客戶都建立一個新的線程,顯然不是高效的作法,咱們能夠預先建立線程,主線程和其它線程經過一個緩衝區傳遞描述符,或者能夠每一個線程本身accept。服務器

信號量同步
int i;
 for(i=0;i<NTHREADS;i++)/*create worker threads*/
   Pthread_create(&tid,NULL,thread,NULL);
 while (1) {
    connfd = Accept(listenfd, (SA *)&clientaddr,&clientlen);
    sbuf_insert(&sbuf,connfd);     
  }
void *thread(void *vargp)
{
  Pthread_detach(pthread_self());
  while(1)
  {
    int connfd = sbuf_remove(&sbuf);
    doit(connfd);
    Close(connfd);
  }
}

首先建立固定數量的線程,主線程將已鏈接描述符放如緩衝區中,其它線程再從緩衝區中取出fd,並處理。這是一個典型的生產者和消費者問題,在這個版本中,採用csapp中的信號量來解決同步問題,緩衝區同步的實現見csapp相關章節。網絡

互斥鎖和條件變量同步
void sbuf_insert(sbuf_t *sp, int item)
{
    /*write to the buffer*/
    Pthead_mutex_lock(&sp->buf_mutex);
    if(sp->nslots == 0)
    {
        Pthread_mutex_unlock(&sp->buf_mutex);
        return ;
    }
    sp->buf[(++sp->rear)%(sp->n)] = item;
    sp->nslots--;
    Pthread_mutex_unlock(&sp->buf_mutex);
    
    int dosignal = 0;
    Pthread_mutex_lock(&sp->nready_mutex);
    if(sp->nready == 0)
        dosignal = 1;
    sp->nready++;
    Pthread_mutex_unlock(&sp->nready_mutex)
    if(dosignal)
        Pthread_cond_signal(&sp->cond);
}
int sbuf_remove(sbuf_t *sp)
{
    int item;
    Pthread_mutex_lock(&sp->nready_mutex);
    while(sp->nready == 0)
        Pthread_cond_wait(&sp->cond,&sp->nready_mutex);
    item = sp->buf[(++sp->front) % (sp->n)];
    Pthread_mutex_unlock(&sp->nready_mutex);
    if(item == 0)fprintf(stderr, "error!!!!fd item%d\n", item);
    return item;
}

這個版本,主函數與版本3一致,緩衝區的同步我改用了互斥鎖和條件變量。這裏貼出sbuf insert和remove操做的實現。其中sbuf_t結構體中,nready和nslots分別指準備好待消費的描述符和緩衝區剩餘的空位。多線程

在這裏,爲何在須要兩個同步變量nready和nslots對應兩個互斥鎖?任意使用其中一個,當nslots小於n,或者nready大於零的時候,喚醒等待在條件變量上的線程,這樣只需用一個同步變量,詳見源碼。我本身測試了一下,兩種方式效率是差很少的。

個人理解是,當使用兩個同步變量時,生產者在放入產品的時候,不阻塞消費者消費其餘產品,由於沒有對nready加鎖,因此若是第一個階段(放入產品)耗時比較多時,用兩個同步變量更合適一些。而這裏,放入產品並非耗時操做,所以效率差很少。

還有一個須要注意的地方是,我把Pthread_cond_signal放到了mutex外面,是爲了不上鎖衝突,見UNP卷2 7.5。

線程各自accept
int i;
for(i=0;i<NTHREADS;i++)/*create worker threads*/
    Pthread_create(&tid,NULL,thread,NULL);
while (1) {
    pause();
  }
}
/* $end tinymain */  
void *thread(void *vargp)
{
  Pthread_detach(pthread_self());
  int connfd;
  struct sockaddr_in clientaddr;
  int clientlen = sizeof(clientaddr);

  while(1)
  {
    Pthread_mutex_lock(&thead_lock);
    connfd = Accept(listenfd, (SA *)&clientaddr,&clientlen);
    Pthread_mutex_unlock(&thead_lock);
    doit(connfd);
    Close(connfd);
  }
}

這個版本是預先建立固定數量的,可是由線程各自去accept, 對accept上鎖保護。這種方式顯然代碼實現上容易得多,在效率上,因爲只用到了Pthread_mutex_lock這一種系統調用,效率應該要稍微好一點。

以上就是我實現過的基於進程、線程,主要是線程的併發模式。進程另外還有幾種模式,我認爲那幾種模式和線程基本一致,代碼寫起來也比較相似。實際上,在Linux下,線程其實就是資源共享的進程,都有本身的task_struct結構(《Linux內核設計與實現》)。

2、基於I/O多路複用的併發

在這一部分,主要介紹linux下面,select、poll和epoll的用法和示例。一共下面三個程序:

  • tiny_select
  • tiny_poll
  • tiny_epoll_nonblock 非阻塞I/O模式

select函數

typedef struct 
{
    int maxfd;
    fd_set read_set; 
    fd_set ready_set;
    int nready;
    int maxi;
    int clientfd[FD_SETSIZE];
} pool;
static pool client_pool;
init_pool(listenfd,&client_pool);
while(1)
{
    client_pool.ready_set = client_pool.read_set;
    while((client_pool.nready = Select(client_pool.maxfd+1,&client_pool.ready_set,NULL,NULL,NULL)) < 0)
    {
        if(errno == EINTR)
            printf("got a signal restart pselect!!! \n");
    }
    /*mask SIGCHLD!!!!!    but some signal will be abondoned
    */
    //client_pool.nready = Pselect(client_pool.maxfd+1,&client_pool.ready_set,NULL,NULL,NULL,&sig_chld);
    if(FD_ISSET(listenfd,&client_pool.ready_set)){
        connfd = Accept(listenfd,(SA *)&clientaddr,&clientlen);
        add_client(connfd,&client_pool);
    } 
    check_clients(&client_pool);
}

第一個版本,來自於csapp。在pool結構體中clientfd保存全部已鏈接的fd,read_set是須要select去檢測的fd集,ready是select返回已經準備好的fd集。須要注意的地方有:

  1. 在調用select函數的地方,用while的緣由是,子進程中斷的SIGCHLD的信號會致使select返回,須要手動重啓。最開始我用了Pselect來解決這個問題,但這樣會形成信號的丟失。
  2. 在每次調用select以前,須要對ready_set從新賦值,select返回以後,ready_set會被修改,在下次調用以前須要恢復。

再看一下這個client_pool的實現:

void init_pool(int listenfd, pool *p) {
  int i;
  p->maxi = -1;
  for (i = 0; i < FD_SETSIZE; i++) p->clientfd[i] = -1;

  p->maxfd = listenfd;
  FD_ZERO(&(p->read_set));
  FD_SET(listenfd, &p->read_set);
}

void add_client(int connfd, pool *p) {
  int i;
  p->nready--;
  for (i = 0; i < FD_SETSIZE; i++) {
    if (p->clientfd[i] < 0) {
      p->clientfd[i] = connfd;
      FD_SET(connfd, &p->read_set);

      if (connfd > p->maxfd) p->maxfd = connfd;
      if (i > p->maxi) p->maxi = i;   
      break;
    }
  }
  if (i == FD_SETSIZE) app_error("add_client error: Too many clients");
}

void check_clients(pool *p) {
  int i, connfd, n;

  for (i = 0; (i <= p->maxi) && (p->nready > 0); i++) {
    connfd = p->clientfd[i];
    if ((connfd > 0) && (FD_ISSET(connfd, &p->ready_set))) {
      p->nready--;
      doit(connfd);
      Close(connfd);
      FD_CLR(connfd, &p->read_set);
      p->clientfd[i] = -1;
    }
  }
}

init_pool初始化,最開始的fd_set裏面只有listenfd,add_client將已鏈接描述符添加到clientfd,並將read_set置位。check_clients是循環依次檢查是哪個已鏈接fd,處理完畢後,將fd從clientfd和read_set中移除。

從上面的過程當中,能夠看出select有幾個明顯的缺點:

  • 第一個是fd_set是固定大小的,最大爲FD_SETSIZE(修改起來比較麻煩),其實就是一個數組,這是極大的一個限制。
  • 第二個是咱們在每次調用select時,實際上須要把全部的鏈接描述符從用戶空間拷貝到內核空間,內核檢測到準備好的描述符事後,將結果集再拷貝回來。這樣的拷貝,當你fd的數量不少的時候,消耗是很是大的。
  • 第三個是select在內核中,是經過輪詢遍歷全部fd,這種開銷也是很大的。具體能夠參考select有關的內核源碼。

poll函數

struct pollfd{
    int fd; //fd to check
    short events; //events of interest on fd
    short revents; //events that occurred on fd
}
typedef struct {
  struct pollfd client[OPEN_MAX];
  int maxi;
  int nready;
} pool;
static pool client_pool;
init_pool(listenfd, &client_pool);
while (1) {
while ((client_pool.nready =
            Poll(client_pool.client, client_pool.maxi + 1, INFTIM)) < 0) {
  if (errno == EINTR) printf("got a signal restart poll!!! \n");
}
if (client_pool.client[0].revents & POLLRDNORM) {
  connfd = Accept(listenfd, (SA *)&clientaddr, &clientlen);
  add_client(connfd, &client_pool);
}
check_clients(&client_pool);
}

poll的代碼,基本與select模式一致。poll不一樣的地方在於它使用pollfd結構來表示fdset,而不是位圖。沒有大小限制,使用起來更爲方便。events和revents分別表示須要檢測的事件和發生的事件。poll傳入的client指針,底層應該是全部的pollfd構成的一個鏈表。

poll和select的缺點同樣,都須要拷貝和輪詢,隨着fd數量的增大,效率都會大大下降。

epoll+非阻塞IO

typedef struct request_buffer{
    int fd;/*fd for the connection  */
    int epfd;/* fd for epoll */
    char buf[MAXBUF]; /*the buffer for the current request*/
    size_t pos,last;
}request_b
struct epoll_event event;  // event to register
event.data.ptr = (void *)request;
event.events = EPOLLIN | EPOLLET;
Epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &event);

while (1) {
    int n;
    while ((n = Epoll_wait(epfd, events, MAXEVENTS, -1)) < 0) {
      if (errno == EINTR) printf("got a signal restart\n");
    }
    for (int i = 0; i < n; i++) {
      request_b *rb = (request_b *)events[i].data.ptr;
      // int fd = rb->fd;
      if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) ||
          (!(events[i].events & EPOLLIN))) {
        fprintf(stderr, "epoll error fd %d", rb->fd);
        Close(rb->fd);
        continue;
      } else if (rb->fd == listenfd) {
        /* the new connection incoming */
        int infd;
        while (1) {
          infd = accept(listenfd, (SA *)&clientaddr, &clientlen);
          if (infd < 0) {
            if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
              /*we have processed all incoming connections*/
              break;
            } else {
              unix_error("accept error!");
              break;
            }
          }
          make_socket_non_blocking(infd);
          if (verbose) printf("the new connection fd :%d\n", infd);
          request_b *request = (request_b *)Malloc(sizeof(request_b));
          request_init(request, infd, epfd);
      event.data.ptr = (void *)request;
          event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
          Epoll_ctl(epfd, EPOLL_CTL_ADD, infd, &event);
        }
      }
    
      else {
        if (verbose) printf("new data from fd %d\n", rb->fd);
        doit_nonblock((request_b *)events[i].data.ptr);
        /* where to close the fd and release the requst!!! */
      }
    }
}

這段代碼是典型的non-blocking IO + IO multiplexing的模式,因爲是non-blocking IO,就不能用前面csapp提供的Rio包,由於要處理數據分包到達的狀況。能夠注意到結構體request_b就是是用來記錄當前fd對應的請求狀態,buf是當前請求的緩衝區。doit_nonblock和前面的doit函數有了很大的變化,這裏我就不展開了,能夠看個人代碼,待完善。

epoll有ET和LT兩種模式,詳細定義見man手冊。ET模式可以減小事件觸發的次數,可是代碼複雜度會增長,IO操做必需要等到EAGAIN,容易漏掉事件。而LT模式,事件觸發的次數多一些,代碼實現上要簡單一點,不容易出錯。目前沒有明確的結論哪一種模式更高效,應該是看具體的場景。我這裏使用了ET模式,在個人doit_nonblock函數裏面,對於請求結束的判斷還有錯誤,不可以正確判斷一個完整的HTTP請求。

前面說到select的缺點,epoll是怎麼解決的呢?

  • epoll是在epoll_ctl的時候,就把當前fd"註冊"到內核中,而不是在epoll_wait的時候,時效更長,避免了大量重複的拷貝。
  • epoll內部採用紅黑樹來組織全部fd(結構體epitem),用一個雙向鏈表存儲發生事件的fd(結構體epitem)。epoll_wait只須要檢查這個雙向鏈表是否爲空來判斷是否有事件發生。
  • epoll在執行epoll_ctl的時候,除把fd了插入紅黑樹以外,還會給內核中斷處理程序註冊回調函數。在這個回調函數裏面,會把準備好的fd插入就緒的雙向鏈表裏面。

所以,當存在有大量的鏈接,可是活躍鏈接數量較少的狀況下,epoll是十分高效的。更多的細節,能夠參考內核源碼

3、總結

在實際使用中,確定不會單純的用上面的某一種模式,而是多進程+IO multiplexing 或者 多線程 + IO multiplexing。後面再結合具體的例子,我會再繼續研究。本來,我寫了一個小的測試程序,可是發現個人測試方法不是十分合理,沒有太大意義,就沒有放出來了,有興趣的能夠看一看源碼裏面。

我是從讀csapp後半部分開始,集中學習了一下網絡編程的內容,這些內容十分基礎,也許會對初學者有一些幫助。後續,我還會繼續深刻,準備閱讀陳碩的muduo,本身再動手寫一些代碼。

參考連接:

相關文章
相關標籤/搜索