python gevent 模塊

 Greenlethtml

    在gevent裏面最多應用到的就是greenlet,一個輕量級的協程實現。在任什麼時候間點,只有一個greenlet處於運行狀態。Greenlet與multiprocessing 和 threading這兩個庫提供的真正的並行結構的區別在於這兩個庫會真正的切換進程,POSIX線程是由操做系統來負責調度,而且它們是真正並行的。python

同步和異步

    應對併發的主要思路就是將一個大的任務分解成一個子任務的集合而且可以讓它並行或者異步地執行,而不是一次執行一個或者同步執行。在兩個子任務中的切換被稱爲上下文切換。shell

    gevent裏面的上下文切換是很是平滑的。在下面的例子程序中,咱們能夠看到兩個上下文經過調用 gevent.sleep()來互相切換。express

import gevent

def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')

def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

    這段程序的執行結果以下:編程

Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar

    從這個執行結果能夠看出這個程序的執行過程,在這裏的兩個函數是交替執行的。json

    gevent的真正威力是在處理網絡和帶有IO阻塞的功能時可以這些任務協調地運行。gevent來實現了這些具體的細節來保證在須要的時候greenlet上下文進行切換。在這裏用一個例子來講明。安全

import time
import gevent
from gevent import select

start = time.time()
tic = lambda: 'at %1.1f seconds' % (time.time() - start)

def gr1():
    # Busy waits for a second, but we don't want to stick around...
    print('Started Polling: ', tic())
    select.select([], [], [], 2)
    print('Ended Polling: ', tic())

def gr2():
    # Busy waits for a second, but we don't want to stick around...
    print('Started Polling: ', tic())
    select.select([], [], [], 2)
    print('Ended Polling: ', tic())

def gr3():
    print("Hey lets do some stuff while the greenlets poll, at", tic())
    gevent.sleep(1)

gevent.joinall([
    gevent.spawn(gr1),
    gevent.spawn(gr2),
    gevent.spawn(gr3),
])

在上面的例子裏,select() 一般是一個阻塞的調用。服務器

程序的執行結果以下:網絡

Started Polling:  at 0.0 seconds
Started Polling:  at 0.0 seconds
Hey lets do some stuff while the greenlets poll, at at 0.0 seconds
Ended Polling:  at 2.0 seconds
Ended Polling:  at 2.0 seconds

接下來一個例子中能夠看到gevent是安排各個任務的執行的。session

import gevent
import random

def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(random.randint(0,2)*0.001)
    print('Task', pid, 'done')

def synchronous():
    for i in range(1,10):
        task(i)

def asynchronous():
    threads = [gevent.spawn(task, i) for i in xrange(10)]
    gevent.joinall(threads)

print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()

執行結果以下:

root@master:~# python two.py 
Synchronous:
('Task', 1, 'done')
('Task', 2, 'done')
('Task', 3, 'done')
('Task', 4, 'done')
('Task', 5, 'done')
('Task', 6, 'done')
('Task', 7, 'done')
('Task', 8, 'done')
('Task', 9, 'done')
Asynchronous:
('Task', 0, 'done')
('Task', 9, 'done')
('Task', 7, 'done')
('Task', 3, 'done')
('Task', 6, 'done')
('Task', 5, 'done')
('Task', 4, 'done')
('Task', 1, 'done')
('Task', 2, 'done')
('Task', 8, 'done')

在同步的狀況下,任務是按順序執行的,在執行各個任務的時候會阻塞主線程。

而gevent.spawn 的重要功能就是封裝了greenlet裏面的函數。初始化的greenlet放在了threads這個list裏面,被傳遞給了 gevent.joinall 這個函數,它會阻塞當前的程序來執行全部的greenlet。

在異步執行的狀況下,全部任務的執行順序是徹底隨機的。每個greenlet的都不會阻塞其餘greenlet的執行。

在有時候須要異步地從服務器獲取數據,gevent能夠經過判斷從服務器的數據載入狀況來處理請求。

import gevent.monkey
gevent.monkey.patch_socket()

import gevent
import urllib2
import simplejson as json

