Python多進程

普通方式開啓進程

from multiprocessing import Process
import os
import time

def fun(i):

    time.sleep(3)
    with open('text','r') as f:
        count = int(f.read())
        count -= 1
    with open('text','w') as f:
        f.write(str(count))
    print('這是 %s 個子進程,PID爲 %s 父進程PID爲 %s count:%s'%(i,os.getpid(),os.getppid(),count))



if __name__ == '__main__':
    p_s = []
    for i in range(5):
        p = Process(target=fun,args=(i,))
        p_s.append(p)

    for p in p_s:
        p.start()

繼承的方式開啓進程

from multiprocessing import Process
import time
class MyProcess(Process):
    def __init__(self,*args,**kwargs):
        super().__init__(*args,**kwargs)

    def run(self):
        print('開啓了一個子進程...')
        self.fun()

    def fun(self):
        time.sleep(3)
        print('子進程運行了')

if __name__ == '__main__':

    p = MyProcess()
    p.start()

from multiprocessing import Process,Value,Lock
def get_num(num,lock):
    for i in range(10):
        lock.acquire()# 獲取鎖
        num.value -= 1
        lock.release() # 釋放鎖
        print(num.value)



def put_num(num,lock):
    for i in range(10):
        lock.acquire() # 獲取鎖
        num.value += 1
        lock.release() # 釋放鎖

        print(num.value)


if __name__ == '__main__':
    num = Value('i',10) # Value能夠在多個進程之間共享數據
    lock = Lock() # 實例化一個鎖

    get_p = Process(target=get_num,args=(num,lock))
    get_p.start()
    put_p = Process(target=put_num,args=(num,lock))
    put_p.start()
    get_p.join()
    put_p.join()
    print(num.value)

信號量

from multiprocessing import Semaphore,Process
import time
def fun(num,lock):
    lock.acquire()
    print('%s進來了。。。'%num)
    time.sleep(1)
    lock.release()
    print('%s出來了....'%num)



if __name__ == '__main__':
    lock = Semaphore(5) # 至關於實例化了一個鎖,可是鎖有五把鑰匙
    for i in range(50):
        p = Process(target=fun,args=(i,lock))
        p.start()
from multiprocessing import Event,Process
import time


def signal_lamp(e):
    while True:
        if e.is_set(): # 若是當前事件狀態爲true那麼將,狀態改成false阻塞進程,模擬實現紅燈亮起狀態
            print('紅燈涼了..')
            e.clear()# 將event狀態改成false
        else:# 若是當前事件狀態爲false那麼將,狀態改成true容許進程運行,模擬實現綠燈燈亮起狀態
            print('綠燈涼了....')
            e.set()#將event狀態改成true
        time.sleep(10)


def Car(i,e):
    e.wait() # 若是e.is_set()爲false則阻塞在這一步,不然運行
    print('第%s輛車過去了....'%i)

if __name__ == '__main__':
    # Event幾種方法:
    
    # event.isSet():返回event的狀態值;
    
    # event.wait():
    # 若是
    # event.isSet() == False將阻塞進程;
    # event.isSet() == True 進程正常運行;
    
    # event.set(): 設置event的狀態值爲True,全部阻塞池的進程激活進入就緒狀態, 等待操做系統調度;
    
    # event.clear():恢復event的狀態值爲False。
    e = Event()
    sl = Process(target=signal_lamp,args=(e,))
    sl.start()

    for i in range(10):
        p = Process(target=Car,args=(i,e,))
        p.start()

生產者消費者模型Queue

from multiprocessing import Queue,Process

def producer(q):
    for i in range(20):
        q.put('第 %s 個包子' % (i,))


def comsumer(q):
    while True:
        info = q.get()
        if not info:
            break
        print('消費:%s'%(info,))


if __name__ == '__main__':
    q = Queue(20) # 建立一個隊列,隊列內部實現了鎖機制,因此在進程中共享數據是安全的
    p_p = Process(target=producer,args=(q,))
    p_c = Process(target=comsumer,args=(q,))

    p_p.start()
    p_c.start()
    p_p.join() # 等待生產者完成
    q.put(None)# 若是生產者完成了生成,則在隊列中存入None若是,消費者讀取到None則退出

