1,Python的multiprocessing 實現多cpu 多進程html
爲啥用multiprocessing?python
*如今cpu都多核了, 寫大型程序你還在用單線程的傳統寫法?很不fashion並且效率很低, 你out了。 安全
*那用multi-threading不就結了?C語言能夠, 編譯完執行沒問題。 可是python這種解釋型的語言用多線程就不行了, python的multithreading效率並不高。聽說是受制於GIL (global interpreter lock) 的鎖機制, 該鎖只能工做於單個cpu core。這樣別的cpu core乾着急也幫不上忙。多線程
*那怎麼辦呢? 本身fork一堆子進程,而後管理唄? 呵呵,DIY?這不是從新造輪子嗎。app
*親, 用python現成的multiprocessing庫吧。支持多個cpu core,簡直就是爲了多核cpu打造的, python 2.6以後都支持。less
不用說了,直接上乾貨異步
*sample 1, 建立process, target=f 函數名, args傳遞參數async
# encoding: utf8 # @Time : 2017/10/26 20:54 # @Author : Ycs # @Site : # @File : mymultiprocessing.py # @Desc : Python的multiprocessing 實現多cpu 多進程, python進程池multiprocessing.Pool和線程池multiprocessing.dummy.Pool實例 from multiprocessing import Process,Queue,Pipe,Pool import time def f(name): print('hello:', name) if __name__ == '__main__': #建立process, target=f 函數名, args傳遞參數 p=Process(target=f,args=('bbb',)) p.start() p.join() #支持隊列 def ff(q): q.put([42,None,'hello']) if __name__ == '__main__': q=Queue() p=Process(target=ff,args=(q,)) p.start() print(q.get()) #這裏prints "[42, None, 'hello']" p.join() #支持Pipe def f3(conn): conn.send([43,None,'hello']) conn.close() if __name__ == '__main__': parent_conn,child_conn=Pipe() p=Process(target=f3,args=(child_conn,)) p.start() print(parent_conn.recv()) # 這裏prints "[42, None, 'hello']" p.join() #支持Pool, 僱一羣worker來幹活 def f4(x): return x*x if __name__ == '__main__': pool=Pool(processes=4) # start 4 worker processes result=pool.apply_async(f4,(10,)) # evaluate "f(10)" asynchronously try: print(result.get(timeout=1)) except TimeoutError: pass print(pool.map(f4,range(10))) it=pool.imap(f4,range(10)) print(it.next()) print(it.next()) print(it.next(timeout=1)) # raises TimeoutError result=pool.apply_async(time.sleep,(10,)) print(result.get(timeout=1))
注意事項:函數
* 在UNIX平臺上,當某個進程終結以後,該進程須要被其父進程調用wait,不然進程成爲殭屍進程(Zombie)。因此,有必要對每一個Process對象調用join()方法 (實際上等同於wait)。對於多線程來講,因爲只有一個進程,因此不存在此必要性。ui
* multiprocessing提供了threading包中沒有的IPC(好比Pipe和Queue),效率上更高。應優先考慮Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (由於它們佔據的不是用戶進程的資源)。
* 多進程應該避免共享資源。在多線程中,咱們能夠比較容易地共享資源,好比使用全局變量或者傳遞參數。在多進程狀況下,因爲每一個進程有本身獨立的內存空間,以上方法並不合適。此時咱們能夠經過共享內存和Manager的方法來共享資源。但這樣作提升了程序的複雜度,並由於同步的須要而下降了程序的效率。
Reference:
[1] http://www.ibm.com/developerworks/aix/library/au-multiprocessing
[2] http://docs.python.org/2/library/multiprocessing.html
1)Process
要建立一個Process是很簡單的。
要得到一個Process的進程ID也是很簡單的。
建立進程:multiprocessing.Process([group[, target[, name[, args[, kargs]]]]])
參數:
group: None,它的存在僅僅是爲了與threading.Thread兼容
target: 通常是函數
name: 進程名
args: 函數的參數
kargs: keywords參數
函數:
run() 默認的run()函數調用target的函數,你也能夠在子類中覆蓋該函數
start() 啓動該進程
join([timeout]) 父進程被中止,直到子進程被執行完畢。
當timeout爲None時沒有超時,不然有超時。
進程能夠被join不少次,但不能join本身
is_alive()
terminate() 結束進程。
在Unix上使用的是SIGTERM
在Windows平臺上使用TerminateProcess
屬性:
name 進程名
daemon 守護進程
pid 進程ID
exitcode 若是進程尚未結束,該值爲None
authkey
2)Queue
Queue相似於queue.Queue,通常用來進程間交互信息
例子:
注意:Queue是進程和線程安全的。
Queue實現了queue.Queue的大部分方法,但task_done()和join()沒有實現。
建立Queue:multiprocessing.Queue([maxsize])
函數:
qsize() 返回Queue的大小
empty() 返回一個boolean值表示Queue是否爲空
full() 返回一個boolean值表示Queue是否滿
put(item[, block[, timeout]])
put_nowait(item)
get([block[, timeout]])
get_nowait()
get_no_wait()
close() 表示該Queue不在加入新的元素
join_thread()
cancel_join_thread()
3)JoinableQueue
建立:multiprocessing.JoinableQueue([maxsize])
task_done()
join()
4)Pipe
multiprocessing.Pipe([duplex]) 返回一個Connection對象
5)異步化synchronization
6)Shared Memory
1>Value
2>Array
7)Manager
8)Pool
multiprocessing.Pool([processes[, initializer[, initargs]]])
函數:
apply(func[, args[, kwds]])
apply_async(func[, args[, kwds[, callback]]])
map(func,iterable[, chunksize])
map_async(func,iterable[, chunksize[, callback]])
imap(func, iterable[, chunksize])
imap_unordered(func, iterable[, chunksize])
close()
terminate()
join()
9)雜項
multiprocessing.active_children() 返回全部活動子進程的列表
multiprocessing.cpu_count() 返回CPU數目
multiprocessing.current_process() 返回當前進程對應的Process對象
multiprocessing.freeze_support()
multiprocessing.set_executable()
10)Connection對象
send(obj)
recv()
fileno()
close()
poll([timeout])
send_bytes(buffer[, offset[, size]])
recv_bytes([maxlength])
recv_bytes_info(buffer[, offset])