淺析Python多線程

學習Python多線程的資料不少,吐槽Python多線程的博客也很多。本文主要介紹Python多線程實際應用,且假設讀者已經瞭解多線程的基本概念。若是讀者對進程線程概念不甚瞭解,可參見知名博主 阮一峯 轉譯的一篇博客:《進程與線程的一個簡單解釋》html

1 線程的基本操做

Python中多線程主要有兩個模塊,_thread和threading模塊。前者更底層,後者更經常使用,能知足絕大部分編程需求,今天主要圍繞threading模塊展開介紹。啓動一個線程須要用threading模塊中的Thread。python

線程的啓動須要先建立Thread對象,而後調用該對象的start()方法,參見下例:算法

import time
import threading

def func(n):
    while n > 0:
        print("線程name:", threading.current_thread().name, "參數n:", n)
        n -= 1
        time.sleep(1)

t = threading.Thread(target=func, args=(5,))
t.start()
print("主線程:", threading.current_thread().name)
# 運行結果:
# 線程name: Thread-1 參數n: 5
# 主線程: MainThread
# 線程name: Thread-1 參數n: 4
# 線程name: Thread-1 參數n: 3
# 線程name: Thread-1 參數n: 2
# 線程name: Thread-1 參數n: 1

上例中,threading.current_thread().name 是獲取當前線程的name屬性。數據庫

Thread中,形參target傳入函數名,args傳入函數對應的參數,參數必須是可迭代對象,若是是元組且只有一個參數必須寫成(參數,)的形式,逗號不能省略編程

一旦啓動一個線程,該線程將由操做系統來全權管理,獨立執行直到目標函數返回。通常狀況下,線程的操做有如下幾種:安全

t.is_alive()    # 查詢線程對象的狀態,返回布爾值
t.join()        # 將線程加入到當前線程,並等待其終止

t = Thread(target=countdown, args=(10,), daemon=True)  # 後臺線程
t.start()  

查看線程狀態示例:服務器

import time
import threading

def func(n):
    while n > 0:
        print("線程name:", threading.current_thread().name, "參數n:", n)
        n -= 1
        time.sleep(1)

t = threading.Thread(target=func, args=(2,))
t.start()
print("主線程:", threading.current_thread().name)

if t.is_alive():
    print("活着的")
else:
    print("未存活")
print("主線程結束")

讓主線程等待其餘線程,就是主線程會在join()處一直等待全部線程都結束以後,再繼續運行。參見下例:多線程

import time
import threading

def func(n):
    while n > 0:
        print("線程name:", threading.current_thread().name, "參數n:", n)
        n -= 1
        time.sleep(1)

t = threading.Thread(target=func, args=(2,))
t.start()
t.join()
print("主線程:", threading.current_thread().name)
print("主線程結束")
# 運行結果:
# 線程name: Thread-1 參數n: 2
# 線程name: Thread-1 參數n: 1
# 主線程: MainThread
# 主線程結束

後臺線程參見下例:app

import time
import threading

def func(n):
    while n > 0:
        print("參數n:", n)
        n -= 1
        time.sleep(1)

t = threading.Thread(target=func, args=(10, ), daemon=True)
t.start()
time.sleep(3)
print("主線程結束")

# 參數n: 10
# 參數n: 9
# 參數n: 8
# 參數n: 7
# 主線程結束

 

後臺線程沒法等待,但主線程終止時後臺線程自動銷燬。 若是要對線程進行高級操做,如發送信號終止線程,都須要本身實現。下例經過輪詢控制線程退出異步

import time
from threading import Thread

class StopThread:
    def __init__(self):
        self._flag = True

    def terminate(self):
        self._flag = False

    def run(self, n):
        while self._flag and n > 0:
            print('num>>:', n)
            n -= 1
            time.sleep(1)

obj = StopThread()
t = Thread(target=obj.run, args=(11,))
t.start()

time.sleep(5)    # 表示do something

obj.terminate()  # 終止線程
t.join()
print("主線程結束")

上例經過類中的_flag控制線程的終止,當主線程執行5秒以後,主動將_flag賦值爲False終止線程。經過輪詢終止線程存在一個問題,若是while self._flag and n > 0:這句後,某次循環一直阻塞在I/O操做上,根本不會進行下一次循環,天然就沒法終止。這該怎麼辦呢?留一個思考題。

