草根學Python(十三)線程和進程

前言

拖了很久,不過仍是得堅持。喜歡本文的話能夠加下公衆號【於你供讀】。python

目錄

草根學Python(十三) 線程和進程

線程與進程

線程與進程是操做系統裏面的術語,簡單來說,每個應用程序都有一個本身的進程。編程

操做系統會爲這些進程分配一些執行資源,例如內存空間等。在進程中,又能夠建立一些線程,他們共享這些內存空間,並由操做系統調用,以便並行計算。瀏覽器

咱們都知道現代操做系統好比 Mac OS X,UNIX,Linux,Windows 等能夠同時運行多個任務。打個比方,你一邊在用瀏覽器上網,一邊在聽敲代碼,一邊用 Markdown 寫博客,這就是多任務,至少同時有 3 個任務正在運行。固然還有不少任務悄悄地在後臺同時運行着,只是桌面上沒有顯示而已。對於操做系統來講,一個任務就是一個進程(Process),好比打開一個瀏覽器就是啓動一個瀏覽器進程,打開 PyCharm 就是一個啓動了一個 PtCharm 進程,打開 Markdown 就是啓動了一個 Md 的進程。安全

雖然如今多核 CPU 已經很是普及了。但是因爲 CPU 執行代碼都是順序執行的,這時候咱們就會有疑問,單核 CPU 是怎麼執行多任務的呢?多線程

其實就是操做系統輪流讓各個任務交替執行,任務 1 執行 0.01 秒,切換到任務 2 ,任務 2 執行 0.01 秒,再切換到任務 3 ,執行 0.01秒……這樣反覆執行下去。表面上看,每一個任務都是交替執行的,可是,因爲 CPU的執行速度實在是太快了,咱們肉眼和感受上無法識別出來,就像全部任務都在同時執行同樣。併發

真正的並行執行多任務只能在多核 CPU 上實現,可是,因爲任務數量遠遠多於 CPU 的核心數量,因此,操做系統也會自動把不少任務輪流調度到每一個核心上執行。app

有些進程不只僅只是幹一件事的啊,好比瀏覽器,咱們能夠播放時視頻,播放音頻,看文章,編輯文章等等,其實這些都是在瀏覽器進程中的子任務。在一個進程內部,要同時幹多件事,就須要同時運行多個「子任務」,咱們把進程內的這些「子任務」稱爲線程(Thread)。dom

因爲每一個進程至少要幹一件事,因此,一個進程至少有一個線程。固然,一個進程也能夠有多個線程,多個線程能夠同時執行,多線程的執行方式和多進程是同樣的,也是由操做系統在多個線程之間快速切換,讓每一個線程都短暫地交替運行,看起來就像同時執行同樣。async

那麼在 Python 中咱們要同時執行多個任務怎麼辦?函數

有兩種解決方案:

一種是啓動多個進程,每一個進程雖然只有一個線程,但多個進程能夠一塊執行多個任務。

還有一種方法是啓動一個進程,在一個進程內啓動多個線程,這樣,多個線程也能夠一塊執行多個任務。

固然還有第三種方法,就是啓動多個進程,每一個進程再啓動多個線程,這樣同時執行的任務就更多了,固然這種模型更復雜,實際不多采用。

總結一下就是,多任務的實現有3種方式:

  • 多進程模式;
  • 多線程模式;
  • 多進程+多線程模式。

同時執行多個任務一般各個任務之間並非沒有關聯的,而是須要相互通訊和協調,有時,任務 1 必須暫停等待任務 2 完成後才能繼續執行,有時,任務 3 和任務 4 又不能同時執行,因此,多進程和多線程的程序的複雜度要遠遠高於咱們前面寫的單進程單線程的程序。

由於複雜度高,調試困難,因此,不是無可奈何,咱們也不想編寫多任務。可是,有不少時候,沒有多任務還真不行。想一想在電腦上看電影,就必須由一個線程播放視頻,另外一個線程播放音頻,不然,單線程實現的話就只能先把視頻播放完再播放音頻,或者先把音頻播放完再播放視頻,這顯然是不行的。

多線程編程

其實建立線程以後,線程並非始終保持一個狀態的,其狀態大概以下:

  • New 建立
  • Runnable 就緒。等待調度
  • Running 運行
  • Blocked 阻塞。阻塞可能在 Wait Locked Sleeping
  • Dead 消亡

線程有着不一樣的狀態,也有不一樣的類型。大體可分爲:

  • 主線程
  • 子線程
  • 守護線程(後臺線程)
  • 前臺線程

簡單瞭解完這些以後,咱們開始看看具體的代碼使用了。

