A. Jesse Jiryu Davis 是紐約 MongoDB 的工程師。他編寫了異步 MongoDB Python 驅動程序 Motor,也是 MongoDB C 驅動程序的開發領袖和 PyMongo 團隊成員。 他也爲 asyncio 和 Tornado 作了貢獻,在 http://emptysqua.re 上寫做。 |
介紹html
Guido van Rossum 是主流編程語言 Python 的創造者,Python 社區稱他爲 BDFL(仁慈的終生大獨裁者)——這是一個來自 Monty Python 短劇的稱號。他的主頁是 http://www.python.org/~guido/ 。python
經典的計算機科學強調高效的算法,儘量快地完成計算。可是不少網絡程序的時間並非消耗在計算上,而是在等待許多慢速的鏈接或者低頻事件的發生。這些程序暴露出一個新的挑戰:如何高效的等待大量網絡事件。一個現代的解決方案是異步 I/O。linux
這一章咱們將實現一個簡單的網絡爬蟲。這個爬蟲只是一個原型式的異步應用,由於它等待許多響應而只作少許的計算。一次爬的網頁越多,它就能越快的完成任務。若是它爲每一個動態的請求啓動一個線程的話,隨着併發請求數量的增長,它會在耗盡套接字以前,耗盡內存或者線程相關的資源。使用異步 I/O 能夠避免這個的問題。git
咱們將分三個階段展現這個例子。首先,咱們會實現一個事件循環並用這個事件循環和回調來勾畫出一隻網絡爬蟲。它頗有效,可是當把它擴展成更復雜的問題時,就會致使沒法管理的混亂代碼。而後,因爲 Python 的協程不只有效並且可擴展,咱們將用 Python 的生成器函數實現一個簡單的協程。在最後一個階段,咱們將使用 Python 標準庫「asyncio」中功能完整的協程, 並經過異步隊列完成這個網絡爬蟲。(在 PyCon 2013 上,Guido 介紹了標準的 asyncio 庫,當時稱之爲「Tulip」。)github
任務web
網絡爬蟲尋找並下載一個網站上的全部網頁,也許還會把它們存檔,爲它們創建索引。從根 URL 開始,它獲取每一個網頁,解析出沒有遇到過的連接加到隊列中。當網頁沒有未見到過的連接而且隊列爲空時,它便中止運行。算法
咱們能夠經過同時下載大量的網頁來加快這一過程。當爬蟲發現新的連接,它使用一個新的套接字並行的處理這個新連接,解析響應,添加新連接到隊列。當併發很大時,可能會致使性能降低,因此咱們會限制併發的數量,在隊列保留那些未處理的連接,直到一些正在執行的任務完成。數據庫
傳統方式編程
怎麼使一個爬蟲併發?傳統的作法是建立一個線程池,每一個線程使用一個套接字在一段時間內負責一個網頁的下載。好比,下載 xkcd.com 網站的一個網頁:緩存
def fetch(url): sock = socket.socket() sock.connect(('xkcd.com', 80)) request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url) sock.send(request.encode('ascii')) response = b'' chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) # Page is now downloaded. links = parse_links(response) q.add(links)
套接字操做默認是阻塞的:當一個線程調用一個相似 connect 和 recv 方法時,它會阻塞,直到操做完成。(即便是 send 也能被阻塞,好比接收端在接受外發消息時緩慢而系統的外發數據緩存已經滿了的狀況下)所以,爲了同一時間內下載多個網頁,咱們須要不少線程。一個複雜的應用會經過線程池保持空閒的線程來分攤建立線程的開銷。一樣的作法也適用於套接字,使用鏈接池。
到目前爲止,使用線程的是成本昂貴的,操做系統對一個進程、一個用戶、一臺機器能使用線程作了不一樣的硬性限制。在 做者 Jesse 的系統中,一個 Python 線程須要 50K 的內存,開啓上萬個線程就會失敗。每一個線程的開銷和系統的限制就是這種方式的瓶頸所在。
在 Dan Kegel 那一篇頗有影響力的文章「The C10K problem」中,它提出了多線程方式在 I/O 併發上的侷限性。他在開始寫道,
網絡服務器到了要同時處理成千上萬的客戶的時代了,你不這樣認爲麼?畢竟,如今網絡規模很大了。
Kegel 在 1999 年創造出「C10K」這個術語。一萬個鏈接在今天看來仍是可接受的,可是問題依然存在,只不過大小不一樣。回到那時候,對於 C10K 問題,每一個鏈接啓一個線程是不切實際的。如今這個限制已經成指數級增加。確實,咱們的玩具網絡爬蟲使用線程也能夠工做的很好。可是,對於有着千萬級鏈接的大規模應用來講,限制依然存在:它會消耗掉全部線程,即便套接字還夠用。那麼咱們該如何解決這個問題?
異步
異步 I/O 框架在一個線程中完成併發操做。讓咱們看看這是怎麼作到的。
異步框架使用非阻塞套接字。異步爬蟲中,咱們在發起到服務器的鏈接前把套接字設爲非阻塞:
sock = socket.socket() sock.setblocking(False) try: sock.connect(('xkcd.com', 80)) except BlockingIOError: pass
對一個非阻塞套接字調用 connect 方法會當即拋出異常,即便它能夠正常工做。這個異常復現了底層 C 語言函數使人厭煩的行爲,它把 errno 設置爲 EINPROGRESS,告訴你操做已經開始。
如今咱們的爬蟲須要一種知道鏈接什麼時候創建的方法,這樣它才能發送 HTTP 請求。咱們能夠簡單地使用循環來重試:
request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url) encoded = request.encode('ascii') while True: try: sock.send(encoded) break # Done. except OSError as e: pass print('sent')
這種方法不只消耗 CPU,也不能有效的等待多個套接字。在遠古時代,BSD Unix 的解決方法是 select,這是一個 C 函數,它在一個或一組非阻塞套接字上等待事件發生。如今,互聯網應用大量鏈接的需求,致使 select 被 poll 所代替,在 BSD 上的實現是 kqueue ,在 Linux 上是 epoll。它們的 API 和 select 類似,但在大數量的鏈接中也能有較好的性能。
Python 3.4 的 DefaultSelector 會使用你係統上最好的 select 類函數。要註冊一個網絡 I/O 事件的提醒,咱們會建立一個非阻塞套接字,並使用默認 selector 註冊它。
from selectors import DefaultSelector, EVENT_WRITE selector = DefaultSelector() sock = socket.socket() sock.setblocking(False) try: sock.connect(('xkcd.com', 80)) except BlockingIOError: pass def connected(): selector.unregister(sock.fileno()) print('connected!') selector.register(sock.fileno(), EVENT_WRITE, connected)
咱們不理會這個僞造的錯誤,調用 selector.register,傳遞套接字文件描述符和一個表示咱們想要監聽什麼事件的常量表達式。爲了當鏈接創建時收到提醒,咱們使用 EVENT_WRITE :它表示何時這個套接字可寫。咱們還傳遞了一個 Python 函數 connected,當對應事件發生時被調用。這樣的函數被稱爲回調。
在一個循環中,selector 接收到 I/O 提醒時咱們處理它們。
def loop(): while True: events = selector.select() for event_key, event_mask in events: callback = event_key.data callback()
connected 回調函數被保存在 event_key.data 中,一旦這個非阻塞套接字創建鏈接,它就會被取出來執行。
不像咱們前面那個快速輪轉的循環,這裏的 select 調用會暫停,等待下一個 I/O 事件,接着執行等待這些事件的回調函數。沒有完成的操做會保持掛起,直到進到下一個事件循環時執行。
到目前爲止咱們展示了什麼?咱們展現瞭如何開始一個 I/O 操做和當操做準備好時調用回調函數。異步框架,它在單線程中執行併發操做,其創建在兩個功能之上,非阻塞套接字和事件循環。
咱們這裏達成了「併發性concurrency」,但不是傳統意義上的「並行性parallelism」。也就是說,咱們構建了一個能夠進行重疊 I/O 的微小系統,它能夠在其它操做還在進行的時候就開始一個新的操做。它實際上並無利用多核來並行執行計算。這個系統是用於解決 I/O 密集I/O-bound問題的,而不是解決 CPU 密集CPU-bound問題的。(Python 的全局解釋器鎖禁止在一個進程中以任何方式並行執行 Python 代碼。在 Python 中並行化 CPU 密集的算法須要多個進程,或者以將該代碼移植爲 C 語言並行版本。可是這是另一個話題了。)
因此,咱們的事件循環在併發 I/O 上是有效的,由於它並不用爲每一個鏈接撥付線程資源。可是在咱們開始前,咱們須要澄清一個常見的誤解:異步比多線程快。一般並非這樣的,事實上,在 Python 中,在處理少許很是活躍的鏈接時,像咱們這樣的事件循環是慢於多線程的。在運行時環境中是沒有全局解釋器鎖的,在一樣的負載下線程會執行的更好。異步 I/O 真正適用於事件不多、有許多緩慢或睡眠的鏈接的應用程序。(Jesse 在「什麼是異步,它如何工做,何時該用它?」一文中指出了異步所適用和不適用的場景。Mike Bayer 在「異步 Python 和數據庫」一文中比較了不一樣負載狀況下異步 I/O 和多線程的不一樣。)
回調
用咱們剛剛創建的異步框架,怎麼才能完成一個網絡爬蟲?即便是一個簡單的網頁下載程序也是很難寫的。
首先,咱們有一個還沒有獲取的 URL 集合,和一個已經解析過的 URL 集合。
urls_todo = set(['/']) seen_urls = set(['/'])
seen_urls 集合包括 urls_todo 和已經完成的 URL。用根 URL / 初始化它們。
獲取一個網頁須要一系列的回調。在套接字鏈接創建時會觸發 connected 回調,它向服務器發送一個 GET 請求。可是它要等待響應,因此咱們須要註冊另外一個回調函數;當該回調被調用,它仍然不能讀取到完整的請求時,就會再一次註冊回調,如此反覆。
讓咱們把這些回調放在一個 Fetcher 對象中,它須要一個 URL,一個套接字,還須要一個地方保存返回的字節:
class Fetcher: def __init__(self, url): self.response = b'' # Empty array of bytes. self.url = url self.sock = None
咱們的入口點在 Fetcher.fetch:
# Method on Fetcher class. def fetch(self): self.sock = socket.socket() self.sock.setblocking(False) try: self.sock.connect(('xkcd.com', 80)) except BlockingIOError: pass # Register next callback. selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)
fetch 方法從鏈接一個套接字開始。可是要注意這個方法在鏈接創建前就返回了。它必須將控制返回到事件循環中等待鏈接創建。爲了理解爲何要這樣作,假設咱們程序的總體結構以下:
# Begin fetching http://xkcd.com/353/ fetcher = Fetcher('/353/') fetcher.fetch() while True: events = selector.select() for event_key, event_mask in events: callback = event_key.data callback(event_key, event_mask)
當調用 select 函數後,全部的事件提醒纔會在事件循環中處理,因此 fetch 必須把控制權交給事件循環,這樣咱們的程序才能知道何時鏈接已創建,接着循環調用 connected 回調,它已經在上面的 fetch 方法中註冊過。
這裏是咱們的 connected 方法的實現:
# Method on Fetcher class. def connected(self, key, mask): print('connected!') selector.unregister(key.fd) request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(self.url) self.sock.send(request.encode('ascii')) # Register the next callback. selector.register(key.fd, EVENT_READ, self.read_response)
這個方法發送一個 GET 請求。一個真正的應用會檢查 send 的返回值,以防全部的信息沒能一次發送出去。可是咱們的請求很小,應用也不復雜。它只是簡單的調用 send,而後等待響應。固然,它必須註冊另外一個回調並把控制權交給事件循環。接下來也是最後一個回調函數 read_response,它處理服務器的響應:
# Method on Fetcher class. def read_response(self, key, mask): global stopped chunk = self.sock.recv(4096) # 4k chunk size. if chunk: self.response += chunk else: selector.unregister(key.fd) # Done reading. links = self.parse_links() # Python set-logic: for link in links.difference(seen_urls): urls_todo.add(link) Fetcher(link).fetch() # seen_urls.update(links) urls_todo.remove(self.url) if not urls_todo: stopped = True
這個回調在每次 selector 發現套接字可讀時被調用,可讀有兩種狀況:套接字接受到數據或它被關閉。
這個回調函數從套接字讀取 4K 數據。若是不到 4k,那麼有多少讀多少。若是比 4K 多,chunk 中只包 4K 數據而且這個套接字保持可讀,這樣在事件循環的下一個週期,會再次回到這個回調函數。當響應完成時,服務器關閉這個套接字,chunk 爲空。
這裏沒有展現的 parse_links 方法,它返回一個 URL 集合。咱們爲每一個新的 URL 啓動一個 fetcher。注意一個使用異步回調方式編程的好處:咱們不須要爲共享數據加鎖,好比咱們往 seen_urls 增長新連接時。這是一種非搶佔式的多任務,它不會在咱們代碼中的任意一個地方被打斷。
咱們增長了一個全局變量 stopped,用它來控制這個循環:
stopped = False def loop(): while not stopped: events = selector.select() for event_key, event_mask in events: callback = event_key.data callback()
一旦全部的網頁被下載下來,fetcher 中止這個事件循環,程序退出。
這個例子讓異步編程的一個問題明顯的暴露出來:意大利麪代碼。
咱們須要某種方式來表達一系列的計算和 I/O 操做,而且可以調度多個這樣的系列操做讓它們併發的執行。可是,沒有線程你不能把這一系列操做寫在一個函數中:當函數開始一個 I/O 操做,它明確的把將來所需的狀態保存下來,而後返回。你須要考慮如何寫這個狀態保存的代碼。
讓咱們來解釋下這究竟是什麼意思。先來看一下在線程中使用一般的阻塞套接字來獲取一個網頁時是多麼簡單。
# Blocking version. def fetch(url): sock = socket.socket() sock.connect(('xkcd.com', 80)) request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url) sock.send(request.encode('ascii')) response = b'' chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) # Page is now downloaded. links = parse_links(response) q.add(links)
在一個套接字操做和下一個操做之間這個函數到底記住了什麼狀態?它有一個套接字,一個 URL 和一個可增加的 response。運行在線程中的函數使用編程語言的基本功能來在棧中的局部變量保存這些臨時狀態。這樣的函數也有一個「continuation」——它會在 I/O 結束後執行這些代碼。運行時環境經過線程的指令指針來記住這個 continuation。你沒必要考慮怎麼在 I/O 操做後恢復局部變量和這個 continuation。語言自己的特性幫你解決。
可是用一個基於回調的異步框架時,這些語言特性不能提供一點幫助。當等待 I/O 操做時,一個函數必須明確的保存它的狀態,由於它會在 I/O 操做完成以前返回並清除棧幀。在咱們基於回調的例子中,做爲局部變量的替代,咱們把 sock 和 response 做爲 Fetcher 實例 self 的屬性來存儲。而做爲指令指針的替代,它經過註冊 connected 和 read_response 回調來保存它的 continuation。隨着應用功能的增加,咱們須要手動保存的回調的複雜性也會增長。如此繁複的記帳式工做會讓編碼者感到頭痛。
更糟糕的是,當咱們的回調函數拋出異常會發生什麼?假設咱們沒有寫好 parse_links 方法,它在解析 HTML 時拋出異常:
Traceback (most recent call last): File "loop-with-callbacks.py", line 111, in loop() File "loop-with-callbacks.py", line 106, in loop callback(event_key, event_mask) File "loop-with-callbacks.py", line 51, in read_response links = self.parse_links() File "loop-with-callbacks.py", line 67, in parse_links raise Exception('parse error') Exception: parse error
這個堆棧回溯只能顯示出事件循環調用了一個回調。咱們不知道是什麼致使了這個錯誤。這條鏈的兩邊都被破壞:不知道從哪來也不知到哪去。這種丟失上下文的現象被稱爲「堆棧撕裂stack ripping」,常常會致使沒法分析緣由。它還會阻止咱們爲回調鏈設置異常處理,即那種用「try / except」塊封裝函數調用及其調用樹。(對於這個問題的更復雜的解決方案,參見 http://www.tornadoweb.org/en/stable/stack_context.html )
因此,除了關於多線程和異步哪一個更高效的長期爭議以外,還有一個關於這二者之間的爭論:誰更容易跪了。若是在同步上出現失誤,線程更容易出現數據競爭的問題,而回調由於"堆棧撕裂stack ripping"問題而很是難於調試。
via: http://aosabook.org/en/500L/pages/a-web-crawler-with-asyncio-coroutines.html
做者:A. Jesse Jiryu Davis , Guido van Rossum 譯者:qingyunha 校對:wxy