豬行天下之Python基礎——9.2 Python多線程與多進程(中)

內容簡述:

  • 一、threading模塊詳解
  • 二、queue模塊詳解

一、threading模塊詳解

Python提供的與線程操做相關的模塊,網上有不少資料仍是用的thread模塊,在3.x版本中已經使用threading來替代thread,若是你在python 2.x版本想使用threading的話,可使用dummy_threading模塊css


① threading模塊提供的可直接調用的函數

  • active_count():獲取當前活躍(alive)線程的個數。
  • current_thread():獲取當前的線程對象。
  • get_ident():返回當前線程的索引,一個非零的整數(3.3新增)。
  • enumerate():獲取當前全部活躍線程的列表。
  • main_thread():返回主線程對象(3.4新增)。
  • settrace(func):設置一個回調函數,在run()執行以前被調用。
  • setprofile(func):設置一個回調函數,在run()執行完畢以後調用。
  • stack_size():返回建立新線程時使用的線程堆棧大小。
  • threading.TIMEOUT_MAX:堵塞線程時間最大值,超過這個值會棧溢出。

② 線程局部變量(Thread-Local Data)

問題引入python

在一個進程內全部的線程共享進程的全局變量,線程間共享數據很方便可是每一個線程均可以隨意修改全局變量,可能會引發線程安全問題。sql

解決方法apache

對於這種線程私有數據,最簡單的方法就是對變量加鎖或使用局部變量,只有線程自身能夠訪問,其餘線程沒法訪問。除此以外還可使用threading模塊爲咱們提供的ThreadLocal變量,它自己是一個全局變量,可是線程們卻可使用它來保存私有數據。安全

用法簡介多線程

定義一個全局變量:data = thread.local(),而後就能夠往裏面存數據啦,好比data.num = xxx,可是有一點要注意:若是data裏沒有設置對應的屬性,直接取會報AttributeError異常,使用時能夠捕獲這個異常或先調用hasattr(對象,屬性)判斷對象中是否有該屬性!使用代碼示例以下:併發

import threading
import random

data = threading.local()

def show(d):
    try:
        num = d.num
    except AttributeError:
        print("線程 %s 還未設置該屬性!" % threading.current_thread().getName())
    else:
        print("線程 %s 中該屬性的值爲 = %s" % (threading.current_thread().getName(), num))

def thread_call(d):
    show(d)
    d.num = random.randint(1100)
    show(d)

if __name__ == '__main__':
    show(data)
    data.num = 666
    show(data)
    for i in range(2):
        t = threading.Thread(target=thread_call, args=(data,), name="Thread " + str(i))
        t.start()
複製代碼

運行結果以下app

線程 MainThread 還未設置該屬性!
線程 MainThread 中該屬性的值爲 = 666
線程 Thread 0 還未設置該屬性!
線程 Thread 0 中該屬性的值爲 = 80
線程 Thread 1 還未設置該屬性!
線程 Thread 1 中該屬性的值爲 = 17
複製代碼

不一樣線程訪問這個ThreadLocal變量,返回的都是不同的值,原理:dom

threading.local()實例化一個全局對象,這個全局對象裏有一個大字典,鍵值爲兩個弱引用對象{線程對象,字典對象},而後能夠經過current_thread()得到當前的線程對象,而後根據這個對象能夠拿到對應的字典對象,而後進行參數的讀或者寫。ide


③ 線程對象(threading.Thread)

建立新線程的兩種方式

  • 1.直接建立threading.Thread對象並把調用對象做爲參數傳入
  • 2.繼承threading.Thread類重寫run()方法

使用代碼示例(驗證單線程快仍是多線程快):

import threading
import time

def catch_fish():
    pass

def one_thread():
    start_time = time.time()
    for i in range(11001):
        catch_fish()
    end_time = time.time()
    print("單線程測試 耗時 === %s" % str(end_time - start_time))

def muti_thread():
    start_time = time.time()
    for i in range(11001):
        threading.Thread(target=catch_fish()).start()
    end_time = time.time()
    print("多線程測試 耗時 === %s" % str(end_time - start_time))

if __name__ == '__main__':
    # 單線程
    threading.Thread(one_thread()).start()
    # 多線程
    muti_thread()
複製代碼

運行結果以下:

單線程測試 耗時 === 0.00011301040649414062
多線程測試 耗時 === 0.07665514945983887
複製代碼

從輸出結果能夠看到,多線程反而比單線程要慢,緣由是前面介紹過的Python中的全局解釋器鎖(GIL), 使得任什麼時候候僅有一個線程在執行。

Thread類構造函數

def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None)
:

複製代碼

構造函數參數依次是

  • group:線程組
  • target:要執行的函數
  • name:線程名字
  • args/kwargs:要傳入的函數的參數
  • daemon:是否爲守護線程

相關屬性與函數

  • start():啓動線程,只能調用一次
  • run():線程執行的操做,可繼承Thread重寫,參數可從args和kwargs獲取;
  • join([timeout]):堵塞調用線程,直到被調用線程運行結束或超時;若是
    沒設置超時時間會一直堵塞到被調用線程結束。
  • name/getName():得到線程名;
  • setName():設置線程名;
  • ident:線程是已經啓動,未啓動會返回一個非零整數;
  • is_alive():判斷是否在運行,啓動後,終止前;
  • daemon/isDaemon():線程是否爲守護線程;
  • setDaemon():設置線程爲守護線程;

④ Lock(指令鎖)與RLock(可重入鎖)

在概念那裏就講了,多個進程併發的訪問臨界資源可能會引發線程同步安全問題,寫個簡單的例子,而後再引入同步鎖。代碼示例以下:

import threading

file_name = "test.txt"

