multiprocessing模塊

multiprocessing模塊爲在子進程中運行任務、通訊和共享數據,以及執行各類形式的同步提供支持。進程沒有任何共享狀態,若是某個進程修改數據,改動只限於該進程內。安全

Process()類:表示運行在一個子進程中的任務。服務器

class Process(object):
def init(self, group=None, target=None, name=None, args=(), kwargs={}):網絡

參數:dom

group:未使用ide

target:當前進程啓動時執行的可調用對象函數

name:爲進程指定描述性名稱的字符串線程

args:傳遞給target的位置參數的元組code

kwargs:傳遞給target的關鍵字參數的字典server

實例p有如下方法和屬性:對象

'authkey','daemon', 'exitcode', 'ident', 'is_alive', 'join', 'name', 'pid', 'run', 'sentinel', 'start', 'terminate'

方法

1.p.is_alive():若是p仍然運行,返回True

2.p.join():等待進程p終止。timeout是可選的超市期限。進程能夠被鏈接無數次,但若是鏈接自身則會出錯。

3.p.run():進程啓動時運行的方法。默認狀況下,會調用傳遞給Process構造函數的target。定義進程的另外一個方法是繼承Process類並從新實現run()函數。

4.p.start():啓動進程。這將運行表明進程的子進程,並調用該子進程中的p.run()函數

5.p.terminate():強制終止進程。若是調用此函數,進程P將被當即終止,同時不會進行任何清理動做。若是進程p建立了它本身的子進程,這些進程將變爲殭屍進程。使用此方法時需特別當心。若是p保存了一個鎖或參與了進程間通訊,那麼終止它可能會致使死鎖或I/O損壞。

屬性

6.p.authkey:進程的身份驗證鍵。除非顯式設定,這是由os. urandom()函數生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性。這類鏈接只有在兩端具備相同的身份驗證鍵時才能成功。

7.p.daemon:

一個布爾標誌,指示進程是不是後臺進程。當建立它的 Python進程終止時,後臺( Daemonic)進程將自動終止。另外,禁止後臺進程建立本身的新進程。p.daemon的值必須在使用 p.start()函數啓動進程以前進行設置。

8.p.exitcode:進程的整數退出代碼。若是進程仍然在運行,它的值爲None。若是值爲負數,-N表示進程由信號N所終止。

9.p.name:進程的名稱。

10.p.pid:進程的整數進程ID。

11.p.ident:

12.p.sentinel:

  • 例子:demo3.py

進程間通訊

  • multiprocessing模塊支持進程間通訊的兩種主要形式:管道和隊列。這兩種方法都是使用消息傳遞實現的,但隊列接口有意模仿線程程序中常見的隊列用法。

隊列

Queue( [maxsize])

class Queue(object):

def __init__(self, maxsize=-1):

self._maxsize = maxsize
  • 建立共享的進程隊列。 maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底

層隊列使用管道和鎖實現。另外,還須要運行攴持線程以便將隊列中的數據傳輸到底層管道中

  • Queue的實例q具備如下方法:
'cancel_join_thread', 'close', 'empty', 'full', 'get', 'get_nowait', 'join_thread', 'put', 'put_nowait', 'qsize'
  1. q.cancel_join_thread():

  2. q.close():

  3. q.empty():

  4. q.full():

  5. q.get():

  6. q.get_nowait():

  7. q.join_thread():

  8. q.put():

  9. q.put_nowait():

  10. q.qsize():

JoinableQueue([maxsize])

  • 建立可鏈接的共享進程隊列。這就像是一個Queue對象,但隊列容許項的消費者通知生產者項已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

  • JoinableQueue的實例p除了與 Queue對象相同的方法以外,還具備如下方法:

'cancel_join_thread', 'close', 'empty', 'full', 'get', 'get_nowait', 'join', 'join_thread', 'put', 'put_nowait', 'qsize', 'task_done'
  1. p.task_done():消費者使用此方法發出信號,表示qget()返回的項已經被處理。若是調用此方法的次數大於從隊列中刪除的項的數量,將引起 Valueerror異常。

  2. p.join():生產者使用此方法進行阻塞,直到隊列中的全部項均被處理。阻塞將持續到爲隊列中的每一個項均調用q.task done()方法爲止。

  • 例子:demo4.py

管道

Pip([duplex])

在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn和conn2是表示管道兩端的 Connection對象。默認狀況下,管道是雙向的。若是將 duplex置爲 False,conn只能用於接收,而con2只能用於發送。必須在建立和啓動使用管道的 Process對象以前調用pipe()方法。

def Pipe(duplex=True):

return Connection(), Connection()

Pipe()方法返回的 Connection對象的實例c具備如下方法和屬性。

  1. c.close():關閉鏈接。若是c被垃圾回收,將自動調用此方法。

  2. c.fileno():返回鏈接使用的整數文件描述符。

  3. c.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將 timeout置爲None,操做將無限期地等待數據到達。

  4. c.recv():接收c.send()方法返回的對象。若是鏈接的另外一端已經關閉,不再存在任何數據,將引起EOFERror異常。

  5. c.recv_bytes([maxlength]):

接受c.send_ bytes()方法發送的一條完整的字節消息。 maxlength指定要接收的最大字節數。若是進入的消息超過了這個最大值,將引起 IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另外一端已經關閉,不再存在任何數據,將引EOFERror異常。

  1. c.recv_bytes_into(buffer [ offset]):接收一條完整的字節消息,並把它保存在 buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起 BufferTooShort異常。

  2. c.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象

  3. C.send_bytes(buffer [,offset [,size]]):經過鏈接發送字節數據緩衝區。 buffer是支持緩衝區接口的任意對象, offset是緩衝區中的字

節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收。

如何使用管道實現前面的生產者-消費者問題

# import multiprocessing

# def consumer(output_p,input_p):

# input_p.close()

# while True:

# try:

# item = output_p.recv()

# except EOFError:

# break

# print(item)

# print("Consumer done")

#

#

# def producer(sequence,input_P):

# for item in sequence:

# input_P.send(item)

#

#

# if __name__ == '__main__':

# (output_p,input_p) = multiprocessing.Pipe()

# cons_p = multiprocessing.Process(target=consumer,args=(output_p,input_p),)

# cons_p.start()

#

#

# output_p.close()

#

# sequence = [1,2,3,4]

# producer(sequence,input_p)

# input_p.close()

#

# cons_p.join()

管道可用於雙向通訊。利用一般在客戶端/服務器計算中使用的請求響應模型或遠程過程調用,就可使用管道編寫與進程交互的程序,例如:

import multiprocessing

def add(pipe):

server_P, client_P = pipe

client_P.close()

while True:

try:

x,y = server_P.recv()

except EOFError:

break

result = x + y

server_P.send(result)

print("Server done")

if __name__ == '__main__':

(server_P, client_P) = multiprocessing.Pipe()

adder_p = multiprocessing.Process(target=add,args=((server_P, client_P),))

adder_p.start()

server_P.close()

client_P.send((3,4))

print(client_P.recv())

client_P.send(('hello','world'))

print(client_P.recv())

client_P.close()

adder_p.join()
  • 如何使用管道實現前面的生產者-消費者問題這個例子中有一個錯誤:

output_p, input_p= pipe

傳進來的只有一個參數卻賦值給兩個參數。

相關文章
相關標籤/搜索