Learning the Tornado Asynchronous Framework

Tornado is an excellent non-blocking web server framework. By function, its modules can be divided into:

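Core web framework:

    web

        Application (the web application; parses the path and dispatches requests; its key method is '__call__')

        RequestHandler (base class for request handling; override the HTTP verb methods to handle requests)

    httpserver  (initializes the TCPServer and routes request reads and writes)

Asynchronous networking:

    ioloop (schedules callbacks and polls socket fds, handling the corresponding read, write, or error events)

    iostream (asynchronous reads and writes)

    netutil (creates and binds sockets, listens on ports, and waits for connections)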

Other utilities

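Tornado's asynchronous implementation is built on ioloop (which polls socket fds for read and write events) and iostream (which performs the asynchronous reads and writes).

Because different operating systems use different polling models, only the epoll model supported by the Linux kernel is described here.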

    We use epoll (Linux) or kqueue (BSD and Mac OS X; requires python 2.6+) if they are available, 

    or else we fall back on select(). If you are implementing a system that needs to handle thousands of

    simultaneous connections, you should use a system that supports either epoll or kqueue.
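The fallback order in that quote can be sketched with the standard library alone. This is a minimal illustration, not Tornado's own selection code; the function name pick_poll_impl is hypothetical.

```python
import select


def pick_poll_impl():
    """Return the name of the best readiness API the platform offers.

    Hypothetical helper: it mirrors the order in the quote above
    (epoll, then kqueue, then select) and is not taken from Tornado.
    """
    if hasattr(select, "epoll"):
        return "epoll"   # Linux
    if hasattr(select, "kqueue"):
        return "kqueue"  # BSD and Mac OS X
    return "select"      # portable fallback, available everywhere


if __name__ == "__main__":
    print(pick_poll_impl())
```

The standard library's selectors.DefaultSelector makes a similar choice automatically, which is why the later sketches in this article use it instead of calling epoll directly.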


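The best learning reference is the code itself. Following the framework's normal usage flow, here is a diagram of the modules and methods involved: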

    

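import tornado.web
import tornado.ioloop

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        # Send the body with write(); the handler's return value is not used as the response.
        self.write('Hello World')

# Map the root path to MainHandler.
application = tornado.web.Application([(r'/', MainHandler)])

# listen() creates an HTTPServer for this application and binds it to port 8080.
application.listen(8080)
# Start the event loop; this call blocks until the loop is stopped.
tornado.ioloop.IOLoop.instance().start()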

Application                                             HTTPServer
    __init__    
    add_handler(pattern, handler)
    listen(port, address, **kwargs)
        server = HTTPServer(self, **kwargs)             __init__(request_callback, ...)
        |                                                   self.request_callback = request_callback
        v                                                   TCPServer.__init__()
        server.listen(port, address)                    handle_stream(stream, address, self.request_callback)
    __call__(self, request)                                 HTTPConnection(...)

TCPServer                                                               HTTPConnection
    __init__()                                                              __init__
    listen(port, address)                                                       self.request_callback = request_callback
        sockets = bind_socket(port, address)                                    self._header_callback = stack_context.wrap(self._on_headers)
            #getaddrinfo,create socket, setsockopt                              self.stream.read_until('\r\n\r\n', self._header_callback)
            #setblocking(0), bind, listen                                       
        add_sockets(sockets)                                                _on_headers(self, data)
            self.io_loop = IOLoop.instance()                                    # parse http protocol
            add_accept_handler(sock, self._handle_connection,                   eol = data.find('\r\n')
                    self.io_loop)                                               start_line = data[:eol]
                def accept_handler(fd, events)                                  method, uri, version = start_line.split(' ')
                    while True:                                                 headers = httputil.HTTPHeaders.parse(data[eol:])
                        connection, address = sock.accept()                     # construct a request object 
                                                                                self._request = HTTPRequest(connection=self, method=method, uri=uri, version=version, headers=headers, remote_ip=remote_address)
                        self._handle_connection(connection, address)            content_length = headers.get('Content-Length')
                            #If ssl_options wrap socket                         if content_length:
                            stream = IOStream(connection, self.io_loop)             self.stream.read_bytes(int(content_length), self._on_request_body)
                            #HTTPServer implement handle_stream                     return
                            self.handle_stream(stream, address)                 self.request_callback(self._request)
                                -> HTTPServer.handle_stream                 _on_request_body(self, data)
                self.io_loop.add_handler(sock.fileno(), accept_handler,         # parse the body and store its fields in self._request.arguments
                                        IOLoop.READ)                            self.request_callback(self._request)

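TCPServer calls a very important function, add_accept_handler, passing in the connection callback and the corresponding socket. That function ultimately calls the ioloop object to register the socket fd together with its callback and events. This is also where the non-blocking socket is tied to asynchronous reads and writes through IOStream.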
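The registration step can be sketched with the standard selectors module standing in for IOLoop. This is a simplified illustration under that assumption, not Tornado's code: add_accept_handler here takes a selector instead of an io_loop, and demo_accept is a hypothetical driver that connects one client over loopback.

```python
import selectors
import socket


def add_accept_handler(sock, callback, selector):
    """Register a listening socket; run callback(conn, addr) per new client."""
    def accept_handler(listening_sock):
        # Drain every pending connection, since one readiness event
        # may cover several clients.
        while True:
            try:
                connection, address = listening_sock.accept()
            except BlockingIOError:
                return  # nothing left to accept
            callback(connection, address)

    selector.register(sock, selectors.EVENT_READ, accept_handler)


def demo_accept():
    """Accept one loopback client and return how many were accepted."""
    accepted = []
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.setblocking(False)        # the setblocking(0) step in the diagram
    listener.bind(("127.0.0.1", 0))    # port 0: let the OS pick a free port
    listener.listen(128)

    selector = selectors.DefaultSelector()
    add_accept_handler(
        listener, lambda conn, addr: accepted.append((conn, addr)), selector)

    client = socket.create_connection(listener.getsockname())
    # One pass of the "poll, then dispatch" step.
    for key, _events in selector.select(timeout=1.0):
        key.data(key.fileobj)

    count = len(accepted)
    for conn, _addr in accepted:
        conn.close()
    client.close()
    selector.unregister(listener)
    selector.close()
    listener.close()
    return count


if __name__ == "__main__":
    print(demo_accept())
```

In Tornado the callback passed in is TCPServer._handle_connection, which is where the accepted socket gets wrapped in an IOStream.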

IOLoop                                                                      IOStream
    __init__(self, impl=None) #Singleton                                        __init__(conn, self.io_loop)  
                                                                                    #max_buffer_size = 100MB, read_chunk_size=4096 byte
        self._impl = impl or _poll()                                                self._read_buffer = collections.deque()
                                                                                    self._write_buffer = collections.deque()
        self._waker = Waker()                                                   read_until(delimiter, callback)
        self.add_handler(self._waker.fileno(),                                      self._read_delimiter = delimiter
                lambda fd, events: self._waker.consume(),                           self._read_callback = callback
                self.READ)                                                          while True:
                                                                                        if self._read_from_buffer():
    add_handler(fd, handler, events)                                                        return 
        self._handlers[fd] = stack_context.wrap(handler)                                self.check_closed()
        self._impl.register(fd, events|IOLoop.ERROR)                                    if self._read_to_buffer() == 0:
                                                                                            break
    add_callback(callback)                                                              self._add_io_state(self.io_loop.READ)
        self._callbacks.append(stack_context.wrap(callback))
                                                                                _read_to_buffer()
    start()                                                                         chunk = self.socket.recv(read_chunk_size) 
        while True:                                                                  self._read_buffer.append(chunk)
            poll_timeout = 0.2                                                       self._read_buffer_size += len(chunk)
            with self._callbacks_lock:
                callbacks, self._callbacks = self._callbacks, []                     # Check read buffer size
            for callback in callbacks:                                               return len(chunk)  
                self._run_callback(callback)                                    _read_from_buffer() 
                                                                                    _merge_prefix(self._read_buffer, sys.maxint)
            #deal with timeout handler                                              loc = self._read_buffer[0].find(self._read_delimiter)
            event_pairs = self._impl.poll(poll_timeout)                             if loc != -1:
            self._events.update(event_pairs)                                            callback = self._read_callback
            while self._events:                                                         # reset read control variable
                fd, event = self._events.popitem()                                      self._run_callback(callback, self._consume(loc + delimiter_len))
                self._handlers[fd](fd, event)                                           return True
                                                                                _run_callback(self, callback, *args)
                                                                                    def wrapper():
                                                                                        callback(*args)
                                                                                    with stack_context.NullContext():
                                                                                        self.io_loop.add_callback(wrapper)

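After IOLoop's start method is called, the IOLoop object loops: it first runs the deferred data-processing callbacks and the timeout callbacks, then polls the socket fds and invokes the callback registered for each ready fd. What exactly is in self._callbacks, which start processes, and when is it added? The walkthrough that follows answers this step by step.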
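The shape of that loop can be sketched in a few lines. This is a simplified model, not Tornado's IOLoop: MiniLoop and demo_loop are hypothetical names, timeouts and error events are left out, and selectors.DefaultSelector stands in for the epoll object.

```python
import selectors
import socket


class MiniLoop:
    """Toy event loop: run queued callbacks, then poll fds and dispatch."""

    def __init__(self):
        self._selector = selectors.DefaultSelector()
        self._callbacks = []
        self._running = False

    def add_handler(self, fileobj, handler):
        self._selector.register(fileobj, selectors.EVENT_READ, handler)

    def add_callback(self, callback):
        self._callbacks.append(callback)

    def stop(self):
        self._running = False

    def close(self):
        self._selector.close()

    def start(self):
        self._running = True
        while self._running:
            # Swap the list first, so callbacks queued while running this
            # batch wait for the next iteration.
            callbacks, self._callbacks = self._callbacks, []
            for callback in callbacks:
                callback()
            if not self._running:
                break
            # Do not sleep in the poll if more callbacks are already waiting.
            timeout = 0 if self._callbacks else 0.2
            for key, _events in self._selector.select(timeout):
                key.data(key.fileobj)


def demo_loop():
    loop = MiniLoop()
    log = []
    reader, writer = socket.socketpair()

    def on_readable(sock):
        data = sock.recv(1024)
        log.append(("read", data))
        # Defer follow-up work to the next iteration, as IOStream does.
        loop.add_callback(lambda: log.append(("deferred", data)))
        loop.add_callback(loop.stop)

    loop.add_handler(reader, on_readable)
    loop.add_callback(lambda: writer.send(b"ping"))
    loop.start()
    loop.close()
    reader.close()
    writer.close()
    return log


if __name__ == "__main__":
    print(demo_loop())
```

The fd handler itself queues the "deferred" entry, and that entry only runs at the top of the following iteration. That is one way entries end up in the callback list.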

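Suppose self._impl.poll(poll_timeout) detects a read event on an fd, meaning a client has connected. The loop then calls the registered callback (the accept_handler closure set up for the TCPServer), which accepts the connection and immediately creates an IOStream object (wrapping the socket for SSL first if ssl_options is set). Next it calls handle_stream (defined in HTTPServer, so that implementation is the one that runs), which creates an HTTPConnection object. HTTPConnection's __init__() calls IOStream's read_until method to start reading data. read_until first checks whether the data read so far satisfies the condition; if not, it keeps reading from the socket buffer. Once the condition is satisfied, it calls _run_callback, which appends the handler (self._header_callback) to the IOLoop object so that it runs on the next loop iteration rather than immediately. The Tornado source gives these reasons for deferring the callback: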

        # * Prevents unbounded stack growth when a callback calls an IOLoop operation that immediately runs another callback

        # * Provides a predictable execution context for e.g.  non-reentrant mutexes

        # * Ensures that the try/except in wrapper() is run outside of the application's StackContexts
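The buffering and deferral can be sketched without any sockets. This is a simplified model, not IOStream itself: BufferedReader, feed, and the defer parameter are hypothetical names, with defer playing the role of IOLoop.add_callback.

```python
import collections


class BufferedReader:
    """Toy read_until: buffer chunks and defer the callback on a match."""

    def __init__(self, defer):
        self._chunks = collections.deque()
        self._delimiter = None
        self._callback = None
        self._defer = defer  # stands in for IOLoop.add_callback

    def read_until(self, delimiter, callback):
        self._delimiter = delimiter
        self._callback = callback
        self._try_deliver()  # the buffer may already hold the delimiter

    def feed(self, chunk):
        """Called with newly received bytes when the socket is readable."""
        self._chunks.append(chunk)
        self._try_deliver()

    def _try_deliver(self):
        if self._callback is None:
            return False
        # Merge the chunks so a delimiter split across two chunks is found.
        data = b"".join(self._chunks)
        self._chunks = collections.deque([data] if data else [])
        loc = data.find(self._delimiter)
        if loc == -1:
            return False
        end = loc + len(self._delimiter)
        head, rest = data[:end], data[end:]
        self._chunks = collections.deque([rest] if rest else [])
        callback, self._callback = self._callback, None
        # Queue the callback instead of calling it here, so the stack
        # stays shallow however many reads chain together.
        self._defer(lambda: callback(head))
        return True


if __name__ == "__main__":
    pending = []
    reader = BufferedReader(pending.append)
    reader.read_until(b"\r\n\r\n", print)
    reader.feed(b"GET / HTTP/1.1\r\nHost: x\r\n")
    reader.feed(b"\r\nBODY")
    for callback in pending:
        callback()
```

Nothing reaches the caller until the queued callback runs, which matches the first reason in the list above.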

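When the next iteration reaches this callback, it runs the _header_callback function object passed in earlier, which creates the HTTPRequest object. If the request includes a body, the body is read and processed in much the same way as the headers. Once the headers and the (optional) body have been parsed, the HTTPConnection object calls request_callback, passing it the HTTPRequest object. Here request_callback is the Application object we created earlier, which is why Application defines __call__. __call__ uses the parsed path and method to route to the matching RequestHandler class, looks up the corresponding method object dynamically, and calls it.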
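The routing idea can be sketched as a callable object that matches the path with a regex and looks up the verb method with getattr. This is a simplified model, not tornado.web.Application: MiniApplication, Request, and HelloHandler are hypothetical, and returning a (status, body) tuple replaces the real response machinery.

```python
import re

SUPPORTED_METHODS = ("GET", "HEAD", "POST", "DELETE", "PUT")


class Request:
    """Minimal stand-in for HTTPRequest: just a method and a path."""

    def __init__(self, method, path):
        self.method = method
        self.path = path


class MiniApplication:
    """Callable that routes a request to a handler's verb method."""

    def __init__(self, routes):
        # Anchor each pattern so it must match the whole path.
        self._routes = [(re.compile(pattern + "$"), handler_cls)
                        for pattern, handler_cls in routes]

    def __call__(self, request):
        for regex, handler_cls in self._routes:
            match = regex.match(request.path)
            if match is None:
                continue
            # Whitelist the verb before getattr, so a crafted method name
            # cannot reach arbitrary attributes of the handler.
            if request.method not in SUPPORTED_METHODS:
                return 405, "Method Not Allowed"
            handler = handler_cls(request)
            method = getattr(handler, request.method.lower(), None)
            if method is None:
                return 405, "Method Not Allowed"
            return 200, method(*match.groups())
        return 404, "Not Found"


class HelloHandler:
    def __init__(self, request):
        self.request = request

    def get(self, name):
        return "Hello " + name


app = MiniApplication([(r"/hello/(\w+)", HelloHandler)])

if __name__ == "__main__":
    print(app(Request("GET", "/hello/world")))
```

Because the object is callable, a connection-level component only needs to hold a reference named request_callback and call it, without knowing anything about routing.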

The response path differs little from the read path. The HTTPRequest object holds a reference to the HTTPConnection object, so the final write is handed to IOStream's write method.

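After this article you should have a basic picture of how a Tornado web service works. To go deeper, read the source directly; Tornado's source is very approachable, and many of its implementations are well worth studying.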
