Python自動化開發學習9-多線程、隊列

threading 模塊

先理解一下進程與線程的概念和區別,而後經過threading模塊來學習理解線程。進程要下次講了。
以後看一下兩種調用線程的方式,效果和實現都同樣。貌似也沒有何時用哪一種,反正愛用哪一種用哪一種。通常的話直接調用就行了。python

線程與進程

線程,是操做系統可以進行運算調度的最小單位。
進程,是對各類資源管理的集合。
進程就是一個執行中的程序。程序並不能單獨運行,只有將程序裝載到內存中,系統爲它分配資源才能運行,而這種執行的程序就稱之爲進程。程序和進程的區別就在於:程序是指令的集合,它是進程運行的靜態描述文本;進程是程序的一次執行活動,屬於動態概念。
進程要進行運算,必需要先建立一個線程。由於進程不具有執行的動做,可是他包含線程,經過線程來進行運算。全部在同一個進程裏的線程,是共享同一塊內存空間的。編程

直接調用

import threading
import time

def task(num):
    print("running on task", num)
    time.sleep(3)
    print("task over", num)

if __name__ == '__main__':
    t1 = threading.Thread(target=task, args=(1,))  # 生成一個線程實例
    t2 = threading.Thread(target=task, args=(2,))  # 再生成一個實例
    t1.start()  # 啓動線程
    t2.start()  # 再啓動一個
    print(t1.getName())  # 獲取線程名
    print(t2.getName())

參數注意:函數名和參數要分開寫。而且參數要寫成元組的形式,這裏只有一個參數,因此也必須用括號括起來後面加個逗號,表示這是一個元組。多線程

繼承式調用

上面是直接實例化了 threading.Thread這個類,咱們也能夠像下面這樣先繼承這個類,而後重構它的run方法。併發

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, num):
        super(MyThread, self).__init__()  # 繼承父類的構造函數
        self.num = num
    def run(self):
        "每一個線程要運行的函數,必須寫到run方法裏"
        print("running on task", self.num)
        time.sleep(3)
        print("task over", self.num)

if __name__ == '__main__':
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()  # 這裏就會執行run方法
    t2.start()
    print(t1.getName())
    print(t2.getName())

在直接調用中,就是將你的函數名和運行參數,在實例化的時候,經過類的構造函數傳遞給了threading.Thread類的run方法。而這裏咱們是重構了這個run方法app

使用for循環調用多個線程

若是須要一次調用多個線程,就不能再像上面那樣一個一個寫了。能夠用一個for,來循環調用啓動ssh

import threading
import time

def task(num):
    print("running on task", num)
    time.sleep(3)
    print("task over", num)

if __name__ == '__main__':
    for i in range(50):
        t = threading.Thread(target=task, args=('t%s' % i,))
        t.start()
    print("所有運行結束?")  # 注意這句print執行的時間

程序主函數的線程

在上面的例子中,最後的print並無等待以前的sleep運行結束,而是直接執行了。這裏主函數是一個線程,其餘使用threading啓動的都是這個主線程啓動的子線程。全部線程都是獨立執行的,主線程啓動了子線程以後二者就相互獨立了,相互獨立並行執行。
咱們能夠經過 threading.current_thread() 獲取到當前的線程名:ide

import threading
import time

def task(num):
    print("running on task", num, threading.current_thread())
    time.sleep(3)
    print("task over", num)

if __name__ == '__main__':
    print("running on Main", threading.current_thread())
    for i in range(50):
        t = threading.Thread(target=task, args=('t%s' % i,))
        t.start()
    print("所有運行結束?")  # 注意這句print執行的時間

能夠看到,主函數是MainThread。每個線程都有線程名和線程號。函數

join方法

若是咱們但願全部的子線程都是並行的,可是主函數須要等待全部子線程都執行完畢後再統一繼續執行,就須要join方法。
join方法,是等待這個線程執行完畢後纔會繼續執行以後的語句。每個線程都要有一個join,不然就不會等待這個線程執行完畢。oop

import threading
import time

def task(num):
    print("running on task", num)
    time.sleep(3)
    print("task over", num)