def fetch(pid):
    response = urllib2.urlopen('http://json-time.appspot.com/time.json')
    result = response.read()
    json_result = json.loads(result)
    datetime = json_result['datetime']

    print 'Process ', pid, datetime
    return json_result['datetime']

def synchronous():
    for i in range(1,10):
        fetch(i)

def asynchronous():
    threads = []
    for i in range(1,10):
        threads.append(gevent.spawn(fetch, i))
    gevent.joinall(threads)

print 'Synchronous:'
synchronous()

print 'Asynchronous:'
asynchronous()

肯定性

就像以前說的,greenlet是肯定的。給每一個greenlet相同的配置和相同的輸入,獲得的輸出是相同的。咱們能夠用python 的多進程池和gevent池來做比較。下面的例子能夠說明這個特色:

import time

def echo(i):
    time.sleep(0.001)
    return i

# Non Deterministic Process Pool

from multiprocessing.pool import Pool

p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]

print( run1 == run2 == run3 == run4 )

# Deterministic Gevent Pool

from gevent.pool import Pool

p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]

print( run1 == run2 == run3 == run4 )

下面是執行結果:

False
True

 從上面的例子能夠看出,執行同一個函數,產生的greenlet是相同的,而產生的process是不一樣的。

 在處理併發編程的時候會碰到一些問題,好比競爭資源的問題。最簡單的狀況,當有兩個線程或進程訪問同一資源而且修改這個資源的時候,就會引起資源競爭的問題。那麼這個資源最終的值就會取決於那個線程或進程是最後執行的。這是個問題,總之,在處理全局的程序不肯定行爲的時候,須要儘可能避免資源競爭的問題

 最好的方法就是在任什麼時候候儘可能避免使用全局的狀態。全局狀態是常常會坑你的!

產生Greenlet

在gevent裏面封裝了一些初始化greenlet的方法,下面是幾個最經常使用的例子:

import gevent
from gevent import Greenlet

def foo(message, n):
    """
    Each thread will be passed the message, and n arguments
    in its initialization.
    """
    gevent.sleep(n)
    print(message)

# Initialize a new Greenlet instance running the named function
# foo
thread1 = Greenlet.spawn(foo, "Hello", 1)

# Wrapper for creating and runing a new Greenlet from the named 
# function foo, with the passed arguments
thread2 = gevent.spawn(foo, "I live!", 2)

# Lambda expressions
thread3 = gevent.spawn(lambda x: (x+1), 2)

threads = [thread1, thread2, thread3]

# Block until all threads complete.
gevent.joinall(threads)

在上面的程序裏使用 spawn 方法來產生greenlet。還有一種初始化greenlet的方法,就是建立Greenlet的子類,而且重寫 _run 方法。

import gevent
from gevent import Greenlet

class MyGreenlet(Greenlet):

    def __init__(self, message, n):
        Greenlet.__init__(self)
        self.message = message
        self.n = n

    def _run(self):
        print(self.message)
        gevent.sleep(self.n)

g = MyGreenlet("Hi there!", 3)
g.start()
g.join()

Greenlet 的狀態

就像其餘的代碼同樣,greenlet在執行的時候也會出錯。Greenlet有可能會沒法拋出異常,中止失敗,或者消耗了太多的系統資源。

greenlet的內部狀態一般是一個依賴時間的參數。greenlet有一些標記來讓你可以監控greenlet的狀態。

  • started -- 標誌greenlet是否已經啓動
  • ready -- 標誌greenlet是否已經被終止
  • successful() -- 標誌greenlet是否已經被終止,而且沒有拋出異常
  • value -- 由greenlet返回的值
  • exception -- 在greenlet裏面沒有被捕獲的異常
import gevent

def win():
    return 'You win!'

def fail():
    raise Exception('You fail at failing.')

winner = gevent.spawn(win)
loser = gevent.spawn(fail)

print(winner.started) # True
print(loser.started)  # True

# Exceptions raised in the Greenlet, stay inside the Greenlet.
try:
    gevent.joinall([winner, loser])
