Bottle源碼閱讀(一)

在初識bottle(一)中,咱們瞭解了bottle框架的基本用法。在本篇文章中,咱們經過源碼來探究一些基本原理。

1. run的實現

全部的框架請求響應都基於一個原理
http請求 --> wsgi服務器 --> wsgi接口(實際就是框架中自定義實現的函數通過底層封裝) --> 響應
能夠參考廖雪峯的教程中關於wsgi接口的講解html

下咱們先看看bottle是如何實現服務器運行時自動從新加載python

def run(app=None,
        server='wsgiref',
        host='127.0.0.1',
        port=8080,
        interval=1,
        reloader=False,
        quiet=False,
        plugins=None,
        debug=None,
        config=None, **kargs):
    """ Start a server instance. This method blocks until the server terminates.

        :param app: WSGI application or target string supported by
               :func:`load_app`. (default: :func:`default_app`)
        :param server: Server adapter to use. See :data:`server_names` keys
               for valid names or pass a :class:`ServerAdapter` subclass.
               (default: `wsgiref`)
        :param host: Server address to bind to. Pass ``0.0.0.0`` to listens on
               all interfaces including the external one. (default: 127.0.0.1)
        :param port: Server port to bind to. Values below 1024 require root
               privileges. (default: 8080)
        :param reloader: Start auto-reloading server? (default: False)
        :param interval: Auto-reloader interval in seconds (default: 1)
        :param quiet: Suppress output to stdout and stderr? (default: False)
        :param options: Options passed to the server adapter.
     """
    if NORUN: return
    # 自動重載
    if reloader and not os.environ.get('BOTTLE_CHILD'):
        import subprocess
        lockfile = None
        try:
            # tempfile 臨時文件操做模塊https://docs.python.org/2/library/tempfile.html
            # 第一個至關於執行os.open()函數返回文件handler,第二個表示絕對路徑
            fd, lockfile = tempfile.mkstemp(prefix='bottle.', suffix='.lock')
            os.close(fd)  # We only need this file to exist. We never write to it
            # sys.executable 是獲取當前python解釋器的路徑
            while os.path.exists(lockfile):
                args = [sys.executable] + sys.argv
                environ = os.environ.copy()
                environ['BOTTLE_CHILD'] = 'true'
                environ['BOTTLE_LOCKFILE'] = lockfile

                # 建立一個子進程實例
                p = subprocess.Popen(args, env=environ)
                # 若是返回None表示子進程未結束
                while p.poll() is None:  # Busy wait...
                    # 臨時文件設置爲當前時間
                    os.utime(lockfile, None)  # I am alive!
                    time.sleep(interval)
                # linux 系統的信號機制http://www.cppblog.com/sleepwom/archive/2010/12/27/137564.html
                # 3表示按下退出鍵
                # 非正常退出時
                if p.poll() != 3:
                    # os.unlink 至關於去除remove()
                    if os.path.exists(lockfile): os.unlink(lockfile)
                    sys.exit(p.poll())
        except KeyboardInterrupt:
            pass
        finally:
            if os.path.exists(lockfile):
                os.unlink(lockfile)
        return

首先第一次運行時,開啓一個新的進程,確保運行server時的進程和python解釋器一致
不影響主進程的繼續運行linux

try:
        # 這一部分主要是app的相關設置
        if debug is not None: _debug(debug)
        app = app or default_app()
        if isinstance(app, basestring):
            app = load_app(app)
        if not callable(app):
            raise ValueError("Application is not callable: %r" % app)

        for plugin in plugins or []:
            if isinstance(plugin, basestring):
                plugin = load(plugin)
            app.install(plugin)

        if config:
            app.config.update(config)

        if server in server_names:
            server = server_names.get(server)
        if isinstance(server, basestring):
            server = load(server)
        if isinstance(server, type):
            server = server(host=host, port=port, **kargs)
        if not isinstance(server, ServerAdapter):
            raise ValueError("Unknown or unsupported server: %r" % server)

        server.quiet = server.quiet or quiet
        if not server.quiet:
            _stderr("Bottle v%s server starting up (using %s)...\n" %
                    (__version__, repr(server)))
            _stderr("Listening on http://%s:%d/\n" %
                    (server.host, server.port))
            _stderr("Hit Ctrl-C to quit.\n\n")
        
        # 當選擇自動重載時,若是解釋器進程已經啓動
        # 則只須要檢測應用相關內容有沒有變化,若是有變化終止主線程並從新實現異常捕獲
        if reloader:
            lockfile = os.environ.get('BOTTLE_LOCKFILE')
            bgcheck = FileCheckerThread(lockfile, interval)
            # 開啓新線程檢測文件修改,若是修改終止當前主線程,拋出異常
            with bgcheck:
                # 主線程監聽請求
                server.run(app)
            if bgcheck.status == 'reload':
                sys.exit(3)
        else:
            server.run(app)
    except KeyboardInterrupt:
        pass

    except (SystemExit, MemoryError):
        raise
    except:
        if not reloader: raise
        if not getattr(server, 'quiet', quiet):
            print_exc()
        time.sleep(interval)
        sys.exit(3)