if __name__ == '__main__':
    t1 = threading.Thread(target=task, args=(1,))  # 生成一個線程實例
    t2 = threading.Thread(target=task, args=(2,))  # 再生成一個實例
    t1.start()  # 啓動線程
    t2.start()  # 再啓動一個
    t1.join()  # 爲每一個線程加一個join
    t2.join()
    print(t1.getName())
    print(t2.getName())

上面是2個子線程的狀況,若是是以前那樣的50個子線程,那麼還須要再寫一個for循環來執行join。此次能夠來計算一下程序的運行時間。學習

import threading
import time

def task(num):
    print("running on task", num)
    time.sleep(3)
    print("task over", num)

if __name__ == '__main__':
    t_objs = []  # 先定義一個空列表
    start_time = time.time()
    for i in range(50):
        t = threading.Thread(target=task, args=('t%s' % i,))
        t.start()
        t_objs.append(t)  # 保存每個實例,不然跳出當前for循環後沒法調用
    for j in t_objs:
        j.join()
    print("總共運行時間:", time.time()-start_time)  # 計算總共的運行時間
    print("所有運行結束?")  # 注意這句print執行的時間

活動的線程個數

經過 threading.active_count() 能夠獲取到活動的線程個數

import threading
import time

def task(num):
    print("running on task", num)
    time.sleep(3)
    print("task over", num)

if __name__ == '__main__':
    print(threading.active_count())  # 運行前是1,由於有一個主函數的線程
    t_objs = []  # 先定義一個空列表
    start_time = time.time()
    for i in range(50):
        t = threading.Thread(target=task, args=('t%s' % i,))
        t.start()
        t_objs.append(t)  # 保存沒一個實例,不然跳出當前for循環後沒法調用
    print(threading.active_count())  # 全部子線程都起來了,一個50個,再加上一個主函數
    for j in t_objs:
        j.join()
    print(threading.active_count())  # 全部子線程都結束了,又只剩一個主函數了
    print("總共運行時間:", time.time()-start_time)  # 計算總共的運行時間

守護線程

以前的進程互相之間都是獨立的,雖然有父線程建立子線程,可是建立完以後,這2個線程也就互相獨立了。這個是默認的設置。
守護線程,在建立完線程尚未運行以前,能夠將線程設置爲守護線程。守護線程依賴主線程而存在,主線程一旦運行完畢,守護線程不管是什麼狀況,都會中止。
二者的差異就是,以前的狀況,全部線程都是獨立運行的。若是沒有使用join,全部的線程包括主線程都是獨立運行,當全部線程所有運行結束後,咱們的程序纔會結束。若是將線程設爲守護線程後,那麼當主線程和其餘線程運行完以後,不會等待守護線程運行結束,程序會直接結束。

import threading
import time

def task(num):
    print("running on task", num)
    time.sleep(3)  # 設爲守護線程後,不會等待守護線程運行結束了
    print("task over", num)  # 因此這裏也不會打印了

if __name__ == '__main__':
    for i in range(50):
        t = threading.Thread(target=task, args=('t%s' % i,))
        # 將線程設置爲守護線程,當主線程退出時,守護線程也會退出
        # 而且由這個守護線程啓動的其它子線程也是守護線程,也會會同時退出,無論是否執行完任務
        t.setDaemon(True)  # 必須在建立線程後,可是在運行前才能將線程設置爲守護線程
        t.start()
    print("運行結束,進程數量", threading.active_count())  # 這裏不會等待子線程運行完畢

小結

線程類的方法:

  • start :線程準備就緒,等待CPU調度
  • setName :爲線程設置名稱
  • getName :獲取線程名稱
  • setDaemon:設置守護線程,設爲True就是守護線程。默認False
  • join :逐個執行每一個線程,執行完畢後繼續往下執行
  • run :線程被cpu調度後自動執行線程對象的run方法
  • threading.current_thread : 查看當前的線程名
  • threading.active_count : 查看活動線程的數量

Python GIL