except Exception as e:
    print('This will never be reached')

print(winner.value) # 'You win!'
print(loser.value)  # None

print(winner.ready()) # True
print(loser.ready())  # True

print(winner.successful()) # True
print(loser.successful())  # False

# The exception raised in fail, will not propogate outside the
# greenlet. A stack trace will be printed to stdout but it
# will not unwind the stack of the parent.

print(loser.exception)

# It is possible though to raise the exception again outside
# raise loser.exception
# or with
# loser.get()

 這段代碼的執行結果以下:

True
True
You win!
None
True
True
True
False
You fail at failing.

終止程序

在主程序收到一個SIGQUIT 以後會阻塞程序的執行讓Greenlet沒法繼續執行。這會致使殭屍進程的產生,須要在操做系統中將這些殭屍進程清除掉。

import gevent
import signal

def run_forever():
    gevent.sleep(1000)

if __name__ == '__main__':
    gevent.signal(signal.SIGQUIT, gevent.shutdown)
    thread = gevent.spawn(run_forever)
    thread.join()

超時

gevent提供了對與代碼運行時的時間限制功能,也就是超時功能。

import gevent
from gevent import Timeout

seconds = 10

timeout = Timeout(seconds)
timeout.start()

def wait():
    gevent.sleep(10)

try:
    gevent.spawn(wait).join()
except Timeout:
    print 'Could not complete'

也能夠經過用with 上下文的方法來實現超時的功能:

import gevent
from gevent import Timeout

time_to_wait = 5 # seconds

class TooLong(Exception):
    pass

with Timeout(time_to_wait, TooLong):
    gevent.sleep(10)

gevent還提供了一些超時的參數以應對不一樣的情況:

import gevent
from gevent import Timeout

def wait():
    gevent.sleep(2)

timer = Timeout(1).start()
thread1 = gevent.spawn(wait)

try:
    thread1.join(timeout=timer)
except Timeout:
    print('Thread 1 timed out')

# --

timer = Timeout.start_new(1)
thread2 = gevent.spawn(wait)

try:
    thread2.get(timeout=timer)
except Timeout:
    print('Thread 2 timed out')

# --

try:
    gevent.with_timeout(1, wait)
except Timeout:
    print('Thread 3 timed out')

運行結果以下:

Thread 1 timed out
Thread 2 timed out
Thread 3 timed out

Monkeypatching

如今這是gevent裏面的一個難點。下面一個例子裏可能看到 monkey.patch_socket() 可以在運行時裏面修改基礎庫socket:

import socket
print( socket.socket )

print "After monkey patch"
from gevent import monkey
monkey.patch_socket()
print( socket.socket )

import select
print select.select
monkey.patch_select()
print "After monkey patch"
print( select.select )

 運行結果以下:

class 'socket.socket'
After monkey patch
class 'gevent.socket.socket'

built-in function select
After monkey patch
function select at 0x1924de8

 Python的運行時裏面容許可以大部分的對象都是能夠修改的,包括模塊,類和方法。這一般是一個壞主意,然而在極端的狀況下,當有一個庫須要加入一些Python基本的功能的時候,monkey patch就能派上用場了。在上面的例子裏,gevent可以改變基礎庫裏的一些使用IO阻塞模型的庫好比socket,ssl,threading等等而且把它們改爲協程的執行方式。

 

事件

事件是一種可讓greenlet進行異步通訊的手段。

 

import gevent
from gevent.event import AsyncResult

a = AsyncResult()

def setter():
    """
    After 3 seconds set wake all threads waiting on the value of
    a.
    """
    gevent.sleep(3)
    a.set()

def waiter():
    """
    After 3 seconds the get call will unblock.
    """
    a.get() # blocking
    print 'I live!'

gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter),
])

AsyncResult 是 event對象的擴展可以讓你來發送值而且帶有必定延遲。這種功能被成爲feature或deferred,當它拿到一個將來的值的引用時,可以在任意安排好的時間內讓它起做用。

隊列

 

隊列是一個有序的數據集合,一般有 put/get 的操做,這樣能讓隊列在有在有greenletJ進行操做的時候可以進行安全的管理。

