python 學習筆記12(事件驅動、IO多路複用、異步IO)

阻塞IO和非阻塞IO、同步IO和異步IO的區別

討論背景Linux環境下的network IOpython

一、先決條件(幾個重要概念)

1.1用戶空間與內核空間linux

  如今操做系統都是採用虛擬存儲器,那麼對32位操做系統而言,它的尋址空間(虛擬存儲空間)爲4G(2的32次方)。操做系統的核心是內核,獨立於普通的應用程序,能夠訪問受保護的內存空間,也有訪問底層硬件設備的全部權限。爲了保證用戶進程不能直接操做內核(kernel),保證內核的安全,操心繫統將虛擬空間劃分爲兩部分,一部分爲內核空間,一部分爲用戶空間。針對linux操做系統而言,將最高的1G字節(從虛擬地址0xC0000000到0xFFFFFFFF),供內核使用,稱爲內核空間,而將較低的3G字節(從虛擬地址0x00000000到0xBFFFFFFF),供各個進程使用,稱爲用戶空間。程序員

1.二、 進程切換web

  爲了控制進程的執行,內核必須有能力掛起正在CPU上運行的進程,並恢復之前掛起的某個進程的執行。這種行爲被稱爲進程切換。所以能夠說,任何進程都是在操做系統內核的支持下運行的,是與內核緊密相關的。編程

  從一個進程的運行轉到另外一個進程上運行,這個過程當中通過下面這些變化
  1. 保存處理機上下文,包括程序計數器和其餘寄存器。
  2. 更新PCB信息。
  3. 把進程的PCB移入相應的隊列,如就緒、在某事件阻塞等隊列。
  4. 選擇另外一個進程執行,並更新其PCB。
  5. 更新內存管理的數據結構。
  6. 恢復處理機上下文。數組

注:總而言之就是很耗資源緩存

1.三、 進程的阻塞安全

  正在執行的進程,因爲期待的某些事件未發生,如請求系統資源失敗、等待某種操做的完成、新數據還沒有到達或無新工做作等,則由系統自動執行阻塞原語(Block),使本身由運行狀態變爲阻塞狀態。可見,進程的阻塞是進程自身的一種主動行爲,也所以只有處於運行態的進程(得到CPU),纔可能將其轉爲阻塞狀態。當進程進入阻塞狀態,是不佔用CPU資源的服務器

1.四、 文件描述符fd網絡

  文件描述符(File descriptor)是計算機科學中的一個術語,是一個用於表述指向文件的引用的抽象化概念。

  文件描述符在形式上是一個非負整數。實際上,它是一個索引值,指向內核爲每個進程所維護的該進程打開文件的記錄表。當程序打開一個現有文件或者建立一個新文件時,內核向進程返回一個文件描述符。在程序設計中,一些涉及底層的程序編寫每每會圍繞着文件描述符展開。可是文件描述符這一律念每每只適用於UNIX、Linux這樣的操做系統。

1.5 緩存 I/O

  緩存 I/O 又被稱做標準 I/O,大多數文件系統的默認 I/O 操做都是緩存 I/O。在 Linux 的緩存 I/O 機制中,操做系統會將 I/O 的數據緩存在文件系統的頁緩存( page cache )中,也就是說,數據會先被拷貝到操做系統內核的緩衝區中,而後纔會從操做系統內核的緩衝區拷貝到應用程序的地址空間。

  緩存 I/O 的缺點:
  數據在傳輸過程當中須要在應用程序地址空間和內核進行屢次數據拷貝操做,這些數據拷貝操做所帶來的 CPU 以及內存開銷是很是大的。

 

二、IO模式

  上面已經提到,對於一次IO訪問(以read舉例),數據會先被拷貝到操做系統內核的緩衝區中,而後纔會從操做系統內核的緩衝區拷貝到應用程序的地址空間。因此說,當一個read操做發生時,它會經歷兩個階段:
  1. 等待數據準備 (Waiting for the data to be ready)
  2. 將數據從內核拷貝到進程中 (Copying the data from the kernel to the process)