一、線程的建立

Python 提供兩個模塊進行多線程的操做,分別是 threadthreading

前者是比較低級的模塊,用於更底層的操做,通常應用級別的開發不經常使用。

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

import time
import threading


class MyThread(threading.Thread):
    def run(self):
        for i in range(5):
            print('thread {}, @number: {}'.format(self.name, i))
            time.sleep(1)


def main():
    print("Start main threading")

    # 建立三個線程
    threads = [MyThread() for i in range(3)]
    # 啓動三個線程
    for t in threads:
        t.start()

    print("End Main threading")


if __name__ == '__main__':
    main()

複製代碼

運行結果:

Start main threading
thread Thread-1, @number: 0
thread Thread-2, @number: 0
thread Thread-3, @number: 0
End Main threading
thread Thread-2, @number: 1
thread Thread-1, @number: 1
thread Thread-3, @number: 1
thread Thread-1, @number: 2
thread Thread-3, @number: 2
thread Thread-2, @number: 2
thread Thread-2, @number: 3
thread Thread-3, @number: 3
thread Thread-1, @number: 3
thread Thread-3, @number: 4
thread Thread-2, @number: 4
thread Thread-1, @number: 4
複製代碼

注意喔,這裏不一樣的環境輸出的結果確定是不同的。

二、線程合併(join方法)

上面的示例打印出來的結果來看,主線程結束後,子線程還在運行。那麼咱們須要主線程要等待子線程運行完後,再退出,要怎麼辦呢?

這時候,就須要用到 join 方法了。

在上面的例子,新增一段代碼,具體以下:

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

import time
import threading


class MyThread(threading.Thread):
    def run(self):
        for i in range(5):
            print('thread {}, @number: {}'.format(self.name, i))
            time.sleep(1)


def main():
    print("Start main threading")

    # 建立三個線程
    threads = [MyThread() for i in range(3)]
    # 啓動三個線程
    for t in threads:
        t.start()

    # 一次讓新建立的線程執行 join
    for t in threads:
        t.join()

    print("End Main threading")


if __name__ == '__main__':
    main()

複製代碼

從打印的結果,能夠清楚看到,相比上面示例打印出來的結果,主線程是在等待子線程運行結束後才結束的。

Start main threading
thread Thread-1, @number: 0
thread Thread-2, @number: 0
thread Thread-3, @number: 0
thread Thread-1, @number: 1
thread Thread-3, @number: 1
thread Thread-2, @number: 1
thread Thread-2, @number: 2
thread Thread-1, @number: 2
thread Thread-3, @number: 2
thread Thread-2, @number: 3
thread Thread-1, @number: 3
thread Thread-3, @number: 3
thread Thread-3, @number: 4
thread Thread-2, @number: 4
thread Thread-1, @number: 4
End Main threading

複製代碼

三、線程同步與互斥鎖

使用線程加載獲取數據,一般都會形成數據不一樣步的狀況。固然,這時候咱們能夠給資源進行加鎖,也就是訪問資源的線程須要得到鎖才能訪問。

其中 threading 模塊給咱們提供了一個 Lock 功能。

lock = threading.Lock()
複製代碼

在線程中獲取鎖

lock.acquire()
複製代碼

使用完成後,咱們確定須要釋放鎖

lock.release()
複製代碼

固然爲了支持在同一線程中屢次請求同一資源,Python 提供了可重入鎖(RLock)。RLock 內部維護着一個 Lock 和一個 counter 變量,counter 記錄了 acquire 的次數,從而使得資源能夠被屢次 require。直到一個線程全部的 acquire 都被 release,其餘的線程才能得到資源。

那麼怎麼建立重入鎖呢?也是一句代碼的事情:

r_lock = threading.RLock()
複製代碼

四、Condition 條件變量

實用鎖能夠達到線程同步,可是在更復雜的環境,須要針對鎖進行一些條件判斷。Python 提供了 Condition 對象。使用 Condition 對象能夠在某些事件觸發或者達到特定的條件後才處理數據,Condition 除了具備 Lock 對象的 acquire 方法和 release 方法外,還提供了 wait 和 notify 方法。線程首先 acquire 一個條件變量鎖。若是條件不足,則該線程 wait,若是知足就執行線程,甚至能夠 notify 其餘線程。其餘處於 wait 狀態的線程接到通知後會從新判斷條件。

其中條件變量能夠當作不一樣的線程前後 acquire 得到鎖,若是不知足條件,能夠理解爲被扔到一個( Lock 或 RLock )的 waiting 池。直達其餘線程 notify 以後再從新判斷條件。不斷的重複這一過程,從而解決複雜的同步問題。

