Python併發編程系列之多線程

1 引言

  上一篇博文詳細總結了Python進程的用法,這一篇博文來因此說Python中線程的用法。實際上,程序的運行都是以線程爲基本單位的,每個進程中都至少有一個線程(主線程),線程又能夠建立子線程。線程間共享數據比進程要容易得多(垂手可得),進程間的切換也要比進程消耗CPU資源少。html

  線程管理能夠經過thead模塊(Python中已棄用)和threading 模塊,但目前主要以threading模塊爲主。由於更加先進,有更好的線程支持,且 threading模塊的同步原語遠多於thread模塊。另外,thread 模塊中的一些屬性會和 threading 模塊有衝突。故,本文建立線程和使用線程都經過threading模塊進行。python

  threading模塊提供的類: Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local。編程

  threading 模塊提供的經常使用方法:安全

  threading.currentThread(): 返回當前的線程變量。服務器

  threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。多線程

  threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。併發

  threading 模塊提供的常量:app

  threading.TIMEOUT_MAX 設置threading全局超時時間。dom

2 建立線程

  不管是用自定義函數的方法建立線程仍是用自定義類的方法建立線程與建立進程的方法極其類似的,不過,建立線程時,能夠不在「if __name__==」__main__:」語句下進行」。不管是哪一種方式,都必須經過threading模塊提供的Thread類進行。Thread類經常使用屬性和方法以下。ide

  Thread類屬性:

  name:線程名

  ident:線程的標識符

  daemon:布爾值,表示這個線程是不是守護線程

  Thread類方法:

  __init__(group=None,target=None,name=None,args=(),kwargs={},verbose=None,daemon=None):實例化一個線程對象,須要一個可調用的target對象,以及參數args或者kwargs。還能夠傳遞name和group參數。daemon的值將會設定thread.daemon的屬性

  start():開始執行該線程

  run():定義線程的方法。(一般開發者應該在子類中重寫)

  join(timeout=None):直至啓動的線程終止以前一直掛起;除非給出了timeout(單位秒),不然一直被阻塞

  isAlive:布爾值,表示這個線程是否還存活(駝峯式命名,python2.6版本開始已被取代)

  isDaemon():布爾值,表示是不是守護線程

  setDaemon(布爾值):在線程start()以前調用,把線程的守護標識設定爲指定的布爾值

  在下面兩小節咱們分別經過代碼來演示。

2.1 自定義函數的方式建立線程

import os

import time

import threading

def fun(n):

    print('子線程開始運行……')

    time.sleep(1)

    my_thread_name = threading.current_thread().name#獲取當前線程名稱

    my_thread_id = threading.current_thread().ident#獲取當前線程id

    print('當前線程爲:{},線程id爲:{},所在進程爲:{},您輸入的參數爲:{}'.format(my_thread_name ,my_thread_id , os.getpid(),n))

    print('子線程運行結束……')

 

t = threading.Thread(target=fun , name='線程1',args=('參數1',))

t.start()

time.sleep(2)

main_thread_name = threading.current_thread().name#獲取當前線程名稱

main_thread_id = threading.current_thread().ident#獲取當前線程id

print('主線程爲:{},線程id爲:{},所在進程爲:{}'.format(main_thread_name ,main_thread_id , os.getpid()))

2.2 類的方式建立線程

import os

import time

import threading

class MyThread(threading.Thread):

    def __init__(self , n , name=None):

        super().__init__()

        self.name = name

        self.n = n

    def run(self):

        print('子線程開始運行……')

        time.sleep(1)

        my_thread_name = threading.current_thread().name#獲取當前線程名稱

        my_thread_id = threading.current_thread().ident#獲取當前線程id

        print('當前線程爲:{},線程id爲:{},所在進程爲:{},您輸入的參數爲:{}'.format(my_thread_name ,my_thread_id , os.getpid(),self.n))

        print('子線程運行結束……')

t = MyThread(name='線程1', n=1)

t.start()

time.sleep(2)

main_thread_name = threading.current_thread().name#獲取當前線程名稱

main_thread_id = threading.current_thread().ident#獲取當前線程id

