python中的協程(一)

協程

協程概念及目的

一、協程:html

單線程實現併發、在應用程序裏控制多個任務的切換+保存狀態python

優勢:程序員

應用程序級別速度要遠遠高於操做系統的切換算法

缺點:bash

多個任務一旦有一個阻塞沒有切,整個線程都阻塞在原地,該線程內的其餘的任務都不能執行了markdown

一旦引入協程,就須要檢測單線程下全部的IO行爲, 實現遇到IO就切換,少一個都不行,覺得一旦一個任務阻塞了,整個線程就阻塞了, 其餘的任務即使是能夠計算,可是也沒法運行了數據結構

二、協程序的目的:併發

想要在單線程下實現併發,主要用於io密集型dom

併發指的是多個任務看起來是同時運行的ide

併發=切換+保存狀態

三、協程相比於線程:

最大的區別在於,協程不須要像線程那樣來回的中斷切換,也不須要線程的鎖機制,由於線程中斷或者鎖機制都會對性能問題形成影響,因此協程的性能相比於線程,性能有明顯的提升,尤爲在線程越多的時候,優點越明顯。


複習生成器

生成器的一個做用是相似於迭代器,每次迭代的值爲yield右值(重點),還有一種是利用yield斷點,而後切換到另外一個任務,下面是第一種用法的複習

def f(maxx):
    n, a, b = 0, 1, 1
    while n < maxx:
        # print(b)
        y = yield b
        a, b = b, a + b
        n += 1
        print(y)
    return 'error_name'  # 原函數的return變成了迭代完報出錯誤的值(value)


fi = f(6)  # 將一個函數變成生成器,並賦值給fi,每次迭代的值都是yield右邊的值
print(fi.__next__())  # 運行一次生成器,到yield處中斷,運行下面的程序
print(fi.send('Done'))  # 回到第一次運行的生成器的yield中斷處,並把Done賦予yield,而後執行下面的程序,到yield再次中止
fi.send('Done')  # 若是這句換成print(fi.send('Done')),則會輸出3
print(fi.send('Done'))  # next和send的返回值都是fi迭代器本次的值

########################
1
Done
2
Done
Done
5

這裏有一點要注意,要激活一個生成器,必定要調用next()方法或者send(None)啓動生成器,而不是調用生成器的send()方法,若是直接調用send()方法會報錯,同時在生成器這裏,最早調用 next() 函數這一步一般稱爲「預激」(prime)協程(即,讓協程向前執行到第一個 yield 表達式,準備好做爲活躍的協程使用)。

從Python2.5 開始,咱們能夠在生成器上調用兩個方法,顯式的把異常發給協程。 這兩個方法是throw和close

generator.throw詳解

generator.throw(exc_type[, exc_value[, traceback]])

外層代碼調用throw使生成器在暫停的yield表達式處拋出指定的異常。若是生成器處理了拋出的異常,代碼會向前(循環)執行到下一個yield表達式,而產出的值會成爲調用throw方法獲得的返回值。若是沒有處理,則向上冒泡(傳遞給外層,值傳遞給外一層)。

def myGenerator():
    value = 1
    while True:
        yield value
        value += 1
 
 
gen = myGenerator()
print gen.next()
print gen.next()
print gen.throw(Exception, "Method throw called!")

  輸出結果爲

1
2
Traceback (most recent call last):
  File "test.txt", line 11, in <module>
    print gen.throw(Exception, "Method throw called!")
  File "test.txt", line 4, in myGenerator
    yield value
Exception: Method throw called!

  外層代碼的最後一句向生成器對象拋出了一個異常。可是,在生成器對象的方法時沒有處理該異常的代碼,所以異常會被拋出到主方法,主方法任然未處理,最終報錯。

下面的示例中,添加了處理異常的代碼

def myGenerator():
    value = 1
    while True:
        try:
            yield value
            value += 1
        except:
            value = 1
 
 
gen = myGenerator()
print gen.next()
print gen.next()
print gen.throw(Exception, "Method throw called!")

  代碼的輸出以下