Condition

該模式經常使用於生產者消費者模式,具體看看下面在線購物買家和賣家的示例:

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

import threading, time


class Consumer(threading.Thread):
    def __init__(self, cond, name):
        # 初始化
        super(Consumer, self).__init__()
        self.cond = cond
        self.name = name

    def run(self):
        # 確保先運行Seeker中的方法
        time.sleep(1)
        self.cond.acquire()
        print(self.name + ': 我這兩件商品一塊兒買,能夠便宜點嗎')
        self.cond.notify()
        self.cond.wait()
        print(self.name + ': 我已經提交訂單了,你修改下價格')
        self.cond.notify()
        self.cond.wait()
        print(self.name + ': 收到,我支付成功了')
        self.cond.notify()
        self.cond.release()
        print(self.name + ': 等待收貨')


class Producer(threading.Thread):
    def __init__(self, cond, name):
        super(Producer, self).__init__()
        self.cond = cond
        self.name = name

    def run(self):
        self.cond.acquire()
        # 釋放對瑣的佔用,同時線程掛起在這裏,直到被 notify 並從新佔有瑣。
        self.cond.wait()
        print(self.name + ': 能夠的,你提交訂單吧')
        self.cond.notify()
        self.cond.wait()
        print(self.name + ': 好了,已經修改了')
        self.cond.notify()
        self.cond.wait()
        print(self.name + ': 嗯,收款成功,立刻給你發貨')
        self.cond.release()
        print(self.name + ': 發貨商品')


cond = threading.Condition()
consumer = Consumer(cond, '買家(兩點水)')
producer = Producer(cond, '賣家(三點水)')
consumer.start()
producer.start()

複製代碼

輸出的結果以下:

買家(兩點水): 我這兩件商品一塊兒買,能夠便宜點嗎
賣家(三點水): 能夠的,你提交訂單吧
買家(兩點水): 我已經提交訂單了,你修改下價格
賣家(三點水): 好了,已經修改了
買家(兩點水): 收到,我支付成功了
買家(兩點水): 等待收貨
賣家(三點水): 嗯,收款成功,立刻給你發貨
賣家(三點水): 發貨商品
複製代碼

五、線程間通訊

若是程序中有多個線程,這些線程避免不了須要相互通訊的。那麼咱們怎樣在這些線程之間安全地交換信息或數據呢?

從一個線程向另外一個線程發送數據最安全的方式可能就是使用 queue 庫中的隊列了。建立一個被多個線程共享的 Queue 對象,這些線程經過使用 put()get() 操做來向隊列中添加或者刪除元素。

# -*- coding: UTF-8 -*-
from queue import Queue
from threading import Thread

isRead = True


def write(q):
    # 寫數據進程
    for value in ['兩點水', '三點水', '四點水']:
        print('寫進 Queue 的值爲:{0}'.format(value))
        q.put(value)


def read(q):
    # 讀取數據進程
    while isRead:
        value = q.get(True)
        print('從 Queue 讀取的值爲:{0}'.format(value))


if __name__ == '__main__':
    q = Queue()
    t1 = Thread(target=write, args=(q,))
    t2 = Thread(target=read, args=(q,))
    t1.start()
    t2.start()
複製代碼

輸出的結果以下:

寫進 Queue 的值爲:兩點水
寫進 Queue 的值爲:三點水
從 Queue 讀取的值爲:兩點水
寫進 Queue 的值爲:四點水
從 Queue 讀取的值爲:三點水
從 Queue 讀取的值爲:四點水
複製代碼

Python 還提供了 Event 對象用於線程間通訊,它是由線程設置的信號標誌,若是信號標誌位真,則其餘線程等待直到信號接觸。

Event 對象實現了簡單的線程通訊機制,它提供了設置信號,清楚信號,等待等用於實現線程間的通訊。

  • 設置信號

使用 Event 的 set() 方法能夠設置 Event 對象內部的信號標誌爲真。Event 對象提供了 isSe() 方法來判斷其內部信號標誌的狀態。當使用 event 對象的 set() 方法後,isSet() 方法返回真

  • 清除信號

使用 Event 對象的 clear() 方法能夠清除 Event 對象內部的信號標誌,即將其設爲假,當使用 Event 的 clear 方法後,isSet() 方法返回假

  • 等待

Event 對象 wait 的方法只有在內部信號爲真的時候纔會很快的執行並完成返回。當 Event 對象的內部信號標誌位假時,則 wait 方法一直等待到其爲真時才返回。

示例:

# -*- coding: UTF-8 -*-

import threading


class mThread(threading.Thread):
    def __init__(self, threadname):
        threading.Thread.__init__(self, name=threadname)

    def run(self):
        # 使用全局Event對象
        global event
        # 判斷Event對象內部信號標誌
        if event.isSet():
            event.clear()
            event.wait()
            print(self.getName())
        else:
            print(self.getName())
            # 設置Event對象內部信號標誌
            event.set()

# 生成Event對象
event = threading.Event()
# 設置Event對象內部信號標誌
event.set()
t1 = []
for i in range(10):
    t = mThread(str(i))
    # 生成線程列表
    t1.append(t)

for i in t1:
    # 運行線程
    i.start()

複製代碼

輸出的結果以下:

1
0
3
2
5
4
7
6
9
8
複製代碼

六、後臺線程

默認狀況下,主線程退出以後,即便子線程沒有 join。那麼主線程結束後,子線程也依然會繼續執行。若是但願主線程退出後,其子線程也退出而再也不執行,則須要設置子線程爲後臺線程。Python 提供了 setDeamon 方法。

進程

Python 中的多線程其實並非真正的多線程,若是想要充分地使用多核 CPU 的資源,在 Python 中大部分狀況須要使用多進程。Python 提供了很是好用的多進程包 multiprocessing,只須要定義一個函數,Python 會完成其餘全部事情。藉助這個包,能夠輕鬆完成從單進程到併發執行的轉換。multiprocessing 支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了 Process、Queue、Pipe、Lock 等組件。

一、類 Process

建立進程的類:Process([group [, target [, name [, args [, kwargs]]]]])

  • target 表示調用對象
  • args 表示調用對象的位置參數元組
  • kwargs表示調用對象的字典
  • name爲別名
  • group實質上不使用

下面看一個建立函數並將其做爲多個進程的例子:

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

import multiprocessing
import time


def worker(interval, name):
    print(name + '【start】')
    time.sleep(interval)
    print(name + '【end】')


if __name__ == "__main__":
    p1 = multiprocessing.Process(target=worker, args=(2, '兩點水1'))
    p2 = multiprocessing.Process(target=worker, args=(3, '兩點水2'))
    p3 = multiprocessing.Process(target=worker, args=(4, '兩點水3'))

    p1.start()
    p2.start()
    p3.start()

    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child p.name:" + p.name + "\tp.id" + str(p.pid))
    print("END!!!!!!!!!!!!!!!!!")

複製代碼

輸出的結果:

多進程輸出結果

二、把進程建立成類

固然咱們也能夠把進程建立成一個類,以下面的例子,當進程 p 調用 start() 時,自動調用 run() 方法。

# -*- coding: UTF-8 -*-

import multiprocessing
import time


class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval

    def run(self):
        n = 5
        while n > 0:
            print("當前時間: {0}".format(time.ctime()))
            time.sleep(self.interval)
            n -= 1


if __name__ == '__main__':
    p = ClockProcess(3)
    p.start()

複製代碼

輸出結果以下:

建立進程類

三、daemon 屬性

想知道 daemon 屬性有什麼用,看下下面兩個例子吧,一個加了 daemon 屬性,一個沒有加,對比輸出的結果:

沒有加 deamon 屬性的例子:

# -*- coding: UTF-8 -*-
import multiprocessing
import time


def worker(interval):
    print('工做開始時間:{0}'.format(time.ctime()))
    time.sleep(interval)
    print('工做結果時間:{0}'.format(time.ctime()))


if __name__ == '__main__':
    p = multiprocessing.Process(target=worker, args=(3,))
    p.start()
    print('【EMD】')

複製代碼

輸出結果:

【EMD】
工做開始時間:Mon Oct  9 17:47:06 2017
工做結果時間:Mon Oct  9 17:47:09 2017
複製代碼

在上面示例中,進程 p 添加 daemon 屬性:

# -*- coding: UTF-8 -*-

import multiprocessing
import time


def worker(interval):
    print('工做開始時間:{0}'.format(time.ctime()))
    time.sleep(interval)
    print('工做結果時間:{0}'.format(time.ctime()))


if __name__ == '__main__':
    p = multiprocessing.Process(target=worker, args=(3,))
    p.daemon = True
    p.start()
    print('【EMD】')
複製代碼

輸出結果:

【EMD】
複製代碼

根據輸出結果可見,若是在子進程中添加了 daemon 屬性,那麼當主進程結束的時候,子進程也會跟着結束。因此沒有打印子進程的信息。

四、join 方法

