前兩篇咱們已經介紹了python 協程的使用和yield from 的原理,這一篇,咱們用一個例子來揭示如何使用協程在單線程中管理併發活動。。html
Wiki上的定義是:python
離散事件仿真將系統隨時間的變化抽象成一系列的離散時間點上的事件,經過按照事件時間順序處理事件來演進,是一種事件驅動的仿真世界觀。離散事件仿真將系統的變化看作一個事件,所以系統任何的變化都只能是經過處理相應的事件來實現,在兩個相鄰的事件之間,系統狀態維持前一個事件發生後的狀態不變。算法
人話說就是一種把系統建模成一系列事件的仿真系統。在離散事件仿真中,仿真「鍾」向前推動的量不是固定的,而是直接推動到下一個事件模型的模擬時間。編程
假設咱們抽象模擬出租車的運營過程,其中一個事件是乘客上車,下一個事件則是乘客下車。無論乘客作了5分鐘仍是50分鐘,一旦下車,仿真鍾就會更新,指向這次運營的結束時間。segmentfault
事件?是否是想到了協程!數據結構
協程剛好爲實現離散事件仿真提供了合理的抽象。多線程
第一門面向對象的語音 Simula 引入協程這個概念就是爲了支持仿真。
Simpy 是一個實現離散事件仿真的Python包,經過一個協程表示離散事件仿真系統的各個進程。併發
仿真程序會建立幾輛出租車,每輛出租車會拉幾個乘客,而後回家。出租車會首先駛離車庫,四處徘徊,尋找乘客;拉到乘客後,行程開始;乘客下車後,繼續四處徘徊。dom
徘徊和行程所用的時間使用指數分佈生成,咱們將時間設爲分鐘數,以便顯示清楚。ide
完整代碼以下:(taxi_sim.py)
#! -*- coding: utf-8 -*- import random import collections import queue import argparse DEFAULT_NUMBER_OF_TAXIS = 3 DEFAULT_END_TIME = 180 SEARCH_DURATION = 5 TRIP_DURATION = 20 DEPARTURE_INTERAVAL = 5 # time 是事件發生的仿真時間,proc 是出租車進程實例的編號,action是描述活動的字符串 Event = collections.namedtuple('Event', 'time proc action') # 開始 出租車進程 # 每輛出租車調用一次taxi_process 函數,建立一個生成器對象,表示各輛出租車的運營過程。 def taxi_process(ident, trips, start_time=0): ''' 每次狀態變化時向建立事件,把控制權交給仿真器 :param ident: 出租車編號 :param trips: 出租車回家前的行程數量 :param start_time: 離開車庫的時間 :return: ''' time = yield Event(start_time, ident, 'leave garage') # 產出的第一個Event for i in range(trips): # 每次行程都會執行一遍這個代碼塊 # 產出一個Event實例,表示拉到了乘客 協程在這裏暫停 等待下一次send() 激活 time = yield Event(time, ident, 'pick up passenger') # 產出一個Event實例,表示乘客下車 協程在這裏暫停 等待下一次send() 激活 time = yield Event(time, ident, 'drop off passenger') # 指定的行程數量完成後,for 循環結束,最後產出 'going home' 事件。協程最後一次暫停 yield Event(time, ident, 'going home') # 協程執行到最後 拋出StopIteration 異常 def compute_duration(previous_action): '''使用指數分佈計算操做的耗時''' if previous_action in ['leave garage', 'drop off passenger']: # 新狀態是四處徘徊 interval = SEARCH_DURATION elif previous_action == 'pick up passenger': # 新狀態是開始行程 interval = TRIP_DURATION elif previous_action == 'going home': interval = 1 else: raise ValueError('Unkonw previous_action: %s' % previous_action) return int(random.expovariate(1/interval)) + 1 # 開始仿真 class Simulator: def __init__(self, procs_map): self.events = queue.PriorityQueue() # 帶優先級的隊列 會按時間正向排序 self.procs = dict(procs_map) # 從獲取的procs_map 參數中建立本地副本,爲了避免修改用戶傳入的值 def run(self, end_time): ''' 調度並顯示事件,直到時間結束 :param end_time: 結束時間 只須要指定一個參數 :return: ''' # 調度各輛出租車的第一個事件 for iden, proc in sorted(self.procs.items()): first_event = next(proc) # 預激協程 併產出一個 Event 對象 self.events.put(first_event) # 把各個事件加到self.events 屬性表示的 PriorityQueue對象中 # 這次仿真的主循環 sim_time = 0 # 把 sim_time 歸0 while sim_time < end_time: if self.events.empty(): # 事件所有完成後退出循環 print('*** end of event ***') break current_event = self.events.get() # 獲取優先級最高(time 屬性最小)的事件 sim_time, proc_id, previous_action = current_event # 更新 sim_time print('taxi:', proc_id, proc_id * ' ', current_event) active_proc = self.procs[proc_id] # 從self.procs 字典中獲取表示當前活動的出租車協程 next_time = sim_time + compute_duration(previous_action) try: next_event = active_proc.send(next_time) # 把計算獲得的時間發送給出租車協程。協程會產出下一個事件,或者拋出 StopIteration except StopIteration: del self.procs[proc_id] # 若是有異常 表示已經退出, 刪除這個協程 else: self.events.put(next_event) # 若是沒有異常,把next_event 加入到隊列 else: # 若是超時 則走到這裏 msg = '*** end of simulation time: {} event pendding ***' print(msg.format(self.events.qsize())) def main(end_time=DEFAULT_END_TIME, num_taxis=DEFAULT_NUMBER_OF_TAXIS, seed=None): '''初始化隨機生成器,構建過程,運行仿真程序''' if seed is not None: random.seed(seed) # 獲取可復現的結果 # 構建taxis 字典。值是三個參數不一樣的生成器對象。 taxis = {i: taxi_process(i, (i + 1) * 2, i*DEPARTURE_INTERAVAL) for i in range(num_taxis)} sim = Simulator(taxis) sim.run(end_time) if __name__ == '__main__': parser = argparse.ArgumentParser(description='Taxi fleet simulator.') parser.add_argument('-e', '--end-time', type=int, default=DEFAULT_END_TIME, help='simulation end time; default=%s' % DEFAULT_END_TIME) parser.add_argument('-t', '--taxis', type=int, default=DEFAULT_NUMBER_OF_TAXIS, help='number of taxis running; default = %s' % DEFAULT_NUMBER_OF_TAXIS) parser.add_argument('-s', '--seed', type=int, default=None, help='random generator seed (for testing)') args = parser.parse_args() main(args.end_time, args.taxis, args.seed)
運行程序,
# -s 3 參數設置隨機生成器的種子,以便調試的時候隨機數不變,輸出相同的結果 python taxi_sim.py -s 3
輸出結果以下圖
從結果咱們能夠看出,3輛出租車的行程是交叉進行的。不一樣顏色的箭頭表明不一樣出租車從乘客上車到乘客下車的跨度。
從結果能夠看出:
出租車每5隔分鐘從車庫出發
0 號出租車2分鐘後拉到乘客(time=2),1號出租車3分鐘後拉到乘客(time=8),2號出租車5分鐘後拉到乘客(time=15)
0 號出租車拉了兩個乘客
1 號出租車拉了4個乘客
2 號出租車拉了6個乘客
在這次示中,全部排定的事件都在默認的仿真時間內完成
咱們先在控制檯中調用taxi_process 函數,本身駕駛一輛出租車,示例以下:
In [1]: from taxi_sim import taxi_process # 建立一個生成器,表示一輛出租車 編號是13 從t=0 開始,有兩次行程 In [2]: taxi = taxi_process(ident=13, trips=2, start_time=0) In [3]: next(taxi) # 預激協程 Out[3]: Event(time=0, proc=13, action='leave garage') # 發送當前時間 在控制檯中,變量_綁定的是前一個結果 # _.time + 7 是 0 + 7 In [4]: taxi.send(_.time+7) Out[4]: Event(time=7, proc=13, action='pick up passenger') # 這個事件有for循環在第一個行程的開頭產出 # 發送_.time+12 表示這個乘客用時12分鐘 In [5]: taxi.send(_.time+12) Out[5]: Event(time=19, proc=13, action='drop off passenger') # 徘徊了29 分鐘 In [6]: taxi.send(_.time+29) Out[6]: Event(time=48, proc=13, action='pick up passenger') # 乘坐了50分鐘 In [7]: taxi.send(_.time+50) Out[7]: Event(time=98, proc=13, action='drop off passenger') # 兩次行程結束 for 循環結束產出'going home' In [8]: taxi.send(_.time+5) Out[8]: Event(time=103, proc=13, action='going home') # 再發送值,會執行到末尾 協程返回後 拋出 StopIteration 異常 In [9]: taxi.send(_.time+10) --------------------------------------------------------------------------- StopIteration Traceback (most recent call last) <ipython-input-9-d775cc8cc079> in <module>() ----> 1 taxi.send(_.time+10) StopIteration:
在這個示例中,咱們用控制檯模擬仿真主循環。從taxi協程中產出的Event實例中獲取 .time 屬性,隨意加一個數,而後調用send()方法發送兩數之和,從新激活協程。
在taxi_sim.py 代碼中,出租車協程由 Simulator.run 方法中的主循環驅動。
Simulator 類的主要數據結構以下:
self.events
PriorityQueue 對象,保存Event實例。元素能夠放進PriorityQueue對象中,而後按 item[0](對象的time 屬性)依序取出(按從小到大)。
self.procs
一個字典,把出租車的編號映射到仿真過程的進程(表示出租車生成器的對象)。這個屬性會綁定前面所示的taxis字典副本。
優先隊列是離散事件仿真系統的基礎構件:建立事件的順序不定,放入這種隊列後,能夠按各個事件排定的順序取出。
好比,咱們把兩個事件放入隊列:
Event(time=14, proc=0, action='pick up passenger') Event(time=10, proc=1, action='pick up passenger')
這個意思是 0號出租車14分拉到一個乘客,1號出租車10分拉到一個乘客。可是主循環獲取的第一個事件將是
Event(time=10, proc=1, action='pick up passenger')
下面咱們分析一下仿真系統的主算法--Simulator.run 方法。
迭表明示各輛出租車的進程
在各輛出租車上調用next()函數,預激協程。
把各個事件放入Simulator類的self.events屬性中。
知足 sim_time < end_time 條件是,運行仿真系統的主循環。
檢查self.events 屬性是否爲空;若是爲空,跳出循環
從self.events 中獲取當前事件
顯示獲取的Event對象
獲取curent_event 的time 屬性,更新仿真時間
把時間發送給current_event 的pro屬性標識的協程,產出下一個事件
把next_event 添加到self.events 隊列中,排定 next_event
咱們代碼中 while 循環有一個else 語句,仿真系統到達結束時間後,代碼會執行else中的語句。
這個示例主要是想說明如何在一個主循環中處理事件,以及如何經過發送數據驅動協程,同時解釋瞭如何使用生成器代替線程和回調,實現併發。
併發: 多個任務交替執行
並行: 多個任務同時執行
到這裏 Python協程系列的三篇文章就結束了。
咱們會看到,協程作面向事件編程時,會不斷把控制權讓步給主循環,激活並向前運行其餘協程,從而執行各個併發活動。
協程一種協做式多任務:協程顯式自主的把控制權讓步給中央調度程序。
多線程實現的是搶佔式多任務。調度程序能夠在任什麼時候刻暫停線程,把控制權交給其餘線程
再次說明一下,這幾篇是《流暢的python》一書的讀書筆記,做者提供了大量的擴展閱讀,有興趣的能夠看一下。
BinaryTree類、一個簡單的XML解析器、和一個任務調度器Proposal for a yield from statement for Python
最後,感謝女友支持。
>歡迎關注 | >請我喝芬達 |
---|---|