python併發編程之IO模型,

瞭解新知識以前須要知道的一些知識linux

同步(synchronous):一個進程在執行某個任務時,另一個進程必須等待其執行完畢,才能繼續執行程序員

#所謂同步,就是在發出一個功能調用時,在沒有獲得結果以前,該調用就不會返回。按照這個定義,
其實絕大多數函數都是同步調用。可是通常而言,咱們在說同步、異步的時候,
特指那些須要其餘部件協做或者須要必定時間完成的任務。
#舉例: #1. multiprocessing.Pool下的apply #發起同步調用後,就在原地等着任務結束,
根本不考慮任務是在計算仍是在io阻塞,總之就是一股腦地等任務結束
#2. concurrent.futures.ProcessPoolExecutor().submit(func,).result() #3. concurrent.futures.ThreadPoolExecutor().submit(func,).result()

異步(asynchronous):web

#異步的概念和同步相對。當一個異步功能調用發出後,調用者不能馬上獲得結果。
當該異步功能完成後,經過狀態、通知或回調來通知調用者。若是異步功能用狀態來通知,
那麼調用者就須要每隔必定時間檢查一次,效率就很低(有些初學多線程編程的人,總喜歡用一個循環去檢查某個變量的值,這實際上是一 種很嚴重的錯誤)。
若是是使用通知的方式,效率則很高,由於異步功能幾乎不須要作額外的操做。至於回調函數,其實和通知沒太多區別。
#舉例: #1. multiprocessing.Pool().apply_async() #發起異步調用後,並不會等待任務結束才返回,相反,
會當即獲取一個臨時結果(並非最終的結果,多是封裝好的一個對象)。
#2. concurrent.futures.ProcessPoolExecutor(3).submit(func,) #3. concurrent.futures.ThreadPoolExecutor(3).submit(func,)

阻塞(blocking):數據庫

#阻塞調用是指調用結果返回以前,當前線程會被掛起(如遇到io操做)。函數只有在獲得結果以後纔會
將阻塞的線程激活。有人也許會把阻塞調用和同步調用等同起來,實際上他是不一樣的。對於同步調用來講,
不少時候當前線程仍是激活的,只是從邏輯上當前函數沒有返回而已。
#舉例: #1. 同步調用:apply一個累計1億次的任務,該調用會一直等待,直到任務返回結果爲止,
但並未阻塞住(即使是被搶走cpu的執行權限,那也是處於就緒態);
#2. 阻塞調用:當socket工做在阻塞模式的時候,若是沒有數據的狀況下調用recv函數,
則當前線程就會被掛起,直到有數據爲止。

非阻塞(non-blocking):編程

 #非阻塞和阻塞的概念相對應,指在不能馬上獲得結果以前也會馬上返回,同時該函數不會阻塞當前線程。 windows

小結:緩存

 #1. 同步與異步針對的是函數/任務的調用方式:同步就是當一個進程發起一個函數(任務)調用的時候,一直等到函數(任務)完成,而進程繼續處於激活狀態。而異步狀況下是當一個進程發起一個函數(任務)調用的時候,不會等函數返回,而是繼續往下執行當,函數返回的時候經過狀態、通知、事件等方式通知進程任務完成。 #2. 阻塞與非阻塞針對的是進程或線程:阻塞是當請求不能知足的時候就將進程掛起,而非阻塞則不會阻塞當前進程 tomcat

1、IO模型介紹服務器

IO發生時涉及的對象和步驟。對於一個網絡IO(network IO),它會涉及到兩個系統對象,一個是調用這個IO的process (or thread),另外一個就是系統內核(kernel)。當一個read操做發生時,該操做會經歷兩個階段:網絡

 #1)等待數據準備 (Waiting for the data to be ready)

#2)將數據從內核拷貝到進程中(Copying the data from the kernel to the process) 

記住這兩點很重要,由於這些IO模型的區別就是在兩個階段上各有不一樣的狀況。

2、阻塞IO  (blocking IO)

 

阻塞IO(blocking IO)的特色:就是在IO執行的兩個階段(等待數據和拷貝數據兩個階段)都被block了。

