第三部分:Semaphore控制進入數量的鎖html
有時候可能須要運行多個工做線程同時訪問一個資源,但要限制總數。例如,鏈接池支持同時鏈接,可是數目多是固定的,或者一個網絡應用可能支持固定數據的併發下載。這些鏈接就可使用semaphore來進行管理。網絡
import threading import time class HtmlSpider(threading.Thread): def __init__(self,url): super().__init__() self.url = url def run(self): time.sleep(2) print("got html text success") class UrlProducer(threading.Thread): def run(self): for i in range(20): # 好比抓取20個網站信息 html_thread = HtmlSpider("http://baidu.com/{}".format(i)) html_thread.start() if __name__ == '__main__': url_producer = UrlProducer() url_producer.start()
咱們能夠看到結果是20個併發去執行的,若是咱們想一次併發3個線程如何處理呢?多線程
更改代碼以下:併發
import threading import time class HtmlSpider(threading.Thread): def __init__(self,url,sem): super().__init__() self.url = url self.sem = sem def run(self): time.sleep(2) print("got html text success") self.sem.release() class UrlProducer(threading.Thread): def __init__(self,sem): super().__init__() self.sem = sem def run(self): for i in range(20): # 好比抓取20個網站信息 self.sem.acquire() html_thread = HtmlSpider("http://baidu.com/{}".format(i),self.sem) html_thread.start() if __name__ == '__main__': sem = threading.Semaphore(3) url_producer = UrlProducer(sem) url_producer.start()
其實semaphore內部是調用了一個condition。咱們注意semaphore也是必須有acquire方法和release方法。框架
另外,咱們發現Queue內部也是調用了不少condition的方法。ide
問:前面介紹了不少同步的方法,其餘一些軟件都有池的概念,Python也具有嗎?函數
答:固然,Python有兩個池子,一個叫線程池,一個叫進程池,後續咱們講到進程的時候回搠進程池。如今先說線程池。網站
線程池就是concurrent模塊包,是在Python3.2時候引入的。這個池是很是頂層的,對於咱們進行線程和進程池編碼是非好的。並且接口會高度的一致。ui
一個問題:爲何要有線程池?編碼
目的很簡單:就是很是容易的管理線程,線程池本身去調度新的線程去使用,線程池過大的時候會阻塞,知道最新的線程空出來。它不只僅起到了數量控制,若是咱們在主線程當中能夠獲取某一個線程的狀態,或者某一個任務的狀態,或者返回值,這樣就會變得很是簡單。另外,當一個線程完成的時候咱們主線程就會立馬知道。futures可讓多線程和多進程編碼接口一直。
from concurrent.futures import ThreadPoolExecutor import time def get_html(times): time.sleep(times) print("get page {} success".format(times)) return times executor = ThreadPoolExecutor(max_workers=2) # 經過submit函數提交執行的函數到線程池中,submit是當即返回 task1 = executor.submit(get_html,(3)) task2 = executor.submit(get_html,(2)) # done方法用於斷定某我的物是否完成 print(task1.done()) # 斷定咱們的函數是否執行成功的 print(task2.cancel()) # 如我咱們執行的狀態是執行中是cancel不了的 time.sleep(3) print(task1.done()) # 斷定咱們的函數是否執行成功的 # result方法能夠獲取task的執行結果 print(task1.result())
運行結果:
False False get page 2 success get page 3 success True 3
這裏面咱們用到了ThreadPoolExecutor的類,其中規定了運行的線程數量。
done()方法:斷定咱們的函數是否執行成功
result()方法:返回函數是否成功執行
cancel()方法:取消一個線程任務(可是若是咱們的任務是在執行中,是沒法cancel掉的)
另外,咱們在想一下,咱們想批量的進行提交而且知道提交是否成功怎麼寫。
這個時候咱們須要導入as_completed的模塊,as_completed是一個生成器(咱們知道生成器最好的方式就用for循環提取出來),咱們再用比較高端的推導式的方式進行提交。
from concurrent.futures import ThreadPoolExecutor,as_completed import time def get_html(times): time.sleep(times) print("get page {} success".format(times)) return times executor = ThreadPoolExecutor(max_workers=2) urls = [3,2,4] all_task = [executor.submit(get_html,(url)) for url in urls] for future in as_completed(all_task): data = future.result() # print("get {} page success".format(data))
另外,咱們還能夠經過executor自己的map方法來完成task
# 經過executor獲取已經完成的task for data in executor.map(get_html,urls): print("get {} page success".format(data)) # get page 2 success # get page 3 success # get 3 page success # get 2 page success # get page 4 success # get 4 page success
可是,略有有點兒差異,上面是完成一個打印一個。
再加一個wait等待。這個命令其實也是很是經常使用並且也是很是好的模塊。wait模塊是等待某一個函數結束再執行下面的內容。另外wait模塊有有個一傳參return_when=後面有四種方式:
FIRST_COMPLETED 當地一個執行完畢
FIRST_EXCEPTION
ALL_COMPLETED
_AS_COMPLETED
executor = ThreadPoolExecutor(max_workers=2) urls = [3,2,4] all_task = [executor.submit(get_html,(url)) for url in urls] wait(all_task,return_when='FIRST_EXCEPTION') print("main over")
小結一下:
* 這樣關於線程池,咱們知道最經常使用的三個模塊:ThreadPoolExecutor, as_completed(注意是一個迭代器), wait。其中方法有submit,result,cancel,done等方法。wait也是能夠傳遞參數的。
* concurrent.futures 中的Future對象咱們通常叫作將來對象,但實際上呢,更形象的說叫task返回容器,task執行結果都會放入裏面。
問:線程的內容真是很多,功能也是很多,可是仍是挺有規律的。
答:其實線程這塊兒,還有幾個內容,都很是簡單,講解完畢咱們最線程進行總結,而後進入進程方面的講解。
補充1(threadLocal模塊):咱們發現若是兩個線程同時操做一個函數的時候,會形成函數中的變量混亂的狀況。咱們能夠經過threadLocal的方法,也叫作線程特定數據。給每個線程單獨去分配一個本地變量能夠防止這個問題:代碼以下。
import threading num = 0 local = threading.local() def run(x,n): x = x + n x = x - n def func(n): local.value = num for i in range(1000000): run(local.value,n) print("%s--%d" %(threading.current_thread().getName(),local.value)) if __name__ == '__main__': t1 = threading.Thread(target=func,args=(6,)) t2 = threading.Thread(target=func,args=(9,)) t1.start() t2.start() t1.join() t2.join() # # Thread - 1 - -0 # Thread - 2 - -0
補充2(barrier模塊):這個單詞是障礙的意思,也就是說像是一個「班車」湊夠了多少個「人」才發車。這裏就是湊夠了多少個線程再進行線程計算,不過這個方法有一個維內託,若是數量不夠時候,會一直停在那裏等待線程。這種方法平時用的也不是不少。它的方法也是wait,代碼以下:
import threading,time bar = threading.Barrier(4) def run(): print("{} -- start".format(threading.current_thread().getName())) time.sleep(1) bar.wait() print("{} -- end".format(threading.current_thread().getName())) if __name__ == '__main__': for i in range(6): threading.Thread(target=run).start()
咱們發現:分配6個線程,其實給的是4個,線程6個併發了以後,等待2個結束併發,一直等不到,就停在那裏了
補充3(Timer模塊):這個模塊很好理解,就是控制線程併發的事件,這是一個定時器,這個定時的事件結束的時候再去開啓。
import threading def run(): print("Thomas is running") t = threading.Timer(5,run) print("父線程開始......") t.start() t.join() print("父線程結束......")
補充4(Event模塊):這個模塊很是簡單,咱們使用手工的方式進行線程之間上鎖解鎖的方式進行通信,咱們能夠調用線程的事件(由於線程行動自己就是一個事件),讓上一個線程事件等待時間觸發。和Condition模塊很是的相似。
import threading,time def func(): event = threading.Event() def run(): for i in range(5): event.wait() event.clear() print("Thomas is running") threading.Thread(target=run).start() return event e = func() for i in range(5): e.set() time.sleep(2)
分析代碼咱們能夠看出.wait是阻塞等待時間的觸發。clear是重置的意思。set是設定的內容。
補充5(enumerate模塊):略
總結:如今咱們能夠對Python的進程進行一下總結了。
第一:進程是運行程序最小的操做單元。在IO操做的時候會常常用到。
第二:Python自己具有GIL(全局解釋器鎖),因此在CPython的解釋下,一個線程放入一個CPU下,在諸如PyPy的Python解釋器下,就是一種去GIL話的解釋器。
第三:進程在上面的框架解釋下,是線程交替來進行多線程操做的,系統沒法自動的調配多核。
第四:因爲線程自己設計的緣由,線程在運行程序後會按照自有的規則釋放空間,因爲這個釋放空間的時間很是短暫,形成程序和程序,數據和數據之間可能產生混亂的狀況。所以咱們引入了鎖、event、condition等方式進行控制。
第五:線程有一些概念是成對出現的,正是因爲第四條的狀況。好比守護和阻塞(daemon和join),wait和clear(event事件),wait和notify(Condition條件),done和wait等。
第六:線程分主線程和子線程這麼一說,wait這個模塊實際上是屬於小而精的一種阻塞操做方式,另外咱們還能夠用with語句來簡化代碼,用推導式直接進行推送任務到進程中。
第七:平時咱們也經常使用線程池來讓Python自動推送任務到線程當中,submit就是這個動做。
第八:諸如像ThreadLocal,Event,Timer,semaphore,barrier等小技巧也須要了解。