FileCheckerThread會對應用相關文件內容變化進行檢測
server加載app,由server接收請求並執行相應的應用函數
在此以前,咱們先了解FileCheckerThreadgit

2. 應用修改後的自動重載

這是一個上下文管理器,當__enter__時開啓一個新的線程,這個線程的任務就是檢測應用相關模塊文件的變化,決定是否終止主線程,當__exit__時,若是返回True則重現異常,不然正常執行後續代碼服務器

class FileCheckerThread(threading.Thread):
    """ Interrupt main-thread as soon as a changed module file is detected,
        the lockfile gets deleted or gets too old. """

    def __init__(self, lockfile, interval):
        threading.Thread.__init__(self)
        self.daemon = True
        self.lockfile, self.interval = lockfile, interval
        #: Is one of 'reload', 'error' or 'exit'
        self.status = None

    def run(self):
        exists = os.path.exists
        mtime = lambda p: os.stat(p).st_mtime
        files = dict()

        for module in list(sys.modules.values()):
            path = getattr(module, '__file__', '')
            if path[-4:] in ('.pyo', '.pyc'): path = path[:-1]
            if path and exists(path): files[path] = mtime(path)

        while not self.status:
            if not exists(self.lockfile)\
            or mtime(self.lockfile) < time.time() - self.interval - 5:
                self.status = 'error'
                thread.interrupt_main()
            for path, lmtime in list(files.items()):
                if not exists(path) or mtime(path) > lmtime:
                    self.status = 'reload'
                    thread.interrupt_main()
                    break
            time.sleep(self.interval)

    def __enter__(self):
        self.start()
    
    # 這個地方是從新載入更新後模塊的關鍵
    # 當檢測到文件變化時,終止主線程使監聽請求中止,退出上下文管理器時,若是返回True則重現異常捕獲
    def __exit__(self, exc_type, *_):
        if not self.status: self.status = 'exit'  # silent exit
        self.join()
        return exc_type is not None and issubclass(exc_type, KeyboardInterrupt)

3. server調用應用函數

bottle提供了一個ServerAdapter的適配器類,重寫run方法就能使bottle可使用多種框架提供的server。app

class ServerAdapter(object):
    quiet = False

    def __init__(self, host='127.0.0.1', port=8080, **options):
        self.options = options
        self.host = host
        self.port = int(port)

    def run(self, handler):  # pragma: no cover
        pass

    def __repr__(self):
        args = ', '.join(['%s=%s' % (k, repr(v))
                          for k, v in self.options.items()])
        return "%s(%s)" % (self.__class__.__name__, args)

默認使用了python自帶的wsgiref, 從代碼中咱們能夠看到其中主要由三部分組成:接收請求模塊,處理請求模塊,組裝模塊框架

class WSGIRefServer(ServerAdapter):
    def run(self, app):  # pragma: no cover
        from wsgiref.simple_server import make_server
        from wsgiref.simple_server import WSGIRequestHandler, WSGIServer
        import socket

        class FixedHandler(WSGIRequestHandler):
            def address_string(self):  # Prevent reverse DNS lookups please.
                return self.client_address[0]

            def log_request(*args, **kw):
                if not self.quiet:
                    return WSGIRequestHandler.log_request(*args, **kw)

        handler_cls = self.options.get('handler_class', FixedHandler)
        server_cls = self.options.get('server_class', WSGIServer)

        if ':' in self.host:  # Fix wsgiref for IPv6 addresses.
            if getattr(server_cls, 'address_family') == socket.AF_INET:

                class server_cls(server_cls):
                    address_family = socket.AF_INET6

        self.srv = make_server(self.host, self.port, app, server_cls,
                               handler_cls)
        self.port = self.srv.server_port  # update port actual port (0 means random)
        try:
            self.srv.serve_forever()
        except KeyboardInterrupt:
            self.srv.server_close()  # Prevent ResourceWarning: unclosed socket
            raise

4.WSGIServer

4.1 尋根到底,咱們現研究一下WSGIServer 的基類
BaseServer 主要實現線程上的控制,實現一些供上層調用的接口,例如dom

server_activate
serve_forever
shutdown
handle_request
verify_request
handle_error

TCPServer 繼承BaseServer, 實現bind,listen,accept, close等函數的封裝socket

def server_bind(self):
        """Called by constructor to bind the socket.

        May be overridden.

        """
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        self.server_address = self.socket.getsockname()

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        self.socket.listen(self.request_queue_size)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        self.socket.close()

HttpServer 繼承TCPServer, 添加了host和port兩個屬性
WSGIServer 繼承HttpServer, 設置了環境變量,提供了獲取應用和設置應用的接口async

