協程實際上是比起通常的子例程而言更寬泛的存在,子例程是協程的一種特例。javascript
子例程的起始處是唯一的入口點,一旦退出即完成了子例程的執行,子例程的一個實例只會返回一次。css
協程能夠經過yield
來調用其它協程。經過yield
方式轉移執行權的協程之間不是調用者與被調用者的關係,而是彼此對稱、平等的。html
協程的起始處是第一個入口點,在協程裏,返回點以後是接下來的入口點。子例程的生命期遵循後進先出(最後一個被調用的子例程最早返回);相反,協程的生命期徹底由他們的使用的須要決定。java
還記得咱們何時會用到yield
嗎,就是在生成器(generator)裏,在迭代的時候每次執行next(generator)生成器都會執行到下一次yield
的位置並返回,能夠說生成器就是例程。python
一個生成器的例子:git
#定義生成器函數 def fib(): a, b = 0, 1 while(True): yield a a, b = b, a + b #得到生成器 fib = fib() next(fib) # >> 0 next(fib) # >> 1
在考察生成器前,咱們須要先了解通常的python函數是如何運行的。當一個函數調用子函數時,控制就會交給子函數,直到子函數返回或者拋出異常時纔會將控制交還給調用函數。程序員
自定義兩個函數:github
>>> def foo(): ... bar() ... >>> def bar(): ... pass
標準Python
解釋器CPython
中的PyEval_EvalFrameEx
方法會取得棧幀和待運行的字節碼,並在獲得的棧幀的上下文環境下計算字節碼的結果。如下是foo
函數的字節碼:web
從字節碼能夠看出foo
函數加載bar
到棧上以後經過CALL_FUNCTION
調用,bar
返回後彈出bar
的返回值,加載None
到棧上並將其做爲foo
的返回值返回。sql
當PyEval_EvalFrameEx
遇到CALL_FUNCTION
字節碼時,它建立一個新的Python
棧幀。
須要瞭解的一點是Python
的棧幀是存在於堆上的。CPython
做爲一個普通的C程序,它的棧幀就在棧上,可是CPython
所控制的Python
的棧幀倒是在堆上的,因此Python
的棧幀在函數調用結束後是仍可以保持存在。咱們設置一個全局變量frame
,將bar
的棧幀賦給frame
。
>>> import inspect >>> frame = None >>> def foo(): ... bar() ... >>> def bar(): ... global frame ... frame = inspect.currentframe() ... >>> foo() >>> #獲得'bar'的棧幀 >>> frame.f_code.co_name 'bar' >>> # 它的返回指針指向foo的棧 >>> caller_frame = frame.f_back >>> caller_frame.f_code.co_name 'foo'
如今讓咱們考察一下生成器的結構,先定義一個生成器函數:
>>> def gen_fn(): ... result = yield 1 ... print('result of yield: {}'.format(result)) ... result2 = yield 2 ... print('result of 2nd yield: {}'.format(result2)) ... return 'done'
當 Python
將 gen_fn
編譯爲字節碼時,它會檢查有沒有yield
,有的話那就是生成器函數了,編譯器會在該函數的flag
上打上標識:
>>> # 生成器的標識位是第5位. >>> generator_bit = 1 << 5 >>> bool(gen_fn.__code__.co_flags & generator_bit) True
調用生成器函數時會生成一個生成器對象:
>>> gen = gen_fn() >>> type(gen) <class 'generator'>
生成器對象會封裝一個棧幀和一個對代碼的引用:
>>> gen.gi_code.co_name 'gen_fn'
全部來自同一個生成器函數的生成器對象都會引用同一份代碼,可是卻會擁有各自不一樣的棧幀,生成器對象結構圖以下:
幀擁有f_lasti
指針,它指向以前最新一次運行的指令。它初始化的時候爲-1,說明生成器尚未開始運行。
>>> gen.gi_frame.f_lasti -1
當咱們對生成器執行send
方法時,生成器會運行到第一次yield
的位置並中止。在這裏它返回1,正如咱們編寫的代碼預期的那樣。
>>> gen.send(None) 1
如今f_lasti
指向3了,比最開始前進了4個字節碼,以及能夠發現這個函數一共生成了56個字節碼:
>>> gen.gi_frame.f_lasti 3 >>> len(gen.gi_code.co_code) 56
生成器可以中止,也可以在任意時刻經過任意函數恢復,這也是由於棧幀是在堆上而不是棧上,因此不用遵照函數調用的先進後出原則。
咱們能夠send
"hello"字符串給生成器,它會在以前中止的yield
那裏獲得並賦值給result
,以後生成器繼續運行直到下一個yield
位置中止並返回。
>>> gen.send('hello') result of yield: hello 2
查看生成器的局部變量:
>>> gen.gi_frame.f_locals {'result': 'hello'}
當咱們再次調用send
的時候,生成器從它第二次yield的地方繼續運行,到最後已經沒有yield
了,因此出現了StopIteration
異常:
>>> gen.send('goodbye') result of 2nd yield: goodbye Traceback (most recent call last): File "<input>", line 1, in <module> StopIteration: done
能夠看到,該異常的值是生成器最後返回的值,在這裏就是字符串"done"
雖然生成器擁有一個協程該有的特性,但光這樣是不夠的,作異步編程還是困難的,咱們須要先用生成器實現一個協程異步編程的簡單模型,它同時也是Python
標準庫asyncio
的簡化版,正如asyncio
的實現,咱們會用到生成器,Future
類,以及yield from
語句。
首先實現Future
類, Future
類能夠認爲是專門用來存儲將要發送給協程的信息的類。
class Future: def __init__(self): self.result = None self._callbacks = [] def add_done_callback(self, fn): self._callbacks.append(fn) def set_result(self, result): self.result = result for fn in self._callbacks: fn(self)
Future
對象最開始處在掛起狀態,當調用set_result
時被激活,並運行註冊的回調函數,該回調函數多半是對協程發送信息讓協程繼續運行下去的函數。
咱們改造一下以前從fetch
到connected
的函數,加入Future
與yield
。
這是以前回調實現的fetch
:
class Fetcher: def fetch(self): self.sock = socket.socket() self.sock.setblocking(False) try: self.sock.connect(('localhost', 3000)) except BlockingIOError: pass selector.register(self.sock.fileno(), EVENT_WRITE, self.connected) def connected(self, key, mask): print('connected!') # ...後面省略...
改造後,咱們將鏈接創建後的部分也放到了fetch
中。
class Fetcher: def fetch(self): sock = socket.socket() sock.setblocking(False) try: sock.connect(('localhost', 3000)) except BlockingIOError: pass f = Future() def on_connected(): #鏈接創建後經過set_result協程繼續從yield的地方往下運行 f.set_result(None) selector.register(sock.fileno(), EVENT_WRITE, on_connected) yield f selector.unregister(sock.fileno()) print('connected!')
fetcher
是一個生成器函數,咱們建立一個Future
實例,yield它來暫停fetch
的運行直到鏈接創建f.set_result(None)
的時候,生成器才繼續運行。那set_result
時運行的回調函數是哪來的呢?這裏引入Task
類:
class Task: def __init__(self, coro): #協程 self.coro = coro #建立並初始化一個爲None的Future對象 f = Future() f.set_result(None) #步進一次(發送一次信息) #在初始化的時候發送是爲了協程到達第一個yield的位置,也是爲了註冊下一次的步進 self.step(f) def step(self, future): try: #向協程發送消息並獲得下一個從協程那yield到的Future對象 next_future = self.coro.send(future.result) except StopIteration: return next_future.add_done_callback(self.step) fetcher = Fetcher('/') Task(fetcher.fetch()) loop()
流程大體是這樣的,首先Task
初始化,向fetch
生成器發送None
信息(也能夠想象成step
調用了fetch
,參數是None),fetch
得以從開頭運行到第一個yield
的地方並返回了一個Future
對象給step
的next_future
,而後step
就在這個獲得的Future
對象註冊了step
。當鏈接創建時on_connected
就會被調用,再一次向協程發送信息,協程就會繼續往下執行了。
yield from
分解協程一旦socket
鏈接創建成功,咱們發送HTTP GET
請求到服務器並在以後讀取服務器響應。如今這些步驟不用再分散在不一樣的回調函數裏了,咱們能夠將其放在同一個生成器函數中:
def fetch(self): # ... 省略鏈接的代碼 sock.send(request.encode('ascii')) while True: f = Future() def on_readable(): f.set_result(sock.recv(4096)) selector.register(sock.fileno(), EVENT_READ, on_readable) chunk = yield f selector.unregister(sock.fileno()) if chunk: self.response += chunk else: # 完成讀取 break
可是這樣代碼也會越積越多,可不能夠分解生成器函數的代碼呢,從協程中提取出子協程?Python 3
的yield from
能幫助咱們完成這部分工做。:
>>> def gen_fn(): ... result = yield 1 ... print('result of yield: {}'.format(result)) ... result2 = yield 2 ... print('result of 2nd yield: {}'.format(result2)) ... return 'done' ...
使用yield from
在一個生成器中調用另外一個生成器:
>>> # Generator function: >>> def caller_fn(): ... gen = gen_fn() ... rv = yield from gen ... print('return value of yield-from: {}' ... .format(rv)) ... >>> # Make a generator from the >>> # generator function. >>> caller = caller_fn()
給caller
生成器發送消息,消息送到了gen
生成器那裏,在gen
還沒返回前caller
就停在rv = yield from gen
這條語句上了。
>>> caller.send(None) 1 >>> caller.gi_frame.f_lasti 15 >>> caller.send('hello') result of yield: hello 2 >>> caller.gi_frame.f_lasti # 能夠發現指令沒有前進 15 >>> caller.send('goodbye') result of 2nd yield: goodbye return value of yield-from: done Traceback (most recent call last): File "<input>", line 1, in <module> StopIteration
對於咱們來講,於外,咱們沒法判斷髮送消息時yield
的值是來自caller
仍是caller
內的子協程(好比gen
),於內,咱們也不用關心gen
所獲得的消息是從哪裏傳送來的,gen
只用負責在上一次yield
時獲得消息輸入,運行到下一個yield時返回輸出,重複這個模式到最後return
就能夠了。
yield from
獲得的子協程最後return
的返回值:
rv = yield from gen
想想以前咱們抱怨回調函數拋出異常時看不到上下文,這回咱們看看協程是怎麼樣的:
>>> def gen_fn(): ... raise Exception('my error') >>> caller = caller_fn() >>> caller.send(None) Traceback (most recent call last): File "<input>", line 1, in <module> File "<input>", line 3, in caller_fn File "<input>", line 2, in gen_fn Exception: my error
清楚多了,棧跟蹤顯示在gen_fn
拋出異常時消息是從caller_fn
委派到gen_fn
的。
協程處理異常的手段跟普通函數也是同樣的:
>>> def gen_fn(): ... yield 1 ... raise Exception('uh oh') ... >>> def caller_fn(): ... try: ... yield from gen_fn() ... except Exception as exc: ... print('caught {}'.format(exc)) ... >>> caller = caller_fn() >>> caller.send(None) 1 >>> caller.send('hello') caught uh oh
如今讓咱們從fetch
協程上分解出一些子協程。(注意分離出的子協程並非Fetcher
的成員協程)
實現read
協程接收一個數據塊:
def read(sock): f = Future() def on_readable(): #在socket可讀時讀取消息並向協程發送一個數據快 f.set_result(sock.recv(4096)) selector.register(sock.fileno(), EVENT_READ, on_readable) #yield f中止協程,等到可讀時,從f那獲得數據塊。 chunk = yield f selector.unregister(sock.fileno()) return chunk
實現read_all
協程接收整個消息:
def read_all(sock): response = [] chunk = yield from read(sock) while chunk: response.append(chunk) chunk = yield from read(sock) return b''.join(response)
若是將yield from
去掉,看上去就跟以前實現阻塞式I/O讀取差很少呢。
如今在fetch
中調用read_all
:
class Fetcher: def fetch(self): # ... 省略鏈接的代碼: sock.send(request.encode('ascii')) self.response = yield from read_all(sock)
嗯,如今看上去代碼短多了,可是能夠作的更好。 從以前的代碼能夠看到當咱們等一個Future
返回時使用的是yield
,而等一個子協程返回倒是使用yield from
,咱們可讓二者統一塊兒來。得益於生成器與迭代器在Python中的一致性,咱們實現Future
方法同時讓它成爲一個生成器:
def __iter__(self): yield self return self.result
這樣yield f
與yield from f
的效果就一樣是輸入Future
返回Future
的結果了。 統一的好處是什麼呢?統一以後不管你是調用一個返回Future
的協程仍是返回值的協程均可以統一用yield from
應對了,當你想改變協程的實現時也不用擔憂對調用函數(or協程)產生影響。
咱們將鏈接的邏輯也從fetch
中分離出來:
def connect(sock, address): f = Future() sock.setblocking(False) try: sock.connect(address) except BlockingIOError: pass def on_connected(): f.set_result(None) selector.register(sock.fileno(), EVENT_WRITE, on_connected) yield from f selector.unregister(sock.fileno())
fetch
如今長這個樣子:
def fetch(self): global stopped sock = socket.socket() yield from connect(sock, ('xkcd.com', 80)) get = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(self.url) sock.send(get.encode('ascii')) self.response = yield from read_all(sock) self._process_response() urls_todo.remove(self.url) if not urls_todo: stopped = True print(self.url)
將上一節課中的parse_links
更名 _process_response
並稍做修改:
def _process_response(self): if not self.response: print('error: {}'.format(self.url)) return if not self._is_html(): return urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body())) for url in urls: normalized = urllib.parse.urljoin(self.url, url) parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('xkcd.com', 'www.xkcd.com'): continue defragmented, frag = urllib.parse.urldefrag(parts.path) if defragmented not in urls_seen: urls_todo.add(defragmented) urls_seen.add(defragmented) Task(Fetcher(defragmented).fetch())
主循環部分:
start = time.time() fetcher = Fetcher('/') Task(fetcher.fetch()) while not stopped: events = selector.select() for event_key, event_mask in events: callback = event_key.data callback() print('{} URLs fetched in {:.1f} seconds'.format( len(urls_seen), time.time() - start))
這裏先奉上完整代碼:
from selectors import * import socket import re import urllib.parse import time class Future: def __init__(self): self.result = None self._callbacks = [] def result(self): return self.result def add_done_callback(self, fn): self._callbacks.append(fn) def set_result(self, result): self.result = result for fn in self._callbacks: fn(self) def __iter__(self): yield self return self.result class Task: def __init__(self, coro): self.coro = coro f = Future() f.set_result(None) self.step(f) def step(self, future): try: next_future = self.coro.send(future.result) except StopIteration: return next_future.add_done_callback(self.step) urls_seen = set(['/']) urls_todo = set(['/']) #追加了一個能夠看最高併發數的變量 concurrency_achieved = 0 selector = DefaultSelector() stopped = False def connect(sock, address): f = Future() sock.setblocking(False) try: sock.connect(address) except BlockingIOError: pass def on_connected(): f.set_result(None) selector.register(sock.fileno(), EVENT_WRITE, on_connected) yield from f selector.unregister(sock.fileno()) def read(sock): f = Future() def on_readable(): f.set_result(sock.recv(4096)) # Read 4k at a time. selector.register(sock.fileno(), EVENT_READ, on_readable) chunk = yield from f selector.unregister(sock.fileno()) return chunk def read_all(sock): response = [] chunk = yield from read(sock) while chunk: response.append(chunk) chunk = yield from read(sock) return b''.join(response) class Fetcher: def __init__(self, url): self.response = b'' self.url = url def fetch(self): global concurrency_achieved, stopped concurrency_achieved = max(concurrency_achieved, len(urls_todo)) sock = socket.socket() yield from connect(sock, ('localhost', 3000)) get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(self.url) sock.send(get.encode('ascii')) self.response = yield from read_all(sock) self._process_response() urls_todo.remove(self.url) if not urls_todo: stopped = True print(self.url) def body(self): body = self.response.split(b'\r\n\r\n', 1)[1] return body.decode('utf-8') def _process_response(self): if not self.response: print('error: {}'.format(self.url)) return if not self._is_html(): return urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body())) for url in urls: normalized = urllib.parse.urljoin(self.url, url) parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('localhost'): continue defragmented, frag = urllib.parse.urldefrag(parts.path) if defragmented not in urls_seen: urls_todo.add(defragmented) urls_seen.add(defragmented) Task(Fetcher(defragmented).fetch()) def _is_html(self): head, body = self.response.split(b'\r\n\r\n', 1) headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:]) return headers.get('Content-Type', '').startswith('text/html') start = time.time() fetcher = Fetcher('/') Task(fetcher.fetch()) while not stopped: events = selector.select() for event_key, event_mask in events: callback = event_key.data callback() print('{} URLs fetched in {:.1f} seconds, achieved concurrency = {}'.format( len(urls_seen), time.time() - start, concurrency_achieved))
運行python3 coroutine.py
命令查看效果:
至此,咱們在學習的過程當中掌握了:
三種爬蟲的實現方式中線程池是最壞的選擇,由於它既佔用內存,又有線程競爭的危險須要程序員本身編程解決,並且產生的I/O阻塞也浪費了CPU佔用時間。再來看看回調方式,它是一種異步方法,因此I/O阻塞的問題解決了,並且它是單線程的不會產生競爭,問題好像都解決了。然而它引入了新的問題,它的問題在於以這種方式編寫的代碼很差維護,也不容易debug。看來協程纔是最好的選擇,咱們實現的協程異步編程模型使得一個單線程可以很容易地改寫爲協程。那是否是每一次作異步編程都要實現Task
、Future
呢?不是的,你能夠直接使用asyncio
官方標準協程庫,它已經幫你把Task
、Future
封裝好了,你根本不會感覺到它們的存在,是否是很棒呢?若是你使用Python 3.5
那更好,已經能夠用原生的協程了,Python 3.5
追加了async def
,await
等協程相關的關鍵詞。這些會在從此的課程中再一一展開,下一個發佈的課程就是asyncio庫實現網絡爬蟲啦。