Python 的異步 IO:Asyncio 之 TCP Client

關於 Asyncio 的其餘文章:html

若是不知道 Asyncio 是什麼,先看「Asyncio 簡介」那一篇。python

一個簡單的 HTTP Server

首先,爲了便於測試,咱們用 Python 內建的 http 模塊,運行一個簡單的 HTTP Server。segmentfault

新建一個目錄,添加文件 index.html,內容爲 Hello, World!(不是合法的 HTML 格式也沒有關係),而後運行以下命令(Ubuntu 請用 python3):緩存

$ python -m http.server
Serving HTTP on 0.0.0.0 port 8000 (http://0.0.0.0:8000/) ...

後面不一樣的 Client 實現,都會鏈接這個 Server:Host 爲 localhost,Port 爲 8000bash

全部的示例代碼,import 語句一概從略。app

import asyncio

初版

初版改寫自 Python 官方文檔裏的 例子
Python 的例子是 Echo Client,咱們稍微複雜一點,是 HTTP Client,都是 TCP。異步

class ClientProtocol(asyncio.Protocol):
    def __init__(self, loop):
        self.loop = loop

    def connection_made(self, transport):
        request = 'GET / HTTP/1.1\r\nHost: localhost\r\n\r\n'
        transport.write(request.encode())

    def data_received(self, data):
        print(data.decode())

    def connection_lost(self, exc):
        self.loop.stop()

async def main(loop):
    await loop.create_connection(
        lambda: ClientProtocol(loop), 'localhost', 8000)

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.run_forever()

TCP 鏈接由 loop.create_connection() 建立,後者須要一個 Protocol 工廠,即 lambda: ClientProtocol(loop)
Protocol 提供了 connection_made()data_received()connection_lost() 等接口,這些接口就像回調函數同樣,會在恰當的時候被調用。
咱們在 connection_made() 中,經過參數 transport 發送一個 HTTP GET 請求,隨後在 data_received() 裏,將收到 HTTP 應答。
connection_lost() 被調用時,表示 Server 已經斷開鏈接。async

運行結果:tcp

HTTP/1.0 200 OK
Server: SimpleHTTP/0.6 Python/3.6.3
Date: Mon, 04 Dec 2017 06:11:52 GMT
Content-type: text/html
Content-Length: 13
Last-Modified: Thu, 30 Nov 2017 05:37:31 GMT


Hello, World!

這就是一個標準的 HTTP 應答,包含 Status Line,Headers 和 Body。函數

值得注意的是,loop 其實運行了兩遍:

loop.run_until_complete(main(loop))  # 第一遍
loop.run_forever()  # 第二遍

若是沒有 run_forever(),在收到數據以前,loop 可能就結束了。協程 main() 只是建立好鏈接,隨後 run_until_complete() 天然也就無事可作而終。

加了 run_forever() 後,data_received() 等便有了被調用的機會。可是也有問題,loop 一直在跑,程序沒辦法結束,因此纔在 connection_lost() 裏主動中止 loop:

def connection_lost(self, exc):
        self.loop.stop()

第二版:ClientSession

初版在 connection_made() 中 hard code 了一個 HTTP GET 請求,靈活性較差,之後必然還有 POST 等其餘 HTTP 方法須要支持,因此有必要新增一個 ClientSession 類,來抽象客戶端的會話。因而,HTTP 請求的發送,便從 connection_made() 挪到了 ClientSession.get()

ClientSession 應該爲每個 HTTP 方法提供一個相應的方法,好比 postput 等等,雖然咱們只考慮 HTTP GET。

class ClientProtocol(asyncio.Protocol):
    def __init__(self, loop):
        self.loop = loop
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print(data.decode())

    def connection_lost(self, exc):
        self.loop.stop()

class ClientSession:
    def __init__(self, loop):
        self._loop = loop

    async def get(self, url, host, port):
        transport, protocol = await self._loop.create_connection(
            lambda: ClientProtocol(loop), host, port)

        request = 'GET {} HTTP/1.1\r\nHost: {}\r\n\r\n'.format(url, host)
        transport.write(request.encode())

首先,ClientProtocol 新增了一個屬性 transport,是在 connection_made() 時保存下來的,這樣在 ClientSession 裏才能經過它來發送請求。

第三版:去掉 run_forever()

第三版的目的是:去掉 run_forever()

class ClientProtocol(asyncio.Protocol):
    def __init__(self, loop):
        self.loop = loop
        self.transport = None
        self._eof = False  # 有沒有收到 EOF
        self._waiter = None  # 用來等待接收數據的 future

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print(data.decode())

    def eof_received(self):
        self._eof = True
        self._wakeup_waiter()

    def connection_lost(self, exc):
        pass  # 再也不調用 self.loop.stop()

    async def wait_for_data(self):
        assert not self._eof
        assert not self._waiter

        self._waiter = self.loop.create_future()
        await self._waiter
        self._waiter = None

    def _wakeup_waiter(self):
        waiter = self._waiter
        if waiter:
            self._waiter = None
            waiter.set_result(None)

class ClientSession:
    def __init__(self, loop):
        self._loop = loop

    async def get(self, url, host, port):
        transport, protocol = await self._loop.create_connection(
            lambda: ClientProtocol(loop), host, port)

        request = 'GET {} HTTP/1.1\r\nHost: {}\r\n\r\n'.format(url, host)
        transport.write(request.encode())

        # 等待接收數據。
        await protocol.wait_for_data()

協程 main() 保持不變,可是 loop.run_forever() 已被剔除:

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
# 再也不須要 loop.run_forever()

HTTP 請求發送以後,繼續異步等待(await)數據的接收,即 protocol.wait_for_data()
這個等待動做,是經過往 loop 裏新增一個 future 來實現的:

async def wait_for_data(self):
        # ...
        self._waiter = self.loop.create_future()
        await self._waiter
        self._waiter = None

self._waiter 就是這個致使等待的 future,它會保證 loop 一直運行,直到數據接收完畢。
eof_received() 被調用時,數據就接收完畢了(EOF 的意思不用多說了吧?)。

def eof_received(self):
        self._eof = True
        self._wakeup_waiter()

_wakeup_waiter() 的做用是結束那個致使等待的 future,這樣 loop 也就能夠結束了:

def _wakeup_waiter(self):
        waiter = self._waiter
        if waiter:
            self._waiter = None
            # 結束 waiter future,以便 loop 結束。
            waiter.set_result(None)

第四版:Reader

data_received() 裏直接輸出 HTTP 的應答結果,實在算不上什麼完美的作法。

def data_received(self, data):
        print(data.decode())

爲了解決這一問題,咱們引入一個 Reader 類,用來緩存收到的數據,並提供「讀」的接口給用戶。

首先,Protocol 被簡化了,前一版引入的各類處理,都轉交給了 Reader。

class ClientProtocol(asyncio.Protocol):
    def __init__(self, loop, reader):
        self.loop = loop
        self.transport = None
        self._reader = reader

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self._reader.feed(data)  # 轉交給 Reader

    def eof_received(self):
        self._reader.feed_eof()  # 轉交給 Reader

    def connection_lost(self, exc):
        pass

下面是 ClientSession.get() 基於 Reader 的實現:

class ClientSession:
    async def get(self, url, host, port):
        reader = Reader(self._loop)
        transport, protocol = await self._loop.create_connection(
            lambda: ClientProtocol(loop, reader), host, port)
        # 發送請求,代碼從略...
        data = await reader.read()
        print(data.decode())

Reader 自己是從上一版的 Protocol 抽取出來的,惟一不一樣的是,接收的數據被臨時放在了一個 bytearray 緩存裏。

class Reader:
    def __init__(self, loop):
        self._loop = loop
        self._buffer = bytearray()  # 緩存
        self._eof = False
        self._waiter = None

    def feed(self, data):
        self._buffer.extend(data)
        self._wakeup_waiter()

    def feed_eof(self):
        self._eof = True
        self._wakeup_waiter()

    async def read(self):
        if not self._buffer and not self._eof:
            await self._wait_for_data()
            
        data = bytes(self._buffer)
        del self._buffer[:]
        return data

    async def _wait_for_data(self):
        assert not self._eof
        assert not self._waiter

        self._waiter = self._loop.create_future()
        await self._waiter
        self._waiter = None

    def _wakeup_waiter(self):
        waiter = self._waiter
        if waiter:
            self._waiter = None
            waiter.set_result(None)

稍微解釋一下 read(),比較重要的是開始的一句判斷:

# 若是緩存爲空,而且 EOF 還沒收到,那就(繼續)等待接收數據。
        if not self._buffer and not self._eof:
            # read() 會停在這個地方,直到 feed() 或 feed_eof() 被調用,
            # 也就是說有數據可讀了。
            await self._wait_for_data()

接下來就是把緩存倒空:

data = bytes(self._buffer)
        del self._buffer[:]

運行一下,不難發現,ClientSession.get() 裏讀數據的那一句是有問題的。

data = await reader.read()

收到的 data 並非完整的 HTTP 應答,可能只包含了 HTTP 的 Headers,而沒有 Body。

一個 HTTP 應答,Server 端可能分屢次發送過來。好比這個測試用的 Hello World Server,Headers 和 Body 就分了兩次發送,也就是說 data_received() 會被調用兩次。

以前咱們在 eof_received() 裏才喚醒 waiter(_wakeup_waiter()),如今在 data_received() 裏就喚醒了,因而第一次數據收完, waiter 就結束了,loop 也便跟着結束。

爲了讀到完整的 HTTP 應答,方法也很簡單,把 read() 放在循環裏:

blocks = []
        while True:
            block = await reader.read()
            if not block:
                break
            blocks.append(block)
        data = b''.join(blocks)
        print(data.decode())

每一次 read(),若是緩存爲空,而且 EOF 還沒收到的話,就會再次建立 waiter,放到 loop 裏,繼續等待接收數據。

這個循環顯然應該交給 Reader 處理,對 ClientSession 需保持透明。

class Reader:
    async def read(self):
        blocks = []
        while True:
            block = await self._read()
            if not block:
                break
            blocks.append(block)
        data = b''.join(blocks)
        return data

    async def _read(self):
        if not self._buffer and not self._eof:
            await self._wait_for_data()
            
        data = bytes(self._buffer)
        del self._buffer[:]
        return data

最後,原來的 read() 重命名爲 _read(),新的 read() 在循環中反覆調用 _read(),直到無數據可讀。ClientSession 這邊直接調用新的 read() 便可。

第五版:Writer

到目前爲止,發送 HTTP 請求時,都是直接調用較爲底層的 transport.write()

async def get(self, url, host, port):
        # ...
        transport.write(request.encode())

能夠把它封裝在 Writer 中,與 Reader 的作法相似,可是 Writer 要簡單得多:

class Writer:
    def __init__(self, transport):
        self._transport = transport

    def write(self, data):
        self._transport.write(data)

而後在 ClientSession.get() 中建立 Writer

async def get(self, url, host, port):
        reader = Reader(self._loop)
        transport, protocol = await self._loop.create_connection(
            lambda: ClientProtocol(loop, reader), host, port)

        writer = Writer(transport)
        request = 'GET {} HTTP/1.1\r\nHost: {}\r\n\r\n'.format(url, host)
        writer.write(request.encode())
        # ...

ClientSession 來講,只需知道 ReaderWriter 就足夠了,因此不妨提供一個函數 open_connection(),直接返回 ReaderWriter

async def open_connection(host, port, loop):
    reader = Reader(loop)
    protocol = ClientProtocol(loop, reader)
    transport, _ = await loop.create_connection(lambda: protocol, host, port)
    writer = Writer(transport)
    return reader, writer

而後 ClientSession 就能夠簡化成這樣:

class ClientSession:
    async def get(self, url, host, port):
        reader, writer = await open_connection(host, port, self._loop)
        # ...

第六版:Asyncio Streams

其實 Asyncio 已經提供了 Reader 和 Writer,詳見 官方文檔

下面以 Asyncio Streams 實現 ClientSession.get()

class ClientSession:
    async def get(self, url, host, port):
        reader, writer = await asyncio.open_connection(
            host, port, loop=self._loop)

        request = 'GET {} HTTP/1.1\r\nHost: {}\r\n\r\n'.format(url, host)
        writer.write(request.encode())

        data = await reader.read(-1)
        print(data.decode())
        writer.close()

asyncio.open_connection() 就至關於咱們的 open_connection()ReaderWriter 也都相似,只是複雜了一些。

全文完

相關文章
相關標籤/搜索