python線程和進程

總結

1,怎麼建立進程和啓動
2,jion方法的做用
3,subprocess模塊的做用和使用
4,線程的建立和啓動
5,繼承線程類建立
6,臨界資源
7,多線程的通訊,設置一個全局變量
8,如何對臨界資源進行保護,線程鎖
9, 如何加鎖和解鎖
10,io密集和計算密集
11,協程:單線程+異步IOpython

在腳本運行過程當中有一個主線程,若在主線程中建立了子線程,當主線程結束時根據子線程daemon屬性值的不一樣可能會發生下面的兩種狀況之一:git

若是某個子線程的daemon屬性爲False,主線程結束時會檢測該子線程是否結束,若是該子線程還在運行,則主線程會等待它完成後再退出;程序員

若是某個子線程的daemon屬性爲True,主線程運行結束時不對這個子線程進行檢查而直接退出,同時全部daemon值爲True的子線程將隨主線程一塊兒結束,而不管是否運行完成。github

進程和線程

今天咱們使用的計算機早已進入多CPU或多核時代,而咱們使用的操做系統都是支持「多任務」的操做系統,這使得咱們能夠同時運行多個程序,也能夠將一個程序分解爲若干個相對獨立的子任務,讓多個子任務併發的執行,從而縮短程序的執行時間,同時也讓用戶得到更好的體驗。所以在當下無論是用什麼編程語言進行開發,實現讓程序同時執行多個任務也就是常說的「併發編程」,應該是程序員必備技能之一。爲此,咱們須要先討論兩個概念,一個叫進程,一個叫線程。shell

概念

進程就是操做系統中執行的一個程序,操做系統以進程爲單位分配存儲空間,每一個進程都有本身的地址空間、數據棧以及其餘用於跟蹤進程執行的輔助數據,操做系統管理全部進程的執行,爲它們合理的分配資源。進程能夠經過fork或spawn的方式來建立新的進程來執行其餘的任務,不過新的進程也有本身獨立的內存空間,所以必須經過進程間通訊機制(IPC,Inter-Process Communication)來實現數據共享,具體的方式包括管道、信號、套接字、共享內存區等。編程

一個進程還能夠擁有多個併發的執行線索,簡單的說就是擁有多個能夠得到CPU調度的執行單元,這就是所謂的線程。因爲線程在同一個進程下,它們能夠共享相同的上下文,所以相對於進程而言,線程間的信息共享和通訊更加容易。固然在單核CPU系統中,真正的併發是不可能的,由於在某個時刻可以得到CPU的只有惟一的一個線程,多個線程共享了CPU的執行時間。使用多線程實現併發編程爲程序帶來的好處是不言而喻的,最主要的體如今提高程序的性能和改善用戶體驗,今天咱們使用的軟件幾乎都用到了多線程技術,這一點能夠利用系統自帶的進程監控工具(如macOS中的「活動監視器」、Windows中的「任務管理器」)來證明,以下圖所示。服務器

固然多線程也並非沒有壞處,站在其餘進程的角度,多線程的程序對其餘程序並不友好,由於它佔用了更多的CPU執行時間,致使其餘程序沒法得到足夠的CPU執行時間;另外一方面,站在開發者的角度,編寫和調試多線程的程序都對開發者有較高的要求,對於初學者來講更加困難。網絡

Python既支持多進程又支持多線程,所以使用Python實現併發編程主要有3種方式:多進程、多線程、多進程+多線程。數據結構

Python中的多進程

Unix和Linux操做系統上提供了fork()系統調用來建立進程,調用fork()函數的是父進程,建立出的是子進程,子進程是父進程的一個拷貝,可是子進程擁有本身的PID。fork()函數很是特殊它會返回兩次,父進程中能夠經過fork()函數的返回值獲得子進程的PID,而子進程中的返回值永遠都是0。Python的os模塊提供了fork()函數。因爲Windows系統沒有fork()調用,所以要實現跨平臺的多進程編程,可使用multiprocessing模塊的Process類來建立子進程,並且該模塊還提供了更高級的封裝,例如批量啓動進程的進程池(Pool)、用於進程間通訊的隊列(Queue)和管道(Pipe)等。多線程

