線程、進程、協程和GIL(三)

上一篇文章介紹了:建立線程的兩種方式、Event對象判斷線程是否啓動、利用信號量控制線程併發。html

博客連接:線程、進程、協程和GIL(二)程序員

這一篇來講說線程間通訊的那些事兒: 安全

  一個線程向另外一個線程發送數據最安全的方式就是使用queue庫中的隊列了,經過建立一個供多個線程共享的Queue對象,這些線程使用put()和get()操做來向隊列中添加數據或者從隊列中取出數據,以達到線程間通訊的效果。微信

  queue隊列基本方法:數據結構

    queue.Queue(maxsize = num):  FIFO  先進先出隊列,若是maxsize小於或等於0 則表明隊列長度無線。併發

    queue.LifoQueue(maxsize = num): LIFO 後進先出隊列(相似於棧),若是maxsize小於或等於0 則表明隊列長度無線。app

    Queue.qsize(): 返回當前隊列中元素的個數函數

         Queue.empty()   若是隊列爲空,返回True,反之False ui

         Queue.full()   若是隊列滿了,返回True,反之Falsespa

         Queue.get([block[, timeout]])   讀隊列,timeout等待時間 

         Queue.put(item, [block[, timeout]])   寫隊列,timeout等待時間 

         Queue.queue.clear()   清空隊列

  使用Queue構造生產者消費者模型來實現線程間的通訊:

import time
from queue import Queue,LifoQueue
from threading import Thread

def producer(in_q):
    while True:
        time.sleep(1)
        data = '包子'
        if in_q.full() == True:
            print('蒸籠滿了,放不下了')
        in_q.put(data)  # 向隊列中塞東西
        print('小明蒸%s!' %(data))

def customer(out_q):
    while True:
        time.sleep(3)
        if out_q.empty() == True:
                print('小紅沒取到包子,餓死了!')
        data = out_q.get() # 從隊列中取東西
        print('小紅取 %s ' % (data))

if __name__ == '__main__':
    q = Queue(maxsize=3)
    t1 = Thread(target=producer, args=(q,))
    t2 = Thread(target=customer, args=(q,))
    t1.start()
    t2.start()

  上面的代碼實現了一個簡單的生產者消費者,小明負責蒸包子,小紅負責吃包子。當隊列被包子塞滿時,小明就再也放不進去了,此時生產者這個線程就會阻塞。當小紅將隊列中的包子吃完時,消費者這個線程就會阻塞。由於Queue對象已經封裝了必要的鎖,因此咱們能夠在多個線程之間安全的功能共享數據。可是在生產者消費者的關閉問題會有一些麻煩,通用的解決方式就是在隊列中放置一個特殊值,當消費者讀到這個值時,就終止執行。

  不過有個問題須要注意:向隊列中添加數據項時,並不會複製此數據項,線程間的通訊其實是在線程間傳遞對象引用。若是你單線對象的共享狀態,那麼最好只傳遞不可修改的數據結構(如:整型、字符串、或者元組)或者一個對象的深拷貝。

  給關鍵部分加鎖

  線程的不安全:同一進程裏線程是共享數據的,當各個線程訪問同一個數據資源時會出現競爭狀態,即數據可能會同時被多個線程佔用,形成數據混亂,這就是線程的不安全。

   爲了保證線程安全,因此引進了互斥鎖,確保某段關鍵代碼、共享數據只能由一個線程從頭至尾完整地執行:

  顯式的加鎖:

from threading import Thread, Lock

num = 0
lock = Lock()  # 定義一個鎖

def run():
    global num, lock  # 獲取全局變量
    lock.acquire()  # 加鎖
    num += 1
    print(num)
    lock.release()  # 釋放鎖

if __name__ == '__main__':
    Thread_list = []
    for i in range(1000):
        t = Thread(target=run)
        Thread_list.append(t)
    for i in Thread_list:
        i.start()

  死鎖:可是加入互斥鎖以後有可能會產生一個問題:死鎖:若干子線程在系統資源競爭時,都在等待對方對某部分資源解除佔用狀態,結果誰也不肯意先解鎖,互相等着,程序沒法執行下去,這就是死鎖。

  好比:有兩個線程1、二,兩個共享資源A、B,線程一給資源A加鎖,線程二給資源B加鎖,而後資源A須要訪問資源B,資源B須要調用資源A,線程一二雙方都在等待對方釋放鎖,因此就會形成死鎖。

   But、當程序員在加鎖以後忘記調用release()方法,或者加鎖以後程序拋異常致使不能正常釋放鎖,有可能會形成死鎖,爲了不這種狀況,咱們不須要顯式的手動加鎖和釋放鎖,而是使用with語句來進行自動控制:

  

from threading import Thread, Lock

num = 0
lock = Lock()  # 定義一個鎖

def run():
    global num, lock
    with lock: # 自動的控制加鎖和釋放鎖
        num += 1
        print(num)

if __name__ == '__main__':
    Thread_list = []
    for i in range(1000):
        t = Thread(target=run)
        Thread_list.append(t)
    for i in Thread_list:
        i.start()

  建立一個線程池: 

  concurrent.futures 函數庫有一個 ThreadPoolExecutor 類能夠被用來完成這個任務

from concurrent.futures import ThreadPoolExecutor

def run():
    print('我是子線程')

if __name__ == '__main__':
    pool = ThreadPoolExecutor(max_workers=3)  # 建立一個容量爲3的線程池
    for i in range(3):
        t = pool.submit(run,)  #在線程池中生成三個線程,他們都來調用run方法
    print('我是主線程')

 

想了解更多Python關於爬蟲、數據分析的內容,歡迎你們關注個人微信公衆號:悟道Python

  

相關文章
相關標籤/搜索