Python 開源異步併發框架的將來

呵呵,這個標題有點大,其實只是想從零開始介紹一下異步的基礎,以及 Python 開源異步併發框架的發展和互操做性。html

另外,這是我在 OSTC 2014 作的一個同題演講,幻燈片在這裏,歡迎拍磚。python

開源

Python 是開源的,介紹的這幾個框架 TwistedTornadoGeventtulip 也都是開源的,最後這個演講是在開源大會弄的,因此標題裏確定少不了開源。另外,個人 gevent3 項目也是開源的——貌似很多同窗被我起的極品名字給搞混了,特別說明一下,gevent3 雖然有跟 Gevent 同樣的接口外貌,但底層倒是 tulip 驅動的(考慮把名字改回 gulip 之類的);請區別於未來會支持 Python 3 的 Gevent 1.1。git

非阻塞

先上一段代碼。請原諒我用 Python 代碼充當僞代碼了,但 Python 的語法實在是太簡單了,忍不住啊。程序員

import socket
s = socket.socket()
s.connect(('www.google.com', 80))
print("We are connected to %s:%d" % s.getpeername())

這是很簡單的一個客戶端 TCP 鏈接程序。假如網絡情況不是很好,執行這段程序時,咱們頗有可能要等個幾秒鐘,才能看到 We are connected 的輸出字樣。github

對於這樣的代碼,咱們就能夠說程序阻塞在了 connect() 的調用上;而這樣的函數咱們叫作阻塞式的。web

那麼非阻塞呢?仍是看一段代碼。redis

import socket
s = socket.socket()
s.setblocking(0)

try:
    s.connect(('www.google.com', 80))
except socket.error as e:
    print(str(e))
    i = 0
    while True:
        try:
            print("We are connected to %s:%d" % s.getpeername())
            break
        except:
            print("Let's do some math while waiting: %d" % i)
            i += 1
else:
    print("We are connected to %s:%d" % s.getpeername())

這一下代碼就多了——可是並不複雜。express

首先看一開始的變化,多了一句 s.setblocking(0)。這是說,將這個 socket 對象變成非阻塞式的。這樣一來,接下來的許多本應阻塞的調用將不會阻塞。django

好比 connect()。非阻塞的 connect() 調用將會當即結束,而無論這個 TCP 鏈接是否真正創建了——若是 TCP 鏈接尚未完成握手,那麼 connect() 會拋出一個異常說「開始連了,彆着急一下子就好」;不然(應該沒有不然)就會「正常」地走 try...else 的路線。

抓到這個異常以後呢,咱們就能夠充分利用這段本來要阻塞的時間,在鏈接徹底創建以前作一些有意義的事情——好比數數。我這裏網絡條件還湊合,通常狀況下數到一萬多的時候就能跟 Google 連上了。

異步

能夠看得出來,阻塞和非阻塞是說函數調用的,調用了以後要等到底層完事兒了以後才能繼續的叫作阻塞;調用了以後,要麼當即返回,要麼當即拋異常,這就是非阻塞。

而與之形影不離的一對兒概念——同步和異步——則說的是一段程序的執行處理方式。通常狀況下,阻塞式的調用均可以叫作同步,但非阻塞式的調用不必定是異步的。怎麼講呢,咱們仍是來看幾個例子。

while server.running:
    request = server.receive()
    response = handle(request)
    server.send(response)

這片代碼片斷示意的是同步的處理方式。能夠看得出來,接收請求、處理請求、發送響應依次執行,前一個任務完成了纔會作下一個;最外面還有一個 while 循環,使之不斷地收請求發響應,且是發送完上一個響應以後纔會接收下一個請求。請注意,咱們並無看到 receive() 等函數的實現細節,他們在底層能夠是阻塞的,也能夠是非阻塞的,這都不會影響咱們看到的這片代碼片斷是同步的。

那麼異步的代碼看上去是什麼樣的呢?請容許我用 Twisted 風格的代碼來展現,由於異步的代碼太「扭曲」了:

while server.running:
    deferred = server.receive()
    deferred.addCallback(on_request)

def on_request(request):
    deferred = handle(request)
    deferred.addCallback(on_response)

def on_response(response):
    server.send(response)

