Python gevent學習筆記 2

    在上一篇裏瞭解了gevent應用的IO模型概念以後,接下來開始真正瞭解gevent的使用。 python

Greenlet

    在gevent裏面最多應用到的就是greenlet,一個輕量級的協程實現。在任什麼時候間點,只有一個greenlet處於運行狀態。Greenlet與multiprocessing 和 threading這兩個庫提供的真正的並行結構的區別在於這兩個庫會真正的切換進程,POSIX線程是由操做系統來負責調度,而且它們是真正並行的。 shell

同步和異步

    應對併發的主要思路就是將一個大的任務分解成一個子任務的集合而且可以讓它並行或者異步地執行,而不是一次執行一個或者同步執行。在兩個子任務中的切換被稱爲上下文切換。 express

    gevent裏面的上下文切換是很是平滑的。在下面的例子程序中,咱們能夠看到兩個上下文經過調用 gevent.sleep()來互相切換。 編程

import gevent

def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')

def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])
    這段程序的執行結果以下:
Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar
    從這個執行結果能夠看出這個程序的執行過程,在這裏的兩個函數是交替執行的。

    gevent的真正威力是在處理網絡和帶有IO阻塞的功能時可以這些任務協調地運行。gevent來實現了這些具體的細節來保證在須要的時候greenlet上下文進行切換。在這裏用一個例子來講明。 json

import time
import gevent
from gevent import select

start = time.time()
tic = lambda: 'at %1.1f seconds' % (time.time() - start)

def gr1():
    # Busy waits for a second, but we don't want to stick around...
    print('Started Polling: ', tic())
    select.select([], [], [], 2)
    print('Ended Polling: ', tic())

def gr2():
    # Busy waits for a second, but we don't want to stick around...
    print('Started Polling: ', tic())
    select.select([], [], [], 2)
    print('Ended Polling: ', tic())

def gr3():
    print("Hey lets do some stuff while the greenlets poll, at", tic())
    gevent.sleep(1)

gevent.joinall([
    gevent.spawn(gr1),
    gevent.spawn(gr2),
    gevent.spawn(gr3),
])

在上面的例子裏,select() 一般是一個阻塞的調用。 服務器

程序的執行結果以下:
Started Polling:  at 0.0 seconds
Started Polling:  at 0.0 seconds
Hey lets do some stuff while the greenlets poll, at at 0.0 seconds
Ended Polling:  at 2.0 seconds
Ended Polling:  at 2.0 seconds

接下來一個例子中能夠看到gevent是安排各個任務的執行的。 網絡

import gevent
import random

def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(random.randint(0,2)*0.001)
    print('Task', pid, 'done')

def synchronous():
    for i in range(1,10):
        task(i)

def asynchronous():
    threads = [gevent.spawn(task, i) for i in xrange(10)]
    gevent.joinall(threads)

print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()
執行結果以下:
root@master:~# python two.py 
Synchronous:
('Task', 1, 'done')
('Task', 2, 'done')
('Task', 3, 'done')
('Task', 4, 'done')
('Task', 5, 'done')
('Task', 6, 'done')
('Task', 7, 'done')
('Task', 8, 'done')
('Task', 9, 'done')
Asynchronous:
('Task', 0, 'done')
('Task', 9, 'done')
('Task', 7, 'done')
('Task', 3, 'done')
('Task', 6, 'done')
('Task', 5, 'done')
('Task', 4, 'done')
('Task', 1, 'done')
('Task', 2, 'done')
('Task', 8, 'done')
在同步的狀況下,任務是按順序執行的,在執行各個任務的時候會阻塞主線程。

而gevent.spawn 的重要功能就是封裝了greenlet裏面的函數。初始化的greenlet放在了threads這個list裏面,被傳遞給了 gevent.joinall 這個函數,它會阻塞當前的程序來執行全部的greenlet。 併發

