併發程序含有多個邏輯上的獨立執行塊,他們能夠獨立的並行執行,也能夠串行執行。 並行程序解決問題的速度比串行程序快的多,由於其能夠同時執行整個任務的多個部分。並行程序可能有多個獨立執行塊,也可能只有一個。node
引用Rob Pike的經典描述就是: 併發是同一時間應對多件事情的能力; 並行是同一時間動手作多件事情的能力。python
常見的併發模型有:git
這篇主要介紹線程與鎖模型github
線程與鎖模型是對底層硬件運行過程的形式化,很是簡單直接,幾乎全部的編程語言都對其提供了支持,且不對其使用方法加以限制(易出錯)。算法
這篇文章主要使用python語言來演示線程與鎖模型。文章結構來自《七週七併發模型》編程
from threading import Thread
def hello_world():
print("Hello from new thread")
def main():
my_thread = Thread(target=hello_world)
my_thread.start()
print("Hello from main thread")
my_thread.join()
main()
複製代碼
這段代碼建立並啓動了一個Thread
實例,首先從start()
開始,my_thread.start() main()
函數的餘下部分一塊兒併發執行。最後調用join()
來等待my_thread
線程結束。緩存
運行這段代碼輸出結果有幾種:安全
Hello from new thread
Hello from main thread
複製代碼
或者bash
Hello from main thread
Hello from new thread
複製代碼
或者多線程
Hello from new threadHello from main thread
複製代碼
究竟哪一個結果取決於哪一個線程先執行print()
。多線程編程很難的緣由之一就是運行結果可能依賴於時序,屢次運行結果並不穩定。
from threading import Thread, Lock
class Counter(object):
def __init__(self, count=0):
self.count = count
def increment(self):
self.count += 1
def get_count(self):
print("Count: %s" % self.count)
return self.count
def test_count():
counter = Counter()
class CounterThread(Thread):
def run(self):
for i in range(10000):
counter.increment()
t1 = CounterThread()
t2 = CounterThread()
t1.start()
t2.start()
t1.join()
t2.join()
counter.get_count()
test_count()
複製代碼
這段代碼建立一個簡單的類Counter 和兩個線程,每一個線程都調用counter.increment() 10000次。
屢次運行這段代碼會獲得不一樣的值,緣由是兩個線程在使用 counter.count 時發生了競態條件
(代碼行爲取決於各操做的時序)。
一個可能的操做是:
競態條件的解決方法是對 count 進行同步(synchronize)訪問。一種操做是使用 內置鎖
(也稱互斥鎖(mutex)、管程(monitor)或臨界區(critical section))來同步對increment() 的調用。
線程同步可以保證多個線程安全訪問競爭資源,最簡單的同步機制是引入互斥鎖。互斥鎖爲資源引入一個狀態:鎖定/非鎖定。某個線程要更改共享數據時,先將其鎖定,此時資源的狀態爲「鎖定」,其餘線程不能更改; 直到該線程釋放資源,將資源的狀態變成「非鎖定」,其餘的線程才能再次鎖定該資源。互斥鎖保證了每次只有一個線程進行寫入操做,從而保證了多線程狀況下數據的正確性。
當一個線程調用鎖的acquire()方法得到鎖時,鎖就進入「locked」狀態。每次只有一個線程能夠得到鎖。若是此時另外一個線程試圖得到這個鎖,該線程就會變爲「blocked」狀態,稱爲「同步阻塞」。 直到擁有鎖的線程調用鎖的release()方法釋放鎖以後,鎖進入「unlocked」狀態。線程調度程序從處於同步阻塞狀態的線程中選擇一個來得到鎖,並使得該線程進入運行(running)狀態。
python 鎖的使用流程以下:
#建立鎖
mutex = threading.Lock()
#鎖定
mutex.acquire([timeout])
#釋放
mutex.release()
複製代碼
推薦使用上下文管理器來操做鎖,
with lock:
do someting
# 至關於
lock.acquire()
try:
# do something...
finally:
lock.release()
複製代碼
acquire(blocking=True, timeout=-1) 能夠阻塞或非阻塞地得到鎖。 當調用時參數 blocking 設置爲 True (缺省值),阻塞直到鎖被釋放,而後將鎖鎖定並返回 True 。 在參數 blocking 被設置爲 False 的狀況下調用,將不會發生阻塞。若是調用時 blocking 設爲 True 會阻塞,並當即返回 False ;不然,將鎖鎖定並返回 True。 當浮點型 timeout 參數被設置爲正值調用時,只要沒法得到鎖,將最多阻塞 timeout 設定的秒數。timeout 參數被設置爲 -1 時將無限等待。當 blocking 爲 false 時,timeout 指定的值將被忽略。 若是成功得到鎖,則返回 True,不然返回 False (例如發生 超時 的時候)。 timeout 參數須要
python3.2+
from threading import Thread, Lock
mutex = Lock()
class SynchronizeCounter(object):
def __init__(self, count=0):
self.count = count
def increment(self):
# if mutex.acquire(1): # 獲取鎖
# self.count += 1
# mutex.release() # 釋放鎖
# 等同於上述代碼
with mutex:
self.count += 1
def get_count(self):
print("Count: %s" % self.count)
return self.count
def test_synchronize_count():
counter = SynchronizeCounter()
class CounterThread(Thread):
def run(self):
for i in range(100000):
counter.increment()
t1 = CounterThread()
t2 = CounterThread()
t1.start()
t2.start()
t1.join()
t2.join()
counter.get_count()
if __name__ == "__main__":
for i in range(100):
test_synchronize_count()
複製代碼
這段代碼還有一個隱藏的bug,那就是 get_count(),這裏get_count() 是在join()以後調用的,所以是線程安全的,可是若是在其它地方調用了 get_count() 函數。 因爲在 get_count() 中沒有進行線程同步,調用時可能會獲取到一個失效的值。
對於JAVA等競態編譯語言,
更糟糕的是,有時一個線程產生的修改可能會對另外一個線程不可見。
從直覺上來講,編譯器、JVM、硬件都不該插手修改本來的代碼邏輯。可是近幾年的運行效率提高,尤爲是共享內存交媾的運行效率提高,都仰仗於此類代碼優化。 具體的反作用,Java 內存模型有明確說明。 Java 內存模型定義了什麼時候一個線程對內存的修改對另外一個線程可見。
基本原則是:若是讀線程和寫線程不進行同步,就不能保證可見性。
一個重點: 兩個線程都須要進行同步。只在其中一個線程進行同步是不夠的。
可若是全部的方法都同步,大多數線程可能都會被阻塞,失去了併發的意義,而且可能會出現死鎖。
哲學家就餐問題
:假設有五位哲學家圍坐在一張圓形餐桌旁,作如下兩件事情之一:吃飯,或者思考。吃東西的時候,他們就中止思考,思考的時候也中止吃東西。餐桌中間有一大碗意大利麪,每兩個哲學家之間有一隻餐叉。由於用一隻餐叉很難吃到意大利麪,因此假設哲學家必須用兩隻餐叉吃東西。他們只能使用本身左右手邊的那兩隻餐叉。
哲學家歷來不交談,這就很危險,可能產生死鎖,每一個哲學家都拿着左手的餐叉,永遠都在等右邊的餐叉(或者相反)。 即便沒有死鎖,也有可能發生資源耗盡。例如,假設規定當哲學家等待另外一隻餐叉超過五分鐘後就放下本身手裏的那一隻餐叉,而且再等五分鐘後進行下一次嘗試。這個策略消除了死鎖(系統總會進入到下一個狀態),但仍然有可能發生「活鎖」。若是五位哲學家在徹底相同的時刻進入餐廳,並同時拿起左邊的餐叉,那麼這些哲學家就會等待五分鐘,同時放下手中的餐叉,再等五分鐘,又同時拿起這些餐叉。
下面是哲學家進餐問題的一個實現:
import threading
import random
import time
class Philosopher(threading.Thread):
running = True
def __init__(self, xname, forkOnLeft, forkOnRight):
threading.Thread.__init__(self)
self.name = xname
self.forkOnLeft = forkOnLeft
self.forkOnRight = forkOnRight
def run(self):
while self.running:
# Philosopher is thinking (but really is sleeping).
time.sleep(random.uniform(1, 3))
print("%s is hungry." % self.name)
self.dine()
def dine(self):
fork1, fork2 = self.forkOnLeft, self.forkOnRight
while self.running:
fork1.acquire(True) # 阻塞式獲取left 鎖
# locked = fork2.acquire(True) # 阻塞式 獲取right 鎖 容易產生死鎖
locked = fork2.acquire(False) # 非阻塞式 獲取right 鎖
if locked:
break # 若是被鎖定,釋放 left 退出等待
fork1.release()
print("%s swaps forks" % self.name)
fork1, fork2 = fork2, fork1
else:
return
self.dining()
fork2.release()
fork1.release()
def dining(self):
print("%s starts eating " % self.name)
time.sleep(random.uniform(1, 5))
print("%s finishes eating and leaves to think." % self.name)
def DiningPhilosophers():
forks = [threading.Lock() for n in range(5)]
philosopherNames = ("Aristotle", "Kant", "Buddha", "Marx", "Russel")
philosophers = [
Philosopher(philosopherNames[i], forks[i % 5], forks[(i + 1) % 5])
for i in range(5)
]
Philosopher.running = True
for p in philosophers:
p.start()
for p in philosophers:
p.join()
time.sleep(100)
Philosopher.running = False
print("Now we're finishing.")
DiningPhilosophers()
複製代碼
規模較大的程序經常使用監聽器模式來解耦模塊,這裏咱們構造一個類從一個URL進行下載,Listeners 監聽下載進度。
import requests
import threading
class Listeners(object):
def __init__(self, count=0):
self.count = count
self.done_count = 0.0
self.listeners = []
def append(self, listener):
self.listeners.append(listener)
def remove(self, listener):
self.listeners.remove(listener)
def on_progress(self, n):
# 一些咱們不知道的實現
# do someting
# self.done_count += 1
# print("Process: %f" % (self.done_count / self.count))
pass
listeners = Listeners(5)
class Downloader(threading.Thread):
def __init__( self, group=None, target=None, name=None, args=(), kwargs=None, daemon=None ):
threading.Thread.__init__(
self, group=group, target=target, name=name, daemon=daemon
)
self.url = kwargs.get("url")
def download(self):
resp = requests.get(self.url)
def add_listener(self, listener):
listeners.append(listener)
def remove_listener(self, listener):
listeners.delete(listener)
def update_progress(self, n):
for listener in listeners:
listner.on_progress(n)
def run(self):
self.download()
print(self.url)
listeners.on_progress(1)
def test():
urls = [
"https://www.baidu.com",
"https://www.google.com",
"https://www.bing.com",
"https://www.zaih.com",
"https://www.github.com",
]
ts = [Downloader(kwargs=dict(url=url)) for url in urls]
print(ts)
[t.start() for t in ts]
[t.join() for t in ts]
if __name__ == "__main__":
test()
複製代碼
這段代碼中,add_listener, remove_listener 和 update_progress 都是同步方法,但 update_progress 調用了一個咱們不知道如何實現的方法。若是這個方法中,獲取了一把鎖,程序在執行的過程當中就可能發生死鎖。因此,咱們要儘可能避免使用這種方法。還有一種方法是在遍歷以前對 listeners 進行保護性複製,再針對這份副本進行遍歷。(如今調用外星方法再也不須要加鎖)
Lock() 雖然方便,但限制不少:
重入鎖是(threading.RLock)一個能夠被同一個線程屢次獲取的同步基元組件。在內部,它在基元鎖的鎖定/非鎖定狀態上附加了 "所屬線程" 和 "遞歸等級" 的概念。在鎖定狀態下,某些線程擁有鎖 ; 在非鎖定狀態下, 沒有線程擁有它。
若要鎖定鎖,線程調用其 acquire() 方法;一旦線程擁有了鎖,方法將返回。若要解鎖,線程調用 release() 方法。 acquire()/release() 對能夠嵌套;只有最終 release() (最外面一對的 release() ) 將鎖解開,才能讓其餘線程繼續處理 acquire() 阻塞。
threading.RLock 提供了顯式的 acquire() 和 release() 方法 一個好的實踐是:
lock = threading.RLock()
複製代碼
Lock 和 RLock 的使用區別以下:
#rlock_tut.py
import threading
num = 0
lock = Threading.Lock()
lock.acquire()
num += 1
lock.acquire() # 這裏會被阻塞
num += 2
lock.release()
# With RLock, that problem doesn’t happen.
lock = Threading.RLock()
lock.acquire()
num += 3
lock.acquire() # 不會被阻塞.
num += 4
lock.release()
lock.release() # 兩個鎖都須要調用 release() 來釋放.
複製代碼
使用內置鎖時,阻塞的線程沒法被中斷,程序不能從死鎖恢復,能夠給鎖設置超時時間來解決這個問題。
timeout 參數須要 python3.2+
import time
from threading import Thread, Lock
lock1 = RLock()
lock2 = RLock()
# 這個程序會一直死鎖下去,若是想突破這個限制,能夠在獲取鎖的時候加上超時時間
# > python threading 沒有實現 銷燬(destroy),中止(stop),暫停(suspend),繼續(resume),中斷(interrupt)等
class T1(Thread):
def run(self):
print("start run T1")
lock1.acquire()
# lock1.acquire(timeout=2) # 設置超時時間可避免死鎖
time.sleep(1)
lock2.acquire()
# lock2.acquire(timeout=2) # 設置超時時間可避免死鎖
lock1.release()
lock2.release()
class T2(Thread):
def run(self):
print("start run T2")
lock2.acquire()
# lock2.acquire(timeout=2) # 設置超時時間可避免死鎖
time.sleep(1)
lock1.acquire()
# lock1.acquire(timeout=2) # 設置超時時間可避免死鎖
lock2.release()
lock1.release()
def test():
t1, t2 = T1(), T2()
t1.start()
t2.start()
t1.join()
t2.join()
if __name__ == "__main__":
test()
複製代碼
若是咱們要在鏈表中插入一個節點。一種作法是用鎖保護整個鏈表,但鏈表加鎖時其它使用者沒法訪問。交替鎖能夠只所追殺鏈表的一部分,容許不涉及被鎖部分的其它線程自由訪問。
from random import randint
from threading import Thread, Lock
class Node(object):
def __init__(self, value, prev=None, next=None):
self.value = value
self.prev = prev
self.next = next
self.lock = Lock()
class SortedList(Thread):
def __init__(self, head):
Thread.__init__(self)
self.head = head
def insert(self, value):
head = self.head
node = Node(value)
print("insert: %d" % value)
while True:
if head.value <= value:
if head.next != None:
head = head.next
else:
head.lock.acquire()
head.next = node
node.prev = head
head.lock.release()
break
else:
prev = head.prev
prev.lock.acquire()
head.lock.acquire()
if prev != None:
prev.next = node
else:
self.head = node
node.prev = prev
prev.lock.release()
node.next = head
head.prev = node
head.lock.release()
break
def run(self):
for i in range(5):
self.insert(randint(10, 20))
def test():
head = Node(10)
t1 = SortedList(head)
t2 = SortedList(head)
t1.start()
t2.start()
t1.join()
t2.join()
while head:
print(head.value)
head = head.next
if __name__ == "__main__":
test()
複製代碼
這種方案不只可讓多個線程併發的進行鏈表插入操做,還能讓其餘的鏈表操做安全的併發。
併發編程常常須要等待某個事件發生。好比從隊列刪除元素前須要等待隊列非空、向緩存添加數據前須要等待緩存有足夠的空間。條件變量就是爲這種狀況設計的。
條件變量老是與某種類型的鎖對象相關聯,鎖對象能夠經過傳入得到,或者在缺省的狀況下自動建立。當多個條件變量須要共享同一個鎖時,傳入一個鎖頗有用。鎖是條件對象的一部分,沒必要單獨地跟蹤它。
條件變量服從上下文管理協議:使用 with 語句會在它包圍的代碼塊內獲取關聯的鎖。 acquire() 和 release() 方法也能調用關聯鎖的相關方法。
其它方法必須在持有關聯的鎖的狀況下調用。 wait() 方法釋放鎖,而後阻塞直到其它線程調用 notify() 方法或 notify_all() 方法喚醒它。一旦被喚醒, wait() 方法從新獲取鎖並返回。它也能夠指定超時時間。
#condition_tut.py
import random, time
from threading import Condition, Thread
""" 'condition' variable will be used to represent the availability of a produced item. """
condition = Condition()
box = []
def producer(box, nitems):
for i in range(nitems):
time.sleep(random.randrange(2, 5)) # Sleeps for some time.
condition.acquire()
num = random.randint(1, 10)
box.append(num) # Puts an item into box for consumption.
condition.notify() # Notifies the consumer about the availability.
print("Produced:", num)
condition.release()
def consumer(box, nitems):
for i in range(nitems):
condition.acquire()
condition.wait() # Blocks until an item is available for consumption.
print("%s: Acquired: %s" % (time.ctime(), box.pop()))
condition.release()
threads = []
""" 'nloops' is the number of times an item will be produced and consumed. """
nloops = random.randrange(3, 6)
for func in [producer, consumer]:
threads.append(Thread(target=func, args=(box, nloops)))
threads[-1].start() # Starts the thread.
for thread in threads:
"""Waits for the threads to complete before moving on with the main script. """
thread.join()
print("All done.")
複製代碼
與鎖相比使用原子變量的優勢:
- 不會忘記在正確的時候獲取鎖
- 因爲沒有鎖的參與,對原子變量的操做不會引起死鎖。
- 原子變量時無鎖(lock-free)非阻塞(non-blocking)算法的基礎,這種算法能夠不用鎖和阻塞來達到同步的目的。
python 不支持原子變量
線程與鎖模型最大的優勢是適用面廣,更接近於「本質」--近似於對硬件工做方式的形式化--正確使用時效率高。 此外,線程與鎖模型也可輕鬆的集成到大多數編程語言。
[1] 哲學家進餐問題: zh.wikipedia.org/wiki/哲學家就餐問… [2] Let’s Synchronize Threads in Python: hackernoon.com/synchroniza…