Python中的協程

Coroutine in Python

引言: 本文出自David Beazley 的關於協程的PPT,如今筆者將他翻譯過來。並整理成文。感謝在協程方面的專家David Beazley, 能給咱們這麼深刻的協程上面的講座。也但願本文能給更多pythoner普及yield的更多用法,使python的這個特性可以更加多的活躍在你們的代碼中。html

源PPT和源碼能夠從這裏下載:

http://www.dabeaz.com/coroutines/python

問題:

1. 什麼是協程
2. 協程怎麼用
3. 要注意什麼
4. 用他們好麼
複製代碼

第一部分:生成器和協程的介紹

生成器(Generator)的本質和特色

生成器 是 能夠生成必定序列的 函數。 函數能夠調用next()方法。web

生成器的例子:

  • 例子1: follow.py 可使用生成器完成 tail -f 的功能,也就是跟蹤輸出的功能。
import time

def follow(thefile):
    thefile.seek(0,2)      # Go to the end of the file
    while True:
    	line = thefile.readline()
      if not line:
      	time.sleep(0.1)    # Sleep briefly
          continue
      yield line
複製代碼
  • 例子2: 生成器用做程序管道(相似unix pipe)
ps:unix pipe 
       A pipeline is a sequence of processes chained together by their standard streams 

標註:unix管道
		一個uinx管道是由標準流連接在一塊兒的一系列流程.
複製代碼

pipeline.py數據庫

def grep(pattern,lines):
    for line in lines:
        if pattern in line:
             yield line

if __name__ == '__main__':
    from follow import follow

    # Set up a processing pipe : tail -f | grep python
    logfile  = open("access-log")
    loglines = follow(logfile)
    pylines  = grep("python",loglines)

    # Pull results out of the processing pipeline
    for line in pylines:
        print line,

複製代碼

理解pipeline.py
在pipeline中,follow函數和grep函數至關於程序鏈,這樣就能鏈式處理程序。編程

Yield做爲表達【咱們開始說協程了~】:

grep.pyjson

def grep(pattern):
    print "Looking for %s" % pattern

    print "give a value in the coroutines"
    while True:
        line = (yield)
        if pattern in line:
            print line
# Example use
if __name__ == '__main__':
    g = grep("python")
    g.next()
    g.send("Yeah, but no, but yeah, but no")
    g.send("A series of tubes")
    g.send("python generators rock!")
複製代碼

yield最重要的問題在於yield的值是多少。安全

yield的值須要使用coroutine協程這個概念 相對於僅僅生成值,函數能夠動態處理傳送進去的值,而最後值經過yield返回。bash

協程的執行:

協程的執行和生成器的執行很類似。 當你初始化一個協程,不會返回任何東西。 協程只能響應run和send函數。 協程的執行依賴run和send函數。服務器

協程啓動:

全部的協程都須要調用.next( )函數。 調用的next( )函數將要執行到第一個yield表達式的位置。 在yield表達式的位置上,很容易去執行就能夠。 協程使用next()啓動。網絡

使用協程的修飾器:

由【協程啓動】中咱們知道,啓動一個協程須要記得調用next( )來開始協程,而這個啓動器容易忘記使用。 使用修飾器包一層,來讓咱們啓動協程。 【之後全部的協程器都會先有@coroutine

def coroutine(func):
		def start(*args, **kwargs):
			cr = func(*args, **kwargs)
          cr.next()
          return cr
	return start

@coroutine
def grep(pattern):
	...
複製代碼

關閉一個協程:

使用close()來關閉。

使用except捕獲協程的關閉close():

grepclose.py

@coroutine
def grep(pattern):
		print "Looking for %s" % pattern
		try:
			while True:
          	line = (yield)
              if pattern in line:
              	print line,
		except GeneratorExit:
			print "Going away. Goodbye"
複製代碼

使用GeneratorExit這個異常類型

拋出一個異常:

在一個協程中,能夠拋出一個異常

g.throw(RuntimeError,"You're hosed")

Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "<stdin>", line 4, in grep
    RuntimeError: You're hosed 複製代碼

異常起源於yield表達式 能夠用常規方法去抓取

一些小tips

* 儘管有點類似,可是生成器和協程是*兩個徹底不一樣的概念*。
* 生成器用來產生序列。
* 協程用來處理序列。
* 很容易產生一些誤解。由於協程有的時候用來對進程裏面的用來產生迭代對象的生成器做微調。
複製代碼