讓我來大概地解釋一下。爲了實現異步,這裏的 receive()handle() 都必須是非阻塞的。在 Twisted 中非阻塞的函數會當即返回一個 Deferred 對象,經過給 Deferred 對象添加回調函數,咱們能夠實如今這件事情真正完成以後,執行回調函數中定義的接下來要作的事兒。

看到扭曲的程度了吧。先接收一個請求——等等,你不必定當即就能接收到。好吧,等到接收到了的時候(on_request),咱們把這個請求送去處理,而後——等等,處理不必定立刻能完成。那好吧,等處處理完成以後(on_response),咱們再把這個響應發送回去。說實話,我沒忍心寫,其實發送也不會當即完成……

雖然上面這段代碼示例有些過份,仍有一些能夠變得更簡潔的地方,可是這對於大型項目中異步代碼的描述並不失真。難道用所謂的異步框架寫代碼都會是這麼扭曲麼?

前面咱們說的異步只是異步編碼——從編寫代碼的方式上來判斷。而一般說的異步框架,每每還會展示給用戶一些同步的接口(後面還會提到),在框架內部,這些接口也都是用非阻塞的異步代碼來實現的。對於這樣的框架,咱們仍然叫他們異步框架——總不能叫非阻塞框架,或是同步框架吧。

另外,異步編碼也不必定就非要扭曲人性,仍是有不少項目能夠簡潔明瞭地編寫異步代碼的,只不過對於程序員的要求會比編寫同步代碼稍高一些罷了。

併發與並行

好了,讓咱們先把糾結的異步放下,來看看另外兩個容易混淆的概念。

估計您已經從視頻裏聽了我辦港澳通行證的慘痛經歷了,這裏就不重複了,但仍然用這個例子來解釋一下併發和並行的概念吧。

並行的概念着重於處理端,也就是辦理通行證的工做人員。有 5 個窗口開放,就意味着同一時間能夠有 5 個業務能夠獲得並行的處理。對於計算機來講,並行勢必要有多顆處理器,真正從物理上能夠並行地處理多個任務;單 CPU 用多線程實現的叫作時分複用——也許超線程除外。

相對於並行着重於處理端,併發的概念則是關於請求端,也就是關於用戶的。當咱們談及朝陽區出入境辦證大廳的併發量的時候,咱們是在說該大廳在某一時刻能容納的前來辦證的人數,最大併發量說白了就是大廳裏能站下多少人——包括正在辦的和排隊的。

包括排隊的?那往大廳外面使勁兒排唄,這併發量豈不是無限大了?

與併發一塊兒的還有很重要的一個概念,就是處理時間。若是一味追求併發量,勢必會致使處理時間的大幅上升,大量請求多半時間在排隊,這樣並不能算是一個高效的系統設計。因此在系統資源到達瓶頸的時候,也許限制併發量,拒絕一些請求也許是一個明智的選擇。

併發並非不關心處理端,只不過多核並行或者單核時分複用都能實現併發,並且在實踐中這兩種實現方法每每會同時使用。多核並行實現的併發,其任務調度主要由操做系統完成,咱們接下來着重關心一下單線程併發的任務調度問題。

事件驅動的單線程併發

只有一個線程,用阻塞調用是確定沒法實現併發的——除非把每次僅服務一個客戶叫作「併發量爲 1 的併發」。因此,咱們必然會用到非阻塞調用。

請回憶一下前面咱們演示非阻塞調用的那個例子,咱們在等待鏈接創建的過程當中,作了一些其餘的有意義的事情,一旦鏈接創建成功,咱們會接着以前作一些關於鏈接的事情——輸出對方的地址。如今咱們試着擴展這個例子,實現併發鏈接——咱們同時啓動 100 個 TCP 鏈接,任何一個鏈接成功了就當即輸出對方地址。一開始咱們能夠這麼寫:

import socket
sockets = {}
for i in range(100):
    s = socket.socket()
    sockets[s.fileno()] = s
    s.setblocking(0)
    try:
        s.connect(('www.google.com', 80))
    except:
        pass

咱們將這 100 個 socket 對象按照他們的文件描述符保存在了一個叫作 sockets 的字典裏,而且一一調用了非阻塞的 connect() 函數。

