繼上節使用原生多進程並行運行,基於Redis做爲消息隊列完成了圓周率的計算,本節咱們使用原生操做系統消息隊列來替換Redis。python
使用文件進行通訊是最簡單的一種通訊方式,子進程將結果輸出到臨時文件,父進程從文件中讀出來。文件名使用子進程的進程id來命名。進程隨時均可以經過os.getpid()
來獲取本身的進程id。linux
# coding: utf-8
import os
import sys
import math
def slice(mink, maxk):
s = 0.0
for k in range(mink, maxk):
s += 1.0/(2*k+1)/(2*k+1)
return s
def pi(n):
pids = []
unit = n / 10
for i in range(10): # 分10個子進程
mink = unit * i
maxk = mink + unit
pid = os.fork()
if pid > 0:
pids.append(pid)
else:
s = slice(mink, maxk) # 子進程開始計算
with open("%d" % os.getpid(), "w") as f:
f.write(str(s))
sys.exit(0) # 子進程結束
sums = []
for pid in pids:
os.waitpid(pid, 0) # 等待子進程結束
with open("%d" % pid, "r") as f:
sums.append(float(f.read()))
os.remove("%d" % pid) # 刪除通訊的文件
return math.sqrt(sum(sums) * 8)
print pi(10000000)
複製代碼
輸出docker
3.14159262176
複製代碼
管道是Unix進程間通訊最經常使用的方法之一,它經過在父子進程之間開通讀寫通道來進行雙工交流。咱們經過os.read()和os.write()來對文件描述符進行讀寫操做,使用os.close()關閉描述符。macos
# coding: utf-8
import os
import sys
import math
def slice(mink, maxk):
s = 0.0
for k in range(mink, maxk):
s += 1.0/(2*k+1)/(2*k+1)
return s
def pi(n):
childs = {}
unit = n / 10
for i in range(10): # 分10個子進程
mink = unit * i
maxk = mink + unit
r, w = os.pipe()
pid = os.fork()
if pid > 0:
childs[pid] = r # 將子進程的pid和讀描述符存起來
os.close(w) # 父進程關閉寫描述符,只讀
else:
os.close(r) # 子進程關閉讀描述符,只寫
s = slice(mink, maxk) # 子進程開始計算
os.write(w, str(s))
os.close(w) # 寫完了,關閉寫描述符
sys.exit(0) # 子進程結束
sums = []
for pid, r in childs.items():
sums.append(float(os.read(r, 1024)))
os.close(r) # 讀完了,關閉讀描述符
os.waitpid(pid, 0) # 等待子進程結束
return math.sqrt(sum(sums) * 8)
print pi(10000000)
複製代碼
輸出api
3.14159262176
複製代碼
套接字無疑是通訊使用最爲普遍的方式了,它不但能跨進程還能跨網絡。今天英特網能發達成這樣,全拜套接字所賜。不過做爲同一個機器的多進程通訊仍是挺浪費的。暫不討論這個,仍是先看看它如何使用吧。bash
# coding: utf-8
import os
import sys
import math
import socket
def slice(mink, maxk):
s = 0.0
for k in range(mink, maxk):
s += 1.0/(2*k+1)/(2*k+1)
return s
def pi(n):
childs = []
unit = n / 10
servsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 注意這裏的AF_INET表示普通套接字
servsock.bind(("localhost", 0)) # 0表示隨機端口
server_address = servsock.getsockname() # 拿到隨機出來的地址,給後面的子進程使用
servsock.listen(10) # 監聽子進程鏈接請求
for i in range(10): # 分10個子進程
mink = unit * i
maxk = mink + unit
pid = os.fork()
if pid > 0:
childs.append(pid)
else:
servsock.close() # 子進程要關閉servsock引用
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(server_address) # 鏈接父進程套接字
s = slice(mink, maxk) # 子進程開始計算
sock.sendall(str(s))
sock.close() # 關閉鏈接
sys.exit(0) # 子進程結束
sums = []
for pid in childs:
conn, _ = servsock.accept() # 接收子進程鏈接
sums.append(float(conn.recv(1024)))
conn.close() # 關閉鏈接
for pid in childs:
os.waitpid(pid, 0) # 等待子進程結束
servsock.close() # 關閉套接字
return math.sqrt(sum(sums) * 8)
print pi(10000000)
複製代碼
輸出網絡
3.14159262176
複製代碼
當同一個機器的多個進程使用普通套接字進行通訊時,須要通過網絡協議棧,這很是浪費,由於同一個機器根本沒有必要走網絡。因此Unix提供了一個套接字的特殊版本,它使用和套接字一摸同樣的api,可是地址再也不是網絡端口,而是文件。至關於咱們經過某個特殊文件來進行套接字通訊。併發
# coding: utf-8
import os
import sys
import math
import socket
def slice(mink, maxk):
s = 0.0
for k in range(mink, maxk):
s += 1.0/(2*k+1)/(2*k+1)
return s
def pi(n):
server_address = "/tmp/pi_sock" # 套接字對應的文件名
childs = []
unit = n / 10
servsock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) # 注意AF_UNIX表示「域套接字」
servsock.bind(server_address)
servsock.listen(10) # 監聽子進程鏈接請求
for i in range(10): # 分10個子進程
mink = unit * i
maxk = mink + unit
pid = os.fork()
if pid > 0:
childs.append(pid)
else:
servsock.close() # 子進程要關閉servsock引用
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(server_address) # 鏈接父進程套接字
s = slice(mink, maxk) # 子進程開始計算
sock.sendall(str(s))
sock.close() # 關閉鏈接
sys.exit(0) # 子進程結束
sums = []
for pid in childs:
conn, _ = servsock.accept() # 接收子進程鏈接
sums.append(float(conn.recv(1024)))
conn.close() # 關閉鏈接
for pid in childs:
os.waitpid(pid, 0) # 等待子進程結束
servsock.close() # 關閉套接字
os.unlink(server_address) # 移除套接字文件
return math.sqrt(sum(sums) * 8)
print pi(10000000)
複製代碼
輸出app
3.14159262176
複製代碼
咱們知道跨網絡通訊免不了要經過套接字進行通訊,可是本例的多進程是在同一個機器上,用不着跨網絡,使用普通套接字進行通訊有點浪費。socket
爲了解決這個問題,Unix系統提供了無名套接字socketpair,不須要端口也能夠建立套接字,父子進程經過socketpair來進行全雙工通訊。
socketpair返回兩個套接字對象,一個用於讀一個用於寫,它有點相似於pipe,只不過pipe返回的是兩個文件描述符,都是整數。因此寫起代碼形式上跟pipe幾乎沒有什麼區別。
咱們使用sock.send()和sock.recv()來對套接字進行讀寫,經過sock.close()來關閉套接字對象。
# coding: utf-8
import os
import sys
import math
import socket
def slice(mink, maxk):
s = 0.0
for k in range(mink, maxk):
s += 1.0/(2*k+1)/(2*k+1)
return s
def pi(n):
childs = {}
unit = n / 10
for i in range(10): # 分10個子進程
mink = unit * i
maxk = mink + unit
rsock, wsock = socket.socketpair()
pid = os.fork()
if pid > 0:
childs[pid] = rsock
wsock.close()
else:
rsock.close()
s = slice(mink, maxk) # 子進程開始計算
wsock.send(str(s))
wsock.close()
sys.exit(0) # 子進程結束
sums = []
for pid, rsock in childs.items():
sums.append(float(rsock.recv(1024)))
rsock.close()
os.waitpid(pid, 0) # 等待子進程結束
return math.sqrt(sum(sums) * 8)
print pi(10000000)
複製代碼
輸出
3.14159262176
複製代碼
相對於管道只能用於父子進程之間通訊,Unix還提供了有名管道可讓任意進程進行通訊。有名管道又稱fifo,它會將本身註冊到文件系統裏一個文件,參數通訊的進程經過讀寫這個文件進行通訊。 fifo要求讀寫雙方必須同時打開才能夠繼續進行讀寫操做,不然打開操做會堵塞直到對方也打開。
# coding: utf-8
import os
import sys
import math
def slice(mink, maxk):
s = 0.0
for k in range(mink, maxk):
s += 1.0/(2*k+1)/(2*k+1)
return s
def pi(n):
childs = []
unit = n / 10
fifo_path = "/tmp/fifo_pi"
os.mkfifo(fifo_path) # 建立named pipe
for i in range(10): # 分10個子進程
mink = unit * i
maxk = mink + unit
pid = os.fork()
if pid > 0:
childs.append(pid)
else:
s = slice(mink, maxk) # 子進程開始計算
with open(fifo_path, "w") as ff:
ff.write(str(s) + "\n")
sys.exit(0) # 子進程結束
sums = []
while True:
with open(fifo_path, "r") as ff:
# 子進程關閉寫端,讀進程會收到eof
# 因此必須循環打開,屢次讀取
# 讀夠數量了就能夠結束循環了
sums.extend([float(x) for x in ff.read(1024).strip().split("\n")])
if len(sums) == len(childs):
break
for pid in childs:
os.waitpid(pid, 0) # 等待子進程結束
os.unlink(fifo_path) # 移除named pipe
return math.sqrt(sum(sums) * 8)
print pi(10000000)
複製代碼
輸出
3.14159262176
複製代碼
操做系統也提供了跨進程的消息隊列對象可讓咱們直接使用,只不過python沒有默認提供包裝好的api來直接使用。咱們必須使用第三方擴展來完成OS消息隊列通訊。第三方擴展是經過使用Python包裝的C實現來完成的。
OS消息隊列有兩種形式,一種是posix消息隊列,另外一種是systemv消息隊列,有些操做系統二者都支持,有些只支持其中的一個,好比macos僅支持systemv消息隊列,我本地的python的docker鏡像是debian linux,它僅支持posix消息隊列。
posix消息隊列 咱們先使用posix消息隊列來完成圓周率的計算,posix消息隊列須要提供一個惟一的名稱,它必須是/
開頭。close()方法僅僅是減小內核消息隊列對象的引用,而不是完全關閉它。unlink()方法才能完全銷燬它。O_CREAT選項表示若是不存在就建立。向隊列裏塞消息使用send方法,收取消息使用receive方法,receive方法返回一個tuple,tuple的第一個值是消息的內容,第二個值是消息的優先級。之因此有優先級,是由於posix消息隊列支持消息的排序,在send方法的第二個參數能夠提供優先級整數值,默認爲0,越大優先級越高。
# coding: utf-8
import os
import sys
import math
from posix_ipc import MessageQueue as Queue
def slice(mink, maxk):
s = 0.0
for k in range(mink, maxk):
s += 1.0/(2*k+1)/(2*k+1)
return s
def pi(n):
pids = []
unit = n / 10
q = Queue("/pi", flags=os.O_CREAT)
for i in range(10): # 分10個子進程
mink = unit * i
maxk = mink + unit
pid = os.fork()
if pid > 0:
pids.append(pid)
else:
s = slice(mink, maxk) # 子進程開始計算
q.send(str(s))
q.close()
sys.exit(0) # 子進程結束
sums = []
for pid in pids:
sums.append(float(q.receive()[0]))
os.waitpid(pid, 0) # 等待子進程結束
q.close()
q.unlink() # 完全銷燬隊列
return math.sqrt(sum(sums) * 8)
print pi(10000000)
複製代碼
輸出
3.14159262176
複製代碼
systemv消息隊列 systemv消息隊列和posix消息隊列用起來有所不一樣。systemv的消息隊列是以整數key做爲名稱,若是不指定,它就建立一個惟一的未佔用的整數key。它還提供消息類型的整數參數,可是不支持消息優先級。
# coding: utf-8
import os
import sys
import math
import sysv_ipc
from sysv_ipc import MessageQueue as Queue
def slice(mink, maxk):
s = 0.0
for k in range(mink, maxk):
s += 1.0/(2*k+1)/(2*k+1)
return s
def pi(n):
pids = []
unit = n / 10
q = Queue(key=None, flags=sysv_ipc.IPC_CREX)
for i in range(10): # 分10個子進程
mink = unit * i
maxk = mink + unit
pid = os.fork()
if pid > 0:
pids.append(pid)
else:
s = slice(mink, maxk) # 子進程開始計算
q.send(str(s))
sys.exit(0) # 子進程結束
sums = []
for pid in pids:
sums.append(float(q.receive()[0]))
os.waitpid(pid, 0) # 等待子進程結束
q.remove() # 銷燬消息隊列
return math.sqrt(sum(sums) * 8)
print pi(10000000)
複製代碼
輸出
3.14159262176
複製代碼
共享內存也是很是常見的多進程通訊方式,操做系統負責將同一份物理地址的內存映射到多個進程的不一樣的虛擬地址空間中。進而每一個進程均可以操做這分內存。考慮到物理內存的惟一性,它屬於臨界區資源,須要在進程訪問時搞好併發控制,好比使用信號量。咱們經過一個信號量來控制全部子進程的順序讀寫共享內存。咱們分配一個8字節double類型的共享內存用來存儲極限的和,每次從共享內存中讀出來時,要使用struct進行反序列化(unpack),將新的值寫進去以前也要使用struct進行序列化(pack)。每次讀寫操做都須要將讀寫指針移動到內存開頭位置(lseek)。
# coding: utf-8
import os
import sys
import math
import struct
import posix_ipc
from posix_ipc import Semaphore
from posix_ipc import SharedMemory as Memory
def slice(mink, maxk):
s = 0.0
for k in range(mink, maxk):
s += 1.0/(2*k+1)/(2*k+1)
return s
def pi(n):
pids = []
unit = n / 10
sem_lock = Semaphore("/pi_sem_lock", flags=posix_ipc.O_CREX, initial_value=1) # 使用一個信號量控制多個進程互斥訪問共享內存
memory = Memory("/pi_rw", size=8, flags=posix_ipc.O_CREX)
os.lseek(memory.fd, 0, os.SEEK_SET) # 初始化和爲0.0的double值
os.write(memory.fd, struct.pack('d', 0.0))
for i in range(10): # 分10個子進程
mink = unit * i
maxk = mink + unit
pid = os.fork()
if pid > 0:
pids.append(pid)
else:
s = slice(mink, maxk) # 子進程開始計算
sem_lock.acquire()
try:
os.lseek(memory.fd, 0, os.SEEK_SET)
bs = os.read(memory.fd, 8) # 從共享內存讀出來當前值
cur_val, = struct.unpack('d', bs) # 反序列化,逗號不能少
cur_val += s # 加上當前進程的計算結果
bs = struct.pack('d', cur_val) # 序列化
os.lseek(memory.fd, 0, os.SEEK_SET)
os.write(memory.fd, bs) # 寫進共享內存
memory.close_fd()
finally:
sem_lock.release()
sys.exit(0) # 子進程結束
sums = []
for pid in pids:
os.waitpid(pid, 0) # 等待子進程結束
os.lseek(memory.fd, 0, os.SEEK_SET)
bs = os.read(memory.fd, 8) # 讀出最終這結果
sums, = struct.unpack('d', bs) # 反序列化
memory.close_fd() # 關閉共享內存
memory.unlink() # 銷燬共享內存
sem_lock.unlink() # 銷燬信號量
return math.sqrt(sums * 8)
print pi(10000000)
複製代碼
輸出
3.14159262176
複製代碼
閱讀更多Python高級文章,請關注公衆號「碼洞」