關於我
編程界的一名小程序猿,目前在一個創業團隊任team lead,技術棧涉及Android、Python、Java和Go,這個也是咱們團隊的主要技術棧。html
在前面的文章中對Python
協程的概念和實現作了簡單地介紹。爲了對Python
併發編程有更加全面地認識,我也對Python
線程和進程的概念和相關技術的使用進行了學習,因而有了這篇文字。python
當咱們在手機或者PC
上打開一個應用時,操做系統就會建立一個進程實例,並開始執行進程裏的主線程,它有獨立的內存空間和數據結構。線程是輕量級的進程。在同一個進程裏,多個線程共享內存和數據結構,線程間的通訊也更加容易。數據庫
熟悉Java
編程的同窗就會發現Python
中的線程模型與Java
很是相似。下文咱們將主要使用Python
中的線程模塊threading
包。(對於低級別的API
模塊thread
不推薦初學者使用。本文全部代碼將使用Python 3.7
的環境)編程
要使用線程咱們要導入threading
包,這個包是在_thread
包(即上文提到的低級別thread
模塊)的基礎上封裝了許多高級的API
,在開發中應當首選threading
包。小程序
常見地,有兩種方式構建一個線程:經過Thread
的構造函數傳遞一個callable
對象,或繼承Thread
類並重寫run
方法。數據結構
import threading
import time
def do_in_thread(arg):
print('do in thread {}'.format(arg))
time.sleep(2)
if __name__ == '__main__':
start_time = time.time()
t1 = threading.Thread(target=do_in_thread, args=(1,), name='t1')
t2 = threading.Thread(target=do_in_thread, args=(2,), name='t2')
t1.start()
t2.start()
# join方法讓主線程等待子線程執行完畢
t1.join()
t2.join()
print("\nduration {} ".format(time.time() - start_time))
# do in thread 1
# do in thread 2
# duration 2.001628875732422
複製代碼
還能夠經過繼承threading.Thread
類定義線程併發
import threading
import time
def do_in_thread(arg):
print('do in thread {}'.format(arg))
time.sleep(2)
class MyThread(threading.Thread):
def __init__(self, arg):
super().__init__()
self.arg = arg
def run(self):
start_time = time.time()
do_in_thread(self.arg)
print("duration {} ".format(time.time() - start_time))
def start_thread_2():
start_time = time.time()
print("duration {} ".format(time.time() - start_time))
if __name__ == '__main__':
mt1 = MyThread(3)
mt2 = MyThread(4)
mt1.start()
mt2.start()
# join方法讓主線程等待子線程執行完畢
mt1.join()
mt2.join()
# do in thread 3
# do in thread 4
# duration 2.004937171936035
複製代碼
join
方法的做用是讓調用它的線程等待其執行完畢。app
class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
複製代碼
定義線程時能夠經過指定構造方法的name
參數設置線程名稱。
target
用於指定callable
對象,將在run
方法中被調用。
args
設置target
對象被調用時的參數,類型是元組()
,例如上文中的do_in_thread(arg)
方法的參數。
kwargs
是一個字典類型的參數,也用於target
對象的參數。
daemon
設置守護線程的標識,若是設置爲True
那麼這個線程就是守護線程,此時若是主線程結束了,那麼守護線程也會當即被殺死。因此當有在守護線程中打開文件、數據庫等資源操做時,釋放資源就有可能出錯。dom
程序中如有大量的線程建立和銷燬,則對性能影響較大。咱們可使用線程池。一樣地,它的API
與Java
極爲類似。函數
concurrent.futures.Executor
複製代碼
這是一個抽象類,定義了線程池的接口。
submit(fn, *args, **kwargs)
future
對象,經過future
可獲取到執行結果map(func, *iterables, timeout=None, chunksize=1)
map(func,*iterables)
相似shutdown(wait=True)
from concurrent.futures import ThreadPoolExecutor
# 使用max_workers參數指定線程池中線程的最大數量爲2
with ThreadPoolExecutor(max_workers=2) as executor:
# 提交任務到線程池
future = executor.submit(pow, 2, 31) # 計算2^31
future2 = executor.submit(pow, 1024, 2)
# 使用future 獲取執行結果
print(future.result())
print(future2.result())
# 執行結果
# 2147483648
# 1048576
複製代碼
如有多個線程對同一個資源或內存進行訪問或操做就有會產生競爭條件。
Python
提供了鎖、信號量、條件和事件等同步原語可幫忙咱們實現線程的同步機制。
Lock
有兩種狀態:locked
和unlocked
。它有兩個基本方法:acquire()
和release()
,且都是原子操做的。
一個線程經過acquire()
獲取到鎖,Lock
的狀態變成locked
,而其它線程調用acquire()
時只能等待鎖被釋放。 當線程調用了release()
時Lock
的狀態就變成了unlocked
,此時其它等待線程中只有一個線程將得到鎖。
import threading
share_mem_lock = 0
share_mem = 0
count = 1000000
locker = threading.Lock()
def add_in_thread_with_lock():
global share_mem_lock
for i in range(count):
locker.acquire()
share_mem_lock += 1
locker.release()
def minus_in_thread_with_lock():
global share_mem_lock
for i in range(count):
locker.acquire()
share_mem_lock -= 1
locker.release()
def add_in_thread():
global share_mem
for i in range(count):
share_mem += 1
def minus_in_thread():
global share_mem
for i in range(count):
share_mem -= 1
if __name__ == '__main__':
t1 = threading.Thread(target=add_in_thread_with_lock)
t2 = threading.Thread(target=minus_in_thread_with_lock)
t3 = threading.Thread(target=add_in_thread)
t4 = threading.Thread(target=minus_in_thread)
t1.start()
t2.start()
t3.start()
t4.start()
t1.join()
t2.join()
t3.join()
t4.join()
print("share_mem_lock : ", share_mem_lock)
print("share_mem : ", share_mem)
# 執行結果
# share_mem_lock : 0
# share_mem : 51306
複製代碼
沒有使用鎖機制的代碼執行後最後的值頗有可能就不爲0。而有鎖的代碼則能夠保證同步。
RLock
即Reentrant Lock
,就是能夠重複進入的鎖,也叫遞歸鎖。它有3個特色:
acquire
屢次acquire
多少次,對應release
就多少次,且最後一次release
纔會釋放鎖。條件是另外一種同步原語機制。其實它的內部是封裝了RLock
,它的acquire()
和release()
方法就是RLock
的方法。
Condition
經常使用的API
還有wait()
、notify()
和notify_all()
方法。 wait()
方法會釋放鎖,而後進入阻塞狀態直到其它線程經過notify()
或notify_all()
喚醒本身。wait()
方法從新獲取到鎖就會返回。
notify()
會喚醒其中一個等待的線程,而notify_all()
會喚醒全部等待的線程。
須要注意的是notify()
或notify_all()
執行後並不會釋放鎖,只有調用了release()
方法後鎖纔會釋放。
讓咱們看一個來自於《Python並行編程手冊》中的一個生產者與消費者例子
from threading import Thread, Condition
import time
items = []
condition = Condition()
class consumer(Thread):
def __init__(self):
Thread.__init__(self)
def consume(self):
global condition
global items
# 獲取鎖
condition.acquire()
if len(items) == 0:
# 當items爲空時,釋放了鎖,並等待生產者notify
condition.wait()
print("Consumer notify : no item to consume")
# 開始消費
items.pop()
print("Consumer notify : consumed 1 item")
print("Consumer notify : items to consume are " + str(len(items)))
# 消費以後notify喚醒生產者,由於notify不會釋放鎖,因此還要調用release釋放鎖
condition.notify()
condition.release()
def run(self):
for i in range(0, 10):
time.sleep(2)
self.consume()
class producer(Thread):
def __init__(self):
Thread.__init__(self)
def produce(self):
global condition
global items
condition.acquire()
if len(items) == 5:
# 若items時滿的,則執行wait,釋放鎖,並等待消費者notify
condition.wait()
print("Producer notify : items producted are " + str(len(items)))
print("Producer notify : stop the production!!")
# 開始生產
items.append(1)
print("Producer notify : total items producted " + str(len(items)))
# 生產後notify消費者,由於notify不會釋放鎖,因此還執行了release釋放鎖。
condition.notify()
condition.release()
def run(self):
for i in range(0, 10):
time.sleep(1)
self.produce()
if __name__ == "__main__":
producer = producer()
consumer = consumer()
producer.start()
consumer.start()
producer.join()
consumer.join()
複製代碼
信號量內部維護一個計數器。acquire()
會減小這個計數,release()
會增長這個計數,這個計數器永遠不會小於0。當計數器等於0時,acquire()
方法就會等待其它線程調用release()
。
仍是藉助一個生產者與消費者的例子來理解
# -*- coding: utf-8 -*-
"""Using a Semaphore to synchronize threads"""
import threading
import time
import random
# 默認狀況內部計數爲1,這裏設置爲0。
# 若設置爲負數則會拋出ValueError
semaphore = threading.Semaphore(0)
def consumer():
print("consumer is waiting.")
# 獲取一個信號量,由於初始化時內部計數設置爲0,因此這裏一開始時是處於等待狀態
semaphore.acquire()
# 開始消費
print("Consumer notify : consumed item number %s " % item)
def producer():
global item
time.sleep(2)
# create a random item
item = random.randint(0, 1000)
# 開始生產
print("producer notify : produced item number %s" % item)
# 釋放信號量, 內部計數器+1。當有等待的線程發現計數器大於0時,就會喚醒並從acquire方法中返回
semaphore.release()
if __name__ == '__main__':
for i in range(0, 5):
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()
print("program terminated")
複製代碼
信號量常常會用於資源容量肯定的場景,好比數據庫鏈接池等。
事件在線程間的通訊方式很是簡單。一個線程發送事件另外一個線程等待接收。
Event
對象內部維護了一個bool
變量flag
。經過set()
方法設置該變量爲True
,clear()
方法設置flag
爲False
。wait()
方法會一直等待直到flag
變成True
結合例子
# -*- coding: utf-8 -*-
import time
from threading import Thread, Event
import random
items = []
event = Event()
class consumer(Thread):
def __init__(self, items, event):
Thread.__init__(self)
self.items = items
self.event = event
def run(self):
while True:
time.sleep(2)
# 等待事件
self.event.wait()
# 開始消費
item = self.items.pop()
print('Consumer notify : %d popped from list by %s' % (item, self.name))
class producer(Thread):
def __init__(self, integers, event):
Thread.__init__(self)
self.items = items
self.event = event
def run(self):
global item
while True:
time.sleep(2)
# 開始生產
item = random.randint(0, 256)
self.items.append(item)
print('Producer notify : item N° %d appended to list by %s' % (item, self.name))
print('Producer notify : event set by %s' % self.name)
# 發送事件通知消費者消費
self.event.set()
print('Produce notify : event cleared by %s ' % self.name)
# 設置事件內部變量爲False,隨後消費者線程調用wait()方法時,進入阻塞狀態
self.event.clear()
if __name__ == '__main__':
t1 = producer(items, event)
t2 = consumer(items, event)
t1.start()
t2.start()
t1.join()
t2.join()
複製代碼
定時器Timer
是Thread
的子類。用於處理定時執行的任務。啓動定時器使用start()
,取消定時器使用cancel()
。
from threading import Timer
def hello():
print("hello, world")
t = Timer(3.0, hello)
t.start() # 3秒後 打印 "hello, world"
複製代碼
Lock
、RLock
、Condition
和Semaphore
可使用with
語法。 這幾個對象都實現擁有acquire()
和release()
方法,且都實現了上下文管理協議。
with some_lock:
# do something...
複製代碼
等價於
some_lock.acquire()
try:
# do something...
finally:
some_lock.release()
複製代碼
本文主要介紹Python
中線程的使用,主要是對threading
模塊中Thread
對象、線程池Executor
常見用法的展現。還了解了線程的同步原語Lock
、RLock
、Condition
、Semaphore
、Event
以及Timer
等API
的使用。