Gevent簡明教程

 

Gevent簡明教程

前述

進程 線程 協程 異步

併發編程(不是並行)目前有四種方式:多進程、多線程、協程和異步。html

  • 多進程編程在python中有相似C的os.fork,更高層封裝的有multiprocessing標準庫
  • 多線程編程python中有Thread和threading
  • 異步編程在linux下主+要有三種實現select,poll,epoll
  • 協程在python中一般會說到yield,關於協程的庫主要有greenlet,stackless,gevent,eventlet等實現。

進程

  • 不共享任何狀態
  • 調度由操做系統完成
  • 有獨立的內存空間(上下文切換的時候須要保存棧、cpu寄存器、虛擬內存、以及打開的相關句柄等信息,開銷大)
  • 通信主要經過信號傳遞的方式來實現(實現方式有多種,信號量、管道、事件等,通信都須要過內核,效率低)

線程

  • 共享變量(解決了通信麻煩的問題,可是對於變量的訪問須要加鎖)
  • 調度由操做系統完成(因爲共享內存,上下文切換變得高效)
  • 一個進程能夠有多個線程,每一個線程會共享父進程的資源(建立線程開銷佔用比進程小不少,可建立的數量也會不少)
  • 通信除了可以使用進程間通信的方式,還能夠經過共享內存的方式進行通訊(經過共享內存通訊比經過內核要快不少)

協程

  • 調度徹底由用戶控制
  • 一個線程(進程)能夠有多個協程
  • 每一個線程(進程)循環按照指定的任務清單順序完成不一樣的任務(當任務被堵塞時,執行下一個任務;當恢復時,再回來執行這個任務;任務間切換隻須要保存任務的上下文,沒有內核的開銷,能夠不加鎖的訪問全局變量)
  • 協程須要保證是非堵塞的且沒有相互依賴
  • 協程基本上不能同步通信,多采用異步的消息通信,效率比較高

總結

  • 進程擁有本身獨立的堆和棧,既不共享堆,亦不共享棧,進程由操做系統調度
  • 線程擁有本身獨立的棧和共享的堆,共享堆,不共享棧,線程亦由操做系統調度(標準線程是的)
  • 協程和線程同樣共享堆,不共享棧,協程由程序員在協程的代碼裏顯示調度

聊聊協程

協程,又稱微線程,纖程。
Python的線程並非標準線程,是系統級進程,線程間上下文切換有開銷,並且Python在執行多線程時默認加了一個全局解釋器鎖(GIL),所以Python的多線程實際上是串行的,因此並不能利用多核的優點,也就是說一個進程內的多個線程只能使用一個CPU。python

def coroutine(func):
    def ret():
        f = func()
        f.next()
        return f
    return ret


@coroutine
def consumer():
    print "Wait to getting a task"
    while True:
        n = (yield)
        print "Got %s",n


import time
def producer():
    c = consumer()
    task_id = 0
    while True:
        time.sleep(1)
        print "Send a task to consumer" % task_id
        c.send("task %s" % task_id)

if __name__ == "__main__":
    producer()

結果:linux

Wait to getting a task
Send a task 0 to consumer
Got task 0
Send a task 1 to consumer
Got task 1
Send a task 2 to consumer
Got task 2
...

傳統的生產者-消費者模型是一個線程寫消息,一個線程取消息,經過鎖機制控制隊列和等待,但容易死鎖。
若是改用協程,生產者生產消息後,直接經過yield跳轉到消費者開始執行,待消費者執行完畢後,切換回生產者繼續生產,效率極高。git

Gevent

介紹

gevent是基於協程的Python網絡庫。特色:程序員

  • 基於libev的快速事件循環(Linux上epoll,FreeBSD上kqueue)。
  • 基於greenlet的輕量級執行單元。
  • API的概念和Python標準庫一致(如事件,隊列)。
  • 能夠配合socket,ssl模塊使用。
  • 可以使用標準庫和第三方模塊建立標準的阻塞套接字(gevent.monkey)。
  • 默認經過線程池進行DNS查詢,也可經過c-are(經過GEVENT_RESOLVER=ares環境變量開啓)。
  • TCP/UDP/HTTP服務器
  • 子進程支持(經過gevent.subprocess)
  • 線程池

