Greenlets
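The primary pattern used in gevent is the greenlet, a lightweight coroutine. Only one greenlet is running at any given moment. This differs from the truly parallel constructs provided by the multiprocessing and threading libraries, which spin up real processes or POSIX threads. Those are scheduled by the operating system and can run in parallel (in CPython, the GIL still keeps threads from executing Python bytecode simultaneously).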
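The core idea behind concurrency is to break a large task into a collection of subtasks that are scheduled to run simultaneously or asynchronously, instead of one at a time (synchronously). A switch between two subtasks is called a context switch.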
Context switches in gevent are done through yielding and are very smooth. In the example below, two contexts switch back and forth by calling gevent.sleep(0).
```python
import gevent

def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')

def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])
```
The output of this program is:

```
Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar
```
The output shows how execution flows: the two functions take turns running.
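The same cooperative switching can be checked programmatically. The sketch below (the helper names `run`, `no_yield` and `with_yield` are illustrative, not part of gevent) records the order of each step instead of printing it: a greenlet that never yields runs to completion before the next one starts, while `gevent.sleep(0)` hands control to the other greenlet at every step.

```python
import gevent

def run(worker):
    """Run two copies of `worker` concurrently and return the order of their steps."""
    order = []
    gevent.joinall([gevent.spawn(worker, name, order) for name in ('a', 'b')])
    return order

def no_yield(name, order):
    for step in range(2):
        order.append(name + str(step))  # never yields, so it runs to completion

def with_yield(name, order):
    for step in range(2):
        order.append(name + str(step))
        gevent.sleep(0)  # explicitly hand control to the other greenlet

print(run(no_yield))    # ['a0', 'a1', 'b0', 'b1']
print(run(with_yield))  # ['a0', 'b0', 'a1', 'b1']
```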
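gevent really shines with network and I/O-bound functions, which can be scheduled cooperatively. gevent takes care of the details so that greenlets switch contexts whenever they would otherwise block. Here is an example: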
```python
import time
import gevent
from gevent import select

start = time.time()
tic = lambda: 'at %1.1f seconds' % (time.time() - start)

def gr1():
    # Waits 2 seconds, but gevent's select yields to other greenlets meanwhile
    print('Started Polling: %s' % tic())
    select.select([], [], [], 2)
    print('Ended Polling: %s' % tic())

def gr2():
    # Same as gr1: a second poller waiting concurrently
    print('Started Polling: %s' % tic())
    select.select([], [], [], 2)
    print('Ended Polling: %s' % tic())

def gr3():
    print('Hey lets do some stuff while the greenlets poll, %s' % tic())
    gevent.sleep(1)

gevent.joinall([
    gevent.spawn(gr1),
    gevent.spawn(gr2),
    gevent.spawn(gr3),
])
```

In the example above, select() would normally be a blocking call. gevent's select lets the other greenlets keep running while it waits, so both pollers and gr3 start at 0.0 seconds, and both pollers finish after 2 seconds instead of 4.

The output is:

```
Started Polling: at 0.0 seconds
Started Polling: at 0.0 seconds
Hey lets do some stuff while the greenlets poll, at 0.0 seconds
Ended Polling: at 2.0 seconds
Ended Polling: at 2.0 seconds
```
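Sockets behave the same way. As a minimal sketch, assuming a platform where `socketpair()` is available (the variable and function names are illustrative), one greenlet below blocks in `recv()` on a gevent socket while the other keeps running and eventually sends the data it is waiting for:

```python
import gevent
from gevent import socket  # gevent's cooperative socket module

events = []
reader_sock, writer_sock = socket.socketpair()

def reader():
    events.append('reader waiting')
    data = reader_sock.recv(1024)  # suspends only this greenlet until data arrives
    events.append('reader got %r' % data)

def writer():
    events.append('writer running')  # runs while reader is blocked in recv()
    gevent.sleep(0.1)
    writer_sock.sendall(b'ping')

gevent.joinall([gevent.spawn(reader), gevent.spawn(writer)])
reader_sock.close()
writer_sock.close()
print(events)  # ['reader waiting', 'writer running', "reader got b'ping'"]
```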
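The next example shows how gevent schedules a set of tasks. The task function is non-deterministic: it pauses for a random number of milliseconds before finishing.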
```python
import random
import gevent

def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(random.randint(0, 2) * 0.001)
    print('Task', pid, 'done')

def synchronous():
    for i in range(10):
        task(i)

def asynchronous():
    threads = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(threads)

print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()
```

A sample run prints the following (the asynchronous order differs from run to run):

```
Synchronous:
Task 0 done
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
Asynchronous:
Task 0 done
Task 9 done
Task 7 done
Task 3 done
Task 6 done
Task 5 done
Task 4 done
Task 1 done
Task 2 done
Task 8 done
```
In the synchronous case the tasks run one after another, and the main program blocks while each task executes.
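The key part of the asynchronous version is gevent.spawn, which wraps a function in a Greenlet. The spawned greenlets are stored in the list threads, which is passed to gevent.joinall; joinall blocks the current program until all of the given greenlets have finished.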
In the asynchronous case the order in which the tasks finish is essentially random, because no greenlet blocks the others while it sleeps.
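Because the completion order is random, it is usually easier to read results from the greenlet objects than to rely on print order. A minimal sketch using the same kind of task as above, but returning its id instead of printing it: each greenlet stores its return value in `.value`, so reading the list in spawn order gives a stable result.

```python
import random
import gevent

def task(pid):
    """Sleeps a random few milliseconds, then returns its id."""
    gevent.sleep(random.randint(0, 2) * 0.001)
    return pid

threads = [gevent.spawn(task, i) for i in range(10)]
gevent.joinall(threads)

# Completion order is random, but each greenlet keeps its own return value,
# so reading .value in spawn order gives a stable result list.
results = [g.value for g in threads]
print(results)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```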
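A more common use case is fetching data from a server asynchronously. How long each fetch() takes depends on the load on the remote server, and while one request waits for its response, gevent lets the others proceed.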
```python
import gevent.monkey
gevent.monkey.patch_socket()  # patch before anything else uses socket

import json
import urllib.request

import gevent

def fetch(pid):
    response = urllib.request.urlopen('http://json-time.appspot.com/time.json')
    result = response.read()
    json_result = json.loads(result)
    dt = json_result['datetime']

    print('Process', pid, dt)
    return dt

def synchronous():
    for i in range(1, 10):
        fetch(i)

def asynchronous():
    threads = []
    for i in range(1, 10):
        threads.append(gevent.spawn(fetch, i))
    gevent.joinall(threads)

print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()
```
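Greenlets are deterministic: given the same configuration of greenlets and the same inputs, they always produce the same output. To see this, compare Python's multiprocessing pool with gevent's pool: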
```python
import time

def echo(i):
    time.sleep(0.001)
    return i

# The __main__ guard is required where multiprocessing uses the
# "spawn" start method (Windows, macOS).
if __name__ == '__main__':
    # Non Deterministic Process Pool
    from multiprocessing.pool import Pool

    p = Pool(10)
    run1 = [a for a in p.imap_unordered(echo, range(10))]
    run2 = [a for a in p.imap_unordered(echo, range(10))]
    run3 = [a for a in p.imap_unordered(echo, range(10))]
    run4 = [a for a in p.imap_unordered(echo, range(10))]
    p.close()

    print(run1 == run2 == run3 == run4)

    # Deterministic Gevent Pool
    from gevent.pool import Pool

    p = Pool(10)
    run1 = [a for a in p.imap_unordered(echo, range(10))]
    run2 = [a for a in p.imap_unordered(echo, range(10))]
    run3 = [a for a in p.imap_unordered(echo, range(10))]
    run4 = [a for a in p.imap_unordered(echo, range(10))]

    print(run1 == run2 == run3 == run4)
```
The output is:

```
False
True
```
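With gevent's pool, running the same function repeatedly returns the results in the same order every time, while the process pool's order varies between runs. (Here the unpatched time.sleep blocks, so the greenlets effectively run one after another in a fixed order.)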
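Concurrent programs run into problems such as race conditions. In the simplest case, a race condition occurs when two threads or processes access and modify the same shared resource; the resource's final value then depends on which one ran last. This makes the program's overall behavior non-deterministic, so race conditions should be avoided as much as possible.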
The best approach is to avoid global state altogether. Global state will always come back to bite you!
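A minimal sketch of the problem, using a hypothetical shared `counter`: each greenlet reads the global, yields, and writes back a stale value, so ten increments collapse into one. Returning each greenlet's result and combining the values in the caller avoids the shared state altogether.

```python
import gevent

counter = 0  # hypothetical shared global state

def unsafe_increment():
    global counter
    current = counter        # read the shared value
    gevent.sleep(0)          # yield: every other greenlet reads the same old value
    counter = current + 1    # write back a stale result

gevent.joinall([gevent.spawn(unsafe_increment) for _ in range(10)])
print(counter)  # 1, not 10: nine updates were lost

def contribution():
    gevent.sleep(0)          # yielding is harmless now: nothing is shared
    return 1

greenlets = [gevent.spawn(contribution) for _ in range(10)]
gevent.joinall(greenlets)
total = sum(g.value for g in greenlets)
print(total)  # 10
```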
gevent provides several ways to create greenlets. Here are some of the most common:
```python
import gevent
from gevent import Greenlet

def foo(message, n):
    """
    Each thread will be passed the message, and n arguments
    in its initialization.
    """
    gevent.sleep(n)
    print(message)

# Initialize a new Greenlet instance running the named function
# foo
thread1 = Greenlet.spawn(foo, "Hello", 1)

# Wrapper for creating and running a new Greenlet from the named
# function foo, with the passed arguments
thread2 = gevent.spawn(foo, "I live!", 2)

# Lambda expressions
thread3 = gevent.spawn(lambda x: (x + 1), 2)

threads = [thread1, thread2, thread3]

# Block until all threads complete.
gevent.joinall(threads)
```
The program above uses spawn to create greenlets. Another way is to subclass Greenlet and override its _run method:
```python
import gevent
from gevent import Greenlet

class MyGreenlet(Greenlet):

    def __init__(self, message, n):
        Greenlet.__init__(self)
        self.message = message
        self.n = n

    def _run(self):
        print(self.message)
        gevent.sleep(self.n)

g = MyGreenlet("Hi there!", 3)
g.start()
g.join()
```
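One detail worth knowing when subclassing: whatever `_run` returns becomes the greenlet's `value`, just like the return value of a function passed to `spawn`. A small sketch with a hypothetical `Adder` subclass:

```python
import gevent
from gevent import Greenlet

class Adder(Greenlet):
    """Hypothetical subclass that adds two numbers in its own greenlet."""

    def __init__(self, a, b):
        Greenlet.__init__(self)
        self.a = a
        self.b = b

    def _run(self):
        gevent.sleep(0.01)       # pretend to do some cooperative work
        return self.a + self.b   # becomes self.value when the greenlet finishes

g = Adder(2, 3)
g.start()
g.join()
print(g.value)  # 5
```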
Like any other code, greenlets can fail in various ways. A greenlet may fail to raise an exception, fail to stop, or consume too many system resources.
A greenlet's internal state is generally time-dependent. Greenlets expose several flags that let you monitor their state:
```python
import gevent

def win():
    return 'You win!'

def fail():
    raise Exception('You fail at failing.')

winner = gevent.spawn(win)
loser = gevent.spawn(fail)

print(winner.started)  # True
print(loser.started)   # True

# Exceptions raised in the Greenlet, stay inside the Greenlet.
try:
    gevent.joinall([winner, loser])
except Exception as e:
    print('This will never be reached')

print(winner.value)  # 'You win!'
print(loser.value)   # None

print(winner.ready())  # True
print(loser.ready())   # True

print(winner.successful())  # True
print(loser.successful())   # False

# The exception raised in fail, will not propagate outside the
# greenlet. A stack trace will be printed to stderr but it
# will not unwind the stack of the parent.
print(loser.exception)

# It is possible though to raise the exception again outside
# raise loser.exception
# or with
# loser.get()
```
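The output is as follows (the traceback that gevent prints to stderr for the failed greenlet is omitted):

```
True
True
You win!
None
True
True
True
False
You fail at failing.
```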
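Since `joinall` does not re-raise a greenlet's exception, the caller has to look for it explicitly. A minimal sketch (the `fail` function here raises a `ValueError` purely for illustration): `join()` returns normally, `successful()` and `exception` report the failure, and `get()` re-raises the original exception in the calling greenlet. gevent's hub still prints the traceback of the failed greenlet to stderr.

```python
import gevent

def fail():
    raise ValueError('bad input')

loser = gevent.spawn(fail)
loser.join()  # returns normally; the exception stays inside the greenlet

print(loser.successful())              # False
print(type(loser.exception).__name__)  # ValueError

caught = None
try:
    loser.get()  # re-raises the greenlet's exception in the caller
except ValueError as exc:
    caught = str(exc)
print(caught)  # bad input
```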
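Greenlets that fail to yield when the main program receives a SIGQUIT may keep the program running longer than expected. This results in so-called "zombie processes" that have to be killed from outside the Python interpreter. A common pattern is to listen for SIGQUIT in the main program and stop the greenlets before exiting: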
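```python
import signal

import gevent

def run_forever():
    gevent.sleep(1000)

if __name__ == '__main__':
    thread = gevent.spawn(run_forever)
    # Older gevent releases offered gevent.shutdown for this; current gevent
    # does not. Instead, kill the greenlet when SIGQUIT arrives, so join()
    # returns and the program can exit. gevent.signal_handler is the name
    # used since gevent 1.5 (earlier versions call it gevent.signal).
    gevent.signal_handler(signal.SIGQUIT, thread.kill)
    thread.join()
```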
gevent can also put a limit on how long a block of code or a greenlet may run: a timeout.
```python
import gevent
from gevent import Timeout

seconds = 10

timeout = Timeout(seconds)
timeout.start()

def wait():
    gevent.sleep(20)  # deliberately longer than the timeout

try:
    gevent.spawn(wait).join()
except Timeout:
    print('Could not complete')
finally:
    timeout.cancel()
```
Timeouts can also be used as a context manager, in a with statement:
```python
import gevent
from gevent import Timeout

time_to_wait = 5  # seconds

class TooLong(Exception):
    pass

with Timeout(time_to_wait, TooLong):
    gevent.sleep(10)
```
In addition, gevent provides timeout arguments for a variety of greenlet and data-structure calls:
```python
import gevent
from gevent import Timeout

def wait():
    gevent.sleep(2)

# join() accepts a timeout but returns None either way,
# so check ready() to see whether the greenlet finished.
thread1 = gevent.spawn(wait)
thread1.join(timeout=1)
if not thread1.ready():
    print('Thread 1 timed out')

# --

# get() raises Timeout if the result is not ready in time.
thread2 = gevent.spawn(wait)
try:
    thread2.get(timeout=1)
except Timeout:
    print('Thread 2 timed out')

# --

# with_timeout() runs the function and raises Timeout if it takes too long.
try:
    gevent.with_timeout(1, wait)
except Timeout:
    print('Thread 3 timed out')
```
The output is:

```
Thread 1 timed out
Thread 2 timed out
Thread 3 timed out
```
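Two variations are handy when a timeout is an expected outcome rather than an error. As a sketch (the `slow` function is a stand-in for real work): passing `False` as the exception makes a `Timeout` block exit silently, and `gevent.with_timeout` accepts a `timeout_value` keyword that is returned instead of raising.

```python
import gevent
from gevent import Timeout

def slow():
    gevent.sleep(2)
    return 'finished'

# Timeout(seconds, False): on expiry the with-block is abandoned silently.
finished = False
with Timeout(0.1, False):
    slow()
    finished = True  # never reached, because the timeout fires first
print(finished)  # False

# with_timeout returns timeout_value instead of raising Timeout.
result = gevent.with_timeout(0.1, slow, timeout_value='gave up')
print(result)  # gave up
```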
Now we come to one of the dark corners of gevent. The example below shows how monkey.patch_socket() modifies the standard library's socket module at runtime:
```python
import socket
print(socket.socket)

print("After monkey patch")
from gevent import monkey
monkey.patch_socket()
print(socket.socket)

import select
print(select.select)
monkey.patch_select()
print("After monkey patch")
print(select.select)
```
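The output is as follows (the exact class names and addresses depend on the Python and gevent versions):

```
<class 'socket.socket'>
After monkey patch
<class 'gevent.socket.socket'>
<built-in function select>
After monkey patch
<function select at 0x1924de8>
```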
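Python's runtime allows most objects to be modified at runtime, including modules, classes and even functions. This is generally a bad idea, but in extreme situations where a library needs to alter the fundamental behavior of Python, monkey patches come in handy. In the example above, gevent patches blocking standard-library modules such as socket, ssl and threading so that they behave cooperatively.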
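The effect of a patch is easiest to see with `time.sleep`. In this minimal sketch (the `legacy_task` name is illustrative), `monkey.patch_time()` makes the standard `time.sleep` cooperative, so three greenlets that each sleep 0.2 seconds finish in roughly 0.2 seconds in total rather than 0.6. Patches should be applied as early as possible, before other modules capture references to the original functions.

```python
from gevent import monkey
monkey.patch_time()  # replaces time.sleep with a cooperative version

import time
import gevent

def legacy_task():
    time.sleep(0.2)  # standard-library call, now yields to other greenlets

start = time.time()
gevent.joinall([gevent.spawn(legacy_task) for _ in range(3)])
elapsed = time.time() - start
print('3 sleeps of 0.2 s took %.1f s' % elapsed)
```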
Events are a form of asynchronous communication between greenlets:
```python
import gevent
from gevent.event import AsyncResult

a = AsyncResult()

def setter():
    """
    After 3 seconds, set the result of a, waking all greenlets
    waiting on it.
    """
    gevent.sleep(3)
    a.set()

def waiter():
    """
    After 3 seconds the get call will unblock.
    """
    a.get()  # blocking
    print('I live!')

gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter),
])
```
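AsyncResult is an extension of the event object that lets you send a value along with the wakeup call. This is sometimes called a future or a deferred: it holds a reference to a value that will become available later, and that value can be used at whatever time is scheduled.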
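`AsyncResult` can also carry a value. A minimal sketch with hypothetical `producer` and `consumer` functions: every greenlet blocked in `get()` wakes up and receives the value passed to `set()`.

```python
import gevent
from gevent.event import AsyncResult

result = AsyncResult()

def producer():
    gevent.sleep(0.1)
    result.set(42)  # wakes every greenlet blocked in result.get()

def consumer(name):
    value = result.get()  # blocks only this greenlet until set() is called
    return '%s got %s' % (name, value)

consumers = [gevent.spawn(consumer, name) for name in ('a', 'b')]
gevent.joinall([gevent.spawn(producer)] + consumers)
print([c.value for c in consumers])  # ['a got 42', 'b got 42']
```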
Queues are ordered sets of data with the usual put/get operations, implemented so that they can be manipulated safely by multiple greenlets.
For example, if one greenlet takes an item off the queue, the same item will not be taken by another greenlet running concurrently.
```python
import gevent
from gevent.queue import Queue

tasks = Queue()

def worker(n):
    while not tasks.empty():
        task = tasks.get()
        print('Worker %s got task %s' % (n, task))
        gevent.sleep(0)

    print('Quitting time!')

def boss():
    for i in range(1, 25):
        tasks.put_nowait(i)

gevent.spawn(boss).join()

gevent.joinall([
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'nancy'),
])
```
The output is:

```
Worker steve got task 1
Worker john got task 2
Worker nancy got task 3
Worker steve got task 4
Worker nancy got task 5
Worker john got task 6
Worker steve got task 7
Worker john got task 8
Worker nancy got task 9
Worker steve got task 10
Worker nancy got task 11
Worker john got task 12
Worker steve got task 13
Worker john got task 14
Worker nancy got task 15
Worker steve got task 16
Worker nancy got task 17
Worker john got task 18
Worker steve got task 19
Worker john got task 20
Worker nancy got task 21
Worker steve got task 22
Worker nancy got task 23
Worker john got task 24
Quitting time!
Quitting time!
Quitting time!
```
Queues can also block on either put or get as the need arises.
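Each of put and get has a non-blocking counterpart, put_nowait and get_nowait, which does not block but instead raises gevent.queue.Full or gevent.queue.Empty if the operation is not possible.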
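A short sketch of the non-blocking calls on a queue limited to one item (the item values are arbitrary); both exceptions are importable from `gevent.queue`:

```python
from gevent.queue import Queue, Empty, Full

q = Queue(maxsize=1)
q.put_nowait('first')

full_raised = False
try:
    q.put_nowait('second')  # the queue already holds maxsize items
except Full:
    full_raised = True

first = q.get_nowait()  # 'first'

empty_raised = False
try:
    q.get_nowait()  # nothing left to take
except Empty:
    empty_raised = True

print(full_raised, first, empty_raised)  # True first True
```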
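In the example below, a boss runs concurrently with the workers, and the queue is limited to holding at most three items. This limit means that put blocks until there is space on the queue. Conversely, get blocks if there are no items on the queue; it also takes a timeout argument, so that it raises gevent.queue.Empty if no work arrives within that time.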
```python
import gevent
from gevent.queue import Queue, Empty

tasks = Queue(maxsize=3)

def worker(n):
    try:
        while True:
            task = tasks.get(timeout=1)  # decrements queue size by 1
            print('Worker %s got task %s' % (n, task))
            gevent.sleep(0)
    except Empty:
        print('Quitting time!')

def boss():
    """
    Boss will wait to hand out work until an individual worker is
    free since the maxsize of the task queue is 3.
    """

    for i in range(1, 10):
        tasks.put(i)
    print('Assigned all work in iteration 1')

    for i in range(10, 20):
        tasks.put(i)
    print('Assigned all work in iteration 2')

gevent.joinall([
    gevent.spawn(boss),
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'bob'),
])
```
The output is:

```
Worker steve got task 1
Worker john got task 2
Worker bob got task 3
Worker steve got task 4
Worker bob got task 5
Worker john got task 6
Assigned all work in iteration 1
Worker steve got task 7
Worker john got task 8
Worker bob got task 9
Worker steve got task 10
Worker bob got task 11
Worker john got task 12
Worker steve got task 13
Worker john got task 14
Worker bob got task 15
Worker steve got task 16
Worker bob got task 17
Worker john got task 18
Assigned all work in iteration 2
Worker steve got task 19
Quitting time!
Quitting time!
Quitting time!
```
A group is a collection of running greenlets that are managed and scheduled together:
```python
import gevent
from gevent.pool import Group

def talk(msg):
    for i in range(3):
        print(msg)

g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz')

group = Group()
group.add(g1)
group.add(g2)
group.join()

group.add(g3)
group.join()
```
This is very useful for managing groups of asynchronous tasks.
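Greenlets can also be created directly inside a group with `Group.spawn`. As a sketch (collecting output into a list instead of printing is just for illustration), `join()` waits until the group is empty, and finished greenlets are removed from it automatically. Because `talk` yields with `gevent.sleep(0)` here, the two greenlets interleave.

```python
import gevent
from gevent.pool import Group

spoken = []

def talk(msg):
    for _ in range(3):
        spoken.append(msg)
        gevent.sleep(0)  # yield so the other member of the group can run

group = Group()
group.spawn(talk, 'bar')  # creates the greenlet and adds it to the group
group.spawn(talk, 'foo')
group.join()  # waits until every greenlet in the group has finished

print(spoken)      # ['bar', 'foo', 'bar', 'foo', 'bar', 'foo']
print(len(group))  # 0: finished greenlets are discarded from the group
```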
Group also provides an API for dispatching jobs to the grouped greenlets and collecting their results in various ways:
```python
import gevent
from gevent import getcurrent
from gevent.pool import Group

group = Group()

def hello_from(n):
    print('Size of group', len(group))
    print('Hello from Greenlet %s' % id(getcurrent()))

group.map(hello_from, range(3))


def intensive(n):
    gevent.sleep(3 - n)
    return 'task', n

print('Ordered')

ogroup = Group()
for i in ogroup.imap(intensive, range(3)):
    print(i)

print('Unordered')

igroup = Group()
for i in igroup.imap_unordered(intensive, range(3)):
    print(i)
```
The output is:

```
Size of group 3
Hello from Greenlet 10769424
Size of group 3
Hello from Greenlet 10770544
Size of group 3
Hello from Greenlet 10772304
Ordered
('task', 0)
('task', 1)
('task', 2)
Unordered
('task', 2)
('task', 1)
('task', 0)
```
A pool is a structure designed for handling a dynamic number of greenlets whose concurrency needs to be limited.
This is often necessary when running many network or I/O-bound tasks at once.
```python
import gevent
from gevent.pool import Pool

pool = Pool(2)

def hello_from(n):
    print('Size of pool', len(pool))

pool.map(hello_from, range(3))
```

The output is:

```
Size of pool 2
Size of pool 2
Size of pool 1
```
When building gevent-driven services, it is common to center the whole service around a pool structure.
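A sketch of the concurrency limit in action, using hypothetical `active` and `peak` counters: with `Pool(2)`, at most two `job` greenlets run at any moment, no matter how many items are mapped, and `map` still returns the results in input order.

```python
import gevent
from gevent.pool import Pool

active = 0  # hypothetical counters, only for observing the pool
peak = 0

def job(n):
    global active, peak
    active += 1
    peak = max(peak, active)
    gevent.sleep(0.01)  # simulated I/O wait
    active -= 1
    return n * 10

pool = Pool(2)
results = pool.map(job, range(5))
print(results)  # [0, 10, 20, 30, 40]
print(peak)     # 2
```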
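A semaphore is a low-level synchronization primitive that lets greenlets coordinate and limit concurrent access or execution. A semaphore exposes two methods, acquire and release. The difference between the number of times a semaphore has been acquired and released is called its bound; if the bound reaches 0, acquire blocks until another greenlet releases its acquisition.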
```python
from gevent import sleep
from gevent.pool import Pool
from gevent.lock import BoundedSemaphore  # older gevent releases: gevent.coros

sem = BoundedSemaphore(2)

def worker1(n):
    sem.acquire()
    print('Worker %i acquired semaphore' % n)
    sleep(0)
    sem.release()
    print('Worker %i released semaphore' % n)

def worker2(n):
    with sem:
        print('Worker %i acquired semaphore' % n)
        sleep(0)
    print('Worker %i released semaphore' % n)

pool = Pool()
pool.map(worker1, range(0, 2))
pool.map(worker2, range(3, 6))
```
The output is:

```
Worker 0 acquired semaphore
Worker 1 acquired semaphore
Worker 0 released semaphore
Worker 1 released semaphore
Worker 3 acquired semaphore
Worker 4 acquired semaphore
Worker 3 released semaphore
Worker 4 released semaphore
Worker 5 acquired semaphore
Worker 5 released semaphore
```
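A semaphore with a bound of 1 is known as a lock. It gives exclusive execution to one greenlet at a time and is often used to ensure that a resource is used by only one greenlet at a time.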
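Applied to the race condition described earlier, where greenlets read and modify the same value, a semaphore with a bound of 1 serializes the read-modify-write. A minimal sketch with a hypothetical global `counter`; it imports `BoundedSemaphore` from `gevent.lock`, where current gevent versions provide it (older releases used `gevent.coros`):

```python
import gevent
from gevent.lock import BoundedSemaphore

counter = 0                 # hypothetical shared value
lock = BoundedSemaphore(1)  # a semaphore with a bound of 1 behaves as a lock

def safe_increment():
    global counter
    with lock:                  # only one greenlet at a time gets past here
        current = counter
        gevent.sleep(0)         # other greenlets may run, but they wait on the lock
        counter = current + 1

gevent.joinall([gevent.spawn(safe_increment) for _ in range(10)])
print(counter)  # 10
```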
gevent also lets you specify data that is local to the greenlet context:
```python
import gevent
from gevent.local import local

stash = local()

def f1():
    stash.x = 1
    print(stash.x)

def f2():
    stash.y = 2
    print(stash.y)

    try:
        stash.x
    except AttributeError:
        print("x is not local to f2")

g1 = gevent.spawn(f1)
g2 = gevent.spawn(f2)

gevent.joinall([g1, g2])
```
The output is:

```
1
2
x is not local to f2
```
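Many web frameworks that use gevent store HTTP session objects inside gevent's greenlet-local storage. For example, using the Werkzeug utility library and its proxy object, we can create Flask-style request objects: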
```python
from contextlib import contextmanager

from gevent.local import local
from gevent.pywsgi import WSGIServer  # gevent.wsgi in older gevent releases
from werkzeug.local import LocalProxy
from werkzeug.wrappers import Request

_requests = local()
request = LocalProxy(lambda: _requests.request)

@contextmanager
def sessionmanager(environ):
    _requests.request = Request(environ)
    yield
    _requests.request = None

def logic():
    return "Hello " + request.remote_addr

def application(environ, start_response):
    status = '200 OK'

    with sessionmanager(environ):
        body = logic()

    headers = [
        ('Content-Type', 'text/html')
    ]

    start_response(status, headers)
    return [body.encode('utf-8')]  # WSGI bodies must be bytes

WSGIServer(('', 8000), application).serve_forever()
```
The gevent.subprocess module was added in gevent 1.0. It lets you control child processes cooperatively:
```python
import gevent
from gevent.subprocess import Popen, PIPE

def cron():
    while True:
        print("cron")
        gevent.sleep(0.2)

g = gevent.spawn(cron)
sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True)
out, err = sub.communicate()
g.kill()
print(out.rstrip().decode())
```
The output is:

```
cron
cron
cron
cron
cron
Linux
```
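`gevent.subprocess` also provides cooperative versions of the standard-library helpers such as `check_output`. A minimal sketch, using the running interpreter as a portable child process (the `ticker` function is illustrative): the ticker greenlet keeps running while the caller waits for the child to finish.

```python
import sys

import gevent
from gevent import subprocess

ticks = []

def ticker():
    while True:
        ticks.append(1)
        gevent.sleep(0.05)

g = gevent.spawn(ticker)
# Run a child Python process that sleeps, then prints; waiting for it yields to the ticker.
out = subprocess.check_output(
    [sys.executable, '-c', 'import time; time.sleep(0.3); print("done")'])
g.kill()

print(out.strip())  # b'done'
print('ticks while waiting:', len(ticks))
```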