Py西遊攻關之IO model

事件驅動模型

上節的問題: 
協程:遇到IO操做就切換。 
但何時切回去呢?怎麼肯定IO操做完了?javascript

  View Code

傳統的編程是以下線性模式的:html

開始--->代碼塊A--->代碼塊B--->代碼塊C--->代碼塊D--->......--->結束java

每個代碼塊裏是完成各類各樣事情的代碼,但編程者知道代碼塊A,B,C,D...的執行順序,惟一可以改變這個流程的是數據。輸入不一樣的數據,根據條件語句判斷,流程或許就改成A--->C--->E...--->結束。每一次程序運行順序或許都不一樣,但它的控制流程是由輸入數據和你編寫的程序決定的。若是你知道這個程序當前的運行狀態(包括輸入數據和程序自己),那你就知道接下來甚至一直到結束它的運行流程。linux

 對於事件驅動型程序模型,它的流程大體以下:nginx

開始--->初始化--->等待web

 與上面傳統編程模式不一樣,事件驅動程序在啓動以後,就在那等待,等待什麼呢?等待被事件觸發。傳統編程下也有「等待」的時候,好比在代碼塊D中,你定義了一個input(),須要用戶輸入數據。但這與下面的等待不一樣,傳統編程的「等待」,好比input(),你做爲程序編寫者是知道或者強制用戶輸入某個東西的,或許是數字,或許是文件名稱,若是用戶輸入錯誤,你還須要提醒他,並請他從新輸入。事件驅動程序的等待則是徹底不知道,也不強制用戶輸入或者幹什麼。只要某一事件發生,那程序就會作出相應的「反應」。這些事件包括:輸入信息、鼠標、敲擊鍵盤上某個鍵還有系統內部定時器觸發。編程

1、事件驅動模型介紹

一般,咱們寫服務器處理模型的程序時,有如下幾種模型:windows

(1)每收到一個請求,建立一個新的進程,來處理該請求; 
(2)每收到一個請求,建立一個新的線程,來處理該請求; 
(3)每收到一個請求,放入一個事件列表,讓主進程經過非阻塞I/O方式來處理請求

第三種就是協程、事件驅動的方式,通常廣泛認爲第(3)種方式是大多數網絡服務器採用的方式 數組

論事件驅動模型 緩存

複製代碼
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>

</head>
<body>

<p onclick="fun()">點我呀</p>


<script type="text/javascript">
    function fun() {
          alert('約嗎?')
    }
</script>
</body>

</html>
複製代碼

在UI編程中,經常要對鼠標點擊進行相應,首先如何得到鼠標點擊呢? 兩種方式:

1建立一個線程循環檢測是否有鼠標點擊

      那麼這個方式有如下幾個缺點:

  1. CPU資源浪費,可能鼠標點擊的頻率很是小,可是掃描線程仍是會一直循環檢測,這會形成不少的CPU資源浪費;若是掃描鼠標點擊的接口是阻塞的呢?
  2. 若是是堵塞的,又會出現下面這樣的問題,若是咱們不但要掃描鼠標點擊,還要掃描鍵盤是否按下,因爲掃描鼠標時被堵塞了,那麼可能永遠不會去掃描鍵盤;
  3. 若是一個循環須要掃描的設備很是多,這又會引來響應時間的問題; 
    因此,該方式是很是很差的。

2 就是事件驅動模型 

目前大部分的UI編程都是事件驅動模型,如不少UI平臺都會提供onClick()事件,這個事件就表明鼠標按下事件。事件驅動模型大致思路以下:

  1. 有一個事件(消息)隊列;
  2. 鼠標按下時,往這個隊列中增長一個點擊事件(消息);
  3. 有個循環,不斷從隊列取出事件,根據不一樣的事件,調用不一樣的函數,如onClick()、onKeyDown()等;
  4. 事件(消息)通常都各自保存各自的處理函數指針,這樣,每一個消息都有獨立的處理函數; 
    這裏寫圖片描述
    事件驅動編程是一種編程範式,這裏程序的執行流由外部事件來決定。它的特色是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程範式是(單線程)同步以及多線程編程。 
     
    讓咱們用例子來比較和對比一下單線程、多線程以及事件驅動編程模型。下圖展現了隨着時間的推移,這三種模式下程序所作的工做。這個程序有3個任務須要完成,每一個任務都在等待I/O操做時阻塞自身。阻塞在I/O操做上所花費的時間已經用灰色框標示出來了。 
    這裏寫圖片描述

最初的問題:怎麼肯定IO操做完了切回去呢?經過回調函數 