1
2
1
Exception RuntimeError: 'generator ignored GeneratorExit' in <generator object myGenerator at 0x00000000028BB900> ignored
  上面輸出中,第2個1是gen.throw方法的返回值。在執行完該方法後,生成器對象方法的while循環並無結束,也便是說生成器方法的執行尚未結束。這個時候若是強制結束主程序,會拋出一個RuntimeError(這裏不懂下面在介紹)。也就是上面輸出的第4行。要優雅地關閉主程序,須要用到生成器對象的close方法。下面繼續介紹。

GeneratorExit異常

當一個生成器對象被銷燬時,或者生成器遇到異常退出時,會拋出一個GeneratorExit異常。請看下面的代碼。
def myGenerator():  
    try:
        yield 1
    except GeneratorExit:
        print "myGenerator exited"
 
gen = myGenerator()
print gen.next()

輸出結果爲
1
myGenerator exited

  上面代碼的運行邏輯以下: 當調用到gen.next()方法時,會執行生成器對象方法的yield語句。此後,主程序結束,系統會自動產生一個GeneratorExit異常,被生成器對象方法的Except語句塊截獲。

  值得一提的是,GeneratorExit異常只有在生成器對象被激活後,纔有可能產生。更確切的說,須要至少調用一次生成器對象的next方法後,系統纔會產生GeneratorExit異常。請看下面的代碼。

def myGenerator():  
    try:
        yield 1
        yield 2
    except GeneratorExit:
        print "myGenerator exited"
 
gen = myGenerator()
del gen
print "Main caller exited"

  其輸出結果以下:

Main caller exited

  在上面的示例中,咱們都顯式地捕獲了GeneratorExit異常。若是該異常沒有被顯式捕獲,生成器對象也不會把該異常向主程序拋出。由於GeneratorExit異常定義的初衷,是方便開發者在生成器對象調用結束後定義一些收尾的工做,如釋放資源等。

generator.close()

  生成器對象的close方法會在生成器對象方法的掛起處拋出一個GeneratorExit異常。GeneratorExit異常產生後,系統會繼續把生成器對象方法後續的代碼執行完畢。參見下面的代碼。

def myGenerator():  
    try:
        yield 1
        print "Statement after yield"
    except GeneratorExit:
        print "Generator error caught"
 
    print "End of myGenerator"
 
gen = myGenerator()
print gen.next()
gen.close()
print "End of main caller"

代碼執行過程以下:

  • 當調用gen.next方法時,會激活生成器,直至遇到生成器方法的yield語句,返回值1。同時,生成器方法的執行被掛起。
  • 當調用gen,close方法時,恢復生成器方法的執行過程。系統在yield語句處拋出GeneratorExit異常,執行過程跳到except語句塊。當except語句塊處理完畢後,系統會繼續往下執行,直至生成器方法執行結束。

代碼的輸出以下:

1
Generator error caught
End of myGenerator
End of main caller

  須要注意的是,GeneratorExit異常的產生意味着生成器對象的生命週期已經結束。所以,一旦產生了GeneratorExit異常,生成器方法後續執行的語句中,不能再有yield語句,不然會產生RuntimeError。請看下面的例子。

def myGenerator():  
    try:
        yield 1
        print "Statement after yield"
    except GeneratorExit:
        print "Generator error caught"
 
    yield 3
 
gen = myGenerator()
print gen.next()
gen.close()
print "End of main caller"


輸出結果爲
1
Generator error caught
Traceback (most recent call last):
  File "test.txt", line 12, in <module>
    gen.close()
RuntimeError: generator ignored GeneratorExit

  注意,因爲RuntimError會向主方法拋出,所以主方法最後的print語句沒有執行。

  有了上面的知識,咱們就能夠理解爲何下面的代碼會拋出RuntimError錯誤了。

def myGenerator():  
    value = 1  
    while True:  
        try:  
            yield value  
            value += 1  
        except:  
            value = 1  
  
  
