33 - 併發編程-線程同步-Event-lock

1 線程同步

        線程同步,線程間協同,經過某種技術,讓一個線程訪問某些數據時,其餘線程不能訪問這些數據,直到該線程完成對數據的操做後。不一樣的操做系統有多種實現方式。好比臨界區(Critical Section)、互斥鎖(Mutex)、信號量(Semaphore)、事件(Event)等。多線程

1.1 Event

        Event是線程間通信機制最簡單的實現,使用一個內部標記flag,主要提供了三個方法wait、clear、set,經過操做flag來控制線程的執行。app

  • clear():將'Flag'設置爲False。
  • set():將'Flag'設置爲True。
  • wait(timeout=None):等待'Flag'爲True後,繼續執行(timeout爲超時時間,不然永遠等待)。
  • is_set(): 判斷'Flag'是否爲

1.1.1 什麼是Flag?

        Event對象在全局定義了一個'Flag',若是'Flag'值爲 False,那麼當程序執行 Event對象的wait方法時就會阻塞,若是'Flag'值爲True,那已經阻塞的wait方法會繼續執行。dom

1.1.2 Event原理

        在使用threading.Event 實現線程間通訊時:使用threading.Event可使一個線程等待其餘線程的通知,咱們把這個Event傳遞到線程對象中,Event默認內置了一個標誌,初始值爲False。一旦該線程經過wait()方法進入等待狀態,直到另外一個線程調用該Event的set()方法將內置標誌設置爲True時,該Event會通知全部等待狀態的線程恢復運行。性能

1.1.3 吃包子

有下面代碼,大欣負責吃包子,廚師負責作包子,只有廚師作好了,大欣才能開始吃。ui

import time
import random

event = threading.Event()

def consumer():
    current = threading.current_thread()
    print('{} 等着吃包子...'.format(current.name))
    event.wait()
    print('包子來了,我正在吃')
    for i in range(1,5):
        time.sleep(random.randrange(1,3))
        print('{} 吃 包子-{}。'.format(current.name,i))


def chef():
    current = threading.current_thread()
    print('{} 正在作包子'.format(current.name))
    time.sleep(3)
    print('{} 包子作好了,下班'.format(current.name))
    event.set()

chef = threading.Thread(target=chef,name='大廚師')
chef.start()

consumer = threading.Thread(target=consumer,name='大欣')
consumer.start()

運行consumer時,包子還沒作,因此只能等着,等chef作完了之後,設置了event爲True,這時consumer就開始吃了。wait還能夠指定等待時間,好比chef作的太慢了,consumer不吃了。操作系統

import time
import random

event = threading.Event()

def consumer():
    current = threading.current_thread()
    print('{} 等着吃包子...'.format(current.name))
    if not event.wait(2):
        print('太慢了,不吃了')
    else:
        print('包子來了,我正在吃')
        for i in range(1,5):
            time.sleep(random.randrange(1,3))
            print('{} 吃 包子-{}。'.format(current.name,i))

def chef():
    current = threading.current_thread()
    print('{} 正在作包子'.format(current.name))
    time.sleep(8)
    print('{} 包子作好了,下班'.format(current.name))
    event.set()

chef = threading.Thread(target=chef,name='大廚師')
chef.start()

consumer = threading.Thread(target=consumer,name='大欣')
consumer.start()

當Event被set後,wait的返回值就是True,若是wait(2),在2秒內,Event沒有被set,那麼返回值是False。線程

1.2 Lock

        因爲線程間的數據是共享的,當咱們多個線程操做一個相同的用戶的數據時,有可能形成混亂,以下例子:code

import threading
import time

n = 10

def work():
    global n
    while n > 0:
        time.sleep(0.01)
        n -= 1

if __name__ == '__main__':
    t_l = []
    for i in range(10):
        t = threading.Thread(target=work)
        t_l.append(t)
        t.start()

    for t in t_l:
        t.join()

    print(n)

  咱們認爲結果應該是0,可是結果可能不如人意,由於進程間共享數據的問題,多個進程同時修改共享數據時,因爲GIL的存在同一時刻只有1個線程在運行。當n的值爲1的時候,10個子線程頗有可能同時判斷成功,再要修改的時候被掛起(時間片用完),等到真正回來修改的時候,n已經被其餘線程改過來!因此若是要保持數據的正確性,那麼就須要犧牲性能,即便用鎖機制。orm

  1. 建立一個鎖,因爲進程內的線程共享進程數據,那麼不須要傳遞,就能夠直接調用
import threading
mutex = threading.Lock()
  1. 加鎖解鎖
    Lock內部實現了__enter__和__exit__的方法,因此咱們可使用兩種方式來加鎖或者解鎖。
# 調用方式一
mutex.acquire()    # 加鎖
'''code'''
mutex.release()    # 解鎖


# 調用方式二
with mutex:
    '''code'''   # 離開wit代碼段,自動解鎖

1.2.1 lock方法