多線程還能夠經過繼承Thread實現,以下:

import time
from threading import Thread

class A(Thread):
    def __init__(self,):
        super().__init__()

    def run(self):
        print("run1..", )
        time.sleep(5)
        print("run2..")

obj = A()
obj.start()
print("主線程結束")

2 線程鎖和一個怪象

當咱們用多個線程同時修改同一份數據時,怎麼保證最終結果是咱們期許的呢?舉個例子,當前有一個全局變量a=0,若是有10個線程同時對其加1,這就出現了線程間的競爭,到底應該聽誰的呢?這時候,應該用線程鎖來解決。也就是當某一個線程A對該數據操做時,對該數據加鎖,其餘線程只能等着,等待A操做完以後釋放了該鎖,其餘線程才能操做該數據,一旦某個線程得到操做數據的權限,當即又加上鎖。如此便能保證數據的安全準確。奇怪的是,在Python3中,即便不加鎖,好像也不會發生數據出錯的狀況。或許這個例子不是很好,也或許是Python3中自動加了鎖。但願有知道的讀者賜教一下。這個奇怪的現象就是下例了:

from threading import Thread
import time

def add_one(a):
    time.sleep(1)
    print("in thread a:", a)
    a[1] += 1

if __name__ == '__main__':
    array = [0, 1, 4]
    thread_obj_list = []

    for i in range(50):
        t = Thread(target=add_one, args=(array,))
        t.start()
        thread_obj_list.append(t)

    for j in thread_obj_list:
        j.join()

    print("array result::", array)
    # array result:: [0, 51, 4]  

咱們看到,最後array的第二個元素是51,並無出錯,這真是使人費解。好了,言歸正傳,來看看線程鎖的幾個方法吧:

lock = threading.Lock()     # Lock對象
lock.acquire()              # 鎖定
lock.release()              # 解鎖

Lock有「鎖定」或「解鎖」兩種狀態之一。它是在解鎖狀態下建立的。它有兩個基本方法,acquire() 和 release()
當狀態爲解鎖時,acquire()將狀態更改成鎖定並當即返回。當狀態被鎖定時,acquire()塊直到對另外一個協程中的release()的調用將其改變爲解鎖,而後acquire()調用將其重置爲鎖定並返回。
release()方法只應在鎖定狀態下調用;它將狀態更改成已解鎖並當即返回。若是嘗試釋放已解鎖的鎖,則會引起 RuntimeError。

下面是一個具體的使用例子:

from threading import Thread
import time
import threading

lock = threading.Lock()

def add_one(a):
    time.sleep(1)
    lock.acquire()
    a[1] += 1
    lock.release()

if __name__ == '__main__':
    array = [0, 1, 4]
    thread_obj_list = []

    for i in range(50):
        t = Thread(target=add_one, args=(array,))
        t.start()
        thread_obj_list.append(t)

    for j in thread_obj_list:
        j.join()

    print("array result::", array)
    # array result:: [0, 51, 4]  

acquire()和release()方法成對出現。可是這樣手動釋放有時候可能會遺忘,這時候能夠考慮用上下文管理協議。關於上下文管理協議,可參見做者的這篇文章【Python上下文管理器】。

Lock對象支持with語句:

def add_one(a):
    time.sleep(1)
    with lock:
        a[1] += 1 

3 遞歸鎖 

可重入鎖(又稱遞歸鎖,RLock),就是大鎖中包含子鎖的狀況下使用。在這種狀況下,再用Lock時,就會出現死鎖現象,此時應該用threading.RLock()對象了,用法同Lock,參見下例:

from threading import Thread
import time
import threading

lock = threading.RLock()

def add_one(a):
    lock.acquire()
    a[1] += 1
    lock.release()

def add_two(b):
    time.sleep(1)
    lock.acquire()
    b[1] += 2
    add_one(b)
    lock.release()

if __name__ == '__main__':
    array = [0, 1, 4]
    thread_obj_list = []

    for i in range(50):
        t = Thread(target=add_two, args=(array,))
        t.start()
        thread_obj_list.append(t)

    for j in thread_obj_list:
        j.join()

    print("array result::", array)
    # array result:: [0, 151, 4]  

上例讀者能夠試試Lock(),看看什麼效果。RLock()還支持上下文管理協議,上例中的兩個函數能夠改爲這樣:

def add_one(a):
    with rlock:
        a[1] += 1

