引言: 本文出自David Beazley 的關於協程的PPT,如今筆者將他翻譯過來。並整理成文。感謝在協程方面的專家David Beazley, 能給咱們這麼深刻的協程上面的講座。也但願本文能給更多pythoner普及yield的更多用法,使python的這個特性可以更加多的活躍在你們的代碼中。
html
http://www.dabeaz.com/coroutines/python
1. 什麼是協程
2. 協程怎麼用
3. 要注意什麼
4. 用他們好麼
複製代碼
生成器 是 能夠生成必定序列的 函數。 函數能夠調用next()方法。web
import time
def follow(thefile):
thefile.seek(0,2) # Go to the end of the file
while True:
line = thefile.readline()
if not line:
time.sleep(0.1) # Sleep briefly
continue
yield line
複製代碼
ps:unix pipe
A pipeline is a sequence of processes chained together by their standard streams
標註:unix管道
一個uinx管道是由標準流連接在一塊兒的一系列流程.
複製代碼
pipeline.py數據庫
def grep(pattern,lines):
for line in lines:
if pattern in line:
yield line
if __name__ == '__main__':
from follow import follow
# Set up a processing pipe : tail -f | grep python
logfile = open("access-log")
loglines = follow(logfile)
pylines = grep("python",loglines)
# Pull results out of the processing pipeline
for line in pylines:
print line,
複製代碼
理解pipeline.py
在pipeline中,follow函數和grep函數至關於程序鏈,這樣就能鏈式處理程序。編程
grep.pyjson
def grep(pattern):
print "Looking for %s" % pattern
print "give a value in the coroutines"
while True:
line = (yield)
if pattern in line:
print line
# Example use
if __name__ == '__main__':
g = grep("python")
g.next()
g.send("Yeah, but no, but yeah, but no")
g.send("A series of tubes")
g.send("python generators rock!")
複製代碼
yield最重要的問題在於yield的值是多少。安全
yield的值須要使用coroutine協程這個概念 相對於僅僅生成值,函數能夠動態處理傳送進去的值,而最後值經過yield返回。bash
協程的執行和生成器的執行很類似。 當你初始化一個協程,不會返回任何東西。 協程只能響應run和send函數。 協程的執行依賴run和send函數。服務器
全部的協程都須要調用.next( )函數。 調用的next( )函數將要執行到第一個yield表達式的位置。 在yield表達式的位置上,很容易去執行就能夠。 協程使用next()啓動。網絡
由【協程啓動】中咱們知道,啓動一個協程須要記得調用next( )來開始協程,而這個啓動器容易忘記使用。 使用修飾器包一層,來讓咱們啓動協程。 【之後全部的協程器都會先有@coroutine
def coroutine(func):
def start(*args, **kwargs):
cr = func(*args, **kwargs)
cr.next()
return cr
return start
@coroutine
def grep(pattern):
...
複製代碼
使用close()來關閉。
grepclose.py
@coroutine
def grep(pattern):
print "Looking for %s" % pattern
try:
while True:
line = (yield)
if pattern in line:
print line,
except GeneratorExit:
print "Going away. Goodbye"
複製代碼
使用GeneratorExit這個異常類型
在一個協程中,能夠拋出一個異常
g.throw(RuntimeError,"You're hosed")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 4, in grep
RuntimeError: You're hosed 複製代碼
異常起源於yield表達式 能夠用常規方法去抓取
* 儘管有點類似,可是生成器和協程是*兩個徹底不一樣的概念*。
* 生成器用來產生序列。
* 協程用來處理序列。
* 很容易產生一些誤解。由於協程有的時候用來對進程裏面的用來產生迭代對象的生成器做微調。
複製代碼
* 不能往generator裏面send東西。
* 協程和迭代器的概念沒有關係
* 雖然有一種用法,確實是在一個協程裏面生成一些值,可是並不和迭代器有關係。
複製代碼
進程管道須要一個初始的源(一個生產者)。 這個初始的源驅動整個管道。 管道源不是協程。
管道必須有個終止點。 管道終止/協程終止是進程管道的終止點。
from coroutine import coroutine
# A data source. This is not a coroutine, but it sends
# data into one (target)
import time
def follow(thefile, target):
thefile.seek(0,2) # Go to the end of the file
while True:
line = thefile.readline()
if not line:
time.sleep(0.1) # Sleep briefly
continue
target.send(line)
# A sink. A coroutine that receives data
@coroutine
def printer():
while True:
line = (yield)
print line,
# Example use
if __name__ == '__main__':
f = open("access-log")
follow(f,printer())
複製代碼
分析:第一個follow函數是協程源,第二個printer函數是協程終止。協程源不是一個協程,可是須要傳入一個已經初始化完畢的協程。在協程源當中,調用send()。
叫過濾器其實並不貼切,應該叫中間人Intermediate:其兩端都是send()函數。
(協程的中間層) 典型的中間層以下: @coroutine
def filter(target): # 這個target是傳遞參數的對象
while True:
item = (yield) # 這裏用來接收上一個send()傳入的value
# Transform/filter item
# processing items
# Send it along to the next stage
target.send(item) # 像target傳遞參數
複製代碼
分析可知,中間層須要接受上一個coroutine,也須要往下一個coroutine裏面傳遞值。
一個管道過濾器的例子 從文章中找出具備「python」關鍵字的句子打印。 grep.py:
@coroutine
def grep(pattern, target): # 這個target用來接收參數
while True:
line = (yield) # 這裏用來接收上一個send()傳入的value
# Transform/filter item
# processing items
if pattern in line:
target.send(line)
# Send it along to the next stage
複製代碼
Hook it up with follow and printer:
f = open("access-log")
follow(f, grep('python', printer()))
複製代碼
grep 從中間傳入follow,而後printer傳入grep。
圖示:
使用協程,你能夠發送數據 給 多個 協程過濾器/協程終了。可是請注意,協程源只是用來傳遞數據的,過多的在協程源中傳遞數據是使人困惑而且複雜的。
一個例子
@coroutine
def broadcast(targets):
while True:
item = (yield)
for target in targets:
target.send(item)
複製代碼
Hook it Up!
if __name__ == '__main__':
f = open("access-log")
follow(f,
broadcast([grep('python',printer()),
grep('ply',printer()),
grep('swig',printer())])
)
複製代碼
從文章中分別打印出含有’python‘ ’ply‘ ’swig‘ 關鍵字的句子。使用了一個協程隊列向全部printer協程 送出 接收到的數據。 圖示:
或者這樣Hook them up:
if __name__ == '__main__':
f = open("access-log")
p = printer()
follow(f,
broadcast([grep('python',p),
grep('ply',p),
grep('swig',p)])
)
複製代碼
圖示:
爲何咱們用協程
協程能夠用在寫各類各樣處理事件流的組件。
介紹一個例子【這個例子會貫穿這個第三部分始終】要求作一個實時的公交車GPS位置監控。編寫程序的主要目的是處理一份文件。傳統上,使用SAX進行處理。【SAX處理能夠減小內存空間的使用,但SAX事件驅動的特性會讓它笨重和低效】。
複製代碼
咱們可使用協程分發SAX事件,好比:
import xml.sax
class EventHandler(xml.sax.ContentHandler):
def __init__(self,target):
self.target = target
def startElement(self,name,attrs):
self.target.send(('start',(name,attrs._attrs)))
def characters(self,text):
self.target.send(('text',text))
def endElement(self,name):
self.target.send(('end',name))
# example use
if __name__ == '__main__':
from coroutine import *
@coroutine
def printer():
while True:
event = (yield)
print event
xml.sax.parse("allroutes.xml",
EventHandler(printer()))
複製代碼
解析:整個事件的處理如圖所示
好比,把xml改爲json最後從中篩選的出固定信息. buses.py
@coroutine
def buses_to_dicts(target):
while True:
event, value = (yield)
# Look for the start of a <bus> element
if event == 'start' and value[0] == 'bus':
busdict = {}
fragments = []
# Capture text of inner elements in a dict
while True:
event, value = (yield)
if event == 'start':
fragments = []
elif event == 'text':
fragments.append(value)
elif event == 'end':
if value != 'bus':
busdict[value] = "".join(fragments)
else:
target.send(busdict)
break
複製代碼
協程的一個有趣的事情是,您能夠將初始數據源推送到低級別的語言,而不須要重寫全部處理階段。好比,PPT 中69-73頁介紹的,能夠經過協程和低級別的語言進行聯動,從而達成很是好的優化效果。如Expat模塊或者cxmlparse模塊。 ps: ElementTree具備快速的遞增xml句法分析
協程有如下特色。
咱們往協程內傳送數據,向線程內傳送數據,也向進程內傳送數據。那麼,協程天然很容易和線程和分佈式系統聯繫起來。
咱們能夠經過添加一個額外的層,從而封裝協程進入線程或者子進程。這描繪了幾個基本的概念。
下面看一個線程的例子。 cothread.py
@coroutine
def threaded(target):
# 第一部分:
messages = Queue()
def run_target():
while True:
item = messages.get()
if item is GeneratorExit:
target.close()
return
else:
target.send(item)
Thread(target=run_target).start()
# 第二部分:
try:
while True:
item = (yield)
messages.put(item)
except GeneratorExit:
messages.put(GeneratorExit)
複製代碼
例子解析:第一部分:先新建一個隊列。而後定義一個永久循環的線程;這個線程能夠將其中的元素拉出消息隊列,而後發送到目標裏面。第二部分:接受上面送來的元素,並經過隊列,將他們傳送進線程裏面。其中用到了GeneratorExit ,使得線程能夠正確的關閉。
Hook up:cothread.py
if __name__ == '__main__':
import xml.sax
from cosax import EventHandler
from buses import *
xml.sax.parse("allroutes.xml", EventHandler(
buses_to_dicts(
threaded(
filter_on_field("route", "22",
filter_on_field("direction", "North Bound",
bus_locations()))))))
複製代碼
可是:添加線程讓這個例子慢了50%
咱們知道,進程之間是不共享系統資源的,因此要進行兩個子進程之間的通訊,咱們須要經過一個文件橋接兩個協程。
import cPickle as pickle
from coroutine import *
@coroutine
def sendto(f):
try:
while True:
item = (yield)
pickle.dump(item, f)
f.flush()
except StopIteration:
f.close()
def recvfrom(f, target):
try:
while True:
item = pickle.load(f)
target.send(item)
except EOFError:
target.close()
# Example use
if __name__ == '__main__':
import xml.sax
from cosax import EventHandler
from buses import *
import subprocess
p = subprocess.Popen(['python', 'busproc.py'],
stdin=subprocess.PIPE)
xml.sax.parse("allroutes.xml",
EventHandler(
buses_to_dicts(
sendto(p.stdin))))
複製代碼
程序經過sendto()和recvfrom()傳遞文件。
使用協程,咱們能夠從一個任務的執行環境中剝離出他的實現。而且,協程就是那個實現。執行環境是你選擇的線程,子進程,網絡等。
須要注意的警告:
在併發編程中,一般將問題細分爲「任務」。 「任務」有下面幾個經典的特色: * 擁有獨立的控制流。 * 擁有內在的狀態。 * 能夠被安排規劃/掛起/恢復。 * 可與其餘的任務通訊。 協程也是任務的一種。
@coroutine
def grep(pattern):
print "Looking for %s" % pattern
print "give a value in the coroutines"
while True:
line = (yield)
if pattern in line:
print line
複製代碼
@coroutine
def grep(pattern):
print "Looking for %s" % pattern
print "give a value in the coroutines"
while True:
line = (yield)
if pattern in line:
print line
複製代碼
@coroutine
def grep(pattern):
print "Looking for %s" % pattern
print "give a value in the coroutines"
while True:
line = (yield)
if pattern in line:
print line
複製代碼
總之,一個協程知足以上全部任務(task)的特色,因此協程很是像任務。可是協程不用與任何一個線程或者子進程綁定。
當計算機運行時,電腦沒有同時運行好幾條指令的打算。而不管是處理器,應用程序都不懂多任務處理。因此,操做系統須要去完成多任務的調度。操做系統經過在多個任務中快速切換來實現多任務。
CPU執行的是應用程序,而不是你的操做系統,那沒有被CPU執行的操做系統是怎麼控制正在運行的應用程序中斷的呢。
操做系統只能經過兩個機制去得到對應用程序的控制:中斷和陷阱。 * 中斷:和硬件有關的balabala。 * 陷阱:一個軟件發出的信號。 在兩種情況下,CPU都會掛起正在作的,而後執行OS的代碼(這個時候,OS的代碼成功插入了應用程序的執行),此時,OS來切換了程序。
* 中斷(Traps)使得OS的代碼能夠實現。
* 在程序運行遇到中斷(Traps)時,OS強制在CPU上中止你的程序。
* 程序掛起,而後OS運行。
複製代碼
表現以下圖:
每次中斷(Traps)程序都會執行另外一個不一樣的任務。爲了執行不少任務,添加一簇任務隊列。
BB了這麼多微嵌的內容,獲得的是什麼結論呢。類比任務調度,協程中yield聲明能夠理解爲中斷(Traps)。當一個生成器函數碰到了yield聲明,那函數將當即掛起。而執行被傳給生成器函數運行的任何代碼。若是你把yield聲明當作了一箇中斷,那麼你就能夠組件一個多任務執行的操做系統了。
1. 用純python語句。
2. 不用線程。
3. 不用子進程。
4. 使用生成器和協程器。
複製代碼
* 尤爲在存在線程鎖(GIL)的條件下,在線程間切換會變得很是重要。我要高併發!
* 不阻塞和異步I/O。我要高併發!
* 在實戰中可能會遇到:服務器要同時處理上千條客戶端的鏈接。我要高併發!
* 大量的工做 致力於實現 事件驅動 或者說 響應式模型。我要組件化!
* 綜上,python構建操做系統,有利於瞭解如今高併發,組件化的趨勢。
複製代碼
定義一個任務類:任務像一個協程的殼,協程函數傳入target;任務類僅僅有一個run()函數。 pyos1.py
# Step 1: Tasks
# This object encapsulates a running task.
class Task(object):
taskid = 0 # 全部task對象會共享這個值。不熟悉的朋友請補一下類的知識
def __init__(self,target):
Task.taskid += 1
self.tid = Task.taskid # Task ID
self.target = target # Target coroutine
self.sendval = None # Value to send
# Run a task until it hits the next yield statement
def run(self):
return self.target.send(self.sendval)
複製代碼
任務類的執行:
if __name__ == '__main__':
# A simple generator/coroutine function
def foo():
print "Part 1"
yield
print "Part 2"
yield
t1 = Task(foo())
print "Running foo()"
t1.run()
print "Resuming foo()"
t1.run()
複製代碼
在foo中,yield就像中斷(Traps)同樣,每次執行run(),任務就會執行到下一個yield(一箇中斷)。
下面是調度者類,兩個屬性分別是Task隊列和task_id與Task類對應的map。schedule()向隊列裏面添加Task。new()用來初始化目標函數(協程函數),將目標函數包裝在Task,進而裝入Scheduler。最後mainloop會從隊列裏面拉出task而後執行到task的target函數的yield爲止,執行完之後再把task放回隊列。這樣下一次會從下一個yield開始執行。 pyos2.py
from Queue import Queue
class Scheduler(object):
def __init__(self):
self.ready = Queue()
self.taskmap = {}
def new(self,target):
newtask = Task(target)
self.taskmap[newtask.tid] = newtask
self.schedule(newtask)
return newtask.tid
def schedule(self,task):
self.ready.put(task)
def mainloop(self):
while self.taskmap:
task = self.ready.get()
result = task.run()
self.schedule(task)
複製代碼
下面是一個執行的例子:
# === Example ===
if __name__ == '__main__':
# Two tasks
def foo():
while True:
print "I'm foo"
yield
print "I am foo 2"
yield
def bar():
while True:
print "I'm bar"
yield
print "i am bar 2"
yield
# Run them
sched = Scheduler()
sched.new(foo())
sched.new(bar())
sched.mainloop()
複製代碼
執行結果,能夠發現兩個task之間任務是交替的,而且以yield做爲中斷點。每當執行撞到yield(中斷點)以後,Scheduler對Tasks作從新的規劃。下圖是兩個循環。 上述執行的結果:
若是,target函數裏面不是死循環,那麼上面的代碼就會出錯。因此咱們對Scheduler作改進。添加一個從任務隊列中刪除的操做,和對於StopIteration的驗證。 【對scheduler作改進的緣由是任務的性質:能夠被安排規劃/掛起/恢復。】
class Scheduler(object):
def __init__(self):
...
def new(self,target):
...
def schedule(self,task):
...
def exit(self,task):
print "Task %d terminated" % task.tid
del self.taskmap[task.tid]
def mainloop(self):
while self.taskmap:
task = self.ready.get()
try:
result = task.run()
except StopIteration:
self.exit(task)
continue
self.schedule(task)
複製代碼
在OS中,中斷是應用程序請求系統服務的方式。在咱們的代碼中,OS是調度者(scheduler),而中斷是yield。爲了請求調度者服務,任務須要帶值使用yield聲明。 pyos4.py
class Scheduler(object):
...
def mainloop(self):
while self.taskmap: # 1
task = self.ready.get()
try: # 2
result = task.run()
if isinstance(result, SystemCall):
result.task = task
result.sched = self
result.handle()
continue
except StopIteration:
self.exit(task)
continue
self.schedule(task) # 3
class SystemCall(object): # 4
def handle(self):
pass
複製代碼
代碼解析: 1. 若是taskmap裏面存在task,就從ready隊列裏面拿任務出來,若是沒有就結束mainloop。 2. 【就是傳說中的系統調運部分】ready隊列裏面的task被拿出來之後,執行task,返回一個result對象,並初始化這個result對象。若是隊列裏面的task要中止迭代了(終止yield這個過程)就從隊列裏刪除這個任務。 3. 最後再經過schedule函數把執行後的task放回隊列裏面。 4. 系統調用基類,以後全部的系統調用都要從這個基類繼承。
這個系統調用想返回任務的id。 Task的sendval屬性就像一個系統調用的返回值。當task從新運行的是後,sendval將會傳入這個系統調用。 pyos4.py
...
class GetTid(SystemCall):
def handle(self):
# 把task的id傳給task的返回參數:
self.task.sendval = self.task.tid
# 再把task給放入Scheduler的隊列裏面
self.sched.schedule(self.task)
class Task(object):
...
# Run a task until it hits the next yield statement
def run(self):
return self.target.send(self.sendval)
複製代碼
進行最後的調用:
if __name__ == '__main__':
def foo():
mytid = yield GetTid()
for i in xrange(5):
print "I'm foo", mytid
yield
def bar():
mytid = yield GetTid()
for i in xrange(10):
print "I'm bar", mytid
yield
sched = Scheduler()
sched.new(foo())
sched.new(bar())
sched.mainloop()
複製代碼
理解這段代碼的前提:(很是重要) 1. send()函數有返回值的,返回值是yield表達式右邊的值。在本段代碼中,result的返回值是yield GetTid()的GetTid的實例或者是yield後面的None。 2. 執行send(sendval)之後,sendval被傳入了yield表達式。並賦給了mytid,返回GetTid()給ruselt。
執行順序: 先建立一個調度者(Scheduler),而後在調度者裏面添加兩個協程函數:foo(), bar(),最後觸發mainloop進行協程的調度執行。
系統調用原理: 系統調用是基於系統調用類實現的,如GetTid類,其目的是傳出本身的tid。傳出本身的tid以後,再將task放回隊列。
上面咱們搞定了一個GetTid系統調用。咱們如今搞定更多的系統調用: * 建立一個新的任務。 * 殺掉一個已經存在的任務。 * 等待一個任務結束。 這些細小的相同的操做會與線程,進程配合。
1. *建立一個新的系統調用*:經過系統調用加入一個task。
複製代碼
# Create a new task
class NewTask(SystemCall):
def __init__(self,target):
self.target = target
def handle(self):
tid = self.sched.new(self.target)
self.task.sendval = tid
self.sched.schedule(self.task)
複製代碼
2. *殺掉一個系統調用*:經過系統調用殺掉一個task。
複製代碼
class KillTask(SystemCall):
def __init__(self, tid):
self.tid = tid
def handle(self):
task = self.sched.taskmap.get(self.tid, None)
if task:
task.target.close()
self.task.sendval = True
else:
self.task.sendval = False
self.sched.schedule(self.task)
複製代碼
3. 進程等待:須要大幅度改進Scheduler。
複製代碼
class Scheduler(object):
def __init__(self):
...
# Tasks waiting for other tasks to exit
self.exit_waiting = {}
def new(self, target):
...
def exit(self, task):
print "Task %d terminated" % task.tid
del self.taskmap[task.tid]
# Notify other tasks waiting for exit
for task in self.exit_waiting.pop(task.tid, []):
self.schedule(task)
def waitforexit(self, task, waittid):
if waittid in self.taskmap:
self.exit_waiting.setdefault(waittid, []).append(task)
return True
else:
return False
def schedule(self, task):
...
def mainloop(self):
...
複製代碼
exit_waiting 是用來暫時存放要退出task的地方。
class WaitTask(SystemCall):
def __init__(self, tid):
self.tid = tid
def handle(self):
result = self.sched.waitforexit(self.task, self.tid)
self.task.sendval = result
# If waiting for a non-existent task,
# return immediately without waiting
if not result:
self.sched.schedule(self.task)
複製代碼
設計討論: * 在任務中引用另外一個任務的惟一辦法 是 使用scheduler分配給它的任務ID。 * 上述準則是一個安全的封裝策略。 * 這個準則讓任務保持獨立,不與內核混淆在一塊兒。 * 這個準則能讓全部的任務都被scheduler管理的好好的。
如今已經完成了: * 多任務。 * 開啓新的進程。 * 進行新任務的管理。 這些特色都很是符合一個web服務器的各類特色。下面作一個Echo Server的嘗試。
from pyos6 import *
from socket import *
def handle_client(client, addr):
print "Connection from", addr
while True:
data = client.recv(65536)
if not data:
break
client.send(data)
client.close()
print "Client closed"
yield # Make the function a generator/coroutine
def server(port):
print "Server starting"
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(("", port))
sock.listen(5)
while True:
client, addr = sock.accept()
yield NewTask(handle_client(client, addr))
def alive():
while True:
print "I'm alive!"
yield
sched = Scheduler()
sched.new(alive())
sched.new(server(45000))
sched.mainloop()
複製代碼
但問題是這個網絡服務器是I / O阻塞的。整個python的解釋器須要掛起,一直到I/O操做結束。
先額外介紹一個叫Select的模塊。select模塊能夠用來監視一組socket連接的活躍狀態。用法以下:
reading = [] # List of sockets waiting for read
writing = [] # List of sockets waiting for write
# Poll for I/O activity
r,w,e = select.select(reading,writing,[],timeout)
# r is list of sockets with incoming data
# w is list of sockets ready to accept outgoing data
# e is list of sockets with an error state
複製代碼
下面實現一個非阻塞I/O的網絡服務器,所用的思想就是以前所實現的Task waiting 思想。
class Scheduler(object):
def __init__(self):
...
# I/O waiting
self.read_waiting = {}
self.write_waiting = {}
...
# I/O waiting
def waitforread(self, task, fd):
self.read_waiting[fd] = task
def waitforwrite(self, task, fd):
self.write_waiting[fd] = task
def iopoll(self, timeout):
if self.read_waiting or self.write_waiting:
r, w, e = select.select(self.read_waiting,
self.write_waiting,
[], timeout)
for fd in r:
self.schedule(self.read_waiting.pop(fd))
for fd in w:
self.schedule(self.write_waiting.pop(fd))
複製代碼
源碼解析:__init__裏面的是兩個字典。用來存儲阻塞的IO的任務。waitforread()和waitforwrite()將須要等待寫入和等待讀取的task放在dict裏面。這裏的iopoll():使用select()去決定使用哪一個文件描述器,而且可以不阻塞任意一個和I/O才作有關係的任務。poll這個東西也能夠放在mainloop裏面,可是這樣會帶來線性的開銷增加。 詳情請見:Python Select 解析 - 金角大王 - 博客園
添加新的系統調用:
# Wait for a task to exit
class WaitTask(SystemCall):
def __init__(self, tid):
self.tid = tid
def handle(self):
result = self.sched.waitforexit(self.task, self.tid)
self.task.sendval = result
# If waiting for a non-existent task,
# return immediately without waiting
if not result:
self.sched.schedule(self.task)
# Wait for reading
class ReadWait(SystemCall):
def __init__(self, f):
self.f = f
def handle(self):
fd = self.f.fileno()
self.sched.waitforread(self.task, fd)
# Wait for writing
class WriteWait(SystemCall):
def __init__(self, f):
self.f = f
def handle(self):
fd = self.f.fileno()
self.sched.waitforwrite(self.task, fd)
複製代碼
更多請見開頭那個鏈接裏面的代碼:pyos8.py
這樣咱們就完成了一個多任務處理的OS。這個OS能夠併發執行,能夠建立、銷燬、等待任務。任務能夠進行I/O操做。而且最後咱們實現了併發服務器。
先來看一段示例代碼:
def Accept(sock):
yield ReadWait(sock)
return sock.accept()
def server(port):
while True:
client,addr = Accept(sock)
yield NewTask(handle_client(client,addr))
複製代碼
這種狀況下,server()函數裏面的有調用Accept(),可是accept函數裏面的yield不起做用。這是由於yield只能在函數棧的最頂層掛起一個協程。你也不可以把yield寫進庫函數裏面。 【這個限制是Stackless Python要解決的問題之一。
解決這個只能在函數棧頂掛起協程的解決方法。 * 有且只有一種方法,可以建立可掛起的子協程和函數。 * 可是,建立可掛起的子協程和函數須要經過咱們以前所說的Task Scheduler自己。 * 咱們必須嚴格遵照yield聲明。 * 咱們須要使用一種 -奇淫巧技- 叫作Trampolining(蹦牀)。
代碼:trampoline.py
def add(x, y):
yield x + y
# A function that calls a subroutine
def main():
r = yield add(2, 2)
print r
yield
def run():
m = main()
# An example of a "trampoline"
sub = m.send(None)
result = sub.send(None)
m.send(result)
# execute:
run()
複製代碼
整個控制流以下:
咱們看到,上圖中左側爲統一的scheduler,若是咱們想調用一個子線程,咱們都用經過上面的scheduler進行調度。
控制過程: scheduler -> subroutine_1 -> scheduler -> subroutine_2 -> scheduler -> subroutine_1 就像蹦牀(trampolining)同樣,全部的子進程調度都要先返回scheduler,再進行下一步。【有點像汽車換擋。
而不是: -scheduler -> subroutine_1 -> subroutine_2 -> subroutine_1- 這種直接棧式的子協程調度是不被容許的。
有不少更加深遠的話題值得咱們去討論。其實在上面的套路里面都說了一些。 * 在task之間的通訊。 * 處理阻塞的一些操做:好比和數據庫的一些連接。 * 多進程的協程和多線程的協程。 * 異常處理。
Python 的生成器比不少人想象的有用的多。生成器能夠:
* 定製可迭代對象。
* 處理程序管道和數據流。【第二部分提到】
* 事物處理。【第三部分提到的和SAX結合的事務處理】
* 合做的多任務處理【第四部分提到的Task,子進程子線程合做】
複製代碼
* 迭代器:要產生數據。
* 接受數據/消息:消費數據。
* 一箇中斷:在合做性的多任務裏面。
複製代碼
千萬不要一個函數裏面包含兩個或多個以上的功能,好比函數是generator就是generator,是一個coroutine就是一個coroutin。
感謝你們閱讀。我是LumiaXu。一名電子科大正在找實習的pythoner~。
更多訪問原做者的網站: http://www.dabeaz.com
#python/coroutine#