實際上,除非特別指定,幾乎全部的IO接口 ( 包括socket接口 ) 都是阻塞型的。這給網絡編程帶來了一個很大的問題,如在調用recv(1024)的同時,線程將被阻塞,在此期間,線程將沒法執行任何運算或響應任何的網絡請求。

 1 from socket import *
 2 server = socket(AF_INET,SOCK_STREAM)
 3 server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
 4 server.bind(('127.0.0.1',8080))
 5 server.listen(5)
 6 print('start runnig...')
 7 while True:
 8     conn,addr = server.accept()  #IO操做 在這accept的時候不能幹recv的活
 9     print(addr)
10     while True:
11         try:
12             data = conn.recv(1024)  #IO操做
13             conn.send(data.upper())
14         except Exception:
15             break
16     conn.close()
17 server.close()
18 
19 # 咱們之前寫的這個就是阻塞的IO模型:一旦阻塞了就在那卡着
20 # 直到等到數據已經到了操做系統,操做系統再從內核拷貝給應用程序
21 # 阻塞IO在那兩個階段全都阻塞住了
服務端
 1 from socket import *
 2 client  = socket(AF_INET,SOCK_STREAM)
 3 client.connect(('127.0.0.1',8080))
 4 while True:
 5     cmd = input('>>:').strip()
 6     if not cmd:continue
 7     client.send(cmd.encode('utf-8'))
 8     data = client.recv(1024)
 9     print('接受的是:%s'%data.decode('utf-8'))
10 client.close()
客戶端

 一個簡單的解決方案:

#在服務器端使用多線程(或多進程)。多線程(或多進程)的目的是讓每一個鏈接都擁有獨立的線程(或進程),
這樣任何一個鏈接的阻塞都不會影響其餘的鏈接。

該方案的問題是:

#開啓多進程或都線程的方式,在遇到要同時響應成百上千路的鏈接請求,
則不管多線程仍是多進程都會嚴重佔據系統資源,下降系統對外界響應效率,
並且線程與進程自己也更容易進入假死狀態。

改進方案:    

不少程序員可能會考慮使用「線程池」或「鏈接池」。
「線程池」旨在減小建立和銷燬線程的頻率,其維持必定合理數量的線程,並讓空閒的線程從新承擔新的執行任務
。「鏈接池」維持鏈接的緩存池,儘可能重用已有的鏈接、減小建立和關閉鏈接的頻率。這兩種技術均可以很好
的下降系統開銷,都被普遍應用不少大型系統,如websphere、tomcat和各類數據庫等。

  改進後方案其實也存在着問題:

#「線程池」和「鏈接池」技術也只是在必定程度上緩解了頻繁調用IO接口帶來的資源佔用。並且,
所謂「池」始終有其上限,當請求大大超過上限時,「池」構成的系統對外界的響應並不比沒有池的時候效果
好多少。因此使用「池」必須考慮其面臨的響應規模,並根據響應規模調整「池」的大小。

對應上例中的所面臨的可能同時出現的上千甚至上萬次的客戶端請求,「線程池」或「鏈接池」或許能夠緩解部分壓力,可是不能解決全部問題。總之,多線程模型能夠方便高效的解決小規模的服務請求,但面對大規模的服務請求,多線程模型也會遇到瓶頸,能夠用非阻塞接口來嘗試解決這個問題。

3、非阻塞IO  (nonblocking IO)

多線程,多進程,進程池,線程池均可以實現併發,可是仍然沒有解決IO問題
那麼下面咱們來了解一下非阻塞IO

從圖中能夠看出,當用戶進程發出read操做時若是kernel中的數據尚未準備好,那麼它並不會block用戶進程,而是馬上返回一個error從用戶進程角度講 ,它發起一個read操做後,並不須要等待,而是立刻就獲得了一個結果。用戶進程判斷結果是一個error時,它就知道數據尚未準備好,因而用戶就能夠在本次到下次再發起read詢問的時間間隔內作其餘事情,或者直接再次發送read操做。一旦kernel中的數據準備好了,而且又再次收到了用戶進程的system call,那麼它立刻就將數據拷貝到了用戶內存(這一階段仍然是阻塞的),而後返回。

    也就是說非阻塞的recvform系統調用調用以後,進程並無被阻塞,內核立刻返回給進程,若是數據還沒準備好,此時會返回一個error。進程在返回以後,能夠乾點別的事情,而後再發起recvform系統調用。重複上面的過程,循環往復的進行recvform系統調用。這個過程一般被稱之爲輪詢。輪詢檢查內核數據,直到數據準備好,再拷貝數據到進程,進行數據處理。須要注意,拷貝數據整個過程,進程仍然是屬於阻塞的狀態。

    因此,在非阻塞式IO中,用戶進程實際上是須要不斷的主動詢問kernel數據準備好了沒有。