在異步執行的狀況下,全部任務的執行順序是徹底隨機的。每個greenlet的都不會阻塞其餘greenlet的執行。 app

在有時候須要異步地從服務器獲取數據,gevent能夠經過判斷從服務器的數據載入狀況來處理請求。 dom

import gevent.monkey
gevent.monkey.patch_socket()

import gevent
import urllib2
import simplejson as json

def fetch(pid):
    response = urllib2.urlopen('http://json-time.appspot.com/time.json')
    result = response.read()
    json_result = json.loads(result)
    datetime = json_result['datetime']

    print 'Process ', pid, datetime
    return json_result['datetime']

def synchronous():
    for i in range(1,10):
        fetch(i)

def asynchronous():
    threads = []
    for i in range(1,10):
        threads.append(gevent.spawn(fetch, i))
    gevent.joinall(threads)

print 'Synchronous:'
synchronous()

print 'Asynchronous:'
asynchronous()

肯定性

就像以前說的,greenlet是肯定的。給每一個greenlet相同的配置和相同的輸入,獲得的輸出是相同的。咱們能夠用python 的多進程池和gevent池來做比較。下面的例子能夠說明這個特色:

import time

def echo(i):
    time.sleep(0.001)
    return i

# Non Deterministic Process Pool

from multiprocessing.pool import Pool

p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]

print( run1 == run2 == run3 == run4 )

# Deterministic Gevent Pool

from gevent.pool import Pool

p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]

print( run1 == run2 == run3 == run4 )
下面是執行結果:
False
True
 從上面的例子能夠看出,執行同一個函數,產生的greenlet是相同的,而產生的process是不一樣的。

 在處理併發編程的時候會碰到一些問題,好比競爭資源的問題。最簡單的狀況,當有兩個線程或進程訪問同一資源而且修改這個資源的時候,就會引起資源競爭的問題。那麼這個資源最終的值就會取決於那個線程或進程是最後執行的。這是個問題,總之,在處理全局的程序不肯定行爲的時候,須要儘可能避免資源競爭的問題

 最好的方法就是在任什麼時候候儘可能避免使用全局的狀態。全局狀態是常常會坑你的!

產生Greenlet

在gevent裏面封裝了一些初始化greenlet的方法,下面是幾個最經常使用的例子:

import gevent
from gevent import Greenlet

def foo(message, n):
    """
    Each thread will be passed the message, and n arguments
    in its initialization.
    """
    gevent.sleep(n)
    print(message)

# Initialize a new Greenlet instance running the named function
# foo
thread1 = Greenlet.spawn(foo, "Hello", 1)

# Wrapper for creating and runing a new Greenlet from the named 
# function foo, with the passed arguments
thread2 = gevent.spawn(foo, "I live!", 2)

# Lambda expressions
thread3 = gevent.spawn(lambda x: (x+1), 2)

threads = [thread1, thread2, thread3]

# Block until all threads complete.
gevent.joinall(threads)
在上面的程序裏使用 spawn 方法來產生greenlet。還有一種初始化greenlet的方法,就是建立Greenlet的子類,而且重寫 _run 方法。
import gevent
from gevent import Greenlet

class MyGreenlet(Greenlet):

    def __init__(self, message, n):
        Greenlet.__init__(self)
        self.message = message
        self.n = n

    def _run(self):
        print(self.message)
        gevent.sleep(self.n)

g = MyGreenlet("Hi there!", 3)
g.start()
g.join()

Greenlet 的狀態

就像其餘的代碼同樣,greenlet在執行的時候也會出錯。Greenlet有可能會沒法拋出異常,中止失敗,或者消耗了太多的系統資源。

greenlet的內部狀態一般是一個依賴時間的參數。greenlet有一些標記來讓你可以監控greenlet的狀態。

  • started -- 標誌greenlet是否已經啓動
  • ready -- 標誌greenlet是否已經被終止
  • successful() -- 標誌greenlet是否已經被終止,而且沒有拋出異常
  • value -- 由greenlet返回的值
  • exception -- 在greenlet裏面沒有被捕獲的異常