生成器不可以同時生成值和接受值

* 不能往generator裏面send東西。
* 協程和迭代器的概念沒有關係
* 雖然有一種用法,確實是在一個協程裏面生成一些值,可是並不和迭代器有關係。
複製代碼

第二部分:協程,管道,數據流

進程管道:以下圖所示,一連串進程串起來像管道同樣。

協程能夠用來做爲進程管道。 你僅僅須要把協程鏈接在一塊兒,而後經過send()操做傳遞數據。 整個進程管道由三部分組成:

第一部分,管道源/協程源:

進程管道須要一個初始的源(一個生產者)。 這個初始的源驅動整個管道。 管道源不是協程。

第二部分,管道終止/協程終止:

管道必須有個終止點。 管道終止/協程終止是進程管道的終止點。

例子:以實現tail -f 功能爲例子

from coroutine import coroutine

# A data source. This is not a coroutine, but it sends
# data into one (target)
import time
def follow(thefile, target):
    thefile.seek(0,2)      # Go to the end of the file
    while True:
         line = thefile.readline()
         if not line:
             time.sleep(0.1)    # Sleep briefly
             continue
         target.send(line)

# A sink. A coroutine that receives data
@coroutine
def printer():
    while True:
         line = (yield)
         print line,

# Example use
if __name__ == '__main__':
    f = open("access-log")
    follow(f,printer())
複製代碼

分析:第一個follow函數是協程源,第二個printer函數是協程終止。協程源不是一個協程,可是須要傳入一個已經初始化完畢的協程。在協程源當中,調用send()。

第三部分,管道過濾器:

叫過濾器其實並不貼切,應該叫中間人Intermediate:其兩端都是send()函數。

(協程的中間層) 典型的中間層以下:

 @coroutine
	def filter(target):  # 這個target是傳遞參數的對象
		while True:
			item = (yield)  # 這裏用來接收上一個send()傳入的value
			# Transform/filter item
			# processing items
			# Send it along to the next stage
			target.send(item)  # 像target傳遞參數 
複製代碼

分析可知,中間層須要接受上一個coroutine,也須要往下一個coroutine裏面傳遞值。

一個管道過濾器的例子 從文章中找出具備「python」關鍵字的句子打印。 grep.py:

 @coroutine
	def grep(pattern, target):  # 這個target用來接收參數
		while True:
			line = (yield)  # 這裏用來接收上一個send()傳入的value
			# Transform/filter item
			# processing items
			if pattern in line:
				target.send(line)
			# Send it along to the next stage
複製代碼

Hook it up with follow and printer:

f = open("access-log")
  follow(f, grep('python', printer())) 
複製代碼

grep 從中間傳入follow,而後printer傳入grep。

協程和生成器的對比

不一樣處:生成器使用了迭代器拉取數據,協程使用send()壓入數據。

變得多分支:(上一個協程發送數據去多個下一段協程)

圖示:

使用協程,你能夠發送數據 給 多個 協程過濾器/協程終了。可是請注意,協程源只是用來傳遞數據的,過多的在協程源中傳遞數據是使人困惑而且複雜的。

一個例子

@coroutine
def broadcast(targets):
    while True:
        item = (yield)
        for target in targets:
            target.send(item)
複製代碼

Hook it Up!

if __name__ == '__main__':
    f = open("access-log")
    follow(f,
       broadcast([grep('python',printer()),
                  grep('ply',printer()),
                  grep('swig',printer())])
           )
複製代碼

從文章中分別打印出含有’python‘ ’ply‘ ’swig‘ 關鍵字的句子。使用了一個協程隊列向全部printer協程 送出 接收到的數據。 圖示:

或者這樣Hook them up:

if __name__ == '__main__':
    f = open("access-log")
    p = printer()
    follow(f,
       broadcast([grep('python',p),
                  grep('ply',p),
                  grep('swig',p)])
           )
複製代碼

圖示:

爲何咱們用協程

  • 協程相較於迭代器,存在更增強大的數據路由(就像上圖的數據流向)的可能。
  • 協程能夠將一系列簡單的數據處理組件,整合到管道,分支,合併等複雜的佈置當中。
  • 但有些限制…【後文會說】 相對於對象的優點
  • 從概念上簡單一點:協程就是一個函數,對象要構建整個對象。
  • 從代碼執行角度上來講,協程相對要快一些。

