事件驅動-協程實現爬蟲

實驗三:事件驅動-協程實現爬蟲

什麼是協程?

協程實際上是比起通常的子例程而言更寬泛的存在,子例程是協程的一種特例。javascript

子例程的起始處是唯一的入口點,一旦退出即完成了子例程的執行,子例程的一個實例只會返回一次。css

協程能夠經過yield來調用其它協程。經過yield方式轉移執行權的協程之間不是調用者與被調用者的關係,而是彼此對稱、平等的。html

協程的起始處是第一個入口點,在協程裏,返回點以後是接下來的入口點。子例程的生命期遵循後進先出(最後一個被調用的子例程最早返回);相反,協程的生命期徹底由他們的使用的須要決定。java

還記得咱們何時會用到yield嗎,就是在生成器(generator)裏,在迭代的時候每次執行next(generator)生成器都會執行到下一次yield的位置並返回,能夠說生成器就是例程。python

一個生成器的例子:git

#定義生成器函數 def fib(): a, b = 0, 1 while(True): yield a a, b = b, a + b #得到生成器 fib = fib() next(fib) # >> 0 next(fib) # >> 1 

生成器是如何工做的?

在考察生成器前,咱們須要先了解通常的python函數是如何運行的。當一個函數調用子函數時,控制就會交給子函數,直到子函數返回或者拋出異常時纔會將控制交還給調用函數。程序員

自定義兩個函數:github

>>> def foo(): ... bar() ... >>> def bar(): ... pass 

標準Python解釋器CPython中的PyEval_EvalFrameEx方法會取得棧幀和待運行的字節碼,並在獲得的棧幀的上下文環境下計算字節碼的結果。如下是foo函數的字節碼:web

從字節碼能夠看出foo函數加載bar到棧上以後經過CALL_FUNCTION調用,bar返回後彈出bar的返回值,加載None到棧上並將其做爲foo的返回值返回。sql

PyEval_EvalFrameEx遇到CALL_FUNCTION字節碼時,它建立一個新的Python棧幀。

須要瞭解的一點是Python的棧幀是存在於堆上的。CPython做爲一個普通的C程序,它的棧幀就在棧上,可是CPython所控制的Python的棧幀倒是在堆上的,因此Python的棧幀在函數調用結束後是仍可以保持存在。咱們設置一個全局變量frame,將bar的棧幀賦給frame

>>> import inspect >>> frame = None >>> def foo(): ... bar() ... >>> def bar(): ... global frame ... frame = inspect.currentframe() ... >>> foo() >>> #獲得'bar'的棧幀 >>> frame.f_code.co_name 'bar' >>> # 它的返回指針指向foo的棧 >>> caller_frame = frame.f_back >>> caller_frame.f_code.co_name 'foo' 

此處輸入圖片的描述

如今讓咱們考察一下生成器的結構,先定義一個生成器函數:

>>> def gen_fn(): ... result = yield 1 ... print('result of yield: {}'.format(result)) ... result2 = yield 2 ... print('result of 2nd yield: {}'.format(result2)) ... return 'done' 

當 Python 將 gen_fn 編譯爲字節碼時,它會檢查有沒有yield,有的話那就是生成器函數了,編譯器會在該函數的flag上打上標識:

>>> # 生成器的標識位是第5位. >>> generator_bit = 1 << 5 >>> bool(gen_fn.__code__.co_flags & generator_bit) True 

調用生成器函數時會生成一個生成器對象:

>>> gen = gen_fn() >>> type(gen) <class 'generator'> 

生成器對象會封裝一個棧幀和一個對代碼的引用:

>>> gen.gi_code.co_name 'gen_fn' 

全部來自同一個生成器函數的生成器對象都會引用同一份代碼,可是卻會擁有各自不一樣的棧幀,生成器對象結構圖以下:

此處輸入圖片的描述

幀擁有f_lasti指針,它指向以前最新一次運行的指令。它初始化的時候爲-1,說明生成器尚未開始運行。

>>> gen.gi_frame.f_lasti -1 

當咱們對生成器執行send方法時,生成器會運行到第一次yield的位置並中止。在這裏它返回1,正如咱們編寫的代碼預期的那樣。

>>> gen.send(None) 1 

如今f_lasti指向3了,比最開始前進了4個字節碼,以及能夠發現這個函數一共生成了56個字節碼:

>>> gen.gi_frame.f_lasti 3 >>> len(gen.gi_code.co_code) 56 

生成器可以中止,也可以在任意時刻經過任意函數恢復,這也是由於棧幀是在堆上而不是棧上,因此不用遵照函數調用的先進後出原則。

咱們能夠send"hello"字符串給生成器,它會在以前中止的yield那裏獲得並賦值給result,以後生成器繼續運行直到下一個yield位置中止並返回。

