python併發4:使用thread處理併發

這一篇是Python併發的第四篇,主要介紹進程和線程的定義,Python線程和全局解釋器鎖以及Python如何使用thread模塊處理併發html

引言&動機

考慮一下這個場景,咱們有10000條數據須要處理,處理每條數據須要花費1秒,但讀取數據只須要0.1秒,每條數據互不干擾。該如何執行才能花費時間最短呢?python

在多線程(MT)編程出現以前,電腦程序的運行由一個執行序列組成,執行序列按順序在主機的中央處理器(CPU)中運行。不管是任務自己要求順序執行仍是整個程序是由多個子任務組成,程序都是按這種方式執行的。即便子任務相互獨立,互相無關(即,一個子任務的結果不影響其它子 任務的結果)時也是這樣。linux

HUGOMORE42git

對於上邊的問題,若是使用一個執行序列來完成,咱們大約須要花費 10000*0.1 + 10000 = 11000 秒。這個時間顯然是太長了。程序員

那咱們有沒有可能在執行計算的同時取數據呢?或者是同時處理幾條數據呢?若是能夠,這樣就能大幅提升任務的效率。這就是多線程編程的目的。github

對於本質上就是異步的, 須要有多個併發事務,各個事務的運行順序能夠是不肯定的,隨機的,不可預測的問題,多線程是最理想的解決方案。這樣的任務能夠被分紅多個執行流,每一個流都有一個要完成的目標,而後將獲得的結果合併,獲得最終的結果。算法

線程和進程

什麼是進程

進程(有時被稱爲重量級進程)是程序的一次 執行。每一個進程都有本身的地址空間,內存,數據棧以及其它記錄其運行軌跡的輔助數據。操做系 統管理在其上運行的全部進程,併爲這些進程公平地分配時間。進程也能夠經過 fork 和 spawn 操做 來完成其它的任務。不過各個進程有本身的內存空間,數據棧等,因此只能使用進程間通信(IPC), 而不能直接共享信息。編程

什麼是線程

線程(有時被稱爲輕量級進程)跟進程有些類似,不一樣的是,全部的線程運行在同一個進程中, 共享相同的運行環境。它們能夠想像成是在主進程或「主線程」中並行運行的「迷你進程」。安全

線程狀態如圖服務器

線程狀態如圖
線程狀態如圖

線程有開始,順序執行和結束三部分。它有一個本身的指令指針,記錄本身運行到什麼地方。 線程的運行可能被搶佔(中斷),或暫時的被掛起(也叫睡眠),讓其它的線程運行,這叫作讓步。 一個進程中的各個線程之間共享同一片數據空間,因此線程之間能夠比進程之間更方便地共享數據以及相互通信。

固然,這樣的共享並非徹底沒有危險的。若是多個線程共同訪問同一片數據,則因爲數據訪 問的順序不同,有可能致使數據結果的不一致的問題。這叫作競態條件(race condition)。

線程通常都是併發執行的,不過在單 CPU 的系統中,真正的併發是不可能的,每一個線程會被安排成每次只運行一小會,而後就把 CPU 讓出來,讓其它的線程去運行。因爲有的函數會在完成以前阻塞住,在沒有特別爲多線程作修改的情 況下,這種「貪婪」的函數會讓 CPU 的時間分配有所傾斜。致使各個線程分配到的運行時間可能不 盡相同,不盡公平。

Python、線程和全局解釋器鎖

全局解釋器鎖(GIL)

首先須要明確的一點是GIL並非Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就比如C++是一套語言(語法)標準,可是能夠用不一樣的編譯器來編譯成可執行代碼。一樣一段代碼能夠經過CPython,PyPy,Psyco等不一樣的Python執行環境來執行(其中的JPython就沒有GIL)。

那麼CPython實現中的GIL又是什麼呢?GIL全稱Global Interpreter Lock爲了不誤導,咱們仍是來看一下官方給出的解釋:

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

儘管Python徹底支持多線程編程, 可是解釋器的C語言實現部分在徹底並行執行時並非線程安全的。 實際上,解釋器被一個全局解釋器鎖保護着,它確保任什麼時候候都只有一個Python線程執行。

