併發編程之多進程

 

目錄python

 

一 multiprocessing模塊介紹                                                          

二 Process類的介紹

三 開啓進程的兩種方式

四 殭屍進程與孤兒進程

五 Process的join方法和其餘屬性方法介紹

六 守護進程

七 互斥鎖

八 隊列介紹

九 生產者消費者模型介紹

十 練習題

 

 


一 multiprocessing模塊介紹

python中的多線程沒法利用多核優點,若是想要充分地使用多核CPU的資源(os.cpu\_count\(\)查看),在python中大部分狀況須要使用多進程。編程

Python提供了multiprocessing。 multiprocessing模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數),該模塊與多線程模塊threading的編程接口相似。multiprocessing模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,>提供了Process、Queue、Pipe、Lock等組件。json

須要再次強調的一點是:與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。windows

二 Process類的介紹

建立進程的類:安全

Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,可用來開啓一個子進程

強調:
1. 須要使用關鍵字的方式來指定參數
2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號

參數介紹:數據結構

group參數未使用,值始終爲None

target表示調用對象,即子進程要執行的任務

args表示調用對象的位置參數元組,args=(1,2,'egon',)

kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}

name爲子進程的名稱

方法介紹:多線程

p.start():啓動進程,並調用該子進程中的p.run() 
p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法  

p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
p.is_alive():若是p仍然運行,返回True

p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間。

屬性介紹:併發

p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置

p.name:進程的名稱

p.pid:進程的pid

三 開啓進程的兩種方式

注意:在windows中Process()必須放到# if __name__ == '__main__':下dom

建立並開啓子進程的方式一socket

 1 from multiprocessing import Process  2 import time  3 
 4 def task(name):  5     print("%s is running" % name)  6     time.sleep(3)  7     print("%s is done" % name)  8 
 9 
10 if __name__ == "__main__": 11     p = Process(target=task, kwargs={"name": "子進程1"}) 12  p.start() 13     print("")
View Code

建立並開啓子進程的方式二

 1 from multiprocessing import Process  2 import time  3 
 4 class MyProcess(Process):  5     def __init__(self, name):  6         super().__init__()  7         self.name = name  8 
 9     def run(self): 10         """
11  默認方法run 12  :return: 13         """
14         print("%s is running" % self.name) 15         time.sleep(3) 16         print("%s is done " % self.name) 17         
18         
19 if __name__ == "__main__": 20     p = MyProcess("子進層1") 21     p.start()
View Code

 

四殭屍進程與孤兒進程

殭屍進程:(父進程沒結束,子進程提早結束(清理內存空間保留狀態),父進程沒有處理子進程的狀態)(有害,應當避免) 
一個進程使用fork建立子進程,若是子進程退出,而父進程沒有調用wait或waitpid獲取進程的狀態信息,那麼子進程的進程描述符仍保存在系統中,這種進程稱爲僵死進程。

孤兒進程:(父進程提早退出,子進程還沒結束,子進程成爲孤兒進程)(無害) 
一個父進程退出,而它的一個或着多個子進程還在運行,那麼這些子進程將稱爲孤兒進程。孤兒進程將被init進程(進程號1)所收養, 並由init進程對它們完成狀態收集工做

任何一個子進程(init除外)在exit()以後,並非立刻就消失掉,而是留下一個稱爲殭屍進程(Zombie)的數據結構,等待父進程去處理。這是每一個子進程結束時都必須通過的階段。若是子進程在exit()以後,父進程沒有來的及處理,此時用ps命令查看狀態的話就時Z狀態。若是父進程能及時處理掉,則用ps就查看不到子進程的狀態。

殭屍進程的危害

unix提供了一種機制去獲取子進程結束時的狀態信息。在每一個進程退出的時候,內核釋放全部的資源,包括打開的文件,佔用的內存等。可是仍然爲其保留必定的信息(包括進程號processid,退出狀態the termination status of the process,運行時間the amount of CPU time taken by the process等)。直到父進程經過wait/waitpid來取時才釋放。若是父進程不調用wait/waitpid的話,那麼保留的那段信息就不會釋放,其進程號就會一直被佔用,可是系統所能使用的進程號是有限的,若是大量的產生僵死進程,將由於沒有可用的進程號而致使系統不能產生新的進程,這就是講師進程的危害,應當避免

五Process的join方法和其餘屬性方法介紹

join定義:

p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間。

