世界是複雜的,每一種思想都是爲了解決某些現實問題而簡化成的模型,想解決就得先面對,面對就須要選擇角度,角度決定了模型的質量, 喜歡此UP主湯質看本質的哲學科普,其中簡潔又不失細節的介紹了人類解決問題的思路,以及由概念搭建的思惟模型對人類解決問題的重要性與限制.也認識到學習的本質就是: 認識獲取(瞭解概念) -> 知識學習(創建模型) -> 技能訓練(實踐)html
閱讀也好, 學習也好, 妨礙咱們「理解」的障礙主要有兩個:java
也就是說 概念明確 + 關係明確, 才能構成「模型」, 對照「現象」, 造成「理解」。python
在理解編程知識時能夠關鍵概括爲兩點:理解核心概念羣+使用場景思考與故事化講述react
這裏特別推薦碼農翻身中大話編程式的科普:linux
碼農翻身整年文章精華git
併發思想的一些探尋併發之痛 Thread, Goroutine, Actor中有較好的總結:
陳力就列, 不能者止 能幹活的代碼片斷就放在線程裏, 若是幹不了活(須要等待, 被阻塞等), 就摘下來。通俗的說就是不要佔着茅坑不拉屎, 若是拉不出來, 須要醞釀下, 先把茅坑讓出來, 由於茅坑是稀缺資源。程序員
要作到這點通常有兩種方案:github
典型如NodeJS, 遇到阻塞的狀況, 好比網絡調用, 則註冊一個回調方法(其實還包括了一些上下文數據對象)給IO調度器(linux下是libev, 調度器在另外的線程裏), 當前線程就被釋放了, 去幹別的事情了。等數據準備好, 調度器會將結果傳遞給回調方法而後執行, 執行其實不在原來發起請求的線程裏了, 但對用戶來講無感知。但這種方式的問題就是很容易遇到callback hell, 由於全部的阻塞操做都必須異步, 不然系統就卡死了。還有就是異步的方式有點違反人類思惟習慣, 人類仍是習慣同步的方式。golang
這種方案其實和上面的方案本質上區別不大, 關鍵在於回調上下文的保存以及執行機制。爲了解決回調方法帶來的難題, 這種方案的思路是寫代碼的時候仍是按順序寫, 但遇到IO等阻塞調用時, 將當前的代碼片斷暫停, 保存上下文, 讓出當前線程。等IO事件回來, 而後再找個線程讓當前代碼片斷恢復上下文繼續執行, 寫代碼的時候感受好像是同步的, 彷彿在同一個線程完成的, 但實際上系統可能切換了線程, 但對程序無感。算法
GreenThread
* 用戶空間 首先是在用戶空間, 避免內核態和用戶態的切換致使的成本。 * 由語言或者框架層調度 * 更小的棧空間容許建立大量實例(百萬級別)
幾個概念
* Continuation 這個概念不熟悉FP編程的人可能不太熟悉, 不過這裏能夠簡單的顧名思義, 能夠理解爲讓咱們的程序能夠暫停, 而後下次調用繼續(contine)從上次暫停的地方開始的一種機制。至關於程序調用多了一種入口。 * Coroutine 是Continuation的一種實現, 通常表現爲語言層面的組件或者類庫。主要提供yield, resume機制。 * Fiber 和Coroutine實際上是一體兩面的, 主要是從系統層面描述, 能夠理解成Coroutine運行以後的東西就是Fiber。
Goroutine其實就是前面GreenThread系列解決方案的一種演進和實現。
程序員修神之路--分佈式高併發下Actor模型如此優秀中說:
傳統多數流行的語言併發是基於多線程之間的共享內存, 使用同步方法防止寫爭奪, Actors使用消息模型, 每一個Actor在同一時間處理最多一個消息, 能夠發送消息給其餘Actor, 保證了單獨寫原則。從而巧妙避免了多線程寫爭奪。和共享數據方式相比, 消息傳遞機制最大的優勢就是不會產生數據競爭狀態。實現消息傳遞有兩種常見的類型:基於channel(golang爲典型表明)的消息傳遞和基於Actor(erlang爲表明)的消息傳遞。兩者的格言都是:"Don’t communicate by sharing memory, share memory by communicating"
每一個進程各自有不一樣的用戶地址空間,任何一個進程的全局變量在另外一個進程中都看不到, 因此進程之間要交換數據必須經過內核,在內核中開闢一塊緩衝區,進程A把數據從用戶空間拷到內核緩衝區,進程B再從內核緩衝區把數據讀走,內核提供的這種機制稱爲進程間通訊。
進程間幾種通訊方式:
管道:速度慢, 容量有限, 只有父子進程能通信 FIFO:任何進程間都能通信, 但速度慢 消息隊列:容量受到系統限制, 且要注意第一次讀的時候, 要考慮上一次沒有讀完數據的問題 信號量:不能傳遞複雜消息, 只能用來同步 5.共享內存區:可以很容易控制容量, 速度快, 但要保持同步, 好比一個進程在寫的時候, 另外一個進程要注意讀寫的問題, 至關於線程中的線程安全, 固然, 共享內存區一樣能夠用做線程間通信, 不過沒這個必要, 線程間原本就已經共享了同一進程內的一塊內存 Socket通訊(又名客戶機服務器系統)
Python 爲進程通訊提供了兩種機制:
Queue:一個進程向 Queue 中放入數據, 另外一個進程從 Queue 中讀取數據。如multiprocessing.Queue() Pipe:Pipe 表明鏈接兩個進程的管道。程序在調用 Pipe() 函數時會產生兩個鏈接端, 分別交給通訊的兩個進程, 接下來進程既可從該鏈接端讀取數據, 也可向該鏈接端寫入數據。如multiprocessing.Pipe()
方式有不少種,其餘模型中也都能在此找到影子.
CSP(communicating sequential processes)模型裏消息和Channel是主體
也就是說發送方須要關心本身的消息類型以及應該寫到哪一個Channel, 但不須要關心誰消費了它, 以及有多少個消費者。
Golang是本身解決的通訊問題, 從概念上就當消息隊列理解, 可是技術上, golang用的不是linux的消息隊列.
Actor模型是1973年提出的一個分佈式併發編程模式, 在Erlang語言中獲得普遍支持和應用。
Actor模型和CSP模型的區別 CSP模型和Actor模型是兩門很是復古且外形接近的併發模型。但CSP與Actor有如下幾點比較大的區別:
以上的鋪墊應該對併發涉及到的概念有清晰的認識,也能發現這些概念都不是go或python原創的,這裏有較好的總結
Go/Python/Erlang編程語言對比分析及示例 說:
Go的不少語言特性借鑑與它的三個祖先:C, Pascal和CSP。Go的語法、數據類型、控制流等繼承於C, Go的包、面對對象等思想來源於Pascal分支, 而Go最大的語言特點, 基於管道通訊的協程併發模型, 則借鑑於CSP分支。
不要用共享內存來通訊, 要用通訊來共享內存大概是golang在推廣中最容易被人說起的了,相似python之禪同樣.
Golang調度器有三個主要數據結構。
G (goroutine) 協程, 被Golang語言自己管理的線程 舉例來講, func main() { go other() }, 這段代碼建立了兩個goroutine, 一個是main, 另外一個是other, 注意main自己也是一個goroutine. goroutine的新建, 休眠, 恢復, 中止都受到go運行時的管理. goroutine執行異步操做時會進入休眠狀態, 待操做完成後再恢復, 無需佔用系統線程, goroutine新建或恢復時會添加到運行隊列, 等待M取出並運行. M (machine) 操做系統的線程, 被操做系統管理的, 原生線程 M能夠運行兩種代碼: go代碼, 即goroutine, M運行go代碼須要一個P 原生代碼, 例如阻塞的syscall, M運行原生代碼不須要P M會從運行隊列中取出G, 而後運行G, 若是G運行完畢或者進入休眠狀態, 則從運行隊列中取出下一個G運行, 周而復始. 有時候G須要調用一些沒法避免阻塞的原生代碼, 這時M會釋放持有的P並進入阻塞狀態, 其餘M會取得這個P並繼續運行隊列中的G. go須要保證有足夠的M能夠運行G, 不讓CPU閒着, 也須要保證M的數量不能過多. P (process) 調度的上下文, 運行在M上的調度器。 P是process的頭文字, 表明M運行G所須要的資源. 一些講解協程的文章把P理解爲cpu核心, 其實這是錯誤的. 雖然P的數量默認等於cpu核心數, 但能夠經過環境變量GOMAXPROC修改, 在實際運行時P跟cpu核心並沒有任何關聯. P也能夠理解爲控制go代碼的並行度的機制, 若是P的數量等於1, 表明當前最多隻能有一個線程(M)執行go代碼, 若是P的數量等於2, 表明當前最多隻能有兩個線程(M)執行go代碼. 執行原生代碼的線程數量不受P控制. 由於同一時間只有一個線程(M)能夠擁有P, P中的數據都是鎖自由(lock free)的, 讀寫這些數據的效率會很是的高.
計算機科學領域的任何問題均可以經過增長一個間接的中間層來解決 -- G-P-M模型正是此理論踐行者,此理論也用到了python的asyncio對地獄回調的處理上(使用Task+Future避免回調嵌套),是否是巧合?
其實異步≈可中斷的函數+事件循環+回調,go和python都把嵌套結構轉換成列表結構有點像算法中的遞歸轉迭代.
G的狀態
M的狀態
M並無像G和P同樣的狀態標記, 但能夠認爲一個M有如下的狀態:
P的狀態
Golang 的協程本質上其實就是對 IO 事件的封裝, 而且經過語言級的支持讓異步的代碼看上去像同步執行的同樣。
能夠參考這裏:
本段落涉及的代碼基本是對深刻理解Python異步編程(上) 的註解,以前也學習過yield,也總結了幾回,
但以前都沒有把事件循環聯繫進來,感性的知道python中的協程就是靠:"事件循環 + 回調",其中細節一直沒深刻看,asyncio源碼也看過幾回,也是蜻蜓點水.此次偶然看到這麼有系統且有示例代碼輔助的文章,因此下面的東西不少都來自此文章以及對其代碼的註解.
在asyncio正式轉正前,就有不少人和庫嘗試了其餘方式,如:
stackless 的通道(channel)
yield和greenlet
gevent
先了解 py3.3 -> py3.8 之間的異步方式演進,建議使用官方yield例子,在idea中debug調試運行,着重看函數中yield處中斷執行後又如何被恢復,其實主要就是經過next或send讓函數恢復執行.而後就是找到那些next和send以及是被怎麼推進的
總結來講,協程就是對能夠中斷/恢復執行的函數的調度.
題外話閱讀源碼的三種境界
1. 生成器 2. 用於定義上下文管理器 3. 協程 4. 配合 from 造成 yield from 用於消費子生成器並傳遞消息
這四種用法, 其實都源於 yield 所具備的暫停的特性, 也就說程序在運行到 yield 所在的位置 result = yield expr 時, 先執行 yield expr 將產生的值返回給調用生成器的 caller
, 而後暫停, 等待 caller 再次激活並恢復程序的執行。而根據恢復程序使用的方法不一樣, yield expr 表達式的結果值 result 也會跟着變化。
若是使用 next() 來調用, 則 yield 表達式的值 result 是 None;若是使用 send() 來調用, 則 yield 表達式的值 result 是經過 send 函數傳送的值。
yield from 一方面能夠迭代地消耗生成器, 另外一方面則創建了一條雙向通道, 把最外層的調用方與最內層的子生成器鏈接起來, 並自動地處理異常, 接收子生成器返回的值。
yield from 更多地被用於協程, 而 await 關鍵字的引入會大大減小 yield from 的使用頻率。
實現yield from語法的僞代碼以下:
""" _i:子生成器, 同時也是一個迭代器 _y:子生成器生產的值 _r:yield from 表達式最終的值 _s:調用方經過send()發送的值 _e:異常對象 """ #簡化版 _i = iter(EXPR) try: _y = next(_i) except StopIteration as _e: _r = _e.value else: while 1: try: _s = yield _y except StopIteration as _e: _r = _e.value break RESULT = _r #完整版 _i = iter(EXPR) try: _y = next(_i) except StopIteration as _e: _r = _e.value else: while 1: try: _s = yield _y except GeneratorExit as _e: try: _m = _i.close except AttributeError: pass else: _m() raise _e except BaseException as _e: _x = sys.exc_info() try: _m = _i.throw except AttributeError: raise _e else: try: _y = _m(*_x) except StopIteration as _e: _r = _e.value break else: try: if _s is None: _y = next(_i) else: _y = _i.send(_s) except StopIteration as _e: _r = _e.value break RESULT = _r
參考 yield_to_from.py,劃分一下方便理解:
一、調用方:調用委派生成器的客戶端(調用方)代碼 二、委託生成器:包含yield from 表達式的生成器函數 三、子生成器:yield from 後面加的生成器函數
有不清晰的地方,就在IDE中debug下,着重來看包含yield的函數之間的跳轉,以及yield from存在的意義.
n = m = 5 flag = "stop" # 子生成器中止信號,此例子中是有調用者控制,也能夠改寫成子生成器控制,調用者檢查到信號還中止迭代子生成器. """ 一、調用方:調用委派生成器的客戶端(調用方)代碼 二、委託生成器:包含yield from 表達式的生成器函數 三、子生成器:yield from 後面加的生成器函數 重點:yield讓函數中斷執行,next或send讓函數恢復執行,使用debug查看各個函數間的跳轉,或者直接運行,看print打印. """ def gen(): # 子生成器 print("start 子生成器") # for k in range(n): # 有限子生成器 k = "k" while True: # 無限子生成器 print("子生成器--要返回的值:", k) t = yield k # 1.運行到這裏就會停下來,切換到其餘地方,等待send或next觸發後再今後處繼續執行 2.yield功能至關於golang中的chan,可接受可發送 print("子生成器--接受到的值:", t) if t is flag: break print("end 子生成器") return "這就是result" # 生成器退出時, 生成器(或子生成器)中的return expr表達式會出發StopIteration(expr)異常拋出 def proxy_gen(): # 委託生成器--相似go-chan # 在調用方與子生成器之間創建一個雙向通道,調用方能夠經過send()直接發送消息給子生成器,而子生成器yield的值,也是直接返回給調用方 # while True: result = yield from gen() print("委託生成器result:", result) yield result def main1(): # 調用方1--不經過proxy_gen迭代子生成器 g = gen() # 子生成器 print(g.send(None)) print(g.send(1)) # 發送1到子生成器中 print(next(g)) try: print(g.send(flag)) # 不使用委託器 子生成器的中止信號就得手動處理 except StopIteration as e: print("StopIteration") print("子生成器return的值:", e.value) def main2(): # 調用方2--經常使用迭代 g = proxy_gen() g.send(None) # 須要先激活子生成器,不然會報錯 TypeError: can't send non-None value to a just-started generator for k in range(m): print("調用方--要發送的值:", k) print("調用方--接受到的值:", g.send(k)) print("--------------------") g.send(flag) # 針對無限子生成器的中止信號 def main3(): # 調用方3--死循環 g = proxy_gen() g.send(None) # 須要先激活子生成器,不然會報錯 TypeError: can't send non-None value to a just-started generator for k in g: # for調用能完整的遍歷生成器,遍歷的時候已經調用了__next__,至關於g.send(None) print("調用方--接受到的值:", k) print("調用方--要發送的值:", g.send("m")) print("調用方--接受到的值:", k) print("--------------------") print("*********************") main1() print("*********************") main2() print("*********************") main3() print("*********************")
包含yield語句的函數就是一個生成器對象, 調用一個生成器函數, 返回的是一個迭代器對象。迭代器Iterator表示的是一個數據流, 迭代器能夠被next()函數調用並不斷返回下一個數據, 直到沒有數據時拋出StopIteration錯誤。迭代器控制生成器函數的執行, 當函數開始運行, 執行到第一個yield語句時暫停, 將yield表達式後的表達式的值返回給調用者。
在生成器函數暫停時, 其現階段的狀態都被保存下來, 包括生成器函數局部變量當前綁定的值、指令指針、函數內部執行堆棧以及任何異常狀態的處理。當生成器函數再次被調用時則直接從上次暫停的yield表達式處接着運行, 直到遇到下一個yield語句, 或者沒有遇到yield語句則運行結束。
須要說明的是, 在函數從新運行時, 其實上次暫停處的yield表達式會先接收一個值做爲結果, 而後才接着運行直到碰到下一個yield表達式。
若是調用者使用next函數或者__next__()方法, 則默認返回給yield表達式None值;使用send()方法則傳遞一個值做爲yield表達式的結果。
對於簡單的迭代器, yield from iterable本質上等於for item in iterable: yield item的縮寫版(iterable 也能夠是generator),yield 和 send(next)成對出現,有點相似於go中的chan,彼此通知對方數據到位請繼續執行下去
通常將yield from視爲提供了一個調用者和子生成器之間的透明的雙向通道。包括從子生成器獲取數據以及向子生成器傳送數據。
總結:
asyncio是Python 3.4 試驗性引入的異步I/O框架(PEP 3156), 提供了基於協程作異步I/O編寫單線程併發代碼的基礎設施。其核心組件有事件循環(Event Loop)、協程(Coroutine)、任務(Task)、將來對象(Future)以及其餘一些擴充和輔助性質的模塊。
實現原理:
事件循環+回調
有一個任務調度器(event loop), 而後能夠用async def定義異步函數做爲任務邏輯, 經過create_task接口把任務掛到event loop上。
event loop的運行過程應該是個不停循環的過程, 不停查看等待類別有沒有能夠執行的任務, 若是有的話執行任務, 直到碰到await之類的主動讓出event loop的函數, 如此反覆。
如果看源碼的你就會發現使用yield和yield from實現協程也會用到相似EventLoop,Future,Future,Coroutine的東西,這在下面的示例部分再次看到.
對比生成器版的協程, 使用asyncio庫後變化很大:
* 沒有了yield 或 yield from, 而是async/await * 沒有了自造的loop(), 取而代之的是asyncio.get_event_loop() * 無需本身在socket上作異步操做, 不用顯式地註冊和註銷事件, aiohttp庫已經代勞 * 沒有了顯式的 Future 和 Task, asyncio已封裝
更少許的代碼, 更優雅的設計
分別使用yield,yield from,asyncio 模擬協程,併發的爬幾個url的代碼.
__doc__ = '如何使用yield完成協程(簡化版的asyncio)' import socket from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ selector = DefaultSelector() stopped = False host = "127.0.0.1" # 自建一個簡單服務,模擬一個設置每一個請求須要等待1s才返回結果 port = 5000 urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'} # urls_todo = {'/'} # 在單線程內作協做式多任務調度 # 要異步,必回調 # 但爲了不地獄式回調,將回調一拆爲三,回調鏈變成了Future-Task-Coroutine # 下面的註解都是爲了能方便理解Future-Task-Coroutine之間的互動以及怎麼串起來的. """ 無鏈式調用 selector的回調裏只管給future設置值, 再也不關心業務邏輯 loop 內回調callback()再也不關注是誰觸發了事件,由於協程可以保存本身的狀態, 知道本身的future是哪一個。也不用關心到底要設置什麼值, 由於要設置什麼值也是協程內安排的。 已趨近於同步代碼的結構 無需程序員在多個協程之間維護狀態, 例如哪一個纔是本身的sock """ """ 1.建立Crawler 實例; 2.調用fetch方法, 會建立socket鏈接和在selector上註冊可寫事件; 3.fetch內並沒有阻塞操做, 該方法當即返回; 4.重複上述3個步驟, 將10個不一樣的下載任務都加入事件循環; 5.啓動事件循環, 進入第1輪循環, 阻塞在事件監聽上; 6.當某個下載任務EVENT_WRITE被觸發, 回調其connected方法, 第一輪事件循環結束; 7.進入第2輪事件循環, 當某個下載任務有事件觸發, 執行其回調函數;此時已經不能推測是哪一個事件發生, 由於有多是上次connected裏的EVENT_READ先被觸發, 也多是其餘某個任務的EVENT_WRITE被觸發;(此時, 原來在一個下載任務上會阻塞的那段時間被利用起來執行另外一個下載任務了) 8.循環往復, 直至全部下載任務被處理完成 9.退出事件循環, 結束整個下載程序 """ # 異步調用執行完的時候, 就把結果放在它裏面。這種對象稱之爲將來對象。 # 暫存task執行的結果和回調 class Future: def __init__(self): self.result = None self._callbacks = [] def add_done_callback(self, fn): # 各階段的回調 self._callbacks.append(fn) def set_result(self, result): self.result = result # 調用結果,b'http請求的結果字符' for fn in self._callbacks: # 重要,回調函數集 fn(self) # Task.step class Task: def __init__(self, coro): self.coro = coro # Crawler(url).fetch() f = Future() # f.set_result(None) # 感受這句不是很必要 self.step(f) # 預激活 def step(self, future): # 管理fetch生成器: 第一次的激活/暫停後的恢復執行/以及配合set_result循環調用 try: # send會進入到coro執行, 即fetch, 直到下次yield # next_future 爲yield返回的對象,也就是下一次要調用的Future對象 next_future = self.coro.send(future.result) # __init__中的第一次step,將fetch運行到的82行的yield, # 返回EVENT_WRITE時的事件回調要用的future,而後等事件觸發,由select調用on_connected,進而繼續future中的回調 except StopIteration: return next_future.add_done_callback(self.step) # 這裏須要重點理解,爲下一次要調用的Future對象,註冊下一次的step,供on_readable調用 # Coroutine yield實現的協程 class Crawler: def __init__(self, url): self.url = url self.response = b'' def fetch(self): # 函數內有了yield表達式,就是生成器了,生成器須要先調用next()迭代一次或者是先send(None)啓動,遇到yield以後便暫停 sock = socket.socket() sock.setblocking(False) try: sock.connect((host, port)) except BlockingIOError: pass f = Future() # 每到一個io事件都註冊一個對應的Future def on_connected(): # pass # 若沒有f.set_result,會報錯KeyError: '236 (FD 236) is already registered' f.set_result(None) # 必要語句,還涉及到恢復回調 selector.register(sock.fileno(), EVENT_WRITE, on_connected) # 鏈接io寫事件 yield f # 註冊完就yield出去,等待事件觸發 selector.unregister(sock.fileno()) get = 'GET {0} HTTP/1.0\r\nHost: example.com\r\n\r\n'.format(self.url) # self.url 區分每一個協程 sock.send(get.encode('ascii')) global stopped while True: f = Future() def on_readable(): f.set_result(sock.recv(4096)) # 可讀的狀況下,讀取4096個bytes暫存給Future,執行回調,使生成器繼續執行下去 selector.register(sock.fileno(), EVENT_READ, on_readable) # io讀事件 chunk = yield f # 返回f,並接受step中send進來的future.result值,也就是暫存的請求返回字符 selector.unregister(sock.fileno()) if chunk: self.response += chunk else: urls_todo.remove(self.url) if not urls_todo: stopped = True break print("result:", self.response) def loop(): while not stopped: # 阻塞, 直到一個事件發生 events = selector.select() for event_key, event_mask in events: # 監聽事件,觸發回調,推進協程運行下去 callback = event_key.data # 就是 on_connected,和 on_readable callback() if __name__ == '__main__': import time start = time.time() for url in urls_todo: crawler = Crawler(url) Task(crawler.fetch()) loop() print(time.time() - start)
__doc__ = '如何使用yield from完成協程(簡化版的asyncio)' import socket from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE selector = DefaultSelector() stopped = False host = "127.0.0.1" # 自建一個簡單服務,模擬一個設置每一個請求須要等待1s才返回結果 port = 5000 urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'} # urls_todo = {'/'} # 在單線程內作協做式多任務調度 # 要異步,必回調 # 但爲了不地獄式回調,將回調一拆爲三,回調鏈變成了Future-Task-Coroutine # 下面的註解都是爲了能方便理解Future-Task-Coroutine之間的互動以及怎麼串起來的. """ 無鏈式調用 selector的回調裏只管給future設置值, 再也不關心業務邏輯 loop 內回調callback()再也不關注是誰觸發了事件,由於協程可以保存本身的狀態, 知道本身的future是哪一個。也不用關心到底要設置什麼值, 由於要設置什麼值也是協程內安排的。 已趨近於同步代碼的結構 無需程序員在多個協程之間維護狀態, 例如哪一個纔是本身的sock """ """ 1.建立Crawler 實例; 2.調用fetch方法, 會建立socket鏈接和在selector上註冊可寫事件; 3.fetch內並沒有阻塞操做, 該方法當即返回; 4.重複上述3個步驟, 將10個不一樣的下載任務都加入事件循環; 5.啓動事件循環, 進入第1輪循環, 阻塞在事件監聽上; 6.當某個下載任務EVENT_WRITE被觸發, 回調其connected方法, 第一輪事件循環結束; 7.進入第2輪事件循環, 當某個下載任務有事件觸發, 執行其回調函數;此時已經不能推測是哪一個事件發生, 由於有多是上次connected裏的EVENT_READ先被觸發, 也多是其餘某個任務的EVENT_WRITE被觸發;(此時, 原來在一個下載任務上會阻塞的那段時間被利用起來執行另外一個下載任務了) 8.循環往復, 直至全部下載任務被處理完成 9.退出事件循環, 結束整個下載程序 """ # 結果保存, 每個處須要異步的地方都會調用, 保持狀態和callback # 程序得知道當前所處的狀態, 並且要將這個狀態在不一樣的回調之間延續下去。 class Future: def __init__(self): self.result = None # 重要參數1 self._callbacks = [] # 重要參數2 def add_done_callback(self, fn): # 各階段的回調 self._callbacks.append(fn) def set_result(self, result): self.result = result # 調用結果,b'http請求的結果字符' for fn in self._callbacks: fn(self) # 執行Task.step def __iter__(self): """ yield的出現使得__iter__函數變成一個生成器, 生成器自己就有next方法, 因此不須要額外實現。 yield from x語句首先調用iter(x)獲取一個迭代器(生成器也是迭代器) """ yield self # 外面使用yield from把f實例自己返回 return self.result # 在Task.step中send(result)的時候再次調用這個生成器, 可是此時會拋出stopInteration異常, 而且把self.result返回 # 激活包裝的生成器, 以及在生成器yield後恢復執行繼續下面的代碼 class Task: def __init__(self, coro): # Crawler(url).fetch() self.coro = coro f = Future() # f.set_result(None) self.step(f) # 激活Task包裹的生成器 def step(self, future): try: # next_future = self.coro.send(future.result) next_future = self.coro.send(None) # 驅動future # next_future = future.send(None) # 這樣是錯誤的 # __init__中的第一次step,將fetch運行到的82行的yield, # 返回EVENT_WRITE時的事件回調要用的future,而後等事件觸發,由select調用on_connected,進而繼續future中的回調 except StopIteration: return next_future.add_done_callback(self.step) # 這裏須要重點理解,爲下一次要調用的Future對象,註冊下一次的step,供on_readable調用 # 異步就是能夠暫定的函數, 函數間切換的調度靠事件循環,yield 正好能夠中斷函數運行 # Coroutine yield實現的協程 # 將yield_demo.py中的Crawler進行了拆解,並使用yield from class Crawler: def __init__(self, url): self.url = url self.response = b"" def fetch(self): # 委託生成器,參考yield_to_from.py global stopped sock = socket.socket() yield from connect(sock, (host, port)) get = "GET {0} HTTP/1.0\r\nHost:example.com\r\n\r\n".format(self.url) sock.send(get.encode('ascii')) self.response = yield from read_all(sock) print(self.response) urls_todo.remove(self.url) if not urls_todo: stopped = True # 鏈接事件的子協程:註冊+回調 def connect(sock, address): f = Future() sock.setblocking(False) try: sock.connect(address) except BlockingIOError: pass def on_connected(): f.set_result(None) selector.register(sock.fileno(), EVENT_WRITE, on_connected) yield from f # f須要可迭代,須要新增Future.__iter__ selector.unregister(sock.fileno()) # 可讀事件的子協程:註冊+回調 def read(sock): f = Future() def on_readable(): f.set_result(sock.recv(4096)) selector.register(sock.fileno(), EVENT_READ, on_readable) # 註冊一個文件對象以監聽其IO事件; """ 此處的chunck接收的是f中return的f.result, 同時會跑出一個stopIteration的異常, 只不過被yield from處理了。 這裏也可直接寫成chunck = yiled f """ chunck = yield from f # f須要可迭代,須要新增Future.__iter__ selector.unregister(sock.fileno()) # 從selection中註銷文件對象, 即從監聽列表中移除它; 文件對象應該在關閉前註銷. return chunck # 委託生成器,參考yield_to_from.py,生成器的嵌套 def read_all(sock): response = [] chunk = yield from read(sock) while chunk: response.append(chunk) chunk = yield from read(sock) # yield from來解決生成器裏玩生成器的問題 result = b"".join(response) print("result:", result) # 打印下結果吧 return result # 事件驅動, 讓全部以前註冊的callback運行起來 def loop(): while not stopped: events = selector.select() for event_key, event_mask in events: # 監聽事件,觸發回調,推進協程運行下去 callback = event_key.data # data就是 on_connected,和 on_readable callback() if __name__ == "__main__": import time start = time.time() for url in urls_todo: crawler = Crawler(url) Task(crawler.fetch()) # 將各生成器和對應的callback註冊到事件循環loop中, 並激活生成器 loop() print(time.time() - start)
__doc__ = "使用asyncio" import asyncio import aiohttp host = 'http://127.0.0.1:5000' urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'} loop = asyncio.get_event_loop() async def fetch(url): async with aiohttp.ClientSession(loop=loop) as session: async with session.get(url) as response: response = await response.read() print("result:", response) return response if __name__ == '__main__': import time start = time.time() tasks = [fetch(host + url) for url in urls_todo] loop.run_until_complete(asyncio.gather(*tasks)) print(time.time() - start)
到這裏基本python的協程改進歷史就說完了,下面就是對比goroutine與asyncio.
這裏python協程與go協程的區別有我之前寫的一個簡單對比,下面的一些東西是補充和聯想.
select、poll、epoll都是I/O複用的事件通知機制
它的做用是將大量的文件描述符託管給內核,內核將最底層的 I/O 狀態變化封裝成讀寫事件,這樣就避免了由程序員去主動輪詢狀態變化的重複工做,程序員將回調函數註冊到 epoll 的狀態上,當檢測到相對應文件描述符產生狀態變化時,就進行函數回調。select/poll因爲效率問題基本已被取代epoll和kqueue取代。 所謂 I/O 多路複用指的就是 select/poll/epoll 這一系列的多路選擇器:支持單一線程同時監聽多個文件描述符(I/O 事件),阻塞等待,並在其中某個文件描述符可讀寫時收到通知。 I/O 複用其實複用的不是 I/O 鏈接,而是複用線程,讓一個 thread of control 可以處理多個鏈接(I/O 事件)。select 是 epoll 以前 Linux 使用的 I/O 事件驅動技術。
python的eventloop網絡事件模型在asyncio庫中是能夠選擇的(asyncio is configured to use SelectorEventLoop on Unix and ProactorEventLoop on Windows.),eventloop經過不一樣平臺上的事件通知機制檢測事件是否可讀/可寫,是協程的心臟.
# 事件驅動, 讓全部以前註冊的callback運行起來 def loop(): while not stopped: events = selector.select() for event_key, event_mask in events: # 監聽事件,觸發回調,推進協程運行下去 callback = event_key.data # data就是 on_connected,和 on_readable callback()
go中的select關鍵詞主要是檢測多個channel是否ready(便是否可讀或可寫),避免goroutine沒必要要的阻塞, 對比python中的select.select;
一個是針對channel狀態檢測的關鍵詞,另外一個是針對IO事件的狀態檢測的函數方法.都是檢查數據是否就緒,但一個是關鍵詞一個是事件模型.
select { case v1 := <-c1: fmt.Printf("received %v from c1\n", v1) case v2 := <-c2: fmt.Printf("received %v from c2\n", v1) case c3 <- 23: fmt.Printf("sent %v to c3\n", 23) default: fmt.Printf("no one was ready to communicate\n") }
select實現邏輯在源碼包src/runtime/select.go:selectgo()其僞代碼以下:
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) { //1. 鎖定scase語句中全部的channel //2. 按照隨機順序檢測scase中的channel是否ready // 2.1 若是case可讀,則讀取channel中數據,解鎖全部的channel,而後返回(case index, true) // 2.2 若是case可寫,則將數據寫入channel,解鎖全部的channel,而後返回(case index, false) // 2.3 全部case都未ready,則解鎖全部的channel,而後返回(default index, false) //3. 全部case都未ready,且沒有default語句 // 3.1 將當前協程加入到全部channel的等待隊列 // 3.2 當將協程轉入阻塞,等待被喚醒 //4. 喚醒後返回channel對應的case index // 4.1 若是是讀操做,解鎖全部的channel,而後返回(case index, true) // 4.2 若是是寫操做,解鎖全部的channel,而後返回(case index, false) }
既然同爲編程語言,確定也有網絡事件模型,我就猜想go對事件狀態的檢測中會與那些場景下用到關鍵詞select呢? 以及當I/O 事件發生以後,模型是經過什麼方式喚醒那些在I/O wait 的goroutine的?
在Go/src/net/http/server.go:Serve()中主要作的就是啓動http服務監聽請求,
能夠看到也有用到了select,追尋源碼當請求來到時就新建一個go協程去處理,處理過程主要在這裏Go/src/net/http/server.go:serve()能夠看到就是for循環等待數據傳輸過來.
再嘗試追蹤http.ListenAndServe,在Serve中select主要是監控請求是否完結或關閉,而在處理listen的分支中你會看到如Go/src/net/sock_posix.go
主要經過各類Syscall鏈接了系統底層經過for+switch+case不斷檢測鏈接IO類型,再經過/Go/src/net/dial.go:dialParallel()中的select檢測對應的chan中是否知足條件
在/Go/src/net/fd_unix.go和/Go/src/net/fd_windows.go,connect函數中一樣使用select檢測chan,
一番走讀,能夠看出select是做爲關鍵詞被抽象出來,用到了go中每一處須要檢查chan的地方.但仍是不能理解調用關係,再往下深追我找到了這些有用的資料:
視頻:Go 原生同步網絡模型解析 vs Multi-Reactors 異步網絡模型
Go netpoll I/O 多路複用構建原生網絡模型之源碼深度解析
Go 基於 I/O multiplexing 和 goroutine 構建了一個簡潔而高性能的原生網絡模型(基於Go的I/O 多路複用netpoll,其底層基於epoll/kqueue/iocp這些系統調用來作封裝的), 提供了 goroutine-per-connection 這樣簡單的網絡編程模式。 在這種模式下,開發者使用的是同步的模式去編寫異步的邏輯,而那些調度和上下文切換的工做轉移到了 runtime 的 Go scheduler,讓它來負責調度 goroutines,從而極大地下降了程序員的心智負擔! 這個原生網絡模型不論從適用性仍是性能上都足以知足絕大部分的應用場景。 然而,在工程性上能作到如此高的普適性和兼容性,最終暴露給開發者提供接口/模式如此簡潔,其底層必然是基於很是複雜的封裝,作了不少取捨,也有可能放棄了一些『極致』的設計和理念。 Go netpoll 在不一樣的操做系統,其底層使用的 I/O 多路複用技術也不同,能夠從 Go 源碼目錄結構和對應代碼文件瞭解 Go 在不一樣平臺下的網絡 I/O 模式的實現。 好比,在 Linux 系統下基於 epoll,freeBSD 系統下基於 kqueue,以及 Windows 系統下基於 iocp。 當 I/O 事件發生以後,netpoll 是經過什麼方式喚醒那些在 I/O wait 的 goroutine 的?是經過 epoll_wait, 在 Go 源碼中的 src/runtime/netpoll_epoll.go文件中有一個 func netpoll(block bool) gList 方法,它會內部調用epoll_wait獲取就緒的 fd 列表, 並將每一個 fd 對應的 goroutine 添加到鏈表返回,go的runtime 掌握網絡I/O的控制權.
Go netpoll 核心
Go netpoll 經過在底層對 epoll/kqueue/iocp 的封裝,從而實現了使用同步編程模式達到異步執行的效果。 總結來講,全部的網絡操做都以網絡描述符 netFD 爲中心實現。netFD 與底層 PollDesc 結構綁定,當在一個 netFD 上讀寫遇到 EAGAIN 錯誤時, 就將當前 goroutine 存儲到這個 netFD 對應的 PollDesc 中,同時調用 gopark 把當前 goroutine 給 park 住,直到這個 netFD 上再次發生讀寫事件, 纔將此 goroutine 給 ready 激活從新運行。顯然,在底層通知 goroutine 再次發生讀寫等事件的方式就是 epoll/kqueue/iocp 等事件驅動機制。
哦,原來go中爲了配合本身的runtime調度方式,在經常使用的事件通知機制epoll/kqueue基礎上封裝了本身的netpoll,這樣就能夠像GPM那樣調度多個協程,將多核資源利用起來.而python中的eventloop都是單線程的,主要是將回調和各類事件通知機制綁定並經過隊列進行循環調度,沒法使用多核資源.
題外話go也可使用eventloop:
var ch chan ElementType ch := make(chan int) ch <- value //寫入 value := <-ch //讀取
def step(self, future): # 管理fetch生成器: 第一次的激活/暫停後的恢復執行/以及配合set_result循環調用 try: # send會進入到coro執行, 即fetch, 直到下次yield # next_future 爲yield返回的對象,也就是下一次要調用的Future對象 next_future = self.coro.send(future.result) # __init__中的第一次step,將fetch運行到的82行的yield, # 返回EVENT_WRITE時的事件回調要用的future,而後等事件觸發,由select調用on_connected,進而繼續future中的回調 except StopIteration: return next_future.add_done_callback(self.step) # 這裏須要重點理解,爲下一次要調用的Future對象,註冊下一次的step,供on_readable調用 while True: f = Future() def on_readable(): f.set_result(sock.recv(4096)) # 可讀的狀況下,讀取4096個bytes暫存給Future,執行回調,使生成器繼續執行下去 selector.register(sock.fileno(), EVENT_READ, on_readable) # io讀事件 chunk = yield f # 返回f,並接受step中send進來的future.result值,也就是暫存的請求返回字符 selector.unregister(sock.fileno()) if chunk: self.response += chunk else: urls_todo.remove(self.url) if not urls_todo: stopped = True break print("result:", self.response)
chunk = yield f,返回f並接受step中send進來的值,yield暫停子生成器函數的運行把cpu的使用權讓出去,對比chan等待其餘chan時處於等待中狀態(_Gwaiting),是否是有點 chan 的味道了.
子生成器中包含多個yield和帶緩存的chan,是否是也有類似呢?
python是單線程中調度多個協程,而go是多個進程中調度多個協程,感受yield和chan是有殊途同歸之妙的.
維特根斯坦說「在語言中顯示自身的東西, 咱們沒法用語言來表示它」, 這句話不太好理解, 請容許我作一個不負責任的類比。好比計算機編程, 邏輯至關於機器語言或者彙編語言, 反正是比較底層的那種;人的語言至關於高級編程語言, 相似java和python;咱們的生活就是軟件的圖形界面。若是你是一個工程師, 你必定是順着理解這件事的——機器語言必定是基礎啊, 它是一切得以運做絕對前提啊。維特根斯坦會說, 幼稚!我當年也是這麼想的他說, 必須倒過來理解。由於人和圖形界面的交互, 纔會有高級語言的各類安排, 纔會有機器語言的各類運做。爲何?由於人才是一切的尺度, 人這個主體和軟件界面產生交互模式(人和生活), 最終決定了你那些0和1的意義(語言和邏輯)。維特根斯坦那句話的意思是, 你從圖形界面的維度能解釋爲何這行代碼要這樣寫, 但你在這行代碼的維度解釋不了它爲何會被寫成這樣, 在人與圖形界面交互的過程當中, 這段字符承載的意義遠超過這段字符自己所顯示的所有, 代碼的意義在於使用, 「語言的意義也在於使用」, Meaning is use!
簡單理解, 維特根斯坦的整個邏輯是:底層原理能解釋表層現象, 但反過來卻不行。表層最多能描述底層。
好比, 人性能解釋商業爲何是那個樣子的, 但商業卻不能解釋人性爲何是那個樣子, 商業只能從它所在的側面描述人性是什麼樣的, 由於商業形式就是被人性塑造的。以前, 咱們覺得代碼是底層, 圖形是表層。其實, 圖形纔是底層, 代碼是表層, 這裏的意思是, 生活能解釋語言, 語言卻只能描述生活。語言妄圖解釋生活, 表層妄圖解釋底層的結果就是哲學的出現。
這樣就造就了一個可悲的事實,即人類對天然的認識永遠只能無限的接近真理, 卻永遠沒法探究到所謂的本源, 認識天然的過程其實都是在盲人摸象。
從實際出發, 不一樣問題用不一樣方法, 一個模型是否可靠, 看的歷來不是理論或模型是否高明, 檢驗真理的惟一標準只有一條, 就是實踐, 本身動手去嘗試證明。
深刻理解Python異步編程(上) # 十分期待後續的中與下.
python3.6異步IO包asyncio部分核心源碼思路梳理
以上不少內容都來自參考文檔的摘抄和本身的理解,若有錯誤,還望指正.