def add_two(b):
    time.sleep(1)
    with rlock:
        b[1] += 2
        add_one(b)

4 GIL

全局解釋器鎖(英語:Global Interpreter Lock,縮寫GIL),是計算機程序設計語言解釋器用於同步線程的一種機制,它使得任什麼時候刻僅有一個線程在執行。因此不少人說Python的線程是假線程,並能利用多核,並不能真正並行。之因此感受到線程並行,是由於線程上下文不斷切換的緣故。Python 3.2開始使用新的GIL。新的GIL實現中用一個固定的超時時間來指示當前的線程放棄全局鎖。在當前線程保持這個鎖,且其餘線程請求這個鎖時,當前線程就會在5毫秒後被強制釋放該鎖。關於全局鎖,強調三點:

(1)GIL的存在,同一時刻只能有一個線程在運行。

(2)GIL是CPython的特性,Jython,pypy等並沒有GIL。

(3)Cpython的多線程適用於I/O密集型問題,計算密集型問題可以使用多進程編程。  

5 判斷線程狀態

在多線程編程中,有時候某個線程依賴另外一個線程的狀態,須要使用threading庫中的Event對象。 Event對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。可將線程設置等待Event對象, 直到有其餘線程將Event對象設置爲真,這些等待Event對象的線程將開始執行。Event()對象的經常使用方法:

event = threading.Event()   # 建立threading.Event()對象

event.is_set()   # 獲取event的設置值,默認爲False
event.set()      # 設置event的值爲True
event.clear()    # 設置event的值爲False
event.wait()     # 等到event的值被設爲True就執行

下面經過「交通訊號燈」問題示範event的使用:

import threading
import time

def traffic_light(event):
    count = 0
    event.set()
    while True:
        # 若是計數器[0, 5)之間, 紅燈,event=False
        if 0 <= count < 5:
            event.clear()
            print("light is Red")
        # 若是計數器[5, 10)之間, 綠燈,event=True
        elif 5 <= count < 10:
            event.set()
            print("light is Green")
        # 若是計數器大於10,紅燈,將event設置爲False,計數器置爲0
        else:
            event.clear()
            count = 0
        time.sleep(1)
        count += 1

def car(name, event):
    while True:
        if not event.is_set():
            # event爲False, 表示紅燈, 車只能等待
            print("RED, the %s is waiting..." % name)
            # 此處會阻塞住,直到event被設置爲True在執行
            event.wait()
            print("Green, The %s going...." % name)

e = threading.Event()
light = threading.Thread(target=traffic_light, args=(e,))
light.start()
car1 = threading.Thread(target=car, args=("Tesla", e, ))
car1.start()

交通訊號燈有紅燈和綠燈兩種狀態,每5秒切換一次狀態,而car()函數中,只要燈變綠就放car通行。運行試試看。

event對象的一個重要特色是當它被設置爲真時會喚醒全部等待它的線程。若是你只想喚醒單個或者必定數目的線程,最好是使用信號量或者 Condition 對象來替代。

Condition對象

 condition對象老是與鎖關聯,能夠手動傳入鎖對象,也能夠不傳入使用默認值。當有多個線程須要等待某個變量改變時,纔開始執行。這種狀況能夠用condition對象實現。condition對象的主要方法有:

condition = threading.Condition(lock=None)   # 建立Condition對象  參數能夠不傳

condition.acquire()    # 加鎖
condition.release()    # 解鎖

condition.wait(timeout=None)                 # 阻塞,直到有調用notify(),或者notify_all()時再觸發
condition.wait_for(predicate, timeout=None)  # 阻塞,等待predicate條件爲真時執行

condition.notify(n=1)        # 通知n個wait()的線程執行, n默認爲1
condition.notify_all()       # 通知全部wait着的線程執行

with condition:              # 支持with語法,沒必要每次手動調用acquire()/release() 

看一個例子不是很優雅的例子:

import threading
import time

condition = threading.Condition()    # 建立condition對象

def func():
    condition.acquire()    # 若是沒有with語句,必寫這句,否者報錯
    condition.wait()       # 阻塞,等待其餘線程調用notify()
    print("in func..")
    condition.release()    # 與acquire()成對出現

# 啓10個線程
for i in range(10):
    t = threading.Thread(target=func, args=())
    t.start()

time.sleep(5)

condition.acquire()
condition.notify(2)        # 通知兩個線程執行
condition.release()