結合上面的例子繼續,若是咱們想要讓子線程執行完該怎麼作呢?

那麼咱們能夠用到 join 方法,join 方法的主要做用是:阻塞當前進程,直到調用 join 方法的那個進程執行完,再繼續執行當前進程。

所以看下加了 join 方法的例子:

import multiprocessing
import time


def worker(interval):
    print('工做開始時間:{0}'.format(time.ctime()))
    time.sleep(interval)
    print('工做結果時間:{0}'.format(time.ctime()))


if __name__ == '__main__':
    p = multiprocessing.Process(target=worker, args=(3,))
    p.daemon = True
    p.start()
    p.join()
    print('【EMD】')
複製代碼

輸出的結果:

工做開始時間:Tue Oct 10 11:30:08 2017
工做結果時間:Tue Oct 10 11:30:11 2017
【EMD】
複製代碼

五、Pool

若是須要不少的子進程,難道咱們須要一個一個的去建立嗎?

固然不用,咱們可使用進程池的方法批量建立子進程。

例子以下:

# -*- coding: UTF-8 -*-

from multiprocessing import Pool
import os, time, random


def long_time_task(name):
    print('進程的名稱:{0} ;進程的PID: {1} '.format(name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('進程 {0} 運行了 {1} 秒'.format(name, (end - start)))


if __name__ == '__main__':
    print('主進程的 PID:{0}'.format(os.getpid()))
    p = Pool(4)
    for i in range(6):
        p.apply_async(long_time_task, args=(i,))
    p.close()
    # 等待全部子進程結束後在關閉主進程
    p.join()
    print('【End】')
複製代碼

輸出的結果以下:

主進程的 PID:7256
進程的名稱:0 ;進程的PID: 1492 
進程的名稱:1 ;進程的PID: 12232 
進程的名稱:2 ;進程的PID: 4332 
進程的名稱:3 ;進程的PID: 11604 
進程 2 運行了 0.6500370502471924 秒
進程的名稱:4 ;進程的PID: 4332 
進程 1 運行了 1.0830621719360352 秒
進程的名稱:5 ;進程的PID: 12232 
進程 5 運行了 0.029001712799072266 秒
進程 4 運行了 0.9720554351806641 秒
進程 0 運行了 2.3181326389312744 秒
進程 3 運行了 2.5331451892852783 秒
【End】
複製代碼

這裏有一點須要注意: Pool 對象調用 join() 方法會等待全部子進程執行完畢,調用 join() 以前必須先調用 close() ,調用close() 以後就不能繼續添加新的 Process 了。

請注意輸出的結果,子進程 0,1,2,3是馬上執行的,而子進程 4 要等待前面某個子進程完成後才執行,這是由於 Pool 的默認大小在個人電腦上是 4,所以,最多同時執行 4 個進程。這是 Pool 有意設計的限制,並非操做系統的限制。若是改爲:

p = Pool(5)
複製代碼

就能夠同時跑 5 個進程。

六、進程間通訊

Process 之間確定是須要通訊的,操做系統提供了不少機制來實現進程間的通訊。Python 的 multiprocessing 模塊包裝了底層的機制,提供了Queue、Pipes 等多種方式來交換數據。

以 Queue 爲例,在父進程中建立兩個子進程,一個往 Queue 裏寫數據,一個從 Queue 裏讀數據:

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

from multiprocessing import Process, Queue
import os, time, random


def write(q):
    # 寫數據進程
    print('寫進程的PID:{0}'.format(os.getpid()))
    for value in ['兩點水', '三點水', '四點水']:
        print('寫進 Queue 的值爲:{0}'.format(value))
        q.put(value)
        time.sleep(random.random())


def read(q):
    # 讀取數據進程
    print('讀進程的PID:{0}'.format(os.getpid()))
    while True:
        value = q.get(True)
        print('從 Queue 讀取的值爲:{0}'.format(value))


if __name__ == '__main__':
    # 父進程建立 Queue,並傳給各個子進程
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 啓動子進程 pw
    pw.start()
    # 啓動子進程pr
    pr.start()
    # 等待pw結束:
    pw.join()
    # pr 進程裏是死循環,沒法等待其結束,只能強行終止
    pr.terminate()

複製代碼

輸出的結果爲:

讀進程的PID:13208
寫進程的PID:10864
寫進 Queue 的值爲:兩點水
從 Queue 讀取的值爲:兩點水
寫進 Queue 的值爲:三點水
從 Queue 讀取的值爲:三點水
寫進 Queue 的值爲:四點水
從 Queue 讀取的值爲:四點水
複製代碼
相關文章
相關標籤/搜索