# 定義一個寫入文件的方法
def write_to_file(msg):
    try:
        with open(file_name, "a+", encoding="utf-8"as f:
            f.write(msg + "\n")
    except OSError as reason:
        print(str(reason))

class MyThread(threading.Thread):
    def __init__(self, msg):
        super().__init__()
        self.msg = msg
    def run(self):
        write_to_file(self.name + "~" + self.msg)

if __name__ == '__main__':
    for i in range(121):
        t = MyThread(str(i)).start()
複製代碼

運行結果以下

# test.txt文件內容
Thread-1~1
Thread-5~5
Thread-3~3
Thread-2~2
Thread-4~4
Thread-6~6
Thread-7~7
Thread-8~8
Thread-10~10
Thread-9~9
Thread-11~11
Thread-13~13
Thread-12~12
Thread-14~14
Thread-15~15
Thread-16~16
Thread-17~17
Thread-19~19
Thread-20~20
Thread-18~18
複製代碼

發現結果並無按照咱們預想的1-20那樣順序打印,而是亂的,threading模塊中提供了兩個類來確保多線程共享資源的訪問:「Lock」 和 「RLock」。

Lock指令鎖,有兩種狀態(鎖定與非鎖定),以及兩個基本函數:

使用acquire()設置爲locked狀態,使用release()設置爲unlocked狀態。acquire()函數有兩個可選參數:blocking=True:是否堵塞當前線程等待;timeout=None:堵塞等待時間。若是成功得到lock,acquire返回True,不然返回False,超時也是返回False。使用起來也很簡單,在訪問共享資源的地方acquire一下,用完release就好。使用代碼示例以下:

import threading

file_name = "test.txt"
lock = threading.Lock()

# 定義一個寫入文件的方法(加鎖)
def write_to_file(msg):
    if lock.acquire():
        try:
            with open(file_name, "a+", encoding="utf-8"as f:
                f.write(msg + "\n")
        except OSError as reason:
            print(str(reason))
        finally:
            lock.release()

class MyThread(threading.Thread):
    def __init__(self, msg):
        super().__init__()
        self.msg = msg
    def run(self):
        write_to_file(self.name + "~" + self.msg)

if __name__ == '__main__':
    for i in range(1101):
        t = MyThread(str(i)).start()
複製代碼

這裏把循環次數改爲了101,反覆執行屢次,test.txt中寫入順序也是正確的,加鎖有效。另外有一點要注意:若是鎖的狀態是unlocked,此時調用release會拋出RuntimeError異常!

RLock可重入鎖,和Lock相似,但RLock卻能夠被同一個線程請求屢次! 舉個例子:在一個線程裏調用Lock對象的acquire方法兩次。

lock.acquire()
lock.acquire()
lock.release()
lock.release()
複製代碼

你會發現程序卡住不動,由於已經發生了死鎖,可是方法調用是在同一個線程裏的,這很不合理吧。這個時候就能夠引入RLock了,使用RLock編寫同樣代碼,只需把threading.Lock()改爲threading.RLock(),便可解決這個問題。

雖然使用RLock能夠規避同一個線程引發的死鎖問題,可是acquire和release函數要成對出現,即有多少個acquire就要有多少個release,纔可以正真釋放鎖


⑤ 條件變量(Condition)

上面的互斥鎖Lock和RLock只是最簡單的同步機制,Python爲咱們提供了Condition(條件變量),以便於處理複雜線程同步問題,好比最經典的生產者與消費者問題。Condition除了提供與Lock相似的acquire()與release()函數外,還提供了wait()與notify()函數。

用法簡介

  • 1.調用`threading.Condition`得到一個條件變量對象;
  • 2.線程調用acquire得到Condition對象
  • 3.進行條件判斷,不知足條件調用wait函數,知足條件,進行一些處理改變條件後,調用notify函數通知處於wait狀態的線程,從新進行條件判斷。

代碼示例以下(實現一個簡單的消費者和生產者):

import threading
import time

condition = threading.Condition()
products = 0  # 商品數量

# 定義生產者線程類
class Producer(threading.Thread):
    def run(self):
        global products
        while True:
            if condition.acquire():
                if products >= 99:
                    condition.wait()
                else:
                    products += 2
                    print(self.name + "生產了2個產品,當前剩餘產品數爲:" + str(products))
                    condition.notify()
                condition.release()
                time.sleep(2)

# 定義消費者線程類
class Consumer(threading.Thread):
    def run(self):
        global products
        while True:
            if condition.acquire():
                if products < 4:
                    condition.wait()
                else:
                    products -= 4
                    print(self.name + "消耗了4個產品,當前剩餘產品數爲:" + str(products))
                    condition.notify()
            condition.release()
            time.sleep(2)

if __name__ == '__main__':
    # 建立五個生產者線程
    for i in range(5):
        p = Producer()
        p.start()
    # 建立兩個消費者線程
    for j in range(2):
        c = Consumer()
        c.start()
複製代碼

部分運行結果以下

Thread-1生產了2個產品,當前剩餘產品數爲:2
Thread-2生產了2個產品,當前剩餘產品數爲:4
Thread-3生產了2個產品,當前剩餘產品數爲:6
Thread-4生產了2個產品,當前剩餘產品數爲:8
Thread-5生產了2個產品,當前剩餘產品數爲:10
Thread-6消耗了4個產品,當前剩餘產品數爲:6
Thread-7消耗了4個產品,當前剩餘產品數爲:2
Thread-1生產了2個產品,當前剩餘產品數爲:4
Thread-5生產了2個產品,當前剩餘產品數爲:6
Thread-3生產了2個產品,當前剩餘產品數爲:8
Thread-7消耗了4個產品,當前剩餘產品數爲:4
Thread-6消耗了4個產品,當前剩餘產品數爲:0
Thread-4生產了2個產品,當前剩餘產品數爲:2
複製代碼

Condition維護着一個互斥鎖對象(默認是RLock),也能夠本身實例化一個在Condition實例化的時候經過構造函數傳入,因此,調用的Condition的acquire與release函數,其實調用就是這個鎖對象的acquire與release函數。

Condition提供的其餘函數

  • wait(timeout=None):釋放鎖,同時線程被掛起,直到收到通知被喚醒
    或超時(若是設置了timeout),當線程被喚醒並從新佔有鎖時,程序才繼續執行;
  • wait_for(predicate, timeout=None):等待知道條件爲True,predicate應該是
    一個回調函數,返回布爾值,timeout用於指定超時時間,返回值爲回調函數返回
    的布爾值,或者超時,返回False(3.2新增);
  • notify(n=1):默認喚醒一個正在的等待線程,notify並不釋放鎖!!!
  • notify_all():喚醒全部等待線程,進入就緒狀態,等待得到鎖,notify_all 一樣不釋放鎖!!!

注:上述函數只有在acquire以後才能調用,否則會報RuntimeError異常。


⑥ 信號量(Semaphore)

信號量,也是一個簡單易懂的東西,舉個形象的例子:

廁所裏有五個坑位,每有我的去廁所就會佔用一個坑位,所剩餘的坑位-1,當五個坑都被人佔滿時,新來的人就只能在外面等候,直到有人出來爲止。這裏的五個坑位就是信號量,蹲坑的人就是線程,初始值爲5,來人-1,走人+1,超過最大值,新來的處於堵塞狀態,咱們寫下代碼來還原這個過程。

信號量使用代碼示例以下

import threading
import time
import random

s = threading.Semaphore(5)  # 糞坑

class Human(threading.Thread):
    def run(self):
        s.acquire()  # 佔坑
        print("蹲坑 - " + self.name + " - " + str(time.ctime()))
        time.sleep(random.randrange(13))
        print("走人 - " + self.name + " - " + str(time.ctime()))
        s.release()  # 走人

if __name__ == '__main__':
    for i in range(10):
        human = Human()
        human.start()
複製代碼

運行結果以下

蹲坑 - Thread-1 - Tue Jul 17 19:59:15 2018
蹲坑 - Thread-2 - Tue Jul 17 19:59:15 2018
蹲坑 - Thread-3 - Tue Jul 17 19:59:15 2018
蹲坑 - Thread-4 - Tue Jul 17 19:59:15 2018
蹲坑 - Thread-5 - Tue Jul 17 19:59:15 2018
走人 - Thread-1 - Tue Jul 17 19:59:16 2018
蹲坑 - Thread-6 - Tue Jul 17 19:59:16 2018
走人 - Thread-2 - Tue Jul 17 19:59:16 2018
走人 - Thread-3 - Tue Jul 17 19:59:16 2018
蹲坑 - Thread-8 - Tue Jul 17 19:59:16 2018
走人 - Thread-5 - Tue Jul 17 19:59:16 2018
蹲坑 - Thread-7 - Tue Jul 17 19:59:16 2018
蹲坑 - Thread-9 - Tue Jul 17 19:59:16 2018
走人 - Thread-4 - Tue Jul 17 19:59:17 2018
蹲坑 - Thread-10 - Tue Jul 17 19:59:17 2018
走人 - Thread-6 - Tue Jul 17 19:59:17 2018
走人 - Thread-8 - Tue Jul 17 19:59:17 2018
走人 - Thread-9 - Tue Jul 17 19:59:17 2018
走人 - Thread-7 - Tue Jul 17 19:59:18 2018
走人 - Thread-10 - Tue Jul 17 19:59:19 2018
複製代碼

⑦ 通用的條件變量(Event)

Python提供的「用於線程間通訊的信號標誌」,一個線程標識了一個事件,其餘線程處於等待狀態,直到事件發生後,全部線程都會被激活。Event對象屬性實現了簡單的線程通訊機制,提供了設置信號,清除信號,等待等用於實現線程間的通訊。提供如下四個可供調用的方法:

  • is_set():判斷內部標誌是否爲真
  • set():設置信號標誌爲真
  • clear():清除Event對象內部的信號標誌(設置爲false)
  • wait(timeout=None):使線程一直處於堵塞,知道標識符變爲True

使用代碼示例(汽車等紅綠燈的例子):

import threading
import time
import random

class CarThread(threading.Thread):
    def __init__(self, event):
        threading.Thread.__init__(self)
        self.threadEvent = event
    def run(self):
        # 休眠模擬汽車前後到達路口時間
        time.sleep(random.randrange(110))
        print("汽車 - " + self.name + " - 到達路口...")
        self.threadEvent.wait()
        print("汽車 - " + self.name + " - 經過路口...")

if __name__ == '__main__':
    light_event = threading.Event()
    # 假設有20臺車子
    for i in range(20):
        car = CarThread(event=light_event)
        car.start()
    while threading.active_count() > 1:
        light_event.clear()
        print("紅燈等待...")
        time.sleep(3)
        print("綠燈通行...")
        light_event.set()
        time.sleep(2)
複製代碼

運行結果以下

紅燈等待...
汽車 - Thread-10 - 到達路口...
汽車 - Thread-14 - 到達路口...
汽車 - Thread-9 - 到達路口...
汽車 - Thread-11 - 到達路口...
汽車 - Thread-12 - 到達路口...
綠燈通行...
汽車 - Thread-11 - 經過路口...
汽車 - Thread-10 - 經過路口...
汽車 - Thread-9 - 經過路口...
汽車 - Thread-14 - 經過路口...
汽車 - Thread-12 - 經過路口...
汽車 - Thread-6 - 到達路口...
汽車 - Thread-6 - 經過路口...
複製代碼

⑧ 定時器(Timer)

和Thread相似,只是要等待一段時間後纔會開始運行,單位秒,用法也很簡單,
代碼示例以下

import threading
import time

def skill_ready():
    print("菜餚製做完成!!!")

if __name__ == '__main__':
    t = threading.Timer(5, skill_ready)
    t.start()
    while threading.active_count() > 1:
        print("======菜餚製做中======")
        time.sleep(1)
複製代碼

運行結果以下

======菜餚製做中======
======菜餚製做中======
======菜餚製做中======
======菜餚製做中======
======菜餚製做中======
菜餚製做完成!!!
複製代碼

⑨ 柵欄(Barrier)

Barrier直譯柵欄,感受不怎麼好理解,咱們能夠把它看作是賽馬用的柵欄,而後馬(線程)依次來到柵欄前等待(wait),直到全部的馬都停在柵欄面前了,而後全部馬開始同時出發(start)。簡單點說就是: 多個線程間的相互等待,調用了wait()方法的線程進入堵塞, 直到全部的線程都調用了wait()方法,而後全部線程同時進入就緒狀態, 等待調度運行。

構造函數Barrier(parties,action=None,timeout=None)

參數解釋

  • parties:建立一個可容納parties條線程的柵欄;
  • action:所有線程被釋放時可被其中一條線程調用的可調用對象;
  • timeout:線程調用wait()方法時沒有顯式設定timeout,就用的這個做爲默認值;

相關屬性與函數

  • wait(timeout=None):表示線程就位,返回值是一個0到parties-1之間的
    整數, 每條線程都不同,這個值能夠用做挑選一條線程作些清掃工做,另外若是
    你在構造函數裏設置了action的話,其中一個線程在釋放以前將會調用它。若是調用
    出錯的話,會讓柵欄進入broken狀態,超時一樣也會進入broken狀態,若是柵欄在
    處於broke狀態的時候調用reset函數,會拋出一個BrokenBarrierError異常。
  • reset():本方法將柵欄置爲初始狀態,即empty狀態。全部已經在等待的線程都會
    接收到BrokenBarrierError異常,注意當有其餘處於unknown狀態的線程時,調用
    此方法將可能獲取到額外的訪問。所以若是一個柵欄進入了broken狀態, 最好是
    放棄他並新建一個柵欄,而不是調用reset方法。
  • abort():將柵欄置爲broken狀態。本方法將使全部正在等待或將要調用
    wait()方法的線程收到BrokenBarrierError異常。本方法的使用情景爲,好比:
    有一條線程須要abort(),又不想給其餘線程形成死鎖的狀態,或許設定
    timeout參數要比使用本方法更可靠。
  • parites:將要使用本 barrier 的線程的數量
  • n_waiting:正在等待本 barrier 的線程的數量
  • broken:柵欄是否爲broken狀態,返回一個布爾值
  • BrokenBarrierError:RuntimeError的子類,當柵欄被reset()或broken時引起;

使用代碼示例以下(公司一塊兒去旅遊等人齊纔出發):

import threading
import time
import random

class Staff(threading.Thread):
    def __init__(self, barriers):
        threading.Thread.__init__(self)
        self.barriers = barriers

    def run(self):
        print("員工 【" + self.name + "】" + "出門")
        time.sleep(random.randrange(110))
        print("員工 【" + self.name + "】" + "已簽到")
        self.barriers.wait()

def ready():
    print(threading.current_thread().name + ":人齊,出發,出發~~~")

if __name__ == '__main__':
    print("要出去旅遊啦,你們快集合~")
    b = threading.Barrier(10, action=ready, timeout=20)
    for i in range(10):
        staff = Staff(b)
        staff.start()
複製代碼

運行結果以下

要出去旅遊啦,你們快集合~
員工 【Thread-1】出門
員工 【Thread-2】出門
員工 【Thread-3】出門
員工 【Thread-4】出門
員工 【Thread-5】出門
員工 【Thread-6】出門
員工 【Thread-7】出門
員工 【Thread-8】出門
員工 【Thread-9】出門
員工 【Thread-10】出門
員工 【Thread-8】已簽到
員工 【Thread-4】已簽到
員工 【Thread-5】已簽到
員工 【Thread-6】已簽到
員工 【Thread-9】已簽到
員工 【Thread-2】已簽到
員工 【Thread-3】已簽到
員工 【Thread-7】已簽到
員工 【Thread-1】已簽到
員工 【Thread-10】已簽到
Thread-10:人齊,出發,出發~~~
複製代碼

二、queue模塊詳解

Python中的queue模塊中已經實現了一個線程安全的多生產者,多消費者隊列,自帶鎖,經常使用於多線程併發數據交換。內置三種類型的隊列:

  • Queue:FIFO(先進先出);
  • LifoQueue:LIFO(後進先出);
  • PriorityQueue:優先級最小的先出;

三種類型的隊列的構造函數都是(maxsize=0),用於設置隊列容量,若是設置的maxsize小於1,則表示隊列的長度無限長。

兩個異常

  • Queue.Empty:當調用非堵塞的get()獲取空隊列元素時會引起;
  • Queue.Full:當調用非堵塞的put()滿隊列裏添加元素時會引起;

相關函數

  • size():返回隊列的近似大小,注意:qsize()> 0不保證隨後的get()不會 阻塞也不保證qsize() < maxsize後的put()不會堵塞;
  • empty():判斷隊列是否爲空,返回布爾值,若是返回True,不保證後續 調用put()不會阻塞,同理,返回False也不保證get()調用不會被阻塞;
  • full():判斷隊列是否滿,返回布爾值若是返回True,不保證後續 調用get()不會阻塞,同理,返回False也不保證put()調用不會被阻塞;
  • put(item, block=True, timeout=None):往隊列中放入元素,若是block爲True且timeout參數爲None(默認),爲堵塞型put(),若是timeout是 正數,會堵塞timeout時間並引起Queue.Full異常,若是block爲False則 爲非堵塞put()。
  • put_nowait(item):等價於put(item, False),非堵塞put()
  • get(block=True, timeout=None):移除一個隊列元素,並返回該元素,若是block爲True表示堵塞函數,block = False爲非堵塞函數,若是設置了timeout,堵塞時最多堵塞超過多少秒,若是這段時間內沒有可用的項,會引起Queue.Empty異常,若是爲非堵塞狀態,有數據可用返回數據無數據當即拋出Queue.Empty異常;
  • get_nowait():等價於get(False),非堵塞get()
  • task_done():完成一項工做後,調用該方法向隊列發送一個完成信號,任務-1;
  • join():等隊列爲空,再執行別的操做;

使用代碼示例以下

import threading
import queue
import time
import random

work_queue = queue.Queue()

# 任務模擬
def working():
    global work_queue
    while not work_queue.empty():
        data = work_queue.get()
        time.sleep(random.randrange(12))
        print("執行" + data)
        work_queue.task_done()

# 工做線程
class WorkThread(threading.Thread):
    def __init__(self, t_name, func):
        self.func = func
        threading.Thread.__init__(self, name=t_name)
    def run(self):
        self.func()

if __name__ == '__main__':
    work_list = []
    for i in range(121):
        work_list.append("任務 %d" % i)
    # 模擬把須要執行的任務放到隊列中
    for i in work_list:
        work_queue.put(i)
    # 初始化一個線程列表
    threads = []
    for i in range(0, len(work_list)):
        t = WorkThread(t_name="線程" + str(i), func=working)
        t.daemon = True
        t.start()
        threads.append(t)
    work_queue.join()
    for t in threads:
        t.join()
    print("全部任務執行完畢")
複製代碼

運行結果以下

執行任務 1
執行任務 3
執行任務 5
執行任務 2
執行任務 4
執行任務 6
執行任務 8
執行任務 10
執行任務 13
執行任務 11
執行任務 17
執行任務 18
執行任務 19
執行任務 7
執行任務 14
執行任務 16
執行任務 9
執行任務 15
執行任務 12
執行任務 20
全部任務執行完畢
複製代碼

若是本文對你有所幫助,歡迎
留言,點贊,轉發
素質三連,謝謝😘~

相關文章
相關標籤/搜索