class WSGIServer(HTTPServer):

    """BaseHTTPServer that implements the Python WSGI protocol"""

    application = None

    def server_bind(self):
        """Override server_bind to store the server name."""
        HTTPServer.server_bind(self)
        self.setup_environ()

    def setup_environ(self):
        # Set up base environment
        env = self.base_environ = {}
        env['SERVER_NAME'] = self.server_name
        env['GATEWAY_INTERFACE'] = 'CGI/1.1'
        env['SERVER_PORT'] = str(self.server_port)
        env['REMOTE_HOST']=''
        env['CONTENT_LENGTH']=''
        env['SCRIPT_NAME'] = ''

    def get_app(self):
        return self.application

    def set_app(self,application):
        self.application = application

4.2 WSGIRequestHandler的實現
最底層的BaseRequestHandler:處理請求的基類,定義了處理請求的流程
StreamRequestHandler: 繼承BaseRequestHandler,提供了處理請求前rfile和wfile屬性,使處理請求時能經過相似文件讀寫獲取請求和返回響應

class StreamRequestHandler(BaseRequestHandler):

    """Define self.rfile and self.wfile for stream sockets."""

    # Default buffer sizes for rfile, wfile.
    # We default rfile to buffered because otherwise it could be
    # really slow for large data (a getc() call per byte); we make
    # wfile unbuffered because (a) often after a write() we want to
    # read and we need to flush the line; (b) big writes to unbuffered
    # files are typically optimized by stdio even when big reads
    # aren't.
    rbufsize = -1
    wbufsize = 0

    # A timeout to apply to the request socket, if not None.
    timeout = None

    # Disable nagle algorithm for this socket, if True.
    # Use only when wbufsize != 0, to avoid small packets.
    disable_nagle_algorithm = False

    def setup(self):
        self.connection = self.request
        if self.timeout is not None:
            self.connection.settimeout(self.timeout)
        if self.disable_nagle_algorithm:
            self.connection.setsockopt(socket.IPPROTO_TCP,
                                       socket.TCP_NODELAY, True)
        self.rfile = self.connection.makefile('rb', self.rbufsize)
        self.wfile = self.connection.makefile('wb', self.wbufsize)

    def finish(self):
        if not self.wfile.closed:
            try:
                self.wfile.flush()
            except socket.error:
                # A final socket error may have occurred here, such as
                # the local error ECONNABORTED.
                pass
        self.wfile.close()
        self.rfile.close()

BaseHTTPRequestHandler:繼承StreamRequestHandler,handle處理一個請求,輪詢直到收到一個明確關閉鏈接;parse_request解析請求requestline,若是一切正常,繼續處理請求

WSGIRequestHandler:繼承了BaseHTTPRequestHandler, 添加get_environ獲取環境變量, 重寫了handle方法。當requestline >65536時返回414, 實例化一個ServerHandler實例

def handle(self):
        """Handle a single HTTP request"""

        self.raw_requestline = self.rfile.readline(65537)
        if len(self.raw_requestline) > 65536:
            self.requestline = ''
            self.request_version = ''
            self.command = ''
            self.send_error(414)
            return

        if not self.parse_request(): # An error code has been sent, just exit
            return

        handler = ServerHandler(
            self.rfile, self.wfile, self.get_stderr(), self.get_environ()
        )
        handler.request_handler = self      # backpointer for logging
        handler.run(self.server.get_app())

handler.run(self.server.get_app())實現了從請求到應用函數執行,並把執行後的結果寫入wfile返回
咱們再看wsgiref.handlers中BaseHandler中,是如何實現的。

def run(self, application):
        """Invoke the application"""
        # Note to self: don't move the close()!  Asynchronous servers shouldn't
        # call close() from finish_response(), so if you close() anywhere but
        # the double-error branch here, you'll break asynchronous servers by
        # prematurely closing.  Async servers must return from 'run()' without
        # closing if there might still be output to iterate over.
        try:
            self.setup_environ()
            self.result = application(self.environ, self.start_response)
            self.finish_response()
        except:
            try:
                self.handle_error()
            except:
                # If we get an error handling an error, just give up already!
                self.close()
                raise   # ...and let the actual server figure it out.
                
    def start_response(self, status, headers,exc_info=None):
        """'start_response()' callable as specified by PEP 333"""

        if exc_info:
            try:
                if self.headers_sent:
                    # Re-raise original exception if headers sent
                    raise exc_info[0], exc_info[1], exc_info[2]
            finally:
                exc_info = None        # avoid dangling circular ref
        elif self.headers is not None:
            raise AssertionError("Headers already set!")

        assert type(status) is StringType,"Status must be a string"
        assert len(status)>=4,"Status must be at least 4 characters"
        assert int(status[:3]),"Status message must begin w/3-digit code"
        assert status[3]==" ", "Status message must have a space after code"
        if __debug__:
            for name,val in headers:
                assert type(name) is StringType,"Header names must be strings"
                assert type(val) is StringType,"Header values must be strings"
                assert not is_hop_by_hop(name),"Hop-by-hop headers not allowed"
        self.status = status
        self.headers = self.headers_class(headers)
        return self.write

application接受了兩個參數,一個envrion, 和一個start_response的方法。所以下一步就是研究咱們寫的應用函數是如何被封裝成適配的application

相關文章
相關標籤/搜索