本部分的Github地址爲:這裏php
在傳統的網絡服務器的構建中,IO模式會按照Blocking/Non-Blocking、Synchronous/Asynchronous這兩個標準進行分類,其中Blocking與Synchronous基本上一個意思,而NIO與Async的區別在於NIO強調的是Polling(輪詢),而Async強調的是Notification(通知)。譬如在一個典型的單進程單線程Socket接口中,阻塞型的接口必須在上一個Socket鏈接關閉以後才能接入下一個Socket鏈接。而對於NIO的Socket而言,Server Application會從內核獲取到一個特殊的"Would Block"錯誤信息,可是並不會阻塞到等待發起請求的Socket Client中止。通常來講,在Linux系統中能夠經過調用獨立的select
或者poll
方法來遍歷全部讀取好的數據,而且進行寫操做。而對於異步Socket而言(譬如Windows中的Sockets或者.Net中實現的Sockets模型),Server Application會告訴IO Framework去讀取某個Socket數據,在數據讀取完畢以後IO Framework會自動地調用你的回調(也就是通知應用程序自己數據已經準備好了)。以IO多路複用中的Reactor與Proactor模型爲例,非阻塞的模型是須要應用程序自己處理IO的,而異步模型則是由Kernel或者Framework將數據準備好讀入緩衝區中,應用程序直接從緩衝區讀取數據。html
總結一下:java
同步阻塞:在此種方式下,用戶進程在發起一個IO操做之後,必須等待IO操做的完成,只有當真正完成了IO操做之後,用戶進程才能運行。react
同步非阻塞:在此種方式下,用戶進程發起一個IO操做之後邊可返回作其它事情,可是用戶進程須要時不時的詢問IO操做是否就緒,這就要求用戶進程不停的去詢問,從而引入沒必要要的CPU資源浪費。linux
異步非阻塞:在此種模式下,用戶進程只須要發起一個IO操做而後當即返回,等IO操做真正的完成之後,應用程序會獲得IO操做完成的通知,此時用戶進程只須要對數據進行處理就行了,不須要進行實際的IO讀寫操做,由於真正的IO讀取或者寫入操做已經由內核完成了。git
而在併發IO的問題中,較常見的就是所謂的C10K問題,即有10000個客戶端須要連上一個服務器並保持TCP鏈接,客戶端會不定時的發送請求給服務器,服務器收到請求後需及時處理並返回結果。github
BIO即同步阻塞式IO,是面向流的,阻塞式的,串行的一個過程。對每個客戶端的socket鏈接,都須要一個線程來處理,並且在此期間這個線程一直被佔用,直到socket關閉。redis
採用BIO通訊模型的服務端,一般由一個獨立的Acceptor線程負責監聽客戶端的鏈接,接收到客戶端鏈接以後爲客戶端鏈接建立一個新的線程處理請求消息,處理完成以後,返回應答消息給客戶端,線程銷燬,這就是典型的一請求一應答模型。該架構最大的問題就是不具有彈性伸縮能力,當併發訪問量增長後,服務端的線程個數和併發訪問數成線性正比,因爲線程是JAVA虛擬機很是寶貴的系統資源,當線程數膨脹以後,系統的性能急劇降低,隨着併發量的繼續增長,可能會發生句柄溢出、線程堆棧溢出等問題,並致使服務器最終宕機。編程
還有一些並非因爲併發數增長而致使的系統負載增長:鏈接服務器的一些客戶端,因爲網絡或者自身性能處理的問題,接收端從socket讀取數據的速度跟不上發送端寫入數據的速度。 而在TCP/IP網絡編程過程當中,已經發送出去的數據依然須要暫存在send buffer,只有收到對方的ack,kernel才從buffer中清除這一部分數據,爲後續發送數據騰出空間。接收端將收到的數據暫存在receive buffer中,自動進行確認。但若是socket所在的進程不及時將數據從receive buffer中取出,最終致使receive buffer填滿,因爲TCP的滑動窗口和擁塞控制,接收端會阻止發送端向其發送數據。segmentfault
做爲發送端,服務器因爲遲遲不能釋放被佔用的線程,致使內存佔用率不斷升高,堆回收的效率愈來愈低,致使Full GC,最終致使服務宕機。
本文初說起,BIO的一個缺陷在於某個Socket在其鏈接到斷上期間會獨佔線程,那麼解決這個問題的一個樸素想法就是利用多進程多線程的辦法,便是建立一個新的線程來處理新的鏈接,這樣就保證了併發IO的實現。本節便是對這種思路進行分析。
最先的服務器端程序都是經過多進程、多線程來解決併發IO的問題。進程模型出現的最先,從Unix系統誕生就開始有了進程的概念。最先的服務器端程序通常都是Accept一個客戶端鏈接就建立一個進程,而後子進程進入循環同步阻塞地與客戶端鏈接進行交互,收發處理數據。
多線程模式出現要晚一些,線程與進程相比更輕量,並且線程之間是共享內存堆棧的,因此不一樣的線程之間交互很是容易實現。好比聊天室這樣的程序,客戶端鏈接之間能夠交互,比聊天室中的玩家能夠任意的其餘人發消息。用多線程模式實現很是簡單,線程中能夠直接讀寫某一個客戶端鏈接。而多進程模式就要用到管道、消息隊列、共享內存實現數據交互,統稱進程間通訊(IPC)複雜的技術才能實現。
多進程/線程模型的流程以下:
建立一個 socket,綁定服務器端口(bind),監聽端口(listen),在PHP中用stream_socket_server一個函數就能完成上面3個步驟,固然也可使用php sockets擴展分別實現。
進入while循環,阻塞在accept操做上,等待客戶端鏈接進入。此時程序會進入隨眠狀態,直到有新的客戶端發起connect到服務器,操做系統會喚醒此進程。accept函數返回客戶端鏈接的socket
主進程在多進程模型下經過fork(php: pcntl_fork)建立子進程,多線程模型下使用pthread_create(php: new Thread)建立子線程。下文如無特殊聲明將使用進程同時表示進程/線程。
子進程建立成功後進入while循環,阻塞在recv(php: fread)調用上,等待客戶端向服務器發送數據。收到數據後服務器程序進行處理而後使用send(php: fwrite)向客戶端發送響應。長鏈接的服務會持續與客戶端交互,而短鏈接服務通常收到響應就會close。
當客戶端鏈接關閉時,子進程退出並銷燬全部資源。主進程會回收掉此子進程。
上文描述的多進程/多線程模型最大的問題是,進程/線程建立和銷燬的開銷很大。因此上面的模式沒辦法應用於很是繁忙的服務器程序。對應的改進版解決了此問題,這就是經典的Leader-Follower模型。
它的特色是程序啓動後就會建立N個進程。每一個子進程進入Accept,等待新的鏈接進入。當客戶端鏈接到服務器時,其中一個子進程會被喚醒,開始處理客戶端請求,而且再也不接受新的TCP鏈接。當此鏈接關閉時,子進程會釋放,從新進入Accept,參與處理新的鏈接。這個模型的優點是徹底能夠複用進程,沒有額外消耗,性能很是好。不少常見的服務器程序都是基於此模型的,好比Apache、PHP-FPM。
多進程模型也有一些缺點。
這種模型嚴重依賴進程的數量解決併發問題,一個客戶端鏈接就須要佔用一個進程,工做進程的數量有多少,併發處理能力就有多少。操做系統能夠建立的進程數量是有限的。
啓動大量進程會帶來額外的進程調度消耗。數百個進程時可能進程上下文切換調度消耗佔CPU不到1%能夠忽略不接,若是啓動數千甚至數萬個進程,消耗就會直線上升。調度消耗可能佔到CPU的百分之幾十甚至100%。
另外有一些場景多進程模型沒法解決,好比即時聊天程序(IM),一臺服務器要同時維持上萬甚至幾十萬上百萬的鏈接(經典的C10K問題),多進程模型就力不從心了。還有一種場景也是多進程模型的軟肋。一般Web服務器啓動100個進程,若是一個請求消耗100ms,100個進程能夠提供1000qps,這樣的處理能力仍是不錯的。可是若是請求內要調用外網Http接口,像QQ、微博登陸,耗時會很長,一個請求須要10s。那一個進程1秒只能處理0.1個請求,100個進程只能達到10qps,這樣的處理能力就太差了。
Unix的5種IO模型:阻塞式IO, 非阻塞式IO,IO複用模型,信號驅動式IO和異步IO。
IO多路複用經過把多個IO的阻塞複用到同一個select的阻塞上,從而使得系統在單線程的狀況能夠同時處理多個客戶端請求。 目前支持IO多路複用的系統調用有select,pselect,poll,epoll,在linux網絡編程過程當中,很長一段時間都是用select作輪詢和網絡事件通知,然而select的一些固有缺陷致使了它的應用受到了很大的限制,最終linux不得不載新的內核版本中尋找select的替代方案,最終選擇了epoll。
Asynchronous IO refers to an interface where you supply a callback to an IO operation, which is invoked when the operation completes. This invocation often happens to an entirely different thread to the one that originally made the request, but this is not necessarily the case. Asynchronous IO is a manifestation of the "proactor" pattern.
併發IO問 題一直是後端編程中的技術挑戰,從最先的同步阻塞Fork進程,到多進程/多線程,到如今的異步IO、協程。
IO多路複用技術通俗闡述,便是由一個線程輪詢每一個鏈接,若是某個鏈接有請求則處理請求,沒有請求則處理下一個鏈接。首先來看下可讀事件與可寫事件:
當以下任一狀況發生時,會產生套接字的可讀事件:
該套接字的接收緩衝區中的數據字節數大於等於套接字接收緩衝區低水位標記的大小;
該套接字的讀半部關閉(也就是收到了FIN),對這樣的套接字的讀操做將返回0(也就是返回EOF);
該套接字是一個監聽套接字且已完成的鏈接數不爲0;
該套接字有錯誤待處理,對這樣的套接字的讀操做將返回-1。
當以下任一狀況發生時,會產生套接字的可寫事件:
該套接字的發送緩衝區中的可用空間字節數大於等於套接字發送緩衝區低水位標記的大小;
該套接字的寫半部關閉,繼續寫會產生SIGPIPE信號;
非阻塞模式下,connect返回以後,該套接字鏈接成功或失敗;
該套接字有錯誤待處理,對這樣的套接字的寫操做將返回-1。
Reactor模型在Linux系統中的具體實現便是select/poll/epoll/kqueue,像Redis中便是採用了Reactor模型實現了單進程單線程高併發。Reactor模型的理論基礎能夠參考reactor-siemens
Handles:表示操做系統管理的資源,咱們能夠理解爲fd。
Synchronous Event Demultiplexer:同步事件分離器,阻塞等待Handles中的事件發生。
Initiation Dispatcher:初始分派器,做用爲添加Event handler(事件處理器)、刪除Event handler以及分派事件給Event handler。也就是說,Synchronous Event Demultiplexer負責等待新事件發生,事件發生時通知Initiation Dispatcher,而後Initiation Dispatcher調用event handler處理事件。
Event Handler:事件處理器的接口
Concrete Event Handler:事件處理器的實際實現,並且綁定了一個Handle。由於在實際狀況中,咱們每每不止一種事件處理器,所以這裏將事件處理器接口和實現分開,與C++、Java這些高級語言中的多態相似。
Reactor模型的基本的處理邏輯爲:
咱們註冊Concrete Event Handler到Initiation Dispatcher中。
Initiation Dispatcher調用每一個Event Handler的get_handle接口獲取其綁定的Handle。
Initiation Dispatcher調用handle_events開始事件處理循環。在這裏,Initiation Dispatcher會將步驟2獲取的全部Handle都收集起來,使用Synchronous Event Demultiplexer來等待這些Handle的事件發生。
當某個(或某幾個)Handle的事件發生時,Synchronous Event Demultiplexer通知Initiation Dispatcher。
Initiation Dispatcher根據發生事件的Handle找出所對應的Handler。
Initiation Dispatcher調用Handler的handle_event方法處理事件。
時序圖以下:
抽象來講,Reactor有4個核心的操做:
add添加socket監聽到reactor,能夠是listen socket也可使客戶端socket,也能夠是管道、eventfd、信號等
set修改事件監聽,能夠設置監聽的類型,如可讀、可寫。可讀很好理解,對於listen socket就是有新客戶端鏈接到來了須要accept。對於客戶端鏈接就是收到數據,須要recv。可寫事件比較難理解一些。一個SOCKET是有緩存區的,若是要向客戶端鏈接發送2M的數據,一次性是發不出去的,操做系統默認TCP緩存區只有256K。一次性只能發256K,緩存區滿了以後send就會返回EAGAIN錯誤。這時候就要監聽可寫事件,在純異步的編程中,必須去監聽可寫才能保證send操做是徹底非阻塞的。
del從reactor中移除,再也不監聽事件
callback就是事件發生後對應的處理邏輯,通常在add/set時制定。C語言用函數指針實現,JS能夠用匿名函數,PHP能夠用匿名函數、對象方法數組、字符串函數名。
Reactor只是一個事件發生器,實際對socket句柄的操做,如connect/accept、send/recv、close是在callback中完成的。具體編碼可參考下面的僞代碼:
Reactor模型還能夠與多進程、多線程結合起來用,既實現異步非阻塞IO,又利用到多核。目前流行的異步服務器程序都是這樣的方式:如
Nginx:多進程Reactor
Nginx+Lua:多進程Reactor+協程
Golang:單線程Reactor+多線程協程
Swoole:多線程Reactor+多進程Worker
協程從底層技術角度看實際上仍是異步IO Reactor模型,應用層自行實現了任務調度,藉助Reactor切換各個當前執行的用戶態線程,但用戶代碼中徹底感知不到Reactor的存在。
Reactor和Proactor模式的主要區別就是真正的讀取和寫入操做是有誰來完成的,Reactor中須要應用程序本身讀取或者寫入數據,而 Proactor模式中,應用程序不須要進行實際的讀寫過程,它只須要從緩存區讀取或者寫入便可,操做系統會讀取緩存區或者寫入緩存區到真正的IO設備。Proactor模型的基本處理邏輯以下:
應用程序初始化一個異步讀取操做,而後註冊相應的事件處理器,此時事件處理器不關注讀取就緒事件,而是關注讀取完成事件,這是區別於Reactor的關鍵。
事件分離器等待讀取操做完成事件。
在事件分離器等待讀取操做完成的時候,操做系統調用內核線程完成讀取操做(異步IO都是操做系統負責將數據讀寫到應用傳遞進來的緩衝區供應用程序操做,操做系統扮演了重要角色),並將讀取的內容放入用戶傳遞過來的緩存區中。這也是區別於Reactor的一點,Proactor中,應用程序須要傳遞緩存區。
事件分離器捕獲到讀取完成事件後,激活應用程序註冊的事件處理器,事件處理器直接從緩存區讀取數據,而不須要進行實際的讀取操做。
select,poll,epoll都是IO多路複用的機制。IO多路複用就經過一種機制,能夠監視多個描述符,一旦某個描述符就緒(通常是讀就緒或者寫就緒),可以通知程序進行相應的讀寫操做。但select,poll,epoll本質上都是同步IO,由於他們都須要在讀寫事件就緒後本身負責進行讀寫,也就是說這個讀寫過程是阻塞的,而異步IO則無需本身負責進行讀寫,異步IO的實現會負責把數據從內核拷貝到用戶空間。
select(int nfds, fd_set *r, fd_set *w, fd_set *e, struct timeval *timeout)
maxfdp1
表示該進程中描述符的總數。
fd_set
則是配合select
模型的重點數據結構,用來存放描述符的集合。
timeout
表示select
返回須要等待的時間。
對於select(),咱們須要傳3個集合,r,w和e。其中,r表示咱們對哪些fd的可讀事件感興趣,w表示咱們對哪些fd的可寫事件感興趣。每一個集合實際上是一個bitmap,經過0/1表示咱們感興趣的fd。例如,咱們對於fd爲6的可讀事件感興趣,那麼r集合的第6個bit須要被 設置爲1。這個系統調用會阻塞,直到咱們感興趣的事件(至少一個)發生。調用返回時,內核一樣使用這3個集合來存放fd實際發生的事件信息。也就是說,調 用前這3個集合表示咱們感興趣的事件,調用後這3個集合表示實際發生的事件。
select爲最先期的UNIX系統調用,它存在4個問題:1)這3個bitmap有大小限制(FD_SETSIZE,一般爲1024);2)因爲 這3個集合在返回時會被內核修改,所以咱們每次調用時都須要從新設置;3)咱們在調用完成後須要掃描這3個集合才能知道哪些fd的讀/寫事件發生了,通常狀況下全量集合比較大而實際發生讀/寫事件的fd比較少,效率比較低下;4)內核在每次調用都須要掃描這3個fd集合,而後查看哪些fd的事件實際發生, 在讀/寫比較稀疏的狀況下一樣存在效率問題。
因爲存在這些問題,因而人們對select進行了改進,從而有了poll。
poll(struct pollfd *fds, int nfds, int timeout) struct pollfd { int fd; short events; short revents; }
poll調用須要傳遞的是一個pollfd結構的數組,調用返回時結果信息也存放在這個數組裏面。 pollfd的結構中存放着fd、咱們對該fd感興趣的事件(events)以及該fd實際發生的事件(revents)。poll傳遞的不是固定大小的 bitmap,所以select的問題1解決了;poll將感興趣事件和實際發生事件分開了,所以select的問題2也解決了。但select的問題3和問題4仍然沒有解決。
總的來講,Select模型的內核的處理邏輯爲:
使用copy_from_user從用戶空間拷貝fd_set到內核空間
註冊回調函數__pollwait
遍歷全部fd,調用其對應的poll方法(對於socket,這個poll方法是sock_poll,sock_poll根據狀況會調用到tcp_poll,udp_poll或者datagram_poll)
以tcp_poll爲例,其核心實現就是__pollwait,也就是上面註冊的回調函數。
__pollwait的主要工做就是把current(當前進程)掛到設備的等待隊列中,不一樣的設備有不一樣的等待隊列,對於tcp_poll 來講,其等待隊列是sk->sk_sleep(注意把進程掛到等待隊列中並不表明進程已經睡眠了)。在設備收到一條消息(網絡設備)或填寫完文件數 據(磁盤設備)後,會喚醒設備等待隊列上睡眠的進程,這時current便被喚醒了。
poll方法返回時會返回一個描述讀寫操做是否就緒的mask掩碼,根據這個mask掩碼給fd_set賦值。
若是遍歷完全部的fd,尚未返回一個可讀寫的mask掩碼,則會調用schedule_timeout是調用select的進程(也就是 current)進入睡眠。當設備驅動發生自身資源可讀寫後,會喚醒其等待隊列上睡眠的進程。若是超過必定的超時時間(schedule_timeout 指定),仍是沒人喚醒,則調用select的進程會從新被喚醒得到CPU,進而從新遍歷fd,判斷有沒有就緒的fd。
把fd_set從內核空間拷貝到用戶空間。
多客戶端請求服務端,服務端與各客戶端保持長鏈接而且能接收到各客戶端數據大致思路以下:
初始化readset
,而且將服務端監聽的描述符添加到readset
中去。
而後select
阻塞等待readset
集合中是否有描述符可讀。
若是是服務端描述符可讀,那麼表示有新客戶端鏈接上。經過accept
接收客戶端的數據,而且將客戶端描述符添加到一個數組client
中,以便二次遍歷的時候使用。
執行第二次循環,此時經過for
循環把client
中的有效的描述符都添加到readset
中去。
select
再次阻塞等待readset
集合中是否有描述符可讀。
若是此時已經鏈接上的某個客戶端描述符有數據可讀,則進行數據讀取。
[select、poll、epoll之間的區別總結[整理]](http://www.cnblogs.com/Anker/...
select與poll問題的關鍵在於無狀態。對於每一次系統調用,內核不會記錄下任何信息,因此每次調用都須要重複傳遞相同信息。總結而言,select/poll模型存在的問題便是每次調用select,都須要把fd集合從用戶態拷貝到內核態,這個開銷在fd不少時會很大而且每次都須要在內核遍歷傳遞進來的全部的fd,這個開銷在fd不少時候也很大。討論epoll對於select/poll改進的時候,epoll和select和poll的調用接口上的不一樣,select和poll都只提供了一個函數——select或者poll函數。而epoll提供了三個函數,epoll_create,epoll_ctl和epoll_wait,epoll_create是建立一個epoll句柄;epoll_ctl是註冊要監聽的事件類型;epoll_wait則是等待事件的產生。對於上面所說的select/poll的缺點,主要是在epoll_ctl中解決的,每次註冊新的事件到epoll句柄中時(在epoll_ctl中指定EPOLL_CTL_ADD),會把全部的fd拷貝進內核,而不是在epoll_wait的時候重複拷貝。epoll保證了每一個fd在整個過程當中只會拷貝一次。epoll的解決方案不像select或poll同樣每次都把current輪流加入fd對應的設備等待隊列中,而只在epoll_ctl時把 current掛一遍(這一遍必不可少)併爲每一個fd指定一個回調函數,當設備就緒,喚醒等待隊列上的等待者時,就會調用這個回調函數,而這個回調函數會 把就緒的fd加入一個就緒鏈表)。epoll_wait的工做實際上就是在這個就緒鏈表中查看有沒有就緒的fd(利用 schedule_timeout()實現睡一會,判斷一會的效果,和select實現中的第7步是相似的)。
(1)select,poll實現須要本身不斷輪詢全部fd集合,直到設備就緒,期間可能要睡眠和喚醒屢次交替。而epoll其實也須要調用epoll_wait不斷輪詢就緒鏈表,期間也可能屢次睡眠和喚醒交替,可是它是設備就緒時,調用回調函數,把就緒fd放入就緒鏈表中,並喚醒在epoll_wait中進入睡眠的進程。雖然都要睡眠和交替,可是select和poll在「醒着」的時候要遍歷整個fd集合,而epoll在「醒着」的時候只要判斷一下就緒鏈表是否爲空就好了,這節省了大量的CPU時間。這就是回調機制帶來的性能提高。
(2)select,poll每次調用都要把fd集合從用戶態往內核態拷貝一次,而且要把current往設備等待隊列中掛一次,而epoll只要一次拷貝,並且把current往等待隊列上掛也只掛一次(在epoll_wait的開始,注意這裏的等待隊列並非設備等待隊列,只是一個epoll內部定義的等待隊列)。這也能節省很多的開銷。
建立一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大。這個 參數不一樣於select()中的第一個參數,給出最大監聽的fd+1的值。須要注意的是,當建立好epoll句柄後,它就是會佔用一個fd值,在 linux下若是查看/proc/進程id/fd/,是可以看到這個fd的,因此在使用完epoll後,必須調用close()關閉,不然可能致使fd被 耗盡。
epoll的事件註冊函數,它不一樣與select()是在監聽事件時告訴內核要監聽什麼類型的事件,而是在這裏先註冊要監聽的事件類型。第一個參數是epoll_create()的返回值,第二個參數表示動做,用三個宏來表示:
EPOLL_CTL_ADD:註冊新的fd到epfd中;
EPOLL_CTL_MOD:修改已經註冊的fd的監聽事件;
EPOLL_CTL_DEL:從epfd中刪除一個fd;
第三個參數是須要監聽的fd,第四個參數是告訴內核須要監聽什麼事,struct epoll_event結構以下:
typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t; struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ };
events能夠是如下幾個宏的集合:
EPOLLIN :表示對應的文件描述符能夠讀(包括對端SOCKET正常關閉);
EPOLLOUT:表示對應的文件描述符能夠寫;
EPOLLPRI:表示對應的文件描述符有緊急的數據可讀(這裏應該表示有帶外數據到來);
EPOLLERR:表示對應的文件描述符發生錯誤;
EPOLLHUP:表示對應的文件描述符被掛斷;
EPOLLET: 將EPOLL設爲邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來講的。
EPOLLONESHOT:只監聽一次事件,當監聽完此次事件以後,若是還須要繼續監聽這個socket的話,須要再次把這個socket加入到EPOLL隊列裏
等 待事件的產生,相似於select()調用。參數events用來從內核獲得事件的集合,maxevents告以內核這個events有多大,這個 maxevents的值不能大於建立epoll_create()時的size,參數timeout是超時時間(毫秒,0會當即返回,-1將不肯定,也有 說法說是永久阻塞)。該函數返回須要處理的事件數目,如返回0表示已超時。
使用epoll 來實現服務端同時接受多客戶端長鏈接數據時,的大致步驟以下:
(1)使用epoll_create建立一個 epoll 的句柄,下例中咱們命名爲epollfd。
(2)使用epoll_ctl把服務端監聽的描述符添加到epollfd指定的 epoll 內核事件表中,監聽服務器端監聽的描述符是否可讀。
(3)使用epoll_wait阻塞等待註冊的服務端監聽的描述符可讀事件的發生。
(4)當有新的客戶端鏈接上服務端時,服務端監聽的描述符可讀,則epoll_wait返回,而後經過accept獲取客戶端描述符。
(5)使用epoll_ctl把客戶端描述符添加到epollfd指定的 epoll 內核事件表中,監聽服務器端監聽的描述符是否可讀。
(6)當客戶端描述符有數據可讀時,則觸發epoll_wait返回,而後執行讀取。
幾乎全部的epoll模型編碼都是基於如下模板:
for( ; ; ) { nfds = epoll_wait(epfd,events,20,500); for(i=0;i<nfds;++i) { if(events[i].data.fd==listenfd) //有新的鏈接 { connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); //accept這個鏈接 ev.data.fd=connfd; ev.events=EPOLLIN|EPOLLET; epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); //將新的fd添加到epoll的監聽隊列中 } else if( events[i].events&EPOLLIN ) //接收到數據,讀socket { n = read(sockfd, line, MAXLINE)) < 0 //讀 ev.data.ptr = md; //md爲自定義類型,添加數據 ev.events=EPOLLOUT|EPOLLET; epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);//修改標識符,等待下一個循環時發送數據,異步處理的精髓 } else if(events[i].events&EPOLLOUT) //有數據待發送,寫socket { struct myepoll_data* md = (myepoll_data*)events[i].data.ptr; //取數據 sockfd = md->fd; send( sockfd, md->ptr, strlen((char*)md->ptr), 0 ); //發送數據 ev.data.fd=sockfd; ev.events=EPOLLIN|EPOLLET; epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改標識符,等待下一個循環時接收數據 } else { //其餘的處理 } } }
本部分代碼實現參考多是最接地氣的 IO 多路複用小結
#include <stdio.h> #include <unistd.h> #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> #include <netinet/in.h> #include <string.h> #define SERV_PORT 8031 #define BUFSIZE 1024 int main(void) { int lfd, cfd; struct sockaddr_in serv_addr,clin_addr; socklen_t clin_len; char recvbuf[BUFSIZE]; int len; lfd = socket(AF_INET,SOCK_STREAM,0); serv_addr.sin_family = AF_INET; serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); serv_addr.sin_port = htons(SERV_PORT); bind(lfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)); listen(lfd, 128); while(1){ clin_len = sizeof(clin_addr); cfd = accept(lfd, (struct sockaddr *)&clin_addr, &clin_len); while(len = read(cfd,recvbuf,BUFSIZE)){ write(STDOUT_FILENO,recvbuf,len);//把客戶端輸入的內容輸出在終端 // 只有當客戶端輸入 stop 就中止當前客戶端的鏈接 if (strncasecmp(recvbuf,"stop",4) == 0){ close(cfd); break; } } } close(lfd); return 0; }
編譯運行以後,開啓兩個終端使用命令nc 10.211.55.4 8031
(假如服務器的 ip 爲 10.211.55.4)。若是首先連上的客戶端一直不輸入stop
加回車,那麼第二個客戶端輸入任何內容都不會被客戶端接收。以下圖所示
輸入abc
的是先鏈接上的,在其輸入stop
以前,後面鏈接上的客戶端輸入123
並不會被服務端收到。也就是說一直阻塞在第一個客戶端那裏。當第一個客戶端輸入stop
以後,服務端才收到第二個客戶端的發送過來的數據。
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <errno.h> #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> #include <netinet/in.h> #include <fcntl.h> #include <sys/select.h> #include <sys/time.h> #include <string.h> #define SERV_PORT 8031 #define BUFSIZE 1024 #define FD_SET_SIZE 128 int main(void) { int lfd, cfd, maxfd, scokfd, retval; struct sockaddr_in serv_addr, clin_addr; socklen_t clin_len; // 地址信息結構體大小 char recvbuf[BUFSIZE]; int len; fd_set read_set, read_set_init; int client[FD_SET_SIZE]; int i; int maxi = -1; if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { perror("套接字描述符建立失敗"); exit(1); } int opt = 1; setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); memset(&serv_addr, 0, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); serv_addr.sin_port = htons(SERV_PORT); if (bind(lfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) == -1) { perror("綁定失敗"); exit(1); } if (listen(lfd, FD_SET_SIZE) == -1) { perror("監聽失敗"); exit(1); } maxfd = lfd; for (i = 0; i < FD_SET_SIZE; ++i) { client[i] = -1; } FD_ZERO(&read_set_init); FD_SET(lfd, &read_set_init); while (1) { // 每次循環開始時,都初始化 read_set read_set = read_set_init; // 由於上一步 read_set 已經重置,因此須要已鏈接上的客戶端 fd (由上次循環後產生)從新添加進 read_set for (i = 0; i < FD_SET_SIZE; ++i) { if (client[i] > 0) { FD_SET(client[i], &read_set); } } printf("select 等待\n"); // 這裏會阻塞,直到 read_set 中某一個 fd 有數據可讀才返回,注意 read_set 中除了客戶端 fd 還有服務端監聽的 fd retval = select(maxfd + 1, &read_set, NULL, NULL, NULL); if (retval == -1) { perror("select 錯誤\n"); } else if (retval == 0) { printf("超時\n"); continue; } printf("select 返回\n"); //------------------------------------------------------------------------------------------------ // 用 FD_ISSET 來判斷 lfd (服務端監聽的fd)是否可讀。只有當新的客戶端鏈接時,lfd 纔可讀 if (FD_ISSET(lfd, &read_set)) { clin_len = sizeof(clin_addr); if ((cfd = accept(lfd, (struct sockaddr *) &clin_addr, &clin_len)) == -1) { perror("接收錯誤\n"); continue; } for (i = 0; i < FD_SET_SIZE; ++i) { if (client[i] < 0) { // 把客戶端 fd 放入 client 數組 client[i] = cfd; printf("接收client[%d]一個請求來自於: %s:%d\n", i, inet_ntoa(clin_addr.sin_addr), ntohs(clin_addr.sin_port)); break; } } // 最大的描述符值也要從新計算 maxfd = (cfd > maxfd) ? cfd : maxfd; // maxi 用於下面遍歷全部有效客戶端 fd 使用,以避免遍歷整個 client 數組 maxi = (i >= maxi) ? ++i : maxi; } //------------------------------------------------------------------------------------------------ for (i = 0; i < maxi; ++i) { if (client[i] < 0) { continue; } // 若是客戶端 fd 中有數據可讀,則進行讀取 if (FD_ISSET(client[i], &read_set)) { // 注意:這裏沒有使用 while 循環讀取,若是使用 while 循環讀取,則有阻塞在一個客戶端了。 // 可能你會想到若是一次讀取不完怎麼辦? // 讀取不完時,在循環到 select 時 因爲未讀完的 fd 還有數據可讀,那麼當即返回,而後到這裏繼續讀取,原來的 while 循環讀取直接提到最外層的 while(1) + select 來判斷是否有數據繼續可讀 len = read(client[i], recvbuf, BUFSIZE); if (len > 0) { write(STDOUT_FILENO, recvbuf, len); }else if (len == 0){ // 若是在客戶端 ctrl+z close(client[i]); printf("clinet[%d] 鏈接關閉\n", i); FD_CLR(client[i], &read_set); client[i] = -1; break; } } } } close(lfd); return 0; }
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <errno.h> #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> #include <netinet/in.h> #include <fcntl.h> #include <sys/epoll.h> #include <sys/time.h> #include <string.h> #define SERV_PORT 8031 #define MAX_EVENT_NUMBER 1024 #define BUFFER_SIZE 10 /* 將文件描述符 fd 上的 EPOLLIN 註冊到 epollfd 指示的 epoll 內核事件表中 */ void addfd(int epollfd, int fd) { struct epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET; epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); int old_option = fcntl(fd, F_GETFL); int new_option = old_option | O_NONBLOCK; fcntl(fd, F_SETFL, new_option); } void et(struct epoll_event *events, int number, int epollfd, int listenfd) { char buf[BUFFER_SIZE]; for (int i = 0; i < number; ++i) { int sockfd = events[i].data.fd; if (sockfd == listenfd) { struct sockaddr_in client_address; socklen_t length = sizeof(client_address); int connfd = accept(listenfd, (struct sockaddr *) &client_address, &length); printf("接收一個請求來自於: %s:%d\n", inet_ntoa(client_address.sin_addr), ntohs(client_address.sin_port)); addfd(epollfd, connfd); } else if (events[i].events & EPOLLIN) { /* 這段代碼不會被重複觸發,因此咱們循環讀取數據,以確保把 socket 緩存中的全部數據讀取*/ while (1) { memset(buf, '\0', BUFFER_SIZE); int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0); if (ret < 0) { /* 對非阻塞 IO ,下面的條件成立表示數據已經所有讀取完畢。此後 epoll 就能再次觸發 sockfd 上的 EPOLLIN 事件,以驅動下一次讀操做 */ if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { printf("read later\n"); break; } close(sockfd); break; } else if (ret == 0) { printf("斷開一個鏈接\n"); close(sockfd); } else { printf("get %d bytes of content: %s\n", ret, buf); } } } } } int main(void) { int lfd, epollfd,ret; struct sockaddr_in serv_addr; if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { perror("套接字描述符建立失敗"); exit(1); } int opt = 1; setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); memset(&serv_addr, 0, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); serv_addr.sin_port = htons(SERV_PORT); if (bind(lfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) == -1) { perror("綁定失敗"); exit(1); } if (listen(lfd, 5) == -1) { perror("監聽失敗"); exit(1); } struct epoll_event events[MAX_EVENT_NUMBER]; if ((epollfd = epoll_create(5)) == -1) { perror("建立失敗"); exit(1); } // 把服務器端 lfd 添加到 epollfd 指定的 epoll 內核事件表中,添加一個 lfd 可讀的事件 addfd(epollfd, lfd); while (1) { // 阻塞等待新客戶端的鏈接或者客戶端的數據寫入,返回須要處理的事件數目 if ((ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1)) < 0) { perror("epoll_wait失敗"); exit(1); } et(events, ret, epollfd, lfd); } close(lfd); return 0; }