Unix/Linux操做系統提供了一個fork()系統調用,它很是特殊。普通的函數調用,調用一次,返回一次,可是fork()調用一次,返回兩次,由於操做系統自動把當前進程(稱爲父進程)複製了一份(稱爲子進程),而後,分別在父進程和子進程內返回。
子進程永遠返回0,而父進程返回子進程的ID。這樣作的理由是,一個父進程能夠fork出不少子進程,因此,父進程要記下每一個子進程的ID,而子進程只須要調用getppid()就能夠拿到父進程的ID。
Python的os模塊封裝了常見的系統調用,其中就包括fork,能夠在Python程序中輕鬆建立子進程。python
示例以下linux
import os pid=os.fork() if pid==0: print('I am child process %s my parents is %s'%(os.getpid(),os.getppid())) else: print('I (%s) just created a child process (%s).'%(os.getpid(),pid))
輸出以下服務器
I (64225) just created a child process (64226). I am child process 64226 my parents is 64225
multiprocessing
模塊提供了一個Process類來表明一個進程對象。\網絡
示例1併發
from multiprocessing import Process import os # 子進程要執行的代碼 def run_proc(name): print('Run child process %s (%s)...' % (name, os.getpid())) if __name__=='__main__': print('Parent process %s.' % os.getppid()) p = Process(target=run_proc, args=('test',)) print('Child process will start.') p.start() p.join() print('Child process end.') #join()方法能夠等待子進程結束後再繼續往下運行,一般用於進程間的同步。
示例2app
from multiprocessing import Process import time import os class P(Process): def run(self): print('Run child process %s (%s)...'%(self.name,os.getpid())) # 默認函數對象有name方法 ,結果爲:P-1 time.sleep(3) print('%s is done' % self.name) if __name__ == '__main__': print('Parent process %s.' % os.getppid()) p=P() p.start() p.join()
多個進程間的數據是隔離的,也就是說多個進程修改全局變量互不影響\dom
驗證示例異步
from multiprocessing import Process import time x=100 def task(): global x print('子進程開啓,當前x的值爲%d'%x) time.sleep(3) x=10 print('子進程結束,當前x的值爲%d'%x) if __name__ == '__main__': print('當前爲父進程,準備開啓子進程,x的值爲%d' % x) p1=Process(target=task) p1.start() p1.join() print('當前爲父進程,準備結束父進程,x的值爲%d' % x)
輸出async
當前爲父進程,準備開啓子進程,x的值爲100 子進程開啓,當前x的值爲100 子進程結束,當前x的值爲10 當前爲父進程,準備結束父進程,x的值爲100
==注意:有些狀況是須要加鎖的狀況,如文件讀寫問題==分佈式
示例以下
import time from multiprocessing import Process def task(name,n): print('%s is running'%name) time.sleep(n) print('%s is done'%name) if __name__ == '__main__': p1=Process(target=task,args=("進程1",1)) #用時1s p2=Process(target=task,args=("進程2",2)) #用時1s p3=Process(target=task,args=("進程3",3)) #用時1s start_time=time.time() p1.start() p2.start() p3.start() # 當第一秒在運行p1時,其實p二、p3也已經在運行,當1s後到p2時只須要再運行1s就到p3了,到p3也是同樣。 p1.join() p2.join() p3.join() stop_time=time.time() print(stop_time-start_time) #3.2848567962646484
from multiprocessing import Pool # 導入進程池模塊pool import time,os def foo(i): time.sleep(2) print("in process", os.getpid()) # 打印進程號 if __name__ == "__main__": pool = Pool(processes=5) # 設置容許進程池同時放入5個進程 for i in range(10): pool.apply(func=foo, args=(i,)) # 同步執行掛起進程 print('end') pool.close() # 關閉進程池,再也不接受新進程 pool.join() # 進程池中進程執行完畢後再關閉,若是註釋掉,那麼程序直接關閉。
from multiprocessing import Pool # 導入進程池模塊pool import time,os def foo(i): time.sleep(2) print("in process", os.getpid()) # 打印進程號 if __name__ == "__main__": pool = Pool(processes=5) # 設置容許進程池同時放入5個進程,而且將這5個進程交給cpu去運行 for i in range(10): pool.apply_async(func=foo, args=(i,)) # 採用異步方式執行foo函數 print('end') pool.close() pool.join() # 進程池中進程執行完畢後再關閉,若是註釋掉,那麼程序直接關閉。
from multiprocessing import Process,Pool import time,os def foo(i): time.sleep(2) print("in process", os.getpid()) # 打印子進程的進程號 def bar(arg):#注意arg參數是必需要有的 print('-->exec done:', arg, os.getpid()) # 打印進程號 if __name__ == "__main__": pool = Pool(processes=2) print("主進程", os.getpid()) # 主進程的進程號 for i in range(3): pool.apply_async(func=foo, args=(i,), callback=bar) # 執行回調函數callback=Bar print('end') pool.close() pool.join() # 進程池中進程執行完畢後再關閉,若是註釋掉,那麼程序直接關閉。
執行結果
主進程 752 end in process 2348 -->exec done: None 752 in process 8364 -->exec done: None 752 in process 2348 -->exec done: None 752 #回調函數說明fun=Foo幹不完就不執行bar函數,等Foo執行完就去執行Bar #這個回調函數是主進程去調用的,而不是每一個子進程去調用的。
一、 不少時候子進程是一個外部進程,如執行一條命令,這和命令行執行效果是同樣的
示例以下
import subprocess print('$nslookup https://www.baidu.com') r = subprocess.call(['nslookup','https://www.baidu.com']) print('Exit code',r)
二、 有時候子進程還須要進行輸入,能夠經過communicate
方法來輸入
示例以下
import subprocess print('$ nslookup https://www.baidu.com') p = subprocess.Popen(['nslookup'],stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE) output,err = p.communicate(b'set q=mx\nbaidu.com\nexit\n') print(output.decode('gbk')) print('Exit code:',p.returncode)
輸出以下
$ nslookup https://www.baidu.com 默認服務器: bogon Address: 192.168.111.1 > > 服務器: bogon Address: 192.168.111.1 baidu.com MX preference = 10, mail exchanger = mx.maillb.baidu.com baidu.com MX preference = 20, mail exchanger = jpmx.baidu.com baidu.com MX preference = 15, mail exchanger = mx.n.shifen.com baidu.com MX preference = 20, mail exchanger = mx50.baidu.com baidu.com MX preference = 20, mail exchanger = mx1.baidu.com > Exit code: 0
守護進程在主進程代碼執行完畢時馬上掛掉,而後主進程等待非守護進程執行完畢後回收子進程的資源(避免產生殭屍進程),總體纔算結束。 示例
from multiprocessing import Process import os import time def task(x): print('%s is running ' %x) time.sleep(3) print('%s is done' %x) if __name__ == '__main__': p1=Process(target=task,args=('守護進程',)) p2=Process(target=task,args=('子進程',)) p2.start() p1.daemon=True # 設置p1爲守護進程 p1.start() print('主進程代碼執行完畢') >>:主進程代碼執行完畢 >>:子進程 is running >>:子進程 is done
==能夠從結果看出,主進程代碼執行完,守護進程當即掛掉,主進程在等待子進程執行完畢後退出==
若是想要進程間通訊可使用Queue
或Pipe
來實現
使用Queue示例
from multiprocessing import Queue,Process def put_id(q): q.put([1,2,3,4]) if __name__ == '__main__': q=Queue() p=Process(target=put_id,args=(q,)) p.start() print(q.get()) p.join() # 輸出 [1,2,3,4]
==注意:在這須要從multiprocessing導入Queue模塊==
使用Pipe示例
from multiprocessing import Process,Pipe def put_id(conn): conn.send([1,2,3]) conn.send([4,5,6]) conn.close() if __name__ == '__main__': ## 生成管道。 生成時會產生兩個返回對象,這兩個對象至關於兩端的電話,經過管道線路鏈接。 ## 兩個對象分別交給兩個變量。 parent_conn,child_conn=Pipe() p=Process(target=put_id,args=(child_conn,))#child_conn須要傳給對端,用於send數據給parent_conn p.start() print(parent_conn.recv()) # parent_conn在這斷用於接收數據>>>>[1,2,3] print(parent_conn.recv()) # parent_conn在這斷用於接收數據>>>>[4,5,6] p.join()
==注意兩端要發送次數和接受次數要對等,否則會卡住直到對等==
前面說過,進程間數據是隔離的,若是想要進程間數據共享能夠經過Manager
來實現
示例以下
from multiprocessing import Manager,Process from random import randint import os def run(d,l): d[randint(1,50)]=randint(51,100)#生成一個可在多個進程之間傳遞和共享的字典 l.append(os.getpid()) print(l) if __name__ == '__main__': with Manager() as manage: #作一個別名,此時manager就至關於Manager() d=manage.dict()#生成一個可在多個進程之間傳遞和共享的字典 l=manage.list(range(5))#生成一個可在多個進程之間傳遞和共享的列表 p_list=[] for i in range(10):#生成10個進程 p=Process(target=run,args=(d,l)) p_list.append(p)# 將每一個進程放入空列表中 p.start() for i in p_list: i.join() print(d)#全部進程都執行完畢後打印字典 print(l)#全部進程都執行完畢後打印列表
在作分佈式計算時顯然進程比線程各合適,一來進程更穩定,二來線程最多隻能在同一臺機器的多個cpu上運行;
multiprocessing
的managers
子模塊支持把多進程分佈到多個機器上,一個服務進程用做調度者,依靠網絡將任務分佈到其它多個進程中。
假設有一個需求,擁有兩臺機器,一臺機器用來作發送任務的服務進程,一臺用來作處理任務的服務進程;
示例以下
# task_master.py from multiprocessing.managers import BaseManager from queue import Queue import random import time task_queue = Queue() result_queue = Queue() class QueueManager(BaseManager): pass def get_task_queue(): global task_queue return task_queue def get_result_queue(): global result_queue return result_queue if __name__ == '__main__': # 將兩個隊列註冊到網絡上,calltable參數關聯Queue對象 QueueManager.register('get_task_queue', callable=get_task_queue) QueueManager.register('get_result_queue', callable=get_result_queue) # 建立一個隊列管理器,綁定端口5000,設定密碼爲abc manager = QueueManager(address=('127.0.0.1',5000),authkey=b'abc') manager.start() # 經過網絡獲取Queue對象 task = manager.get_task_queue() result = manager.get_result_queue() # 聽任務進去 for i in range(10): n = random.randint(0,1000) print('Put Task %d'%n) task.put(n) # 從結果隊列獲取結果 print('Try get results') for i in range(10): r = result.get() print('Result: %s' % r) manager.shutdown() print('master exit')
==注意:必定要用註冊過的Queue對象,另外在linux/unix/mac等系統上註冊可直接使用QueueManager.register('get_result_queue', callable=lambda : result_queue)
==
# task_worker.py from multiprocessing.managers import BaseManager from queue import Queue from queue import Empty import time class QueueManager(BaseManager): pass if __name__ == '__main__': # 從服務器上獲取,因此註冊時只須要提供名字,也就是接口名字 QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # 鏈接到服務器,也就是task_master.py的機器 server_addr = '127.0.0.1' manager = QueueManager(address=(server_addr,5000),authkey=b'abc') manager.connect() # 獲取Queue對象 task = manager.get_task_queue() result = manager.get_result_queue() # 從隊列提取任務,將處理結果插入result隊列 for i in range(10): try: n = task.get(timeout=1) print('run task %d*%d'%(n,n)) r = '%d * %d = %d'%(n,n,n*n) time.sleep(1) result.put(r) except Empty: print('task queue is empty') print('worker exit')