gen = myGenerator()  
print gen.next()  
print gen.next()  
print gen.throw(Exception, "Method throw called!")  

  上面代碼中,當主程序結束前,系統產生GeneratorExit異常,被生成器對象方法的except語句捕獲,可是此時while語句尚未退出,所以後面還會執行「yield value」這一語句,從而發生RuntimeError。要避免這個錯誤很是簡單,請看下面的代碼。

def myGenerator():  
    value = 1  
    while True:  
        try:  
            yield value  
            value += 1  
        except Exception:  
            value = 1  
  
  
gen = myGenerator()  
print gen.next()  
print gen.next()  
print gen.throw(Exception, "Method throw called!")  

  代碼第7行的except語句聲明只捕獲Exception異常對象。這樣,當系統產生GeneratorExit異常後,再也不被except語句捕獲,繼續向外拋出,從而跳出了生成器對象方法的while語句。

  這裏再簡單說一句,GeneratorExit異常繼承自BaseException類。BaseException類與Exception類不一樣。通常狀況下,BaseException類是全部內建異常類的基類,而Exception類是全部用戶定義的異常類的基類。

生成器和協程

生成器和協程都是經過python中的yield的關鍵字實現的,不一樣的是,生成器的任務指向不可控,協程能夠理解爲任務指向可控的生成器

寫成是:程序員可控制的併發流程,無論是進程仍是線程,其切換都是操做系統在調度,而對於協程,程序員能夠控制何時切換出去,何時切換回來

生成器能夠做爲協程使用。協程是指一個過程,這個過程與調用方協做,產出由調用方提供的值。

儘管生成器和協程看起來很像,可是它們表明的倒是徹底不一樣的設計理念。生成器是用來生成數據的,而協程從某種意義上來講是消耗數據的。協程和迭代無關,儘管協程也會用next來獲取數據,可是協程和迭代無關,不要嘗試像使用生成器那樣去迭代地使用協程。


協程(生成器)有四種狀態,分別是

GEN_CREATED:等待執行

GEN_RUNNING:解釋器執行

GEN_SUSPENDED:在yield表達式處暫停

GEN_CLOSED:執行結束

協程(生成器)的狀態能夠用inspect.getgeneratorstate()函數來肯定,這裏是:inspect模塊官方文檔

中文方面參考以下:http://www.javashuo.com/article/p-cdwngenb-hg.html

來看下面的例子:

from inspect import getgeneratorstate
from time import sleep
import threading


def get_state(coro):
    print("其餘線程生成器狀態:%s", getgeneratorstate(coro))  # <1>


def simple_coroutine():
    for i in range(3):
        sleep(0.5)
        x = yield i + 1  # <1>


my_coro = simple_coroutine()
print("生成器初始狀態:%s" % getgeneratorstate(my_coro))  # <2>
first = next(my_coro)
for i in range(5):
    try:
        my_coro.send(i)
        print("主線程生成器初始狀態:%s" % getgeneratorstate(my_coro))  # <3>
        t = threading.Thread(target=get_state, args=(my_coro,))
        t.start()
    except StopIteration:
        print("生成器的值拉取完畢")
print("生成器最後狀態:%s" % getgeneratorstate(my_coro))  # <4>

#################
生成器初始狀態:GEN_CREATED
主線程生成器初始狀態:GEN_SUSPENDED
其餘線程生成器狀態:%s GEN_SUSPENDED
主線程生成器初始狀態:GEN_SUSPENDED
其餘線程生成器狀態:%s GEN_SUSPENDED
生成器的值拉取完畢
生成器的值拉取完畢
生成器的值拉取完畢
生成器最後狀態:GEN_CLOSED

  裝飾器加yield實現協程,生成器可實現自動循環,加上異常捕捉,生成器中的return爲異常stopitertions的值