在多線程環境中,Python 虛擬機按如下方式執行:

  1. 設置GIL
  2. 切換到一個線程去執行
  3. 運行
    • 指定數量的字節碼指令
    • 線程主動讓出控制(能夠調用time.sleep(0))
  4. 把線程設置完睡眠狀態
  5. 解鎖GIL
  6. 再次重複以上步驟

對全部面向 I/O 的(會調用內建的操做系統 C 代碼的)程序來講,GIL 會在這個 I/O 調用之 前被釋放,以容許其它的線程在這個線程等待 I/O 的時候運行。若是某線程並未使用不少 I/O 操做, 它會在本身的時間片內一直佔用處理器(和 GIL)。也就是說,I/O 密集型的 Python 程序比計算密集 型的程序更能充分利用多線程環境的好處。

退出線程

當一個線程結束計算,它就退出了。線程能夠調用 thread.exit()之類的退出函數,也可使用 Python 退出進程的標準方法,如 sys.exit()或拋出一個 SystemExit 異常等。不過,你不能夠直接 「殺掉」("kill")一個線程。

在 Python 中使用線程

在 Win32 和 Linux, Solaris, MacOS, *BSD 等大多數類 Unix 系統上運行時,Python 支持多線程 編程。Python 使用 POSIX 兼容的線程,即 pthreads。

默認狀況下,只要在解釋器中

>> import thread複製代碼

若是沒有報錯,則說明線程可用。

Python 的 threading 模塊

Python 供了幾個用於多線程編程的模塊,包括 thread, threading 和 Queue 等。thread 和 threading 模塊容許程序員建立和管理線程。thread 模塊 供了基本的線程和鎖的支持,而 threading 供了更高級別,功能更強的線程管理的功能。Queue 模塊容許用戶建立一個能夠用於多個線程之間 共享數據的隊列數據結構。

核心 示:避免使用 thread 模塊

出於如下幾點考慮,咱們不建議您使用 thread 模塊。

  1. 更高級別的 threading 模塊更爲先 進,對線程的支持更爲完善,並且使用 thread 模塊裏的屬性有可能會與 threading 出現衝突。其次, 低級別的 thread 模塊的同步原語不多(實際上只有一個),而 threading 模塊則有不少。
  2. 對於你的進程何時應該結束徹底沒有控制,當主線程結束 時,全部的線程都會被強制結束掉,沒有警告也不會有正常的清除工做。咱們以前說過,至少 threading 模塊能確保重要的子線程退出後進程才退出。

thread 模塊

除了產生線程外,thread 模塊也提供了基本的同步數 據結構鎖對象(lock object,也叫原語鎖,簡單鎖,互斥鎖,互斥量,二值信號量)。

thread 模塊函數

  • start_new_thread(function, args, kwargs=None):產生一個新的線程,在新線程中用指定的參數和可選的 kwargs 來調用這個函數。
  • allocate_lock():分配一個 LockType 類型的鎖對象
  • exit():讓線程退出
  • acquire(wait=None):嘗試獲取鎖對象
  • locked():若是獲取了鎖對象返回 True,不然返回 False
  • release():釋放鎖

下面是一個使用 thread 的例子:

import thread
from time import sleep, time


def loop(num):
    print('start loop at:', time())
    sleep(num)
    print('loop done at:', time())


def loop1(num):
    print('start loop 1 at:', time())
    sleep(num)
    print('loop 1 done at:', time())


def main():
    print('starting at:', time())
    thread.start_new_thread(loop, (4,))
    thread.start_new_thread(loop1, (5,))
    sleep(6)
    print('all DONE at:', time())

if __name__ == '__main__':
    main()

('starting at:', 1489387024.886667)
('start loop at:', 1489387024.88705)
('start loop 1 at:', 1489387024.887277)
('loop done at:', 1489387028.888182)
('loop 1 done at:', 1489387029.888904)
('all DONE at:', 1489387030.889918)複製代碼

start_new_thread()要求必定要有前兩個參數。因此,就算咱們想要運行的函數不要參數,也要傳一個空的元組。
爲何要加上sleep(6)這一句呢? 由於,若是咱們沒有讓主線程停下來,那主線程就會運行下一條語句,顯示 「all done」,而後就關閉運行着 loop()和 loop1()的兩個線程,退出了。

