關於 Asyncio 的其餘文章:html
若是不知道 Asyncio 是什麼,先看「Asyncio 簡介」那一篇。python
首先,爲了便於測試,咱們用 Python 內建的 http
模塊,運行一個簡單的 HTTP Server。segmentfault
新建一個目錄,添加文件 index.html
,內容爲 Hello, World!
(不是合法的 HTML 格式也沒有關係),而後運行以下命令(Ubuntu 請用 python3
):緩存
$ python -m http.server Serving HTTP on 0.0.0.0 port 8000 (http://0.0.0.0:8000/) ...
後面不一樣的 Client 實現,都會鏈接這個 Server:Host 爲 localhost
,Port 爲 8000
。bash
全部的示例代碼,import
語句一概從略。app
import asyncio
初版改寫自 Python 官方文檔裏的 例子。
Python 的例子是 Echo Client,咱們稍微複雜一點,是 HTTP Client,都是 TCP。異步
class ClientProtocol(asyncio.Protocol): def __init__(self, loop): self.loop = loop def connection_made(self, transport): request = 'GET / HTTP/1.1\r\nHost: localhost\r\n\r\n' transport.write(request.encode()) def data_received(self, data): print(data.decode()) def connection_lost(self, exc): self.loop.stop() async def main(loop): await loop.create_connection( lambda: ClientProtocol(loop), 'localhost', 8000) loop = asyncio.get_event_loop() loop.run_until_complete(main(loop)) loop.run_forever()
TCP 鏈接由 loop.create_connection()
建立,後者須要一個 Protocol 工廠,即 lambda: ClientProtocol(loop)
。
Protocol 提供了 connection_made()
,data_received()
, connection_lost()
等接口,這些接口就像回調函數同樣,會在恰當的時候被調用。
咱們在 connection_made()
中,經過參數 transport
發送一個 HTTP GET 請求,隨後在 data_received()
裏,將收到 HTTP 應答。
當 connection_lost()
被調用時,表示 Server 已經斷開鏈接。async
運行結果:tcp
HTTP/1.0 200 OK Server: SimpleHTTP/0.6 Python/3.6.3 Date: Mon, 04 Dec 2017 06:11:52 GMT Content-type: text/html Content-Length: 13 Last-Modified: Thu, 30 Nov 2017 05:37:31 GMT Hello, World!
這就是一個標準的 HTTP 應答,包含 Status Line,Headers 和 Body。函數
值得注意的是,loop 其實運行了兩遍:
loop.run_until_complete(main(loop)) # 第一遍 loop.run_forever() # 第二遍
若是沒有 run_forever()
,在收到數據以前,loop 可能就結束了。協程 main()
只是建立好鏈接,隨後 run_until_complete()
天然也就無事可作而終。
加了 run_forever()
後,data_received()
等便有了被調用的機會。可是也有問題,loop 一直在跑,程序沒辦法結束,因此纔在 connection_lost()
裏主動中止 loop:
def connection_lost(self, exc): self.loop.stop()
初版在 connection_made()
中 hard code 了一個 HTTP GET 請求,靈活性較差,之後必然還有 POST 等其餘 HTTP 方法須要支持,因此有必要新增一個 ClientSession
類,來抽象客戶端的會話。因而,HTTP 請求的發送,便從 connection_made()
挪到了 ClientSession.get()
。
ClientSession
應該爲每個 HTTP 方法提供一個相應的方法,好比 post
,put
等等,雖然咱們只考慮 HTTP GET。
class ClientProtocol(asyncio.Protocol): def __init__(self, loop): self.loop = loop self.transport = None def connection_made(self, transport): self.transport = transport def data_received(self, data): print(data.decode()) def connection_lost(self, exc): self.loop.stop() class ClientSession: def __init__(self, loop): self._loop = loop async def get(self, url, host, port): transport, protocol = await self._loop.create_connection( lambda: ClientProtocol(loop), host, port) request = 'GET {} HTTP/1.1\r\nHost: {}\r\n\r\n'.format(url, host) transport.write(request.encode())
首先,ClientProtocol
新增了一個屬性 transport
,是在 connection_made()
時保存下來的,這樣在 ClientSession
裏才能經過它來發送請求。
run_forever()
第三版的目的是:去掉 run_forever()
。
class ClientProtocol(asyncio.Protocol): def __init__(self, loop): self.loop = loop self.transport = None self._eof = False # 有沒有收到 EOF self._waiter = None # 用來等待接收數據的 future def connection_made(self, transport): self.transport = transport def data_received(self, data): print(data.decode()) def eof_received(self): self._eof = True self._wakeup_waiter() def connection_lost(self, exc): pass # 再也不調用 self.loop.stop() async def wait_for_data(self): assert not self._eof assert not self._waiter self._waiter = self.loop.create_future() await self._waiter self._waiter = None def _wakeup_waiter(self): waiter = self._waiter if waiter: self._waiter = None waiter.set_result(None) class ClientSession: def __init__(self, loop): self._loop = loop async def get(self, url, host, port): transport, protocol = await self._loop.create_connection( lambda: ClientProtocol(loop), host, port) request = 'GET {} HTTP/1.1\r\nHost: {}\r\n\r\n'.format(url, host) transport.write(request.encode()) # 等待接收數據。 await protocol.wait_for_data()
協程 main()
保持不變,可是 loop.run_forever()
已被剔除:
loop = asyncio.get_event_loop() loop.run_until_complete(main(loop)) # 再也不須要 loop.run_forever()
HTTP 請求發送以後,繼續異步等待(await)數據的接收,即 protocol.wait_for_data()
。
這個等待動做,是經過往 loop 裏新增一個 future 來實現的:
async def wait_for_data(self): # ... self._waiter = self.loop.create_future() await self._waiter self._waiter = None
self._waiter
就是這個致使等待的 future,它會保證 loop 一直運行,直到數據接收完畢。eof_received()
被調用時,數據就接收完畢了(EOF 的意思不用多說了吧?)。
def eof_received(self): self._eof = True self._wakeup_waiter()
_wakeup_waiter()
的做用是結束那個致使等待的 future,這樣 loop 也就能夠結束了:
def _wakeup_waiter(self): waiter = self._waiter if waiter: self._waiter = None # 結束 waiter future,以便 loop 結束。 waiter.set_result(None)
在 data_received()
裏直接輸出 HTTP 的應答結果,實在算不上什麼完美的作法。
def data_received(self, data): print(data.decode())
爲了解決這一問題,咱們引入一個 Reader
類,用來緩存收到的數據,並提供「讀」的接口給用戶。
首先,Protocol 被簡化了,前一版引入的各類處理,都轉交給了 Reader。
class ClientProtocol(asyncio.Protocol): def __init__(self, loop, reader): self.loop = loop self.transport = None self._reader = reader def connection_made(self, transport): self.transport = transport def data_received(self, data): self._reader.feed(data) # 轉交給 Reader def eof_received(self): self._reader.feed_eof() # 轉交給 Reader def connection_lost(self, exc): pass
下面是 ClientSession.get()
基於 Reader
的實現:
class ClientSession: async def get(self, url, host, port): reader = Reader(self._loop) transport, protocol = await self._loop.create_connection( lambda: ClientProtocol(loop, reader), host, port) # 發送請求,代碼從略... data = await reader.read() print(data.decode())
Reader
自己是從上一版的 Protocol 抽取出來的,惟一不一樣的是,接收的數據被臨時放在了一個 bytearray
緩存裏。
class Reader: def __init__(self, loop): self._loop = loop self._buffer = bytearray() # 緩存 self._eof = False self._waiter = None def feed(self, data): self._buffer.extend(data) self._wakeup_waiter() def feed_eof(self): self._eof = True self._wakeup_waiter() async def read(self): if not self._buffer and not self._eof: await self._wait_for_data() data = bytes(self._buffer) del self._buffer[:] return data async def _wait_for_data(self): assert not self._eof assert not self._waiter self._waiter = self._loop.create_future() await self._waiter self._waiter = None def _wakeup_waiter(self): waiter = self._waiter if waiter: self._waiter = None waiter.set_result(None)
稍微解釋一下 read()
,比較重要的是開始的一句判斷:
# 若是緩存爲空,而且 EOF 還沒收到,那就(繼續)等待接收數據。 if not self._buffer and not self._eof: # read() 會停在這個地方,直到 feed() 或 feed_eof() 被調用, # 也就是說有數據可讀了。 await self._wait_for_data()
接下來就是把緩存倒空:
data = bytes(self._buffer) del self._buffer[:]
運行一下,不難發現,ClientSession.get()
裏讀數據的那一句是有問題的。
data = await reader.read()
收到的 data
並非完整的 HTTP 應答,可能只包含了 HTTP 的 Headers,而沒有 Body。
一個 HTTP 應答,Server 端可能分屢次發送過來。好比這個測試用的 Hello World Server,Headers 和 Body 就分了兩次發送,也就是說 data_received()
會被調用兩次。
以前咱們在 eof_received()
裏才喚醒 waiter(_wakeup_waiter()
),如今在 data_received()
裏就喚醒了,因而第一次數據收完, waiter 就結束了,loop 也便跟着結束。
爲了讀到完整的 HTTP 應答,方法也很簡單,把 read()
放在循環裏:
blocks = [] while True: block = await reader.read() if not block: break blocks.append(block) data = b''.join(blocks) print(data.decode())
每一次 read()
,若是緩存爲空,而且 EOF 還沒收到的話,就會再次建立 waiter,放到 loop 裏,繼續等待接收數據。
這個循環顯然應該交給 Reader
處理,對 ClientSession
需保持透明。
class Reader: async def read(self): blocks = [] while True: block = await self._read() if not block: break blocks.append(block) data = b''.join(blocks) return data async def _read(self): if not self._buffer and not self._eof: await self._wait_for_data() data = bytes(self._buffer) del self._buffer[:] return data
最後,原來的 read()
重命名爲 _read()
,新的 read()
在循環中反覆調用 _read()
,直到無數據可讀。ClientSession
這邊直接調用新的 read()
便可。
到目前爲止,發送 HTTP 請求時,都是直接調用較爲底層的 transport.write()
:
async def get(self, url, host, port): # ... transport.write(request.encode())
能夠把它封裝在 Writer
中,與 Reader
的作法相似,可是 Writer
要簡單得多:
class Writer: def __init__(self, transport): self._transport = transport def write(self, data): self._transport.write(data)
而後在 ClientSession.get()
中建立 Writer
:
async def get(self, url, host, port): reader = Reader(self._loop) transport, protocol = await self._loop.create_connection( lambda: ClientProtocol(loop, reader), host, port) writer = Writer(transport) request = 'GET {} HTTP/1.1\r\nHost: {}\r\n\r\n'.format(url, host) writer.write(request.encode()) # ...
對 ClientSession
來講,只需知道 Reader
和 Writer
就足夠了,因此不妨提供一個函數 open_connection()
,直接返回 Reader
和 Writer
。
async def open_connection(host, port, loop): reader = Reader(loop) protocol = ClientProtocol(loop, reader) transport, _ = await loop.create_connection(lambda: protocol, host, port) writer = Writer(transport) return reader, writer
而後 ClientSession
就能夠簡化成這樣:
class ClientSession: async def get(self, url, host, port): reader, writer = await open_connection(host, port, self._loop) # ...
其實 Asyncio 已經提供了 Reader 和 Writer,詳見 官方文檔。
下面以 Asyncio Streams 實現 ClientSession.get()
:
class ClientSession: async def get(self, url, host, port): reader, writer = await asyncio.open_connection( host, port, loop=self._loop) request = 'GET {} HTTP/1.1\r\nHost: {}\r\n\r\n'.format(url, host) writer.write(request.encode()) data = await reader.read(-1) print(data.decode()) writer.close()
asyncio.open_connection()
就至關於咱們的 open_connection()
。Reader
和 Writer
也都相似,只是複雜了一些。
全文完