正式由於這兩個階段,linux系統產生了下面五種網絡模式的方案:
  - 阻塞 I/O(blocking IO)
  - 非阻塞 I/O(nonblocking IO)
  - I/O 多路複用( IO multiplexing)
  - 信號驅動 I/O( signal driven IO)
  - 異步 I/O(asynchronous IO)

2.一、阻塞IO

  在linux中,默認狀況下全部的socket都是blocking,一個典型的讀操做流程大概是這樣:

  當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:準備數據(對於網絡IO來講,不少時候數據在一開始尚未到達。好比,尚未收到一個完整的UDP包。這個時候kernel就要等待足夠的數據到來)。這個過程須要等待,也就是說數據被拷貝到操做系統內核的緩衝區中是須要一個過程的。而在用戶進程這邊,整個進程會被阻塞(固然,是進程本身選擇的阻塞)。當kernel一直等到數據準備好了,它就會將數據從kernel中拷貝到用戶內存,而後kernel返回結果,用戶進程才解除block的狀態,從新運行起來。

  因此,blocking IO的特色就是在IO執行的兩個階段都被block了。

2.2 、非阻塞IO(nonblocking IO)

  linux下,能夠經過設置socket使其變爲non-blocking。當對一個non-blocking socket執行讀操做時,流程是這個樣子:

  當用戶進程發出read操做時,若是kernel中的數據尚未準備好,那麼它並不會block用戶進程,而是馬上返回一個error。從用戶進程角度講 ,它發起一個read操做後,並不須要等待,而是立刻就獲得了一個結果。用戶進程判斷結果是一個error時,它就知道數據尚未準備好,因而它能夠再次發送read操做。一旦kernel中的數據準備好了,而且又再次收到了用戶進程的system call,那麼它立刻就將數據拷貝到了用戶內存,而後返回。

  因此,nonblocking IO的特色是用戶進程須要不斷的主動詢問kernel數據好了沒有。


2.三、 I/O 多路複用( IO multiplexing)

  IO multiplexing就是咱們說的select,poll,epoll,有些地方也稱這種IO方式爲event driven IO。select/epoll的好處就在於單個process就能夠同時處理多個網絡鏈接的IO。它的基本原理就是select,poll,epoll這個function會不斷的輪詢所負責的全部socket,當某個socket有數據到達了,就通知用戶進程。

  當用戶進程調用了select,那麼整個進程會被block,而同時,kernel會「監視」全部select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程。

  因此,I/O 多路複用的特色是經過一種機制一個進程能同時等待多個文件描述符,而這些文件描述符(套接字描述符)其中的任意一個進入讀就緒狀態,select()函數就能夠返回。

  這個圖和blocking IO的圖其實並無太大的不一樣,事實上,還更差一些。由於這裏須要使用兩個system call (select 和 recvfrom),而blocking IO只調用了一個system call (recvfrom)。可是,用select的優點在於它能夠同時處理多個connection。

  因此,若是處理的鏈接數不是很高的話,使用select/epoll的web server不必定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優點並非對於單個鏈接能處理得更快,而是在於能處理更多的鏈接。)

  在IO multiplexing Model中,實際中,對於每個socket,通常都設置成爲non-blocking,可是,如上圖所示,整個用戶的process實際上是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。

2.4 、異步 I/O(asynchronous IO)

  inux下的asynchronous IO其實用得不多。先看一下它的流程:

  用戶進程發起read操做以後,馬上就能夠開始去作其它的事。而另外一方面,從kernel的角度,當它受到一個asynchronous read以後,首先它會馬上返回,因此不會對用戶進程產生任何block。而後,kernel會等待數據準備完成,而後將數據拷貝到用戶內存,當這一切都完成以後,kernel會給用戶進程發送一個signal,告訴它read操做完成了。

 2.五、小結

blocking和non-blocking的區別

  調用blocking IO會一直block住對應的進程直到操做完成,而non-blocking IO在kernel還準備數據的狀況下會馬上返回。