print('主線程爲:{},線程id爲:{},所在進程爲:{}'.format(main_thread_name ,main_thread_id , os.getpid()))

 

  輸出結果:

  子線程開始運行……

  當前線程爲:線程1,線程id爲:11312,所在進程爲:4532,您輸入的參數爲:1

  子線程運行結束……

  主線程爲:MainThread,線程id爲:10868,所在進程爲:4532

  上述兩塊代碼輸出結果是同樣的(id不同),觀察輸出結果能夠發現,子線程和主線程所在的進程都是同樣的,證實是在同一進程中的進程。

3 Thread的經常使用方法和屬性

3.1 守護線程:Deamon

  Thread類有一個名爲deamon的屬性,標誌該線程是否爲守護線程,默認值爲False,當爲設爲True是表示設置爲守護線程。是不是守護線程有什麼區別呢?咱們先來看看deamon值爲False(默認)狀況時:

import os

import time

import threading

 

def fun():

    print('子線程開始運行……')

    for i in range(6):#運行3秒,每秒輸出一次

        time.sleep(1)

        my_thread_name = threading.current_thread().name

        print('{}已運行{}秒……'.format(my_thread_name ,i+1))

    print('子線程運行結束……')

 

print('主線程開始運行……')

t = threading.Thread(target=fun , name='線程1')

print('daemon的值爲:{}'.format(t.daemon))

t.start()

for i in range(3):

    time.sleep(1)

    my_thread_name = threading.current_thread().name

    print('{}已運行{}秒……'.format(my_thread_name, i+1))

print('主線程結束運行……')

  輸出結果:

  主線程開始運行……

  daemon的值爲:False

  子線程開始運行……

  MainThread已運行1秒……

  線程1已運行1秒……

  MainThread已運行2秒……

  線程1已運行2秒……

  MainThread已運行3秒……

  主線程結束運行……

  線程1已運行3秒……

  線程1已運行4秒……

  線程1已運行5秒……

  線程1已運行6秒……

  子線程運行結束……

  代碼中,主線程只須要運行3秒便可結束,但子線程須要運行6秒,從運行結果中能夠看到,主線程代碼運行結束後,子線程還能夠繼續運行,這就是非守護線程的特徵。

再來看看daemon值爲True時:

import time

import threading

 

def fun():

    print('子線程開始運行……')

    for i in range(6):#運行3秒,每秒輸出一次

        time.sleep(1)

        my_thread_name = threading.current_thread().name

        print('{}已運行{}秒……'.format(my_thread_name ,i+1))

    print('子線程運行結束……')

 

print('主線程開始運行……')

t = threading.Thread(target=fun , name='線程1')

t.daemon=True #設置爲守護線程

print('daemon的值爲:{}'.format(t.daemon))

t.start()

for i in range(3):

    time.sleep(1)

    my_thread_name = threading.current_thread().name

    print('{}已運行{}秒……'.format(my_thread_name, i+1))

print('主線程結束運行……')

  輸出結果:

  主線程開始運行……

  daemon的值爲:True

  子線程開始運行……

  MainThread已運行1秒……

  線程1已運行1秒……

  MainThread已運行2秒……

  線程1已運行2秒……

  MainThread已運行3秒……

  主線程結束運行……

  從運行結果中能夠看出,當deamon值爲True,即設爲守護線程後,只要主線程結束了,不管子線程代碼是否結束,都得跟着結束,這就是守護線程的特徵。另外,修改deamon的值必須在線程start()方法調用以前,不然會報錯。

3.2 join()方法

  join()方法的做用是在調用join()方法處,讓所在線程(主線程)同步的等待被join的線程(下面的p線程),只有p線程結束。咱們嘗試在不一樣的位置調用join方法,對比運行結果。首先在p線程一開始的位置進行join:

import time

import threading

 

def fun():

    print('子線程開始運行……')

    for i in range(6):#運行3秒,每秒輸出一次

        time.sleep(1)

        my_thread_name = threading.current_thread().name

        print('{}已運行{}秒……'.format(my_thread_name ,i+1))

    print('子線程運行結束……')

 

print('主線程開始運行……')

t = threading.Thread(target=fun , name='線程1')

t.daemon=True #設置爲守護線程

t.start()

t.join() #此處進行join

for i in range(3):

    time.sleep(1)

    my_thread_name = threading.current_thread().name

    print('{}已運行{}秒……'.format(my_thread_name, i+1))

