python之進程----Queue

1、Queue是經過multiprocessing使用
from multiprocessing import Process,Queue
import time
import random
import os
def consumer(q):
    while True:
        res=q.get()
        if res is None:
            break
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃了 %s\033[0m' % (os.getpid(), res))
def producer(q):
    for i in range(5):
        time.sleep(2)
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 製造了 %s\033[0m' %(os.getpid(),res))
    q.put(None)
if __name__ == '__main__':
    q=Queue()
    #生產者們:廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:吃貨們
    p2=Process(target=consumer,args=(q,))

    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print('')
生產者,消費者模型1
from multiprocessing import Process,Queue
import time
import random
import os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃了 %s\033[0m' % (os.getpid(), res))
def product_baozi(q):
    for i in range(3):
        time.sleep(2)
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 製造了 %s\033[0m' %(os.getpid(),res))
def product_jiaozi(q):
    for i in range(3):
        time.sleep(2)
        res='餃子%s' %i
        q.put(res)
        print('\033[44m%s 製造了 %s\033[0m' %(os.getpid(),res))
def product_dabing(q):
    for i in range(3):
        time.sleep(2)
        res='大餅%s' %i
        q.put(res)
        print('\033[44m%s 製造了 %s\033[0m' %(os.getpid(),res))
if __name__ == '__main__':
    q=Queue()
    #生產者們:廚師們
    p1=Process(target=product_baozi,args=(q,))
    p2=Process(target=product_jiaozi,args=(q,))
    p3=Process(target=product_dabing,args=(q,))
    #消費者們:吃貨們
    p4=Process(target=consumer,args=(q,))
    p5=Process(target=consumer,args=(q,))

    p_l=[p1,p2,p3,p4,p5]
    for p in p_l:
        p.start()
    # for p in p_l:
    #     p.join()
    # p1.start()
    # p2.start()
    # p3.start()
    # p4.start()
    # p5.start()
    p1.join()
    p2.join()
    p3.join()
    q.put(None)
    q.put(None)
    p4.join()
    p5.join()
    print('')
生產者,消費者模型2
 q .put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),
而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。
若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。 q.get方法能夠從隊列讀取而且刪除一個元素。一樣,get方法有兩個可選參數:blocked和timeout。
若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。
若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常.
1:能夠往隊列裏聽任意類型的數據 2 隊列:先進先出
from multiprocessing import Process,Queue
q=Queue(3)
q.put('first')
q.put('second')
q.put('third')
# q.put('fourht')

print(q.get())
print(q.get())
print(q.get())
# print(q.get())
q.put和q.get
 
 
from multiprocessing import Process,Queue
q=Queue(3)
q.put('first',block=False)
q.put('second',block=False)
q.put('third',block=False)
# q.put('fourth',block=False)
q.put('fourth',block=True,timeout=3)


q.get(block=False)
q.get(block=True,timeout=3)

q.get_nowait() #q.get(block=False)
p.get的參數
2、JoinableQueue一樣經過multiprocessing使用。 
     JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:
   q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數      量,將引起ValueError異常
 q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止
from multiprocessing import Process,JoinableQueue
import time
import random
import os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃了 %s\033[0m' % (os.getpid(), res))
        q.task_done()
def product_baozi(q):
    for i in range(5):
        time.sleep(2)
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 製造了 %s\033[0m' %(os.getpid(),res))
    q.join()
if __name__ == '__main__':
    q=JoinableQueue()
    #生產者們:廚師們
    p1=Process(target=product_baozi,args=(q,))
    #消費者們:吃貨們
    p4=Process(target=consumer,args=(q,))
    p4.daemon=True
    p1.start()
    p4.start()
    p1.join()
    print('')
生產者,消費者模型3
from multiprocessing import Process,JoinableQueue
import time
import random
import os
def product_baozi(q):
    for i in range(3):
        time.sleep(2)
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 製造了 %s\033[0m' %(os.getpid(),res))
    q.join()
def product_jiaozi(q):
    for i in range(3):
        time.sleep(2)
        res='餃子%s' %i
        q.put(res)
        print('\033[44m%s 製造了 %s\033[0m' %(os.getpid(),res))
    q.join()
def product_dabing(q):
    for i in range(3):
        time.sleep(2)
        res='大餅%s' %i
        q.put(res)
        print('\033[44m%s 製造了 %s\033[0m' %(os.getpid(),res))
    q.join()
def consumer(q):
    while True:
        res = q.get()
        time.sleep(random.randint(1, 3))
        print('\033[45m%s 吃了 %s\033[0m' % (os.getpid(), res))
        q.task_done()
if __name__ == '__main__':
    q=JoinableQueue()
    #生產者們:廚師們
    p1=Process(target=product_baozi,args=(q,))
    p2=Process(target=product_jiaozi,args=(q,))
    p3=Process(target=product_dabing,args=(q,))
    #消費者們:吃貨們
    p4=Process(target=consumer,args=(q,))
    p5=Process(target=consumer,args=(q,))
    p4.daemon=True
    p5.daemon=True
    p_l=[p1,p2,p3,p4,p5]
    for p in p_l:
        p.start()
    p1.join()
    p2.join()
    p3.join()
    print('')
生產者,消費者模型4

三,互斥鎖json

互斥鎖應用:dom

from multiprocessing import Process,Lock
import os
import time
def work(mutex):
    mutex.acquire()
    print('task[%s] 上廁所' %os.getpid())
    time.sleep(3)
    print('task[%s] 上完廁所' %os.getpid())
    mutex.release()
if __name__ == '__main__':
    mutex=Lock()
    p1=Process(target=work,args=(mutex,))
    p2=Process(target=work,args=(mutex,))
    p3=Process(target=work,args=(mutex,))
    p1.start()
    p2.start()
    p3.start()
    print('')
互斥鎖

4、模擬搶票:ide

from multiprocessing import Process,Lock
import json
import time
import random
import os
def search():
    dic=json.load(open('db.txt',))
    print('剩餘票數%s' %dic['count'])

def get_ticket():
    dic=json.load(open('db.txt',))
    if dic['count'] > 0:
        dic['count']-=1
        json.dump(dic,open('db.txt','w'))
        print('%s 購票成功' %os.getpid())
def task(mutex):
    search()
    time.sleep(random.randint(1, 3)) #模擬購票一系列繁瑣的過程所花費的時間
    mutex.acquire()
    get_ticket()
    mutex.release()
if __name__ == '__main__':
    mutex=Lock()
    for i in range(50):
        p=Process(target=task,args=(mutex,))
        p.start()
模擬搶票

5、process對象的其餘屬性補充ui

from multiprocessing import Process
import os
import time
def work():
    print('%s is working' %os.getpid())
    time.sleep(3)
if __name__ == '__main__':
    p1=Process(target=work)
    p2=Process(target=work)
    p3=Process(target=work)
    p1.daemon=True
    p2.daemon=True
    p3.daemon=True
    p1.start() #初始化1
    p2.start() #初始化2
    p3.start() #初始化3

    p3.join()
    p1.join()
    p2.join()
    print('基於初始化的結果來繼續運行')
process屬性補充1
from multiprocessing import Process
import os
import time
def work():
    print('%s is working' %os.getpid())
    time.sleep(3)
if __name__ == '__main__':
    p1=Process(target=work)
    # p2=Process(target=work)
    # p3=Process(target=work)
    p1.start() #初始化1
    # p2.start() #初始化2
    # p3.start() #初始化3

    # p1.terminate()
    # time.sleep(3)
    # print(p1.is_alive())
    print(p1.name)
    print(p1.pid)
    print('基於初始化的結果來繼續運行')
process屬性補充2
相關文章
相關標籤/搜索