構建現代的服務器應用程序須要以某種方法同時接收數百、數千甚至數萬個事件,不管它們是內部請求仍是網絡鏈接,都要有效地處理它們的操做。php
有許多解決方案,但事件驅動也被普遍應用到網絡編程中。並大規模部署在高鏈接數高吞吐量的服務器程序中,如 http 服務器程序、ftp 服務器程序等。相比於傳統的網絡編程方式,事件驅動可以極大的下降資源佔用,增大服務接待能力,並提升網絡傳輸效率。html
這些事件驅動模型中, libevent 庫和 libev庫可以大大提升性能和事件處理能力。在本文中,咱們要討論在 UNIX/Linux 應用程序中使用和部署這些解決方案所用的基本結構和方法。libev 和 libevent 均可以在高性能應用程序中使用。mysql
在討論libev 和 libevent以前,咱們看看I/O模型演進變化歷史react
一、阻塞網絡接口:處理單個客戶端linux
咱們 第一次接觸到的網絡編程通常都是從 listen() 、 send() 、 recv() 等接口開始的。使用這些接口能夠很方便的構建服務器 / 客戶機的模型。阻塞I/O模型圖:在調用recv()函數時,發生在內核中等待數據和複製數據的過程。ios
當調用recv()函數時,系統首先查是否有準備好的數據。若是數據沒有準備好,那麼系統就處於等待狀態。當數據準備好後,將數據從系統緩衝區複製到用戶空間,而後該函數返回。在套接應用程序中,當調用recv()函數時,未必用戶空間就已經存在數據,那麼此時recv()函數就會處於等待狀態。程序員
咱們注意到,大部分的 socket 接口都是阻塞型的。所謂阻塞型接口是指系統調用(通常是 IO 接口)不返回調用結果並讓當前線程一直阻塞,只有當該系統調用得到結果或者超時出錯時才返回。web
實際上,除非特別指定,幾乎全部的 IO 接口 ( 包括 socket 接口 ) 都是阻塞型的。這給網絡編程帶來了一個很大的問題,如在調用 send() 的同時,線程將被阻塞,在此期間,線程將沒法執行任何運算或響應任何的網絡請求。這給多客戶機、多業務邏輯的網絡編程帶來了挑戰。這時,不少程序員可能會選擇多線程的方式來解決這個問題。sql
使用阻塞模式的套接字,開發網絡程序比較簡單,容易實現。當但願可以當即發送和接收數據,且處理的套接字數量比較少的狀況下,即一個一個處理客戶端,服務器沒什麼壓力,使用阻塞模式來開發網絡程序比較合適。數據庫
阻塞模式給網絡編程帶來了一個很大的問題,如在調用 send()的同時,線程將被阻塞,在此期間,線程將沒法執行任何運算或響應任何的網絡請求。若是不少客戶端同時訪問服務器,服務器就不能同時處理這些請求。這時,咱們可能會選擇多線程的方式來解決這個問題。
二、多線程/進程處理多個客戶端
應對多客戶機的網絡應用,最簡單的解決方式是在服務器端使用多線程(或多進程)。多線程(或多進程)的目的是讓每一個鏈接都擁有獨立的線程(或進程),這樣任何一個鏈接的阻塞都不會影響其餘的鏈接。
具體使用多進程仍是多線程,並無一個特定的模式。傳統意義上,進程的開銷要遠遠大於線程,因此,若是須要同時爲較多的客戶機提供服務,則不推薦使用多進程;若是單個服務執行體須要消耗較多的 CPU 資源,譬如須要進行大規模或長時間的數據運算或文件訪問,則進程較爲安全。一般,使用 pthread_create () 建立新線程,fork() 建立新進程。即:
(1) a new Connection 進來,用 fork() 產生一個 Process 處理。
(2) a new Connection 進來,用 pthread_create() 產生一個 Thread 處理。
多線程/進程服務器同時爲多個客戶機提供應答服務。模型以下:
主線程持續等待客戶端的鏈接請求,若是有鏈接,則建立新線程,並在新線程中提供爲前例一樣的問答服務。
不少初學者可能不明白爲什麼一個 socket 能夠 accept 屢次。實際上,socket 的設計者可能特地爲多客戶機的狀況留下了伏筆,讓 accept() 可以返回一個新的 socket。下面是 accept 接口的原型:
int accept(int s, struct sockaddr *addr, socklen_t *addrlen);
輸入參數 s 是從 socket(),bind() 和 listen() 中沿用下來的 socket 句柄值。執行完 bind() 和 listen() 後,操做系統已經開始在指定的端口處監聽全部的鏈接請求,若是有請求,則將該鏈接請求加入請求隊列。調用 accept() 接口正是從 socket s 的請求隊列抽取第一個鏈接信息,建立一個與 s 同類的新的 socket 返回句柄。新的 socket 句柄便是後續 read() 和 recv() 的輸入參數。若是請求隊列當前沒有請求,則 accept() 將進入阻塞狀態直到有請求進入隊列。
上述多線程的服務器模型彷佛完美的解決了爲多個客戶機提供問答服務的要求,但其實並不盡然。若是要同時響應成百上千路的鏈接請求,則不管多線程仍是多進程都會嚴重佔據系統資源,下降系統對外界響應效率,而線程與進程自己也更容易進入假死狀態。
所以其缺點:
1)用 fork() 的問題在於每個 Connection 進來時的成本過高,若是同時接入的併發鏈接數太多容易進程數量不少,進程之間的切換開銷會很大,同時對於老的內核(Linux)會產生雪崩效應。
2)用 Multi-thread 的問題在於 Thread-safe 與 Deadlock 問題難以解決,另外有 Memory-leak 的問題要處理,這個問題對於不少程序員來講無異於惡夢,尤爲是對於連續服務器的服務器程序更是不能夠接受。 若是才用 Event-based 的方式在於實作上很差寫,尤爲是要注意到事件產生時必須 Nonblocking,因而會須要實作 Buffering 的問題,而 Multi-thread 所會遇到的 Memory-leak 問題在這邊會更嚴重。而在多 CPU 的系統上沒有辦法使用到全部的 CPU resource。
由此可能會考慮使用「線程池」或「鏈接池」。「線程池」旨在減小建立和銷燬線程的頻率,其維持必定合理數量的線程,並讓空閒的線程從新承擔新的執行任務。「鏈接池」維持鏈接的緩存池,儘可能重用已有的鏈接、減小建立和關閉鏈接的頻率。這兩種技術均可以很好的下降系統開銷,都被普遍應用不少大型系統,如apache,mysql數據庫等。
可是,「線程池」和「鏈接池」技術也只是在必定程度上緩解了頻繁調用 IO 接口帶來的資源佔用。並且,所謂「池」始終有其上限,當請求大大超過上限時,「池」構成的系統對外界的響應並不比沒有池的時候效果好多少。因此使用「池」必須考慮其面臨的響應規模,並根據響應規模調整「池」的大小。
對應上例中的所面臨的可能同時出現的上千甚至上萬次的客戶端請求,「線程池」或「鏈接池」或許能夠緩解部分壓力,可是不能解決全部問題。由於多線程/進程致使過多的佔用內存或 CPU等系統資源。
三、非阻塞的服務器模型
以上面臨的不少問題,必定程度是 IO 接口的阻塞特性致使的。多線程是一個解決方案,還一個方案就是使用非阻塞的接口。
非阻塞的接口相比於阻塞型接口的顯著差別在於,在被調用以後當即返回。使用以下的函數能夠將某句柄 fd 設爲非阻塞狀態。
咱們可使用 fcntl(fd, F_SETFL, flag | O_NONBLOCK); 將套接字標誌變成非阻塞:
fcntl( fd, F_SETFL, O_NONBLOCK );
下面將給出只用一個線程,但可以同時從多個鏈接中檢測數據是否送達,而且接受數據。
在非阻塞狀態下,recv() 接口在被調用後當即返回,返回值表明了不一樣的含義。
調用recv,若是設備暫時沒有數據可讀就返回-1,同時置errno爲EWOULDBLOCK(或者EAGAIN,這兩個宏定義的值相同),表示原本應該阻塞在這裏(would block,虛擬語氣),事實上並無阻塞而是直接返回錯誤,調用者應該試着再讀一次(again)。這種行爲方式稱爲輪詢(Poll),調用者只是查詢一下,而不是阻塞在這裏死等
如在本例中,
這樣能夠同時監視多個設備:
while(1){
非阻塞read(設備1);
if(設備1有數據到達)
處理數據;
非阻塞read(設備2);
if(設備2有數據到達)
處理數據;
..............................
}
若是read(設備1)是阻塞的,那麼只要設備1沒有數據到達就會一直阻塞在設備1的read調用上,即便設備2有數據到達也不能處理,使用非阻塞I/O就能夠避免設備2得不到及時處理。
非阻塞I/O有一個缺點,若是全部設備都一直沒有數據到達,調用者須要反覆查詢作無用功,若是阻塞在那裏,操做系統能夠調度別的進程執行,就不會作無用功了,在實際應用中非阻塞I/O模型比較少用。
能夠看到服務器線程能夠經過循環調用 recv() 接口,能夠在單個線程內實現對全部鏈接的數據接收工做。
可是上述模型毫不被推薦。由於,循環調用 recv() 將大幅度推高 CPU 佔用率;此外,在這個方案中,recv() 更多的是起到檢測「操做是否完成」的做用,實際操做系統提供了更爲高效的檢測「操做是否完成「做用的接口,例如 select()。
四、IO複用事件驅動服務器模型
簡介:主要是select和epoll;對一個IO端口,兩次調用,兩次返回,比阻塞IO並無什麼優越性;關鍵是能實現同時對多個IO端口進行監聽;
I/O複用模型會用到select、poll、epoll函數,這幾個函數也會使進程阻塞,可是和阻塞I/O所不一樣的的,這兩個函數能夠同時阻塞多個I/O操做。並且能夠同時對多個讀操做,多個寫操做的I/O函數進行檢測,直到有數據可讀或可寫時,才真正調用I/O操做函數。
咱們先詳解select:
SELECT函數進行IO複用服務器模型的原理是:當一個客戶端鏈接上服務器時,服務器就將其鏈接的fd加入fd_set集合,等到這個鏈接準備好讀或寫的時候,就通知程序進行IO操做,與客戶端進行數據通訊。
大部分 Unix/Linux 都支持 select 函數,該函數用於探測多個文件句柄的狀態變化。
FD_ZERO(int fd, fd_set* fds) FD_SET(int fd, fd_set* fds) FD_ISSET(int fd, fd_set* fds) FD_CLR(int fd, fd_set* fds) int select( int maxfdp, //Winsock中此參數無心義 fd_set* readfds, //進行可讀檢測的Socket fd_set* writefds, //進行可寫檢測的Socket fd_set* exceptfds, //進行異常檢測的Socket const struct timeval* timeout //非阻塞模式中設置最大等待時間 )
參數列表:
int maxfdp :是一個整數值,意思是「最大fd加1(max fd plus 1). 在三個描述符集(readfds, writefds, exceptfds)中找出最高描述符
編號值,而後加 1也可將maxfdp設置爲 FD_SETSIZE,這是一個< sys/types.h >中的常數,它說明了最大的描述符數(常常是 256或1024) 。可是對大多數應用程序而言,此值太大了。確實,大多數應用程序只應用 3 ~ 1 0個描述符。若是將第三個參數設置爲最高描述符編號值加 1,內核就只需在此範圍內尋找打開的位,而沒必要在數百位的大範圍內搜索。
fd_set *readfds: 是指向fd_set結構的指針,這個集合中應該包括文件描述符,咱們是要監視這些文件描述符的讀變化的,即咱們關
心是否能夠從這些文件中讀取數據了,若是這個集合中有一個文件可讀,select就會返回一個大於0的值,表示有文件可讀,若是沒有可讀的文件,則根據timeout參數再判斷是否超時,若超出timeout的時間,select返回0,若發生錯誤返回負值。能夠傳入NULL值,表示不關心任何文件的讀變化。
fd_set *writefds: 是指向fd_set結構的指針,這個集合中應該包括文件描述符,咱們是要監視這些文件描述符的寫變化的,即咱們關
心是否能夠向這些文件中寫入數據了,若是這個集合中有一個文件可寫,select就會返回一個大於0的值,表示有文件可寫,若是沒有可寫的文件,則根據timeout參數再判斷是否超時,若超出timeout的時間,select返回0,若發生錯誤返回負值。能夠傳入NULL值,表示不關心任何文件的寫變化。
fd_set *errorfds: 同上面兩個參數的意圖,用來監視文件錯誤異常。
readfds , writefds,*errorfds每一個描述符集存放在一個fd_set 數據類型中.如圖:
struct timeval* timeout :是select的超時時間,這個參數相當重要,它可使select處於三種狀態:
第一,若將NULL以形參傳入,即不傳入時間結構,就是將select置於阻塞狀態,必定等到監視文件描述符集合中某個文件描述符發生變化爲止;
第二,若將時間值設爲0秒0毫秒,就變成一個純粹的非阻塞函數,無論文件描述符是否有變化,都馬上返回繼續執行,文件無變化返回0,有變化返回一個正值;
第三,timeout的值大於0,這就是等待的超時時間,即 select在timeout時間內阻塞,超時時間以內有事件到來就返回了,不然在超時後無論怎樣必定返回,返回值同上述。
/* 可讀、可寫、異常三種文件描述符集的申明和初始化。*/ fd_set readfds, writefds, exceptionfds; FD_ZERO(&readfds); FD_ZERO(&writefds); FD_ZERO(&exceptionfds); int max_fd; /* socket配置和監聽。*/ sock = socket(...); bind(sock, ...); listen(sock, ...); /* 對socket描述符上發生關心的事件進行註冊。*/ FD_SET(&readfds, sock); max_fd = sock; while(1) { int i; fd_set r,w,e; /* 爲了重複使用readfds 、writefds、exceptionfds,將它們拷貝到臨時變量內。*/ memcpy(&r, &readfds, sizeof(fd_set)); memcpy(&w, &writefds, sizeof(fd_set)); memcpy(&e, &exceptionfds, sizeof(fd_set)); /* 利用臨時變量調用select()阻塞等待,timeout=null表示等待時間爲永遠等待直到發生事件。*/ select(max_fd + 1, &r, &w, &e, NULL); /* 測試是否有客戶端發起鏈接請求,若是有則接受並把新建的描述符加入監控。*/ if(FD_ISSET(&r, sock)){ new_sock = accept(sock, ...); FD_SET(&readfds, new_sock); FD_SET(&writefds, new_sock); max_fd = MAX(max_fd, new_sock); } /* 對其它描述符發生的事件進行適當處理。描述符依次遞增,最大值各系統有所不一樣(好比在做者系統上最大爲1024), 在linux能夠用命令ulimit -a查看(用ulimit命令也對該值進行修改)。 在freebsd下,用sysctl -a | grep kern.maxfilesperproc來查詢和修改。*/ for(i= sock+1; i <max_fd+1; ++i) { if(FD_ISSET(&r, i)) doReadAction(i); if(FD_ISSET(&w, i)) doWriteAction(i); } }
FD_ZERO(int fd, fd_set* fds) //清除其全部位
FD_SET(int fd, fd_set* fds) //在某 fd_set 中標記一個fd的對應位爲1
FD_ISSET(int fd, fd_set* fds) // 測試該集中的一個給定位是否仍舊設置
FD_CLR(int fd, fd_set* fds) //刪除對應位
這裏,fd_set 類型能夠簡單的理解爲按 bit 位標記句柄的隊列,例如要在某 fd_set 中標記一個值爲 16 的句柄,則該 fd_set 的第 16 個 bit 位被標記爲 1。具體的置位、驗證可以使用 FD_SET、FD_ISSET 等宏實現。
例如,編寫下列代碼:
fd_setreadset,writeset; FD_ZERO(&readset); FD_ZERO(&writeset); FD_SET(0,&readset); FD_SET(3,&readset); FD_SET(1,&writeset); FD_SET(2,&writeset); select(4,&readset,&writeset,NULL,NULL);而後,下圖顯示了這兩個描述符集的狀況:
由於描述符編號從0開始,因此要在最大描述符編號值上加1。第一個參數其實是要檢查的描述符數(從描述符0開始)。
下面將從新模擬上例中從多個客戶端接收數據的模型。
使用select()的接收數據模型上述模型只是描述了使用 select() 接口同時從多個客戶端接收數據的過程;因爲 select() 接口能夠同時對多個句柄進行讀狀態、寫狀態和錯誤狀態的探測,因此能夠很容易構建爲多個客戶端提供獨立問答服務的服務器系統。
使用select()接口的基於事件驅動的服務器模型
這裏須要指出的是,客戶端的一個 connect() 操做,將在服務器端激發一個「可讀事件」,因此 select() 也能探測來自客戶端的 connect() 行爲。
上述模型中,最關鍵的地方是如何動態維護 select() 的三個參數 readfds、writefds 和 exceptfds。做爲輸入參數,readfds 應該標記全部的須要探測的「可讀事件」的句柄,其中永遠包括那個探測 connect() 的那個「母」句柄;同時,writefds 和 exceptfds 應該標記全部須要探測的「可寫事件」和「錯誤事件」的句柄 ( 使用 FD_SET() 標記 )。
做爲輸出參數,readfds、writefds 和 exceptfds 中的保存了 select() 捕捉到的全部事件的句柄值。程序員須要檢查的全部的標記位 ( 使用 FD_ISSET() 檢查 ),以肯定到底哪些句柄發生了事件。
上述模型主要模擬的是「一問一答」的服務流程,因此,若是 select() 發現某句柄捕捉到了「可讀事件」,服務器程序應及時作 recv() 操做,並根據接收到的數據準備好待發送數據,並將對應的句柄值加入 writefds,準備下一次的「可寫事件」的 select() 探測。一樣,若是 select() 發現某句柄捕捉到「可寫事件」,則程序應及時作 send() 操做,並準備好下一次的「可讀事件」探測準備。下圖描述的是上述模型中的一個執行週期。
一個執行週期
這種模型的特徵在於每個執行週期都會探測一次或一組事件,一個特定的事件會觸發某個特定的響應。咱們能夠將這種模型歸類爲「事件驅動模型」。
相比其餘模型,使用 select() 的事件驅動模型只用單線程(進程)執行,佔用資源少,不消耗太多 CPU,同時可以爲多客戶端提供服務。若是試圖創建一個簡單的事件驅動的服務器程序,這個模型有必定的參考價值。但這個模型依舊有着不少問題。
select的缺點:
(1)單個進程可以監視的文件描述符的數量存在最大限制
(2)select須要複製大量的句柄數據結構,產生巨大的開銷
(3)select返回的是含有整個句柄的列表,應用程序須要消耗大量時間去輪詢各個句柄才能發現哪些句柄發生了事件
(4)select的觸發方式是水平觸發,應用程序若是沒有完成對一個已經就緒的文件描述符進行IO操做,那麼以後每次select調用仍是會將這些文件描述符通知進程。相對應方式的是邊緣觸發。
(6) 該模型將事件探測和事件響應夾雜在一塊兒,一旦事件響應的執行體龐大,則對整個模型是災難性的。以下例,龐大的執行體 1 的將直接致使響應事件 2 的執行體遲遲得不到執行,並在很大程度上下降了事件探測的及時性。
龐大的執行體對使用select()的事件驅動模型的影響
不少操做系統提供了更爲高效的接口,如 linux 提供了 epoll,BSD 提供了 kqueue,Solaris 提供了 /dev/poll …。若是須要實現更高效的服務器程序,相似 epoll 這樣的接口更被推薦。
/* 新建並初始化文件描述符集。*/ struct epoll_event ev; struct epoll_event events[MAX_EVENTS]; /* 建立epoll句柄。*/ int epfd = epoll_create(MAX_EVENTS); /* socket配置和監聽。*/ sock = socket(...); bind(sock, ...); listen(sock, ...); /* 對socket描述符上發生關心的事件進行註冊。*/ ev.events = EPOLLIN; ev.data.fd = sock; epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev); while(1) { int i; /*調用epoll_wait()阻塞等待,等待時間爲永遠等待直到發生事件。*/ int n = epoll_wait(epfd, events, MAX_EVENTS, -1); for(i=0; i <n; ++i) { /* 測試是否有客戶端發起鏈接請求,若是有則接受並把新建的描述符加入監控。*/ if(events.data.fd == sock) { if(events.events & POLLIN){ new_sock = accept(sock, ...); ev.events = EPOLLIN | POLLOUT; ev.data.fd = new_sock; epoll_ctl(epfd, EPOLL_CTL_ADD, new_sock, &ev); } }else{ /* 對其它描述符發生的事件進行適當處理。*/ if(events.events & POLLIN) doReadAction(i); if(events.events & POLLOUT) doWriteAction(i); } } }
這兩個詞來源於計算機硬件設計。它們的區別是隻要句柄知足某種狀態,水平觸發就會發出通知;而只有當句柄狀態改變時,邊緣觸發纔會發出通知。例如一個socket通過長時間等待後接收到一段100k的數據,兩種觸發方式都會向程序發出就緒通知。假設程序從這個socket中讀取了50k數據,並再次調用監聽函數,水平觸發依然會發出就緒通知,而邊緣觸發會由於socket「有數據可讀」這個狀態沒有發生變化而不發出通知且陷入長時間的等待。
所以在使用邊緣觸發的 api 時,要注意每次都要讀到 socket返回 EWOULDBLOCK爲止
遺憾的是不一樣的操做系統特供的 epoll 接口有很大差別,因此使用相似於 epoll 的接口實現具備較好跨平臺能力的服務器會比較困難。
幸運的是,有不少高效的事件驅動庫能夠屏蔽上述的困難,常見的事件驅動庫有 libevent 庫,還有做爲 libevent 替代者的 libev 庫。這些庫會根據操做系統的特色選擇最合適的事件探測接口,而且加入了信號 (signal) 等技術以支持異步響應,這使得這些庫成爲構建事件驅動模型的不二選擇。下章將介紹如何使用 libev 庫替換 select 或 epoll 接口,實現高效穩定的服務器模型。
五、libevent方法
libevent是一個事件觸發的網絡庫,適用於windows、linux、bsd等多種平臺,內部使用select、epoll、kqueue等系統調用管理事件機制。著名分佈式緩存軟件memcached也是libevent based,並且libevent在使用上能夠作到跨平臺,並且根據libevent官方網站上公佈的數據統計,彷佛也有着非凡的性能。
libevent 庫實際上沒有更換 select()
、poll()
或其餘機制的基礎。而是使用對於每一個平臺最高效的高性能解決方案在實現外加上一個包裝器。
爲了實際處理每一個請求,libevent 庫提供一種事件機制,它做爲底層網絡後端的包裝器。事件系統讓爲鏈接添加處理函數變得很是簡便,同時下降了底層 I/O 複雜性。這是 libevent 系統的核心。
libevent 庫的其餘組件提供其餘功能,包括緩衝的事件系統(用於緩衝發送到客戶端/從客戶端接收的數據)以及 HTTP、DNS 和 RPC 系統的核心實現。
1)事件驅動,高性能;
2)輕量級,專一於網絡;
3) 跨平臺,支持 Windows、Linux、Mac Os等;
4) 支持多種 I/O多路複用技術, epoll、poll、dev/poll、select 和kqueue 等;
5) 支持 I/O,定時器和信號等事件
1)event 及 event_base事件管理包括各類IO(socket)、定時器、信號等事件,也是libevent應用最廣的模塊;
2 ) evbuffer event 及 event_base 緩存管理是指evbuffer功能;提供了高效的讀寫方法
3) evdns DNS是libevent提供的一個異步DNS查詢功能;
4) evhttp HTTP是libevent的一個輕量級http實現,包括服務器和客戶端
libevent也支持ssl,這對於有安全需求的網絡程序很是的重要,可是其支持不是很完善,好比http server的實現就不支持ssl。
libevent是事件驅動的庫,所謂事件驅動,簡單地說就是你點什麼按鈕(即產生什麼事件),電腦執行什麼操做(即調用什麼函數)。
Libevent框架本質上是一個典型的Reactor模式,因此只須要弄懂Reactor模型,libevent就八九不離十了。
Reactor模式,是一種事件驅動機制。應用程序須要提供相應的接口並註冊到Reactor上,若是相應的事件發生,Reactor將主動調用應用程序註冊的接口,這些接口又稱爲「回調函數」。
在Libevent中也是同樣,向Libevent框架註冊相應的事件和回調函數;當這些事件發生時,Libevent會調用這些回調函數處理相應的事件(I/O讀寫、定時和信號)。
使用Reactor模型,必備的幾個組件:事件源、Reactor框架、多路複用機制和事件處理程序,先來看看Reactor模型的總體框架,接下來再對每一個組件作逐一說明。
1) 事件源
1) 2) event demultiplexer——事件多路分發機制
由操做系統提供的I/O多路複用機制,好比select和epoll。程序首先將其關心的句柄(事件源)及其事件註冊到event demultiplexer上;當有事件到達時,event demultiplexer會發出通知「在已經註冊的句柄集中,一個或多個句柄的事件已經就緒」;程序收到通知後,就能夠在非阻塞的狀況下對事件進行處理了。
對應到libevent中,依然是select、poll、epoll等,可是libevent使用結構體eventop進行了封裝,以統一的接口來支持這些I/O多路複用機制,達到了對外隱藏底層系統機制的目的。
3) Reactor——反應器
Reactor,是事件管理的接口,內部使用event demultiplexer註冊、註銷事件;並運行事件循環,當有事件進入「就緒」狀態時,調用註冊事件的回調函數處理事件。
對應到libevent中,就是event_base結構體。
4) Event Handler——事件處理程序
事件處理程序提供了一組接口,每一個接口對應了一種類型的事件,供Reactor在相應的事件發生時調用,執行相應的事件處理。一般它會綁定一個有效的句柄。
對應到libevent中,就是event結構體。
結合Reactor框架,咱們來理一下libevent的事件處理流程,請看下圖:
event_init() 初始化:
首先要隆重介紹event_base對象:
struct event_base { const struct eventop *evsel; void *evbase; int event_count; /* counts number of total events */ int event_count_active; /* counts number of active events */ int event_gotterm; /* Set to terminate loop */ /* active event management */ struct event_list **activequeues; int nactivequeues; struct event_list eventqueue; struct timeval event_tv; RB_HEAD(event_tree, event) timetree; };
event_base對象整合了事件處理的一些全局變量, 角色是event對象的"總管家", 他包括了:
事件引擎函數對象(evsel, evbase),
當前入列事件列表(event_count, event_count_active, eventqueue),
全局終止信號(event_gotterm),
活躍事件列表(avtivequeues),
事件隊列樹(timetree)...
初始化時建立event_base對象, 選擇 當前OS支持的事件引擎(epoll, poll, select...)並初始化, 建立全局信號隊列(signalqueue), 活躍隊列的內存分配( 根據設置的priority個數,默認爲1).
event_setevent_set來設置event對象,包括全部者event_base對象, fd, 事件(EV_READ| EV_WRITE|EV_PERSIST), 回掉函數和參數,事件優先級是當前event_base的中間級別(current_base->nactivequeues/2)
設置監視事件後,事件處理函數能夠只被調用一次或總被調用。
只調用一次:事件處理函數被調用後,即從事件隊列中刪除,須要在事件處理函數中再次加入事件,才能在下次事件發生時被調用;
總被調用:設置爲EV_PERSIST,只加入一次,處理函數總被調用,除非採用event_remove顯式地刪除。
event_add() 事件添加:
int event_add(struct event *ev, struct timeval *tv)
這個接口有兩個參數, 第一個是要添加的事件, 第二個參數做爲事件的超時值(timer). 若是該值非NULL, 在添加本事件的同時添加超時事件(EV_TIMEOUT)到時間隊列樹(timetree), 根據事件類型處理以下:
EV_READ => EVLIST_INSERTED => eventqueue
EV_WRITE => EVLIST_INSERTED => eventqueue
EV_TIMEOUT => EVLIST_TIMEOUT => timetree
EV_SIGNAL => EVLIST_SIGNAL => signalqueue
event_base_loop() 事件處理主循環
這裏是事件的主循環,只要flags不是設置爲EVLOOP_NONBLOCK, 該函數就會一直循環監聽事件/處理事件.
每次循環過程當中, 都會處理當前觸發(活躍)事件:
(a). 檢測當前是否有信號處理(gotterm, gotsig), 這些都是全局參數,不適合多線程
(b). 時間更新,找到離當前最近的時間事件, 獲得相對超時事件tv
(c). 調用事件引擎的dispatch wait事件觸發, 超時值爲tv, 觸發事件添加到activequeues
(d). 處理活躍事件, 調用caller的callbacks (event_process_acitve)
典型的libevent的應用大體總體流程:
建立 libevent 服務器的基本方法是, 註冊當發生某一操做(好比接受來自客戶端的鏈接)時應該執行的函數,而後調用主事件循環event_dispatch()
。執行過程的控制如今由 libevent 系統處理。註冊事件和將調用的函數以後,事件系統開始自治;在應用程序運行時,能夠在事件隊列中添加(註冊)或刪除(取消註冊)事件。事件註冊很是方便,能夠經過它添加新事件以處理新打開的鏈接,從而構建靈活的網絡處理系統
(環境設置)-> (建立event_base) -> (註冊event,將此event加入到event_base中) -> (設置event各類屬性,事件等) ->(將event加入事件列表 addevent) ->(開始事件監視循環、分發dispatch)。
例子:
例如,能夠打開一個監聽套接字,而後註冊一個回調函數,每當須要調用 accept()
函數以打開新鏈接時調用這個回調函數,這樣就建立了一個網絡服務器。例1以下所示的代碼片斷說明基本過程:
例1:打開監聽套接字,註冊一個回調函數(每當須要調用 accept() 函數以打開新鏈接時調用它),由此建立網絡服務器:
#include <stdio.h> #include <string.h> #include <iostream> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <netdb.h> #include <event.h> using namespace std; // 事件base struct event_base* base; // 讀事件回調函數 void onRead(int iCliFd, short iEvent, void *arg) { int iLen; char buf[1500]; iLen = recv(iCliFd, buf, 1500, 0); if (iLen <= 0) { cout << "Client Close" << endl; // 鏈接結束(=0)或鏈接錯誤(<0),將事件刪除並釋放內存空間 struct event *pEvRead = (struct event*)arg; event_del(pEvRead); delete pEvRead; close(iCliFd); return; } buf[iLen] = 0; cout << "Client Info:" << buf << endl; } // 鏈接請求事件回調函數 void onAccept(int iSvrFd, short iEvent, void *arg) { int iCliFd; struct sockaddr_in sCliAddr; socklen_t iSinSize = sizeof(sCliAddr); iCliFd = accept(iSvrFd, (struct sockaddr*)&sCliAddr, &iSinSize); // 鏈接註冊爲新事件 (EV_PERSIST爲事件觸發後不默認刪除) struct event *pEvRead = new event; event_set(pEvRead, iCliFd, EV_READ|EV_PERSIST, onRead, pEvRead); event_base_set(base, pEvRead); event_add(pEvRead, NULL); } int main() { int iSvrFd; struct sockaddr_in sSvrAddr; memset(&sSvrAddr, 0, sizeof(sSvrAddr)); sSvrAddr.sin_family = AF_INET; sSvrAddr.sin_addr.s_addr = inet_addr("127.0.0.1"); sSvrAddr.sin_port = htons(8888); // 建立tcpSocket(iSvrFd),監聽本機8888端口 iSvrFd = socket(AF_INET, SOCK_STREAM, 0); bind(iSvrFd, (struct sockaddr*)&sSvrAddr, sizeof(sSvrAddr)); listen(iSvrFd, 10); // 初始化base base = event_base_new(); struct event evListen; // 設置事件 event_set(&evListen, iSvrFd, EV_READ|EV_PERSIST, onAccept, NULL); // 設置爲base事件 event_base_set(base, &evListen); // 添加事件 event_add(&evListen, NULL); // 事件循環 event_base_dispatch(base); return 0; }
event_set()
函數建立新的事件結構,
event_add()
在事件隊列機制中添加事件。
而後,event_dispatch()
啓動事件隊列系統,開始監聽(並接受)請求。
儘管 C 語言很適合許多系統應用程序,可是在現代環境中不常用 C 語言,腳本語言更靈活、更實用。幸運的是,Perl 和 PHP 等大多數腳本語言是用 C 編寫的,因此能夠經過擴展模塊使用 libevent 等 C 庫。
四、libev庫
官方文檔:http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod
與 libevent 同樣,libev 系統也是基於事件循環的系統,它在 poll()
、select()
等機制的本機實現的基礎上提供基於事件的循環。
libev是libevent以後的一個事件驅動的編程框架,其接口和libevent基本相似。據官方介紹,其性能比libevent還要高,bug比libevent還少。
libev API 比較原始,沒有 HTTP 包裝器,可是 libev 支持在實現中內置更多事件類型。例如,一種 evstat 實現能夠監視多個文件的屬性變更,能夠在 例4 所示的 HTTP 文件解決方案中使用它。
可是,libevent 和 libev 的基本過程是相同的。建立所需的網絡監聽套接字,註冊在執行期間要調用的事件,而後啓動主事件循環,讓 libev 處理過程的其他部分。
Libev是一個event loop:向libev註冊感興趣的events,好比Socket可讀事件,libev會對所註冊的事件的源進行管理,並在事件發生時觸發相應的程序。
事件驅動框架:
定義一個監控器、書寫觸發動做邏輯、初始化監控器、設置監控器觸發條件、將監控器加入大事件驅動器的循環中便可。
libev的事件驅動過程能夠想象成以下的僞代碼:
do_some_init() is_run = True while is_run: t = caculate_loop_time() deal_loop(t) deal_with_pending_event() do_some_clear()
首先作一些初始化操做,而後進入到循環中,該循環經過一個狀態位來控制是否執行。
在循環中,計算出下一次輪詢的時間,這裏輪詢的實現就採用了系統提供的epoll、kqueue等機制。
再輪詢結束後檢查有哪些監控器的被觸發了,依次執行觸發動做。
Libev 除了提供了基本的三大類事件(IO事件、定時器事件、信號事件)外還提供了週期事件、子進程事件、文件狀態改變事件等多個事件。
libev所實現的功能就是一個強大的reactor,可能notify事件主要包括下面這些:
libev 一樣須要循環探測事件是否產生。Libev 的循環體用 ev_loop 結構來表達,並用 ev_loop( ) 來啓動。
void ev_loop( ev_loop* loop, int flags ) |
Libev 支持八種事件類型,其中包括 IO 事件。一個 IO 事件用 ev_io 來表徵,並用 ev_io_init() 函數來初始化:
void ev_io_init(ev_io *io, callback, int fd, int events) |
初始化內容包括回調函數 callback,被探測的句柄 fd 和須要探測的事件,EV_READ 表「可讀事件」,EV_WRITE 表「可寫事件」。
如今,用戶須要作的僅僅是在合適的時候,將某些 ev_io 從 ev_loop 加入或剔除。一旦加入,下個循環即會檢查 ev_io 所指定的事件有否發生;若是該事件被探測到,則 ev_loop 會自動執行 ev_io 的回調函數 callback();若是 ev_io 被註銷,則再也不檢測對應事件。
不管某 ev_loop 啓動與否,均可以對其添加或刪除一個或多個 ev_io,添加刪除的接口是 ev_io_start() 和 ev_io_stop()。
void ev_io_start( ev_loop *loop, ev_io* io ) void ev_io_stop( EV_A_* ) |
由此,咱們能夠容易得出以下的「一問一答」的服務器模型。因爲沒有考慮服務器端主動終止鏈接機制,因此各個鏈接能夠維持任意時間,客戶端能夠自由選擇退出時機。
IO事件、定時器事件、信號事件:
#include<ev.h> #include <stdio.h> #include <signal.h> #include <sys/unistd.h> ev_io io_w; ev_timer timer_w; ev_signal signal_w; void io_action(struct ev_loop *main_loop,ev_io *io_w,int e) { int rst; char buf[1024] = {''}; puts("in io cb\n"); read(STDIN_FILENO,buf,sizeof(buf)); buf[1023] = ''; printf("Read in a string %s \n",buf); ev_io_stop(main_loop,io_w); } void timer_action(struct ev_loop *main_loop,ev_timer *timer_w,int e) { puts("in tiemr cb \n"); ev_timer_stop(main_loop,io_w); } void signal_action(struct ev_loop *main_loop,ev_signal signal_w,int e) { puts("in signal cb \n"); ev_signal_stop(main_loop,io_w); ev_break(main_loop,EVBREAK_ALL); } int main(int argc ,char *argv[]) { struct ev_loop *main_loop = ev_default_loop(0); ev_init(&io_w,io_action); ev_io_set(&io_w,STDIN_FILENO,EV_READ); ev_init(&timer_w,timer_action); ev_timer_set(&timer_w,2,0); ev_init(&signal_w,signal_action); ev_signal_set(&signal_w,SIGINT); ev_io_start(main_loop,&io_w); ev_timer_start(main_loop,&timer_w); ev_signal_start(main_loop,&signal_w); ev_run(main_loop,0); return 0; }
這裏使用了3種事件監控器,分別監控IO事件、定時器事件以及信號事件。所以定義了3個監控器(watcher),以及觸發監控器時要執行動做的回調函數。Libev定義了多種監控器,命名方式爲ev_xxx
這裏xxx表明監控器類型,其實現是一個結構體,
typedef struct ev_io { .... } ev_io;
經過宏定義能夠簡寫爲 ev_xxx
。回調函數的類型爲 void cb_name(struct ev_loop *main_loop,ev_xxx *io_w,int event)
。
在main中,首先定義了一個事件驅動器的結構 struct ev_loop *main_loop
這裏調用 ev_default_loop(0)
生成一個預製的全局驅動器。這裏能夠參考Manual中的選擇。而後依次初始化各個監控器以及設置監控器的觸發條件。
初始化監控器的過程是將相應的回調函數即觸發時的動做註冊到監控器上。
設置觸發條件則是該條件產生時纔去執行註冊到監控器上的動做。對於IO事件,通常是設置特定fd上的的可讀或可寫事件,定時器則是多久後觸發。這裏定時器的觸發條件中還有第三參數,表示第一次觸發後,是否循環,若爲0則吧循環,不然按該值循環。信號觸發器則是設置觸發的信號。
在初始化並設置好觸發條件後,先調用ev_xxx_start
將監控器註冊到事件驅動器上。接着調用 ev_run
開始事件驅動器。
上述模型能夠接受任意多個鏈接,且爲各個鏈接提供徹底獨立的問答服務。藉助 libev 提供的事件循環 / 事件驅動接口,上述模型有機會具有其餘模型不能提供的高效率、低資源佔用、穩定性好和編寫簡單等特色。
因爲傳統的 web 服務器,ftp 服務器及其餘網絡應用程序都具備「一問一答」的通信邏輯,因此上述使用 libev 庫的「一問一答」模型對構建相似的服務器程序具備參考價值;另外,對於須要實現遠程監視或遠程遙控的應用程序,上述模型一樣提供了一個可行的實現方案。
php-了libev擴展socket:<?php /* 使用異步io訪問socket Use some async I/O to access a socket */ // `sockets' extension still logs warnings // for EINPROGRESS, EAGAIN/EWOULDBLOCK etc. error_reporting(E_ERROR); $e_nonblocking = array (/*EAGAIN or EWOULDBLOCK*/11, /*EINPROGRESS*/115); // Get the port for the WWW service $service_port = getservbyname('www', 'tcp'); // Get the IP address for the target host $address = gethostbyname('google.co.uk'); // Create a TCP/IP socket $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); if ($socket === FALSE) { echo \"socket_create() failed: reason: \" .socket_strerror(socket_last_error()) . \"n\"; } // Set O_NONBLOCK flag socket_set_nonblock($socket); // Abort on timeout $timeout_watcher = new EvTimer(10.0, 0., function () use ($socket) { socket_close($socket); Ev::stop(Ev::BREAK_ALL); }); // Make HEAD request when the socket is writable $write_watcher = new EvIo($socket, Ev::WRITE, function ($w) use ($socket, $timeout_watcher, $e_nonblocking) { // Stop timeout watcher $timeout_watcher->stop(); // Stop write watcher $w->stop(); $in = \"HEAD / HTTP/1.1rn\"; $in .= \"Host: google.co.ukrn\"; $in .= \"Connection: Closernrn\"; if (!socket_write($socket, $in, strlen($in))) { trigger_error(\"Failed writing $in to socket\", E_USER_ERROR); } $read_watcher = new EvIo($socket, Ev::READ, function ($w, $re) use ($socket, $e_nonblocking) { // Socket is readable. recv() 20 bytes using non-blocking mode $ret = socket_recv($socket, $out, 20, MSG_DONTWAIT); if ($ret) { echo $out; } elseif ($ret === 0) { // All read $w->stop(); socket_close($socket); return; } // Caught EINPROGRESS, EAGAIN, or EWOULDBLOCK if (in_array(socket_last_error(), $e_nonblocking)) { return; } $w->stop(); socket_close($socket); }); Ev::run(); }); $result = socket_connect($socket, $address, $service_port); Ev::run(); ?>
libevent 和 libev 都提供靈活且強大的環境,支持爲處理服務器端或客戶端請求實現高性能網絡(和其餘 I/O)接口。目標是以高效(CPU/RAM 使用量低)的方式支持數千甚至數萬個鏈接。在本文中,您看到了一些示例,包括 libevent 中內置的 HTTP 服務,可使用這些技術支持基於 IBM Cloud、EC2 或 AJAX 的 web 應用程序。
參考:
http://www.ibm.com/developerworks/cn/linux/l-cn-edntwk/
http://www.ibm.com/developerworks/cn/aix/library/au-libev/