複製代碼
1.要理解事件驅動和程序,就須要與非事件驅動的程序進行比較。實際上,現代的程序大可能是事件驅動的,好比多線程的程序,確定是事件驅動的。早期則存在許多非事件驅動的程序,這樣的程序,在須要等待某個條件觸發時,會不斷地檢查這個條件,直到條件知足,這是很浪費cpu時間的。而事件驅動的程序,則有機會釋放cpu從而進入睡眠態(注意是有機會,固然程序也可自行決定不釋放cpu),當事件觸發時被操做系統喚醒,這樣就能更加有效地使用cpu.
2.再說什麼是事件驅動的程序。一個典型的事件驅動的程序,就是一個死循環,並以一個線程的形式存在,這個死循環包括兩個部分,第一個部分是按照必定的條件接收並選擇一個要處理的事件,第二個部分就是事件的處理過程。程序的執行過程就是選擇事件和處理事件,而當沒有任何事件觸發時,程序會因查詢事件隊列失敗而進入睡眠狀態,從而釋放cpu。
3.事件驅動的程序,一定會直接或者間接擁有一個事件隊列,用於存儲未能及時處理的事件。
4.事件驅動的程序的行爲,徹底受外部輸入的事件控制,因此,事件驅動的系統中,存在大量這種程序,並以事件做爲主要的通訊方式。
5.事件驅動的程序,還有一個最大的好處,就是能夠按照必定的順序處理隊列中的事件,而這個順序則是由事件的觸發順序決定的,這一特性每每被用於保證某些過程的原子化。
6.目前windows,linux,nucleus,vxworks都是事件驅動的,只有一些單片機多是非事件驅動的。
複製代碼

注意,事件驅動的監聽事件是由操做系統調用的cpu來完成的

IO多路複用

前面是用協程實現的IO阻塞自動切換,那麼協程又是怎麼實現的,在原理是是怎麼實現的。如何去實現事件驅動的狀況下IO的自動阻塞的切換,這個學名叫什麼呢? => IO多路複用 
好比socketserver,多個客戶端鏈接,單線程下實現併發效果,就叫多路複用。 
  
同步IO和異步IO,阻塞IO和非阻塞IO分別是什麼,到底有什麼區別?不一樣的人在不一樣的上下文下給出的答案是不一樣的。因此先限定一下本文的上下文。 

本文討論的背景是Linux環境下的network IO。

1 IO模型前戲準備

在進行解釋以前,首先要說明幾個概念:

  1. 用戶空間和內核空間
  2. 進程切換
  3. 進程的阻塞
  4. 文件描述符
  5. 緩存 I/O

用戶空間與內核空間

如今操做系統都是採用虛擬存儲器,那麼對32位操做系統而言,它的尋址空間(虛擬存儲空間)爲4G(2的32次方)。 
操做系統的核心是內核,獨立於普通的應用程序,能夠訪問受保護的內存空間,也有訪問底層硬件設備的全部權限。 
爲了保證用戶進程不能直接操做內核(kernel),保證內核的安全,操心繫統將虛擬空間劃分爲兩部分,一部分爲內核空間,一部分爲用戶空間。 
針對linux操做系統而言,將最高的1G字節(從虛擬地址0xC0000000到0xFFFFFFFF),供內核使用,稱爲內核空間,而將較低的3G字節(從虛擬地址0x00000000到0xBFFFFFFF),供各個進程使用,稱爲用戶空間。 

進程切換

爲了控制進程的執行,內核必須有能力掛起正在CPU上運行的進程,並恢復之前掛起的某個進程的執行。這種行爲被稱爲進程切換,這種切換是由操做系統來完成的。所以能夠說,任何進程都是在操做系統內核的支持下運行的,是與內核緊密相關的。 
從一個進程的運行轉到另外一個進程上運行,這個過程當中通過下面這些變化:

保存處理機上下文,包括程序計數器和其餘寄存器。

更新PCB信息。

把進程的PCB移入相應的隊列,如就緒、在某事件阻塞等隊列。

選擇另外一個進程執行,並更新其PCB。

更新內存管理的數據結構。

恢復處理機上下文。 
注:總而言之就是很耗資源的

進程的阻塞

正在執行的進程,因爲期待的某些事件未發生,如請求系統資源失敗、等待某種操做的完成、新數據還沒有到達或無新工做作等,則由系統自動執行阻塞原語(Block),使本身由運行狀態變爲阻塞狀態。可見,進程的阻塞是進程自身的一種主動行爲,也所以只有處於運行態的進程(得到CPU),纔可能將其轉爲阻塞狀態。當進程進入阻塞狀態,是不佔用CPU資源的。

文件描述符fd

文件描述符(File descriptor)是計算機科學中的一個術語,是一個用於表述指向文件的引用的抽象化概念。 
文件描述符在形式上是一個非負整數。實際上,它是一個索引值,指向內核爲每個進程所維護的該進程打開文件的記錄表。當程序打開一個現有文件或者建立一個新文件時,內核向進程返回一個文件描述符。在程序設計中,一些涉及底層的程序編寫每每會圍繞着文件描述符展開。可是文件描述符這一律念每每只適用於UNIX、Linux這樣的操做系統。

緩存 I/O

緩存 I/O 又被稱做標準 I/O,大多數文件系統的默認 I/O 操做都是緩存 I/O。在 Linux 的緩存 I/O 機制中,操做系統會將 I/O 的數據緩存在文件系統的頁緩存( page cache )中,也就是說,數據會先被拷貝到操做系統內核的緩衝區中,而後纔會從操做系統內核的緩衝區拷貝到應用程序的地址空間。用戶空間無法直接訪問內核空間的,內核態到用戶態的數據拷貝 