synchronous IO和asynchronous IO的區別

  在說明synchronous IO和asynchronous IO的區別以前,須要先給出二者的定義。POSIX的定義是這樣子的:
    - A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes;
    - An asynchronous I/O operation does not cause the requesting process to be blocked;

  二者的區別就在於synchronous IO作」IO operation」的時候會將process阻塞。按照這個定義,以前所述的blocking IO,non-blocking IO,IO multiplexing都屬於synchronous IO。

  有人會說,non-blocking IO並無被block啊。這裏有個很是「狡猾」的地方,定義中所指的」IO operation」是指真實的IO操做,就是例子中的recvfrom這個system call。non-blocking IO在執行recvfrom這個system call的時候,若是kernel的數據沒有準備好,這時候不會block進程。可是,當kernel中數據準備好的時候,recvfrom會將數據從kernel拷貝到用戶內存中,這個時候進程是被block了,在這段時間內,進程是被block的。

  而asynchronous IO則不同,當進程發起IO 操做以後,就直接返回不再理睬了,直到kernel發送一個信號,告訴進程說IO完成。在這整個過程當中,進程徹底沒有被block。

 

各個IO Model的比較如圖所示:

 

  能夠發現non-blocking IO和asynchronous IO的區別仍是很明顯的。在non-blocking IO中,雖然進程大部分時間都不會被block,可是它仍然要求進程去主動的check,而且當數據準備完成之後,也須要進程主動的再次調用recvfrom來將數據拷貝到用戶內存。而asynchronous IO則徹底不一樣。它就像是用戶進程將整個IO操做交給了他人(kernel)完成,而後他人作完後發信號通知。在此期間,用戶進程不須要去檢查IO操做的狀態,也不須要主動的去拷貝數據。

2 事件驅動、IO多路複用(select/poll/epoll)

2.1論事件驅動

  一般,咱們寫 服務器處理模型的程序時,有如下幾種模型
    (1)每收到一個請求,建立一個新的進程,來處理該請求;
    (2)每收到一個請求,建立一個新的線程,來處理該請求;
    (3)每收到一個請求,放入一個事件列表,讓主進程經過非阻塞I/O方式來處理請求
  上面的幾種方式,各有千秋,
    第(1)中方法,因爲建立新的進程:實現比較簡單,但開銷比較大,致使服務器性能比較差。
    第(2)種方式,因爲要涉及到線程的同步,有可能會面臨死鎖等問題。
    第(3)種方式,在寫應用程序代碼時,邏輯比前面兩種都複雜。
  綜合考慮各方面因素,通常廣泛認爲第(3)種方式是大多數網絡服務器採用的方式
 

