多進程編程知識是Python程序員進階高級的必備知識點,咱們平時習慣了使用multiprocessing庫來操縱多進程,可是並不知道它的具體實現原理。下面我對多進程的經常使用知識點都簡單列了一遍,使用原生的多進程方法調用,幫助讀者理解多進程的實現機制。代碼跑在linux環境下。沒有linux條件的,可使用docker或者虛擬機運行進行體驗。python
docker pull python:2.7
Python生成子進程使用os.fork()
,它將產生一個子進程。fork調用同時在父進程和主進程同時返回,在父進程中返回子進程的pid,在子進程中返回0,若是返回值小於零,說明子進程產生失敗,通常是由於操做系統資源不足。linux
import os def create_child(): pid = os.fork() if pid > 0: print 'in father process' return True elif pid == 0: print 'in child process' return False else: raise
咱們調用create_child
方法屢次就能夠生成多個子進程,前提是必須保證create_child
是在父進程裏執行,若是是子進程,就不要在調用了。 程序員
# coding: utf-8 # child.py import os def create_child(i): pid = os.fork() if pid > 0: print 'in father process' return pid elif pid == 0: print 'in child process', i return 0 else: raise for i in range(10): # 循環10次,建立10個子進程 pid = create_child(i) # pid==0是子進程,應該當即退出循環,不然子進程也會繼續生成子進程 # 子子孫孫,那就生成太多進程了 if pid == 0: break
運行python child.py
,輸出redis
in father process in father process in child process 0 in child process 1 in father process in child process 2 in father process in father process in child process 3 in father process in child process 4 in child process 5 in father process in father process in child process 6 in child process 7 in father process in child process 8 in father process in child process 9
使用time.sleep可使進程休眠任意時間,單位爲秒,能夠是小數docker
import time for i in range(5): print 'hello' time.sleep(1) # 睡1s
使用os.kill(pid, sig_num)能夠向進程號爲pid的子進程發送信號,sig_num經常使用的有SIGKILL(暴力殺死,至關於kill -9),SIGTERM(通知對方退出,至關於kill不帶參數),SIGINT(至關於鍵盤的ctrl+c)。macos
# coding: utf-8 # kill.py import os import time import signal def create_child(): pid = os.fork() if pid > 0: return pid elif pid == 0: return 0 else: raise pid = create_child() if pid == 0: while True: # 子進程死循環打印字符串 print 'in child process' time.sleep(1) else: print 'in father process' time.sleep(5) # 父進程休眠5s再殺死子進程 os.kill(pid, signal.SIGKILL) time.sleep(5) # 父進程繼續休眠5s觀察子進程是否還有輸出
運行python kill.py
,咱們看到控制檯輸出以下編程
in father process in child process # 等1s in child process # 等1s in child process # 等1s in child process # 等1s in child process # 等了5s
說明os.kill執行以後,子進程已經中止輸出了api
在上面的例子中,os.kill執行完以後,咱們經過ps -ef|grep python快速觀察進程的狀態,能夠發現子進程有一個奇怪的顯示<defunct>
網絡
root 12 1 0 11:22 pts/0 00:00:00 python kill.py root 13 12 0 11:22 pts/0 00:00:00 [python] <defunct>
待父進程終止後,子進程也一塊消失了。那<defunct>
是什麼含義呢? 它的含義是「殭屍進程」。子進程結束後,會當即成爲殭屍進程,殭屍進程佔用的操做系統資源並不會當即釋放,它就像一具屍體啥事也不幹,可是仍是持續佔據着操做系統的資源(內存等)。併發
若是完全乾掉殭屍進程?父進程須要調用waitpid(pid, options)函數,「收割」子進程,這樣子進程才能夠灰飛煙滅。waitpid函數會返回子進程的退出狀態,它就像子進程留下的臨終遺言,必須等父進程聽到後才能完全瞑目。
# coding: utf-8 import os import time import signal def create_child(): pid = os.fork() if pid > 0: return pid elif pid == 0: return 0 else: raise pid = create_child() if pid == 0: while True: # 子進程死循環打印字符串 print 'in child process' time.sleep(1) else: print 'in father process' time.sleep(5) # 父進程休眠5s再殺死子進程 os.kill(pid, signal.SIGTERM) ret = os.waitpid(pid, 0) # 收割子進程 print ret # 看看到底返回了什麼 time.sleep(5) # 父進程繼續休眠5s觀察子進程是否還存在
運行python kill.py輸出以下
in father process in child process in child process in child process in child process in child process in child process (125, 9)
咱們看到waitpid返回了一個tuple,第一個是子進程的pid,第二個9是什麼含義呢,它在不一樣的操做系統上含義不盡相同,不過在Unix上,它一般的value是一個16位的整數值,前8位表示進程的退出狀態,後8位表示致使進程退出的信號的整數值。因此本例中退出狀態位0,信號編號位9,還記得kill -9
這個命令麼,就是這個9表示暴力殺死進程。
若是咱們將os.kill換一個信號纔看結果,好比換成os.kill(pid, signal.SIGTERM),能夠看到返回結果變成了(138, 15)
,15就是SIGTERM信號的整數值。
waitpid(pid, 0)
還能夠起到等待子進程結束的功能,若是子進程不結束,那麼該調用會一直卡住。
SIGTERM信號默認處理動做就是退出進程,其實咱們還能夠設置SIGTERM信號的處理函數,使得它不退出。
# coding: utf-8 import os import time import signal def create_child(): pid = os.fork() if pid > 0: return pid elif pid == 0: return 0 else: raise pid = create_child() if pid == 0: signal.signal(signal.SIGTERM, signal.SIG_IGN) while True: # 子進程死循環打印字符串 print 'in child process' time.sleep(1) else: print 'in father process' time.sleep(5) # 父進程休眠5s再殺死子進程 os.kill(pid, signal.SIGTERM) # 發一個SIGTERM信號 time.sleep(5) # 父進程繼續休眠5s觀察子進程是否還存在 os.kill(pid, signal.SIGKILL) # 發一個SIGKILL信號 time.sleep(5) # 父進程繼續休眠5s觀察子進程是否還存在
咱們在子進程裏設置了信號處理函數,SIG_IGN表示忽略信號。咱們發現第一次調用os.kill以後,子進程會繼續輸出。說明子進程沒有被殺死。第二次os.kill以後,子進程終於中止了輸出。
接下來咱們換一個自定義信號處理函數,子進程收到SIGTERM以後,打印一句話再退出。
# coding: utf-8 import os import sys import time import signal def create_child(): pid = os.fork() if pid > 0: return pid elif pid == 0: return 0 else: raise def i_will_die(sig_num, frame): # 自定義信號處理函數 print "child will die" sys.exit(0) pid = create_child() if pid == 0: signal.signal(signal.SIGTERM, i_will_die) while True: # 子進程死循環打印字符串 print 'in child process' time.sleep(1) else: print 'in father process' time.sleep(5) # 父進程休眠5s再殺死子進程 os.kill(pid, signal.SIGTERM) time.sleep(5) # 父進程繼續休眠5s觀察子進程是否還存在
輸出以下
in father process in child process in child process in child process in child process in child process child will die
信號處理函數有兩個參數,第一個sig_num表示被捕獲信號的整數值,第二個frame不太好理解,通常也不多用。它表示被信號打斷時,Python的運行的棧幀對象信息。讀者能夠沒必要深度理解。
下面咱們使用多進程進行一個計算圓周率PI。對於圓周率PI有一個數學極限公式,咱們將使用該公司來計算圓周率PI。
先使用單進程版本
import math def pi(n): s = 0.0 for i in range(n): s += 1.0/(2*i+1)/(2*i+1) return math.sqrt(8 * s) print pi(10000000)
輸出
3.14159262176
這個程序跑了有一小會纔出結果,不過這個值已經很是接近圓周率了。
接下來咱們用多進程版本,咱們用redis進行進程間通訊。
# coding: utf-8 import os import sys import math import redis def slice(mink, maxk): s = 0.0 for k in range(mink, maxk): s += 1.0/(2*k+1)/(2*k+1) return s def pi(n): pids = [] unit = n / 10 client = redis.StrictRedis() client.delete("result") # 保證結果集是乾淨的 del client # 關閉鏈接 for i in range(10): # 分10個子進程 mink = unit * i maxk = mink + unit pid = os.fork() if pid > 0: pids.append(pid) else: s = slice(mink, maxk) # 子進程開始計算 client = redis.StrictRedis() client.rpush("result", str(s)) # 傳遞子進程結果 sys.exit(0) # 子進程結束 for pid in pids: os.waitpid(pid, 0) # 等待子進程結束 sum = 0 client = redis.StrictRedis() for s in client.lrange("result", 0, -1): sum += float(s) # 收集子進程計算結果 return math.sqrt(sum * 8) print pi(10000000)
咱們將級數之和的計算拆分紅10個子進程計算,每一個子進程負責1/10的計算量,並將計算的中間結果扔到redis的隊列中,而後父進程等待全部子進程結束,再將隊列中的數據所有彙總起來計算最終結果。
輸出以下
3.14159262176
這個結果和單進程結果一致,可是花費的時間要縮短了很多。
這裏咱們之因此使用redis做爲進程間通訊方式,是由於進程間通訊是一個比較複雜的技術,接下來咱們將會使用進程間通訊技術來替換掉這裏的redis。
使用文件進行通訊是最簡單的一種通訊方式,子進程將結果輸出到臨時文件,父進程從文件中讀出來。文件名使用子進程的進程id來命名。進程隨時均可以經過os.getpid()
來獲取本身的進程id。
# coding: utf-8 import os import sys import math def slice(mink, maxk): s = 0.0 for k in range(mink, maxk): s += 1.0/(2*k+1)/(2*k+1) return s def pi(n): pids = [] unit = n / 10 for i in range(10): # 分10個子進程 mink = unit * i maxk = mink + unit pid = os.fork() if pid > 0: pids.append(pid) else: s = slice(mink, maxk) # 子進程開始計算 with open("%d" % os.getpid(), "w") as f: f.write(str(s)) sys.exit(0) # 子進程結束 sums = [] for pid in pids: os.waitpid(pid, 0) # 等待子進程結束 with open("%d" % pid, "r") as f: sums.append(float(f.read())) os.remove("%d" % pid) # 刪除通訊的文件 return math.sqrt(sum(sums) * 8) print pi(10000000)
輸出
3.14159262176
管道是Unix進程間通訊最經常使用的方法之一,它經過在父子進程之間開通讀寫通道來進行雙工交流。咱們經過os.read()和os.write()來對文件描述符進行讀寫操做,使用os.close()關閉描述符。
上圖爲單進程的管道
上圖爲父子進程分離後的管道
# coding: utf-8 import os import sys import math def slice(mink, maxk): s = 0.0 for k in range(mink, maxk): s += 1.0/(2*k+1)/(2*k+1) return s def pi(n): childs = {} unit = n / 10 for i in range(10): # 分10個子進程 mink = unit * i maxk = mink + unit r, w = os.pipe() pid = os.fork() if pid > 0: childs[pid] = r # 將子進程的pid和讀描述符存起來 os.close(w) # 父進程關閉寫描述符,只讀 else: os.close(r) # 子進程關閉讀描述符,只寫 s = slice(mink, maxk) # 子進程開始計算 os.write(w, str(s)) os.close(w) # 寫完了,關閉寫描述符 sys.exit(0) # 子進程結束 sums = [] for pid, r in childs.items(): sums.append(float(os.read(r, 1024))) os.close(r) # 讀完了,關閉讀描述符 os.waitpid(pid, 0) # 等待子進程結束 return math.sqrt(sum(sums) * 8) print pi(10000000)
輸出
3.14159262176
套接字無疑是通訊使用最爲普遍的方式了,它不但能跨進程還能跨網絡。今天英特網能發達成這樣,全拜套接字所賜。不過做爲同一個機器的多進程通訊仍是挺浪費的。暫不討論這個,仍是先看看它如何使用吧。
# coding: utf-8 import os import sys import math import socket def slice(mink, maxk): s = 0.0 for k in range(mink, maxk): s += 1.0/(2*k+1)/(2*k+1) return s def pi(n): childs = [] unit = n / 10 servsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 注意這裏的AF_INET表示普通套接字 servsock.bind(("localhost", 0)) # 0表示隨機端口 server_address = servsock.getsockname() # 拿到隨機出來的地址,給後面的子進程使用 servsock.listen(10) # 監聽子進程鏈接請求 for i in range(10): # 分10個子進程 mink = unit * i maxk = mink + unit pid = os.fork() if pid > 0: childs.append(pid) else: servsock.close() # 子進程要關閉servsock引用 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect(server_address) # 鏈接父進程套接字 s = slice(mink, maxk) # 子進程開始計算 sock.sendall(str(s)) sock.close() # 關閉鏈接 sys.exit(0) # 子進程結束 sums = [] for pid in childs: conn, _ = servsock.accept() # 接收子進程鏈接 sums.append(float(conn.recv(1024))) conn.close() # 關閉鏈接 for pid in childs: os.waitpid(pid, 0) # 等待子進程結束 servsock.close() # 關閉套接字 return math.sqrt(sum(sums) * 8) print pi(10000000)
輸出
3.14159262176
當同一個機器的多個進程使用普通套接字進行通訊時,須要通過網絡協議棧,這很是浪費,由於同一個機器根本沒有必要走網絡。因此Unix提供了一個套接字的特殊版本,它使用和套接字一摸同樣的api,可是地址再也不是網絡端口,而是文件。至關於咱們經過某個特殊文件來進行套接字通訊。
# coding: utf-8 import os import sys import math import socket def slice(mink, maxk): s = 0.0 for k in range(mink, maxk): s += 1.0/(2*k+1)/(2*k+1) return s def pi(n): server_address = "/tmp/pi_sock" # 套接字對應的文件名 childs = [] unit = n / 10 servsock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) # 注意AF_UNIX表示「域套接字」 servsock.bind(server_address) servsock.listen(10) # 監聽子進程鏈接請求 for i in range(10): # 分10個子進程 mink = unit * i maxk = mink + unit pid = os.fork() if pid > 0: childs.append(pid) else: servsock.close() # 子進程要關閉servsock引用 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.connect(server_address) # 鏈接父進程套接字 s = slice(mink, maxk) # 子進程開始計算 sock.sendall(str(s)) sock.close() # 關閉鏈接 sys.exit(0) # 子進程結束 sums = [] for pid in childs: conn, _ = servsock.accept() # 接收子進程鏈接 sums.append(float(conn.recv(1024))) conn.close() # 關閉鏈接 for pid in childs: os.waitpid(pid, 0) # 等待子進程結束 servsock.close() # 關閉套接字 os.unlink(server_address) # 移除套接字文件 return math.sqrt(sum(sums) * 8) print pi(10000000)
輸出
3.14159262176
咱們知道跨網絡通訊免不了要經過套接字進行通訊,可是本例的多進程是在同一個機器上,用不着跨網絡,使用普通套接字進行通訊有點浪費。
上圖爲單進程的socketpair
上圖爲父子進程分離後的socketpair
爲了解決這個問題,Unix系統提供了無名套接字socketpair,不須要端口也能夠建立套接字,父子進程經過socketpair來進行全雙工通訊。
socketpair返回兩個套接字對象,一個用於讀一個用於寫,它有點相似於pipe,只不過pipe返回的是兩個文件描述符,都是整數。因此寫起代碼形式上跟pipe幾乎沒有什麼區別。
咱們使用sock.send()和sock.recv()來對套接字進行讀寫,經過sock.close()來關閉套接字對象。
# coding: utf-8 import os import sys import math import socket def slice(mink, maxk): s = 0.0 for k in range(mink, maxk): s += 1.0/(2*k+1)/(2*k+1) return s def pi(n): childs = {} unit = n / 10 for i in range(10): # 分10個子進程 mink = unit * i maxk = mink + unit rsock, wsock = socket.socketpair() pid = os.fork() if pid > 0: childs[pid] = rsock wsock.close() else: rsock.close() s = slice(mink, maxk) # 子進程開始計算 wsock.send(str(s)) wsock.close() sys.exit(0) # 子進程結束 sums = [] for pid, rsock in childs.items(): sums.append(float(rsock.recv(1024))) rsock.close() os.waitpid(pid, 0) # 等待子進程結束 return math.sqrt(sum(sums) * 8) print pi(10000000)
輸出
3.14159262176
相對於管道只能用於父子進程之間通訊,Unix還提供了有名管道可讓任意進程進行通訊。有名管道又稱fifo,它會將本身註冊到文件系統裏一個文件,參數通訊的進程經過讀寫這個文件進行通訊。 fifo要求讀寫雙方必須同時打開才能夠繼續進行讀寫操做,不然打開操做會堵塞直到對方也打開。
# coding: utf-8 import os import sys import math def slice(mink, maxk): s = 0.0 for k in range(mink, maxk): s += 1.0/(2*k+1)/(2*k+1) return s def pi(n): childs = [] unit = n / 10 fifo_path = "/tmp/fifo_pi" os.mkfifo(fifo_path) # 建立named pipe for i in range(10): # 分10個子進程 mink = unit * i maxk = mink + unit pid = os.fork() if pid > 0: childs.append(pid) else: s = slice(mink, maxk) # 子進程開始計算 with open(fifo_path, "w") as ff: ff.write(str(s) + "\n") sys.exit(0) # 子進程結束 sums = [] while True: with open(fifo_path, "r") as ff: # 子進程關閉寫端,讀進程會收到eof # 因此必須循環打開,屢次讀取 # 讀夠數量了就能夠結束循環了 sums.extend([float(x) for x in ff.read(1024).strip().split("\n")]) if len(sums) == len(childs): break for pid in childs: os.waitpid(pid, 0) # 等待子進程結束 os.unlink(fifo_path) # 移除named pipe return math.sqrt(sum(sums) * 8) print pi(10000000)
輸出
3.14159262176
操做系統也提供了跨進程的消息隊列對象可讓咱們直接使用,只不過python沒有默認提供包裝好的api來直接使用。咱們必須使用第三方擴展來完成OS消息隊列通訊。第三方擴展是經過使用Python包裝的C實現來完成的。
OS消息隊列有兩種形式,一種是posix消息隊列,另外一種是systemv消息隊列,有些操做系統二者都支持,有些只支持其中的一個,好比macos僅支持systemv消息隊列,我本地的python的docker鏡像是debian linux,它僅支持posix消息隊列。
posix消息隊列 咱們先使用posix消息隊列來完成圓周率的計算,posix消息隊列須要提供一個惟一的名稱,它必須是/
開頭。close()方法僅僅是減小內核消息隊列對象的引用,而不是完全關閉它。unlink()方法才能完全銷燬它。O_CREAT選項表示若是不存在就建立。向隊列裏塞消息使用send方法,收取消息使用receive方法,receive方法返回一個tuple,tuple的第一個值是消息的內容,第二個值是消息的優先級。之因此有優先級,是由於posix消息隊列支持消息的排序,在send方法的第二個參數能夠提供優先級整數值,默認爲0,越大優先級越高。
# coding: utf-8 import os import sys import math from posix_ipc import MessageQueue as Queue def slice(mink, maxk): s = 0.0 for k in range(mink, maxk): s += 1.0/(2*k+1)/(2*k+1) return s def pi(n): pids = [] unit = n / 10 q = Queue("/pi", flags=os.O_CREAT) for i in range(10): # 分10個子進程 mink = unit * i maxk = mink + unit pid = os.fork() if pid > 0: pids.append(pid) else: s = slice(mink, maxk) # 子進程開始計算 q.send(str(s)) q.close() sys.exit(0) # 子進程結束 sums = [] for pid in pids: sums.append(float(q.receive()[0])) os.waitpid(pid, 0) # 等待子進程結束 q.close() q.unlink() # 完全銷燬隊列 return math.sqrt(sum(sums) * 8) print pi(10000000)
輸出
3.14159262176
systemv消息隊列 systemv消息隊列和posix消息隊列用起來有所不一樣。systemv的消息隊列是以整數key做爲名稱,若是不指定,它就建立一個惟一的未佔用的整數key。它還提供消息類型的整數參數,可是不支持消息優先級。
# coding: utf-8 import os import sys import math import sysv_ipc from sysv_ipc import MessageQueue as Queue def slice(mink, maxk): s = 0.0 for k in range(mink, maxk): s += 1.0/(2*k+1)/(2*k+1) return s def pi(n): pids = [] unit = n / 10 q = Queue(key=None, flags=sysv_ipc.IPC_CREX) for i in range(10): # 分10個子進程 mink = unit * i maxk = mink + unit pid = os.fork() if pid > 0: pids.append(pid) else: s = slice(mink, maxk) # 子進程開始計算 q.send(str(s)) sys.exit(0) # 子進程結束 sums = [] for pid in pids: sums.append(float(q.receive()[0])) os.waitpid(pid, 0) # 等待子進程結束 q.remove() # 銷燬消息隊列 return math.sqrt(sum(sums) * 8) print pi(10000000)
輸出
3.14159262176
共享內存也是很是常見的多進程通訊方式,操做系統負責將同一份物理地址的內存映射到多個進程的不一樣的虛擬地址空間中。進而每一個進程均可以操做這分內存。考慮到物理內存的惟一性,它屬於臨界區資源,須要在進程訪問時搞好併發控制,好比使用信號量。咱們經過一個信號量來控制全部子進程的順序讀寫共享內存。咱們分配一個8字節double類型的共享內存用來存儲極限的和,每次從共享內存中讀出來時,要使用struct進行反序列化(unpack),將新的值寫進去以前也要使用struct進行序列化(pack)。每次讀寫操做都須要將讀寫指針移動到內存開頭位置(lseek)。
# coding: utf-8 import os import sys import math import struct import posix_ipc from posix_ipc import Semaphore from posix_ipc import SharedMemory as Memory def slice(mink, maxk): s = 0.0 for k in range(mink, maxk): s += 1.0/(2*k+1)/(2*k+1) return s def pi(n): pids = [] unit = n / 10 sem_lock = Semaphore("/pi_sem_lock", flags=posix_ipc.O_CREX, initial_value=1) # 使用一個信號量控制多個進程互斥訪問共享內存 memory = Memory("/pi_rw", size=8, flags=posix_ipc.O_CREX) os.lseek(memory.fd, 0, os.SEEK_SET) # 初始化和爲0.0的double值 os.write(memory.fd, struct.pack('d', 0.0)) for i in range(10): # 分10個子進程 mink = unit * i maxk = mink + unit pid = os.fork() if pid > 0: pids.append(pid) else: s = slice(mink, maxk) # 子進程開始計算 sem_lock.acquire() try: os.lseek(memory.fd, 0, os.SEEK_SET) bs = os.read(memory.fd, 8) # 從共享內存讀出來當前值 cur_val, = struct.unpack('d', bs) # 反序列化,逗號不能少 cur_val += s # 加上當前進程的計算結果 bs = struct.pack('d', cur_val) # 序列化 os.lseek(memory.fd, 0, os.SEEK_SET) os.write(memory.fd, bs) # 寫進共享內存 memory.close_fd() finally: sem_lock.release() sys.exit(0) # 子進程結束 sums = [] for pid in pids: os.waitpid(pid, 0) # 等待子進程結束 os.lseek(memory.fd, 0, os.SEEK_SET) bs = os.read(memory.fd, 8) # 讀出最終這結果 sums, = struct.unpack('d', bs) # 反序列化 memory.close_fd() # 關閉共享內存 memory.unlink() # 銷燬共享內存 sem_lock.unlink() # 銷燬信號量 return math.sqrt(sums * 8) print pi(10000000)
輸出
3.14159262176
閱讀更多Python高級文章,請關注公衆號「碼洞」