from functools import wraps
 
 
def coroutine(func):
    @wraps(func)
    def primer(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen
 
    return primer
 
 
@coroutine
def averager():
    total = .0
    count = 0
    average = None
    while True:
        term = yield average
        total += term
        count += 1
        average = total / count
 
 
try:
    coro_avg = averager()  # 預激裝飾器
    print(coro_avg.send(10))
    print(coro_avg.send(20))
    print(coro_avg.send(30))
    coro_avg.close()
    print(coro_avg.send(40))
except StopIteration:
    print("協程已結束")
except TypeError:
        print("傳入值異常")

####################
10.0
15.0
20.0
協程已結束

協程(生成器)的返回值

以前咱們知道,生成器的返回值是異常StopIteration的值,具體獲取值的方式能夠如此

from collections import namedtuple

Result = namedtuple('Result', 'count average')


def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield  # yield右邊是每次迭代的值,不寫則爲None
        if term is None:
            break  # 爲了返回值,協程必須正常終止;這裏是退出條件
        total += term
        count += 1
        average = total/count
    # 返回一個namedtuple,包含count和average兩個字段。在python3.3前,若是生成器返回值,會報錯
    return Result(count, average)

>>> coro_avg = averager()
>>> next(coro_avg)
>>> coro_avg.send(20) # 並無返回值
>>> coro_avg.send(30)
>>> coro_avg.send(40)
>>> try:
...     coro_avg.send(None)
... except StopIteration as exc:
...     result = exc.value
...
>>> result
Result(count=3, average=30)

  這裏獲取返回值的方法很繁瑣,下面引出yield from。

yield from 初解

yield from 結果會在內部自動捕獲StopIteration 異常。這種處理方式與 for 循環處理StopIteration異常的方式同樣。 對於yield from 結構來講,解釋器不只會捕獲StopIteration異常,還會把value屬性的值變成yield from 表達式的值。

在函數外部不能使用yield from(yield也不行)。

yield from 是 Python3.3 後新加的語言結構。和其餘語言的await關鍵字相似,它表示:*在生成器 gen 中使用 yield from subgen()時,subgen 會得到控制權,把產出的值傳給gen的調用方,即調用方能夠直接控制subgen。於此同時,gen會阻塞,等待subgen終止,subgen是子生成器。

yield from 可用於簡化for循環中的yield表達式。

例如:

def gen():
    for c in 'AB':
        yield c
    for i in range(1, 3):
        yield i

list(gen())
['A', 'B', '1', '2']

能夠改寫爲:

def gen():
    yield from 'AB'
    yield from range(1, 3)
    
list(gen())
['A', 'B', '1', '2']

  還有一個稍微複雜點的例子

from collections import Iterable

def flatten(items, ignore_types=(str, bytes)):
    for x in items:
        if isinstance(x, Iterable) and not isinstance(x, ignore_types):
            # yield from x(subgen)表達式對x對象作的第一件事是,調用iter(x),獲取迭代器,因此要求x是可迭代對象,每次迭代的值爲yield的右值。
            yield from flatten(x)  # 這裏遞歸調用,若是flatten(x)中參數x是可迭代對象,繼續分解
            print('委派器從yield from阻塞中還原')
        else:
            yield x  # 這裏是每次迭代的值,直接傳給調用者,跳過(不通過)委派者管道


items = [1, 2, [3, 4, [5, 6], 7], 8]

# Produces 1 2 3 4 5 6 7 8
for x in flatten(items):  # 第一步yield的右值
    print(x)  # x爲子生成器中每次迭代的
    print('調用者對委派器進行了一次迭代,委派器返回一個值')

items = ['Dave', 'Paula', ['Thomas', 'Lewis']]
for x in flatten(items):
    print(x)

  這裏的註釋看不懂先往下看,理解了yield from的委派器、管道做用就好理解了。

  上面的例子若是進行了斷點調試的話,你看你會發如今子生成器(subgen)中,yield x直接能吧值傳給最外層的for循環,這就涉及到下面要說的yield from是鏈接子生成器和調用者的一個通道

 yield from的做用

PEP380 的標題是 」syntax for delegating to subgenerator「(把指責委託給子生成器的句法)。由此咱們能夠知道,yield from是能夠實現嵌套生成器的使用。注意,使用 yield from 句法調用協程時,會自動預激。

yield from 的主要功能是打開雙向通道,把最外層的調用方與最內層的子生成器鏈接起來,使二者能夠直接發送和產出值,還能夠直接傳入異常,而不用在中間的協程添加異常處理的代碼。

yield from 包含幾個概念:

  • 委派生成器

包含yield from 表達式的生成器函數

  • 子生成器

從yield from 部分獲取的生成器。

  • 調用方

調用委派生成器的客戶端(調用方)代碼

這個示意圖(圖一)是對yield from 的調用過程

上面的圖難以理解能夠看下面這個簡易圖

  這兩張圖說明了,委派生成器能夠理解爲管道,send和yield直接在調用方和子生成器(subgen)進行交互,send能夠直接把值由調用方「穿過(不通過)「委派生成器直接傳遞給子生成器,子生成器的yield也能夠把yield右邊的值穿過(不通過)委派生成器直接傳遞給調用者。

  yield from運行的全過程能夠歸納爲:委派生成器在 yield from 表達式處暫停時,調用方能夠直接把數據發給字生成器,子生成器再把產出的值發送給調用方。子生成器返回以後,解釋器會拋出StopIteration異常,並把返回值附加到異常對象上,這時委派生成器恢復。

  下面看一個例子來理解yield from的運行方式,這段代碼從一個字典中讀取男生和女生的身高和體重。而後把數據傳給以前定義的 averager 協程,最後生成一個報告。

from collections import namedtuple

Result = namedtuple('Result', 'count average')

# 子生成器
# 這個例子和上邊示例中的 averager 協程同樣,只不過這裏是做爲子生成器使用
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        # main 函數發送數據到這裏 
        term = yield  # yield右邊若是有數據,則這個數據會直接傳遞給main,main中委派生成器迭代一次,本次迭代的值是yield的右值
        if term is None: # 終止條件
            break
        total += term
        count += 1
        average = total/count
    return Result(count, average) # 返回的Result 會成爲grouper函數中yield from表達式的值


# 委派生成器
def grouper(results, key):
     # 這個循環每次都會新建一個averager 實例,每一個實例都是做爲協程使用的生成器對象
    while True:
        # grouper 發送的每一個值都會經由yield from 處理,經過管道傳給averager 實例。grouper會在yield from表達式處暫停,等待averager實例處理客戶端發來的值。averager實例運行完畢後,返回的值綁定到results[key] 上。while 循環會不斷建立averager實例,處理更多的值。
        results[key] = yield from averager()


# 調用方
def main(data):
    results = {}
    for key, values in data.items():
        # group 是調用grouper函數獲得的生成器對象,傳給grouper 函數的第一個參數是results,用於收集結果;第二個是某個鍵
        group = grouper(results, key)
        next(group)
        for value in values:
            # 把各個value傳給grouper 傳入的值最終到達averager函數中;
            # grouper並不知道傳入的是什麼,同時grouper實例在yield from處暫停
            group.send(value)
        # 把None傳入groupper,傳入的值最終到達averager函數中,致使當前實例終止。而後繼續建立下一個實例。
        # 若是沒有group.send(None),那麼averager子生成器永遠不會終止,委派生成器也永遠不會在此激活,也就不會爲result[key]賦值
        group.send(None)
    report(results)


# 輸出報告
def report(results):
    for key, result in sorted(results.items()):
        group, unit = key.split(';')
        print('{:2} {:5} averaging {:.2f}{}'.format(result.count, group, result.average, unit))


data = {
    'girls;kg':[40, 41, 42, 43, 44, 54],
    'girls;m': [1.5, 1.6, 1.8, 1.5, 1.45, 1.6],
    'boys;kg':[50, 51, 62, 53, 54, 54],
    'boys;m': [1.6, 1.8, 1.8, 1.7, 1.55, 1.6],
}

if __name__ == '__main__':
    main(data)

  執行結果爲

6 boys  averaging 54.00kg
6 boys  averaging 1.68m
6 girls averaging 44.00kg
6 girls averaging 1.58m

  這斷代碼展現了yield from 結構最簡單的用法。委派生成器至關於管道,因此能夠把任意數量的委派生成器鏈接在一塊兒—一個委派生成器使用yield from 調用一個子生成器,而那個子生成器自己也是委派生成器,使用yield from調用另外一個生成器。最終以一個只是用yield表達式的生成器(或者任意可迭代對象)結束。

 yield from 的意義

PEP380 分6點說明了yield from 的行爲。

  • 子生成器產出的值(yield右值)都直接傳給委派生成器的調用方(客戶端代碼)
  • 使用send() 方法發給委派生成器的值都直接傳給子生成器。若是發送的值是None,那麼會調用子生成器的 next()方法。若是發送的值不是None,那麼會調用子生成器的send()方法。若是調用的方法拋出StopIteration異常,那麼委派生成器恢復運行(從yield from阻塞中恢復)。任何其餘異常都會向上冒泡(向外層傳遞),傳給委派生成器。
  • 生成器退出時,生成器(或子生成器)中的return expr 表達式會觸發 StopIteration(expr) 異常拋出。
  • yield from表達式的值是子生成器終止時傳給StopIteration異常的第一個參數。
  • 傳入委派生成器的異常,除了 GeneratorExit 以外都傳給子生成器的throw()方法。若是調用throw()方法時拋出 StopIteration 異常,委派生成器恢復運行。StopIteration以外的異常會向上冒泡(有異常處理就處理,沒處理繼續上冒)。傳給委派生成器。
  • 若是把 GeneratorExit 異常傳入委派生成器,或者在委派生成器上調用close() 方法,那麼在子生成器上調用close() 方法,若是他有的話。若是調用close() 方法致使異常拋出,那麼異常會向上冒泡,傳給委派生成器;不然,委派生成器拋出 GeneratorExit 異常。

PEP380 還有個說明:

In a generator, the statement

return value

is semantically equivalent to

raise StopIteration(value)

except that, as currently, the exception cannot be caught by except clauses within the returning generator.

這也就是爲何 yield from 可使用return 來返回值而 yield 只能使用 try … except StopIteration … 來捕獲異常的value 值。

yield 實現併發(協程)

會建立幾輛出租車,每輛出租車會拉幾個乘客,而後回家。出租車會首先駛離車庫,四處徘徊,尋找乘客;拉到乘客後,行程開始;乘客下車後,繼續四處徘徊。

import random
import collections
import queue
import argparse

DEFAULT_NUMBER_OF_TAXIS = 3
DEFAULT_END_TIME = 180
SEARCH_DURATION = 5
TRIP_DURATION = 20
DEPARTURE_INTERAVAL = 5

# time 是事件發生的仿真時間,proc 是出租車進程實例的編號,action是描述活動的字符串
Event = collections.namedtuple('Event', ['time', 'proc', 'action'])


# 開始 出租車進程
# 每輛出租車調用一次taxi_process 函數,建立一個生成器對象,表示各輛出租車的運營過程。
def taxi_process(ident, trips, start_time=0):
"""
每次狀態變化時向建立事件,把控制權交給仿真器
:param ident: 出租車編號
:param trips: 出租車回家前的行程數量,接客數
:param start_time: 離開車庫的時間
:return:
"""
time = yield Event(start_time, ident, 'leave garage') # 產出的第一個Event
for i in range(trips): # 每次行程都會執行一遍這個代碼塊
# 產出一個Event實例,表示拉到了乘客 協程在這裏暫停 等待下一次send() 激活
time = yield Event(time, ident, 'pick up passenger')
# 產出一個Event實例,表示乘客下車 協程在這裏暫停 等待下一次send() 激活
time = yield Event(time, ident, 'drop off passenger')
# 指定的行程數量完成後,for 循環結束,最後產出 'going home' 事件。協程最後一次暫停
yield Event(time, ident, 'going home')
# 協程執行到最後 拋出StopIteration 異常


def compute_duration(previous_action):
"""使用指數分佈計算操做的耗時"""
if previous_action in ['leave garage', 'drop off passenger']:
# 新狀態是四處徘徊
interval = SEARCH_DURATION
elif previous_action == 'pick up passenger':
# 新狀態是開始行程
interval = TRIP_DURATION
elif previous_action == 'going home':
interval = 1
else:
raise ValueError('Unkonw previous_action: %s' % previous_action)
return int(random.expovariate(1 / interval)) + 1


# 開始仿真
class Simulator:

def __init__(self, procs_map):
self.events = queue.PriorityQueue() # 帶優先級的隊列 會按時間正向排序
self.procs = dict(procs_map) # 從獲取的procs_map 參數中建立本地副本,爲了避免修改用戶傳入的值

def run(self, end_time):
"""
調度並顯示事件,直到時間結束
:param end_time: 結束時間 只須要指定一個參數
:return:
"""
# 調度各輛出租車的第一個事件
for iden, proc in sorted(self.procs.items()):
first_event = next(proc) # 預激協程 併產出一個 Event 對象
self.events.put(first_event) # 把各個事件加到self.events 屬性表示的 PriorityQueue對象中

# 這次仿真的主循環
sim_time = 0 # 把 sim_time 歸0
while sim_time < end_time:
if self.events.empty(): # 事件所有完成後退出循環
print('*** end of event ***')
break
current_event = self.events.get() # 獲取優先級最高(time 屬性最小)的事件
sim_time, proc_id, previous_action = current_event # 更新 sim_time
print('taxi:', proc_id, proc_id * ' ', current_event)
active_proc = self.procs[proc_id] # 從self.procs 字典中獲取表示當前活動的出租車協程
next_time = sim_time + compute_duration(previous_action)
try:
next_event = active_proc.send(next_time) # 把計算獲得的時間發送給出租車協程。協程會產出下一個事件,或者拋出 StopIteration
except StopIteration:
del self.procs[proc_id] # 若是有異常 表示已經退出, 刪除這個協程
else:
self.events.put(next_event) # 若是沒有異常,把next_event 加入到隊列
else: # 若是超時 則走到這裏
msg = '*** end of simulation time: {} event pendding ***'
print(msg.format(self.events.qsize()))


def main(end_time=DEFAULT_END_TIME, num_taxis=DEFAULT_NUMBER_OF_TAXIS, seed=None):
"""初始化隨機生成器,構建過程,運行仿真程序"""
if seed is not None:
random.seed(seed) # 獲取可復現的結果
# 構建taxis 字典。值是三個參數不一樣的生成器對象。
taxis = {i: taxi_process(i, (i + 1) * 2, i * DEPARTURE_INTERAVAL)
for i in range(num_taxis)}
sim = Simulator(taxis)
sim.run(end_time)


if __name__ == '__main__':
# parser = argparse.ArgumentParser(description='Taxi fleet simulator.')
# parser.add_argument('-e', '--end-time', type=int,
# default=DEFAULT_END_TIME,
# help='simulation end time; default=%s' % DEFAULT_END_TIME)
# parser.add_argument('-t', '--taxis', type=int,
# default=DEFAULT_NUMBER_OF_TAXIS,
# help='number of taxis running; default = %s' % DEFAULT_NUMBER_OF_TAXIS)
# parser.add_argument('-s', '--seed', type=int, default=None,
# help='random generator seed (for testing)')
#
# args = parser.parse_args()
main()

運行程序

# -s 3 參數設置隨機生成器的種子,以便調試的時候隨機數不變,輸出相同的結果 python taxi_sim.py -s 3 

輸出結果以下圖

從結果咱們能夠看出,3輛出租車的行程是交叉進行的。不一樣顏色的箭頭表明不一樣出租車從乘客上車到乘客下車的跨度。

從結果能夠看出:

  • 出租車每5隔分鐘從車庫出發
  • 0 號出租車2分鐘後拉到乘客(time=2),1號出租車3分鐘後拉到乘客(time=8),2號出租車5分鐘後拉到乘客(time=15)
  • 0 號出租車拉了兩個乘客
  • 1 號出租車拉了4個乘客
  • 2 號出租車拉了6個乘客
  • 在這次示中,全部排定的事件都在默認的仿真時間內完成

咱們先在控制檯中調用taxi_process 函數,本身駕駛一輛出租車,示例以下:

In [1]: from taxi_sim import taxi_process # 建立一個生成器,表示一輛出租車 編號是13 從t=0 開始,有兩次行程 In [2]: taxi = taxi_process(ident=13, trips=2, start_time=0) In [3]: next(taxi) # 預激協程 Out[3]: Event(time=0, proc=13, action='leave garage') # 發送當前時間 在控制檯中,變量_綁定的是前一個結果 # _.time + 7 是 0 + 7 In [4]: taxi.send(_.time+7) Out[4]: Event(time=7, proc=13, action='pick up passenger') # 這個事件有for循環在第一個行程的開頭產出 # 發送_.time+12 表示這個乘客用時12分鐘 In [5]: taxi.send(_.time+12) Out[5]: Event(time=19, proc=13, action='drop off passenger') # 徘徊了29 分鐘 In [6]: taxi.send(_.time+29) Out[6]: Event(time=48, proc=13, action='pick up passenger') # 乘坐了50分鐘 In [7]: taxi.send(_.time+50) Out[7]: Event(time=98, proc=13, action='drop off passenger') # 兩次行程結束 for 循環結束產出'going home' In [8]: taxi.send(_.time+5) Out[8]: Event(time=103, proc=13, action='going home') # 再發送值,會執行到末尾 協程返回後 拋出 StopIteration 異常 In [9]: taxi.send(_.time+10) --------------------------------------------------------------------------- StopIteration Traceback (most recent call last) <ipython-input-9-d775cc8cc079> in <module>() ----> 1 taxi.send(_.time+10) StopIteration: 

在這個示例中,咱們用控制檯模擬仿真主循環。從taxi協程中產出的Event實例中獲取 .time 屬性,隨意加一個數,而後調用send()方法發送兩數之和,從新激活協程。

在taxi_sim.py 代碼中,出租車協程由 Simulator.run 方法中的主循環驅動。

Simulator 類的主要數據結構以下:

self.events

PriorityQueue 對象,保存Event實例。元素能夠放進PriorityQueue對象中,而後按 item[0](對象的time 屬性)依序取出(按從小到大)。 

self.procs

一個字典,把出租車的編號映射到仿真過程的進程(表示出租車生成器的對象)。這個屬性會綁定前面所示的taxis字典副本。

優先隊列是離散事件仿真系統的基礎構件:建立事件的順序不定,放入這種隊列後,能夠按各個事件排定的順序取出。

好比,咱們把兩個事件放入隊列:

Event(time=14, proc=0, action='pick up passenger') Event(time=10, proc=1, action='pick up passenger') 

這個意思是 0號出租車14分拉到一個乘客,1號出租車10分拉到一個乘客。可是主循環獲取的第一個事件將是

Event(time=10, proc=1, action=‘pick up passenger’)

下面咱們分析一下仿真系統的主算法–Simulator.run 方法。

  1. 迭表明示各輛出租車的進程
    • 在各輛出租車上調用next()函數,預激協程。
    • 把各個事件放入Simulator類的self.events屬性中。
  2. 知足 sim_time < end_time 條件是,運行仿真系統的主循環。
    • 檢查self.events 屬性是否爲空;若是爲空,跳出循環
    • 從self.events 中獲取當前事件
    • 顯示獲取的Event對象
    • 獲取curent_event 的time 屬性,更新仿真時間
    • 把時間發送給current_event 的pro屬性標識的協程,產出下一個事件
    • 把next_event 添加到self.events 隊列中,排定 next_event

咱們代碼中 while 循環有一個else 語句,仿真系統到達結束時間後,代碼會執行else中的語句。

這個示例主要是想說明如何在一個主循環中處理事件,以及如何經過發送數據驅動協程,同時解釋瞭如何使用生成器代替線程和回調,實現併發。

本文參考http://blog.gusibi.com/post/python-coroutine-yield-from/

相關文章
相關標籤/搜索