在主進程運行過程當中若是想併發地執行其餘的任務,咱們能夠開啓子進程,此時主進程的任務與子進程的任務分兩種狀況

狀況一:在主進程的任務與子進程的任務彼此獨立的狀況下,主進程的任務先執行完畢後,主進程還須要等待子進程執行完畢,而後統一回收資源。

狀況二:若是主進程的任務在執行到某一個階段時,須要等待子進程執行完畢後才能繼續執行,就須要有一種機制可以讓主進程檢測子進程是否運行完畢,在子進程執行完畢後才繼續執行,不然一直在原地阻塞,這就是join方法的做用

 1 from multiprocessing import Process  2 import time  3 
 4 def task(name, n):  5     print("%s is running" % name)  6  time.sleep(n)  7 
 8 
 9 if __name__ == "__main__": 10     start_time = time.time() 11     p = Process(target=task, kwargs={"name": "子進程", "n": 5}) 12     p1 = Process(target=task, kwargs={"name": "子進程1", "n": 4}) 13     p2 = Process(target=task, kwargs={"name": "子進程2", "n": 3}) 14     p3 = Process(target=task, kwargs={"name": "子進程3", "n": 2}) 15 
16     p_l = [p, p1, p2, p3] 17     for p in p_l: 18  p.start() 19     for p in p_l: 20  p.join() 21 
22     print("") 23     # 同時發送p,p1,p2,p3,主進程結束時間是,等待子進程運行時間最長的那個:
24     print(time.time()-start_time)
View Code
p.is_alive()定義:
若是p仍然運行,返回True
 1 from multiprocessing import Process  2 
 3 def task(name):  4     print("%s is running" % name)  5 
 6 if __name__ == "__main__":  7     p = Process(target=task, kwargs={"name": "子進程1"})  8  p.start()  9  p.join() 10     # p.is_alive():若是p仍然運行,返回True
11     print(p.is_alive()) 12     print("")
View Code
p.terminate()定義:
強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
 1 from multiprocessing import Process  2 import time  3 def task(name):  4     print("%s is running" % name)  5 
 6 if __name__ == "__main__":  7     p = Process(target=task, kwargs={"name": "子進程1"})  8  p.start()  9  p.terminate() 10     # 若是不睡上2秒,只是發送p.terminate(),還沒運行,p.is_alive()返回的是TRUE
11     time.sleep(2) 12     print(p.is_alive()) 13     print("")
View Code
p.pid定義:進程的pid

使用os模塊的os.getpid()和os.getppid()方法來查看:

 
 
View Code
 
 

查看運行軟件的進程方法

 
 

wimdows系統在cmd中用:tasklist | findstr pycharm

 
 

mac系統:ps aux | grep pycharm

p.name定義:進程的名稱
 
 
 1 from multiprocessing import Process  2 def task(name):  3     print("%s is running" % name)  4 
 5 
 6 if __name__ == "__main__":  7     p = Process(target=task, kwargs={"name": "子進程1"}, name="revenge")  8  p.start()  9     print("") 10     print(p.name)
View Code
 
  
 

六守護進程

關於守護進程須要強調兩點:

其一:守護進程會在主進程代碼執行結束後就終止

其二:守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children

 
 
 1 from multiprocessing import Process  2 import time  3 
 4 def task(name):  5     print("%s is running" % name)  6     time.sleep(3)  7     print("%s is done" % name)  8 
 9 
10 if __name__ == "__main__": 11     p = Process(target=task, kwargs={"name": "子進程1"}) 12     # 必定要在p.start()前設置,設置p爲守護進程,禁止p建立子進程,而且父進程代碼執行結束,p即終止運行
13     p.daemon = True 14  p.start() 15     # 只要終端打印出這一行內容,那麼守護進程p也就跟着結束掉了
16     print("")
View Code

七互斥鎖

進程之間數據不共享,可是共享同一套文件系統,因此訪問同一個文件,或同一個打印終端,是沒有問題的,而共享帶來的是競爭,競爭帶來的結果就是錯亂,以下

 1 from multiprocessing import Process  2 import time  3 
 4 def task(name):  5     print("%s 1" % name)  6     time.sleep(1)  7     print("%s 2" % name)  8     time.sleep(1)  9     print("%s 3" % name) 10 
