python之多進程

1、multiprocessing模塊

python中的多線程沒法利用多核優點,若是想要充分地使用多核cpu的資源(os.cpu_count()查看),在python中大部分狀況須要使用多進程。python提供了multiprocessingcss

multiprocessing 模塊用來開啓子進程。並在子進程中執行咱們定製的任務(例如函數)。與多線程threading相似html

 

multiprocessing 模塊支持不少功能 :子進程、通訊、共享數據,執行不一樣的同步,提供Process、Queue、Pipe、Lock 等組件python

ps:與線程不一樣,進程沒有任何共享狀態,修改的數據(改動等等)僅限於該進程內。編程

 2、Process類介紹

 建立進程的類

Procss([group [, target [,name [, args [, kwargs ] ] ] ] ] ) ,由該類實例化獲得對象,表示一個子進程中的人物(還沒有啓動)json

ps:1. 須要指定關鍵字的方式來制定參數windows

  2. args 指定爲傳給 target 函數位置,是一個元組形式,必須有逗號安全

參數介紹

1. group 參數 未使用 ,值始終爲 None

2. target 表示 調用對象 , 即子進程要執行的任務(不加括號)

3. args 表示調用對象的位置參數元組,args = (x , y ,)

4. kwargs 表示調用對象的字典,kwargs={'x':1 , 'y':99}

5. name 爲子進程的名稱

方法介紹

1. p.start() :啓動進程,並調用該子進程的 p.run()

2. p.run() :進程啓動時運行的方法 ,它去調用target指定的函數,自定義的類必定要實現該方法。

3. p.terminate() :強制終止進程p ,不會進行任何清理操做,若是p建立了子進程,該子進程就變成殭屍進程了,使用該方法須要當心這個狀況,若是p還保存了一個鎖 那麼也將不會被釋放,進而致使死鎖

4. p.is_alive() : 若是p仍然運行,返回True 

5. p.join([timeout]) :主進程等待p終止 (ps:是主線程處於等 的狀態,而p是處於運行的狀態),timeout是可選的超時時間,須要強調的是,p.join 只能join 住 start 開啓的進程,而不能join住 run開啓的進程

屬性介紹

1. p.daemon : 默認爲False , 若是設爲True ,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設置完後,p不能建立本身的新進程,必須在p.start() 以前設置。

2. p.name :進程的名稱

3. p.pid :進程的pid

4. p.exitcode :進程運行時爲None、若是爲-N ,表示被信號N結束(瞭解)

5. p.authkey :進程的身份驗證鍵,默認是由 os.urandom()隨機生成的32位字符串,這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同身份驗證鍵時纔算成功(瞭解)

 

3、Process類的使用

在windows中Process()必須放到# if __name__ == '__main__':下網絡

建立並開啓子進程的兩種方式多線程

# 開啓子進程的方式1:

from multiprocessing import Process
import time

def task(name):
    print('%s is running' %name)
    time.sleep(3)
    print('%s is done' %name)

# 在windows系統上開啓子進程的操做必須放到該行代碼下
if __name__ == '__main__':
    p=Process(target=task,args=('子進程',)) # Process(target=task,kwargs={'name':'子進程'}) #
    p.start() # 僅僅只是向操做系統發送一個創造子進程的信號
    print('')
方式一
# 開啓子進程的方式2:
from multiprocessing import Process
import time

