tornado 源碼分析 之 異步io的實現方式

前言

本文將嘗試詳細的帶你們一步步走完一個異步操做,從而瞭解tornado是如何實現異步io的.
其實本文是對[上一篇文][1]的實踐和複習
主旨在於關注異步io的實現,因此會忽略掉代碼中的一些異常處理.文字較多,湊合下吧

接下來只會貼出部分源碼,幫助理解,但願有耐心的同窗打開tornado源碼,一塊兒跟蹤一遍吧.

AsyncHTTPClient :

AsyncHTTPClient 繼承 Configurable ,從__new__重看出是單例模式.  
根據 Configurable 的__new__和 AsyncHTTPClient 的 configurable_base 和 configurable_default 得知,
實例化後必定是 SimpleAsyncHTTPClient 的實例

fetch

def fetch(self, request, callback=None, raise_error=True, **kwargs):

        if self._closed:
            raise RuntimeError("fetch() called on closed AsyncHTTPClient")
        if not isinstance(request, HTTPRequest):
            request = HTTPRequest(url=request, **kwargs)
        # We may modify this (to add Host, Accept-Encoding, etc),
        # so make sure we don't modify the caller's object.  This is also
        # where normal dicts get converted to HTTPHeaders objects.
        request.headers = httputil.HTTPHeaders(request.headers)
        request = _RequestProxy(request, self.defaults)
        future = TracebackFuture()
        if callback is not None:
            callback = stack_context.wrap(callback)

            def handle_future(future):
                exc = future.exception()
                if isinstance(exc, HTTPError) and exc.response is not None:
                    response = exc.response
                elif exc is not None:
                    response = HTTPResponse(
                        request, 599, error=exc,
                        request_time=time.time() - request.start_time)
                else:
                    response = future.result()
                self.io_loop.add_callback(callback, response)
            future.add_done_callback(handle_future)
        ##fetch_impl帶上handle_response,重點
        def handle_response(response):
            if raise_error and response.error:
                future.set_exception(response.error)
            else:
                future.set_result(response)
        self.fetch_impl(request, handle_response)
        return future
fetch 中調用 fetch_impl,fetch_impl 中其中一個參數是 callback ,而代碼中的 callback 包含了 future 的 set_result ,
因此,當 callback 被調用時,外部的 yield 操做將被激活,程序會在 ioloop 中調用此 callback ,而後回到原函數的 yield 處,
而且原函數返回這次 qeust 的 future 對象,以便在函數外部增長別的 callback

fetch_impl

def _connection_class(self):
        return _HTTPConnection

def _handle_request(self, request, release_callback, final_callback):
        self._connection_class()(
            self.io_loop, self, request, release_callback,
            final_callback, self.max_buffer_size, self.tcp_client,
            self.max_header_size, self.max_body_size)
在 return 以前,繼續查看 fetch_impl 內部是如何處理,根據推測,他必定是將繼續處理網絡請求,
確定會將網絡請求交由 ioloop 的 epoll 部分處理,設定好處理的 hanlder 再返回
 future.set_result ,接下來繼續分析 fetch_impl 內部是若是設置網絡請求的.
 fetch_impl 的實現代碼中查看,實例化中建立了 tcpclient 對象,這個確定是關鍵

根據以前的分析 SimpleAsyncHTTPClient 是單例模式,那他怎麼處理各類 http 請求呢?
查看代碼得知,他將請求的 request 和 callback 存儲在 self.queue 中,
每次 fetch_impl 的時候,一個個 pop 出來處理就行了,這樣就能處理n個請求了

一步步跟蹤到 _handle_request ,發現最後到了 _HTTPConnection 的實例化中了.
實例化的參數有以前那個包含 future 的 callback .
這樣就能夠保證 yield 操做能夠回到原處了.好了,繼續走

_HTTPConnection