>>> gen.send('hello') result of yield: hello 2 

查看生成器的局部變量:

>>> gen.gi_frame.f_locals {'result': 'hello'} 

當咱們再次調用send的時候,生成器從它第二次yield的地方繼續運行,到最後已經沒有yield了,因此出現了StopIteration異常:

>>> gen.send('goodbye') result of 2nd yield: goodbye Traceback (most recent call last): File "<input>", line 1, in <module> StopIteration: done 

能夠看到,該異常的值是生成器最後返回的值,在這裏就是字符串"done"

生成器實現協程模型

雖然生成器擁有一個協程該有的特性,但光這樣是不夠的,作異步編程還是困難的,咱們須要先用生成器實現一個協程異步編程的簡單模型,它同時也是Python標準庫asyncio的簡化版,正如asyncio的實現,咱們會用到生成器,Future類,以及yield from語句。

首先實現Future類, Future類能夠認爲是專門用來存儲將要發送給協程的信息的類。

class Future: def __init__(self): self.result = None self._callbacks = [] def add_done_callback(self, fn): self._callbacks.append(fn) def set_result(self, result): self.result = result for fn in self._callbacks: fn(self) 

Future對象最開始處在掛起狀態,當調用set_result時被激活,並運行註冊的回調函數,該回調函數多半是對協程發送信息讓協程繼續運行下去的函數。

咱們改造一下以前從fetchconnected的函數,加入Futureyield

這是以前回調實現的fetch

class Fetcher: def fetch(self): self.sock = socket.socket() self.sock.setblocking(False) try: self.sock.connect(('localhost', 3000)) except BlockingIOError: pass selector.register(self.sock.fileno(), EVENT_WRITE, self.connected) def connected(self, key, mask): print('connected!') # ...後面省略... 

改造後,咱們將鏈接創建後的部分也放到了fetch中。

class Fetcher: def fetch(self): sock = socket.socket() sock.setblocking(False) try: sock.connect(('localhost', 3000)) except BlockingIOError: pass f = Future() def on_connected(): #鏈接創建後經過set_result協程繼續從yield的地方往下運行 f.set_result(None) selector.register(sock.fileno(), EVENT_WRITE, on_connected) yield f selector.unregister(sock.fileno()) print('connected!') 

fetcher是一個生成器函數,咱們建立一個Future實例,yield它來暫停fetch的運行直到鏈接創建f.set_result(None)的時候,生成器才繼續運行。那set_result時運行的回調函數是哪來的呢?這裏引入Task類:

class Task: def __init__(self, coro): #協程 self.coro = coro #建立並初始化一個爲None的Future對象 f = Future() f.set_result(None) #步進一次(發送一次信息) #在初始化的時候發送是爲了協程到達第一個yield的位置,也是爲了註冊下一次的步進 self.step(f) def step(self, future): try: #向協程發送消息並獲得下一個從協程那yield到的Future對象 next_future = self.coro.send(future.result) except StopIteration: return next_future.add_done_callback(self.step) fetcher = Fetcher('/') Task(fetcher.fetch()) loop() 

流程大體是這樣的,首先Task初始化,向fetch生成器發送None信息(也能夠想象成step調用了fetch,參數是None),fetch得以從開頭運行到第一個yield的地方並返回了一個Future對象給stepnext_future,而後step就在這個獲得的Future對象註冊了step。當鏈接創建時on_connected就會被調用,再一次向協程發送信息,協程就會繼續往下執行了。

使用yield from分解協程

一旦socket鏈接創建成功,咱們發送HTTP GET請求到服務器並在以後讀取服務器響應。如今這些步驟不用再分散在不一樣的回調函數裏了,咱們能夠將其放在同一個生成器函數中:

def fetch(self): # ... 省略鏈接的代碼 sock.send(request.encode('ascii')) while True: f = Future() def on_readable(): f.set_result(sock.recv(4096)) selector.register(sock.fileno(), EVENT_READ, on_readable) chunk = yield f selector.unregister(sock.fileno()) if chunk: self.response += chunk else: # 完成讀取 break 

可是這樣代碼也會越積越多,可不能夠分解生成器函數的代碼呢,從協程中提取出子協程?Python 3yield from能幫助咱們完成這部分工做。:

>>> def gen_fn(): ... result = yield 1 ... print('result of yield: {}'.format(result)) ... result2 = yield 2 ... print('result of 2nd yield: {}'.format(result2)) ... return 'done' ... 

使用yield from在一個生成器中調用另外一個生成器:

>>> # Generator function: >>> def caller_fn(): ... gen = gen_fn() ... rv = yield from gen ... print('return value of yield-from: {}' ... .format(rv)) ... >>> # Make a generator from the >>> # generator function. >>> caller = caller_fn() 