但是,接下來怎麼寫呢?難道要重複調用每個 socket 對象的 getpeername() 函數,直到他們都正確返回了爲止?CPU 消耗太大了吧。

操做系統給咱們提供了一些接口,專門用於這類問題的:select 及其升級版 epoll(Linux) 和 kqueue(*BSD 和 Mac OS X),他們一般也被統稱爲 select 函數。select 是一種阻塞調用,專門用於從一些文件描述符中,選出那些有新事件到達的描述符,其中事件包括可讀、可寫和出錯。換句話講呢,就是監視給出的 socket,任何一個有動靜了就當即返回有動靜的描述符。

好比前面這個例子裏,咱們但願在任何一個鏈接成功創建的時候,輸出該鏈接的目的地址。因而接下來就能夠這麼寫:

import select
while sockets:
    fds = select.select([], list(sockets.keys()), [])[1]
    for fd in fds:
        s = sockets.pop(fd)
        print("%d connected to %s:%d" % ((fd,) + s.getpeername()))

也就是說,每次循環,咱們都會從剩餘的鏈接中,選出一些可寫的 socket 對象——那意味着鏈接已經成功創建了,而後將他們的目標地址輸出出來。

這就是一個很簡單的事件驅動的異步併發了,雖然咱們只是建立了 100 個 TCP 鏈接,但咱們併發了,是事件驅動的了,並且咱們異步地調用了後續的操做——輸出目的地址。

異步併發不過如此,而已。

框架

只用 socketselect 來寫一個異步 web 服務器也行,只不過會出一兩條人命而已。雖然是開玩笑,可是咱們多數狀況下仍是會選擇使用一些現有的框架。

何謂框架呢,其實就是把上一小節的例子代碼給拆開,一部分是僅包含 www.google.comprint() 的所謂用戶代碼,另外一部分就是全部剩下的叫作框架的東西。好比這樣:

import socket

sockets = {}
for i in range(   ):
    s = 
    sockets[s.fileno()] = s
    s.setblocking(0)
    try:
        s.       (              )
    except:
        pass

import select
while sockets:
    fds = select.select([], list(sockets.keys()), [])[ ]
    for fd in fds:
        s = sockets.pop(fd)
             (            s.          )

固然這段代碼並非一個框架,由於它根本沒法運行。可是咱們能夠經過它看到一個異步框架應該有的東西:

  1. 用於建立與框架契合的、非阻塞的 I/O 對象的接口
  2. 有一個主循環,用戶能夠啓動它
  3. 用戶能夠在關心的事件發生時,執行本身的代碼

回調函數和 Tornado

讓咱們以 Tornado 爲例,來看一下最基本的異步框架是怎麼用的——雖然 Tornado 並不只限於此。

sock = socket.socket()
sock.setblocking(0)
sock.bind((「」, 80))
sock.listen(128)

def on_conn(fd, events):
    conn, address = sock.accept()
    conn.send(b’Hello’)

io_loop = ioloop.IOLoop.instance()
io_loop.add_handler(sock.fileno(), on_conn, io_loop.READ)
io_loop.start()

這是一個簡單的服務器程序,它會向每個連進來的客戶端發送一句問候。其中 add_handler() 的調用就是——我認爲—— Tornado 的經典用法,也就是註冊回調函數。當有鏈接進來的時候,Tornado 就會根據要求來調用 on_conn(),後者隨即會與客戶端鏈接並送上問候。

Twisted 和封裝……和回調函數

Twisted 裏是各類封裝,經過 Transport 將 socket 對象封裝的更隱蔽,經過 Protocol 來實現用戶協議的封裝,像這樣:

from twisted.internet import protocol, reactor

class Echo(protocol.Protocol):
    def dataReceived(self, data):
        self.transport.write(data)

class EchoFactory(protocol.Factory):
    def buildProtocol(self, addr):
        return Echo()

reactor.listenTCP(1234, EchoFactory())
reactor.run()

對於回調函數,Twisted 則發明了著名的 Deferred 用以實現事件源與回調函數的分離,其實本質上沒有區別,只是在寫法上略有不一樣,這裏就很少說了。

同步地異步

