48.深刻理解Tornado——一個異步web服務器

原文地址:http://golubenco.org/?p=16 html

這篇文章的目的在於對Tornado這個異步服務器軟件的底層進行一番探索。我採用自底向上的方式進行介紹,從輪訓開始,向上一直到應用層,指出我認爲有趣的部分。 python

因此,若是你有打算要閱讀Tornado這個web框架的源碼,又或者是你對一個異步web服務器是如何工做的感興趣,我能夠在這成爲你的指導。 linux

經過閱讀這篇文章,你將能夠: ios

  • 本身寫一個Comet架構程序的服務器端部分,即便你是從拷貝別人的代碼開始。
  • 若是你想在Tornado框架上作開發,經過這篇文章你將更好的理解Tornado web框架。
  • Tornado和Twisted的爭論上,你將更有看法。

介紹

假設你還不知道Tornado是什麼也不知道爲何應該對它感興趣,那我將用簡短的話來介紹Tornado這個項目。若是你已經對它有了興趣,你能夠跳去看下一節內容。 git

Tornado是一個用Python編寫的異步HTTP服務器,同時也是一個web開發框架。該框架服務於FriendFeed網站,最近Facebook也在使用它。FriendFeed網站有用戶數多和應用實時性強的特色,因此性能和可擴展性是很受重視的。因爲如今它是開源的了(這得歸功於Facebook),咱們能夠完全的對它是如何工做的一探究竟。 github

我以爲對非阻塞式IO (nonblocking IO) 和異步IO (asynchronous IO  AIO)頗有必要談一談。若是你已經徹底知道他們是什麼了,能夠跳去看下一節。我儘量的使用一些例子來講明它們是什麼。 web

讓咱們假設你正在寫一個須要請求一些來自其餘服務器上的數據(好比數據庫服務,再好比新浪微博的open api)的應用程序,而後呢這些請求將花費一個比較長的時間,假設須要花費5秒鐘。大多數的web開發框架中處理請求的代碼大概長這樣: 數據庫

def  handler_request(self, request):
    answ 
=  self.remote_server.query(request)  #  this takes 5 seconds
    request.write_response(answ)

若是這些代碼運行在單個線程中,你的服務器只能每5秒接收一個客戶端的請求。在這5秒鐘的時間裏,服務器不能幹其餘任何事情,因此,你的服務效率是每秒0.2個請求,哦,這太糟糕了。  後端

固然,沒人那麼天真,大部分服務器會使用多線程技術來讓服務器一次接收多個客戶端的請求,咱們假設你有20個線程,你將在性能上得到20倍的提升, 因此如今你的服務器效率是每秒接受4個請求,但這仍是過低了,固然,你能夠經過不斷地提升線程的數量來解決這個問題,可是,線程在內存和調度方面的開銷是 昂貴的,我懷疑若是你使用這種提升線程數量的方式將永遠不可能達到每秒100個請求的效率。 api

若是使用AIO,達到每秒上千個請求的效率是很是輕鬆的事情。服務器請求處理的代碼將被改爲這樣:

def  handler_request(self, request):
    self.remote_server.query_async(request, self.response_received)     
def  response_received(self, request, answ):     #  this is called 5 seconds later
    request.write(answ)

AIO的思想是當咱們在等待結果的時候不阻塞,轉而咱們給框架一個回調函數做爲參數,讓框架在有結果的時候經過回調函數通知咱們。這樣,服務器就能夠被解放去接受其餘客戶端的請求了。

然而這也是AIO不太好的地方:代碼有點不直觀了。還有,若是你使用像Tornado這樣的單線程AIO服務器軟件,你須要時刻當心不要去阻塞什麼,由於全部本該在當前返回的請求都會像上述處理那樣被延遲返回。

關於異步IO,比當前這篇過度簡單的介紹更好的學習資料請看 The C10K problem

源代碼

該項目由github託管,你能夠經過以下命令得到,雖然經過閱讀這篇文章你也能夠不須要它是吧。

git clone git: // github.com / facebook / tornado.git

在tornado的子目錄中,每一個模塊都應該有一個.py文件,你能夠經過檢查他們來判斷你是否從已經從代碼倉庫中完整的遷出了項目。在每一個源代碼 的文件中,你均可以發現至少一個大段落的用來解釋該模塊的doc string,doc string中給出了一到兩個關於如何使用該模塊的例子。

IOLoop模塊

讓咱們經過查看ioloop.py文件直接進入服務器的核心。這個模塊是異步機制的核心。它包含了一系列已經打開的文件描述符(譯者:也就是文件指針)和每一個描述符的處理器(handlers)。它的功能是選擇那些已經準備好讀寫的文件描述符,而後調用它們各自的處理器(一種IO多路複用的實現,其實就是socket衆多IO模型中的select模型,在Java中就是NIO,譯者注)。