Python GIL(Global Interpreter Lock),全程解釋器鎖。不管你啓多少個線程,你的cpu是多少核,Python在執行的時候只能是單核運行。這個是使用Python解析器(CPython)時會有的狀況。Python除了(CPython)還能夠經過CPython,PyPy,Psyco等不一樣的Python執行環境來執行,可是CPython是大部分環境下默認的Python執行環境。
因此咱們用CPython就會有GIL,有GIL就不是真正的多線程,只能單核運行。咱們仍是會繼續在CPython下學習和運行Python,GIL仍是會繼續存在。目前只要知道這麼多就好了,怎麼利用多核是下次的內容。

線程鎖

這段上課演砸了,不過不要緊,我大概搞明白了。
一個進程下能夠啓動多個線程,多個線程共享父進程的內存空間,也就意味着每一個線程能夠訪問同一份數據,此時,若是2個線程同時要修改同一份數據,這是數據就混亂了。下面模擬沒有線程鎖形成數據混亂的狀況。

import threading
import time

gl_num = 0
def show():
    global gl_num  # 聲明全局變量
    gl_num += 1  # 先執行自增1,而後停頓一會
    # 把下面的sleep註釋掉,多是數據處理到輸出之間幾乎沒有間隔,看不到數據混亂的狀況
    time.sleep(0.1)  # sleep的這段時間,其餘線程也會操做這個變量
    print(gl_num)  # 最後輸出的時候,就是全部線程操做後的結果

for i in range(10):
    t = threading.Thread(target=show)
    t.start()
print("運行結束")

因此,出現了線程鎖,同一時刻只容許一個線程修改數據。

import threading
import time

gl_num = 0
lock = threading.Lock()  # 申請一把鎖,生成一個實例
def show():
    global gl_num
    lock.acquire()  # 修改數據前加鎖,此時別的線程就沒法操做了。
    gl_num += 1
    time.sleep(0.1)
    print(gl_num)
    lock.release()  # 上面打印出結果了,釋放鎖容許別的線程繼續操做

for i in range(10):
    t = threading.Thread(target=show)
    t.start()
print("運行結束")

上面加了鎖之間的內容其實就變成了串行執行了。

遞歸鎖

在使用線程鎖的時候,若是你須要用到多把鎖嵌套使用,可能會致使程序鎖死,永遠沒法release。下面演示一個會出現鎖死的狀況。說白了,就是大鎖中還要再包含子鎖。

import threading

def run1():
    print("grab the first part data")
    lock.acquire()
    global num
    num += 1
    lock.release()
    return num
def run2():
    print("grab the second part data")
    lock.acquire()
    global num2
    num2 += 1
    lock.release()
    return num2
def run3():
    lock.acquire()
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res, res2)

if __name__ == '__main__':
    num, num2 = 0, 0
    # 下面使用了遞歸鎖RLock,能夠正常執行,若是換成以前的Lock,就會出現鎖死的狀況
    lock = threading.RLock()
    # 其實這裏都不是線程數量的問題,起一個子線程就會鎖死了
    for i in range(10):
        t = threading.Thread(target=run3)
        t.start()
while threading.active_count() != 1:  # 這裏沒用join,而是經過判斷活躍線程數來確認子線程是否執行完畢
    print("活動線程數量:", threading.active_count())
else:
    print('----all threads done---')
    print(num, num2)

在上面的代碼中,程序會先進入run3,run3中有第一道鎖。而後在run3中會分別去執行run1和run2,而這裏面又都會有第二道鎖。此時多是程序吧兩道鎖搞混了,因此致使再也release不出來了。使用RLock來代替Lock就避免了這種狀況。
也沒講RLock和Lock的其餘區別,既然RLock沒問題,貌似不用Lock就行了。簡單的場景都OK,複雜的場景下必定要用遞歸鎖避免程序被鎖死。

信號量

線程鎖,同時只容許一個線程更改數據。
信號量(Semaphore),是同時容許必定數量的線程更改數據。好比廁全部3個坑,那最多隻容許3我的上廁所,後面的人只能等裏面有人出來了才能再進去。
信號量的使用就和線程鎖同樣,實例化的時候類名變了,多一個int參數。以後都是同樣的acquire上鎖和release釋放。

