啊,終於要把這一個系列寫完整了,好高興啊編程
在前面的三篇文章中介紹了Python的Python的Generator和coroutine(協程)相關的編程技術,接下來這篇文章會用Python的coroutine技術實現一個簡單的多任務的操做系統app
代碼以下,可看註釋異步
1 #-*-coding:utf-8 -*- 2 ''' 3 用Python和coroutine實現一個簡單的多任務系統 4 ''' 5 # ##Step 1:Define Tasks################################### 6 import select 7 class Task(object): 8 taskid = 0 9 10 def __init__(self,target): 11 Task.taskid += 1 12 self.tid = Task.taskid # Task id 13 self.target = target # Target coroutine 14 self.sendval = None # Value to send 15 16 def run(self): 17 return self.target.send(self.sendval) 18 # ############################################### 19 20 # ##Step 2:The Scheduler######################### 21 import Queue 22 class Scheduler(object): 23 def __init__(self): 24 self.ready = Queue.Queue() 25 self.taskmap = {} 26 27 # 正在等待的Tasks,key是taskid 28 self.exit_waiting = {} 29 30 # 異步IO 31 # Holding areas for tasks blocking on I/O.These are 32 # dictionaries mapping file descriptions to tasks 33 # 鍵值爲文件描述符 34 self.read_waiting = {} 35 self.write_waiting = {} 36 37 38 def iotask(self): 39 while True: 40 if self.ready.empty(): 41 # 若是ready爲空,表示沒有正在等待執行的隊列 42 # timeout 爲None,表示不關心任何文件描述符的變化 43 self.iopool(None) 44 else: 45 # ready不爲空,則設置select函數無論文件描述符是否發生變化都當即返回 46 self.iopool(0) 47 yield 48 49 50 def new(self,target): 51 newtask = Task(target) 52 self.taskmap[newtask.tid] = newtask 53 self.schedule(newtask) 54 return newtask.tid 55 56 def schedule(self,task): 57 # 把task放到任務隊列中去 58 self.ready.put(task) 59 60 def exit(self,task): 61 print "Task %d terminated" %task.tid 62 del self.taskmap[task.tid] 63 # Notify other tasks waiting for exit 64 # 把正在等待的任務加入到正在執行的隊列中去 65 for task in self.exit_waiting.pop(task.tid,[]): 66 self.schedule(task) 67 68 def waitforexit(self,task,waittid): 69 ''' 70 讓一個任務等待另一個任務,把這個任務加入到exit_waiting中去 71 返回True表示這個task正在等待隊列中 72 ''' 73 if waittid in self.taskmap: 74 self.exit_waiting.setdefault(waittid,[]).append(task) 75 return True 76 else: 77 return False 78 79 80 def waitforread(self,task,fd): 81 ''' 82 functions that simply put a task into to 83 one of the above dictionaries 84 ''' 85 self.read_waiting[fd] = task 86 87 def waitforwrite(self,task,fd): 88 self.write_waiting[fd] = task 89 90 def iopool(self,timeout): 91 ''' 92 I/O Polling.Use select() to determine which file 93 descriptors can be used.Unblock any associated task 94 ''' 95 if self.read_waiting or self.write_waiting: 96 # 獲取I/O事件,一旦獲取到,就放入到執行隊列中取,等待執行 97 r,w,e = select.select(self.read_waiting, 98 self.write_waiting,[],timeout) 99 for fd in r: 100 self.schedule(self.read_waiting.pop(fd)) 101 102 for fd in w: 103 self.schedule(self.write_waiting.pop(fd)) 104 105 def mainloop(self): 106 self.new(self.iotask()) # Launch I/O polls 107 while self.taskmap: 108 task = self.ready.get() 109 try: 110 result = task.run() 111 # 若是task執行的是System call,則對當前環境進行保存 112 # 而後在執行System Call 113 if isinstance(result,SystemCall): 114 # 把當前的環境保存,即保存當前運行的task和sched 115 result.task = task 116 result.sched = self 117 result.handle() 118 continue 119 except StopIteration: 120 self.exit(task) 121 # print("task is over") 122 continue 123 self.schedule(task) 124 # ##Step 2:The Scheduler######################### 125 126 127 # ##SystemCall######################### 128 class SystemCall(object): 129 ''' 130 全部系統調用的基類,繼承自該類的類要重寫handle函數 131 ''' 132 def handle(self): 133 pass 134 135 136 class GetTid(SystemCall): 137 ''' 138 獲取任務ID 139 ''' 140 def handle(self): 141 self.task.sendval = self.task.tid 142 self.sched.schedule(self.task) 143 144 145 class NewTask(SystemCall): 146 ''' 147 新建一個Task 148 ''' 149 def __init__(self,target): 150 self.target = target 151 152 def handle(self): 153 # 在這裏把target封裝成Task 154 # 是在這裏把新生成的task加入到執行隊列當中去 155 tid = self.sched.new(self.target) 156 self.task.sendval = tid 157 # 把執行這個系統調用的父task從新加入到執行隊列中去 158 # 這一點很關鍵,由於判斷一個task是否結束是經過taskmap的 159 # 這個task只是暫時被掛起,要從新放到queue中去 160 self.sched.schedule(self.task) 161 162 class KillTask(SystemCall): 163 ''' 164 殺死一個Task 165 ''' 166 def __init__(self,tid): 167 self.tid = tid 168 169 def handle(self): 170 task = self.sched.taskmap.get(self.tid,None) 171 # task指的是要被kill掉的那個task 172 # self.task指的是發起KillTask這個系統調用task 173 if task: 174 task.target.close() 175 self.task.sendval = None 176 else: 177 self.task.sendval = False 178 # target.close()只是產生一個StopIteration異常 179 self.sched.schedule(self.task) 180 181 182 class WaitTask(SystemCall): 183 ''' 184 讓任務進行等待 系統調用 185 ''' 186 def __init__(self,tid): 187 self.tid = tid 188 189 def handle(self): 190 result = self.sched.waitforexit(self.task,self.tid) 191 self.task.sendval = result 192 # 若是等待的是一個不存在的task,則當即返回 193 if not result: 194 self.sched.schedule(self.task) 195 196 197 198 199 class ReadWait(SystemCall): 200 ''' 201 異步讀 系統調用 202 ''' 203 def __init__(self,f): 204 self.f = f 205 206 def handle(self): 207 fd = self.f.fileno() 208 self.sched.waitforread(self.task,fd) 209 210 class WriteWait(SystemCall): 211 ''' 212 異步寫 系統調用 213 ''' 214 def _init__(self,f): 215 self.f = f 216 217 def handle(self): 218 fd = self.f.fileno() 219 self.sched.waitforwrite(self.task,fd)