32 - 併發編程-線程-多線程

1 概述

        咱們都知道windows是支持多任務的操做系統。
        什麼叫'多任務'呢?簡單地說,就是操做系統能夠同時運行多個任務。打個比方,你一邊在用瀏覽器上網,一邊在聽MP3,一邊在用Word趕做業,這就是多任務,至少同時有3個任務正在運行。還有不少任務悄悄地在後臺同時運行着,只是桌面上沒有顯示而已。
        如今,多核CPU已經很是普及了,可是,即便過去的單核CPU,也能夠執行多任務。因爲CPU執行代碼都是順序執行的,那麼,單核CPU是怎麼執行多任務的呢?
        答案就是操做系統輪流讓各個任務交替執行,任務1執行0.01秒,切換到任務2,任務2執行0.01秒,再切換到任務3,執行0.01秒……這樣反覆執行下去。表面上看,每一個任務都是交替執行的,可是,因爲CPU的執行速度實在是太快了,咱們感受就像全部任務都在同時執行同樣。bootstrap

這裏的執行時間,通常被稱爲時間片,即操做系統把CPU的時間劃分爲一個一個的時間片,在一個時間片內,線程能夠可勁兒的運行,時間一到,當前線程就被掛起了。windows

        真正的並行執行多任務只能在多核CPU上實現,可是,因爲任務數量遠遠多於CPU的核心數量,因此,操做系統也會自動把不少任務輪流調度到每一個核心上執行。
        對於操做系統來講,一個任務就是一個進程(Process),好比打開一個瀏覽器就是啓動一個瀏覽器進程,打開一個記事本就啓動了一個記事本進程,打開兩個記事本就啓動了兩個記事本進程,打開一個Word就啓動了一個Word進程。
        有些進程還不止同時幹一件事,好比Word,它能夠同時進行打字、拼寫檢查、打印等事情。在一個進程內部,要同時幹多件事,就須要同時運行多個'子任務',咱們把進程內的這些'子任務'稱爲線程(Thread)。
        因爲每一個進程至少要幹一件事,因此,一個進程至少有一個線程。固然,像Word這種複雜的進程能夠有多個線程,多個線程能夠同時執行,多線程的執行方式和多進程是同樣的,也是由操做系統在多個線程之間快速切換,讓每一個線程都短暫地交替運行,看起來就像同時執行同樣。固然,真正地同時執行多線程須要多核CPU纔可能實現。
        咱們前面編寫的全部的Python程序,都是執行單任務的進程,也就是隻有一個線程。若是咱們要同時執行多個任務怎麼辦?有兩種解決方案:api

  • 一種是啓動多個進程,每一個進程雖然只有一個線程,但多個進程能夠一塊執行多個任務。
  • 一種方法是啓動一個進程,在一個進程內啓動多個線程,這樣,多個線程也能夠一塊執行多個任務。

        固然還有第三種方法,就是啓動多個進程,每一個進程再啓動多個線程,這樣同時執行的任務就更多了,固然這種模型更復雜,實際不多采用。總結一下就是,多任務的實現有3種方式:瀏覽器

  1. 多進程模式;
  2. 多線程模式;
  3. 多進程+多線程模式

        同時執行多個任務一般各個任務之間並非沒有關聯的,而是須要相互通訊和協調,有時,任務1必須暫停等待任務2完成後才能繼續執行,有時,任務3和任務4又不能同時執行,因此,多進程和多線程的程序的複雜度要遠遠高於咱們前面寫的單進程單線程的程序。安全

Python既支持多進程,又支持多線程。網絡

2 進程和線程

        進程(Process)是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操做系統結構的基礎。線程(Thread)是操做系統可以進行運算調度的最小單位,它被包含在進程中,是進程中的實際運做單位。多線程

一個程序的執行實例就是一個進程app

2.1 進程和線程的關係

        程序是源代碼編譯後的文件,而這些文件存放在磁盤上。當程序被操做系統加載到內存中,就是進程,進程中存放着指令和數據(資源),它也是線程的容器。ide

Linux進程有父進程、子進程,而Windows中的進程都是平等關係。
        線程,有時被稱爲輕量級進程(Lightweight Process,LWP),是程序執行流的最小單元。一個標準的線程由線程ID,當前指令指針(PC),寄存器集合和堆棧組成。在許多系統中,建立一個線程比建立一個進程快10-100倍。

2.2 進程和線程的特色

