Python說文解字_Python之多任務_02

  第三部分:Semaphore控制進入數量的鎖html

   有時候可能須要運行多個工做線程同時訪問一個資源,但要限制總數。例如,鏈接池支持同時鏈接,可是數目多是固定的,或者一個網絡應用可能支持固定數據的併發下載。這些鏈接就可使用semaphore來進行管理。網絡

import threading
import time

class HtmlSpider(threading.Thread):
    def __init__(self,url):
        super().__init__()
        self.url = url

    def run(self):
        time.sleep(2)
        print("got html text success")

class UrlProducer(threading.Thread):
    def run(self):
        for i in range(20): # 好比抓取20個網站信息
            html_thread = HtmlSpider("http://baidu.com/{}".format(i))
            html_thread.start()

if __name__ == '__main__':
    url_producer = UrlProducer()
    url_producer.start()

  咱們能夠看到結果是20個併發去執行的,若是咱們想一次併發3個線程如何處理呢?多線程

   更改代碼以下:併發

import threading
import time

class HtmlSpider(threading.Thread):
    def __init__(self,url,sem):
        super().__init__()
        self.url = url
        self.sem = sem

    def run(self):
        time.sleep(2)
        print("got html text success")
        self.sem.release()

class UrlProducer(threading.Thread):
    def __init__(self,sem):
        super().__init__()
        self.sem = sem

    def run(self):
        for i in range(20): # 好比抓取20個網站信息
            self.sem.acquire()
            html_thread = HtmlSpider("http://baidu.com/{}".format(i),self.sem)
            html_thread.start()

if __name__ == '__main__':
    sem = threading.Semaphore(3)
    url_producer = UrlProducer(sem)
    url_producer.start()

 

   其實semaphore內部是調用了一個condition。咱們注意semaphore也是必須有acquire方法和release方法。框架

  另外,咱們發現Queue內部也是調用了不少condition的方法。ide

 

問:前面介紹了不少同步的方法,其餘一些軟件都有池的概念,Python也具有嗎?函數

答:固然,Python有兩個池子,一個叫線程池,一個叫進程池,後續咱們講到進程的時候回搠進程池。如今先說線程池。網站

  線程池就是concurrent模塊包,是在Python3.2時候引入的。這個池是很是頂層的,對於咱們進行線程和進程池編碼是非好的。並且接口會高度的一致。ui

  一個問題:爲何要有線程池?編碼

  目的很簡單:就是很是容易的管理線程,線程池本身去調度新的線程去使用,線程池過大的時候會阻塞,知道最新的線程空出來。它不只僅起到了數量控制,若是咱們在主線程當中能夠獲取某一個線程的狀態,或者某一個任務的狀態,或者返回值,這樣就會變得很是簡單。另外,當一個線程完成的時候咱們主線程就會立馬知道。futures可讓多線程和多進程編碼接口一直。

from concurrent.futures import ThreadPoolExecutor
import time


def get_html(times):
    time.sleep(times)
    print("get page {} success".format(times))
    return times


executor = ThreadPoolExecutor(max_workers=2)
# 經過submit函數提交執行的函數到線程池中,submit是當即返回
task1 = executor.submit(get_html,(3))
task2 = executor.submit(get_html,(2))

# done方法用於斷定某我的物是否完成
print(task1.done()) # 斷定咱們的函數是否執行成功的
print(task2.cancel()) # 如我咱們執行的狀態是執行中是cancel不了的
time.sleep(3)
print(task1.done()) # 斷定咱們的函數是否執行成功的

# result方法能夠獲取task的執行結果
print(task1.result())

  運行結果:

False
False
get page 2 success
get page 3 success
True
3

 

  這裏面咱們用到了ThreadPoolExecutor的類,其中規定了運行的線程數量。

  done()方法:斷定咱們的函數是否執行成功

  result()方法:返回函數是否成功執行

  cancel()方法:取消一個線程任務(可是若是咱們的任務是在執行中,是沒法cancel掉的)

 

  另外,咱們在想一下,咱們想批量的進行提交而且知道提交是否成功怎麼寫。

  這個時候咱們須要導入as_completed的模塊,as_completed是一個生成器(咱們知道生成器最好的方式就用for循環提取出來),咱們再用比較高端的推導式的方式進行提交。

from concurrent.futures import ThreadPoolExecutor,as_completed
import time

def get_html(times):
    time.sleep(times)
    print("get page {} success".format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)
urls = [3,2,4]
all_task = [executor.submit(get_html,(url)) for url in urls]
for future in as_completed(all_task):
    data = future.result()  #
    print("get {} page success".format(data))

 

  另外,咱們還能夠經過executor自己的map方法來完成task

# 經過executor獲取已經完成的task
for data in executor.map(get_html,urls):
    print("get {} page success".format(data))
    
# get page 2 success
# get page 3 success
# get 3 page success
# get 2 page success
# get page 4 success
# get 4 page success

  可是,略有有點兒差異,上面是完成一個打印一個。

 

  再加一個wait等待。這個命令其實也是很是經常使用並且也是很是好的模塊。wait模塊是等待某一個函數結束再執行下面的內容。另外wait模塊有有個一傳參return_when=後面有四種方式:

  FIRST_COMPLETED 當地一個執行完畢

  FIRST_EXCEPTION 

  ALL_COMPLETED

  _AS_COMPLETED