class _HTTPConnection(httputil.HTTPMessageDelegate):
    _SUPPORTED_METHODS = set(["GET", "HEAD", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])

    def __init__(self, io_loop, client, request, release_callback,
                 final_callback, max_buffer_size, tcp_client,
                 max_header_size, max_body_size):
        self.start_time = io_loop.time()
        self.io_loop = io_loop
        self.client = client
        self.request = request
        self.release_callback = release_callback
        self.final_callback = final_callback
        self.max_buffer_size = max_buffer_size
        self.tcp_client = tcp_client
        self.max_header_size = max_header_size
        self.max_body_size = max_body_size
        self.code = None
        self.headers = None
        self.chunks = []
        self._decompressor = None
        # Timeout handle returned by IOLoop.add_timeout
        self._timeout = None
        self._sockaddr = None
        with stack_context.ExceptionStackContext(self._handle_exception):
            self.parsed = urlparse.urlsplit(_unicode(self.request.url))
            if self.parsed.scheme not in ("http", "https"):
                raise ValueError("Unsupported url scheme: %s" %
                                 self.request.url)
            # urlsplit results have hostname and port results, but they
            # didn't support ipv6 literals until python 2.7.
            netloc = self.parsed.netloc
            if "@" in netloc:
                userpass, _, netloc = netloc.rpartition("@")
            host, port = httputil.split_host_and_port(netloc)
            if port is None:
                port = 443 if self.parsed.scheme == "https" else 80
            if re.match(r'^\[.*\]$', host):
                # raw ipv6 addresses in urls are enclosed in brackets
                host = host[1:-1]
            self.parsed_hostname = host  # save final host for _on_connect

            if request.allow_ipv6 is False:
                af = socket.AF_INET
            else:
                af = socket.AF_UNSPEC

            ssl_options = self._get_ssl_options(self.parsed.scheme)

            timeout = min(self.request.connect_timeout, self.request.request_timeout)
            if timeout:
                self._timeout = self.io_loop.add_timeout(
                    self.start_time + timeout,
                    stack_context.wrap(self._on_timeout))
            self.tcp_client.connect(host, port, af=af,
                                    ssl_options=ssl_options,
                                    max_buffer_size=self.max_buffer_size,
                                    callback=self._on_connect)
_HTTPConnection 的實例化中有一堆成員變量,有點暈,
先無論這麼多,關注咱們的 callback ,和 tcpclient .

一行行往下看,是 host 和 port 的初始化操做 ,http 和 https 是不同的嘛,固然得處理一下,

終於到了最後,是 tcpclient.connect ,從 connect 的參數中看到 callback=self._on_connect ,
應該是個重要的方法,出去那些字符串處理,發現 self.connection.write_headers(start_line ,  self.request.headers ) ,
這應該是發送 http 頭的操做吧,是網絡請求,因此這是處理 connect 這個 url 後,發送 http 頭的操做.

仍是回頭看看是如何 connect 的吧,由於這是異步的關鍵,搞懂了這個,那剩下來的也是同出一則

TCPClient

轉到 tcpclient 的代碼去看他的實例化和 connect 操做,看來剩下的路還很長呢
 TCPClient 實例化的代碼很短,有個 resolver 對象,先無論

connect

@gen.coroutine
    def connect(self, host, port, af=socket.AF_UNSPEC, ssl_options=None,
                max_buffer_size=None):
        """Connect to the given host and port.

        Asynchronously returns an `.IOStream` (or `.SSLIOStream` if
        ``ssl_options`` is not None).
        """
        addrinfo = yield self.resolver.resolve(host, port, af)
        connector = _Connector(
            addrinfo, self.io_loop,
            functools.partial(self._create_stream, max_buffer_size))
        af, addr, stream = yield connector.start()
        # TODO: For better performance we could cache the (af, addr)
        # information here and re-use it on subsequent connections to
        # the same host. (http://tools.ietf.org/html/rfc6555#section-4.2)
        if ssl_options is not None:
            stream = yield stream.start_tls(False, ssl_options=ssl_options,
                                            server_hostname=host)
        raise gen.Return(stream)
去到 connect 方法裏,發現 coroutine 裝飾器,而且調用時設置了 callback=self._on_connect ,
因此當這個 coroutine 的 future 被解決時,會調用 self._on_connect ,
你也能夠看到 _on_connect 的參數是 stream ,就是 gen.Return(stream )傳過去的. 
由於 gen.coroutine 實現時的代碼中,
 send 了 value 後,代碼繼續走,走到 gen.Return (其實這是個 exception ,
就會走到 gen.coroutine 裏的 set_result 了.)