思考:爲何數據必定要先到內核區,直接到用戶內存不是更直接嗎?
緩存 I/O 的缺點: 

數據在傳輸過程當中須要在應用程序地址空間和內核進行屢次數據拷貝操做,這些數據拷貝操做所帶來的 CPU 以及內存開銷是很是大的。

 

       同步(synchronous) IO和異步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分別是什麼,到底有什麼區別?這個問題其實不一樣的人給出的答案均可能不一樣,好比wiki,就認爲asynchronous IO和non-blocking IO是一個東西。這實際上是由於不一樣的人的知識背景不一樣,而且在討論這個問題的時候上下文(context)也不相同。因此,爲了更好的回答這個問題,我先限定一下本文的上下文。
本文討論的背景是Linux環境下的network IO。 

Stevens在文章中一共比較了五種IO Model:

      •     blocking IO
      •     nonblocking IO
      •     IO multiplexing
      •     signal driven IO
      •     asynchronous IO

因爲signal driven IO在實際中並不經常使用,因此我這隻說起剩下的四種IO Model。
再說一下IO發生時涉及的對象和步驟。
      對於一個network IO (這裏咱們以read舉例),它會涉及到兩個系統對象,一個是調用這個IO的process (or thread),另外一個就是系統內核(kernel)。當一個read操做發生時,它會經歷兩個階段:
 1 等待數據準備 (Waiting for the data to be ready)
 2 將數據從內核拷貝到進程中 (Copying the data from the kernel to the process)
記住這兩點很重要,由於這些IO Model的區別就是在兩個階段上各有不一樣的狀況。

2 blocking IO (阻塞IO)

在linux中,默認狀況下全部的socket都是blocking,一個典型的讀操做流程大概是這樣:

      當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:準備數據。對於network io來講,不少時候數據在一開始尚未到達(好比,尚未收到一個完整的UDP包),這個時候kernel就要等待足夠的數據到來。而在用戶進程這邊,整個進程會被阻塞。當kernel一直等到數據準備好了,它就會將數據從kernel中拷貝到用戶內存,而後kernel返回結果,用戶進程才解除block的狀態,從新運行起來。
因此,blocking IO的特色就是在IO執行的兩個階段都被block了。

3 non-blocking IO(非阻塞IO)

linux下,能夠經過設置socket使其變爲non-blocking。當對一個non-blocking socket執行讀操做時,流程是這個樣子:

      從圖中能夠看出,當用戶進程發出read操做時,若是kernel中的數據尚未準備好,那麼它並不會block用戶進程,而是馬上返回一個error。從用戶進程角度講 ,它發起一個read操做後,並不須要等待,而是立刻就獲得了一個結果。用戶進程判斷結果是一個error時,它就知道數據尚未準備好,因而它能夠再次發送read操做。一旦kernel中的數據準備好了,而且又再次收到了用戶進程的system call,那麼它立刻就將數據拷貝到了用戶內存,而後返回。
因此,用戶進程實際上是須要不斷的主動詢問kernel數據好了沒有。

 注意:

      在網絡IO時候,非阻塞IO也會進行recvform系統調用,檢查數據是否準備好,與阻塞IO不同,」非阻塞將大的整片時間的阻塞分紅N多的小的阻塞, 因此進程不斷地有機會 ‘被’ CPU光顧」。即每次recvform系統調用之間,cpu的權限還在進程手中,這段時間是能夠作其餘事情的,

      也就是說非阻塞的recvform系統調用調用以後,進程並無被阻塞,內核立刻返回給進程,若是數據還沒準備好,此時會返回一個error。進程在返回以後,能夠乾點別的事情,而後再發起recvform系統調用。重複上面的過程,循環往復的進行recvform系統調用。這個過程一般被稱之爲輪詢。輪詢檢查內核數據,直到數據準備好,再拷貝數據到進程,進行數據處理。須要注意,拷貝數據整個過程,進程仍然是屬於阻塞的狀態。

4  IO multiplexing(IO多路複用)

      IO multiplexing這個詞可能有點陌生,可是若是我說select,epoll,大概就都能明白了。有些地方也稱這種IO方式爲event driven IO。咱們都知道,select/epoll的好處就在於單個process就能夠同時處理多個網絡鏈接的IO。它的基本原理就是select/epoll這個function會不斷的輪詢所負責的全部socket,當某個socket有數據到達了,就通知用戶進程。它的流程如圖:

      當用戶進程調用了select,那麼整個進程會被block,而同時,kernel會「監視」全部select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程。
這個圖和blocking IO的圖其實並無太大的不一樣,事實上,還更差一些。由於這裏須要使用兩個system call (select 和 recvfrom),而blocking IO只調用了一個system call (recvfrom)。可是,用select的優點在於它能夠同時處理多個connection。(多說一句。因此,若是處理的鏈接數不是很高的話,使用select/epoll的web server不必定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優點並非對於單個鏈接能處理得更快,而是在於能處理更多的鏈接。)
在IO multiplexing Model中,實際中,對於每個socket,通常都設置成爲non-blocking,可是,如上圖所示,整個用戶的process實際上是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。

