1.信號量
1.Semaphore
2.內部經過鎖的原理實現,內置計數器
3.同一時間,只能有指定數量的進程執行某一段被控制的代碼
2.事件
1.Event
2.wait阻塞,受到事件狀態控制的同步組件
3.is_set
clear() -->false 阻塞
set() -->true 不阻塞
3.隊列
1.Queue,JoinableQueue
2.put() 隊列滿時,阻塞
get() 隊列空時,阻塞
full() empty() 不許確
3.task_done() --get()
join() --put()
1.管道
隊列又是基於(管道+鎖)實現的
2.進程間的數據共享
3.進程池和回調函數
1.爲何會出現進程池?
開啓進程,會開啓屬於這個進程的空間,過多的進程會下降效率
2.進程池
1.python中,啓動時建立一個屬於進程的池子
這個池子指定能存放多少個進程
1.隊列python
from multiprocessing import Pipe,Process def func(conn1,conn2): conn2.close() while True: try: msg = conn1.recv() print(msg) except: conn1.close() break if __name__ == '__main__': conn1,conn2 = Pipe() p = Process(target=func,args=(conn1,conn2)) p.start() conn1.close() for i in range(10): conn2.send('hello!!!') conn2.close()
2.生產者消費者,隊列,加鎖app
import time,random from multiprocessing import Process,Pipe,Lock def producer(con,pro,name,food): con.close() for i in range(2): time.sleep(random.randint(1,3)) f = '%s生產了%s%s'%(name,food,i) print(f) pro.send(f) pro.close() def consumer(con,pro,name,lock): pro.close() while True: try: lock.acquire() c = con.recv() lock.release() print('%s消費了%s'%(name,c)) time.sleep(random.randint(0,1)) except EOFError: print('%s我來啦'%name) lock.release() con.close() break if __name__ == '__main__': con, pro = Pipe() lock = Lock() p = Process(target=producer,args=(con,pro,'alex','包子')) p.start() c1 = Process(target=consumer,args=(con,pro,'alex',lock)) c1.start() c2 = Process(target=consumer,args=(con,pro,'小明',lock)) c2.start() con.close() pro.close()
3.數據共享dom
from multiprocessing import Manager,Process,Lock def main(dic,lock): lock.acquire() dic['count'] -= 1 lock.release() if __name__ == '__main__': lock = Lock() m = Manager() dic = m.dict({'count': 30}) p_lst = [] for i in range(20): p = Process(target=main, args=(dic,lock)) p.start() p_lst.append(p) for i in p_lst: i.join() print('main process:',dic)
4.進程池效率async
import time from multiprocessing import Pool,Process def func(n): for i in range(10): print(n) if __name__ == '__main__': start = time.time() pool = Pool(5) pool.map(func,range(100)) t1 = time.time() - start start = time.time() p_lst = [] for u in range(100): p = Process(target=func,args=(u,)) p.start() p_lst.append(p) for i in p_lst: i.join() t2 = time.time() - start print(t1,t2)
5.進程池開啓方式2函數
import os import time from multiprocessing import Pool def func(n): print('start %s'% os.getpid()) time.sleep(1) print('end %s'% os.getpid()) if __name__ == '__main__': p = Pool() for i in range(10): p.apply_async(func,args=(i,)) p.close() p.join()
6.進程池的返回值學習
import time from multiprocessing import Pool def func(i): time.sleep(1) return i*i if __name__ == '__main__': p = Pool() ret_l = [] for i in range(10): ret = p.apply_async(func,args=(i,)) ret_l.append(ret) for i in ret_l: print(i.get()) # map import time from multiprocessing import Pool def func(i): time.sleep(1) return i*i if __name__ == '__main__': p = Pool() ret_l = [] ret = p.map(func,range(10)) # map自帶join和close,一次性把全部結果返回 print(map)
7.回調函數ui
import os from multiprocessing import Pool def func1(i): print('func1: ', os.getpid()) func2(i) return i*i def func2(i): print('func2: ', os.getpid()) def func3(i): print('func3: ', os.getpid()) print(i) if __name__ == '__main__': print('主進程:',os.getpid()) p = Pool() p.apply_async(func1,args=(10,),callback=func3) p.close() p.join()