server.setblocking()#默認是True  
server.setblocking(False) #False的話就成非阻塞了,這只是對於socket套接字來講的 

因此,在非阻塞式IO中,用戶進程實際上是須要不斷的主動詢問內核數據準備好了沒有。

wait data 等數據的這個階段是不阻塞的
copy data 這個階段仍是要阻塞的

服務端

 1 #這種程序雖然說解決了單線程併發,可是大大的佔用了cpu
 2 from socket import *
 3 import time
 4 severt = socket(AF_INET,SOCK_STREAM)
 5 severt.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
 6 severt.bind(('127.0.0.1',8080))
 7 severt.listen(5)
 8 severt.setblocking(False) #默認是True  (若是是False,套接字裏的一些阻塞操做都變成非阻塞的)
 9 print('startting....')
10 conn_l = []
11 del_l =[]
12 while True:
13     try:
14         print(conn_l)
15         conn,addr = severt.accept() #收不到數據的時候纔出異常
16         print(conn)
17         conn_l.append(conn)
18     except BlockingIOError:  #吧收不到數據的那段時間利用起來(利用他收不到
19         #數據的時候,才幹下面的for循環)
20        for conn in conn_l:
21             try:
22                 data = conn.recv(1024)
23                 conn.send(data.upper())
24             except BlockingIOError:
25                 pass
26             except ConnectionResetError:  #端開連接的錯誤(若是忽然斷開連接,會報錯
27                                            #就先添加到列表裏面去,完了吧連接給清除了)
28                  del_l.append(conn)
29        for obj in del_l:
30              obj.close()
31              conn_l.remove(obj)
32        del_l.clear()

客戶端

1 from socket import *
2 client = socket(AF_INET,SOCK_STREAM)
3 client.connect(('127.0.0.1',8080))
4 while True:
5     cmd = input('>>:').strip()
6     if not cmd:continue
7     client.send(cmd.encode('utf-8'))
8     data = client.recv(1024)
9     print(data.decode('utf-8'))

對服務端的說明:若是客戶端斷開鏈接的時候,就會發生ConnectionResetError

因此咱們的處理一下這個異常。就如上邊的服務端所示

 可是非阻塞IO模型毫不被推薦。

非阻塞IO模型優勢:可以在等待任務完成的時間裏幹其餘活了(包括提交其餘任務,也就是 「後臺」 能夠有多個任務在「」同時「」執行)。

非阻塞IO模型缺點:

  1. 循環調用recv()將大幅度推高CPU佔用率;這也是咱們在代碼中留一句time.sleep(2)的緣由,不然在低配主機下極容易出現卡機狀況

   2. 任務完成的響應延遲增大了,由於每過一段時間纔去輪詢一次read操做,而任務可能在兩次輪詢之間的任意時間完成。這會致使總體數據吞吐量的下降。

4、多路複用IO  (IO multiplexing)

   當用戶進程調用了select,那麼整個進程會被block,而同時,kernel會「監視」全部select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程。
    這個圖和blocking IO的圖其實並無太大的不一樣,事實上還更差一些。由於這裏須要使用兩個系統調用(select和recvfrom),而blocking IO只調用了一個系統調用(recvfrom)。可是,用select的優點在於它能夠同時處理多個connection。

   強調:

    1. 若是處理的鏈接數不是很高的話,使用select/epoll的web server不必定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優點並非對於單個鏈接能處理得更快,而是在於能處理更多的鏈接。

    2. 在多路複用模型中,對於每個socket,通常都設置成爲non-blocking,可是,如上圖所示,整個用戶的process實際上是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。

    結論: select的優點在於能夠處理多個鏈接,不適用於單個鏈接 

 1 #服務端
 2 from socket import *
 3 import select
 4 
 5 s=socket(AF_INET,SOCK_STREAM)
 6 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
 7 s.bind(('127.0.0.1',8081))
 8 s.listen(5)
 9 s.setblocking(False) #設置socket的接口爲非阻塞