看圖說話講事件驅動模型

  傳統的編程是以下線性模式的:

  開始--->代碼塊A--->代碼塊B--->代碼塊C--->代碼塊D--->......--->結束

  每個代碼塊裏是完成各類各樣事情的代碼,但編程者知道代碼塊A,B,C,D...的執行順序,惟一可以改變這個流程的是數據。輸入不一樣的數據,根據條件語句判斷,流程或許就改成A--->C--->E...--->結束。每一次程序運行順序或許都不一樣,但它的控制流程是由輸入數據和你編寫的程序決定的。若是你知道這個程序當前的運行狀態(包括輸入數據和程序自己),那你就知道接下來甚至一直到結束它的運行流程。

   對於事件驅動型程序模型,它的流程大體以下:

  開始--->初始化--->等待

   與上面傳統編程模式不一樣,事件驅動程序在啓動以後,就在那等待,等待什麼呢?等待被事件觸發。傳統編程下也有「等待」的時候,好比在代碼塊D中,你定義了一個input(),須要用戶輸入數據。但這與下面的等待不一樣,傳統編程的「等待」,好比input(),你做爲程序編寫者是知道或者強制用戶輸入某個東西的,或許是數字,或許是文件名稱,若是用戶輸入錯誤,你還須要提醒他,並請他從新輸入。事件驅動程序的等待則是徹底不知道,也不強制用戶輸入或者幹什麼。只要某一事件發生,那程序就會作出相應的「反應」。這些事件包括:輸入信息、鼠標、敲擊鍵盤上某個鍵還有系統內部定時器觸發。

  在UI編程中,經常要對鼠標點擊進行相應,首先如何得到鼠標點擊呢?
  方式一:建立一個線程,該線程一直循環檢測是否有鼠標點擊,那麼這個方式有如下幾個缺點
    1. CPU資源浪費,可能鼠標點擊的頻率很是小,可是掃描線程仍是會一直循環檢測,這會形成不少的CPU資源浪費;若是掃描鼠標點擊的接口是阻塞的呢?
    2. 若是是堵塞的,又會出現下面這樣的問題,若是咱們不但要掃描鼠標點擊,還要掃描鍵盤是否按下,因爲掃描鼠標時被堵塞了,那麼可能永遠不會去掃描鍵盤;
    3. 若是一個循環須要掃描的設備很是多,這又會引來響應時間的問題;
  因此,該方式是很是很差的。

  方式二:就是事件驅動模型
    目前大部分的UI編程都是事件驅動模型,如不少UI平臺都會提供onClick()事件,這個事件就表明鼠標按下事件。事件驅動模型大致思路以下:
      1. 有一個事件(消息)隊列;
      2. 鼠標按下時,往這個隊列中增長一個點擊事件(消息);
      3. 有個循環,不斷從隊列取出事件,根據不一樣的事件,調用不一樣的函數,如onClick()、onKeyDown()等;
      4. 事件(消息)通常都各自保存各自的處理函數指針,這樣,每一個消息都有獨立的處理函數;

 

  事件驅動編程是一種網絡編程範式,這裏程序的執行流由外部事件來決定。它的特色是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程範式是(單線程)同步以及多線程編程。

  讓咱們用例子來比較和對比一下單線程、多線程以及事件驅動編程模型。下圖展現了隨着時間的推移,這三種模式下程序所作的工做。這個程序有3個任務須要完成,每一個任務都在等待I/O操做時阻塞自身。阻塞在I/O操做上所花費的時間已經用灰色框標示出來了。

  在單線程同步模型中,任務按照順序執行。若是某個任務由於I/O而阻塞,其餘全部的任務都必須等待,直到它完成以後它們才能依次執行。這種明確的執行順序和串行化處理的行爲是很容易推斷得出的。若是任務之間並無互相依賴的關係,但仍然須要互相等待的話這就使得程序沒必要要的下降了運行速度。

  在多線程版本中,這3個任務分別在獨立的線程中執行。這些線程由操做系統來管理,在多處理器系統上能夠並行處理,或者在單處理器系統上交錯執行。這使得當某個線程阻塞在某個資源的同時其餘線程得以繼續執行。與完成相似功能的同步程序相比,這種方式更有效率,但程序員必須寫代碼來保護共享資源,防止其被多個線程同時訪問。多線程程序更加難以推斷,由於這類程序不得不經過線程同步機制如鎖、可重入函數、線程局部存儲或者其餘機制來處理線程安全問題,若是實現不當就會致使出現微妙且使人痛不欲生的bug。

  在事件驅動版本的程序中,3個任務交錯執行,但仍然在一個單獨的線程控制中。當處理I/O或者其餘昂貴的操做時,註冊一個回調到事件循環中,而後當I/O操做完成時繼續執行。回調描述了該如何處理某個事件。事件循環輪詢全部的事件,當事件到來時將它們分配給等待處理事件的回調函數。這種方式讓程序儘量的得以執行而不須要用到額外的線程。事件驅動型程序比多線程程序更容易推斷出行爲,由於程序員不須要關心線程安全問題。

當咱們面對以下的環境時,事件驅動模型一般是一個好的選擇:

  1. 程序中有許多任務,並且…
  2. 任務之間高度獨立(所以它們不須要互相通訊,或者等待彼此)並且…
  3. 在等待事件到來時,某些任務會阻塞。

  當應用程序須要在任務間共享可變的數據時,這也是一個不錯的選擇,由於這裏不須要採用同步處理。

  網絡應用程序一般都有上述這些特色,這使得它們可以很好的契合事件驅動編程模型。