# in func..
# in func..
# 其餘8個線程會繼續等待... 

上例中,咱們看到啓動的10個線程會等待5秒鐘而且調用了notify(2)以後,纔會通知兩個線程繼續運行。且這兩個線程執行完畢以後,其餘8個線程仍然會阻塞在condition.wait() 處。

頻繁寫acquire() / release()很繁瑣,下面是優雅的寫法:

import threading
import time

condition = threading.Condition()    # 建立condition對象

def func(n):
    with condition:            # with更優雅
        condition.wait()       # 阻塞,等待其餘線程調用notify()
        print("in func..", n)


# 啓10個線程
for i in range(10):
    t = threading.Thread(target=func, args=(i,))
    t.start()

time.sleep(5)

with condition:
    condition.notify_all()        # 通知全部線程執行 

運行下,是否是等待5秒以後,全部線程都繼續執行了?

7 信號量

信號量一般用於防範容量有限的資源,例如數據庫服務器。通常而言信號量能夠控制釋放固定量的線程。好比啓動100個線程,信號量的控制值設爲5,那麼前5個線程拿到信號量以後,其他線程只能阻塞,等到這5個線程釋放信號量鎖以後才能去拿鎖。參見下例:

import threading
import time

def func(n):
    # semaphore.acquire()
    with semaphore:
        time.sleep(2)
        print("Thread::", n)
    # semaphore.release()

semaphore = threading.BoundedSemaphore(5)   # 信號量, 每次釋放5個線程

thread_list = []
for i in range(23):
    t = threading.Thread(target=func, args=(i,))
    thread_list.append(t)
    t.start()

for j in thread_list:
    j.join()

print("all threads done") 

上例中,能夠看到線程是每5個一組進行釋放的。 

8 Barrier對象

Barriers字面意思是「屏障」,是Python線程(或進程)同步原語。每一個線程中都調用wait()方法,當其中一個線程執行到wait方法處會立阻塞;一直等到全部線程都執行到wait方法處,全部線程再繼續執行。參見下例:

import time
import threading

bar = threading.Barrier(3)  # 建立barrier對象,指定知足3個線程

def worker1():
    print("worker1")
    time.sleep(1)
    bar.wait()
    print("worker1 end")

def worker2():
    print("worker2")
    time.sleep(2)
    bar.wait()
    print("worker2 end")

def worker3():
    print("worker3")
    time.sleep(5)
    bar.wait()
    print("worker3 end")


thread_list = []
t1 = threading.Thread(target=worker1)
t2 = threading.Thread(target=worker2)
t3 = threading.Thread(target=worker3)
thread_list.append(t1)
thread_list.append(t2)
thread_list.append(t3)

for t in thread_list:
    t.start()

# 每一個線程中都調用了wait()方法,在全部(此處設置爲3)線程調用wait方法以前是阻塞的。
# 也就是說,只有等到3個線程都執行到了wait方法這句時,全部線程才繼續執行。  

上例中,能夠看到,全部線程會先各自運行wait()方法以前的代碼,到wait()處阻塞。等待最後一個線程執行到wait()處,也就是5秒以後,全部線程恢復執行。

9 線程間通訊

兩個或多個線程之間相互發送數據最安全的方式可能就是使用 queue 庫中的隊列了。建立一個線程共享的 Queue 對象,線程經過使用 put()和 get()操做來向隊列中添加或者刪除元素。Queue對象已經內置了鎖機制,編程時沒必要手動操做鎖。下例producer()函數表明包子鋪,生產包子放入隊列中;consumer()函數表明吃包子的人,不斷從隊列中取出包子吃掉;以此演示線程間經過隊列通訊。

from queue import Queue
import threading
import time

q = Queue(10)

def producer():
    n = 0
    while True:
        q.put("包子%s" % n)
        print("包子鋪生產 包子%s" % n)
        n += 1
        time.sleep(2)

def consumer():
    while True:
        r = q.get()
        print("bucker 吃掉 %s" % r)
        time.sleep(1)

t1 = threading.Thread(target=producer)
t1.start()
t2 = threading.Thread(target=consumer)
t2.start()  

形如上例的編程模型,又叫生產者-消費者模型。它下降了程序以前的耦合,使得隊列的上游只關注生產數據,隊列的下游只關注消費數據。在票務系統,或者資源有限的狀況中可用此模型。補充兩點:

(1)get() 和 put() 方法都支持非阻塞方式和設定超時。
(2)q.qsize() , q.full() , q.empty() 等能夠獲取一個隊列的當前大小和狀態。但它們不是線程安全的,儘可能別用

10 線程池

Python3.2開始,增長了標準庫concurrent.futures,該庫中的ThreadPoolExecutor是自帶的線程池。簡單使用:

from concurrent.futures import ThreadPoolExecutor
import time

def tell(i):
    print("this is tread {}.".format(i))
    time.sleep(1)

if __name__ == '__main__':
    future = ThreadPoolExecutor(10)
    a = "ddd"
    for i in range(100):
        future.submit(tell, (i,))   # 添加一個線程到線程池
    future.shutdown(wait=True)      # 此函數用於釋放異步執行操做後的系統資源。

其中,submit()方法第一個參數爲函數名,第二個爲函數的參數。shutdown(wait=True)用於釋放異步執行操做後的系統資源。ThreadPoolExecutor還有一個優勢就是:任務提交者更方便的從被調用函數中獲取返回值。參見下例:

import concurrent.futures
import requests

URLS = ['http://www.cnblogs.com/zingp/p/5878330.html',
        'http://www.cnblogs.com/zingp/',
        'https://docs.python.org/']

# 爬取網頁內容
def load_url(url, timeout):
    with requests.get(url, timeout=timeout) as conn:
        return conn.text

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # 建立future對象和對應的url的字典
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as err:
            print('url:%s -- err: %s' % (url, err))
        else:
            print(url, len(data))
            
# http://www.cnblogs.com/zingp/ 12391
# http://www.cnblogs.com/zingp/p/5878330.html 90029
# https://docs.python.org/ 9980   

上例建立一個大小爲3的線程池,用了很多with語句,並用future.result() 獲取函數返回值。最終,咱們看到爬取了三個網頁,並得到網頁內容。future.result()操做會阻塞,直到對應的函數執行完成並返回一個結果。

此外,ThreadPoolExecutor還提供了異步回調的功能,大大簡化了多線程編程中處理線程返回結果的難度,參見下例:

from concurrent.futures import ThreadPoolExecutor
import time
import threading

def tell(i):
    print("this is tread {}.".format(i))
    time.sleep(1)
    return [i, threading.get_ident()]   # 必須有返回,經過.result()拿到返回值

def callback(obj): 
    # obj 至關於傳過來的future獨享,且回調函數必須有這個參數
    result = obj.result()    # 線程函數的返回值
    print(result)

if __name__ == '__main__':
    future = ThreadPoolExecutor(10)
    a = "ddd"
    for i in range(100):
        # 線程運行結束後將future對象傳給回調函數callback(obj)
        future.submit(tell, i,).add_done_callback(callback)   
    future.shutdown(wait=True)      # 此函數用於釋放異步執行操做後的系統資源。

Python3.2之前並無自帶線程池,那時每每採用自定義線程池。下面一個就是自定義線程池的例子,看看是否可以看得懂:

import queue
import threading
import contextlib

StopEvent = object()