安裝和依賴

依賴於greenlet library
支持python 2.6+ 、3.3+github

核心部分

  • Greenlets
  • 同步和異步執行
  • 肯定性
  • 建立Greenlets
  • Greenlet狀態
  • 程序中止
  • 超時
  • 猴子補丁

####Greenlets
gevent中的主要模式, 它是以C擴展模塊形式接入Python的輕量級協程。 所有運行在主程序操做系統進程的內部,但它們被程序員協做式地調度。web

在任什麼時候刻,只有一個協程在運行。redis

區別於multiprocessing、threading等提供真正並行構造的庫, 這些庫輪轉使用操做系統調度的進程和線程,是真正的並行。shell

同步和異步執行

併發的核心思想在於,大的任務能夠分解成一系列的子任務,後者能夠被調度成 同時執行或異步執行,而不是一次一個地或者同步地執行。兩個子任務之間的 切換也就是上下文切換。編程

在gevent裏面,上下文切換是經過yielding來完成的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import gevent
 
def foo():
print( 'Running in foo')
gevent.sleep( 0)
print( 'Explicit context switch to foo again')
 
def bar():
print( 'Explicit context to bar')
gevent.sleep( 0)
print( 'Implicit context switch back to bar')
 
gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar),
])

 

執行結果:

1
2
3
4
Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar

 

代碼執行過程:

Alt text

網絡延遲或IO阻塞隱式交出greenlet上下文的執行權。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import time
import gevent
from gevent import select
 
start = time.time()
tic = lambda: 'at %1.1f seconds' % (time.time() - start)
 
def gr1():
print( 'Started Polling: %s' % tic())
select.select([], [], [], 1)
print( 'Ended Polling: %s' % tic())
 
def gr2():
print( 'Started Polling: %s' % tic())
select.select([], [], [], 2)
print( 'Ended Polling: %s' % tic())
 
def gr3():
print( "Hey lets do some stuff while the greenlets poll, %s" % tic())
gevent.sleep( 1)
 
gevent.joinall([
gevent.spawn(gr1),
gevent.spawn(gr2),
gevent.spawn(gr3),
])

 

執行結果:

1
2
3
4
5
Started Polling: at 0.0 seconds
Started Polling: at 0.0 seconds
Hey lets do some stuff while the greenlets poll, at 0.0 seconds
Ended Polling: at 1.0 seconds
Ended Polling: at 2.0 seconds

 

同步vs異步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import gevent
import random
 
def task(pid):
gevent.sleep(random.randint( 0,2)*0.001)
print( 'Task %s done' % pid)
 
def synchronous():
for i in xrange(5):
task(i)
 
def asynchronous():
threads = [gevent.spawn(task, i) for i in xrange(5)]
gevent.joinall(threads)
 
print( 'Synchronous:')
synchronous()
 
print( 'Asynchronous:')
asynchronous()

 

執行結果:

1
2
3
4
5
6
7
8
9
10
11
12
Synchronous:
Task 0 done
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Asynchronous:
Task 2 done
Task 0 done
Task 1 done
Task 3 done
Task 4 done

 

肯定性

greenlet具備肯定性。在相同配置相同輸入的狀況下,它們老是會產生相同的輸出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import time
 
def echo(i):
time.sleep( 0.001)
return i
 
# Non Deterministic Process Pool
from multiprocessing.pool import Pool
 
p = Pool( 10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]
 
print(run1 == run2 == run3 == run4)
 
# Deterministic Gevent Pool
from gevent.pool import Pool
 
p = Pool( 10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]
 
print(run1 == run2 == run3 == run4)

執行結果:

1
2
False
True

 