能夠經過調用add_handler()方法將一個socket加入IO循環中:

def  add_handler(self, fd, handler, events):
    
""" Registers the given handler to receive the given events for fd. """
    self._handlers[fd] 
=  handler
    self._impl.register(fd, events 
|  self.ERROR)

_handlers這個字典類型的變量保存着文件描述符(其實就是socket,譯者注)到當該文件描述符準備好時須要調用的方法的映射(在Tornado中,該方法被稱爲處理器)。而後,文件描述符被註冊到epoll(unix中的一種IO輪詢機制,貌似,譯者注)列表中。Tornado關心三種類型的事件(指發生在文件描述上的事件,譯者注):READ,WRITE 和 ERROR。正如你所見,ERROR是默認爲你自動添加的。

self._impl是select.epoll()selet.select()二者中的一個。咱們稍後將看到Tornado是如何在它們之間進行選擇的。

如今讓咱們來看看實際的主循環,不知何故,這段代碼被放在了start()方法中:

複製代碼
def start(self):
    """Starts the I/O loop.
    The loop will run until one of the I/O handlers calls stop(), which
    will make the loop stop after the current event iteration completes.
    """
    self._running = True
    while True:
    [ ... ]
        if not self._running:
            break
        [ ... ]
        try:
            event_pairs = self._impl.poll(poll_timeout)
        except Exception, e:
            if e.args == (4, "Interrupted system call"):
                logging.warning("Interrupted system call", exc_info=1)
                continue
            else:
                raise
        # Pop one fd at a time from the set of pending fds and run
        # its handler. Since that handler may perform actions on
        # other file descriptors, there may be reentrant calls to
        # this IOLoop that update self._events
        self._events.update(event_pairs)
        while self._events:
            fd, events = self._events.popitem()
            try:
                self._handlers[fd](fd, events)
            except KeyboardInterrupt:
                raise
            except OSError, e:
                if e[0] == errno.EPIPE:
                    # Happens when the client closes the connection
                    pass
                else:
                    logging.error("Exception in I/O handler for fd %d",
                                  fd, exc_info=True)
            except:
                logging.error("Exception in I/O handler for fd %d",
                              fd, exc_info=True)
複製代碼

poll()方法返回一個形如(fd: events)的鍵值對,並賦值給event_pairs變量。因爲當一個信號在任何一個事件發生前到來時,C函數庫中的poll()方法會返回 EINTR(實際是一個值爲4的數值),因此"Interrupted system call"這個特殊的異常須要被捕獲。更詳細的請查看man poll

在內部的while循環中,event_pairs中的內容被一個一個的取出,而後相應的處理器會被調用。pipe 異常在這裏默認不進行處理。爲了讓這個類適應更通常的狀況,在http處理器中處理這個異常是一個更好的方案,可是選擇如今這樣處理或許是由於更容易一些。

註釋中解釋了爲何使用字典的popitem()方法,而不是使用更廣泛一點的下面這種作法(指使用迭代,譯者注):

for  fd, events  in  self._events.items():

緣由很簡單,在主循環期間,這個_events字典變量可能會被處理器所修改。好比remove_handler()處理器。這個方法把fd(即文件描述符,譯者注)從_events字典中取出(extracts,意思是取出並從_events中刪除,譯者注),因此即便fd被選擇到了,它的處理器也不會被調用(做 者的意思是,若是使用for迭代循環_events,那麼在迭代期間_events就不能被修改,不然會產生不可預計的錯誤,好比,明明調用了 remove_handler()方法刪除了某個<fd, handler>鍵值對,可是該handler仍是被調用了,譯者注)。

(意義不大的)循環結束技巧

怎麼讓這個主循環中止是頗有技巧性的。self._running變量被用來在運行時從主循環中跳出,處理器能夠經過調用stop()方法把它設置 爲False。一般狀況下,這就能讓主循環中止了,可是stop()方法還能被一個信號處理器所調用,因此,若是1)主循環正阻塞在poll()方法 處,2)服務端沒有接收到任何來自客戶端的請求3)信號沒有被OS投遞到正確的線程中,你將不得不等待poll()方法出現超時狀況後纔會返回。考慮到這 些狀況並不時常發生,還有poll()方法的默認超時時間只不過是0.2秒,因此這種讓主循環中止的方式還算過得去。