注意1:select函數返回結果中若是有文件可讀了,那麼進程就能夠經過調用accept()或recv()來讓kernel將位於內核中準備到的數據copy到用戶區。

注意2: select的優點在於能夠處理多個鏈接,不適用於單個鏈接

5  Asynchronous I/O(異步IO)

linux下的asynchronous IO其實用得不多。先看一下它的流程:

用戶進程發起read操做以後,馬上就能夠開始去作其它的事。而另外一方面,從kernel的角度,當它受到一個asynchronous read以後,首先它會馬上返回,因此不會對用戶進程產生任何block。而後,kernel會等待數據準備完成,而後將數據拷貝到用戶內存,當這一切都完成以後,kernel會給用戶進程發送一個signal,告訴它read操做完成了。

      到目前爲止,已經將四個IO Model都介紹完了。如今回過頭來回答最初的那幾個問題:blocking和non-blocking的區別在哪,synchronous IO和asynchronous IO的區別在哪。
先回答最簡單的這個:blocking vs non-blocking。前面的介紹中其實已經很明確的說明了這二者的區別。調用blocking IO會一直block住對應的進程直到操做完成,而non-blocking IO在kernel還準備數據的狀況下會馬上返回。

在說明synchronous IO和asynchronous IO的區別以前,須要先給出二者的定義。Stevens給出的定義(實際上是POSIX的定義)是這樣子的:
    A synchronous I/O operation causes the requesting process to be blocked until that I/O operationcompletes;
    An asynchronous I/O operation does not cause the requesting process to be blocked;
 
      二者的區別就在於synchronous IO作」IO operation」的時候會將process阻塞。按照這個定義,以前所述的blocking IO,non-blocking IO,IO multiplexing都屬於synchronous IO。有人可能會說,non-blocking IO並無被block啊。這裏有個很是「狡猾」的地方,定義中所指的」IO operation」是指真實的IO操做,就是例子中的recvfrom這個system call。non-blocking IO在執行recvfrom這個system call的時候,若是kernel的數據沒有準備好,這時候不會block進程。可是,當kernel中數據準備好的時候,recvfrom會將數據從kernel拷貝到用戶內存中,這個時候進程是被block了,在這段時間內,進程是被block的。而asynchronous IO則不同,當進程發起IO 操做以後,就直接返回不再理睬了,直到kernel發送一個信號,告訴進程說IO完成。在這整個過程當中,進程徹底沒有被block。

       注意:因爲我們接下來要講的select,poll,epoll都屬於IO多路複用,而IO多路複用又屬於同步的範疇,故,epoll只是一個僞異步而已。

各個IO Model的比較如圖所示:

      通過上面的介紹,會發現non-blocking IO和asynchronous IO的區別仍是很明顯的。在non-blocking IO中,雖然進程大部分時間都不會被block,可是它仍然要求進程去主動的check,而且當數據準備完成之後,也須要進程主動的再次調用recvfrom來將數據拷貝到用戶內存。而asynchronous IO則徹底不一樣。它就像是用戶進程將整個IO操做交給了他人(kernel)完成,而後他人作完後發信號通知。在此期間,用戶進程不須要去檢查IO操做的狀態,也不須要主動的去拷貝數據。

五種IO模型比較:

       

select poll epoll IO多路複用介紹

首先列一下,sellect、poll、epoll三者的區別

  • select 
    select最先於1983年出如今4.2BSD中,它經過一個select()系統調用來監視多個文件描述符的數組,當select()返回後,該數組中就緒的文件描述符便會被內核修改標誌位,使得進程能夠得到這些文件描述符從而進行後續的讀寫操做。 
    select目前幾乎在全部的平臺上支持 
      
    select的一個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,不過能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制。 
      
    另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其複製的開銷也線性增加。同時,因爲網絡響應時間的延遲使得大量TCP鏈接處於非活躍狀態,但調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定的開銷。
  • poll 
    它和select在本質上沒有多大差異,可是poll沒有最大文件描述符數量的限制。 
    通常也不用它,至關於過渡階段

  • epoll 
    直到Linux2.6纔出現了由內核直接支持的實現方法,那就是epoll。被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。windows不支持 

    沒有最大文件描述符數量的限制。 
    好比100個鏈接,有兩個活躍了,epoll會告訴用戶這兩個兩個活躍了,直接取就ok了,而select是循環一遍。 

    (瞭解)epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。 
    另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。 

    因此市面上上見到的所謂的異步IO,好比nginx、Tornado、等,咱們叫它異步IO,其實是IO多路複用。

select與epoll  

