Python threads
The threading module provides thread-related operations; a thread is the smallest unit of work inside an application. A process owns the resources (address space, open files, and so on), while its threads are the units that actually get scheduled and run.
Child threads are spawned by the main thread, but once started they run independently of it (unless you join them or mark them as daemon threads).
Creating threads with threading:
'''Creating threads with the threading package'''
import threading
import time

def run(n):
    time.sleep(2)
    print("task:", n)

'''Serial: one call finishes before the next one starts'''
run("t1")   # not a thread, just an ordinary function call with an argument
run("t2")

'''Concurrent'''
t1 = threading.Thread(target=run, args=("T1",))   # t1 is a thread; args must be a tuple
t2 = threading.Thread(target=run, args=("T2",))
t1.start()   # both threads now work concurrently
t2.start()


'''Sample output'''
task: t1    # printed after two seconds; two more seconds later t2's line appears
task: t2

task: T2    # T1 and T2 run at the same time (order may vary)
task: T1
The code above creates two threads, t1 and t2, and hands control to the operating system, which schedules them and time-slices the CPU between them.
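To see the difference in running time, here is a minimal sketch (the names and the printed format are illustrative) that starts both threads, waits for them with join(), and measures the total: because the two 2-second sleeps overlap, the whole run takes roughly two seconds instead of four.

import threading
import time

def run(n):
    time.sleep(2)
    print("task:", n)

start = time.time()
t1 = threading.Thread(target=run, args=("T1",))
t2 = threading.Thread(target=run, args=("T2",))
t1.start()
t2.start()
t1.join()                                        # wait until t1 has finished
t2.join()                                        # wait until t2 has finished
print("total: %.1f s" % (time.time() - start))   # roughly 2 s, not 4 s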
More thread methods (join, daemon threads):
# Child threads are spawned by the main thread
import threading
import time

def run(n):
    print("task:", n)
    time.sleep(0.1)
    print("task done:", n)

start_time = time.time()
for i in range(50):   # 51 threads in total: the 50 children plus the main thread running the script
    t = threading.Thread(target=run, args=("t--%s" % i,))
    t.start()
    t.join()          # join makes the main thread wait for the child, so the loop runs serially

print(time.time() - start_time)   # with join() in the loop this is the full serial run time;
                                  # remove the join to measure only the time spent creating the threads
import threading
import time

class My_Thread(threading.Thread):
    def __init__(self, n):
        super(My_Thread, self).__init__()
        self.n = n

    def run(self):
        print("task:", self.n)
        time.sleep(0.1)

t_obj = []
start_time = time.time()
for i in range(50):          # 51 threads in total: the 50 children plus the main thread
    t = My_Thread("t--%s" % i)
    t.setDaemon(True)        # daemon thread: killed as soon as the main program finishes (there is no join here)
    t.start()
    t_obj.append(t)
print(time.time() - start_time)   # time spent creating and starting the threads, not their run time


'''Sample output'''
task: t--0
task: t--1
task: t--2
...
task: t--49
0.01196908950805664
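setDaemon() is the older spelling; current Python also accepts a daemon=True argument (or the t.daemon attribute), and if you do want the daemon threads to finish you still have to join them. A minimal sketch of that variant:

import threading
import time

def run(n):
    print("task:", n)
    time.sleep(0.1)

workers = [threading.Thread(target=run, args=(i,), daemon=True) for i in range(5)]
for t in workers:
    t.start()
for t in workers:
    t.join()    # wait here; without the join the daemon workers die when the main thread exits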
Thread locks (Lock):
1 def acquire(self, blocking=True, timeout=None): 2 """Acquire a semaphore, decrementing the internal counter by one. 3 When invoked without arguments: if the internal counter is larger than 4 zero on entry, decrement it by one and return immediately. If it is zero 5 on entry, block, waiting until some other thread has called release() to 6 make it larger than zero. This is done with proper interlocking so that 7 if multiple acquire() calls are blocked, release() will wake exactly one 8 of them up. The implementation may pick one at random, so the order in 9 which blocked threads are awakened should not be relied on. There is no 10 return value in this case. 11 When invoked with blocking set to true, do the same thing as when called 12 without arguments, and return true. 13 When invoked with blocking set to false, do not block. If a call without 14 an argument would block, return false immediately; otherwise, do the 15 same thing as when called without arguments, and return true. 16 When invoked with a timeout other than None, it will block for at 17 most timeout seconds. If acquire does not complete successfully in 18 that interval, return false. Return true otherwise. 19 """ 20 #得到一個信號量,將內部計數器減1。在沒有參數的狀況下調用時:若是內部計數器在入口時 21 # 大於0,則將其遞減1並當即返回。若是進入時爲零,阻塞,等待其餘線程調用release() 22 # 使其大於零。這是經過適當的聯鎖完成的,這樣,若是多個acquire()調用被阻塞, 23 # release()就會喚醒其中一個調用。實現能夠隨機選擇一個線程,所以不該該依賴於被阻塞 24 # 線程被喚醒的順序。在本例中沒有返回值。當阻塞集調用爲true時,執行與沒有參數調用 25 # 時相同的操做,並返回true。當阻塞設置爲false時,不要阻塞。若是一個沒有參數的 26 # 調用將阻塞,當即返回false;不然,執行與沒有參數調用時相同的操做,並返回true。 27 # 當使用除None之外的超時調用時,它最多將阻塞超時秒。若是在那段時間裏收購沒有成功 28 # 完成,還假。不然返回true。 29 if not blocking and timeout is not None: 30 raise ValueError("can't specify timeout for non-blocking acquire") 31 rc = False 32 endtime = None 33 with self._cond: 34 while self._value == 0: 35 if not blocking: 36 break 37 if timeout is not None: 38 if endtime is None: 39 endtime = _time() + timeout 40 else: 41 timeout = endtime - _time() 42 if timeout <= 0: 43 break 44 self._cond.wait(timeout) 45 else: 46 self._value -= 1 47 rc = True 48 return rc 49 50 __enter__ = acquire 51 52 def release(self): 53 """Release a semaphore, incrementing the internal counter by one. 54 When the counter is zero on entry and another thread is waiting for it 55 to become larger than zero again, wake up that thread. 56 """ 57 #釋放信號量,增長一個內部計數器。當進入時計數器爲零,而另外一個線程正在等待計數器 58 # 再次大於零時,喚醒該線程。 59 with self._cond: 60 self._value += 1 61 self._cond.notify() 62 63 def __exit__(self, t, v, tb): 64 self.release()
import threading
import time

lock = threading.Lock()   # thread lock

def run(n):
    lock.acquire()        # take the lock
    global num
    num += 1
    lock.release()        # release the lock
    time.sleep(1)

t_obj = []
num = 0
for i in range(50):
    t = threading.Thread(target=run, args=("t--%s" % i,))
    t.start()
    t_obj.append(t)

for i in t_obj:
    i.join()

print("num:", num)


'''Sample output'''
num: 50
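Lock objects also support the with statement, which acquires on entry and releases on exit even if the body raises an exception; a minimal sketch of the same shared counter written that way:

import threading

lock = threading.Lock()
num = 0

def run(n):
    global num
    with lock:            # acquired on entry, released on exit, even if an exception occurs
        num += 1

threads = [threading.Thread(target=run, args=(i,)) for i in range(50)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("num:", num)        # 50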
'''Handy when testing a module'''
if __name__ == "__main__":   # true only when the file is run directly, false when it is imported
    ...
Semaphores (a simple, thread-pool-like way to cap how many threads run at once):
A semaphore manages a counter representing the number of release() calls minus the number of acquire() calls, plus an initial value. acquire() blocks, if necessary, until it can return without making the counter negative. If no value is given, the counter defaults to 1.
'''Semaphore'''
import threading
import time

def run(n):
    semaphore.acquire()
    print("task:", n)
    time.sleep(1)
    semaphore.release()

if __name__ == "__main__":
    semaphore = threading.BoundedSemaphore(5)   # at most five threads run at once; the next batch starts as slots free up
    for i in range(20):
        t = threading.Thread(target=run, args=(i,))
        t.start()

    while threading.active_count() != 1:        # busy-wait until only the main thread is left
        pass
    else:
        print("--all threads are done")
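Semaphore and BoundedSemaphore also work as context managers (the module source below defines __enter__ = acquire), so the acquire/release pair can be replaced by a with block; a minimal sketch:

import threading
import time

semaphore = threading.BoundedSemaphore(5)

def run(n):
    with semaphore:          # no more than five threads are inside this block at any moment
        print("task:", n)
        time.sleep(1)

threads = [threading.Thread(target=run, args=(i,)) for i in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()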
1 """Thread module emulating a subset of Java's threading model.""" 2 #線程模塊模擬Java線程模型的一個子集。 3 import os as _os 4 import sys as _sys 5 import _thread 6 7 from time import monotonic as _time 8 from traceback import format_exc as _format_exc 9 from _weakrefset import WeakSet 10 from itertools import islice as _islice, count as _count 11 try: 12 from _collections import deque as _deque 13 except ImportError: 14 from collections import deque as _deque 15 16 # Note regarding PEP 8 compliant names 17 # This threading model was originally inspired by Java, and inherited 18 # the convention of camelCase function and method names from that 19 # language. Those original names are not in any imminent danger of 20 # being deprecated (even for Py3k),so this module provides them as an 21 # alias for the PEP 8 compliant names 22 # Note that using the new PEP 8 compliant names facilitates substitution 23 # with the multiprocessing module, which doesn't provide the old 24 # Java inspired names. 25 26 __all__ = ['get_ident', 'active_count', 'Condition', 'current_thread', 27 'enumerate', 'main_thread', 'TIMEOUT_MAX', 28 'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 29 'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError', 30 'setprofile', 'settrace', 'local', 'stack_size'] 31 32 # Rename some stuff so "from threading import *" is safe 33 _start_new_thread = _thread.start_new_thread 34 _allocate_lock = _thread.allocate_lock 35 _set_sentinel = _thread._set_sentinel 36 get_ident = _thread.get_ident 37 ThreadError = _thread.error 38 try: 39 _CRLock = _thread.RLock 40 except AttributeError: 41 _CRLock = None 42 TIMEOUT_MAX = _thread.TIMEOUT_MAX 43 del _thread 44 45 46 # Support for profile and trace hooks 47 #支持配置文件和跟蹤掛鉤 48 49 _profile_hook = None 50 _trace_hook = None 51 52 def setprofile(func): 53 """Set a profile function for all threads started from the threading module. 54 The func will be passed to sys.setprofile() for each thread, before its 55 run() method is called. 56 """ 57 #爲從線程模塊啓動的全部線程設置一個配置文件函數。在調用其run()方法以前, 58 # func將被傳遞給每一個線程的sys.setprofile()。 59 60 global _profile_hook 61 _profile_hook = func 62 63 def settrace(func): 64 """Set a trace function for all threads started from the threading module. 65 The func will be passed to sys.settrace() for each thread, before its run() 66 method is called. 67 """ 68 #爲從線程模塊啓動的全部線程設置跟蹤函數。在調用其run()方法以前, 69 # func將被傳遞給每一個線程的sys.settrace()。 70 71 global _trace_hook 72 _trace_hook = func 73 74 # Synchronization classes 75 # 同步類 76 77 Lock = _allocate_lock 78 79 def RLock(*args, **kwargs): 80 """Factory function that returns a new reentrant lock. 81 A reentrant lock must be released by the thread that acquired it. Once a 82 thread has acquired a reentrant lock, the same thread may acquire it again 83 without blocking; the thread must release it once for each time it has 84 acquired it. 85 """ 86 #返回一個新的可重入鎖的工廠函數。可重入鎖必須由得到它的線程釋放。 87 # 一旦一個線程得到了可重入鎖,該線程能夠在不阻塞的狀況下再次得到該鎖; 88 # 線程每次得到它時都必須釋放它一次。 89 90 if _CRLock is None: 91 return _PyRLock(*args, **kwargs) 92 return _CRLock(*args, **kwargs) 93 94 class _RLock: 95 """This class implements reentrant lock objects. 96 A reentrant lock must be released by the thread that acquired it. Once a 97 thread has acquired a reentrant lock, the same thread may acquire it 98 again without blocking; the thread must release it once for each time it 99 has acquired it. 
100 """ 101 #該類實現可重入鎖對象。可重入鎖必須由得到它的線程釋放。一旦一個線程得到了可重入鎖, 102 # 該線程能夠在不阻塞的狀況下再次得到該鎖;線程每次得到它時都必須釋放它一次。 103 104 def __init__(self): 105 self._block = _allocate_lock() 106 self._owner = None 107 self._count = 0 108 109 def __repr__(self): 110 owner = self._owner 111 try: 112 owner = _active[owner].name 113 except KeyError: 114 pass 115 return "<%s %s.%s object owner=%r count=%d at %s>" % ( 116 "locked" if self._block.locked() else "unlocked", 117 self.__class__.__module__, 118 self.__class__.__qualname__, 119 owner, 120 self._count, 121 hex(id(self)) 122 ) 123 124 def acquire(self, blocking=True, timeout=-1): 125 """Acquire a lock, blocking or non-blocking. 126 When invoked without arguments: if this thread already owns the lock, 127 increment the recursion level by one, and return immediately. Otherwise, 128 if another thread owns the lock, block until the lock is unlocked. Once 129 the lock is unlocked (not owned by any thread), then grab ownership, set 130 the recursion level to one, and return. If more than one thread is 131 blocked waiting until the lock is unlocked, only one at a time will be 132 able to grab ownership of the lock. There is no return value in this 133 case. 134 When invoked with the blocking argument set to true, do the same thing 135 as when called without arguments, and return true. 136 When invoked with the blocking argument set to false, do not block. If a 137 call without an argument would block, return false immediately; 138 otherwise, do the same thing as when called without arguments, and 139 return true. 140 When invoked with the floating-point timeout argument set to a positive 141 value, block for at most the number of seconds specified by timeout 142 and as long as the lock cannot be acquired. Return true if the lock has 143 been acquired, false if the timeout has elapsed. 144 """ 145 #得到一個鎖,阻塞或非阻塞。在沒有參數的狀況下調用時:若是這個線程已經擁有鎖, 146 # 那麼將遞歸級別增長1,並當即返回。不然,若是另外一個線程擁有鎖, 147 # 則阻塞直到鎖被解鎖。一旦鎖被解鎖(不屬於任何線程),而後獲取全部權, 148 # 將遞歸級別設置爲1,而後返回。若是有多個線程被阻塞,等待鎖被解鎖, 149 # 每次只有一個線程可以獲取鎖的全部權。在本例中沒有返回值。當阻塞參數設置 150 # 爲true時,執行與沒有參數時相同的操做,並返回true。當阻塞參數設置爲false時, 151 # 不要阻塞。若是一個沒有參數的調用將阻塞,當即返回false;不然,執行與沒有 152 # 參數調用時相同的操做,並返回true。當將浮點超時參數設置爲正值時,若是得到 153 # 了鎖,則最多阻塞超時指定的秒數,若是超時已過,則返回true;若是超時已過,則返回false。 154 155 me = get_ident() 156 if self._owner == me: 157 self._count += 1 158 return 1 159 rc = self._block.acquire(blocking, timeout) 160 if rc: 161 self._owner = me 162 self._count = 1 163 return rc 164 165 __enter__ = acquire 166 167 def release(self): 168 """Release a lock, decrementing the recursion level. 169 If after the decrement it is zero, reset the lock to unlocked (not owned 170 by any thread), and if any other threads are blocked waiting for the 171 lock to become unlocked, allow exactly one of them to proceed. If after 172 the decrement the recursion level is still nonzero, the lock remains 173 locked and owned by the calling thread. 174 Only call this method when the calling thread owns the lock. A 175 RuntimeError is raised if this method is called when the lock is 176 unlocked. 177 There is no return value. 
178 """ 179 #釋放鎖,下降遞歸級別。若是減量後爲零,則將鎖重置爲解鎖(不屬於任何線程), 180 # 若是任何其餘線程被阻塞,等待鎖解鎖,則只容許其中一個線程繼續執行。若是在遞減 181 # 以後遞歸級別仍然是非零,則鎖仍然被鎖定,而且由調用線程擁有。只有當調用線程擁有 182 # 鎖時才調用此方法。若是在解鎖鎖時調用此方法,將引起運行時錯誤。沒有返回值。 183 184 if self._owner != get_ident(): 185 raise RuntimeError("cannot release un-acquired lock") 186 self._count = count = self._count - 1 187 if not count: 188 self._owner = None 189 self._block.release() 190 191 def __exit__(self, t, v, tb): 192 self.release() 193 194 # Internal methods used by condition variables 195 #條件變量使用的內部方法 196 197 def _acquire_restore(self, state): 198 self._block.acquire() 199 self._count, self._owner = state 200 201 def _release_save(self): 202 if self._count == 0: 203 raise RuntimeError("cannot release un-acquired lock") 204 count = self._count 205 self._count = 0 206 owner = self._owner 207 self._owner = None 208 self._block.release() 209 return (count, owner) 210 211 def _is_owned(self): 212 return self._owner == get_ident() 213 214 _PyRLock = _RLock 215 216 217 class Condition: 218 """Class that implements a condition variable. 219 A condition variable allows one or more threads to wait until they are 220 notified by another thread. 221 If the lock argument is given and not None, it must be a Lock or RLock 222 object, and it is used as the underlying lock. Otherwise, a new RLock object 223 is created and used as the underlying lock. 224 """ 225 #實現條件變量的類。條件變量容許一個或多個線程等待,直到另外一個線程通知它們。 226 # 若是鎖參數是給定的而不是空的,那麼它必須是一個鎖或RLock對象,而且它被用做底層鎖。 227 # 不然,將建立一個新的RLock對象並將其用做底層鎖。 228 229 def __init__(self, lock=None): 230 if lock is None: 231 lock = RLock() 232 self._lock = lock 233 # Export the lock's acquire() and release() methods 234 #導出鎖的acquire()和release()方法 235 self.acquire = lock.acquire 236 self.release = lock.release 237 # If the lock defines _release_save() and/or _acquire_restore(), 238 # these override the default implementations (which just call 239 # release() and acquire() on the lock). Ditto for _is_owned(). 240 #若是鎖定義了_release_save()和/或_acquire_restore(),就會覆蓋默認的實現 241 # (它只調用release()和acquire()對鎖進行訪問)。_is_owned同上()。 242 try: 243 self._release_save = lock._release_save 244 except AttributeError: 245 pass 246 try: 247 self._acquire_restore = lock._acquire_restore 248 except AttributeError: 249 pass 250 try: 251 self._is_owned = lock._is_owned 252 except AttributeError: 253 pass 254 self._waiters = _deque() 255 256 def __enter__(self): 257 return self._lock.__enter__() 258 259 def __exit__(self, *args): 260 return self._lock.__exit__(*args) 261 262 def __repr__(self): 263 return "<Condition(%s, %d)>" % (self._lock, len(self._waiters)) 264 265 def _release_save(self): 266 self._lock.release() # No state to save 沒有狀態保存 267 268 def _acquire_restore(self, x): 269 self._lock.acquire() # Ignore saved state 忽略保存的狀態 270 271 def _is_owned(self): 272 # Return True if lock is owned by current_thread. 273 #若是鎖屬於current_thread,則返回True。 274 # This method is called only if _lock doesn't have _is_owned(). 275 #只有當_lock沒有_is_owned()時才調用該方法。 276 if self._lock.acquire(0): 277 self._lock.release() 278 return False 279 else: 280 return True 281 282 def wait(self, timeout=None): 283 """Wait until notified or until a timeout occurs. 284 If the calling thread has not acquired the lock when this method is 285 called, a RuntimeError is raised. 286 This method releases the underlying lock, and then blocks until it is 287 awakened by a notify() or notify_all() call for the same condition 288 variable in another thread, or until the optional timeout occurs. 
Once 289 awakened or timed out, it re-acquires the lock and returns. 290 When the timeout argument is present and not None, it should be a 291 floating point number specifying a timeout for the operation in seconds 292 (or fractions thereof). 293 When the underlying lock is an RLock, it is not released using its 294 release() method, since this may not actually unlock the lock when it 295 was acquired multiple times recursively. Instead, an internal interface 296 of the RLock class is used, which really unlocks it even when it has 297 been recursively acquired several times. Another internal interface is 298 then used to restore the recursion level when the lock is reacquired. 299 """ 300 #等待直到通知或超時發生。若是調用該方法時調用的線程沒有得到鎖,則會引起運行時錯誤。 301 # 該方法釋放底層鎖,而後阻塞,直到它被另外一個線程中的notify()或notify_all()調用 302 # 喚醒,或者直到出現可選超時爲止。一旦被喚醒或超時,它會從新得到鎖並返回。 303 # 當出現timeout參數而不是None時,它應該是一個浮點數,以秒(或幾分之一)爲單位指定 304 # 操做的超時。當底層鎖是RLock時,不會使用其release()方法釋放它,由於當遞歸地屢次 305 # 獲取鎖時,這可能不會真正解鎖它。相反,使用了RLock類的內部接口,即便遞歸地得到了 306 # 屢次,它也會真正地解鎖它。而後使用另外一個內部接口在從新得到鎖時恢復遞歸級別。 307 308 if not self._is_owned(): 309 raise RuntimeError("cannot wait on un-acquired lock") 310 waiter = _allocate_lock() 311 waiter.acquire() 312 self._waiters.append(waiter) 313 saved_state = self._release_save() 314 gotit = False 315 try: # restore state no matter what (e.g., KeyboardInterrupt) 316 #不管如何都要恢復狀態(例如,鍵盤中斷) 317 if timeout is None: 318 waiter.acquire() 319 gotit = True 320 else: 321 if timeout > 0: 322 gotit = waiter.acquire(True, timeout) 323 else: 324 gotit = waiter.acquire(False) 325 return gotit 326 finally: 327 self._acquire_restore(saved_state) 328 if not gotit: 329 try: 330 self._waiters.remove(waiter) 331 except ValueError: 332 pass 333 334 def wait_for(self, predicate, timeout=None): 335 """Wait until a condition evaluates to True. 336 predicate should be a callable which result will be interpreted as a 337 boolean value. A timeout may be provided giving the maximum time to 338 wait. 339 """ 340 #等待,直到條件的值爲True。謂詞應該是可調用的,其結果將被解釋爲布爾值。 341 # 可能會提供一個超時,以提供最長的等待時間。 342 343 endtime = None 344 waittime = timeout 345 result = predicate() 346 while not result: 347 if waittime is not None: 348 if endtime is None: 349 endtime = _time() + waittime 350 else: 351 waittime = endtime - _time() 352 if waittime <= 0: 353 break 354 self.wait(waittime) 355 result = predicate() 356 return result 357 358 def notify(self, n=1): 359 """Wake up one or more threads waiting on this condition, if any. 360 If the calling thread has not acquired the lock when this method is 361 called, a RuntimeError is raised. 362 This method wakes up at most n of the threads waiting for the condition 363 variable; it is a no-op if no threads are waiting. 364 """ 365 #喚醒在此條件下等待的一個或多個線程(若是有的話)。若是調用該方法時調用的線程沒有得到鎖, 366 # 則會引起運行時錯誤。該方法最多喚醒n個等待條件變量的線程;若是沒有線程在等待,那麼 367 # 這是一個no-op。 368 if not self._is_owned(): 369 raise RuntimeError("cannot notify on un-acquired lock") 370 all_waiters = self._waiters 371 waiters_to_notify = _deque(_islice(all_waiters, n)) 372 if not waiters_to_notify: 373 return 374 for waiter in waiters_to_notify: 375 waiter.release() 376 try: 377 all_waiters.remove(waiter) 378 except ValueError: 379 pass 380 381 def notify_all(self): 382 """Wake up all threads waiting on this condition. 383 If the calling thread has not acquired the lock when this method 384 is called, a RuntimeError is raised. 
385 """ 386 #喚醒在此條件下等待的全部線程。若是調用該方法時調用的線程沒有得到鎖, 387 # 則會引起運行時錯誤。 388 self.notify(len(self._waiters)) 389 390 notifyAll = notify_all 391 392 393 class Semaphore: 394 """This class implements semaphore objects. 395 Semaphores manage a counter representing the number of release() calls minus 396 the number of acquire() calls, plus an initial value. The acquire() method 397 blocks if necessary until it can return without making the counter 398 negative. If not given, value defaults to 1. 399 """ 400 #這個類實現信號量對象。信號量管理一個計數器,該計數器表示release()調用的數量減去 401 # acquire()調用的數量,再加上一個初始值。acquire()方法若是有必要會阻塞,直到它能夠 402 # 返回而不會使計數器變爲負數爲止。若是未指定,值默認爲1。 403 404 # After Tim Peters' semaphore class, but not quite the same (no maximum) 405 #在Tim Peters的信號量類以後,但不徹底相同(沒有最大值) 406 407 def __init__(self, value=1): 408 if value < 0: 409 raise ValueError("semaphore initial value must be >= 0") 410 self._cond = Condition(Lock()) 411 self._value = value 412 413 def acquire(self, blocking=True, timeout=None): 414 """Acquire a semaphore, decrementing the internal counter by one. 415 When invoked without arguments: if the internal counter is larger than 416 zero on entry, decrement it by one and return immediately. If it is zero 417 on entry, block, waiting until some other thread has called release() to 418 make it larger than zero. This is done with proper interlocking so that 419 if multiple acquire() calls are blocked, release() will wake exactly one 420 of them up. The implementation may pick one at random, so the order in 421 which blocked threads are awakened should not be relied on. There is no 422 return value in this case. 423 When invoked with blocking set to true, do the same thing as when called 424 without arguments, and return true. 425 When invoked with blocking set to false, do not block. If a call without 426 an argument would block, return false immediately; otherwise, do the 427 same thing as when called without arguments, and return true. 428 When invoked with a timeout other than None, it will block for at 429 most timeout seconds. If acquire does not complete successfully in 430 that interval, return false. Return true otherwise. 431 """ 432 #得到一個信號量,將內部計數器減1。在沒有參數的狀況下調用時:若是內部計數器在入口時 433 # 大於0,則將其遞減1並當即返回。若是進入時爲零,阻塞,等待其餘線程調用release() 434 # 使其大於零。這是經過適當的聯鎖完成的,這樣,若是多個acquire()調用被阻塞, 435 # release()就會喚醒其中一個調用。實現能夠隨機選擇一個線程,所以不該該依賴於被阻塞 436 # 線程被喚醒的順序。在本例中沒有返回值。當阻塞集調用爲true時,執行與沒有參數調用 437 # 時相同的操做,並返回true。當阻塞設置爲false時,不要阻塞。若是一個沒有參數的 438 # 調用將阻塞,當即返回false;不然,執行與沒有參數調用時相同的操做,並返回true。 439 # 當使用除None之外的超時調用時,它最多將阻塞超時秒。若是在那段時間裏收購沒有成功 440 # 完成,還假。不然返回true。 441 if not blocking and timeout is not None: 442 raise ValueError("can't specify timeout for non-blocking acquire") 443 rc = False 444 endtime = None 445 with self._cond: 446 while self._value == 0: 447 if not blocking: 448 break 449 if timeout is not None: 450 if endtime is None: 451 endtime = _time() + timeout 452 else: 453 timeout = endtime - _time() 454 if timeout <= 0: 455 break 456 self._cond.wait(timeout) 457 else: 458 self._value -= 1 459 rc = True 460 return rc 461 462 __enter__ = acquire 463 464 def release(self): 465 """Release a semaphore, incrementing the internal counter by one. 466 When the counter is zero on entry and another thread is waiting for it 467 to become larger than zero again, wake up that thread. 
468 """ 469 #釋放信號量,增長一個內部計數器。當進入時計數器爲零,而另外一個線程正在等待計數器 470 # 再次大於零時,喚醒該線程。 471 with self._cond: 472 self._value += 1 473 self._cond.notify() 474 475 def __exit__(self, t, v, tb): 476 self.release() 477 478 479 class BoundedSemaphore(Semaphore): 480 """Implements a bounded semaphore. 481 A bounded semaphore checks to make sure its current value doesn't exceed its 482 initial value. If it does, ValueError is raised. In most situations 483 semaphores are used to guard resources with limited capacity. 484 If the semaphore is released too many times it's a sign of a bug. If not 485 given, value defaults to 1. 486 Like regular semaphores, bounded semaphores manage a counter representing 487 the number of release() calls minus the number of acquire() calls, plus an 488 initial value. The acquire() method blocks if necessary until it can return 489 without making the counter negative. If not given, value defaults to 1. 490 """ 491 #實現有界信號量。有界信號量檢查其當前值是否不超過初始值。若是是,則會引起ValueError。 492 # 在大多數狀況下,信號量被用來保護有限容量的資源。若是信號量被釋放了太屢次,這是錯誤 493 # 的信號。若是未指定,值默認爲1。與常規信號量同樣,有界信號量管理一個計數器, 494 # 表示release()調用的數量減去acquire()調用的數量,再加上一個初始值。acquire()方法 495 # 若是有必要會阻塞,直到它能夠返回而不會使計數器變爲負數爲止。若是未指定,值默認爲1。 496 497 def __init__(self, value=1): 498 Semaphore.__init__(self, value) 499 self._initial_value = value 500 501 def release(self): 502 """Release a semaphore, incrementing the internal counter by one. 503 504 When the counter is zero on entry and another thread is waiting for it 505 to become larger than zero again, wake up that thread. 506 507 If the number of releases exceeds the number of acquires, 508 raise a ValueError. 509 """ 510 #釋放信號量,增長一個內部計數器。當進入時計數器爲0,而另外一個線程正在等待i再次 511 # 大於0時,喚醒那個線程。若是發佈的數量超過了得到的數量,則引起一個ValueError。 512 with self._cond: 513 if self._value >= self._initial_value: 514 raise ValueError("Semaphore released too many times") 515 self._value += 1 516 self._cond.notify() 517 518 519 class Event: 520 """Class implementing event objects. 521 522 Events manage a flag that can be set to true with the set() method and reset 523 to false with the clear() method. The wait() method blocks until the flag is 524 true. The flag is initially false. 525 """ 526 #類實現事件對象。事件管理的標誌能夠用set()方法設置爲true,用clear()方法重置爲false。 527 # wait()方法將阻塞,直到標記爲true。標誌最初是假的。 528 529 # After Tim Peters' event class (without is_posted()) 530 #在Tim Peters的事件類以後(沒有is_post ()) 531 532 def __init__(self): 533 self._cond = Condition(Lock()) 534 self._flag = False 535 536 def _reset_internal_locks(self): 537 # private! called by Thread._reset_internal_locks by _after_fork() 538 #私人!調用線程._reset_internal_locks _after_fork() 539 self._cond.__init__(Lock()) 540 541 def is_set(self): 542 """Return true if and only if the internal flag is true.""" 543 #當且僅當內部標誌爲true時返回true。 544 return self._flag 545 546 isSet = is_set 547 548 def set(self): 549 """Set the internal flag to true. 550 All threads waiting for it to become true are awakened. Threads 551 that call wait() once the flag is true will not block at all. 552 """ 553 #將內部標誌設置爲true。等待它成真的全部線程都被喚醒。一旦標誌爲true, 554 # 調用wait()的線程將不會阻塞。 555 with self._cond: 556 self._flag = True 557 self._cond.notify_all() 558 559 def clear(self): 560 """Reset the internal flag to false. 561 Subsequently, threads calling wait() will block until set() is called to 562 set the internal flag to true again. 563 """ 564 #將內部標誌重置爲false。隨後,調用wait()的線程將阻塞,直到調用set()將內部標誌再次設置爲true。 565 with self._cond: 566 self._flag = False 567 568 def wait(self, timeout=None): 569 """Block until the internal flag is true. 
570 If the internal flag is true on entry, return immediately. Otherwise, 571 block until another thread calls set() to set the flag to true, or until 572 the optional timeout occurs. 573 When the timeout argument is present and not None, it should be a 574 floating point number specifying a timeout for the operation in seconds 575 (or fractions thereof). 576 This method returns the internal flag on exit, so it will always return 577 True except if a timeout is given and the operation times out. 578 """ 579 #阻塞,直到內部標誌爲true。若是進入時內部標誌爲true,則當即返回。不然,阻塞直到 580 # 另外一個線程調用set()將標誌設置爲true,或者直到出現可選超時。當出現timeout參數 581 # 而不是None時,它應該是一個浮點數,以秒(或幾分之一)爲單位指定操做的超時。這個 582 # 方法在退出時返回內部標誌,所以它老是返回True,除非超時和操做超時。 583 with self._cond: 584 signaled = self._flag 585 if not signaled: 586 signaled = self._cond.wait(timeout) 587 return signaled 588 589 590 # A barrier class. Inspired in part by the pthread_barrier_* api and 591 # the CyclicBarrier class from Java. See 592 '''一個障礙類。部分靈感來自於pthread_barrier_* api和來自Java的循環屏障類。看到''' 593 # http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and 594 # http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ 595 # CyclicBarrier.html 596 # for information. ##獲取信息 597 # We maintain two main states, 'filling' and 'draining' enabling the barrier 598 # to be cyclic. Threads are not allowed into it until it has fully drained 599 # since the previous cycle. In addition, a 'resetting' state exists which is 600 # similar to 'draining' except that threads leave with a BrokenBarrierError, 601 # and a 'broken' state in which all threads get the exception. 602 '''咱們維持兩種主要狀態,「填充」和「排水」,使屏障是循環的。線程不容許進入它,直到它從 603 上一個循環中徹底耗盡爲止。此外,存在一種「重置」狀態,相似於「耗盡」狀態,只是線程留下了 604 一個故障的barriererror錯誤,以及全部線程都獲得異常的「中斷」狀態。''' 605 class Barrier: 606 """Implements a Barrier. 607 Useful for synchronizing a fixed number of threads at known synchronization 608 points. Threads block on 'wait()' and are simultaneously once they have all 609 made that call. 610 """ 611 #實現了一個障礙。用於在已知同步點同步固定數量的線程。線程阻塞在'wait()'上, 612 # 而且一旦它們都進行了該調用,就會同時阻塞。 613 614 def __init__(self, parties, action=None, timeout=None): 615 """Create a barrier, initialised to 'parties' threads. 616 'action' is a callable which, when supplied, will be called by one of 617 the threads after they have all entered the barrier and just prior to 618 releasing them all. If a 'timeout' is provided, it is uses as the 619 default for all subsequent 'wait()' calls. 620 """ 621 #建立一個障礙,初始化爲「party」線程。「action」是一個可調用的線程,當它被提供時, 622 # 它將被其中一個線程在它們所有進入壁壘並釋放它們以前調用。若是提供了'timeout', 623 # 那麼它將用做全部後續'wait()'調用的默認值。 624 self._cond = Condition(Lock()) 625 self._action = action 626 self._timeout = timeout 627 self._parties = parties 628 self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken 629 self._count = 0 630 631 def wait(self, timeout=None): 632 """Wait for the barrier. 633 When the specified number of threads have started waiting, they are all 634 simultaneously awoken. If an 'action' was provided for the barrier, one 635 of the threads will have executed that callback prior to returning. 636 Returns an individual index number from 0 to 'parties-1'. 637 """ 638 #等待障礙。當指定數量的線程開始等待時,它們都同時被喚醒。若是爲barrier提供了一個 639 # 「操做」,其中一個線程將在返回以前執行該回調。返回從0到「parties-1」的單個索引號。 640 641 if timeout is None: 642 timeout = self._timeout 643 with self._cond: 644 self._enter() # Block while the barrier drains. 
隔離牆排水時要進行隔離。 645 index = self._count 646 self._count += 1 647 try: 648 if index + 1 == self._parties: 649 # We release the barrier 650 self._release() 651 else: 652 # We wait until someone releases us 653 self._wait(timeout) 654 return index 655 finally: 656 self._count -= 1 657 # Wake up any threads waiting for barrier to drain. 658 #喚醒任何等待屏障耗盡的線程。 659 self._exit() 660 661 # Block until the barrier is ready for us, or raise an exception 662 # if it is broken. 663 #阻止,直到障礙爲咱們準備好,或提出一個例外,若是它被打破。 664 def _enter(self): 665 while self._state in (-1, 1): 666 # It is draining or resetting, wait until done正在排水或重置,等待完成 667 self._cond.wait() 668 #see if the barrier is in a broken state看看勢壘是否處於破碎狀態 669 if self._state < 0: 670 raise BrokenBarrierError 671 assert self._state == 0 672 673 # Optionally run the 'action' and release the threads waiting 674 # in the barrier. 675 #能夠選擇運行「action」,並釋放等待在barrier中的線程。 676 677 def _release(self): 678 try: 679 if self._action: 680 self._action() 681 # enter draining state 進入排水狀態 682 self._state = 1 683 self._cond.notify_all() 684 except: 685 #an exception during the _action handler. Break and reraise 686 #_action處理程序期間的異常。打破和reraise 687 self._break() 688 raise 689 690 # Wait in the barrier until we are released. Raise an exception 691 # if the barrier is reset or broken. 692 #在障礙物裏等着,直到咱們被釋放。若是障礙被重置或破壞,則引起異常。 693 def _wait(self, timeout): 694 if not self._cond.wait_for(lambda : self._state != 0, timeout): 695 #timed out. Break the barrier 696 self._break() 697 raise BrokenBarrierError 698 if self._state < 0: 699 raise BrokenBarrierError 700 assert self._state == 1 701 702 # If we are the last thread to exit the barrier, signal any threads 703 # # waiting for the barrier to drain. 704 #若是咱們是最後一個退出屏障的線程,那麼向等待屏障流出的線程發出信號。 705 def _exit(self): 706 if self._count == 0: 707 if self._state in (-1, 1): 708 #resetting or draining 709 self._state = 0 710 self._cond.notify_all() 711 712 def reset(self): 713 """Reset the barrier to the initial state. 714 Any threads currently waiting will get the BrokenBarrier exception 715 raised. 716 """ 717 #將勢壘重置爲初始狀態。當前等待的任何線程都將引起故障障礙異常。 718 with self._cond: 719 if self._count > 0: 720 if self._state == 0: 721 #reset the barrier, waking up threads 重置障礙,喚醒線程 722 self._state = -1 723 elif self._state == -2: 724 #was broken, set it to reset state 被破壞,設置爲重置狀態 725 #which clears when the last thread exits 最後一個線程退出時哪一個線程清除 726 self._state = -1 727 else: 728 self._state = 0 729 self._cond.notify_all() 730 731 def abort(self): 732 """Place the barrier into a 'broken' state. 733 Useful in case of error. Any currently waiting threads and threads 734 attempting to 'wait()' will have BrokenBarrierError raised. 735 """ 736 #將障礙設置爲「破碎」狀態。在發生錯誤時頗有用。任何當前正在等待的線程和 737 # 試圖「wait()」的線程都會出現故障障礙。 738 with self._cond: 739 self._break() 740 741 def _break(self): 742 # An internal error was detected. The barrier is set to 743 # a broken state all parties awakened. 744 #檢測到內部錯誤。障礙被設置爲一個破碎的國家,全部各方都覺醒了。 745 self._state = -2 746 self._cond.notify_all() 747 748 @property 749 def parties(self): 750 """Return the number of threads required to trip the barrier.""" 751 #返回跳閘所需的線程數。 752 return self._parties 753 754 @property 755 def n_waiting(self): 756 """Return the number of threads currently waiting at the barrier.""" 757 #返回阻塞處當前等待的線程數。 758 # We don't need synchronization here since this is an ephemeral result 759 # anyway. It returns the correct value in the steady state. 
760 #咱們不須要同步,由於這是一個短暫的結果。它在穩定狀態下返回正確的值。 761 if self._state == 0: 762 return self._count 763 return 0 764 765 @property 766 def broken(self): 767 """Return True if the barrier is in a broken state.""" 768 #若是屏障處於破壞狀態,返回True。 769 return self._state == -2 770 771 # exception raised by the Barrier class 772 #由Barrier類引起的異常 773 class BrokenBarrierError(RuntimeError): 774 pass 775 776 777 # Helper to generate new thread names 778 #幫助程序生成新的線程名稱 779 _counter = _count().__next__ 780 _counter() # Consume 0 so first non-main thread has id 1. 781 #消耗0,因此第一個非主線程id爲1。 782 def _newname(template="Thread-%d"): 783 return template % _counter() 784 785 # Active thread administration #活動線程管理 786 _active_limbo_lock = _allocate_lock() 787 _active = {} # maps thread id to Thread object 將線程id映射到線程對象 788 _limbo = {} 789 _dangling = WeakSet() 790 791 # Main class for threads 792 '''線程的主類''' 793 794 class Thread: 795 """A class that represents a thread of control. 796 This class can be safely subclassed in a limited fashion. There are two ways 797 to specify the activity: by passing a callable object to the constructor, or 798 by overriding the run() method in a subclass. 799 """ 800 #表示控制線程的類。這個類能夠以有限的方式安全地子類化。有兩種方法能夠指定活動: 801 # 經過將可調用對象傳遞給構造函數,或者在子類中重寫run()方法。 802 803 _initialized = False 804 # Need to store a reference to sys.exc_info for printing 805 # out exceptions when a thread tries to use a global var. during interp. 806 # shutdown and thus raises an exception about trying to perform some 807 # operation on/with a NoneType 808 #須要存儲對sys的引用。exc_info用於在interp期間線程試圖使用全局變量時打印異常。 809 # 關閉,所以引起了一個異常,即試圖對/使用非etype執行某些操做 810 _exc_info = _sys.exc_info 811 # Keep sys.exc_clear too to clear the exception just before 812 # allowing .join() to return. 813 #Keep sys.ex_clear也能夠在allowing.join()返回以前清除異常。 814 #XXX __exc_clear = _sys.exc_clear 815 816 def __init__(self, group=None, target=None, name=None, 817 args=(), kwargs=None, *, daemon=None): 818 """This constructor should always be called with keyword arguments. Arguments are: 819 *group* should be None; reserved for future extension when a ThreadGroup 820 class is implemented. 821 *target* is the callable object to be invoked by the run() 822 method. Defaults to None, meaning nothing is called. 823 *name* is the thread name. By default, a unique name is constructed of 824 the form "Thread-N" where N is a small decimal number. 825 *args* is the argument tuple for the target invocation. Defaults to (). 826 *kwargs* is a dictionary of keyword arguments for the target 827 invocation. Defaults to {}. 828 If a subclass overrides the constructor, it must make sure to invoke 829 the base class constructor (Thread.__init__()) before doing anything 830 else to the thread. 831 """ 832 #這個構造函數應該老是使用關鍵字參數調用。論點是:*group*不該該是;在實現 833 # ThreadGroup類時爲未來的擴展保留。*target*是run()方法調用的可調用對象。 834 # 默認爲None,表示不調用任何東西。*name*是線程名。默認狀況下,惟一的名稱 835 # 是由「Thread-N」的形式構造的,其中N是一個小數。*args*是目標調用的參數元組。 836 # 默認爲()。*kwargs*是目標調用的關鍵字參數字典。默認爲{}。若是子類重寫構造 837 # 函數,它必須確保在對線程執行其餘操做以前調用基類構造函數(thread. 
__init__())。 838 839 assert group is None, "group argument must be None for now" 840 if kwargs is None: 841 kwargs = {} 842 self._target = target 843 self._name = str(name or _newname()) 844 self._args = args 845 self._kwargs = kwargs 846 if daemon is not None: 847 self._daemonic = daemon 848 else: 849 self._daemonic = current_thread().daemon 850 self._ident = None 851 self._tstate_lock = None 852 self._started = Event() 853 self._is_stopped = False 854 self._initialized = True 855 # sys.stderr is not stored in the class like 856 # sys.exc_info since it can be changed between instances 857 self._stderr = _sys.stderr 858 # For debugging and _after_fork() 859 _dangling.add(self) 860 861 def _reset_internal_locks(self, is_alive): 862 # private! Called by _after_fork() to reset our internal locks as 863 # they may be in an invalid state leading to a deadlock or crash. 864 #私人!由_after_fork()調用,以重置內部鎖,由於它們可能處於無效狀態,致使死鎖或崩潰。 865 self._started._reset_internal_locks() 866 if is_alive: 867 self._set_tstate_lock() 868 else: 869 # The thread isn't alive after fork: it doesn't have a tstate anymore. 870 #在fork以後,線程再也不是活的:它再也不有tstate。 871 self._is_stopped = True 872 self._tstate_lock = None 873 874 def __repr__(self): 875 assert self._initialized, "Thread.__init__() was not called" 876 status = "initial" 877 if self._started.is_set(): 878 status = "started" 879 self.is_alive() # easy way to get ._is_stopped set when appropriate 880 #在適當的狀況下,得到._is_stopped設置的簡單方法 881 if self._is_stopped: 882 status = "stopped" 883 if self._daemonic: 884 status += " daemon" 885 if self._ident is not None: 886 status += " %s" % self._ident 887 return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status) 888 889 def start(self): 890 """Start the thread's activity. 891 It must be called at most once per thread object. It arranges for the 892 object's run() method to be invoked in a separate thread of control. 893 This method will raise a RuntimeError if called more than once on the 894 same thread object. 895 """ 896 #啓動線程的活動。每一個線程對象最多隻能調用一次。它安排在一個單獨的控制線程中 897 # 調用對象的run()方法。若是在同一個線程對象上調用屢次,此方法將引起運行時錯誤。 898 if not self._initialized: 899 raise RuntimeError("thread.__init__() not called") 900 901 if self._started.is_set(): 902 raise RuntimeError("threads can only be started once") 903 with _active_limbo_lock: 904 _limbo[self] = self 905 try: 906 _start_new_thread(self._bootstrap, ()) 907 except Exception: 908 with _active_limbo_lock: 909 del _limbo[self] 910 raise 911 self._started.wait() 912 913 def run(self): 914 """Method representing the thread's activity. 915 You may override this method in a subclass. The standard run() method 916 invokes the callable object passed to the object's constructor as the 917 target argument, if any, with sequential and keyword arguments taken 918 from the args and kwargs arguments, respectively. 919 """ 920 #表示線程活動的方法。您能夠在子類中重寫此方法。標準run()方法調用傳遞給對象 921 # 構造函數的可調用對象做爲目標參數(若是有的話),分別使用args和kwargs參數 922 # 中的順序參數和關鍵字參數。 923 try: 924 if self._target: 925 self._target(*self._args, **self._kwargs) 926 finally: 927 # Avoid a refcycle if the thread is running a function with 928 # an argument that has a member that points to the thread. 929 #若是線程正在運行一個具備指向線程的成員的參數的函數,請避免使用refcycle。 930 del self._target, self._args, self._kwargs 931 932 def _bootstrap(self): 933 # Wrapper around the real bootstrap code that ignores 934 # exceptions during interpreter cleanup. 
Those typically 935 # happen when a daemon thread wakes up at an unfortunate 936 # moment, finds the world around it destroyed, and raises some 937 # random exception *** while trying to report the exception in 938 # _bootstrap_inner() below ***. Those random exceptions 939 # don't help anybody, and they confuse users, so we suppress 940 # them. We suppress them only when it appears that the world 941 # indeed has already been destroyed, so that exceptions in 942 # _bootstrap_inner() during normal business hours are properly 943 # reported. Also, we only suppress them for daemonic threads; 944 # if a non-daemonic encounters this, something else is wrong. 945 '''包裝真正的引導代碼,在解釋器清理期間忽略異常。這一般發生在守護進程線程 946 在一個不幸的時刻醒來,發現它周圍的世界被破壞,並在試圖報告***下面的異常 947 in_bootstrap_inner()時引起一些隨機異常時。這些隨機的異常對任何人都沒有 948 幫助,並且它們混淆了用戶,因此咱們抑制了它們。只有當世界彷佛確實已經被破壞 949 時,咱們纔會抑制它們,以便在正常工做時間內正確報告_bootstrap_inner()中 950 的異常。並且,咱們只對daemonic線程禁止它們;若是一個非daemonic遇到了這個 951 問題,就會出現其餘問題''' 952 try: 953 self._bootstrap_inner() 954 except: 955 if self._daemonic and _sys is None: 956 return 957 raise 958 959 def _set_ident(self): 960 self._ident = get_ident() 961 962 def _set_tstate_lock(self): 963 """ 964 Set a lock object which will be released by the interpreter when 965 the underlying thread state (see pystate.h) gets deleted. 966 """ 967 #設置一個鎖對象,當底層線程狀態(請參閱pystate.h)被刪除時,解釋器將釋放這個鎖對象。 968 self._tstate_lock = _set_sentinel() 969 self._tstate_lock.acquire() 970 971 def _bootstrap_inner(self): 972 try: 973 self._set_ident() 974 self._set_tstate_lock() 975 self._started.set() 976 with _active_limbo_lock: 977 _active[self._ident] = self 978 del _limbo[self] 979 980 if _trace_hook: 981 _sys.settrace(_trace_hook) 982 if _profile_hook: 983 _sys.setprofile(_profile_hook) 984 985 try: 986 self.run() 987 except SystemExit: 988 pass 989 except: 990 # If sys.stderr is no more (most likely from interpreter 991 # shutdown) use self._stderr. Otherwise still use sys (as in 992 # _sys) in case sys.stderr was redefined since the creation of 993 # self. 994 #若是系統。stderr再也不使用self._stderr(極可能是因爲解釋器關閉)。不然, 995 # 在case sys中仍然使用sys(如in_sys)。stderr自自我創造以來被從新定義。 996 if _sys and _sys.stderr is not None: 997 print("Exception in thread %s:\n%s" % 998 (self.name, _format_exc()), file=_sys.stderr) 999 elif self._stderr is not None: 1000 # Do the best job possible w/o a huge amt. of code to 1001 # approximate a traceback (code ideas from Lib/traceback.py) 1002 #盡最大的努力作最好的工做。近似回溯的代碼(來自Lib/traceback.py的代碼思想) 1003 exc_type, exc_value, exc_tb = self._exc_info() 1004 try: 1005 print(( 1006 "Exception in thread " + self.name + 1007 " (most likely raised during interpreter shutdown):"), file=self._stderr) 1008 print(( 1009 "Traceback (most recent call last):"), file=self._stderr) 1010 while exc_tb: 1011 print(( 1012 ' File "%s", line %s, in %s' % 1013 (exc_tb.tb_frame.f_code.co_filename, 1014 exc_tb.tb_lineno, 1015 exc_tb.tb_frame.f_code.co_name)), file=self._stderr) 1016 exc_tb = exc_tb.tb_next 1017 print(("%s: %s" % (exc_type, exc_value)), file=self._stderr) 1018 self._stderr.flush() 1019 # Make sure that exc_tb gets deleted since it is a memory 1020 # hog; deleting everything else is just for thoroughness 1021 #確保exc_tb被刪除,由於它佔用內存;刪除全部其餘內容只是爲了完全 1022 finally: 1023 del exc_type, exc_value, exc_tb 1024 finally: 1025 # Prevent a race in 1026 # test_threading.test_no_refcycle_through_target when 1027 # the exception keeps the target alive past when we 1028 # assert that it's dead. 
1029 #防止test_threading中的競爭。test_no_refcycle_through_target, 1030 # 當異常斷言目標已死時,該異常將使目標保持存活。 1031 #XXX self._exc_clear() 1032 pass 1033 finally: 1034 with _active_limbo_lock: 1035 try: 1036 # We don't call self._delete() because it also 1037 # grabs _active_limbo_lock. 1038 #咱們不調用self._delete(),由於它也抓取_active_limbo_lock。 1039 del _active[get_ident()] 1040 except: 1041 pass 1042 1043 def _stop(self): 1044 # After calling ._stop(), .is_alive() returns False and .join() returns 1045 # immediately. ._tstate_lock must be released before calling ._stop(). 1046 #調用._stop()後,.is_alive()返回False, .join()當即返回。 1047 1048 # Normal case: C code at the end of the thread's life 1049 # (release_sentinel in _threadmodule.c) releases ._tstate_lock, and 1050 # that's detected by our ._wait_for_tstate_lock(), called by .join() 1051 # and .is_alive(). Any number of threads _may_ call ._stop() 1052 # simultaneously (for example, if multiple threads are blocked in 1053 # .join() calls), and they're not serialized. That's harmless - 1054 # they'll just make redundant rebindings of ._is_stopped and 1055 # ._tstate_lock. Obscure: we rebind ._tstate_lock last so that the 1056 # "assert self._is_stopped" in ._wait_for_tstate_lock() always works 1057 # (the assert is executed only if ._tstate_lock is None). 1058 #正常狀況:線程生命週期結束時的C代碼(_threadmodule.c中的release_sentinel) 1059 # 釋放了._tstate_lock,咱們的._wait_for_tstate_lock()檢測到這一點, 1060 # 它被.join()和.is_alive()調用。同時調用任意數量的線程_may_ ._stop() 1061 # (例如,若是多個線程在.join()調用中被阻塞,而且它們沒有被序列化)。這是無害的, 1062 # 他們只會對._is_stopped和._tstate_lock進行冗餘的重綁定。晦澀的: 1063 # 咱們將._tstate_lock綁定到最後,以便「斷言self」。_is_stopped()中 1064 # 的._wait_for_tstate_lock()老是有效的(只有當._tstate_lock爲空時才執行斷言)。 1065 1066 # Special case: _main_thread releases ._tstate_lock via this 1067 # module's _shutdown() function. 1068 #特殊狀況:_main_thread經過這個模塊的_shutdown()函數釋放._tstate_lock。 1069 lock = self._tstate_lock 1070 if lock is not None: 1071 assert not lock.locked() 1072 self._is_stopped = True 1073 self._tstate_lock = None 1074 1075 def _delete(self): 1076 "Remove current thread from the dict of currently running threads." 1077 with _active_limbo_lock: 1078 del _active[get_ident()] 1079 # There must not be any python code between the previous line 1080 # and after the lock is released. Otherwise a tracing function 1081 # could try to acquire the lock again in the same thread, (in 1082 # current_thread()), and would block. 1083 #前一行和鎖釋放後之間不該該有任何python代碼。不然,跟蹤函數能夠嘗試在相 1084 # 同的線程(在current_thread()中)中再次獲取鎖,並將阻塞。 1085 1086 def join(self, timeout=None): 1087 """Wait until the thread terminates. 1088 This blocks the calling thread until the thread whose join() method is 1089 called terminates -- either normally or through an unhandled exception 1090 or until the optional timeout occurs. 1091 When the timeout argument is present and not None, it should be a 1092 floating point number specifying a timeout for the operation in seconds 1093 (or fractions thereof). As join() always returns None, you must call 1094 isAlive() after join() to decide whether a timeout happened -- if the 1095 thread is still alive, the join() call timed out. 1096 When the timeout argument is not present or None, the operation will 1097 block until the thread terminates. 1098 A thread can be join()ed many times. 1099 join() raises a RuntimeError if an attempt is made to join the current 1100 thread as that would cause a deadlock. It is also an error to join() a 1101 thread before it has been started and attempts to do so raises the same 1102 exception. 
1103 """ 1104 #等待直到線程終止。這將阻塞調用線程,直到調用join()方法的線程終止——一般或經過 1105 # 未處理的異常終止,或直到出現可選超時爲止。當出現timeout參數而不是None時, 1106 # 它應該是一個浮點數,以秒(或幾分之一)爲單位指定操做的超時。由於join()老是 1107 # 返回None,因此必須在join()以後調用isAlive(),以決定是否發生超時——若是線程 1108 # 仍然活着,則join()調用超時。當timeout參數不存在或不存在時,操做將阻塞, 1109 # 直到線程終止。一個線程能夠屢次鏈接()ed。若是嘗試鏈接當前線程,join()將引起 1110 # 一個運行時錯誤,由於這會致使死鎖。在線程啓動以前鏈接()線程也是一個錯誤, 1111 # 試圖這樣作會引起相同的異常。 1112 if not self._initialized: 1113 raise RuntimeError("Thread.__init__() not called") 1114 if not self._started.is_set(): 1115 raise RuntimeError("cannot join thread before it is started") 1116 if self is current_thread(): 1117 raise RuntimeError("cannot join current thread") 1118 1119 if timeout is None: 1120 self._wait_for_tstate_lock() 1121 else: 1122 # the behavior of a negative timeout isn't documented, but 1123 # historically .join(timeout=x) for x<0 has acted as if timeout=0 1124 #沒有記錄消極超時的行爲,可是在歷史上,x<0時的.join(timeout=x)就像timeout=0同樣 1125 self._wait_for_tstate_lock(timeout=max(timeout, 0)) 1126 1127 def _wait_for_tstate_lock(self, block=True, timeout=-1): 1128 # Issue #18808: wait for the thread state to be gone. 1129 # At the end of the thread's life, after all knowledge of the thread 1130 # is removed from C data structures, C code releases our _tstate_lock. 1131 # This method passes its arguments to _tstate_lock.acquire(). 1132 # If the lock is acquired, the C code is done, and self._stop() is 1133 # called. That sets ._is_stopped to True, and ._tstate_lock to None. 1134 #問題#18808:等待線程狀態消失。在線程生命週期結束時,在從C數據結構中刪除全部 1135 # 線程知識以後,C代碼釋放咱們的_tstate_lock。該方法將其參數 1136 # 傳遞給_tstate_lock.acquire()。若是得到了鎖,則完成C代碼, 1137 # 並調用self._stop()。這將._is_stopped設置爲True,._tstate_lock設置爲None。 1138 lock = self._tstate_lock 1139 if lock is None: # already determined that the C code is done 已經肯定C代碼已經完成 1140 assert self._is_stopped 1141 elif lock.acquire(block, timeout): 1142 lock.release() 1143 self._stop() 1144 1145 @property 1146 def name(self): 1147 """A string used for identification purposes only. 1148 It has no semantics. Multiple threads may be given the same name. The 1149 initial name is set by the constructor. 1150 """ 1151 #僅用於識別目的的字符串。它沒有語義。多個線程可能被賦予相同的名稱。初始名稱由構造函數設置。 1152 assert self._initialized, "Thread.__init__() not called" 1153 return self._name 1154 1155 @name.setter 1156 def name(self, name): 1157 assert self._initialized, "Thread.__init__() not called" 1158 self._name = str(name) 1159 1160 @property 1161 def ident(self): 1162 """Thread identifier of this thread or None if it has not been started. 1163 This is a nonzero integer. See the get_ident() function. Thread 1164 identifiers may be recycled when a thread exits and another thread is 1165 created. The identifier is available even after the thread has exited. 1166 """ 1167 #此線程的線程標識符,若是沒有啓動,則爲空。這是非零整數。請參閱get_ident()函數。 1168 # 當線程退出並建立另外一個線程時,能夠回收線程標識符。即便線程已經退出,標識符也是可用的。 1169 assert self._initialized, "Thread.__init__() not called" 1170 return self._ident 1171 1172 def is_alive(self): 1173 """Return whether the thread is alive. 1174 This method returns True just before the run() method starts until just 1175 after the run() method terminates. The module function enumerate() 1176 returns a list of all alive threads. 
1177 """ 1178 #返回線程是否存在。這個方法在run()方法開始以前返回True,直到run()方法終止以後。 1179 # 模塊函數enumerate()返回一個包含全部活線程的列表。 1180 assert self._initialized, "Thread.__init__() not called" 1181 if self._is_stopped or not self._started.is_set(): 1182 return False 1183 self._wait_for_tstate_lock(False) 1184 return not self._is_stopped 1185 1186 isAlive = is_alive 1187 1188 @property 1189 def daemon(self): 1190 """A boolean value indicating whether this thread is a daemon thread. 1191 This must be set before start() is called, otherwise RuntimeError is 1192 raised. Its initial value is inherited from the creating thread; the 1193 main thread is not a daemon thread and therefore all threads created in 1194 the main thread default to daemon = False. 1195 The entire Python program exits when no alive non-daemon threads are 1196 left. 1197 """ 1198 #一個布爾值,指示此線程是否爲守護線程。這必須在調用start()以前設置,不然會引起 1199 # 運行時錯誤。它的初始值繼承自建立線程;主線程不是守護進程線程,所以在主線程中 1200 # 建立的全部線程默認爲守護進程= False。當沒有存活的非守護進程線程時, 1201 # 整個Python程序退出。 1202 assert self._initialized, "Thread.__init__() not called" 1203 return self._daemonic 1204 1205 @daemon.setter 1206 def daemon(self, daemonic): 1207 if not self._initialized: 1208 raise RuntimeError("Thread.__init__() not called") 1209 if self._started.is_set(): 1210 raise RuntimeError("cannot set daemon status of active thread") 1211 self._daemonic = daemonic 1212 1213 def isDaemon(self): #Daemon:守護進程 1214 return self.daemon 1215 1216 def setDaemon(self, daemonic): 1217 self.daemon = daemonic 1218 1219 def getName(self): 1220 return self.name 1221 1222 def setName(self, name): 1223 self.name = name 1224 1225 # The timer class was contributed by Itamar Shtull-Trauring 1226 #計時器類由Itamar Shtull-Trauring貢獻 1227 1228 class Timer(Thread): 1229 """Call a function after a specified number of seconds: 1230 t = Timer(30.0, f, args=None, kwargs=None) 1231 t.start() 1232 t.cancel() # stop the timer's action if it's still waiting 1233 """ 1234 #在指定的秒數後調用一個函數:t = Timer(30.0, f, args=None, kwargs=None) 1235 #t.start() t.cancel()若是計時器仍在等待,則中止計時器的操做 1236 1237 def __init__(self, interval, function, args=None, kwargs=None): 1238 Thread.__init__(self) 1239 self.interval = interval 1240 self.function = function 1241 self.args = args if args is not None else [] 1242 self.kwargs = kwargs if kwargs is not None else {} 1243 self.finished = Event() 1244 1245 def cancel(self): 1246 """Stop the timer if it hasn't finished yet.""" 1247 #若是計時器尚未結束,請中止。 1248 self.finished.set() 1249 1250 def run(self): 1251 self.finished.wait(self.interval) 1252 if not self.finished.is_set(): 1253 self.function(*self.args, **self.kwargs) 1254 self.finished.set() 1255 1256 1257 # Special thread class to represent the main thread 1258 '''表示主線程的特殊線程類''' 1259 1260 class _MainThread(Thread): 1261 1262 def __init__(self): 1263 Thread.__init__(self, name="MainThread", daemon=False) 1264 self._set_tstate_lock() 1265 self._started.set() 1266 self._set_ident() 1267 with _active_limbo_lock: 1268 _active[self._ident] = self 1269 1270 1271 # Dummy thread class to represent threads not started here. 1272 # These aren't garbage collected when they die, nor can they be waited for. 1273 # If they invoke anything in threading.py that calls current_thread(), they 1274 # leave an entry in the _active dict forever after. 1275 # Their purpose is to return *something* from current_thread(). 1276 # They are marked as daemon threads so we won't wait for them 1277 # when we exit (conform previous semantics). 
1278 #僞線程類來表示這裏沒有啓動的線程。它們死後不會被垃圾收集,也不會被等待。若是它們在 1279 # 線程中調用任何東西。調用current_thread()的py在_active dict中永遠留下一個條目。 1280 # 它們的目的是從current_thread()返回*something*。它們被標記爲守護線程,所以在退出 1281 # 時咱們不會等待它們(符合前面的語義)。 1282 1283 class _DummyThread(Thread): 1284 1285 def __init__(self): 1286 Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True) 1287 1288 self._started.set() 1289 self._set_ident() 1290 with _active_limbo_lock: 1291 _active[self._ident] = self 1292 1293 def _stop(self): 1294 pass 1295 1296 def is_alive(self): 1297 assert not self._is_stopped and self._started.is_set() 1298 return True 1299 1300 def join(self, timeout=None): 1301 assert False, "cannot join a dummy thread" 1302 1303 1304 # Global API functions 1305 #全球API函數 1306 1307 def current_thread(): 1308 """Return the current Thread object, corresponding to the caller's thread of control. 1309 If the caller's thread of control was not created through the threading 1310 module, a dummy thread object with limited functionality is returned. 1311 """ 1312 #返回當前線程對象,對應於調用方的控制線程。若是沒有經過線程模塊建立調用者的控制線 1313 # 程,則返回具備有限功能的虛擬線程對象。 1314 try: 1315 return _active[get_ident()] 1316 except KeyError: 1317 return _DummyThread() 1318 1319 currentThread = current_thread 1320 1321 def active_count(): 1322 """Return the number of Thread objects currently alive. 1323 The returned count is equal to the length of the list returned by 1324 enumerate(). 1325 """ 1326 #返回當前存活的線程對象的數量。返回的計數等於enumerate()返回的列表的長度。 1327 with _active_limbo_lock: 1328 return len(_active) + len(_limbo) 1329 1330 activeCount = active_count 1331 1332 def _enumerate(): 1333 # Same as enumerate(), but without the lock. Internal use only. 1334 #與enumerate()相同,只是沒有鎖。內部使用。 1335 return list(_active.values()) + list(_limbo.values()) 1336 1337 def enumerate(): 1338 """Return a list of all Thread objects currently alive. 1339 The list includes daemonic threads, dummy thread objects created by 1340 current_thread(), and the main thread. It excludes terminated threads and 1341 threads that have not yet been started. 1342 """ 1343 #返回當前全部線程對象的列表。該列表包括daemonic線程、current_thread()建立的虛擬 1344 # 線程對象和主線程。它排除終止的線程和還沒有啓動的線程。 1345 with _active_limbo_lock: 1346 return list(_active.values()) + list(_limbo.values()) 1347 1348 from _thread import stack_size 1349 1350 # Create the main thread object, 1351 # and make it available for the interpreter 1352 # (Py_Main) as threading._shutdown. 1353 #建立主線程對象,並將其做爲thread ._shutdown提供給解釋器(Py_Main)。 1354 1355 _main_thread = _MainThread() 1356 1357 def _shutdown(): 1358 # Obscure: other threads may be waiting to join _main_thread. That's 1359 # dubious, but some code does it. We can't wait for C code to release 1360 # the main thread's tstate_lock - that won't happen until the interpreter 1361 # is nearly dead. So we release it here. Note that just calling _stop() 1362 # isn't enough: other threads may already be waiting on _tstate_lock. 1363 #晦澀:其餘線程可能正在等待加入_main_thread。這很可疑,但有些代碼能夠作到。 1364 # 咱們不能等待C代碼釋放主線程的tstate_lock——這要等到解釋器快死的時候纔會發生。 1365 # 咱們在這裏釋放它。注意,僅僅調用_stop()是不夠的:其餘線程可能已經在 1366 # 等待_tstate_lock了。 1367 if _main_thread._is_stopped: 1368 # _shutdown() was already called 1369 return 1370 tlock = _main_thread._tstate_lock 1371 # The main thread isn't finished yet, so its thread state lock can't have 1372 # been released. 
1373 #主線程還沒有完成,所以它的線程狀態鎖沒法釋放。 1374 assert tlock is not None 1375 assert tlock.locked() 1376 tlock.release() 1377 _main_thread._stop() 1378 t = _pickSomeNonDaemonThread() 1379 while t: 1380 t.join() 1381 t = _pickSomeNonDaemonThread() 1382 1383 def _pickSomeNonDaemonThread(): 1384 for t in enumerate(): 1385 if not t.daemon and t.is_alive(): 1386 return t 1387 return None 1388 1389 def main_thread(): 1390 """Return the main thread object. 1391 In normal conditions, the main thread is the thread from which the 1392 Python interpreter was started. 1393 """ 1394 #返回主線程對象。在正常狀況下,主線程是Python解釋器啓動的線程。 1395 return _main_thread 1396 1397 # get thread-local implementation, either from the thread 1398 # module, or from the python fallback 1399 #從線程模塊或python回退中獲取線程本地實現 1400 1401 try: 1402 from _thread import _local as local 1403 except ImportError: 1404 from _threading_local import local 1405 1406 1407 def _after_fork(): 1408 """ 1409 Cleanup threading module state that should not exist after a fork. 1410 """ 1411 # Reset _active_limbo_lock, in case we forked while the lock was held 1412 # by another (non-forked) thread. http://bugs.python.org/issue874900 1413 #Reset _active_limbo_lock,以防咱們分叉而鎖被另外一個(非分叉的)線程持有。 1414 global _active_limbo_lock, _main_thread 1415 _active_limbo_lock = _allocate_lock() 1416 1417 # fork() only copied the current thread; clear references to others. 1418 #fork()只複製當前線程;明確說起他人。 1419 new_active = {} 1420 current = current_thread() 1421 _main_thread = current 1422 with _active_limbo_lock: 1423 # Dangling thread instances must still have their locks reset, 1424 # because someone may join() them. 1425 #懸空線程實例必須從新設置它們的鎖,由於有人可能會加入()它們。 1426 threads = set(_enumerate()) 1427 threads.update(_dangling) 1428 for thread in threads: 1429 # Any lock/condition variable may be currently locked or in an 1430 # invalid state, so we reinitialize them. 1431 #任何鎖/條件變量可能當前被鎖定或處於無效狀態,所以咱們從新初始化它們。 1432 if thread is current: 1433 # There is only one active thread. We reset the ident to 1434 # its new value since it can have changed. 1435 #只有一個活動線程。咱們將ident重置爲它的新值,由於它可能已經更改。 1436 thread._reset_internal_locks(True) 1437 ident = get_ident() 1438 thread._ident = ident 1439 new_active[ident] = thread 1440 else: 1441 # All the others are already stopped. 1442 thread._reset_internal_locks(False) 1443 thread._stop() 1444 1445 _limbo.clear() 1446 _active.clear() 1447 _active.update(new_active) 1448 assert len(_active) == 1 1449 1450 1451 if hasattr(_os, "register_at_fork"): 1452 _os.register_at_fork(after_in_child=_after_fork)
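Besides Lock, Semaphore, Event and Barrier, the module source above also contains Timer, a Thread subclass that calls a function after a delay; a minimal usage sketch:

import threading

def hello():
    print("hello, one second later")

t = threading.Timer(1.0, hello)   # run hello() after 1 second
t.start()
t.join()
# t.cancel() would have stopped it had it not fired yet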
Queues:
Python's queue module provides synchronized, thread-safe queue classes: the FIFO (first-in, first-out) Queue, the LIFO (last-in, first-out) LifoQueue, and the priority queue PriorityQueue. All of them implement locking primitives, so they can be used directly from multiple threads, and a queue is a common way to synchronize threads.
'''Queues'''
import queue

q = queue.Queue()        # create a FIFO queue
q.put("q1")              # put data into the queue
q.put("q2")
q.put("q3")

# print(q.qsize())       # current size of the queue

'''Fetching data: get() waits (blocks) when the queue is empty instead of raising an error'''
print(q.get())
print(q.get())
print(q.get())
# print(q.get())         # would block forever here, because the queue is now empty

'''get_nowait() does not wait: it raises queue.Empty as soon as the queue is empty'''
q.put("q1")
q.put("q2")
q.put("q3")
print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())
# print(q.get_nowait())  # would raise queue.Empty

'''PriorityQueue: items are sorted by the priority given when they are put in'''
q = queue.PriorityQueue(maxsize=0)   # maxsize <= 0 means unbounded; > 0 sets the capacity
q.put((3, "q1"))
q.put((1, "q2"))
q.put((-4, "q3"))
print(q.get())           # items come out in ascending priority order
print(q.get())
print(q.get())
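The example above covers Queue and PriorityQueue; the remaining class mentioned, LifoQueue, behaves like a stack. A minimal sketch:

import queue

lq = queue.LifoQueue()
for item in ("a", "b", "c"):
    lq.put(item)

while not lq.empty():
    print(lq.get())      # prints c, b, a: last in, first out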
The code above only exercises the queue by itself; next, the queue is used together with threads:
import queue
import time
import threading

q = queue.Queue(maxsize=10)

# Producer: keeps putting "bones" into the queue
def gave(name):
    count = 1
    while True:
        q.put("--bone--%s" % count)
        print("%s produced bone %s" % (name, count))
        time.sleep(1)
        count += 1

# Consumer: a blocking q.get() waits for the producer instead of
# exiting as soon as the queue happens to be empty
def consumer(name):
    while True:
        print("%s ate %s" % (name, q.get()))

g = threading.Thread(target=gave, args=("王二小",))
c = threading.Thread(target=consumer, args=("旺財",))
g.start()
c.start()
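Queue also provides task_done() and join() (see the queue source later in this post), which let the main thread wait until every queued item has been processed. A minimal sketch, using an illustrative sentinel value to stop the worker:

import queue
import threading

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        if item is None:     # sentinel: no more work
            q.task_done()
            break
        print("processing", item)
        q.task_done()        # mark this item as finished

t = threading.Thread(target=worker, daemon=True)
t.start()

for i in range(5):
    q.put(i)
q.put(None)
q.join()                     # blocks until task_done() has been called for every put()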
1 #print('\033[41;1m--red light on---\033[0m') #紅燈 2 #print('\033[43;1m--yellow light on---\033[0m') #黃燈 3 #print('\033[42;1m--green light on---\033[0m') #綠燈 4 '''主要用在數據同步上''' 5 '''紅綠燈事件''' 6 7 # import threading 8 # import time 9 # # import queue 10 # event = threading.Event() 11 # # q = queue.Queue() 12 # 13 # def light(): 14 # count = 1 15 # while True: 16 # if count<=5: 17 # event.set() 18 # print('\033[42;1m--green light on---\033[0m') 19 # elif 5<count<=10: 20 # event.clear() 21 # print('\033[43;1m--yellow light on---\033[0m') 22 # else: 23 # print('\033[41;1m--red light on---\033[0m') 24 # if count>=15: 25 # count = 0 26 # time.sleep(1) 27 # count+=1 28 # 29 # def car(name): 30 # while True: 31 # if event.is_set(): 32 # time.sleep(1) 33 # print("%s is running..." % name) 34 # else: 35 # print("car is waiting...") 36 # event.wait() #等待事件event對象發生變化 37 # 38 # 39 # Light = threading.Thread(target=light,) 40 # Light.start() 41 # Car = threading.Thread(target=car,args=("BENZ",)) 42 # Car.start() 43 44 45 import threading 46 import time 47 import queue 48 49 event=threading.Event() 50 q=queue.PriorityQueue(maxsize=20) 51 #在循環以前先放入十輛車: 52 for i in range(10): 53 q.put("舊車輛,%s" % "QQ") 54 55 def light(): 56 count=0 57 while True: 58 if count<10: 59 event.set() 60 print("\033[42;1m--green light on---\033[0m",10-count) 61 elif 10<=count<15: 62 event.clear() 63 print("\033[43;1m--yellow light on---\033[0m",15-count) 64 else: 65 event.clear() 66 if count>=25: 67 count=0 68 continue 69 print("\033[41;1m--red light on---\033[0m",25-count) 70 time.sleep(1) 71 count+=1 72 73 def car(name): 74 while True: 75 if event.is_set() and q.qsize()>=1: 76 print("%s is running..." % name) 77 time.sleep(1) 78 print("道路還有【%s】輛車" % q.qsize()) 79 else: 80 print("car is waiting...") 81 print("如今道路中有車%s輛" % q.qsize()) 82 event.wait() #等待事件event對象發生變化 83 84 #路口停車 85 def Put(): 86 n=0 87 while q.qsize()<20: 88 time.sleep(2) 89 q.put("新車輛%s車輛" % n) 90 n+=1 91 print("車輛已駛入") 92 else: 93 event.wait() 94 print("中止駛入") 95 print("中止駛入後道路中有車%s" % q.qsize()) 96 97 #車輛行駛 98 def Get(): 99 while True: 100 if event.is_set(): 101 time.sleep(2) 102 print("%s車輛--------經過" % q.get()) 103 else: 104 print("禁止通行!!") 105 event.wait() 106 107 108 109 C=threading.Thread(target=car,args=("...T...",)) 110 L=threading.Thread(target=light) 111 P=threading.Thread(target=Put) 112 G=threading.Thread(target=Get) 113 L.start() 114 C.start() 115 P.start() 116 G.start()
1 import threading 2 3 money = 0 4 lock = threading.Lock() 5 6 #存錢 7 def get_money(Sum): 8 global money 9 money+=Sum #x=money+sum;money=x 10 11 #取錢 12 def put_money(Sum): 13 global money 14 money-=Sum 15 16 def run(Sum): 17 lock.acquire() 18 for i in range(10000): 19 put_money(Sum) 20 get_money(Sum) 21 lock.release() 22 23 #單線程中不會存在問題 24 #然而在多線程中,操做系統交叉處理賦值語句,致使 25 # 全局變量被一個線程修改,而另外一個線程殊不知情。 26 m1 = threading.Thread(target=run,args=(100,)) 27 m2 = threading.Thread(target=run,args=(1000,)) 28 m1.start() 29 m2.start() 30 m1.join() 31 m2.join() 32 print(money)
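threading.Lock can also be used as a context manager, which guarantees the lock is released even if the code inside raises an exception. A minimal sketch of the same deposit/withdraw counter written with a with block:

import threading

money = 0
lock = threading.Lock()

def run(amount):
    global money
    for _ in range(10000):
        with lock:            # acquired here, released automatically when the block exits
            money -= amount   # withdraw
            money += amount   # deposit

threads = [threading.Thread(target=run, args=(100,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(money)                  # 0, however the threads interleave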
1 '''A multi-producer, multi-consumer queue.''' 2 #多生產者、多消費者隊列。 3 import threading 4 from collections import deque 5 from heapq import heappush, heappop 6 from time import monotonic as time 7 try: 8 from _queue import SimpleQueue 9 except ImportError: 10 SimpleQueue = None 11 12 __all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue', 'SimpleQueue'] 13 14 15 try: 16 from _queue import Empty 17 except AttributeError: 18 class Empty(Exception): 19 'Exception raised by Queue.get(block=0)/get_nowait().' 20 pass 21 22 class Full(Exception): 23 'Exception raised by Queue.put(block=0)/put_nowait().' 24 pass 25 26 27 class Queue: 28 '''Create a queue object with a given maximum size. 29 If maxsize is <= 0, the queue size is infinite. 30 ''' 31 #建立一個具備給定最大大小的隊列對象。若是maxsize <= 0,則隊列大小爲無窮大。 32 33 def __init__(self, maxsize=0): 34 self.maxsize = maxsize 35 self._init(maxsize) 36 37 # mutex must be held whenever the queue is mutating. All methods 38 # that acquire mutex must release it before returning. mutex 39 # is shared between the three conditions, so acquiring and 40 # releasing the conditions also acquires and releases mutex. 41 #當隊列發生變化時,必須持有互斥鎖。全部得到互斥鎖的方法都必須在返回以前釋放它。 42 # 互斥鎖在這三個條件之間是共享的,所以獲取和釋放條件也得到和釋放互斥鎖。 43 self.mutex = threading.Lock() 44 45 # Notify not_empty whenever an item is added to the queue; a 46 # thread waiting to get is notified then. 47 #當一個項目被添加到隊列中時,通知not_empty;而後會通知等待獲取的線程。 48 self.not_empty = threading.Condition(self.mutex) 49 50 # Notify not_full whenever an item is removed from the queue; 51 # a thread waiting to put is notified then. 52 #當一個項目從隊列中刪除時,通知not_full;而後會通知等待放置的線程。 53 self.not_full = threading.Condition(self.mutex) 54 55 # Notify all_tasks_done whenever the number of unfinished tasks 56 # drops to zero; thread waiting to join() is notified to resume 57 #當未完成任務的數量降爲零時,通知all_tasks_done;等待加入()的線程被通知恢復 58 self.all_tasks_done = threading.Condition(self.mutex) 59 self.unfinished_tasks = 0 60 61 def task_done(self): 62 '''Indicate that a formerly enqueued task is complete. 63 Used by Queue consumer threads. For each get() used to fetch a task, 64 a subsequent call to task_done() tells the queue that the processing 65 on the task is complete. 66 If a join() is currently blocking, it will resume when all items 67 have been processed (meaning that a task_done() call was received 68 for every item that had been put() into the queue). 69 Raises a ValueError if called more times than there were items 70 placed in the queue. 71 ''' 72 #指示之前加入隊列的任務已經完成。由隊列使用者線程使用。對於用於獲取任務的每一個 73 # get(),對task_done()的後續調用將告訴隊列任務的處理已經完成。若是一個join() 74 # 當前處於阻塞狀態,那麼當全部項都被處理完時(這意味着對於每一個已將()放入隊列的 75 # 項都接收了task_done()調用),它將恢復。若是調用的次數超過了隊列中放置的項的 76 # 次數,就會引起ValueError。 77 with self.all_tasks_done: 78 unfinished = self.unfinished_tasks - 1 79 if unfinished <= 0: 80 if unfinished < 0: 81 raise ValueError('task_done() called too many times') 82 self.all_tasks_done.notify_all() 83 self.unfinished_tasks = unfinished 84 85 def join(self): 86 '''Blocks until all items in the Queue have been gotten and processed. 87 The count of unfinished tasks goes up whenever an item is added to the 88 queue. The count goes down whenever a consumer thread calls task_done() 89 to indicate the item was retrieved and all work on it is complete. 90 When the count of unfinished tasks drops to zero, join() unblocks. 
91 ''' 92 #阻塞,直到獲取和處理隊列中的全部項。當一個項目被添加到隊列中時,未完成任務的計數 93 # 就會上升。每當使用者線程調用task_done()時,計數就會降低,以指示檢索了項目並完成 94 # 了對其的全部工做。當未完成任務的計數降爲0時,join()將解塊。 95 with self.all_tasks_done: 96 while self.unfinished_tasks: 97 self.all_tasks_done.wait() 98 99 def qsize(self): 100 '''Return the approximate size of the queue (not reliable!).''' 101 #返回隊列的大體大小(不可靠!) 102 with self.mutex: 103 return self._qsize() 104 105 def empty(self): 106 '''Return True if the queue is empty, False otherwise (not reliable!). 107 This method is likely to be removed at some point. Use qsize() == 0 108 as a direct substitute, but be aware that either approach risks a race 109 condition where a queue can grow before the result of empty() or 110 qsize() can be used. 111 To create code that needs to wait for all queued tasks to be 112 completed, the preferred technique is to use the join() method. 113 ''' 114 #若是隊列爲空,返回True,不然返回False(不可靠!)這種方法可能會在某個時候被刪除。 115 # 使用qsize() == 0做爲直接的替代,可是要注意,在使用empty()或qsize()的結果以前, 116 # 隊列可能會增加,這可能會帶來競爭條件的風險。要建立須要等待全部排隊任務完成的代碼, 117 # 首選技術是使用join()方法。 118 with self.mutex: 119 return not self._qsize() 120 121 def full(self): 122 '''Return True if the queue is full, False otherwise (not reliable!). 123 This method is likely to be removed at some point. Use qsize() >= n 124 as a direct substitute, but be aware that either approach risks a race 125 condition where a queue can shrink before the result of full() or 126 qsize() can be used. 127 ''' 128 #若是隊列滿了,返回True,不然返回False(不可靠!)這種方法可能會在某個時候被刪除。 129 # 使用qsize() >= n做爲直接替代,可是要注意,在使用full()或qsize()的結果以前, 130 # 隊列可能會收縮,這可能會致使競爭條件的風險。 131 with self.mutex: 132 return 0 < self.maxsize <= self._qsize() 133 134 def put(self, item, block=True, timeout=None): 135 '''Put an item into the queue. 136 If optional args 'block' is true and 'timeout' is None (the default), 137 block if necessary until a free slot is available. If 'timeout' is 138 a non-negative number, it blocks at most 'timeout' seconds and raises 139 the Full exception if no free slot was available within that time. 140 Otherwise ('block' is false), put an item on the queue if a free slot 141 is immediately available, else raise the Full exception ('timeout' 142 is ignored in that case). 143 ''' 144 #將項目放入隊列中。若是可選的args 'block'爲true,而'timeout'爲None(默認值), 145 # 那麼若是有必要,阻塞直到空閒的插槽可用爲止。若是「timeout」是非負數,它最多會 146 # 阻塞「timeout」秒,若是在這段時間內沒有可用的空閒時間,它就會引起徹底異常。 147 # 不然(‘block’爲false),若是有空閒的插槽當即可用,就在隊列中放置一個項目, 148 # 不然引起完整的異常(在這種狀況下忽略‘timeout’)。 149 with self.not_full: 150 if self.maxsize > 0: 151 if not block: 152 if self._qsize() >= self.maxsize: 153 raise Full 154 elif timeout is None: 155 while self._qsize() >= self.maxsize: 156 self.not_full.wait() 157 elif timeout < 0: 158 raise ValueError("'timeout' must be a non-negative number") 159 else: 160 endtime = time() + timeout 161 while self._qsize() >= self.maxsize: 162 remaining = endtime - time() 163 if remaining <= 0.0: 164 raise Full 165 self.not_full.wait(remaining) 166 self._put(item) 167 self.unfinished_tasks += 1 168 self.not_empty.notify() 169 170 def get(self, block=True, timeout=None): 171 '''Remove and return an item from the queue. 172 If optional args 'block' is true and 'timeout' is None (the default), 173 block if necessary until an item is available. If 'timeout' is 174 a non-negative number, it blocks at most 'timeout' seconds and raises 175 the Empty exception if no item was available within that time. 
176 Otherwise ('block' is false), return an item if one is immediately 177 available, else raise the Empty exception ('timeout' is ignored 178 in that case). 179 ''' 180 #從隊列中刪除並返回項。若是可選的args 'block'爲true,而'timeout'爲None 181 # (默認值),則在項可用以前,若是有必要,阻塞。若是「timeout」是非負數,它最多 182 # 會阻塞「timeout」秒,若是在這段時間內沒有可用項,就會引起空異常。 183 # 不然(‘block’爲false),若是一個項當即可用,返回一個項,不然引起空異常 184 # (在這種狀況下忽略'timeout')。 185 with self.not_empty: 186 if not block: 187 if not self._qsize(): 188 raise Empty 189 elif timeout is None: 190 while not self._qsize(): 191 self.not_empty.wait() 192 elif timeout < 0: 193 raise ValueError("'timeout' must be a non-negative number") 194 else: 195 endtime = time() + timeout 196 while not self._qsize(): 197 remaining = endtime - time() 198 if remaining <= 0.0: 199 raise Empty 200 self.not_empty.wait(remaining) 201 item = self._get() 202 self.not_full.notify() 203 return item 204 205 def put_nowait(self, item): 206 '''Put an item into the queue without blocking. 207 Only enqueue the item if a free slot is immediately available. 208 Otherwise raise the Full exception. 209 ''' 210 #將項目放入隊列中而不阻塞。只有當一個空閒的插槽當即可用時,纔將項目加入隊列。不然引起徹底異常。 211 return self.put(item, block=False) 212 213 def get_nowait(self): 214 '''Remove and return an item from the queue without blocking. 215 Only get an item if one is immediately available. Otherwise 216 raise the Empty exception. 217 ''' 218 #在不阻塞的狀況下從隊列中刪除並返回項。只有當一個項目是當即可用的。不然引起空異常。 219 return self.get(block=False) 220 221 # Override these methods to implement other queue organizations 222 # (e.g. stack or priority queue). 223 # These will only be called with appropriate locks held 224 #重寫這些方法以實現其餘隊列組織(例如堆棧或優先隊列)。只有在持有適當的鎖時纔會調用這些函數 225 226 # Initialize the queue representation 227 '''初始化隊列表示''' 228 def _init(self, maxsize): 229 self.queue = deque() 230 231 def _qsize(self): 232 return len(self.queue) 233 234 # Put a new item in the queue 235 def _put(self, item): 236 self.queue.append(item) 237 238 # Get an item from the queue 239 def _get(self): 240 return self.queue.popleft() 241 242 243 class PriorityQueue(Queue): 244 '''Variant of Queue that retrieves open entries in priority order (lowest first). 245 Entries are typically tuples of the form: (priority number, data). 246 ''' 247 #按優先級順序(最低優先級)檢索打開項的隊列的變體。條目一般是表單的元組(優先級號、數據)。 248 249 def _init(self, maxsize): 250 self.queue = [] 251 252 def _qsize(self): 253 return len(self.queue) 254 255 def _put(self, item): 256 heappush(self.queue, item) 257 258 def _get(self): 259 return heappop(self.queue) 260 261 262 class LifoQueue(Queue): 263 '''Variant of Queue that retrieves most recently added entries first.''' 264 #隊列的變體,它首先檢索最近添加的條目。 265 266 def _init(self, maxsize): 267 self.queue = [] 268 269 def _qsize(self): 270 return len(self.queue) 271 272 def _put(self, item): 273 self.queue.append(item) 274 275 def _get(self): 276 return self.queue.pop() 277 278 279 class _PySimpleQueue: 280 '''Simple, unbounded FIFO queue. 281 This pure Python implementation is not reentrant. 282 ''' 283 #簡單、無界的FIFO隊列。這個純Python實現是不可重入的。 284 285 # Note: while this pure Python version provides fairness 286 # (by using a threading.Semaphore which is itself fair, being based 287 # on threading.Condition), fairness is not part of the API contract. 288 # This allows the C version to use a different implementation. 289 #注意:雖然這個純Python版本提供了公平性(經過使用線程)。信號量自己是公平的, 290 # 基於thread . 
condition),公平不是API契約的一部分。這容許C版本使用不一樣的實現。 291 292 def __init__(self): 293 self._queue = deque() 294 self._count = threading.Semaphore(0) 295 296 def put(self, item, block=True, timeout=None): 297 '''Put the item on the queue. 298 The optional 'block' and 'timeout' arguments are ignored, as this method 299 never blocks. They are provided for compatibility with the Queue class. 300 ''' 301 #將項目放到隊列中。可選的「block」和「timeout」參數被忽略,由於這個方法從不阻塞。 302 # 它們是爲了與隊列類兼容而提供的。 303 self._queue.append(item) 304 self._count.release() 305 306 def get(self, block=True, timeout=None): 307 '''Remove and return an item from the queue. 308 If optional args 'block' is true and 'timeout' is None (the default), 309 block if necessary until an item is available. If 'timeout' is 310 a non-negative number, it blocks at most 'timeout' seconds and raises 311 the Empty exception if no item was available within that time. 312 Otherwise ('block' is false), return an item if one is immediately 313 available, else raise the Empty exception ('timeout' is ignored 314 in that case). 315 ''' 316 #從隊列中刪除並返回項。若是可選的args 'block'爲true,而'timeout'爲None 317 # (默認值),則在項可用以前,若是有必要,阻塞。若是「timeout」是非負數,它最多 318 # 會阻塞「timeout」秒,若是在這段時間內沒有可用項,就會引起空異常。不然 319 # (‘block’爲false),若是一個項當即可用,返回一個項,不然引起空異常 320 # (在這種狀況下忽略'timeout')。 321 if timeout is not None and timeout < 0: 322 raise ValueError("'timeout' must be a non-negative number") 323 if not self._count.acquire(block, timeout): 324 raise Empty 325 return self._queue.popleft() 326 327 def put_nowait(self, item): 328 '''Put an item into the queue without blocking. 329 This is exactly equivalent to `put(item)` and is only provided 330 for compatibility with the Queue class. 331 ''' 332 #將項目放入隊列中而不阻塞。這徹底等同於‘put(item)’,而且只提供與隊列類的兼容性。 333 return self.put(item, block=False) 334 335 def get_nowait(self): 336 '''Remove and return an item from the queue without blocking. 337 Only get an item if one is immediately available. Otherwise 338 raise the Empty exception. 339 ''' 340 #在不阻塞的狀況下從隊列中刪除並返回項。只有當一個項目是當即可用的。不然引起空異常。 341 return self.get(block=False) 342 343 def empty(self): 344 '''Return True if the queue is empty, False otherwise (not reliable!).''' 345 #若是隊列爲空,返回True,不然返回False(不可靠!) 346 return len(self._queue) == 0 347 348 def qsize(self): 349 '''Return the approximate size of the queue (not reliable!).''' 350 #返回隊列的大體大小(不可靠!) 351 return len(self._queue) 352 353 354 if SimpleQueue is None: 355 SimpleQueue = _PySimpleQueue
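The SimpleQueue class at the end of the source above (available since Python 3.7) is an unbounded FIFO queue without the task_done()/join() machinery. A minimal sketch:

import queue

sq = queue.SimpleQueue()
sq.put("a")
sq.put("b")
print(sq.get())      # "a"
print(sq.qsize())    # approximate size, here 1
print(sq.empty())    # False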
Python Processes |
Processes (multiprocessing):
A thread is the smallest unit of execution inside a process; processes are independent of each other and do not share data with one another.
Starting a single process:
1 '''啓動一個進程''' 2 import multiprocessing 3 import time 4 5 def run(name): 6 time.sleep(2) 7 print("hello",name) 8 9 if __name__ == "__main__": 10 p = multiprocessing.Process(target=run,args=("pp",)) 11 p.start() 12 p.join() 13 14 15 '''運行結果''' 16 hello pp
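To make the earlier point concrete (processes do not share data), here is a minimal sketch in which the child process modifies its own copy of a global while the parent's copy is untouched; the variable name is arbitrary:

import multiprocessing

counter = 0

def bump():
    global counter
    counter += 1
    print("in child:", counter)     # 1: the child works on its own copy

if __name__ == "__main__":
    p = multiprocessing.Process(target=bump)
    p.start()
    p.join()
    print("in parent:", counter)    # still 0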
Starting multiple processes:
import multiprocessing
import time

def run(name):
    time.sleep(2)
    print("hello", name)

if __name__ == "__main__":           # the guard belongs outside the loop
    processes = []
    for i in range(10):
        p = multiprocessing.Process(target=run, args=("pp",))
        p.start()                     # start all ten processes first ...
        processes.append(p)
    for p in processes:
        p.join()                      # ... then wait for them, so they actually run in parallel
Creating a thread inside a process:
1 import multiprocessing 2 import threading 3 import time 4 5 def thread_run(): 6 print(threading.get_ident()) #get_ident()得到線程地址 7 8 def run(name): 9 time.sleep(2) 10 print("hello",name) 11 t = threading.Thread(target=thread_run) 12 t.start() 13 14 if __name__ == "__main__": 15 p = multiprocessing.Process(target=run,args=("pp",)) 16 p.start() 17 p.join() 18 19 20 '''運行結果''' 21 hello pp 22 2404
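The process boundary can also be made visible by printing os.getpid() and os.getppid() on both sides; a minimal sketch:

import multiprocessing
import os

def info(title):
    print(title, "pid:", os.getpid(), "parent pid:", os.getppid())

if __name__ == "__main__":
    info("main process")
    p = multiprocessing.Process(target=info, args=("child process",))
    p.start()
    p.join()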
Data sharing:
1 '''經過中間介質(pickle)使兩個進程實現數據共享,實質上並非徹底的數據共享, 2 只是將子進程的對象(隊列Queue)進行克隆''' 3 # import threading 4 # import queue 5 # 6 # def f(): 7 # q.put("jfkdsljfkdls") 8 # 9 # if __name__=="__main__": 10 # q=queue.Queue() 11 # p=threading.Thread(target=f) 12 # p.start() 13 # print(q.get()) 14 # p.join() 15 16 '''進程''' 17 from multiprocessing import Process,Queue 18 import queue 19 20 def f(q2): 21 q2.put("hkshhdjskajdksa") 22 23 if __name__=="__main__": 24 q=Queue() 25 p=Process(target=f,args=(q,)) 26 p.start() 27 print(q.get()) 28 p.join()
1 '''manager是用來傳遞對象''' 2 from multiprocessing import Process,Manager 3 import os 4 5 def f(d,l,l1): 6 d[os.getpid()] = os.getpid() #os.getpid():Return the current process id. 7 l.append(os.getpid()) 8 l1.append(os.getpid()) #l1屬於直接傳遞,不能回傳 9 # print(l) 10 # print("l1:***",l1) #普通列表在子進程中每次會得到一個新值,但都會被下一個值覆蓋 11 # print(d) 12 13 if __name__ == "__main__": 14 with Manager() as mager: 15 d = mager.dict() #由manager生成的字典 16 l = mager.list(range(5)) #由manager生成的列表 17 l1 = [] #普通列表沒法共享數據,最後仍舊是空列表 18 p_list = [] 19 for i in range(10): 20 p = Process(target=f,args=(d,l,l1)) 21 p.start() 22 p.join() 23 print("l:",l) 24 print(l1) 25 print("d:",d)
Passing data between processes:
1 '''主進程(父類進程) 子進程''' 2 '''管道通訊實現數據之間的傳遞''' 3 # from multiprocessing import Process,Pipe 4 # 5 # def f(con): 6 # con.send("hello from child1") 7 # con.send("hello from child2") 8 # print("parent news:",con.recv()) 9 # con.close() 10 # 11 # if __name__ == "__main__": 12 # Parent_con,Child_con = Pipe() 13 # p = Process(target=f,args=(Child_con,)) 14 # p.start() 15 # print(Parent_con.recv()) 16 # print(Parent_con.recv()) 17 # Parent_con.send("from parent") 18 # p.join() 19 20 21 22 '''運行結果''' 23 hello from child1 24 hello from child2 25 parent news: from parent
Process lock (Lock):
The screen (standard output) is shared and multiple processes can write to it at the same time; the point of the lock is to make sure the screen is used by only one process at a time, so output does not get interleaved.
from multiprocessing import Process,Lock def f(l,i): l.acquire() print("+++",i) l.release() if __name__ == "__main__": lock = Lock() #加鎖的目的是爲了確保屏幕被單獨佔用 for num in range(100): Process(target=f,args=(lock,num)).start() '''運行結果''' +++ 0 +++ 1 +++ 2 +++ 3 +++ 4 +++ 5 . . .(不在這裏演示完全部內容)
Process pool (Pool):
In Python, a process pool maintains a set of worker processes internally; when a task needs to run, the program takes a process from the pool.
If no process in the pool is free, the program waits until one becomes available.
from multiprocessing import Process,Pool import time import os def foo(i): time.sleep(2) print("in the process:",os.getpid()) return i+100 def bar(args): print("system done",args) if __name__ == "__main__": pool = Pool(5) for i in range(10): # pool.apply(func=foo,args=(i,)) #生成進程,把pool放入容器 #apply自己是一個串行方法,不受join影響 pool.apply_async(func=foo,args=(i,),callback=bar) #apply_async是一個並行方法,受join影響 #callback()回調函數爲主進程操做,進程池一旦開始運行,回調函數會自動執行 print("end") pool.close() #pool關閉是須要時間的,因此在close以後再join pool.join()
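Pool also implements the context-manager protocol (see __enter__/__exit__ in the source below), and for simple cases pool.map() is an alternative to looping over apply_async(). A minimal sketch:

from multiprocessing import Pool
import os

def foo(i):
    return i + 100, os.getpid()

if __name__ == "__main__":
    with Pool(5) as pool:                   # terminate() is called automatically on exit
        results = pool.map(foo, range(10))  # blocks until every result is ready
    print(results)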
Built-in methods of Pool:
1 # 2 # Module providing the `Pool` class for managing a process pool 3 #模塊提供用於管理進程池的「池」類 4 # multiprocessing/pool.py 5 # 6 # Copyright (c) 2006-2008, R Oudkerk 7 # Licensed to PSF under a Contributor Agreement. 8 # 9 10 __all__ = ['Pool', 'ThreadPool'] 11 12 # 13 # Imports 14 # 15 16 import threading 17 import queue 18 import itertools 19 import collections 20 import os 21 import time 22 import traceback 23 24 # If threading is available then ThreadPool should be provided. Therefore 25 # we avoid top-level imports which are liable to fail on some systems. 26 from . import util 27 from . import get_context, TimeoutError 28 29 # 30 # Constants representing the state of a pool 31 #表示池狀態的常數 32 33 RUN = 0 34 CLOSE = 1 35 TERMINATE = 2 36 37 # 38 # Miscellaneous 39 # 40 41 job_counter = itertools.count() 42 43 def mapstar(args): 44 return list(map(*args)) 45 46 def starmapstar(args): 47 return list(itertools.starmap(args[0], args[1])) 48 49 # 50 # Hack to embed stringification of remote traceback in local traceback 51 # 52 53 class RemoteTraceback(Exception): 54 def __init__(self, tb): 55 self.tb = tb 56 def __str__(self): 57 return self.tb 58 59 class ExceptionWithTraceback: 60 def __init__(self, exc, tb): 61 tb = traceback.format_exception(type(exc), exc, tb) 62 tb = ''.join(tb) 63 self.exc = exc 64 self.tb = '\n"""\n%s"""' % tb 65 def __reduce__(self): 66 return rebuild_exc, (self.exc, self.tb) 67 68 def rebuild_exc(exc, tb): 69 exc.__cause__ = RemoteTraceback(tb) 70 return exc 71 72 # 73 # Code run by worker processes 74 # 75 76 class MaybeEncodingError(Exception): 77 """Wraps possible unpickleable errors, so they can be 78 safely sent through the socket.""" 79 #包裝可能出現的沒法拾取的錯誤,以便經過套接字安全地發送這些錯誤。 80 81 def __init__(self, exc, value): 82 self.exc = repr(exc) 83 self.value = repr(value) 84 super(MaybeEncodingError, self).__init__(self.exc, self.value) 85 86 def __str__(self): 87 return "Error sending result: '%s'. 
Reason: '%s'" % (self.value, 88 self.exc) 89 90 def __repr__(self): 91 return "<%s: %s>" % (self.__class__.__name__, self) 92 93 94 def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None, 95 wrap_exception=False): 96 if (maxtasks is not None) and not (isinstance(maxtasks, int) 97 and maxtasks >= 1): 98 raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks)) 99 put = outqueue.put 100 get = inqueue.get 101 if hasattr(inqueue, '_writer'): 102 inqueue._writer.close() 103 outqueue._reader.close() 104 105 if initializer is not None: 106 initializer(*initargs) 107 108 completed = 0 109 while maxtasks is None or (maxtasks and completed < maxtasks): 110 try: 111 task = get() 112 except (EOFError, OSError): 113 util.debug('worker got EOFError or OSError -- exiting') 114 break 115 116 if task is None: 117 util.debug('worker got sentinel -- exiting') 118 break 119 120 job, i, func, args, kwds = task 121 try: 122 result = (True, func(*args, **kwds)) 123 except Exception as e: 124 if wrap_exception and func is not _helper_reraises_exception: 125 e = ExceptionWithTraceback(e, e.__traceback__) 126 result = (False, e) 127 try: 128 put((job, i, result)) 129 except Exception as e: 130 wrapped = MaybeEncodingError(e, result[1]) 131 util.debug("Possible encoding error while sending result: %s" % ( 132 wrapped)) 133 put((job, i, (False, wrapped))) 134 135 task = job = result = func = args = kwds = None 136 completed += 1 137 util.debug('worker exiting after %d tasks' % completed) 138 139 def _helper_reraises_exception(ex): 140 'Pickle-able helper function for use by _guarded_task_generation.' 141 #用於_guarded_task_generation的可選擇助手函數。 142 raise ex 143 144 # 145 # Class representing a process pool 類表示進程池 146 # 147 148 class Pool(object): 149 ''' 150 Class which supports an async version of applying functions to arguments. 
151 ''' 152 #類,該類支持將函數應用於參數的異步版本。 153 _wrap_exception = True 154 155 def Process(self, *args, **kwds): 156 return self._ctx.Process(*args, **kwds) 157 158 def __init__(self, processes=None, initializer=None, initargs=(), 159 maxtasksperchild=None, context=None): 160 self._ctx = context or get_context() 161 self._setup_queues() 162 self._taskqueue = queue.SimpleQueue() 163 self._cache = {} 164 self._state = RUN 165 self._maxtasksperchild = maxtasksperchild 166 self._initializer = initializer 167 self._initargs = initargs 168 169 if processes is None: 170 processes = os.cpu_count() or 1 171 if processes < 1: 172 raise ValueError("Number of processes must be at least 1") 173 174 if initializer is not None and not callable(initializer): 175 raise TypeError('initializer must be a callable') 176 177 self._processes = processes 178 self._pool = [] 179 self._repopulate_pool() 180 181 self._worker_handler = threading.Thread( 182 target=Pool._handle_workers, 183 args=(self, ) 184 ) 185 self._worker_handler.daemon = True 186 self._worker_handler._state = RUN 187 self._worker_handler.start() 188 189 190 self._task_handler = threading.Thread( 191 target=Pool._handle_tasks, 192 args=(self._taskqueue, self._quick_put, self._outqueue, 193 self._pool, self._cache) 194 ) 195 self._task_handler.daemon = True 196 self._task_handler._state = RUN 197 self._task_handler.start() 198 199 self._result_handler = threading.Thread( 200 target=Pool._handle_results, 201 args=(self._outqueue, self._quick_get, self._cache) 202 ) 203 self._result_handler.daemon = True 204 self._result_handler._state = RUN 205 self._result_handler.start() 206 207 self._terminate = util.Finalize( 208 self, self._terminate_pool, 209 args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, 210 self._worker_handler, self._task_handler, 211 self._result_handler, self._cache), 212 exitpriority=15 213 ) 214 215 def _join_exited_workers(self): 216 """Cleanup after any worker processes which have exited due to reaching 217 their specified lifetime. Returns True if any workers were cleaned up. 218 """ 219 #在因爲達到指定的生存期而退出的任何工做進程以後進行清理。若是有工人被清理乾淨, 220 # 返回True。 221 cleaned = False 222 for i in reversed(range(len(self._pool))): 223 worker = self._pool[i] 224 if worker.exitcode is not None: 225 # worker exited 226 util.debug('cleaning up worker %d' % i) 227 worker.join() 228 cleaned = True 229 del self._pool[i] 230 return cleaned 231 232 def _repopulate_pool(self): 233 """Bring the number of pool processes up to the specified number, 234 for use after reaping workers which have exited. 235 """ 236 #將池進程的數量增長到指定的數量,以便在收割已退出的工人後使用。 237 for i in range(self._processes - len(self._pool)): 238 w = self.Process(target=worker, 239 args=(self._inqueue, self._outqueue, 240 self._initializer, 241 self._initargs, self._maxtasksperchild, 242 self._wrap_exception) 243 ) 244 self._pool.append(w) 245 w.name = w.name.replace('Process', 'PoolWorker') 246 w.daemon = True 247 w.start() 248 util.debug('added worker') 249 250 def _maintain_pool(self): 251 """Clean up any exited workers and start replacements for them. 252 """ 253 #清理全部離職的員工,並開始替換他們。 254 if self._join_exited_workers(): 255 self._repopulate_pool() 256 257 def _setup_queues(self): 258 self._inqueue = self._ctx.SimpleQueue() 259 self._outqueue = self._ctx.SimpleQueue() 260 self._quick_put = self._inqueue._writer.send 261 self._quick_get = self._outqueue._reader.recv 262 263 def apply(self, func, args=(), kwds={}): 264 ''' 265 Equivalent of `func(*args, **kwds)`. 266 Pool must be running. 
267 ''' 268 #至關於「func(*args, ** kwds)」。池必須正在運行。 269 return self.apply_async(func, args, kwds).get() 270 271 def map(self, func, iterable, chunksize=None): 272 ''' 273 Apply `func` to each element in `iterable`, collecting the results 274 in a list that is returned. 275 ''' 276 #對「iterable」中的每一個元素應用「func」,在返回的列表中收集結果。 277 return self._map_async(func, iterable, mapstar, chunksize).get() 278 279 def starmap(self, func, iterable, chunksize=None): 280 ''' 281 Like `map()` method but the elements of the `iterable` are expected to 282 be iterables as well and will be unpacked as arguments. Hence 283 `func` and (a, b) becomes func(a, b). 284 ''' 285 #方法相似於「map()」,但「iterable」的元素也應該是可迭代的,並將做爲參數解壓縮。 286 # 所以「func」和(a, b)變成了func(a, b)。 287 return self._map_async(func, iterable, starmapstar, chunksize).get() 288 289 def starmap_async(self, func, iterable, chunksize=None, callback=None, 290 error_callback=None): 291 ''' 292 Asynchronous version of `starmap()` method. 293 ''' 294 #異步版本的「starmap()」方法。 295 return self._map_async(func, iterable, starmapstar, chunksize, 296 callback, error_callback) 297 298 def _guarded_task_generation(self, result_job, func, iterable): 299 '''Provides a generator of tasks for imap and imap_unordered with 300 appropriate handling for iterables which throw exceptions during 301 iteration.''' 302 #爲imap和imap_unordered提供任務生成器,併爲迭代期間拋出異常的迭代提供適當的處理。 303 try: 304 i = -1 305 for i, x in enumerate(iterable): 306 yield (result_job, i, func, (x,), {}) 307 except Exception as e: 308 yield (result_job, i+1, _helper_reraises_exception, (e,), {}) 309 310 def imap(self, func, iterable, chunksize=1): 311 ''' 312 Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. 313 ''' 314 #等價於「map()」——可能比「Pool.map()」慢得多。 315 if self._state != RUN: 316 raise ValueError("Pool not running") 317 if chunksize == 1: 318 result = IMapIterator(self._cache) 319 self._taskqueue.put( 320 ( 321 self._guarded_task_generation(result._job, func, iterable), 322 result._set_length 323 )) 324 return result 325 else: 326 if chunksize < 1: 327 raise ValueError( 328 "Chunksize must be 1+, not {0:n}".format( 329 chunksize)) 330 task_batches = Pool._get_tasks(func, iterable, chunksize) 331 result = IMapIterator(self._cache) 332 self._taskqueue.put( 333 ( 334 self._guarded_task_generation(result._job, 335 mapstar, 336 task_batches), 337 result._set_length 338 )) 339 return (item for chunk in result for item in chunk) 340 341 def imap_unordered(self, func, iterable, chunksize=1): 342 ''' 343 Like `imap()` method but ordering of results is arbitrary. 344 ''' 345 #Like `imap()`方法,但結果的順序是任意的。 346 if self._state != RUN: 347 raise ValueError("Pool not running") 348 if chunksize == 1: 349 result = IMapUnorderedIterator(self._cache) 350 self._taskqueue.put( 351 ( 352 self._guarded_task_generation(result._job, func, iterable), 353 result._set_length 354 )) 355 return result 356 else: 357 if chunksize < 1: 358 raise ValueError( 359 "Chunksize must be 1+, not {0!r}".format(chunksize)) 360 task_batches = Pool._get_tasks(func, iterable, chunksize) 361 result = IMapUnorderedIterator(self._cache) 362 self._taskqueue.put( 363 ( 364 self._guarded_task_generation(result._job, 365 mapstar, 366 task_batches), 367 result._set_length 368 )) 369 return (item for chunk in result for item in chunk) 370 371 def apply_async(self, func, args=(), kwds={}, callback=None, 372 error_callback=None): 373 ''' 374 Asynchronous version of `apply()` method. 
「apply()」方法的異步版本。 375 ''' 376 if self._state != RUN: 377 raise ValueError("Pool not running") 378 result = ApplyResult(self._cache, callback, error_callback) 379 self._taskqueue.put(([(result._job, 0, func, args, kwds)], None)) 380 return result 381 382 def map_async(self, func, iterable, chunksize=None, callback=None, 383 error_callback=None): 384 ''' 385 Asynchronous version of `map()` method. 方法的異步版本 386 ''' 387 return self._map_async(func, iterable, mapstar, chunksize, callback, 388 error_callback) 389 390 def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, 391 error_callback=None): 392 ''' 393 Helper function to implement map, starmap and their async counterparts. 394 ''' 395 #幫助函數實現映射,星圖和他們的異步對等。 396 if self._state != RUN: 397 raise ValueError("Pool not running") 398 if not hasattr(iterable, '__len__'): 399 iterable = list(iterable) 400 401 if chunksize is None: 402 chunksize, extra = divmod(len(iterable), len(self._pool) * 4) 403 if extra: 404 chunksize += 1 405 if len(iterable) == 0: 406 chunksize = 0 407 408 task_batches = Pool._get_tasks(func, iterable, chunksize) 409 result = MapResult(self._cache, chunksize, len(iterable), callback, 410 error_callback=error_callback) 411 self._taskqueue.put( 412 ( 413 self._guarded_task_generation(result._job, 414 mapper, 415 task_batches), 416 None 417 ) 418 ) 419 return result 420 421 @staticmethod 422 def _handle_workers(pool): 423 thread = threading.current_thread() 424 425 # Keep maintaining workers until the cache gets drained, unless the pool 426 # is terminated. 427 #繼續維護worker,直到緩存耗盡,除非池終止。 428 while thread._state == RUN or (pool._cache and thread._state != TERMINATE): 429 pool._maintain_pool() 430 time.sleep(0.1) 431 # send sentinel to stop workers 432 pool._taskqueue.put(None) 433 util.debug('worker handler exiting') 434 435 @staticmethod 436 def _handle_tasks(taskqueue, put, outqueue, pool, cache): 437 thread = threading.current_thread() 438 439 for taskseq, set_length in iter(taskqueue.get, None): 440 task = None 441 try: 442 # iterating taskseq cannot fail 443 #迭代taskseq不會失敗 444 for task in taskseq: 445 if thread._state: 446 util.debug('task handler found thread._state != RUN') 447 break 448 try: 449 put(task) 450 except Exception as e: 451 job, idx = task[:2] 452 try: 453 cache[job]._set(idx, (False, e)) 454 except KeyError: 455 pass 456 else: 457 if set_length: 458 util.debug('doing set_length()') 459 idx = task[1] if task else -1 460 set_length(idx + 1) 461 continue 462 break 463 finally: 464 task = taskseq = job = None 465 else: 466 util.debug('task handler got sentinel') 467 468 try: 469 # tell result handler to finish when cache is empty 470 #告訴結果處理程序在緩存爲空時結束 471 util.debug('task handler sending sentinel to result handler') 472 outqueue.put(None) 473 474 # tell workers there is no more work 475 util.debug('task handler sending sentinel to workers') 476 for p in pool: 477 put(None) 478 except OSError: 479 util.debug('task handler got OSError when sending sentinels') 480 481 util.debug('task handler exiting') 482 483 @staticmethod 484 def _handle_results(outqueue, get, cache): 485 thread = threading.current_thread() 486 487 while 1: 488 try: 489 task = get() 490 except (OSError, EOFError): 491 util.debug('result handler got EOFError/OSError -- exiting') 492 return 493 494 if thread._state: 495 assert thread._state == TERMINATE, "Thread not in TERMINATE" 496 util.debug('result handler found thread._state=TERMINATE') 497 break 498 499 if task is None: 500 util.debug('result handler got sentinel') 501 
break 502 503 job, i, obj = task 504 try: 505 cache[job]._set(i, obj) 506 except KeyError: 507 pass 508 task = job = obj = None 509 510 while cache and thread._state != TERMINATE: 511 try: 512 task = get() 513 except (OSError, EOFError): 514 util.debug('result handler got EOFError/OSError -- exiting') 515 return 516 517 if task is None: 518 util.debug('result handler ignoring extra sentinel') 519 continue 520 job, i, obj = task 521 try: 522 cache[job]._set(i, obj) 523 except KeyError: 524 pass 525 task = job = obj = None 526 527 if hasattr(outqueue, '_reader'): 528 util.debug('ensuring that outqueue is not full') 529 # If we don't make room available in outqueue then 530 # attempts to add the sentinel (None) to outqueue may 531 # block. There is guaranteed to be no more than 2 sentinels. 532 #若是咱們不在outqueue中留出可用的空間,那麼嘗試將sentinel (None) 533 # 添加到outqueue可能會阻塞。保證不超過2個哨兵。 534 try: 535 for i in range(10): 536 if not outqueue._reader.poll(): 537 break 538 get() 539 except (OSError, EOFError): 540 pass 541 542 util.debug('result handler exiting: len(cache)=%s, thread._state=%s', 543 len(cache), thread._state) 544 545 @staticmethod 546 def _get_tasks(func, it, size): 547 it = iter(it) 548 while 1: 549 x = tuple(itertools.islice(it, size)) 550 if not x: 551 return 552 yield (func, x) 553 554 def __reduce__(self): 555 raise NotImplementedError( 556 'pool objects cannot be passed between processes or pickled' 557 #不能在進程之間傳遞池對象或pickle池對象 558 ) 559 560 def close(self): 561 util.debug('closing pool') 562 if self._state == RUN: 563 self._state = CLOSE 564 self._worker_handler._state = CLOSE 565 566 def terminate(self): 567 util.debug('terminating pool') 568 self._state = TERMINATE 569 self._worker_handler._state = TERMINATE 570 self._terminate() 571 572 def join(self): 573 util.debug('joining pool') 574 if self._state == RUN: 575 raise ValueError("Pool is still running") 576 elif self._state not in (CLOSE, TERMINATE): 577 raise ValueError("In unknown state") 578 self._worker_handler.join() 579 self._task_handler.join() 580 self._result_handler.join() 581 for p in self._pool: 582 p.join() 583 584 @staticmethod 585 def _help_stuff_finish(inqueue, task_handler, size): 586 # task_handler may be blocked trying to put items on inqueue 587 #試圖將項放入inqueue時可能阻塞task_handler 588 util.debug('removing tasks from inqueue until task handler finished') 589 inqueue._rlock.acquire() 590 while task_handler.is_alive() and inqueue._reader.poll(): 591 inqueue._reader.recv() 592 time.sleep(0) 593 594 @classmethod 595 def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, 596 worker_handler, task_handler, result_handler, cache): 597 # this is guaranteed to only be called once 這保證只調用一次 598 util.debug('finalizing pool') 599 600 worker_handler._state = TERMINATE 601 task_handler._state = TERMINATE 602 603 util.debug('helping task handler/workers to finish') 604 cls._help_stuff_finish(inqueue, task_handler, len(pool)) 605 606 if (not result_handler.is_alive()) and (len(cache) != 0): 607 raise AssertionError( 608 "Cannot have cache with result_hander not alive") 609 610 result_handler._state = TERMINATE 611 outqueue.put(None) # sentinel 612 613 # We must wait for the worker handler to exit before terminating 614 # workers because we don't want workers to be restarted behind our back. 615 #咱們必須在終止工人以前等待工人處理程序退出,由於咱們不但願工人在咱們背後從新啓動。 616 util.debug('joining worker handler') 617 if threading.current_thread() is not worker_handler: 618 worker_handler.join() 619 620 # Terminate workers which haven't already finished. 
621 if pool and hasattr(pool[0], 'terminate'): 622 util.debug('terminating workers') 623 for p in pool: 624 if p.exitcode is None: 625 p.terminate() 626 627 util.debug('joining task handler') 628 if threading.current_thread() is not task_handler: 629 task_handler.join() 630 631 util.debug('joining result handler') 632 if threading.current_thread() is not result_handler: 633 result_handler.join() 634 635 if pool and hasattr(pool[0], 'terminate'): 636 util.debug('joining pool workers') 637 for p in pool: 638 if p.is_alive(): 639 # worker has not yet exited 640 util.debug('cleaning up worker %d' % p.pid) 641 p.join() 642 643 def __enter__(self): 644 return self 645 646 def __exit__(self, exc_type, exc_val, exc_tb): 647 self.terminate() 648 649 # 650 # Class whose instances are returned by `Pool.apply_async()` 651 # 652 653 class ApplyResult(object): 654 655 def __init__(self, cache, callback, error_callback): 656 self._event = threading.Event() 657 self._job = next(job_counter) 658 self._cache = cache 659 self._callback = callback 660 self._error_callback = error_callback 661 cache[self._job] = self 662 663 def ready(self): 664 return self._event.is_set() 665 666 def successful(self): 667 if not self.ready(): 668 raise ValueError("{0!r} not ready".format(self)) 669 return self._success 670 671 def wait(self, timeout=None): 672 self._event.wait(timeout) 673 674 def get(self, timeout=None): 675 self.wait(timeout) 676 if not self.ready(): 677 raise TimeoutError 678 if self._success: 679 return self._value 680 else: 681 raise self._value 682 683 def _set(self, i, obj): 684 self._success, self._value = obj 685 if self._callback and self._success: 686 self._callback(self._value) 687 if self._error_callback and not self._success: 688 self._error_callback(self._value) 689 self._event.set() 690 del self._cache[self._job] 691 692 AsyncResult = ApplyResult # create alias -- see #17805 693 694 # 695 # Class whose instances are returned by `Pool.map_async()` 696 # 697 698 class MapResult(ApplyResult): 699 700 def __init__(self, cache, chunksize, length, callback, error_callback): 701 ApplyResult.__init__(self, cache, callback, 702 error_callback=error_callback) 703 self._success = True 704 self._value = [None] * length 705 self._chunksize = chunksize 706 if chunksize <= 0: 707 self._number_left = 0 708 self._event.set() 709 del cache[self._job] 710 else: 711 self._number_left = length//chunksize + bool(length % chunksize) 712 713 def _set(self, i, success_result): 714 self._number_left -= 1 715 success, result = success_result 716 if success and self._success: 717 self._value[i*self._chunksize:(i+1)*self._chunksize] = result 718 if self._number_left == 0: 719 if self._callback: 720 self._callback(self._value) 721 del self._cache[self._job] 722 self._event.set() 723 else: 724 if not success and self._success: 725 # only store first exception 726 self._success = False 727 self._value = result 728 if self._number_left == 0: 729 # only consider the result ready once all jobs are done 730 if self._error_callback: 731 self._error_callback(self._value) 732 del self._cache[self._job] 733 self._event.set() 734 735 # 736 # Class whose instances are returned by `Pool.imap()` 737 # 738 739 class IMapIterator(object): 740 741 def __init__(self, cache): 742 self._cond = threading.Condition(threading.Lock()) 743 self._job = next(job_counter) 744 self._cache = cache 745 self._items = collections.deque() 746 self._index = 0 747 self._length = None 748 self._unsorted = {} 749 cache[self._job] = self 750 751 def 
__iter__(self): 752 return self 753 754 def next(self, timeout=None): 755 with self._cond: 756 try: 757 item = self._items.popleft() 758 except IndexError: 759 if self._index == self._length: 760 raise StopIteration from None 761 self._cond.wait(timeout) 762 try: 763 item = self._items.popleft() 764 except IndexError: 765 if self._index == self._length: 766 raise StopIteration from None 767 raise TimeoutError from None 768 769 success, value = item 770 if success: 771 return value 772 raise value 773 774 __next__ = next # XXX 775 776 def _set(self, i, obj): 777 with self._cond: 778 if self._index == i: 779 self._items.append(obj) 780 self._index += 1 781 while self._index in self._unsorted: 782 obj = self._unsorted.pop(self._index) 783 self._items.append(obj) 784 self._index += 1 785 self._cond.notify() 786 else: 787 self._unsorted[i] = obj 788 789 if self._index == self._length: 790 del self._cache[self._job] 791 792 def _set_length(self, length): 793 with self._cond: 794 self._length = length 795 if self._index == self._length: 796 self._cond.notify() 797 del self._cache[self._job] 798 799 # 800 # Class whose instances are returned by `Pool.imap_unordered()` 801 #類,其實例由' Pool.imap_unordered() '返回 802 803 class IMapUnorderedIterator(IMapIterator): 804 805 def _set(self, i, obj): 806 with self._cond: 807 self._items.append(obj) 808 self._index += 1 809 self._cond.notify() 810 if self._index == self._length: 811 del self._cache[self._job] 812 813 # 814 # 815 # 816 817 class ThreadPool(Pool): 818 _wrap_exception = False 819 820 @staticmethod 821 def Process(*args, **kwds): 822 from .dummy import Process 823 return Process(*args, **kwds) 824 825 def __init__(self, processes=None, initializer=None, initargs=()): 826 Pool.__init__(self, processes, initializer, initargs) 827 828 def _setup_queues(self): 829 self._inqueue = queue.SimpleQueue() 830 self._outqueue = queue.SimpleQueue() 831 self._quick_put = self._inqueue.put 832 self._quick_get = self._outqueue.get 833 834 @staticmethod 835 def _help_stuff_finish(inqueue, task_handler, size): 836 # drain inqueue, and put sentinels at its head to make workers finish 837 #排幹隊伍內的水,並在其頭部放置哨兵,使工人完成工做 838 try: 839 while True: 840 inqueue.get(block=False) 841 except queue.Empty: 842 pass 843 for i in range(size): 844 inqueue.put(None)
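The module also exposes ThreadPool (the last class in the source above), which offers the same Pool API but runs the work in threads of the current process instead of separate processes. A minimal sketch:

from multiprocessing.pool import ThreadPool
import threading

def work(i):
    return i * i, threading.current_thread().name

if __name__ == "__main__":
    with ThreadPool(4) as tp:
        print(tp.map(work, range(8)))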