基於協程的Python網絡庫gevent

import gevent
 
def test1():
    print 12
    gevent.sleep(0)
    print 34
 
def test2():
    print 56
    gevent.sleep(0)
    print 78
 
gevent.joinall([
    gevent.spawn(test1),
    gevent.spawn(test2),
])

解釋下,」gevent.spawn()」方法會建立一個新的greenlet協程對象,並運行它。」gevent.joinall()」方法會等待全部傳入的greenlet協程運行結束後再退出,這個方法能夠接受一個」timeout」參數來設置超時時間,單位是秒。運行上面的程序,執行順序以下:python

  1. 先進入協程test1,打印12
  2. 遇到」gevent.sleep(0)」時,test1被阻塞,自動切換到協程test2,打印56
  3. 以後test2被阻塞,這時test1阻塞已結束,自動切換回test1,打印34
  4. 當test1運行完畢返回後,此時test2阻塞已結束,再自動切換回test2,打印78
  5. 全部協程執行完畢,程序退出

因此,程序運行下來的輸出就是:git

12
56
34
78

greenlet一個協程運行完後,必須顯式切換,否則會返回其父協程。而在gevent中,一個協程運行完後,它會自動調度那些未完成的協程。github

import gevent
import socket
 
urls = ['www.baidu.com', 'www.gevent.org', 'www.python.org']
jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]
gevent.joinall(jobs, timeout=5)
 
print [job.value for job in jobs]

咱們經過協程分別獲取三個網站的IP地址,因爲打開遠程地址會引發IO阻塞,因此gevent會自動調度不一樣的協程。另外,咱們能夠經過協程對象的」value」屬性,來獲取協程函數的返回值。安全

猴子補丁 Monkey patching

其實上面程序運行的時間同不用協程是同樣的,是三個網站打開時間的總和。但是理論上協程是非阻塞的,那運行時間應該等於最長的那個網站打開時間呀?其實這是由於Python標準庫裏的socket是阻塞式的,DNS解析沒法併發,包括像urllib庫也同樣,因此這種狀況下用協程徹底沒意義。那怎麼辦?併發

一種方法是使用gevent下的socket模塊,咱們能夠經過」from gevent import socket」來導入。不過更經常使用的方法是使用猴子布丁(Monkey patching):app

from gevent import monkey; monkey.patch_socket()
import gevent
import socket
 
urls = ['www.baidu.com', 'www.gevent.org', 'www.python.org']
jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]
gevent.joinall(jobs, timeout=5)
 
print [job.value for job in jobs]

上述代碼的第一行就是對socket標準庫打上猴子補丁,此後socket標準庫中的類和方法都會被替換成非阻塞式的,全部其餘的代碼都不用修改,這樣協程的效率就真正體現出來了。Python中其它標準庫也存在阻塞的狀況,gevent提供了」monkey.patch_all()」方法將全部標準庫都替換。異步

from gevent import monkey; monkey.patch_all()

使用猴子補丁褒貶不一,可是官網上仍是建議使用」patch_all()」,並且在程序的第一行就執行。socket

獲取協程狀態

協程狀態有已啓動和已中止,分別能夠用協程對象的」started」屬性和」ready()」方法來判斷。對於已中止的協程,能夠用」successful()」方法來判斷其是否成功運行且沒拋異常。若是協程執行完有返回值,能夠經過」value」屬性來獲取。另外,greenlet協程運行過程當中發生的異常是不會被拋出到協程外的,所以須要用協程對象的」exception」屬性來獲取協程中的異常。下面的例子很好的演示了各類方法和屬性的使用。函數

#coding:utf8
import gevent
 
def win():
    return 'You win!'
 
def fail():
    raise Exception('You failed!')
 
winner = gevent.spawn(win)
loser = gevent.spawn(fail)
 
print winner.started # True
print loser.started  # True
 
# 在Greenlet中發生的異常,不會被拋到Greenlet外面。
# 控制檯會打出Stacktrace,但程序不會中止
try:
    gevent.joinall([winner, loser])
except Exception as e:
    # 這段永遠不會被執行
    print 'This will never be reached'
 
print winner.ready() # True
print loser.ready()  # True
 
print winner.value # 'You win!'
print loser.value  # None
 
print winner.successful() # True
print loser.successful()  # False
 
# 這裏能夠經過raise loser.exception 或 loser.get()
# 來將協程中的異常拋出
print loser.exception

協程運行超時

在」gevent.joinall()」方法中能夠傳入timeout參數來設置超時,咱們也能夠在全局範圍內設置超時時間:網站

import gevent
from gevent import Timeout
 
timeout = Timeout(2)  # 2 seconds
timeout.start()
 
def wait():
    gevent.sleep(10)
 
try:
    gevent.spawn(wait).join()
except Timeout:
    print('Could not complete')

上例中,咱們將超時設爲2秒,此後全部協程的運行,若是超過兩秒就會拋出」Timeout」異常。咱們也能夠將超時設置在with語句內,這樣該設置只在with語句塊中有效:

with Timeout(1):
    gevent.sleep(10)

此外,咱們能夠指定超時所拋出的異常,來替換默認的」Timeout」異常。好比下例中超時就會拋出咱們自定義的」TooLong」異常。

class TooLong(Exception):
    pass
 
with Timeout(1, TooLong):
    gevent.sleep(10)

協程間通信

greenlet協程間的異步通信可使用事件(Event)對象。該對象的」wait()」方法能夠阻塞當前協程,而」set()」方法能夠喚醒以前阻塞的協程。在下面的例子中,5個waiter協程都會等待事件evt,當setter協程在3秒後設置evt事件,全部的waiter協程即被喚醒。

#coding:utf8
import gevent
from gevent.event import Event
 
evt = Event()
 
def setter():
    print 'Wait for me'
    gevent.sleep(3)  # 3秒後喚醒全部在evt上等待的協程
    print "Ok, I'm done"
    evt.set()  # 喚醒
 
def waiter():
    print "I'll wait for you"
    evt.wait()  # 等待
    print 'Finish waiting'
 
gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter),
    gevent.spawn(waiter),
    gevent.spawn(waiter),
    gevent.spawn(waiter),
    gevent.spawn(waiter)
])

除了Event事件外,gevent還提供了AsyncResult事件,它能夠在喚醒時傳遞消息。讓咱們將上例中的setter和waiter做以下改動:

from gevent.event import AsyncResult
aevt = AsyncResult()
 
def setter():
    print 'Wait for me'
    gevent.sleep(3)  # 3秒後喚醒全部在evt上等待的協程
    print "Ok, I'm done"
    aevt.set('Hello!')  # 喚醒,並傳遞消息
 
def waiter():
    print("I'll wait for you")
    message = aevt.get()  # 等待,並在喚醒時獲取消息
    print 'Got wake up message: %s' % message

隊列 Queue

gevent的隊列對象可讓greenlet協程之間安全的訪問。運行下面的程序,你會看到3個消費者會分別消費隊列中的產品,且消費過的產品不會被另外一個消費者再取到:

import gevent
from gevent.queue import Queue
 
products = Queue()
 
def consumer(name):
    while not products.empty():
        print '%s got product %s' % (name, products.get())
        gevent.sleep(0)
 
    print '%s Quit'
 
def producer():
    for i in xrange(1, 10):
        products.put(i)
 
gevent.joinall([
    gevent.spawn(producer),
    gevent.spawn(consumer, 'steve'),
    gevent.spawn(consumer, 'john'),
    gevent.spawn(consumer, 'nancy'),
])

put和get方法都是阻塞式的,它們都有非阻塞的版本:put_nowait和get_nowait。若是調用get方法時隊列爲空,則拋出」gevent.queue.Empty」異常。‘

信號量

信號量能夠用來限制協程併發的個數。它有兩個方法,acquire和release。顧名思義,acquire就是獲取信號量,而release就是釋放。當全部信號量都已被獲取,那剩餘的協程就只能等待任一協程釋放信號量後才能得以運行:

import gevent
from gevent.coros import BoundedSemaphore
 
sem = BoundedSemaphore(2)
 
def worker(n):
    sem.acquire()
    print('Worker %i acquired semaphore' % n)
    gevent.sleep(0)
    sem.release()
    print('Worker %i released semaphore' % n)
 
gevent.joinall([gevent.spawn(worker, i) for i in xrange(0, 6)])

上面的例子中,咱們初始化了」BoundedSemaphore」信號量,並將其個數定爲2。因此同一個時間,只能有兩個worker協程被調度。程序運行後的結果以下:

Worker 0 acquired semaphore
Worker 1 acquired semaphore
Worker 0 released semaphore
Worker 1 released semaphore
Worker 2 acquired semaphore
Worker 3 acquired semaphore
Worker 2 released semaphore
Worker 3 released semaphore
Worker 4 acquired semaphore
Worker 4 released semaphore
Worker 5 acquired semaphore
Worker 5 released semaphore

協程本地變量

同線程相似,協程也有本地變量,也就是隻在當前協程內可被訪問的變量:

import gevent
from gevent.local import local
 
data = local()
 
def f1():
    data.x = 1
    print data.x
 
def f2():
    try:
        print data.x
    except AttributeError:
        print 'x is not visible'
 
gevent.joinall([
    gevent.spawn(f1),
    gevent.spawn(f2)
])

經過將變量存放在local對象中,便可將其的做用域限制在當前協程內,當其餘協程要訪問該變量時,就會拋出異常。不一樣協程間能夠有重名的本地變量,並且互相不影響。由於協程本地變量的實現,就是將其存放在以的」greenlet.getcurrent()」的返回爲鍵值的私有的命名空間內。

實際應用

基於Flask聊天室

https://github.com/sdiehl/minichat/blob/master/app.py

相關文章
相關標籤/搜索