第三部分:協程,事件分發

事件處理

協程能夠用在寫各類各樣處理事件流的組件。

介紹一個例子【這個例子會貫穿這個第三部分始終】要求作一個實時的公交車GPS位置監控。編寫程序的主要目的是處理一份文件。傳統上,使用SAX進行處理。【SAX處理能夠減小內存空間的使用,但SAX事件驅動的特性會讓它笨重和低效】。
複製代碼

把SAX和協程組合在一塊兒

咱們可使用協程分發SAX事件,好比:

import xml.sax

class EventHandler(xml.sax.ContentHandler):
    def __init__(self,target):
        self.target = target
    def startElement(self,name,attrs):
        self.target.send(('start',(name,attrs._attrs)))
    def characters(self,text):
        self.target.send(('text',text))
    def endElement(self,name):
        self.target.send(('end',name))

# example use
if __name__ == '__main__':
    from coroutine import *

 @coroutine
    def printer():
        while True:
            event = (yield)
            print event
    xml.sax.parse("allroutes.xml",
                  EventHandler(printer()))

複製代碼

解析:整個事件的處理如圖所示

【最終的組合】

好比,把xml改爲json最後從中篩選的出固定信息. buses.py

@coroutine
def buses_to_dicts(target):
    while True:
        event, value = (yield)
        # Look for the start of a <bus> element
        if event == 'start' and value[0] == 'bus':
            busdict = {}
            fragments = []
            # Capture text of inner elements in a dict
            while True:
                event, value = (yield)
                if event == 'start':
                    fragments = []
                elif event == 'text':
                    fragments.append(value)
                elif event == 'end':
                    if value != 'bus':
                        busdict[value] = "".join(fragments)
                    else:
                        target.send(busdict)
                        break
複製代碼

協程的一個有趣的事情是,您能夠將初始數據源推送到低級別的語言,而不須要重寫全部處理階段。好比,PPT 中69-73頁介紹的,能夠經過協程和低級別的語言進行聯動,從而達成很是好的優化效果。如Expat模塊或者cxmlparse模塊。 ps: ElementTree具備快速的遞增xml句法分析

第四部分:從數據處理到併發編程

複習一下上面學的特色:

協程有如下特色。

  • 協程和生成器很是像。
  • 咱們能夠用協程,去組合各類簡單的小組件。
  • 咱們可使用建立進程管道,數據流圖的方法去處理數據。
  • 你可使用伴有複雜數據處理代碼的協程。

一個類似的主題:

咱們往協程內傳送數據,向線程內傳送數據,也向進程內傳送數據。那麼,協程天然很容易和線程和分佈式系統聯繫起來。

基礎的併發:

咱們能夠經過添加一個額外的層,從而封裝協程進入線程或者子進程。這描繪了幾個基本的概念。

目標!協程+線程【沒有蛀牙。

下面看一個線程的例子。 cothread.py

@coroutine
def threaded(target):
# 第一部分:
    messages = Queue()

    def run_target():
        while True:
            item = messages.get()
            if item is GeneratorExit:
                target.close()
                return
            else:
                target.send(item)

    Thread(target=run_target).start()
# 第二部分:
    try:
        while True:
            item = (yield)
            messages.put(item)
    except GeneratorExit:
        messages.put(GeneratorExit)
複製代碼

例子解析:第一部分:先新建一個隊列。而後定義一個永久循環的線程;這個線程能夠將其中的元素拉出消息隊列,而後發送到目標裏面。第二部分:接受上面送來的元素,並經過隊列,將他們傳送進線程裏面。其中用到了GeneratorExit ,使得線程能夠正確的關閉。

Hook up:cothread.py

if __name__ == '__main__':
    import xml.sax
    from cosax import EventHandler
    from buses import *

    xml.sax.parse("allroutes.xml", EventHandler(
        buses_to_dicts(
            threaded(
                filter_on_field("route", "22",
              	filter_on_field("direction", "North Bound",
											bus_locations()))))))

複製代碼

可是:添加線程讓這個例子慢了50%

目標!協程+子進程

咱們知道,進程之間是不共享系統資源的,因此要進行兩個子進程之間的通訊,咱們須要經過一個文件橋接兩個協程。

import cPickle as pickle
from coroutine import *

