PythonI/O進階學習筆記_11.python的多進程

content:
1. 爲何要多進程編程?和多線程有什麼區別?
2. python 多進程編程
3. 進程間通訊
=======================================
 
一. 爲何要多進程編程?和多線程有什麼區別?
因爲GIL的存在,因此對於某一些多線程任務來講,沒法利用多核的優點,對這些耗cpu的任務,用多進程反而能利用多cpu。
因此多cpu的操做用多進程編程。
對io操做較多的任務來講,瓶頸不在於cpu,更多的在於io的切換中的消耗和時間等待。用多線程反而能在io掛起的時候,進行線程切換。
雖然io操做多的時候,也能夠用多進程編程,可是由於進程的切換系統的代價是十分大的,因此能使用多線程的狀況下,儘可能用多線程。
 
因此,對於耗費cpu的操做,好比計算、挖礦等,多進程優於多線程。
例:同計算一組斐波拉契數列的時間比較(耗cpu的操做)
#多線程
from concurrent.futures import  ThreadPoolExecutor,as_completed
from concurrent.futures import  ProcessPoolExecutor
import time
def fib(n):
    if n <= 2:
        return 1
    return fib(n-1)+fib(n-2)
 
with ThreadPoolExecutor(3) as excutor:
    all_task=[excutor.submit(fib,(num)) for num in range(25,35)]
    start_time=time.time()
    for future in as_completed(all_task):
        data=future.result()
        print("result:{}".format(data))
    end_time=time.time()
    print("last time : {}".format(end_time-start_time))
 
#output:
result:75025
result:121393
result:196418
result:317811
result:514229
result:832040
result:1346269
result:2178309
result:3524578
result:5702887
last time : 98.66604399681091

 

#多進程
from concurrent.futures import  ThreadPoolExecutor,as_completed
from concurrent.futures import  ProcessPoolExecutor
import time
def fib(n):
    if n <= 2:
        return 1
    return fib(n-1)+fib(n-2)
if __name__ == "__main__":
    with ProcessPoolExecutor(3) as excutor:
        all_task = [excutor.submit(fib, (num)) for num in range(25, 35)]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("result:{}".format(data))
        end_time = time.time()
        print("last time : {}".format(end_time - start_time))
 
#output:
result:75025
result:121393
result:196418
result:317811
result:514229
result:832040
result:1346269
result:2178309
result:3524578
result:5702887
last time : 14.470988988876343

 

進程和線程的區別:
  • 進程是資源分配的最小單位,線程是程序執行的最小單位。
  • 進程有本身的獨立地址空間,每啓動一個進程,系統就會爲它分配地址空間,創建數據表來維護代碼段、堆棧段和數據段,這種操做很是昂貴。而線程是共享進程中的數據的,使用相同的地址空間,所以CPU切換一個線程的花費遠比進程要小不少,同時建立一個線程的開銷也比進程要小不少。
  • 線程之間的通訊更方便,同一進程下的線程共享全局變量、靜態變量等數據,而進程之間的通訊須要以通訊的方式(IPC)進行。不過如何處理好同步與互斥是編寫多線程程序的難點。
  • 可是多進程程序更健壯,多線程程序只要有一個線程死掉,整個進程也死掉了,而一個進程死掉並不會對另一個進程形成影響,由於進程有本身獨立的地址空間。
 
2、python 多進程編程
1.from concurrent.futures import  ProcessPoolExecutor
ProcessPoolExecutor 和上一章 講到的多線程的用法是同樣的。包括其中用到的Futures類。
基本看它的入口函數就明白,這裏再也不贅述。
 
2.更加底層的multiprocessing
其實在ProcessPoolExecutor底層用的其實也是multiprocessing。
在multiprocess裏,有個Progress類。跟Thread用法又是類似的。
#input
from concurrent.futures import  ProcessPoolExecutor
import  multiprocessing
#多進程編程
import  time
def get_html(n):
    time.sleep(n)
    print("sub_progress sccess")
if __name__=="__main__":
    progress = multiprocessing.Process(target=get_html,args=(3,))
    print(progress.pid)
    progress.start()
    print(progress.pid)
    progress.join()
 
#output
None
12864
sub_progress sccess

 

3.繼承Progress類(與以前的Thread類同樣)
import  multiprocessing
#多進程編程
import  time
 
class progress_get_html(multiprocessing.Process):
    def __init__(self,n):
        self.n=n
        super().__init__()
    def run(self):
        time.sleep(self.n)
        print("sub progress success")