下面用一個下載文件的例子來講明使用多進程和不使用多進程到底有什麼差異,先看看下面的代碼。

from random import randint
from time import time, sleep


def download_task(filename):
    print('開始下載%s...' % filename)
    time_to_download = randint(5, 10)
    sleep(time_to_download)
    print('%s下載完成! 耗費了%d秒' % (filename, time_to_download))


def main():
    start = time()
    download_task('Python從入門到住院.pdf')
    download_task('Peking Hot.avi')
    end = time()
    print('總共耗費了%.2f秒.' % (end - start))


if __name__ == '__main__':
    main()

下面是運行程序獲得的一次運行結果。

開始下載Python從入門到住院.pdf...
Python從入門到住院.pdf下載完成! 耗費了6秒
開始下載Peking Hot.avi...
Peking Hot.avi下載完成! 耗費了7秒
總共耗費了13.01秒.

從上面的例子能夠看出,若是程序中的代碼只能按順序一點點的往下執行,那麼即便執行兩個絕不相關的下載任務,也須要先等待一個文件下載完成後才能開始下一個下載任務,很顯然這並不合理也沒有效率。接下來咱們使用多進程的方式將兩個下載任務放到不一樣的進程中,代碼以下所示。

from multiprocessing import Process
from os import getpid
from random import randint
from time import time, sleep


def download_task(filename):
    print('啓動下載進程,進程號[%d].' % getpid())
    print('開始下載%s...' % filename)
    time_to_download = randint(5, 10)
    sleep(time_to_download)
    print('%s下載完成! 耗費了%d秒' % (filename, time_to_download))


def main():
    start = time()
    p1 = Process(target=download_task, args=('Python從入門到住院.pdf', ))
    p1.start()
    p2 = Process(target=download_task, args=('Peking Hot.avi', ))
    p2.start()
    p1.join()
    p2.join()
    end = time()
    print('總共耗費了%.2f秒.' % (end - start))


if __name__ == '__main__':
    main()

在上面的代碼中,咱們經過Process類建立了進程對象,經過target參數咱們傳入一個函數來表示進程啓動後要執行的代碼,後面的args是一個元組,它表明了傳遞給函數的參數。Process對象的start方法用來啓動進程,而join方法表示等待進程執行結束。運行上面的代碼能夠明顯發現兩個下載任務「同時」啓動了,並且程序的執行時間將大大縮短,再也不是兩個任務的時間總和。下面是程序的一次執行結果。

啓動下載進程,進程號[1530].
開始下載Python從入門到住院.pdf...
啓動下載進程,進程號[1531].
開始下載Peking Hot.avi...
Peking Hot.avi下載完成! 耗費了7秒
Python從入門到住院.pdf下載完成! 耗費了10秒
總共耗費了10.01秒.

咱們也可使用subprocess模塊中的類和函數來建立和啓動子進程,而後經過管道來和子進程通訊,這些內容咱們不在此進行講解,有興趣的讀者能夠本身瞭解這些知識。接下來咱們將重點放在如何實現兩個進程間的通訊。咱們啓動兩個進程,一個輸出Ping,一個輸出Pong,兩個進程輸出的Ping和Pong加起來一共10個。聽起來很簡單吧,可是若是這樣寫但是錯的哦。

from multiprocessing import Process
from time import sleep

counter = 0


def sub_task(string):
    global counter
    while counter < 10:
        print(string, end='', flush=True)
        counter += 1
        sleep(0.01)

        
def main():
    Process(target=sub_task, args=('Ping', )).start()
    Process(target=sub_task, args=('Pong', )).start()


if __name__ == '__main__':
    main()

