python多進程

多進程

  • 進程:正在進行的過程或者說是一個任務,而負責執行任務則是cpu
  • 同一個程序執行兩次是兩次進程
  • 併發:
  • 並行:基於多核cpu

unix開子進程的拷貝一份父進程的數據python

進行的三個狀態:運行,阻塞,就緒windows

同步與異步

  • 同步:提交一個任務,只有等待這個任務結束纔會繼續提交下一個任務
  • 異步:只管提交任務,不等待這個任務執行完畢就能夠作其餘事情
  • 阻塞:socket模塊中的:recv,accept
  • 非阻塞:正常執行代碼

在python中如何開啓子進程

  1. multiprocessing模塊中的process類
# 方式1
from multiprocessing import Process
import time

def task(name):
    print('%s is running'%name)
    time.sleep(5)
    print('%s is done'%name)

if __name__ =='__main__':
    p = Process(target=task,args=('子進程1',))
    p.start() #僅僅只是給操做系統發送了一個信號
#方式二:自定義類繼承自process
from multiprocessing import Process
import time
class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name = name

    def run(self): #函數名必須是run
        print('%s is running'%self.name)
        time.sleep(5)
        print('%s is done'%self.name)

if __name__ == '__main__':
    p = MyProcess('子進程1')
    p.start()#本質就是在調用p.run
    print('進')

查看進程的pid與ppid(進程id)

  • 補充:
    • windows查看正在執行的進程:tasklist | findstr pycharm
    • Mac os查看正在執行的進程:ps aus|grep pycharm
from multiprocessing import Process
import time
import os


def task(name):
    print('%s is running,parent id is <%s>'%(os.getpid(),os.getppid()))
    time.sleep(5)
    print('%s is done'%os.getpid())

if __name__ =='__main__':
    p = Process(target=task,args=('子進程1',))
    p.start() #僅僅只是給操做系統發送了一個信號
    print('主',os.getpid())

process對象的其餘屬性或方法

  • join()等子進程結束完畢纔會繼續運行主進程,主進程結束後全部的殭屍進程結束
  • start()只是向操做系統發送信號,並不僅是把進程立馬開起來,若是連續有幾個start有可能執行的前後順序會錯亂,若是start方法後面立馬接一個join,多個子進程會變成串行
  • is_alive()查看子進程時候已經結束
  • terminate()殺死這個子進程
  • pid()查看進程id
  • name()查看這個對象的名字

守護進程

  • 當主進程執行結束子進程跟着結束:將daemon屬性設置爲true,守護進程不能有子進程。代碼執行到程序最後一行表明程序運行結束

互斥鎖

  • 把併發變成串行:使用multiprocessing模塊下的Lock對象
#犧牲效率實現子進程串行
from multiprocessing import Process,Lock
import time

def tack(name,lock):
    lock.acquire()#加鎖
    print('%s,1'%name)
    time.sleep(1)
    print('%s,2'%name)
    time.sleep(1)
    print('%s,3'%name)
    lock.release()#釋放鎖
if __name__ == '__main__':
    lock = Lock()#實例化鎖對象
    for i in range(3):
        p = Process(target=tack,args=('進程%s'%i,lock))#把鎖傳到子進程中
        p.start()
  • 互斥鎖與join的區別:join確實能實現代碼串行,但join把整個代碼變成串行,但互斥鎖能夠把部分代碼變成串行

事件

  • 一個信號可使全部的進程都進入阻塞狀態,也能夠控制全部的進程解除阻塞
  • 一個事件被建立以後,默認是阻塞狀態
from multiprocessing import Event
e = Event()# 建立了一事件
print(e.is_set())# 查看一個事件的狀態,默認是阻塞
e.set() # 將這個事件狀態改成True
print(e.is_set())
e.wait()# 根據e.is_set()的值決定是否阻塞,若是是True就是不阻塞,若是是False就是阻塞狀態
e.clear() # 將這個事件狀態改成False
  • 事件模擬紅綠燈:
import time
import random
from multiprocessing import Process,Event

def light(e):
    while True:
        if e.is_set():
            e.clear()
            print('紅燈亮了')
        else:
            e.set()
            print('綠燈亮了')
        time.sleep(5)
def cars(i,e):
    if not e.is_set():
        print('%s 在等待'%i)
        e.wait()
    print('%s 通行了'%i)


if __name__ == '__main__':
    e = Event()
    p1 = Process(target=light,args=(e,))
    p1.start()
    for i in range(20):
        p = Process(target=cars,args=(i,e))
        p.start()
        time.sleep(2)

