轉自http://xlambda.com/gevent-tutorial/html
gevent是一個基於 libev的併發庫。它爲各類併發和網絡相關的任務提供了整潔的API。
本指南假定讀者有中級Python水平,但不要求有其它更多的知識,不期待讀者有併發方面的知識。本指南的目標在於給予你須要的工具來開始使用gevent,幫助你馴服現有的併發問題,並從今開始編寫異步應用程序。python
按提供貢獻的時間前後順序列出以下: Stephen Diehl Jérémy Bethmont sww Bruno Bigras David Ripton Travis Cline Boris Feld youngsterxyf Eddie Hebert Alexis Metaireau Daniel Velkovgit
同時感謝Denis Bilenko寫了gevent和相應的指導以造成本指南。程序員
這是一個以MIT許可證發佈的協做文檔。你想添加一些內容?或看見一個排版錯誤? Fork一個分支發佈一個request到 Github. 咱們歡迎任何貢獻。github
本頁也有日文版本。web
在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。express
在任什麼時候刻,只有一個協程在運行。json
這與multiprocessing
或threading
等提供真正並行構造的庫是不一樣的。這些庫輪轉使用操做系統調度的進程和線程,是真正的並行。數組
併發的核心思想在於,大的任務能夠分解成一系列的子任務,後者能夠被調度成同時執行或異步執行,而不是一次一個地或者同步地執行。兩個子任務之間的切換也就是上下文切換。服務器
在gevent裏面,上下文切換是經過yielding來完成的. 在下面的例子裏,咱們有兩個上下文,經過調用gevent.sleep(0)
,它們各自yield向對方。
import gevent def foo(): print('Running in foo') gevent.sleep(0) print('Explicit context switch to foo again') def bar(): print('Explicit context to bar') gevent.sleep(0) print('Implicit context switch back to bar') gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar), ])
Running in foo Explicit context to bar Explicit context switch to foo again Implicit context switch back to bar
下圖將控制流形象化,就像在調試器中單步執行整個程序,以說明上下文切換如何發生。
當咱們在受限於網絡或IO的函數中使用gevent,這些函數會被協做式的調度, gevent的真正能力會獲得發揮。Gevent處理了全部的細節,來保證你的網絡庫會在可能的時候,隱式交出greenlet上下文的執行權。這樣的一種用法是如何強大,怎麼強調都不爲過。或者咱們舉些例子來詳述。
下面例子中的select()
函數一般是一個在各類文件描述符上輪詢的阻塞調用。
import time import gevent from gevent import select start = time.time() tic = lambda: 'at %1.1f seconds' % (time.time() - start) def gr1(): # Busy waits for a second, but we don't want to stick around... print('Started Polling: %s' % tic()) select.select([], [], [], 2) print('Ended Polling: %s' % tic()) def gr2(): # Busy waits for a second, but we don't want to stick around... print('Started Polling: %s' % tic()) select.select([], [], [], 2) print('Ended Polling: %s' % tic()) def gr3(): print("Hey lets do some stuff while the greenlets poll, %s" % tic()) gevent.sleep(1) gevent.joinall([ gevent.spawn(gr1), gevent.spawn(gr2), gevent.spawn(gr3), ])
Started Polling: at 0.0 seconds Started Polling: at 0.0 seconds Hey lets do some stuff while the greenlets poll, at 0.0 seconds Ended Polling: at 2.0 seconds Ended Polling: at 2.0 seconds
下面是另一個多少有點人造色彩的例子,定義一個非肯定性的(non-deterministic) 的task
函數(給定相同輸入的狀況下,它的輸出不保證相同)。此例中執行這個函數的反作用就是,每次task在它的執行過程當中都會隨機地停某些秒。
import gevent import random def task(pid): """ Some non-deterministic task """ gevent.sleep(random.randint(0,2)*0.001) print('Task %s done' % pid) def synchronous(): for i in range(1,10): task(i) def asynchronous(): threads = [gevent.spawn(task, i) for i in xrange(10)] gevent.joinall(threads) print('Synchronous:') synchronous() print('Asynchronous:') asynchronous()
Synchronous: Task 1 done Task 2 done Task 3 done Task 4 done Task 5 done Task 6 done Task 7 done Task 8 done Task 9 done Asynchronous: Task 3 done Task 7 done Task 9 done Task 2 done Task 4 done Task 1 done Task 8 done Task 6 done Task 0 done Task 5 done
上例中,在同步的部分,全部的task都同步的執行,結果當每一個task在執行時主流程被阻塞(主流程的執行暫時停住)。
程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn
。初始化的greenlet列表存放在數組threads
中,此數組被傳給gevent.joinall
函數,後者阻塞當前流程,並執行全部給定的greenlet。執行流程只會在全部greenlet執行完後纔會繼續向下走。
要重點留意的是,異步的部分本質上是隨機的,並且異步部分的總體運行時間比同步要大大減小。事實上,同步部分的最大運行時間,便是每一個task停0.002秒,結果整個隊列要停0.02秒。而異步部分的最大運行時間大體爲0.002秒,由於沒有任何一個task會阻塞其它task的執行。
一個更常見的應用場景,如異步地向服務器取數據,取數據操做的執行時間依賴於發起取數據請求時遠端服務器的負載,各個請求的執行時間會有差異。
import gevent.monkey gevent.monkey.patch_socket() import gevent import urllib2 import simplejson as json def fetch(pid): response = urllib2.urlopen('http://json-time.appspot.com/time.json') result = response.read() json_result = json.loads(result) datetime = json_result['datetime'] print('Process %s: %s' % (pid, datetime)) return json_result['datetime'] def synchronous(): for i in range(1,10): fetch(i) def asynchronous(): threads = [] for i in range(1,10): threads.append(gevent.spawn(fetch, i)) gevent.joinall(threads) print('Synchronous:') synchronous() print('Asynchronous:') asynchronous()
就像以前所提到的,greenlet具備肯定性。在相同配置相同輸入的狀況下,它們老是會產生相同的輸出。下面就有例子,咱們在multiprocessing的pool之間執行一系列的任務,與在gevent的pool之間執行做比較。
import time def echo(i): time.sleep(0.001) return i # Non Deterministic Process Pool from multiprocessing.pool import Pool p = Pool(10) run1 = [a for a in p.imap_unordered(echo, xrange(10))] run2 = [a for a in p.imap_unordered(echo, xrange(10))] run3 = [a for a in p.imap_unordered(echo, xrange(10))] run4 = [a for a in p.imap_unordered(echo, xrange(10))] print(run1 == run2 == run3 == run4) # Deterministic Gevent Pool from gevent.pool import Pool p = Pool(10) run1 = [a for a in p.imap_unordered(echo, xrange(10))] run2 = [a for a in p.imap_unordered(echo, xrange(10))] run3 = [a for a in p.imap_unordered(echo, xrange(10))] run4 = [a for a in p.imap_unordered(echo, xrange(10))] print(run1 == run2 == run3 == run4)
False True
即便gevent一般帶有肯定性,當開始與如socket或文件等外部服務交互時,不肯定性也可能溜進你的程序中。所以儘管gevent線程是一種「肯定的併發」形式,使用它仍然可能會遇到像使用POSIX線程或進程時遇到的那些問題。
涉及併發長期存在的問題就是競爭條件(race condition)。簡單來講,當兩個併發線程/進程都依賴於某個共享資源同時都嘗試去修改它的時候,就會出現競爭條件。這會致使資源修改的結果狀態依賴於時間和執行順序。這是個問題,咱們通常會作不少努力嘗試避免競爭條件,由於它會致使整個程序行爲變得不肯定。
最好的辦法是始終避免全部全局的狀態。全局狀態和導入時(import-time)反作用老是會反咬你一口!
gevent對Greenlet初始化提供了一些封裝,最經常使用的使用模板之一有
import gevent from gevent import Greenlet def foo(message, n): """ Each thread will be passed the message, and n arguments in its initialization. """ gevent.sleep(n) print(message) # Initialize a new Greenlet instance running the named function # foo thread1 = Greenlet.spawn(foo, "Hello", 1) # Wrapper for creating and running a new Greenlet from the named # function foo, with the passed arguments thread2 = gevent.spawn(foo, "I live!", 2) # Lambda expressions thread3 = gevent.spawn(lambda x: (x+1), 2) threads = [thread1, thread2, thread3] # Block until all threads complete. gevent.joinall(threads)
Hello I live!
除使用基本的Greenlet類以外,你也能夠子類化Greenlet類,重載它的_run
方法。
import gevent from gevent import Greenlet class MyGreenlet(Greenlet): def __init__(self, message, n): Greenlet.__init__(self) self.message = message self.n = n def _run(self): print(self.message) gevent.sleep(self.n) g = MyGreenlet("Hi there!", 3) g.start() g.join()
Hi there!
就像任何其餘成段代碼,Greenlet也可能以不一樣的方式運行失敗。 Greenlet可能未能成功拋出異常,不能中止運行,或消耗了太多的系統資源。
一個greenlet的狀態一般是一個依賴於時間的參數。在greenlet中有一些標誌,讓你能夠監視它的線程內部狀態:
started
-- Boolean, 指示此Greenlet是否已經啓動ready()
-- Boolean, 指示此Greenlet是否已經中止successful()
-- Boolean, 指示此Greenlet是否已經中止並且沒拋異常value
-- 任意值, 此Greenlet代碼返回的值exception
-- 異常, 此Greenlet內拋出的未捕獲異常import gevent def win(): return 'You win!' def fail(): raise Exception('You fail at failing.') winner = gevent.spawn(win) loser = gevent.spawn(fail) print(winner.started) # True print(loser.started) # True # Exceptions raised in the Greenlet, stay inside the Greenlet. try: gevent.joinall([winner, loser]) except Exception as e: print('This will never be reached') print(winner.value) # 'You win!' print(loser.value) # None print(winner.ready()) # True print(loser.ready()) # True print(winner.successful()) # True print(loser.successful()) # False # The exception raised in fail, will not propogate outside the # greenlet. A stack trace will be printed to stdout but it # will not unwind the stack of the parent. print(loser.exception) # It is possible though to raise the exception again outside # raise loser.exception # or with # loser.get()
True True You win! None True True True False You fail at failing.
當主程序(main program)收到一個SIGQUIT信號時,不能成功作yield操做的 Greenlet可能會令意外地掛起程序的執行。這致使了所謂的殭屍進程,它須要在Python解釋器以外被kill掉。
對此,一個通用的處理模式就是在主程序中監聽SIGQUIT信號,在程序退出調用gevent.shutdown
。
import gevent import signal def run_forever(): gevent.sleep(1000) if __name__ == '__main__': gevent.signal(signal.SIGQUIT, gevent.shutdown) thread = gevent.spawn(run_forever) thread.join()
超時是一種對一塊代碼或一個Greenlet的運行時間的約束。
import gevent from gevent import Timeout seconds = 10 timeout = Timeout(seconds) timeout.start() def wait(): gevent.sleep(10) try: gevent.spawn(wait).join() except Timeout: print('Cou