print('主線程結束運行……')

  輸出結果:

  主線程開始運行……

  子線程開始運行……

  線程1已運行1秒……

  線程1已運行2秒……

  線程1已運行3秒……

  線程1已運行4秒……

  線程1已運行5秒……

  線程1已運行6秒……

  子線程運行結束……

  MainThread已運行1秒……

  MainThread已運行2秒……

  MainThread已運行3秒……

  主線程結束運行……

  能夠看出,等子線程運行完以後,主線程才繼續join下面的代碼。而後在主線程即將結束時進行join:

import time

import threading

def fun():

    print('子線程開始運行……')

    for i in range(6):#運行3秒,每秒輸出一次

        time.sleep(1)

        my_thread_name = threading.current_thread().name

        print('{}已運行{}秒……'.format(my_thread_name ,i+1))

    print('子線程運行結束……')

 

print('主線程開始運行……')

t = threading.Thread(target=fun , name='線程1')

t.daemon=True #設置爲守護線程

t.start()

for i in range(3):

    time.sleep(1)

    my_thread_name = threading.current_thread().name

    print('{}已運行{}秒……'.format(my_thread_name, i+1))

t.join()

print('主線程結束運行……')

  輸出結果:

  主線程開始運行……

  子線程開始運行……

  MainThread已運行1秒……

  線程1已運行1秒……

  MainThread已運行2秒……

  線程1已運行2秒……

  MainThread已運行3秒……

  線程1已運行3秒……

  線程1已運行4秒……

  線程1已運行5秒……

  線程1已運行6秒……

  子線程運行結束……

  主線程結束運行……

  上面代碼中,子線程是設置爲守護線程的,若是沒有調用join()方法主線程3秒結束,子線程也會跟着結束,可是從運行結果中咱們能夠看出,主線程3秒後,陷入等待,等子線程運行完以後,纔會繼續下面的代碼。

4 線程間的同步機制

  在默認狀況在,多個線程之間是併發執行的,這就可能給數據代碼不安全性,例若有一個全局變量num=10,線程一、線程2每次讀取該變量後在原有值基礎上減1。但,若是線程1讀取num的值(num=10)後,還沒來得及減1,CPU就切換去執行線程2,線程2也去讀取num,這時候讀取到的值也仍是num=10,而後讓num=9,這是CPU有切換回線程1,由於線程1讀取到的值是原來的num=10,因此作減1運算後,也作出num=9的結果。兩個線程都執行了該任務,但最後的值可不是8。以下代碼所示:

import time

import threading

def fun():

    global num

    temp = num

    time.sleep(0.2)

    temp -= 1

    num = temp

print('主線程開始運行……')

t_lst = []

num =10 # 全局變量

for i in range(10):

    t = threading.Thread(target=fun)

    t_lst.append(t)

    t.start()

[t.join() for t in t_lst]

print('num最後的值爲:{}'.format(num))

print('主線程結束運行……')

  輸出結果:

  主線程開始運行……

  num最後的值爲:9

  主線程結束運行……

  最後結果爲9,不是0。這就形成了數據混亂。因此,就有了線程同步機制。

4.1 互斥鎖:Lock

  線程同步可以保證多個線程安全訪問競爭資源,最簡單的同步機制是引入互斥鎖。互斥鎖爲資源設置一個狀態:鎖定和非鎖定。某個線程要更改共享數據時,先將其鎖定,此時資源的狀態爲「鎖定」,其餘線程不能更改;直到該線程釋放資源,將資源的狀態變成「非鎖定」,其餘的線程才能再次鎖定該資源。互斥鎖保證了每次只有一個線程進行寫入操做,從而保證了多線程狀況下數據的正確性。

import time

import threading

def fun(lock):

    lock.acquire()

    global num

    temp = num

    time.sleep(0.2)

    temp -= 1

    num = temp

    lock.release()

print('主線程開始運行……')

t_lst = []

num =10 # 全局變量

lock = threading.Lock()

for i in range(10):

    t = threading.Thread(target=fun , args=(lock,))

    t_lst.append(t)

    t.start()

[t.join() for t in t_lst]

print('num最後的值爲:{}'.format(num))

print('主線程結束運行……')

  輸出結果:

  主線程開始運行……

  num最後的值爲:0

  主線程結束運行……

  能夠看到,最後輸出結果爲0,值正確。固然,若是你運行了上述兩塊代碼,你就會發現,使用了鎖以後,代碼運行速度明顯下降,這是由於線程由原來的併發執行變成了串行,不過數據安全性獲得保證。

  使用Lock的時候必須注意是否會陷入死鎖,所謂死鎖是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程。關於死鎖一個著名的模型是「科學家吃麪」模型:

