The python3 multiprocessing module

The multiprocessing module

Official documentation for the multiprocessing module

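The Process class

The Process class represents a process object. To create a child process, pass a target function and that function's arguments to the constructor; nothing else is needed to build a Process instance.

  • start() launches the process.
  • join() synchronizes processes: the caller blocks until the process it is called on has exited, so joining each child waits for all of them.
  • close() stops further tasks from being submitted to a process pool (Pool); it is a Pool method, covered in the Pool section.

multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
  • target is the function to call (pass the function object itself, without parentheses)
  • args holds the arguments for that function, passed as a tuple

Example: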

    import multiprocessing
    import os


    def run_proc(name):
        print('Child process {0} {1} Running '.format(name, os.getpid()))


    if __name__ == '__main__':
        print('Parent process {0} is Running'.format(os.getpid()))
        children = []
        for i in range(5):
            p = multiprocessing.Process(target=run_proc, args=(str(i),))
            print('process start')
            p.start()
            children.append(p)
        # Join every child, not only the last one created.
        for p in children:
            p.join()
        print('Process close')

Output:

    Parent process 809 is Running
    process start
    process start
    process start
    process start
    process start
    Child process 0 810 Running
    Child process 1 811 Running
    Child process 2 812 Running
    Child process 3 813 Running
    Child process 4 814 Running
    Process close
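
The constructor parameters listed earlier go beyond target and args. As a rough sketch, the code below also passes name and kwargs, joins every child, and then reads each child's exitcode. The helper names report and run_children, the "worker-N" naming scheme, and the use of a Queue to hand results back are assumptions made for this illustration, not part of the original example.

```python
import multiprocessing
import os


def report(label, results, prefix="child"):
    # Runs in the child: send back the label, the keyword argument, and the pid.
    results.put((label, prefix, os.getpid()))


def run_children(count=3):
    """Start `count` named children and return (rows, names, exit codes)."""
    results = multiprocessing.Queue()
    procs = [
        multiprocessing.Process(
            target=report,
            name="worker-{0}".format(i),   # hypothetical naming scheme
            args=(i, results),             # positional arguments, as a tuple
            kwargs={"prefix": "demo"},     # keyword arguments, as a dict
        )
        for i in range(count)
    ]
    for p in procs:
        p.start()
    # Read the results before joining so the children can flush their data.
    rows = [results.get(timeout=10) for _ in procs]
    for p in procs:
        p.join()
    return sorted(rows), [p.name for p in procs], [p.exitcode for p in procs]


if __name__ == "__main__":
    rows, names, codes = run_children()
    print(rows)
    print(names, codes)
```

An exitcode of 0 means the child returned normally; a non-zero value would indicate an error exit or termination by a signal.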

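Pool

Pool provides a fixed number of worker processes for the caller to use; the default is the number of CPU cores. When a new task is submitted to the Pool and a worker is free, the task runs right away; otherwise the task waits until a worker becomes available.
- Calling join on a Pool object waits for all the worker processes to finish.
- close must be called before join.
- After close has been called, no new tasks can be submitted to the pool.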
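
The ordering rules in the list above can be checked with a small sketch. It assumes a recent CPython, where both misuse cases raise ValueError (very old releases signalled the first case differently); the function name close_then_join is made up for this illustration, and the built-in pow is used as the task only because it needs no extra setup.

```python
import multiprocessing


def close_then_join():
    """Return (result, join_rejected, submit_rejected) for a small pool."""
    pool = multiprocessing.Pool(processes=2)
    pending = pool.apply_async(pow, (2, 10))

    try:
        pool.join()                 # join() before close() is refused
        join_rejected = False
    except ValueError:
        join_rejected = True

    pool.close()                    # from here on, no new tasks are accepted

    try:
        pool.apply_async(pow, (2, 3))
        submit_rejected = False
    except ValueError:
        submit_rejected = True

    pool.join()                     # now allowed: wait for the workers to exit
    return pending.get(), join_rejected, submit_rejected


if __name__ == "__main__":
    print(close_then_join())
```

The task submitted before close() still runs to completion; close() only shuts the door on new submissions.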

pool.apply_async

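apply_async submits a task asynchronously: the call returns at once without waiting for the task to finish, so several tasks can run in the pool at the same time.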

    import multiprocessing
    import os
    import time


    def run_task(name):
        print('Task {0} pid {1} is running, parent id is {2}'.format(name, os.getpid(), os.getppid()))
        time.sleep(1)
        print('Task {0} end.'.format(name))


    if __name__ == '__main__':
        print('current process {0}'.format(os.getpid()))
        p = multiprocessing.Pool(processes=3)
        for i in range(6):
            p.apply_async(run_task, args=(i,))  # returns immediately
        print('Waiting for all subprocesses done...')
        p.close()
        p.join()
        print('All processes done!')

Output:

    current process 921
    Waiting for all subprocesses done...
    Task 0 pid 922 is running, parent id is 921
    Task 1 pid 923 is running, parent id is 921
    Task 2 pid 924 is running, parent id is 921
    Task 0 end.
    Task 3 pid 922 is running, parent id is 921
    Task 1 end.
    Task 4 pid 923 is running, parent id is 921
    Task 2 end.
    Task 5 pid 924 is running, parent id is 921
    Task 3 end.
    Task 4 end.
    Task 5 end.
    All processes done!
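
The example above discards what apply_async returns. Each call actually hands back an AsyncResult object, and its get() method blocks until that task's return value is available. The following is a minimal sketch of collecting results this way; the function name squares_async is an assumption for illustration, and the built-in pow stands in for a real task.

```python
import multiprocessing


def squares_async(numbers, workers=3):
    """Square each number in a worker pool and return the results in order."""
    with multiprocessing.Pool(processes=workers) as pool:
        # Submit everything first; none of these calls wait for a result.
        pending = [pool.apply_async(pow, (n, 2)) for n in numbers]
        # get() blocks until the corresponding task has finished.
        # Collect inside the with-block: leaving it terminates the pool.
        return [result.get(timeout=10) for result in pending]


if __name__ == "__main__":
    print(squares_async([1, 2, 3, 4]))
```

Results come back in submission order because the list of AsyncResult objects is read in that order, regardless of which worker finished first.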

pool.apply

apply(func[, args[, kwds]])

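This method blocks until the submitted task has finished, so only one task runs in the pool at a time: the next task can enter the pool only after the previous one has ended.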

    import multiprocessing
    import os
    import time


    def run_task(name):
        print('Task {0} pid {1} is running, parent id is {2}'.format(name, os.getpid(), os.getppid()))
        time.sleep(1)
        print('Task {0} end.'.format(name))


    if __name__ == '__main__':
        print('current process {0}'.format(os.getpid()))
        p = multiprocessing.Pool(processes=3)
        for i in range(6):
            p.apply(run_task, args=(i,))  # blocks until this task is done
        print('Waiting for all subprocesses done...')
        p.close()
        p.join()
        print('All processes done!')

Output:

    Task 0 pid 928 is running, parent id is 927
    Task 0 end.
    Task 1 pid 929 is running, parent id is 927
    Task 1 end.
    Task 2 pid 930 is running, parent id is 927
    Task 2 end.
    Task 3 pid 928 is running, parent id is 927
    Task 3 end.
    Task 4 pid 929 is running, parent id is 927
    Task 4 end.
    Task 5 pid 930 is running, parent id is 927
    Task 5 end.
    Waiting for all subprocesses done...
    All processes done!
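
The difference between the two methods shows up clearly in wall-clock time. The sketch below runs the same sleeping tasks through apply and then through apply_async on one pool and measures both. The helper names time_blocking, time_async and compare, and the task count and sleep length, are assumptions chosen for this illustration; actual timings vary by machine.

```python
import multiprocessing
import time


def time_blocking(pool, tasks, seconds):
    # apply() waits for each task, so the sleeps happen one after another.
    start = time.perf_counter()
    for _ in range(tasks):
        pool.apply(time.sleep, (seconds,))
    return time.perf_counter() - start


def time_async(pool, tasks, seconds):
    # apply_async() returns at once, so the sleeps overlap across workers.
    start = time.perf_counter()
    pending = [pool.apply_async(time.sleep, (seconds,)) for _ in range(tasks)]
    for result in pending:
        result.get()
    return time.perf_counter() - start


def compare(tasks=3, seconds=0.3):
    """Return (blocking_elapsed, async_elapsed) in seconds."""
    with multiprocessing.Pool(processes=tasks) as pool:
        return time_blocking(pool, tasks, seconds), time_async(pool, tasks, seconds)


if __name__ == "__main__":
    blocking, concurrent = compare()
    print("apply: {0:.2f}s, apply_async: {1:.2f}s".format(blocking, concurrent))
```

With three workers and three tasks, the blocking run takes roughly three sleep periods while the asynchronous run takes roughly one.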

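Queue: inter-process communication

Queue is used to pass data between multiple processes. Its two main methods are get and put.

put method

put inserts an item into the queue. It takes two optional parameters, block and timeout.

  • block = True (the default) with a positive timeout: the call blocks for at most timeout seconds, waiting for free space in the queue. If the time runs out, it raises queue.Full.
  • block = False: if the Queue is full, queue.Full is raised immediately.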
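
Both put behaviours can be observed in a single process by creating a queue that holds only one item. This is a small sketch under that assumption; the function name put_outcomes and the 0.2 second timeout are illustrative choices. Note that in Python 3 the exception class comes from the standard queue module.

```python
import multiprocessing
import queue
import time


def put_outcomes():
    """Return (outcomes, seconds waited) for puts on a full queue."""
    q = multiprocessing.Queue(maxsize=1)
    q.put("first")                      # fills the only slot
    outcomes = []

    try:
        q.put("second", block=False)    # no waiting at all
    except queue.Full:
        outcomes.append("full immediately")

    start = time.monotonic()
    try:
        q.put("third", timeout=0.2)     # block=True by default
    except queue.Full:
        outcomes.append("full after timeout")
    waited = time.monotonic() - start

    q.get(timeout=5)                    # drain the queue before closing it
    q.close()
    q.join_thread()
    return outcomes, waited


if __name__ == "__main__":
    print(put_outcomes())
```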

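get method

get reads one item from the queue and removes it. It takes two optional parameters, block and timeout.

  • block = True (the default) with a positive timeout: if no item arrives within the waiting time, queue.Empty is raised.
  • block = False: if the Queue has an item available, it is returned immediately; if the Queue is empty, queue.Empty is raised immediately.
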
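
The get variants can be tried the same way on an empty queue in a single process. As before, this is only a sketch: the function name get_outcomes and the timeout values are assumptions for illustration. The final read uses a timeout rather than block=False because an item put on a multiprocessing Queue may take a moment to become readable.

```python
import multiprocessing
import queue
import time


def get_outcomes():
    """Return (outcomes, seconds waited, item) for gets on a queue."""
    q = multiprocessing.Queue()
    outcomes = []

    try:
        q.get(block=False)              # empty queue, no waiting
    except queue.Empty:
        outcomes.append("empty immediately")

    start = time.monotonic()
    try:
        q.get(timeout=0.2)              # block=True by default
    except queue.Empty:
        outcomes.append("empty after timeout")
    waited = time.monotonic() - start

    q.put("url_1")
    item = q.get(timeout=5)             # returns as soon as the item arrives
    q.close()
    q.join_thread()
    return outcomes, waited, item


if __name__ == "__main__":
    print(get_outcomes())
```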
    from multiprocessing import Process, Queue
    import os, time, random


    # Code run by the writer processes:
    def proc_write(q, urls):
        print('Process(%s) is writing...' % os.getpid())
        for url in urls:
            q.put(url)
            print('Put %s to queue...' % url)
            time.sleep(random.random())


    # Code run by the reader process:
    def proc_read(q):
        print('Process(%s) is reading...' % os.getpid())
        while True:
            url = q.get(True)
            print('Get %s from queue.' % url)


    if __name__ == '__main__':
        # The parent process creates the Queue and passes it to each child:
        q = Queue()
        proc_writer1 = Process(target=proc_write, args=(q, ['url_1', 'url_2', 'url_3']))
        proc_writer2 = Process(target=proc_write, args=(q, ['url_4', 'url_5', 'url_6']))
        proc_reader = Process(target=proc_read, args=(q,))
        # Start the writer processes:
        proc_writer1.start()
        proc_writer2.start()
        # Start the reader process:
        proc_reader.start()
        # Wait for the writers to finish:
        proc_writer1.join()
        proc_writer2.join()
        # proc_reader loops forever, so it cannot be joined; terminate it instead:
        proc_reader.terminate()

Output:

    Process(1083) is writing...
    Put url_1 to queue...
    Process(1084) is writing...
    Put url_4 to queue...
    Process(1085) is reading...
    Get url_1 from queue.
    Get url_4 from queue.
    Put url_5 to queue...
    Get url_5 from queue.
    Put url_2 to queue...
    Get url_2 from queue.
    Put url_6 to queue...
    Get url_6 from queue.
    Put url_3 to queue...
    Get url_3 from queue.
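
Terminating the reader works, but a common alternative is to send a sentinel value that tells the reader to leave its loop, so it can be joined normally. The following is one possible sketch of that pattern, not the original author's code: the STOP sentinel, the second queue named seen, and the function names are all assumptions introduced for illustration.

```python
from multiprocessing import Process, Queue

STOP = None  # hypothetical sentinel: "no more data is coming"


def write_urls(q, urls):
    for url in urls:
        q.put(url)


def read_urls(q, seen):
    while True:
        url = q.get()
        if url is STOP:
            break            # leave the loop instead of being terminated
        seen.put(url)        # report what was read back to the parent


def run_pipeline(batches):
    """Run one writer per batch plus one reader; return (urls, reader exit code)."""
    q, seen = Queue(), Queue()
    writers = [Process(target=write_urls, args=(q, batch)) for batch in batches]
    reader = Process(target=read_urls, args=(q, seen))
    for p in writers + [reader]:
        p.start()
    for w in writers:
        w.join()
    q.put(STOP)              # sent only after every writer has finished
    total = sum(len(batch) for batch in batches)
    received = [seen.get(timeout=10) for _ in range(total)]
    reader.join()            # returns because the reader exits on STOP
    return sorted(received), reader.exitcode


if __name__ == "__main__":
    print(run_pipeline([['url_1', 'url_2', 'url_3'], ['url_4', 'url_5', 'url_6']]))
```

With more than one reader, each reader would need its own sentinel, since every get consumes exactly one item.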

Pipe: inter-process communication

A Pipe is typically used for communication between two processes, one at each end of the pipe.

multiprocessing.Pipe([duplex])
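
The optional duplex argument defaults to True, which makes both ends usable for sending and receiving. With duplex=False the pipe is one-way: the first connection returned can only receive and the second can only send. The single-process sketch below illustrates this; the function name one_way_demo and the variable names receiver and sender are assumptions made for the example.

```python
from multiprocessing import Pipe


def one_way_demo():
    """Return (has_data, message, write_rejected) for a one-way pipe."""
    receiver, sender = Pipe(duplex=False)   # (receive-only end, send-only end)
    sender.send({'name': 'Bob', 'spam': 42})

    has_data = receiver.poll(1)             # wait up to 1 second for data
    message = receiver.recv()

    try:
        receiver.send('reply')              # wrong direction for this end
        write_rejected = False
    except OSError:
        write_rejected = True

    sender.close()
    receiver.close()
    return has_data, message, write_rejected


if __name__ == "__main__":
    print(one_way_demo())
```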

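Examples 1 and 2 are adapted from examples found online; I worked through them and added comments. Most online versions combine the two; they are shown separately here because that is easier to follow.

Example 1: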

    from multiprocessing import Process, Pipe


    def send(pipe):
        pipe.send(['spam'] + [42, 'egg'])  # send a list through the pipe
        pipe.close()


    if __name__ == '__main__':
        (con1, con2) = Pipe()  # Pipe() returns the two connection ends of one pipe
        # args must carry an end returned by the Pipe() call above;
        # writing args=(Pipe(),) would pass a fresh, unrelated pair instead
        sender = Process(target=send, args=(con1, ))
        sender.start()  # start the child process
        print("con2 got: %s" % con2.recv())  # the other end, con2, receives what send() sent
        con2.close()  # close this end of the pipe

Output:

con2 got: ['spam', 42, 'egg']
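
Example 1 closes each end once it is finished with it. On the receiving side, a closed sender becomes visible as EOFError from recv() once all buffered messages have been read. The sketch below shows this in a single process; the function name drain_until_closed is an assumption for illustration. In a program with several processes, every process holding a copy of the sending end would have to close it before the receiver sees the end of the stream.

```python
from multiprocessing import Pipe


def drain_until_closed():
    """Read every message from a pipe whose sending end has been closed."""
    receiver, sender = Pipe()
    sender.send(['spam', 42, 'egg'])
    sender.close()                  # nothing more will be sent

    received = []
    while True:
        try:
            received.append(receiver.recv())
        except EOFError:            # raised once the pipe is empty and closed
            break
    receiver.close()
    return received


if __name__ == "__main__":
    print(drain_until_closed())
```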

Example 2:

    from multiprocessing import Process, Pipe


    def talk(pipe):
        pipe.send(dict(name='Bob', spam=42))  # send a dict
        reply = pipe.recv()  # receive the parent's reply
        print('talker got:', reply)


    if __name__ == '__main__':
        (parentEnd, childEnd) = Pipe()  # the two ends of one pipe; the names are arbitrary (e.g. conn1, conn2)
        child = Process(target=talk, args=(childEnd,))  # create a Process named child
        child.start()  # start the process
        print('parent got:', parentEnd.recv())  # parentEnd receives the data sent by the child process
        parentEnd.send({x * 2 for x in 'spam'})  # parentEnd can also send; talk() receives this as reply
        child.join()  # wait for the child process to exit
        print('parent exit')

Output:

    parent got: {'name': 'Bob', 'spam': 42}
    talker got: {'ss', 'aa', 'pp', 'mm'}
    parent exit