看起來沒毛病,可是最後的結果是Ping和Pong各輸出了10個,Why?當咱們在程序中建立進程的時候,子進程複製了父進程及其全部的數據結構,每一個子進程有本身獨立的內存空間,這也就意味着兩個子進程中各有一個counter變量,因此結果也就可想而知了。要解決這個問題比較簡單的辦法是使用multiprocessing模塊中的Queue類,它是能夠被多個進程共享的隊列,底層是經過管道和信號量(semaphore)機制來實現的,有興趣的讀者能夠本身嘗試一下。

Python中的多線程

在Python早期的版本中就引入了thread模塊(如今名爲_thread)來實現多線程編程,然而該模塊過於底層,並且不少功能都沒有提供,所以目前的多線程開發咱們推薦使用threading模塊,該模塊對多線程編程提供了更好的面向對象的封裝。咱們把剛纔下載文件的例子用多線程的方式來實現一遍。

from random import randint
from threading import Thread
from time import time, sleep


def download(filename):
    print('開始下載%s...' % filename)
    time_to_download = randint(5, 10)
    sleep(time_to_download)
    print('%s下載完成! 耗費了%d秒' % (filename, time_to_download))


def main():
    start = time()
    t1 = Thread(target=download, args=('Python從入門到住院.pdf',))
    t1.start()
    t2 = Thread(target=download, args=('Peking Hot.avi',))
    t2.start()
    t1.join()
    t2.join()
    end = time()
    print('總共耗費了%.3f秒' % (end - start))


if __name__ == '__main__':
    main()

咱們能夠直接使用threading模塊的Thread類來建立線程,可是咱們以前講過一個很是重要的概念叫「繼承」,咱們能夠從已有的類建立新類,所以也能夠經過繼承Thread類的方式來建立自定義的線程類,而後再建立線程對象並啓動線程。代碼以下所示。

from random import randint
from threading import Thread
from time import time, sleep


class DownloadTask(Thread):

    def __init__(self, filename):
        super().__init__()
        self._filename = filename

    def run(self):
        print('開始下載%s...' % self._filename)
        time_to_download = randint(5, 10)
        sleep(time_to_download)
        print('%s下載完成! 耗費了%d秒' % (self._filename, time_to_download))


def main():
    start = time()
    t1 = DownloadTask('Python從入門到住院.pdf')
    t1.start()
    t2 = DownloadTask('Peking Hot.avi')
    t2.start()
    t1.join()
    t2.join()
    end = time()
    print('總共耗費了%.2f秒.' % (end - start))


if __name__ == '__main__':
    main()

由於多個線程能夠共享進程的內存空間,所以要實現多個線程間的通訊相對簡單,你們能想到的最直接的辦法就是設置一個全局變量,多個線程共享這個全局變量便可。可是當多個線程共享同一個變量(咱們一般稱之爲「資源」)的時候,頗有可能產生不可控的結果從而致使程序失效甚至崩潰。若是一個資源被多個線程競爭使用,那麼咱們一般稱之爲「臨界資源」,對「臨界資源」的訪問須要加上保護,不然資源會處於「混亂」的狀態。下面的例子演示了100個線程向同一個銀行帳戶轉帳(轉入1元錢)的場景,在這個例子中,銀行帳戶就是一個臨界資源,在沒有保護的狀況下咱們頗有可能會獲得錯誤的結果。

from time import sleep
from threading import Thread


class Account(object):

    def __init__(self):
        self._balance = 0

    def deposit(self, money):
        # 計算存款後的餘額
        new_balance = self._balance + money
        # 模擬受理存款業務須要0.01秒的時間
        sleep(0.01)
        # 修改帳戶餘額
        self._balance = new_balance

    @property
    def balance(self):
        return self._balance


class AddMoneyThread(Thread):

    def __init__(self, account, money):
        super().__init__()
        self._account = account
        self._money = money

    def run(self):
        self._account.deposit(self._money)


def main():
    account = Account()
    threads = []
    # 建立100個存款的線程向同一個帳戶中存錢
    for _ in range(100):
        t = AddMoneyThread(account, 1)
        threads.append(t)
        t.start()
    # 等全部存款的線程都執行完畢
    for t in threads:
        t.join()
    print('帳戶餘額爲: ¥%d元' % account.balance)


