Python多進程與多線程編程及GIL詳解

介紹如何使用python的multiprocess和threading模塊進行多線程和多進程編程。html

Python的多進程編程與multiprocess模塊python

python的多進程編程主要依靠multiprocess模塊。咱們先對比兩段代碼,看看多進程編程的優點。咱們模擬了一個很是耗時的任務,計算8的20次方,爲了使這個任務顯得更耗時,咱們還讓它sleep 2秒。第一段代碼是單進程計算(代碼以下所示),咱們按順序執行代碼,重複計算2次,並打印出總共耗時。linux

import time
import os
def long_time_task():
    print('當前進程: {}'.format(os.getpid()))
    time.sleep(2)
    print("結果: {}".format(8 ** 20))

if __name__ == "__main__":
    print('當前母進程: {}'.format(os.getpid()))
    start = time.time()
    for i in range(2):
        long_time_task()

    end = time.time()
    print("用時{}秒".format((end-start)))

輸出結果以下,總共耗時4秒,至始至終只有一個進程14236。看來電腦計算8的20次方基本不費時。git

當前母進程: 14236
當前進程: 14236
結果: 1152921504606846976
當前進程: 14236
結果: 1152921504606846976
用時4.01080060005188秒

第2段代碼是多進程計算代碼。咱們利用multiprocess模塊的Process方法建立了兩個新的進程p1和p2來進行並行計算。Process方法接收兩個參數, 第一個是target,通常指向函數名,第二個時args,須要向函數傳遞的參數。對於建立的新進程,調用start()方法便可讓其開始。咱們可使用os.getpid()打印出當前進程的名字。程序員

from multiprocessing import Process
import os
import time
def long_time_task(i):
    print('子進程: {} - 任務{}'.format(os.getpid(), i))
    time.sleep(2)
    print("結果: {}".format(8 ** 20))
if __name__=='__main__':
    print('當前母進程: {}'.format(os.getpid()))
    start = time.time()
    p1 = Process(target=long_time_task, args=(1,))
    p2 = Process(target=long_time_task, args=(2,))
    print('等待全部子進程完成。')
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    end = time.time()
    print("總共用時{}秒".format((end - start)))

輸出結果以下所示,耗時變爲2秒,時間減了一半,可見併發執行的時間明顯比順序執行要快不少。你還能夠看到儘管咱們只建立了兩個進程,可實際運行中卻包含裏1個母進程和2個子進程。之因此咱們使用join()方法就是爲了讓母進程阻塞,等待子進程都完成後纔打印出總共耗時,不然輸出時間只是母進程執行的時間。github

當前母進程: 6920
等待全部子進程完成。
子進程: 17020 - 任務1
子進程: 5904 - 任務2
結果: 1152921504606846976
結果: 1152921504606846976
總共用時2.131091356277466秒

知識點:算法

  • 新建立的進程與進程的切換都是要耗資源的,因此平時工做中進程數不能開太大。編程

  • 同時能夠運行的進程數通常受制於CPU的核數。緩存

  • 除了使用Process方法,咱們還可使用Pool類建立多進程。安全

 

利用multiprocess模塊的Pool類建立多進程

不少時候系統都須要建立多個進程以提升CPU的利用率,當數量較少時,能夠手動生成一個個Process實例。當進程數量不少時,或許能夠利用循環,可是這須要程序員手動管理系統中併發進程的數量,有時會很麻煩。這時進程池Pool就能夠發揮其功效了。能夠經過傳遞參數限制併發進程的數量,默認值爲CPU的核數。 

Pool類能夠提供指定數量的進程供用戶調用,當有新的請求提交到Pool中時,若是進程池尚未滿,就會建立一個新的進程來執行請求。若是池滿,請求就會告知先等待,直到池中有進程結束,纔會建立新的進程來執行這些請求。 

下面介紹一下multiprocessing 模塊下的Pool類的幾個方法:

1.apply_async

函數原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])

其做用是向進程池提交須要執行的函數及參數, 各個進程採用非阻塞(異步)的調用方式,即每一個子進程只管運行本身的,無論其它進程是否已經完成。

2.map()

