joinablequeue模塊 生產者消費者模型 Manager模塊 進程池 管道

1、生產者消費者程序員

  主要是爲解耦(藉助隊列來實現生產者消費者模型)安全

  import queue  # 不能進行多進程之間的數據傳輸app

  (1)from multiprocessing import Queue    藉助Queue解決生產者消費者模型,隊列是安全的。異步

    q = Queue(num)async

    num :爲隊列的最大長度函數

    q.get() # 阻塞等待獲取數據,若是有數據直接獲取,若是沒有數據,阻塞等待spa

    q.put() # 阻塞,若是能夠繼續往隊列中放數據,就直接放,不能放就阻塞等待code

    q.get_nowait() # 不阻塞,若是有數據直接獲取,沒有數據就報錯對象

    q.put_nowait() # 不阻塞,若是能往隊列中放數據直接放,不能夠就報錯blog

    

  (2)from multiprocessing import JoinableQueue  # 可鏈接的隊列

    JoinableQueue 是繼承Queue  ,因此能夠使用Queue中的方法

    而且JoinableQueue 又多了兩個方法

    q.join() # 用於生產者。等待q.task_done的返回結果,經過返回結果,生產者就能得到消費者當前消費了多少個數據。

    q.task_done() # 用於消費者,是指每消費隊列中的一個數據,就給join返回一個標識。

 1 from multiprocessing import Queue,Process,Pool,JoinableQueue  2 
 3 
 4 def consumer(q,lis):  5     while 1:  6         for i in lis:  7             print(i + '拿走了' + q.get())  8             q.task_done() # get() 一次就會給生產者的join返回一次數據
 9 
10 
11 def producer(q,name1): 12     for i in range(1,9): 13         q.put(name1 + '第%s號劍'% i) 14     q.join() # 記錄了生產者往隊列中添加了8個數據,此時會阻塞,等待消費返回8次數據,後生產者進程纔會結束
15 
16 
17 if __name__ == '__main__': 18     q = JoinableQueue() # 實例化一個隊列
19     p = Process(target=consumer,args=(q,['蓋聶','衛莊','高漸離','勝七','掩日'])) 20     p1 = Process(target=producer,args=(q,'越王八劍')) 21     p.daemon = True # 注意是把消費者設置爲守護進程,會隨着主進程的結束而結束。
22  p.start() 23  p1.start() 24     p1.join() # 主進程會等待生產者進程結束後才結束,而生產者進程又會等待消費者進程消費完之後才結束。

 

2、進程之間的共享內存

  from multiprocessing import Manager,Value

  m = Manager() 

  num = m.dict({鍵 :值})

  num = m.list([1,2,3])

from multiprocessing import Process,Manager,Value def func(num): for i in num: print(i - 1) # 結果爲:0,1,2

if __name__ == '__main__': m = Manager() # 用來進程之間共享數據的
    num = m.list([1,2,3]) p = Process(target=func,args=(num,)) p.start() p.join() # 等待func子進程執行完畢後結束



#################Value################

from multiprocessing import Process,Manager,Value def func1(num): print(num) num.value += 1 # 和Manager用法不同
    print(num.value) if __name__ == '__main__': num = Value('i',123) # Manager裏面不須要傳參數
    p = Process(target=func1,args=(num,)) p.start() p.join()

 

3、進程池

  進程池:一個池子,裏邊有固定數量的進程。這些進程一直處於待命狀態,一旦有任務來,立刻就去處理。

  進程池還會幫程序員去管理池中的進程。

  from multiprocessing import Pool

  p = Pool(os.cpu_count() + 1)

  進程池有三個方法:

    map(func,iterable)     有返回值

    iterable:可迭代對象,是把可迭代對象中的每一個元素一次傳給任務函數當參數 

from multiprocessing import Pool def func(num): num += 1
    print(num) return num # 返回給map方法


if __name__ == '__main__': p = Pool() res = p.map(func,[i for i in range(10)]) # 參數爲目標對象和可迭代對象
 p.close() p.join() # 等待子進程結束
    print('主進程',res) # res是一個列表

 

    apply(func,args=()) :apply的實現是進程之間是同步的,池中的進程一個一個的去執行。

    func :進程池中的進程執行的任務函數。

    args :可迭代對象型的參數,是傳給任務函數的參數。

    同步處理任務時,不須要close和join

    同步處理任務時,進程池中的全部進程是普通進程(主進程須要等待其子進程執行結束)

from multiprocessing import Pool def func(num): num += 1
    return num if __name__ == '__main__': p = Pool(5) # 實例化5個進程
    for i in range(100): res = p.apply(func,args=(i,)) # 這裏傳的參數是元祖,這裏是同步執行
        print(res)

 

    apply_async(func,args=(),callback=None) :進城之間是異步的,

    func :進程池中的進程執行的任務函數。

    args :可迭代對象型的參數,是傳給任務函數的參數