class ThreadPool(object):
    """定義一個線程池類。"""

    def __init__(self, max_num, max_task_num=None):
        if max_task_num:
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        self.max_num = max_num
        self.cancel = False
        self.terminal = False
        self.generate_list = []
        self.free_list = []

    def run(self, func, args, callback=None):
        """
        線程池執行一個任務。
        :param func: 任務函數;
        :param args: 任務函數所需參數;
        :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態;
                            二、任務函數返回值(默認爲None,即:不執行回調函數);
        :return: 若是線程池已經終止,則返回True不然None。
        """
        if self.cancel:
            return
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)
        self.q.put(w)

    def generate_thread(self):
        """
        建立一個線程。
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        循環去獲取任務函數並執行任務函數。
        """
        current_thread = threading.currentThread()
        self.generate_list.append(current_thread)

        event = self.q.get()
        while event != StopEvent:
            func, arguments, callback = event
            try:
                result = func(*arguments)
                success = True
            except Exception as e:
                success = False
                result = None

            if callback is not None:
                try:
                    callback(success, result)
                except Exception as e:
                    pass

            with self.worker_state(self.free_list, current_thread):
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()
        else:
            self.generate_list.remove(current_thread)

    def close(self):
        """
        執行完全部的任務後,全部線程中止。
        """
        self.cancel = True
        full_size = len(self.generate_list)
        while full_size:
            self.q.put(StopEvent)
            full_size -= 1

    def terminate(self):
        """
        不管是否還有任務,終止線程。
        """
        self.terminal = True

        while self.generate_list:
            self.q.put(StopEvent)

        self.q.queue.clear()

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        用於記錄線程中正在等待的線程數。
        """
        state_list.append(worker_thread)
        try:
            # 遇到yield就返回回去執行with中的語句,執行完了回來。
            yield
        finally:
            state_list.remove(worker_thread)  

建立大的線程池的一個可能須要關注的問題是內存的使用。 例如,若是你在OS X系統上面建立2000個線程,系統顯示Python進程使用了超過9GB的虛擬內存。 不過,這個計算一般是有偏差的。當建立一個線程時,操做系統會預留一個虛擬內存區域來 放置線程的執行棧(一般是8MB大小)。可是這個內存只有一小片斷被實際映射到真實內存中。 所以,Python進程使用到的真實內存其實很小 (好比,對於2000個線程來說,只使用到了70MB的真實內存,而不是9GB)。若是擔憂虛擬內存大小,可使用 threading.stack_size() 函數來下降它。

import threading
threading.stack_size(65536)

若是加上這條語句並再次運行前面的建立2000個線程試驗, 會發現Python進程只使用到了大概210MB的虛擬內存,而真實內存使用量沒有變。 注意線程棧大小必須至少爲32768字節,一般是系統內存頁大小(409六、8192等)的整數倍。

11 補充幾個概念

同步的定義是:在發出一個功能調用時,在沒有獲得結果以前,該調用就不返回,同時其它線程也不能調用這個方法。按照這個定義,其實絕大多數函數都是同步調用。
可是一般說進程、線程同步,每每特指多進程、線程編程時,多個進程、線程之間協同步調,按預約的前後次序進行運行。好比線程A和線程B一塊兒配合,A執行到必定程度依賴B的某個結果,因而停下來示意B運行,B開始執行,執行完將結果返回給A,A接着執行。這裏的「同」應該理解爲協同、協助、互相配合。
在多線程編程裏面,一些敏感數據不容許被多個線程同時訪問,此時就使用同步訪問技術,保證數據在任什麼時候刻,最多有一個線程訪問,以保證數據的完整性。

原語:前文說起原語,不少同窗可能不瞭解這個名詞的意思。內核或微核提供核外調用的過程或函數稱爲原語(primitive)。操做系統用語範疇。是由若干多機器指令構成的完成某種特定功能的一段程序,具備不可分割性。即原語的執行必須是連續的,在執行過程當中不容許被中斷。不一樣層次之間對話的語言稱爲原語,即不一樣層之間經過原語來實現信息交換。

 

12 小結與討論

(1)Python多線程編程經常使用threading模塊。啓動一個多線程須要建立一個Thread對象,調用star()方法啓動線程。注意is_alive() /join()方法和daemon參數的使用。
(2)python多線程鎖有Lock / Rlock, 全局鎖GIL。GIL是CPython特性,同一時刻只能運行一個線程,不能利用多核資源。
(3)線程同步原語有Event / Condition / Semaphore / Barrier。Event用於經常使用語通知所有線程,condition和Semapher經常使用於通知必定數量的線程, Barrier用於多個線程必須完成某些步驟再一塊兒執行。
(4)Lock / Rlock / Event / Condition / Semaphore 支持上下文管理協議(with語句,好用)。
(5)線程間通訊能夠用queue模塊中的Queue隊列,get()和put()已加鎖,是線程安全的。qsize()/full()/empty() 等能夠獲取一個隊列的當前大小和狀態, 不是線程安全的,儘可能別用。
(6)concurrent.futures中的ThreadPoolExecutor是Python3.2以後自帶的線程池模塊,十分好用,支持with語句,經過future.result()獲取線程返回值。
(7)Python多線程適用於I/O密集型問題,CPU密集型問題能夠用C代碼優化底層算法提高性能,需注意一個寫的很差的C語言擴展會致使這個問題更加嚴重;也能夠用pypy或者多進程。

以上是本篇所有內容,歡迎讀者批評指正。

參考資料:

threading 官方文檔

concurrent.futures 官方文檔

Python3-cookbook 中文文檔

相關文章
相關標籤/搜索