即便gevent一般帶有肯定性,當開始與如socket或文件等外部服務交互時, 不肯定性也可能溜進你的程序中。所以儘管gevent線程是一種「肯定的併發」形式, 使用它仍然可能會遇到像使用POSIX線程或進程時遇到的那些問題。

涉及併發長期存在的問題就是競爭條件(race condition)(當兩個併發線程/進程都依賴於某個共享資源同時都嘗試去修改它的時候, 就會出現競爭條件),這會致使資源修改的結果狀態依賴於時間和執行順序。 這個問題,會致使整個程序行爲變得不肯定。

解決辦法: 始終避免全部全局的狀態.

建立Greenlets

gevent對Greenlet初始化提供了一些封裝.

1
2
3
4
5
6
7
8
9
10
11
12
import gevent
from gevent import Greenlet
 
def foo(message, n):
gevent.sleep(n)
print(message)
 
thread1 = Greenlet.spawn(foo, "Hello", 1)
thread2 = gevent.spawn(foo, "I live!", 2)
thread3 = gevent.spawn( lambda x: (x+1), 2)
threads = [thread1, thread2, thread3]
gevent.joinall(threads)

 

執行結果:

1
2
Hello
I live!

 

除使用基本的Greenlet類以外,你也能夠子類化Greenlet類,重載它的_run方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import gevent
from gevent import Greenlet
 
class MyGreenlet(Greenlet):
 
def __init__(self, message, n):
Greenlet.__init__(self)
self.message = message
self.n = n
 
def _run(self):
print(self.message)
gevent.sleep(self.n)
 
g = MyGreenlet( "Hi there!", 3)
g.start()
g.join()

 

執行結果:

1
Hi there!

 

Greenlet狀態

greenlet的狀態一般是一個依賴於時間的參數:

  • started – Boolean, 指示此Greenlet是否已經啓動
  • ready() – Boolean, 指示此Greenlet是否已經中止
  • successful() – Boolean, 指示此Greenlet是否已經中止並且沒拋異常
  • value – 任意值, 此Greenlet代碼返回的值
  • exception – 異常, 此Greenlet內拋出的未捕獲異常

程序中止

程序
當主程序(main program)收到一個SIGQUIT信號時,不能成功作yield操做的 Greenlet可能會令意外地掛起程序的執行。這致使了所謂的殭屍進程, 它須要在Python解釋器以外被kill掉。

通用的處理模式就是在主程序中監聽SIGQUIT信號,調用gevent.shutdown退出程序。

1
2
3
4
5
6
7
8
9
10
import gevent
import signal
 
def run_forever():
gevent.sleep( 1000)
 
if __name__ == '__main__':
gevent.signal(signal.SIGQUIT, gevent.shutdown)
thread = gevent.spawn(run_forever)
thread.join()

 

超時

經過超時能夠對代碼塊兒或一個Greenlet的運行時間進行約束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import gevent
from gevent import Timeout
 
seconds = 10
 
timeout = Timeout(seconds)
timeout.start()
 
def wait():
gevent.sleep( 10)
 
try:
gevent.spawn(wait).join()
except Timeout:
print( 'Could not complete')

 

超時類

1
2
3
4
5
6
7
8
9
10
import gevent
from gevent import Timeout
 
time_to_wait = 5 # seconds
 
class TooLong(Exception):
pass
 
with Timeout(time_to_wait, TooLong):
gevent.sleep( 10)

 

另外,對各類Greenlet和數據結構相關的調用,gevent也提供了超時參數。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import gevent
from gevent import Timeout
 
def wait():
gevent.sleep( 2)
 
timer = Timeout( 1).start()
thread1 = gevent.spawn(wait)
 
try:
thread1.join(timeout=timer)
except Timeout:
print( 'Thread 1 timed out')
 
# --
 
timer = Timeout.start_new( 1)
thread2 = gevent.spawn(wait)
 
try:
thread2.get(timeout=timer)
except Timeout:
print( 'Thread 2 timed out')
 
# --
 