import threading,time

# 通常應該是寫到if裏的,不過這裏演示,從上到下按順序執行邏輯比較清晰
semaphore  = threading.BoundedSemaphore(5)  # 最多容許5個線程同時運行
def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the thread: %s" % n)
    semaphore.release()

if __name__ == '__main__':
    for i in range(30):
        t = threading.Thread(target=run, args=(i,))
        t.start()
    while threading.active_count() != 1:
        pass  # 繼續用活動進程數量等待子程序結束
    else:
        print("全部進程執行完畢")

上面代碼執行後的效果就是,每次只蹦5條結果出來。由於經過信號量限制了同一時間只容許必定數量的線程操做數據。
這裏的代碼比較簡單,並且都是同樣的,因此看上去是每次執行5個。實際是這裏面每個都是獨立的。一旦有一個執行完釋放了以後,就會讓下一個繼續執行。就是線程都是一個一個放行的,一旦一個執行完畢就放行下一個,而不是一批一批放行的。

定時器 timer

從線程被start方法調用開始,定時器開始計時。計時完畢後纔會開始執行

import threading
def hello(name):
    print("你好,%s" % name)
t = threading.Timer(2, hello, args=("世界",))
t.start()  # 須要等待上面指定的秒數後纔會真正執行

事件 event

事件(event),用於線程之間數據同步的。經過 event 來實現兩個或多個線程間的交互。
下面把用於控制的線程稱爲服務端,被控制的線程稱爲客戶端。
服務端,設置 event 的狀態,只有 set 和 clear 兩個方法
客戶端,檢查 event 的狀態,若是是 set 就繼續執行,不然就阻塞等待 set

  • event = threading.Event() :使用前,先生成一個實例
  • event.set() :服務端線程,將 event 設爲 set ,客戶端就能夠繼續執行
  • event.clear() :服務端線程,將 event 的 set 清除,客戶端會阻塞直到再次設爲 set
  • event.wait() :客戶端線程,等待 event 變成 set ,若是 set 就繼續,不然就阻塞直到 set
  • event.is_set() :布爾值,當前event的狀態。客戶端線程也能夠用它來作控制。可是若是不是 set 和 clear時都須要執行的話,仍是用 wait 來控制比較好。wait是用阻塞來控制的,而這裏是每次都要作一下判斷。

課上舉例了一個紅綠燈的例子,起一個紅綠燈的線程,若是綠燈就 set ,紅燈就 clear。而後能夠起幾個車的線程,判斷event,只在set的時候執行:

import threading, time

event = threading.Event()  # 這句應該寫到if __name__裏面,先放這裏看看清楚
def light():
    "模擬紅綠燈"
    count = 0
    while True:
        if count < 10:
            event.set()  # 綠燈將標誌位設爲 set
            print("\033[42;1m**\033[0m綠燈", 10-count, 'event狀態:', event.is_set())
        elif count < 20:
            event.clear()  # 紅燈清除標誌位
            print("\033[41;1m**\033[0m紅燈", 20-count, 'event狀態:', event.is_set())
        else:
            count = 0
            continue
        time.sleep(1)
        count += 1

def car(name):
    "在紅燈的時候wait,綠燈的時候執行"
    while True:
        event.wait()  # 只有在綠燈的時候纔會繼續執行
        print(name, '正在行駛...')
        time.sleep(1)

if __name__ == '__main__':
    light1 = threading.Thread(target=light)
    light1.start()
    # 你也能夠用for循環多起幾個車
    car1 = threading.Thread(target=car, args=("特斯拉",))
    car1.start()
    car2 = threading.Thread(target=car, args=("保時捷",))
    car2.start()

雖然不用 event 也能夠經過檢查某個變量的狀態來實現控制。可是因爲變量是進程間全部線程共享的,客戶端直接訪問控制變量也能夠修改它,雖然你程序裏可能不會這麼寫,但不是不能夠。這裏使用了 event 將這個過程封裝了,避免客戶端直接訪問這個變量。

queue模塊-隊列

