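Concurrent programming (as distinct from parallel programming) in Python currently comes in four flavors: multiprocessing, multithreading, coroutines, and asynchronous I/O.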
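In terms of concurrency performance, the rough ordering is coroutines > multiprocessing > multithreading. gevent can deliver very high concurrency, but at the time of writing it targeted Unix/Linux; installation and correct operation on Windows were not guaranteed.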
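Why bother with coroutines when we already have threads? Threads are managed by the operating system, so every switch between them goes through the kernel and is comparatively expensive. Thread switches are also decided by the OS scheduler rather than by your code: a thread can be preempted at any point, so you need locks and similar measures to keep shared data consistent, which makes data safety with threads fairly complicated. Coroutines switch at user level, and a switch happens only where the code itself chooses, so a coroutine is never interrupted from outside.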
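Coroutines are also called micro-threads or fibers. Python threads are real operating-system threads, so context switches between them carry overhead. In addition, CPython guards the interpreter with a global interpreter lock (GIL), so only one thread executes Python bytecode at a time. Multithreaded Python code is therefore effectively serialized and cannot take advantage of multiple cores: the threads of one process can use only one CPU at a time.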
    import time

    def coroutine(func):
        # Decorator that primes a generator so it is ready to accept send().
        def ret():
            f = func()
            next(f)
            return f
        return ret

    @coroutine
    def consumer():
        print("Wait to getting a task")
        while True:
            n = (yield)
            print("Got %s" % n)

    def producer():
        c = consumer()
        task_id = 0
        while True:
            time.sleep(1)
            print("Send a task %s to consumer" % task_id)
            c.send("task %s" % task_id)
            task_id += 1

    if __name__ == "__main__":
        producer()

Output:

    Wait to getting a task
    Send a task 0 to consumer
    Got task 0
    Send a task 1 to consumer
    Got task 1
    Send a task 2 to consumer
    Got task 2
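In the traditional producer-consumer model, one thread writes messages and another thread reads them, with locks controlling the queue and the waiting; this is prone to deadlock. With coroutines, the producer produces a message and then jumps straight to the consumer via yield. Once the consumer has finished, control switches back to the producer, which carries on producing. This is very efficient.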
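The hand-off described above can also carry a reply back to the producer. The following is a minimal sketch in Python 3 using only plain generators; the function names and the `done: ...` reply format are made up for illustration and are not part of gevent.

```python
def consumer():
    reply = None
    while True:
        # Hand the previous reply back to the producer and wait for a task.
        task = yield reply
        reply = "done: %s" % task


def producer(count):
    c = consumer()
    next(c)  # prime the generator so it is paused at its first yield
    replies = []
    for task_id in range(count):
        # send() resumes the consumer; it returns when the consumer yields again.
        replies.append(c.send("task %d" % task_id))
    c.close()
    return replies


replies = producer(3)
print(replies)
```

No lock is involved: at any moment exactly one of the two functions is running, and control changes hands only at `yield` and `send()`.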
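gevent is a coroutine-based Python networking library. Its main characteristics:
The primary pattern in gevent is the greenlet, a lightweight coroutine provided to Python as a C extension module. Greenlets all run inside the operating-system process of the main program, but they are scheduled cooperatively.
At any given moment, only one greenlet is running.
This sets gevent apart from libraries such as multiprocessing and threading, which rely on processes and threads scheduled by the operating system and can therefore run truly in parallel (for threads in CPython, only as far as the GIL allows).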
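The core idea of concurrency is that a large task can be broken down into a series of subtasks, which can be scheduled to run simultaneously or asynchronously rather than one at a time or synchronously. A switch between two subtasks is a context switch.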
In gevent, context switches are done through yielding.
    import gevent

    def foo():
        print('Running in foo')
        gevent.sleep(0)
        print('Explicit context switch to foo again')

    def bar():
        print('Explicit context to bar')
        gevent.sleep(0)
        print('Implicit context switch back to bar')

    gevent.joinall([
        gevent.spawn(foo),
        gevent.spawn(bar),
    ])

Output:

    Running in foo
    Explicit context to bar
    Explicit context switch to foo again
    Implicit context switch back to bar
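To make the idea of switching by yielding concrete without gevent, here is a minimal round-robin scheduler sketch built on plain Python 3 generators. It is not how gevent is implemented (gevent uses greenlets and an event loop); `run_all` and the log messages are hypothetical names chosen for this illustration.

```python
from collections import deque


def foo(log):
    log.append("foo: step 1")
    yield  # hand control back to the scheduler
    log.append("foo: step 2")


def bar(log):
    log.append("bar: step 1")
    yield
    log.append("bar: step 2")


def run_all(tasks):
    # Run each task until its next yield, then move it to the back of the queue.
    ready = deque(tasks)
    while ready:
        task = ready.popleft()
        try:
            next(task)
        except StopIteration:
            continue  # the task finished; do not reschedule it
        ready.append(task)


log = []
run_all([foo(log), bar(log)])
print(log)
```

The two tasks interleave in the same order as the greenlets above: each one runs until it yields, and only then does the other get a turn.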
Network latency or blocking I/O implicitly hands over control from the running greenlet.
    import time
    import gevent
    from gevent import select

    start = time.time()
    tic = lambda: 'at %1.1f seconds' % (time.time() - start)

    def gr1():
        print('Started Polling: %s' % tic())
        select.select([], [], [], 1)
        print('Ended Polling: %s' % tic())

    def gr2():
        print('Started Polling: %s' % tic())
        select.select([], [], [], 2)
        print('Ended Polling: %s' % tic())

    def gr3():
        print("Hey lets do some stuff while the greenlets poll, %s" % tic())
        gevent.sleep(1)

    gevent.joinall([
        gevent.spawn(gr1),
        gevent.spawn(gr2),
        gevent.spawn(gr3),
    ])

Output:

    Started Polling: at 0.0 seconds
    Started Polling: at 0.0 seconds
    Hey lets do some stuff while the greenlets poll, at 0.0 seconds
    Ended Polling: at 1.0 seconds
    Ended Polling: at 2.0 seconds
    import gevent
    import random

    def task(pid):
        # Sleep for a random, very short time to simulate uneven work.
        gevent.sleep(random.randint(0, 2) * 0.001)
        print('Task %s done' % pid)

    def synchronous():
        for i in range(5):
            task(i)

    def asynchronous():
        threads = [gevent.spawn(task, i) for i in range(5)]
        gevent.joinall(threads)

    print('Synchronous:')
    synchronous()

    print('Asynchronous:')
    asynchronous()

Output:

    Synchronous:
    Task 0 done
    Task 1 done
    Task 2 done
    Task 3 done
    Task 4 done
    Asynchronous:
    Task 2 done
    Task 0 done
    Task 1 done
    Task 3 done
    Task 4 done
Greenlets are deterministic. Given the same configuration and the same inputs, they always produce the same output.
    import time

    def echo(i):
        time.sleep(0.001)
        return i

    # Non Deterministic Process Pool
    from multiprocessing.pool import Pool

    p = Pool(10)
    run1 = [a for a in p.imap_unordered(echo, range(10))]
    run2 = [a for a in p.imap_unordered(echo, range(10))]
    run3 = [a for a in p.imap_unordered(echo, range(10))]
    run4 = [a for a in p.imap_unordered(echo, range(10))]

    print(run1 == run2 == run3 == run4)

    # Deterministic Gevent Pool
    from gevent.pool import Pool

    p = Pool(10)
    run1 = [a for a in p.imap_unordered(echo, range(10))]
    run2 = [a for a in p.imap_unordered(echo, range(10))]
    run3 = [a for a in p.imap_unordered(echo, range(10))]
    run4 = [a for a in p.imap_unordered(echo, range(10))]

    print(run1 == run2 == run3 == run4)

Output:

    False
    True
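Even though gevent is normally deterministic, non-determinism can creep into your program once you start interacting with outside services such as sockets or files. So although greenlets are a form of "deterministic concurrency", you can still run into the same kinds of problems you meet with POSIX threads or processes.
The perennial problem with concurrency is the race condition: when two concurrent threads or processes depend on some shared resource and both try to modify it, the resulting state of that resource depends on timing and execution order. This can make the behavior of the whole program non-deterministic.
The remedy: avoid global state at all times.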
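One way to read the advice about global state is sketched below with standard-library threads rather than greenlets: instead of every worker updating one shared counter, each worker writes only to a slot it owns, and the results are combined after all workers have finished. The function name `count_words` and the sample data are hypothetical.

```python
import threading


def count_words(chunk, results, index):
    # Each worker writes only to its own slot; no shared value is modified.
    results[index] = len(chunk.split())


chunks = ["a b c", "d e", "f g h i"]
results = [0] * len(chunks)

threads = [
    threading.Thread(target=count_words, args=(chunk, results, i))
    for i, chunk in enumerate(chunks)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Combine the per-worker results only after every worker is done.
total = sum(results)
print(total)
```

Because no two workers ever touch the same slot, the final total does not depend on the order in which the threads happen to run.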
gevent provides a few wrappers around Greenlet initialization.
    import gevent
    from gevent import Greenlet

    def foo(message, n):
        gevent.sleep(n)
        print(message)

    # Create a Greenlet from the class and start it in one call.
    thread1 = Greenlet.spawn(foo, "Hello", 1)

    # gevent.spawn is a wrapper that does the same thing.
    thread2 = gevent.spawn(foo, "I live!", 2)

    # Lambda expressions work too.
    thread3 = gevent.spawn(lambda x: (x + 1), 2)

    threads = [thread1, thread2, thread3]
    gevent.joinall(threads)

Output:

    Hello
    I live!
Besides using the base Greenlet class, you can also subclass Greenlet and override its _run method.
    import gevent
    from gevent import Greenlet

    class MyGreenlet(Greenlet):

        def __init__(self, message, n):
            Greenlet.__init__(self)
            self.message = message
            self.n = n

        def _run(self):
            print(self.message)
            gevent.sleep(self.n)

    g = MyGreenlet("Hi there!", 3)
    g.start()
    g.join()

Output:

    Hi there!
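The state of a greenlet is generally time-dependent. Greenlet objects expose flags for monitoring it, such as started, ready(), successful(), value and exception.
When the main program receives a SIGQUIT signal, greenlets that fail to yield can hang the program unexpectedly. The result is a so-called zombie process, which has to be killed from outside the Python interpreter.
A common pattern is to listen for SIGQUIT in the main program and call gevent.shutdown before exiting.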
    import gevent
    import signal

    def run_forever():
        gevent.sleep(1000)

    if __name__ == '__main__':
        gevent.signal(signal.SIGQUIT, gevent.shutdown)
        thread = gevent.spawn(run_forever)
        thread.join()
Timeouts put an upper bound on the running time of a block of code or of a greenlet.
    import gevent
    from gevent import Timeout

    seconds = 10

    timeout = Timeout(seconds)
    timeout.start()

    def wait():
        gevent.sleep(10)

    try:
        gevent.spawn(wait).join()
    except Timeout:
        print('Could not complete')
The Timeout class can also be used in a with statement, optionally with a custom exception:
    import gevent
    from gevent import Timeout

    time_to_wait = 5  # seconds

    class TooLong(Exception):
        pass

    with Timeout(time_to_wait, TooLong):
        gevent.sleep(10)
In addition, gevent provides timeout arguments for a variety of Greenlet and data-structure calls.
    import gevent
    from gevent import Timeout

    def wait():
        gevent.sleep(2)

    timer = Timeout(1).start()
    thread1 = gevent.spawn(wait)

    try:
        thread1.join(timeout=timer)
    except Timeout:
        print('Thread 1 timed out')

    # --

    timer = Timeout.start_new(1)
    thread2 = gevent.spawn(wait)

    try:
        thread2.get(timeout=timer)
    except Timeout:
        print('Thread 2 timed out')

    # --

    try:
        gevent.with_timeout(1, wait)
    except Timeout:
        print('Thread 3 timed out')

Output:

    Thread 1 timed out
    Thread 2 timed out
    Thread 3 timed out
The dark corner of gevent: monkey patching.
    import socket
    print(socket.socket)

    print("After monkey patch")
    from gevent import monkey
    monkey.patch_socket()
    print(socket.socket)

    import select
    print(select.select)
    monkey.patch_select()
    print("After monkey patch")
    print(select.select)

Output:

    <class 'socket.socket'>
    After monkey patch
    <class 'gevent.socket.socket'>
    <built-in function select>
    After monkey patch
    <function select at 0x1924de8>
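Python's runtime lets us modify most objects at run time, including modules, classes, and even functions. This is generally an astoundingly bad idea, because it creates implicit side effects that are often extremely hard to debug when something goes wrong. Nevertheless, in extreme cases where a library needs to change the fundamental behavior of Python itself, monkey patching is the tool for the job. In this case, gevent can patch most of the blocking system calls in the standard library, including those in the socket, ssl, threading and select modules, so that they behave cooperatively.
For example, the Python bindings for Redis normally use ordinary TCP sockets to talk to a redis-server instance. Simply calling gevent.monkey.patch_all() makes the Redis bindings schedule their requests cooperatively and work together with the rest of the gevent stack.
This lets us combine gevent with libraries that would not normally work with it, without writing a single line of code. Monkey patching is still evil, but in this case it is a "useful evil".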
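The mechanism itself is simple, which is part of why it is dangerous. The sketch below uses only the standard library and does not involve gevent: it swaps `time.sleep` for a hypothetical stand-in, shows that every caller of `time.sleep` is affected, and then restores the original.

```python
import time

calls = []


def fake_sleep(seconds):
    # Hypothetical stand-in: records the request instead of blocking.
    calls.append(seconds)


original_sleep = time.sleep
time.sleep = fake_sleep  # patch the module attribute at run time
try:
    # Any code that calls time.sleep now reaches fake_sleep instead.
    time.sleep(5)
finally:
    time.sleep = original_sleep  # always restore the original function

print(calls)
```

gevent's patches follow the same principle of rebinding names in standard-library modules, except that the replacements are cooperative versions of the blocking calls rather than a stub.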
Events are a form of asynchronous communication between greenlets.
    import gevent
    from gevent.event import Event

    evt = Event()

    def setter():
        print('A: Hey wait for me, I have to do something')
        gevent.sleep(3)
        print("Ok, I'm done")
        evt.set()

    def waiter():
        print("I'll wait for you")
        evt.wait()  # blocking
        print("It's about time")

    def main():
        gevent.joinall([
            gevent.spawn(setter),
            gevent.spawn(waiter),
            gevent.spawn(waiter),
            gevent.spawn(waiter)
        ])

    if __name__ == '__main__':
        main()

Output:

    A: Hey wait for me, I have to do something
    I'll wait for you
    I'll wait for you
    I'll wait for you
    Ok, I'm done
    It's about time
    It's about time
    It's about time
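An extension of the Event object is AsyncResult, which lets you attach a value to the wake-up call. It is sometimes called a future or a deferred, because it holds a reference to a value that may be set at some arbitrary point in the future.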
    import gevent
    from gevent.event import AsyncResult

    a = AsyncResult()

    def setter():
        gevent.sleep(3)
        a.set('Hello!')

    def waiter():
        print(a.get())

    gevent.joinall([
        gevent.spawn(setter),
        gevent.spawn(waiter),
    ])
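For comparison, the standard library has the same "future" idea in `concurrent.futures.Future`. The sketch below is an analogue using OS threads, not gevent code: one thread sets the value and the main thread blocks until it arrives. Creating a `Future` by hand is normally left to executors; it is done directly here only to keep the illustration short.

```python
import threading
from concurrent.futures import Future

result = Future()


def setter():
    # Attach a value to the future; this wakes up anyone blocked in result().
    result.set_result("Hello!")


worker = threading.Thread(target=setter)
worker.start()

# Blocks until the value has been set (or the timeout expires).
value = result.result(timeout=5)
worker.join()
print(value)
```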
A queue is an ordered collection of data with the usual put / get operations, implemented in such a way that it can be manipulated safely across greenlets.
    import gevent
    from gevent.queue import Queue

    tasks = Queue()

    def worker(n):
        while not tasks.empty():
            task = tasks.get()
            print('Worker %s got task %s' % (n, task))
            gevent.sleep(0)

        print('Quitting time!')

    def boss():
        for i in range(1, 10):
            tasks.put_nowait(i)

    gevent.spawn(boss).join()

    gevent.joinall([
        gevent.spawn(worker, 'steve'),
        gevent.spawn(worker, 'john'),
        gevent.spawn(worker, 'nancy'),
    ])

Output:

    Worker steve got task 1
    Worker john got task 2
    Worker nancy got task 3
    Worker steve got task 4
    Worker john got task 5
    Worker nancy got task 6
    Worker steve got task 7
    Worker john got task 8
    Worker nancy got task 9
    Quitting time!
    Quitting time!
    Quitting time!
Both put and get block. put_nowait and get_nowait do not block; instead they raise gevent.queue.Full or gevent.queue.Empty respectively when the operation cannot be completed.
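The non-blocking behavior can be tried out with the standard-library `queue` module, whose interface `gevent.queue` mirrors. This is a sketch with a hypothetical two-slot queue, using `queue.Queue` as a stand-in rather than gevent itself.

```python
import queue

tasks = queue.Queue(maxsize=2)
tasks.put_nowait("a")
tasks.put_nowait("b")

try:
    tasks.put_nowait("c")  # the queue is full, so this raises instead of blocking
    overflowed = False
except queue.Full:
    overflowed = True

drained = [tasks.get_nowait(), tasks.get_nowait()]

try:
    tasks.get_nowait()  # the queue is empty, so this raises instead of blocking
    underflowed = False
except queue.Empty:
    underflowed = True

print(overflowed, drained, underflowed)
```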
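A group is a collection of running greenlets that are managed and scheduled together as a unit. It also doubles as a parallel dispatcher in the style of Python's multiprocessing library. Its main use is grouping asynchronous tasks so they can be managed together.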
    import gevent
    from gevent.pool import Group

    def talk(msg):
        for i in range(2):
            print(msg)

    g1 = gevent.spawn(talk, 'bar')
    g2 = gevent.spawn(talk, 'foo')
    g3 = gevent.spawn(talk, 'fizz')

    group = Group()
    group.add(g1)
    group.add(g2)
    group.join()

    group.add(g3)
    group.join()

Output:

    bar
    bar
    foo
    foo
    fizz
    fizz
A pool is a structure designed for handling a varying number of greenlets whose concurrency needs to be limited.
Without a Pool:
    from gevent import monkey
    monkey.patch_all()

    import gevent
    import requests

    urls = ["https://www.python.org/", "https://www.yahoo.com/", "https://github.com/"]

    def get(url):
        print(requests.get(url).url)

    ts = [gevent.spawn(get, url) for url in urls]
    gevent.joinall(ts)
With a Pool:
    from gevent import monkey
    monkey.patch_all()

    import requests
    from gevent.pool import Pool

    urls = ["https://www.python.org/", "https://www.yahoo.com/", "https://github.com/"]

    def get(url):
        print(requests.get(url).url)

    p = Pool(3)
    p.map(get, urls)
The following builds a socket pool class that polls each of its sockets.
    from gevent.pool import Pool

    class SocketPool(object):

        def __init__(self):
            # The original also called self.pool.start() here, but
            # Pool.start() expects a greenlet argument, so the call is dropped.
            self.pool = Pool(10)

        def listen(self, socket):
            while True:
                # recv() needs a buffer size; 1024 is an arbitrary choice.
                socket.recv(1024)

        def add_handler(self, socket):
            if self.pool.full():
                raise Exception("At maximum pool size")
            else:
                self.pool.spawn(self.listen, socket)

        def shutdown(self):
            self.pool.kill()
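A semaphore is a low-level synchronization primitive that lets greenlets coordinate and limit concurrent access or execution. A semaphore exposes two methods, acquire and release. The difference between the number of times a semaphore has been acquired and the number of times it has been released is called the bound of the semaphore. Once the bound drops to 0, acquire blocks until another greenlet that holds the semaphore releases it.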
    from gevent import sleep
    from gevent.pool import Pool
    # BoundedSemaphore lived in gevent.coros in old gevent releases.
    from gevent.lock import BoundedSemaphore

    sem = BoundedSemaphore(2)

    def worker1(n):
        sem.acquire()
        print('Worker %i acquired semaphore' % n)
        sleep(0)
        sem.release()
        print('Worker %i released semaphore' % n)

    def worker2(n):
        with sem:
            print('Worker %i acquired semaphore' % n)
            sleep(0)
        print('Worker %i released semaphore' % n)

    pool = Pool()
    pool.map(worker1, range(0, 2))

Output:

    Worker 0 acquired semaphore
    Worker 1 acquired semaphore
    Worker 0 released semaphore
    Worker 1 released semaphore
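A lock is a semaphore with a bound of 1. It gives a single greenlet exclusive access. Semaphores and locks are commonly used to make sure a resource is used by only one piece of code at a time within the program.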
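The counting behavior can be observed without any concurrency at all. The sketch below uses the standard library's `threading.BoundedSemaphore` as a stand-in for gevent's, and non-blocking acquires so that a failed acquire returns `False` instead of waiting.

```python
import threading

sem = threading.BoundedSemaphore(2)

first = sem.acquire(blocking=False)   # succeeds, one slot left
second = sem.acquire(blocking=False)  # succeeds, no slots left
third = sem.acquire(blocking=False)   # fails: a blocking acquire would wait here

sem.release()                         # one holder lets go
fourth = sem.acquire(blocking=False)  # succeeds again

sem.release()
sem.release()
try:
    sem.release()  # more releases than acquires is an error for a bounded semaphore
    over_released = False
except ValueError:
    over_released = True

print(first, second, third, fourth, over_released)
```

A lock behaves like the same object created with an initial value of 1: the second acquire is the one that has to wait.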
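gevent also lets you specify data that is local to a greenlet's context. Internally this is implemented as a global lookup that addresses a private namespace keyed by the greenlet's getcurrent() value.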
    import gevent
    from gevent.local import local

    stash = local()

    def f1():
        stash.x = 1
        print(stash.x)

    def f2():
        stash.y = 2
        print(stash.y)

        try:
            stash.x
        except AttributeError:
            print("x is not local to f2")

    g1 = gevent.spawn(f1)
    g2 = gevent.spawn(f2)

    gevent.joinall([g1, g2])

Output:

    1
    2
    x is not local to f2
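The "global lookup keyed by the current context" idea can be sketched in a few lines. The class below is a simplified illustration, not gevent's implementation: `KeyedLocal` is a hypothetical name, and it keys on `threading.get_ident()` where gevent keys on the current greenlet.

```python
import threading


class KeyedLocal:
    """Attribute storage with one private namespace per calling thread."""

    def __init__(self):
        # Bypass our own __setattr__ to create the real backing store.
        object.__setattr__(self, "_store", {})

    def _namespace(self):
        # Look up (or create) the namespace belonging to the current thread.
        return self._store.setdefault(threading.get_ident(), {})

    def __getattr__(self, name):
        try:
            return self._namespace()[name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        self._namespace()[name] = value


stash = KeyedLocal()
stash.x = 1  # stored under the main thread's key
observed = []


def worker():
    observed.append(hasattr(stash, "x"))  # x belongs to the main thread only
    stash.y = 2
    observed.append(stash.y)


t = threading.Thread(target=worker)
t.start()
t.join()
print(observed, stash.x, hasattr(stash, "y"))
```

A real implementation also has to discard a namespace when its owner exits; this sketch leaves that out.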
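Many web frameworks that integrate with gevent store HTTP session objects in gevent's thread-local storage. For example, using the Werkzeug utility library and its proxy object, we can create Flask-style request objects.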
    from gevent.local import local
    from werkzeug.local import LocalProxy
    from werkzeug.wrappers import Request
    from contextlib import contextmanager

    # WSGIServer was imported from gevent.wsgi in old gevent releases.
    from gevent.pywsgi import WSGIServer

    _requests = local()
    request = LocalProxy(lambda: _requests.request)

    @contextmanager
    def sessionmanager(environ):
        _requests.request = Request(environ)
        yield
        _requests.request = None

    def logic():
        return "Hello " + request.remote_addr

    def application(environ, start_response):
        status = '200 OK'

        with sessionmanager(environ):
            body = logic()

        headers = [
            ('Content-Type', 'text/html')
        ]

        start_response(status, headers)
        # WSGI response bodies must be bytes on Python 3.
        return [body.encode('utf-8')]

    WSGIServer(('', 8000), application).serve_forever()
As of gevent 1.0, gevent.subprocess is available, which supports cooperative waiting on subprocesses.
    import gevent
    from gevent.subprocess import Popen, PIPE

    def cron():
        while True:
            print("cron")
            gevent.sleep(0.2)

    g = gevent.spawn(cron)
    sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True)
    out, err = sub.communicate()
    g.kill()
    print(out.rstrip())

Output:

    cron
    cron
    cron
    cron
    cron
    Linux
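Many people also want to use gevent together with multiprocessing. One of the most obvious challenges is that the inter-process communication multiprocessing provides is not cooperative by default. The example below works around this by waiting on the pipes' file descriptors with gevent.socket.wait_read and wait_write before reading or writing.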
    import gevent
    from multiprocessing import Process, Pipe
    from gevent.socket import wait_read, wait_write

    # To Process
    a, b = Pipe()

    # From Process
    c, d = Pipe()

    def relay():
        for i in range(5):
            msg = b.recv()
            c.send(msg + " in " + str(i))

    def put_msg():
        for i in range(5):
            wait_write(a.fileno())
            a.send('hi')

    def get_msg():
        for i in range(5):
            wait_read(d.fileno())
            print(d.recv())

    if __name__ == '__main__':
        proc = Process(target=relay)
        proc.start()

        g1 = gevent.spawn(get_msg)
        g2 = gevent.spawn(put_msg)
        gevent.joinall([g1, g2], timeout=1)

Output:

    hi in 0
    hi in 1
    hi in 2
    hi in 3
    hi in 4
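Note, however, that combining multiprocessing and gevent inevitably brings operating-system-dependent pitfalls, among them:
After forking on a POSIX-compliant system, the gevent state in the child process is ill-posed. One side effect is that greenlets spawned before the multiprocessing.Process was created run in both the parent and the child process.
The a.send() call in put_msg() above may still block the calling thread non-cooperatively: a ready-to-write event only guarantees that one byte can be written. The underlying buffer may be full before the attempted write completes.
The wait_write() / wait_read() approach shown above does not work on Windows (IOError: 3 is not a socket (files are not supported)), because Windows cannot watch pipes for events.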
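The Python package gipc overcomes these challenges in a largely transparent way on both POSIX-compliant systems and Windows. It provides gevent-aware child processes based on multiprocessing.Process and gevent-cooperative inter-process communication based on pipes.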
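The actor model is a higher-level concurrency model popularized by Erlang. In short, the main idea is that you have a collection of independent actors, each with an inbox from which it receives messages from other actors. The main loop inside an actor iterates over the messages it receives and takes action according to its desired behavior.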
gevent has no native Actor type, but using a Queue inside a subclassed Greenlet we can define a very simple one.
    import gevent
    from gevent import Greenlet
    from gevent.queue import Queue

    class Actor(gevent.Greenlet):

        def __init__(self):
            self.inbox = Queue()
            Greenlet.__init__(self)

        def receive(self, message):
            """
            Define in your subclass.
            """
            raise NotImplementedError()

        def _run(self):
            self.running = True

            # Block on the inbox and dispatch each message as it arrives.
            while self.running:
                message = self.inbox.get()
                self.receive(message)
Here is a usage example:
    import gevent
    from gevent.queue import Queue
    from gevent import Greenlet

    class Pinger(Actor):
        def receive(self, message):
            print(message)
            pong.inbox.put('ping')
            gevent.sleep(0)

    class Ponger(Actor):
        def receive(self, message):
            print(message)
            ping.inbox.put('pong')
            gevent.sleep(0)

    ping = Pinger()
    pong = Ponger()

    ping.start()
    pong.start()

    ping.inbox.put('start')
    gevent.joinall([ping, pong])
    # On Unix: Access with ``$ nc 127.0.0.1 5000``
    # On Windows: Access with ``$ telnet 127.0.0.1 5000``

    from gevent.server import StreamServer

    def handle(socket, address):
        # Sockets carry bytes on Python 3, so the text is sent as bytes.
        socket.send(b"Hello from a telnet!\n")
        for i in range(5):
            socket.send(str(i).encode() + b'\n')
        socket.close()

    server = StreamServer(('127.0.0.1', 5000), handle)
    server.serve_forever()
gevent provides two WSGI servers for serving HTTP content, referred to from here on as wsgi and pywsgi.
Usage in glb:
    import click
    from flask import Flask
    from gevent.pywsgi import WSGIServer
    from geventwebsocket.handler import WebSocketHandler

    import v1
    from .settings import Config
    from .sockethandler import handle_websocket


    def create_app(config=None):
        app = Flask(__name__, static_folder='static')
        if config:
            app.config.update(config)
        else:
            app.config.from_object(Config)

        app.register_blueprint(v1.bp, url_prefix='/v1')
        return app


    def wsgi_app(environ, start_response):
        # Route websocket traffic to its handler; everything else goes to Flask.
        path = environ['PATH_INFO']
        if path == '/websocket':
            handle_websocket(environ['wsgi.websocket'])
        else:
            return create_app()(environ, start_response)


    @click.command()
    @click.option('-h', '--host_port', type=(str, int),
                  default=('0.0.0.0', 5000), help='Host and port of server.')
    @click.option('-r', '--redis', type=(str, int, int),
                  default=('127.0.0.1', 6379, 0), help='Redis url of server.')
    @click.option('-p', '--port_range', type=(int, int),
                  default=(50000, 61000), help='Port range to be assigned.')
    def manage(host_port, redis=None, port_range=None):
        Config.REDIS_URL = 'redis://%s:%s/%s' % redis
        Config.PORT_RANGE = port_range
        http_server = WSGIServer(host_port, wsgi_app,
                                 handler_class=WebSocketHandler)
        print('----GLB Server run at %s:%s-----' % host_port)
        print('----Redis Server run at %s:%s:%s-----' % redis)
        http_server.serve_forever()
Like other asynchronous I/O frameworks, gevent has some drawbacks:
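One drawback that gevent does avoid: you will almost never run into a Python library that is incompatible with asynchronous operation and therefore blocks your application, because pure-Python libraries end up using the monkey-patched stdlib.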