函數原型:map(func, iterable[, chunksize=None])

Pool類中的map方法,與內置的map函數用法行爲基本一致,它會使進程阻塞直到結果返回。 注意:雖然第二個參數是一個迭代器,但在實際使用中,必須在整個隊列都就緒後,程序纔會運行子進程。

3.map_async()

函數原型:map_async(func, iterable[, chunksize[, callback]])
與map用法一致,可是它是非阻塞的。其有關事項見apply_async。

4.close()

關閉進程池(pool),使其不在接受新的任務。

5. terminate()

結束工做進程,不在處理未處理的任務。

6.join()

主進程阻塞等待子進程的退出, join方法要在close或terminate以後使用。

 

下例是一個簡單的multiprocessing.Pool類的實例。由於小編個人CPU是4核的,一次最多能夠同時運行4個進程,因此我開啓了一個容量爲4的進程池。4個進程須要計算5次,你能夠想象4個進程並行4次計算任務後,還剩一次計算任務(任務4)沒有完成,系統會等待4個進程完成後從新安排一個進程來計算。

from multiprocessing import Pool, cpu_count
import os
import time
def long_time_task(i):
    print('子進程: {} - 任務{}'.format(os.getpid(), i))
    time.sleep(2)
    print("結果: {}".format(8 ** 20))
if __name__=='__main__':
    print("CPU內核數:{}".format(cpu_count()))
    print('當前母進程: {}'.format(os.getpid()))
    start = time.time()
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('等待全部子進程完成。')
    p.close()
    p.join()
    end = time.time()
    print("總共用時{}秒".format((end - start)))

知識點:  

  • 對Pool對象調用join()方法會等待全部子進程執行完畢,調用join()以前必須先調用close()或terminate()方法,讓其再也不接受新的Process了。

 輸出結果以下所示,5個任務(每一個任務大約耗時2秒)使用多進程並行計算只需4.37秒,, 耗時減小了60%。

CPU內核數:4
當前母進程: 2556
等待全部子進程完成。
子進程: 16480 - 任務0
子進程: 15216 - 任務1
子進程: 15764 - 任務2
子進程: 10176 - 任務3
結果: 1152921504606846976
結果: 1152921504606846976
子進程: 15216 - 任務4
結果: 1152921504606846976
結果: 1152921504606846976
結果: 1152921504606846976
總共用時4.377134561538696秒

 相信你們都知道python解釋器中存在GIL(全局解釋器鎖), 它的做用就是保證同一時刻只有一個線程能夠執行代碼。因爲GIL的存在,不少人認爲python中的多線程其實並非真正的多線程,若是想要充分地使用多核CPU的資源,在python中大部分狀況須要使用多進程。然而這並意味着python多線程編程沒有意義哦,請繼續閱讀下文。

  多進程間的數據共享與通訊

 一般,進程之間是相互獨立的,每一個進程都有獨立的內存。經過共享內存(nmap模塊),進程之間能夠共享對象,使多個進程能夠訪問同一個變量(地址相同,變量名可能不一樣)。多進程共享資源必然會致使進程間相互競爭,因此應該盡最大可能防止使用共享狀態。還有一種方式就是使用隊列queue來實現不一樣進程間的通訊或數據共享,這一點和多線程編程相似。

from multiprocessing import Process, Queue
import os, time, random
# 寫數據進程執行的代碼:
def write(q):
    print('Process to write: {}'.format(os.getpid()))
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())
# 讀數據進程執行的代碼:
def read(q):
    print('Process to read:{}'.format(os.getpid()))
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)
if __name__=='__main__':
    # 父進程建立Queue,並傳給各個子進程:
   q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 啓動子進程pw,寫入:
    pw.start()
    # 啓動子進程pr,讀取:
    pr.start()
    # 等待pw結束:
    pw.join()
    # pr進程裏是死循環,沒法等待其結束,只能強行終止:
    pr.terminate()

下例這段代碼中中建立了2個獨立進程,一個負責寫(pw), 一個負責讀(pr), 實現了共享一個隊列queue。

輸出結果以下所示:

Process to write: 3036
Put A to queue...
Process to read:9408
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

 Python的多線程編程與threading模塊

 python 3中的多進程編程主要依靠threading模塊。建立新線程與建立新進程的方法很是相似。threading.Thread方法能夠接收兩個參數, 第一個是target,通常指向函數名,第二個時args,須要向函數傳遞的參數。對於建立的新線程,調用start()方法便可讓其開始。咱們還可使用current_thread().name打印出當前線程的名字。 下例中咱們使用多線程技術重構以前的計算代碼。

import threading
import time
def long_time_task(i):
    print('當前子線程: {} - 任務{}'.format(threading.current_thread().name, i))
    time.sleep(2)
    print("結果: {}".format(8 ** 20))
if __name__=='__main__':
    start = time.time()
    print('這是主線程:{}'.format(threading.current_thread().name))
    t1 = threading.Thread(target=long_time_task, args=(1,))
    t2 = threading.Thread(target=long_time_task, args=(2,))
    t1.start()
    t2.start()
    end = time.time()
    print("總共用時{}秒".format((end - start)))

下面是輸出結果。爲何總耗時竟然是0秒? 咱們能夠明顯看到主線程和子線程實際上是獨立運行的,主線程根本沒有等子線程完成,而是本身結束後就打印了消耗時間。主線程結束後,子線程仍在獨立運行,這顯然不是咱們想要的。

這是主線程:MainThread
當前子線程: Thread-1 - 任務1
當前子線程: Thread-2 - 任務2
總共用時0.0017192363739013672秒
結果: 1152921504606846976
結果: 1152921504606846976

若是要實現主線程和子線程的同步,咱們必需使用join方法(代碼以下所示)。

import threading
import time
def long_time_task(i):
    print('當前子線程: {} 任務{}'.format(threading.current_thread().name, i))
    time.sleep(2)
    print("結果: {}".format(8 ** 20))
if __name__=='__main__':
    start = time.time()
    print('這是主線程:{}'.format(threading.current_thread().name))
    thread_list = []
    for i in range(1, 3):
        t = threading.Thread(target=long_time_task, args=(i, ))
        thread_list.append(t)
    for t in thread_list:
        t.start()
    for t in thread_list:
        t.join()
    end = time.time()
    print("總共用時{}秒".format((end - start)))

修改代碼後的輸出以下所示。這時你能夠看到主線程在等子線程完成後才答應出總消耗時間(2秒),比正常順序執行代碼(4秒)仍是節省了很多時間。

這是主線程:MainThread
當前子線程: Thread - 1 任務1
當前子線程: Thread - 2 任務2
結果: 1152921504606846976
結果: 1152921504606846976
總共用時2.0166890621185303秒

當咱們設置多線程時,主線程會建立多個子線程,在python中,默認狀況下主線程和子線程獨立運行互不干涉。若是但願讓主線程等待子線程實現線程的同步,咱們須要使用join()方法。若是咱們但願一個主線程結束時再也不執行子線程,咱們應該怎麼辦呢? 咱們可使用t.setDaemon(True),代碼以下所示。

import threading
import time
def long_time_task():
    print('當子線程: {}'.format(threading.current_thread().name))
    time.sleep(2)
    print("結果: {}".format(8 ** 20))
if __name__=='__main__':
    start = time.time()
    print('這是主線程:{}'.format(threading.current_thread().name))
    for i in range(5):
        t = threading.Thread(target=long_time_task, args=())
        t.setDaemon(True)
        t.start()
    end = time.time()
    print("總共用時{}秒".format((end - start)))

經過繼承Thread類重寫run方法建立新線程

 除了使用Thread()方法建立新的線程外,咱們還能夠經過繼承Thread類重寫run方法建立新的線程,這種方法更靈活。下例中咱們自定義的類爲MyThread, 隨後咱們經過該類的實例化建立了2個子線程。

#-*- encoding:utf-8 -*-
import threading
import time
def long_time_task(i):
    time.sleep(2)
    return 8**20
class MyThread(threading.Thread):
    def __init__(self, func, args , name='', ):
        threading.Thread.__init__(self)
        self.func = func
        self.args = args
        self.name = name
        self.result = None
    def run(self):
        print('開始子進程{}'.format(self.name))
        self.result = self.func(self.args[0],)
        print("結果: {}".format(self.result))
        print('結束子進程{}'.format(self.name))