11 
12 if __name__ == "__main__": 13     for i in range(3): 14         p = Process(target=task, args=("進程%s" % i)) 15         p.start()
View Code 併發運行,效率高,但競爭同一打印終端,帶來了打印錯亂

 如何控制,就是加鎖處理。而互斥鎖的意思就是互相排斥,若是把多個進程比喻爲多我的,互斥鎖的工做原理就是多我的都要去爭搶同一個資源:衛生間,一我的搶到衛生間後上一把鎖,其餘人都要等着,等到這個完成任務後釋放鎖,其餘人才有可能有一個搶到......因此互斥鎖的原理,就是把併發改爲穿行,下降了效率,但保證了數據安全不錯亂

 1 from multiprocessing import Process,Lock  2 import time  3 
 4 def task(name, mutex):  5  mutex.acquire()  6     print("%s 1" % name)  7     time.sleep(1)  8     print("%s 2" % name)  9     time.sleep(1) 10     print("%s 3" % name) 11  mutex.release() 12 
13 
14 if __name__ == "__main__": 15     mutex = Lock() 16     for i in range(3): 17         p = Process(target=task, args=("進程%s" % i, mutex)) 18         p.start()
View Code

互斥鎖與join

使用join能夠將併發變成串行,互斥鎖的原理也是將併發變成穿行,那咱們直接使用join就能夠了啊,爲什麼還要互斥鎖?

答:join是將一個任務總體串行,而互斥鎖的好處則是能夠將一個任務中的某一段代碼串行,好比只讓task函數中的get任務串行

總結:

加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行地修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。

雖然能夠用文件共享數據實現進程間通訊,但問題是:

一、效率低(共享數據基於文件,而文件是硬盤上的數據)

二、須要本身加鎖處理

所以咱們最好找尋一種解決方案可以兼顧:

一、效率高(多個進程共享一塊內存的數據)

二、幫咱們處理好鎖問題。

這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。

隊列和管道都是將數據存放於內存中,而隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來,於是隊列纔是進程間通訊的最佳選擇。

咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。

加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行地修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。

八隊列介紹

進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的

建立隊列的類(底層就是以管道和鎖定的方式實現):

Queue([maxsize]):建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。

參數介紹:

maxsize是隊列中容許最大項數,省略則無大小限制。

但須要明確:

一、隊列內存放的是消息而非大數據

二、隊列佔用的是內存空間,於是maxsize即使是無大小限制也受限於內存大小

主要方法介紹:

q.put方法用以插入數據到隊列中。

q.get方法能夠從隊列讀取而且刪除一個元素。

九生產者消費者模型介紹

爲何要使用生產者消費者模型

生產者指的是生產數據的任務,消費者指的是處理數據的任務,在併發編程中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

什麼是生產者和消費者模式

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

這個阻塞隊列就是用來給生產者和消費者解耦的

JoinableQueue([maxsize])

這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

參數介紹

maxsize是隊列中容許最大項數,省略則無大小限制。

方法介紹

JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:

q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常

q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止

 1 from multiprocessing import Process,JoinableQueue  2 import time  3 
 4 def producer(q):  5     for i in range(5):  6         res = "內鬆%s" % i  7         time.sleep(2)  8         print("生產者生產了%s" % res)  9  q.put(res) 10  q.join() 11 
12 def consumer(q): 13     while True: 14         res = q.get() 15         time.sleep(1) 16         print("消費者吃了%s" % res) 17  q.task_done() 18 
19 
20 if __name__ == "__main__": 21     q = JoinableQueue() 22     p = Process(target=producer, args=(q,)) 23     p1 = Process(target=producer, args=(q,)) 24     c = Process(target=consumer, args=(q,)) 25     c1 = Process(target=consumer, args=(q,)) 26     c.daemon = True 27     c1.daemon = True 28  c.start() 29  c1.start() 30  p.start() 31  p1.start() 32  p.join() 33  p1.join() 34     print("")
View Code

生產者消費者模型總結

一、程序中有兩類角色

一類負責生產數據(生產者)
一類負責處理數據(消費者)

二、引入生產者消費者模型爲了解決的問題是

平衡生產者與消費者之間的速度差
程序解開耦合

三、如何實現生產者消費者模型

生產者<--->隊列<--->消費者

 

 

十練習題

一、思考開啓進程的方式一和方式二各開啓了幾個進程?

答:各開啓了兩進程

