python之路——39

複習

1.信號量
1.Semaphore
2.內部經過鎖的原理實現,內置計數器
3.同一時間,只能有指定數量的進程執行某一段被控制的代碼
2.事件
1.Event
2.wait阻塞,受到事件狀態控制的同步組件
3.is_set
clear() -->false 阻塞
set() -->true 不阻塞
3.隊列
1.Queue,JoinableQueue
2.put() 隊列滿時,阻塞
get() 隊列空時,阻塞
full() empty() 不許確
3.task_done() --get()
join() --put()

學習內容

1.管道
隊列又是基於(管道+鎖)實現的
2.進程間的數據共享
3.進程池和回調函數
1.爲何會出現進程池?
開啓進程,會開啓屬於這個進程的空間,過多的進程會下降效率
2.進程池
1.python中,啓動時建立一個屬於進程的池子
這個池子指定能存放多少個進程

代碼區

1.隊列python

from multiprocessing import Pipe,Process
def func(conn1,conn2):
    conn2.close()
    while True:
        try:
            msg = conn1.recv()
            print(msg)
        except:
            conn1.close()
            break
if __name__ == '__main__':
    conn1,conn2 = Pipe()
    p = Process(target=func,args=(conn1,conn2))
    p.start()
    conn1.close()
    for i in range(10):
        conn2.send('hello!!!')
    conn2.close()

2.生產者消費者,隊列,加鎖app

import time,random
from multiprocessing import Process,Pipe,Lock
def producer(con,pro,name,food):
    con.close()
    for i in range(2):
        time.sleep(random.randint(1,3))
        f = '%s生產了%s%s'%(name,food,i)
        print(f)
        pro.send(f)
    pro.close()
def consumer(con,pro,name,lock):
    pro.close()
    while True:
        try:
            lock.acquire()
            c = con.recv()
            lock.release()
            print('%s消費了%s'%(name,c))
            time.sleep(random.randint(0,1))
        except EOFError:
            print('%s我來啦'%name)
            lock.release()
            con.close()
            break
if __name__ == '__main__':
    con, pro = Pipe()
    lock = Lock()
    p = Process(target=producer,args=(con,pro,'alex','包子'))
    p.start()
    c1 = Process(target=consumer,args=(con,pro,'alex',lock))
    c1.start()
    c2 = Process(target=consumer,args=(con,pro,'小明',lock))
    c2.start()
    con.close()
    pro.close()

3.數據共享dom

from multiprocessing import Manager,Process,Lock
def main(dic,lock):
    lock.acquire()
    dic['count'] -= 1
    lock.release()
if __name__ == '__main__':
    lock = Lock()
    m = Manager()
    dic = m.dict({'count': 30})
    p_lst = []
    for i in range(20):
        p = Process(target=main, args=(dic,lock))
        p.start()
        p_lst.append(p)
    for i in p_lst:
        i.join()
    print('main process:',dic)

4.進程池效率async

import time
from multiprocessing import Pool,Process
def func(n):
    for i in range(10):
        print(n)
if __name__ == '__main__':
    start = time.time()
    pool = Pool(5)
    pool.map(func,range(100))
    t1 = time.time() - start
    start = time.time()
    p_lst = []
    for u in range(100):
        p = Process(target=func,args=(u,))
        p.start()
        p_lst.append(p)
    for i in p_lst:
        i.join()
    t2 = time.time() - start
    print(t1,t2)

5.進程池開啓方式2函數

import os
import time
from multiprocessing import Pool
def func(n):
    print('start %s'% os.getpid())
    time.sleep(1)
    print('end %s'% os.getpid())
if __name__ == '__main__':
    p = Pool()
    for i in range(10):
        p.apply_async(func,args=(i,))
    p.close()
    p.join()

6.進程池的返回值學習

import time
from multiprocessing import Pool
def func(i):
    time.sleep(1)
    return i*i
if __name__ == '__main__':
    p = Pool()
    ret_l = []
    for i in range(10):
        ret = p.apply_async(func,args=(i,))
        ret_l.append(ret)
    for i in ret_l:
        print(i.get())

#   map
import time
from multiprocessing import Pool
def func(i):
    time.sleep(1)
    return i*i
if __name__ == '__main__':
    p = Pool()
    ret_l = []
    ret = p.map(func,range(10)) # map自帶join和close,一次性把全部結果返回
    print(map)

7.回調函數ui

import os
from multiprocessing import Pool
def func1(i):
    print('func1: ', os.getpid())
    func2(i)
    return i*i
def func2(i):
    print('func2: ', os.getpid())
def func3(i):
    print('func3: ', os.getpid())
    print(i)
if __name__ == '__main__':
    print('主進程:',os.getpid())
    p = Pool()
    p.apply_async(func1,args=(10,),callback=func3)
    p.close()
    p.join()
相關文章
相關標籤/搜索