class Myprocess(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name

    def run(self):
        print('%s is running' %self.name)
        time.sleep(3)
        print('%s is done' %self.name)

# 在windows系統上開啓子進程的操做必須放到該行代碼下
if __name__ == '__main__':
    p=Myprocess('子進程')
    p.start() # 僅僅只是向操做系統發送一個創造子進程的信號,start會調用run方法
    p.join()
    print('')
方式二

進程之間的內存空間是隔離的

from multiprocessing import Process
n=100 #在windows系統中應該把全局變量定義在if __name__ == '__main__'之上就能夠了
def work():
    global n
    n=0
    print('子進程內: ',n)


if __name__ == '__main__':
    p=Process(target=work)
    p.start()
    print('主進程內: ',n)
View Code

將以前的基於TCP的套接字通訊改成支持併發

from socket import *
from multiprocessing import Process

FTPS=socket(AF_INET,SOCK_STREAM)
FTPS.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
FTPS.bind(('127.0.0.1',9999))
FTPS.listen(5)

def talk(conn):
    while True:
        try:
            msg=conn.recv(1024)
            if not msg :break
            conn.sendall(msg.upper())
        except Exception:
            break

if __name__ == '__main__':
    while True:
        conn,client_addr=FTPS.accept()
        p=Process(target=talk,args=(conn,))
        p.start()
服務端
from socket import *

FTPC=socket(AF_INET,SOCK_STREAM)
FTPC.connect(('127.0.0.1',9999))

while True:
    cmd=input('>>:').strip()
    if not cmd:continue

    FTPC.sendall(cmd.encode('utf-8'))
    msg=FTPC.recv(1024)
    print(msg.decode('utf-8'))
客戶端

這樣作雖然能夠實現併發的效果  可是有一個問題併發

每來一個客戶端,都在服務端開啓一個進程,若是併發來十萬個客戶端,要開啓十萬個進程嗎
解決方法:進程池

 Process對象的join方法

from multiprocessing import Process
import time
import random

class Task(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name


    def run(self):
        print('%s is running' %self.name)
        time.sleep(random.randint(1,3))
        print('%s is ending' %self.name)

if __name__ == '__main__':
    p=Task('子進程')
    p.start()
    p.join(0.5) #等待p中止,等0.5秒就再也不等了
    print('開始')
View Code
from multiprocessing import Process
import time
import random
class Task(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name


    def run(self):
        print('%s is running' %self.name)
        time.sleep(random.randint(1,3))
        print('%s is ending' %self.name)
if __name__ == '__main__':
    p_msg = []
    for p in range(4):
        p=Task('子進程%s'%p)
        p_msg.append(p)
        p.start()
    for p in p_msg:
        p.join()
#固然不是了,必須明確:p.join()是讓誰等?
#很明顯p.join()是讓主線程等待p的結束,卡住的是主線程而絕非進程p,

#詳細解析以下:
#進程只要start就會在開始運行了,因此p1-p4.start()時,系統中已經有四個併發的進程了
#而咱們p1.join()是在等p1結束,沒錯p1只要不結束主線程就會一直卡在原地,這也是問題的關鍵
#join是讓主線程等,而p1-p4仍然是併發執行的,p1.join的時候,其他p2,p3,p4仍然在運行,等#p1.join結束,可能p2,p3,p4早已經結束了,這樣p2.join,p3.join.p4.join直接經過檢測,無需等待
# 因此4個join花費的總時間仍然是耗費時間最長的那個進程運行的時間


    print('主線程')


#上述啓動進程與join進程能夠簡寫爲
# p_l=[p1,p2,p3,p4]
#
# for p in p_l:
#     p.start()
#
# for p in p_l:
#     p.join()
p.join詳細說明

process對象的其餘方法或屬性

from multiprocessing import Process
import time
import random
class Myprocess(Process):
    def __init__(self,name):
        # self.name=name
        # super().__init__() #Process的__init__方法會執行self.name=Myprocess-1,
        #                    #因此加到這裏,會覆蓋咱們的self.name=name

        #爲咱們開啓的進程設置名字的作法
        super().__init__()
        self.name=name

    def run(self):
        print('%s is running' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is  ending' %self.name)
if __name__ == '__main__':

    p=Myprocess('子進程')
    p.start()
    print('開始')
    print(p.pid) #查看pid
name和pid
#進程對象的其餘方法一:terminate,is_alive
from multiprocessing import Process
import time
import random
class Myprocess(Process):
    def __init__(self,name):
        # self.name=name
        # super().__init__() #Process的__init__方法會執行self.name=Myprocess-1,
        #                    #因此加到這裏,會覆蓋咱們的self.name=name

        #爲咱們開啓的進程設置名字的作法
        super().__init__()
        self.name=name

    def run(self):
        print('%s is running' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is  ending' %self.name)
if __name__ == '__main__':

    p=Myprocess('子進程')
    p.start()

    p.terminate()#關閉進程,不會當即關閉,因此is_alive馬上查看的結果可能仍是存活
    print(p.is_alive()) #結果爲True

    print('開始')
    print(p.is_alive()) #結果爲False
terminate和is_alive

殭屍進程與孤兒進程

 參考博客:http://www.cnblogs.com/Anker/p/3271773.html

#總結殭屍進程和孤兒進程

#殭屍進程:有害
例若有個進程,它按期的產 生一個子進程,這個子進程須要作的事情不多,作完它該作的事情以後就退出了,所以這個子進程的生命週期很短,可是,父進程只管生成新的子進程,至於子進程 退出以後的事情,則一律漠不關心,這樣,系統運行上一段時間以後,系統中就會存在不少的僵死進程,假若用ps命令查看的話,就會看到不少狀態爲Z的進程。 嚴格地來講,僵死進程並非問題的根源,罪魁禍首是產生出大量僵死進程的那個父進程。所以,當咱們尋求如何消滅系統中大量的僵死進程時,答案就是把產生大 量僵死進程的那個元兇槍斃掉(也就是經過kill發送SIGTERM或者SIGKILL信號啦)。槍斃了元兇進程以後,它產生的僵死進程就變成了孤兒進 程,這些孤兒進程會被init進程接管,init進程會wait()這些孤兒進程,釋放它們佔用的系統進程表中的資源,這樣,這些已經僵死的孤兒進程 就能瞑目而去了。

#孤兒進程無害:會被系統接管
總結孤兒和殭屍進程

4、守護進程

主進程建立守護進程

  其一:守護進程會在主進程代碼執行結束後就終止

  其二:守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children

注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止

from multiprocessing import Process
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")
if __name__ == '__main__':
    p1=Process(target=foo)
    p1.daemon = True
    p1.start()
    print('主進程')
View Code
#主進程代碼運行完畢,守護進程就會結束
from multiprocessing import Process
from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == '__main__':

    p1=Process(target=foo)
    p2=Process(target=bar)

    p1.daemon=True
    p1.start()
    p2.start()
    print("main-------") #打印該行則主進程代碼結束,則守護進程p1應該被終止,可能會有p1任務執行的打印信息123,由於主進程打印main----時,p1也執行了,可是隨即被終止
特殊的例子

5、互斥鎖

進程之間數據不共享,可是共享同一套文件系統,因此訪問同一個文件,或同一個打印終端,是沒有問題的,

而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理

from multiprocessing import Process,Lock
import os,time

#互斥鎖 可理解爲生活中衛生間的鎖

def work(mutex):
    # mutex.acquire() 搶鎖操做
    print('%s is running' %os.getpid())
    time.sleep(2)
    print('%s is done' %os.getpid())
    # mutex.release()  #釋放鎖操做

if __name__ == '__main__':
    # mutex=Lock()    #多進程需加在main後
    for i in range(3):
        p=Process(target=work)#args=(mutex,)#傳mutex
        p.start()
互斥鎖mutex

加鎖:由併發變成‘串行’,犧牲了運行效率,但保障了數據安全

例子:模擬12306搶票

from multiprocessing import Process,Lock
import json
import time,random

def search(i):
    with open('db.json','rt',encoding='utf-8') as f: #db.json 文件 {"count": 1},表示餘一張票
        dic=json.load(f)
        time.sleep(1)
        print('路人%s查看到剩餘票數:%s' %(i,dic['count']))

def get(i):
    with open('db.json','rt',encoding='utf-8') as f:
        dic=json.load(f)
    if dic['count'] > 0:
        # 有票
        dic['count']-=1
        time.sleep(random.randint(1,3)) #關鍵在於此處 睡的同時全部子進程都讀取到1張票
        with open('db.json','wt',encoding='utf-8') as f:
            json.dump(dic,f)
            print('路人%s搶票成功' % i)

    else:
        print('路人%s搶票失敗' %i)


def task(i,mutex):
    search(i)
    # mutex.acquire()
    get(i)
    # mutex.release()

if __name__ == '__main__':
    mutex = Lock()

    for i in range(1,11):
        p=Process(target=task,args=(i,mutex))
        p.start()
        # p.join()

    print('主進程')
模擬12306搶票:互斥鎖的應用

總結

#加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然能夠用文件共享數據實現進程間通訊,但問題是:
1.效率低(共享數據基於文件,而文件是硬盤上的數據)
2.須要本身加鎖處理



#所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。
1 隊列和管道都是將數據存放於內存中
2 隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來,
咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。

6、IPC機制

進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的

一、隊列

建立隊列的類(底層就是以管道和鎖定的方式實現的)

  Queue(maxsize):建立共享的進程隊列,Queue是多進程安全的隊列

可使用Queue實現多進程之間的數據傳遞。參數:maxsize 是隊列中容許最大數,不寫則無限制
from multiprocessing import Queue

主要方法:

'''
multiprocessing模塊支持進程間通訊的兩種主要形式:管道和隊列
都是基於消息傳遞實現的,可是隊列接口
'''

from multiprocessing import Process,Queue
import time
q=Queue(3)


#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
print(q.full()) #滿了

print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #空了
Queue 隊列
1 q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。
2 q.get方法能夠從隊列讀取而且刪除一個元素。一樣,get方法有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常.
3  
4 q.get_nowait():同q.get(False)
5 q.put_nowait():同q.put(False)
6 
7 q.empty():調用此方法時q爲空則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中又加入了項目。
8 q.full():調用此方法時q已滿則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中的項目被取走。
9 q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()同樣
詳細

二、管道(瞭解)

建立管道的類

Pipe(duplex)在進程之間建立一條管道,並返回元組(coon1,coon2)表示管道的兩端

  ps:必須在產生process對象以前產生管道

參數:dumplex:默認管道是全雙工的,若是將duplex設成False,conn1只能用於接收,conn2只能用於發送。

主要方法:

    conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。
    conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象
 #其餘方法:
conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法
conn1.fileno():返回鏈接使用的整數文件描述符
conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。
 
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。
conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收    
 
conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。
Pipe 管道
from multiprocessing import Process,Pipe

import time,os
def consumer(p,name):
    left,right=p
    left.close()
    while True:
        try:
            baozi=right.recv()
            print('%s 收到包子:%s' %(name,baozi))
        except EOFError:
            right.close()
            break
def producer(seq,p):
    left,right=p
    right.close()
    for i in seq:
        left.send(i)
        # time.sleep(1)
    else:
        left.close()
if __name__ == '__main__':
    left,right=Pipe()

    c1=Process(target=consumer,args=((left,right),'c1'))
    c1.start()


    seq=(i for i in range(10))
    producer(seq,(left,right))

    right.close()
    left.close()

    c1.join()
    print('主進程')
Pipe 示例

7、生產者消費者模型

在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。

 特色:

  • 實現了 生產者 與 消費者 解耦合
  • 平衡了生產者的生產能力和消費者的消費能力

 爲何使用此模式:

當程序中存在明顯的兩類任務,一類負責造數據,另一類負責處理數據,就能夠用生產者消費者模型來實現
解耦和從而提高效率

如何實現

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

基於隊列實現生產者消費者模型
實現方法
生產者進程-------->queue<----------消費者進程
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()
    print('')
示例

上述示例存在一個問題:主進程永遠不會結束,由於子進程的生產者在生產完後就結束了,可是消費者在取空q以後,則以後一直處於死循環且阻塞在q.get()這步。

  解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,此時便可在收到結束信號時break出死循環

 

另一種隊列提供了這種機制

   #JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

   #參數介紹:
    maxsize是隊列中容許最大項數,省略則無大小限制。    
  #方法介紹:
    JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:
    q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常
    q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止
from multiprocessing import Process,JoinableQueue
import time,random

def producer(food,name,q):
    for i in range(3):
        time.sleep(random.randint(1,3))
        res='%s%s' %(food,i)
        q.put(res)
        print('廚師%s生產了%s' %(name,res))


def consumer(name,q):
    while True:
        res=q.get()
        if res is None:break
        time.sleep(random.randint(1,3))
        print('吃貨%s吃了%s' %(name,res))
        q.task_done() #向q.join()發送一次信號,證實一個數據已經被取走了

if __name__ == '__main__':
    q=JoinableQueue()
    #生產者們
    p1=Process(target=producer,args=('包子','tom',q))
    p2=Process(target=producer,args=('蛋湯','jerry',q))
    p3=Process(target=producer,args=('紅燒肉','bob',q))
    #消費者們
    c1 = Process(target=consumer,args=('吃貨1',q))
    c2 = Process(target=consumer,args=('吃貨2',q))
    c1.daemon=True
    c2.daemon=True

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()


    p1.join()
    p2.join()
    p3.join()
    q.join() #主進程最後一行代碼運行完畢,生產者所有正常死亡,消費者也沒有存在的意義了

    print('')
另外一種Queue
#主進程等--->p1,p2,p3等---->c1,c2
#p1,p2,p3結束了,證實c1,c2確定全都收完了p1,p2,p3發到隊列的數據
#於是c1,c2也沒有存在的價值了,應該隨着主進程的結束而結束,因此設置成守護進程

8、信號量(瞭解)

互斥鎖 同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據 ,好比廁全部3個坑,那最多隻容許3我的上廁所,後面的人只能等裏面有人出來了才能再進去,若是指定信號量爲3,那麼來一我的得到一把鎖,計數加1,當計數等於3時,後面的人均須要等待。一旦釋放,就有人能夠得到一把鎖
    信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念
from multiprocessing import Process,Semaphore
import time

def test(sem,user):
    sem.acquire()
    print('%s 佔一個位置' %user)
    time.sleep(2)
    sem.release()

if __name__ == '__main__':
    sem=Semaphore(3)
    p_l=[]
    for i in range(12):
        p=Process(target=test,args=(sem,'user%s' %i,))
        p.start()
        p_l.append(p)


    for i in p_l:
        i.join()
    print('')
信號量

9、事件(瞭解)

python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear。

    事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。

clear:將「Flag」設置爲False
set:將「Flag」設置爲True

from multiprocessing import Process,Event
import time,random

def car(e,n):
    while True:
        if not e.is_set(): #Flase
            print('\033[31m紅燈亮\033[0m,car%s等着' %n)
            e.wait()
            print('\033[32m車%s 看見綠燈亮了\033[0m' %n)
            time.sleep(random.randint(3,6))
            if not e.is_set():
                continue
            print('走你,car', n)
            break

def police_car(e,n):
    while True:
        if not e.is_set():
            print('\033[31m紅燈亮\033[0m,car%s等着' % n)
            e.wait(1)
            print('燈的是%s,警車走了,car %s' %(e.is_set(),n))
            break

def traffic_lights(e,inverval):
    while True:
        time.sleep(inverval)
        if e.is_set():
            e.clear() #e.is_set() ---->False
        else:
            e.set()

if __name__ == '__main__':
    e=Event()
    # for i in range(10):
    #     p=Process(target=car,args=(e,i,))
    #     p.start()

    for i in range(5):
        p = Process(target=police_car, args=(e, i,))
        p.start()
    t=Process(target=traffic_lights,args=(e,10))
    t.start()

    print('============》')
View Code

10、進程池(重要)

在利用Python進行系統管理的時候,特別是同時操做多個文件目錄,或者遠程控制多臺主機,並行操做能夠節約大量的時間。多進程是實現併發的手段之一,須要注意的問題是:

  1. 很明顯須要併發執行的任務一般要遠大於核數
  2. 一個操做系統不可能無限開啓進程,一般有幾個核就開幾個進程
  3. 進程開啓過多,效率反而會降低(開啓進程是須要佔用系統資源的,並且開啓多餘核數目的進程也沒法作到並行)

 

例如當被操做對象數目不大時,能夠直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但若是是上百個,上千個。。。手動的去限制進程數量卻又太過繁瑣,此時能夠發揮進程池的功效。

用池是爲了將併發的進程或線程數目控制在計算機可承受的範圍內

咱們就能夠經過維護一個進程池來控制進程數目,好比httpd的進程模式,規定最小進程數和最大進程數... 

ps:對於遠程過程調用的高級應用程序而言,應該使用進程池,Pool能夠提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求;但若是池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,就重用進程池中的進程。

    建立進程池的類:若是指定numprocess爲3,則進程池會從無到有建立三個進程,而後自始至終使用這三個進程去執行全部任務,不會開啓其餘進程

from concurrent.futures import ProcessPoolExecutor 
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os
import time
import random

def task(x):
    print('%s is running' %os.getpid())
    time.sleep(random.randint(1,3))
    return x**2

if __name__ == '__main__':
    p=ProcessPoolExecutor() #不指定參數默認池的大寫等於cpu的核數
    for i in range(10):
        p.submit(task,i)  #提交任務
    print('')
進程池

重點:

submit  提交任務

同步調用 vs 異步調用
提交任務的兩種方式

同步調用:提交完任務後,就在原地等待任務執行完畢,拿到運行結果/返回值後再執行下一行代碼
同步調用下任務的執行是串行執行

異步調用:提交完任務後,不會原地等待任務執行完畢,不等結果 直接執行下一行代碼
同步調用下任務的執行是併發執行
if __name__ == '__main__':
    # 異步調用
    p=ProcessPoolExecutor() #不指定參數默認池的大寫等於cpu的核數
    futures=[]
    for i in range(10):
        future=p.submit(task,i)  #提交任務
        futures.append(future)
    p.shutdown(wait=True) #關閉了進程池的入口,而後執行join操做
    for future in futures:
        print(future.result()) #拿到結果
    print('')

#main上方代碼     與上面示例一致
異步調用
if __name__ == '__main__':
    p=ProcessPoolExecutor() #不指定參數默認池的大寫等於cpu的核數
    for i in range(10):
        res=p.submit(task,i).result()
        print(res)

    print('')
同步調用
相關文章
相關標籤/搜索