import time

from threading import Thread

from threading import Lock

def eatNoodles_1(noodle_lock, fork_lock, scientist):

    noodle_lock.acquire()

    print('{} 拿到了面'.format(scientist))

    fork_lock.acquire()

    print('{} 拿到了叉子'.format(scientist))

    time.sleep(1)

    print('{} 吃到了面'.format(scientist))

    fork_lock.release()

    noodle_lock.release()

    print('{} 放下了面'.format(scientist))

    print('{} 放下了叉子'.format(scientist))

def eatNoodles_2(noodle_lock, fork_lock, scientist):

    fork_lock.acquire()

    print('{} 拿到了叉子'.format(scientist))

    noodle_lock.acquire()

    print('{} 拿到了面'.format(scientist))

    print('{} 吃到了面'.format(scientist))

    noodle_lock.release()

    print('{} 放下了面'.format(scientist))

    fork_lock.release()

    print('{} 放下了叉子'.format(scientist))

 

scientist_list1 = ['霍金','居里夫人']

scientist_list2 = ['愛因斯坦','富蘭克林']

noodle_lock  = Lock()

fork_lock = Lock()

for i in scientist_list1:

    t = Thread(target=eatNoodles_1, args=(noodle_lock, fork_lock, i))

    t.start()

for i in scientist_list2:

    t = Thread(target=eatNoodles_2, args=(noodle_lock, fork_lock, i))

t.start()

  輸出結果:

  霍金 拿到了面

  霍金 拿到了叉子

  霍金 吃到了面

  霍金 放下了面

  霍金 放下了叉子

  愛因斯坦 拿到了叉子

  居里夫人 拿到了面

  霍金吃完後,愛因斯坦拿到了叉子,把叉子鎖住了;居里夫人拿到了面,把面鎖住了。愛因斯坦就想:居里夫人不給我面,我就吃不了面,因此我不給叉子。居里夫人就想:愛因斯坦不給我叉子我也吃不了面,我就不給叉子。因此就陷入了死循環。

  爲了解決Lock死鎖的狀況,就有了遞歸鎖:RLock。

4.2 遞歸鎖:RLock

  所謂的遞歸鎖也被稱爲「鎖中鎖」,指一個線程能夠屢次申請同一把鎖,可是不會形成死鎖,這就能夠用來解決上面的死鎖問題。

import time

from threading import Thread

from threading import RLock

def eatNoodles_1(noodle_lock, fork_lock, scientist):

    noodle_lock.acquire()

    print('{} 拿到了面'.format(scientist))

    fork_lock.acquire()

    print('{} 拿到了叉子'.format(scientist))

    time.sleep(1)

    print('{} 吃到了面'.format(scientist))

    fork_lock.release()

    noodle_lock.release()

    print('{} 放下了面'.format(scientist))

    print('{} 放下了叉子'.format(scientist))

def eatNoodles_2(noodle_lock, fork_lock, scientist):

    fork_lock.acquire()

    print('{} 拿到了叉子'.format(scientist))

    noodle_lock.acquire()

    print('{} 拿到了面'.format(scientist))

    print('{} 吃到了面'.format(scientist))

    noodle_lock.release()

    print('{} 放下了面'.format(scientist))

    fork_lock.release()

    print('{} 放下了叉子'.format(scientist))

 

scientist_list1 = ['霍金','居里夫人']

scientist_list2 = ['愛因斯坦','富蘭克林']

noodle_lock=fork_lock = RLock()

for i in scientist_list1:

    t = Thread(target=eatNoodles_1, args=(noodle_lock, fork_lock, i))

    t.start()

for i in scientist_list2:

    t = Thread(target=eatNoodles_2, args=(noodle_lock, fork_lock, i))

t.start()

  上面代碼能夠正常運行到全部科學家吃完麪條。

  RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖,兩者的區別是:遞歸鎖能夠連續acquire屢次,而互斥鎖只能acquire一次