但無論怎樣,Tornado的開發者爲了讓主循環中止,仍是額外的建立了一個沒有名字的管道和對應的處理器,並把管道的一端放在了輪詢文件描述符列表中。當須要中止時,在管道的另外一端隨便寫點什麼,這能高效率的(意思是立刻,譯者注)喚醒主循環在poll()方法處的阻塞(貌似Java NIO的Windows實現就用了這種方法,譯者注)。這裏節選了一些代碼片斷:

複製代碼
def __init__(self, impl=None):
    [...]
    # Create a pipe that we send bogus data to when we want to wake
    # the I/O loop when it is idle
    r, w = os.pipe()
    self._set_nonblocking(r)
    self._set_nonblocking(w)
    self._waker_reader = os.fdopen(r, "r", 0)
    self._waker_writer = os.fdopen(w, "w", 0)
    self.add_handler(r, self._read_waker, self.WRITE)
def _wake(self):
    try:
        self._waker_writer.write("x")
    except IOError:
        pass
複製代碼

實際上,上述代碼中存在一個bug:那個只讀文件描述符r,雖然是用來讀的,但在註冊時卻附加上了WRITE類型的事件,這將致使該註冊實際不會被響應。正如我先前所說的,用不用專門找個方法其實沒什麼的,因此我對他們沒有發現這個方法不起做用的事實並不感到驚訝。我在mail list中報告了這個狀況,可是還沒有收到答覆。

定時器

另一個在IOLoop模塊中頗有特色的設計是對定時器的簡單實現。一系列的定時器會被以是否過時的形式來維護和保存,這用到了python的bisect模塊:

def add_timeout(self, deadline, callback):
    """Calls the given callback at the time deadline from the I/O loop."""
    timeout = _Timeout(deadline, callback)
    bisect.insort(self._timeouts, timeout)
    return timeout

在主循環中,全部過時了的定時器的回調會按照過時的順序被觸發。poll()方法中的超時時間會動態的進行調整,調整的結果就是若是沒有新的客戶端請求,那麼下一個定時器就好像沒有延遲同樣的被觸發(意思是若是沒有新的客戶端的請求,poll()方法將被阻塞直到超時,這個超時時間的設定會根據下一個定時器與當前時間之間的間隔進行調整,調整後,超時的時間會等同於距離下一個定時器被觸發的時間,這樣在poll()阻塞完後,下一個定時器恰好過時,譯者注)。

選擇select方案

讓咱們如今快速的看一下poll和select這兩種select方案的實現代碼。Python已經在版本2.6的標準庫中支持了epoll,你可 以經過在select模塊上使用hasattr()方法檢測當前Python是否支持epoll。若是python版本小於2.6,Tornado將用它 本身的基於C的epoll模塊。你能夠在tornado/epoll.c文件中找到它源代碼。若是最後這也不行(由於epoll不是每一個Linux都有 的),它將回退到selec._Select並把_EPoll類包裝成和select.epoll同樣的api接口。在你作性能測試以前,請肯定你能使用 epoll,由於select在有大量文件描述符狀況下的效率很是低。

複製代碼
# Choose a poll implementation. Use epoll if it is available, fall back to
# select() for non-Linux platforms
if hasattr(select, "epoll"):
    # Python 2.6+ on Linux
    _poll = select.epoll
else:
    try:
        # Linux systems with our C module installed
        import epoll
        _poll = _EPoll
    except:
        # All other systems
        import sys
        if "linux" in sys.platform:
            logging.warning("epoll module not found; using select()")
        _poll = _Select
複製代碼

經過上述閱讀,咱們的介紹已經涵蓋了大部分IOLoop模塊。正如廣告中介紹的那樣,它是一段優雅而又簡單的代碼。

從sockets到流

讓咱們來看看IOStream模塊。它的目的是提供一個對非阻塞式sockets的輕量級抽象,它提供了三個方法:

  • read_until(),從socket中讀取直到遇到指定的字符串。這爲在讀取HTTP頭時遇到空行分隔符自動中止提供了方便。
  • read_bytes(),從socket中讀取指定數量的字節。這爲讀取HTTP消息的body部分提供了方便。
  • write(),將指定的buffer寫入socket並持續監測直到這個buffer被髮送。
全部上述的方法均可以經過異步方式在它們完成時觸發回調函數。

write()方法提供了將調用者提供的數據加以緩衝直到IOLoop調用了它的(指write方法的,譯者注)處理器的功能,由於到那時候就說明socket已經爲寫數據作好了準備:

複製代碼
def write(self, data, callback=None):
    """Write the given data to this stream.
    If callback is given, we call it when all of the buffered write
    data has been successfully written to the stream. If there was
    previously buffered write data and an old write callback, that
    callback is simply overwritten with this new callback.
    """
    self._check_closed()
    self._write_buffer += data
    self._add_io_state(self.io_loop.WRITE)
    self._write_callback = callback
