一、基本概念程序員
除了順序執行和並行執行的模型之外,還有異步模型,這是事件驅動模型的基礎。異步活動的執行模型能夠只有一個單一的主控制流,能在單核心繫統和多核心繫統中運行。編程
在併發執行的異步模型中,許多任務被穿插在同一時間線上,全部的任務都由一個控制流執行(單一線程)。任務的執行可能被暫停或恢復,中間的這段時間線程將會執行其餘任務。大體以下:多線程
如上圖所示,任務(不一樣的顏色表示不一樣的任務)可能被其餘任務插入,可是都處在同一個線程下。這代表當某一個任務執行的時候,其餘任務都暫停了。與多線程編程模型很大的一點不一樣是,多線程的某個任務在時間線上何時掛起某個活動或恢復某個活動由系統決定,而在異步中,程序員必須假設線程可能在任什麼時候間被掛起和替換。併發
程序員能夠將任務編寫成許多能夠間隔執行的小步驟,若是一個任務須要另外一個任務的輸出,那麼被依賴的任務必須接收它的輸入。dom
二、使用Python的concurrent.futures模塊異步
這個模塊具備線程池和進程池、管理並行編程任務、處理非肯定性的執行流程、進程/線程同步等功能。async
此模塊由一下部分組成:函數
- concurrent.futures.Executor:這是一個虛擬基類,提供了異步執行的方法。oop
- submit(function, argument):調度函數(可調用的對象)的執行,將argument做爲參數傳入。性能
- map(function, argument):將argument做爲參數執行函數,以異步的方式。
- shutdown(Wait=True):發出讓執行者釋放全部資源的信號。
- concurrent.futures.Future:其中包括函數的異步執行。Future對象是submit任務(即帶有參數的functions)到executor的實例。
Executor是抽象類,能夠經過子類訪問,即線程或進程的ExecutorPools。由於線程或進程的實例是依賴於資源的任務,因此最好以池的形式將他們組織在一塊兒,做爲能夠重用的launcher和executor。
線程池和進程池是用於在程序中優化和簡化線程/進程的使用。經過池能夠提交任務給executor。池由兩部分組成,一部分是內部的隊列,存放着待執行的任務;另外一部分是一系列的進程或線程,用於執行這些任務。池的概念主要目的是爲了重用:讓線程或進程在生命週期內能夠屢次使用。他減小了建立線程和進程的開銷,提升了程序性能。重用不是必須的規則,但它是程序員在應用中使用池的主要緣由。
current.Futures提供了兩種Executor的子類,各自獨立操做一個線程池和一個進程池。這兩個子類分別是:
- concurrent.futures.ThreadPoolExecutor(max_workers)
- concurrent.futures.ProcessPoolExecutor(max_workers)
max_workers參數表示最多有多少個worker並行執行任務
代碼測試:
import concurrent.futures import time number_list = [1,2,3,4,5,6,7,8,9,10] def evaluate_item(x): #For time consuming result_item = count(x) return result_item def count(number): for i in range(0, 10000000): i = i + 1 return i * number if __name__ == "__main__": # Sequential execution start_time = time.time() for item in number_list: print(evaluate_item(item)) print("Sequential execution in %s seconds" %(str(time.time() - start_time))) # Thread pool execution start_time_1 = time.time() with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: futures = [executor.submit(evaluate_item, item) for item in number_list] for future in concurrent.futures.as_completed(futures): print(future.result()) print("Thread pool execution in %s seconds" %(str(time.time() - start_time_1))) # Process pool execution start_time_2 = time.time() with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor: futures = [executor.submit(evaluate_item, item) for item in number_list] print("Process pool execution in %s seconds" %(str(time.time() - start_time_2)))
運行結果:
10000000 20000000 30000000 40000000 50000000 60000000 70000000 80000000 90000000 100000000 Sequential execution in 8.975373029708862 seconds 10000000 20000000 30000000 40000000 60000000 70000000 50000000 80000000 90000000 100000000 Thread pool execution in 8.699156045913696 seconds Process pool execution in 5.916198968887329 seconds
建立一個list存放10個數字,而後使用一個循環計算從1加到10000000,打印出和與number_list的乘積。
number_list = [1,2,3,4,5,6,7,8,9,10] def evaluate_item(x): #For time consuming result_item = count(x) return result_item def count(number): for i in range(0, 10000000): i = i + 1 return i * number
在主程序中,首先順序執行了一次程序並打印其執行時間:
start_time = time.time() for item in number_list: print(evaluate_item(item)) print("Sequential execution in %s seconds" %(str(time.time() - start_time)))
其次使用futures.ThreadPoolExecutor模塊的線程池並打印其時間:
start_time_1 = time.time() with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: futures = [executor.submit(evaluate_item, item) for item in number_list] for future in concurrent.futures.as_completed(futures): print(future.result()) print("Thread pool execution in %s seconds" %(str(time.time() - start_time_1)))
ThreadPoolExecutor使用線程池中的一個線程執行給定任務。池中一共有5個線程,每個線程從池中取得一個任務而後執行它,當任務執行完成,再從池中拿到另外一個任務。
最後是使用進程池:
start_time_2 = time.time() with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor: futures = [executor.submit(evaluate_item, item) for item in number_list] print("Process pool execution in %s seconds" %(str(time.time() - start_time_2)))
和ThreadPoolExecutor同樣,ProcessPoolExecutor是一個executor,使用一個線程池來並行執行任務。由於ProcessPoolExecutor使用了多核處理的模塊,讓咱們能夠不受GIL的限制,大大縮短執行時間。
幾乎全部須要處理多個客戶端請求的服務應用都會使用池。也有應用要求須要當即執行,或者要求對任務的線程有更多的控制器,這種狀況下,池不是一個最佳選擇。
三、使用Asyncio管理事件循環
先入爲主:
import asyncio import datetime import time def function_1(end_time, loop): print("function_1 called") if (loop.time() + 1.0) < end_time: loop.call_later(1, function_2, end_time, loop) else: loop.stop() def function_2(end_time, loop): print("function_2 called") if (loop.time() + 1.0) < end_time: loop.call_later(1, function_3, end_time, loop) else: loop.stop() def function_3(end_time, loop): print("function_3 called") if (loop.time() + 1.0) < end_time: loop.call_later(1, function_1, end_time, loop) else: loop.stop() def function_4(end_time, loop): print("function_4 called") if (loop.time() + 1.0) < end_time: loop.call_later(1, function_4, end_time, loop) else: loop.stop() loop = asyncio.get_event_loop() print(loop.time()) end_loop = loop.time() + 9.0 print(end_loop) loop.call_soon(function_1, end_loop, loop) #loop.call_soon(function_4, end_loop, loop) loop.run_forever() loop.close()
執行結果:
上述例子定義了三個異步任務,相繼執行,如圖所示:
首先,咱們要獲得這個事件循環:
loop = asyncio.get_event_loop()
而後咱們經過call_soon方法調用了function_1()函數。
end_loop = loop.time() + 9.0
loop.call_soon(function_1, end_loop, loop)
function_1:
def function_1(end_time, loop): print("function_1 called") if (loop.time() + 1.0) < end_time: loop.call_later(1, function_2, end_time, loop) else: loop.stop()
- end_time定義了function_1能夠運行的最長時間,並經過call_later方法傳入到function_2中做爲參數
- loop經過get_event_loop()方法獲得的事件循環
任務執行結束以後,它會比較loop.time() + 1s和設定的運行時間,若是沒有超過,使用call_later在1秒以後執行function_2(),function_2和3做用相似
若是運行時間超過了設定,時間循環終止。
概念解釋:
Python的Asyncio模塊提供了管理事件、協程、任務和線程方法,以及編寫併發代碼的原語。主要組件和概念包括:
- 事件循環:在Asyncio模塊中,每個進程都有一個事件循環。
- 協程:這是子程序的泛化概念。協程能夠在執行期間暫停,這樣就能夠等待外部的處理(例如IO)完成以後,從以前暫停的地方恢復執行。
- Futures:定義了Future對象,和concurrent.futures模塊同樣,表示還沒有完成的計算。
- Tasks:這是Asyncio的子類,用於封裝和管理並行模式下的協程。
事件循環:
在計算機系統中,能夠產生事件的實體叫作事件源,能處理事件的實體叫作事件處理者,還有一些第三方實體叫作事件循環。它的做用是管理全部的事件,在整個程序運行過程當中不斷循環執行,追蹤事件發生的順序將他們放到隊列中,當主線程空閒的時候,調用相應的事件處理者處理事件。
Asyncio管理事件循環的方法:
- loop = get_event_loop():獲得當前上下文的事件循環。
- loop.call_later(time_delay, callback, argument):延後time_delay秒再執行callback方法。
- loop.call_soon(callback, argument):儘量快的調用callback。call_soon()函數結束,主線程回到事件循環以後就會立刻調用callback。
- loop.time():以float類型返回當前時間循環的內部時間。
- asyncio.set_event_loop():爲當前上下文設置時間循環。
- asyncio.new_event_loop():根據此策略建立一個新的時間循環並返回。
- loop.run_forever():在調用stop()以前將一直運行。run_forever真正開始執行函數。
四、使用Asyncio管理協程
上述例子中一個程序變得很大並且複雜時,將其劃分爲子程序,每一部分實現特定的任務。子程序不能單獨執行,只能在主程序的請求下執行,主程序負責協調使用各個子程序。協程是子程序的泛化,和子程序同樣的是,協程只負責計算任務的一步;不一樣的是協程沒有主程序來進行調度。由於協程經過管道鏈接在一塊兒,沒有監視函數負責順序調用他們。在協程中,執行點能夠被掛起,能夠被以前掛起的點恢復執行。經過協程池就能夠插入到計算中:運行第一個任務,直到它返回yield執行權,而後運行下一個,這樣順着執行下去。
這種插入的控制組件就是前文提到的事件循環,它持續追蹤全部的協程並執行它們。
協程的另一些重要特性以下:
- 協程能夠有多個入口點,並能夠yield屢次
- 協程能夠將執行權交給其餘協程
yield表示協程在此暫停,而且將執行權交給其餘協程,由於協程能夠將值與控制權一塊兒傳遞給另外一個協程,因此yield一個值就表示將值傳給下一個執行的協程。
測試用例:
import asyncio import time from random import randint @asyncio.coroutine def StartState(): print("Start State called \n") input_value = randint(0,1) time.sleep(1) print("I am StartState.input_value is %s" %input_value) if (input_value == 0): result = yield from State2(input_value) else: result = yield from State1(input_value) print("Resume of the Transition : \nStart State calling %s" %result) @asyncio.coroutine def State1(transition_value): outputValue = str("State 1 with transition value = %s \n" %transition_value) input_value = randint(0,1) time.sleep(1) print("...Evaluation...") print("I am State1.input_value is %s" %input_value) if input_value == 0: result = yield from State3(input_value) else: result = yield from State2(input_value) result = "State 1 calling %s" %result return outputValue + str(result) @asyncio.coroutine def State2(transition_value): outputValue = str("State 2 with transition value = %s \n" %transition_value) input_value = randint(0,1) time.sleep(1) print("...Evaluation...") print("I am State2.input_value is %s" %input_value) if input_value == 0: result = yield from State1(input_value) else: result = yield from State3(input_value) result = "State 2 calling %s" %result return outputValue + str(result) @asyncio.coroutine def State3(transition_value): outputValue = str("State 3 with transition value = %s \n" %transition_value) input_value = randint(0,1) time.sleep(1) print("...Evaluation...") print("I am State3.input_value is %s" %input_value) if input_value == 0: result = yield from State1(input_value) else: result = yield from EndState(input_value) result = "State 1 calling %s" %result return outputValue + str(result) @asyncio.coroutine def EndState(transition_value): outputValue = str("End State with transition value = %s \n" %transition_value) print("I am EndState.outputValue is %s" %outputValue) print("...Stop Computation...") return outputValue if __name__ == "__main__": print("Finite State Machine simulation with Asyncio Coroutine") loop = asyncio.get_event_loop() loop.run_until_complete(StartState())
上述代碼爲使用Asyncio的協程來模擬有限狀態機(一個數學模型,不只在工程領域應用普遍,在科學領域也很著名)。模擬的狀態機以下:
系統有四個狀態,0和1是狀態機能夠從一個狀態到另外一個狀態的值,這個過程叫轉換。
運行結果(結果不惟一):
每個狀態都由一個裝飾器裝飾:@asyncio.coroutine
經過yield from命令調用下一個協程。
啓動事件循環:
if __name__ == "__main__": print("Finite State Machine simulation with Asyncio Coroutine") loop = asyncio.get_event_loop() loop.run_until_complete(StartState())
五、使用Asyncio控制任務
Asyncio是用來處理事件循環中的異步進程和併發任務執行的。它還提供了asyncio.Task()類,能夠在任務中使用協程。它的做用是在同一事件循環中,運行某一個任務的同時能夠併發地運行多個任務。當協程被包在任務中,它會自動將任務和事件循環鏈接起來,當事件循環啓動的時候,任務自動運行。這樣就提供了一個能夠自動驅動協程的機制。
Asyncio模塊爲咱們提供了asyncio.Task(coroutine)方法來處理計算任務,它能夠調度協程的執行。任務對協程對象在事件循環的執行負責。若是被包裹的協程要從future yield,那麼任務會被掛起,等待future的計算結果。
當future計算完成,被包裹的協程將會拿到future返回的結果或異常(exception)繼續執行。另外,須要注意的是事件循環一次只能運行一個任務,除非還有其它事件循環在不一樣的線程並行運行,此任務纔有可能和其餘任務並行。當一個任務在等待future執行的期間,事件循環會運行一個新的任務。
測試用例:
import asyncio @asyncio.coroutine def factorial(number): f = 1 for i in range(2, number + 1): print("Asyncio.Task: Compute factorial(%s)" %i) yield from asyncio.sleep(0.5) f *= i print("Asyncio.Task - factorial(%s) = %s" %(number, f)) @asyncio.coroutine def fibonacci(number): a,b = 0,1 for i in range(number): print("Asyncio.Task: Compute fibonacci(%s)" %i) yield from asyncio.sleep(0.5) a, b = b, a+b print("Asyncio.Task - fibonacci(%s) = %s" %(number, a)) @asyncio.coroutine def binomialCoeff(n, k): result = 1 for i in range(1, k+1): result = result * (n-i+1)/i print("Asyncio.Task:Compute binomialCoeff(%s)" %i) yield from asyncio.sleep(0.5) print("Asyncio.Task - binomialCoeff(%s, %s) = %s" %(n, k, result)) if __name__ == "__main__": tasks = [asyncio.Task(factorial(10)), asyncio.Task(fibonacci(10)), asyncio.Task(binomialCoeff(20, 10))] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) loop.close()
執行結果:
Asyncio.Task: Compute factorial(2) Asyncio.Task: Compute fibonacci(0) Asyncio.Task:Compute binomialCoeff(1) Asyncio.Task: Compute factorial(3) Asyncio.Task: Compute fibonacci(1) Asyncio.Task:Compute binomialCoeff(2) Asyncio.Task: Compute factorial(4) Asyncio.Task: Compute fibonacci(2) Asyncio.Task:Compute binomialCoeff(3) Asyncio.Task: Compute factorial(5) Asyncio.Task: Compute fibonacci(3) Asyncio.Task:Compute binomialCoeff(4) Asyncio.Task: Compute factorial(6) Asyncio.Task: Compute fibonacci(4) Asyncio.Task:Compute binomialCoeff(5) Asyncio.Task: Compute factorial(7) Asyncio.Task: Compute fibonacci(5) Asyncio.Task:Compute binomialCoeff(6) Asyncio.Task: Compute factorial(8) Asyncio.Task: Compute fibonacci(6) Asyncio.Task:Compute binomialCoeff(7) Asyncio.Task: Compute factorial(9) Asyncio.Task: Compute fibonacci(7) Asyncio.Task:Compute binomialCoeff(8) Asyncio.Task: Compute factorial(10) Asyncio.Task: Compute fibonacci(8) Asyncio.Task:Compute binomialCoeff(9) Asyncio.Task - factorial(10) = 3628800 Asyncio.Task: Compute fibonacci(9) Asyncio.Task:Compute binomialCoeff(10) Asyncio.Task - fibonacci(10) = 55 Asyncio.Task - binomialCoeff(20, 10) = 184756.0
上述例子定義了三個線程,factorial,fibonacci,binomialCoeff,每個都帶有asyncio.coroutine裝飾器:
將三個task放入到一個list中:
tasks = [asyncio.Task(factorial(10)), asyncio.Task(fibonacci(10)), asyncio.Task(binomialCoeff(20, 10))]
經過run_until_complete並行運行三個協程,asyncio.wait(tasks)表示運行直到全部給定的協程都完成。
最後關閉事件循環:loop.close()
六、使用Asyncio和Futures
Asyncio模塊的另外一個重要的組件是Futures。它和concurrent.futures.Futures很像,可是針對Asyncio的事件循環作了不少定製。asyncio.Futures類表明還未完成的結果,有多是一個Exception,因此綜合來講,它是一種抽象的表明尚未作完的事情。
實際上,必須處理一些結果的回調函數被加入到了這個類的實例中。
基本方法:
- cancel():取消future的執行,調度回調函數
- result():返回future表明的結果
- exception():返回future中的Exception
- add_done_callback(fn):添加一個回調函數,當future執行的時候會調用這個回調函數
- remove_done_callback(fn):從call when done列表中移除全部的callback的實例
- set_result(result):將future標爲執行完成,而且設置result的值
- set_exception(exception):將future標爲執行完成,並設置Exception
測試用例:
# coding : utf-8 import asyncio import sys @asyncio.coroutine def first_coroutine(future, n): # 計算前n個數的和 count = 0 for i in range(1, n+1): count = count + i print("first yield") yield from asyncio.sleep(2) print("first_coroutine finished") # 將future標記爲已完成,並設置result的值 future.set_result("first coroutine (sum of n integers) result = %s" %str(count)) @asyncio.coroutine def second_coroutine(future, n): count = 1 for i in range(2, n+1): count *= i print("second yield") yield from asyncio.sleep(1) print("second_coroutine finished") future.set_result("second coroutine (factorial) result = %s" %str(count)) def got_result(future): # 獲取future的set_result結果 print(future.result()) if __name__ == "__main__": N1 = int(sys.argv[1]) N2 = int(sys.argv[2]) loop = asyncio.get_event_loop() future1 = asyncio.Future() future2 = asyncio.Future() tasks = [first_coroutine(future1, N1), second_coroutine(future2, N2)] # 添加回調函數 future1.add_done_callback(got_result) future2.add_done_callback(got_result) loop.run_until_complete(asyncio.wait(tasks)) loop.close()
運行結果: