異步編程學習

此文已轉:https://www.jianshu.com/p/6606d1a44340

原文章來源於:
https://mp.weixin.qq.com/s?__biz=MzIxMjY5NTE0MA==&mid=2247483720&idx=1&sn=f016c06ddd17765fd50b705fed64429c


原文章寫得很精彩,但有些代碼仍是能夠優化下的。並且這文章一直只有上篇,惋惜了。

接下來按我的看法,從代碼角度解析下這篇文章:

前提知識講解:
一、計算機資源:常分爲CPU資源、內存資源、硬盤資源和網絡資源
二、進程阻塞:正在運行的程序,因爲自身某個模塊須要使用硬盤或網絡I/O資源等,而系統又未及時響應,致使進程處於待機狀態,直至等待事件做出迴應後纔會被喚醒。
三、進程非阻塞:同理,在獲取某些資源時,不會等待結果響應,而是繼續處理其餘模塊。
咱們以socket爲例,以下可獲取阻塞與非阻塞兩種編程
import socket
sock = socket.socket()

socket.setblocking(True)
# 默認就是阻塞。即套接字 創建鏈接/發送請求/接受請求 的時候,是阻塞的。
socket.setblocking(False) # 設置爲非阻塞,即上述請求過程不會阻塞,而是繼續處理其餘模塊。

 

使用原生asyncio編寫異步程序:python

在此代碼中,咱們須要注意幾個關鍵點git

一、loop = asyncio.get_event_loop()  # 開啓事件循環,異步"任務"將在此循環執行github

二、asyncio.create_task()  # 將一個協程包裝成一個"任務"排入日程準備執行編程

三、asyncio.gather()  # 同步執行"任務"服務器

import asyncio
import aiohttp
import time

loop = asyncio.get_event_loop()

async def fetch():
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get('http://www.baidu.com') as response:
            print(await response.read())

async def multi_fetch():
    await asyncio.gather(*[asyncio.create_task(fetch()) for _ in range(10)])

if __name__ == '__main__':
    start = time.time()
    loop.run_until_complete(fetch())  # 執行一次
    # loop.run_until_complete(multi_fetch())  # 執行十次
    print(time.time() - start)

 

接下來咱們就來一步一步的實現上述幾個關鍵點,實現手寫本身的異步程序微信

首先得實現一個阻塞程序,以socket爲例。此例子比較簡單,大體看一下便可。網絡

用時的話,咱們能夠明顯看出blocking_socket() 用時大體0.7ssession

而multi_blocking_socket() 用時大體0.7s,恰好是10倍左右。異步

import socket
import time

def blocking_socket(response=b''):
    sock = socket.socket()  # 默認爲阻塞鏈接
    sock.connect(('www.baidu.com', 80))  # 創建百度TCP鏈接
    sock.send(b'GET / HTTP/1.0\r\n\r\n')  # 發送HTTP協議
    chunk = sock.recv(1024)  # 接收數據
    while chunk:
        response += chunk
        chunk = sock.recv(1024)
    return response

def mutil_blocking_socket():
    return [blocking_socket() for _ in range(10)].__len__()

if __name__ == '__main__':
    start = time.time()
    blocking_socket()  # 執行一次
    # mutil_blocking_socket()  # 執行十次
    print(time.time() - start)

 

接着咱們來實現非阻塞程序,仍然使用socket來實現socket

來比較時間,咱們會驚訝的發現,no_blocking_socket() 執行一次大體0.07s

而multi_no_blocking_socket() 執行十次,大體是0.7s....

沒錯,花費時間和阻塞編程是同樣的。咱們的非阻塞編程並無達到實際的效果。

import socket
import time

def no_blocking_socket(response=b''):
    sock = socket.socket()
    sock.setblocking(False)  # 設置非阻塞鏈接
    try:
        sock.connect(('www.baidu.com', 80))
    except BlockingIOError:  # 非阻塞式創建鏈接在此處會報錯,捕獲忽略便可
        pass

    while True:
        try:
            sock.send(b'GET / HTTP/1.0\r\n\r\n')  # 發送HTTP協議
            break
        except OSError:  # 此處會報錯。由於套接字與百度服務器的TCP鏈接尚未創建,