from multiprocessing import Pool


def func(num):
    num += 1
    return num


if __name__ == '__main__':
    p = Pool(5) # 實例化5個進程
    lis = []
    for i in range(100):
        res = p.apply_async(func,args=(i,)) # 異步執行,5個進程同時去調用func
        lis.append(res)
        print(res) # 打印結果爲 <multiprocessing.pool.ApplyResult object at 0x0347F3D0>
    p.close() # Pool中用apply_async異步執行時必須關閉進程
    p.join() # 由於是異步執行因此須要等待子進程結束
    print(lis) # 100個<multiprocessing.pool.ApplyResult object at 0x0347F3D0> 這種存放在列表中
    [print(i.get()) for i in lis] # 輸出100個數字[1......100]

 

    callback :回調函數,就是說每當進程池中有進程處理完任務,返回的結果能夠交給回調函數,由回調函數進行進一步的處理,回調函數只有異步纔有,同步是沒有的

    異步處理任務時,進程池中的全部進程是守護進程(主進程代碼執行完畢守護進程就結束)

    異步處理任務時,必需要加上close和join

    回調函數的使用:

      進程的任務函數的返回值,被當成回調函數的形參接收到,以此進行進一步的處理操做

      回調函數是由主進程調用的,而不是子進程,子進程只負責把結果傳遞給回調函數。

from multiprocessing import Pool import requests import os def func(ulr): res = requests.get(ulr) print('func進程的pid:%s' % os.getpid(),'父進程的pid:%s' % os.getppid()) if res.status_code == 200: return ulr,res.text def cal_back(sta): # func中返回的值被自動調用,並當成形參傳進來
    ulr,text = sta print('callback回調函數的pid:%s'% os.getpid(),'父進程的pid:%s' % os.getppid()) # 回調函數的pid和父進程的pid同樣

if __name__ == '__main__': p = Pool(5) lis = ['https://www.baidu.com', 'http://www.jd.com', 'http://www.taobao.com', 'http://www.mi.com', 'http://www.cnblogs.com', 'https://www.bilibili.com', ] print('父進程的pid:%s' % os.getpid()) for i in lis: p.apply_async(func,(i,),callback=cal_back) # 異步的執行每個進程,這裏的傳參和Process不一樣,這裏必須這樣寫callback=cal_back
    # 異步執行程序func,在每一個任務結束後,在func中return回一個結果,這個結果會自動的被callback函數調用,並當成形參來接收。
    p.close() # 進程間異步必須加上close()
    p.join()  # 等待子進程的結束

 

4、管道機制

  from multiprocessing import Pipe

  con1,con2 = Pipe()

  管道是不安全的

  管道是用於多進程之間通訊的一種方式。

  若是在單進程中使用管道,con1發數據,那麼就用con2來收數據

              con2發數據,那麼就用con1來收數據

  若是在多進程中使用管道,那麼就必須是父進程使用con1收,子進程就必須使用con2發

                    父進程使用con1發,子進程就必須使用con2收

                    父進程使用con2收,子進程就必須使用con1發

                    父進程使用con2發,子進程就必須使用con1收

  在管道中有一個著名的錯誤叫作EOFError。是指,父進程若是關閉了發送端,子進程還繼續收數據,那麼就會引起EOFError。 

# 單進程中管道的應用
from multiprocessing import Pipe con1,con2 = Pipe() # 管道機制
 con1.send('123') # con1發送,須要con2來接收 是固定
print(con2.recv()) con2.send('456') # con2發送,須要con1來接收 是固定
print(con1.recv())
# 多進程中管道的應用
from multiprocessing import Process,Pipe def func(con): con1,con2 = con con1.close() # 由於子進程只用con2與父進程通訊,因此關閉了
    while 1: try: print(con2.recv()) # 接收父進程con1發來的數據
        except EOFError: # 若是父進程的con1發完數據,並關閉管道,子進程的con2繼續接收數據,就會報錯。
            con2.close() # 當接到報錯,此時數據已經接收完畢,關閉con2管道。
            break # 退出循環


if __name__ == '__main__': con1,con2 = Pipe() p = Process(target=func,args=(con1,con2)) p.start() con2.close() # 由於父進程是用con1來發數據的,con2提早關閉。
    for i in range(10): # 生產數據
        con1.send('郭%s' % i) # 給子進程的con2發送數據
    con1.close() # 生產完數據,關閉父進程的con1管道
相關文章
相關標籤/搜索