咱們有沒有更好的辦法替換使用sleep() 這種不靠譜的同步方式呢?答案是使用鎖,使用了鎖,咱們就能夠在兩個線程都退出以後立刻退出。

#! -*- coding: utf-8 -*-

import thread
from time import sleep, time

loops = [4, 2]

def loop(nloop, nsec, lock):
    print('start loop %s at: %s' % (nloop, time()))
    sleep(nsec)
    print('loop %s done at: %s' % (nloop, time()))
    # 每一個線程都會被分配一個事先已經得到的鎖,在 sleep()的時間到了以後就釋放 相應的鎖以通知主線程,這個線程已經結束了。
    lock.release()


def main():
    print('starting at:', time())
    locks = []
    nloops = range(len(loops))

    for i in nloops:
        # 調用 thread.allocate_lock()函數建立一個鎖的列表
        lock = thread.allocate_lock()
        # 分別調用各個鎖的 acquire()函數得到, 得到鎖表示「把鎖鎖上」
        lock.acquire()
        locks.append(lock)

    for i in nloops:
        # 建立線程,每一個線程都用各自的循環號,睡眠時間和鎖爲參數去調用 loop()函數
        thread.start_new_thread(loop, (i, loops[i], locks[i]))

    for i in nloops:
        # 在線程結束的時候,線程要本身去作解鎖操做
        # 當前循環只是坐在那一直等(達到暫停主 線程的目的),直到兩個鎖都被解鎖爲止才繼續運行。
        while locks[i].locked(): pass

    print('all DONE at:', time())

if __name__ == '__main__':
    main()複製代碼

爲何咱們不在建立鎖的循環裏建立線程呢?有如下幾個緣由:

  1. 咱們想到實現線程的同步,因此要讓「全部的馬同時衝出柵欄」。
  2. 獲取鎖要花一些時間,若是你的 線程退出得「太快」,可能會致使尚未得到鎖,線程就已經結束了的狀況。

threading 模塊

threading 模塊不只提供了 Thread 類,還提供了各類很是好用的同步機制。

下面是threading 模塊裏全部的對象:

  1. Thread: 表示一個線程的執行的對象
  2. Lock: 鎖原語對象(跟 thread 模塊裏的鎖對象相同)
  3. RLock: 可重入鎖對象。使單線程能夠再次得到已經得到了的鎖(遞歸鎖定)。
  4. Condition: 條件變量對象能讓一個線程停下來,等待其它線程知足了某個「條件」。 如,狀態的改變或值的改變。
  5. Event: 通用的條件變量。多個線程能夠等待某個事件的發生,在事件發生後, 全部的線程都會被激活。
  6. Semaphore: 爲等待鎖的線程 供一個相似「等候室」的結構
  7. BoundedSemaphore: 與 Semaphore 相似,只是它不容許超過初始值
  8. Timer: 與 Thread 類似,只是,它要等待一段時間後纔開始運行。

守護線程

另外一個避免使用 thread 模塊的緣由是,它不支持守護線程。當主線程退出時,全部的子線程不 論它們是否還在工做,都會被強行退出。有時,咱們並不指望這種行爲,這時,就引入了守護線程 的概念
threading 模塊支持守護線程,它們是這樣工做的:守護線程通常是一個等待客戶請求的服務器, 若是沒有客戶 出請求,它就在那等着。若是你設定一個線程爲守護線程,就表示你在說這個線程 是不重要的,在進程退出的時候,不用等待這個線程退出。
若是你的主線程要退出的時候,不用等待那些子線程完成,那就設定這些線程的 daemon 屬性。 即,在線程開始(調用 thread.start())以前,調用 setDaemon()函數設定線程的 daemon 標誌 (thread.setDaemon(True))就表示這個線程「不重要」
若是你想要等待子線程完成再退出,那就什麼都不用作,或者顯式地調用 thread.setDaemon(False)以保證其 daemon 標誌爲 False。你能夠調用 thread.isDaemon()函數來判 斷其 daemon 標誌的值。新的子線程會繼承其父線程的 daemon 標誌。整個 Python 會在全部的非守護 線程退出後纔會結束,即進程中沒有非守護線程存在的時候才結束。

Thread 類