executor = ThreadPoolExecutor(max_workers=2)
urls = [3,2,4]
all_task = [executor.submit(get_html,(url)) for url in urls]
wait(all_task,return_when='FIRST_EXCEPTION')
print("main over")

 

  

  小結一下:

  * 這樣關於線程池,咱們知道最經常使用的三個模塊:ThreadPoolExecutor, as_completed(注意是一個迭代器), wait。其中方法有submit,result,cancel,done等方法。wait也是能夠傳遞參數的。

  * concurrent.futures 中的Future對象咱們通常叫作將來對象,但實際上呢,更形象的說叫task返回容器,task執行結果都會放入裏面。

 

問:線程的內容真是很多,功能也是很多,可是仍是挺有規律的。

答:其實線程這塊兒,還有幾個內容,都很是簡單,講解完畢咱們最線程進行總結,而後進入進程方面的講解。

  補充1(threadLocal模塊):咱們發現若是兩個線程同時操做一個函數的時候,會形成函數中的變量混亂的狀況。咱們能夠經過threadLocal的方法,也叫作線程特定數據。給每個線程單獨去分配一個本地變量能夠防止這個問題:代碼以下。

import threading

num = 0
local = threading.local()

def run(x,n):
    x = x + n
    x = x - n

def func(n):
    local.value = num
    for i in range(1000000):
        run(local.value,n)
    print("%s--%d" %(threading.current_thread().getName(),local.value))

if __name__ == '__main__':
    t1 = threading.Thread(target=func,args=(6,))
    t2 = threading.Thread(target=func,args=(9,))

    t1.start()
    t2.start()
    t1.join()
    t2.join()
    # 
    # Thread - 1 - -0
    # Thread - 2 - -0

 

  

  補充2(barrier模塊):這個單詞是障礙的意思,也就是說像是一個「班車」湊夠了多少個「人」才發車。這裏就是湊夠了多少個線程再進行線程計算,不過這個方法有一個維內託,若是數量不夠時候,會一直停在那裏等待線程。這種方法平時用的也不是不少。它的方法也是wait,代碼以下:

import threading,time

bar = threading.Barrier(4)

def run():
    print("{} -- start".format(threading.current_thread().getName()))
    time.sleep(1)
    bar.wait()
    print("{} -- end".format(threading.current_thread().getName()))

if __name__ == '__main__':
    for i in range(6):
        threading.Thread(target=run).start()

   咱們發現:分配6個線程,其實給的是4個,線程6個併發了以後,等待2個結束併發,一直等不到,就停在那裏了

 

  補充3(Timer模塊):這個模塊很好理解,就是控制線程併發的事件,這是一個定時器,這個定時的事件結束的時候再去開啓。

import threading


def run():
    print("Thomas is running")

t = threading.Timer(5,run)

print("父線程開始......")
t.start()
t.join()
print("父線程結束......")

 

  

  補充4(Event模塊):這個模塊很是簡單,咱們使用手工的方式進行線程之間上鎖解鎖的方式進行通信,咱們能夠調用線程的事件(由於線程行動自己就是一個事件),讓上一個線程事件等待時間觸發。和Condition模塊很是的相似。

import threading,time


def func():
    event = threading.Event()
    def run():
        for i in range(5):
            event.wait()
            event.clear()
            print("Thomas is running")
    threading.Thread(target=run).start()

    return event

e = func()
for i in range(5):
    e.set()
    time.sleep(2)

  分析代碼咱們能夠看出.wait是阻塞等待時間的觸發。clear是重置的意思。set是設定的內容。

 

  補充5(enumerate模塊):略

 

  總結:如今咱們能夠對Python的進程進行一下總結了。

  第一:進程是運行程序最小的操做單元。在IO操做的時候會常常用到。

  第二:Python自己具有GIL(全局解釋器鎖),因此在CPython的解釋下,一個線程放入一個CPU下,在諸如PyPy的Python解釋器下,就是一種去GIL話的解釋器。

  第三:進程在上面的框架解釋下,是線程交替來進行多線程操做的,系統沒法自動的調配多核。

  第四:因爲線程自己設計的緣由,線程在運行程序後會按照自有的規則釋放空間,因爲這個釋放空間的時間很是短暫,形成程序和程序,數據和數據之間可能產生混亂的狀況。所以咱們引入了鎖、event、condition等方式進行控制。

  第五:線程有一些概念是成對出現的,正是因爲第四條的狀況。好比守護和阻塞(daemon和join),wait和clear(event事件),wait和notify(Condition條件),done和wait等。

  第六:線程分主線程和子線程這麼一說,wait這個模塊實際上是屬於小而精的一種阻塞操做方式,另外咱們還能夠用with語句來簡化代碼,用推導式直接進行推送任務到進程中。

  第七:平時咱們也經常使用線程池來讓Python自動推送任務到線程當中,submit就是這個動做。

  第八:諸如像ThreadLocal,Event,Timer,semaphore,barrier等小技巧也須要了解。

相關文章
相關標籤/搜索