第一個 yield  右邊是 self.resolver.resolve ,左邊是 addrinfo ,是地址信息,
這個異步操做處理的即是解析 url 的地址信息.此處 tornado 默認使用了阻塞的實現,暫時先不看,
之後在新的篇幅補充,主要內容是 run_on_executor 裝飾器的內容,
此處實際上是同步返回的,由於默認用的是 BlockingResolver 的代碼,直接看下一個 yield

_Connector

def __init__(self, addrinfo, io_loop, connect):
        self.io_loop = io_loop
        self.connect = connect

        self.future = Future()
        self.timeout = None
        self.last_error = None
        self.remaining = len(addrinfo)
        self.primary_addrs, self.secondary_addrs = self.split(addrinfo)
_Connector 實例化,參數有一個 callback ,是本類的 _create_stream ,
並把 self.connect 設置成傳過來的 callback 
因此 self.connect 就是 TCPClient._create_stream 了,
成員變量有個 future 實例,咱們須要全程高度關注 future 和 callback .

實例化後調用了 start 方法 ,start 內部,調用 try_connect,set_timout ,
根據函數名得知,是 connect 操做和設置超時的操做.最後返回實例化時建立的 future .

try_connect

def start(self, timeout=_INITIAL_CONNECT_TIMEOUT):
        self.try_connect(iter(self.primary_addrs))
        self.set_timout(timeout)
        return self.future

    def try_connect(self, addrs):
        try:
            af, addr = next(addrs)
        except StopIteration:
            # We've reached the end of our queue, but the other queue
            # might still be working.  Send a final error on the future
            # only when both queues are finished.
            if self.remaining == 0 and not self.future.done():
                self.future.set_exception(self.last_error or
                                          IOError("connection failed"))
            return
        future = self.connect(af, addr)
        future.add_done_callback(functools.partial(self.on_connect_done,
                                                   addrs, af, addr))
future  =  self.connect(af ,  addr ),執行了 TCPClient._create_stream 方法,
返回 future ,而且設置 future 的 callback=on_connect_done

_create_stream

def _create_stream(self, max_buffer_size, af, addr):
        # Always connect in plaintext; we'll convert to ssl if necessary
        # after one connection has completed.
        stream = IOStream(socket.socket(af),
                          io_loop=self.io_loop,
                          max_buffer_size=max_buffer_size)
        return stream.connect(addr)
實例化 IOStream ,執行並返回 stream.connect,stream.connect 返回的 future 即是 try_connect 中的 future ,
因此,進去看看 stream.connect 內部是怎麼」解決」這個 future 的.

IOStream

connect

def connect(self, address, callback=None, server_hostname=None):
        self._connecting = True
        if callback is not None:
            self._connect_callback = stack_context.wrap(callback)
            future = None
        else:
            future = self._connect_future = TracebackFuture()
        try:
            self.socket.connect(address)
        except socket.error as e:

            if (errno_from_exception(e) not in _ERRNO_INPROGRESS and
                    errno_from_exception(e) not in _ERRNO_WOULDBLOCK):
                if future is None:
                    gen_log.warning("Connect error on fd %s: %s",
                                    self.socket.fileno(), e)
                self.close(exc_info=True)
                return future
        self._add_io_state(self.io_loop.WRITE)
        return future
self._connecting  =  True  設置此實例正在鏈接中,鏈接完畢設置成 false 
若是沒有 callback 傳入,生成 future 對象, 剛纔返回的 future 記錄在這個實例的成員變量 self._connect_future 中.
而後執行 socket 的 connect 操做,由於 socket 設置成非阻塞,
因此此處會當即返回,不會阻塞,當鏈接成功時,緩衝區可寫,失敗時緩衝區可讀可寫.這是基礎知識,詳情百度.
而後調用 self._add_io_state ,返回 future

_add_io_state

def _add_io_state(self, state):

        if self.closed():
            # connection has been closed, so there can be no future events
            return
        if self._state is None:
            self._state = ioloop.IOLoop.ERROR | state
            with stack_context.NullContext():
                self.io_loop.add_handler(
                    self.fileno(), self._handle_events, self._state)
        elif not self._state & state:
            self._state = self._state | state
            self.io_loop.update_handler(self.fileno(), self._state)