名稱 含義
acquire(blocking=True,timeout=-1) 獲取鎖並加鎖
blocking:表示是否阻塞,默認爲True表示阻塞。
timeout:表示阻塞超時時間。
當blocking爲非阻塞時,timeout不能進行設置
release() 釋放鎖,能夠從任何線程上調用釋放.
已上鎖,調用時會釋放鎖,即重置爲unlocked。
未上鎖,調用時會拋出RuntimeError異常

因此,上面的例子能夠有以下修改:

import threading
import time

mutex = threading.Lock()
n = 10
def work():
    global n
    while True:
        mutex.acquire()
        if n > 0:
            time.sleep(1)
            n -= 1
            mutex.release()
        else:
            mutex.release()
            break

if __name__ == '__main__':
    t_l = []
    for i in range(10):
        t = threading.Thread(target=work)
        t_l.append(t)
        t.start()

    for t in t_l:
        t.join()

    print(n)

在判斷的時候就開始加鎖,在修改完畢的時候解鎖。這樣加鎖的狀況下咱們發現運行時間變長了,那是由於只有搶到鎖的線程才能夠工做(穿行執行),

1.2.2 計數器

有下面一個計數器類,來看如何加鎖

import threading

class Counter:
    def __init__(self):
        self._value = 0

    @property
    def value(self):
        return self._value

    def inc(self):
        self._value += 1

    def dec(self):
        self._value -= 1

def calc(c:Counter):
    for _ in range(1000):
        for i in range(-50,50):
            if i < 0:
                c.dec()
            else:
                c.inc()

c = Counter()
lst = []
for i in range(10):
    t = threading.Thread(target=calc, args=(c,))
    lst.append(t)
    t.start()

for t in lst:
    t.join()

print(c.value)

        在須要調用和修改的地方加鎖,修改完畢後解鎖,是鎖使用的基本原則,通常來講,加鎖就要解鎖,可是加鎖和解鎖之間會有一些代碼要執行,若是出現異常,那麼鎖是沒法釋放的,可是當前線程已經終止了,這種狀況通常稱爲死鎖,能夠添加異常處理,來確保鎖必定被釋放。

import threading

mutex = threading.Lock()
class Counter:
    def __init__(self):
        self._value = 0

    @property
    def value(self):
        return self._value

    def inc(self):
        try:  # 添加異常處理,即使時崩潰也能夠釋放鎖
            mutex.acquire()
            self._value += 1
        finally:
            mutex.release()

    def dec(self):
        with mutex:  # 上下文管理寫法
            self._value -= 1

def calc(c:Counter):
    for _ in range(1000):
        for i in range(-50,50):
            if i < 0:
                c.dec()
            else:
                c.inc()

c = Counter()
lst = []
for i in range(10):
    t = threading.Thread(target=calc, args=(c,))
    lst.append(t)
    t.start()

for t in lst:
    t.join()

固然這裏也能夠爲每個計數器實例對象初始化一個本身的鎖,若是用全局鎖,那麼不一樣的計數器實例,會相互影響(由於多個實例,共享一把鎖),由於不一樣實例的結果是不一樣的,因此建議爲每一個實例構建一個本身的鎖。

class Counter:
    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    @property
    def value(self):
        return self._value

    def inc(self):
        try: 
            self._lock.acquire()
            self._value += 1
        finally:
            self._lock.release()

    def dec(self):
        with self._lock:  
            self._value -= 1

1.2.3 非阻塞鎖

當lock.acquire(False)時,該鎖就是非阻塞鎖了,調用時,僅獲取一次,若是獲取到那麼返回True,不然返回False。

import time
import threading
import random
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(threadName)s %(thread)s %(message)s')

def worker(tasks:list):
    for task in tasks:
        if task.lock.acquire(False):  # 搶到任務
            time.sleep(random.randrange(1,3))  # 模擬執行任務須要的時間
            logging.info('{} I am working for {}'.format(threading.current_thread().name, task))
        else:  # 沒有搶到任務
            logging.info('{} not get {}'.format(threading.current_thread().name,task))

class Task:
    def __init__(self,name):
        self.task = name
        self.lock = threading.Lock()

    def __repr__(self):
        return self.task

    __str__ = __repr__

if __name__ == '__main__':
    task_list = [Task('task{}'.format(x)) for x in range(1,11)]   # 構建任務列表
    for _ in range(10):
        threading.Thread(target=worker,args=(task_list,)).start()  # 開啓多線程執行任務

10個任務,交給10個線程運行,誰搶到哪一個線程,誰就運行。

1.2.4 鎖應用場景

        鎖適用於訪問和修改同一個共享資源的時候,即讀寫同一個資源的時候。但若是都是讀取的話,就不須要了。使用鎖的時候有一下幾點須要特別注意:

  1. 少於鎖,必要時用鎖,使用了鎖,多線程訪問被被鎖資源時,就成了穿行的了,要麼排隊,要麼爭搶執行。
  2. 加鎖時間越短越好,不須要時當即釋放鎖。
  3. 必定要避免死鎖。
相關文章
相關標籤/搜索