try:
gevent.with_timeout( 1, wait)
except Timeout:
print( 'Thread 3 timed out')

 

執行結果:

1
2
3
Thread 1 timed out
Thread 2 timed out
Thread 3 timed out

 

猴子補丁(Monkey patching)

gevent的死角.

1
2
3
4
5
6
7
8
9
10
11
12
13
import socket
print(socket.socket)
 
print( "After monkey patch")
from gevent import monkey
monkey.patch_socket()
print(socket.socket)
 
import select
print(select.select)
monkey.patch_select()
print( "After monkey patch")
print(select.select)

 

執行結果:

1
2
3
4
5
6
7
class 'socket.socket'
After monkey patch
class 'gevent.socket.socket'
 
built-in function select
After monkey patch
function select at 0x1924de8

 

Python的運行環境容許咱們在運行時修改大部分的對象,包括模塊,類甚至函數。 這是個通常說來使人驚奇的壞主意,由於它創造了「隱式的反作用」,若是出現問題 它不少時候是極難調試的。雖然如此,在極端狀況下當一個庫須要修改Python自己 的基礎行爲的時候,猴子補丁就派上用場了。在這種狀況下,gevent可以修改標準庫裏面大部分的阻塞式系統調用,包括socket、ssl、threading和 select等模塊,而變爲協做式運行。

例如,Redis的python綁定通常使用常規的tcp socket來與redis-server實例通訊。 經過簡單地調用gevent.monkey.patch_all(),可使得redis的綁定協做式的調度 請求,與gevent棧的其它部分一塊兒工做。

這讓咱們能夠將通常不能與gevent共同工做的庫結合起來,而不用寫哪怕一行代碼。 雖然猴子補丁仍然是邪惡的(evil),但在這種狀況下它是「有用的邪惡(useful evil)」。

數據結構

  • 事件
  • 隊列
  • 組和池
  • 鎖和信號量
  • 線程局部變量
  • 子進程
  • Actors

    事件

    事件(event)是一個在Greenlet之間異步通訊的形式。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    import gevent
    from gevent.event import Event
     
    evt = Event()
     
    def setter():
    print( 'A: Hey wait for me, I have to do something')
    gevent.sleep( 3)
    print( "Ok, I'm done")
    evt.set()
     
    def waiter():
    print( "I'll wait for you")
    evt.wait() # blocking
    print( "It's about time")
     
    def main():
    gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter),
    gevent.spawn(waiter),
    gevent.spawn(waiter)
    ])
     
    if __name__ == '__main__':
    main()

執行結果:

1
2
3
4
5
6
7
8
A: Hey wait for me, I have to do something
I'll wait for you
I'll wait for you
I'll wait for you
Ok, I'm done
It's about time
It's about time
It's about time

 

事件對象的一個擴展是AsyncResult,它容許你在喚醒調用上附加一個值。 它有時也被稱做是future或defered,由於它持有一個指向未來任意時間可設置爲任何值的引用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import gevent
from gevent.event import AsyncResult
a = AsyncResult()
 
def setter():
gevent.sleep( 3)
a.set( 'Hello!')
 
def waiter():
print(a.get())
 
gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter),
])

 

隊列

隊列是一個排序的數據集合,它有常見的put / get操做, 可是它是以在Greenlet之間能夠安全操做的方式來實現的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import gevent
from gevent.queue import Queue
 
tasks = Queue()
 
def worker(n):
while not tasks.empty():
task = tasks.get()
print( 'Worker %s got task %s' % (n, task))
gevent.sleep( 0)
print( 'Quitting time!')
 
def boss():
for i in xrange(1,10):
tasks.put_nowait(i)
 
gevent.spawn(boss).join()
 
gevent.joinall([
gevent.spawn(worker, 'steve'),
gevent.spawn(worker, 'john'),
gevent.spawn(worker, 'nancy'),
])

 

執行結果:

1
2
3
4
5
6
7
8
9
10
11
12
Worker steve got task 1
Worker john got task 2
Worker nancy got task 3
Worker steve got task 4
Worker john got task 5
Worker nancy got task 6
Worker steve got task 7
Worker john got task 8
Worker nancy got task 9
Quitting time!
Quitting time!
Quitting time!

 

put和get操做都是阻塞的,put_nowait和get_nowait不會阻塞, 然而在操做不能完成時拋出gevent.queue.Empty或gevent.queue.Full異常。

組和池

組(group)是一個運行中greenlet集合,集合中的greenlet像一個組同樣會被共同管理和調度。 它也兼飾了像Python的multiprocessing庫那樣的平行調度器的角色,主要用在在管理異步任務的時候進行分組。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import gevent
from gevent.pool import Group
 
def talk(msg):
for i in xrange(2):
print(msg)
 
g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz')
 
group = Group()
group.add(g1)
group.add(g2)
group.join()
 
group.add(g3)
group.join()

 

執行結果:

1
2
3
4
5
6
bar
bar
foo
foo
fizz
fizz

 

池(pool)是一個爲處理數量變化而且須要限制併發的greenlet而設計的結構。

1
2
3
4
5
6
7
8
9
import gevent
from gevent.pool import Pool
 
pool = Pool( 2)
 
def hello_from(n):
print( 'Size of pool %s' % len(pool))
 
pool.map(hello_from, xrange( 3))

 

執行結果:

1
2
3
Size of pool 2
Size of pool 2
Size of pool 1

 

構造一個socket池的類,在各個socket上輪詢。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from gevent.pool import Pool
 
class SocketPool(object):
 
def __init__(self):
self.pool = Pool( 10)
self.pool.start()
 
def listen(self, socket):
while True:
socket.recv()
 
def add_handler(self, socket):
if self.pool.full():
raise Exception("At maximum pool size")
else:
self.pool.spawn(self.listen, socket)
 
def shutdown(self):
self.pool.kill()

 

鎖和信號量

信號量是一個容許greenlet相互合做,限制併發訪問或運行的低層次的同步原語。 信號量有兩個方法,acquire和release。在信號量是否已經被 acquire或release,和擁有資源的數量之間不一樣,被稱爲此信號量的範圍 (the bound of the semaphore)。若是一個信號量的範圍已經下降到0,它會 阻塞acquire操做直到另外一個已經得到信號量的greenlet做出釋放。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from gevent import sleep
from gevent.pool import Pool
from gevent.coros import BoundedSemaphore
 
sem = BoundedSemaphore( 2)
 
def worker1(n):
sem.acquire()
print( 'Worker %i acquired semaphore' % n)
sleep( 0)
sem.release()
print( 'Worker %i released semaphore' % n)
 
def worker2(n):
with sem:
print( 'Worker %i acquired semaphore' % n)
sleep( 0)
print( 'Worker %i released semaphore' % n)
 
pool = Pool()
pool.map(worker1, xrange( 0,2))

 

執行結果:

1
2
3
4
Worker 0 acquired semaphore
Worker 1 acquired semaphore
Worker 0 released semaphore
Worker 1 released semaphore

 

鎖(lock)是範圍爲1的信號量。它向單個greenlet提供了互斥訪問。 信號量和鎖常被用來保證資源只在程序上下文被單次使用。

線程局部變量

Gevent容許程序員指定局部於greenlet上下文的數據。 在內部,它被實現爲以greenlet的getcurrent()爲鍵, 在一個私有命名空間尋址的全局查找。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import gevent
from gevent.local import local
 
stash = local()
 
def f1():
stash.x = 1
print(stash.x)
 
def f2():
stash.y = 2
print(stash.y)
 
try:
stash.x
except AttributeError:
print( "x is not local to f2")
 
g1 = gevent.spawn(f1)
g2 = gevent.spawn(f2)
 
gevent.joinall([g1, g2])

 

執行結果:

1
2
3
1
2
x is not local to f2

 