終於到了這一步,要用 epoll 了!!!根據實例化的代碼得知 self._state=None ,
會走 self.io_loop.add_handler 這步,根據我以前發的[文章][2],會將當前的 fd ,當前實例的 _handle_events ,和寫,錯誤操做註冊到 epoll 中
接着!!!!!終於走完了這個 yield 的流程了!!!!!!

小總結:

請必定弄清 future 是怎麼傳遞的,每一個 future 管理的 callback 是什麼操做.
  _HTTPConnection  中  tcpclient.connect 一個 future   ,callback=self._on_connect  .
 他將在 raise   gen.Return(stream )時被添加到 ioloop 執行. 
  tcpclient.connect 內部的  connector.start 一個 future , 
  callback 是 on_connect_done ,他將在 poll 檢測到 write 事件時,被添加到 ioloop 執行

ioloop

def start(self):
        if self._running:
            raise RuntimeError("IOLoop is already running")
        self._setup_logging()
        if self._stopped:
            self._stopped = False
            return
        old_current = getattr(IOLoop._current, "instance", None)
        IOLoop._current.instance = self
        self._thread_ident = thread.get_ident()
        self._running = True

        old_wakeup_fd = None
        if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix':

            try:
                old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
                if old_wakeup_fd != -1:
                    signal.set_wakeup_fd(old_wakeup_fd)
                    old_wakeup_fd = None
            except ValueError:
                old_wakeup_fd = None

        try:
            while True:
                with self._callback_lock:
                    callbacks = self._callbacks
                    self._callbacks = []

                due_timeouts = []

                if self._timeouts:
                    now = self.time()
                    while self._timeouts:
                        if self._timeouts[0].callback is None:

                            heapq.heappop(self._timeouts)
                            self._cancellations -= 1
                        elif self._timeouts[0].deadline <= now:
                            due_timeouts.append(heapq.heappop(self._timeouts))
                        else:
                            break
                    if (self._cancellations > 512
                            and self._cancellations > (len(self._timeouts) >> 1)):

                        self._cancellations = 0
                        self._timeouts = [x for x in self._timeouts
                                          if x.callback is not None]
                        heapq.heapify(self._timeouts)

                for callback in callbacks:
                    self._run_callback(callback)
                for timeout in due_timeouts:
                    if timeout.callback is not None:
                        self._run_callback(timeout.callback)
                callbacks = callback = due_timeouts = timeout = None

                if self._callbacks:
                    poll_timeout = 0.0
                elif self._timeouts:

                    poll_timeout = self._timeouts[0].deadline - self.time()
                    poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
                else:

                    poll_timeout = _POLL_TIMEOUT

                if not self._running:
                    break

                if self._blocking_signal_threshold is not None:

                    signal.setitimer(signal.ITIMER_REAL, 0, 0)

                try:
                    event_pairs = self._impl.poll(poll_timeout)
                except Exception as e:

                    if errno_from_exception(e) == errno.EINTR:
                        continue
                    else:
                        raise

                if self._blocking_signal_threshold is not None:
                    signal.setitimer(signal.ITIMER_REAL,
                                     self._blocking_signal_threshold, 0)

                self._events.update(event_pairs)
                while self._events:
                    fd, events = self._events.popitem()
                    try:
                        fd_obj, handler_func = self._handlers[fd]
                        handler_func(fd_obj, events)
                    except (OSError, IOError) as e:
                        if errno_from_exception(e) == errno.EPIPE:
                            # Happens when the client closes the connection
                            pass
                        else:
                            self.handle_callback_exception(self._handlers.get(fd))
                    except Exception:
                        self.handle_callback_exception(self._handlers.get(fd))
                fd_obj = handler_func = None

        finally:
            # reset the stopped flag so another start/stop pair can be issued
            self._stopped = False
            if self._blocking_signal_threshold is not None:
                signal.setitimer(signal.ITIMER_REAL, 0, 0)
            IOLoop._current.instance = old_current
            if old_wakeup_fd is not None:
                signal.set_wakeup_fd(old_wakeup_fd)
接下來 tornado 終於也回到了 ioloop 代碼中了(淚奔)!!當鏈接成功時,該 fd 的緩衝區可寫,
 epoll 收到 fd 的 write 操做通知~進入到了 epoll 的 loop 中處理.而後!回到剛纔註冊的 _handle_events 了!
