構建異步處理網絡服務器

  此次提到了網絡服務器的異步處理,關於異步跟同步的概念,相信不少小夥伴都很是清楚,固然也有不少萌萌噠的小夥伴常常據說異步,但理解的不夠。(這裏會附帶解釋下後面涉及到的回調機制)後端

  A)實際生活中的同步和異步數組

       同步通信:最近ADSL撥號一直不是很穩定,因而打電話給電信的師傅請求支援,電話打通了,結果電信師傅正在忙着處理另外一個問題,因而我就拿着電話等着師傅處理完他手頭的事情,而後再指導我。(很呆吧,但這種阻塞的通信方式就是同步)服務器

       異步通信:一樣的問題,機智的我選擇了打給客服,電話打通了,客服說:您的問題咱們已經記錄,您只要留下可靠的聯繫方式,很快就會有技術人員跟您聯繫,而後我就能夠掛了電話去幹別的事情了,客服MM也能夠繼續去接別的電話了。(~~這種就是異步通信機制)網絡

  B)計算機程序中的同步和異步多線程

       編寫程序時,不一樣的動態庫或者模塊之間老是存在着必定的接口,從調用方式上,能夠把他們分爲三類:同步調用、回調和異步調用。併發

       同步調用:一種阻塞式調用,調用方要等待對方執行完畢才返回, 它是一種單向調用;異步

       異步調用:一種相似消息或事件的機制,不過它的調用方向恰好相反,接口的服務在收到某種訊息或發生某種事件時,會主動通知客戶方(即調用客戶方的接口)。socket

       回調:一種雙向調用模式,也就是說,被調用方在接口被調用時也會調用對方的接口(註冊好的接口,能夠理解爲我剛纔留給客服的電話號碼!好吧,很繞慢慢來!)。函數

       因而可知,回調和異步調用的關係很是緊密,一般咱們使用回調來實現異步消息的註冊,經過異步調用來實現消息的通知。工具

       理解了異步同步的概念,接下來開始咱們的重點。

       許多服務器的構建面對的最大問題之一是必須可以處理大量併發鏈接。處理多個鏈接有許多不一樣的傳統方法,可是在處理大量鏈接時它們每每會產生問題,由於它們使用的內存或 CPU 太多,或者達到了某個操做系統限制。

       先來看看經常使用作法:

       循環:早期系統使用簡單的循環選擇解決方案,即循環遍歷打開的網絡鏈接的列表,判斷是否有要讀取的數據。這種方法既緩慢(尤爲是隨着鏈接數量增長愈來愈慢),又低效(由於在處理當前鏈接時其餘鏈接可能正在發送請求並等待響應)。在系統循環遍歷每一個鏈接時,其餘鏈接不得不等待。若是有 100 個鏈接,其中只有一個有數據,那麼仍然必須處理其餘 99 個鏈接,才能輪到真正須要處理的鏈接。

  Poll和epoll:這是對循環方法的改進,它用一個結構保存要監視的每一個鏈接的數組,當在網絡套接字上發現數據時,經過回調機制調用處理函數。poll 的問題是這個結構會很是大,在列表中添加新的網絡鏈接時,修改結構會增長負載並影響性能。

  選擇方式:即select() 函數調用使用一個靜態結構,由於有1024 個鏈接的數量限制,所以不適用於很是大的部署。

  Thread多線程方式:利用多線程支持監聽和處理鏈接,爲每一個鏈接啓動一個thread。這種方式,在處理大量鏈接時,會大幅增長 內存和 CPU的開銷,由於每一個線程都須要本身的執行空間。另外,若是每一個線程都忙於處理網絡鏈接,線程之間的上下文切換會很頻繁。最後,許多內核並不適於處理如此大量的活躍線程。

  講故事講究欲擒故縱、欲揚先抑,說了這幾種經常使用方式固然只是鋪墊,下面重點介紹基於Linux網絡庫libevent的方式。

  libevent 庫實際上沒有更換 select()、poll() 或其餘機制的基礎。而是使用對於每一個平臺最高效的高性能解決方案在實現外加上一個包裝器。爲了實際處理每一個請求,libevent 庫提供一種事件機制,它做爲底層網絡後端的包裝器。事件系統讓爲鏈接添加處理函數變得很是簡便,同時下降了底層 I/O 複雜性。

  構建libevent 服務器的基本方法是,註冊當發生某一操做(好比接受來自客戶端的鏈接)時應該執行的函數,而後調用主事件循環event_dispatch()。執行過程的控制如今由 libevent 系統處理。註冊事件和將調用的函數以後,事件系統開始自治;在應用程序運行時,能夠在事件隊列中添加(註冊)或刪除(取消註冊)事件。事件註冊很是方便,能夠經過它添加新事件以處理新打開的鏈接,從而構建靈活的網絡處理服務系統。

  下面用實際代碼來講明libevent庫如何構建異步處理服務器:

  打開監聽套接字,註冊一個回調函數(每當監聽到客戶端的鏈接請求,在調用 accept() 函數以打開新鏈接時調用它),由此構建網絡服務器:

/*init libevent*/
ev_init(); 
/* Setup listening socket */
event_set(&ev_accept, listen_fd, EV_READ|EV_PERSIST, on_accept, NULL);
event_add(&ev_accept, NULL);
/* Start the event loop. */
event_dispatch();

  event_set() 函數建立新的事件結構,這裏的EV_PERSIST是爲了建立一個永久的監聽事件,由於event機制在處理完一個就緒事件後爲了提升遍歷效率會主動刪除這個事件,若是須要繼續監聽,則須要從新在事件隊列中add此事件,而設置EV_PERSIST屬性,event機制將不會刪除已處理事件,保持持續監聽client動做。

  event_add() 在事件隊列機制中添加事件。

  event_dispatch() 啓動事件隊列系統,開始監聽(並接受)請求。

  On_accept函數:當接受鏈接時,事件系統調用此函數。此函數接受到客戶端的鏈接;添加客戶端套接字信息和一個 bufferevent 結構;在事件結構中爲客戶端套接字上的讀/寫/錯誤事件添加回調函數;做爲參數傳遞客戶端結構(和嵌入的 eventbuffer 和客戶端套接字)。每當對應的客戶端套接字包含讀、寫或錯誤操做時,調用對應的回調函數。

  完整代碼以下:(一個簡易應答服務器:把客戶端發過來的信息,回發給客戶端)

  1 include <event.h>
  2 #include <sys/types.h>
  3 #include <sys/socket.h>
  4 #include <netinet/in.h>
  5 #include <arpa/inet.h>
  6 #include <string.h>
  7 #include <stdlib.h>
  8 #include <stdio.h>
  9 #include <fcntl.h>
 10 #include <unistd.h>
 11 
 12 #define SERVER_PORT 8899
 13 int debug = 0;
 14 
 15 struct client {
 16   int fd;
 17   struct bufferevent *buf_ev;
 18 };
 19 
 20 int setnonblock(int fd)
 21 {
 22   int flags;
 23 
 24   flags = fcntl(fd, F_GETFL);
 25   flags |= O_NONBLOCK;
 26   fcntl(fd, F_SETFL, flags);
 27 }
 28 
 29 void buf_read_callback(struct bufferevent *incoming,
 30                        void *arg)
 31 {
 32   struct evbuffer *evreturn;
 33   char *req;
 34 
 35   req = evbuffer_readline(incoming->input);
 36   if (req == NULL)
 37     return;
 38 
 39   evreturn = evbuffer_new();
 40   evbuffer_add_printf(evreturn,"Client msg %s\n",req);
 41   bufferevent_write_buffer(incoming,evreturn);
 42   evbuffer_free(evreturn);
 43   free(req);
 44 }
 45 
 46 void buf_write_callback(struct bufferevent *bev,
 47                         void *arg)
 48 {
 49 }
 50 
 51 void buf_error_callback(struct bufferevent *bev,
 52                         short what,
 53                         void *arg)
 54 {
 55   struct client *client = (struct client *)arg;
 56   bufferevent_free(client->buf_ev);
 57   close(client->fd);
 58   free(client);
 59 }
 60 
 61 void on_accept(int fd,
 62                      short ev,
 63                      void *arg)
 64 {
 65   int client_fd;
 66   struct sockaddr_in client_addr;
 67   socklen_t client_len = sizeof(client_addr);
 68   struct client *client;
 69 
 70   client_fd = accept(fd,
 71                      (struct sockaddr *)&client_addr,
 72                      &client_len);
 73   if (client_fd < 0)
 74     {
 75       fprintf(stderr,"Client: accept() failed\n");
 76       return;
 77     }
 78 
 79   setnonblock(client_fd);
 80 
 81   client = calloc(1, sizeof(*client));
 82   if (client == NULL)
 83     fprintf(stderr, "malloc failed\n");
 84   client->fd = client_fd;
 85 
 86   client->buf_ev = bufferevent_new(client_fd,
 87                                    buf_read_callback,
 88                                    buf_write_callback,
 89                                    buf_error_callback,
 90                                    client);
 91 
 92   bufferevent_enable(client->buf_ev, EV_READ);
 93 }
 94 
 95 int main(int argc,
 96          char **argv)
 97 {
 98   int socketlisten;
 99   struct sockaddr_in addresslisten;
100   struct event accept_event;
101   int reuse = 1;
102 
103   event_init();
104 
105   socketlisten = socket(AF_INET, SOCK_STREAM, 0);
106 
107   if (socketlisten < 0)
108     {
109       fprintf(stderr,"Failed to create listen socket");
110       return 1;
111     }
112 
113   memset(&addresslisten, 0, sizeof(addresslisten));
114 
115   addresslisten.sin_family = AF_INET;
116   addresslisten.sin_addr.s_addr = INADDR_ANY;
117   addresslisten.sin_port = htons(SERVER_PORT);
118 
119   if (bind(socketlisten,
120            (struct sockaddr *)&addresslisten,
121            sizeof(addresslisten)) < 0)
122     {
123       fprintf(stderr,"Failed to bind");
124       return 1;
125     }
126 
127   if (listen(socketlisten, 5) < 0)
128     {
129       fprintf(stderr,"Failed to listen to socket");
130       return 1;
131     }
132 
133   setsockopt(socketlisten,
134              SOL_SOCKET,
135              SO_REUSEADDR,
136              &reuse,
137              sizeof(reuse));
138 
139   setnonblock(socketlisten);
140 
141   event_set(&accept_event,
142             socketlisten,
143             EV_READ|EV_PERSIST,
144             on_accept,
145             NULL);
146 
147   event_add(&accept_event,
148             NULL);
149 
150   event_dispatch();
151 
152   close(socketlisten);
153 
154   return 0;
155 }

  服務器啓動後,就會監聽8899端口,等待處理client請求:

  客戶端可使用telnet,或者其餘組包工具來模擬:

  好了:),此次的分享先介紹到這裏,有什麼意見或建議,你們積極留言,Thanks!

相關文章
相關標籤/搜索