@coroutine
def sendto(f):
    try:
        while True:
            item = (yield)
            pickle.dump(item, f)
            f.flush()
    except StopIteration:
        f.close()

def recvfrom(f, target):
    try:
        while True:
            item = pickle.load(f)
            target.send(item)
    except EOFError:
        target.close()
# Example use
if __name__ == '__main__':
    import xml.sax
    from cosax import EventHandler
    from buses import *
    import subprocess
    p = subprocess.Popen(['python', 'busproc.py'],
                         stdin=subprocess.PIPE)
    xml.sax.parse("allroutes.xml",
                  EventHandler(
                      buses_to_dicts(
                          sendto(p.stdin))))
複製代碼

程序經過sendto()和recvfrom()傳遞文件。

和環境結合的協程:

使用協程,咱們能夠從一個任務的執行環境中剝離出他的實現。而且,協程就是那個實現。執行環境是你選擇的線程,子進程,網絡等。

須要注意的警告

  • 建立大量的協同程序,線程和進程多是建立 不可維護 應用程序的一個好方法,而且會減慢你程序的速度。須要學習哪些是良好的使用協程的習慣。
  • 在協程裏send()方法須要被適當的同步。
  • 若是你對已經正在執行了的協程使用send()方法,那麼你的程序會發生崩潰。如:多個線程發送數據進入同一個協程。
  • 一樣的不能創造循環的協程:

  • 堆棧發送正在構建一種調用堆棧(send()函數不返回,直到目標產生)。
  • 若是調用一個正在發送進程的協程,將會拋出一個錯誤。
  • send() 函數不會掛起任何一個協程的執行。

第五部分:任務同樣的協程

Task的概念

在併發編程中,一般將問題細分爲「任務」。 「任務」有下面幾個經典的特色: * 擁有獨立的控制流。 * 擁有內在的狀態。 * 能夠被安排規劃/掛起/恢復。 * 可與其餘的任務通訊。 協程也是任務的一種。

協程是任務的一種:

  1. 下面的部分 來告訴你協程有他本身的控制流,這裏 if 的控制就是控制流。
@coroutine
def grep(pattern):
    print "Looking for %s" % pattern
    print "give a value in the coroutines"
    while True:
        line = (yield)
        if pattern in line:
            print line
複製代碼
  1. 協程是一個相似任何其餘Python函數的語句序列。
  2. 協程有他們內在的本身的狀態,好比一些變量:其中的pattern和line就算是本身的狀態。
@coroutine
def grep(pattern):
    print "Looking for %s" % pattern
    print "give a value in the coroutines"
    while True:
        line = (yield)
        if pattern in line:
            print line
複製代碼
  1. 本地的生存時間和協程的生存時間相同。
  2. 不少協程構建了一個可執行的環境。
  3. 協程能夠互相通訊,好比:yield就是用來接受傳遞的信息,而上一個協程的send( )就是用來向下一個協程。
@coroutine
def grep(pattern):
    print "Looking for %s" % pattern
    print "give a value in the coroutines"
    while True:
        line = (yield)
        if pattern in line:
            print line
複製代碼
  1. 協程能夠被掛起,重啓,關閉。
    • yield能夠掛起執行進程。
    • send() 用來 重啓執行進程。
    • close()用來終止/關閉進程。

總之,一個協程知足以上全部任務(task)的特色,因此協程很是像任務。可是協程不用與任何一個線程或者子進程綁定。

第六部分:操做系統的中斷事件。(微嵌課程學的好的同窗能夠直接跳到這部分的「啓示」✌️)

操做系統的執行(複習微嵌知識)

當計算機運行時,電腦沒有同時運行好幾條指令的打算。而不管是處理器,應用程序都不懂多任務處理。因此,操做系統須要去完成多任務的調度。操做系統經過在多個任務中快速切換來實現多任務。

須要解決的問題(還在複習微嵌知識)

CPU執行的是應用程序,而不是你的操做系統,那沒有被CPU執行的操做系統是怎麼控制正在運行的應用程序中斷的呢。

中斷(interrupts)和陷阱(Traps)

操做系統只能經過兩個機制去得到對應用程序的控制:中斷和陷阱。 * 中斷:和硬件有關的balabala。 * 陷阱:一個軟件發出的信號。 在兩種情況下,CPU都會掛起正在作的,而後執行OS的代碼(這個時候,OS的代碼成功插入了應用程序的執行),此時,OS來切換了程序。