如今操做系統提出進程的概念,每個進程都認爲本身獨佔因此計算機硬件資源:

  1. 進程就是獨立的王國,進程間不能夠隨便的共享數據(IPC)
  2. 線程就是省份,同一個進程內的線程能夠共享進程資源,每個線程擁有本身獨立的堆棧。

2.3 線程與進程的區別

  1. 線程共享建立它的進程的地址空間,進程擁有本身的地址空間
  2. 線程能夠直接訪問進程的數據,進程擁有它父進程內存空間的拷貝
  3. 線程能夠和同一進程內其餘的線程直接通訊,進程必須interprocess communicateion(IPC機制)進行通訊
  4. 線程能夠被很容易的建立,而進程依賴於父進程內存空間的拷貝
  5. 線程能夠直接控制同一進程內的其餘線程,進程只能控制本身的子進程
  6. 改變主線程(控制)可能會影響其餘線程,改變主進程不會影響它的子進程

2.3 線程的狀態

線程的狀態以下表:

狀態 含義
就緒(Ready) 線程可以運行,但在等待被調度。可能線程剛剛建立啓動,或者剛剛從阻塞中恢復,或者被其餘線程搶佔
運行(Running) 線程正在運行
阻塞(Blocked) 線程等待外部事件發生而沒法運行,如磁盤I/O,網絡I/O等。
終止(Terminated) 線程完成,或退出,或被取消

xiancheng

3 Python線程

Python 標準庫提供了 threadthreading 兩個模塊來對多線程進行支持。其中, thread 模塊以低級、原始的方式來處理和控制線程,而 threading 模塊經過對 thread 進行二次封裝,提供了更方便的 api 來處理線程。編寫多線程代碼以前還須要知道:

  1. 進程靠線程執行代碼,至少有一個主線程,其餘線程是工做線程。
  2. 主線程是第一個啓動的線程
  3. 父線程:若是線程A啓動了一個線程B,A就是B的父線程。
  4. 子線程:B就是A的子線程。

3.1 Thread類

Thread 是threading模塊中最重要的類之一,可使用它來建立線程。Thread類的格式以下:

class Thread:
    def __init__(self, group=None, target=None,name=None,args=(), kwargs=None, *, daemon=None):

各參數含義以下:

參數名 含義
group 預留參數
traget 線程要乾的事情,一般是一個函數
name 線程的名字
args 爲函數傳遞的位置參數(元組)
kwargs 爲函數傳遞的關鍵字參數(字典)
daemon 當它的值爲True時使子線程變爲守護線程,主線程退出後,子線程一同退出

3.1.1 運行線程

建立完子線程,那麼就能夠啓動了,一般咱們使用start方法來啓動一個線程

import threading

def worker(name):
    print('hello, {}'.format(name))

t = threading.Thread(target=worker,args=('daxin',),name='daxin_thread')
t.start()

print('End')


# hello, daxin

# End

3.1.2 線程退出

Python沒有提供線程退出的方法,線程在如下兩種狀況時退出:

  1. 線程函數內語句執行完畢
  2. 線程函數中拋出未處理的異常
import threading
import time

def worker(name):
    count = 0
    while True:
        time.sleep(1)
        if count == 5:
            raise Exception('Bye Bye')   # 異常退出(線程關閉)
        if count == 10:
            break                        # 執行完畢後退出(線程關閉)
        print('hello, {}'.format(name))
        count += 1

t = threading.Thread(target=worker,args=('daxin',),name='daxin_thread')
t.start()

print('End')

# hello, daxin
# End

3.1.3 屬性方法

threading模塊還提供了許多用於查看線程相關屬性的方法:

名稱 方法
current_thread() 返回當前線程對象
main_thread() 返回主線程對象
active_count() 當前處於alive狀態的線程個數
enumerate() 返回因此或者的線程的列表,不包括已經終止的線程和未開始的線程
get_ident() 返回當前線程的ID,非0整數
name 返回線程的名字(屬性)
ident 線程的ID(屬性)
is_alive() 返回線程是否還或者
start() 啓動線程,每一個線程只能啓動一次
run() 運行線程函數
import threading
import time

def worker(name):
    current_thread = threading.current_thread()
    print('個人名字:{}  個人ID:{}  個人狀態:{}'.format(current_thread.name,current_thread.ident,current_thread.is_alive()))
    time.sleep(1)   # 執行完會當即退出,這裏爲了掩飾,讓它停頓一秒

t = threading.Thread(target=worker,args=('daxin',),name='daxin_thread')
t.start()

