此文已轉:https://www.jianshu.com/p/6606d1a44340
原文章來源於:
https://mp.weixin.qq.com/s?__biz=MzIxMjY5NTE0MA==&mid=2247483720&idx=1&sn=f016c06ddd17765fd50b705fed64429c
原文章寫得很精彩,但有些代碼仍是能夠優化下的。並且這文章一直只有上篇,惋惜了。
接下來按我的看法,從代碼角度解析下這篇文章:
前提知識講解:
一、計算機資源:常分爲CPU資源、內存資源、硬盤資源和網絡資源
二、進程阻塞:正在運行的程序,因爲自身某個模塊須要使用硬盤或網絡I/O資源等,而系統又未及時響應,致使進程處於待機狀態,直至等待事件做出迴應後纔會被喚醒。
三、進程非阻塞:同理,在獲取某些資源時,不會等待結果響應,而是繼續處理其餘模塊。
咱們以socket爲例,以下可獲取阻塞與非阻塞兩種編程
import socket sock = socket.socket()
socket.setblocking(True) # 默認就是阻塞。即套接字 創建鏈接/發送請求/接受請求 的時候,是阻塞的。
socket.setblocking(False) # 設置爲非阻塞,即上述請求過程不會阻塞,而是繼續處理其餘模塊。
使用原生asyncio編寫異步程序:python
在此代碼中,咱們須要注意幾個關鍵點git
一、loop = asyncio.get_event_loop() # 開啓事件循環,異步"任務"將在此循環執行github
二、asyncio.create_task() # 將一個協程包裝成一個"任務"排入日程準備執行編程
三、asyncio.gather() # 同步執行"任務"服務器
import asyncio import aiohttp import time loop = asyncio.get_event_loop() async def fetch(): async with aiohttp.ClientSession(loop=loop) as session: async with session.get('http://www.baidu.com') as response: print(await response.read()) async def multi_fetch(): await asyncio.gather(*[asyncio.create_task(fetch()) for _ in range(10)]) if __name__ == '__main__': start = time.time() loop.run_until_complete(fetch()) # 執行一次 # loop.run_until_complete(multi_fetch()) # 執行十次 print(time.time() - start)
接下來咱們就來一步一步的實現上述幾個關鍵點,實現手寫本身的異步程序微信
首先得實現一個阻塞程序,以socket爲例。此例子比較簡單,大體看一下便可。網絡
用時的話,咱們能夠明顯看出blocking_socket() 用時大體0.7ssession
而multi_blocking_socket() 用時大體0.7s,恰好是10倍左右。異步
import socket import time def blocking_socket(response=b''): sock = socket.socket() # 默認爲阻塞鏈接 sock.connect(('www.baidu.com', 80)) # 創建百度TCP鏈接 sock.send(b'GET / HTTP/1.0\r\n\r\n') # 發送HTTP協議 chunk = sock.recv(1024) # 接收數據 while chunk: response += chunk chunk = sock.recv(1024) return response def mutil_blocking_socket(): return [blocking_socket() for _ in range(10)].__len__() if __name__ == '__main__': start = time.time() blocking_socket() # 執行一次 # mutil_blocking_socket() # 執行十次 print(time.time() - start)
接着咱們來實現非阻塞程序,仍然使用socket來實現socket
來比較時間,咱們會驚訝的發現,no_blocking_socket() 執行一次大體0.07s
而multi_no_blocking_socket() 執行十次,大體是0.7s....
沒錯,花費時間和阻塞編程是同樣的。咱們的非阻塞編程並無達到實際的效果。
import socket import time def no_blocking_socket(response=b''): sock = socket.socket() sock.setblocking(False) # 設置非阻塞鏈接 try: sock.connect(('www.baidu.com', 80)) except BlockingIOError: # 非阻塞式創建鏈接在此處會報錯,捕獲忽略便可 pass while True: try: sock.send(b'GET / HTTP/1.0\r\n\r\n') # 發送HTTP協議 break except OSError: # 此處會報錯。由於套接字與百度服務器的TCP鏈接尚未創建,
# 可是套接字倒是非阻塞的,故在未創建鏈接的狀況下發送協議會報錯,此處捕獲此異常 continue while True: try: chunk = sock.recv(1024) while chunk: response += chunk chunk = sock.recv(1024) return response except OSError: # 同理,對方服務器未接收HTTP協議,不會返回數據,即便返回,也有時延。故此時接收數據會報異常,此處捕獲此異常 pass def multi_no_blocking_socket(): return [no_blocking_socket() for _ in range(10)].__len__() if __name__ == '__main__': start = time.time() no_blocking_socket() # 執行一次 # multi_no_blocking_socket() # 執行十次 print(time.time() - start)
咱們來分析一下,爲何會形成這種現象。咱們發現這套非阻塞代碼與以前的阻塞代碼相比,
多了三處 try: ... except: ...
多了兩處 while True: ...
代碼確實是非阻塞編程,也就是程序不會在網絡IO模塊處阻塞,可是程序也沒有把空閒下來的時間花在"正確"的地方
在非阻塞的狀況下,CUP把空閒下來的時間不停的去"試錯",也就是程序會不停的嘗試發送協議,直到發送成功。不停的嘗試接收數據,直到接收成功。
這樣就致使咱們的非阻塞編程,實際上和阻塞編程是同樣的,惟一要說不一樣。就是非阻塞程序中的CPU可能會 "忙一點"。
那麼咱們能夠注意到了問題的關鍵,就是咱們不知道何時程序是 "準備就緒了",也就是何時能夠發送協議,何時能夠接收數據。
其實操做系統已經幫咱們實現了,它將事件的 I/O 都封裝成了事件,如可讀事件,可寫事件。那麼咱們就能夠當即想到,當咱們的套接字狀態變爲咱們所須要的時候,
就當即執行接下來的步驟。因此此處,咱們就須要 "回調"!
這裏咱們說明下python中selectors模塊,用於註冊事件的回調
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE sock = socket.socket() # 獲取套接字
selector = DefaultSelector() # 獲取selector對象 selector.register(sock.fileno(), EVENT_WRITE, on_send) # fileno()獲取當前socket套接字的文件描述符,並綁定事件EVENT_WRITE,回調函數爲on_send selector.unregister(sock.fileno()) # 註銷事件綁定 selector.register(sock.fileno(), EVENT_READ, on_recv) # 同理,綁定事件EVENT_READ,回調函數爲on_recv
while True:
events = selector.select() # 此處的events是操做系統返回的事件,也就是咱們綁定的事件被觸發了,
# 此處是阻塞獲取的。也就是用一個事件循環的阻塞,來代替咱們的while True: ...
for sock, mask in events:
sock.data() #sock.data爲綁定的回調函數,也就是上面的on_send和on_recv
如此咱們就不須要本身手動while True來監控事件狀態的改變,將這件工做交給事件循環。此處咱們將其命名爲loop。
接下來就是回調編程了
import socket import time from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE selector = DefaultSelector() stop_loop = 10 class Crawler: def __init__(self, flag=10): self.flag = flag self.sock = None self.response = b'' def fetch(self): self.sock = socket.socket() self.sock.setblocking(False) try: self.sock.connect(('www.baidu.com', 80)) except BlockingIOError: pass
# fileno()獲取當前socket套接字的文件描述符,並綁定事件回調
selector.register(self.sock.fileno(), EVENT_WRITE, self.on_send)
def on_send(self): selector.unregister(self.sock.fileno()) self.sock.send(b'GET / HTTP/1.0\r\n\r\n') selector.register(self.sock.fileno(), EVENT_READ, self.on_recv) def on_recv(self): chunk = self.sock.recv(1024) if chunk: self.response += chunk else: global stop_loop stop_loop -= self.flag selector.unregister(self.sock.fileno()) def loop(): # 事件循環,由操做系統返回那個事件發生了,對應執行那些事件的回調。 while stop_loop: events = selector.select() for sock, mask in events: sock.data() if __name__ == '__main__': start = time.time() Crawler(10).fetch() # 執行一次 # [Crawler(1).fetch() for _ in range(10)] # 執行十次 loop() print(time.time() - start)
咱們來捋一下代碼執行的流程:
一、首先實例化一個Crawler對象,而後執行此實例的fetch方法
二、fetch方法發起了與百度服務器的鏈接,而後註冊了回調函數。
三、此時會走到loop()函數來,執行事件循環。直到咱們與對方服務器的鏈接創建成功,則此時OS會返回事件,咱們則執行對應的回調事件 sock.data() <=> self.on_send()
四、執行on_recv,首先註銷上一個事件,而後發送協議,再接着註冊可讀事件。繼續進入等待
五、此時繼續進入loop事件循環,直到觸發註冊事件,執行回調函數on_recv,因爲一次只接收1024,故可能會接收屢次,也就是會觸發屢次on_recv事件的回調,直接接收完成。
六、接收完成,咱們令全局變量stop_loop - flag,來中止loop事件循環。程序結束。
而後咱們來看下時間,咱們會發現執行一次的時間和執行十次的時間基本是差很少的。說明咱們編寫的程序是沒有問題的。
回調式異步編程,成功!
至此,咱們已經實現了回調式異步編程,可是咱們思考下第一個例子,是基於協程的異步編程,故咱們如今來調整代碼,編寫協程。
import socket import time from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE selector = DefaultSelector() stop_loop = 10 class Future: # 用於存放將來可能出現的數據,當出現時執行一次回調函數 # 此中的result僅做爲一箇中轉,實際仍是經過回調返回給協程 def __init__(self): self.result = None self.callback = None def set_callback(self, func): self.callback = func def set_result(self, result): self.result = result self.callback(self) if self.callback else None class Task: # 用於啓動協程,該類實例初始化時傳入爲協程對象,執行self.process方法 # 調用協程的send方法,啓動協程,並最後綁定回調函數 def __init__(self, co_routine): self.co_routine = co_routine future = Future() self.process(future) def process(self, future): try: next_future = self.co_routine.send(future.result) except StopIteration: return next_future.set_callback(self.process) class Crawler: def __init__(self, flag=10): self.flag = flag self.sock = None self.response = b'' def fetch(self): self.sock = socket.socket() self.sock.setblocking(False) try: self.sock.connect(('www.baidu.com', 80)) except BlockingIOError: pass future = Future() def _on_send(): future.set_result(None) def _on_recv(): future.set_result(self.sock.recv(1024)) selector.register(self.sock.fileno(), EVENT_WRITE, _on_send) yield future selector.unregister(self.sock.fileno()) self.sock.send(b'GET / HTTP/1.0\r\n\r\n') selector.register(self.sock.fileno(), EVENT_READ, _on_recv) while True: chunk = yield future # 在此處輪詢EVENT_READ事件,直至全部數據加載完畢 if chunk: self.response += chunk else: global stop_loop stop_loop -= self.flag return self.response def loop(): while stop_loop: events = selector.select() for sock, mask in events: sock.data() if __name__ == '__main__': start = time.time() Task(Crawler(10).fetch()) # 傳入協程fetch,使用Task實例化調用協程的send方法來啓動協程 # [Task(Crawler(1).fetch()) for _ in range(10)] # 同理,啓動十個協程任務 loop() print(time.time() - start)
咱們繼續來整理下代碼執行的流程:
一、首先實例化一些Crawler對象,調用調用此實例的fetch函數獲得一個協程。此時協程是沒有執行的。(協程須要send等觸發纔會執行)
二、將此協程裝於Task用來建立任務實例,在任務中會主動觸發協程的send函數來啓動協程
三、此時協程已觸發,註冊事件 selector.register(self.sock.fileno(), EVENT_WRITE, _on_send),而後返回future,此時協程到此暫停
四、返回的future會添加任務的回調函數,也就是self.precess()。而loop也開始了事件輪詢,當套接字的文件描述符狀態變爲可寫狀態時,觸發回調方法_on_send
五、_on_send方法執行Future中的set_result方法,此時在此方法中會調用一次future註冊的回調函數,繼續觸發任務Task中協程的send方法,回到協程上次暫停的狀態
六、回來後,首先註銷事件EVENT_WRITE。發送HTTP協議請求。再註冊事件 selector.register(self.sock.fileno(), EVENT_READ, _on_recv)
七、loop事件輪詢,當套接字的文件描述符變爲可讀狀態時,觸發回調方法_on_recv
八、_on_recv方法執行Future中的set_result方法,此時在方法中會初始化result爲sock.recv(1024)的值,並執行註冊的回調函數,將此結果繼續傳遞至協程上回暫停的地方
九、因爲一直沒有註銷事件EVENT_READ,故會一直驅動事件輪詢直至結束
十、Task、Future、Crawler、loop這四個就這麼神奇的串聯在一塊兒了,難以想象的說。
不過這樣寫貌似不臺好看,雖然感受也能夠,可是不少模塊其實都是能夠拆離開的
下面就是一個拆分版本,就不細細的分析流程啦
import socket import time from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE selector = DefaultSelector() stop_loop = 10 def fetch(sock): sock.setblocking(False) try: sock.connect(('www.baidu.com', 80)) except BlockingIOError: pass future = Future() def _on_send(): future.set_result(None) selector.register(sock.fileno(), EVENT_WRITE, _on_send) yield from future selector.unregister(sock.fileno()) return future def read(sock, future, flag, response=b''): def _on_recv(): future.set_result(sock.recv(1024)) selector.register(sock.fileno(), EVENT_READ, _on_recv) chunk = yield from future while chunk: response += chunk chunk = yield from future selector.unregister(sock.fileno()) global stop_loop stop_loop -= flag return response def loop(): while stop_loop: events = selector.select() for sock, mask in events: sock.data() class Future: def __init__(self): self.result = None self.callback = None def set_callback(self, func): self.callback = func def set_result(self, result): self.result = result self.callback(self) if self.callback else None def __iter__(self): yield self return self.result class Task: def __init__(self, co_routine): self.co_routine = co_routine future = Future() self.process(future) def process(self, future): try: next_future = self.co_routine.send(future.result) except StopIteration: return next_future.set_callback(self.process) class Crawler: def __init__(self, flag): self.flag = flag def fetch(self): sock = socket.socket() future = yield from fetch(sock) sock.send(b'GET / HTTP/1.0\r\n\r\n') response = yield from read(sock, future, self.flag) print(response) if __name__ == '__main__': start = time.time() Task(Crawler(10).fetch()) # [Task(Crawler(1).fetch()) for _ in range(10)] loop() print(time.time() - start)
代碼放到github上了
https://github.com/CzaOrz/ioco/tree/master/open_source_project/%E5%BC%82%E6%AD%A5%E6%95%99%E7%A8%8B%E5%AD%A6%E4%B9%A0/%E5%BC%82%E6%AD%A5%E5%8D%8F%E7%A8%8B