(數據科學學習手札70)面向數據科學的Python多進程簡介及應用

本文對應腳本已上傳至個人Github倉庫https://github.com/CNFeffery/DataScienceStudyNotespython

1、簡介

  進程是計算機系統中資源分配的最小單位,也是操做系統能夠控制的最小單位,在數據科學中不少涉及大量計算、CPU密集型的任務均可以經過多進程並行運算的方式大幅度提高運算效率從而節省時間開銷,而在Python中實現多進程有多種方式,本文就將針對其中較爲易用的幾種方式進行介紹。

git

2、利用multiprocessing實現多進程

  multiprocessingPython自帶的用於管理進程的模塊,經過合理地利用multiprocessing,咱們能夠充分榨乾所使用機器的CPU運算性能,在multiprocessing中實現多進程也有幾種方式。github

2.1 Process

  Processmultiprocessing中最基礎的類,用於建立進程,先來看看下面的示例:算法

single_process.py多線程

import multiprocessing
import datetime
import numpy as np
import os

def job():

    print(f'進程{os.getpid()}開始計算:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    for j in range(100):
        _ = np.sum(np.random.rand(10000000))
    print(f'進程{os.getpid()}結束運算:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

if __name__ == '__main__':

    process = multiprocessing.Process(target=job)
    process.start()
圖1 single_process.py運行結果

  在上面的例子中,咱們首先定義了函數job(),其連續執行一項運算任務100次,並在開始和結束的時刻打印該進程對應的pid,用來惟一識別一個獨立的進程,接着利用Process()將一個進程實例化,其主要參數以下:app

target: 須要執行的運算函數
args: target函數對應的傳入參數,元組形式傳入dom

  在process建立完成以後,咱們對其調用.start()方法執行運算,這樣咱們就實現了單個進程的建立與使用,在此基礎上,咱們將上述例子多線程化:函數

multi_processes.py性能

import multiprocessing
import datetime
import numpy as np
import os

def job():

    print(f'進程{os.getpid()}開始計算:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    for j in range(100):
        _ = np.sum(np.random.rand(10000000))
    print(f'進程{os.getpid()}結束運算:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

if __name__ == '__main__':

    process_list = []

    for i in range(multiprocessing.cpu_count() - 1):
        process = multiprocessing.Process(target=job)
        process_list.append(process)

    for process in process_list:
        process.start()

    for process in process_list:
        process.join()
圖2 multi_processes.py運行結果

  在上面的例子中,咱們首先初始化用於存放多個線程的列表process_list,接着用循環的方式建立了CPU核心數-1個進程並添加到process_list中,再接着用循環的方式將全部進程逐個激活,最後使用到.join()方法,這個方法用於控制進程之間的並行,以下例:操作系統

join_demo.py

import multiprocessing
import os
import datetime
import time

def job():

    print(f'進程{os.getpid()}開始:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    time.sleep(5)
    print(f'進程{os.getpid()}結束:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))


if __name__ == '__main__':

    process1 = multiprocessing.Process(target=job)
    process2 = multiprocessing.Process(target=job)

    process1.start()
    process1.join()

    process2.start()
    process2.join()
    print('='*200)
    process3 = multiprocessing.Process(target=job)
    process4 = multiprocessing.Process(target=job)

    process3.start()
    process4.start()

    process3.join()
    process4.join()
圖2 multi_processes.py運行結果

  觀察對應進程執行的開始結束時間信息能夠發現,一個進程對象在.start()以後,若在其餘的進程對象.start()以前調用.join()方法,則必須等到先前的進程對象運行結束纔會接着執行.join()以後的非.join()的內容,即前面的進程阻塞了後續的進程,這種狀況下並不能實現並行的多進程,要想實現真正的並行,須要現行對多個進程執行.start(),接着再對這些進程對象執行.join(),才能使得各個進程之間相互獨立,瞭解了這些咱們就能夠利用Process來實現多進程運算;

2.2 Pool

  除了上述的Process,在multiprocessing中還能夠使用Pool來快捷地實現多進程,先來看下面的例子:

Pool_demo.py

from multiprocessing import Pool
import numpy as np
from pprint import pprint

def job(n):
    return np.mean(np.random.rand(n)), np.std(np.random.rand(n))

if __name__ == '__main__':
    with Pool(5) as p:
        pprint(p.map(job, [i**10 for i in range(1, 6)]))
圖3 Pool_demo.py運行結果

  在上面的例子中,咱們使用Pool這個類,將自編函數job利用.map()方法做用到後面傳入序列每個位置上,與Python自帶的map()函數類似,不一樣的是map()函數將傳入的函數以串行的方式做用到傳入的序列每個元素之上,而Pool()中的.map()方法則根據前面傳入的並行數量5,以多進程並行的方式執行,大大提高了運算效率。

3、利用joblib實現多進程

  與multiprocessing須要將執行運算的語句放置於含有if name == 'main':的腳本文件中下不一樣,joblib將多進程的實現方式大大簡化,使得咱們能夠在IPython交互式環境下中靈活地使用它,先看下面這個例子:

from joblib import Parallel, delayed
import numpy as np
import time
import datetime

def job(i):
    start = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    time.sleep(5)
    end = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    
    return start, end

result = Parallel(n_jobs=5, verbose=1)(delayed(job)(j) for j in range(5))
result
圖4 joblib並行示例

  在上面的例子中,咱們從joblib中導入Paralleldelayed,僅用Parallel(n_jobs=5, verbose=1)(delayed(job)(j) for j in range(5))一句就實現了並行運算的功能,其中n_jobs控制並行進程的數量,verbose參數控制是否打印進程運算過程,若是你熟悉scikit-learn,相信這兩個參數你必定不會陌生,由於scikit-learnRandomForestClassifier等能夠並行運算的算法都是經過joblib來實現的。

  以上就是本文的所有內容,若有筆誤望指出!

相關文章
相關標籤/搜索