# 首先咱們來定義流的概念,一個流能夠是文件,socket,pipe等等能夠進行I/O操做的內核對象。
# 無論是文件,仍是套接字,仍是管道,咱們均可以把他們看做流。
# 以後咱們來討論I/O的操做,經過read,咱們能夠從流中讀入數據;經過write,咱們能夠往流寫入數據。如今假
# 定一個情形,咱們須要從流中讀數據,可是流中尚未數據,(典型的例子爲,客戶端要從socket讀如數據,可是
# 服務器尚未把數據傳回來),這時候該怎麼辦?
# 阻塞。阻塞是個什麼概念呢?好比某個時候你在等快遞,可是你不知道快遞何時過來,並且你沒有別的事能夠幹
# (或者說接下來的事要等快遞來了才能作);那麼你能夠去睡覺了,由於你知道快遞把貨送來時必定會給你打個電話
# (假定必定能叫醒你)。
# 非阻塞忙輪詢。接着上面等快遞的例子,若是用忙輪詢的方法,那麼你須要知道快遞員的手機號,而後每分鐘給他掛
# 個電話:「你到了沒?」
# 很明顯通常人不會用第二種作法,不只顯很無腦,浪費話費不說,還佔用了快遞員大量的時間。
# 大部分程序也不會用第二種作法,由於第一種方法經濟而簡單,經濟是指消耗不多的CPU時間,若是線程睡眠了,
# 就掉出了系統的調度隊列,暫時不會去瓜分CPU寶貴的時間片了。
#
# 爲了瞭解阻塞是如何進行的,咱們來討論緩衝區,以及內核緩衝區,最終把I/O事件解釋清楚。緩衝區的引入是爲
# 了減小頻繁I/O操做而引發頻繁的系統調用(你知道它很慢的),當你操做一個流時,更多的是以緩衝區爲單位進
# 行操做,這是相對於用戶空間而言。對於內核來講,也須要緩衝區。
# 假設有一個管道,進程A爲管道的寫入方,B爲管道的讀出方。
# 假設一開始內核緩衝區是空的,B做爲讀出方,被阻塞着。而後首先A往管道寫入,這時候內核緩衝區由空的狀態變
# 到非空狀態,內核就會產生一個事件告訴B該醒來了,這個事件姑且稱之爲「緩衝區非空」。
# 可是「緩衝區非空」事件通知B後,B卻尚未讀出數據;且內核許諾了不能把寫入管道中的數據丟掉這個時候,A寫
# 入的數據會滯留在內核緩衝區中,若是內核也緩衝區滿了,B仍未開始讀數據,最終內核緩衝區會被填滿,這個時候
# 會產生一個I/O事件,告訴進程A,你該等等(阻塞)了,咱們把這個事件定義爲「緩衝區滿」。
# 假設後來B終於開始讀數據了,因而內核的緩衝區空了出來,這時候內核會告訴A,內核緩衝區有空位了,你能夠從
# 長眠中醒來了,繼續寫數據了,咱們把這個事件叫作「緩衝區非滿」
# 也許事件Y1已經通知了A,可是A也沒有數據寫入了,而B繼續讀出數據,知道內核緩衝區空了。這個時候內核就告
# 訴B,你須要阻塞了!,咱們把這個時間定爲「緩衝區空」。
# 這四個情形涵蓋了四個I/O事件,緩衝區滿,緩衝區空,緩衝區非空,緩衝區非滿(注都是說的內核緩衝區,且這四
# 個術語都是我生造的,僅爲解釋其原理而造)。這四個I/O事件是進行阻塞同步的根本。(若是不能理解「同步」是
# 什麼概念,請學習操做系統的鎖,信號量,條件變量等任務同步方面的相關知識)。
#
# 而後咱們來講說阻塞I/O的缺點。可是阻塞I/O模式下,一個線程只能處理一個流的I/O事件。若是想要同時處理多
# 個流,要麼多進程(fork),要麼多線程(pthread_create),很不幸這兩種方法效率都不高。
# 因而再來考慮非阻塞忙輪詢的I/O方式,咱們發現咱們能夠同時處理多個流了(把一個流從阻塞模式切換到非阻塞
# 模式再此不予討論):
# while true {
# for i in stream[]; {
# if i has data
# read until unavailable
# }
# }
# 咱們只要不停的把全部流從頭至尾問一遍,又從頭開始。這樣就能夠處理多個流了,但這樣的作法顯然很差,由於
# 若是全部的流都沒有數據,那麼只會白白浪費CPU。這裏要補充一點,阻塞模式下,內核對於I/O事件的處理是阻
# 塞或者喚醒,而非阻塞模式下則把I/O事件交給其餘對象(後文介紹的select以及epoll)處理甚至直接忽略。
#
# 爲了不CPU空轉,能夠引進了一個代理(一開始有一位叫作select的代理,後來又有一位叫作poll的代理,不
# 過二者的本質是同樣的)。這個代理比較厲害,能夠同時觀察許多流的I/O事件,在空閒的時候,會把當前線程阻
# 塞掉,當有一個或多個流有I/O事件時,就從阻塞態中醒來,因而咱們的程序就會輪詢一遍全部的流(因而咱們可
# 以把「忙」字去掉了)。代碼長這樣:
# while true {
# select(streams[])
# for i in streams[] {
# if i has data
# read until unavailable
# }
# }
# 因而,若是沒有I/O事件產生,咱們的程序就會阻塞在select處。可是依然有個問題,咱們從select那裏僅僅知
# 道了,有I/O事件發生了,但卻並不知道是那幾個流(可能有一個,多個,甚至所有),咱們只能無差異輪詢全部流,
# 找出能讀出數據,或者寫入數據的流,對他們進行操做。
# 可是使用select,咱們有O(n)的無差異輪詢複雜度,同時處理的流越多,每一次無差異輪詢時間就越長。再次
# 說了這麼多,終於能好好解釋epoll了
# epoll能夠理解爲event poll,不一樣於忙輪詢和無差異輪詢,epoll之會把哪一個流發生了怎樣的I/O事件通知我
# 們。此時咱們對這些流的操做都是有意義的。
# 在討論epoll的實現細節以前,先把epoll的相關操做列出:
# epoll_create 建立一個epoll對象,通常epollfd = epoll_create()
# epoll_ctl (epoll_add/epoll_del的合體),往epoll對象中增長/刪除某一個流的某一個事件
# 好比
# epoll_ctl(epollfd, EPOLL_CTL_ADD, socket, EPOLLIN);//有緩衝區內有數據時epoll_wait返回
# epoll_ctl(epollfd, EPOLL_CTL_DEL, socket, EPOLLOUT);//緩衝區可寫入時epoll_wait返回
# epoll_wait(epollfd,...)等待直到註冊的事件發生
# (注:當對一個非阻塞流的讀寫發生緩衝區滿或緩衝區空,write/read會返回-1,並設置errno=EAGAIN。
# 而epoll只關心緩衝區非滿和緩衝區非空事件)。
# 一個epoll模式的代碼大概的樣子是:
# while true {
# active_stream[] = epoll_wait(epollfd)
# for i in active_stream[] {
# read or write till unavailable
# }
# }