隊列,能夠理解爲一個有順序的容器,裏面存放數據。數據來的時候先將數據存入,使用數據的時候再按必定順序從容器中取出。
其實列表也能實現,用pop方法。簡單的話,還真的用列表就行了,不過模塊封裝了更多高級的設置。
有3種隊列:

import queue
# 使用以前也是要先生成一個實例
q1 = queue.Queue()  # 先進先出
q2 = queue.LifoQueue()  # 後進先出,就是堆棧。last in first out (LIFO)
q3 = queue.PriorityQueue()  # 能夠設置優先級的隊列

上面再實例化的時候,都沒有參數。有一個參數 maxsize=0 ,默認的隊列大小是沒有限制。能夠設置 maxsize 來設置隊列大小。

存取數據

put(item) :存入一個數據
get(item) :取出一個數據
put_nowait(item) :存入數據的另一個方法,不等待,直接拋出異常
get_nowait(item) :取出數據的另一個方法,不等待,直接拋出異常
可選的參數,用於 put 和 get 方法:
block=False :默認爲True,就是阻塞模式。設爲False,則直接拋出異常,下面的 timeout 也就無效了。
timeout=1 :默認爲None,就是一直等着。設置後爲阻塞多少秒,若是這段時間內能夠繼續了,就立刻繼續。不然仍是拋出異常。
put在存入數據的時候,若是隊列設置了大小,而且隊列已滿,就會阻塞。直到隊列裏有數據被取出空出了位置,那麼再將這個數據存入繼續。可使用 put_nowait 存入,那麼滿的時候,就不直接拋出一個錯誤。
也能夠設置參數 block=False 也是同樣直接拋出錯誤,默認是True。
還能夠設置參數 timeout=1 這個是等待的時間,好比這裏等待1秒,若是1秒內隊列空出來,就存入,不然仍是拋出異常。若是 block 已經設置了 False 這個 timeout 就沒有用了,會直接拋出錯誤。
get也是同樣,用上面的兩個參數控制,在隊列爲空的狀況下繼續要取數據,是阻塞,仍是拋出異常,或者阻塞多久後再拋出異常。
下面的列子,生成2個線程,一個存入數據,一個用戶控制來取出數據,演示put和get的用法:

import threading, queue

q1 = queue.Queue(maxsize=2)  # 隊列大小隻有2
def put_item():
    "存入數據的線程"
    for i in range(10):
        q1.put(i, timeout=2)  # 若是隊列已滿,最長等待2秒鐘,不然拋出異常
        print("存入數據:", i)  # 存入數據的時候,你會看到
    print("數據已經所有存入...")
def control():
    "用戶控制取出數據"
    while True:
        input("按回車取出一個數據,若是等待超過2秒會拋出異常... ")
        print("取出數據:", q1.get_nowait())  # 若是隊列空了,再要取就拋出異常

if __name__ == '__main__':
    put1 = threading.Thread(target=put_item)
    put1.start()
    control1 = threading.Thread(target=control)
    control1.start()

其餘方法

qsize() :隊列中元素的個數,0就是空隊列,等於maxsize就是隊列滿了
empty() :布爾值,隊列是否爲空。空的話就get不到數據了
full() :布爾值,隊列是否已滿。滿的話就put不進數據了

import queue
q = queue.LifoQueue(3)  # 來一個堆棧,大小隻有3
def show():
    "每次打印這3個數據"
    print('隊列中的數據數量:', q.qsize(),
          '隊列是否爲空:', q.empty(),
          '隊列是否滿了', q.full())
show()  # 什麼都尚未存入,大小是0,而且是空的
q.put(1)
show()  # 存入數據後,大小就變了,已經不是空堆棧了,可是還沒滿
q.put(2)
show()
q.put(3)
show()  # 如今已經滿了
print(q.get())  # 看看取出數據的順序,是否是最後存入的最早取出
print(q.get())
print(q.get())
show()  # 都取完了,如今又是一個空堆棧了

優先級隊列

優先級隊列例子1:

import queue

q1 = queue.PriorityQueue()