print('主線程:{}'.format(threading.main_thread()))
print('存活的線程數量:{}'.format(threading.active_count()))
print('存活的線程: {}'.format(threading.enumerate()))
print('End')


# 個人名字:daxin_thread  個人ID:6820  個人狀態:True
# 主線程:<_MainThread(MainThread, started 22980)>
# 存活的線程數量:2
# 存活的線程: [<_MainThread(MainThread, started 22980)>, <Thread(daxin_thread, started 6820)>]
# End

線程的ID是會重複利用的,因此說一個線程消亡,那麼下一個新建的線程可能還會服用以前線程的ID。因此不要覺得這個線程執行完畢還在存活。

3.1.4 start方法和run方法

        start和run方法看起來都是啓動一個線程用的,他們的主要區別時,start方法執行後會開啓一個新的線程,而後在新的線程中調用run方法運行咱們指定的函數,而run方法只會在當前線程中調用咱們指定的函數,實際上就是在主線程調用了一個普通的函數而已。

import threading

class MyThread(threading.Thread):

    def start(self):
        print('{} , start ~~~'.format(self.__class__))
        super().start()

    def run(self):
        print('{} , run ~~~~'.format(self.__class__))
        super().run()

def worker():
    print('I am worker')

t = MyThread(target=worker, name='daxin')
t.start()


# <class '__main__.MyThread'> , start ~~~
# <class '__main__.MyThread'> , run ~~~~
# I am worker

先執行了start方法,而後調用了run方法。觀察start方法的原碼

# 查看start幹了啥
_start_new_thread(self._bootstrap, ())

# 查看self._bootstrap幹了啥
self._bootstrap_inner()

# 操做了一個屬性
self._started.set()

# 設置爲True了
self._flag = True

這麼作是爲何呢?,來看一下start方法的運行條件

if self._started.is_set():
    raise RuntimeError("threads can only be started once")

總結一下就是說:當咱們使用start方法運行一個子進程的時候,它會檢測_started屬性,它的初始值是False,當運行之後,它被置爲True,下次再運行時,就會異常提示,因此線程只能被運行一次。

3.2 多線程

多線程故名思議,多個線程運行

import threading
import time

def worker():
    time.sleep(1)
    print('I am worker')

t1 = threading.Thread(target=worker, name='daxin1')
t2 = threading.Thread(target=worker, name='daxin2')
t1.start()
t2.start()
print(threading.enumerate())  # [<_MainThread(MainThread, started 15012)>, <Thread(daxin1, started 20808)>, <Thread(daxin2, started 21328)>]
print(threading.active_count())  # 3
time.sleep(2)

同時運行了t1,t2和當前主線程。因此打印當前活動線程的數量爲3.

一個進程中至少有一個線程,並做爲程序的入口,這個線程就是主線程,一個進程至少有一個主線程。其餘線程稱爲工做線程。

3.3 線程安全

        多個線程訪問同一個對象時,若是不用考慮這些線程在運行時環境下的調度和交替執行,也不須要進行額外的同步,或者在調用方進行任何其餘操做,調用這個對象的行爲均可以得到正確的結果,那麼這個對象就是線程安全的。
        換句話說就是線程執行的結果(顯示)不會由於其餘因素(時間片用完被交換、網絡I/O被等待等待其餘因素)而改變。那麼這個線程就是安全的,典型的print函數,就是一個線程不安全的。

import threading

def work():
    for i in range(100):
        print('Thread: {} start~~~'.format(threading.current_thread().ident))

count = 10
while count < 20:
    t = threading.Thread(target=work)
    t.start()
    count += 1


# 輸出結果
# Thread: 11276 start~~~
# Thread: 11276 start~~~Thread: 5056 start~~~
# 
# Thread: 11276 start~~~
# Thread: 5056 start~~~Thread: 23320 start~~~
# Thread: 20652 start~~~
# Thread: 11276 start~~~Thread: 23320 start~~~Thread: 12016 start~~~Thread: 12356 start~~~Thread: 21532 start~~~Thread: 16908 start~~~
# 
# 
# 
# Thread: 12672 start~~~
# Thread: 19372 start~~~
# 
# Thread: 11276 start~~~Thread: 12016 start~~~
# Thread: 5056 start~~~
# Thread: 19372 start~~~
# 
# Thread: 20652 start~~~
# 上面代碼須要在ipython環境下執行纔會有相似的效果

        咱們看到上面的輸出信息,有不少都打在了一塊兒,這是由於print函數在執行打印完畢,而後換行的時候,時間片用完被暫停運行,其餘線程的print函數開始運行,向輸出終端開始打印信息,而此時光標在還在上一行的末尾,因此就連續輸出了,等到再次切換回來,只剩換行符了,因此會看到有一些空行是輸出。