# 舉個例子:
#    select:
#          班裏三十個同窗在考試,誰先作完想交卷都要經過按鈕來活動,他按按鈕做爲老師的我桌子上的燈就會變紅.
#          一旦燈變紅,我(select)我就能夠知道有人交卷了,可是我並不知道誰交的,因此,我必須跟個傻子似的輪詢
#          地去問:嘿,是你要交卷嗎?而後我就能夠以這種效率極低地方式找到要交卷的學生,而後把它的卷子收上來.
#
#
#    epoll:
#         此次再有人按按鈕,我這不光燈會亮,上面還會顯示要交卷學生的名字.這樣我就能夠直接去對應學生那收卷就
#         好了.固然,同時能夠有多人交卷.
View Code

 

IO多路複用的觸發方式

複製代碼
# 在linux的IO多路複用中有水平觸發,邊緣觸發兩種模式,這兩種模式的區別以下:
#
# 水平觸發:若是文件描述符已經就緒能夠非阻塞的執行IO操做了,此時會觸發通知.容許在任意時刻重複檢測IO的狀態,
# 沒有必要每次描述符就緒後儘量多的執行IO.select,poll就屬於水平觸發.
#
# 邊緣觸發:若是文件描述符自上次狀態改變後有新的IO活動到來,此時會觸發通知.在收到一個IO事件通知後要儘量
# 多的執行IO操做,由於若是在一次通知中沒有執行完IO那麼就須要等到下一次新的IO活動到來才能獲取到就緒的描述
# 符.信號驅動式IO就屬於邊緣觸發.
#
# epoll既能夠採用水平觸發,也能夠採用邊緣觸發.
#
# 你們可能還不能徹底瞭解這兩種模式的區別,咱們能夠舉例說明:一個管道收到了1kb的數據,epoll會當即返回,此時
# 讀了512字節數據,而後再次調用epoll.這時若是是水平觸發的,epoll會當即返回,由於有數據準備好了.若是是邊
# 緣觸發的不會當即返回,由於此時雖然有數據可讀可是已經觸發了一次通知,在此次通知到如今尚未新的數據到來,
# 直到有新的數據到來epoll纔會返回,此時老的數據和新的數據均可以讀取到(固然是須要此次你儘量的多讀取).


# 下面咱們還從電子的角度來解釋一下:
# 
#     水平觸發:也就是隻有高電平(1)或低電平(0)時才觸發通知,只要在這兩種狀態就能獲得通知.上面提到的只要
# 有數據可讀(描述符就緒)那麼水平觸發的epoll就當即返回.
# 
#     邊緣觸發:只有電平發生變化(高電平到低電平,或者低電平到高電平)的時候才觸發通知.上面提到即便有數據
# 可讀,可是沒有新的IO活動到來,epoll也不會當即返回.
複製代碼

簡單實例

實例1(non-blocking IO):     

複製代碼
import time
import socket
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.setsockopt
sk.bind(('127.0.0.1',6667))
sk.listen(5)
sk.setblocking(False)
while True:
    try:
        print ('waiting client connection .......')
        connection,address = sk.accept()   # 進程主動輪詢
        print("+++",address)
        client_messge = connection.recv(1024)
        print(str(client_messge,'utf8'))
        connection.close()
    except Exception as e:
        print (e)
        time.sleep(4)

#############################client

import time
import socket
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)

