1、互斥鎖python
進程之間數據隔離,可是共享一套文件系統,於是能夠經過文件來實現進程直接的通訊,但問題是必須本身加鎖處理。編程
注意:加鎖的目的是爲了保證多個進程修改同一塊數據時,同一時間只能有一個修改,即串行的修改,沒錯,速度是慢了,犧牲了速度而保證了數據安全。json
1.上廁所的小例子:你上廁所的時候確定得鎖門吧,有人來了看見門鎖着,就會在外面等着,等你吧門開開出來的時候,下一我的纔去上廁所。安全
1 from multiprocessing import Process,Lock 2 import os 3 import time 4 def work(mutex): 5 mutex.acquire() 6 print('task[%s] 上廁所'%os.getpid()) 7 time.sleep(3) 8 print('task[%s] 上完廁所'%os.getpid()) 9 mutex.release() 10 if __name__ == '__main__': 11 mutex = Lock() 12 p1 = Process(target=work,args=(mutex,)) 13 p2 = Process(target=work,args=(mutex,)) 14 p3 = Process(target=work,args=(mutex,)) 15 p1.start() 16 p2.start() 17 p3.start() 18 p1.join() 19 p2.join() 20 p3.join() 21 print('主')
2、模擬搶票(也是利用了互斥鎖的原理 :LOCK互斥鎖)多線程
1 import json 2 import time 3 import random 4 import os 5 from multiprocessing import Process,Lock 6 def chakan(): 7 dic = json.load(open('piao',)) # 先查看票數,也就是打開那個文件 8 print('剩餘票數:%s' % dic['count']) # 查看剩餘的票數 9 def buy(): 10 dic = json.load(open('piao',)) 11 if dic['count']>0: #若是還有票 12 dic['count']-=1 #就修改裏面的值-1 13 time.sleep(random.randint(1,3)) #執行裏面買票的一系列操做就先不執行了,讓睡一會代替(而且隨機的睡) 14 json.dump(dic,open('piao','w')) 15 print('%s 購票成功' % os.getpid()) # 當前的那個id購票成功 16 def task(mutex): #搶票 17 chakan() #由於查看的時候你們均可以看到,不須要加鎖 18 mutex.acquire() #加鎖 19 buy() #買的時候必須一個一個的買,先等一我的買完了,後面的人在買 20 mutex.release() #取消鎖 21 if __name__ == '__main__': 22 mutex = Lock() 23 for i in range(50):#讓50我的去訪問那個票數 24 p = Process(target=task,args=(mutex,)) 25 p.start()
3、Process對象的其餘屬性併發
p.daemon :守護進程(必須在開啓以前設置守護進程):若是父進程死,子進程p也死了dom
p.join:父進程等p執行完了才運行主進程,是父進程阻塞在原地,而p仍然在後臺運行。性能
terminate:強制關閉。(確保p裏面沒有其餘子進程的時候關閉,若是裏面有子進程,你去用這個方法強制關閉了就會產生殭屍進程(打個比方:若是你老子掛了,你還沒掛,那麼就沒人給你收屍了,啊哈哈))ui
is_alive:關閉進程的時候,不會當即關閉,因此is_alive馬上查看的結果可能仍是存活spa
p.join():父進程在等p的結束,是父進程阻塞在原地,而p仍然在後臺運行
p.name:查看名字
p.pid :查看id
咱們能夠簡單介紹一下殭屍進程:
子進程運行完成,可是父進程遲遲沒有進行回收,此時子進程實際上並無退出,其仍然佔用着系統資源,這樣的⼦進程稱爲殭屍進程。
由於殭屍進程的資源一直未被回收,形成了系統資源的浪費,過多的殭屍進程將形成系統性能降低,因此應避免出現僵⼫進程。
1 from multiprocessing import Process 2 import os 3 import time 4 def work(): 5 print('%s is working'%os.getpid()) 6 time.sleep(3) 7 if __name__ == '__main__': 8 p1 =Process(target=work) 9 p2 =Process(target=work) 10 p3 =Process(target=work) 11 # p1.daemon = True 12 # p2.daemon = True #守護進程(守護他爹) 13 # p3.daemon = True #主進程死了子進程也死了(就不會執行子進程了) 14 p1.start() 15 p2.start() 16 p3.start() 17 18 p3.join() 19 p2.join() 20 p1.join() #多個join就是在等花費時間最長的那個運行完就執行主程序了 21 print('主程序') 22 23 # -瞭解方法--------------- 24 # p1.terminate() #強制關閉進程 25 # time.sleep(3) 26 # print(p1.is_alive()) #看是否是還活着 27 # print(p1.name) #查看進程名字 28 # print(p1.pid) #查看id號 29 # print('主程序')
3、進程間的三種通訊(IPC)方式:
方式一:隊列(推薦使用)
進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的
1.隊列:隊列相似於一條管道,元素先進先出
須要注意的一點是:隊列都是在內存中操做,進程退出,隊列清空,另外,隊列也是一個阻塞的形態
2.隊列分類
隊列有不少種,但都依賴與模塊queue
queue.Queue() #先進先出
queue.LifoQueue() #後進先出
queue.PriorityQueue() #優先級隊列
queue.deque() #雙線隊列
建立隊列的類(底層就是以管道和鎖定的方式實現):
1
2
|
Queue([maxsize]):建立共享的進程隊列,Queue是多進程安全的隊列,
可使用Queue實現多進程之間的數據傳遞。
|
參數介紹:
1
|
1
maxsize是隊列中容許最大項數,省略則無大小限制。
|
方法介紹:
1
2
3
4
5
6
7
8
9
|
q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。若是blocked爲
True
(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲
False
,但該Queue已滿,會當即拋出Queue.Full異常。
q.get方法能夠從隊列讀取而且刪除一個元素。一樣,get方法有兩個可選參數:blocked和timeout。若是blocked爲
True
(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲
False
,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常.
q.get_nowait():同q.get(
False
)
q.put_nowait():同q.put(
False
)
q.empty():調用此方法時q爲空則返回
True
,該結果不可靠,好比在返回
True
的過程當中,若是隊列中又加入了項目。
q.full():調用此方法時q已滿則返回
True
,該結果不可靠,好比在返回
True
的過程當中,若是隊列中的項目被取走。
q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()同樣
|
應用:
# 1.能夠往隊列裏聽任意類型的 2 # 2.先進先出 3 from multiprocessing import Process,Queue 4 q= Queue(3) 5 q.put('first') #默認block=True 6 q.put('second') 7 q.put('third') 8 9 print(q.get()) 10 print(q.get()) 11 print(q.get())
生產者和消費者模型
在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。
爲何要使用生產者和消費者模式
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。
什麼是生產者消費者模式
生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
基於隊列實現生產者消費者模型
一個生產者和一個消費者(有兩種方式)
一、q.put(None):生產者給放一個None進去
from multiprocessing import Process,Queue 2 import os 3 import time 4 import random 5 #首先得有生產者和消費者 6 # 生產者製造包子 7 '''這種用 q.put(None)放進去一個None的方法雖然解決了問題 8 可是若是有多個生產者多個消費者,或許框裏面沒有包子了可是 9 還有其餘的食物呢,你就已經顯示空着,這樣也能夠解決,就是不完美, 10 還能夠用到JoinableQueue去解決''' 11 def producter(q): 12 for i in range(10): 13 time.sleep(2) #生產包子得有個過程,就先讓睡一會 14 res = '包子%s'%i #生產了這麼多的包子 15 q.put(res) #吧生產出來的包子放進框裏面去 16 print('\033[44m%s製造了%s\033[0m'%(os.getpid(),res)) 17 q.put(None) #只有生產者才知道何時就生產完了(放一個None進去說明此時已經生產完了) 18 # 消費者吃包子 19 def consumer(q): 20 while True:#假如消費者不斷的吃 21 res = q.get() 22 if res is None:break #若是吃的時候框裏面已經空了,就直接break了 23 time.sleep(random.randint(1,3)) 24 print('\033[41m%s吃了%s\033[0m' % (os.getpid(),res)) 25 if __name__ == '__main__': 26 q = Queue() 27 p1 = Process(target=producter,args=(q,)) 28 p2 = Process(target=consumer,args=(q,)) 29 p1.start() 30 p2.start() 31 p1.join() 32 p2.join() #等待執行完上面的進程,在去執行主 33 print('主')
二、利用JoinableQueue
from multiprocessing import Process,JoinableQueue 2 import os 3 import time 4 import random 5 #首先得有生產者和消費者 6 # 消費者吃包子 7 def consumer(q): 8 '''消費者''' 9 while True:#假如消費者不斷的吃 10 res = q.get() 11 time.sleep(random.randint(1,3)) 12 print('\033[41m%s吃了%s\033[0m' % (os.getpid(),res)) 13 q.task_done() #任務結束了(消費者告訴生產者,我已經吧東西取走了) 14 # 生產者製造包子 15 def producter(q): 16 '''生產者''' 17 for i in range(5): 18 time.sleep(2) #生產包子得有個過程,就先讓睡一會 19 res = '包子%s'%i #生產了這麼多的包子 20 q.put(res) #吧生產出來的包子放進框裏面去 21 print('\033[44m%s製造了%s\033[0m'%(os.getpid(),res)) 22 q.join() 23 24 if __name__ == '__main__': 25 q = JoinableQueue() 26 p1 = Process(target=producter,args=(q,)) 27 p2 = Process(target=consumer,args=(q,)) 28 p2.daemon = True #在啓動以前吧消費者設置成守護進程,p1結束了p2也就結束了 29 p1.start() 30 p2.start() 31 p1.join() #在等生產者結束(生產者結束後,就不製造包子了,那消費者一直在吃,就卡住了 32 #都不生產了還吃啥,就把消費者也結束了 ) 33 #等待執行完上面的進程,在去執行主 34 print('主')
多個生產者和多個消費者(有兩種方式)
一、q.put(None):生產者給放一個None進去
二、利用JoinableQueue
from multiprocessing import Process,JoinableQueue 2 import os 3 import time 4 import random 5 #首先得有生產者和消費者 6 # 消費者吃包子 7 def consumer(q): 8 while True: 9 res = q.get() 10 time.sleep(random.randint(1,3)) 11 print('\033[41m%s吃了%s\033[0m' % (os.getpid(),res)) 12 q.task_done() #任務結束了(消費者告訴生產者,我已經吧東西取走了) 13 def product_baozi(q): 14 for i in range(5): 15 time.sleep(2) 16 res = '包子%s' % i 17 q.put(res) 18 print('\033[44m%s製造了%s\033[0m' % (os.getpid(), res)) 19 q.join() #不用put(None) 了,在等q被取完。(若是數據沒有被取完,生產者就不會結束掉) 20 def product_gutou(q): 21 for i in range(5): 22 time.sleep(2) 23 res = '骨頭%s' % i 24 q.put(res) 25 print('\033[44m%s製造了%s\033[0m' % (os.getpid(), res)) 26 q.join() 27 def product_doujiang(q): 28 for i in range(5): 29 time.sleep(2) 30 res = '豆漿%s' % i 31 q.put(res) 32 print('\033[44m%s製造了%s\033[0m' % (os.getpid(), res)) 33 q.join() 34 35 if __name__ == '__main__': 36 q = JoinableQueue() 37 # 生產者們:廚師們 38 p1 = Process(target=product_baozi,args=(q,)) 39 p2 = Process(target=product_doujiang,args=(q,)) 40 p3 = Process(target=product_gutou,args=(q,)) 41 42 #消費者們:吃貨們 43 p4 = Process(target=consumer,args=(q,)) 44 p5 = Process(target=consumer,args=(q,)) 45 p4.daemon = True 46 p5.daemon = True 47 # p1.start() 48 # p2.start() 49 # p3.start() 50 # p4.start() 51 # p5.start() 52 li = [p1,p2,p3,p4,p5] 53 for i in li: 54 i.start() 55 p1.join() 56 p2.join() 57 p3.join() 58 print('主')
方式二:管道(不推薦使用,瞭解便可)
管道至關於隊列,可是管道不自動加鎖
方式三:共享數據(不推薦使用,瞭解便可)
共享數據也沒有自動加鎖的功能,因此仍是推薦用隊列的。感興趣的能夠研究研究管道和共享數據