import gevent

def win():
    return 'You win!'

def fail():
    raise Exception('You fail at failing.')

winner = gevent.spawn(win)
loser = gevent.spawn(fail)

print(winner.started) # True
print(loser.started)  # True

# Exceptions raised in the Greenlet, stay inside the Greenlet.
try:
    gevent.joinall([winner, loser])
except Exception as e:
    print('This will never be reached')

print(winner.value) # 'You win!'
print(loser.value)  # None

print(winner.ready()) # True
print(loser.ready())  # True

print(winner.successful()) # True
print(loser.successful())  # False

# The exception raised in fail, will not propogate outside the
# greenlet. A stack trace will be printed to stdout but it
# will not unwind the stack of the parent.

print(loser.exception)

# It is possible though to raise the exception again outside
# raise loser.exception
# or with
# loser.get()
 這段代碼的執行結果以下:
True
True
You win!
None
True
True
True
False
You fail at failing.

終止程序

在主程序收到一個SIGQUIT 以後會阻塞程序的執行讓Greenlet沒法繼續執行。這會致使殭屍進程的產生,須要在操做系統中將這些殭屍進程清除掉。

import gevent
import signal

def run_forever():
    gevent.sleep(1000)

if __name__ == '__main__':
    gevent.signal(signal.SIGQUIT, gevent.shutdown)
    thread = gevent.spawn(run_forever)
    thread.join()

超時

gevent提供了對與代碼運行時的時間限制功能,也就是超時功能。

import gevent
from gevent import Timeout

seconds = 10

timeout = Timeout(seconds)
timeout.start()

def wait():
    gevent.sleep(10)

try:
    gevent.spawn(wait).join()
except Timeout:
    print 'Could not complete'
也能夠經過用with 上下文的方法來實現超時的功能:
import gevent
from gevent import Timeout

time_to_wait = 5 # seconds

class TooLong(Exception):
    pass

with Timeout(time_to_wait, TooLong):
    gevent.sleep(10)
gevent還提供了一些超時的參數以應對不一樣的情況:
import gevent
from gevent import Timeout

def wait():
    gevent.sleep(2)

timer = Timeout(1).start()
thread1 = gevent.spawn(wait)

try:
    thread1.join(timeout=timer)
except Timeout:
    print('Thread 1 timed out')

# --

timer = Timeout.start_new(1)
thread2 = gevent.spawn(wait)

try:
    thread2.get(timeout=timer)
except Timeout:
    print('Thread 2 timed out')

# --

try:
    gevent.with_timeout(1, wait)
except Timeout:
    print('Thread 3 timed out')
運行結果以下:
Thread 1 timed out
Thread 2 timed out
Thread 3 timed out

Monkeypatching

如今這是gevent裏面的一個難點。下面一個例子裏可能看到 monkey.patch_socket() 可以在運行時裏面修改基礎庫socket:

import socket
print( socket.socket )

print "After monkey patch"
from gevent import monkey
monkey.patch_socket()
print( socket.socket )

import select
print select.select
monkey.patch_select()
print "After monkey patch"
print( select.select )
 運行結果以下:
class 'socket.socket'
After monkey patch
class 'gevent.socket.socket'

built-in function select
After monkey patch
function select at 0x1924de8
 Python的運行時裏面容許可以大部分的對象都是能夠修改的,包括模塊,類和方法。這一般是一個壞主意,然而在極端的狀況下,當有一個庫須要加入一些Python基本的功能的時候,monkey patch就能派上用場了。在上面的例子裏,gevent可以改變基礎庫裏的一些使用IO阻塞模型的庫好比socket,ssl,threading等等而且把它們改爲協程的執行方式。
相關文章
相關標籤/搜索