注意這個 _handle_events 是 IOStream 的實例裏的 _handle_events ,他有剛纔咱們處理的全部信息哦~

接下來看 _handle_events 的代碼,看他若是解決掉 future

IOStream._handle_events

def _handle_events(self, fd, events):
        if self.closed():
            gen_log.warning("Got events for closed stream %s", fd)
            return
        try:
            if self._connecting:
                # Most IOLoops will report a write failed connect
                # with the WRITE event, but SelectIOLoop reports a
                # READ as well so we must check for connecting before
                # either.
                self._handle_connect()
            if self.closed():
                return
            if events & self.io_loop.READ:
                self._handle_read()
            if self.closed():
                return
            if events & self.io_loop.WRITE:
                self._handle_write()
            if self.closed():
                return
            if events & self.io_loop.ERROR:
                self.error = self.get_fd_error()
                # We may have queued up a user callback in _handle_read or
                # _handle_write, so don't close the IOStream until those
                # callbacks have had a chance to run.
                self.io_loop.add_callback(self.close)
                return
            state = self.io_loop.ERROR
            if self.reading():
                state |= self.io_loop.READ
            if self.writing():
                state |= self.io_loop.WRITE
            if state == self.io_loop.ERROR and self._read_buffer_size == 0:
                # If the connection is idle, listen for reads too so
                # we can tell if the connection is closed.  If there is
                # data in the read buffer we won't run the close callback
                # yet anyway, so we don't need to listen in this case.
                state |= self.io_loop.READ
            if state != self._state:
                assert self._state is not None, \
                    "shouldn't happen: _handle_events without self._state"
                self._state = state
                self.io_loop.update_handler(self.fileno(), self._state)
        except UnsatisfiableReadError as e:
            gen_log.info("Unsatisfiable read, closing connection: %s" % e)
            self.close(exc_info=True)
        except Exception:
            gen_log.error("Uncaught exception, closing connection.",
                          exc_info=True)
            self.close(exc_info=True)
            raise

    def _handle_connect(self):
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            self.error = socket.error(err, os.strerror(err))
            # IOLoop implementations may vary: some of them return
            # an error state before the socket becomes writable, so
            # in that case a connection failure would be handled by the
            # error path in _handle_events instead of here.
            if self._connect_future is None:
                gen_log.warning("Connect error on fd %s: %s",
                                self.socket.fileno(), errno.errorcode[err])
            self.close()
            return
        if self._connect_callback is not None:
            callback = self._connect_callback
            self._connect_callback = None
            self._run_callback(callback)
        if self._connect_future is not None:
            future = self._connect_future
            self._connect_future = None
            future.set_result(self)
        self._connecting = False
判斷是否在鏈接中,固然是了,剛纔我也強調過了,
而後進入 _handle_connect,_handle_connect 先判斷 connect 有沒成功,
成功了就是設置 _connect_future 的 result,set_result(self ),把 self(iostream )設置進去了!   
而後 _connect_future 的 callbacks 會在下一次循環被 ioloop 消化掉!!

一步步返回看,這個 future 正是咱們以前的那個 yiled 操做的右邊的返回的 future ,
因此剛纔 _Connector.try_connect 設置的 callback   ,on_connect_done 會在 ioloop 的 callback 裏執行. 
根據上一篇[文章][3]講的 coroutine 的源碼得知,此 future 裏還有 Runner.run 的 callback 哦~
因此 ,run 裏 send 了 vaule 到 gen . 
終於終於!!程序回到了剛纔的 yield 處了!!!!!

tornado正是如此實現異步io的

感受一直講完整個操做不太現實,剩下的你們仍是本身跟蹤吧,道理跟這個流程相似.
 yield 操做右邊,必定是返回一個 future (舊版本貌似是 YieldPoint ,由於沒看過舊版,因此不太清楚) ,
而後在返回 future 以前,設置好 fd 的 handler ,和其餘的解析工做,而後等待 epoll 檢測到關心的 io   event ,
在 io 的 handler 裏把 future 解決,從而回到 yield 處~ 核心就是 ioloop 三部分 ,future,gen.coroutine . 
相互配合完成異步操做. 跟蹤幾遍消化一下,就能夠寫 tornado 的擴展了. 

祝你們武運亨通
相關文章
相關標籤/搜索