Q: We're done learning about threads; now we move on to processes, right?
A: Yes. Earlier we compared threads to our hands; now we can learn about our "arms".
Since we already have multithreading, why learn multiprocessing? Because Python has the GIL: for CPU-heavy computation, multiprocessing lets several CPU cores run truly in parallel. For IO operations the bottleneck is not the CPU, so there we use multithreading instead. Also keep in mind that a process switch is not a lightweight operation.
Let's start with a CPU-intensive example, computing Fibonacci numbers:
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor
import time

def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

if __name__ == '__main__':
    with ThreadPoolExecutor(3) as executor:
        all_task = [executor.submit(fib, num) for num in range(25, 40)]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("get result:= {}".format(data))
        print("multithread last time is {}".format(time.time() - start_time))

    with ProcessPoolExecutor(3) as executor:
        all_task = [executor.submit(fib, num) for num in range(25, 40)]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("get result:= {}".format(data))
        print("multiprocess last time is {}".format(time.time() - start_time))

# multithread last time is 43.156678199768066
# multiprocess last time is 27.62783455848694
We can clearly see that for this CPU-bound task, multiprocessing is much faster than multithreading.
Let's now make the same comparison with an IO operation:
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor
import time

def random_sleep(n):
    time.sleep(n)
    return n

if __name__ == '__main__':
    with ThreadPoolExecutor(3) as executor:
        all_task = [executor.submit(random_sleep, num) for num in [2]*30]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("get result:= {}".format(data))
        print("multithread last time is {}".format(time.time() - start_time))

    with ProcessPoolExecutor(3) as executor:
        all_task = [executor.submit(random_sleep, num) for num in [2]*30]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("get result:= {}".format(data))
        print("multiprocess last time is {}".format(time.time() - start_time))

# multithread last time is 20.035860300064087
# multiprocess last time is 20.641016483306885
For IO-bound work the two come out almost identical (the threads are even slightly ahead), so processes buy us nothing there. Now let's formally get into process operations:
import os
import time

# os.fork() is only available on Linux/Unix
pid = os.fork()
print("bobby")
if pid == 0:
    # fork returns 0 in the child
    print("child process {}, parent is {}".format(os.getpid(), os.getppid()))
else:
    # in the parent, fork returns the child's pid
    print("I am the parent process: {}".format(pid))
time.sleep(2)
This code only runs on Linux. One thing we notice is that even if the main process finishes, the child process keeps running.
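If the parent should instead wait for the child to finish, os.wait() blocks until a child terminates. A minimal sketch, going slightly beyond the example above:

import os

pid = os.fork()
if pid == 0:
    # child: do its work, then exit explicitly
    print("child {} running".format(os.getpid()))
    os._exit(0)
else:
    # parent: block until the child terminates and collect its exit status
    finished_pid, status = os.wait()
    print("child {} exited with status {}".format(finished_pid, status))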
Q: How do we do process programming?
A: Once we understand thread programming, process programming becomes very simple. We won't repeat what carries over directly; instead we'll go through a few different packages, whose usage is much the same as what we saw with threads.
multiprocessing
import multiprocessing
import time

def get_html(n):
    time.sleep(n)
    return n

if __name__ == '__main__':
    progress = multiprocessing.Process(target=get_html, args=(2,))
    progress.start()
    progress.join()
We can also get the process's pid and ppid directly.
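For example, a small sketch (the helper show_ids is just for illustration): Process.pid gives the child's pid from the parent, and inside the child os.getpid() and os.getppid() work as usual:

import multiprocessing
import os

def show_ids():
    # runs inside the child process
    print("child pid: {}, parent pid: {}".format(os.getpid(), os.getppid()))

if __name__ == '__main__':
    progress = multiprocessing.Process(target=show_ids)
    progress.start()
    print("child pid seen from the parent: {}".format(progress.pid))
    progress.join()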
其餘和咱們多線程差不都就不詳解了。
Using a process pool:
There are two process pools: Pool and ProcessPoolExecutor. The latter works the same as its thread counterpart, so here we'll look at Pool on its own.
import multiprocessing
import time
from multiprocessing import Pool

def get_html(n):
    time.sleep(n)
    return n

if __name__ == '__main__':
    progress = multiprocessing.Process(target=get_html, args=(1,))
    progress.start()
    progress.join()

    pool = Pool(multiprocessing.cpu_count())
    print(multiprocessing.cpu_count())
    result = pool.apply_async(get_html, args=(3,))
    pool.close()          # stop accepting new tasks
    pool.join()           # wait for the worker processes to finish
    print(result.get())   # fetch the async result
Note that the process pool must be closed (and joined) at the end. For more detailed process-pool code, see: https://www.cnblogs.com/noah0532/p/10938771.html
Two methods deserve special mention: imap and imap_unordered (the latter yields whichever task finishes first):
for result in pool.imap(get_html, [1, 5, 3]):
    print("{} sleep success".format(result))
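For contrast, a minimal sketch of imap_unordered, assuming the same get_html function as above and a fresh pool; here the 1-second and 3-second tasks print before the 5-second one, in completion order:

from multiprocessing import Pool
import time

def get_html(n):
    time.sleep(n)
    return n

if __name__ == '__main__':
    pool = Pool(3)
    # results arrive in completion order, not submission order: 1, 3, 5
    for result in pool.imap_unordered(get_html, [1, 5, 3]):
        print("{} sleep success".format(result))
    pool.close()
    pool.join()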
Inter-process communication:
Inter-process communication is partly the same as inter-thread communication and partly different; for example, the locks we used with threads can no longer be used.
A simple example: communicating through a queue.
from multiprocessing import Process, Queue
# from queue import Queue  # this queue can no longer be used here
import time

def producer(queue):
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)

if __name__ == '__main__':
    queue = Queue(10)
    my_producer = Process(target=producer, args=(queue,))
    my_consumer = Process(target=consumer, args=(queue,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
In multiprocess programming we can no longer use the old queue module; we have to use the Queue from multiprocessing. Watch out for this.
Let's look at another example, this time with a shared variable:
from multiprocessing import Process
import time

def producer(a):
    a += 1
    time.sleep(2)

def consumer(a):
    time.sleep(2)
    print(a)

if __name__ == '__main__':
    a = 1
    my_producer = Process(target=producer, args=(a,))
    my_consumer = Process(target=consumer, args=(a,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
We find that the shared variable no longer works: the consumer still prints 1. As we said earlier, each process has its own copy of its variables; they are not shared.
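If we genuinely need one number shared between processes, the standard library's multiprocessing.Value places it in shared memory. A minimal sketch, going beyond the original example:

from multiprocessing import Process, Value

def producer(a):
    # Value wraps the shared int together with a lock for atomic updates
    with a.get_lock():
        a.value += 1

if __name__ == '__main__':
    a = Value('i', 1)  # 'i' means a C int, initial value 1
    my_producer = Process(target=producer, args=(a,))
    my_producer.start()
    my_producer.join()
    print(a.value)  # 2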
Also, the Queue from multiprocessing cannot be used with a process pool. If we want a queue inside a pool, we have to use the Queue from Manager:
from multiprocessing import Manager, Pool
import time

def producer(queue):
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)

if __name__ == '__main__':
    queue = Manager().Queue(10)
    pool = Pool(2)
    pool.apply_async(producer, args=(queue,))
    pool.apply_async(consumer, args=(queue,))
    pool.close()
    pool.join()
We can also communicate through a Pipe, but a Pipe can only connect two processes. For communication between exactly two processes, a Pipe performs better than a Queue.
from multiprocessing import Process, Pipe

def producer(pipe):
    pipe.send("bobby")

def consumer(pipe):
    print(pipe.recv())

if __name__ == '__main__':
    # a Pipe can only be used for communication between two processes
    receive_pipe, send_pipe = Pipe()
    my_producer = Process(target=producer, args=(send_pipe,))
    my_consumer = Process(target=consumer, args=(receive_pipe,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
Key point: shared state between processes through Manager(): Manager().dict(), list(), Array()... all the common data types are available.
from multiprocessing import Process, Manager

def add_data(p_dict, key, value):
    p_dict[key] = value

if __name__ == '__main__':
    progress_dict = Manager().dict()
    first_progess = Process(target=add_data, args=(progress_dict, "bobby1", 22))
    second_progess = Process(target=add_data, args=(progress_dict, "bobby1", 23))
    first_progess.start()
    second_progess.start()
    first_progess.join()
    second_progess.join()
    print(progress_dict)  # {'bobby1': 23}
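Note that both writes above target the same key, so the final value depends on which process writes last; here it happened to be 23. The other Manager types work the same way. A quick sketch with Manager().list(), following the same pattern:

from multiprocessing import Process, Manager

def add_data(p_list, value):
    p_list.append(value)

if __name__ == '__main__':
    progress_list = Manager().list()
    first_progess = Process(target=add_data, args=(progress_list, 22))
    second_progess = Process(target=add_data, args=(progress_list, 23))
    first_progess.start()
    second_progess.start()
    first_progess.join()
    second_progess.join()
    print(progress_list)  # both values survive; their order depends on scheduling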