# 可是套接字倒是非阻塞的,故在未創建鏈接的狀況下發送協議會報錯,此處捕獲此異常
continue while True: try: chunk = sock.recv(1024) while chunk: response += chunk chunk = sock.recv(1024) return response except OSError: # 同理,對方服務器未接收HTTP協議,不會返回數據,即便返回,也有時延。故此時接收數據會報異常,此處捕獲此異常 pass def multi_no_blocking_socket(): return [no_blocking_socket() for _ in range(10)].__len__() if __name__ == '__main__': start = time.time() no_blocking_socket() # 執行一次 # multi_no_blocking_socket() # 執行十次 print(time.time() - start)

咱們來分析一下,爲何會形成這種現象。咱們發現這套非阻塞代碼與以前的阻塞代碼相比,

多了三處 try: ... except: ...

多了兩處 while True: ...

代碼確實是非阻塞編程,也就是程序不會在網絡IO模塊處阻塞,可是程序也沒有把空閒下來的時間花在"正確"的地方

在非阻塞的狀況下,CUP把空閒下來的時間不停的去"試錯",也就是程序會不停的嘗試發送協議,直到發送成功。不停的嘗試接收數據,直到接收成功。

這樣就致使咱們的非阻塞編程,實際上和阻塞編程是同樣的,惟一要說不一樣。就是非阻塞程序中的CPU可能會 "忙一點"。

 

那麼咱們能夠注意到了問題的關鍵,就是咱們不知道何時程序是 "準備就緒了",也就是何時能夠發送協議,何時能夠接收數據。

其實操做系統已經幫咱們實現了,它將事件的 I/O 都封裝成了事件,如可讀事件,可寫事件。那麼咱們就能夠當即想到,當咱們的套接字狀態變爲咱們所須要的時候,

就當即執行接下來的步驟。因此此處,咱們就須要 "回調"!

這裏咱們說明下python中selectors模塊,用於註冊事件的回調

from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

sock = socket.socket()  # 獲取套接字
selector
= DefaultSelector() # 獲取selector對象 selector.register(sock.fileno(), EVENT_WRITE, on_send) # fileno()獲取當前socket套接字的文件描述符,並綁定事件EVENT_WRITE,回調函數爲on_send selector.unregister(sock.fileno()) # 註銷事件綁定 selector.register(sock.fileno(), EVENT_READ, on_recv) # 同理,綁定事件EVENT_READ,回調函數爲on_recv

while True:
  events = selector.select() # 此處的events是操做系統返回的事件,也就是咱們綁定的事件被觸發了,
# 此處是阻塞獲取的。也就是用一個事件循環的阻塞,來代替咱們的while True: ...
  for sock, mask in events:
    sock.data() #
sock.data爲綁定的回調函數,也就是上面的on_send和on_recv

如此咱們就不須要本身手動while True來監控事件狀態的改變,將這件工做交給事件循環。此處咱們將其命名爲loop。

 

接下來就是回調編程了

import socket
import time

from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()
stop_loop = 10

class Crawler:
    def __init__(self, flag=10):
        self.flag = flag
        self.sock = None
        self.response = b''

    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('www.baidu.com', 80))
        except BlockingIOError:
            pass
    # fileno()獲取當前socket套接字的文件描述符,並綁定事件回調
    selector.register(self.sock.fileno(), EVENT_WRITE, self.on_send)
    def on_send(self):
        selector.unregister(self.sock.fileno())
        self.sock.send(b'GET / HTTP/1.0\r\n\r\n')
        selector.register(self.sock.fileno(), EVENT_READ, self.on_recv)

    def on_recv(self):
        chunk = self.sock.recv(1024)
        if chunk:
            self.response += chunk
        else:
            global stop_loop
            stop_loop -= self.flag
            selector.unregister(self.sock.fileno())

