構建現代的server應用程序需要以某種方法同一時候接收數百、數千甚至數萬個事件,無論它們是內部請求仍是網絡鏈接,都要有效地處理它們的操做。php
有不少解決方式,但事件驅動也被普遍應用到網絡編程中。並大規模部署在高鏈接數高吞吐量的server程序中,如 http server程序、ftp server程序等。html
相比於傳統的網絡編程方式,事件驅動能夠極大的減小資源佔用,增大服務接待能力,並提升網絡傳輸效率。mysql
這些事件驅動模型中, libevent 庫和 libev庫能夠大大提升性能和事件處理能力。react
在本文中。咱們要討論在 UNIX/Linux 應用程序中使用和部署這些解決方式所用的基本結構和方法。linux
libev 和 libevent 都可以在高性能應用程序中使用。ios
在討論libev 和 libevent以前,咱們看看I/O模型演進變化歷史web
一、堵塞網絡接口:處理單個clientsql
咱們 第一次接觸到的網絡編程通常都是從 listen() 、 send() 、 recv() 等接口開始的。使用這些接口可以很是方便的構建server / 客戶機的模型。堵塞I/O模型圖:在調用recv()函數時,發生在內核中等待數據和複製數據的過程。數據庫
當調用recv()函數時。系統首先查是否有準備好的數據。假設數據沒有準備好,那麼系統就處於等待狀態。當數據準備好後,將數據從系統緩衝區拷貝到用戶空間,而後該函數返回。在套接應用程序中,當調用recv()函數時,未必用戶空間就已經存在數據,那麼此時recv()函數就會處於等待狀態。apache
咱們注意到。大部分的 socket 接口都是堵塞型的。所謂堵塞型接口是指系統調用(一般是 IO 接口)不返回調用結果並讓當前線程一直堵塞,僅僅有當該系統調用得到結果或者超時出錯時才返回。
實際上,除非特別指定,差點兒所有的 IO 接口 ( 包含 socket 接口 ) 都是堵塞型的。這給網絡編程帶來了一個很是大的問題。如在調用 send() 的同一時候,線程將被堵塞,在此期間。線程將沒法運行不論什麼運算或響應不論什麼的網絡請求。這給多客戶機、多業務邏輯的網絡編程帶來了挑戰。這時。很是多程序猿可能會選擇多線程的方式來解決問題。
使用堵塞模式的套接字,開發網絡程序比較簡單。easy實現。
當但願能夠立刻發送和接收數據。且處理的套接字數量比較少的狀況下。即一個一個處理client,server沒什麼壓力。使用堵塞模式來開發網絡程序比較合適。
堵塞模式給網絡編程帶來了一個很是大的問題,如在調用 send()的同一時候。線程將被堵塞,在此期間,線程將沒法運行不論什麼運算或響應不論什麼的網絡請求。
假設很是多client同一時候訪問server,server就不能同一時候處理這些請求。這時,咱們可能會選擇多線程的方式來解決問題。
二、多線程/進程處理多個client
應對多客戶機的網絡應用,最簡單的解決方案是在server端使用多線程(或多進程)。多線程(或多進程)的目的是讓每個鏈接都擁有獨立的線程(或進程),這樣不論什麼一個鏈接的堵塞都不會影響其它的鏈接。
詳細使用多進程仍是多線程,並無一個特定的模式。
傳統意義上,進程的開銷要遠遠大於線程,因此,假設需要同一時候爲較多的客戶機提供服務。則不推薦使用多進程;假設單個服務運行體需要消耗較多的 CPU 資源,譬如需要進行大規模或長時間的數據運算或文件訪問,則進程較爲安全。一般,使用 pthread_create () 建立新線程。fork() 建立新進程。即:
(1) a new Connection 進來。用 fork() 產生一個 Process 處理。
(2) a new Connection 進來。用 pthread_create() 產生一個 Thread 處理。
多線程/進程server同一時候爲多個客戶機提供應答服務。模型例如如下:
主線程持續等待client的鏈接請求,假設有鏈接,則建立新線程,並在新線程中提供爲前例相同的問答服務。
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> void do_service(int conn); void err_log(string err, int sockfd) { perror("binding"); close(sockfd); exit(-1); } int main(int argc, char *argv[]) { unsigned short port = 8000; int sockfd; sockfd = socket(AF_INET, SOCK_STREAM, 0);// 建立通訊端點:套接字 if(sockfd < 0) { perror("socket"); exit(-1); } struct sockaddr_in my_addr; bzero(&my_addr, sizeof(my_addr)); my_addr.sin_family = AF_INET; my_addr.sin_port = htons(port); my_addr.sin_addr.s_addr = htonl(INADDR_ANY); int err_log = bind(sockfd, (struct sockaddr*)&my_addr, sizeof(my_addr)); if( err_log != 0) err_log("binding"); err_log = listen(sockfd, 10); if(err_log != 0) err_log("listen"); struct sockaddr_in peeraddr; //傳出參數 socklen_t peerlen = sizeof(peeraddr); //傳入傳出參數。必須有初始值 int conn; // 已鏈接套接字(變爲主動套接字,即可以主動connect) pid_t pid; while (1) { if ((conn = accept(sockfd, (struct sockaddr *)&peeraddr, &peerlen)) < 0) //3次握手完畢的序列 err_log("accept error"); printf("recv connect ip=%s port=%d/n", inet_ntoa(peeraddr.sin_addr),ntohs(peeraddr.sin_port)); pid = fork(); if (pid == -1) err_log("fork error"); if (pid == 0) {// 子進程 close(listenfd); do_service(conn); exit(EXIT_SUCCESS); } else close(conn); //父進程 } return 0; } void do_service(int conn) { char recvbuf[1024]; while (1) { memset(recvbuf, 0, sizeof(recvbuf)); int ret = read(conn, recvbuf, sizeof(recvbuf)); if (ret == 0) { //客戶端關閉了 printf("client close/n"); break; } else if (ret == -1) ERR_EXIT("read error"); fputs(recvbuf, stdout); write(conn, recvbuf, ret); } }
很是多剛開始學習的人可能不明確爲什麼一個 socket 能夠 accept 屢次。實際上,socket 的設計者可能特地爲多客戶機的狀況留下了伏筆,讓 accept() 能夠返回一個新的 socket。如下是 accept 接口的原型:
int accept(int s, struct sockaddr *addr, socklen_t *addrlen);
輸入參數 s 是從 socket()。bind() 和 listen() 中沿用下來的 socket 句柄值。運行完 bind() 和 listen() 後,操做系統已經開始在指定的port處監聽所有的鏈接請求。假設有請求。則將該鏈接請求增長請求隊列。
調用 accept() 接口正是從 socket s 的請求隊列抽取第一個鏈接信息,建立一個與 s 同類的新的 socket 返回句柄。新的 socket 句柄便是興許 read() 和 recv() 的輸入參數。假設請求隊列當前沒有請求。則 accept() 將進入堵塞狀態直到有請求進入隊列。
上述多線程的server模型彷佛完美的攻克了爲多個客戶機提供問答服務的要求,但事實上並不盡然。假設要同一時候響應成百上千路的鏈接請求,則無論多線程仍是多進程都會嚴重佔領系統資源,減小系統對外界響應效率。而線程與進程自己也更easy進入假死狀態。
所以其缺點:
1)用 fork() 的問題在於每一個 Connection 進來時的成本過高,假設同一時候接入的併發鏈接數太多easy進程數量很是多,進程之間的切換開銷會很是大,同一時候對於老的內核(Linux)會產生雪崩效應。
2)用 Multi-thread 的問題在於 Thread-safe 與 Deadlock 問題難以解決。另外有 Memory-leak 的問題要處理,這個問題對於很是多程序猿來講無異於惡夢,尤爲是對於連續server的server程序更是不可以接受。 假設才用 Event-based 的方式在於實作上很差寫,尤爲是要注意到事件產生時必須 Nonblocking。因而會需要實作 Buffering 的問題。而 Multi-thread 所會遇到的 Memory-leak 問題在這邊會更嚴重。
而在多 CPU 的系統上沒有辦法使用到所有的 CPU resource。
由此可能會考慮使用「線程池」或「鏈接池」。
「線程池」旨在下降建立和銷燬線程的頻率,其維持必定合理數量的線程。並讓空暇的線程又一次承擔新的運行任務。「鏈接池」維持鏈接的緩存池。儘可能重用已有的鏈接、下降建立和關閉鏈接的頻率。這兩種技術都可以很是好的下降系統開銷,都被普遍應用很是多大型系統。如apache,mysql數據庫等。
但是,「線程池」和「鏈接池」技術也僅僅是在必定程度上緩解了頻繁調用 IO 接口帶來的資源佔用。而且。所謂「池」始終有其上限,當請求大大超過上限時,「池」構成的系統對外界的響應並不比沒有池的時候效果好多少。
因此使用「池」必須考慮其面臨的響應規模,並依據響應規模調整「池」的大小。
相應上例中的所面臨的可能同一時候出現的上千甚至上萬次的client請求,「線程池」或「鏈接池」也許可以緩解部分壓力,但是不能解決所有問題。
因爲多線程/進程致使過多的佔用內存或 CPU等系統資源。
三、非堵塞的server模型
以上面臨的很是多問題,必定程度是 IO 接口的堵塞特性致使的。
多線程是一個解決方式,還一個方案就是使用非堵塞的接口。
非堵塞的接口相比於堵塞型接口的顯著差別在於。在被調用以後立刻返回。
使用例如如下的函數可以將某句柄 fd 設爲非堵塞狀態。
咱們可以使用 fcntl(fd, F_SETFL, flag | O_NONBLOCK); 將套接字標誌變成非堵塞:
fcntl( fd, F_SETFL, O_NONBLOCK );
如下將給出僅僅用一個線程。但能夠同一時候從多個鏈接中檢測數據是否送達,並且接受數據。
在非堵塞狀態下,recv() 接口在被調用後立刻返回,返回值表明了不一樣的含義。
調用recv。假設設備臨時沒有數據可讀就返回-1,同一時候置errno爲EWOULDBLOCK(或者EAGAIN,這兩個宏定義的值一樣)。表示原本應該堵塞在這裏(would block,虛擬語氣),其實並無堵塞而是直接返回錯誤,調用者應該試着再讀一次(again)。
這樣的行爲方式稱爲輪詢(Poll)。調用者僅僅是查詢一下。而不是堵塞在這裏死等
如在本例中,
這樣可以同一時候監視多個設備:
while(1){
非堵塞read(設備1);
if(設備1有數據到達)
處理數據;
非堵塞read(設備2);
if(設備2有數據到達)
處理數據;
..............................
}
假設read(設備1)是堵塞的,那麼僅僅要設備1沒有數據到達就會一直堵塞在設備1的read調用上,即便設備2有數據到達也不能處理,使用非堵塞I/O就可以避免設備2得不到及時處理。
相似一個快遞的樣例:這裏使用忙輪詢的方法:每隔1微妙(while(1)差點兒不間斷)到A樓一層(內核緩衝區)去看快遞來了沒有。假設沒來。立刻返回。
而快遞來了,就放在A樓一層,等你去取。
非堵塞I/O有一個缺點,假設所有設備都一直沒有數據到達,調用者需要重複查詢作無用功,假設堵塞在那裏。操做系統可以調度別的進程運行,就不會作無用功了。在實際應用中非堵塞I/O模型比較少用。
可以看到server線程可以經過循環調用 recv() 接口。可以在單個線程內實現對所有鏈接的數據接收工做。
但是上述模型毫不被推薦。因爲。循環調用 recv() 將大幅度推高 CPU 佔用率。此外。在這個方法中,recv() 不少其它的是起到檢測「操做是否完畢」的做用,實際操做系統提供了更爲高效的檢測「操做是否完畢「做用的接口。好比 select()。
四、IO複用事件驅動server模型
簡單介紹:主要是select和epoll;對一個IOport,兩次調用,兩次返回。比堵塞IO並無什麼優越性。關鍵是能實現同一時候對多個IOport進行監聽;
I/O複用模型會用到select、poll、epoll函數。這幾個函數也會使進程堵塞,但是和堵塞I/O所不一樣的的,這兩個函數可以同一時候堵塞多個I/O操做。而且可以同一時候對多個讀操做,多個寫操做的I/O函數進行檢測,直到有數據可讀或可寫時,才真正調用I/O操做函數。
咱們先具體解釋select:
SELECT函數進行IO複用server模型的原理是:當一個client鏈接上server時。server就將其鏈接的fd增長fd_set集合,等到這個鏈接準備好讀或寫的時候,就通知程序進行IO操做,與client進行數據通訊。
大部分 Unix/Linux 都支持 select 函數。該函數用於探測多個文件句柄的狀態變化。
FD_ZERO(int fd, fd_set* fds) FD_SET(int fd, fd_set* fds) FD_ISSET(int fd, fd_set* fds) FD_CLR(int fd, fd_set* fds) int select( int maxfdp, //Winsock中此參數無心義 fd_set* readfds, //進行可讀檢測的Socket fd_set* writefds, //進行可寫檢測的Socket fd_set* exceptfds, //進行異常檢測的Socket const struct timeval* timeout //非堵塞模式中設置最大等待時間 )
參數列表:
int maxfdp :是一個整數值,意思是「最大fd加1(max fd plus 1). 在三個描寫敘述符集(readfds, writefds, exceptfds)中找出最高描寫敘述符
編號值,而後加 1也可將maxfdp設置爲 FD_SETSIZE。這是一個< sys/types.h >中的常數,它說明了最大的描寫敘述符數(經常是 256或1024) 。
但是對大多數應用程序而言,此值太大了。
確實,大多數應用程序僅僅應用 3 ~ 1 0個描寫敘述符。假設將第三個參數設置爲最高描寫敘述符編號值加 1,內核就僅僅需在此範圍內尋找打開的位。而沒必要在數百位的大範圍內搜索。
fd_set *readfds: 是指向fd_set結構的指針,這個集合中應該包含文件描寫敘述符,咱們是要監視這些文件描寫敘述符的讀變化的。即咱們關
心可否夠從這些文件裏讀取數據了,假設這個集合中有一個文件可讀,select就會返回一個大於0的值。表示有文件可讀,假設沒有可讀的文件。則依據timeout參數再推斷是否超時,若超出timeout的時間,select返回0,若錯誤發生返回負值。可以傳入NULL值。表示不關心不論什麼文件的讀變化。
fd_set *writefds: 是指向fd_set結構的指針,這個集合中應該包含文件描寫敘述符,咱們是要監視這些文件描寫敘述符的寫變化的,即咱們關
心可否夠向這些文件裏寫入數據了,假設這個集合中有一個文件可寫,select就會返回一個大於0的值,表示有文件可寫,假設沒有可寫的文件,則依據timeout參數再推斷是否超時,若超出timeout的時間,select返回0。若錯誤發生返回負值。可以傳入NULL值,表示不關心不論什麼文件的寫變化。
fd_set *errorfds: 同上面兩個參數的意圖,用來監視文件錯誤異常。
readfds , writefds,*errorfds每個描寫敘述符集存放在一個fd_set 數據類型中.如圖:
struct timeval* timeout :是select的超時時間,這個參數相當重要。它可以使select處於三種狀態:
第一,若將NULL以形參傳入,即不傳入時間結構,就是將select置於堵塞狀態,必定等到監視文件描寫敘述符集合中某個文件描寫敘述符發生變化爲止。
第二,若將時間值設爲0秒0毫秒,就變成一個純粹的非堵塞函數,不管文件描寫敘述符是否有變化,都立馬返回繼續運行。文件無變化返回0,有變化返回一個正值;
第三,timeout的值大於0,這就是等待的超時時間,即 select在timeout時間內堵塞。超時時間以內有事件到來就返回了。不然在超時後不管如何必定返回,返回值同上述。
這裏需要注意的一點是,select的堵塞與是否設置非堵塞I/O是沒有關係的。
/* 可讀、可寫、異常三種文件描寫敘述符集的申明和初始化。*/ fd_set readfds, writefds, exceptionfds; FD_ZERO(&readfds); FD_ZERO(&writefds); FD_ZERO(&exceptionfds); int max_fd; /* socket配置和監聽。
*/ sock = socket(...); bind(sock, ...); listen(sock, ...); /* 對socket描寫敘述符上發生關心的事件進行註冊。
*/ FD_SET(&readfds, sock); max_fd = sock; while(1) { int i; fd_set r,w,e; /* 爲了反覆使用readfds 、writefds、exceptionfds,將它們複製到暫時變量內。*/ memcpy(&r, &readfds, sizeof(fd_set)); memcpy(&w, &writefds, sizeof(fd_set)); memcpy(&e, &exceptionfds, sizeof(fd_set)); /* 利用暫時變量調用select()堵塞等待。timeout=null表示等待時間爲永遠等待直到發生事件。*/ select(max_fd + 1, &r, &w, &e, NULL); /* 測試是否有client發起鏈接請求,假設有則接受並把新建的描寫敘述符增長監控。*/ if(FD_ISSET(&r, sock)){ new_sock = accept(sock, ...); FD_SET(&readfds, new_sock); FD_SET(&writefds, new_sock); max_fd = MAX(max_fd, new_sock); } /* 對其餘描寫敘述符發生的事件進行適當處理。描寫敘述符依次遞增,最大值各系統有所不一樣(比方在做者系統上最大爲1024)。 在linux可以用命令ulimit -a查看(用ulimit命令也對該值進行改動)。
在freebsd下,用sysctl -a | grep kern.maxfilesperproc來查詢和改動。
*/ for(i= sock+1; i <max_fd+1; ++i) { if(FD_ISSET(&r, i)) doReadAction(i); if(FD_ISSET(&w, i)) doWriteAction(i); } }
FD_ZERO(int fd, fd_set* fds) //清除其所有位
FD_SET(int fd, fd_set* fds) //在某 fd_set 中標記一個fd的相應位爲1
FD_ISSET(int fd, fd_set* fds) // 測試該集中的一個給定位是否仍舊設置
FD_CLR(int fd, fd_set* fds) //刪除相應位
這裏,fd_set 類型可以簡單的理解爲按 bit 位標記句柄的隊列,好比要在某 fd_set 中標記一個值爲 16 的句柄,則該 fd_set 的第 16 個 bit 位被標記爲 1。詳細的置位、驗證可以使用 FD_SET、FD_ISSET 等宏實現。
好比,編寫下列代碼:
fd_setreadset,writeset; FD_ZERO(&readset); FD_ZERO(&writeset); FD_SET(0,&readset); FD_SET(3,&readset); FD_SET(1,&writeset); FD_SET(2,&writeset); select(4,&readset,&writeset,NULL,NULL);而後,下圖顯示了這兩個描寫敘述符集的狀況:
因爲描寫敘述符編號從0開始,因此要在最大描寫敘述符編號值上加1。
第一個參數其實是要檢查的描寫敘述符數(從描寫敘述符0開始)。
若指定的描寫敘述符都沒有準備好,而且指定的時間已經超過。則發生這樣的狀況。
(3)返回一個正值說明了已經準備好的描寫敘述符數,在這樣的狀況下,三個描寫敘述符集中仍舊打開的位是相應於已準備好的描寫敘述符位。
如下將又一次模擬上例中從多個client接收數據的模型。
使用select()的接收數據模型上述模型僅僅是描寫敘述了使用 select() 接口同一時候從多個client接收數據的過程;由於 select() 接口可以同一時候對多個句柄進行讀狀態、寫狀態和錯誤狀態的探測。因此可以很是easy構建爲多個client提供獨立問答服務的server系統。
使用select()接口的基於事件驅動的server模型
這裏需要指出的是。client的一個 connect() 操做,將在server端激發一個「可讀事件」,因此 select() 也能探測來自client的 connect() 行爲。
上述模型中,最關鍵的地方是怎樣動態維護 select() 的三個參數 readfds、writefds 和 exceptfds。做爲輸入參數,readfds 應該標記所有的需要探測的「可讀事件」的句柄,當中永遠包含那個探測 connect() 的那個「母」句柄;同一時候,writefds 和 exceptfds 應該標記所有需要探測的「可寫事件」和「錯誤事件」的句柄 ( 使用 FD_SET() 標記 )。
做爲輸出參數,readfds、writefds 和 exceptfds 中的保存了 select() 捕捉到的所有事件的句柄值。程序猿需要檢查的所有的標記位 ( 使用 FD_ISSET() 檢查 ),以肯定究竟哪些句柄發生了事件。
上述模型主要模擬的是「一問一答」的服務流程,因此,假設 select() 發現某句柄捕捉到了「可讀事件」,server程序應及時作 recv() 操做。並依據接收到的數據準備好待發送數據。並將相應的句柄值增長 writefds,準備下一次的「可寫事件」的 select() 探測。相同,假設 select() 發現某句柄捕捉到「可寫事件」,則程序應及時作 send() 操做,並準備好下一次的「可讀事件」探測準備。
下圖描寫敘述的是上述模型中的一個運行週期。
一個運行週期
這樣的模型的特徵在於每一個運行週期都會探測一次或一組事件。一個特定的事件會觸發某個特定的響應。咱們可以將這樣的模型歸類爲「事件驅動模型」。
相比其它模型,使用 select() 的事件驅動模型僅僅用單線程(進程)運行,佔用資源少,不消耗太多 CPU,同一時候能夠爲多client提供服務。
假設試圖創建一個簡單的事件驅動的server程序,這個模型有必定的參考價值。但這個模型依然有着很是多問題。
select的缺點:
(1)單個進程能夠監視的文件描寫敘述符的數量存在最大限制
(2)select需要複製大量的句柄數據結構。產生巨大的開銷
(3)select返回的是含有整個句柄的列表,應用程序需要消耗大量時間去輪詢各個句柄才幹發現哪些句柄發生了事件
(4)select的觸發方式是水平觸發,應用程序假設沒有完畢對一個已經就緒的文件描寫敘述符進行IO操做,那麼以後每次select調用仍是會將這些文件描寫敘述符通知進程。相相應方式的是邊緣觸發。
(6) 該模型將事件探測和事件響應夾雜在一塊兒。一旦事件響應的運行體龐大,則對整個模型是災難性的。例如如下例。龐大的運行體 1 的將直接致使響應事件 2 的運行體遲遲得不到運行,並在很是大程度上減小了事件探測的及時性。
龐大的運行體對使用select()的事件驅動模型的影響
很是多操做系統提供了更爲高效的接口,如 linux 提供了 epoll,BSD 提供了 kqueue。Solaris 提供了 /dev/poll …。
假設需要實現更高效的server程序,相似 epoll 這種接口更被推薦。
所以。poll有着與select類似的處理流程:
如下對epoll的使用進行說明:
在這樣的狀況下,僅僅要有數據沒有讀、寫完,調用epoll_wait()的時候,就會有事件被觸發。
/* 新建並初始化文件描寫敘述符集。*/ struct epoll_event ev; struct epoll_event events[MAX_EVENTS]; /* 建立epoll句柄。*/ int epfd = epoll_create(MAX_EVENTS); /* socket配置和監聽。*/ sock = socket(...); bind(sock, ...); listen(sock, ...); /* 對socket描寫敘述符上發生關心的事件進行註冊。*/ ev.events = EPOLLIN; ev.data.fd = sock; epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev); while(1) { int i; /*調用epoll_wait()堵塞等待。等待時間爲永遠等待直到發生事件。*/ int n = epoll_wait(epfd, events, MAX_EVENTS, -1); for(i=0; i <n; ++i) { /* 測試是否有client發起鏈接請求,假設有則接受並把新建的描寫敘述符增長監控。*/ if(events.data.fd == sock) { if(events.events & POLLIN){ new_sock = accept(sock, ...); ev.events = EPOLLIN | POLLOUT; ev.data.fd = new_sock; epoll_ctl(epfd, EPOLL_CTL_ADD, new_sock, &ev); } }else{ /* 對其餘描寫敘述符發生的事件進行適當處理。*/ if(events.events & POLLIN) doReadAction(i); if(events.events & POLLOUT) doWriteAction(i); } } }
這兩個詞來源於計算機硬件設計。
它們的差異是僅僅要句柄知足某種狀態,水平觸發就會發出通知。而僅僅有當句柄狀態改變時。邊緣觸發纔會發出通知。好比一個socket通過長時間等待後接收到一段100k的數據,兩種觸發方式都會向程序發出就緒通知。若是程序從這個socket中讀取了50k數據。並再次調用監聽函數,水平觸發依舊會發出就緒通知,而邊緣觸發會因爲socket「有數據可讀」這個狀態沒有發生變化而不發出通知且陷入長時間的等待。
所以在使用邊緣觸發的 api 時,要注意每次都要讀到 socket返回 EWOULDBLOCK爲止
遺憾的是不一樣的操做系統特供的 epoll 接口有很是大差別,因此使用相似於 epoll 的接口實現具備較好跨平臺能力的server會比較困難。
幸運的是,有很是多高效的事件驅動庫可以屏蔽上述的困難,常見的事件驅動庫有 libevent 庫。還有做爲 libevent 替代者的 libev 庫。
這些庫會依據操做系統的特色選擇最合適的事件探測接口。並且增長了信號 (signal) 等技術以支持異步響應,這使得這些庫成爲構建事件驅動模型的不二選擇。
下章將介紹怎樣使用 libev 庫替換 select 或 epoll 接口。實現高效穩定的server模型。
五、libevent方法
libevent是一個事件觸發的網絡庫,適用於windows、linux、bsd等多種平臺,內部使用select、epoll、kqueue等系統調用管理事件機制。著名分佈式緩存軟件memcached也是libevent based,而且libevent在使用上可以作到跨平臺。而且依據libevent官方站點上發佈的數據統計,彷佛也有着非凡的性能。
libevent 庫實際上沒有更換 select()
、poll()
或其它機制的基礎。而是使用對於每個平臺最高效的高性能解決方式在實現外加上一個包裝器。
爲了實際處理每個請求,libevent 庫提供一種事件機制。它做爲底層網絡後端的包裝器。
事件系統讓爲鏈接加入處理函數變得很簡便,同一時候減小了底層 I/O 複雜性。這是 libevent 系統的核心。
libevent 庫的其它組件提供其它功能。包含緩衝的事件系統(用於緩衝發送到client/從client接收的數據)以及 HTTP、DNS 和 RPC 系統的核心實現。
1)事件驅動,高性能;
2)輕量級,專一於網絡。
3) 跨平臺,支持 Windows、Linux、Mac Os等;
4) 支持多種 I/O多路複用技術, epoll、poll、dev/poll、select 和kqueue 等。
5) 支持 I/O。定時器和信號等事件
1)event 及 event_base事件管理包含各類IO(socket)、定時器、信號等事件,也是libevent應用最廣的模塊。
2 ) evbuffer event 及 event_base 緩存管理是指evbuffer功能;提供了高效的讀寫方法
3) evdns DNS是libevent提供的一個異步DNS查詢功能;
4) evhttp HTTP是libevent的一個輕量級http實現,包含server和client
libevent也支持ssl,這對於有安全需求的網絡程序很是的重要。但是其支持不是很是無缺,比方http server的實現就不支持ssl。
libevent是事件驅動的庫,所謂事件驅動,簡單地說就是你點什麼button(即產生什麼事件),電腦運行什麼操做(即調用什麼函數)。
Libevent框架本質上是一個典型的Reactor模式。因此僅僅需要弄懂Reactor模型。libevent就八九不離十了。
Reactor模式。是一種事件驅動機制。
應用程序需要提供對應的接口並註冊到Reactor上,假設對應的事件發生,Reactor將主動調用應用程序註冊的接口,這些接口又稱爲「回調函數」。
在Libevent中也是同樣。向Libevent框架註冊對應的事件和回調函數;當這些事件發生時,Libevent會調用這些回調函數處理對應的事件(I/O讀寫、定時和信號)。
使用Reactor模型,必備的幾個組件:事件源、Reactor框架、多路複用機制和事件處理程序,先來看看Reactor模型的整體框架,接下來再對每個組件作逐一說明。
1) 事件源
1) 2) event demultiplexer——事件多路分發機制
由操做系統提供的I/O多路複用機制,比方select和epoll。程序首先將其關心的句柄(事件源)及其事件註冊到event demultiplexer上。當有事件到達時,event demultiplexer會發出通知「在已經註冊的句柄集中,一個或多個句柄的事件已經就緒」;程序收到通知後,就可以在非堵塞的狀況下對事件進行處理了。
相應到libevent中,依舊是select、poll、epoll等,但是libevent使用結構體eventop進行了封裝,以統一的接口來支持這些I/O多路複用機制,達到了對外隱藏底層系統機制的目的。
3) Reactor——反應器
Reactor,是事件管理的接口,內部使用event demultiplexer註冊、註銷事件;並執行事件循環。當有事件進入「就緒」狀態時,調用註冊事件的回調函數處理事件。
相應到libevent中。就是event_base結構體。
4) Event Handler——事件處理程序
事件處理程序提供了一組接口,每個接口對應了一種類型的事件。供Reactor在對應的事件發生時調用,運行對應的事件處理。一般它會綁定一個有效的句柄。
相應到libevent中,就是event結構體。
結合Reactor框架,咱們來理一下libevent的事件處理流程,請看下圖:
event_init() 初始化:
首先要隆重介紹event_base對象:
struct event_base { const struct eventop *evsel; void *evbase; int event_count; /* counts number of total events */ int event_count_active; /* counts number of active events */ int event_gotterm; /* Set to terminate loop */ /* active event management */ struct event_list **activequeues; int nactivequeues; struct event_list eventqueue; struct timeval event_tv; RB_HEAD(event_tree, event) timetree; };
event_base對象整合了事件處理的一些全局變量, 角色是event對象的"總管家", 他包含了:
事件引擎函數對象(evsel, evbase),
當前入列事件列表(event_count, event_count_active, eventqueue),
全局終止信號(event_gotterm),
活躍事件列表(avtivequeues),
事件隊列樹(timetree)...
初始化時建立event_base對象, 選擇 當前OS支持的事件引擎(epoll, poll, select...)並初始化, 建立全局信號隊列(signalqueue), 活躍隊列的內存分配( 依據設置的priority個數,默以爲1).
event_setevent_set來設置event對象,包含所有者event_base對象, fd, 事件(EV_READ| EV_WRITE|EV_PERSIST), 回掉函數和參數,事件優先級是當前event_base的中間級別(current_base->nactivequeues/2)
設置監視事件後,事件處理函數可以僅僅被調用一次或總被調用。
僅僅調用一次:事件處理函數被調用後,即從事件隊列中刪除。需要在事件處理函數中再次增長事件,才幹在下次事件發生時被調用;
總被調用:設置爲EV_PERSIST,僅僅增長一次,處理函數總被調用,除非採用event_remove顯式地刪除。
event_add() 事件加入:
int event_add(struct event *ev, struct timeval *tv)
這個接口有兩個參數, 第一個是要加入的事件, 第二個參數做爲事件的超時值(timer). 假設該值非NULL, 在加入本事件的同一時候加入超時事件(EV_TIMEOUT)到時間隊列樹(timetree), 依據事件類型處理例如如下:
EV_READ => EVLIST_INSERTED => eventqueue
EV_WRITE => EVLIST_INSERTED => eventqueue
EV_TIMEOUT => EVLIST_TIMEOUT => timetree
EV_SIGNAL => EVLIST_SIGNAL => signalqueue
event_base_loop() 事件處理主循環
這裏是事件的主循環,僅僅要flags不是設置爲EVLOOP_NONBLOCK, 該函數就會一直循環監聽事件/處理事件.
每次循環過程當中, 都會處理當前觸發(活躍)事件:
(a). 檢測當前是否有信號處理(gotterm, gotsig), 這些都是全局參數,不適合多線程
(b). 時間更新,找到離當前近期的時間事件, 獲得相對超時事件tv
(c). 調用事件引擎的dispatch wait事件觸發, 超時值爲tv, 觸發事件加入到activequeues
(d). 處理活躍事件, 調用caller的callbacks (event_process_acitve)
典型的libevent的應用大體整體流程:
建立 libevent server的基本方法是, 註冊當發生某一操做(比方接受來自client的鏈接)時應該運行的函數,而後調用主事件循環event_dispatch()
。運行過程的控制現在由 libevent 系統處理。
註冊事件和將調用的函數以後,事件系統開始自治。在應用程序執行時,可以在事件隊列中加入(註冊)或刪除(取消註冊)事件。事件註冊很方便,可以經過它加入新事件以處理新打開的鏈接,從而構建靈活的網絡處理系統
(環境設置)-> (建立event_base) -> (註冊event,將此event增長到event_base中) -> (設置event各類屬性。事件等) ->(將event增長事件列表 addevent) ->(開始事件監視循環、分發dispatch)。
樣例:
好比,可以打開一個監聽套接字,而後註冊一個回調函數,每當需要調用 accept()
函數以打開新鏈接時調用這個回調函數,這樣就建立了一個網絡server。例1例如如下所看到的的代碼片斷說明基本過程:
例1:打開監聽套接字,註冊一個回調函數(每當需要調用 accept() 函數以打開新鏈接時調用它),由此建立網絡server:
#include <stdio.h> #include <string.h> #include <iostream> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <netdb.h> #include <event.h> using namespace std; // 事件base struct event_base* base; // 讀事件回調函數 void onRead(int iCliFd, short iEvent, void *arg) { int iLen; char buf[1500]; iLen = recv(iCliFd, buf, 1500, 0); if (iLen <= 0) { cout << "Client Close" << endl; // 鏈接結束(=0)或鏈接錯誤(<0)。將事件刪除並釋放內存空間 struct event *pEvRead = (struct event*)arg; event_del(pEvRead); delete pEvRead; close(iCliFd); return; } buf[iLen] = 0; cout << "Client Info:" << buf << endl; } // 鏈接請求事件回調函數 void onAccept(int iSvrFd, short iEvent, void *arg) { int iCliFd; struct sockaddr_in sCliAddr; socklen_t iSinSize = sizeof(sCliAddr); iCliFd = accept(iSvrFd, (struct sockaddr*)&sCliAddr, &iSinSize); // 鏈接註冊爲新事件 (EV_PERSIST爲事件觸發後不默認刪除) struct event *pEvRead = new event; event_set(pEvRead, iCliFd, EV_READ|EV_PERSIST, onRead, pEvRead); event_base_set(base, pEvRead); event_add(pEvRead, NULL); } int main() { int iSvrFd; struct sockaddr_in sSvrAddr; memset(&sSvrAddr, 0, sizeof(sSvrAddr)); sSvrAddr.sin_family = AF_INET; sSvrAddr.sin_addr.s_addr = inet_addr("127.0.0.1"); sSvrAddr.sin_port = htons(8888); // 建立tcpSocket(iSvrFd),監聽本機8888端口 iSvrFd = socket(AF_INET, SOCK_STREAM, 0); bind(iSvrFd, (struct sockaddr*)&sSvrAddr, sizeof(sSvrAddr)); listen(iSvrFd, 10); // 初始化base base = event_base_new(); struct event evListen; // 設置事件 event_set(&evListen, iSvrFd, EV_READ|EV_PERSIST, onAccept, NULL); // 設置爲base事件 event_base_set(base, &evListen); // 加入事件 event_add(&evListen, NULL); // 事件循環 event_base_dispatch(base); return 0; }
event_set()
函數建立新的事件結構,
event_add()
在事件隊列機制中加入事件。
而後,event_dispatch()
啓動事件隊列系統,開始監聽(並接受)請求。
雖然 C 語言很是適合不少系統應用程序。但是在現代環境中不經常使用 C 語言,腳本語言更靈活、更有用。
幸運的是,Perl 和 PHP 等大多數腳本語言是用 C 編寫的,因此可以經過擴展模塊使用 libevent 等 C 庫。
四、libev庫
官方文檔:http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod
與 libevent 同樣,libev 系統也是基於事件循環的系統,它在 poll()
、select()
等機制的本機實現的基礎上提供基於事件的循環。
libev是libevent以後的一個事件驅動的編程框架。其接口和libevent基本相似。據官方介紹。其性能比libevent還要高,bug比libevent還少。
libev API 比較原始,沒有 HTTP 包裝器。但是 libev 支持在實現中內置不少其它事件類型。好比,一種 evstat 實現可以監視多個文件的屬性變更,可以在 例4 所看到的的 HTTP 文件解決方式中使用它。
但是,libevent 和 libev 的基本過程是一樣的。建立所需的網絡監聽套接字。註冊在運行期間要調用的事件。而後啓動主事件循環,讓 libev 處理過程的其他部分。
Libev是一個event loop:向libev註冊感興趣的events,比方Socket可讀事件,libev會對所註冊的事件的源進行管理,並在事件發生時觸發對應的程序。
事件驅動框架:
定義一個監控器、書寫觸發動做邏輯、初始化監控器、設置監控器觸發條件、將監控器增長大事件驅動器的循環中就能夠。
libev的事件驅動過程可以想象成例如如下的僞代碼:
do_some_init() is_run = True while is_run: t = caculate_loop_time() deal_loop(t) deal_with_pending_event() do_some_clear()
首先作一些初始化操做,而後進入到循環中,該循環經過一個狀態位來控制是否運行。
在循環中。計算出下一次輪詢的時間,這裏輪詢的實現就採用了系統提供的epoll、kqueue等機制。
再輪詢結束後檢查有哪些監控器的被觸發了,依次運行觸發動做。
Libev 除了提供了主要的三大類事件(IO事件、定時器事件、信號事件)外還提供了週期事件、子進程事件、文件狀態改變事件等多個事件。
libev所實現的功能就是一個強大的reactor,可能notify事件主要包含如下這些:
libev 相同需要循環探測事件是否產生。
Libev 的循環體用 ev_loop 結構來表達。並用 ev_loop( ) 來啓動。
void ev_loop( ev_loop* loop, int flags ) |
Libev 支持八種事件類型,當中包含 IO 事件。
一個 IO 事件用 ev_io 來表徵,並用 ev_io_init() 函數來初始化:
void ev_io_init(ev_io *io, callback, int fd, int events) |
初始化內容包含回調函數 callback,被探測的句柄 fd 和需要探測的事件。EV_READ 表「可讀事件」。EV_WRITE 表「可寫事件」。
現在,用戶需要作的不過在合適的時候,將某些 ev_io 從 ev_loop 增長或剔除。一旦增長,下個循環即會檢查 ev_io 所指定的事件有否發生;假設該事件被探測到,則 ev_loop 會本身主動運行 ev_io 的回調函數 callback();假設 ev_io 被註銷。則再也不檢測相應事件。
無論某 ev_loop 啓動與否,都可以對其加入或刪除一個或多個 ev_io,加入刪除的接口是 ev_io_start() 和 ev_io_stop()。
void ev_io_start( ev_loop *loop, ev_io* io ) void ev_io_stop( EV_A_* ) |
由此,咱們可以easy得出例如如下的「一問一答」的server模型。由於沒有考慮server端主動終止鏈接機制,因此各個鏈接可以維持隨意時間,client可以自由選擇退出時機。
IO事件、定時器事件、信號事件:
#include<ev.h> #include <stdio.h> #include <signal.h> #include <sys/unistd.h> ev_io io_w; ev_timer timer_w; ev_signal signal_w; void io_action(struct ev_loop *main_loop,ev_io *io_w,int e) { int rst; char buf[1024] = {''}; puts("in io cb\n"); read(STDIN_FILENO,buf,sizeof(buf)); buf[1023] = ''; printf("Read in a string %s \n",buf); ev_io_stop(main_loop,io_w); } void timer_action(struct ev_loop *main_loop,ev_timer *timer_w,int e) { puts("in tiemr cb \n"); ev_timer_stop(main_loop,io_w); } void signal_action(struct ev_loop *main_loop,ev_signal signal_w,int e) { puts("in signal cb \n"); ev_signal_stop(main_loop,io_w); ev_break(main_loop,EVBREAK_ALL); } int main(int argc ,char *argv[]) { struct ev_loop *main_loop = ev_default_loop(0); ev_init(&io_w,io_action); ev_io_set(&io_w,STDIN_FILENO,EV_READ); ev_init(&timer_w,timer_action); ev_timer_set(&timer_w,2,0); ev_init(&signal_w,signal_action); ev_signal_set(&signal_w,SIGINT); ev_io_start(main_loop,&io_w); ev_timer_start(main_loop,&timer_w); ev_signal_start(main_loop,&signal_w); ev_run(main_loop,0); return 0; }
這裏使用了3種事件監控器,分別監控IO事件、定時器事件以及信號事件。所以定義了3個監控器(watcher),以及觸發監控器時要運行動做的回調函數。Libev定義了多種監控器,命名方式爲ev_xxx
這裏xxx表明監控器類型,事實上現是一個結構體。
typedef struct ev_io { .... } ev_io;
經過宏定義可以簡寫爲 ev_xxx
。
回調函數的類型爲 void cb_name(struct ev_loop *main_loop,ev_xxx *io_w,int event)
。
在main中,首先定義了一個事件驅動器的結構 struct ev_loop *main_loop
這裏調用 ev_default_loop(0)
生成一個預製的全局驅動器。這裏可以參考Manual中的選擇。
而後依次初始化各個監控器以及設置監控器的觸發條件。
初始化監控器的過程是將對應的回調函數即觸發時的動做註冊到監控器上。
設置觸發條件則是該條件產生時纔去運行註冊到監控器上的動做。對於IO事件,一般是設置特定fd上的的可讀或可寫事件,定時器則是多久後觸發。這裏定時器的觸發條件中還有第三參數。表示第一次觸發後,是否循環。若爲0則吧循環,不然按該值循環。信號觸發器則是設置觸發的信號。
在初始化並設置好觸發條件後,先調用ev_xxx_start
將監控器註冊到事件驅動器上。接着調用 ev_run
開始事件驅動器。
上述模型可以接受隨意多個鏈接,且爲各個鏈接提供全然獨立的問答服務。藉助 libev 提供的事件循環 / 事件驅動接口,上述模型有機會具有其它模型不能提供的高效率、低資源佔用、穩定性好和編寫簡單等特色。
由於傳統的 web server。ftp server及其它網絡應用程序都具備「一問一答」的通信邏輯。因此上述使用 libev 庫的「一問一答」模型對構建相似的server程序具備參考價值;另外,對於需要實現遠程監視或遠程遙控的應用程序,上述模型相同提供了一個可行的實現方案。
php-了libev擴展socket:<?php /* 使用異步io訪問socket Use some async I/O to access a socket */ // `sockets' extension still logs warnings // for EINPROGRESS, EAGAIN/EWOULDBLOCK etc. error_reporting(E_ERROR); $e_nonblocking = array (/*EAGAIN or EWOULDBLOCK*/11, /*EINPROGRESS*/115); // Get the port for the WWW service $service_port = getservbyname('www', 'tcp'); // Get the IP address for the target host $address = gethostbyname('google.co.uk'); // Create a TCP/IP socket $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); if ($socket === FALSE) { echo \"socket_create() failed: reason: \" .socket_strerror(socket_last_error()) . \"n\"; } // Set O_NONBLOCK flag socket_set_nonblock($socket); // Abort on timeout $timeout_watcher = new EvTimer(10.0, 0., function () use ($socket) { socket_close($socket); Ev::stop(Ev::BREAK_ALL); }); // Make HEAD request when the socket is writable $write_watcher = new EvIo($socket, Ev::WRITE, function ($w) use ($socket, $timeout_watcher, $e_nonblocking) { // Stop timeout watcher $timeout_watcher->stop(); // Stop write watcher $w->stop(); $in = \"HEAD / HTTP/1.1rn\"; $in .= \"Host: google.co.ukrn\"; $in .= \"Connection: Closernrn\"; if (!socket_write($socket, $in, strlen($in))) { trigger_error(\"Failed writing $in to socket\", E_USER_ERROR); } $read_watcher = new EvIo($socket, Ev::READ, function ($w, $re) use ($socket, $e_nonblocking) { // Socket is readable. recv() 20 bytes using non-blocking mode $ret = socket_recv($socket, $out, 20, MSG_DONTWAIT); if ($ret) { echo $out; } elseif ($ret === 0) { // All read $w->stop(); socket_close($socket); return; } // Caught EINPROGRESS, EAGAIN, or EWOULDBLOCK if (in_array(socket_last_error(), $e_nonblocking)) { return; } $w->stop(); socket_close($socket); }); Ev::run(); }); $result = socket_connect($socket, $address, $service_port); Ev::run(); ?>
libevent 和 libev 都提供靈活且強大的環境。支持爲處理server端或client請求實現高性能網絡(和其它 I/O)接口。
目標是以高效(CPU/RAM 使用量低)的方式支持數千甚至數萬個鏈接。在本文中,您看到了一些演示樣例,包含 libevent 中內置的 HTTP 服務,可以使用這些技術支持基於 IBM Cloud、EC2 或 AJAX 的 web 應用程序。
參考:
http://www.ibm.com/developerworks/cn/linux/l-cn-edntwk/
http://www.ibm.com/developerworks/cn/aix/library/au-libev/