if __name__=='__main__':
    start = time.time()
    threads = []
    for i in range(1, 3):
        t = MyThread(long_time_task, (i,), str(i))
        threads.append(t)
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    end = time.time()
    print("總共用時{}秒".format((end - start)))

輸出結果以下所示:

開始子進程1
開始子進程2
結果: 1152921504606846976
結果: 1152921504606846976
結束子進程1
結束子進程2
總共用時2.005445718765259秒

 不一樣線程間的數據共享

一個進程所含的不一樣線程間共享內存,這就意味着任何一個變量均可以被任何一個線程修改,所以線程之間共享數據最大的危險在於多個線程同時改一個變量,把內容給改亂了。若是不一樣線程間有共享的變量,其中一個方法就是在修改前給其上一把鎖lock,確保一次只有一個線程能修改它。threading.lock()方法能夠輕易實現對一個共享變量的鎖定,修改完後release供其它線程使用。好比下例中帳戶餘額balance是一個共享變量,使用lock可使其不被改亂。

# -*- coding: utf-8 -*
import threading
class Account:
    def __init__(self):
        self.balance = 0
    def add(self, lock):
        # 得到鎖
        lock.acquire()
        for i in range(0, 100000):
            self.balance += 1
        # 釋放鎖
        lock.release()
    def delete(self, lock):
        # 得到鎖
        lock.acquire()
        for i in range(0, 100000):
            self.balance -= 1
            # 釋放鎖
        lock.release()
if __name__ == "__main__":
    account = Account()
    lock = threading.Lock()
    # 建立線程
   thread_add = threading.Thread(target=account.add, args=(lock,), name='Add')
    thread_delete = threading.Thread(target=account.delete, args=(lock,), name='Delete')
    # 啓動線程
   thread_add.start()
    thread_delete.start()
    # 等待線程結束
   thread_add.join()
    thread_delete.join()
    print('The final balance is: {}'.format(account.balance))

 

另外一種實現不一樣線程間數據共享的方法就是使用消息隊列queue。不像列表,queue是線程安全的,能夠放心使用,見下文。

 使用queue隊列通訊-經典的生產者和消費者模型

下例中建立了兩個線程,一個負責生成,一個負責消費,所生成的產品存放在queue裏,實現了不一樣線程間溝通。

from queue import Queue
import random, threading, time
# 生產者類
class Producer(threading.Thread):
    def __init__(self, name, queue):
        threading.Thread.__init__(self, name=name)
        self.queue = queue
    def run(self):
        for i in range(1, 5):
            print("{} is producing {} to the queue!".format(self.getName(), i))
            self.queue.put(i)
            time.sleep(random.randrange(10) / 5)
        print("%s finished!" % self.getName())
# 消費者類
class Consumer(threading.Thread):
    def __init__(self, name, queue):
        threading.Thread.__init__(self, name=name)
        self.queue = queue

    def run(self):
        for i in range(1, 5):
            val = self.queue.get()
            print("{} is consuming {} in the queue.".format(self.getName(), val))
            time.sleep(random.randrange(10))
        print("%s finished!" % self.getName())
def main():
    queue = Queue()
    producer = Producer('Producer', queue)
    consumer = Consumer('Consumer', queue)
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
    print('All threads finished!')
if __name__ == '__main__':
    main()

隊列queue的put方法能夠將一個對象obj放入隊列中。若是隊列已滿,此方法將阻塞至隊列有空間可用爲止。queue的get方法一次返回隊列中的一個成員。若是隊列爲空,此方法將阻塞至隊列中有成員可用爲止。queue同時還自帶emtpy(), full()等方法來判斷一個隊列是否爲空或已滿,可是這些方法並不可靠,由於多線程和多進程,在返回結果和使用結果之間,隊列中可能添加/刪除了成員。

 Python多進程和多線程哪一個快?

 因爲GIL的存在,不少人認爲Python多進程編程更快,針對多核CPU,理論上來講也是採用多進程更能有效利用資源。網上不少人已作過比較,我直接告訴你結論吧。

  • 對CPU密集型代碼(好比循環計算) - 多進程效率更高

  • 對IO密集型代碼(好比文件操做,網絡爬蟲) - 多線程效率更高。 

