Python多線程同步教程

概述

  • 多線程給咱們帶來的好處是能夠併發的執行多個任務,特別是對於I/O密集型的業務,使用多線程,能夠帶來成倍的性能增加。
  • 但是當咱們多個線程須要修改同一個數據,在不作任何同步控制的狀況下,產生的結果每每是不可預料的,好比兩個線程,一個輸出hello,一個輸出world,實際運行的結果,每每多是一個是hello world,一個是world hello。
  • python裏提供了多個用於控制多線程同步的同步原語,這些原語,包含在python的標準庫threading.py當中。我今天簡單的介紹一下python裏的這些控制多線程同步的原語,包括:Locks、RLocks、Semaphores、Events、Conditions和Barriers,你也能夠繼承這些類,實現本身的同步控制原語。

Lock(鎖)

  • Locks是python裏最簡單的同步原語,只包括兩個狀態:locked和unlocked,剛建立時狀態是unlocked。Locks有兩個方法,acquire和release。acquire方法加鎖,release方法釋放鎖,若是acquire枷鎖失敗,則阻塞,代表其餘線程已經加鎖。release方法只有當狀態是locked調用方法True,若是是unlocked狀態,調用release方法會拋出RunTimeError異常。例如代碼:
from threading import Lock, Thread
    lock = Lock()
    g = 0
    
    def add_one():
       """
       Just used for demonstration. It’s bad to use the ‘global’
       statement in general.
       """
       global g
       lock.acquire()
       g += 1
       lock.release()
    
    def add_two():
       global g
       lock.acquire()
       g += 2
       lock.release()
    
    threads = []
    for func in [add_one, add_two]:
       threads.append(Thread(target=func))
       threads[-1].start()
    
    for thread in threads:
       """
       Waits for threads to complete before moving on with the main
       script.
       """
       thread.join()

    print(g)
  • 最終輸出的結果是3,經過Lock的使用,雖然在兩個線程中修改了同一個全局變量,但兩個線程是順序計算出結果的。

RLock(循環鎖)

  • 上面的Lock對象雖然能達到同步的效果,可是沒法得知當前是那個線程獲取到了鎖。若是鎖沒被釋放,則其餘獲取這個鎖的線程都會被阻塞住。若是不想阻塞,能夠使用RLock,例如:
# 使用Lock
    import threading
    num = 0
    lock = Threading.Lock()
    
    lock.acquire()
    num += 1
    lock.acquire() # 這個地方阻塞
    num += 2
    lock.release()
    
    # 使用RLock
    lock = Threading.RLock()
    lock.acquire()
    num += 3
    lock.acquire() # 這不會阻塞
    num += 4
    lock.release()
    lock.release() # 這個地方注意是釋放兩次鎖

Semaphores

  • Semaphores是個最簡單的計數器,有兩個方法acquire()和release(),若是有多個線程調用acquire()方法,acquire()方法會阻塞住,每當調用次acquire方法,就作一次減1操做,每當release()方法調用這次,就加1,若是最後的計數數值大於調用acquire()方法的線程數目,release()方法會拋出ValueError異常。下面是個生產者消費者的示例。
import random, time
    from threading import BoundedSemaphore, Thread
    max_items = 5
    container = BoundedSemaphore(max_items)
    def producer(nloops):
        for i in range(nloops):
            time.sleep(random.randrange(2, 5))
            print(time.ctime(), end=": ")
            try:
                container.release()
                print("Produced an item.")
            except ValueError:
                print("Full, skipping.")
    def consumer(nloops):
        for i in range(nloops):
            time.sleep(random.randrange(2, 5))
            print(time.ctime(), end=": ")
            if container.acquire(False):
                print("Consumed an item.")
            else:
                print("Empty, skipping.")
    threads = []
    nloops = random.randrange(3, 6)
    print("Starting with %s items." % max_items)
    threads.append(Thread(target=producer, args=(nloops,)))
    threads.append(Thread(target=consumer, args=(random.randrange(nloops, nloops+max_items+2),)))
    for thread in threads:  # Starts all the threads.
        thread.start()
    for thread in threads:  # Waits for threads to complete before moving on with the main script.
        thread.join()
    print("All done.")

  • threading模塊還提供了一個Semaphore對象,它容許你能夠任意次的調用release函數,可是最好仍是使用BoundedSemaphore對象,這樣在release調用次數過多時會報錯,有益於查找錯誤。Semaphores最長用來限制資源的使用,好比最多十個進程。