if __name__ == '__main__':
    main()

運行上面的程序,結果讓人大跌眼鏡,100個線程分別向帳戶中轉入1元錢,結果竟然遠遠小於100元。之因此出現這種狀況是由於咱們沒有對銀行帳戶這個「臨界資源」加以保護,多個線程同時向帳戶中存錢時,會一塊兒執行到new_balance = self._balance + money這行代碼,多個線程獲得的帳戶餘額都是初始狀態下的0,因此都是0上面作了+1的操做,所以獲得了錯誤的結果。在這種狀況下,「鎖」就能夠派上用場了。咱們能夠經過「鎖」來保護「臨界資源」,只有得到「鎖」的線程才能訪問「臨界資源」,而其餘沒有獲得「鎖」的線程只能被阻塞起來,直到得到「鎖」的線程釋放了「鎖」,其餘線程纔有機會得到「鎖」,進而訪問被保護的「臨界資源」。下面的代碼演示瞭如何使用「鎖」來保護對銀行帳戶的操做,從而得到正確的結果。

from time import sleep
from threading import Thread, Lock


class Account(object):

    def __init__(self):
        self._balance = 0
        self._lock = Lock()

    def deposit(self, money):
        # 先獲取鎖才能執行後續的代碼
        self._lock.acquire()
        try:
            new_balance = self._balance + money
            sleep(0.01)
            self._balance = new_balance
        finally:
            # 在finally中執行釋放鎖的操做保證正常異常鎖都能釋放
            self._lock.release()

    @property
    def balance(self):
        return self._balance


class AddMoneyThread(Thread):

    def __init__(self, account, money):
        super().__init__()
        self._account = account
        self._money = money

    def run(self):
        self._account.deposit(self._money)


def main():
    account = Account()
    threads = []
    for _ in range(100):
        t = AddMoneyThread(account, 1)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    print('帳戶餘額爲: ¥%d元' % account.balance)


if __name__ == '__main__':
    main()

比較遺憾的一件事情是Python的多線程並不能發揮CPU的多核特性,這一點只要啓動幾個執行死循環的線程就能夠獲得證明了。之因此如此,是由於Python的解釋器有一個「全局解釋器鎖」(GIL)的東西,任何線程執行前必須先得到GIL鎖,而後每執行100條字節碼,解釋器就自動釋放GIL鎖,讓別的線程有機會執行,這是一個歷史遺留問題,可是即使如此,就如咱們以前舉的例子,使用多線程在提高執行效率和改善用戶體驗方面仍然是有積極意義的。

多進程仍是多線程

不管是多進程仍是多線程,只要數量一多,效率確定上不去,爲何呢?咱們打個比方,假設你不幸正在準備中考,天天晚上須要作語文、數學、英語、物理、化學這5科的做業,每項做業耗時1小時。若是你先花1小時作語文做業,作完了,再花1小時作數學做業,這樣,依次所有作完,一共花5小時,這種方式稱爲單任務模型。若是你打算切換到多任務模型,能夠先作1分鐘語文,再切換到數學做業,作1分鐘,再切換到英語,以此類推,只要切換速度足夠快,這種方式就和單核CPU執行多任務是同樣的了,以旁觀者的角度來看,你就正在同時寫5科做業。

可是,切換做業是有代價的,好比從語文切到數學,要先收拾桌子上的語文書本、鋼筆(這叫保存現場),而後,打開數學課本、找出圓規直尺(這叫準備新環境),才能開始作數學做業。操做系統在切換進程或者線程時也是同樣的,它須要先保存當前執行的現場環境(CPU寄存器狀態、內存頁等),而後,把新任務的執行環境準備好(恢復上次的寄存器狀態,切換內存頁等),才能開始執行。這個切換過程雖然很快,可是也須要耗費時間。若是有幾千個任務同時進行,操做系統可能就主要忙着切換任務,根本沒有多少時間去執行任務了,這種狀況最多見的就是硬盤狂響,點窗口無反應,系統處於假死狀態。因此,多任務一旦多到一個限度,反而會使得系統性能急劇降低,最終致使全部任務都作很差。

