gevent程序員指南

轉自http://xlambda.com/gevent-tutorial/html

gevent程序員指南

由Gevent社區編寫

gevent是一個基於 libev的併發庫。它爲各類併發和網絡相關的任務提供了整潔的API。

介紹

本指南假定讀者有中級Python水平,但不要求有其它更多的知識,不期待讀者有併發方面的知識。本指南的目標在於給予你須要的工具來開始使用gevent,幫助你馴服現有的併發問題,並從今開始編寫異步應用程序。python

貢獻者

按提供貢獻的時間前後順序列出以下: Stephen Diehl Jérémy Bethmont sww Bruno Bigras David Ripton Travis Cline Boris Feld youngsterxyf Eddie Hebert Alexis Metaireau Daniel Velkovgit

同時感謝Denis Bilenko寫了gevent和相應的指導以造成本指南。程序員

這是一個以MIT許可證發佈的協做文檔。你想添加一些內容?或看見一個排版錯誤? Fork一個分支發佈一個request到 Github. 咱們歡迎任何貢獻。github

本頁也有日文版本web

核心部分

Greenlets

在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。express

在任什麼時候刻,只有一個協程在運行。json

這與multiprocessingthreading等提供真正並行構造的庫是不一樣的。這些庫輪轉使用操做系統調度的進程和線程,是真正的並行。數組

同步和異步執行

併發的核心思想在於,大的任務能夠分解成一系列的子任務,後者能夠被調度成同時執行或異步執行,而不是一次一個地或者同步地執行。兩個子任務之間的切換也就是上下文切換服務器

在gevent裏面,上下文切換是經過yielding來完成的. 在下面的例子裏,咱們有兩個上下文,經過調用gevent.sleep(0),它們各自yield向對方。

import gevent

def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')

def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

 

Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar

 

下圖將控制流形象化,就像在調試器中單步執行整個程序,以說明上下文切換如何發生。

Greenlet Control Flow

當咱們在受限於網絡或IO的函數中使用gevent,這些函數會被協做式的調度, gevent的真正能力會獲得發揮。Gevent處理了全部的細節,來保證你的網絡庫會在可能的時候,隱式交出greenlet上下文的執行權。這樣的一種用法是如何強大,怎麼強調都不爲過。或者咱們舉些例子來詳述。

下面例子中的select()函數一般是一個在各類文件描述符上輪詢的阻塞調用。

import time
import gevent
from gevent import select

start = time.time()
tic = lambda: 'at %1.1f seconds' % (time.time() - start)

def gr1():
    # Busy waits for a second, but we don't want to stick around...
    print('Started Polling: %s' % tic())
    select.select([], [], [], 2)
    print('Ended Polling: %s' % tic())

def gr2():
    # Busy waits for a second, but we don't want to stick around...
    print('Started Polling: %s' % tic())
    select.select([], [], [], 2)
    print('Ended Polling: %s' % tic())

def gr3():
    print("Hey lets do some stuff while the greenlets poll, %s" % tic())
    gevent.sleep(1)

gevent.joinall([
    gevent.spawn(gr1),
    gevent.spawn(gr2),
    gevent.spawn(gr3),
])

 

Started Polling: at 0.0 seconds
Started Polling: at 0.0 seconds
Hey lets do some stuff while the greenlets poll, at 0.0 seconds
Ended Polling: at 2.0 seconds
Ended Polling: at 2.0 seconds

 

下面是另一個多少有點人造色彩的例子,定義一個非肯定性的(non-deterministic)task函數(給定相同輸入的狀況下,它的輸出不保證相同)。此例中執行這個函數的反作用就是,每次task在它的執行過程當中都會隨機地停某些秒。

import gevent
import random

def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(random.randint(0,2)*0.001)
    print('Task %s done' % pid)

def synchronous():
    for i in range(1,10):
        task(i)

def asynchronous():
    threads = [gevent.spawn(task, i) for i in xrange(10)]
    gevent.joinall(threads)

print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()

 

Synchronous:
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
Asynchronous:
Task 3 done
Task 7 done
Task 9 done
Task 2 done
Task 4 done
Task 1 done
Task 8 done
Task 6 done
Task 0 done
Task 5 done

 

上例中,在同步的部分,全部的task都同步的執行,結果當每一個task在執行時主流程被阻塞(主流程的執行暫時停住)。

程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,後者阻塞當前流程,並執行全部給定的greenlet。執行流程只會在全部greenlet執行完後纔會繼續向下走。

要重點留意的是,異步的部分本質上是隨機的,並且異步部分的總體運行時間比同步要大大減小。事實上,同步部分的最大運行時間,便是每一個task停0.002秒,結果整個隊列要停0.02秒。而異步部分的最大運行時間大體爲0.002秒,由於沒有任何一個task會阻塞其它task的執行。

一個更常見的應用場景,如異步地向服務器取數據,取數據操做的執行時間依賴於發起取數據請求時遠端服務器的負載,各個請求的執行時間會有差異。

import gevent.monkey
gevent.monkey.patch_socket()

import gevent
import urllib2
import simplejson as json

def fetch(pid):
    response = urllib2.urlopen('http://json-time.appspot.com/time.json')
    result = response.read()
    json_result = json.loads(result)
    datetime = json_result['datetime']

    print('Process %s: %s' % (pid, datetime))
    return json_result['datetime']