不少集成了gevent的web框架將HTTP會話對象以線程局部變量的方式存儲在gevent內。 例如使用Werkzeug實用庫和它的proxy對象,咱們能夠建立Flask風格的請求對象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from gevent.local import local
from werkzeug.local import LocalProxy
from werkzeug.wrappers import Request
from contextlib import contextmanager
 
from gevent.wsgi import WSGIServer
 
_requests = local()
request = LocalProxy( lambda: _requests.request)
 
@contextmanager
def sessionmanager(environ):
_requests.request = Request(environ)
yield
_requests.request = None
 
def logic():
return "Hello " + request.remote_addr
 
def application(environ, start_response):
status = '200 OK'
 
with sessionmanager(environ):
body = logic()
 
headers = [
( 'Content-Type', 'text/html')
]
 
start_response(status, headers)
return [body]
WSGIServer(( '', 8000), application).serve_forever()

 

子進程

從gevent 1.0起,支持gevent.subprocess,支持協做式的等待子進程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import gevent
from gevent.subprocess import Popen, PIPE
 
def cron():
while True:
print( "cron")
gevent.sleep( 0.2)
 
g = gevent.spawn(cron)
sub = Popen([ 'sleep 1; uname'], stdout=PIPE, shell=True)
out, err = sub.communicate()
g.kill()
print(out.rstrip())
```
執行結果:

 

cron
cron
cron
cron
cron
Linux
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
不少人也想將gevent和multiprocessing一塊兒使用。最明顯的挑戰之一 就是multiprocessing提供的進程間通訊默認不是協做式的。因爲基於 multiprocessing.Connection的對象(例如Pipe)暴露了它們下面的 文件描述符(file descriptor),gevent.socket.wait_read和wait_write 能夠用來在直接讀寫以前協做式的等待ready-to-read/ready-to-write事件。
 
```python
import gevent
from multiprocessing import Process, Pipe
from gevent.socket import wait_read, wait_write
 
# To Process
a, b = Pipe()
 
# From Process
c, d = Pipe()
 
def relay():
for i in xrange(5):
msg = b.recv()
c.send(msg + " in " + str(i))
 
def put_msg():
for i in xrange(5):
wait_write(a.fileno())
a.send('hi')
 
def get_msg():
for i in xrange(5):
wait_read(d.fileno())
print(d.recv())
 
if __name__ == '__main__':
proc = Process(target=relay)
proc.start()
 
g1 = gevent.spawn(get_msg)
g2 = gevent.spawn(put_msg)
gevent.joinall([g1, g2], timeout=1)
```
執行結果:
```
hi in 0
hi in 1
hi in 2
hi in 3
hi in 4

然而要注意,組合multiprocessing和gevent一定帶來 依賴於操做系統(os-dependent)的缺陷,其中有:

在兼容POSIX的系統建立子進程(forking)以後, 在子進程的gevent的狀態是不適定的(ill-posed)。一個反作用就是, multiprocessing.Process建立以前的greenlet建立動做,會在父進程和子進程兩方都運行。

上例的put_msg()中的a.send()可能依然非協做式地阻塞調用的線程:一個 ready-to-write事件只保證寫了一個byte。在嘗試寫完成以前底下的buffer多是滿的。

上面表示的基於wait_write()/wait_read()的方法在Windows上不工做 (IOError: 3 is not a socket (files are not supported)),由於Windows不能監視 pipe事件。

Python包gipc以大致上透明的方式在 兼容POSIX系統和Windows上克服了這些挑戰。它提供了gevent感知的基於 multiprocessing.Process的子進程和gevent基於pipe的協做式進程間通訊。

Actors

actor模型是一個因爲Erlang變得普及的更高層的併發模型。 簡單的說它的主要思想就是許多個獨立的Actor,每一個Actor有一個能夠從 其它Actor接收消息的收件箱。Actor內部的主循環遍歷它收到的消息,並根據它指望的行爲來採起行動。

Gevent沒有原生的Actor類型,但在一個子類化的Greenlet內使用隊列, 咱們能夠定義一個很是簡單的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import gevent
from gevent.queue import Queue
 
