JoinableQueue一樣經過multiprocessing使用。python
建立隊列的另一個類:dom
JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。ide
參數介紹:函數
![](http://static.javashuo.com/static/loading.gif)
1 import time 2 import random 3 from multiprocessing import Process,JoinableQueue 4 5 def consumer(name,q): 6 while True: 7 res=q.get() 8 if res is None:break 9 time.sleep(random.randint(1,3)) 10 print('\033[46m消費者===》%s 吃了 %s\033[0m' %(name,res)) 11 q.task_done() 12 13 def producer(name,q,food): 14 for i in range(5): 15 time.sleep(random.randint(1,2)) 16 res='%s%s' %(food,i) 17 q.put(res) 18 print('\033[45m生產者者===》%s 生產了 %s\033[0m' %(name,res)) 19 20 21 22 if __name__ == '__main__': 23 #一、共享的盆 24 q=JoinableQueue() 25 26 27 #二、生產者們 28 p1=Process(target=producer,args=('egon',q,'包子')) 29 p2=Process(target=producer,args=('劉',q,'泔水')) 30 p3=Process(target=producer,args=('楊',q,'米飯')) 31 32 #三、消費者們 33 c1=Process(target=consumer,args=('alex',q)) 34 c2=Process(target=consumer,args=('梁',q)) 35 c1.daemon=True 36 c2.daemon=True 37 38 p1.start() 39 p2.start() 40 p3.start() 41 c1.start() 42 c2.start() 43 44 45 # 肯定生產者確確實實已經生產完畢 46 p1.join() 47 p2.join() 48 p3.join() 49 # 在生產者生產完畢後,拿到隊列中元素的總個數,而後直到元素總數變爲0,q.join()這一行代碼纔算運行完畢 50 q.join() 51 #q.join()一旦結束就意味着隊列確實被取空,消費者已經確確實實把數據都取乾淨了 52 print('主進程結束')
2、線程
一、什麼是線程post
線程指的是一條流水線的工做過程ui
進程根本就不是一個執行單位,進程實際上是一個資源單位
一個進程內自帶一個線程,線程纔是執行單位spa
二、進程VS線程線程
- 同一進程內的線程們共享該進程內資源,不一樣進程內的線程資源確定是隔離的
- 建立線程的開銷比建立進程要小的多
- 一個進程的全部線程的pid都是同樣的
- 線程之間的地位相同,沒有主次之分,可是主線程又表明着主進程
#方式一 from threading import Thread import time def sayhi(name): print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.start() print('主線程')
#方式二 from threading import Thread import time class Sayhi(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): print('%s say hello' % self.name) if __name__ == '__main__': t = Sayhi('egon') t.start() print('主線程')
線程執行的結果:code
與進程不一樣,是由於線程啓動的代價很小,啓動速度遠大於進程,因此,子線程先打印,主線程纔打印對象
from threading import Thread,current_thread,active_count,enumerate import time,os def task(): print('%s is running' %current_thread().name) time.sleep(3) if __name__ == '__main__': t1=Thread(target=task,name='第一個線程') t2=Thread(target=task,) t3=Thread(target=task,) t1.start() t2.start() t3.start() # print(t1.is_alive()) #查看線程是否存活 print(active_count()) #查看活躍的線程數,包括主線程 print(enumerate()) #獲取當前每一個活躍的線程對象 print('主線程',current_thread().name) #獲取當前線程名
Thread實例對象的方法 # isAlive(): 返回線程是否活動的。 # getName(): 返回線程名。 # setName(): 設置線程名。 threading模塊提供的一些方法: # threading.currentThread(): 返回當前的線程變量。 # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。 # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
五、守護線程
t.daemon=true # 設爲守護線程,守護線程要在線程開始執行以前設置,不然報錯
不管是進程仍是線程,都遵循:守護xxx會等待主xxx運行完畢後被銷燬
須要強調的是:運行完畢並不是終止運行
1 .對主進程來講,運行完畢指的是主進程代碼運行完畢
2. 對主線程來講,運行完畢指的是主線程所在的進程內全部非守護線程通通運行完畢,主線程纔算運行完畢
詳細解釋:
- 主進程在其代碼結束後就已經算運行完畢了(守護進程在此時就被回收),而後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(不然會產生殭屍進程),纔會結束
- 主線程在其餘非守護線程運行完畢後纔算運行完畢(守護線程在此時就被回收)。由於主線程的結束意味着進程的結束,進程總體的資源都將被回收,而進程必須保證非守護線程都運行完畢後才能結束。
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.setDaemon(True) #必須在t.start()以前設置 t.start() print('主線程') print(t.is_alive()) ''' 主線程 True '''
3、死鎖現象;互斥鎖、遞歸鎖、信號量
死鎖現象
![](http://static.javashuo.com/static/loading.gif)
1 from threading import Thread,Lock,RLock 2 import time 3 4 mutexA=Lock() 5 mutexB=Lock() 6 7 class MyThread(Thread): 8 def run(self): 9 self.f1() 10 self.f2() 11 12 def f1(self): 13 mutexA.acquire() 14 print('%s 拿到了A鎖' %self.name) 15 16 mutexB.acquire() 17 print('%s 拿到了B鎖' %self.name) 18 mutexB.release() 19 20 mutexA.release() 21 22 def f2(self): 23 mutexB.acquire() 24 print('%s 拿到了B鎖' %self.name) 25 time.sleep(0.1) 26 27 mutexA.acquire() 28 print('%s 拿到了A鎖' %self.name) 29 mutexA.release() 30 31 mutexB.release() 32 33 34 if __name__ == '__main__': 35 for i in range(10): 36 t=MyThread() 37 t.start() 38 39 print('主')
解決方法,遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。
這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖:
mutexA=mutexB=threading.RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的狀況,則counter繼續加1,這期間全部其餘線程都只能等待,等待該線程釋放全部鎖,即counter遞減到0爲止
信號量
同進程的同樣
Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()。
實例:(同時只有5個線程能夠得到semaphore,便可以限制最大鏈接數爲5):
# from multiprocessing import Semaphore from threading import Thread,Semaphore,current_thread import time,random sm=Semaphore(5) def go_wc(): sm.acquire() print('%s 上廁所ing' %current_thread().getName()) time.sleep(random.randint(1,3)) sm.release() if __name__ == '__main__': for i in range(23): t=Thread(target=go_wc) t.start()