隊列

  • multiprocessing有提供基於ipc通訊類(inter-process communication)
from multiprocessing import Queue

q = Queue(4)#指定隊列大小,若是不指定大小則大小爲無窮盡

q.put('hello')#插入數據到隊列中
print(q.full())#判斷隊列中數據是否滿了
q.get()#從隊列讀取並刪除一個數據
print(q.empty())#判斷隊列中數據是否空了
  • 生產者消費這模型:
    • 生產者:生產數據任務
    • 消費者:消費數據任務
from multiprocessing import Process,Queue
import time
def producer(q):
    for i in range(3):
        res = '包子%s'%i
        time.sleep(0.5)
        q.put(res)
        print('生產者生產了%s'%res)

def consumer(q):
    while True:
        res = q.get()
        if res == None:break
        time.sleep(0.7)
        print('消費者吃了%s'%res)


if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer,args=(q,))
    c1 = Process(target=consumer,args=(q,))
    p1.start()
    c1.start()
    p1.join()
    c1.join()
    q.put(None)
  • JoinableQueue:
    • 消費者:每次獲取一個數據,處理一個數據,而且發送一個記號:標誌一個數據被處理成功,計數器-1
    • 生產者:每一次生產一個數據,且每一次生產的數據放在隊列中計數器+1,當生產者所有生產完畢以後發送一個join:已經中止生產數據且要等待以前生產的數據被消費完,當數據都被處理完時,join阻塞結束
from multiprocessing import Process,JoinableQueue
import time
def producer(q):
    for i in range(3):
        res = '包子%s'%i
        time.sleep(0.5)
        q.put(res)
        print('生產者生產了%s'%res)
    q.join() #阻塞,直到一個隊列中的全部數據,所有被處理完畢

def consumer(q):
    while True:
        res = q.get()
        time.sleep(0.7)
        print('消費者吃了%s'%res)
        q.task_done() # 處理完一個數據,隊列中的計數器-1


if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer,args=(q,))
    c1 = Process(target=consumer,args=(q,))
    p1.start()
    c1.daemon = True #守護進程,主進程中的代碼執行完畢後,子進程自動結束
    c1.start()
    p1.join()

管道

  • 在進程之間傳遞信息(數據)
from multiprocessing import Pipe,Process

def func(conn1,conn2):
    conn1.close()
    while True:
        try:
            msg = conn2.recv()
            print(msg)
        except EOFError:
            print('子進程結束')
            conn2.close()
            break


if __name__ == '__main__':
    conn1,conn2 = Pipe()
    Process(target=func,args=(conn1,conn2)).start()
    conn2.close()
    for i in range(5):
        conn1.send('吃屎了你')
    conn1.close()

進程池

  • 建立進程池的過程
  1. 建立一個屬於進程的池子
  2. 這個池子指定能存放多少個進程
  3. 開啓進程
from multiprocessing import Pool,Process
import time
import os
def func(i):
    print('%s 進程正在運行'%i,os.getpid())
    time.sleep(2)
    print('%s 進程運行結束'%i,os.getpid())


if __name__ == '__main__':
    pool = Pool(3)
    for i in range(10):
        # pool.apply(func,args=(i,)) 同步調用
        pool.apply_async(func,args=(i,)) # 異步調用
    pool.close() # 結束進程池接收任務
    pool.join() # 感知進程池中的任務執行結束
    # apply_async必須是與close、join一塊兒使用
  • 進程池的返回值
from multiprocessing import Pool

def func(i):

    return i*i

if __name__ == '__main__':
    p = Pool(5)
    res_lis = []
    for i in range(10):
        res = p.apply_async(func,args=(i,))
        res_lis.append(res) 

    for res in res_lis:
        print(res.get())# res.get 默認是阻塞的,由於須要接收進程處理的結果,以後接收到結果以後才能夠繼續執行,因此
                            # 因此將進程的返回結果放在一個列表中,而後循環這個列表,再執行get就不會阻塞了
    p.close()
    p.join()
  • 進程池的回調函數
from multiprocessing import Pool
import os
def func1(i):
    print('in func1',os.getpid())
    return i*i

def func2(ii):
    print('in func2',os.getpid())
    print(ii)

if __name__ == '__main__':
    '''
    1. func1執行結果看成func2的參數,func1進程執行結束後返回結果,經過callback調用func2,將func1的執行返回結果看成參數傳遞給func2執行
    2. func1是一個新的進程
    3. func2是在主進程中執行的
    '''
    p = Pool(5)
    p.apply_async(func1,args=(2,),callback=func2) 
    p.close()
    p.join()
相關文章
相關標籤/搜索