中斷的底層實現(略…碼字員微嵌只有70分🤦‍♀️)

中斷的高級表現:

* 中斷(Traps)使得OS的代碼能夠實現。
* 在程序運行遇到中斷(Traps)時,OS強制在CPU上中止你的程序。
* 程序掛起,而後OS運行。
複製代碼

表現以下圖:

每次中斷(Traps)程序都會執行另外一個不一樣的任務。

任務調度(很是簡單):

爲了執行不少任務,添加一簇任務隊列。

啓示(很重要):

BB了這麼多微嵌的內容,獲得的是什麼結論呢。類比任務調度,協程中yield聲明能夠理解爲中斷(Traps)。當一個生成器函數碰到了yield聲明,那函數將當即掛起。而執行被傳給生成器函數運行的任何代碼。若是你把yield聲明當作了一箇中斷,那麼你就能夠組件一個多任務執行的操做系統了。

第七部分:讓咱們建一個操做系統。【起飛了,請握好扶手

目標:知足如下條件建成一個操做系統。

1. 用純python語句。
2. 不用線程。
3. 不用子進程。
4. 使用生成器和協程器。
複製代碼

咱們用python去構建操做系統的一些動機:

* 尤爲在存在線程鎖(GIL)的條件下,在線程間切換會變得很是重要。我要高併發!
* 不阻塞和異步I/O。我要高併發!
* 在實戰中可能會遇到:服務器要同時處理上千條客戶端的鏈接。我要高併發!
* 大量的工做 致力於實現 事件驅動 或者說 響應式模型。我要組件化!
* 綜上,python構建操做系統,有利於瞭解如今高併發,組件化的趨勢。
複製代碼

第一步:定義任務

定義一個任務類:任務像一個協程的殼,協程函數傳入target;任務類僅僅有一個run()函數。 pyos1.py

# Step 1: Tasks
# This object encapsulates a running task.

class Task(object):
    taskid = 0 # 全部task對象會共享這個值。不熟悉的朋友請補一下類的知識
    def __init__(self,target):
        Task.taskid += 1
        self.tid     = Task.taskid   # Task ID
        self.target  = target        # Target coroutine
        self.sendval = None          # Value to send

    # Run a task until it hits the next yield statement
    def run(self):
        return self.target.send(self.sendval)
複製代碼

任務類的執行:

if __name__ == '__main__':
    # A simple generator/coroutine function
    def foo():
        print "Part 1"
        yield
        print "Part 2"
        yield

    t1 = Task(foo())
    print "Running foo()"
    t1.run()
    print "Resuming foo()"
    t1.run()
複製代碼

在foo中,yield就像中斷(Traps)同樣,每次執行run(),任務就會執行到下一個yield(一箇中斷)。

第二步:構建調度者

下面是調度者類,兩個屬性分別是Task隊列和task_id與Task類對應的map。schedule()向隊列裏面添加Task。new()用來初始化目標函數(協程函數),將目標函數包裝在Task,進而裝入Scheduler。最後mainloop會從隊列裏面拉出task而後執行到task的target函數的yield爲止,執行完之後再把task放回隊列。這樣下一次會從下一個yield開始執行。 pyos2.py

from Queue import Queue

class Scheduler(object):
    def __init__(self):
        self.ready   = Queue()   
        self.taskmap = {}        

    def new(self,target):
        newtask = Task(target)
        self.taskmap[newtask.tid] = newtask
        self.schedule(newtask)
        return newtask.tid

    def schedule(self,task):
        self.ready.put(task)

    def mainloop(self):
        while self.taskmap:
            task = self.ready.get()
            result = task.run()
            self.schedule(task)
複製代碼

下面是一個執行的例子:

# === Example ===
if __name__ == '__main__':
    # Two tasks
    def foo():
        while True:
            print "I'm foo"
            yield
            print "I am foo 2"
            yield

    def bar():
        while True:
            print "I'm bar"
            yield
            print "i am bar 2"
            yield       
    # Run them
    sched = Scheduler()
    sched.new(foo())
    sched.new(bar())
    sched.mainloop()
複製代碼

執行結果,能夠發現兩個task之間任務是交替的,而且以yield做爲中斷點。每當執行撞到yield(中斷點)以後,Scheduler對Tasks作從新的規劃。下圖是兩個循環。 上述執行的結果:

第三步:肯定任務的中止條件