是否採用多任務的第二個考慮是任務的類型,能夠把任務分爲計算密集型和I/O密集型。計算密集型任務的特色是要進行大量的計算,消耗CPU資源,好比對視頻進行編碼解碼或者格式轉換等等,這種任務全靠CPU的運算能力,雖然也能夠用多任務完成,可是任務越多,花在任務切換的時間就越多,CPU執行任務的效率就越低。計算密集型任務因爲主要消耗CPU資源,這類任務用Python這樣的腳本語言去執行效率一般很低,最能勝任這類任務的是C語言,咱們以前提到了Python中有嵌入C/C++代碼的機制。

除了計算密集型任務,其餘的涉及到網絡、存儲介質I/O的任務均可以視爲I/O密集型任務,這類任務的特色是CPU消耗不多,任務的大部分時間都在等待I/O操做完成(由於I/O的速度遠遠低於CPU和內存的速度)。對於I/O密集型任務,若是啓動多任務,就能夠減小I/O等待時間從而讓CPU高效率的運轉。有一大類的任務都屬於I/O密集型任務,這其中包括了咱們很快會涉及到的網絡應用和Web應用。

說明:上面的內容和例子來自於廖雪峯官方網站的《Python教程》,由於對做者文中的某些觀點持有不一樣的見解,對原文的文字描述作了適當的調整。

單線程+異步I/O

現代操做系統對I/O操做的改進中最爲重要的就是支持異步I/O。若是充分利用操做系統提供的異步I/O支持,就能夠用單進程單線程模型來執行多任務,這種全新的模型稱爲事件驅動模型。Nginx就是支持異步I/O的Web服務器,它在單核CPU上採用單進程模型就能夠高效地支持多任務。在多核CPU上,能夠運行多個進程(數量與CPU核心數相同),充分利用多核CPU。用Node.js開發的服務器端程序也使用了這種工做模式,這也是當下實現多任務編程的一種趨勢。

在Python語言中,單線程+異步I/O的編程模型稱爲協程,有了協程的支持,就能夠基於事件驅動編寫高效的多任務程序。協程最大的優點就是極高的執行效率,由於子程序切換不是線程切換,而是由程序自身控制,所以,沒有線程切換的開銷。協程的第二個優點就是不須要多線程的鎖機制,由於只有一個線程,也不存在同時寫變量衝突,在協程中控制共享資源不用加鎖,只須要判斷狀態就行了,因此執行效率比多線程高不少。若是想要充分利用CPU的多核特性,最簡單的方法是多進程+協程,既充分利用多核,又充分發揮協程的高效率,可得到極高的性能。關於這方面的內容,我稍後會作一個專題來進行講解。

應用案例

例子1:將耗時間的任務放到線程中以得到更好的用戶體驗。

以下所示的界面中,有「下載」和「關於」兩個按鈕,用休眠的方式模擬點擊「下載」按鈕會聯網下載文件須要耗費10秒的時間,若是不使用「多線程」,咱們會發現,當點擊「下載」按鈕後整個程序的其餘部分都被這個耗時間的任務阻塞而沒法執行了,這顯然是很是糟糕的用戶體驗,代碼以下所示。

import time
import tkinter
import tkinter.messagebox


def download():
    # 模擬下載任務須要花費10秒鐘時間
    time.sleep(10)
    tkinter.messagebox.showinfo('提示', '下載完成!')


def show_about():
    tkinter.messagebox.showinfo('關於', '做者: 駱昊(v1.0)')


def main():
    top = tkinter.Tk()
    top.title('單線程')
    top.geometry('200x150')
    top.wm_attributes('-topmost', True)

    panel = tkinter.Frame(top)
    button1 = tkinter.Button(panel, text='下載', command=download)
    button1.pack(side='left')
    button2 = tkinter.Button(panel, text='關於', command=show_about)
    button2.pack(side='right')
    panel.pack(side='bottom')

    tkinter.mainloop()


