協程,英文名Coroutine。
前面介紹Python的多線程,以及用多線程實現併發(參見這篇文章【淺析Python多線程】),今天介紹的協程也是經常使用的併發手段。本篇主要內容包含:協程的基本概念、協程庫的實現原理以及Python中常見的協程庫。html
咱們知道線程的調度(線程上下文切換)是由操做系統決定的,當一個線程啓動後,何時佔用CPU、何時讓出CPU,程序員都沒法干涉。假設如今啓動4個線程,CPU線程時間片爲 5 毫秒,也就是說,每一個線程每隔5ms就讓出CPU,讓其餘線程搶佔CPU。可想而知,等4個線程運行結束,要進行多少次切換?python
若是咱們可以自行調度本身寫的程序,讓一些代碼塊遇到IO操做時,切換去執行另一些須要CPU操做的代碼塊,是否是節約了不少無畏的上下文切換呢?是的,協程就是針對這一狀況而生的。咱們把寫好的一個應用程序分爲不少個代碼塊,以下圖所示:程序員
把應用程序的代碼分爲多個代碼塊,正常狀況代碼自上而下順序執行。若是代碼塊A運行過程當中,可以切換執行代碼塊B,又可以從代碼塊B再切換回去繼續執行代碼塊A,這就實現了協程(一般是遇到IO操做時切換纔有意義)。示意圖以下:編程
因此,關於協程能夠總結如下兩點:多線程
(1)線程的調度是由操做系統負責,協程調度是程序自行負責。併發
(2)與線程相比,協程減小了無畏的操做系統切換。app
實際上當遇到IO操做時作切換才更有意義,(由於IO操做不用佔用CPU),若是沒遇到IO操做,按照時間片切換,無心義。socket
舉個例子,你在作一頓飯你要蒸飯和炒菜:最笨的方法是先蒸飯,飯蒸好了再去炒菜。這樣一頓飯得花很多時間,就跟咱們沒采用併發編程同樣。async
多線程至關於,你5分鐘在作蒸飯的工做,到了5分鐘開始炒菜,又過了5分鐘,你又去忙蒸飯。分佈式
協程至關於,你淘完米,放在電飯鍋,按下煮飯鍵以後,你開始去炒菜。炒菜的時候油沒熱,你能夠調佐料。這樣,你炒兩個菜出來,飯蒸好了。整個過程你沒閒着,可是節約了很多時間。
如1中所述,代碼塊A可以中斷去執行代碼塊B,代碼塊B可以中斷,執行代碼塊A。這不是和yield功能一模一樣嗎?咱們先回憶一下yield的功能:
(1) 在函數中,語句執行到yield,會返回yield 後面的內容;當再回來執行時,從yield的下一句開始執行;
(2) 使用yield語法的函數是一個生成器;
(3) python3中,經過 .__next__() 或者 next() 方法獲取生成器的下一個值。
來看一個yield實現協程的例子:
from collections import deque def sayHello(n): while n > 0: print("hello~", n) yield n n -= 1 print('say hello') def sayHi(n): x = 0 while x < n: print('hi~', x) yield x += 1 print("say hi") # 使用yield語句,實現簡單任務調度器 class TaskScheduler(object): def __init__(self): self._task_queue = deque() def new_task(self, task): ''' 向調度隊列添加新的任務 ''' self._task_queue.append(task) def run(self): ''' 不斷運行,直到隊列中沒有任務 ''' while self._task_queue: task = self._task_queue.popleft() try: next(task) self._task_queue.append(task) except StopIteration: # 生成器結束 pass sched = TaskScheduler() sched.new_task(sayHello(10)) sched.new_task(sayHi(15)) sched.run()
上例執行時,你會看到sayHello()和sayHi() 不斷交替執行,當執行sayHello()時,在yield處中斷,當執行sayHi()時從yield處中斷,切換回sayHello()從yield以後的一句開始執行。。。如此來回交替無縫鏈接。
actor模式是一種最古老的也是最簡單的並行和分佈式計算解決方案。下面咱們經過yield來實現:
from collections import deque class ActorScheduler: def __init__(self): self._actors = {} self._msg_queue = deque() def new_actor(self, name, actor): self._msg_queue.append((actor, None)) self._actors[name] = actor def send(self, name, msg): actor = self._actors.get(name) if actor: self._msg_queue.append((actor, msg)) def run(self): while self._msg_queue: # print("隊列:", self._msg_queue) actor, msg = self._msg_queue.popleft() # print("actor", actor) # print("msg", msg) try: actor.send(msg) except StopIteration: pass if __name__ == '__main__': def say_hello(): while True: msg = yield print("say hello", msg) def say_hi(): while True: msg = yield print("say hi", msg) def counter(sched): while True: n = yield print("counter:", n) if n == 0: break sched.send('say_hello', n) sched.send('say_hi', n) sched.send('counter', n-1) sched = ActorScheduler() # 建立初始化 actors sched.new_actor('say_hello', say_hello()) sched.new_actor('say_hi', say_hi()) sched.new_actor('counter', counter(sched)) sched.send('counter', 10) sched.run()
上例中:
(1) ActorScheduler 負責事件循環
(2) counter() 負責控制終止
(3) say_hello() / say_hi() 至關於切換的協程,當程序運行到這些函數內部的yield處,就開始切換。
因此,當執行時,咱們可以看到say_hello() / say_hi()不斷交替切換執行,直到counter知足終止條件以後,協程終止。看懂上例可能須要花費一些時間。實際上咱們已經實現了一個「操做系統」的最小核心部分。 生成器函數(含有yield的函數)就是認爲,而yield語句是任務掛起的信號。 調度器循環檢查任務列表直到沒有任務要執行爲止。
有了前面對協程的瞭解,咱們能夠思考怎樣去實現一個協程庫?我以爲能夠從如下兩個個方面去思考:
(1)事件循環 (event loop)。事件循環須要實現兩個功能,一是順序執行協程代碼;二是完成協程的調度,即一個協程「暫停」時,決定接下來執行哪一個協程。
(2)協程上下文的切換。基本上Python 生成器的 yeild 已經能完成切換,Python3中還有特定語法支持協程切換。
咱們看一個比較複雜的例子:
from collections import deque from select import select class YieldEvent: def handle_yield(self, sched, task): pass def handle_resume(self, sched, task): pass # 任務調度(至關於EventLoop) class Scheduler: def __init__(self): self._numtasks = 0 # 任務總數量 self._ready = deque() # 等待執行的任務隊列 self._read_waiting = {} # 正等待讀的任務 self._write_waiting = {} # 正等待寫的任務 # 利用I/O多路複用 監聽讀寫I/0 def _iopoll(self): rset, wset, eset = select(self._read_waiting, self._write_waiting, []) for r in rset: evt, task = self._read_waiting.pop(r) evt.handle_resume(self, task) for w in wset: evt, task = self._write_waiting.pop(w) evt.handle_resume(self, task) def new(self, task): """添加一個新的任務""" self._ready.append((task, None)) self._numtasks += 1 def add_ready(self, task, msg=None): """添加到任務對列等待執行""" self._ready.append((task, msg)) def _read_wait(self, fileno, evt, task): self._read_waiting[fileno] = (evt, task) def _write_wait(self, fileno, evt, task): self._write_waiting[fileno] = (evt, task) def run(self): while self._numtasks: # 若是任務數量爲空,阻塞在select處,保持監聽 if not self._ready: self._iopoll() task, msg = self._ready.popleft() try: r = task.send(msg) if isinstance(r, YieldEvent): r.handle_yield(self, task) else: raise RuntimeError('unrecognized yield event') except StopIteration: self._numtasks -= 1 # 示例: 將協程抽象成YieldEvent的子類,並重寫handle_yield和handle_resume方法 class ReadSocket(YieldEvent): def __init__(self, sock, nbytes): self.sock = sock self.nbytes = nbytes def handle_yield(self, sched, task): sched._read_wait(self.sock.fileno(), self, task) def handle_resume(self, sched, task): data = self.sock.recv(self.nbytes) sched.add_ready(task, data) class WriteSocket(YieldEvent): def __init__(self, sock, data): self.sock = sock self.data = data def handle_yield(self, sched, task): sched._write_wait(self.sock.fileno(), self, task) def handle_resume(self, sched, task): nsent = self.sock.send(self.data) sched.add_ready(task, nsent) class AcceptSocket(YieldEvent): def __init__(self, sock): self.sock = sock def handle_yield(self, sched, task): sched._read_wait(self.sock.fileno(), self, task) def handle_resume(self, sched, task): r = self.sock.accept() sched.add_ready(task, r) class Socket(object): def __init__(self, sock): self._sock = sock def recv(self, maxbytes): return ReadSocket(self._sock, maxbytes) def send(self, data): return WriteSocket(self._sock, data) def accept(self): return AcceptSocket(self._sock) def __getattr__(self, name): return getattr(self._sock, name) if __name__ == '__main__': from socket import socket, AF_INET, SOCK_STREAM def readline(sock): chars = [] while True: c = yield sock.recv(1) print(c) if not c: break chars.append(c) if c == b'\n': break return b''.join(chars) # socket server 使用生成器 class EchoServer: def __init__(self, addr, sched): self.sched = sched sched.new(self.server_loop(addr)) def server_loop(self, addr): s = Socket(socket(AF_INET, SOCK_STREAM)) s.bind(addr) s.listen(5) while True: c, a = yield s.accept() print('Got connection from ', a) print("got", c) self.sched.new(self.client_handler(Socket(c))) def client_handler(self, client): while True: try: line = yield from readline(client) if not line: break print("from Client::", str(line)) except Exception: break while line: try: nsent = yield client.sendall(line) print("nsent", nsent) line = line[nsent:] except Exception: break client.close() print('Client closed') sched = Scheduler() EchoServer(('localhost', 9999), sched) sched.run()
Scheduler至關於實現事件循環並調度協程, 添加到事件循環中的事件必須繼承YieldEvent, 並重寫它定義的兩個方法。此例比較難,看不懂能夠忽略。
咱們看一下Python3中的協程庫asyncio是怎麼實現的:
import asyncio @asyncio.coroutine def say_hi(n): print("start:", n) r = yield from asyncio.sleep(2) print("end:", n) loop = asyncio.get_event_loop() tasks = [say_hi(0), say_hi(1)] loop.run_until_complete(asyncio.wait(tasks)) loop.close() # start: 1 # start: 0 # 停頓兩秒 # end: 1 # end: 0
(1)@asyncio.coroutine把一個generator標記爲coroutine類型,而後,咱們就把這個coroutine扔到EventLoop中執行。
(2)yield from語法可讓咱們方便地調用另外一個generator。因爲asyncio.sleep()也是一個coroutine,因此線程不會等待asyncio.sleep(),而是直接中斷並執行下一個消息循環。當asyncio.sleep()返回時,線程就能夠從yield from拿到返回值(此處是None),而後接着執行下一行語句。
(3)asyncio.sleep(1)至關於一個耗時1秒的IO操做,在此期間,主線程並未等待,而是去執行EventLoop中其餘能夠執行的coroutine了,所以能夠實現併發執行。
asyncio中get_event_loop()就是事件循環,而裝飾器@asyncio.coroutine標記了一個協程,並yield from 語法實現協程切換。在Python3.5中,新增了async
和await
的新語法,代替裝飾器和yield from。上例能夠用新增語法徹底代替。
async def say_hi(n): print("start:", n) r = await asyncio.sleep(2) print("end:", n) loop = asyncio.get_event_loop() tasks = [say_hi(0), say_hi(1)] loop.run_until_complete(asyncio.wait(tasks)) loop.close() # start: 1 # start: 0 # 停頓兩秒 # end: 1 # end: 0
將@asyncio.coroutine換成async, 將yield from 換成await 便可。
(1)使用協程,只能使用單線程,多線程的便利就一點都用不到。例如,I/O阻塞程序,CPU仍然會將整個任務掛起直到操做完成。
(2) 一旦使用協程,大部分ython庫並不能很好的兼容,這就會致使要改寫大量的標準庫函數。
因此,最好別用協程,一旦用很差,協程給程序性能帶來的提高,遠遠彌補不了其帶來的災難。