若是,target函數裏面不是死循環,那麼上面的代碼就會出錯。因此咱們對Scheduler作改進。添加一個從任務隊列中刪除的操做,和對於StopIteration的驗證。 【對scheduler作改進的緣由是任務的性質:能夠被安排規劃/掛起/恢復。】

class Scheduler(object):
    def __init__(self):
			...     
    def new(self,target):
			...
    def schedule(self,task):
			...

    def exit(self,task):
        print "Task %d terminated" % task.tid
        del self.taskmap[task.tid]
    def mainloop(self):
         while self.taskmap:
            task = self.ready.get()
            try:
                result = task.run()
            except StopIteration:
                self.exit(task)
                continue
            self.schedule(task)
複製代碼

第四步:添加系統調用基類。

在OS中,中斷是應用程序請求系統服務的方式。在咱們的代碼中,OS是調度者(scheduler),而中斷是yield。爲了請求調度者服務,任務須要帶值使用yield聲明。 pyos4.py

class Scheduler(object):
	  ...
    def mainloop(self):
        while self.taskmap:   # 1
            task = self.ready.get() 
            try:				 # 2
                result = task.run()
                if isinstance(result, SystemCall):
                    result.task = task
                    result.sched = self
                    result.handle()
                    continue
            except StopIteration:
                self.exit(task)
                continue 
            self.schedule(task) # 3

class SystemCall(object): # 4
    def handle(self):
        pass
複製代碼

代碼解析: 1. 若是taskmap裏面存在task,就從ready隊列裏面拿任務出來,若是沒有就結束mainloop。 2. 【就是傳說中的系統調運部分】ready隊列裏面的task被拿出來之後,執行task,返回一個result對象,並初始化這個result對象。若是隊列裏面的task要中止迭代了(終止yield這個過程)就從隊列裏刪除這個任務。 3. 最後再經過schedule函數把執行後的task放回隊列裏面。 4. 系統調用基類,以後全部的系統調用都要從這個基類繼承。

第4.5步:添加第一個系統調用

這個系統調用想返回任務的id。 Task的sendval屬性就像一個系統調用的返回值。當task從新運行的是後,sendval將會傳入這個系統調用。 pyos4.py

...
class GetTid(SystemCall):
    def handle(self):
		# 把task的id傳給task的返回參數:
        self.task.sendval = self.task.tid  
		# 再把task給放入Scheduler的隊列裏面
        self.sched.schedule(self.task)

class Task(object):
	  ...
    # Run a task until it hits the next yield statement
    def run(self):
        return self.target.send(self.sendval)
複製代碼

進行最後的調用:

if __name__ == '__main__':
    def foo():
        mytid = yield GetTid()
        for i in xrange(5):
            print "I'm foo", mytid
            yield
    def bar():
        mytid = yield GetTid()
        for i in xrange(10):
            print "I'm bar", mytid
            yield

    sched = Scheduler()
    sched.new(foo())
    sched.new(bar())
    sched.mainloop()
複製代碼

理解這段代碼的前提:(很是重要) 1. send()函數有返回值的,返回值是yield表達式右邊的值。在本段代碼中,result的返回值是yield GetTid()的GetTid的實例或者是yield後面的None。 2. 執行send(sendval)之後,sendval被傳入了yield表達式。並賦給了mytid,返回GetTid()給ruselt。

執行順序: 先建立一個調度者(Scheduler),而後在調度者裏面添加兩個協程函數:foo(), bar(),最後觸發mainloop進行協程的調度執行。

系統調用原理: 系統調用是基於系統調用類實現的,如GetTid類,其目的是傳出本身的tid。傳出本身的tid以後,再將task放回隊列。

第五步:任務管理

上面咱們搞定了一個GetTid系統調用。咱們如今搞定更多的系統調用: * 建立一個新的任務。 * 殺掉一個已經存在的任務。 * 等待一個任務結束。 這些細小的相同的操做會與線程,進程配合。

1. *建立一個新的系統調用*:經過系統調用加入一個task。
複製代碼
# Create a new task
class NewTask(SystemCall):
    def __init__(self,target):
        self.target = target
    def handle(self):
        tid = self.sched.new(self.target)
        self.task.sendval = tid
        self.sched.schedule(self.task)