正如前面提到的,異步的編碼方式——不管是 Tornado 的回調函數,仍是 TwistedDeferred——想要用的出彩,須要程序員有相對較高的心理素質和職業修養。那若是能正常地、用同步的方式來編寫異步執行的代碼呢?

藉助 Python 的 generator 功能TwistedTornado 紛紛提供了這樣的功能。好比下面這一段 Twisted 的代碼(請關注開頭的修飾器和代碼中的 yield):

@defer.inlineCallbacks
def main(endpoint, username="alice", password=「secret」):
    endpoint = endpoints.clientFromString(reactor, strport)
    factory = protocol.Factory()
    factory.protocol = imap4.IMAP4Client
    try:
        client = yield endpoint.connect(factory)
        yield client.login(username, password)
        yield client.select('INBOX')
        info = yield client.fetchEnvelope(imap4.MessageSet(1))
        print 'First message subject:', info[1]['ENVELOPE'][1]
    except:
        print "IMAP4 client interaction failed"
        failure.Failure().printTraceback()
task.react(main, sys.argv[1:])

這裏的第一個 yield 中,endpoint.connect() 返回的是一個 Deferred 對象,其回調函數的參數纔是前面的 client 對象。經過 yieldinlineCallbacks 修飾器的配合,咱們就把回調函數和 main 函數揉在了一塊兒,後面那三個 yield 也是如此,這樣的代碼看上去是同步的,執行的底層實則是異步的。Tornado 也有相似的用法,這裏就很少說了。

神奇的 yield!在這裏到底發生了什麼事情呢?我管它叫作異步切換,具體的代碼能夠看 inlineCallbacks 的實現。簡單來講呢,yield 以前,connect() 在主循環裏註冊了一個關於鏈接創立的事件監聽,而後經過 yield 把事件的處理權交給了 inlineCallbacks,同時將當前函數的執行狀態掛起(yield 的功能,能夠把棧保存下來),切換到 inlineCallbaks 裏繼續執行,而 inlineCallbacks 則會返回至主循環,繼續執行別的異步任務,直至前述事件發生且主循環排到了該事件,主循環會調用 inlineCallbacks 裏的回調函數,後者會將以前掛起的執行狀態恢復,這樣 client 就被賦上了正確的值。

總的來看,在 yield 的時候,當前執行流程會被暫停以等待事件,別的執行流程會插進來執行,直至事件發生後,當前執行流程纔有可能恢復執行。這很是相似於操做系統裏面的任務調度,因此我管它叫作異步切換,只不過這種切換是主動進行的,而不是操做系統強制的。因此,若是你不 yield 交出執行權,別的執行流程永遠沒有辦法被執行到,這也是單線程異步併發的一個須要注意的點。另外,單線程異步併發須要有足夠的異步切換才能作到近似公平的排程,因此很是適合 I/O 密集型的運算,而 CPU 密集型的運算在這裏每每會遇到比較嚴重的問題。

隱式的異步切換

在寫單線程異步代碼的時候,切記不要混合調用底層會阻塞的代碼,由於那樣會阻塞整個線程,致使全部併發的處理時間增長,最終會致使嚴重的性能問題。若是有一些阻塞的、同步的遺留代碼,那該如何是好呢?答案是:把它們統一改爲非阻塞的,或者使用多線程/多進程來處理。但是,若是要改爲非阻塞的形式,那得加多少 yield 呀!

不要緊,還有隱式的異步切換呢。一般咱們把這種須要顯式地寫 yield 的代碼叫作顯式的異步切換,與之相對的就是隱式的異步切換。好比下面這段代碼,我說它有隱式的異步切換,您信嗎?

import socket
s = socket.socket()
s.connect(('www.google.com', 80))
print("We are connected to %s:%d" % s.getpeername())

這不就是文章一開頭的那個例子嘛。別急,若是在最前面加這麼兩句,狀況就徹底不同了:

from gevent import monkey
monkey.patch_all()

Gevent 就是隱式的異步切換的表明。經過所謂的 monkey patch,Gevent 把系統庫裏的 socket 等模塊,替換成了 Gevent 本身提供的相應的非阻塞模塊。這樣,上面的代碼就變成(底層)異步的了。考慮到 monkey patch 的侵入性,您也能夠考慮直接使用 Gevent 提供的模塊,好比這樣:

from gevent import socket

Gevent 這樣的隱式的異步切換有個好處很明顯,就是能夠很容易地將阻塞式的遺留代碼遷移到 Gevent 上來,而不須要額外修改大量代碼,這對於須要異步併發支持的許多大型現有項目來講,無疑是爲數很少的幾個選擇之一——好比說 Django

可是,有很多人也認爲,隱式的異步切換的代價太大——倒不是說它的性能有多差,而是這種寫法把異步切換隱藏的太深了,不知道何時就切換到別的地方去執行了。這樣帶來的直接問題就是——跟常規共享狀態的多線程編程同樣——咱們很難保證在一段程序的執行過程當中,某些本地狀態不會被別的代碼修改,再加上狀態同步的代價,隱式的異步切換並不被特別看好。若是非得要用,記得儘可能少共享狀態,多用隊列來實現信息傳遞,而後當心編碼,仔細檢查。

綠色的 Gevent

Gevent 之因此能實現隱式的異步切換,主要歸功於 GreenletGreenletStackless Python 的一個分項目,用於在標準 CPython 中實現微線程(也稱協程、綠色線程)。

Python 中的 Greenlet 跟常規線程相似,也是會在獨立的空間中執行一段代碼,也有本身獨立的棧空間。不一樣的是:

  1. Greenlet 並不啓動任何操做系統的線程,是綠色產品
  2. Greenlet 任務之間的調度須要每一個微線程裏的代碼本身顯式地實現

用官方的一個例子演示一下這兩個特色吧:

from greenlet import greenlet

def test1():
    print 12
    gr2.switch()
    print 34

def test2():
    print 56
    gr1.switch()
    print 78

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

這個例子裏一共有三個微線程,分別是 main(也就是最外層默認的主微線程,自動建立的)、gr1gr2。程序一直順序執行,直至最後一句 gr1.switch(),由 main 微線程切換至 gr1gr1 輸出 12 以後,又切換至 gr2;接着 gr2 輸出 56 後,又切換回 gr1 以前的切出點,繼續輸出 34;這是 gr1 結束了,系統會自動切換回 gr1 的父微線程——也就是 main 的最後一句 switch() 返回,至此整個程序結束。注意,78 並無機會被輸出。

Gevent 的主循環叫作 Hub,跑在一個單獨的 greenlet 裏。用戶的程序從 main greenlet 開始執行,直至第一個異步切換。此時,Gevent 會把當前微線程——也就是 main ——與異步事件作一個關聯,而後切換到 HubHub 因而開始運轉,當某些事件發生時,Gevent 就會切換到相應關聯的 greenlet 來執行,直至他們結束返回 Hub,或者主動切換回 Hub。好比 main 等待的事件發生了,Hub 就會切到 main 上執行——固然,若是這時 main 結束了,就不會像其餘 greenlet 同樣再返回 Hub 了。

因此,greenlet 和 generator、Deferred 同樣,其實都是用來實現回調封裝的一些工具,因此前面提到過的一些異步併發的注意事項,Gevent 也都適用。

互操做性存在的問題

多種框架的存在,說好聽了是百花齊放各顯神通、競爭纔有發展,說難聽了就是碎片化、選擇恐懼症和維護代價巨大。好比說,一樣是一個 Python 的 PostgreSQL 鏈接適配程序,有支持 Twistedtxpostgres,有支持 Tornadomomoko,還有 Gevent 須要的 psycogreen——有啥話咱不能一氣兒說完呢?若是上游的 psycopg 更新了,這麼多的適配器,是否是得要跟着更新哪。

再一個問題就是遺留代碼。若是一個項目一直在用 Twisted,有一天老闆拿着張光盤說給我把這個弄上去,打開一看全都是 .pyc 文件,木有源代碼——直接調用會有以前提到的阻塞主線程的問題,扔到線程池裏作又不甘心。若是能在 Twisted 裏用 Gevent 就行了(如今確實能夠,不過會替換 Twisted 的一部分)。

將來