Events

  • event能夠充當多進程之間的通訊工具,基於一個內部的標誌,線程能夠調用set()和clear()方法來操做這個標誌,其餘線程則阻塞在wait()函數,直到標誌被設置爲True。下面的代碼展現瞭如何利用Events來追蹤行爲。
import random, time
    from threading import Event, Thread
    
    event = Event()
    
    def waiter(event, nloops):
        for i in range(nloops):
        print(「%s. Waiting for the flag to be set.」 % (i+1))
        event.wait() # Blocks until the flag becomes true.
        print(「Wait complete at:」, time.ctime())
        event.clear() # Resets the flag.
        print()
    
    def setter(event, nloops):
        for i in range(nloops):
        time.sleep(random.randrange(2, 5)) # Sleeps for some time.
        event.set()
    
    threads = []
    nloops = random.randrange(3, 6)
    
    threads.append(Thread(target=waiter, args=(event, nloops)))
    threads[-1].start()
    threads.append(Thread(target=setter, args=(event, nloops)))
    threads[-1].start()
    
    for thread in threads:
        thread.join()
    
    print(「All done.」)

Conditions

  • conditions是比events更加高級一點的同步原語,能夠用戶多線程間的通訊和通知。好比A線程通知B線程資源已經能夠被消費。其餘的線程必須在調用wait()方法前調用acquire()方法。一樣的,每一個線程在資源使用完之後,要調用release()方法,這樣其餘線程就能夠繼續執行了。下面是使用conditions實現的一個生產者消費者的例子。
import random, time
    from threading import Condition, Thread
    condition = Condition()
    box = []
    def producer(box, nitems):
        for i in range(nitems):
            time.sleep(random.randrange(2, 5))  # Sleeps for some time.
            condition.acquire()
            num = random.randint(1, 10)
            box.append(num)  # Puts an item into box for consumption.
            condition.notify()  # Notifies the consumer about the availability.
            print("Produced:", num)
            condition.release()
    def consumer(box, nitems):
        for i in range(nitems):
            condition.acquire()
            condition.wait()  # Blocks until an item is available for consumption.
            print("%s: Acquired: %s" % (time.ctime(), box.pop()))
            condition.release()
    threads = []
    nloops = random.randrange(3, 6)
    for func in [producer, consumer]:
        threads.append(Thread(target=func, args=(box, nloops)))
        threads[-1].start()  # Starts the thread.
    for thread in threads:
        thread.join()
    print("All done.")

  • conditions還有其餘不少用戶,好比實現一個數據流API,當數據準備好了能夠通知其餘線程去處理數據。

Barriers

  • barriers是個簡單的同步原語,能夠用戶多個線程之間的相互等待。每一個線程都調用wait()方法,而後阻塞,直到全部線程調用了wait(),而後全部線程同時開始運行。例如:
from random import randrange
    from threading import Barrier, Thread
    from time import ctime, sleep
    
    num = 4
    b = Barrier(num)
    names = [「Harsh」, 「Lokesh」, 「George」, 「Iqbal」]
    
    def player():
        name = names.pop()
        sleep(randrange(2, 5))
        print(「%s reached the barrier at: %s」 % (name, ctime()))
        b.wait()
        
    threads = []
    print(「Race starts now…」)
    
    for i in range(num):
        threads.append(Thread(target=player))
        threads[-1].start()
    for thread in threads:
        thread.join()
    print()
    print(「Race over!」)

總結

  • 多線程同步,說難也難,說不難也很容易,關鍵是要看你的業務場景和解決問題的思路,儘可能下降多線程之間的依賴,理清楚業務流程,選擇合適的方法,則事盡成。
  • 轉載自個人博客:捕蛇者說
相關文章
相關標籤/搜索