if __name__ == '__main__':
    main()

若是使用多線程將耗時間的任務放到一個獨立的線程中執行,這樣就不會由於執行耗時間的任務而阻塞了主線程,修改後的代碼以下所示。

import time
import tkinter
import tkinter.messagebox
from threading import Thread


def main():

    class DownloadTaskHandler(Thread):

        def run(self):
            time.sleep(10)
            tkinter.messagebox.showinfo('提示', '下載完成!')
            # 啓用下載按鈕
            button1.config(state=tkinter.NORMAL)

    def download():
        # 禁用下載按鈕
        button1.config(state=tkinter.DISABLED)
        # 經過daemon參數將線程設置爲守護線程(主程序退出就再也不保留執行)
        # 在線程中處理耗時間的下載任務
        DownloadTaskHandler(daemon=True).start()

    def show_about():
        tkinter.messagebox.showinfo('關於', '做者: 駱昊(v1.0)')

    top = tkinter.Tk()
    top.title('單線程')
    top.geometry('200x150')
    top.wm_attributes('-topmost', 1)

    panel = tkinter.Frame(top)
    button1 = tkinter.Button(panel, text='下載', command=download)
    button1.pack(side='left')
    button2 = tkinter.Button(panel, text='關於', command=show_about)
    button2.pack(side='right')
    panel.pack(side='bottom')

    tkinter.mainloop()


if __name__ == '__main__':
    main()

例子2:使用多進程對複雜任務進行「分而治之」。

咱們來完成1~100000000求和的計算密集型任務,這個問題自己很是簡單,有點循環的知識就能解決,代碼以下所示。

from time import time


def main():
    total = 0
    number_list = [x for x in range(1, 100000001)]
    start = time()
    for number in number_list:
        total += number
    print(total)
    end = time()
    print('Execution time: %.3fs' % (end - start))


if __name__ == '__main__':
    main()

在上面的代碼中,我故意先去建立了一個列表容器而後填入了100000000個數,這一步實際上是比較耗時間的,因此爲了公平起見,當咱們將這個任務分解到8個進程中去執行的時候,咱們暫時也不考慮列表切片操做花費的時間,只是把作運算和合並運算結果的時間統計出來,代碼以下所示。

from multiprocessing import Process, Queue
from random import randint
from time import time


def task_handler(curr_list, result_queue):
    total = 0
    for number in curr_list:
        total += number
    result_queue.put(total)


def main():
    processes = []
    number_list = [x for x in range(1, 100000001)]
    result_queue = Queue()
    index = 0
    # 啓動8個進程將數據切片後進行運算
    for _ in range(8):
        p = Process(target=task_handler,
                    args=(number_list[index:index + 12500000], result_queue))
        index += 12500000
        processes.append(p)
        p.start()
    # 開始記錄全部進程執行完成花費的時間
    start = time()
    for p in processes:
        p.join()
    # 合併執行結果
    total = 0
    while not result_queue.empty():
        total += result_queue.get()
    print(total)
    end = time()
    print('Execution time: ', (end - start), 's', sep='')


if __name__ == '__main__':
    main()

比較兩段代碼的執行結果(在我目前使用的MacBook上,上面的代碼須要大概6秒左右的時間,而下面的代碼只須要不到1秒的時間,再強調一次咱們只是比較了運算的時間,不考慮列表建立及切片操做花費的時間),使用多進程後因爲得到了更多的CPU執行時間以及更好的利用了CPU的多核特性,明顯的減小了程序的執行時間,並且計算量越大效果越明顯。固然,若是願意還能夠將多個進程部署在不一樣的計算機上,作成分佈式進程,具體的作法就是經過multiprocessing.managers模塊中提供的管理器將Queue對象經過網絡共享出來(註冊到網絡上讓其餘計算機能夠訪問),這部份內容也留到爬蟲的專題再進行講解。

文章來源https://github.com/jackfrued/Python-100-Days

相關文章
相關標籤/搜索