asyncio 這個項目其實叫作 tulip,主要開發也都在那裏,由於要進 Python 標準庫了,因此才幾經周折選了 asyncio 這麼一個名字。asyncio 是 Python 做者的一個新項目,要求至少是 Python 3.3(手動安裝),Python 3.4 裏它就已是標準庫的一部分了。

之因此要求 Python 3.3,是由於 asyncio 的微線程依賴於 Python 3.3 的新語法:yield from。區別於 yieldyield from co 實現了相似於這樣的功能:

for x in co:
    yield x

這裏說「相似」,是由於實際狀況要比這複雜不少,但意思是同樣的:將內層迭代器的元素無縫地合併到外層的迭代器裏。有了這個,asyncio 就能夠很容易地作微線程的嵌套了——也就是在一個微線程裏面等待另外一個結束返回結果。

asyncio 做爲又一個異步併發框架,與其餘現有框架差異並不大:主循環相似於 Twisted 的 reactor,Future 對回調函數進行封裝相似於 Deferred,可選的微線程相似於 inlineCallbacks,基於 yield from 的顯式的異步切換相似於 yield,這裏就很少介紹了,總的來看很是像 Twisted。可是呢,它能進入標準庫,仍是有緣由的。

互操做性

asyncio 做爲參考實現,與其規格文檔 PEP 3156 是一塊兒作出來的,蟒爹在作的過程當中尤爲關注了互操做性。

好比 asyncio 的主循環就是能夠任意替換的,任何知足 asyncio 主循環接口要求的核心均可以被安裝上去。爲了作到這一點,PEP 3156 定義了嚴格的主循環接口,將 asyncio 的框架代碼部分與主循環核心徹底分離。這樣一來,許多現有框架加個殼就能夠支持 asyncio 了——不用改現有代碼,寫一個現有主循環接口到 asyncio 主循環接口的適配層,替換掉 asyncio 自帶的主循環,這樣 asyncio 的代碼就能夠跑在現有框架上面了。

另外一個方向也是行得通的。PEP 3156 一樣定義了豐富而清晰的用戶接口,咱們可使用這些接口來實現一個現有框架的主循環替代品,這樣就能夠在不替換 asyncio 已有主循環的前提下,將別的框架的代碼嫁接到 asyncio 上來。好比說個人 gevent3 就是這麼一個例子,我將 Gevent 中原有的 libev 代碼刪掉,用 asyncio 實現了一份 Gevent Hub,這樣,gevent 的代碼就能夠跑在 asyncio 框架上了。更使人興奮的是,若是 asyncio 使用的主循環核心又剛好是好比說 Twisted,那麼原先分別依賴 GeventTwisted 的代碼,如今就能夠跑在一塊兒了,甚至互相調用也是能夠的。

好比下面一段示例代碼就演示了三個框架的融合:

import asyncio
import gevent  ## gevent3
import redis
from gevent import socket
from redis import connection
from twisted.web import server, resource
from twisted.internet import reactor


asyncio.set_event_loop(some_twisted_wrapper)


class GreenInetConnection(connection.Connection):
    def _connect(self):
        #noinspection PyUnresolvedReferences
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(self.socket_timeout)
        sock.connect((self.host, self.port))
        return sock


class HelloResource(resource.Resource):
    isLeaf = True

    def render_GET(self, request):
        gevent.spawn(self.green_GET, request)
        return server.NOT_DONE_YET

    def green_GET(self, request):
        r = redis.StrictRedis(
            connection_pool=connection.ConnectionPool(
                connection_class=GreenInetConnection))
        numberRequests = r.incr("numberRequests")
        request.setHeader("content-type", "text/plain")
        request.write("I am request #" + str(numberRequests) + "\n")
        request.finish()


reactor.listenTCP(8080, server.Site(HelloResource()))
asyncio.run_forever()

代碼演示了一個簡單的 Twisted web 服務器,使用 Gevent 來處理邏輯,asyncio 則起到了牽線搭橋的做用。

雖然目前這段代碼還不能運行,可是我相信在不久的未來,這種程度的互操做性終將實現。

更新:gevent3 項目已更名爲 tulipcore(連接仍然有效),第一個 alpha 版本已經發布至 pypi.python.org。

相關文章
相關標籤/搜索