生產者消費者模型JoinableQueue

from multiprocessing import Process,JoinableQueue

def producer(q):
    for i in range(20):
        q.put('第 %s 個包子' % (i,))
    q.join() # 阻塞,一直阻塞到消費者消費完全部包子(隊列中記錄了生產的包子個數)

def comsumer(q):
    while True:
        info = q.get()
        print('消費:%s'%(info,))
        q.task_done()# 每消費完一個包子,就會發送一個信號到隊列。
        




if __name__ == '__main__':
    q = JoinableQueue(20)

    p_p = Process(target=producer,args=(q,))
    p_c = Process(target=comsumer,args=(q,))

    p_p.start()
    p_c.daemon = True # 將消費者進程設置爲守護進程,那麼當主進程的代碼所有執行完畢,則會退出消費者進程
    p_c.start()
    p_p.join()  # 主進程等待生產者進程完成
    '''
    這段代碼的中心思想:
    首先:生產者進程q.join()會等待消費者吃完全部包子以後纔會結束
    而後:主進程 p_p.join() 會等待生產者進程結束
    最後:p_c.daemon = True 會等待主進程結束,而後結束
    
    致使的結果:消費者吃完全部包子 -> 生產者進程結束 -> 主進程代碼執行完畢 -> 消費者進程(守護進程)結束
    
    '''

管道

from multiprocessing import Process,Pipe

def producer(con):
    con1,con2 = con
    con2.close()
    for i in range(20):
        con1.send('第 %s 個包子' % (i,))


def comsumer(con):
    con1, con2 = con
    con1.close()
    while True:
        try:
            info = con2.recv()
        except EOFError:
            con2.close()
            break

        print('消費:%s'%(info,))

if __name__ == '__main__':
    # 管道內部沒有實現鎖機制,須要本身實現。因此管道在多進程中共享數據是不安全的
    con1,con2 = Pipe() # 管道的核心,若是是con1發送,那麼只能用con2接收
    p_p = Process(target=producer,args=((con1,con2),))

    p_c = Process(target=comsumer,args=((con1,con2),))
    p_p.start()
    p_c.start()
    p_p.join()
    con1.close()

進程池

from multiprocessing import Pool
import os
import time
def fun(num):

    return num + 1

def call_back(num):
    with open('a.txt','a') as f:
        f.write(str(num))
if __name__ == '__main__':
    pool = Pool(os.cpu_count()+1) # os.cpu_count() 能夠計算出當前計算機的核數
    # # map方法 用時:0.2000112533569336
    # start = time.time()
    # res = pool.map(fun,range(100)) # pool.map(函數,可迭代對象) 將可迭代對象中的每個元素傳入函數計算,而且將計算結果返回 res獲得一個列表
    # pool.close()  # 想要主進程等待進程池中的任務所有完成。首先得關閉進程池,使新任務沒法在放入進程池。以後在使用pool.join()來使主進程阻塞等待
    # pool.join()
    # print(res)
    # print(time.time()-start)


    # #apply(同步)方法,跟沒開多進程同樣,一個一個執行 用時 0.15600895881652832
    # start = time.time()
    # for i in range(100):
    #     res = pool.apply(func=fun,args=(i,)) # pool.apply(func=函數,args=(參數,)) 能夠獲得函數的返回值
    #     print(res)
    # print(time.time()-start)

    # apply(異步)方法 用時 0.08100461959838867
    # pool.apply(func=函數,args=(參數,),callback=回調函數)
        #   使用 res.get()能夠獲得函數的返回值。
        #   回調函數接收一個參數,參數就是func的返回值,自動傳參。(回調函數由主進程調用)
    
    # #回調函數的練習 
    # start = time.time()
    # res_l = []
    # for i in range(100):
    #     res = pool.apply_async(func=fun,args=(i,),callback=call_back)
    #     res_l.append(res)
    # pool.close()
    # pool.join()
    # print(time.time()-start)
相關文章
相關標籤/搜索