Python | 感知線程狀態的解決方案,Event與信號量

本文始發於我的公衆號:TechFlow,原創不易,求個關注web


今天是Python專題的第21篇文章,咱們繼續多線程的話題。多線程

上週的文章當中咱們簡單介紹了線程和進程的概念,以及在Python當中如何在主線程以外建立其餘線程,而且還了解了用戶級線程和後臺線程的區別以及使用方法。今天咱們來看看線程的其餘使用,好比如何中止一個線程,線程之間的Event用法等等。併發

中止線程

利用Threading庫咱們能夠很方便地建立線程,讓它按照咱們的想法執行咱們想讓它執行的事情,從而加快程序運行的效率。然而有一點坑爹的是,線程建立以後,就交給了操做系統執行,咱們沒法直接結束一個線程,也沒法給它發送信號,沒法調整它的調度,也沒有其餘高級操做。若是想要相關的功能,只能本身開發。socket

怎麼開發呢?編輯器

咱們建立線程的時候指定了target等於一個咱們想讓它執行的函數,這個函數並不必定是全局函數,實際上也能夠是一個對象中的函數。若是是對象中的函數,那麼咱們就能夠在這個函數當中獲取到對象中的其餘信息,咱們能夠利用這一點來實現手動控制線程的中止。函數

提及來好像不太好理解,可是看下代碼真的很是簡單:工具

import time
from threading import Thread  class TaskWithSwitch:  def __init__(self):  self._running = True   def terminate(self):  self._running = False   def run(self, n):  while self._running and n > 0:  print('Running {}'.format(n))  n -= 1  time.sleep(1)  c = TaskWithSwitch() t = Thread(target=c.run, args=(10, )) t.start() c.terminate() t.join() 複製代碼

若是你運行這段代碼,會發現屏幕上只輸出了10,由於咱們將_running這個字段置爲False以後,下次循環的時候再也不知足循環條件,它就會本身退出了。flex

若是咱們想要用多線程來讀取IO,因爲IO可能存在堵塞,因此可能會出現線程一直沒法返回的狀況。也就是說咱們在循環內部卡死了,這個時候單純用_running來判斷仍是不夠的,咱們須要在線程內部設置計時器,防止循環內部的卡死。ui

class IOTask:
 def __init__(self):  self._running = True   def terminate(self):  self._running = False   def run(self, sock):  # 在socket中設置計時器  sock.settimeout(10)  while self._running:  try:  # 因爲設置了計時器,因此這裏不會永久等待  data = sock.recv(1024)  break  except socket.timeout:  continue  return 複製代碼

線程信號的傳遞

咱們之因此如此費勁才能控制線程的運行,主要緣由是線程的狀態是不可知的,而且咱們沒法直接操做它,由於它是被操做系統管理的。咱們運行的主線程和建立出來的線程是獨立的,二者之間並無從屬關係,因此想要實現對線程的狀態進行控制,每每須要咱們經過其餘手段來實現。url

咱們來思考一個場景,假設咱們有一個任務,須要在另一個線程運行結束以後才能開始執行。要想要實現這一點,就必須對線程的狀態有所感知,須要其餘線程傳遞出信號來才行。咱們可使用threading中的Event工具來實現這一點。Event工具就是能夠用來傳遞信號的,就好像是一個開關,當一個線程執行完成以後,會去啓動這個開關。而這個開關控制着另一段邏輯的運行。

咱們來看下樣例代碼:

import time
from threading import Thread, Event  def run_in_thread():  time.sleep(1)  print('Thread is running')  t = Thread(target=run_in_thread) t.start()  print('Main thread print')  複製代碼

咱們在線程裏面就只作了輸出一行提示符,沒有其餘任何邏輯。因爲咱們在run_in_thread函數當中沉睡了1s,因此必定是先輸出Main thread print再輸出的Thread is running。假設這個線程是一個很重要的任務,咱們但願主線程可以等待它運行到一個階段再往下執行,咱們應該怎麼辦呢?

注意,這裏說的是運行到一個階段,並非運行結束。運行結束咱們很好處理,能夠經過join來完成。但若是不是運行結束,而是運行完成了某一個階段,固然經過join也能夠,可是會損害總體的效率。這個時候咱們就必需要用上Event了。加上Event以後,咱們再來看下代碼:

import time
from threading import Thread, Event  def run_in_thread(event):  time.sleep(1)  print('Thread is running')  # set一下event,這樣外面wait的部分就會被啓動  event.set()  # 初始化Event event = Event() t = Thread(target=run_in_thread, args=(event, )) t.start()  # event等待set event.wait() print('Main thread print') 複製代碼

總體的邏輯沒有太多的修改,主要的是增長了幾行關於Event的使用代碼。

咱們若是要用到Event,最好在代碼當中只使用一次。固然經過Event中的clear方法咱們能夠重置Event的值,但問題是咱們沒辦法保證重置的這個邏輯會在wait以前執行。若是是在以後執行的,那麼就會問題,而且在debug的時候會異常痛苦,由於bug不是必現的,而是有時候會出現有時候不會出現。這種狀況每每都是由於多線程的使用問題。

因此若是要屢次使用開關和信號的話,不要使用Event,可使用信號量。

信號量

Event的問題在於若是多個線程在等待Event的發生,當它一旦被set的時候,那麼這些線程都會同時執行。但有時候咱們並不但願這樣,咱們但願能夠控制這些線程一個一個地運行。若是想要作到這一點,Event就沒法知足了,而須要使用信號量。

信號量和Event的使用方法相似,不一樣的是,信號量能夠保證每次只會啓動一個線程。由於這二者的底層邏輯不太一致,對於Event來講,它更像是一個開關。一旦開關啓動,全部和這個開關關聯的邏輯都會同時執行。而信號量則像是許可證,只有拿到許可證的線程才能執行工做,而且許可證一次只發一張。

想要使用信號量並不須要本身開發,thread庫當中爲咱們提供了現成的工具——Semaphore,咱們來看它的使用代碼:

# 工做線程
def worker(n, sema):  # 等待信號量  sema.acquire()  print('Working', n)  # 初始化 sema = threading.Semaphore(0) nworkers = 10 for n in range(nworkers):  t = threading.Thread(target=worker, args=(n, sema,))  t.start() 複製代碼

在上面的代碼當中咱們建立了10個線程,雖然這些線程都被啓動了,可是都不會執行邏輯,由於sema.acquire是一個阻塞方法,沒有監聽到信號量是會一直掛起等待。

當咱們釋放信號量以後,線程被啓動,纔開始了執行。咱們每釋放一個信號,則會多啓動一個線程。這裏面的邏輯應該不難理解。

總結

在併發場景當中,多線程的使用毫不是多啓動幾個線程作不一樣的任務而已,咱們須要線程間協做,須要同步、獲取它們的狀態,這是很是不容易的。一不當心就會出現幽靈bug,時顯時隱,這也是併發問題讓人頭疼的主要緣由。

這篇文章當中咱們只是簡單介紹了線程間通訊的基本方法,針對這個問題,還有更好的解決方案。咱們將在後續的文章當中繼續討論這個問題,敬請期待。

今天的文章到這裏就結束了,若是喜歡本文的話,請來一波素質三連,給我一點支持吧(關注、轉發、點贊)。

本文使用 mdnice 排版

相關文章
相關標籤/搜索