The multiprocessing module is a cross-platform multi-process module.
The multiprocessing module provides a Process class to represent a process object.
The multiprocessing module provides a Pool class that makes a specified number of worker processes available to the user.
Python's multiprocessing module wraps the low-level machinery and provides several ways to exchange data between processes, such as Queue and Pipe.
1. The Process class
The multiprocessing module provides a Process class to represent a process object.
Example:
from multiprocessing import Process
import os

# Code to be executed in the child process
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__ == '__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')
The output is as follows:
Parent process 928.
Child process will start.
Run child process test (929)...
Child process end.
To create a child process, you only need to pass in the function to execute and its arguments, create a Process instance, and start it with the start() method. The join() method waits for the child process to finish before execution continues, and is commonly used for synchronization between processes.
2. The Pool class
If you need to launch a large number of child processes, you can create them in batches with a process pool.
The multiprocessing module provides a Pool class that makes a specified number of worker processes available to the user. When a new request is submitted to the Pool and the pool is not yet full, a new process is created to execute it; if the pool is already full, the request waits until a process in the pool finishes, and only then is it executed.
Example 1:
from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__ == '__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')
Output:
Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.
Example 2:
from multiprocessing import Pool
import time

def f(x):
    return x * x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes

    result = pool.apply_async(f, (10,))   # evaluate "f(10)" asynchronously in a single process
    print(result.get(timeout=1))          # prints "100" unless your computer is *very* slow

    print(pool.map(f, range(10)))         # prints "[0, 1, 4, ..., 81]"

    it = pool.imap(f, range(10))
    print(next(it))                       # prints "0"
    print(next(it))                       # prints "1"
    print(it.next(timeout=1))             # prints "4" unless your computer is *very* slow

    result = pool.apply_async(time.sleep, (10,))
    print(result.get(timeout=1))          # raises multiprocessing.TimeoutError
1. apply()
Prototype: apply(func[, args=()[, kwds={}]])
This method passes arbitrary positional and keyword arguments through to func, much like Python 2's built-in apply function, and the main process blocks until the function finishes. It is generally not recommended: the built-in apply was removed in Python 3, and because Pool.apply blocks it provides no concurrency, so apply_async is usually preferred.
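A minimal sketch of this blocking behaviour (the function name square is illustrative, not from the article):

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(2) as pool:
        # Each apply() call blocks until its result is ready,
        # so the calls run one after another rather than concurrently.
        results = [pool.apply(square, (i,)) for i in range(4)]
        print(results)  # [0, 1, 4, 9]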
2. apply_async()
Prototype: apply_async(func[, args=()[, kwds={}[, callback=None]]])
Used like apply, but it is non-blocking and supports a callback that is invoked after the result becomes available.
Pool.apply_async is also like Python's built-in apply, except that the call returns immediately instead of waiting for the result. An ApplyResult object is returned. You call its get() method to retrieve the result of the function call. The get() method blocks until the function is completed.
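A small sketch of apply_async with a callback (the names square and on_done are illustrative; the callback runs in the parent process once the result arrives):

from multiprocessing import Pool

def square(x):
    return x * x

def on_done(value):
    # Called in the parent process when the result is ready.
    print('callback got', value)

if __name__ == '__main__':
    with Pool(2) as pool:
        async_result = pool.apply_async(square, (7,), callback=on_done)
        print(async_result.get(timeout=5))  # blocks until the result is ready; prints 49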
3. map()
Prototype: map(func, iterable[, chunksize])
Pool.map behaves much like the built-in map function, but it blocks the calling process until all results have been returned.
Note: although the second argument is an iterable, in practice the whole input is collected and split into chunks before the worker processes start running the tasks.
4. map_async()
Prototype: map_async(func, iterable[, chunksize[, callback]])
Used like map, but non-blocking. The notes on apply_async apply here as well.
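A minimal map_async sketch (illustrative; the optional callback receives the complete result list once everything has finished):

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(4) as pool:
        # Returns immediately; the parent process is free to keep working.
        async_result = pool.map_async(square, range(8),
                                      callback=lambda res: print('callback:', res))
        print('map_async submitted, parent not blocked')
        print(async_result.get())  # [0, 1, 4, 9, 16, 25, 36, 49]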
5. close()
Closes the pool so that it no longer accepts new tasks.
6. terminate()
Stops the worker processes immediately, without completing outstanding work.
7. join()
The main process blocks until the child processes exit; join() must be called after close() or terminate().
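A brief sketch contrasting terminate() with the close()/join() pattern used in Example 1 (the slow task here is illustrative only):

from multiprocessing import Pool
import time

def slow(x):
    time.sleep(10)
    return x

if __name__ == '__main__':
    pool = Pool(2)
    pool.apply_async(slow, (1,))
    time.sleep(0.5)       # give the task a moment to start
    pool.terminate()      # workers are stopped immediately; the pending task never finishes
    pool.join()           # join() must still follow close() or terminate()
    print('pool terminated')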
Notice also that you could call a number of different functions with Pool.apply_async (not all calls need to use the same function). In contrast, Pool.map applies the same function to many arguments. However, unlike Pool.apply_async, the results are returned in an order corresponding to the order of the arguments. Like Pool.apply, Pool.map blocks until the complete result is returned.
In Python 3, the new Pool.starmap method can accept multiple arguments.
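A short starmap sketch (illustrative), showing how each argument tuple is unpacked into the function:

from multiprocessing import Pool

def add(a, b):
    return a + b

if __name__ == '__main__':
    with Pool(2) as pool:
        # Each tuple is unpacked, i.e. add(1, 2), add(3, 4), add(5, 6)
        print(pool.starmap(add, [(1, 2), (3, 4), (5, 6)]))  # [3, 7, 11]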
             Multi-args   Concurrence   Blocking   Ordered-results
map          no           yes           yes        yes
apply        yes          no            yes        no
map_async    no           yes           no         yes
apply_async  yes          yes           no         no
class multiprocessing.Queue([maxsize])
Returns a process shared queue implemented using a pipe and a few locks/semaphores. When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe.
The usual Queue.Empty and Queue.Full exceptions from the standard library's Queue module are raised to signal timeouts.
Example:
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()
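A small sketch of the timeout behaviour mentioned above (note that in Python 3 the standard library module is named queue; the values used here are illustrative):

from multiprocessing import Queue
import queue

if __name__ == '__main__':
    q = Queue(maxsize=1)
    q.put('item')
    print(q.get(timeout=1))      # 'item'
    try:
        q.get(timeout=0.5)       # nothing left, so this times out
    except queue.Empty:
        print('queue.Empty raised after the timeout')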
multiprocessing.Pipe([duplex])
Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.
If duplex is True (the default) then the pipe is bidirectional. If duplex is False then the pipe is unidirectional: conn1 can only be used for receiving messages and conn2 can only be used for sending messages.
The Pipe() function returns a pair of connection objects joined by a pipe, which is bidirectional by default. For example:
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())    # prints "[42, None, 'hello']"
    p.join()
The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods. Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. There is, of course, no risk of corruption when the processes use different ends of the pipe at the same time.
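If several processes do need to share the same end, one common way to avoid the corruption described above is to guard that end with a Lock; a hedged sketch (the worker function and messages are illustrative, not from the article):

from multiprocessing import Process, Pipe, Lock

def worker(conn, lock, msg):
    # The lock ensures only one process writes to this end at a time.
    with lock:
        conn.send(msg)

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    lock = Lock()
    procs = [Process(target=worker, args=(child_conn, lock, i)) for i in range(3)]
    for p in procs:
        p.start()
    for _ in procs:
        print(parent_conn.recv())   # receives 0, 1 and 2 in some order
    for p in procs:
        p.join()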
Official documentation:
https://docs.python.org/2/library/multiprocessing.html
Thanks to:
https://blog.csdn.net/xluren/article/details/46861621
https://www.cnblogs.com/freeman818/p/7154089.html#undefined
http://blog.shenwei.me/python-multiprocessing-pool-difference-between-map-apply-map_async-apply_async/