Python編程學習圈 3月5日算法
這兩天由於一點我的緣由寫了點很久沒碰的 Python ,其中涉及到「協程」編程,上次搞的時候,它仍是 Web 框架 tornado
特有的 feature
,如今已經有 async
、await
關鍵字支持了。思考了一下其實現,回顧了下這些年的演變,以爲還有點意思。編程
都是單線程,爲何原來低效率的代碼用了async
、await
加一些異步庫就變得效率高了?
若是在作基於 Python 的網絡或者 Web 開發時,對於這個問題曾感到疑惑,這篇文章嘗試給一個答案。api
0x00 開始以前首先,本文不是帶你瀏覽源代碼,而後對照原始代碼給你講 Python 標準的實現。相反,咱們會從實際問題出發,思考解決問題的方案,一步步體會解決方案的演進路徑,最重要的,但願能在過程當中得到知識系統性提高。⚠️ 本文僅是提供了一個獨立的思考方向,並未遵循歷史和現有實際具體的實現細節。其次,閱讀這篇文章須要你對 Python 比較熟悉,至少了解 Python 中的生成器 generator
的概念。緩存
這是性能的關鍵。但咱們這裏只解釋概念,其實現細節不是重點,這對咱們理解 Python 的協程已經足夠了,如已足夠了解,前進到 0x02
。首先,你要知道全部的網絡服務程序都是一個巨大的死循環,你的業務邏輯都在這個循環的某個時刻被調用:服務器
def handler(request):
# 處理請求
pass
# 你的 handler 運行在 while 循環中
while True:
# 獲取一個新請求
request = accept()
# 根據路由映射獲取到用戶寫的業務邏輯函數
handler = get_handler(request)
# 運行用戶的handler,處理請求
handler(request)
設想你的 Web 服務的某個 handler
,在接收到請求後須要一個 API 調用才能響應結果。對於最傳統的網絡應用,你的 API 請求發出去後在等待響應,此時程序中止運行,甚至新的請求也得在響應結束後才進得來。若是你依賴的 API 請求網絡丟包嚴重,響應特別慢呢?那應用的吞吐量將很是低。不少傳統 Web 服務器使用多線程技術解決這個問題:把 handler
的運行放到其餘線程上,每一個線程處理一個請求,本線程阻塞不影響新請求進入。這能必定程度上解決問題,但對於併發比較大的系統,過多線程調度會帶來很大的性能開銷。IO 多路複用能夠作到不使用線程解決問題,它是由操做系統內核提供的功能,能夠說專門爲這類場景而生。簡單來說,你的程序遇到網絡IO時,告訴操做系統幫你盯着,同時操做系統提供給你一個方法,讓你能夠隨時獲取到有哪些 IO 操做已經完成。就像這樣:網絡
# 操做系統的IO複用示例僞代碼
# 向操做系統IO註冊本身關注的IO操做的id和類型
io_register(io_id, io_type)
io_register(io_id, io_type)
# 獲取完成的IO操做
events = io_get_finished()
for (io_id, io_type) in events:
if io_type == READ:
data = read_data(io_id)
elif io_type == WRITE:
write_data(io_id,data)
把 IO 複用邏輯融合到咱們的服務器中,大概會像這樣:數據結構
call_backs = {}
def handler(req):
# do jobs here
io_register(io_id, io_type)
def call_back(result):
# 使用返回的result完成剩餘工做...
call_backs[io_id] = call_back
# 新的循環
while True:
# 獲取已經完成的io事件
events = io_get_finished()
for (io_id, io_type) in events:
if io_type == READ: # 讀取
data = read(io_id)
call_back = call_backs[io_id]
call_back(data)
else:
# 其餘類型io事件的處理
pass
# 獲取一個新請求
request = accept()
# 根據路由映射獲取到用戶寫的業務邏輯函數
handler = get_handler(request)
# 運行用戶的handler,處理請求
handler(request)
咱們的 handler
對於 IO 操做,註冊了回調就馬上返回,同時每次迭代都會對已完成的 IO 執行回調,網絡請求再也不阻塞整個服務器。上面的僞代碼僅便於理解,具體實現細節更復雜。並且就鏈接受新請求也是在從操做系統獲得監聽端口的 IO 事件後進行的。咱們若是把循環部分還有 call_backs
字典拆分到單獨模塊,就能獲得一個 EventLoop
,也就是 Python 標準庫 asyncio
包中提供的 ioloop
。多線程
着重看下咱們業務中常常寫的 handler
函數,在有獨立的 ioloop
後,它如今變成相似這樣:併發
def handler(request):
# 業務邏輯代碼...
# 須要執行一次 API 請求
def call_back(result):
# 使用 API 返回的result完成剩餘工做
print(result)
# 沒有io_call這個方法,這裏只是示意,表示註冊一個IO操做
asyncio.get_event_loop().io_call(api, call_back)
到這裏,性能問題已經解決了:咱們再也不須要多線程就能源源不斷接受新請求,並且不用care依賴的 API 響應有多慢。可是咱們也引入了一個新問題,原來流暢的業務邏輯代碼如今被拆成了兩部分,請求 API 以前的代碼還正常,請求 API 以後的代碼只能寫在回調函數裏面了。這裏咱們業務邏輯只有一個 API 調用,若是有多個 API ,再加上對 Redis 或者 MySQL 的調用(它們本質也是網絡請求),整個邏輯會被拆分的更散,這對業務開發是一筆負擔。對於有匿名函數的一些語言(沒錯就是JavaScript),還可能會引起所謂的「回調地獄」。接下來咱們想辦法解決這個問題。咱們很容易會想到:若是函數在運行到網絡 IO 操做處後可以暫停,完成後又能在斷點處喚醒就行了。若是你對 Python 的「生成器」熟悉,你應該會發現,它剛好具備這個功能:app
def example():
value = yield 2
print("get", value)
return value
g = example()
# 啓動生成器,咱們會獲得 2
got = g.send(None)
print(got) # 2
try:
# 再次啓動 會顯示 "get 4", 就是咱們傳入的值
got = g.send(got*2)
except StopIteration as e:
# 生成器運行完成,將會print(4),e.value 是生成器return的值
print(e.value)
函數中有 yield
關鍵字,調用函數將會獲得一個生成器,生成器一個關鍵的方法 send()
能夠跟生成器交互。g.send(None)
會運行生成器內代碼直到遇到 yield
,並返回其後的對象,也就是 2
,生成器代碼就停在這裏了,直到咱們再次執行 g.send(got*2)
,會把 2*2
也就是 4
賦值給yield
前面的變量 value
,而後繼續運行生成器代碼。yield
在這裏就像一扇門,能夠把一件東西從這裏送出去,也能夠把另外一件東西拿進來。若是 send
讓生成器運行到下一個 yield
前就結束了,send
調用會引起一個特殊的異常StopIteration
,這個異常自帶一個屬性 value
,爲生成器 return
的值。若是咱們把咱們的 handler
用 yield
關鍵字轉換成一個生成器,運行它來把 IO 操做的具體內容返回,IO 完成後的回調函數中把 IO 結果放回並恢復生成器運行,那就解決了業務代碼不流暢的問題了:
def handler(request):
# 業務邏輯代碼...
# 須要執行一次 API 請求,直接把 IO 請求信息yield出去
result = yield io_info
# 使用 API 返回的result完成剩餘工做
print(result)
# 這個函數註冊到ioloop中,用來當有新請求的時候回調
def on_request(request):
# 根據路由映射獲取到用戶寫的業務邏輯函數
handler = get_handler(request)
g = handler(request)
# 首次啓動得到io_info
io_info = g.send(None)
# io完成回調函數
def call_back(result):
# 從新啓動生成器
g.send(result)
asyncio.get_event_loop().io_call(io_info, call_back)
上面的例子,用戶寫的 handler
代碼已經不會被打散到 callback
中,on_request
函數使用 callback
和 ioloop
交互,但它會被實如今 Web 框架中,對用戶不可見。上面代碼足以給咱們提供用生成器消滅的 callback
的啓發,但侷限性有兩點:
咱們來看一個更復雜的例子:其中 request
執行真正的 IO,func1
、func2
僅調用。顯然咱們的代碼只能寫成這樣:
def func1():
ret = yield request("http://test.com/foo")
ret = yield func2(ret)
return ret
def func2(data):
result = yield request("http://test.com/"+data)
return result
def request(url):
# 這裏模擬返回一個io操做,包含io操做的全部信息,這裏用字符串簡化代替
result = yield "iojob of %s" % url
return result
對於 request
,咱們把 IO 操做經過 yield
暴露給框架。對於 func1
和 func2
,調用 request
顯然也要加 yield
關鍵字,不然 request
調用返回一個生成器後不會暫停,繼續執行後續邏輯顯然會出錯。這基本就是咱們在沒有 yield from
、aysnc
、await
時代,在 tornado
框架中寫異步代碼的樣子。要運行整個調用棧,大概流程以下:
func1()
獲得生成器send(None)
啓動它獲得會獲得 request("http://test.com/foo")
的結果,仍是生成器對象send(None)
啓動由 request()
產生的生成器,會獲得 IO 操做,由框架註冊到 ioloop
並指定回調request
生成器,生成器會走到 return
語句結束request
生成器的返回值,將上一層 func1
喚醒,同時又獲得 func2()
生成器對算法和數據結構熟悉的朋友遇到這種前進後退的遍歷邏輯,能夠遞歸也能夠用棧,由於遞歸使用生成器還作不到,咱們可使用棧,其實這就是「調用棧」一詞的由來。藉助棧,咱們能夠把整個調用鏈上串聯的全部生成器對錶現爲一個生成器,對其不斷 send
就能不斷獲得全部 IO 操做信息並推進調用鏈前進,實現方法以下:
send
,若是獲得生成器就入棧並進入下一輪迭代yield
出來,讓框架註冊到 ioloop
若是實現出來,代碼不長但信息量比較大。它把整個調用鏈對外變成一個生成器,對其調用 send
,就能整個調用鏈中的 IO,完成這些 IO,繼續推進調用鏈內的邏輯執行,直到總體邏輯結束:
def wrapper(gen):
# 第一層調用 入棧
stack = Stack()
stack.push(gen)
# 開始逐層調用
while True:
# 獲取棧頂元素
item = stack.peak()
result = None
# 生成器
if isgenerator(item):
try:
# 嘗試獲取下層調用併入棧
child = item.send(result)
stack.push(child)
# result 使用事後就還原爲None
result = None
# 入棧後直接進入下次循環,繼續向下探索
continue
except StopIteration as e:
# 若是本身運行結束了,就暫存result,下一步讓本身出棧
result = e.value
else: # IO 操做
# 遇到了 IO 操做,yield 出去,IO 完成後會被用 IO 結果喚醒並暫存到 result
result = yield item
# 走到這裏則本層已經執行完畢,出棧,下次迭代將是調用鏈上一層
stack.pop()
# 沒有上一層的話,那整個調用鏈都執行完成了,return
if stack.empty():
print("finished")
return result
這多是最複雜的部分,若是看起來吃力的話,其實只要明白,對於上面示例中的調用鏈,它能夠實現的效果以下就行了:
w = wrapper(func1())
# 將會獲得 "iojob of http://test.com/foo"
w.send(None)
# 上個iojob foo 完成後的結果"bar"傳入,繼續運行,獲得 "iojob of http://test.com/bar"
w.send("bar")
# 上個iojob bar 完成後的結構"barz"傳入,繼續運行,結束。
w.send("barz")
有了這部分之後框架再加上配套的代碼:
# 維護一個就緒列表,存放全部完成的IO事件,格式爲(wrapper,result)
ready = []
def on_request(request):
handler = get_handler(request)
# 使用 wrapper 包裝後,能夠只經過 send 處理 IO 了
g = wrapper(func1())
# 把開始狀態直接視爲結果爲None的就緒狀態
ready.append((g, None))
# 讓ioloop每輪循環都執行此函數,用來處理的就緒的IO
def process_ready(self):
def call_back(g, result):
ready.append((g, result))
# 遍歷全部已經就緒生成器,將其向下推動
for g, result in self.ready:
# 用result喚醒生成器,並獲得下一個io操做
io_job = g.send(result)
# 註冊io操做 完成後把生成器加入就緒列表,等待下一輪處理
asyncio.get_event_loop().io_call(
io_job, lambda result: ready.append((g, result)
這裏核心思想是維護一個就緒列表,ioloop
每輪迭代都來掃一遍,推進就緒的狀態的生成器向下運行,並把新的 IO 操做註冊,IO 完成後再次加入就緒,通過幾輪 ioloop
的迭代一個 handler
最終會被執行完成。至此,咱們使用生成器寫法寫業務邏輯已經能夠正常運行。
若是到這裏能讀懂, Python 的協程原理基本就明白了。咱們已經實現了一個微型的協程框架,標準庫的實現細節跟這裏看起來大不同,但具體的思想是一致的。咱們的協程框架有一個限制,咱們只能把 IO 操做異步化,雖然在網絡編程和 Web 編程的世界裏,阻塞的基本只有 IO 操做,但也有一些例外,好比我想讓當前操做 sleep
幾秒,用 time.sleep()
又會讓整個線程阻塞住,就須要特殊實現。再好比,能夠把一些 CPU 密集的操做經過多線程異步化,讓另外一個線程通知事件已經完成後再執行後續。因此,協程最好能與網絡解耦開,讓等待網絡IO只是其中一種場景,提升擴展性。Python 官方的解決方案是讓用戶本身處理阻塞代碼,至因而向 ioloop
來註冊 IO 事件仍是開一個線程徹底由你本身,並提供了一個標準「佔位符」Future
,表示他的結果等到將來纔會有,其部分原型以下:
class Future:
# 設置結果
def set_result(result): pass
# 獲取結果
def result(): pass
# 表示這個future對象是否是已經被設置過結果了
def done(): pass
# 設置在他被設置結果時應該執行的回調函數,能夠設置多個
def add_done_callback(callback): pass
咱們的稍加改動就能支持 Future
,讓擴展性變得更強。對於用戶代碼的中的網絡請求函數 request
:
# 如今 request 函數,不是生成器,它返回future
def request(url):
# future 理解爲佔位符
fut = Future()
def callback(result):
# 當網絡IO完成回調的時候給佔位符賦值
fut.set_result(result)
asyncio.get_event_loop().io_call(url, callback)
# 返回佔位符
return future
如今,request
再也不是一個生成器,而是直接返回 future
。而對於位於框架中處理就緒列表的函數:
def process_ready(self):
def callback(fut):
# future被設置結果會被放入就緒列表
ready.append((g, fut.result()))
# 遍歷全部已經就緒生成器,將其向下推動
for g, result in self.ready:
# 用result喚醒生成器,獲得的再也不是io操做,而是future
fut = g.send(result)
# future被設置結果的時候會調用callback
fut.add_done_callback(callback)
0x05 發展和變革
許多年前用 tornado
的時候,大概只有一個 yield
關鍵字可用,協程要想實現,就是這麼個思路,甚至 yield
關鍵字和 return
關鍵字不能一個函數裏面出現,你要想在生成器運行完後返回一個值,須要手動 raise
一個異常,雖然效果跟如今 return
同樣,但寫起來仍是很彆扭,不優雅。後來有了 yield from
表達式。它能夠作什麼?通俗地說,它就是作了上面那個生成器 wrapper
所作的事:經過棧實現調用鏈遍歷的 ,它是 wrapper
邏輯的語法糖。有了它,同一個例子你能夠這麼寫:
def func1():
# 注意 yield from
ret = yield from request("http://test.com/foo")
# 注意 yield from
ret = yield from func2(ret)
return ret
def func2(data):
# 注意 yield from
result = yield from request("http://test.com/"+data)
return result
# 如今 request 函數,不是生成器,它返回future
def request(url):
# 同上基於future實現的request
而後你就再也不須要那個燒腦的 wrapper
函數了:
g = func1()
# 返回第一個請求的 future
g.send(None)
# 繼續運行,自動進入func2 並獲得第它裏面的那個future
g.send("bar")
# 繼續運行,完成調用鏈剩餘邏輯,拋出StopIteration異常
g.send("barz")
yield from
直接打通了整個調用鏈,已是很大的進步了,可是用來異步編程看着仍是彆扭,其餘語言都有專門的協程 async
、await
關鍵字了,直到再後來的版本把這些內容用專用的 async
、await
關鍵字包裝,才成爲今天比較優雅的樣子。
總的來講, Python 的原生的協程從兩方面實現:
callback
代碼變成同步代碼,減小業務編寫困難有生成器這種對象的語言,其 IO 協程實現大抵如此,JavaScript 協程的演進基本如出一轍,關鍵字相同,Future
類比 Promise
本質相同。可是對於以協程出名的 Go 的協程實現跟這個就不一樣了,並不顯式地基於生成器。若是類比的話,能夠 Python 的 gevent
算做一類,都是本身實現 runtime
,並 patch
掉系統調用接入本身的 runtime
,本身來調度協程,gevent
專一於網絡相關,基於網絡 IO 調度,比較簡單,而 Go 實現了完善的多核支持,調度更加複雜和完善,並且創造了基於 channel
新編程範式。