In an earlier blog post I covered the use of Python multithreading in detail.
However, because of the GIL, Python multithreading cannot make full use of multiple CPU cores. To exploit multiple cores, we can use multiple processes instead.
Wikipedia's definitions of parent process and child process:
a) Parent process
In Unix-like operating systems, every process except process 0 (the swapper) is created when another process executes the fork() system call. The process that invoked fork is the parent process and the newly created process is the child process. Every process (except process 0) has one parent process, but can have many child processes.
In the Linux kernel, in which there is a very slim difference between processes and POSIX threads, there are two kinds of parent processes, namely real parent and parent. Parent is the process that receives the SIGCHLD signal on child's termination, whereas real parent is the thread that actually created this child process in a multithreaded environment. For a normal process, both these two values are same, but for a POSIX thread which acts as a process, these two values may be different.[1]
b) Child process
A child process in computing is a process created by another process (the parent process). This technique pertains to multitasking operating systems, and is sometimes called a subprocess or traditionally a subtask.
There are two major procedures for creating a child process: the fork system call (preferred in Unix-like systems and the POSIX standard) and the spawn (preferred in the modern (NT) kernel of Microsoft Windows, as well as in some historical operating systems).
In other words, Unix/Linux operating systems provide a fork() system call for creating child processes. fork() is special: an ordinary function is called once and returns once, but fork() is called once and returns twice, because the operating system automatically copies the current process (called the parent) to make a new process (called the child), and then returns in both the parent and the child. The child process always gets a return value of 0, while the parent gets the child's PID. The reason: a parent can fork many children, so it needs to record each child's ID.
Python's os module contains the fork function:
#!/usr/bin/env python
# coding:utf-8
import os

print('Process %s start...' % os.getpid())
pid = os.fork()
if pid == 0:
    print('i am child process %s and my parent is %s' % (os.getpid(), os.getppid()))
else:
    print('i %s just created a child process %s' % (os.getpid(), pid))
Output:

Process 3522 start...
i 3522 just created a child process 3523
i am child process 3523 and my parent is 3522
Because fork() is called once and returns twice, we get the output above. Note: Windows has no fork call, so the code above cannot run on Windows. With fork, a process can copy out a child to handle each new task it receives; the familiar Apache server works this way, with the parent process listening on the port and forking a child process to handle each new HTTP request, as in the sketch below.
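Here is a minimal sketch of that fork-per-connection pattern (Unix only; the port number and the echo handler are made-up examples, not Apache's actual code):

import os
import socket

# Listening socket owned by the parent process (address/port are illustrative).
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8888))
server.listen(5)

while True:
    conn, addr = server.accept()        # parent blocks until a client connects
    if os.fork() == 0:                  # child: fork returned 0, handle this connection
        server.close()                  # child does not need the listening socket
        conn.sendall(conn.recv(1024))   # toy handler: echo the request back
        conn.close()
        os._exit(0)                     # leave the child without running cleanup
    conn.close()                        # parent closes its copy and keeps listening
                                        # (a real server would also reap children with waitpid)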
As mentioned above, Windows has no fork call, so how do we do multiprocessing on Windows?
Through the multiprocessing module. Since Python is cross-platform, it naturally ought to provide cross-platform multiprocessing support, and the multiprocessing module is exactly that: a cross-platform multiprocessing module.
The two modules commonly used for handling processes in Python are subprocess and multiprocessing. subprocess is typically used to run external programs, e.g. third-party applications rather than Python programs. If you need to invoke external programs, Python's psutil module is an even better choice: it supports what subprocess offers and can additionally monitor the local host or the launched programs, e.g. network, CPU, and memory usage, which makes it more complete for automated operations work. multiprocessing is Python's multiprocessing module; it works mainly by starting Python processes and invoking a target callback function to handle tasks.
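For example (a minimal sketch; 'ls -l' is an arbitrary command, and psutil is a third-party package installed with pip install psutil):

import subprocess
import psutil

# Run an external command and capture its output with subprocess.
result = subprocess.run(['ls', '-l'], capture_output=True, text=True)
print(result.returncode)   # 0 on success
print(result.stdout)       # the command's standard output

# Monitor resource usage on the current host with psutil.
print(psutil.cpu_percent(interval=1))    # overall CPU usage, percent
print(psutil.virtual_memory().percent)   # memory usage, percent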
Note: the multiprocessing API closely mirrors the threading API, so we only give example code here without a detailed walkthrough.
As with threading, there are two ways to use it:
a) Direct call
from multiprocessing import Process, freeze_support
import os

processes = []

def run(item):
    print('-'*50)
    print('child process %s id: %s' % (item, os.getpid()))
    print('child process %s parent id: %s' % (item, os.getppid()))

def main():
    # print the main process id
    print('main process id: ', os.getpid())
    # create several child processes
    for item in range(2):
        p = Process(target=run, args=(item, ))
        processes.append(p)
        print('child process %s name: %s' % (item, p.name))
        print('child process %s id: %s' % (item, p.pid))

    for item in processes:
        item.start()

    for item in processes:
        item.join()

if __name__ == '__main__':
    freeze_support()
    main()
b) Object-oriented call
from multiprocessing import Process, freeze_support
import os

processes = []

class MyProcess(Process):
    def __init__(self, func, item):
        super(MyProcess, self).__init__()
        self.__func = func
        self.__item = item

    def run(self):
        self.__func(self.__item)

def proc(item):
    print('-'*50)
    print('child process %s id: %s' % (item, os.getpid()))
    print('child process %s parent id: %s' % (item, os.getppid()))

def main():
    # print the main process id
    print('main process id: ', os.getpid())
    # create several child processes
    for item in range(2):
        p = MyProcess(proc, item)
        processes.append(p)
        print('child process %s name: %s' % (item, p.name))
        print('child process %s id: %s' % (item, p.pid))

    for item in processes:
        item.start()

    for item in processes:
        item.join()

if __name__ == '__main__':
    freeze_support()
    main()
Note: in Python 2.7 you must call freeze_support() in the if __name__ == '__main__' block; Python 3 no longer seems to need it.
Output:

main process id:  10972
child process 0 name: MyProcess-1
child process 0 id: None
child process 1 name: MyProcess-2
child process 1 id: None
--------------------------------------------------
child process 0 id: 10636
child process 0 parent id: 10972
--------------------------------------------------
child process 1 id: 8076
child process 1 parent id: 10972
Daemon processes work much like daemon threads: children marked as daemons are killed when the main process exits, as this example shows:

from multiprocessing import Process
import time

processes = []

def run(item):
    time.sleep(1)
    print('item: ', item)

def main():
    # create several child processes
    for item in range(2):
        p = Process(target=run, args=(item, ))
        processes.append(p)
        p.daemon = True

    for item in processes:
        item.start()

    print('all done')

if __name__ == '__main__':
    main()
Output:
all done
Note that daemon usage differs from threading: here you set an attribute directly instead of calling a method, and daemon must be set before start. Because the children are daemons and the main process exits right after printing, the children never get a chance to print their items.
Since processes do not share data, why is there still a process-synchronization problem? What if multiple processes open the same file, or print to the same screen? Those cases still need synchronization, which is done with Lock, as sketched below.
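A minimal sketch (the function and variable names are mine): the Lock is passed into the children as an argument so their prints do not interleave.

from multiprocessing import Process, Lock

def run(lock, item):
    # serialize access to stdout across processes
    with lock:
        print('process %s line 1' % item)
        print('process %s line 2' % item)

if __name__ == '__main__':
    lock = Lock()
    processes = [Process(target=run, args=(lock, i)) for i in range(3)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()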
Semaphore: same usage as threading.Semaphore(), except that the Semaphore you create must be passed into the child processes as an argument, because processes do not share resources. A sketch follows.
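A minimal sketch under that same pass-the-object-in pattern, allowing at most two processes into the guarded section at once (names are mine):

from multiprocessing import Process, Semaphore
import time

def run(sem, item):
    with sem:                      # at most 2 processes hold the semaphore at once
        print('%s acquired' % item)
        time.sleep(1)

if __name__ == '__main__':
    sem = Semaphore(2)
    processes = [Process(target=run, args=(sem, i)) for i in range(4)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()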
Event: same usage as threading.Event(), except that the Event you create must be passed into the child processes as an argument. A sketch follows.
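A minimal sketch (names are mine) where the parent wakes a waiting child:

from multiprocessing import Process, Event
import time

def waiter(event):
    print('waiting for the event...')
    event.wait()                   # block until the parent sets the event
    print('event received')

if __name__ == '__main__':
    event = Event()
    p = Process(target=waiter, args=(event,))
    p.start()
    time.sleep(1)
    event.set()                    # wake up the child
    p.join()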
Processes do not share resources; let's first look at an example that demonstrates this:
from multiprocessing import Process

processes = []
data_list = []

def run(lst, item):
    lst.append(item)
    print('%s : %s' % (item, lst))

def main():
    for item in range(4):
        p = Process(target=run, args=(data_list, item))
        processes.append(p)

    for item in processes:
        item.start()

    for item in processes:
        item.join()

    print('final lst: ', data_list)

if __name__ == '__main__':
    main()
Output:

1 : [1]
2 : [2]
0 : [0]
3 : [3]
final lst:  []
So inter-process communication has to go through a third party; three approaches are introduced below.
a) Queue
Usage is the same as queue.Queue in multithreading, except that the queue you create must be passed into the child processes as an argument.
from multiprocessing import Process, Queue
import time

q = Queue(10)

def put(q):
    for i in range(3):
        q.put(i)
    print('queue size after put: %s' % q.qsize())

def get(q):
    print('queue size before get: %s' % q.qsize())
    while not q.empty():
        print('queue get: ', q.get())

def main():
    p_put = Process(target=put, args=(q,))
    p_get = Process(target=get, args=(q,))
    p_put.start()
    time.sleep(1)
    p_get.start()
    p_get.join()
    print('all done')

if __name__ == '__main__':
    main()
Output:

queue size after put: 3
queue size before get: 3
queue get:  0
queue get:  1
queue get:  2
all done
b) Pipe

import multiprocessing
import time

pipe = multiprocessing.Pipe()

def send(pipe):
    for i in range(5):
        print("send: %s" % (i,))
        pipe.send(i)
        time.sleep(0.2)

def recv_1(pipe):
    # note: the receivers loop forever, so this demo has to be interrupted manually
    while True:
        print("rev_1:", pipe.recv())
        time.sleep(1)

def recv_2(pipe):
    while True:
        print("rev_2:", pipe.recv())
        time.sleep(1)

def main():
    p_send = multiprocessing.Process(target=send, args=(pipe[0],))
    p_recv_1 = multiprocessing.Process(target=recv_1, args=(pipe[1],))
    p_recv_2 = multiprocessing.Process(target=recv_2, args=(pipe[1],))

    p_send.start()
    p_recv_1.start()
    p_recv_2.start()

    p_send.join()
    p_recv_1.join()
    p_recv_2.join()

if __name__ == "__main__":
    main()
Output:

send: 0
rev_1: 0
send: 1
rev_2: 1
send: 2
send: 3
send: 4
rev_1: 2
rev_2: 3
rev_1: 4
c) Manager
Manager is extremely powerful. Queue and Pipe above can only pass data; they cannot share data (different processes modifying the same piece of data). Manager can achieve real data sharing.
From the official documentation:
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies. A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value and Array.
from multiprocessing import Process, Manager

def run(d, l):
    d['name'] = 'winter'
    l.reverse()

def main():
    p = Process(target=run, args=(d, l, ))
    p.start()
    p.join()
    print('final dict: ', d)
    print('final list: ', l)

if __name__ == "__main__":
    mgmt = Manager()
    d = mgmt.dict()
    l = mgmt.list(range(10))
    main()
Note: mgmt = Manager() must be placed inside the if __name__ == "__main__" block, otherwise you get a freeze_support() error.
Also, note this passage:
Server process managers are more flexible than using shared memory objects because they can be made to support arbitrary object types. Also, a single manager can be shared by processes on different computers over a network. They are, however, slower than using shared memory.
That is, a manager can even share data between different hosts; a sketch follows.
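Here is a minimal sketch of sharing over the network with multiprocessing.managers.BaseManager (the address, port, and authkey are made-up examples). Server side:

from multiprocessing.managers import BaseManager

shared = {}

class MyManager(BaseManager):
    pass

# expose a function that returns the shared dict
MyManager.register('get_dict', callable=lambda: shared)

if __name__ == '__main__':
    mgr = MyManager(address=('0.0.0.0', 50000), authkey=b'secret')
    mgr.get_server().serve_forever()

Client side (possibly on another machine):

from multiprocessing.managers import BaseManager

class MyManager(BaseManager):
    pass

MyManager.register('get_dict')

if __name__ == '__main__':
    mgr = MyManager(address=('127.0.0.1', 50000), authkey=b'secret')
    mgr.connect()
    d = mgr.get_dict()             # a proxy to the dict living in the server process
    d.update({'name': 'winter'})   # modify it through the proxy
    print(d.get('name'))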
If you need to launch a large number of child processes, you can batch-create them with a process pool, Pool. A Pool provides a fixed number of processes for the user to call on. When a new request is submitted to the pool and the pool is not yet full, a new process is created to run it; but if the pool has already reached its maximum size, the request waits until a process in the pool finishes, and only then is a process created to run it.
There are two methods: the blocking Pool.apply() and the non-blocking Pool.apply_async().
a) The blocking method Pool.apply()
import multiprocessing
import time

def func(name):
    print("start: %s" % name)
    time.sleep(2)
    return 'end: %s' % name

if __name__ == "__main__":
    name_list = ['winter', 'elly', 'james', 'yule']
    # create a pool of 3 processes
    pool = multiprocessing.Pool(3)
    for member in name_list:
        # create and run the child process; no start needed
        res = pool.apply(func, (member,))
        print(res)
    pool.close()
    # call close before join, otherwise an error is raised;
    # after close, no new processes can be added to the pool
    pool.join()
    print("all done...")
Output:
start: winter
end: winter
start: elly
end: elly
start: james
end: james
start: yule
end: yule
all done...
Notice that in blocking mode the processes run one at a time, i.e. still serially, which is why apply is rarely used.
Two points to note:
1. Child processes run by a pool do not need start;
2. close() must be called before join(), otherwise an error is raised; and after close() no new Process can be added to the pool.
b) The non-blocking method Pool.apply_async()
import multiprocessing
import time

def func(name):
    print("start: %s" % name)
    time.sleep(2)
    return 'end: %s' % name

def func_exp(msg):
    print('callback: %s' % msg)

if __name__ == "__main__":
    name_list = ['winter', 'elly', 'james', 'yule']
    res_list = []
    # create a process pool (default size)
    pool = multiprocessing.Pool()
    for member in name_list:
        # create and run the child process; no start needed
        res = pool.apply_async(func, (member,), callback=func_exp)
        # note we append res here, not res.get(), which would block again
        res_list.append(res)
    for res_mem in res_list:
        print(res_mem.get())
    pool.close()
    # call close before join, otherwise an error is raised;
    # after close, no new processes can be added to the pool
    pool.join()
    print("all done...")
Output:
start: winter
start: elly
start: james
start: yule
callback: end: winter
end: winter
callback: end: elly
end: elly
callback: end: james
end: james
callback: end: yule
end: yule
all done...
Analysis of the results:
1. In non-blocking mode, the cores are fully used and the tasks truly run in parallel;
2. apply_async takes a callback parameter, which can be used for callbacks;
3. Why is apply blocking, and where exactly does it block? And what does apply_async do differently?
Look at the source of the apply method:
def apply(self, func, args=(), kwds={}):
    '''
    Equivalent of `func(*args, **kwds)`.
    '''
    assert self._state == RUN
    return self.apply_async(func, args, kwds).get()
apply ultimately executes self.apply_async(func, args, kwds).get(): it calls the same apply_async() method and simply calls get() on the result. That get() is where it blocks.
So if I modify my apply_async() code, can I make apply_async() behave like a blocking call? Let's try:
import multiprocessing
import time

def func(name):
    print("start: %s" % name)
    time.sleep(2)
    return 'end: %s' % name

def func_exp(msg):
    print('callback: %s' % msg)

if __name__ == "__main__":
    name_list = ['winter', 'elly', 'james', 'yule']
    # create a process pool
    pool = multiprocessing.Pool()
    for member in name_list:
        # create and run the child process; no start needed
        res = pool.apply_async(func, (member,), callback=func_exp)
        print(res.get())   # calling get() immediately blocks until this task finishes
    pool.close()
    # call close before join, otherwise an error is raised;
    # after close, no new processes can be added to the pool
    pool.join()
    print("all done...")

Note the modified part, the res.get() called immediately inside the loop; sure enough, execution becomes blocking:
start: winter
callback: end: winter
end: winter
start: elly
callback: end: elly
end: elly
start: james
callback: end: james
end: james
start: yule
callback: end: yule
end: yule
all done...
c) How many processes should a pool have?
Since multiple processes can use multiple cores, is it better to create as many processes as possible? No: switching between processes is expensive, so too many processes switching back and forth actually reduces efficiency!
The process count is an empirical value that depends heavily on the system's hardware; the optimal number has to be found by repeated tuning.
When a Pool is created, the default number of processes is the machine's number of logical CPUs (hardware threads).
As a rule of thumb: for CPU-bound tasks, a pool size close to the number of cores is a good starting point; for I/O-bound tasks, the pool can be somewhat larger, since the processes spend much of their time waiting. The sketch below shows how to inspect the defaults.
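A small sketch of checking the logical CPU count and the pool's default size (Pool._processes is an internal CPython attribute, used here only for illustration):

import multiprocessing

print(multiprocessing.cpu_count())   # number of logical CPUs
pool = multiprocessing.Pool()        # defaults to cpu_count() processes
print(pool._processes)               # internal attribute: the pool size
pool.close()
pool.join()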