q1.put('Jack')
q1.put('Perter')
q1.put('Alice')
q1.put('1Zoey')

while not q1.empty():  # empty隊列是否爲空
    print(q1.get())

從上面能夠看到,最後數據的結果給按ASCII順序取出來的。通常可能不這麼用,而是會單獨設置一個優先級,那麼就把優先級和數據已元組的形式存入
優先級隊列例子2:

import queue

q1 = queue.PriorityQueue()

q1.put((10, 'Jack'))  # 注意數據只佔1個參數位置,因此這裏得2層括號,存入一個元組
q1.put((-1, 'Zoey'))
q1.put((10, 'Perter'))
q1.put((-1, 'Alice'))
q1.put((-1, 'Bob'))

while not q1.empty():
    print(q1.get())

重構put方法,優化排序的規則

這裏還有一個問題,雖然將數據寫成了元組,可是其實排序的時候是按整個元組的內容來排序的。可是咱們須要的是按照元組的第一個元素來排序,而一樣優先級的數據,仍然按照進入的順序輸出。貌似模塊自己並無提供這樣的方法。咱們重構一個本身的優先級排序。
源碼中調用put方法,最終會調用_put方法將數據導入;
而後在_put方法中調用了heapq模塊(import queque的時候導入的)的heappush方法將數據傳遞給_siftdown方法;
最終在_siftdown方法中從新排列存儲數據的列表。問題就是這個方法,直接從列表中取出元素進行比較,而咱們須要的是取出的元素是個元組,用元組的第一個元素進行比較。
因此只要修改上面3個方法就行了。

import queue

# 繼承優先級排序的類,只重構咱們須要修改的方法
class MyPriorityQueue(queue.PriorityQueue):
    "根據元組的第一個元素進行優先級排序"
    def _put(self, item):
        "源碼是調用heapq模塊裏的方法,這裏咱們就在下面重構了,調用本身的類裏的同名方法"
        self.heappush(self.queue, item)

    # 下面的2個方法是heapq模塊裏的,抄過來小改一下就行了
    def heappush(self, heap, item):
        "這裏不用改實現的邏輯,讓它繼續調用咱們下面的_siftdown方法就行了"
        heap.append(item)
        self._siftdown(heap, 0, len(heap) - 1)

    def _siftdown(self, heap, startpos, pos):
        """主要就是修改這裏
        源碼是從heap裏取出數據進行比較:if newitem < parent:
        這裏改爲比較數據裏的第一個元素:if newitem[0] < parent[0]:
        這裏寫的比較簡單。若是存入的不是元素應該就會報錯了,不過這不是重點了,須要能夠再改
        """
        newitem = heap[pos]
        while pos > startpos:
            parentpos = (pos - 1) >> 1
            parent = heap[parentpos]
            if newitem[0] < parent[0]:
                heap[pos] = parent
                pos = parentpos
                continue
            break
        heap[pos] = newitem

q1 = MyPriorityQueue()
q1.put((10, 'Jack'))  # 注意數據只佔1個參數位置,因此這裏得2層括號,存入一個元組
q1.put((-1, 'Zoey'))
q1.put((10, 'Perter'))
q1.put((-1, 'Alice'))
q1.put((-1, 'Bob'))
while not q1.empty():
    print(q1.get())

上面好長一段,其實我只須要改一行,該怎麼作最好呢?

task_done 方法和 join 方法

task_done :每次從隊列中取出數據而且處理好以後,調用一下這個方法,來提示 join 方法,是否中止阻塞。
join :阻塞,直到隊列清空,再繼續向日後執行。
具體例子看下面的生產者消費者模型,舉例一

生產者消費者模型

回顧,以前講迭代器的時候,經過yield生成器實現過一個單線程下的有並行效果的吃包子的函數。看以前 的筆記:Python自動化開發學習4-2
如今,學習了多線程和隊列以後,咱們能夠用多線程來實現了,把數據存放到隊列之中。

概念

在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。
在線程裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。
生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者之間不直接通訊,而是經過隊列來傳遞數據。因此生產者生產完數據以後不用等待消費者處理,直接放入隊列;消費者不找生產者要數據,而是直接從隊列裏取,隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力,這樣就實現瞭解耦。
用以前學習的多線程和隊列的知識,就能夠實現這樣的模型了。