Thread類提供瞭如下方法:

  • run(): 用以表示線程活動的方法。
  • start():啓動線程活動。
  • join([time]): 等待至線程停止。這阻塞調用線程直至線程的join() 方法被調用停止-正常退出或者拋出未處理的異常-或者是可選的超時發生。
  • is_alive(): 返回線程是否活動的。
  • name(): 設置/返回線程名。
  • daemon(): 返回/設置線程的 daemon 標誌,必定要在調用 start()函數前設置

用 Thread 類,你能夠用多種方法來建立線程。咱們在這裏介紹三種比較相像的方法。

  • 建立一個Thread的實例,傳給它一個函數
  • 建立一個Thread的實例,傳給它一個可調用的類對象
  • 從Thread派生出一個子類,建立一個這個子類的實例

下邊是三種不一樣方式的建立線程的示例:

#! -*- coding: utf-8 -*-

# 建立一個Thread的實例,傳給它一個函數

import threading
from time import sleep, time

loops = [4, 2]

def loop(nloop, nsec, lock):
    print('start loop %s at: %s' % (nloop, time()))
    sleep(nsec)
    print('loop %s done at: %s' % (nloop, time()))
    # 每一個線程都會被分配一個事先已經得到的鎖,在 sleep()的時間到了以後就釋放 相應的鎖以通知主線程,這個線程已經結束了。


def main():
    print('starting at:', time())
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = threading.Thread(target=loop, args=(i, loops[i]))
        threads.append(t)

    for i in nloops:
        # start threads
        threads[i].start()

    for i in nloops:
        # wait for all
        # join()會等到線程結束,或者在給了 timeout 參數的時候,等到超時爲止。
        # 使用 join()看上去 會比使用一個等待鎖釋放的無限循環清楚一些(這種鎖也被稱爲"spinlock")
        threads[i].join()  # threads to finish

    print('all DONE at:', time())

if __name__ == '__main__':
    main()複製代碼

與傳一個函數很類似的另外一個方法是在建立線程的時候,傳一個可調用的類的實例供線程啓動 的時候執行——這是多線程編程的一個更爲面向對象的方法。相對於一個或幾個函數來講,因爲類 對象裏可使用類的強大的功能,能夠保存更多的信息,這種方法更爲靈活

#! -*- coding: utf-8 -*-

# 建立一個 Thread 的實例,傳給它一個可調用的類對象

from threading import Thread
from time import sleep, time


loops = [4, 2]


class ThreadFunc(object):

    def __init__(self, func, args, name=""):
        self.name = name
        self.func = func
        self.args = args

    def __call__(self):
        # 建立新線程的時候,Thread 對象會調用咱們的 ThreadFunc 對象,這時會用到一個特殊函數 __call__()。
        self.func(*self.args)


def loop(nloop, nsec):
    print('start loop %s at: %s' % (nloop, time()))
    sleep(nsec)
    print('loop %s done at: %s' % (nloop, time()))


def main():
    print('starting at:', time())
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = Thread(target=ThreadFunc(loop, (i, loops[i]), loop.__name__))
        threads.append(t)

    for i in nloops:
        # start threads
        threads[i].start()

    for i in nloops:
        # wait for all
        # join()會等到線程結束,或者在給了 timeout 參數的時候,等到超時爲止。
        # 使用 join()看上去 會比使用一個等待鎖釋放的無限循環清楚一些(這種鎖也被稱爲"spinlock")
        threads[i].join()  # threads to finish

    print('all DONE at:', time())


if __name__ == '__main__':
    main()複製代碼

最後一個例子介紹如何子類化 Thread 類,這與上一個例子中的建立一個可調用的類很是像。使用子類化建立線程(第 29-30 行)使代碼看上去更清晰明瞭。

#! -*- coding: utf-8 -*-

# 建立一個 Thread 的實例,傳給它一個可調用的類對象

from threading import Thread
from time import sleep, time


loops = [4, 2]


class MyThread(Thread):

    def __init__(self, func, args, name=""):
        super(MyThread, self).__init__()
        self.name = name
        self.func = func
        self.args = args

    def getResult(self):
        return self.res

    def run(self):
        # 建立新線程的時候,Thread 對象會調用咱們的 ThreadFunc 對象,這時會用到一個特殊函數 __call__()。
        print 'starting', self.name, 'at:', time()
        self.res = self.func(*self.args)
        print self.name, 'finished at:', time()