例如,若是greenlet從隊列中取出了一項數據,那麼這份數據就不能被另外一個greenlet取出。

 

import gevent
from gevent.queue import Queue

tasks = Queue()

def worker(n):
    while not tasks.empty():
        task = tasks.get()
        print('Worker %s got task %s' % (n, task))
        gevent.sleep(0)

    print('Quitting time!')

def boss():
    for i in xrange(1,25):
        tasks.put_nowait(i)

gevent.spawn(boss).join()

gevent.joinall([
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'nancy'),
])

 

執行的結果以下:

Worker steve got task 1
Worker john got task 2
Worker nancy got task 3
Worker steve got task 4
Worker nancy got task 5
Worker john got task 6
Worker steve got task 7
Worker john got task 8
Worker nancy got task 9
Worker steve got task 10
Worker nancy got task 11
Worker john got task 12
Worker steve got task 13
Worker john got task 14
Worker nancy got task 15
Worker steve got task 16
Worker nancy got task 17
Worker john got task 18
Worker steve got task 19
Worker john got task 20
Worker nancy got task 21
Worker steve got task 22
Worker nancy got task 23
Worker john got task 24
Quitting time!
Quitting time!
Quitting time!

隊列的 put/get 操做在須要的狀況下也能夠阻塞程序的執行。

put 和 get 操做都有非阻塞的副本,就是 put_nowait 和 get_nowait。

在下面代碼的例子裏,運行一個叫boss的方法,同時運行worker方法,而且對隊列有一個限制:隊列的子項不能超過3個。這個限制意味着 put 操做在隊列裏面有足夠空間以前會阻塞。相反,若是隊列裏沒有任何子項,get操做會阻塞,同時也須要超時的機制,當一個操做在阻塞超過必定時間後會拋出異常。

import gevent
from gevent.queue import Queue, Empty

tasks = Queue(maxsize=3)

def worker(n):
    try:
        while True:
            task = tasks.get(timeout=1) # decrements queue size by 1
            print('Worker %s got task %s' % (n, task))
            gevent.sleep(0)
    except Empty:
        print('Quitting time!')

def boss():
    """
    Boss will wait to hand out work until a individual worker is
    free since the maxsize of the task queue is 3.
    """

    for i in xrange(1,10):
        tasks.put(i)
    print('Assigned all work in iteration 1')

    for i in xrange(10,20):
        tasks.put(i)
    print('Assigned all work in iteration 2')

gevent.joinall([
    gevent.spawn(boss),
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'bob'),
])

代碼的執行結果以下:

Worker steve got task 1
Worker john got task 2
Worker bob got task 3
Worker steve got task 4
Worker bob got task 5
Worker john got task 6
Assigned all work in iteration 1
Worker steve got task 7
Worker john got task 8
Worker bob got task 9
Worker steve got task 10
Worker bob got task 11
Worker john got task 12
Worker steve got task 13
Worker john got task 14
Worker bob got task 15
Worker steve got task 16
Worker bob got task 17
Worker john got task 18
Assigned all work in iteration 2
Worker steve got task 19
Quitting time!
Quitting time!
Quitting time!

組和池

組是一個由greenlet組成的集合,而且可以被統一管理。

import gevent
from gevent.pool import Group

def talk(msg):
    for i in xrange(3):
        print(msg)

g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz')

group = Group()
group.add(g1)
group.add(g2)
group.join()

group.add(g3)
group.join()

這在管理一組異步任務的時候會頗有用。

Group還提供了一個API來分配成組的greenlet任務,而且經過不一樣的方法來獲取結果。

import gevent
from gevent import getcurrent
from gevent.pool import Group

group = Group()

def hello_from(n):
    print('Size of group', len(group))
    print('Hello from Greenlet %s' % id(getcurrent()))

group.map(hello_from, xrange(3))

def intensive(n):
    gevent.sleep(3 - n)
    return 'task', n

print('Ordered')

ogroup = Group()
for i in ogroup.imap(intensive, xrange(3)):
    print(i)