舉例一

舉例說明。寫一個生產者的函數,生產包子。再寫一個消費者的函數,消費包子。用上面的 task_done 和 join 的方法。
生產者一次生產10個包子,放入隊列,而後執行 join 阻塞,等到10個包子所有被取完了,纔會繼續。
消費者取包子,不是直接找生產者,並且從隊列裏去取。取了以後執行一下 task_done 通知如下生產者看看包子有沒有被取完。

import queue, threading, time

def producer():
    while True:
        for i in range(10):
            q.put("包子 %s" % i)
        print("已經放入了10個包子")
        q.join()  # 阻塞,知道包子被取完
        print("包子已經被取完了...")
def consumer(name):
    while True:
        # tmp = q.get()
        print("%s 吃了一個包子 %s" % (name, q.get()))
        q.task_done()  # 通知join方法,取了一個數據了
        time.sleep(1)

if __name__ == '__main__':
    q = queue.Queue()
    p = threading.Thread(target=producer,)
    p.start()
    c1 = threading.Thread(target=consumer, args=('Eric',))
    c1.start()
    c2 = threading.Thread(target=consumer, args=('Lassie',))
    c2.start()
    c3 = threading.Thread(target=consumer, args=('Snoopy',))
    c3.start()

上面的例子中生產數據爽哦速度遠遠高於消費數據的速度,咱們用阻塞來控制,防止生產了過多的數據來不及消費

舉例二

隊列自己就有本身的阻塞模式,因爲使用了生產者消費者模型實現瞭解耦。咱們沒必要關心生產和消費數據的速度。若是數據消費的快,在取空數據的時候就會進入阻塞,直到生產者把數據加入隊列。數據生產過快的狀況也是同樣,隊列滿了天然進入阻塞,直到消費者消費了。

import queue, threading, time

# 隨意調整一次生產的數量,以及每次sleep的時間間隔
def producer():
    while True:
        for i in range(5):
            q.put("包子 %s" % i)
        print("已經放入了5個包子")
        time.sleep(2)

def consumer(name):
    while True:
        print("%s 吃了一個包子 %s" % (name, q.get()))
        time.sleep(1)

# 這裏也是能夠多開幾個生產者和消費者
if __name__ == '__main__':
    q = queue.Queue(10)  # 此次設定一下隊列的大小,若是生產過快,也會阻塞等待消費
    p = threading.Thread(target=producer,)
    p.start()
    c1 = threading.Thread(target=consumer, args=('Eric',))
    c1.start()
    c2 = threading.Thread(target=consumer, args=('Lassie',))
    c2.start()
    c3 = threading.Thread(target=consumer, args=('Snoopy',))
    c3.start()

做業

類 Fabric 主機管理程序開發:

  1. 運行程序列出主機組或者主機列表
  2. 選擇指定主機或主機組
  3. 選擇讓主機或者主機組執行命令或者向其傳輸文件(上傳/下載)
  4. 充分使用多線程或多進程
  5. 不一樣主機的用戶名密碼、端口能夠不一樣

補充-線程池-concurrent.futures模塊

上面講線程的內容裏只有信號量,並無線程池。信號量只是限制同事運行的線程,可是全部線程應該是所有都建立好的。線程佔用的資源比較少,這也沒太大問題。不過若是要限制同一時間建立的線程的數量,就須要線程池。

從Python3.2開始,標準庫爲咱們提供了concurrent.futures模塊,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現了對threading和multiprocessing的更高級的抽象,對編寫線程池/進程池提供了直接的支持。

例子:

from concurrent.futures import ThreadPoolExecutor

def ssh_cmd(obj):
    pass

# 執行者:建立線程池
executor = ThreadPoolExecutor(5)
for obj in objs:
    executor.submit(ssh_cmd, obj)  # 第一個參數是方法,以後的參數都是變量
executor.shutdown(wait=True)

項目裏只用到這麼多。

相關文章
相關標籤/搜索