caller生成器發送消息,消息送到了gen生成器那裏,在gen還沒返回前caller就停在rv = yield from gen這條語句上了。

>>> caller.send(None) 1 >>> caller.gi_frame.f_lasti 15 >>> caller.send('hello') result of yield: hello 2 >>> caller.gi_frame.f_lasti # 能夠發現指令沒有前進 15 >>> caller.send('goodbye') result of 2nd yield: goodbye return value of yield-from: done Traceback (most recent call last): File "<input>", line 1, in <module> StopIteration 

對於咱們來講,於外,咱們沒法判斷髮送消息時yield的值是來自caller仍是caller內的子協程(好比gen),於內,咱們也不用關心gen所獲得的消息是從哪裏傳送來的,gen只用負責在上一次yield時獲得消息輸入,運行到下一個yield時返回輸出,重複這個模式到最後return就能夠了。

yield from獲得的子協程最後return的返回值:

rv = yield from gen 

想想以前咱們抱怨回調函數拋出異常時看不到上下文,這回咱們看看協程是怎麼樣的:

>>> def gen_fn(): ... raise Exception('my error') >>> caller = caller_fn() >>> caller.send(None) Traceback (most recent call last): File "<input>", line 1, in <module> File "<input>", line 3, in caller_fn File "<input>", line 2, in gen_fn Exception: my error 

清楚多了,棧跟蹤顯示在gen_fn拋出異常時消息是從caller_fn委派到gen_fn的。

協程處理異常的手段跟普通函數也是同樣的:

>>> def gen_fn(): ... yield 1 ... raise Exception('uh oh') ... >>> def caller_fn(): ... try: ... yield from gen_fn() ... except Exception as exc: ... print('caught {}'.format(exc)) ... >>> caller = caller_fn() >>> caller.send(None) 1 >>> caller.send('hello') caught uh oh 

如今讓咱們從fetch協程上分解出一些子協程。(注意分離出的子協程並非Fetcher的成員協程)

實現read協程接收一個數據塊:

def read(sock): f = Future() def on_readable(): #在socket可讀時讀取消息並向協程發送一個數據快 f.set_result(sock.recv(4096)) selector.register(sock.fileno(), EVENT_READ, on_readable) #yield f中止協程,等到可讀時,從f那獲得數據塊。 chunk = yield f selector.unregister(sock.fileno()) return chunk 

實現read_all協程接收整個消息:

def read_all(sock): response = [] chunk = yield from read(sock) while chunk: response.append(chunk) chunk = yield from read(sock) return b''.join(response) 

若是將yield from去掉,看上去就跟以前實現阻塞式I/O讀取差很少呢。

如今在fetch中調用read_all

class Fetcher: def fetch(self): # ... 省略鏈接的代碼: sock.send(request.encode('ascii')) self.response = yield from read_all(sock) 

嗯,如今看上去代碼短多了,可是能夠作的更好。 從以前的代碼能夠看到當咱們等一個Future返回時使用的是yield,而等一個子協程返回倒是使用yield from,咱們可讓二者統一塊兒來。得益於生成器與迭代器在Python中的一致性,咱們實現Future方法同時讓它成爲一個生成器:

def __iter__(self): yield self return self.result 

這樣yield fyield from f的效果就一樣是輸入Future返回Future的結果了。 統一的好處是什麼呢?統一以後不管你是調用一個返回Future的協程仍是返回值的協程均可以統一用yield from應對了,當你想改變協程的實現時也不用擔憂對調用函數(or協程)產生影響。

完成後續工做

咱們將鏈接的邏輯也從fetch中分離出來:

def connect(sock, address): f = Future() sock.setblocking(False) try: sock.connect(address) except BlockingIOError: pass def on_connected(): f.set_result(None) selector.register(sock.fileno(), EVENT_WRITE, on_connected) yield from f selector.unregister(sock.fileno()) 

fetch如今長這個樣子:

def fetch(self): global stopped sock = socket.socket() yield from connect(sock, ('xkcd.com', 80)) get = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(self.url) sock.send(get.encode('ascii')) self.response = yield from read_all(sock) self._process_response() urls_todo.remove(self.url) if not urls_todo: stopped = True print(self.url) 

將上一節課中的parse_links 更名 _process_response 並稍做修改:

def _process_response(self): if not self.response: print('error: {}'.format(self.url)) return if not self._is_html(): return urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body())) for url in urls: normalized = urllib.parse.urljoin(self.url, url) parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('xkcd.com', 'www.xkcd.com'): continue defragmented, frag = urllib.parse.urldefrag(parts.path) if defragmented not in urls_seen: urls_todo.add(defragmented) urls_seen.add(defragmented) Task(Fetcher(defragmented).fetch()) 