while True:
    sk.connect(('127.0.0.1',6667))
    print("hello")
    sk.sendall(bytes("hello","utf8"))
    time.sleep(2)
    break
複製代碼

      優勢:可以在等待任務完成的時間裏幹其餘活了(包括提交其餘任務,也就是 「後臺」 能夠有多個任務在同時執行)。

  缺點:任務完成的響應延遲增大了,由於每過一段時間纔去輪詢一次read操做,而任務可能在兩次輪詢之間的任意時間完成。這會致使總體數據吞吐量的下降。

實例2(IO multiplexing):

在非阻塞實例中,輪詢的主語是進程,而「後臺」 可能有多個任務在同時進行,人們就想到了循環查詢多個任務的完成狀態,只要有任何一個任務完成,就去處理它。不過,這個監聽的重任經過調用select等函數交給了內核去作。IO多路複用有兩個特別的系統調用select、poll、epoll函數。select調用是內核級別的,select輪詢相對非阻塞的輪詢的區別在於—前者能夠等待多個socket,能實現同時對多個IO端口進行監聽,當其中任何一個socket的數據準好了,就能返回進行可讀,而後進程再進行recvfrom系統調用,將數據由內核拷貝到用戶進程,固然這個過程是阻塞的。

實例2:

複製代碼
import socket
import select
sk=socket.socket()
sk.bind(("127.0.0.1",9904))
sk.listen(5)

while True:
    r,w,e=select.select([sk,],[],[],5)
    for i in r:
        # conn,add=i.accept()
        #print(conn)
        print("hello")
    print('>>>>>>')
    
#*************************client.py
import socket

sk=socket.socket()

sk.connect(("127.0.0.1",9904))

while 1:
    inp=input(">>").strip()
    sk.send(inp.encode("utf8"))
    data=sk.recv(1024)
    print(data.decode("utf8"))
複製代碼

請思考:爲何不調用accept,會反覆print?

select屬於水平觸發

實例3(server端併發聊天):

複製代碼
#***********************server.py
import socket
import select
sk=socket.socket()
sk.bind(("127.0.0.1",8801))
sk.listen(5)
inputs=[sk,]
while True:
    r,w,e=select.select(inputs,[],[],5)
    print(len(r))

    for obj in r:
        if obj==sk:
            conn,add=obj.accept()
            print(conn)
            inputs.append(conn)
        else:
            data_byte=obj.recv(1024)
            print(str(data_byte,'utf8'))
            inp=input('回答%s號客戶>>>'%inputs.index(obj))
            obj.sendall(bytes(inp,'utf8'))

    print('>>',r)

#***********************client.py

import socket
sk=socket.socket()
sk.connect(('127.0.0.1',8801))

while True:
    inp=input(">>>>")
    sk.sendall(bytes(inp,"utf8"))
    data=sk.recv(1024)
    print(str(data,'utf8'))
複製代碼

文件描述符其實就是我們平時說的句柄,只不過文件描述符是linux中的概念。注意,咱們的accept或recv調用時即向系統發出recvfrom請求

    (1)  若是內核緩衝區沒有數據--->等待--->數據到了內核緩衝區,轉到用戶進程緩衝區;

    (2) 若是先用select監聽到某個文件描述符對應的內核緩衝區有了數據,當咱們再調用accept或recv時,直接將數據轉到用戶緩衝區。

思考1:開啓5個client,分別按54321的順序發送消息,那麼server端是按什麼順序回消息的呢?

思考2:  如何在某一個client端退出後,不影響server端和其它客戶端正常交流

linux:

if not data_byte:
            inputs.remove(obj)
            continue

win

複製代碼
try:
      data_byte=obj.recv(1024)
      print(str(data_byte,'utf8'))
      inp=input('回答%s號客戶>>>'%inputs.index(obj))
      obj.sendall(bytes(inp,'utf8'))
except Exception:
      inputs.remove(obj)
複製代碼

延伸

實例4:

複製代碼
#_*_coding:utf-8_*_
__author__ = 'Alex Li'
 
import select
import socket
import sys
import queue
 
# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
 
# Bind the socket to the port
server_address = ('localhost', 10000)
print(sys.stderr, 'starting up on %s port %s' % server_address)
server.bind(server_address)
 
# Listen for incoming connections
server.listen(5)
 
# Sockets from which we expect to read
inputs = [ server ]
 
# Sockets to which we expect to write
outputs = [ ]
 