4.3 Condition

  Condition能夠認爲是一把比Lock和RLOK更加高級的鎖,其在內部維護一個瑣對象(默認是RLock),能夠在建立Condigtion對象的時候把瑣對象做爲參數傳入。Condition也提供了acquire, release方法,其含義與瑣的acquire, release方法一致,其實它只是簡單的調用內部瑣對象的對應的方法而已。Condition內部經常使用方法以下:

  1)acquire(): 上線程鎖

  2)release(): 釋放鎖

  3)wait(timeout): 線程掛起,直到收到一個notify通知或者超時(可選的,浮點數,單位是秒s)纔會被喚醒繼續運行。wait()必須在已得到Lock前提下才能調用,不然會觸發RuntimeError。

  4)notify(n=1): 通知其餘線程,那些掛起的線程接到這個通知以後會開始運行,默認是通知一個正等待該condition的線程,最多則喚醒n個等待的線程。notify()必須在已得到Lock前提下才能調用,不然會觸發RuntimeError。notify()不會主動釋放Lock。

  5)notifyAll(): 若是wait狀態線程比較多,notifyAll的做用就是通知全部線程

  須要注意的是,notify()方法、notifyAll()方法只有在佔用瑣(acquire)以後才能調用,不然將會產生RuntimeError異常。

  用Condition來實現生產者消費者模型:

import threading

import time

 

# 生產者

def produce(con):

    # 鎖定線程

    global num

    con.acquire()

    print("工廠開始生產……")

    while True:

        num += 1

        print("已生產商品數量:{}".format(num))

        time.sleep(1)

        if num >= 5:

            print("商品數量達到5件,倉庫飽滿,中止生產……")

            con.notify()  # 喚醒消費者

            con.wait()# 生產者自身陷入沉睡

    # 釋放鎖

    con.release()
# 消費者 def consumer(con): con.acquire() global num print("消費者開始消費……") while True: num -= 1 print("剩餘商品數量:{}".format(num)) time.sleep(2) if num <= 0: print("庫存爲0,通知工廠開始生產……") con.notify() # 喚醒生產者線程 con.wait() # 消費者自身陷入沉睡 con.release() con = threading.Condition() num = 0 p = threading.Thread(target=produce , args=(con ,)) c = threading.Thread(target=consumer , args=(con ,)) p.start() c.start()

  輸出結果:

  工廠開始生產……

  已生產商品數量:1

  已生產商品數量:2

  已生產商品數量:3

  已生產商品數量:4

  已生產商品數量:5

  商品數量達到5件,倉庫飽滿,中止生產……

  消費者開始消費……

  剩餘商品數量:4

  剩餘商品數量:3

  剩餘商品數量:2

  剩餘商品數量:1

  剩餘商品數量:0

  庫存爲0,通知工廠開始生產……

  已生產商品數量:1

  已生產商品數量:2

  已生產商品數量:3

  已生產商品數量:4

4.4 信號量:Semaphore

  鎖同時只容許一個線程更改數據,而信號量是同時容許必定數量的進程更改數據 。繼續用上篇博文中用的的吃飯例子,加入有一下應用場景:有10我的吃飯,但只有一張餐桌,只容許作3我的,沒上桌的人不容許吃飯,已上桌吃完飯離座以後,下面的人才能搶佔桌子繼續吃飯,若是不用信號量,確定是10人一窩蜂一塊兒吃飯:

from threading import Thread

import time

import random

def fun(i):

    print('{}號顧客上座,開始吃飯'.format(i))

    time.sleep(random.random())

    print('{}號顧客吃完飯了,離座'.format(i))


if __name__=='__main__':

    for i in range(20):

        p = Thread(target=fun, args=(i,))

        p.start()

  輸出結果:

  0號顧客上座,開始吃飯

  1號顧客上座,開始吃飯

  2號顧客上座,開始吃飯

  3號顧客上座,開始吃飯

  4號顧客上座,開始吃飯

  5號顧客上座,開始吃飯

  6號顧客上座,開始吃飯

  7號顧客上座,開始吃飯

  8號顧客上座,開始吃飯

  9號顧客上座,開始吃飯

  3號顧客吃完飯了,離座

  4號顧客吃完飯了,離座

  2號顧客吃完飯了,離座

  0號顧客吃完飯了,離座

  8號顧客吃完飯了,離座

  5號顧客吃完飯了,離座

  1號顧客吃完飯了,離座

  6號顧客吃完飯了,離座

  9號顧客吃完飯了,離座

  7號顧客吃完飯了,離座

  使用信號量以後:

from threading import Thread

import time

import random

from threading import Semaphore

def fun(i , sem):

    sem.acquire()

    print('{}號顧客上座,開始吃飯'.format(i))

    time.sleep(random.random())

    print('{}號顧客吃完飯了,離座'.format(i))

    sem.release()

if __name__=='__main__':

    sem = Semaphore(3)

    for i in range(10):

        p = Thread(target=fun, args=(i,sem))

        p.start()

  輸出結果:

  0號顧客上座,開始吃飯

  1號顧客上座,開始吃飯

  2號顧客上座,開始吃飯

  2號顧客吃完飯了,離座

  3號顧客上座,開始吃飯

  0號顧客吃完飯了,離座

  4號顧客上座,開始吃飯

  1號顧客吃完飯了,離座

  5號顧客上座,開始吃飯

  3號顧客吃完飯了,離座

  6號顧客上座,開始吃飯

  5號顧客吃完飯了,離座

  7號顧客上座,開始吃飯

  4號顧客吃完飯了,離座

  8號顧客上座,開始吃飯

  8號顧客吃完飯了,離座

  9號顧客上座,開始吃飯

  9號顧客吃完飯了,離座

  6號顧客吃完飯了,離座

  7號顧客吃完飯了,離座

4.5 事件:Event

  若是程序中的其餘線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時候就能夠用threading爲咱們提供的Event對象,Event對象主要有一下幾個方法:

  isSet():返回event的狀態值;

  wait():若是 isSet()==False將阻塞線程;

  set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;

  clear():恢復event的狀態值爲False。

  有以下需求:獲取當前時間的秒數的個位數,若是小於5,設置子線程阻塞,若是大於5則設置子進程非阻塞。代碼以下:

from threading import Event, Thread

import time

from datetime import datetime

def func(e):

    print('子線程:開始運行……')

    while True:

        print('子線程:如今事件秒數是{}'.format(datetime.now().second))

        e.wait()  # 阻塞等待信號  這裏插入了一個Flag  默認爲 False

        time.sleep(1)

e = Event()

p = Thread(target=func, args=(e,))

p.daemon=True

p.start()

for i in range(10):

    s = int(str(datetime.now().second)[-1])#獲取當前秒數的個位數

    if s < 5:

        print('子線程線入阻塞狀態')

        e.clear()  # 使插入的flag爲False 線程線入阻塞狀態

    else:

        print('子線程取消阻塞狀態')

        e.set()  # 線程線入非阻塞狀態

    time.sleep(1)

e.set()

print("主線程運行結束……")

  輸出結果:

  子線程:開始運行……

  子線程:如今事件秒數是43

  子線程線入阻塞狀態

  子線程線入阻塞狀態

  子線程取消阻塞狀態

  子線程取消阻塞狀態

  子線程:如今事件秒數是46

  子線程取消阻塞狀態

  子線程:如今事件秒數是47

  子線程取消阻塞狀態

  子線程:如今事件秒數是48

  子線程取消阻塞狀態

  子線程:如今事件秒數是49

  子線程線入阻塞狀態

  子線程:如今事件秒數是50

  子線程線入阻塞狀態

  子線程線入阻塞狀態

  主線程運行結束……

4.6 定時器:Timer

  若是想要實現每隔一段時間就調用一個函數的話,就要在Timer調用的函數中,再次設置Timer。Timer是Thread類的一個子類。

  若是是多長時間後只執行一次:

import threading

import time

def sayTime(name):

    print('你好,{}爲您報時,如今時間是:{}'.format(name , time.ctime()))


if __name__ == "__main__":

     timer = threading.Timer(2.0, sayTime, ["Jane"])

     timer.start()

輸出結果:

你好,Jane爲您報時,如今時間是:Thu Dec  6 15:03:41 2018

若是要每一個多長時間執行一次:

import threading

import time

def sayTime(name):

    print('你好,{}爲您報時,如今時間是:{}'.format(name , time.ctime()))

    global timer

    timer = threading.Timer(3.0, sayTime, [name])

    timer.start()