主循環部分:

start = time.time() fetcher = Fetcher('/') Task(fetcher.fetch()) while not stopped: events = selector.select() for event_key, event_mask in events: callback = event_key.data callback() print('{} URLs fetched in {:.1f} seconds'.format( len(urls_seen), time.time() - start)) 

運行效果

這裏先奉上完整代碼:

from selectors import * import socket import re import urllib.parse import time class Future: def __init__(self): self.result = None self._callbacks = [] def result(self): return self.result def add_done_callback(self, fn): self._callbacks.append(fn) def set_result(self, result): self.result = result for fn in self._callbacks: fn(self) def __iter__(self): yield self return self.result class Task: def __init__(self, coro): self.coro = coro f = Future() f.set_result(None) self.step(f) def step(self, future): try: next_future = self.coro.send(future.result) except StopIteration: return next_future.add_done_callback(self.step) urls_seen = set(['/']) urls_todo = set(['/']) #追加了一個能夠看最高併發數的變量 concurrency_achieved = 0 selector = DefaultSelector() stopped = False def connect(sock, address): f = Future() sock.setblocking(False) try: sock.connect(address) except BlockingIOError: pass def on_connected(): f.set_result(None) selector.register(sock.fileno(), EVENT_WRITE, on_connected) yield from f selector.unregister(sock.fileno()) def read(sock): f = Future() def on_readable(): f.set_result(sock.recv(4096)) # Read 4k at a time. selector.register(sock.fileno(), EVENT_READ, on_readable) chunk = yield from f selector.unregister(sock.fileno()) return chunk def read_all(sock): response = [] chunk = yield from read(sock) while chunk: response.append(chunk) chunk = yield from read(sock) return b''.join(response) class Fetcher: def __init__(self, url): self.response = b'' self.url = url def fetch(self): global concurrency_achieved, stopped concurrency_achieved = max(concurrency_achieved, len(urls_todo)) sock = socket.socket() yield from connect(sock, ('localhost', 3000)) get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(self.url) sock.send(get.encode('ascii')) self.response = yield from read_all(sock) self._process_response() urls_todo.remove(self.url) if not urls_todo: stopped = True print(self.url) def body(self): body = self.response.split(b'\r\n\r\n', 1)[1] return body.decode('utf-8') def _process_response(self): if not self.response: print('error: {}'.format(self.url)) return if not self._is_html(): return urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body())) for url in urls: normalized = urllib.parse.urljoin(self.url, url) parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('localhost'): continue defragmented, frag = urllib.parse.urldefrag(parts.path) if defragmented not in urls_seen: urls_todo.add(defragmented) urls_seen.add(defragmented) Task(Fetcher(defragmented).fetch()) def _is_html(self): head, body = self.response.split(b'\r\n\r\n', 1) headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:]) return headers.get('Content-Type', '').startswith('text/html') start = time.time() fetcher = Fetcher('/') Task(fetcher.fetch()) while not stopped: events = selector.select() for event_key, event_mask in events: callback = event_key.data callback() print('{} URLs fetched in {:.1f} seconds, achieved concurrency = {}'.format( len(urls_seen), time.time() - start, concurrency_achieved)) 

運行python3 coroutine.py命令查看效果:

此處輸入圖片的描述

7、總結

至此,咱們在學習的過程當中掌握了:

  1. 線程池實現併發爬蟲
  2. 回調方法實現異步爬蟲
  3. 協程技術的介紹
  4. 一個基於協程的異步編程模型
  5. 協程實現異步爬蟲

三種爬蟲的實現方式中線程池是最壞的選擇,由於它既佔用內存,又有線程競爭的危險須要程序員本身編程解決,並且產生的I/O阻塞也浪費了CPU佔用時間。再來看看回調方式,它是一種異步方法,因此I/O阻塞的問題解決了,並且它是單線程的不會產生競爭,問題好像都解決了。然而它引入了新的問題,它的問題在於以這種方式編寫的代碼很差維護,也不容易debug。看來協程纔是最好的選擇,咱們實現的協程異步編程模型使得一個單線程可以很容易地改寫爲協程。那是否是每一次作異步編程都要實現TaskFuture呢?不是的,你能夠直接使用asyncio官方標準協程庫,它已經幫你把TaskFuture封裝好了,你根本不會感覺到它們的存在,是否是很棒呢?若是你使用Python 3.5那更好,已經能夠用原生的協程了,Python 3.5追加了async defawait等協程相關的關鍵詞。這些會在從此的課程中再一一展開,下一個發佈的課程就是asyncio庫實現網絡爬蟲啦。

8、參考資料

相關文章
相關標籤/搜索