2.2 Select\Poll\Epoll

2.2.1 Python之select簡析

sellect、poll、epoll三者的區別 
select 
  select最先於1983年出如今4.2BSD中,它經過一個select()系統調用來監視多個文件描述符的數組,當select()返回後,該數組中就緒的文件描述符便會被內核修改標誌位,使得進程能夠得到這些文件描述符從而進行後續的讀寫操做。

  select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢,事實上從如今看來,這也是它所剩很少的優勢之一。

select的一個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,不過能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制。

另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其複製的開銷也線性增加。同時,因爲網絡響應時間的延遲使得大量TCP鏈接處於非活躍狀態,但調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定的開銷。

poll 
  poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差異,可是poll沒有最大文件描述符數量的限制。

  poll和select一樣存在一個缺點就是,包含大量文件描述符的數組被總體複製於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增長而線性增大。

  另外,select()和poll()將就緒的文件描述符告訴進程後,若是進程沒有對其進行IO操做,那麼下次調用select()和poll()的時候將再次報告這些文件描述符,因此它們通常不會丟失就緒的消息,這種方式稱爲水平觸發(Level Triggered)。

epoll 
  直到Linux2.6纔出現了由內核直接支持的實現方法,那就是epoll,它幾乎具有了以前所說的一切優勢,被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。

  epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。

  epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。

  另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。

總結一下:IO多路複用就是利用select/poll/epoll來監聽socket對象內部是否有變化的性質,來實現併發。

      select是操做系統底層用一個死循環一直輪詢全部socket對象,監聽其否有變化,監聽socket對象的個數有限制,1024個(不許確)。

      poll就是將select該進成沒有個數限制。

      epoll,不只沒有個數限制,還沒用死循環去輪詢全部socket對象,二是socket對象有變化,自動告知epoll。

Python select 

  Python的select()方法直接調用操做系統的IO接口,它監控sockets,open files, and pipes(全部帶fileno()方法的文件句柄)什麼時候變成readable 和writeable, 或者通訊錯誤,select()使得同時監控多個鏈接變的簡單,而且這比寫一個長循環來等待和監控多客戶端鏈接要高效,由於select直接經過操做系統提供的C的網絡接口進行操做,而不是經過Python的解釋器。

  接下來經過echo server例子要以瞭解select 是如何經過單進程實現同時處理多個非阻塞的socket鏈接的:

 1 import select
 2 import socket
 3 import sys
 4 import Queue
 5  
 6 # Create a TCP/IP socket
 7 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 8 server.setblocking(0)
 9  
10 # Bind the socket to the port
11 server_address = ('localhost', 10000)
12 print >>sys.stderr, 'starting up on %s port %s' % server_address
13 server.bind(server_address)
14  
15 # Listen for incoming connections
16 server.listen(5)
View Code

  select()方法接收並監控3個通訊列表, 第一個是全部的輸入的data,就是指外部發過來的數據,第2個是監控和接收全部要發出去的data(outgoing data),第3個監控錯誤信息,接下來咱們須要建立2個列表來包含輸入和輸出信息來傳給select().

# Sockets from which we expect to read
inputs = [ server ] # Sockets to which we expect to write outputs = [ ]

  全部客戶端的進來的鏈接和數據將會被server的主循環程序放在上面的list中處理,咱們如今的server端須要等待鏈接可寫(writable)以後才能過來,而後接收數據並返回(所以不是在接收到數據以後就馬上返回),由於每一個鏈接要把輸入或輸出的數據先緩存到queue裏,而後再由select取出來再發出去。

# Outgoing message queues (socket:Queue)
message_queues = {}

  The main portion of the server program loops, calling select() to block and wait for network activity.

  下面是此程序的主循環,調用select()時會阻塞和等待直到新的鏈接和數據進來