10 read_l=[s,]
11 while True:
12     r_l,w_l,x_l=select.select(read_l,[],[])
13     print(r_l)
14     for ready_obj in r_l:
15         if ready_obj == s:
16             conn,addr=ready_obj.accept() #此時的ready_obj等於s
17             read_l.append(conn)
18         else:
19             try:
20                 data=ready_obj.recv(1024) #此時的ready_obj等於conn
21                 if not data:
22                     read_l.remove(ready_obj)
23                     continue
24                 ready_obj.send(data.upper())
25             except ConnectionResetError:
26                 read_l.remove(ready_obj)
27 
28 #客戶端
29 from socket import *
30 c=socket(AF_INET,SOCK_STREAM)
31 c.connect(('127.0.0.1',8081))
32 
33 while True:
34     msg=input('>>: ')
35     if not msg:continue
36     c.send(msg.encode('utf-8'))
37     data=c.recv(1024)
38     print(data.decode('utf-8'))
39 
40 select IO模型
select IO 模塊
 1 # select模塊用select方法檢測那個套接字準備好了,也就是收沒收到數據(而咱們的
 2 # 非阻塞IO你不知道那個套接字準備好了,那麼用select模塊就能解決
 3 # 這個問題)
 4 # select還能夠檢測多個套接字
 5 # 因此select比非阻塞IO的效率高
 6 from socket import *
 7 import select
 8 server = socket(AF_INET,SOCK_STREAM)
 9 server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
10 server.bind(('127.0.0.1',8081))
11 server.setblocking(False) #設置socket的套接字爲非阻塞的
12 server.listen(5)
13 print('start running....')
14 read_l = [server,]  #由於不僅就那麼一個列表要檢測。因此不要在參數裏面定死了
15 while True:
16     r_l,w_l,x_l = select.select(read_l,[],[])  #select()方法有四個參數
17     print(r_l)  #一開始服務端運行的時候,就等着,當你客戶端一連接的時候,他就
18                 # 檢測到有數據了(檢測那個數據準備好了)
19     for obj in r_l:
20         if obj == server:
21             conn,addr = obj.accept()  #accept要經歷兩個階段,可是程序若是走到這一步,那確定是數據準備好了
22                          #當數據已經準備好的時候,accept就只經歷一個copy數據的階段了
23             # print(addr)
24             read_l.append(conn)  #在監聽一下conn套接字(這時候已經監聽了兩個了:分別是accept,conn)
25         else:
26             data = obj.recv(1024)  # 此時的obj=conn
27             obj.send(data.upper())
28 #         obj.close()
29 # server.close()
服務端(多路複用IO)
 1 from socket import *
 2 import select
 3 client = socket(AF_INET,SOCK_STREAM)
 4 client.connect(('127.0.0.1',8081))
 5 while True:
 6     cmd = input('>>:')
 7     client.send(cmd.encode('utf-8'))
 8     data = client.recv(1024)
 9     print('接收的是:%s'%data.decode('utf-8'))
10 client.close()
客戶端(多路複用IO)

select監聽fd變化的過程分析:

#用戶進程建立socket對象,拷貝監聽的fd到內核空間,每個fd會對應一張系統文件表,
內核空間的fd響應到數據後,就會發送信號給用戶進程數據已到;
#用戶進程再發送系統調用,好比(accept)將內核空間的數據copy到用戶空間,
同時做爲接受數據端內核空間的數據清除,這樣從新監聽時fd再有新的數據又能夠響應到了
(發送端由於基於TCP協議因此須要收到應答後纔會清除)。

select模塊的優勢

#相比其餘模型,使用select() 的事件驅動模型只用單線程(進程)執行,佔用資源少,不消耗太多 CPU,
同時可以爲多客戶端提供服務。若是試圖創建一個簡單的事件驅動的服務器程序,這個模型有必定的參考價值。

select模塊的缺點