if __name__ == "__main__":

     timer = threading.Timer(2.0, sayTime, ["Jane"])

     timer.start()

  輸出結果:

  你好,Jane爲您報時,如今時間是:Thu Dec  6 15:04:30 2018

  你好,Jane爲您報時,如今時間是:Thu Dec  6 15:04:33 2018

  你好,Jane爲您報時,如今時間是:Thu Dec  6 15:04:36 2018

  你好,Jane爲您報時,如今時間是:Thu Dec  6 15:04:39 2018

  ……

5 進程間的通行

5.1隊列:Queue

  python中Queue模塊提供了隊列都實現了鎖原語,是線程安全的,可以在多線程中直接使用。Queue中的隊列包括如下三種:

  1)FIFO(先進先出)隊列, 第一加入隊列的任務, 被第一個取出;

  2)LIFO(後進先出)隊列,最後加入隊列的任務, 被第一個取出;

  3)PriorityQueue(優先級)隊列, 保持隊列數據有序, 最小值被先取出。

  Queue模塊中的經常使用方法以下:

  qsize() 返回隊列的規模

  empty() 若是隊列爲空,返回True,不然False

  full() 若是隊列滿了,返回True,不然False

  get([block[, timeout]])獲取隊列,timeout等待時間

  get_nowait() 至關get(False)

  put(item) 寫入隊列,timeout等待時間,若是隊列已滿再調用該方法會阻塞線程

  put_nowait(item) 至關put(item, False)

  task_done() 在完成一項工做以後,task_done()函數向任務已經完成的隊列發送一個信號

  join() 實際上意味着等到隊列爲空,再執行別的操做。

import queue

import threading

def fun():

    while True:

        try:

            data = q.get(block = True, timeout = 1) #不設置阻塞的話會一直去嘗試獲取資源

        except queue.Empty:

            print(' {}結束……'.format(threading.current_thread().name))

            break

        print(' {}取得數據:{}'.format(threading.current_thread().name , data))

        q.task_done()

        print(' {}結束……'.format(threading.current_thread().name))

print("主線程開始運行……")

q = queue.Queue(5)

#往隊列裏面放5個數

for i in range(5):

    q.put(i)

for i in range(0, 3):

    t = threading.Thread(target=fun , name='線程'+str(i))

    t.start()

q.join() #等待全部的隊列資源都用完

print("主線程結束運行……")

  輸出結果:

  主線程開始運行……

  線程0取得數據:0

  線程0結束……

  線程0取得數據:1

  線程0結束……

  線程1取得數據:2

  線程0取得數據:3

  線程0結束……

  線程0取得數據:4

  線程0結束……

  線程1結束……

  主線程結束運行……

  線程1結束……

  線程2結束……

  線程0結束……

6 線程池

  在咱們上面執行多個任務時,使用的線程方案都是「即時建立, 即時銷燬」的策略。儘管與建立進程相比,建立線程的時間已經大大的縮短,可是若是提交給線程的任務是執行時間較短,並且執行次數極其頻繁,那麼服務器將處於不停的建立線程,銷燬線程的狀態。一個線程的運行時間能夠分爲3部分:線程的啓動時間、線程體的運行時間和線程的銷燬時間。在多線程處理的情景中,若是線程不能被重用,就意味着每次建立都須要通過啓動、銷燬和運行3個過程。這必然會增長系統相應的時間,下降了效率。因此就有了線程池的誕生,

  因爲線程預先被建立並放入線程池中,同時處理完當前任務以後並不銷燬而是被安排處理下一個任務,所以可以避免屢次建立線程,從而節省線程建立和銷燬的開銷,能帶來更好的性能和系統穩定性。

from concurrent.futures import ThreadPoolExecutor

import time

 

def func(n):

    time.sleep(2)

    print(n)

    return n*n

t = ThreadPoolExecutor(max_workers=5) # 最好不要超過CPU核數的5倍

t_lst = []

for i in range(20):

    r = t.submit(func , 1)#執行任務,傳遞參數

    t_lst.append(r.result())#獲取任務返回結果

t.shutdown()#至關於close() + join()

print(t_lst)

print('主線程運行結束……')

7 總結

  關於Python併發編程中的多線程部分就介紹完了,其中諸多描述略帶倉儲,後續博文中再來補充。

參考資料:

  https://www.cnblogs.com/linshuhui/p/9704128.html

       http://www.javashuo.com/article/p-bwrbawmd-du.html

  https://www.cnblogs.com/chengd/articles/7770898.html

相關文章
相關標籤/搜索