while inputs: # Wait for at least one of the sockets to be ready for processing print >>sys.stderr, '\nwaiting for the next event' readable, writable, exceptional = select.select(inputs, outputs, inputs) 

  當你把inputs,outputs,exceptional(這裏跟inputs共用)傳給select()後,它返回3個新的list,咱們上面將他們分別賦值爲readable,writable,exceptional, 全部在readable list中的socket鏈接表明有數據可接收(recv),全部在writable list中的存放着你能夠對其進行發送(send)操做的socket鏈接,當鏈接通訊出現error時會把error寫到exceptional列表中。

  Readable list 中的socket 能夠有3種可能狀態,第一種是若是這個socket是main "server" socket,它負責監聽客戶端的鏈接,若是這個main server socket出如今readable裏,那表明這是server端已經ready來接收一個新的鏈接進來了,爲了讓這個main server能同時處理多個鏈接,在下面的代碼裏,咱們把這個main server的socket設置爲非阻塞模式。

# Handle inputs
 for s in readable:
   
      if s is server:
          # A "readable" server socket is ready to accept a connection
          connection, client_address = s.accept()
          print >>sys.stderr, 'new connection from', client_address
          connection.setblocking(0)
          inputs.append(connection)
  
         # Give the connection a queue for data we want to send
         message_queues[connection] = Queue.Queue()
View Code

  第二種狀況是這個socket是已經創建了的鏈接,它把數據發了過來,這個時候你就能夠經過recv()來接收它發過來的數據,而後把接收到的數據放到queue裏,這樣你就能夠把接收到的數據再傳回給客戶端了。

 else:
      data = s.recv(1024)
      if data:
          # A readable client socket has data
          print >>sys.stderr, 'received "%s" from %s' % (data, s.getpeername())
          message_queues[s].put(data)
          # Add output channel for response
          if s not in outputs:
              outputs.append(s)
View Code

  第三種狀況就是這個客戶端已經斷開了,因此你再經過recv()接收到的數據就爲空了,因此這個時候你就能夠把這個跟客戶端的鏈接關閉了。

else:
      # Interpret empty result as closed connection
      print >>sys.stderr, 'closing', client_address, 'after reading no data'
      # Stop listening for input on the connection
      if s in outputs:
          outputs.remove(s)  #既然客戶端都斷開了,我就不用再給它返回數據了,因此這時候若是這個客戶端的鏈接對象還在outputs列表中,就把它刪掉
      inputs.remove(s)    #inputs中也刪除掉
      s.close()           #把這個鏈接關閉掉
   
     # Remove message queue
     del message_queues[s]  
View Code

  對於writable list中的socket,也有幾種狀態,若是這個客戶端鏈接在跟它對應的queue裏有數據,就把這個數據取出來再發回給這個客戶端,不然就把這個鏈接從output list中移除,這樣下一次循環select()調用時檢測到outputs list中沒有這個鏈接,那就會認爲這個鏈接還處於非活動狀態

# Handle outputs
  for s in writable:
      try:
          next_msg = message_queues[s].get_nowait()
      except Queue.Empty:
          # No messages waiting so stop checking for writability.
          print >>sys.stderr, 'output queue for', s.getpeername(), 'is empty'
          outputs.remove(s)
      else:
         print >>sys.stderr, 'sending "%s" to %s' % (next_msg, s.getpeername())
         s.send(next_msg)
View Code

  最後,若是在跟某個socket鏈接通訊過程當中出了錯誤,就把這個鏈接對象在inputs\outputs\message_queue中都刪除,再把鏈接關閉掉

# Handle "exceptional conditions"
for s in exceptional:
    print >>sys.stderr, 'handling exceptional condition for', s.getpeername()
    # Stop listening for input on the connection
    inputs.remove(s)
    if s in outputs:
        outputs.remove(s)
    s.close()
 
    # Remove message queue
    del message_queues[s]
View Code

client 端

  下面的這個是客戶端程序展現瞭如何經過select()對socket進行管理並與多個鏈接同時進行交互,

