Python進程專題6:共享數據與同步

上一篇文章: Python進程專題5:進程間通訊
下一篇文章: Python進程專題7:託管對象
咱們如今知道,進程之間彼此是孤立的,惟一通訊的方式是隊列或管道,但要讓這兩種方式完成進程間通訊,底層離不開共享內容,這就是今天的主角:共享內存。

建立共享值得方法

  • Value
v=Value(typecode,arg1,...,argN,lock):
typecode:要麼是包含array模塊使用的相同類型代碼(如'i'、'd'等)的字符串,要麼是來自ctypes模塊的類型對象
(例如:ctypes.c_int,ctypes.c_double等)。
arg1,...,argN:傳遞給構造函數的參數。
lock:只能使用關鍵字傳入的參數,默認爲True:將建立一個新鎖來保護對值的訪問。若是傳入一個現有鎖,該鎖將用於進行同步。

訪問底層的值:v.value
  • RawValue
r=RawValue(typecode,arg1,...,argN):同Value對象,惟一區別是不存在lock
  • Array
a=Array(typecode,initializer,lock):在共享內存中建立ctypes數組。
initializer:要麼是設置數組初始大小的整數,要麼是項序列,其值和大小用於初始化數組。
能夠使用標準的Python索引、切片、迭代操做訪問它,其中每項操做均→鎖進程同步,
對於字節字符串,a還具備a.value屬性,能夠把整個數組當作一個字符串進行訪問。
  • RawArray
r=RawArray(typcode,initlizer):同Array,單不存在鎖。當所編寫的程序必須一次性操做大量的數組項時,
若是同時使用這種數據類型和用於同步的單獨大的鎖,性能將極大提高。
  • 同步原語
除了使用上面方法建立共享值,multiprocess模塊還提供了一下同步原語的共享版本。
原語 描述
Lock 互斥鎖
RLock 可重入的互斥鎖(同一個進程能夠多吃得到它,同時不會形成阻塞)
Semaphore 信號量
BoundedSemaphore 有邊界的信號量
Event 事件
Condition 條件變量

實例:使用共享數組代替管道,將一個由浮點數組成的Python隊列發送給另一個進程。

代碼:segmentfault

#使用共享數組代替管道,將一個由浮點數組成的Python列表發送給另一個進程
import multiprocessing

class FloatChannel(object):
    def __init__(self,maxsize):
        #在共享內存中建立一個試數組
        self.buffer=multiprocessing.RawArray('d',maxsize)
        #在共享內存中建立ctypes對象
        self.buffer_len=multiprocessing.Value('i')
        #定義一個信號量1表明:empty
        self.empty=multiprocessing.Semaphore(1)
        #定義一個信號量0表明:full
        self.full=multiprocessing.Semaphore(0)

    def send(self,values):
        #只在緩存爲null時繼續
        #acquire()會阻塞線程,直到release被調用
        self.empty.acquire()
        nitems=len(values)
        print("保存內容的長度",nitems)
        #設置緩衝區大小
        self.buffer_len.value=nitems
        #將值複製到緩衝區中
        self.buffer[:nitems]=values
        print(self.buffer[:nitems])
        #發信號通知緩衝區已滿
        self.full.release()

    def recv(self):
        #只在緩衝區已滿時繼續
        self.full.acquire()
        #複製值
        values=self.buffer[:self.buffer_len.value]
        #發送信號,通知緩衝區爲空
        self.empty.release()
        return values

#性能測試,接受多條消息
def consume_test(count,ch):
    #for i in range(count):
        values=ch.recv()
        print("接收到的值:",values)

#性能測試,發送多條消息
def produce_test(count,values,ch):
    #for i in range(count):
        print("發送:",values)
        ch.send(values)


if __name__=="__main__":
    ch=FloatChannel(10000)
    p=multiprocessing.Process(target=consume_test,args=(1000,ch))

    p.start()

    values=[float(x) for x in range(10)]

    produce_test(10,values,ch)

    print("done")

    p.join()

結果:數組

發送: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
保存內容的長度 10
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
done
接收到的值: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
相關文章
相關標籤/搜索