def synchronous():
    for i in range(1,10):
        fetch(i)

def asynchronous():
    threads = []
    for i in range(1,10):
        threads.append(gevent.spawn(fetch, i))
    gevent.joinall(threads)

print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()

肯定性

就像以前所提到的,greenlet具備肯定性。在相同配置相同輸入的狀況下,它們老是會產生相同的輸出。下面就有例子,咱們在multiprocessing的pool之間執行一系列的任務,與在gevent的pool之間執行做比較。

import time

def echo(i):
    time.sleep(0.001)
    return i

# Non Deterministic Process Pool

from multiprocessing.pool import Pool

p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]

print(run1 == run2 == run3 == run4)

# Deterministic Gevent Pool

from gevent.pool import Pool

p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]

print(run1 == run2 == run3 == run4)
False
True

即便gevent一般帶有肯定性,當開始與如socket或文件等外部服務交互時,不肯定性也可能溜進你的程序中。所以儘管gevent線程是一種「肯定的併發」形式,使用它仍然可能會遇到像使用POSIX線程或進程時遇到的那些問題。

涉及併發長期存在的問題就是競爭條件(race condition)。簡單來講,當兩個併發線程/進程都依賴於某個共享資源同時都嘗試去修改它的時候,就會出現競爭條件。這會致使資源修改的結果狀態依賴於時間和執行順序。這是個問題,咱們通常會作不少努力嘗試避免競爭條件,由於它會致使整個程序行爲變得不肯定。

最好的辦法是始終避免全部全局的狀態。全局狀態和導入時(import-time)反作用老是會反咬你一口!

建立Greenlets

gevent對Greenlet初始化提供了一些封裝,最經常使用的使用模板之一有

import gevent
from gevent import Greenlet

def foo(message, n):
    """
    Each thread will be passed the message, and n arguments
    in its initialization.
    """
    gevent.sleep(n)
    print(message)

# Initialize a new Greenlet instance running the named function
# foo
thread1 = Greenlet.spawn(foo, "Hello", 1)

# Wrapper for creating and running a new Greenlet from the named
# function foo, with the passed arguments
thread2 = gevent.spawn(foo, "I live!", 2)

# Lambda expressions
thread3 = gevent.spawn(lambda x: (x+1), 2)

threads = [thread1, thread2, thread3]

# Block until all threads complete.
gevent.joinall(threads)

 

Hello
I live!

 

除使用基本的Greenlet類以外,你也能夠子類化Greenlet類,重載它的_run方法。

import gevent
from gevent import Greenlet

class MyGreenlet(Greenlet):

    def __init__(self, message, n):
        Greenlet.__init__(self)
        self.message = message
        self.n = n

    def _run(self):
        print(self.message)
        gevent.sleep(self.n)

g = MyGreenlet("Hi there!", 3)
g.start()
g.join()

 

Hi there!

 

Greenlet狀態

就像任何其餘成段代碼,Greenlet也可能以不一樣的方式運行失敗。 Greenlet可能未能成功拋出異常,不能中止運行,或消耗了太多的系統資源。

一個greenlet的狀態一般是一個依賴於時間的參數。在greenlet中有一些標誌,讓你能夠監視它的線程內部狀態:

  • started -- Boolean, 指示此Greenlet是否已經啓動
  • ready() -- Boolean, 指示此Greenlet是否已經中止
  • successful() -- Boolean, 指示此Greenlet是否已經中止並且沒拋異常
  • value -- 任意值, 此Greenlet代碼返回的值
  • exception -- 異常, 此Greenlet內拋出的未捕獲異常
import gevent

def win():
    return 'You win!'

def fail():
    raise Exception('You fail at failing.')

winner = gevent.spawn(win)
loser = gevent.spawn(fail)

print(winner.started) # True
print(loser.started)  # True

# Exceptions raised in the Greenlet, stay inside the Greenlet.
try:
    gevent.joinall([winner, loser])
except Exception as e:
    print('This will never be reached')

print(winner.value) # 'You win!'
print(loser.value)  # None

print(winner.ready()) # True
print(loser.ready())  # True

print(winner.successful()) # True
print(loser.successful())  # False

# The exception raised in fail, will not propogate outside the
# greenlet. A stack trace will be printed to stdout but it
# will not unwind the stack of the parent.

print(loser.exception)

# It is possible though to raise the exception again outside
# raise loser.exception
# or with
# loser.get()

 

True
True
You win!
None
True
True
True
False
You fail at failing.

 

程序中止

當主程序(main program)收到一個SIGQUIT信號時,不能成功作yield操做的 Greenlet可能會令意外地掛起程序的執行。這致使了所謂的殭屍進程,它須要在Python解釋器以外被kill掉。

對此,一個通用的處理模式就是在主程序中監聽SIGQUIT信號,在程序退出調用gevent.shutdown

import gevent
import signal

def run_forever():
    gevent.sleep(1000)

if __name__ == '__main__':
    gevent.signal(signal.SIGQUIT, gevent.shutdown)
    thread = gevent.spawn(run_forever)
    thread.join()

超時

超時是一種對一塊代碼或一個Greenlet的運行時間的約束。

import gevent
from gevent import Timeout

seconds = 10

timeout = Timeout(seconds)
timeout.start()

def wait():
    gevent.sleep(10)

try:
    gevent.spawn(wait).join()
except Timeout:
    print('Cou
相關文章
相關標籤/搜索