import socket
  import sys
   
  messages = [ 'This is the message. ',
               'It will be sent ',
               'in parts.',
               ]
  server_address = ('localhost', 10000)
   
 # Create a TCP/IP socket
 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
           socket.socket(socket.AF_INET, socket.SOCK_STREAM),
           ]
  
 # Connect the socket to the port where the server is listening
 print >>sys.stderr, 'connecting to %s port %s' % server_address
 for s in socks:
     s.connect(server_address)
View Code

  接下來經過循環經過每一個socket鏈接給server發送和接收數據。

for message in messages:
   
      # Send messages on both sockets
      for s in socks:
          print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
          s.send(message)
   
      # Read responses on both sockets
      for s in socks:
         data = s.recv(1024)
         print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
         if not data:
             print >>sys.stderr, 'closing socket', s.getsockname()
複製代碼
View Code

最後代碼整理以下:

Server端:

 #_*_coding:utf-8_*_
   
  import select
  import socket
  import sys
  import queue
   
  # Create a TCP/IP socket
  server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 server.setblocking(False)
  
 # Bind the socket to the port
 server_address = ('localhost', 10000)
 print(sys.stderr, 'starting up on %s port %s' % server_address)
 server.bind(server_address)
  
 # Listen for incoming connections
 server.listen(5)
  
 # Sockets from which we expect to read
 inputs = [ server ]
  
 # Sockets to which we expect to write
 outputs = [ ]
  
 message_queues = {}
 while inputs:
  
     # Wait for at least one of the sockets to be ready for processing
     print( '\nwaiting for the next event')
     readable, writable, exceptional = select.select(inputs, outputs, inputs)
     # Handle inputs
     for s in readable:
  
         if s is server:
             # A "readable" server socket is ready to accept a connection
             connection, client_address = s.accept()
             print('new connection from', client_address)
             connection.setblocking(False)
            inputs.append(connection)
 
            # Give the connection a queue for data we want to send
             message_queues[connection] = queue.Queue()
         else:
             data = s.recv(1024)
             if data:
                 # A readable client socket has data
                 print(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) )
                 message_queues[s].put(data)
                 # Add output channel for response
                 if s not in outputs:
                     outputs.append(s)
             else:
                 # Interpret empty result as closed connection
                 print('closing', client_address, 'after reading no data')
                # Stop listening for input on the connection
                 if s in outputs:
                     outputs.remove(s)  #既然客戶端都斷開了,我就不用再給它返回數據了,因此這時候若是這個客戶端的鏈接對象還在outputs列表中,就把它刪掉
                 inputs.remove(s)    #inputs中也刪除掉
                 s.close()           #把這個鏈接關閉掉
  
                 # Remove message queue
                 del message_queues[s]
     # Handle outputs
     for s in writable:
         try:
             next_msg = message_queues[s].get_nowait()
         except queue.Empty:
             # No messages waiting so stop checking for writability.
             print('output queue for', s.getpeername(), 'is empty')
             outputs.remove(s)
         else:
             print( 'sending "%s" to %s' % (next_msg, s.getpeername()))
             s.send(next_msg)
     # Handle "exceptional conditions"
     for s in exceptional:
         print('handling exceptional condition for', s.getpeername() )
         # Stop listening for input on the connection
         inputs.remove(s)
         if s in outputs:
            outputs.remove(s)
         s.close()
 
         # Remove message queue
        del message_queues[s]
View Code

Client端:

 import socket
  import sys
   
  messages = [ 'This is the message. ',
               'It will be sent ',
               'in parts.',
               ]
  server_address = ('localhost', 10000)
   
 # Create a TCP/IP socket
 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
           socket.socket(socket.AF_INET, socket.SOCK_STREAM),
           ]
  
 # Connect the socket to the port where the server is listening
 print >>sys.stderr, 'connecting to %s port %s' % server_address
 for s in socks:
     s.connect(server_address)
  
 for message in messages:
  
     # Send messages on both sockets
     for s in socks:
         print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
         s.send(message)
  
     # Read responses on both sockets
     for s in socks:
         data = s.recv(1024)
         print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
         if not data:
             print >>sys.stderr, 'closing socket', s.getsockname()
             s.close()
View Code
相關文章
相關標籤/搜索