''' python提供了一些複雜的工具用於管理使用進程和線程的併發操做。 經過應用這些計數,使用這些模塊併發地運行做業的各個部分,即使是一些至關簡單的程序也能夠更快的運行 subprocess提供了一個API能夠建立子進程並與之通訊 這對於運行生產或消費文本的程序尤爲有好處,由於這個API支持經過新進程的標準輸入和輸出通道來回傳遞數據。 signal模塊提供了unix信號機制,能夠向其餘進程發送事件。信號會被異步處理,一般信號到來時要中斷程序正在作的工做。 信號做爲一個粗粒度的消息系統會頗有用,不過其餘進程內通訊技術更可靠,並且能夠傳遞更復雜的消息。 threading模塊包塊一個面向對象的高層API,用於處理python的併發性。Thread對象在同一個進程中併發地運行,並共享內存。 對於I/O受限而不是CPU受限的任務來講,使用線程是這些任務實現縮放的一種簡單方法。 miltiprocessing模塊是threading的鏡像,只是它提供了一個Process而非一個Thread類。 每一個Process都是真正的系統進程(而無共享內存),multiprocessing提供了一些特性能夠共享數據並傳遞消息,使得不少狀況下從線程轉化爲進程很簡單,只須要修改幾個import語句。 asyncio使用一個基於類的協議系統或協程爲併發和異步I/O管理提供了一個框架。 asyncio替換了原來的asyncore和asynchat模塊,這些模塊仍可用,但已經廢棄。 concurrent.futures提供了基於線程和進程的執行器實現,用來管理資源池以運行併發的任務 '''
''' subprocess模塊提供了3個API來處理進程。 run函數時python3.5中新增的,做爲一個高層API,其用於運行進程並收集它的輸出。函數call、check_call、check_output是從python2沿襲來的原高層API 這些函數仍受到支持,並在現有的程序中普遍使用。 類Popen是一個用於創建其餘API的底層API,對更復雜的進程交互頗有用。Popen的構造函數利用參數創建新進程,使父進程能夠經過管道與之通訊。 它能夠替換一些其餘模塊和函數,並能提供所替換的這些模塊和函數的所有功能,甚至還更多。 在全部狀況下,這個API的用法是一致的,不少開銷的額外步驟(如關閉額外的文件描述符,以及確保管道關閉)都已經內置在這個API中,不須要由應用代碼單獨設置。 subprocess模塊是爲了替換os.system、os.spawnv,os和peopen2模塊中不一樣形式的popen函數,以及commands模塊。 '''
import subprocess ''' subprocess的命令蠻多的,這裏我只想介紹一個。 用好這一個,包治百病 ''' # 第一個是要執行的shell命令,下面幾個固定不變 result = subprocess.Popen("xxxx", stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) # 而後全部的結果都在result.stdout裏面 # 讀取出來進行解碼便可,記住是操做系統的默認編碼 print(result.stdout.read().decode("gbk")) ''' 2019/03/22 20:05 <DIR> . 2019/03/22 20:05 <DIR> .. 2019/02/27 15:54 <DIR> 1.文本 2019/03/25 17:13 <DIR> 10.使用進程線程協程提供併發性 2019/03/05 14:15 <DIR> 2.數據結構 2019/03/08 15:33 <DIR> 3.算法 2019/03/19 10:20 <DIR> 4.日期和時間 2019/03/19 17:16 <DIR> 5.數學運算 2019/03/21 13:54 <DIR> 6.文件系統 2019/03/21 16:53 <DIR> 7.數據持久存儲與交換 2019/03/22 19:13 <DIR> 8.數據壓縮與歸檔 2019/03/22 19:54 <DIR> 9.加密 2019/03/22 19:07 809 test.py 1 個文件 809 字節 12 個目錄 59,315,806,208 可用字節 ''' # 若是命令不存在 ''' 'xxxx' 不是內部或外部命令,也不是可運行的程序 或批處理文件。 '''
''' 信號是一個操做系統特性,它提供了一個途徑能夠通知程序這裏發生了一個事件,而且異步處理這個事件。 信號能夠由系統自己生成,也能夠從一個進程發送到另外一個進程。 因爲信號會中斷程序的正常控制流,若是在操做過程當中間接收到信號,有些操做(特別是I/O操做)可能會產生錯誤 信號由整數表示,在操做系統C首部中定義。python在signal模塊中提供了適合不一樣平臺的多種信號。 '''
passpython
''' threading模塊提供了管理多個線程執行的API,容許程序哎同一個進程空間併發地執行多個操做 '''
import threading ''' 要使用Thread,最簡單的方法就是用一個目標函數實例化一個Thread對象,並調用start方法讓它工做 ''' def worker(name, age): print(f"my name is {name}, age is {age}") for i in range(5): # 建立Thread對象 # 接收兩個參數,target,使咱們要執行的函數。args是函數裏面須要的參數,要以元組的方式傳進去,即使只有一個參數,也要以元組的方式傳進去,(xxx, ) t = threading.Thread(target=worker, args=(f"name{i}", f"age{i}")) # 調用start方法,啓動 t.start() ''' my name is name0, age is age0 my name is name1, age is age1 my name is name2, age is age2 my name is name3, age is age3 my name is name4, age is age4 '''
import threading ''' 使用參數來標識或者命名線程很麻煩,也沒有必要。 每一個Thread實例都帶有一個默認地名字,該默認值能夠在建立線程時改變。 若是服務器進程中有多個服務器線程處理不一樣的操做,那麼在這樣的服務器進程中,對線程命名就頗有用。 ''' def worker1(): print(threading.current_thread(), "----", threading.current_thread().getName()) # 除了剛纔說的那兩個參數以外,還能夠有第三個參數,就是給線程指定的名字 t1 = threading.Thread(target=worker1, name="work1") # 這裏咱們不指定名字,因爲函數不須要參數,因此args也不須要傳 t2 = threading.Thread(target=worker1) t1.start() t2.start() ''' <Thread(work1, started 6844)> ---- work1 <Thread(Thread-1, started 6888)> ---- Thread-1 '''
import threading import logging import time def worker1(): logging.debug("start") time.sleep(0.5) logging.debug("end") logging.basicConfig( level=logging.DEBUG, format="[%(levelname)s] (%(threadName)s -10s) %(message)s" ) t1 = threading.Thread(target=worker1, name="work1") t2 = threading.Thread(target=worker1) t1.start() t2.start() ''' [DEBUG] (work1 -10s) start [DEBUG] (Thread-1 -10s) start [DEBUG] (work1 -10s) end [DEBUG] (Thread-1 -10s) end ''' # logging是線程安全的,因此來自不一樣的線程的消息在輸出中會有所區分
import threading import time ''' 當咱們開啓一個單獨的線程以後,主線程是會繼續往下走的。若是子線程沒有處理完,那麼主線程會卡在最後等待着,直到全部線程處理完畢。 若是我想某些子線程要在主線程結束的時候也跟着結束,不會出現本身還沒結束而讓主線程等着的狀況,該怎麼辦呢?那麼能夠將這些子線程設置爲守護線程便可。 ''' def daemon(t): print(f"線程{t1}正在睡眠····") time.sleep(t) print(f"線程{t1}睡眠結束····") # 加上daemon=True,表示將這個線程設置爲守護線程 t1 = threading.Thread(target=daemon, args=(3, ), name="t1", daemon=True) ''' 固然設置守護線程還能夠這麼設置 t1.setDaemon(True) 同理設置線程名字也是同樣 t1.setName("t1") ''' # 當咱們這樣執行的時候,主線程開啓一個name="t1"的線程以後,就無論了,而後往下執行 # 然而下面沒有代碼了,因此就結束了。當子線程不是守護線程,那麼主線程在最後會等待住,當子線程執行完畢以後才結束 # 可是當咱們設置爲守護線程時,那麼意義就不同了,守護線程是守護主線程的,主線程結束,那麼守護線程也會自殺。 t1.start() ''' 輸出結果: 線程<Thread(t1, started daemon 8012)>正在睡眠···· '''
import threading import time def daemon(t): print(f"線程{threading.current_thread().getName()}正在睡眠····") time.sleep(t) print(f"線程{threading.current_thread().getName()}睡眠結束····") t1 = threading.Thread(target=daemon, args=(3, ), name="t1", daemon=True) t2 = threading.Thread(target=daemon, args=(4, ), name="t2") t1.start() t2.start() ''' 線程t1正在睡眠···· 線程t2正在睡眠···· 線程t1睡眠結束···· 線程t2睡眠結束···· ''' # 首先t1是守護線程,可是t2不是。主線程雖然不須要等待t1,可是須要等待t2. # t2沉睡4秒,t1沉睡3秒,主要在等待t2的4s中,守護線程已經執行結束,因此會打印4條 # 若是守護線程t1 sleep 4秒,非守護線程t2 sleep 3秒的話,那麼非守護線程先結束,主線程也就不須要再等待了。 # 所以打印的結果會變成 ''' 線程t1正在睡眠···· 線程t2正在睡眠···· 線程t2睡眠結束···· '''
import threading import time def daemon(t): print(f"線程{threading.current_thread().getName()}正在睡眠····") time.sleep(t) start_time = time.time() t = threading.Thread(target=daemon, args=(3, )) t.start() end_time = time.time() print("總耗時:", end_time-start_time) ''' 線程Thread-1正在睡眠····總耗時: 0.0010001659393310547 ''' # 咱們把函數daemon裏面time.sleep(t)想象成耗時t秒的操做,可是以前說了,主線程只是開啓了一個線程就無論了 # 所以最後的時間確定是不許的,由於任務還沒執行完畢就往下走了 # 若是咱們想當任務執行完成以後,再讓主線程往下走呢?
import threading import time def daemon(t): print(f"線程{threading.current_thread().getName()}正在睡眠····") time.sleep(t) start_time = time.time() t = threading.Thread(target=daemon, args=(3, )) t.start() # 可使用join方法,這個方法會使得主線程卡在這個地方,直到使用join方法的Tread對象執行完畢以後纔會往下走 # 固然join裏面還能夠加上參數,傳入2,表示最多等待2s。2s以內,何時執行結束何時往下走,若是2s內沒執行完畢,那麼無論了,直接往下走 t.join() end_time = time.time() print("總耗時:", end_time-start_time) ''' 線程Thread-1正在睡眠···· 總耗時: 3.001171827316284 '''
import threading import time def daemon(t): print(f"線程{threading.current_thread().getName()}正在睡眠····") time.sleep(t) start_time = time.time() t = threading.Thread(target=daemon, args=(3, )) t.start() print(f"線程是否活着:{t.is_alive()}") # 線程是否活着:True # 可使用join方法,這個方法會使得主線程卡在這個地方,直到使用join方法的Tread對象執行完畢以後纔會往下走 # 固然join裏面還能夠加上參數,傳入2,表示最多等待2s。2s以內,何時執行結束何時往下走,若是2s內沒執行完畢,那麼無論了,直接往下走 t.join() end_time = time.time() print(f"線程是否活着:{t.is_alive()}") # 線程是否活着:False print("總耗時:", end_time-start_time) ''' 線程Thread-1正在睡眠···· 總耗時: 3.001171827316284 ''' # 調用Thead對象下的is_alive能夠判斷該線程是否還活着,當join以後,主線程都往下走了,因此確定死了
import threading import time import random ''' 沒有必要爲全部守護線程維護一個顯式句柄來確保它們在退出主進程以前已經完成。 enumerate會返回Thread實例的一個列表。 ''' def worker(t): print(f"我要sleep{t}秒") time.sleep(t) for i in range(3): t = threading.Thread(target=worker, args=(random.randint(1, 5), )) t.start() # 我要讓全部子線程都執行完畢以後,才繼續往下走 ''' 一種方法是先建立一個列表, list_t = [], 而後 for i in range(3): t = threading.Thread(target=worker, args=(random.randint(1, 5), )) list_t.append(t) t.start() for i in list_t: i.join() 這是一種方法,我在啓動的時候把全部Thread對象都放到一個列表裏面,而後循環列表,把全部的Thread對象都join住,這樣就完成當全部子線程都結束時,主線程才往下走的需求 ''' # 可是還有更好的方法 # threading.enumerate方法能夠查看全部的線程 for t in threading.enumerate(): print(t.name) ''' MainThread Thread-1 Thread-2 Thread-3 ''' # 能夠看到,這個方法可以拿到全部的線程 # 可是它把主線程也拿出來了 # 可是咱們能夠單獨獲取主線程 main_thread = threading.main_thread() print(main_thread.name) # MainThread for t in threading.enumerate(): if t is main_thread: continue t.join() # 這樣也能實現等待子線程的功能,並且更加的pythonic print("我必須是最後一個打印的,由於全部的子線程全都join了,我必須等待它們都執行結束,我才能走到這一步")
import threading ''' 咱們除了定義一個函數以外,也能夠定義一個類 ''' # 必需要繼承Thread方法 class MyThread(threading.Thread): def __init__(self, name, info): self.info = info # name是咱們的線程名稱,所以交給父類的init方法執行,可是info是咱們本身的參數就不須要了 super().__init__(name=name) # 而後重寫父類的run方法,當咱們調用start時,就會執行run方法 def run(self): print(f"當前線程:{self.name}, info:{self.info}") for i in range(3): t = MyThread(name=f"t{i}", info=f"這是線程t{i}") t.start() ''' 當前線程:t0, info:這是線程t0 當前線程:t1, info:這是線程t1 當前線程:t2, info:這是線程t2 '''
import threading import time ''' 有時出於某種緣由須要派生Thread,Timer就是這樣一個例子,Timer也包含在threading中。 Timer在一個延遲以後開始工做,並且能夠在這個延遲期間內的任意時刻被取消 ''' def delayed(): print(f"{threading.current_thread().name}:work running") time.sleep(4) print(f"{threading.current_thread().name}:work done") # 2s以後執行delayed函數 t1 = threading.Timer(2, delayed) # 3s以後執行delayed函數 t2 = threading.Timer(3, delayed) t1.setName("t1") t2.setName("t2") t1.start() t2.start() # 主線程繼續往下走 # 將t2取消,調用cancel方法 t2.cancel() print("主線程走到頭啦") ''' 主線程走到頭啦 t1:work running t1:work done ''' # t2都還沒來得及打印,就被取消了
import threading import time ''' 儘管使用多線程的目的是併發地運行單獨的操做,但有時也須要在兩個或多個線程中同步操做。 事件對象是實現線程安全通訊的一種簡單方法。 Event管理一個內部標誌,調用者能夠用set和clear方法控制這個標誌。其餘線程可使用wait暫停,直到這個標誌被設置,能夠有效地阻塞進程直至容許這些線程繼續。 ''' def go1(event: threading.Event): print(f"{threading.current_thread().name}前進啦, 三秒以後碰見紅燈要停下") time.sleep(3) print(f"{threading.current_thread().name}停下啦,其餘人能夠走了") event.set() def go2(event: threading.Event): print(f"{threading.current_thread().name}遇到紅燈啦,先不能走") event.wait() print(f"{threading.current_thread().name}前進啦") e = threading.Event() t1 = threading.Thread(target=go1, args=(e, )) t2 = threading.Thread(target=go2, args=(e, )) t1.start() t2.start() ''' Thread-1前進啦, 三秒以後碰見紅燈要停下 Thread-2遇到紅燈啦,先不能走 Thread-1停下啦,其餘人能夠走了 Thread-2前進啦 ''' # event.wait方法會阻塞,直到調用set方法設置標誌位 # 因此go1先執行,go2打印第一句話以後會卡住,當go1執行完畢以後,調用set方法以後go2能夠繼續執行. # 注意:一旦set以後再調用wait是沒有用的,因此go1set以後,go2的wait就失去效果了 # 若是想要使wait生效,那麼必須clear,將標誌位清空,而後纔可使用wait # 也能夠調用is_set方法來查看是否設置了標誌位
import threading import time ''' 除了同步線程操做,還有一點很重要,要可以控制對共享資源的訪問,從而避免破壞或丟失數據。 python的內置數據結構(列表、字典等)是線程安全的,這是python使用原子字節碼來管理這些數據結構的一個反作用(更新過程當中不會釋放保護python內部數據結構的全局解釋器鎖)。 python中實現的其餘數據結構或更簡單的類型(如整數和浮點數)則沒有這個保護。 所以要保證同時安全地訪問一個對象,可使用一個Lock對象 ''' num = 0 def add(): global num for _ in range(1000000): num += 1 def sub(): global num for _ in range(1000000): num -= 1 t1 = threading.Thread(target=add) t2 = threading.Thread(target=sub) t1.start() t2.start() t1.join() t2.join() print(num) # 執行三次 ''' 428530 73280 -435653 ''' # 爲何會出現這種結果,每次還都不同。 # 由於python的多線程不是真正意義上的多線程,沒法在同時利用多個核,而是時間片輪轉。 # 有可能在num += 1的時候,好比此時num=100,原本應該得101,可是還沒執行完,線程讓出,執行了num-=1,num變成99了。 # 再執行num+=1操做獲得的仍是100,這就致使了最後的結果確定是不對的
import threading ''' 怎麼改變這一局面呢?可使用加鎖的方式 ''' num = 0 lock = threading.Lock() def add(): global num for _ in range(1000000): # 加上鎖 lock.acquire() num += 1 # 執行完畢以後釋放 lock.release() def sub(): global num for _ in range(1000000): lock.acquire() num -= 1 lock.release() t1 = threading.Thread(target=add) t2 = threading.Thread(target=sub) t1.start() t2.start() t1.join() t2.join() print(num) # 0 ''' 這樣的話,無論執行多少次,結果都是0 由於咱們在對num進行相加和相減的時候,加上了鎖,這個過程是不會被打斷的。只有運算完畢將鎖釋放以後纔會切換 '''
pass算法
passshell
''' asyncio模塊提供了使用協程構建併發應用的工具。 threading模塊經過應用線程實現併發,multiprocessing使用系統進程實現併發,asyncio使用一種單線程、單進程模式實現併發,應用的各個部分會彼此合做,在最優的時刻顯式的切換任務。 大多數狀況下,會在程序阻塞等待讀寫數據時發生這種上下文切換,不過asyncio也支持調度代碼在未來的某個特定時間運行,從而支持一個協程等待另外一個協程完成,以處理系統信號和識別其餘一些事件(這些事件可能致使應用改變其工做內容) '''
''' 使用其餘併發模型的大多數程序都採用線性方式編寫,並且依賴於語言運行時系統或操做系統的底層線程或進程管理來適當地改變上下文。 基於asyncio的應用要求應用代碼顯式地處理上下文切換,要正確地使用相關技術,這取決因而否能正確理解一些相關聯的概念。 asyncio提供的框架以一個事件循環(event loop)爲中心,這是一個首類對象,負責高效地處理I/O事件、系統事件、和應用上下文切換。 目前已經提供了多個循環實現來高效地利用操做系統的功能。儘管一般會自動選擇一個合理的默認實現,但也徹底能夠在應用中選擇某個特定的事件循環實現。 在不少狀況下都頗有用,例如:在Windows下,一些循環類增長了對外部進程的支持,這可能會以犧牲一些網絡I/O效率爲代價 與事件循環交互的應用要顯式地註冊將運行的代碼,讓事件循環在資源可用時嚮應用代碼發出必要的調用。 例如:一個網絡服務器打開套接字,而後註冊爲當這些套接字上出現輸入事件時服務器要獲得的通知。 事件循環在創建一個新的進入連接或者在數據可讀取時都會提醒服務器代碼。當前上下文中沒有更多工做可作時,應用代碼要再次短期地交出控制權。 例如:若是一個套接字沒有更多的數據能夠接收,那麼服務器會把控制權交給事件循環 因此,就是把代碼註冊到事件循環中,不斷地循環這些事件,能夠處理了那麼就去處理,若是卡住了,那麼把控制權交給事件循環,繼續執行其餘可執行的任務。 像傳統的twisted、gevent、以tornado,都是採用了事件循環的方式,這種模式只適用於高I/O,低CPU的場景,一旦出現了耗時的複雜運算,那麼全部任務都會被卡住。 將控制權交給事件循環的機制依賴於協程(coroutine),這是一些特殊的函數,能夠將控制返回給調用者而不丟失其狀態。 協程與生成器很是相似,實際上,在python3.5版本以前還未對協程提供原生支持時,能夠用生成器來實現協程。 asyncio還爲協議(protocol)和傳輸(transport)提供了一個基於類的抽象層,可使用回調編寫代碼而不是直接編寫協程。 在基於類的模型和協程模型時,能夠經過從新進入事件循環顯式地改變上下文,以取代python多線程實現中隱式的上線文改變 future是一個數據結構,表示還未完成的工做結果。事件循環能夠監視Future對象是否完成,從而容許應用的一部分等待另外一部分完成一些工做。 處理future,asyncio還包括其餘併發原語,如鎖和信號量。 Task是Future的一個子類,它知道如何包裝和管理一個協程的執行。 任務所須要的資源可用時,事件循環會調度任務運行,並生成一個結果,從而能夠由其餘協程消費。 '''
import asyncio ''' 協程是一個專門設計用來實現併發操做的語言構造。 調用協程函數時會建立一個協程對象,而後調用者使用協程的send方法運行這個函數的代碼。協程可使用await關鍵字(並提供另外一個協程)暫停執行。 暫停時,這個協程的狀態會保留,使得下一次被喚醒時能夠從暫停的地方恢復執行 ''' # 使用async def能夠直接定義一個協程 async def coroutine(): print("in coroutine") # 建立事件循環 loop = asyncio.get_event_loop() try: print("start coroutine") # 協程是沒法直接運行的,必需要扔到事件循環裏,讓事件循環驅動運行 coro = coroutine() print("entering event loop") # 必須扔到事件循環裏,這個方法的含義從名字也能看出來,直到協程運行完成 loop.run_until_complete(coro) finally: print("closing event loop") # 關閉事件循環 loop.close() ''' start coroutine entering event loop in coroutine closing event loop ''' # 第一步是獲得事件循環的引用。 # 可使用默認地循環類型,也能夠實例化一個特定的循環類。 # run_until_complete方法啓動協程,協程退出時這個方法會中止循環
import asyncio ''' 咱們也能夠獲取協程的返回值 ''' async def coroutine(): print("in coroutine") return "result" loop = asyncio.get_event_loop() try: coro = coroutine() result = loop.run_until_complete(coro) print(result) finally: loop.close() ''' in coroutine result ''' # 在這裏,run_until_complete還會返回它等待的協程的結果
import asyncio ''' 一個協程還能夠驅動另外一個協程並等待結果,從而能夠更容易地將一個任務分解爲可重用的部分。 ''' async def worker(): print("worker....") # 使用await方法會驅動協程consumer執行,並獲得其返回值 res = await consumer() print(res) async def consumer(): return "i am consumer" loop = asyncio.get_event_loop() try: loop.run_until_complete(worker()) finally: loop.close() ''' worker.... i am consumer ''' # 在這裏,使用await關鍵字,而不是向循環中增長新的協程。由於控制流已經在循環管理的一個協程中,因此不必告訴循環管理這些協程。 # 另外,協程能夠併發運行,但前提是多個協程。這個協程卡住了,能夠切換到另外一個協程。可是就卡住的協程自己來講,該卡多長時間仍是多長時間,不可能說跳過卡住的部分執行下面的代碼。
import asyncio ''' 協程函數時asyncio設計中的關鍵部分。 它們提供了一個語言構造,能夠中止程序某一部分的執行,保留這個調用的狀態,並在之後從新進入這個狀態,這些動做都是併發框架很重要的功能。 python3.5中引入了一些新的語言特性,可使用async def以原生方式定義這些協程,以及使用await交出控制,asyncio的例子應用了這些新特性。 可是早期版本,可使用asyncio.coroutine裝飾器將函數裝飾成一個協程並使用yield from來達到一樣的效果。 ''' @asyncio.coroutine def worker(): print("worker....") res = yield from consumer() print(res) @asyncio.coroutine def consumer(): return "i am consumer" loop = asyncio.get_event_loop() try: loop.run_until_complete(worker()) finally: loop.close() ''' worker.... i am consumer ''' # 儘管使用生成器能夠達到一樣的效果,但仍是推薦使用async和await ''' 生成器既能夠作生成器,又能夠包裝爲協程,那麼它究竟是協程仍是生成器呢?這會使得代碼出現混亂 生成器應該作本身 基於async的原生協程比使用yield裝飾器的協程要快,大概快10-20% '''
import asyncio from functools import partial ''' 除了管理協程和I/P回調,asyncio事件循環還能夠根據循環中保存的一個定時器值來調度常規函數調用。 ''' # 若是回調的時間不重要,那麼可使用call_soon調度下一次循環迭代的調用 def callback(*args, **kwargs): print("callback:", args, kwargs) async def main(loop): print("register callback") # 接收一個回調函數,和參數 loop.call_soon(callback, "mashiro", 16) print("********") # 若是是關鍵字參數是不能直接傳的,須要使用偏函數轉換一下 wrapped = partial(callback, kwargs={"name": "satori", "age": 16}) loop.call_soon(wrapped, "mahsiro", 16) print("—————————") await asyncio.sleep(0.6) event_loop = asyncio.get_event_loop() try: print("entering event loop") event_loop.run_until_complete(main(event_loop)) finally: print("closing event loop") event_loop.close() ''' entering event loop register callback ******** ————————— callback: ('mashiro', 16) {} callback: ('mahsiro', 16) {'kwargs': {'name': 'satori', 'age': 16}} closing event loop ''' # 能夠看到,call_soon調用callback是最後執行的
import asyncio from functools import partial ''' 要將回調推遲到未來的某個時間調用,可使用call_later。這個方法的第一個參數是延遲時間(單位爲秒),第二個參數是回調。 ''' def callback(cb, n): print(f"{cb} {n}") async def main(loop): print("register callback") loop.call_later(0.2, callback, "call_later", "0.2s") loop.call_later(0.1, callback, "call_later", "0.1s") loop.call_soon(callback, "call_soon", 3) print("-----------") await asyncio.sleep(0.6) event_loop = asyncio.get_event_loop() try: print("entering event loop") event_loop.run_until_complete(main(event_loop)) finally: print("closing event loop") event_loop.close() ''' entering event loop register callback ----------- call_soon 3 call_later 0.1s call_later 0.2s closing event loop ''' # 能夠看到,call_soon調用callback的延遲是最小的,當咱們碰見了asyncio.sleep的時候,自動切換,瞬間執行。
import asyncio import time ''' 除了call_soon瞬間執行,和call_later延遲執行以外,還有一個call_at在指定之間內執行。 實現這個目的的循環依賴於一個單調時鐘,而不是牆上的時鐘時間,以確保now時間絕對不會逆轉。 要爲一個調度回調選擇時間,必須使用循環的time方法從這個時鐘的內部開始 ''' def callback(cb, loop): print(f"callback {cb} invoked at {loop.time()}") async def main(loop): now = loop.time() print("clock time:", time.time()) print("loop time:", now) print("register callback") loop.call_at(now+0.2, callback, "call_at", loop) loop.call_at(now+0.1, callback, "call_at", loop) loop.call_soon(callback, "call_soon", loop) sum = 0 for i in range(99999999): sum += i print("sum =", sum) await asyncio.sleep(1) event_loop = asyncio.get_event_loop() try: print("entering event loop") event_loop.run_until_complete(main(event_loop)) finally: print("closing event loop") event_loop.close() ''' entering event loop clock time: 1553659076.2547848 loop time: 7238.493 register callback sum = 4999999850000001 callback call_soon invoked at 7246.012 callback call_at invoked at 7246.012 callback call_at invoked at 7246.012 closing event loop ''' # call_soon會馬上調用,可是若是下面還有代碼的話會繼續執行,當碰見asyncio.sleep的時候,自動切換執行,若是沒有或者沒有阻塞,那麼會最後執行。 # call_at原本是在0.2s以後執行的,可是當中出現了複雜的運算,因此計算時間日後推遲了
import asyncio import time ''' Future表示還未完成的工做的結果。事件循環能夠經過監視一個Future對象的狀態來指示它已經完成,從而容許應用的一部分等待另外一部分完成一些工做。 ''' # Future的作法相似於協程,因此等待協程所用的技術一樣能夠用於等待Future。 def mark_done(future, result): print("setting result") future.set_result(result) event_loop = asyncio.get_event_loop() try: all_done = asyncio.Future() print("sceduling mark_done") event_loop.call_soon(mark_done, all_done, "the result") print("entering loop") event_loop.run_until_complete(all_done) finally: print("close loop") event_loop.close() print("future result:", all_done.result()) ''' sceduling mark_done entering loop setting result close loop future result: the result ''' # 調用set_result時,Future的狀態改成完成,Future實例會保留提供給方法的結果,以備後續獲取 future = asyncio.Future() # 設置只能設置一次 future.set_result("xxx") # 可是取能夠取屢次 print(future.result()) # xxx print(future.result()) # xxx print(future.result()) # xxx print(future.result()) # xxx print(future.result()) # xxx
import asyncio import time ''' Future還能夠結合await關鍵字使用 ''' def mark_done(future, result): print("setting result") future.set_result(result) async def main(loop): all_done = asyncio.Future() print("scheduling mark_done") loop.call_soon(mark_done, all_done, "the result") # 會等到all_done這個Future對象裏面有值位置 res = await all_done print("res =", res) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close() ''' scheduling mark_done setting result res = the result ''' # Future的結果由await返回,因此常常會讓一樣的代碼處理一個常規的協程和一個Future實例
import asyncio import functools ''' 除了作法與協程相似,Future也能夠調用回調,回調的順序按照其註冊的順序調用 ''' def callback(future, n): print(f"future result: {future.result()} n:{n}") async def register_callback(all_done): print("register callback on futures") all_done.add_done_callback(functools.partial(callback, n=1)) all_done.add_done_callback(functools.partial(callback, n=2)) async def main(all_done): await register_callback(all_done) print("setting result of future") all_done.set_result("the result") event_loop = asyncio.get_event_loop() try: all_done = asyncio.Future() event_loop.run_until_complete(main(all_done)) finally: event_loop.close() ''' register callback on futures setting result of future future result: the result n:1 future result: the result n:2 '''
import asyncio ''' 任務是與事件循環交互的主要途徑之一。 任務能夠包裝協程,並跟蹤協程什麼時候完成。 因爲任務是Future的子類,因此其餘協程能夠等待任務,並且每一個任務能夠有一個結果,在它完成時能夠獲取這些結果 ''' # 啓動一個任務,可使用create_task函數建立一個Task實例。 # 只要循環還在運行並且協程沒有返回,create_task獲得的任務便會做爲事件循環管理的併發操做的一部分運行 async def task_func(): print("in task func") return "the result" async def main(loop): print("creating task") task = loop.create_task(task_func()) print(f"wait for {task}") return_value = await task print(f"task completed {task}") print(f"return value {return_value}") event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close() ''' creating task wait for <Task pending coro=<task_func() running at 5.asyncio.py:13>> in task func task completed <Task finished coro=<task_func() done, defined at 5.asyncio.py:13> result='the result'> return value the result ''' # 一開始task是pending狀態,而後執行結束變成了done # 這個Task是Futrue的子類,await task獲得的就是任務task的返回值。 # future.set_result的時候,就表明這個Future對象已經完成了,能夠調用註冊的回調函數了 # 那麼Task對象也是同樣,當這個協程已經return了,就表明這個協程完成了,那麼return的值就相似於set_result設置的值 # Task在註冊回調,調用相應的回調函數的時候,也能夠經過task.result方法獲取返回值。 # 那麼同理和Future對象同樣,使用await Task()也能夠直接獲取返回值
import asyncio ''' 經過create_task能夠建立對象,那麼也能夠在任務完成前取消操做 ''' async def task_func(): print("in task func") return "the result" async def main(loop): print("creating task") task = loop.create_task(task_func()) print("canceling task") task.cancel() print(f"canceled task: {task}") try: await task except asyncio.CancelledError: print("caught error from canceled task") else: print(f"task result: {task.result()}") event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
import asyncio ''' ensure_future函數返回一個與協程執行綁定的Task。 這個Task實例再傳遞到其餘代碼,這個代碼能夠等待這個實例,而無須知道原來的協程是如何構造或調用的 ''' async def task_func(): print("in task func") return "the result" async def main(loop): print("creating task") task = asyncio.ensure_future(task_func()) print(f"return value: {await task}") event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close() ''' creating task in task func return value: the result '''
import asyncio ''' 一系列協程之間的線性控制流用內置的await能夠很容易地管理。 更復雜的結構可能容許一個協程等待多個其餘協程並行完成,可使用asyncio中的工具建立這些更復雜的結構 一般能夠把一個操做劃分爲多個部分,而後分別執行,這會頗有用。 例如:採用這種方法,能夠高效的下載多個遠程資源或者查詢遠程API。 有些狀況下,執行順序並不重要,並且可能有任意多個操做,這種狀況下,可使用wait函數暫停一個協程,直到其餘後臺操做完成 ''' async def phase(i): print(f"in phase {i}") await asyncio.sleep(0.1 * i) print(f"done with phase {i}") return f"phase {i} result" async def main(): print("start main") phases = [phase(i) for i in range(3)] print("waiting for phases to complete") # 能夠await 一個協程,但若是是多個協程呢?能夠將其組合成一個列表,而後交給asyncio.wait函數,再對其進行await,就能夠等全部的協程了 # 會有兩個返回值,一個是已完成的任務,一個是未完成的任務 completed, pending = await asyncio.wait(phases) print(f"results: {[t.result() for t in completed]}") event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main()) finally: event_loop.close() ''' start main waiting for phases to complete in phase 1 in phase 0 in phase 2 done with phase 0 done with phase 1 done with phase 2 results: ['phase 1 result', 'phase 0 result', 'phase 2 result'] ''' # 能夠看到順序貌似亂了,這是由於在內部,wait函數使用一個set來保存它建立的Task實例,這說明這些實例會按一種不可預知的順序啓動和完成。
import asyncio ''' 若是後臺階段是明確的,並且這些階段的結果很重要,那麼gather方法可能對等待多個操做頗有用 ''' async def phase(i): print(f"in phase {i}") await asyncio.sleep(0.1 * i) print(f"done with phase {i}") return f"phase {i} result" async def main(): print("start main") phases = [phase(i) for i in range(3)] print("waiting for phases to complete") # 當使用gather的時候,內部直接傳入多個任務便可 # 因此咱們還要將列表進行打散 # 而且和wait不同,返回值再也不是任務,而是任務的返回值,並且是完成的任務的返回值。 # 正由於是返回值,因此只有一個,而且順序和咱們添加任務的順序是一致的 # 無論後臺是怎麼執行了,返回值得順序和咱們添加任務的順序保持一致 completed = await asyncio.gather(*phases) print(f"results: {[t for t in completed]}") event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main()) finally: event_loop.close() ''' start main waiting for phases to complete in phase 0 in phase 1 in phase 2 done with phase 0 done with phase 1 done with phase 2 results: ['phase 0 result', 'phase 1 result', 'phase 2 result'] '''
import asyncio async def task1(): return "task1" async def task2(): return "task2" async def task3(): return "task3" tasks = [task1(), task2(), task3()] event_loop = asyncio.get_event_loop() try: completed, pending = event_loop.run_until_complete(asyncio.wait(tasks)) finally: event_loop.close() for t in completed: print(t.result()) ''' task2 task3 task1 '''
import asyncio async def task1(): return "task1" async def task2(): return "task2" async def task3(): return "task3" tasks = [task1(), task2(), task3()] event_loop = asyncio.get_event_loop() try: completed = event_loop.run_until_complete(asyncio.gather(*tasks)) finally: event_loop.close() for t in completed: print(t) ''' task1 task2 task3 '''
import asyncio ''' as_completed函數是一個生成器,會管理指定的一個協程列表,並生成它們的結果,每一個協程結束運行時一次生成一個結果。 與wait相似,as_completed不能保證順序,從名字也能看出來,哪一個先完成哪一個先返回 ''' async def task1(): print("我是task1,我睡了3秒") await asyncio.sleep(3) print("我是task1,睡完了") return "task1" async def task2(): print("我是task2,我睡了1秒") await asyncio.sleep(1) print("我是task2,睡完了") return "task2" async def task3(): print("我是task3,我睡了2秒") await asyncio.sleep(2) print("我是task3,睡完了") return "task3" async def main(): print("start main") tasks = [task1(), task2(), task3()] for task in asyncio.as_completed(tasks): res = await task print(res) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main()) finally: event_loop.close() ''' start main 我是task3,我睡了2秒 我是task2,我睡了1秒 我是task1,我睡了3秒 我是task2,睡完了 task2 我是task3,睡完了 task3 我是task1,睡完了 task1 '''
import asyncio ''' Lock能夠用來保護對一個共享資源的訪問,只有鎖的持有者可使用這個資源。 若是有多個請求要獲得這個鎖,那麼其將會阻塞,以保證一次只有一個持有者 ''' def unlock(lock): print("回調釋放鎖,否則其餘協程獲取不到。") print("但我是1秒後被調用,鎖又在只能經過調用我才能釋放,因此很遺憾,其餘協程要想執行,至少要1秒後了") lock.release() async def coro1(lock): print("coro1在等待鎖") # 使用async with語句很方便,是一個上下文。至關於幫咱們自動實現了開始的lock.acquire和結尾lock.release async with lock: print("coro1得到了鎖") print("coro1釋放了鎖") async def coro2(lock): print("coro2在等待鎖") async with lock: print("coro2得到了鎖") print("coro2釋放了鎖") async def main(loop): # 建立共享鎖 lock = asyncio.Lock() print("在開始協程以前建立一把鎖") await lock.acquire() print("鎖是否被獲取:", lock.locked()) # 執行回調將鎖釋放,否則協程沒法獲取鎖 loop.call_later(1, unlock, lock) # 運行想要使用鎖的協程 print("等待全部協程") await asyncio.wait([coro1(lock), coro2(lock)]) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close() ''' 在開始協程以前建立一把鎖 鎖是否被獲取: True 等待全部協程 coro2在等待鎖 coro1在等待鎖 回調釋放鎖,否則其餘協程獲取不到。 但我是1秒後被調用,鎖又在只能經過調用我才能釋放,因此很遺憾,其餘協程要想執行,至少要1秒後了 coro2得到了鎖 coro2釋放了鎖 coro1得到了鎖 coro1釋放了鎖 '''
import asyncio ''' asyncio.Event基於threading.Event。它容許多個消費者等待某個事件發生,而沒必要尋找一個特定值與關聯 首先Event對象可使用set,wait,clear set:設置標誌位 wait:等待,在沒有set的狀況下,會阻塞。若是set以後,不會阻塞。 clear:清空標誌位 ''' def set_event(event): print("設置標誌位,由於協程會卡住,只有設置了標誌位纔會往下走") print("但我是一秒後才被調用,因此協程想往下走起碼也要等到1秒後了") event.set() async def coro1(event): print("coro1在這裏卡住了,快設置標誌位啊") await event.wait() print(f"coro1飛起來了,不信你看如今標誌位,是否設置標誌位:{event.is_set()}") async def coro2(event): print("coro1在這裏卡住了,快設置標誌位啊") await event.wait() print(f"coro2飛起來了,不信你看如今標誌位,是否設置標誌位:{event.is_set()}") async def main(loop): # 建立共享事件 event = asyncio.Event() # 如今設置標誌位了嗎? print("是否設置標誌位:", event.is_set()) # 執行回調將標誌位設置,否則協程卡住了 loop.call_later(1, set_event, event) # 運行卡住的的協程 print("等待全部協程") await asyncio.wait([coro1(event), coro2(event)]) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close() ''' 是否設置標誌位: False 等待全部協程 coro1在這裏卡住了,快設置標誌位啊 coro1在這裏卡住了,快設置標誌位啊 設置標誌位,由於協程會卡住,只有設置了標誌位纔會往下走 但我是一秒後才被調用,因此協程想往下走起碼也要等到1秒後了 coro2飛起來了,不信你看如今標誌位,是否設置標誌位:True coro1飛起來了,不信你看如今標誌位,是否設置標誌位:True ''' # asyncio裏面的事件和threading裏面的事件的API是一致的。
import asyncio ''' asyncio.Queue爲協程提供了一個先進先出的數據結構,這與線程的queue.Queue或者進程裏面的Queue很相似 ''' async def consumer(q: asyncio.Queue, n): print(f"消費者{n}號 開始") while True: item = await q.get() print(f"消費者{n}號: 消費元素{item}") # 因爲咱們要開啓多個消費者,爲了讓其停下來,咱們添加None做爲停下來的信號 if item is None: # task_done是什麼意思?隊列有一個屬性,叫作unfinished_tasks # 每當咱們往隊列裏面put一個元素的時候,這個值就會加1, # 而且隊列還有一個join方法,表示阻塞,何時不阻塞呢?當unfinished_tasks爲0的時候。 # 所以咱們每put一個元素的時候,unfinished_tasks都會加上1,那麼當我get一個元素的時候,unfinished_tasks是否是也應該要減去1啊,可是咱們想多了 # get方法不會自動幫咱們作這件事,須要手動調用task_done方法實現 q.task_done() break else: await asyncio.sleep(3) q.task_done() async def producer(q: asyncio.Queue, consumer_num): print(f"生產者 開始") for i in range(20): await q.put(i) print(f"生產者: 生產元素{i},並放在了隊列裏") # 爲了讓消費者停下來,我就把None添加進去吧 # 開啓幾個消費者,就添加幾個None for i in range(consumer_num): await q.put(None) # 等待全部消費者執行完畢 # 只要unfinished_tasks不爲0,那麼q.join就會卡住,知道消費者所有消費完爲止 await q.join() print("生產者生產的東西全被消費者消費了") async def main(consumer_num): q = asyncio.Queue() consumers = [consumer(q, i) for i in range(consumer_num)] await asyncio.wait(consumers + [producer(q, consumer_num)]) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(3)) finally: event_loop.close() ''' 生產者 開始 生產者: 生產元素0,並放在了隊列裏 生產者: 生產元素1,並放在了隊列裏 生產者: 生產元素2,並放在了隊列裏 生產者: 生產元素3,並放在了隊列裏 生產者: 生產元素4,並放在了隊列裏 生產者: 生產元素5,並放在了隊列裏 生產者: 生產元素6,並放在了隊列裏 生產者: 生產元素7,並放在了隊列裏 生產者: 生產元素8,並放在了隊列裏 生產者: 生產元素9,並放在了隊列裏 生產者: 生產元素10,並放在了隊列裏 生產者: 生產元素11,並放在了隊列裏 生產者: 生產元素12,並放在了隊列裏 生產者: 生產元素13,並放在了隊列裏 生產者: 生產元素14,並放在了隊列裏 生產者: 生產元素15,並放在了隊列裏 生產者: 生產元素16,並放在了隊列裏 生產者: 生產元素17,並放在了隊列裏 生產者: 生產元素18,並放在了隊列裏 生產者: 生產元素19,並放在了隊列裏 消費者1號 開始 消費者1號: 消費元素0 消費者0號 開始 消費者0號: 消費元素1 消費者2號 開始 消費者2號: 消費元素2 消費者1號: 消費元素3 消費者0號: 消費元素4 消費者2號: 消費元素5 消費者1號: 消費元素6 消費者0號: 消費元素7 消費者2號: 消費元素8 消費者1號: 消費元素9 消費者0號: 消費元素10 消費者2號: 消費元素11 消費者1號: 消費元素12 消費者0號: 消費元素13 消費者2號: 消費元素14 消費者1號: 消費元素15 消費者0號: 消費元素16 消費者2號: 消費元素17 消費者1號: 消費元素18 消費者0號: 消費元素19 消費者2號: 消費元素None 消費者1號: 消費元素None 消費者0號: 消費元素None 生產者生產的東西全被消費者消費了 '''
''' concurrent.futures模塊提供了使用工做線程或進程池運行任務的接口。 線程池和進程池的API是一致的,因此應用只須要作最小的修改就能夠在線程和進程之間進行切換 這個模塊提供了兩種類型的類與這些池交互。執行器(executor)用來管理工做線程或進程池,future用來管理計算的結果。 要使用一個工做線程或進程池,應用要建立適當的執行器類的一個實例,而後向它提交任務來運行。 每一個任務啓動時,會返回一個Future實例。須要任務的結果時,應用可使用Future阻塞,直到獲得結果。 目前已經提供了不一樣的API,能夠很方便地等待任務完成,因此不須要直接管理Future對象。 '''
from concurrent.futures import ThreadPoolExecutor import threading import time ''' ThreadPoolExecutor管理一組工做線程,當這些線程可用於完成更多工做時,能夠向他們傳入任務。 ''' def task(n): print(f"{threading.current_thread().name}開始睡覺了") time.sleep(n) print(f"{threading.current_thread().name}睡了{n}秒") return n exe = ThreadPoolExecutor() print("main: start") results = exe.map(task, [5, 3, 2, 1, 4]) print(results) print(list(results)) ''' main: start ThreadPoolExecutor-0_0開始睡覺了 ThreadPoolExecutor-0_1開始睡覺了 ThreadPoolExecutor-0_2開始睡覺了 ThreadPoolExecutor-0_3開始睡覺了 ThreadPoolExecutor-0_4開始睡覺了 <generator object Executor.map.<locals>.result_iterator at 0x0000000009F022A0> ThreadPoolExecutor-0_3睡了1秒 ThreadPoolExecutor-0_2睡了2秒 ThreadPoolExecutor-0_1睡了3秒 ThreadPoolExecutor-0_4睡了4秒 ThreadPoolExecutor-0_0睡了5秒 [5, 3, 2, 1, 4] ''' # 能夠看到map至關於開啓了一個線程,而後打印results,results是全部任務的返回值組合而成的序列,這裏是迭代器。 # 顯然裏面一開始是沒有值的,所以轉化list會卡住,正如future.result()同樣,任務沒有結束我就獲取不到值,那麼就會卡住 # 而後誰先睡完,誰先打印,因此是1,2,3,4,5 # 最後打印返回值比較特殊,能夠看到這是按照咱們添加的順序打印的。 # 能夠用一種不恰當的方式來理解 ''' results在當前這個示例中,就是五個future.result()組成的迭代器,咱們就用列表的展現 這5個future就是咱們map的順序來的,剛纔說了每進行一次map,等於開啓了一個線程。 [future.result(), future.result(), future.result(), future.result(), future.result()] 首先睡了1秒的執行結束,那麼結果就是這個。由於返回值爲1的任務是咱們第四個map添加的 [future.result(), future.result(), future.result(), 1, future.result()] 接下來睡了2秒的執行結束,那麼結果就是這個。由於返回值爲2的任務是咱們第三個map添加的 [future.result(), future.result(), 2, 1, future.result()] 接下來睡了3秒的執行結束,那麼結果就是這個。由於返回值爲3的任務是咱們第二個map添加的 [future.result(), 3, 2, 1, future.result()] 接下來睡了4秒的執行結束,那麼結果就是這個。由於返回值爲4的任務是咱們第五個map添加的 [future.result(), 3, 2, 1, 4] 最後睡了5秒的執行結束,那麼結果就是這個。由於返回值爲5的任務是咱們第一個map添加的 [5, 3, 2, 1, 4] 返回值只有當全部任務所有完成以後才能夠獲取,所以一開始只是至關於按照添加的順序佔了個坑,而後誰好了就把對應的坑給填上 '''
from concurrent.futures import ThreadPoolExecutor ''' 多個參數怎麼辦呢? ''' def task(name, age): print(f"name is {name}, age is {age}") exe = ThreadPoolExecutor() exe.map(task, ["mashiro", "satori", "koishi"], [16, 16, 15]) ''' name is mashiro, age is 16 name is satori, age is 16 name is koishi, age is 15 ''' ''' 傳參是把全部的name組合在一塊兒,把全部的age組合在一塊兒。 因此不是exe.map(task, ["mashiro", 16], ["satori", 16], ["koishi", 15])這種傳參方式 ''' # 但我若是有這種格式的數據呢? girls = [ ["mashiro", 16], ["satori", 16], ["koishi", 15] ] print(list(zip(*girls))) # [('mashiro', 'satori', 'koishi'), (16, 16, 15)] exe.map(task, *zip(*girls)) ''' name is mashiro, age is 16 name is satori, age is 16 name is koishi, age is 15 '''
from concurrent.futures import ThreadPoolExecutor import time ''' 除了使用map,還能夠藉助submit利用一個執行器調度單個任務。 而後可使用返回的future實例等待這個任務的結果 ''' def task(n): print(f"我接下來要睡{n}秒") time.sleep(n) return f"我睡了{n}秒" exe = ThreadPoolExecutor() f = exe.submit(task, 3) print(f) print(f.result()) ''' 我接下來要睡3秒 <Future at 0x2f88c50 state=running> 我睡了3秒 '''
from concurrent.futures import ThreadPoolExecutor, as_completed import time ''' 調用Future對象的result方法會阻塞,直到任務完成。 若是使用submit添加多個任務,f = [exe.submit(task, 3), exe.submit(task, 1), exe.submit(task, 2)] for i in f: print(i.result()) 最終獲得的仍是3, 1, 2 這和map是相似的,會依舊按照添加的順序返回。 若是任務處理的結果的順序不重要,可使用as_completed函數,哪一個任務先完成,哪一個先返回。能夠看到不少功能和asyncio模塊相似。 ''' def task(n): print(f"我接下來要睡{n}秒") time.sleep(n) return f"我睡了{n}秒" exe = ThreadPoolExecutor() f = [exe.submit(task, 3), exe.submit(task, 2), exe.submit(task, 1)] for i in f: print(i.result()) ''' 我接下來要睡3秒 我接下來要睡2秒 我接下來要睡1秒 我睡了3秒 我睡了2秒 我睡了1秒 ''' f = [exe.submit(task, 3), exe.submit(task, 2), exe.submit(task, 1)] for i in as_completed(f): print(i.result()) ''' 我接下來要睡3秒 我接下來要睡2秒 我接下來要睡1秒 我睡了1秒 我睡了2秒 我睡了3秒 ''' # 若是加上as_completed,那麼result的時候,那麼先返回哪一個先打印 # 沒有as_completed,那麼按照順序,若是結束就能打印 # 可是若是是map的話,必須等到全部任務所有結束,而後一次性返回
from concurrent.futures import ThreadPoolExecutor, Future import time ''' 要在任務完成時採起某個動做,不用顯示的等待結果,可使用add_done_callback指示Future完成時要調用一個新函數。 這個回調應當是有一個參數(Future實例)的callable函數。 這個asyncio是同樣的,每個任務均可以當作是是一個Task對象(Future對象的子類),當任務return的時候,會自動傳入給callback。 或者手動建立一個Future對象,當set_result的時候,也會自動執行callback函數 ''' def task(n): print(f"我接下來要睡{n}秒") time.sleep(n) return f"我睡了{n}秒" # 會自動傳進去,所以至少有一個參數用來接收 # 若是還須要指定額外的參數,也能夠寫進去,當咱們指定回調的時候就要指定偏函數了 def callback(future): print(future.result()) exe = ThreadPoolExecutor() f = exe.submit(task, 4) f.add_done_callback(callback) # 或者手動建立一個Future對象 future = Future() future.add_done_callback(callback) future.set_result("當我set_result以後,那麼會自動調用callback") ''' 我接下來要睡4秒 當我set_result以後,那麼會自動調用callback 我睡了4秒 '''
from concurrent.futures import ThreadPoolExecutor, Future import time ''' 若是一個Future已經提交但尚未提供,那麼能夠調用它的cancel方法將其撤銷 ''' def task(n): print(f"我接下來要睡{n}秒") time.sleep(n) return f"我睡了{n}秒" exe = ThreadPoolExecutor() task1 = exe.submit(task, 3) task2 = exe.submit(task, 1) task3 = exe.submit(task, 2) task3.cancel() ''' 我接下來要睡3秒 我接下來要睡1秒 我接下來要睡2秒 ''' # 能夠看到task3並無取消掉,由於任務已經添加到線程池裏面執行了。 # 但是咱們只有先submit以後才能cancel,一旦submit以後又不能cancel了,這樣不就死循環了嗎 # 因此咱們能夠限制池子的任務數 # 表示池子的最大任務數爲2 exe = ThreadPoolExecutor(max_workers=2) task1 = exe.submit(task, 3) task2 = exe.submit(task, 1) task3 = exe.submit(task, 2) # 儘管我添加了三個任務,可是最大數量爲2,因此task3尚未執行,只是在池子裏面排着隊呢 # 這個時候就能夠取消了 task3.cancel() ''' 我接下來要睡3秒 我接下來要睡1秒 ''' # 能夠看到此時的task3並無執行
from concurrent.futures import ThreadPoolExecutor, Future import time ''' 若是一個任務產生一個未處理的異常,那麼它會被保存到這個任務的Future,並且能夠經過result方法或者exception方法獲得 ''' def task(): 1/0 exe = ThreadPoolExecutor() f = exe.submit(task) print(f.exception()) # division by zero try: f.result() except Exception as e: print(e) # division by zero
from concurrent.futures import ThreadPoolExecutor, wait import time ''' 咱們在執行多個任務的時候,咱們但願等全部任務都執行完畢以後,再往下走,該怎麼作呢? ''' nums = [] def task(n): nums.append(n) exe = ThreadPoolExecutor() f1 = exe.submit(task, 1) f2 = exe.submit(task, 2) f3 = exe.submit(task, 3) f4 = exe.submit(task, 4) # 若是沒有這句話,nums會爲空,由於在子線程尚未來得及執行,print(nums)就已經打印了 wait([f1, f2, f3, f4]) print(nums) # [1, 2, 3, 4] # 此外還能夠有另外一種寫法,使用上下文的形式 with ThreadPoolExecutor() as exe: exe.submit(task, 1) exe.submit(task, 2) exe.submit(task, 3) exe.submit(task, 4) print(nums) # [1, 2, 3, 4, 1, 2, 3, 4]