A Brief Analysis of the Tornado Source Code

Getting started with Tornado

The classic hello world example:

import tornado.ioloop
import tornado.web

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")

application = tornado.web.Application([
    (r"/index", MainHandler),
])

if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

Program execution flow:

1. Create an Application object, passing the regex and the class MainHandler to its constructor, i.e.: tornado.web.Application(...)

2. Call the Application object's listen(...) method, i.e.: application.listen(8888)

3. Call start() on the IOLoop instance, i.e.: tornado.ioloop.IOLoop.instance().start()

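What the program really does: it creates a socket listening on port 8888; when a request arrives, it uses the request URL and the HTTP method (GET, POST, PUT) to pick the method of the matching class that handles the request.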

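Visiting http://127.0.0.1:8888/index in a browser makes the server return Hello, world; any other URL returns 404: Not Found (the default error response generated by Tornado). That completes one HTTP request/response cycle.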

Based on this analysis, we divide the whole web framework into two parts:

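  • The waiting-for-requests phase (program startup): create the server socket and listen on the port

  • The request-handling phase: when a client connects, accept the request and respond according to what was requested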
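The core dispatch idea above (match the URL against a regex, then call the handler method named after the HTTP verb) can be sketched without Tornado. This is a minimal, stdlib-only sketch; `Router` and `dispatch` are hypothetical names chosen for illustration, not Tornado's API.

```python
import re


class MainHandler:
    """Stand-in for a tornado.web.RequestHandler subclass: one method per HTTP verb."""

    def get(self):
        return "Hello, world"


class Router:
    """Hypothetical router holding (compiled URL regex, handler class) pairs."""

    def __init__(self, rules):
        # Compile each pattern once at startup; anchor it with "$" so the whole path must match.
        self.rules = [
            (re.compile(p if p.endswith("$") else p + "$"), cls) for p, cls in rules
        ]

    def dispatch(self, method, path):
        for regex, cls in self.rules:
            if regex.match(path):
                handler = cls()
                # Pick the method named after the HTTP verb, e.g. "GET" -> get().
                func = getattr(handler, method.lower(), None)
                if func is None:
                    return 405, "Method Not Allowed"
                return 200, func()
        return 404, "Not Found"


router = Router([(r"/index", MainHandler)])
print(router.dispatch("GET", "/index"))  # (200, 'Hello, world')
print(router.dispatch("GET", "/other"))  # (404, 'Not Found')
```

The real framework adds much more (host matching, URL arguments, the socket layer), but the "regex first, then verb" lookup order is the same.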

 

The waiting-for-requests phase (program startup)
import tornado.ioloop
import tornado.web

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")

application = tornado.web.Application([             ##  <=======  1
    (r"/index", MainHandler),
])

if __name__ == "__main__":
    application.listen(8888)                        ##  <======== 2
    tornado.ioloop.IOLoop.instance().start()        ##  <======== 3

1. application = tornado.web.Application([(xxx,xxx)])

def __init__(self, handlers=None, default_host="", transforms=None,
             **settings):
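    # Set up the response transforms, which correspond to the HTTP response headers Content-Encoding and Transfer-Encoding.
    # Content-Encoding: gzip means the data is compressed before being returned to the client, reducing the amount transferred.
    # Transfer-Encoding: chunked means the data is sent piece by piece.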
    if transforms is None:
        self.transforms = []
        if settings.get("compress_response") or settings.get("gzip"):
            self.transforms.append(GZipContentEncoding)
    else:
        self.transforms = transforms
    # Store the arguments as instance attributes
    self.handlers = []
    self.named_handlers = {}
    self.default_host = default_host
    self.settings = settings
    # ui_modules and ui_methods extend the template language with custom output.
    # Here Tornado's built-in ui_modules (and an empty ui_methods dict) are stored in self.ui_modules and self.ui_methods
    self.ui_modules = {'linkify': _linkify,
                       'xsrf_form_html': _xsrf_form_html,
                       'Template': TemplateModule,
                       }
    self.ui_methods = {}
    # Load the user-defined ui_modules and ui_methods from settings and add them to self.ui_modules and self.ui_methods
    self._load_ui_modules(settings.get("ui_modules", {}))
    self._load_ui_methods(settings.get("ui_methods", {}))
    # Static files: URLs matching a regex are routed to StaticFileHandler
    if self.settings.get("static_path"):
        # Read static_path from settings; it is the directory holding the static files
        path = self.settings["static_path"]
        # Copy the handlers passed in (an empty list if None)
        handlers = list(handlers or [])
        # Static URL prefix, /static/ by default
        static_url_prefix = settings.get("static_url_prefix",
                                         "/static/")
        static_handler_class = settings.get("static_handler_class",
                                            StaticFileHandler)
        static_handler_args = settings.get("static_handler_args", {})
        static_handler_args['path'] = path
        # Prepend three mappings to the handlers passed in (each is inserted at index 0):
        #   /static/(.*)      -->  StaticFileHandler
        #   /(favicon\.ico)   -->  StaticFileHandler
        #   /(robots\.txt)    -->  StaticFileHandler
        for pattern in [re.escape(static_url_prefix) + r"(.*)",
                        r"/(favicon\.ico)", r"/(robots\.txt)"]:
            handlers.insert(0, (pattern, static_handler_class,
                                static_handler_args))
    # Call this Application's add_handlers method.
    # At this point handlers is a list; each element is a mapping from a URL regex to the Handler that processes matching URLs
    if handlers:
        self.add_handlers(".*$", handlers)   #<==================

    # Automatically reload modified modules
    # If debug mode is set in settings, enable autoreload (and related development settings)
    if self.settings.get('debug'):
        self.settings.setdefault('autoreload', True)
        self.settings.setdefault('compiled_template_cache', False)
        self.settings.setdefault('static_hash_cache', False)
        self.settings.setdefault('serve_traceback', True)

    if self.settings.get('autoreload'):
        from tornado import autoreload
        autoreload.start()
def add_handlers(self, host_pattern, host_handlers):
    """Appends the given handlers to our handler list.

    Host patterns are processed sequentially in the order they were
    added. All matching patterns will be considered.
    """
    # If the host pattern does not end with "$", append one.
    if not host_pattern.endswith("$"):
        host_pattern += "$"
    handlers = []
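    # Routing first matches on the host name, e.g. http://www.alex.com and http://safe.alex.com:
    # "safe" maps to one group of URL rules and "www" to another; a request is first matched by host, then by URL within that group.

    # At the host level ".*" matches every host, so the ".*$" entry is always kept last; otherwise it would intercept requests meant for more specific hosts.
    # re.compile compiles the regex once; when a request arrives, only the compiled object's match method needs to be called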
    if self.handlers and self.handlers[-1][0].pattern == '.*$':
        self.handlers.insert(-1, (re.compile(host_pattern), handlers))
    else:
        self.handlers.append((re.compile(host_pattern), handlers))
        # [(re.compile('.*$'), [])]

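    # Wrap every url -> Handler mapping (ours and those added in the constructor) in a URLSpec object, whose constructor compiles the URL regex,
    # and append each URLSpec to the handlers list; that list, paired with the host pattern in a tuple, is what was added to self.handlers.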
    for spec in host_handlers:
        if isinstance(spec, (tuple, list)):
            assert len(spec) in (2, 3, 4)
            spec = URLSpec(*spec)  # <==============
        handlers.append(spec)
        if spec.name:
            if spec.name in self.named_handlers:
                app_log.warning(
                    "Multiple handlers named %s; replacing previous value",
                    spec.name)
            self.named_handlers[spec.name] = spec

The code above does two main things: it loads the configuration and builds the URL mapping, and it stores all of this in the application object.

The configuration loaded includes:

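  • Response encoding and transfer options

  • The static file path

  • ui_modules (used in templates; ignored for now)

  • ui_methods (used in templates; ignored for now)

  • Whether to run in debug mode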

All of the above can be set in settings and passed as keyword arguments when creating the Application object, e.g.: application = tornado.web.Application([(r"/index", MainHandler),],**settings)

Building the URL mapping:

  • Each URL and its Handler are added under the corresponding host pattern, e.g. safe.index.com or www.auto.com

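Storing the data:

The configuration and the URL mapping are stored in the Application object, in the following attributes:

  • self.transforms: the response encoding and transfer transforms

  • self.settings: the configuration

  • self.ui_modules: the ui_modules

  • self.ui_methods: the ui_methods

  • self.handlers: the Handlers for every host pattern; each host's handlers list maps URL regexes to Handlers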
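The two-level structure kept in self.handlers (host regex first, then URL regex), together with the rule that the catch-all `.*$` host entry stays last, can be illustrated with a stdlib-only sketch. `HostRouter`, `add_handlers` and `find_handler` here are hypothetical simplifications of Tornado's logic, and handlers are plain strings instead of URLSpec objects.

```python
import re


class HostRouter:
    """Hypothetical sketch: a list of (host regex, [(url regex, handler name), ...])."""

    def __init__(self):
        self.handlers = []

    def add_handlers(self, host_pattern, host_handlers):
        if not host_pattern.endswith("$"):
            host_pattern += "$"
        specs = [(re.compile(url if url.endswith("$") else url + "$"), name)
                 for url, name in host_handlers]
        entry = (re.compile(host_pattern), specs)
        # Keep the catch-all ".*$" entry last so it cannot shadow specific hosts.
        if self.handlers and self.handlers[-1][0].pattern == ".*$":
            self.handlers.insert(-1, entry)
        else:
            self.handlers.append(entry)

    def find_handler(self, host, path):
        host = host.split(":")[0]  # ignore the port, if any
        # Every matching host entry is considered, in order.
        for host_re, specs in self.handlers:
            if host_re.match(host):
                for url_re, name in specs:
                    if url_re.match(path):
                        return name
        return None


router = HostRouter()
router.add_handlers(".*$", [(r"/index", "MainHandler")])
router.add_handlers(r"safe\.example\.com", [(r"/index", "SafeIndexHandler")])
print([h[0].pattern for h in router.handlers])  # ['safe\\.example\\.com$', '.*$']
print(router.find_handler("safe.example.com", "/index"))      # SafeIndexHandler
print(router.find_handler("www.example.com:8888", "/index"))  # MainHandler
```

Even though the specific host is added second, it ends up before `.*$`, which is exactly what the insert(-1) branch in add_handlers guarantees.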

2. application.listen(...)

def listen(self, port, address="", **kwargs):
    from tornado.httpserver import HTTPServer
    server = HTTPServer(self, **kwargs)
    server.listen(port, address)
    return server

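This step calls the application object's listen method, which wraps the application object (holding all the information above) in an HTTPServer object and then calls that HTTPServer object's listen method.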

class HTTPServer(TCPServer, Configurable,
                 httputil.HTTPServerConnectionDelegate):
    ...  # class body omitted

HTTPServer inherits from TCPServer, so server.listen(...) runs TCPServer's listen method:

def listen(self, port, address=""):
    sockets = bind_sockets(port, address=address)
    self.add_sockets(sockets)

1. bind_sockets creates the server sockets, bound to the given port and address
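The bind_sockets source is not reproduced here, so this is a simplified stdlib sketch of what it roughly does for a single IPv4 address: create a non-blocking listening socket with SO_REUSEADDR. The real function also resolves addresses with getaddrinfo, handles IPv6 and platform differences; `bind_one_socket` is a hypothetical name, and the backlog of 128 is an assumption modelled on Tornado's default.

```python
import socket


def bind_one_socket(port, address="127.0.0.1", backlog=128):
    """Hypothetical, IPv4-only sketch of setting up one listening socket."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Allow quick restarts without "Address already in use" (POSIX semantics).
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # Non-blocking, so accept() never stalls the event loop.
    sock.setblocking(False)
    sock.bind((address, port))
    sock.listen(backlog)
    return [sock]


sockets = bind_one_socket(0)  # port 0: let the OS pick a free port
print(sockets[0].getsockname())
sockets[0].close()
```

Returning a list mirrors the shape bind_sockets hands to add_sockets, since one hostname can resolve to several addresses.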


2. add_sockets(sockets) registers the sockets with the IOLoop so they are listened on:

def add_sockets(self, sockets):
    if self.io_loop is None:
        self.io_loop = IOLoop.current()         #1 <==============
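        # tornado.platform.select.SelectIOLoop object (on Windows)
        # Set self.io_loop to the IOLoop instance. Note: IOLoop.current() returns the current IOLoop, falling back to the singleton IOLoop.instance()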

    for sock in sockets:
        self._sockets[sock.fileno()] = sock
        add_accept_handler(sock, self._handle_connection,io_loop=self.io_loop)       #2 <=============
        # Call tornado.netutil.add_accept_handler, passing the socket, the self._handle_connection method and the io_loop object
def _handle_connection(self, connection, address):
    if self.ssl_options is not None:
        assert ssl, "Python 2.6+ and OpenSSL required for SSL"
        try:
            connection = ssl_wrap_socket(connection,
                                         self.ssl_options,
                                         server_side=True,
                                         do_handshake_on_connect=False)
        except ssl.SSLError as err:
            if err.args[0] == ssl.SSL_ERROR_EOF:
                return connection.close()
            else:
                raise
        except socket.error as err:
            # If the connection is closed immediately after it is created
            # (as in a port scan), we can get one of several errors.
            # wrap_socket makes an internal call to getpeername,
            # which may return either EINVAL (Mac OS X) or ENOTCONN
            # (Linux).  If it returns ENOTCONN, this error is
            # silently swallowed by the ssl module, so we need to
            # catch another error later on (AttributeError in
            # SSLIOStream._do_ssl_handshake).
            # To test this behavior, try nmap with the -sT flag.
            # https://github.com/tornadoweb/tornado/pull/750
            if errno_from_exception(err) in (errno.ECONNABORTED, errno.EINVAL):
                return connection.close()
            else:
                raise
    try:
        if self.ssl_options is not None:
            stream = SSLIOStream(connection, io_loop=self.io_loop,
                                 max_buffer_size=self.max_buffer_size,
                                 read_chunk_size=self.read_chunk_size)
        else:
            stream = IOStream(connection, io_loop=self.io_loop,
                              max_buffer_size=self.max_buffer_size,
                              read_chunk_size=self.read_chunk_size)
        future = self.handle_stream(stream, address)
        if future is not None:
            self.io_loop.add_future(future, lambda f: f.result())
    except Exception:
        app_log.error("Error in connection callback", exc_info=True)

#1 The IOLoop object created here is a SelectIOLoop (on Windows); IOLoop.configurable_default chooses the implementation:

def configurable_default(cls):
    if hasattr(select, "epoll"):
        from tornado.platform.epoll import EPollIOLoop
        return EPollIOLoop
    if hasattr(select, "kqueue"):
        # Python 2.6+ on BSD or Mac
        from tornado.platform.kqueue import KQueueIOLoop
        return KQueueIOLoop
    from tornado.platform.select import SelectIOLoop
    return SelectIOLoop          # <============
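The selection order in configurable_default (epoll, then kqueue, then select as the fallback) can be reproduced with the standard library alone. `pick_poller_name` is a hypothetical helper that returns the class name Tornado would pick; Python's own `selectors.DefaultSelector` makes a comparable "best available" choice.

```python
import select
import selectors


def pick_poller_name():
    """Hypothetical mirror of configurable_default: epoll (Linux) > kqueue (BSD/macOS) > select."""
    if hasattr(select, "epoll"):
        return "EPollIOLoop"
    if hasattr(select, "kqueue"):
        return "KQueueIOLoop"
    return "SelectIOLoop"


print(pick_poller_name())
# The stdlib's comparable choice; the class name differs per platform.
print(selectors.DefaultSelector.__name__)
```

On Windows neither epoll nor kqueue exists, which is why the text above ends up with SelectIOLoop.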

#2 Then add_accept_handler() runs:

def add_accept_handler(sock, callback, io_loop=None):
    """Adds an `.IOLoop` event handler to accept new connections on ``sock``.

    When a connection is accepted, ``callback(connection, address)`` will
    be run (``connection`` is a socket object, and ``address`` is the
    address of the other end of the connection).  Note that this signature
    is different from the ``callback(fd, events)`` signature used for
    `.IOLoop` handlers.

    .. versionchanged:: 4.1
       The ``io_loop`` argument is deprecated.
    """
    if io_loop is None:
        io_loop = IOLoop.current()

    def accept_handler(fd, events):
        # More connections may come in while we're handling callbacks;
        # to prevent starvation of other tasks we must limit the number
        # of connections we accept at a time.  Ideally we would accept
        # up to the number of connections that were waiting when we
        # entered this method, but this information is not available
        # (and rearranging this method to call accept() as many times
        # as possible before running any callbacks would have adverse
        # effects on load balancing in multiprocess configurations).
        # Instead, we use the (default) listen backlog as a rough
        # heuristic for the number of connections we can reasonably
        # accept at once.
        for i in xrange(_DEFAULT_BACKLOG):
            try:
                connection, address = sock.accept()
            except socket.error as e:
                # _ERRNO_WOULDBLOCK indicate we have accepted every
                # connection that is available.
                if errno_from_exception(e) in _ERRNO_WOULDBLOCK:
                    return
                # ECONNABORTED indicates that there was a connection
                # but it was closed while still in the accept queue.
                # (observed on FreeBSD).
                if errno_from_exception(e) == errno.ECONNABORTED:
                    continue
                raise
            callback(connection, address)
    io_loop.add_handler(sock, accept_handler, IOLoop.READ)

It calls io_loop.add_handler. Here io_loop is a SelectIOLoop, so its add_handler() is used:

class SelectIOLoop(PollIOLoop):      # <============
    def initialize(self, **kwargs):
        super(SelectIOLoop, self).initialize(impl=_Select(), **kwargs)

SelectIOLoop does not define add_handler itself; it inherits PollIOLoop's add_handler():

def add_handler(self, fd, handler, events):
    fd, obj = self.split_fd(fd)
    self._handlers[fd] = (obj, stack_context.wrap(handler))     #1 <==========
    self._impl.register(fd, events | self.ERROR)

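stack_context.wrap wraps the function so that, when it is called later, the context captured at wrap time is restored. This matters because a callback usually runs in a different context from the one in which it was registered.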

def wrap(fn):
    """Returns a callable object that will restore the current `StackContext`
    when executed.

    Use this whenever saving a callback to be executed later in a
    different execution context (either in a different thread or
    asynchronously in the same thread).
    """
    # Check if function is already wrapped
    if fn is None or hasattr(fn, '_wrapped'):
        return fn

    # Capture current stack head
    # TODO: Any other better way to store contexts and update them in wrapped function?
    cap_contexts = [_state.contexts]

    if not cap_contexts[0][0] and not cap_contexts[0][1]:
        # Fast path when there are no active contexts.
        def null_wrapper(*args, **kwargs):
            try:
                current_state = _state.contexts
                _state.contexts = cap_contexts[0]
                return fn(*args, **kwargs)
            finally:
                _state.contexts = current_state
        null_wrapper._wrapped = True
        return null_wrapper

    def wrapped(*args, **kwargs):
        ret = None
        try:
            # Capture old state
            current_state = _state.contexts

            # Remove deactivated items
            cap_contexts[0] = contexts = _remove_deactivated(cap_contexts[0])

            # Force new state
            _state.contexts = contexts

            # Current exception
            exc = (None, None, None)
            top = None

            # Apply stack contexts
            last_ctx = 0
            stack = contexts[0]

            # Apply state
            for n in stack:
                try:
                    n.enter()
                    last_ctx += 1
                except:
                    # Exception happened. Record exception info and store top-most handler
                    exc = sys.exc_info()
                    top = n.old_contexts[1]

            # Execute callback if no exception happened while restoring state
            if top is None:
                try:
                    ret = fn(*args, **kwargs)
                except:
                    exc = sys.exc_info()
                    top = contexts[1]

            # If there was exception, try to handle it by going through the exception chain
            if top is not None:
                exc = _handle_exception(top, exc)
            else:
                # Otherwise take shorter path and run stack contexts in reverse order
                while last_ctx > 0:
                    last_ctx -= 1
                    c = stack[last_ctx]

                    try:
                        c.exit(*exc)
                    except:
                        exc = sys.exc_info()
                        top = c.old_contexts[1]
                        break
                else:
                    top = None

                # If if exception happened while unrolling, take longer exception handler path
                if top is not None:
                    exc = _handle_exception(top, exc)

            # If exception was not handled, raise it
            if exc != (None, None, None):
                raise_exc_info(exc)
        finally:
            _state.contexts = current_state
        return ret

    wrapped._wrapped = True
    return wrapped

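In essence, the code above does four things:

  1. Stores the application object, which holds all the configuration, in the HTTPServer object's request_callback attribute

  2. Creates the server socket(s)

  3. Obtains the singleton IOLoop object, then adds an entry to its _handlers dict: the socket's file descriptor is the key, and the value is the socket together with the stack_context-wrapped accept_handler (which calls _handle_connection)

  4. Registers the server socket's readable event with the poller (epoll on Linux, kqueue on BSD/macOS, select on Windows)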
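Steps 3 and 4 can be condensed into a stdlib sketch built on `selectors`: a dict keyed by file descriptor holds the handler (the role of IOLoop._handlers), the listening socket is registered for read events, and the accept handler drains at most a fixed number of pending connections per event. `MiniLoop`, `poll_once` and this `add_accept_handler` are hypothetical simplifications, not Tornado's classes; the cap of 128 is an assumption modelled on `_DEFAULT_BACKLOG`.

```python
import selectors
import socket

BACKLOG = 128  # assumption: mirrors the per-event accept cap used above


class MiniLoop:
    """Hypothetical, minimal stand-in for IOLoop's handler registry."""

    def __init__(self):
        self._selector = selectors.DefaultSelector()
        self._handlers = {}  # fd -> (file object, handler)

    def add_handler(self, sock, handler):
        self._handlers[sock.fileno()] = (sock, handler)
        self._selector.register(sock, selectors.EVENT_READ)

    def poll_once(self, timeout=1.0):
        for key, events in self._selector.select(timeout):
            obj, handler = self._handlers[key.fd]
            handler(obj, events)  # same (fd_obj, events) shape as in start()


def add_accept_handler(loop, sock, callback):
    def accept_handler(listen_sock, events):
        for _ in range(BACKLOG):
            try:
                connection, address = listen_sock.accept()
            except BlockingIOError:
                return  # every pending connection has been accepted
            callback(connection, address)

    loop.add_handler(sock, accept_handler)


server = socket.socket()
server.setblocking(False)
server.bind(("127.0.0.1", 0))
server.listen(BACKLOG)

accepted = []
loop = MiniLoop()
add_accept_handler(loop, server, lambda conn, addr: accepted.append(conn))

client = socket.create_connection(server.getsockname())
loop.poll_once()
print(len(accepted))  # 1
for s in accepted + [client, server]:
    s.close()
```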

3. tornado.ioloop.IOLoop.instance().start()

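This step runs the poller's poll method (epoll on Linux) in a loop, checking the socket handles registered with it; when a handle becomes readable, the corresponding handler is triggered.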

def start(self):
    if self._running:
        raise RuntimeError("IOLoop is already running")
    self._setup_logging()
    if self._stopped:
        self._stopped = False
        return
    old_current = getattr(IOLoop._current, "instance", None)
    IOLoop._current.instance = self
    self._thread_ident = thread.get_ident()
    self._running = True

    # signal.set_wakeup_fd closes a race condition in event loops:
    # a signal may arrive at the beginning of select/poll/etc
    # before it goes into its interruptible sleep, so the signal
    # will be consumed without waking the select.  The solution is
    # for the (C, synchronous) signal handler to write to a pipe,
    # which will then be seen by select.
    #
    # In python's signal handling semantics, this only matters on the
    # main thread (fortunately, set_wakeup_fd only works on the main
    # thread and will raise a ValueError otherwise).
    #
    # If someone has already set a wakeup fd, we don't want to
    # disturb it.  This is an issue for twisted, which does its
    # SIGCHLD processing in response to its own wakeup fd being
    # written to.  As long as the wakeup fd is registered on the IOLoop,
    # the loop will still wake up and everything should work.
    old_wakeup_fd = None
    if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix':
        # requires python 2.6+, unix.  set_wakeup_fd exists but crashes
        # the python process on windows.
        try:
            old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
            if old_wakeup_fd != -1:
                # Already set, restore previous value.  This is a little racy,
                # but there's no clean get_wakeup_fd and in real use the
                # IOLoop is just started once at the beginning.
                signal.set_wakeup_fd(old_wakeup_fd)
                old_wakeup_fd = None
        except ValueError:
            # Non-main thread, or the previous value of wakeup_fd
            # is no longer valid.
            old_wakeup_fd = None

    try:
        while True:

            # Prevent IO event starvation by delaying new callbacks
            # to the next iteration of the event loop.
            with self._callback_lock:
                callbacks = self._callbacks
                self._callbacks = []

            # Add any timeouts that have come due to the callback list.
            # Do not run anything until we have determined which ones
            # are ready, so timeouts that call add_timeout cannot
            # schedule anything in this iteration.
            due_timeouts = []
            if self._timeouts:
                now = self.time()
                while self._timeouts:
                    if self._timeouts[0].callback is None:
                        # The timeout was cancelled.  Note that the
                        # cancellation check is repeated below for timeouts
                        # that are cancelled by another timeout or callback.
                        heapq.heappop(self._timeouts)
                        self._cancellations -= 1
                    elif self._timeouts[0].deadline <= now:
                        due_timeouts.append(heapq.heappop(self._timeouts))
                    else:
                        break
                if (self._cancellations > 512 and
                        self._cancellations > (len(self._timeouts) >> 1)):
                    # Clean up the timeout queue when it gets large and it's
                    # more than half cancellations.
                    self._cancellations = 0
                    self._timeouts = [x for x in self._timeouts
                                      if x.callback is not None]
                    heapq.heapify(self._timeouts)

            for callback in callbacks:
                self._run_callback(callback)
            for timeout in due_timeouts:
                if timeout.callback is not None:
                    self._run_callback(timeout.callback)
            # Closures may be holding on to a lot of memory, so allow
            # them to be freed before we go into our poll wait.
            callbacks = callback = due_timeouts = timeout = None

            if self._callbacks:
                # If any callbacks or timeouts called add_callback,
                # we don't want to wait in poll() before we run them.
                poll_timeout = 0.0
            elif self._timeouts:
                # If there are any timeouts, schedule the first one.
                # Use self.time() instead of 'now' to account for time
                # spent running callbacks.
                poll_timeout = self._timeouts[0].deadline - self.time()
                poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
            else:
                # No timeouts and no callbacks, so use the default.
                poll_timeout = _POLL_TIMEOUT

            if not self._running:
                break

            if self._blocking_signal_threshold is not None:
                # clear alarm so it doesn't fire while poll is waiting for
                # events.
                signal.setitimer(signal.ITIMER_REAL, 0, 0)

            try:
                event_pairs = self._impl.poll(poll_timeout)
            except Exception as e:
                # Depending on python version and IOLoop implementation,
                # different exception types may be thrown and there are
                # two ways EINTR might be signaled:
                # * e.errno == errno.EINTR
                # * e.args is like (errno.EINTR, 'Interrupted system call')
                if errno_from_exception(e) == errno.EINTR:
                    continue
                else:
                    raise

            if self._blocking_signal_threshold is not None:
                signal.setitimer(signal.ITIMER_REAL,
                                 self._blocking_signal_threshold, 0)

            # Pop one fd at a time from the set of pending fds and run
            # its handler. Since that handler may perform actions on
            # other file descriptors, there may be reentrant calls to
            # this IOLoop that modify self._events
            self._events.update(event_pairs)
            while self._events:
                fd, events = self._events.popitem()
                try:
                    fd_obj, handler_func = self._handlers[fd]
                    handler_func(fd_obj, events)
                except (OSError, IOError) as e:
                    if errno_from_exception(e) == errno.EPIPE:
                        # Happens when the client closes the connection
                        pass
                    else:
                        self.handle_callback_exception(self._handlers.get(fd))
                except Exception:
                    self.handle_callback_exception(self._handlers.get(fd))
            fd_obj = handler_func = None

    finally:
        # reset the stopped flag so another start/stop pair can be issued
        self._stopped = False
        if self._blocking_signal_threshold is not None:
            signal.setitimer(signal.ITIMER_REAL, 0, 0)
        IOLoop._current.instance = old_current
        if old_wakeup_fd is not None:
            signal.set_wakeup_fd(old_wakeup_fd)

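Once start() runs, the program enters an "infinite loop", continuously polling to check whether requests have arrived.

When one arrives, the handler stored for that socket runs: the stack_context-wrapped accept_handler, which in turn calls HTTPServer's _handle_connection.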

 

When a request arrives

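During polling, when a request arrives, accept_handler runs first: it accepts the connection (obtaining the client socket and its address) and then calls callback, i.e. _handle_connection():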

def add_accept_handler(sock, callback, io_loop=None):
    if io_loop is None:
        io_loop = IOLoop.current()

    def accept_handler(fd, events):

        for i in xrange(_DEFAULT_BACKLOG):
            try:
                connection, address = sock.accept()
            except socket.error as e:
                # _ERRNO_WOULDBLOCK indicate we have accepted every
                # connection that is available.
                if errno_from_exception(e) in _ERRNO_WOULDBLOCK:
                    return
                # ECONNABORTED indicates that there was a connection
                # but it was closed while still in the accept queue.
                # (observed on FreeBSD).
                if errno_from_exception(e) == errno.ECONNABORTED:
                    continue
                raise
            callback(connection, address)      # <=============
    io_loop.add_handler(sock, accept_handler, IOLoop.READ)

_handle_connection creates an IOStream instance that wraps the client socket and the IOLoop object (used later to read and write data).
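IOStream's job is to turn a non-blocking socket into buffered reads such as "read until `\r\n\r\n`", which HTTP header parsing relies on. A minimal stdlib sketch of that buffering idea follows; it is synchronous with no callbacks or Futures, and `BufferedStream` and `try_read_until` are hypothetical names, not IOStream's API.

```python
import socket


class BufferedStream:
    """Hypothetical sketch of IOStream-style buffering over a non-blocking socket."""

    def __init__(self, sock, read_chunk_size=65536):
        self.socket = sock
        self.socket.setblocking(False)  # as in IOStream.__init__
        self.read_chunk_size = read_chunk_size
        self._read_buffer = bytearray()

    def _read_to_buffer(self):
        """Read whatever is available without blocking; return the number of bytes read."""
        try:
            chunk = self.socket.recv(self.read_chunk_size)
        except BlockingIOError:
            return 0
        if not chunk:
            raise ConnectionError("stream closed")
        self._read_buffer += chunk
        return len(chunk)

    def try_read_until(self, delimiter):
        """Return data up to and including delimiter, or None if it is not there yet."""
        while True:
            pos = self._read_buffer.find(delimiter)
            if pos != -1:
                end = pos + len(delimiter)
                data = bytes(self._read_buffer[:end])
                del self._read_buffer[:end]  # keep the rest for the next read
                return data
            if self._read_to_buffer() == 0:
                return None


a, b = socket.socketpair()
stream = BufferedStream(b)
print(stream.try_read_until(b"\r\n\r\n"))  # None: nothing has been sent yet
a.sendall(b"GET /index HTTP/1.1\r\nHost: x\r\n\r\nextra")
print(stream.try_read_until(b"\r\n\r\n"))  # the header block; b"extra" stays buffered
a.close()
b.close()
```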

def _handle_connection(self, connection, address):
    # ... omitted (SSL wrapping)
    try:
        if self.ssl_options is not None:
            stream = SSLIOStream(connection, ...)  # arguments omitted
        else:
            stream = IOStream(connection, io_loop=self.io_loop,
                              max_buffer_size=self.max_buffer_size,
                              read_chunk_size=self.read_chunk_size)
        future = self.handle_stream(stream, address)
        if future is not None:
            self.io_loop.add_future(future, lambda f: f.result())
    except Exception:
        app_log.error("Error in connection callback", exc_info=True)
class IOStream(BaseIOStream):
    def __init__(self, socket, *args, **kwargs):
        self.socket = socket
        self.socket.setblocking(False)
        super(IOStream, self).__init__(*args, **kwargs)
class BaseIOStream(object):
    """A utility class to write to and read from a non-blocking file or socket.

    We support a non-blocking ``write()`` and a family of ``read_*()`` methods.
    All of the methods take an optional ``callback`` argument and return a
    `.Future` only if no callback is given.  When the operation completes,
    the callback will be run or the `.Future` will resolve with the data
    read (or ``None`` for ``write()``).  All outstanding ``Futures`` will
    resolve with a `StreamClosedError` when the stream is closed; users
    of the callback interface will be notified via
    `.BaseIOStream.set_close_callback` instead.

    When a stream is closed due to an error, the IOStream's ``error``
    attribute contains the exception object.

    Subclasses must implement `fileno`, `close_fd`, `write_to_fd`,
    `read_from_fd`, and optionally `get_fd_error`.
    """
    def __init__(self, io_loop=None, max_buffer_size=None,
                 read_chunk_size=None, max_write_buffer_size=None):
        """`BaseIOStream` constructor.

        :arg io_loop: The `.IOLoop` to use; defaults to `.IOLoop.current`.
                      Deprecated since Tornado 4.1.
        :arg max_buffer_size: Maximum amount of incoming data to buffer;
            defaults to 100MB.
        :arg read_chunk_size: Amount of data to read at one time from the
            underlying transport; defaults to 64KB.
        :arg max_write_buffer_size: Amount of outgoing data to buffer;
            defaults to unlimited.

        .. versionchanged:: 4.0
           Add the ``max_write_buffer_size`` parameter.  Changed default
           ``read_chunk_size`` to 64KB.
        """
        self.io_loop = io_loop or ioloop.IOLoop.current()
        self.max_buffer_size = max_buffer_size or 104857600
        # A chunk size that is too close to max_buffer_size can cause
        # spurious failures.
        self.read_chunk_size = min(read_chunk_size or 65536,
                                   self.max_buffer_size // 2)
        self.max_write_buffer_size = max_write_buffer_size
        self.error = None
        self._read_buffer = collections.deque()
        self._write_buffer = collections.deque()
        self._read_buffer_size = 0
        self._write_buffer_size = 0
        self._write_buffer_frozen = False
        self._read_delimiter = None
        self._read_regex = None
        self._read_max_bytes = None
        self._read_bytes = None
        self._read_partial = False
        self._read_until_close = False
        self._read_callback = None
        self._read_future = None
        self._streaming_callback = None
        self._write_callback = None
        self._write_future = None
        self._close_callback = None
        self._connect_callback = None
        self._connect_future = None
        # _ssl_connect_future should be defined in SSLIOStream
        # but it's here so we can clean it up in maybe_run_close_callback.
        # TODO: refactor that so subclasses can add additional futures
        # to be cancelled.
        self._ssl_connect_future = None
        self._connecting = False
        self._state = None
        self._pending_callbacks = 0
        self._closed = False

1. tornado.web.RequestHandler

This is the parent class that every request handler must inherit from. Below are some commonly used RequestHandler methods:

#1 initialize

def __init__(self, application, request, **kwargs):
    super(RequestHandler, self).__init__()

    self.application = application
    self.request = request
    self._headers_written = False
    self._finished = False
    self._auto_finish = True
    self._transforms = None  # will be set in _execute
    self._prepared_future = None
    self._headers = None  # type: httputil.HTTPHeaders
    self.path_args = None
    self.path_kwargs = None
    self.ui = ObjectDict((n, self._ui_method(m)) for n, m in
                         application.ui_methods.items())
    # UIModules are available as both `modules` and `_tt_modules` in the
    # template namespace.  Historically only `modules` was available
    # but could be clobbered by user additions to the namespace.
    # The template {% module %} directive looks in `_tt_modules` to avoid
    # possible conflicts.
    self.ui["_tt_modules"] = _UIModuleNamespace(self,
                                                application.ui_modules)
    self.ui["modules"] = self.ui["_tt_modules"]
    self.clear()
    self.request.connection.set_close_callback(self.on_connection_close)
    self.initialize(**kwargs)

def initialize(self):
    pass

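The source shows that initialize is called at the end of RequestHandler.__init__, i.e. every time a handler is instantiated for a request, but the default implementation does nothing.

It is a hook Tornado reserves for subclasses: override it to run your own setup before the handler method (get, post, ...) is executed.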
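In practice, initialize receives the keyword arguments given as the third element of a URL rule, e.g. `(r"/user/(\d+)", UserHandler, dict(database=db))`, which is how per-route dependencies get injected. A stdlib sketch of that plumbing follows; `FakeRequestHandler`, `UserHandler` and `build_handler` are hypothetical stand-ins for RequestHandler and the framework's dispatch.

```python
class FakeRequestHandler:
    """Hypothetical stand-in for RequestHandler.__init__ calling the initialize hook."""

    def __init__(self, application, request, **kwargs):
        self.application = application
        self.request = request
        # ... framework setup would happen here ...
        self.initialize(**kwargs)  # hook for subclasses, called last

    def initialize(self):
        pass


class UserHandler(FakeRequestHandler):
    def initialize(self, database):
        self.database = database  # injected from the URL rule

    def get(self, user_id):
        return self.database.get(user_id, "unknown")


def build_handler(rule, application=None, request=None):
    """Instantiate the handler for a (pattern, class[, kwargs]) rule."""
    _pattern, handler_class, *rest = rule
    kwargs = rest[0] if rest else {}
    return handler_class(application, request, **kwargs)


rule = (r"/user/(\d+)", UserHandler, dict(database={"1": "alice"}))
handler = build_handler(rule)
print(handler.get("1"))  # alice
```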

#2 write

def write(self, chunk):
    if self._finished:
        raise RuntimeError("Cannot write() after finish()")
    if not isinstance(chunk, (bytes, unicode_type, dict)):
        message = "write() only accepts bytes, unicode, and dict objects"
        if isinstance(chunk, list):
            message += ". Lists not accepted for security reasons; see http://www.tornadoweb.org/en/stable/web.html#tornado.web.RequestHandler.write"
        raise TypeError(message)
    if isinstance(chunk, dict):
        chunk = escape.json_encode(chunk)
        self.set_header("Content-Type", "application/json; charset=UTF-8")

    chunk = utf8(chunk)
    self._write_buffer.append(chunk)

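write accepts bytes, strings and dicts. If the data is a dict, it is automatically serialized to a JSON string and the Content-Type header is set to application/json; lists are rejected for security reasons.

self._write_buffer is a list that temporarily holds the output chunks until they are flushed.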
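The buffering behaviour of write can be sketched with the standard library: dicts become JSON and switch the Content-Type, text is encoded to UTF-8, and nothing leaves the buffer until flush. `BufferedWriter` is a hypothetical class, and `json.dumps` stands in for tornado.escape.json_encode.

```python
import json


class BufferedWriter:
    """Hypothetical sketch of RequestHandler.write's buffering rules."""

    def __init__(self):
        self._headers = {"Content-Type": "text/html; charset=UTF-8"}
        self._write_buffer = []
        self._finished = False

    def write(self, chunk):
        if self._finished:
            raise RuntimeError("Cannot write() after finish()")
        if not isinstance(chunk, (bytes, str, dict)):
            raise TypeError("write() only accepts bytes, str, and dict objects")
        if isinstance(chunk, dict):
            chunk = json.dumps(chunk)
            self._headers["Content-Type"] = "application/json; charset=UTF-8"
        if isinstance(chunk, str):
            chunk = chunk.encode("utf-8")
        self._write_buffer.append(chunk)  # buffered only; nothing is sent yet


w = BufferedWriter()
w.write("Hello, ")
w.write({"name": "world"})
print(w._write_buffer)             # [b'Hello, ', b'{"name": "world"}']
print(w._headers["Content-Type"])  # application/json; charset=UTF-8
```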

#3 flush

def flush(self, include_footers=False, callback=None):
    chunk = b"".join(self._write_buffer)
    self._write_buffer = []
    if not self._headers_written:
        self._headers_written = True
        for transform in self._transforms:
            self._status_code, self._headers, chunk = \
                transform.transform_first_chunk(
                    self._status_code, self._headers,
                    chunk, include_footers)

        if self.request.method == "HEAD":
            chunk = None
        if hasattr(self, "_new_cookie"):
            for cookie in self._new_cookie.values():
                self.add_header("Set-Cookie", cookie.OutputString(None))

        start_line = httputil.ResponseStartLine('',
                                                self._status_code,
                                                self._reason)
        return self.request.connection.write_headers(
            start_line, self._headers, chunk, callback=callback)
    else:
        for transform in self._transforms:
            chunk = transform.transform_chunk(chunk, include_footers)
       
        if self.request.method != "HEAD":
            return self.request.connection.write(chunk, callback=callback)
        else:
            future = Future()
            future.set_result(None)
            return future

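flush joins all elements of self._write_buffer into a single byte string, chunk, and empties the list. On the first flush it runs the transforms (such as GZipContentEncoding) over the status code, headers and chunk, drops the body for HEAD requests, adds any pending Set-Cookie headers, and sends the response start line, the response headers and the chunk together via self.request.connection.write_headers. On later flushes the headers have already gone out, so only the (transformed) body chunk is sent via self.request.connection.write, which is how the content finally reaches the browser.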
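The "headers once, then body only" logic can be modeled with a fake connection that records what would be sent. FakeConnection and Flusher are hypothetical; transforms, cookies and callbacks from the real flush are left out, so this is a sketch of the control flow only.

```python
class FakeConnection:
    """Hypothetical stand-in for Tornado's HTTP connection; records what would be sent."""

    def __init__(self):
        self.sent = []

    def write_headers(self, status, headers, chunk):
        self.sent.append(("headers", status, dict(headers), chunk))

    def write(self, chunk):
        self.sent.append(("body", chunk))


class Flusher:
    """Hypothetical model of RequestHandler.flush() without transforms or cookies."""

    def __init__(self, connection, method="GET"):
        self.connection = connection
        self.method = method
        self._write_buffer = []
        self._headers_written = False
        self._status_code = 200
        self._headers = {"Content-Type": "text/html; charset=UTF-8"}

    def write(self, chunk):
        self._write_buffer.append(chunk)

    def flush(self):
        chunk = b"".join(self._write_buffer)  # merge all pending chunks
        self._write_buffer = []               # and empty the buffer
        if not self._headers_written:
            # First flush: status line and headers go out together with the body.
            self._headers_written = True
            if self.method == "HEAD":
                chunk = None                  # HEAD responses carry no body
            self.connection.write_headers(self._status_code, self._headers, chunk)
        elif self.method != "HEAD":
            # Later flushes: headers are already sent, send only the body.
            self.connection.write(chunk)


conn = FakeConnection()
f = Flusher(conn)
f.write(b"Hello, ")
f.write(b"world")
f.flush()
f.write(b"!")
f.flush()
print(conn.sent)
```

The first entry in conn.sent carries the headers plus b"Hello, world"; the second carries only b"!".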

#4 render

def render(self, template_name, **kwargs):
    """Renders the template with the given arguments as the response."""
    if self._finished:
        raise RuntimeError("Cannot render() after finish()")
    html = self.render_string(template_name, **kwargs)

    # Insert the additional JS and CSS added by the modules on the page
    js_embed = []
    js_files = []
    css_embed = []
    css_files = []
    html_heads = []
    html_bodies = []

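As the source above shows, render first renders the template with the given arguments (via render_string) and then gathers the extra JavaScript and CSS contributed by the UI modules used on the page.

The JS and CSS part of the source: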

for module in getattr(self, "_active_modules", {}).values():
    embed_part = module.embedded_javascript()
    if embed_part:
        js_embed.append(utf8(embed_part))
    file_part = module.javascript_files()
    if file_part:
        if isinstance(file_part, (unicode_type, bytes)):
            js_files.append(file_part)
        else:
            js_files.extend(file_part)
    embed_part = module.embedded_css()
    if embed_part:
        css_embed.append(utf8(embed_part))
    file_part = module.css_files()
    if file_part:
        if isinstance(file_part, (unicode_type, bytes)):
            css_files.append(file_part)
        else:
            css_files.extend(file_part)
    head_part = module.html_head()
    if head_part:
        html_heads.append(utf8(head_part))
    body_part = module.html_body()
    if body_part:
        html_bodies.append(utf8(body_part))

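As the source shows, the rendering flow for these resources (JavaScript as the example; CSS works the same way) is:

First, module.embedded_javascript() returns a JavaScript string to be embedded inline, which is appended to the js_embed list;

then module.javascript_files() returns the JavaScript files to include, either a single path or a list of paths, which are added to the js_files list.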
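A small sketch of the collection step, using hypothetical FakeModule objects that expose the same two hook names as tornado.web.UIModule. It highlights why the source checks for a string before extending: a bare path is appended whole.

```python
class FakeModule:
    """Hypothetical UI module exposing the same hook names as tornado.web.UIModule."""

    def __init__(self, embedded=None, files=None):
        self._embedded = embedded
        self._files = files

    def embedded_javascript(self):
        return self._embedded

    def javascript_files(self):
        return self._files


def collect_js(modules):
    """Gather inline JS (as bytes) and JS file paths from every module, in order."""
    js_embed, js_files = [], []
    for module in modules:
        embed_part = module.embedded_javascript()
        if embed_part:
            js_embed.append(embed_part.encode("utf-8") if isinstance(embed_part, str) else embed_part)
        file_part = module.javascript_files()
        if file_part:
            # A single path is appended as-is; a list of paths is extended.
            if isinstance(file_part, (str, bytes)):
                js_files.append(file_part)
            else:
                js_files.extend(file_part)
    return js_embed, js_files


embed, files = collect_js([FakeModule("alert(1);", "a.js"), FakeModule(None, ["b.js", "a.js"])])
print(embed)  # [b'alert(1);']
print(files)  # ['a.js', 'b.js', 'a.js']
```

Calling js_files.extend("a.js") would instead add the characters 'a', '.', 'j', 's'; duplicates such as the second 'a.js' are removed later, when the script tags are built.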

Next, a closer look at js_embed and js_files.

The js_embed source:

if js_embed:
    js = b'<script type="text/javascript">\n//<![CDATA[\n' + \
        b'\n'.join(js_embed) + b'\n//]]>\n</script>'
    sloc = html.rindex(b'</body>')
    html = html[:sloc] + js + b'\n' + html[sloc:]

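This code wraps all of our own inline JavaScript in a single <script> tag (inside a CDATA section) and splices it into the html by string concatenation, just before the last </body> tag.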
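The same splice, pulled out into a standalone function for experimentation. insert_inline_js is a hypothetical name; the body mirrors the excerpt above. Like the original, it raises ValueError (from bytes.rindex) if the page has no </body> tag.

```python
def insert_inline_js(html: bytes, js_embed: list) -> bytes:
    """Wrap inline JS snippets in one <script> tag and insert it before the last </body>."""
    if not js_embed:
        return html
    js = (b'<script type="text/javascript">\n//<![CDATA[\n'
          + b"\n".join(js_embed)
          + b"\n//]]>\n</script>")
    sloc = html.rindex(b"</body>")  # position of the LAST </body>
    return html[:sloc] + js + b"\n" + html[sloc:]


page = b"<html><body><p>hi</p></body></html>"
print(insert_inline_js(page, [b"alert(1);"]).decode())
```

Using rindex rather than index means a literal "</body>" string appearing earlier in the page content does not become the insertion point.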

The js_files source:

if js_files:
    # Maintain order of JavaScript files given by modules
    paths = []
    unique_paths = set()
    for path in js_files:
        if not is_absolute(path):
            path = self.static_url(path)
        if path not in unique_paths:
            paths.append(path)
            unique_paths.add(path)
    js = ''.join('<script src="' + escape.xhtml_escape(p) +
                 '" type="text/javascript"></script>'
                 for p in paths)
    sloc = html.rindex(b'</body>')
    html = html[:sloc] + utf8(js) + b'\n' + html[sloc:]

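This code generates one <script src="..."> tag for each JavaScript file to be included, skipping duplicate paths while keeping their original order; the tags are likewise spliced into the html by string concatenation just before the last </body> tag.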
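A standalone sketch of the ordered de-duplication. is_absolute follows the helper defined inside Tornado's render(); html.escape is used here as a stand-in for escape.xhtml_escape, and the static_url argument is a hypothetical callable replacing self.static_url.

```python
from html import escape


def is_absolute(path):
    # Same rule as the helper inside Tornado's render()
    return any(path.startswith(x) for x in ["/", "http:", "https:"])


def script_tags(js_files, static_url):
    """Build <script src> tags, resolving relative paths and dropping duplicates in order."""
    paths, unique_paths = [], set()
    for path in js_files:
        if not is_absolute(path):
            path = static_url(path)
        if path not in unique_paths:  # keep the first occurrence only
            paths.append(path)
            unique_paths.add(path)
    return "".join('<script src="' + escape(p) + '" type="text/javascript"></script>'
                   for p in paths)


print(script_tags(["a.js", "http://cdn.example/x.js", "a.js"], lambda p: "/static/" + p))
```

The list keeps the order while the set gives fast membership checks, which is the same pairing the original code uses.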

Note that relative paths are converted into URLs by calling self.static_url(path):

def static_url(self, path, include_host=None, **kwargs):
    self.require_setting("static_path", "static_url")
    get_url = self.settings.get("static_handler_class",
                                StaticFileHandler).make_static_url

    if include_host is None:
        include_host = getattr(self, "include_host", False)

    if include_host:
        base = self.request.protocol + "://" + self.request.host
    else:
        base = ""

    return base + get_url(self.settings, path, **kwargs)

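As the code shows, static_url first requires the static_path setting to be configured, then delegates to the make_static_url method of the static handler class (StaticFileHandler by default). That method joins static_path and the relative path into an absolute path, opens the file and computes an MD5 hash of its content (a fingerprint, not encryption).

The URL written into the html is the optional protocol://host base, plus the static_url_prefix setting (/static/ by default), plus the relative path, with the hash appended as a ?v=... version parameter so that browsers fetch a fresh copy whenever the file changes.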
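The versioned-URL idea in a few lines. make_static_url here is a hypothetical standalone helper, not StaticFileHandler.make_static_url itself; the host parameter stands in for the include_host logic, and unlike Tornado it does not cache the hashes between calls.

```python
import hashlib
import os
import tempfile


def make_static_url(static_path, path, prefix="/static/", host=""):
    """Hypothetical helper mirroring the idea behind StaticFileHandler.make_static_url."""
    # static_path + relative path -> absolute file path
    abs_path = os.path.abspath(os.path.join(static_path, path))
    # Hash the file content; MD5 serves as a change fingerprint, not for secrecy.
    with open(abs_path, "rb") as f:
        version = hashlib.md5(f.read()).hexdigest()
    # optional host + prefix + relative path + version query parameter
    return host + prefix + path + "?v=" + version


with tempfile.TemporaryDirectory() as static_dir:
    with open(os.path.join(static_dir, "app.js"), "wb") as f:
        f.write(b"console.log(1);")
    print(make_static_url(static_dir, "app.js"))
    print(make_static_url(static_dir, "app.js", host="http://127.0.0.1:8888"))
```

Editing app.js changes its hash and therefore its URL, so a long browser cache lifetime never serves a stale file.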

#5 render_string

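1. Get the Loader object for the template directory (created on first use, then cached) and call its load method

  -- open the html file with open(), read its content, and create a Template object from that content

  -- Template's __init__ splits the html on template-language tags such as {{ }} and {% %}, and finally generates the source code of a Python function as a string

2. Collect all the variables to be embedded in the template: those passed by the user and the framework defaults

3. Call the Template object's generate method

  -- run the function's code (compiled from the string) with the user-defined and framework-default values as its global variables

  -- call the compiled function to get the content with the data filled in, and return it (to be sent to the requesting client)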
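To make steps 1 and 3 concrete, here is a toy version of the "template string, then function source, then compile, then execute in a namespace" idea. It is a simplification under stated assumptions, not Tornado's template.py: it only understands {{ expr }} (no {% %} blocks), does no HTML auto-escaping, and compile_template and generate are hypothetical names. Only the generated function name _tt_execute is borrowed from Tornado.

```python
import re


def compile_template(text):
    """Turn a template with {{ expr }} tags into Python function source, then compile it."""
    lines = ["def _tt_execute():", "    _tt_buffer = []"]
    # re.split with a capture group alternates: literal, expr, literal, expr, ...
    parts = re.split(r"\{\{(.*?)\}\}", text)
    for i, part in enumerate(parts):
        if i % 2 == 0:
            if part:
                lines.append("    _tt_buffer.append(%r)" % part)       # literal text
        else:
            lines.append("    _tt_buffer.append(str(%s))" % part.strip())  # expression
    lines.append("    return ''.join(_tt_buffer)")
    source = "\n".join(lines)
    return source, compile(source, "<template>", "exec")


def generate(compiled, **namespace):
    # Executing the code object defines _tt_execute inside `namespace`,
    # whose entries then act as the function's global variables.
    exec(compiled, namespace)
    return namespace["_tt_execute"]()


source, code = compile_template("<h1>{{ title }}</h1><p>{{ 1 + 2 }}</p>")
print(source)
print(generate(code, title="Hello"))  # <h1>Hello</h1><p>3</p>
```

Printing source shows the generated function text, roughly the kind of code Tornado keeps in a Template's code attribute; compiling once and executing many times is what makes repeated renders cheap.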

def render_string(self, template_name, **kwargs):
    template_path = self.get_template_path()
    if not template_path:
        frame = sys._getframe(0)
        web_file = frame.f_code.co_filename
        while frame.f_code.co_filename == web_file:
            frame = frame.f_back
        template_path = os.path.dirname(frame.f_code.co_filename)
    with RequestHandler._template_loader_lock:
        if template_path not in RequestHandler._template_loaders:
            loader = self.create_template_loader(template_path)
            RequestHandler._template_loaders[template_path] = loader
        else:
            loader = RequestHandler._template_loaders[template_path]
    t = loader.load(template_name)
    namespace = self.get_template_namespace()
    namespace.update(kwargs)
    return t.generate(**namespace)
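The loader caching in render_string can be illustrated on its own. The module-level dict and lock below mirror the RequestHandler._template_loaders and _template_loader_lock class attributes; FakeLoader is a hypothetical stand-in for tornado.template.Loader that only counts how often it is constructed.

```python
import threading


class FakeLoader:
    """Hypothetical stand-in for tornado.template.Loader; counts constructions."""

    created = 0

    def __init__(self, root):
        self.root = root
        FakeLoader.created += 1

    def load(self, name):
        return self.root + "/" + name


_template_loaders = {}
_template_loader_lock = threading.Lock()


def get_loader(template_path):
    # One loader per template directory, shared by all handlers; the lock
    # stops two threads from creating a loader for the same path at once.
    with _template_loader_lock:
        if template_path not in _template_loaders:
            _template_loaders[template_path] = FakeLoader(template_path)
        return _template_loaders[template_path]


a = get_loader("/srv/templates")
b = get_loader("/srv/templates")
print(a is b, FakeLoader.created)  # True 1
```

Because the real Loader also caches the Template objects it has parsed, keeping a single loader per directory means each template file is read and compiled once rather than on every request.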

[Figure: example HTML template]

[Figure: screenshot of the template-language processing code in Tornado's source]