#首先select()接口並非實現「事件驅動」的最好選擇。由於當須要探測的句柄值較大時,
select()接口自己須要消耗大量時間去輪詢各個句柄。不少操做系統提供了更爲高效的接口,
如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。
若是須要實現更高效的服務器程序,相似epoll這樣的接口更被推薦。
遺憾的是不一樣的操做系統特供的epoll接口有很大差別,因此使用相似於epoll的接口實現
具備較好跨平臺能力的服務器會比較困難。
#其次,該模型將事件探測和事件響應夾雜在一塊兒,一旦事件響應的執行體龐大,則對整個模型是災難性的。

5、異步IO(asynchronous IO)

    用戶進程發起read操做以後,馬上就能夠開始去作其它的事。而另外一方面,從kernel的角度,當它受到一個asynchronous read以後,首先它會馬上返回,因此不會對用戶進程產生任何block。而後,kernel會等待數據準備完成,而後將數據拷貝到用戶內存,當這一切都完成以後,kernel會給用戶進程發送一個signal,告訴它read操做完成了。

6、IO模型比較分析

  通過上面的介紹,會發現non-blocking IO和asynchronous IO的區別仍是很明顯的。在non-blocking IO中,雖然進程大部分時間都不會被block,可是它仍然要求進程去主動的check,而且當數據準備完成之後,也須要進程主動的再次調用recvfrom來將數據拷貝到用戶內存。而asynchronous IO則徹底不一樣。它就像是用戶進程將整個IO操做交給了他人(kernel)完成,而後他人作完後發信號通知。在此期間,用戶進程不須要去檢查IO操做的狀態,也不須要主動的去拷貝數據。

7、selsectors模塊

這三種IO多路複用模型在不一樣的平臺有着不一樣的支持,而epoll在windows下就不支持,好在咱們有selectors模塊,幫咱們默認選擇當前平臺下最合適的

 1 #服務端
 2 from socket import *
 3 import selectors
 4 
 5 sel=selectors.DefaultSelector()
 6 def accept(server_fileobj,mask):
 7     conn,addr=server_fileobj.accept()
 8     sel.register(conn,selectors.EVENT_READ,read)
 9 
10 def read(conn,mask):
11     try:
12         data=conn.recv(1024)
13         if not data:
14             print('closing',conn)
15             sel.unregister(conn)
16             conn.close()
17             return
18         conn.send(data.upper()+b'_SB')
19     except Exception:
20         print('closing', conn)
21         sel.unregister(conn)
22         conn.close()
23 
24 
25 
26 server_fileobj=socket(AF_INET,SOCK_STREAM)
27 server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
28 server_fileobj.bind(('127.0.0.1',8088))
29 server_fileobj.listen(5)
30 server_fileobj.setblocking(False) #設置socket的接口爲非阻塞
31 sel.register(server_fileobj,selectors.EVENT_READ,accept) #至關於網select的讀列表裏append了一個文件句柄server_fileobj,而且綁定了一個回調函數accept
32 
33 while True:
34     events=sel.select() #檢測全部的fileobj,是否有完成wait data的
35     for sel_obj,mask in events:
36         callback=sel_obj.data #callback=accpet
37         callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)
38 
39 #客戶端
40 from socket import *
41 c=socket(AF_INET,SOCK_STREAM)
42 c.connect(('127.0.0.1',8088))
43 
44 while True:
45     msg=input('>>: ')
46     if not msg:continue
47     c.send(msg.encode('utf-8'))
48     data=c.recv(1024)
49     print(data.decode('utf-8'))
selectors

 8、總結

IO多路複用(select)
select檢測的是哪一個套接字準備好了(檢測的時候等待了,變成阻塞了)


select之因此比阻塞IO好,就是由於select能夠檢測多個套接字
多個連接下select才能發揮它的優點
可是你的套接字特別多,你怎麼知道哪一個好了呢,那麼就得用循環去遍歷一下
那麼若是特別多的時候,效率也就不咋高了

eppol:只支持linux系統(就是爲了解決select效率低的問題)
eppol比pool,select效率高


selectors 更好用,解決了上面select,eppol,pool的問題

socketserver用這個模塊IO問題也解決了,實現併發也解決了
相關文章
相關標籤/搜索