class Actor(gevent.Greenlet):
 
def __init__(self):
self.inbox = Queue()
Greenlet.__init__(self)
 
def receive(self, message):
"""
Define in your subclass.
"""
raise NotImplemented()
 
def _run(self):
self.running = True
 
while self.running:
message = self.inbox.get()
self.receive(message)

 

下面是一個使用的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import gevent
from gevent.queue import Queue
from gevent import Greenlet
 
class Pinger(Actor):
def receive(self, message):
print(message)
pong.inbox.put( 'ping')
gevent.sleep( 0)
 
class Ponger(Actor):
def receive(self, message):
print(message)
ping.inbox.put( 'pong')
gevent.sleep( 0)
 
ping = Pinger()
pong = Ponger()
 
ping.start()
pong.start()
 
ping.inbox.put( 'start')
gevent.joinall([ping, pong])

 

實際應用

  • Gevent ZeroMQ
  • 簡單server
  • WSGI Servers
  • 流式server
  • Long Polling
  • Websockets

簡單server

1
2
3
4
5
6
7
8
9
10
11
12
13
# On Unix: Access with ``$ nc 127.0.0.1 5000``
# On Window: Access with ``$ telnet 127.0.0.1 5000``
 
from gevent.server import StreamServer
 
def handle(socket, address):
socket.send( "Hello from a telnet!\n")
for i in range(5):
socket.send(str(i) + '\n')
socket.close()
 
server = StreamServer(( '127.0.0.1', 5000), handle)
server.serve_forever()

WSGI Servers And Websockets

Gevent爲HTTP內容服務提供了兩種WSGI server。從今之後就稱爲 wsgi和pywsgi:

  • gevent.wsgi.WSGIServer
  • gevent.pywsgi.WSGIServer

glb中使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import click
from flask import Flask
from gevent.pywsgi import WSGIServer
from geventwebsocket.handler import WebSocketHandler
 
import v1
from .settings import Config
from .sockethandler import handle_websocket
 
 
def create_app(config=None):
app = Flask(__name__, static_folder= 'static')
if config:
app.config.update(config)
else:
app.config.from_object(Config)
 
app.register_blueprint(
v1.bp,
url_prefix= '/v1')
return app
 
 
def wsgi_app(environ, start_response):
path = environ[ 'PATH_INFO']
if path == '/websocket':
handle_websocket(environ[ 'wsgi.websocket'])
else:
return create_app()(environ, start_response)
 
 
@click.command()
@click.option('-h', '--host_port', type=(unicode, int),
default=( '0.0.0.0', 5000), help='Host and port of server.')
@click.option('-r', '--redis', type=(unicode, int, int),
default=( '127.0.0.1', 6379, 0),
help= 'Redis url of server.')
@click.option('-p', '--port_range', type=(int, int),
default=( 50000, 61000),
help= 'Port range to be assigned.')
def manage(host_port, redis=None, port_range=None):
Config.REDIS_URL = 'redis://%s:%s/%s' % redis
Config.PORT_RANGE = port_range
http_server = WSGIServer(host_port,
wsgi_app, handler_class=WebSocketHandler)
print '----GLB Server run at %s:%s-----' % host_port
print '----Redis Server run at %s:%s:%s-----' % redis
http_server.serve_forever()

 

缺陷

和其餘異步I/O框架同樣,gevent也有一些缺陷:

  • 阻塞(真正的阻塞,在內核級別)在程序中的某個地方中止了全部的東西.這很像C代碼中monkey patch沒有生效
  • 保持CPU處於繁忙狀態.greenlet不是搶佔式的,這可能致使其餘greenlet不會被調度.
  • 在greenlet之間存在死鎖的可能.

一個gevent迴避的缺陷是,你幾乎不會碰到一個和異步無關的Python庫–它將阻塞你的應用程序,由於純Python庫使用的是monkey patch的stdlib.

相關文章
相關標籤/搜索