複製代碼
2. *殺掉一個系統調用*:經過系統調用殺掉一個task。
複製代碼
class KillTask(SystemCall):
    def __init__(self, tid):
        self.tid = tid

    def handle(self):
        task = self.sched.taskmap.get(self.tid, None)
        if task:
            task.target.close()
            self.task.sendval = True
        else:
            self.task.sendval = False
        self.sched.schedule(self.task)
複製代碼
3. 進程等待:須要大幅度改進Scheduler。
複製代碼
class Scheduler(object):
    def __init__(self):
			...
        # Tasks waiting for other tasks to exit
        self.exit_waiting = {}
    def new(self, target):
			...
    def exit(self, task):
        print "Task %d terminated" % task.tid
        del self.taskmap[task.tid]
        # Notify other tasks waiting for exit
        for task in self.exit_waiting.pop(task.tid, []):
            self.schedule(task)
    def waitforexit(self, task, waittid):
        if waittid in self.taskmap:
            self.exit_waiting.setdefault(waittid, []).append(task)
            return True
        else:
            return False
    def schedule(self, task):
			...
    def mainloop(self):
        ...
複製代碼

exit_waiting 是用來暫時存放要退出task的地方。

class WaitTask(SystemCall):
    def __init__(self, tid):
        self.tid = tid

    def handle(self):
        result = self.sched.waitforexit(self.task, self.tid)
        self.task.sendval = result
        # If waiting for a non-existent task,
        # return immediately without waiting
        if not result:
            self.sched.schedule(self.task)
複製代碼

設計討論: * 在任務中引用另外一個任務的惟一辦法 是 使用scheduler分配給它的任務ID。 * 上述準則是一個安全的封裝策略。 * 這個準則讓任務保持獨立,不與內核混淆在一塊兒。 * 這個準則能讓全部的任務都被scheduler管理的好好的。

網絡服務器的搭建:

如今已經完成了: * 多任務。 * 開啓新的進程。 * 進行新任務的管理。 這些特色都很是符合一個web服務器的各類特色。下面作一個Echo Server的嘗試。

from pyos6 import *
from socket import *
def handle_client(client, addr):
    print "Connection from", addr
    while True:
        data = client.recv(65536)
        if not data:
            break
        client.send(data)
    client.close()
    print "Client closed"
    yield  # Make the function a generator/coroutine
def server(port):
    print "Server starting"
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(("", port))
    sock.listen(5)
    while True:
        client, addr = sock.accept()
        yield NewTask(handle_client(client, addr))
def alive():
    while True:
        print "I'm alive!"
        yield
sched = Scheduler()
sched.new(alive())
sched.new(server(45000))
sched.mainloop()
複製代碼

但問題是這個網絡服務器是I / O阻塞的。整個python的解釋器須要掛起,一直到I/O操做結束。

非阻塞的I/O

先額外介紹一個叫Select的模塊。select模塊能夠用來監視一組socket連接的活躍狀態。用法以下:

reading = []    # List of sockets waiting for read
writing = []    # List of sockets waiting for write
# Poll for I/O activity

r,w,e = select.select(reading,writing,[],timeout)
    # r is list of sockets with incoming data
    # w is list of sockets ready to accept outgoing data
    # e is list of sockets with an error state
複製代碼

下面實現一個非阻塞I/O的網絡服務器,所用的思想就是以前所實現的Task waiting 思想。

class Scheduler(object):
    def __init__(self):
		  ...
        # I/O waiting
        self.read_waiting = {}
        self.write_waiting = {}
	  ...
    # I/O waiting
    def waitforread(self, task, fd):
        self.read_waiting[fd] = task

    def waitforwrite(self, task, fd):
        self.write_waiting[fd] = task

    def iopoll(self, timeout):
        if self.read_waiting or self.write_waiting:
            r, w, e = select.select(self.read_waiting,
                                    self.write_waiting, 
												[], timeout)
            for fd in r:
					self.schedule(self.read_waiting.pop(fd))
            for fd in w:
					self.schedule(self.write_waiting.pop(fd))
複製代碼

源碼解析:__init__裏面的是兩個字典。用來存儲阻塞的IO的任務。waitforread()和waitforwrite()將須要等待寫入和等待讀取的task放在dict裏面。這裏的iopoll():使用select()去決定使用哪一個文件描述器,而且可以不阻塞任意一個和I/O才作有關係的任務。poll這個東西也能夠放在mainloop裏面,可是這樣會帶來線性的開銷增加。 詳情請見:Python Select 解析 - 金角大王 - 博客園