二、進程之間的內存空間是共享的仍是隔離的?下述代碼的執行結果是什麼?

 1 from multiprocessing import Process  2 
 3 #在windows系統中應該把全局變量定義在if __name__ == '__main__'之上就能夠了
 4 n=100 
 5 
 6 def work():  7     global n  8     n=0  9     print('子進程內: ',n) 10 
11 
12 if __name__ == '__main__': 13     p=Process(target=work) 14  p.start() 15     print('主進程內: ',n)
View Code

答:內存空間是隔離的。

代碼運行結果:

主進程內: 100
子進程內: 0

三、基於多進程實現併發的套接字通訊?

服務端:

 1 import socket  2 from multiprocessing import Process  3 def talk(conn):  4     while True:  5         try:  6             res = conn.recv(1024)  7             if not res:  8                 continue
 9  conn.send(res.upper()) 10         except Exception as e: 11             print('\033[1;31m客戶端出現錯誤,關閉鏈接!---->\033[0m', e) 12             break
13  conn.close() 14 
15 def serve(ip, port): 16     server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 17     server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 18  server.bind((ip, port)) 19     server.listen(5) 20 
21     while True: 22         conn, client_addr = server.accept() 23         p = Process(target=talk, args=(conn,)) 24  p.start() 25 
26 
27 if __name__ == "__main__": 28     serve("127.0.0.1", 8080)
View Code

客戶端:

 1 import socket  2 
 3 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  4 
 5 client.connect(("127.0.0.1", 8080))  6 while True:  7     cmd = input(">>>:").strip()  8     if not cmd:continue
 9     client.send(cmd.encode("utf-8")) 10     res = client.recv(1024) 11     print(res.decode("utf-8"))
View Code

思考每來一個客戶端,服務端就開啓一個新的進程來服務它,這種實現方式有無問題?

答:有問題,若是客戶端愈來愈多,多道必定的數目,服務端會奔潰:

四、改寫下列程序,分別別實現下述打印效果

 1 from multiprocessing import Process  2 import time  3 import random  4 
 5 def task(n):  6     time.sleep(random.randint(1,3))  7     print('-------->%s' %n)  8 
 9 if __name__ == '__main__': 10     p1=Process(target=task,args=(1,)) 11     p2=Process(target=task,args=(2,)) 12     p3=Process(target=task,args=(3,)) 13 
14  p1.start() 15  p2.start() 16  p3.start() 17 
18     print('-------->4')
View Code

保證最後輸出-------->4

 1 from multiprocessing import Process  2 import time  3 import random  4 
 5 def task(n):  6     time.sleep(random.randint(1, 3))  7     print('-------->%s' % n)  8 
 9 
10 if __name__ == '__main__': 11     p1 = Process(target=task, args=(1,)) 12     p2 = Process(target=task, args=(2,)) 13     p3 = Process(target=task, args=(3,)) 14     p_l = [p1, p2, p3] 15     for p in p_l: 16  p.start() 17     for p in p_l: 18         p.join()
View Code

保證按順序輸出(串行了)

def task(n): time.sleep(random.randint(1, 3)) print('-------->%s' % n) if __name__ == '__main__': p1 = Process(target=task, args=(1,)) p2 = Process(target=task, args=(2,)) p3 = Process(target=task, args=(3,)) p_l = [p1, p2, p3] for p in p_l: p.start() p.join() print('-------->4')

五、思考下列代碼的執行結果有可能有哪些狀況?爲何?

from multiprocessing import Process

import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == '__main__':
    p1=Process(target=foo)
    p2=Process(target=bar)

    p1.daemon=True
    p1.start()
    p2.start()
    print("main-------")

 

六、模擬搶票練習

 1 # 文件"db.json"
 2 from multiprocessing import Process,Lock  3 import time  4 import json  5 def search(name):  6     dic = json.load(open("db.json", "r", encoding="utf-8"))  7     time.sleep(1)  8     print("<%s>剩餘票數%s" % (name, dic["count"]))  9 
10 def get(name): 11     dic = json.load(open("db.json", "r", encoding="utf-8")) 12     time.sleep(1) 13     if dic["count"] > 0: 14         dic["count"] -= 1
15         time.sleep(2) 16         json.dump(dic, open("db.json", "w", encoding="utf-8")) 17         print("<%s>購票成功" % name) 18 
19 
20 def task(name, mutex): 21  search(name) 22  mutex.acquire() 23  get(name) 24     mutex.release()
View Code
相關文章
相關標籤/搜索