print函數的輸出信息是原子的,即組成一句話總體被打印,而打印換行符的時候是能夠被中斷的,因此也能夠在print函數打印的字符串中手動添加\n,而後指定sep=''便可。這裏只討論線程安全問題。

        相對於print函數來講,logging模塊都是線程安全的。將上面的例子換成logging就能夠完美打印了。

import threading
import logging

def work():
    for i in range(100):
        logging.warning('Thread: {} start~~~'.format(threading.current_thread().ident))

count = 10
while count < 20:
    t = threading.Thread(target=work)
    t.start()
    count += 1

3.4 daemon和non-daemon線程

        daemon,守護進程。用於告訴子線程要追隨主線程的狀態,即主線程退出,子線程無論有沒有執行完畢,都要跟着退出。Python中,構建線程的時候,能夠設置線程是不是守護線程,須要注意的是這個屬性必須在start方法前設置。(線程都已經運行起來了,你還設置,有毛用啊。)

import threading
import time

def work():
    print('I am Worked')
    time.sleep(10)
    print('I am Finished')

t = threading.Thread(target=work,daemon=True)
t.start()

time.sleep(2)
print('運行完畢,主線程關閉')

# I am Worked
# 運行完畢,主線程關閉

設置線程t的屬性爲守護進程,當主線程執行完print後,雖然子線程還在睡眠中,可是也會被強制關閉,因此'I am Finished'不會被打印。默認狀況下,主線程是non-daemon的。若是沒有指定子線程的daemon屬性,那麼它會取當前線程的daemon的值

import threading
import time

def work2():
    # time.sleep(10)
    current = threading.current_thread()
    print(current.daemon)   # 繼承work線程的daemon屬性,這裏是True
    print('I am Work2')

def work():
    print('I am Worked')
    t = threading.Thread(target=work2)
    t.start()
    print('I am Finished')

t = threading.Thread(target=work,daemon=True)
t.start()

time.sleep(3)

關於daemin的其餘屬性:

名稱 含義
isDaemon() 是不是daemon線程
setDaemon 設置daemon線程(必須在start方法以前)

主線程執行完畢後,會檢查是否有子線程的daemon屬性是False,若是有,則等待子線程執行完畢後退出,若是沒有,則直接退出。

3.5 join方法

用於告知當前線程等待某個線程終止後再執行,這種等待是阻塞的。它的格式爲:

threadobj.join(timeout=None)

timeout用於指定具體等待多久,默認爲None,表示永遠等下去,不然只等待指定的時間。看下面的例子:

import threading
import time

def work():
    print('I am Worked')
    time.sleep(3)
    print('I am Finished')

t = threading.Thread(target=work, daemon=True)
t.start()

print('主線程執行完畢,準備退出了')
t.join() 
print('主線程退出')


# 結果
# I am Worked
# 主線程執行完畢,準備退出了
# I am Finished
# 主線程退出

        t.join()表示阻塞等待,等待線程t執行完畢,再向後執行,因此會打印'I am Finished'。若是不等待,線程t的daemon屬性爲True,那麼主線程執行完畢,就會直接退出了。

在哪一個線程join(),那麼哪一個線程就會等待,並阻塞。

3.6 daemon的使用場景

當你把一個線程設置爲daemon,它能夠隨主線程的退出而退出:

  1. 後臺任務。如發送心跳包、監控、這種場景最多。
  2. 主線程工做纔有用的線程。如主線程中維護公共資源,主線程已經可以被清理了。工做線程也就沒有了意義,一塊兒退出最合適。
  3. 隨時能夠被終止的線程。

3.7 threading.Local類

        在多線程環境下,每一個線程都有本身的數據。一個線程使用本身的局部變量比使用全局變量好,由於局部變量只有線程本身能看見,不會影響其餘線程,而全局變量的修改必須加鎖(鎖住資源,不然一旦在多個線程中修改,會產生沒法預期的結果)。可是局部變量也有問題,就是在函數調用的時候,每次都要傳遞,感受很麻煩。
        在主進程中建立ThreadLocal對象,每一個Thread對它均可以讀寫屬性,但互不影響。你能夠把ThreadLocal當作全局變量,但每一個屬性都是線程的局部變量,能夠任意讀寫而互不干擾,也不用管理鎖的問題,ThreadLocal內部會處理。即:一個ThreadLocal變量雖然是全局變量,但每一個線程都只能讀寫本身線程的獨立副本,互不干擾。ThreadLocal解決了參數在一個線程中各個函數之間互相傳遞的問題。