添加新的系統調用:

# Wait for a task to exit
class WaitTask(SystemCall):
    def __init__(self, tid):
        self.tid = tid

    def handle(self):
        result = self.sched.waitforexit(self.task, self.tid)
        self.task.sendval = result
        # If waiting for a non-existent task,
        # return immediately without waiting
        if not result:
            self.sched.schedule(self.task)
# Wait for reading
class ReadWait(SystemCall):
    def __init__(self, f):
        self.f = f

    def handle(self):
        fd = self.f.fileno()
        self.sched.waitforread(self.task, fd)
# Wait for writing
class WriteWait(SystemCall):
    def __init__(self, f):
        self.f = f

    def handle(self):
        fd = self.f.fileno()
        self.sched.waitforwrite(self.task, fd)
複製代碼

更多請見開頭那個鏈接裏面的代碼:pyos8.py

這樣咱們就完成了一個多任務處理的OS。這個OS能夠併發執行,能夠建立、銷燬、等待任務。任務能夠進行I/O操做。而且最後咱們實現了併發服務器。

第八部分:協程棧的一些問題的研究。

咱們可能在使用yield的時候會遇到一些問題:

先來看一段示例代碼:

def Accept(sock):
      yield ReadWait(sock)
      return sock.accept()

def server(port):
		while True:
  		client,addr = Accept(sock)
      	yield NewTask(handle_client(client,addr))
複製代碼

這種狀況下,server()函數裏面的有調用Accept(),可是accept函數裏面的yield不起做用。這是由於yield只能在函數棧的最頂層掛起一個協程。你也不可以把yield寫進庫函數裏面。 【這個限制是Stackless Python要解決的問題之一。

解決這個只能在函數棧頂掛起協程的解決方法。 * 有且只有一種方法,可以建立可掛起的子協程和函數。 * 可是,建立可掛起的子協程和函數須要經過咱們以前所說的Task Scheduler自己。 * 咱們必須嚴格遵照yield聲明。 * 咱們須要使用一種 -奇淫巧技- 叫作Trampolining(蹦牀)。

讓咱們來看看這個叫蹦牀的奇淫巧技。

代碼:trampoline.py

def add(x, y):
    yield x + y

# A function that calls a subroutine
def main():
    r = yield add(2, 2)
    print r
    yield

def run():
    m = main()
    # An example of a "trampoline"
    sub = m.send(None)

    result = sub.send(None)
    m.send(result)

# execute:
run()
複製代碼

整個控制流以下:

咱們看到,上圖中左側爲統一的scheduler,若是咱們想調用一個子線程,咱們都用經過上面的scheduler進行調度。

控制流:

控制過程: scheduler -> subroutine_1 -> scheduler -> subroutine_2 -> scheduler -> subroutine_1 就像蹦牀(trampolining)同樣,全部的子進程調度都要先返回scheduler,再進行下一步。【有點像汽車換擋。

而不是: -scheduler -> subroutine_1 -> subroutine_2 -> subroutine_1- 這種直接棧式的子協程調度是不被容許的。

第九部分:最後的一些話。

更加深遠的一些話題。

有不少更加深遠的話題值得咱們去討論。其實在上面的套路里面都說了一些。 * 在task之間的通訊。 * 處理阻塞的一些操做:好比和數據庫的一些連接。 * 多進程的協程和多線程的協程。 * 異常處理。

讓咱們對yield一點小尊重:

Python 的生成器比不少人想象的有用的多。生成器能夠:

* 定製可迭代對象。
* 處理程序管道和數據流。【第二部分提到】
* 事物處理。【第三部分提到的和SAX結合的事務處理】
* 合做的多任務處理【第四部分提到的Task,子進程子線程合做】
複製代碼

在下列三種蛀牙的狀況下咱們能夠想起來,使用yield。

* 迭代器:要產生數據。
* 接受數據/消息:消費數據。
* 一箇中斷:在合做性的多任務裏面。
複製代碼

千萬不要一個函數裏面包含兩個或多個以上的功能,好比函數是generator就是generator,是一個coroutine就是一個coroutin。

最後

感謝你們閱讀。我是LumiaXu。一名電子科大正在找實習的pythoner~。

更多訪問原做者的網站: http://www.dabeaz.com

#python/coroutine#

相關文章
相關標籤/搜索