python -- 進程補充

生產者消費者模型

import time
import random
from multiprocessing import Queue
from multiprocessing import Process
def producer(q):
    for i in range(10):
        q.put('%s-%s'%(food,i)
        print('生產了%s'%food)
        time.sleep(random.randint(1,3)
    q.put(None)

    
def consumer():
    while True:
        food = q.get()
        if food == None:break
        print('%s 吃了 %s'%(name,food))
    
if __name__ == ' __main__':
    q = Queue()
    p1 = Process(target=producer,args=(q,'泔水'))
    p1.start()
    p2 = Process(target=producer,args=(q,'骨頭'))
    p2.start()
    c1 = Process(target=consumer,args=(q,'alex'))
    c1.start()
    c2 = Process(target=consumer,args=(q,'jin'))
    c2.start()
    c3 = Process(target=consumer,args=(q,'egon'))
    c3.start()

#隊列很安全

生產者消費者模型
  1.消費者要處理多少數據是不肯定的
  2.只能用while循環來處理數據,但沒法結束
  3.須要生產者發送信號
  4.有多少個消費者 就須要發送多少個信號
  5.可是發送的信號數量須要根據 生產者數組

JoinableQueue([maxsize]) 方法

import time
import random
from multiprocessing import JoinableQueue
from multiprocessing import Process

def producer(q):
    for i in range(10):
        q.put('%s-%s'%(food,i)
        print('生產了%s'%food)
        time.sleep(random.randint(1,3)
    q.join() #等待消費者把全部數據都處理完
    
def consumer():
    while True:
        food = q.get()
        if food == None:break
        print('%s 吃了 %s'%(name,food))
        q.task_done()
        
if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer,args=(q,'泔水'))
    p1.start()
    p2 = Process(target=producer,args=(q,'骨頭'))
    p2.start()
    c1 = Process(target=consumer,args=(q,'alex'))
    c1.daemon = True
    c1.start()
    c2 = Process(target=consumer,args=(q,'jin'))
    c2.daemon = True
    c2.start()
    c3 = Process(target=consumer,args=(q,'egon'))
    c3.daemon = True
    c3.start()
    
    p1.join() #等待p1執行完畢
    p2.join()

管道

#建立管道的類:
Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2)
  ,其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道
#參數介紹: dumplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。 #主要方法: conn1.recv():接收conn2.send(obj)發送的對象。
    若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。 conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象
#其餘方法: conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法 conn1.fileno():返回鏈接使用的整數文件描述符 conn1.poll([timeout]):若是鏈接上的數據可用,返回True。
  timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。 conn1.recv_bytes([maxlength]):
  接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。
  若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。
  若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。 conn.send_bytes(buffer [, offset [, size]]):
  經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,
  而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,
  該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。
  offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。
from multiprocessing import Process
from multiprocessing import Pipe

def func(p):
    foo,son = p
    foo.close()
    while True:
        try:
            print(son.recv())
        except EOFError:
            break
        # print(son.recv())

if __name__ == '__main__':
    foo,son = Pipe()
    p = Process(target=func, args=((foo,son),))
    p.start()
    son.close()
    foo.send('hello')
    foo.send('hello')
    foo.send('hello')
    foo.send('hello')
    foo.send('hello')
    foo.close()
from multiprocessing import Process
from multiprocessing import Pipe
from multiprocessing import Lock

def func(p,l):
    foo, son = p
    foo.close()
    while True:
        try:
            l.acquire()
            print(son.recv())
            l.release()
        except EOFError:
            l.release()
            son.close()
            break

def func2(p):
    foo, son = p
    son.close()
    for i in range(10):
        foo.send(i)
    foo.close()

if __name__ == '__main__':
    foo,son = Pipe()
    l = Lock()
    p = Process(target=func,args=((foo,son),l))
    p1 = Process(target=func,args=((foo,son),l))
    p2 = Process(target=func,args=((foo,son),l))
    p.start()
    p1.start()
    p2.start()
    p3 = Process(target=func2, args=((foo, son),)).start()
    p4 = Process(target=func2, args=((foo, son),)).start()
    p5 = Process(target=func2, args=((foo, son),)).start()
    p6 = Process(target=func2, args=((foo, son),)).start()
    p7 = Process(target=func2, args=((foo, son),)).start()
    son.close()
    foo.close()

應該特別注意管道端點的正確管理問題。若是是生產者或消費者中都沒有使用管道的某個端點,就應將它關閉。這也說明了爲什麼在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。若是忘記執行這些步驟,程序可能在消費者中的recv()操做上掛起。管道是由操做系統進行引用計數的,必須在全部進程中關閉管道後才能生成EOFError異常。所以,在生產者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點。 安全

管道具備不安全性,而隊列至關於管道與鎖的結合。併發

進程之間的數據共享

from multiprocessing import Manager,Process,Lock
def work(d,lock):
    with lock: #不加鎖而操做共享的數據,確定會出現數據錯亂
        d['count']-=1

if __name__ == '__main__':
    lock=Lock()
    with Manager() as m:
        dic=m.dict({'count':100})
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)

'''
進程間數據是獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的
雖然進程間數據獨立,但能夠經過Manager實現數據共享,事實上Manager的功能遠不止於此

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
'''

進程池和multiprocess.Pool模塊

進程池

進程池的概念。app

在程序實際處理問題過程當中,忙時會有成千上萬的任務須要被執行,閒時可能只有零星任務。那麼在成千上萬個任務須要被執行的時候,咱們就須要去建立成千上萬個進程麼?首先,建立進程須要消耗時間,銷燬進程也須要消耗時間。第二即使開啓了成千上萬的進程,操做系統也不能讓他們同時執行,這樣反而會影響程序的效率。所以咱們不能無限制的根據任務開啓或者結束進程。那麼咱們要怎麼作呢?dom

在這裏,要給你們介紹一個進程池的概念,定義一個池子,在裏面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等處處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。若是有不少任務須要執行,池中的進程數量不夠,任務就要等待以前的進程執行任務完畢歸來,拿到空閒進程才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣不會增長操做系統的調度難度,還節省了開閉進程的時間,也必定程度上可以實現併發效果。異步

Pool([numprocess  [,initializer [, initargs]]]):建立進程池

numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值
initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None
initargs:是要傳給initializer的參數組

p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
'''須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()'''

p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
'''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。'''
   
p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成

P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用


方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法
obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。
obj.ready():若是調用完成,返回True
obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
obj.wait([timeout]):等待結果變爲可用。
obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數
import os
import time
from multiprocessing import Pool
print(os.cpu_count())
def func(i):
    time.sleep(1)
    print(i,os.getpid())
    
if __name__ == '__main__':
    p = Pool(5)
    p.map(func,range(20))  #默認是無序的
    p.close()  #不容許再向進程池中添加任務
    p.join()
    print('====>')




import time
from multiprocessing import Pool
from multiprocessing import Process
def func(i):
    i += 1
    
if __name__ == '__main__':
    p = Pool(5)
    start = time.time()
    #target=func,args=next(iterable)
    p.map(func,range(20))  #默認是無序的
    p.close()  #不容許再向進程池中添加任務
    p.join()
    print('====>')
數據池與進程之間的對比
import time
from multiprocessing import Pool

def func(i):
    time.sleep(1)
    i += 1
    return i+1
    
if __name__ == '__main__':
    p = Pool(5)
    for i in range(20):
        p.apply(func,args=(i,))
        #apply是同步提交任務的機制
    p.close() #close必須加在join前 不準添新任務
    p.join()  #等待子進程結束在往下執行
數據池的同步調用
import time
from multiprocessing import Pool

def func(i):
    time.sleep(1)
    i += 1
    return i+1
    
if __name__ == '__main__':
    p = Pool(5)
    for i in range(20):
        p.apply_async(func,args=(i,))
        #apply是異步提交任務的機制
        #異步必需要有close和join
    p.close() #close必須加在join前 不準添新任務
    p.join()  #等待子進程結束在往下執行
數據池異步調用
相關文章
相關標籤/搜索