def loop():  # 事件循環,由操做系統返回那個事件發生了,對應執行那些事件的回調。
    while stop_loop:
        events = selector.select()
        for sock, mask in events:
            sock.data()

if __name__ == '__main__':
    start = time.time()
    Crawler(10).fetch()  # 執行一次
    # [Crawler(1).fetch() for _ in range(10)]  # 執行十次
    loop()
    print(time.time() - start)

咱們來捋一下代碼執行的流程:

一、首先實例化一個Crawler對象,而後執行此實例的fetch方法

二、fetch方法發起了與百度服務器的鏈接,而後註冊了回調函數。

三、此時會走到loop()函數來,執行事件循環。直到咱們與對方服務器的鏈接創建成功,則此時OS會返回事件,咱們則執行對應的回調事件 sock.data()  <=> self.on_send()

四、執行on_recv,首先註銷上一個事件,而後發送協議,再接着註冊可讀事件。繼續進入等待

五、此時繼續進入loop事件循環,直到觸發註冊事件,執行回調函數on_recv,因爲一次只接收1024,故可能會接收屢次,也就是會觸發屢次on_recv事件的回調,直接接收完成。

六、接收完成,咱們令全局變量stop_loop - flag,來中止loop事件循環。程序結束。

而後咱們來看下時間,咱們會發現執行一次的時間和執行十次的時間基本是差很少的。說明咱們編寫的程序是沒有問題的。

回調式異步編程,成功!

 

至此,咱們已經實現了回調式異步編程,可是咱們思考下第一個例子,是基於協程的異步編程,故咱們如今來調整代碼,編寫協程。

import socket
import time

from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()
stop_loop = 10

class Future:
    # 用於存放將來可能出現的數據,當出現時執行一次回調函數
    # 此中的result僅做爲一箇中轉,實際仍是經過回調返回給協程

    def __init__(self):
        self.result = None
        self.callback = None

    def set_callback(self, func):
        self.callback = func

    def set_result(self, result):
        self.result = result
        self.callback(self) if self.callback else None

class Task:
    # 用於啓動協程,該類實例初始化時傳入爲協程對象,執行self.process方法
    # 調用協程的send方法,啓動協程,並最後綁定回調函數

    def __init__(self, co_routine):
        self.co_routine = co_routine
        future = Future()
        self.process(future)

    def process(self, future):
        try:
            next_future = self.co_routine.send(future.result)
        except StopIteration:
            return
        next_future.set_callback(self.process)

class Crawler:
    def __init__(self, flag=10):
        self.flag = flag
        self.sock = None
        self.response = b''

    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('www.baidu.com', 80))
        except BlockingIOError:
            pass

        future = Future()

        def _on_send():
            future.set_result(None)

        def _on_recv():
            future.set_result(self.sock.recv(1024))

        selector.register(self.sock.fileno(), EVENT_WRITE, _on_send)
        yield future
        selector.unregister(self.sock.fileno())
        self.sock.send(b'GET / HTTP/1.0\r\n\r\n')
        selector.register(self.sock.fileno(), EVENT_READ, _on_recv)
        while True:
            chunk = yield future  # 在此處輪詢EVENT_READ事件,直至全部數據加載完畢
            if chunk:
                self.response += chunk
            else:
                global stop_loop
                stop_loop -= self.flag
                return self.response

def loop():
    while stop_loop:
        events = selector.select()
        for sock, mask in events:
            sock.data()

if __name__ == '__main__':
    start = time.time()
    Task(Crawler(10).fetch())  # 傳入協程fetch,使用Task實例化調用協程的send方法來啓動協程
    # [Task(Crawler(1).fetch()) for _ in range(10)]  # 同理,啓動十個協程任務
    loop()
    print(time.time() - start)

咱們繼續來整理下代碼執行的流程:

一、首先實例化一些Crawler對象,調用調用此實例的fetch函數獲得一個協程。此時協程是沒有執行的。(協程須要send等觸發纔會執行)

二、將此協程裝於Task用來建立任務實例,在任務中會主動觸發協程的send函數來啓動協程

三、此時協程已觸發,註冊事件 selector.register(self.sock.fileno(), EVENT_WRITE, _on_send),而後返回future,此時協程到此暫停

四、返回的future會添加任務的回調函數,也就是self.precess()。而loop也開始了事件輪詢,當套接字的文件描述符狀態變爲可寫狀態時,觸發回調方法_on_send

五、_on_send方法執行Future中的set_result方法,此時在此方法中會調用一次future註冊的回調函數,繼續觸發任務Task中協程的send方法,回到協程上次暫停的狀態

六、回來後,首先註銷事件EVENT_WRITE。發送HTTP協議請求。再註冊事件 selector.register(self.sock.fileno(), EVENT_READ, _on_recv)

七、loop事件輪詢,當套接字的文件描述符變爲可讀狀態時,觸發回調方法_on_recv

八、_on_recv方法執行Future中的set_result方法,此時在方法中會初始化result爲sock.recv(1024)的值,並執行註冊的回調函數,將此結果繼續傳遞至協程上回暫停的地方

九、因爲一直沒有註銷事件EVENT_READ,故會一直驅動事件輪詢直至結束

十、Task、Future、Crawler、loop這四個就這麼神奇的串聯在一塊兒了,難以想象的說。

 

不過這樣寫貌似不臺好看,雖然感受也能夠,可是不少模塊其實都是能夠拆離開的

下面就是一個拆分版本,就不細細的分析流程啦

import socket
import time

from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()
stop_loop = 10

def fetch(sock):
    sock.setblocking(False)
    try:
        sock.connect(('www.baidu.com', 80))
    except BlockingIOError:
        pass

    future = Future()

    def _on_send():
        future.set_result(None)

    selector.register(sock.fileno(), EVENT_WRITE, _on_send)
    yield from future
    selector.unregister(sock.fileno())
    return future

def read(sock, future, flag, response=b''):
    def _on_recv():
        future.set_result(sock.recv(1024))

    selector.register(sock.fileno(), EVENT_READ, _on_recv)
    chunk = yield from future
    while chunk:
        response += chunk
        chunk = yield from future
    selector.unregister(sock.fileno())
    global stop_loop
    stop_loop -= flag
    return response


def loop():
    while stop_loop:
        events = selector.select()
        for sock, mask in events:
            sock.data()

class Future:
    def __init__(self):
        self.result = None
        self.callback = None

    def set_callback(self, func):
        self.callback = func

    def set_result(self, result):
        self.result = result
        self.callback(self) if self.callback else None

    def __iter__(self):
        yield self
        return self.result

class Task:
    def __init__(self, co_routine):
        self.co_routine = co_routine
        future = Future()
        self.process(future)

    def process(self, future):
        try:
            next_future = self.co_routine.send(future.result)
        except StopIteration:
            return
        next_future.set_callback(self.process)

class Crawler:
    def __init__(self, flag):
        self.flag = flag

    def fetch(self):
        sock = socket.socket()
        future = yield from fetch(sock)
        sock.send(b'GET / HTTP/1.0\r\n\r\n')
        response = yield from read(sock, future, self.flag)
        print(response)

if __name__ == '__main__':
    start = time.time()
    Task(Crawler(10).fetch())
    # [Task(Crawler(1).fetch()) for _ in range(10)]
    loop()
    print(time.time() - start)

代碼放到github上了
https://github.com/CzaOrz/ioco/tree/master/open_source_project/%E5%BC%82%E6%AD%A5%E6%95%99%E7%A8%8B%E5%AD%A6%E4%B9%A0/%E5%BC%82%E6%AD%A5%E5%8D%8F%E7%A8%8B
相關文章
相關標籤/搜索