python-進程池

********進程池********

****進程池****

爲何要有進程池?

答:
在程序實際處理問題過程當中,忙時會有成千上萬的任務須要被執行,閒時可能只有零星任務。那麼在成千
萬個任務須要被執行的時候,咱們就須要去建立成千上萬個進程麼?首先,建立進程須要消耗時間,銷燬
程也須要消耗時間。第二即使開啓了成千上萬的進程,操做系統也不能讓他們同時執行,這樣反而會影響
序的效率。所以咱們不能無限制的根據任務開啓或者結束進程。那麼咱們要怎麼作呢?

在這裏,要給你們介紹一個進程池的概念,定義一個池子,在裏面放上固定數量的進程,有需求來了,就
一個池中的進程來處理任務,等處處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。
果有不少任務須要執行,池中的進程數量不夠,任務就要等待以前的進程執行任務完畢歸來,拿到空閒進
才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣
增長操做系統的調度難度,還節省了開閉進程的時間,也必定程度上可以實現併發效果。

******multiprocess.Pool模塊******

****概念介紹

Pool([numprocess  [,initializer [, initargs]]]):建立進程池

**參數介紹

numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值(這是os模塊的一個方法)
initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None
initargs:是要傳給initialiizer的參數組

**主要方法
    def apply(self, func, args=(), kwds={}):
        '''
        Equivalent of `func(*args, **kwds)`.
        '''
        assert self._state == RUN
        return self.apply_async(func, args, kwds).get()

    def map(self, func, iterable, chunksize=None):
        '''
        Apply `func` to each element in `iterable`, collecting the results
        in a list that is returned.
        '''
        return self._map_async(func, iterable, mapstar, chunksize).get()

    def apply_async(self, func, args=(), kwds={}, callback=None,
            error_callback=None):
        '''
        Asynchronous version of `apply()` method.
        '''
        if self._state != RUN:
            raise ValueError("Pool not running")
        result = ApplyResult(self._cache, callback, error_callback)
        self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
        return result

1 p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
'''須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,
必須從不一樣線程調用p.apply()函數或者使用p.apply_async()'''
同步,只有func被執行完後纔會繼續執行代碼,返回值爲func的return值
同步處理任務,進程池中的全部進程都是普通進程
2 p.map(self, func, iterable, chunksize=None):
異步,自帶close和join,返回值爲func返回值組成的列表
3 p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
'''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,
將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。'''
異步,當func被註冊進入一個進程後,程序就繼續向下執行,返回一個對象,這個對象有get方法能夠取到值(這是func的返回值)
obj.get() 會阻塞,知道對應的func執行完畢拿到結果。須要先close後join來保持多進程和主進程代碼的同步性。
異步處理任務時,進程池中的全部進程都是守護進程
有回調函數 callback
4 p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
5 P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用
**其餘方法

1 方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法
2 obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是
遠程操做中引起了異常,它將在調用此方法時再次被引起。
3 obj.ready():若是調用完成,返回True
4 obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
5 obj.wait([timeout]):等待結果變爲可用。
6 obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數

**例子

*同步和異步*



同步
from multiprocessing import Pool
import os, time


def work(n):
    print('%s run' % os.getpid())
    time.sleep(1)
    return n * 3


if __name__ == '__main__':
    p = Pool(3)  # 進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務
    for i in range(10):
        res = p.apply(work, args=(i,))  # 同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程當中可能有阻塞也可能沒有阻塞
                                        # 但無論該任務是否存在阻塞,同步調用都會在原地等着
        print(res)  # res爲func的返回值


異步
import os
import time
import random
from multiprocessing import Pool


def work(n):
    print('%s run' % os.getpid())
    time.sleep(random.random())
    return n ** 2


if __name__ == '__main__':
    p = Pool(3)  # 進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務
    res_l = []
    for i in range(10):
        res = p.apply_async(work, args=(i,))  # 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行
        # 返回結果以後,將結果放入列表,歸還進程,以後再執行新的任務
        # 須要注意的是,進程池中的三個進程不會同時開啓或者同時結束
        # 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。
        res_l.append(res)

    # 異步apply_async用法:若是使用異步提交的任務,主進程須要使用jion,等待進程池內任務都處理完,而後能夠用get收集結果
    # 不然,主進程結束,進程池可能還沒來得及執行,也就跟着一塊兒結束了
    p.close()
    p.join()
    for res in res_l:
        print(res.get())  # 使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get


回調函數

須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程:
我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數

咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),
這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。


若是在主進程中等待進程池中全部任務都執行完畢後,再統一處理結果,則無需回調函數
相關文章
相關標籤/搜索