message_queues = {}
while inputs:
 
    # Wait for at least one of the sockets to be ready for processing
    print( '\nwaiting for the next event')
    readable, writable, exceptional = select.select(inputs, outputs, inputs)
    # Handle inputs
    for s in readable:
 
        if s is server:
            # A "readable" server socket is ready to accept a connection
            connection, client_address = s.accept()
            print('new connection from', client_address)
            connection.setblocking(False)
            inputs.append(connection)
 
            # Give the connection a queue for data we want to send
            message_queues[connection] = queue.Queue()
        else:
            data = s.recv(1024)
            if data:
                # A readable client socket has data
                print(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) )
                message_queues[s].put(data)
                # Add output channel for response
                if s not in outputs:
                    outputs.append(s)
            else:
                # Interpret empty result as closed connection
                print('closing', client_address, 'after reading no data')
                # Stop listening for input on the connection
                if s in outputs:
                    outputs.remove(s)  #既然客戶端都斷開了,我就不用再給它返回數據了,因此這時候若是這個客戶端的鏈接對象還在outputs列表中,就把它刪掉
                inputs.remove(s)    #inputs中也刪除掉
                s.close()           #把這個鏈接關閉掉
 
                # Remove message queue
                del message_queues[s]
    # Handle outputs
    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()
        except queue.Empty:
            # No messages waiting so stop checking for writability.
            print('output queue for', s.getpeername(), 'is empty')
            outputs.remove(s)
        else:
            print( 'sending "%s" to %s' % (next_msg, s.getpeername()))
            s.send(next_msg)
    # Handle "exceptional conditions"
    for s in exceptional:
        print('handling exceptional condition for', s.getpeername() )
        # Stop listening for input on the connection
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
 
        # Remove message queue
        del message_queues[s]
複製代碼

實例5:

複製代碼
# select 模擬一個socket server,注意socket必須在非阻塞狀況下才能實現IO多路複用。
# 接下來經過例子瞭解select 是如何經過單進程實現同時處理多個非阻塞的socket鏈接的。
#server端


import select
import socket
import queue

server = socket.socket()
server.bind(('localhost',9000))
server.listen(1000)

server.setblocking(False)  # 設置成非阻塞模式,accept和recv都非阻塞
# 這裏若是直接 server.accept() ,若是沒有鏈接會報錯,因此有數據才調他們
# BlockIOError:[WinError 10035] 沒法當即完成一個非阻塞性套接字操做。
msg_dic = {}
inputs = [server,]  # 交給內核、select檢測的列表。
# 必須有一個值,讓select檢測,不然報錯提供無效參數。
# 沒有其餘鏈接以前,本身就是個socket,本身就是個鏈接,檢測本身。活動了說明有連接
outputs = []  # 你往裏面放什麼,下一次就出來了

while True:
    readable, writeable, exceptional = select.select(inputs, outputs, inputs)  # 定義檢測
    #新來鏈接                                        檢測列表         異常(斷開)
    # 異常的也是inputs是: 檢測那些鏈接的存在異常
    print(readable,writeable,exceptional)
    for r in readable:
        if r is server:  # 有數據,表明來了一個新鏈接
            conn, addr = server.accept()
            print("來了個新鏈接",addr)
            inputs.append(conn)  # 把鏈接加到檢測列表裏,若是這個鏈接活動了,就說明數據來了
            # inputs = [server.conn] # 【conn】只返回活動的鏈接,但怎麼肯定是誰活動了
            # 若是server活動,則來了新鏈接,conn活動則來數據
            msg_dic[conn] = queue.Queue()  # 初始化一個隊列,後面存要返回給這個客戶端的數據
        else:
            try :
                data = r.recv(1024)  # 注意這裏是r,而不是conn,多個鏈接的狀況
                print("收到數據",data)
                # r.send(data) # 不能直接發,若是客戶端不收,數據就沒了
                msg_dic[r].put(data)  # 往裏面放數據
                outputs.append(r)  # 放入返回的鏈接隊列裏
            except ConnectionResetError as e:
                print("客戶端斷開了",r)
                if r in outputs:
                    outputs.remove(r) #清理已斷開的鏈接
                inputs.remove(r) #清理已斷開的鏈接
                del msg_dic[r] ##清理已斷開的鏈接

    for w in writeable:  # 要返回給客戶端的鏈接列表
        data_to_client = msg_dic[w].get()  # 在字典裏取數據
        w.send(data_to_client)  # 返回給客戶端
        outputs.remove(w)  # 刪除這個數據,確保下次循環的時候不返回這個已經處理完的鏈接了。

    for e in exceptional:  # 若是鏈接斷開,刪除鏈接相關數據
        if e in outputs:
            outputs.remove(e)
        inputs.remove(e)
        del msg_dic[e]


#*************************client
import socket
client = socket.socket()

client.connect(('localhost', 9000))

while True:
    cmd = input('>>> ').strip()
    if len(cmd) == 0 : continue
    client.send(cmd.encode('utf-8'))
    data = client.recv(1024)
    print(data.decode())

client.close()
複製代碼

實例6:

複製代碼
import selectors
import socket

sel = selectors.DefaultSelector()

def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()

sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)
複製代碼
 

注:本文最重要的參考文獻是Richard Stevens的「UNIX® Network Programming Volume 1, Third Edition: The Sockets Networking 」      http://mp.weixin.qq.com/s__biz=MzA4MjEyNTA5Mw==&mid=2652563599&idx=1&sn=9781747e54d906c0c140228376e671ed&scene=21#wecha t_redirect

https://pymotw.com/2/select/#module-select

http://blog.csdn.net/lingfengtengfei/article/details/12392449

http://www.jb51.net/article/37416.htm

https://pymotw.com/2/select/#module-select 

相關文章
相關標籤/搜索