python進程池:multiprocessing.pool

本文轉至http://www.cnblogs.com/kaituorensheng/p/4465768.html,在其基礎上進行了一些小小改動。css

在利用Python進行系統管理的時候,特別是同時操做多個文件目錄,或者遠程控制多臺主機,並行操做能夠節約大量的時間。當被操做對象數目不大時,能夠直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但若是是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時能夠發揮進程池的功效。
Pool能夠提供指定數量的進程供用戶調用,當有新的請求提交到pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求;但若是池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,纔會建立新的進程來它。html

例1:使用進程池

from multiprocessing import freeze_support,Pool
import time

def Foo(i):
    time.sleep(2)
    print('___time---',time.ctime())
    return i+100

def Bar(arg):
    print('----exec done:',arg,time.ctime())

if __name__ == '__main__':
    freeze_support()
    pool = Pool(3) #線程池中的同時執行的進程數爲3

    for i in range(4):
        pool.apply_async(func=Foo,args=(i,),callback=Bar) #線程池中的同時執行的進程數爲3,當一個進程執行完畢後,若是還有新進程等待執行,則會將其添加進去
        # pool.apply(func=Foo,args=(i,))

    print('end')
    pool.close()
    pool.join()#調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束
View Code

執行結果:app

end
___time--- Thu Jun 16 15:11:45 2016
----exec done: 100 Thu Jun 16 15:11:45 2016
___time--- Thu Jun 16 15:11:45 2016
----exec done: 101 Thu Jun 16 15:11:45 2016
___time--- Thu Jun 16 15:11:45 2016
----exec done: 102 Thu Jun 16 15:11:45 2016
___time--- Thu Jun 16 15:11:47 2016
----exec done: 103 Thu Jun 16 15:11:47 2016

函數解釋dom

  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解區別,看例1例2結果區別)
  • close()    關閉pool,使其不在接受新的任務。
  • terminate()    結束工做進程,不在處理未完成的任務。
  • join()    主進程阻塞,等待子進程的退出, join方法要在close或terminate以後使用。

執行說明:建立一個進程池pool,並設定進程的數量爲3,xrange(4)會相繼產生四個對象[0, 1, 2, 4],四個對象被提交到pool中,因pool指定進程數爲3,因此0、一、2會直接送到進程中執行,當其中一個執行完過後才空出一個進程處理對象3,因此會出現輸出「msg: hello 3」出如今"end"後。由於爲非阻塞,主函數會本身執行自個的,不搭理進程的執行,因此運行完for循環後直接輸出「mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~」,主程序在pool.join()處等待各個進程的結束。async

例2:使用進程池(阻塞)

from multiprocessing import freeze_support,Pool
import time

def Foo(i):
    time.sleep(2)
    print('___time---',time.ctime())
    return i+100

def Bar(arg):
    print('----exec done:',arg,time.ctime())

if __name__ == '__main__':
    freeze_support()
    pool = Pool(3) #線程池中的同時執行的進程數爲3

    for i in range(4):
        pool.apply(func=Foo,args=(i,))

    print('end')
    pool.close()
    pool.join()#調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束
View Code

執行結果ide

___time--- Thu Jun 16 15:15:16 2016
___time--- Thu Jun 16 15:15:18 2016
___time--- Thu Jun 16 15:15:20 2016
___time--- Thu Jun 16 15:15:22 2016
end

例3:使用進程池,並關注結果

import multiprocessing
import time

def func(msg):
    print('hello :',msg,time.ctime())
    time.sleep(2)
    print('end',time.ctime())
    return 'done' + msg

if __name__=='__main__':
    pool = multiprocessing.Pool(2)
    result = []
    for i in range(3):
        msg = 'hello %s' %i
        result.append(pool.apply_async(func=func,args=(msg,)))

    pool.close()
    pool.join()

    for res in result:
        print('***:',res.get())

    print('AAAAAAAAll end--')
View Code

執行結果函數

 
 

