本文對應腳本已上傳至個人
Github
倉庫https://github.com/CNFeffery/DataScienceStudyNotespython
進程是計算機系統中資源分配的最小單位,也是操做系統能夠控制的最小單位,在數據科學中不少涉及大量計算、CPU密集型的任務均可以經過多進程並行運算的方式大幅度提高運算效率從而節省時間開銷,而在Python
中實現多進程有多種方式,本文就將針對其中較爲易用的幾種方式進行介紹。
git
multiprocessing
是Python
自帶的用於管理進程的模塊,經過合理地利用multiprocessing
,咱們能夠充分榨乾所使用機器的CPU運算性能,在multiprocessing
中實現多進程也有幾種方式。github
Process
是multiprocessing
中最基礎的類,用於建立進程,先來看看下面的示例:算法
single_process.py多線程
import multiprocessing import datetime import numpy as np import os def job(): print(f'進程{os.getpid()}開始計算:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')) for j in range(100): _ = np.sum(np.random.rand(10000000)) print(f'進程{os.getpid()}結束運算:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')) if __name__ == '__main__': process = multiprocessing.Process(target=job) process.start()
在上面的例子中,咱們首先定義了函數job()
,其連續執行一項運算任務100次,並在開始和結束的時刻打印該進程對應的pid,用來惟一識別一個獨立的進程,接着利用Process()將一個進程實例化,其主要參數以下:app
target: 須要執行的運算函數
args: target函數對應的傳入參數,元組形式傳入dom
在process建立完成以後,咱們對其調用.start()
方法執行運算,這樣咱們就實現了單個進程的建立與使用,在此基礎上,咱們將上述例子多線程化:函數
multi_processes.py性能
import multiprocessing import datetime import numpy as np import os def job(): print(f'進程{os.getpid()}開始計算:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')) for j in range(100): _ = np.sum(np.random.rand(10000000)) print(f'進程{os.getpid()}結束運算:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')) if __name__ == '__main__': process_list = [] for i in range(multiprocessing.cpu_count() - 1): process = multiprocessing.Process(target=job) process_list.append(process) for process in process_list: process.start() for process in process_list: process.join()
在上面的例子中,咱們首先初始化用於存放多個線程的列表process_list
,接着用循環的方式建立了CPU核心數-1個進程並添加到process_list
中,再接着用循環的方式將全部進程逐個激活,最後使用到.join()
方法,這個方法用於控制進程之間的並行,以下例:操作系統
join_demo.py
import multiprocessing import os import datetime import time def job(): print(f'進程{os.getpid()}開始:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')) time.sleep(5) print(f'進程{os.getpid()}結束:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')) if __name__ == '__main__': process1 = multiprocessing.Process(target=job) process2 = multiprocessing.Process(target=job) process1.start() process1.join() process2.start() process2.join() print('='*200) process3 = multiprocessing.Process(target=job) process4 = multiprocessing.Process(target=job) process3.start() process4.start() process3.join() process4.join()
觀察對應進程執行的開始結束時間信息能夠發現,一個進程對象在.start()
以後,若在其餘的進程對象.start()
以前調用.join()
方法,則必須等到先前的進程對象運行結束纔會接着執行.join()
以後的非.join()
的內容,即前面的進程阻塞了後續的進程,這種狀況下並不能實現並行的多進程,要想實現真正的並行,須要現行對多個進程執行.start()
,接着再對這些進程對象執行.join()
,才能使得各個進程之間相互獨立,瞭解了這些咱們就能夠利用Process
來實現多進程運算;
除了上述的Process
,在multiprocessing
中還能夠使用Pool
來快捷地實現多進程,先來看下面的例子:
Pool_demo.py
from multiprocessing import Pool import numpy as np from pprint import pprint def job(n): return np.mean(np.random.rand(n)), np.std(np.random.rand(n)) if __name__ == '__main__': with Pool(5) as p: pprint(p.map(job, [i**10 for i in range(1, 6)]))
在上面的例子中,咱們使用Pool
這個類,將自編函數job
利用.map()
方法做用到後面傳入序列每個位置上,與Python
自帶的map()
函數類似,不一樣的是map()
函數將傳入的函數以串行的方式做用到傳入的序列每個元素之上,而Pool()
中的.map()
方法則根據前面傳入的並行數量5,以多進程並行的方式執行,大大提高了運算效率。
與multiprocessing
須要將執行運算的語句放置於含有if name == 'main':的腳本文件中下不一樣,joblib
將多進程的實現方式大大簡化,使得咱們能夠在IPython
交互式環境下中靈活地使用它,先看下面這個例子:
from joblib import Parallel, delayed import numpy as np import time import datetime def job(i): start = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') time.sleep(5) end = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') return start, end result = Parallel(n_jobs=5, verbose=1)(delayed(job)(j) for j in range(5)) result
在上面的例子中,咱們從joblib
中導入Parallel
和delayed
,僅用Parallel(n_jobs=5, verbose=1)(delayed(job)(j) for j in range(5))一句就實現了並行運算的功能,其中n_jobs
控制並行進程的數量,verbose
參數控制是否打印進程運算過程,若是你熟悉scikit-learn
,相信這兩個參數你必定不會陌生,由於scikit-learn
中RandomForestClassifier
等能夠並行運算的算法都是經過joblib
來實現的。
以上就是本文的所有內容,若有筆誤望指出!