def loop(nloop, nsec):
    print('start loop %s at: %s' % (nloop, time()))
    sleep(nsec)
    print('loop %s done at: %s' % (nloop, time()))


def main():
    print('starting at:', time())
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = MyThread(loop, (i, loops[i]), loop.__name__)
        threads.append(t)

    for i in nloops:
        # start threads
        threads[i].start()

    for i in nloops:
        # wait for all
        # join()會等到線程結束,或者在給了 timeout 參數的時候,等到超時爲止。
        # 使用 join()看上去 會比使用一個等待鎖釋放的無限循環清楚一些(這種鎖也被稱爲"spinlock")
        threads[i].join()  # threads to finish

    print('all DONE at:', time())


if __name__ == '__main__':
    main()複製代碼

下載國旗的例子

下面,咱們接咱們以前按以前併發的套路,用實現一下使用 threading 併發下載國旗

# python3

import threading
from threading import Thread

from flags import save_flag, show, main, get_flag


class MyThread(Thread):

    def __init__(self, func, args, name=""):
        super(MyThread, self).__init__()
        self.name = name
        self.func = func
        self.args = args

    def getResult(self):
        return self.res

    def run(self):
        # 建立新線程的時候,Thread 對象會調用咱們的 ThreadFunc 對象,這時會用到一個特殊函數 __call__()。
        self.res = self.func(*self.args)


def download_one(cc):  # <3>
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc


def download_many(cc_list):
    threads = []
    for cc in cc_list:
        thread = MyThread(download_one, (cc, ), download_one.__name__)
        threads.append(thread)

    for thread in threads:
        # 啓動線程
        thread.start()

    for thread in threads:
        # wait for all
        # join()會等到線程結束,或者在給了 timeout 參數的時候,等到超時爲止。
        # 使用 join()看上去 會比使用一個等待鎖釋放的無限循環清楚一些(這種鎖也被稱爲"spinlock")
        thread.join()

    return len(list(threads))  # <7>


if __name__ == '__main__':
    main(download_many)複製代碼

執行代碼發現和使用協程相比速度基本一致。

除了各類同步對象和線程對象外,threading 模塊還 供了一些函數。

  • active_count(): 當前活動的線程對象的數量
  • current_thread(): 返回當前線程對象
  • enumerate(): 返回當前活動線程的列表
  • settrace(func): 爲全部線程設置一個跟蹤函數
  • setprofile(func): 爲全部線程設置一個 profile 函數

Lock & RLock

原語鎖定是一個同步原語,狀態是鎖定或未鎖定。兩個方法acquire()和release() 用於加鎖和釋放鎖。
RLock 可重入鎖是一個相似於Lock對象的同步原語,但同一個線程能夠屢次調用。

Lock 不支持遞歸加鎖,也就是說即使在同 線程中,也必須等待鎖釋放。一般建議改 RLock, 它會處理 "owning thread" 和 "recursion level" 狀態,對於同 線程的屢次請求鎖 爲,只累加
計數器。每次調 release() 將遞減該計數器,直到 0 時釋放鎖,所以 acquire() 和 release() 必須 要成對出現。

from time import sleep
from threading import current_thread, Thread

lock = Rlock()

def show():
    with lock:
        print current_thread().name, i
        sleep(0.1)

def test():
    with lock:
        for i in range(3):
            show(i)

for i in range(2):
    Thread(target=test).start()複製代碼

Event

事件用於在線程間通訊。一個線程發出一個信號,其餘一個或多個線程等待。
Event 經過經過 個內部標記來協調多線程運 。 法 wait() 阻塞線程執 ,直到標記爲 True。 set() 將標記設爲 True,clear() 更改標記爲 False。isSet() 用於判斷標記狀態。

from threading import Event

def test_event():
    e = Event()
    def test():
        for i in range(5):
            print 'start wait'
            e.wait()
            e.clear()  # 若是不調用clear(),那麼標記一直爲 True,wait()就不會發生阻塞行爲
            print i
Thread(target=test).start()
return e


e = test_event()複製代碼

Condition