複製代碼

該方法只是用socket.send()來處理WRITE類型的事件,直到EWOULDBLOCK異常發生或者buffer被髮送完畢。

讀數據的方法和上述過程正好相反。讀事件的處理器持續讀取數據直到緩衝區被填滿爲止。這就意味着要麼讀取指定數量的字節(若是調用的是read_bytes()),要麼讀取的內容中包含了指定的分隔符(若是調用的是read_util()):

複製代碼
def _handle_read(self):
    try:
        chunk = self.socket.recv(self.read_chunk_size)
    except socket.error, e:
        if e[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
            return
        else:
            logging.warning("Read error on %d: %s",
                            self.socket.fileno(), e)
            self.close()
            return
    if not chunk:
        self.close()
        return
    self._read_buffer += chunk
    if len(self._read_buffer) >= self.max_buffer_size:
        logging.error("Reached maximum read buffer size")
        self.close()
        return
    if self._read_bytes:
        if len(self._read_buffer) >= self._read_bytes:
            num_bytes = self._read_bytes
            callback = self._read_callback
            self._read_callback = None
            self._read_bytes = None
            callback(self._consume(num_bytes))
    elif self._read_delimiter:
        loc = self._read_buffer.find(self._read_delimiter)
        if loc != -1:
            callback = self._read_callback
            delimiter_len = len(self._read_delimiter)
            self._read_callback = None
            self._read_delimiter = None
            callback(self._consume(loc + delimiter_len))
複製代碼

以下所示的_consume方法是爲了確保在要求的返回值中不會包含多餘的來自流的數據,而且保證後續的讀操做會從當前字節的下一個字節開始(先將流中的數據讀到self.read_buffer中,而後根據要求進行切割,返回切割掉的數據,保留切割後的數據供下一次的讀取,譯者注):

def _consume(self, loc):
    result = self._read_buffer[:loc]
    self._read_buffer = self._read_buffer[loc:]
    return result

還值得注意的是在上述_handle_read()方法中read buffer的上限——self.max_buffer_size。默認值是100MB,這彷佛對我來講是有點大了。舉個例子,若是一個攻擊者和服務端建 立了100個鏈接,並持續發送不帶頭結束分隔符的頭信息,那麼Tornado須要10GB的內存來處理這些請求。即便內存ok,這種數量級數據的複製操做 (好比像上述_consume()方法中的代碼)極可能使服務器超負荷。咱們還注意到在每次迭代中_handle_read()方法是如何在這個 buffer中搜索分隔符的,因此若是攻擊者以小塊形式發送大量的數據,服務端不得不作不少次搜索工做。歸根結底,你應該想要將這個參數和諧掉,除非你真 的很但願那樣(Bottom of line, you might want to tune this parameter unless you really expect requests that big 不大明白怎麼翻譯,譯者注)而且你有足夠的硬件條件。

HTTP 服務器

有了IOLoop模塊和IOStream模塊的幫助,寫一個異步的HTTP服務器只差一步之遙,這一步就在httpserver.py中完成。

HTTPServer類它本身只負責處理將接收到的新鏈接的socket添加到IOLoop中。該監聽型的socket本身也是IOLoop的一部分,正如在listen()方法中見到的那樣:

複製代碼
def listen(self, port, address=""):
    assert not self._socket
    self._socket = socket.(socket.AF_INET, socket.SOCK_STREAM, 0)
    flags = fcntl.fcntl(self._socket.fileno(), fcntl.F_GETFD)
    flags |= fcntl.FD_CLOEXEC
    fcntl.fcntl(self._socket.fileno(), fcntl.F_SETFD, flags)
    self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self._socket.setblocking(0)
    self._socket.bind((address, port))
    self._socket.listen(128)
    self.io_loop.add_handler(self._socket.fileno(), self._handle_events,
                             self.io_loop.READ)


複製代碼

除了綁定給定的地址和端口外,上述代碼還設置了"close on exec"和"reuse address"這兩個標誌位。前者在應用程序建立子進程的時候特別有用。在這種狀況下,咱們不想讓套接字保持打開的狀態(任何設置了"close on exec"標誌位的文件描述符,都不能被使用exec函數方式建立的子進程讀寫,由於該文件描述符在exec函數調用前就會被自動釋放,譯者注)。後者用來避免在服務器重啓的時候發生「該地址以被使用」這種錯誤時頗有用。

正如你所見到的,後備鏈接所容許的最大數目是128(注意,listen方法並非你想象中的「開始在128端口上監聽」的意思,譯者注)。這意味着若是有128個鏈接正在等待被accept,那麼直到服務器有時間將前面128個鏈接中的某幾個accept了,新的鏈接都將被拒絕。我建議你在作性能測試的時候將該參數調高,由於當新的鏈接被拋棄的時候將直接影響你作測試的準確性。

在上述代碼中註冊的_handle_events()處理器用來accept新鏈接,並建立相關的IOStream對象和初始化一個HTTPConnection對象,HTTPConnection對象負責處理剩下的交互部分:

複製代碼
def _handle_events(self, fd, events):
    while True:
        try:
            connection, address = self._socket.accept()
        except socket.error, e:
            if e[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                return
            raise
        try:
            stream = iostream.IOStream(connection, io_loop=self.io_loop)
            HTTPConnection(stream, address, self.request_callback,
                           self.no_keep_alive, self.xheaders)
        except:
            logging.error("Error in connection callback", exc_info=True)
複製代碼

能夠看到這個方法在一次迭代中accept了全部正在等待處理的鏈接。也就是說直到EWOULDBLOCK異常發生while True循環纔會退出,這也就意味着當前沒有須要處理accept的鏈接了。

HTTP頭的部分的解析工做開始於HTTPConnection類的構造函數__init()__():

複製代碼
def __init__(self, stream, address, request_callback, no_keep_alive=False,
             xheaders=False):
    self.stream = stream
    self.address = address
    self.request_callback = request_callback
    self.no_keep_alive = no_keep_alive
    self.xheaders = xheaders
    self._request = None
    self._request_finished = False
    self.stream.read_until("rnrn", self._on_headers)
複製代碼

若是你很想知道xheaders參數的意義,請看這段註釋:

若是xheaders爲True,咱們將支持把全部請求的HTTP頭解析成X - Real - Ip和X - Scheme格式,而原先咱們將HTTP頭解析成remote IP和HTTP scheme格式。這種格式的HTTP頭在Tornado運行於反向代理或均衡負載服務器的後端時將很是有用。

_on_headers()回調函數實際用來解析HTTP頭,並在有請求內容的狀況下經過使用read_bytes()來讀取請求的內容部分。_on_request_body()回調函數用來解析POST的參數並調用應用層提供的回調函數:

複製代碼
def _on_headers(self, data):
    eol = data.find("rn")
    start_line = data[:eol]
    method, uri, version = start_line.split(" ")
    if not version.startswith("HTTP/"):
        raise Exception("Malformed HTTP version in HTTP Request-Line")
    headers = HTTPHeaders.parse(data[eol:])
    self._request = HTTPRequest(
        connection=self, method=method, uri=uri, version=version,
        headers=headers, remote_ip=self.address[0])
    content_length = headers.get("Content-Length")
    if content_length:
        content_length = int(content_length)
        if content_length > self.stream.max_buffer_size:
            raise Exception("Content-Length too long")
        if headers.get("Expect") == "100-continue":
            self.stream.write("HTTP/1.1 100 (Continue)rnrn")
        self.stream.read_bytes(content_length, self._on_request_body)
        return
    self.request_callback(self._request)
def _on_request_body(self, data):
    self._request.body = data
    content_type = self._request.headers.get("Content-Type", "")
    if self._request.method == "POST":
        if content_type.startswith("application/x-www-form-urlencoded"):
            arguments = cgi.parse_qs(self._request.body)
            for name, values in arguments.iteritems():
                values = [v for v in values if v]
                if values:
                    self._request.arguments.setdefault(name, []).extend(
                        values)
        elif content_type.startswith("multipart/form-data"):
            boundary = content_type[30:]
            if boundary: self._parse_mime_body(boundary, data)
    self.request_callback(self._request)

複製代碼

將結果寫回客戶端的工做在HTTPRequest類中處理,你能夠在上面的_on_headers()方法中看到具體的實現。HTTPRequest類僅僅將寫回的工做代理給了stream對象。

def write(self, chunk):
    assert self._request, "Request closed"
    self.stream.write(chunk, self._on_write_complete)

未完待續?

經過這篇文章,我已經涵蓋了從socket到應用層的全部方面。這應該能給你關於Tornado是如何工做的一個清晰的理解。總之,我認爲Tornado的代碼是很是友好的,我但願你也這樣認爲。

Tornado框架還有很大一部分咱們沒有探索,好比wep.py(應該是web.py,譯者注)這個實際與你應用打交道的模塊,又或者是template engine模塊。若是我有足夠興趣的話,我也會介紹這些部分。能夠經過訂閱個人RSS feed來鼓勵我。

相關文章
相關標籤/搜索