爲何是這樣呢?其實也不難理解。對於IO密集型操做,大部分消耗時間實際上是等待時間,在等待時間中CPU是不須要工做的,那你在此期間提供雙CPU資源也是利用不上的,相反對於CPU密集型代碼,2個CPU幹活確定比一個CPU快不少。那麼爲何多線程會對IO密集型代碼有用呢?這因是爲python碰到等待會釋放GIL供新的線程使用,實現了線程間的切換。

 

GIL是什麼

首先須要明確的一點是GIL並非Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就比如C++是一套語言(語法)標準,可是能夠用不一樣的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也同樣,一樣一段代碼能夠經過CPython,PyPy,Psyco等不一樣的Python執行環境來執行。像其中的JPython就沒有GIL。然而由於CPython是大部分環境下默認的Python執行環境。因此在不少人的概念裏CPython就是Python,也就想固然的把GIL歸結爲Python語言的缺陷。因此這裏要先明確一點:GIL並非Python的特性,Python徹底能夠不依賴於GIL。

GIL: 一個防止多線程併發執行機器碼的一個Mutex,乍一看就是個BUG般存在的全局鎖嘛!別急,咱們下面慢慢的分析。

爲何會有GIL

因爲物理上得限制,各CPU廠商在覈心頻率上的比賽已經被多核所取代。爲了更有效的利用多核處理器的性能,就出現了多線程的編程方式,而隨之帶來的就是線程間數據一致性和狀態同步的困難。即便在CPU內部的Cache也不例外,爲了有效解決多份緩存之間的數據同步時各廠商花費了很多心思,也不可避免的帶來了必定的性能損失。

Python固然也逃不開,爲了利用多核,Python開始支持多線程。而解決多線程之間數據完整性和狀態同步的最簡單方法天然就是加鎖。 因而有了GIL這把超級大鎖,而當愈來愈多的代碼庫開發者接受了這種設定後,他們開始大量依賴這種特性(即默認python內部對象是thread-safe的,無需在實現時考慮額外的內存鎖和同步操做)。

慢慢的這種實現方式被發現是蛋疼且低效的。但當你們試圖去拆分和去除GIL的時候,發現大量庫代碼開發者已經重度依賴GIL而很是難以去除了。有多難?作個類比,像MySQL這樣的「小項目」爲了把Buffer Pool Mutex這把大鎖拆分紅各個小鎖也花了從5.5到5.6再到5.7多個大版爲期近5年的時間,本且仍在繼續。MySQL這個背後有公司支持且有固定開發團隊的產品走的如此艱難,那又更況且Python這樣核心開發和代碼貢獻者高度社區化的團隊呢?

因此簡單的說GIL的存在更多的是歷史緣由。若是推到重來,多線程的問題依然仍是要面對,可是至少會比目前GIL這種方式會更優雅。

 GIL的影響

從上文的介紹和官方的定義來看,GIL無疑就是一把全局排他鎖。毫無疑問全局鎖的存在會對多線程的效率有不小影響。甚至就幾乎等於Python是個單線程的程序。
那麼讀者就會說了,全局鎖只要釋放的勤快效率也不會差啊。只要在進行耗時的IO操做的時候,能釋放GIL,這樣也仍是能夠提高運行效率的嘛。或者說再差也不會比單線程的效率差吧。理論上是這樣,而實際上呢?Python比你想的更糟。

下面咱們就對比下Python在多線程和單線程下得效率對比。測試方法很簡單,一個循環1億次的計數器函數。一個經過單線程執行兩次,一個多線程執行。最後比較執行總時間。測試環境爲雙核的Mac pro。注:爲了減小線程庫自己性能損耗對測試結果帶來的影響,這裏單線程的代碼一樣使用了線程。只是順序的執行兩次,模擬單線程。

順序執行的單線程(single_thread.py)

相關文章
相關標籤/搜索