條件變量和 Lock 參數同樣,也是一個,也是一個同步原語,當須要線程關注特定的狀態變化或事件的發生時使用這個鎖定。

能夠認爲,除了Lock帶有的鎖定池外,Condition還包含一個等待池,池中的線程處於狀態圖中的等待阻塞狀態,直到另外一個線程調用notify()/notifyAll()通知;獲得通知後線程進入鎖定池等待鎖定。

構造方法:
Condition([lock/rlock])

Condition 有如下這些方法:

  • acquire([timeout])/release(): 調用關聯的鎖的相應方法。
  • wait([timeout]): 調用這個方法將使線程進入Condition的等待池等待通知,並釋放鎖。使用前線程必須已得到鎖定,不然將拋出異常。
  • notify(): 調用這個方法將從等待池挑選一個線程並通知,收到通知的線程將自動調用acquire()嘗試得到鎖定(進入鎖定池);其餘線程仍然在等待池中。調用這個方法不會釋放鎖定。使用前線程必須已得到鎖定,不然將拋出異常。
  • notifyAll(): 調用這個方法將通知等待池中全部的線程,這些線程都將進入鎖定池嘗試得到鎖定。調用這個方法不會釋放鎖定。使用前線程必須已得到鎖定,不然將拋出異常。
from threading import Condition, current_thread, Thread

con = Condition()

def tc1():
    with con:
        for i in range(5):
            print current_thread().name, i
            sleep(0.3)
            if i == 3:
                con.wait()


def tc2():
    with con:
        for i in range(5):
            print current_thread().name, i
            sleep(0.1)
            con.notify()

Thread(target=tc1).start()
Thread(target=tc2).start()

Thread-1 0
Thread-1 1
Thread-1 2
Thread-1 3    # 讓出鎖
Thread-2 0
Thread-2 1
Thread-2 2
Thread-2 3
Thread-2 4
Thread-1 4    # 從新獲取鎖,繼續執複製代碼

只有獲取鎖的線程才能調用 wait() 和 notify(),所以必須在鎖釋放前調用。
當 wait() 釋放鎖後,其餘線程也可進入 wait 狀態。notifyAll() 激活全部等待線程,讓它們去搶鎖而後完成後續執行。

生產者-消費者問題和 Queue 模塊

如今咱們用一個經典的(生產者消費者)例子來介紹一下 Queue模塊。

生產者消費者的場景是: 生產者生產貨物,而後把貨物放到一個隊列之類的數據結構中,生產貨物所要花費的時間沒法預先肯定。消費者消耗生產者生產的貨物的時間也是不肯定的。

經常使用的 Queue 模塊的屬性:

  • queue(size): 建立一個大小爲size的Queue對象。
  • qsize(): 返回隊列的大小(因爲在返回的時候,隊列可能會被其它線程修改,因此這個值是近似值)
  • empty(): 若是隊列爲空返回 True,不然返回 False
  • full(): 若是隊列已滿返回 True,不然返回 False
  • put(item,block=0): 把item放到隊列中,若是給了block(不爲0),函數會一直阻塞到隊列中有空間爲止
  • get(block=0): 從隊列中取一個對象,若是給了 block(不爲 0),函數會一直阻塞到隊列中有對象爲止

Queue 模塊能夠用來進行線程間通信,讓各個線程之間共享數據。

如今,咱們建立一個隊列,讓 生產者(線程)把新生產的貨物放進去供消費者(線程)使用。

# python2
#! -*- coding: utf-8 -*-

from Queue import Queue
from random import randint
from time import sleep, time
from threading import Thread


class MyThread(Thread):

    def __init__(self, func, args, name=""):
        super(MyThread, self).__init__()
        self.name = name
        self.func = func
        self.args = args

    def getResult(self):
        return self.res

    def run(self):
        # 建立新線程的時候,Thread 對象會調用咱們的 ThreadFunc 對象,這時會用到一個特殊函數 __call__()。
        print 'starting', self.name, 'at:', time()
        self.res = self.func(*self.args)
        print self.name, 'finished at:', time()


# writeQ()和 readQ()函數分別用來把對象放入隊列和消耗隊列中的一個對象。在這裏咱們使用 字符串'xxx'來表示隊列中的對象。
def writeQ(queue):
    print 'producing object for Q...'
    queue.put('xxx', 1)
    print "size now", queue.qsize()