print('Unordered')

igroup = Group()
for i in igroup.imap_unordered(intensive, xrange(3)):
    print(i)

執行結果以下:

Size of group 3
Hello from Greenlet 10769424
Size of group 3
Hello from Greenlet 10770544
Size of group 3
Hello from Greenlet 10772304
Ordered
('task', 0)
('task', 1)
('task', 2)
Unordered
('task', 2)
('task', 1)
('task', 0)

池是用來處理當擁有動態數量的greenlet須要進行併發管理(限制併發數)時使用的。

這在處理大量的網絡和IO操做的時候是很是須要的。

import gevent
from gevent.pool import Pool

pool = Pool(2)

def hello_from(n):
    print('Size of pool', len(pool))

pool.map(hello_from, xrange(3))
Size of pool 2
Size of pool 2
Size of pool 1

常常在建立gevent驅動程序的時候,整個服務須要圍繞一個池的結構來執行。

鎖和信號量

信號量是低級別的同步機制,可以讓greenlet在執行的時候互相協調而且限制其併發數。信號量暴露了兩個方法,acquire 和 release。若是信號量範圍變成0,那麼它會阻塞住直到另外一個greenlet釋放它的得到物。

from gevent import sleep
from gevent.pool import Pool
from gevent.coros import BoundedSemaphore

sem = BoundedSemaphore(2)

def worker1(n):
    sem.acquire()
    print('Worker %i acquired semaphore' % n)
    sleep(0)
    sem.release()
    print('Worker %i released semaphore' % n)

def worker2(n):
    with sem:
        print('Worker %i acquired semaphore' % n)
        sleep(0)
    print('Worker %i released semaphore' % n)

pool = Pool()
pool.map(worker1, xrange(0,2))
pool.map(worker2, xrange(3,6))

一下是代碼的執行結果:

Worker 0 acquired semaphore
Worker 1 acquired semaphore
Worker 0 released semaphore
Worker 1 released semaphore
Worker 3 acquired semaphore
Worker 4 acquired semaphore
Worker 3 released semaphore
Worker 4 released semaphore
Worker 5 acquired semaphore
Worker 5 released semaphore

 若是把信號量的數量限制爲1那麼它就成爲了鎖。它常常會在多個greenlet訪問相同資源的時候用到。

本地線程

Gevent還可以讓你給gevent上下文來指定那些數據是本地的。

import gevent
from gevent.local import local

stash = local()

def f1():
    stash.x = 1
    print(stash.x)

def f2():
    stash.y = 2
    print(stash.y)

    try:
        stash.x
    except AttributeError:
        print("x is not local to f2")

g1 = gevent.spawn(f1)
g2 = gevent.spawn(f2)

gevent.joinall([g1, g2])

 如下是執行結果:

1
2
x is not local to f2

 不少集成了gevent的框架把HTTP的session對象存在gevent 本地線程裏面。好比下面的例子:

from werkzeug.local import LocalProxy
from werkzeug.wrappers import Request
from contextlib import contextmanager

from gevent.wsgi import WSGIServer

_requests = local()
request = LocalProxy(lambda: _requests.request)

@contextmanager
def sessionmanager(environ):
    _requests.request = Request(environ)
    yield
    _requests.request = None

def logic():
    return "Hello " + request.remote_addr

def application(environ, start_response):
    status = '200 OK'

    with sessionmanager(environ):
        body = logic()

    headers = [
        ('Content-Type', 'text/html')
    ]

    start_response(status, headers)
    return [body]

WSGIServer(('', 8000), application).serve_forever()

 子進程

在gevent 1.0版本中,gevent.subprocess 這個庫被添加上。這個庫可以讓子進程相互協調地執行。

import gevent
from gevent.subprocess import Popen, PIPE

def cron():
    while True:
        print "cron"
        gevent.sleep(0.2)

g = gevent.spawn(cron)
sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True)
out, err = sub.communicate()
g.kill()
print out.rstrip()

 執行結果:

cron
cron
cron
cron
cron
Linux
相關文章
相關標籤/搜索