mport threading

local = threading.local()
local.x = 200

def worker():
    for i in range(100):
        local.x += 1
    print('{} {}'.format(threading.current_thread().name,local.x))

for i in range(10):
    t = threading.Thread(target=worker)
    t.start()

print(local.x)

上面的執行結果是互補不干擾的。爲何local對象能夠安全使用?
        經過分析原碼,咱們看到:在主線程中建立後local後,threading.local會爲實例初始化一個字典管理器,建立一個大字典,用於嵌套每個線程建立的它本身線程相關的字典。當開啓了新的線程時,查找local對象的某個屬性時,首先會被__getattribute__方法捕獲,在內部又調用了包裝爲上下文管理器的_path(打補丁),首先它經過local初始化時構建的key(prefix+id(當前線程))來查找它嵌套的字典。若是不存在說明線程第一次運行,爲它初始化一個__dict__屬性字典,替換local的屬性,若是存在,則把字典管理器拿出來,直接替換local的屬性。因此在不一樣線程內,每一個local對象在調用時都會切換到它對應的__dict__上。線程執行完畢後,回到主線程,在_path中從新切換到主線程的小字典,而後繼續把local對象的__dict進行切換。
        一句話總結:
運行時,threading.local實例處在不一樣的線程中,就從大字典中找到當前線程相關鍵值對中的字典,覆蓋threading.local實例的 __dict__ 。這樣就能夠在不一樣的線程中,安全地使用線程獨有的數據,作到了線程間數據隔離,如同本地變量同樣安全。__

簡單的模擬thread_local的實現:

import threading
import time


class Mylocal(object):
    def __init__(self):
        self.storage = {}

    def __getitem__(self, item):
        process_ident = threading.get_ident()
        return self.storage[process_ident][item]

    def __setitem__(self, key, value):
        process_ident = threading.get_ident()
        if self.storage.get(process_ident, None):
            self.storage[process_ident][key] = value
        else:
            self.storage[process_ident] = {key: value}

data = Mylocal()

def sum():
    data['value'] = 1
    while True:
        if data['value'] < 10:
            data['value'] += 1
            time.sleep(1)
            print(data['value'])
        else:
            print(data['value'])
            break

if __name__ == '__main__':
    threads = []
    for i in range(10):
        t = threading.Thread(target=sum, name=i)
        threads.append(t)
        t.start()

    for thread in threads:
        thread.join()

    print(data.storage)  # {18900: {'value': 10}, 22140: {'value': 10}, 25816: {'value': 10}, 17784: {'value': 10}, 20588: {'value': 10}, 3744: {'value': 10}, 10448: {'value': 10}, 15896: {'value': 10}, 18016: {'value': 10}, 16824: {'value': 10}}

3.8 Timer定時器

threading模塊提供了一個Timer定時器對象,用於告訴某一線程延遲多久執行一個函數。它的基本格式以下:

threading.Timer(interval, function, args=None, kwargs=None)
  • interval:表示延遲時間(秒)
  • function: 同target,要執行的函數
  • args/kwargs:傳遞的位置/關鍵字參數

timer對象執行start方法後,會等待interval的時間,而後開始執行function函數

import threading

def work():
    print('hello world')

t = threading.Timer(3, work,)
t.start()

咱們看到Timer類沒有提供daemon參數,但由於它也是繼承自Thread對象,因此若是要設置只能使用setDeamon方法了。

import threading

def work():
    print(threading.current_thread().isDaemon())
    print('hello world')

t = threading.Timer(3, work,)
t.setDaemon(True)   # 設置deamon屬性
t.start()

添加daemon之後,一啓動,主進程執行完畢,直接退出了,定時器就沒機會執行了。固然也能夠經過Timer對象的cancel方法來取消定時器

import threading
import time
def work():
    time.sleep(2)
    print(threading.current_thread().isDaemon())
    print('hello world')

t = threading.Timer(3, work,)
t.start()
time.sleep(4)
t.cancel()

但若是定時器已經在運行,那麼就沒法取消了。cancel本質上使用的死Event對象實現的。

相關文章
相關標籤/搜索