def readQ(queue):
    queue.get(1)
    print("consumed object from Q... size now", queue.qsize())


def writer(queue, loops):
    # writer()函數只作一件事,就是一次往隊列中放入一個對象,等待一會,而後再作一樣的事
    for i in range(loops):
        writeQ(queue)
        sleep(1)


def reader(queue, loops):
    # reader()函數只作一件事,就是一次從隊列中取出一個對象,等待一會,而後再作一樣的事
    for i in range(loops):
        readQ(queue)
        sleep(randint(2, 5))


# 設置有多少個線程要被運行
funcs = [writer, reader]
nfuncs = range(len(funcs))


def main():
    nloops = randint(10, 20)
    q = Queue(32)
    threads = []

    for i in nfuncs:
        t = MyThread(funcs[i], (q, nloops), funcs[i].__name__)
        threads.append(t)

    for i in nfuncs:
        threads[i].start()

    for i in nfuncs:
        threads[i].join()
        print threads[i].getResult()

    print 'all DONE'


if __name__ == '__main__':
    main()複製代碼

FAQ

進程與線程。線程與進程的區別是什麼?

進程(有時被稱爲重量級進程)是程序的一次 執行。每一個進程都有本身的地址空間,內存,數據棧以及其它記錄其運行軌跡的輔助數據。
線程(有時被稱爲輕量級進程)跟進程有些類似,不一樣的是,全部的線程運行在同一個進程中, 共享相同的運行環境。它們能夠想像成是在主進程或「主線程」中並行運行的「迷你進程」。

這篇文章很好的解釋了 線程和進程的區別,推薦閱讀: www.ruanyifeng.com/blog/2013/0…

Python 的線程。在 Python 中,哪種多線程的程序表現得更好,I/O 密集型的仍是計算 密集型的?

因爲GIL的緣故,對全部面向 I/O 的(會調用內建的操做系統 C 代碼的)程序來講,GIL 會在這個 I/O 調用之 前被釋放,以容許其它的線程在這個線程等待 I/O 的時候運行。若是某線程並未使用不少 I/O 操做, 它會在本身的時間片內一直佔用處理器(和 GIL)。也就是說,I/O 密集型的 Python 程序比計算密集 型的程序更能充分利用多線程環境的好處。

線程。你認爲,多 CPU 的系統與通常的系統有什麼大的不一樣?多線程的程序在這種系統上的表現會怎麼樣?

Python的線程就是C語言的一個pthread,並經過操做系統調度算法進行調度(例如linux是CFS)。爲了讓各個線程可以平均利用CPU時間,python會計算當前已執行的微代碼數量,達到必定閾值後就強制釋放GIL。而這時也會觸發一次操做系統的線程調度(固然是否真正進行上下文切換由操做系統自主決定)。
僞代碼

while True:
    acquire GIL
    for i in 1000:
        do something
    release GIL
    /* Give Operating System a chance to do thread scheduling */複製代碼

這種模式在只有一個CPU核心的狀況下毫無問題。任何一個線程被喚起時都能成功得到到GIL(由於只有釋放了GIL纔會引起線程調度)。
但當CPU有多個核心的時候,問題就來了。從僞代碼能夠看到,從release GIL到acquire GIL之間幾乎是沒有間隙的。因此當其餘在其餘核心上的線程被喚醒時,大部分狀況下主線程已經又再一次獲取到GIL了。這個時候被喚醒執行的線程只能白白的浪費CPU時間,看着另外一個線程拿着GIL歡快的執行着。而後達到切換時間後進入待調度狀態,再被喚醒,再等待,以此往復惡性循環。
簡單的總結下就是:Python的多線程在多核CPU上,只對於IO密集型計算產生正面效果;而當有至少有一個CPU密集型線程存在,那麼多線程效率會因爲GIL而大幅降低。

線程池。修改 生成者消費者 的代碼,再也不是一個生產者和一個消費者,而是能夠有任意個 消費者線程(一個線程池),每一個線程能夠在任意時刻處理或消耗任意多個產品。

參考文章

>歡迎關注 >請我喝芬達
歡迎關注
歡迎關注
請我喝芬達
請我喝芬達
相關文章
相關標籤/搜索