python 學習筆記 - Queue & Pipes,進程間通信

上面寫了Python如何建立多個進程,可是前面文章中建立的進程都是啞吧和聾子,本身顧本身執行,不會相互交流。
那麼如何讓進程間相互說說話呢?
Python爲咱們提供了一個函數multiprocessing.Pipe
和一個類:multiprocessing.Queue。dom

multiprocessing.Pipe()

multiprocessing.Pipe()即管道模式,調用Pipe()返回管道的兩端的Connection。函數

Python官方文檔的描述:
Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.

所以, Pipe僅僅適用於只有兩個進程一讀一寫的單雙工狀況,也就是說信息是隻向一個方向流動。例如電視、廣播,看電視的人只能看,電視臺是能播送電視節目。ui

Pipe的讀寫效率要高於Queue。
進程間的Pipe基於fork機制創建。
當主進程建立Pipe的時候,Pipe的兩個Connections鏈接的的都是主進程。
當主進程建立子進程後,Connections也被拷貝了一份。此時有了4個Connections。
此後,關閉主進程的一個Out Connection,關閉一個子進程的一個In Connection。那麼就創建好了一個輸入在主進程,輸出在子進程的管道。
原理示意圖以下:
Pipe原理示意圖
跟多資料能夠閱讀:http://www.tuicool.com/articl...spa

# 示例代碼
# coding=utf-8
from multiprocessing import Pipe, Process


def son_process(x, pipe):
    _out_pipe, _in_pipe = pipe

    # 關閉fork過來的輸入端
    _in_pipe.close()
    while True:
        try:
            msg = _out_pipe.recv()
            print msg
        except EOFError:
            # 當out_pipe接受不到輸出的時候且輸入被關閉的時候,會拋出EORFError,能夠捕獲而且退出子進程
            break


if __name__ == '__main__':
    out_pipe, in_pipe = Pipe(True)
    son_p = Process(target=son_process, args=(100, (out_pipe, in_pipe)))
    son_p.start()

    # 等pipe被fork 後,關閉主進程的輸出端
    # 這樣,建立的Pipe一端鏈接着主進程的輸入,一端鏈接着子進程的輸出口
    out_pipe.close()
    for x in range(1000):
        in_pipe.send(x)
    in_pipe.close()
    son_p.join()
    print "主進程也結束了"

總結一下:code

  • 上面的代碼中主要用到了pipe的send()、recv()、close()方法。當pipe的輸入端被關閉,且沒法接收到輸入的值,那麼就會拋出EOFError。blog

  • 新建一個Pipe(duplex)的時候,若是duplex爲True,那麼建立的管道是雙向的;若是duplex爲False,那麼建立的管道是單向的。隊列

multiprocessing.Queue

Queue據官方文檔也是基於pipe的實現。
Queue的使用主要是一邊put(),一邊get().可是Queue能夠是多個Process 進行put操做,也能夠是多個Process進行get()操做。
Demo:進程

# coding=utf-8
from multiprocessing import Queue, Process
from Queue import Empty as QueueEmpty
import random


def getter(name, queue):
    print 'Son process %s' % name
    while True:
        try:
            value = queue.get(True, 10)
            # block爲True,就是若是隊列中無數據了。
            #   |—————— 若timeout默認是None,那麼會一直等待下去。
            #   |—————— 若timeout設置了時間,那麼會等待timeout秒後纔會拋出Queue.Empty異常
            # block 爲False,若是隊列中無數據,就拋出Queue.Empty異常
            print "Process getter get: %f" % value
        except QueueEmpty:
            break


def putter(name, queue):
    print "Son process %s" % name
    for i in range(0, 1000):
        value = random.random()
        queue.put(value)
        # 放入數據 put(obj[, block[, timeout]])
        # 若block爲True,如隊列是滿的:
        #  |—————— 若timeout是默認None,那麼就會一直等下去
        #  |—————— 若timeout設置了等待時間,那麼會等待timeout秒後,若是仍是滿的,那麼就拋出Queue.Full.
        # 若block是False,若是隊列滿了,直接拋出Queue.Full
        print "Process putter put: %f" % value


if __name__ == '__main__':
    queue = Queue()
    getter_process = Process(target=getter, args=("Getter", queue))
    putter_process = Process(target=putter, args=("Putter", queue))
    getter_process.start()
    putter_process.start()

Queue的一些說明已經寫在代碼中了。ip

相關文章
相關標籤/搜索