hello : hello 0 Thu Jun 16 15:26:33 2016
hello : hello 1 Thu Jun 16 15:26:33 2016
end Thu Jun 16 15:26:35 2016
hello : hello 2 Thu Jun 16 15:26:35 2016
end Thu Jun 16 15:26:35 2016
end Thu Jun 16 15:26:37 2016
***: donehello 0
***: donehello 1
***: donehello 2
AAAAAAAAll end--post

 :get()函數得出每一個返回結果的值spa

例4:使用多個進程池

import multiprocessing
import time,os,random

def Lee():
    print('\nRun task Lee--%s******ppid:%s'%(os.getpid(),os.getppid()),'~~~~',time.ctime())
    start = time.time()
    time.sleep(random.randrange(10))
    end = time.time()
    print('Task Lee,runs %0.2f seconds.'%(end-start),'~~~~',time.ctime())

def Marlon():
    print("\nRun task Marlon-%s******ppid:%s"%(os.getpid(),os.getppid()),'~~~~',time.ctime())
    start = time.time()
    time.sleep(random.random() * 40)
    end=time.time()
    print( 'Task Marlon runs %0.2f seconds.' %(end - start),'~~~~',time.ctime())

def Allen():
    print( "\nRun task Allen-%s******ppid:%s"%(os.getpid(),os.getppid()),'~~~~',time.ctime())
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print( 'Task Allen runs %0.2f seconds.' %(end - start),'~~~~',time.ctime())

def Frank():
    print( "\nRun task Frank-%s******ppid:%s"%(os.getpid(),os.getppid()),'~~~~',time.ctime())
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print( 'Task Frank runs %0.2f seconds.' %(end - start),'~~~~',time.ctime())

if __name__ == '__main__':
    func_list = [Lee,Marlon,Allen,Frank]
    print('parent process id %s'%os.getpid())

    pool = multiprocessing.Pool(4)
    for func in func_list:
        pool.apply_async(func)  #Pool執行函數,apply執行函數,當有一個進程執行完畢後,會添加一個新的進程到pool中

    print( 'Waiting for all subprocesses done...')
    pool.close()
    pool.join()    #調用join以前,必定要先調用close() 函數,不然會出錯, close()執行後不會有新的進程加入到pool,join函數等待素有子進程結束
    print ('All subprocesses done.')
View Code

執行結果.net

parent process id 98552
Waiting for all subprocesses done...

Run task Lee--97316******ppid:98552 ~~~~ Thu Jun 16 15:20:50 2016

Run task Marlon-95536******ppid:98552 ~~~~ Thu Jun 16 15:20:50 2016

Run task Allen-95720******ppid:98552 ~~~~ Thu Jun 16 15:20:50 2016

Run task Frank-98784******ppid:98552 ~~~~ Thu Jun 16 15:20:50 2016
Task Allen runs 0.31 seconds. ~~~~ Thu Jun 16 15:20:51 2016
Task Lee,runs 7.00 seconds. ~~~~ Thu Jun 16 15:20:57 2016
Task Frank runs 14.48 seconds. ~~~~ Thu Jun 16 15:21:05 2016
Task Marlon runs 31.72 seconds. ~~~~ Thu Jun 16 15:21:22 2016
All subprocesses done.

multiprocessing pool map

複製代碼
#coding: utf-8
import multiprocessing 

def m1(x): 
    print x * x 

if __name__ == '__main__': 
    pool = multiprocessing.Pool(multiprocessing.cpu_count()) 
    i_list = range(8)
    pool.map(m1, i_list)
複製代碼

一次執行結果

0
1
4
9
16
25
36
49

 參考:http://www.dotblogs.com.tw/rickyteng/archive/2012/02/20/69635.aspx 

 

問題:http://bbs.chinaunix.net/thread-4111379-1-1.html

複製代碼
#coding: utf-8
import multiprocessing
import logging

def create_logger(i):
    print i

class CreateLogger(object):
    def __init__(self, func):
        self.func = func

if __name__ == '__main__':
    ilist = range(10)

    cl = CreateLogger(create_logger)
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    pool.map(cl.func, ilist)

    print "hello------------>"
複製代碼

一次執行結果

0123456789hello------------>

相關文章
相關標籤/搜索