class MyProgress(multiprocessing.Process):
    def __init__(self,n):
        self.n=n
        super().__init__()
    def run(self):
        pro=progress_get_html(self.n)
        pro.start()
        print("progress end")
if __name__=="__main__":
    progress = MyProgress(3)
    print(progress.pid)
    progress.start()
    print(progress.pid)
    progress.join()
 
#output:
None
8744
progress end
sub progress success

 

4.使用進程池
指明進程數,不指明的話,能夠直接默認爲cpu數(cpu_count() or 1)。
from concurrent.futures import  ProcessPoolExecutor
from multiprocessing import  pool
import  multiprocessing
#多進程編程
import  time
def get_html(n):
    time.sleep(n)
    print("sub_progress sccess")
    return n
if __name__=="__main__":
    pool=multiprocessing.Pool(multiprocessing.cpu_count())
    result=pool.apply_async(get_html,args=(3,))
    print(result.get())
    #pool在調用join以前 須要調用close 來讓它再也不接收任務。不然會報錯
    pool.close()
    pool.join()
    print(result.get())
 
#output
sub_progress sccess
3
3

 

其餘方法:
- imap:按照參數輸入順序
if __name__=="__main__":
    pool=multiprocessing.Pool(multiprocessing.cpu_count())
    for result in pool.imap(get_html,[1,5,3]):
        print("sleep {} successed ".format(result))
 
#output:
sub_progress sccess
sleep 1 successed
sub_progress sccess
sub_progress sccess
sleep 5 successed
sleep 3 successed
imap_unordered: 按照執行完成順序
if __name__=="__main__":
    pool=multiprocessing.Pool(multiprocessing.cpu_count())
 
    #for result in pool.imap(get_html,[1,5,3]):
    #    print("sleep {} successed ".format(result))
    for result in pool.imap_unordered(get_html,[1,5,3]):
        print("sleep {} successed ".format(result))
 
#output:
sub_progress sccess
sleep 1 successed
sub_progress sccess
sleep 3 successed
sub_progress sccess
sleep 5 successed

 

三. 進程間通訊
與線程間不一樣的是,線程間同步的類和鎖是不可用的。
1.Queue(注意是multiprocessing而不是thread的)
from multiprocessing import Process,Queue
import  time
def producer(queue):
    queue.put("a")
    time.sleep(2)
def consumer(queue):
    time.sleep(2)
    data=queue.get()
    print(data)
if __name__== "__main__":
    queue=Queue(10)
    my_producer = Process(target=producer,args=(queue,))
    my_consumer = Process(target=consumer,args=(queue,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
 
#outpu:
a
注意:multprocess中的Queue是不能用於pool進程池的
 
2.Manager(與進程池共用)
Manager中有個Queue,若是像實現pool中的進程間通訊,須要使用Manager中的Queue。
from multiprocessing import Process,pool,Manager,Pool
import  time
def producer(queue):
    queue.put("a")
    time.sleep(2)
def consumer(queue):
    time.sleep(2)
    data=queue.get()
    print(data)
if __name__== "__main__":
    queue=Manager().Queue()
    pool=Pool(3)
    pool.apply_async(producer,args=(queue,))
    pool.apply_async(consumer,args=(queue,))
    pool.close()
    pool.join()
 
#output:
a
 
3.管道pipe
pipe只能適用於兩個指定的進程。
pipe的性能高於queue的,queue加了不少的鎖操做。
from multiprocessing import Process,pool,Manager,Pool,Pipe
import  time
def producer(pipe):
    pipe.send("hello")
def consumer(pipe):
    print(pipe.recv())
if __name__== "__main__":
    recv_pipe,send_pipe=Pipe()
    my_producer=Process(target=producer,args=(send_pipe,))
    my_consumer=Process(target=consumer,args=(recv_pipe,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
 
#output:
hello

 

4.進程間共享內存操做 Mnager的dict、list、value等。
from multiprocessing import Process,pool,Manager,Pool,Pipe
import  time
 
def add_data(p_dict,key,value):
    p_dict[key]=value
if __name__ == "__main__":
    progress_dict= Manager().dict()
    first_progress= Process(target=add_data,args=(progress_dict,"name","tangrong"))
    second_progress = Process(target=add_data,args=(progress_dict,"age","18"))
    first_progress.start()
    second_progress.start()
    first_progress.join()
    second_progress.join()
    print(progress_dict)
 
#output:
{'name': 'tangrong', 'age': '18'}
在使用的時候,能夠用Manager中的數據結構,可是注意數據同步(LOCK,RLOCK等)
相關文章
相關標籤/搜索