Python高級編程之生成器(Generator)與coroutine(四):一個簡單的多任務系統

啊,終於要把這一個系列寫完整了,好高興啊編程

在前面的三篇文章中介紹了Python的Python的Generator和coroutine(協程)相關的編程技術,接下來這篇文章會用Python的coroutine技術實現一個簡單的多任務的操做系統app

代碼以下,可看註釋異步

  1 #-*-coding:utf-8 -*-
  2 '''
  3 用Python和coroutine實現一個簡單的多任務系統
  4 '''
  5 # ##Step 1:Define Tasks###################################
  6 import select
  7 class Task(object):
  8     taskid = 0
  9 
 10     def __init__(self,target):
 11         Task.taskid += 1
 12         self.tid = Task.taskid    # Task id
 13         self.target = target      # Target coroutine
 14         self.sendval = None       # Value to send
 15 
 16     def run(self):
 17         return self.target.send(self.sendval)
 18 # ###############################################
 19 
 20 # ##Step 2:The Scheduler#########################
 21 import Queue
 22 class Scheduler(object):
 23     def __init__(self):
 24         self.ready = Queue.Queue()
 25         self.taskmap = {}
 26 
 27         # 正在等待的Tasks,key是taskid
 28         self.exit_waiting = {}
 29 
 30         # 異步IO
 31         # Holding areas for tasks blocking on I/O.These are
 32         # dictionaries mapping file descriptions to tasks
 33         # 鍵值爲文件描述符
 34         self.read_waiting = {}
 35         self.write_waiting = {}
 36 
 37 
 38     def iotask(self):
 39         while True:
 40             if self.ready.empty():
 41                 # 若是ready爲空,表示沒有正在等待執行的隊列
 42                 # timeout 爲None,表示不關心任何文件描述符的變化
 43                 self.iopool(None)
 44             else:
 45                 # ready不爲空,則設置select函數無論文件描述符是否發生變化都當即返回
 46                 self.iopool(0)
 47             yield
 48 
 49 
 50     def new(self,target):
 51         newtask = Task(target)
 52         self.taskmap[newtask.tid] = newtask
 53         self.schedule(newtask)
 54         return newtask.tid
 55 
 56     def schedule(self,task):
 57         # 把task放到任務隊列中去
 58         self.ready.put(task)
 59 
 60     def exit(self,task):
 61         print "Task %d terminated" %task.tid
 62         del self.taskmap[task.tid]
 63         # Notify other tasks waiting for exit
 64         # 把正在等待的任務加入到正在執行的隊列中去
 65         for task in self.exit_waiting.pop(task.tid,[]):
 66             self.schedule(task)
 67 
 68     def waitforexit(self,task,waittid):
 69         '''
 70         讓一個任務等待另一個任務,把這個任務加入到exit_waiting中去
 71         返回True表示這個task正在等待隊列中
 72         '''
 73         if waittid in self.taskmap:
 74             self.exit_waiting.setdefault(waittid,[]).append(task)
 75             return True
 76         else:
 77             return False
 78 
 79 
 80     def waitforread(self,task,fd):
 81         '''
 82         functions that simply put a task into to
 83         one of the above dictionaries
 84         '''
 85         self.read_waiting[fd] = task
 86 
 87     def waitforwrite(self,task,fd):
 88         self.write_waiting[fd] = task
 89 
 90     def iopool(self,timeout):
 91         '''
 92         I/O Polling.Use select() to determine which file
 93         descriptors can be used.Unblock any associated task
 94         '''
 95         if self.read_waiting or self.write_waiting:
 96             # 獲取I/O事件,一旦獲取到,就放入到執行隊列中取,等待執行
 97             r,w,e = select.select(self.read_waiting,
 98                                   self.write_waiting,[],timeout)
 99             for fd in r:
100                 self.schedule(self.read_waiting.pop(fd))
101 
102             for fd in w:
103                 self.schedule(self.write_waiting.pop(fd))
104 
105     def mainloop(self):
106         self.new(self.iotask())  # Launch I/O polls
107         while self.taskmap:
108             task = self.ready.get()
109             try:
110                 result = task.run()
111                 # 若是task執行的是System call,則對當前環境進行保存
112                 # 而後在執行System Call
113                 if isinstance(result,SystemCall):
114                     # 把當前的環境保存,即保存當前運行的task和sched
115                     result.task = task
116                     result.sched = self
117                     result.handle()
118                     continue
119             except StopIteration:
120                 self.exit(task)
121                 # print("task is over")
122                 continue
123             self.schedule(task)
124 # ##Step 2:The Scheduler#########################
125 
126 
127 # ##SystemCall#########################
128 class SystemCall(object):
129     '''
130     全部系統調用的基類,繼承自該類的類要重寫handle函數
131     '''
132     def handle(self):
133         pass
134 
135 
136 class GetTid(SystemCall):
137     '''
138     獲取任務ID
139     '''
140     def handle(self):
141         self.task.sendval = self.task.tid
142         self.sched.schedule(self.task)
143 
144 
145 class NewTask(SystemCall):
146     '''
147     新建一個Task
148     '''
149     def __init__(self,target):
150         self.target = target
151 
152     def handle(self):
153         # 在這裏把target封裝成Task
154         # 是在這裏把新生成的task加入到執行隊列當中去
155         tid = self.sched.new(self.target)
156         self.task.sendval = tid
157         # 把執行這個系統調用的父task從新加入到執行隊列中去
158         # 這一點很關鍵,由於判斷一個task是否結束是經過taskmap的
159         # 這個task只是暫時被掛起,要從新放到queue中去
160         self.sched.schedule(self.task)
161 
162 class KillTask(SystemCall):
163     '''
164     殺死一個Task
165     '''
166     def __init__(self,tid):
167         self.tid = tid
168 
169     def handle(self):
170         task = self.sched.taskmap.get(self.tid,None)
171         # task指的是要被kill掉的那個task
172         # self.task指的是發起KillTask這個系統調用task
173         if task:
174             task.target.close()
175             self.task.sendval = None
176         else:
177             self.task.sendval = False
178         # target.close()只是產生一個StopIteration異常
179         self.sched.schedule(self.task)
180 
181 
182 class WaitTask(SystemCall):
183     '''
184     讓任務進行等待 系統調用
185     '''
186     def __init__(self,tid):
187         self.tid = tid
188 
189     def handle(self):
190         result = self.sched.waitforexit(self.task,self.tid)
191         self.task.sendval = result
192         # 若是等待的是一個不存在的task,則當即返回
193         if not  result:
194             self.sched.schedule(self.task)
195 
196 
197 
198 
199 class ReadWait(SystemCall):
200     '''
201     異步讀 系統調用
202     '''
203     def __init__(self,f):
204         self.f = f
205 
206     def handle(self):
207         fd = self.f.fileno()
208         self.sched.waitforread(self.task,fd)
209 
210 class WriteWait(SystemCall):
211     '''
212     異步寫 系統調用
213     '''
214     def _init__(self,f):
215         self.f = f
216 
217     def handle(self):
218         fd = self.f.fileno()
219         self.sched.waitforwrite(self.task,fd)
相關文章
相關標籤/搜索