tornao.iostream-io流

首先看tornado支持的讀操做。
python

read_until_regex(self, regex, callback=None, max_bytes=None):支持正則的讀取閉包

read_until(self, delimiter, callback=None, max_bytes=None):支持結束符的讀取app

read_bytes(self, num_bytes, callback=None, streaming_callback=None, partial=False):支持指定數量的讀取socket


read_bytes爲例,其他的也差很少ide

def read_bytes(self, num_bytes, callback=None, streaming_callback=None,
                   partial=False):
        """Asynchronously read a number of bytes.

        If a ``streaming_callback`` is given, it will be called with chunks
        of data as they become available, and the final result will be empty.
        Otherwise, the result is all the data that was read.
        If a callback is given, it will be run with the data as an argument;
        if not, this method returns a `.Future`.

        If ``partial`` is true, the callback is run as soon as we have
        any bytes to return (but never more than ``num_bytes``)

        .. versionchanged:: 4.0
            Added the ``partial`` argument.  The callback argument is now
            optional and a `.Future` will be returned if it is omitted.
        """
        future = self._set_read_callback(callback)
        assert isinstance(num_bytes, numbers.Integral)
        self._read_bytes = num_bytes
        self._read_partial = partial
        self._streaming_callback = stack_context.wrap(streaming_callback)
        try:
            self._try_inline_read()
        except:
            if future is not None:
                future.add_done_callback(lambda f: f.exception())
            raise
        return future


,它首先調用_set_read_callback方法:函數

def _set_read_callback(self, callback):
        assert self._read_callback is None, "Already reading"
        assert self._read_future is None, "Already reading"
        if callback is not None:
            self._read_callback = stack_context.wrap(callback)
        else:
            self._read_future = TracebackFuture()
        return self._read_future

self._read_callback,self._read_future在__init__方法中初始化爲None,這兩個屬性會在_run_read_callback中實現回調機制。若是指定了callback,那麼就更新self._read_callback。若是沒有指定callback,就更self._read_future。tornado


而後更新讀取屬性,oop

read_bytes會更新self._read_bytes屬性。ui

read_until_regex會更新self._read_regex屬性。this

read_until會更新_read_delimite屬性。

這幾個屬性默認爲None,在每次讀取時會調用_read_from_buffer方法,從新初始化爲None。


而後調用_try_inline_read()方法,

def _try_inline_read(self):
        self._run_streaming_callback()
        pos = self._find_read_pos()
        if pos is not None:
            self._read_from_buffer(pos)
            return
        self._check_closed()
        try:
            pos = self._read_to_buffer_loop()
        except Exception:
            self._maybe_run_close_callback()
            raise
        if pos is not None:
            self._read_from_buffer(pos)
            return
        # We couldn't satisfy the read inline, so either close the stream
        # or listen for new data.
        if self.closed():
            self._maybe_run_close_callback()
        else:
            self._add_io_state(ioloop.IOLoop.READ)

首先它調用了_run_sreaming_callback,它當指定了stream_callback會被調用。

它優先從_read_buffer中尋找,而後將socket的可讀數據讀取到_read_buffer中,再重複上一步。

而後調用了_find_read_pos方法,負責從_read_buffer中,尋找知足讀條件的位置。若是找到,返回位置值。若是沒有,則返回None。


_read_from_buffer方法是在self._read_buffer有足夠的數據,纔會調用。它會重置一些屬性,而且會調用相應的回調函數。

def _read_from_buffer(self, pos):
        self._read_bytes = self._read_delimiter = self._read_regex = None
        self._read_partial = False
        self._run_read_callback(pos, False)

初始化讀取相關的屬性,而且調用_run_read_callback。

def _run_read_callback(self, size, streaming):
        if streaming:
            callback = self._streaming_callback
        else:
            callback = self._read_callback
            self._read_callback = self._streaming_callback = None
        if self._read_future is not None:
            assert callback is None
            future = self._read_future
            self._read_future = None
            future.set_result(self._consume(size))
        if callback is not None:
            assert self._read_future is None
            self._run_callback(callback, self._consume(size))
        else:
            # If we scheduled a callback, we will add the error listener
            # afterwards.  If we didn't, we have to do it now.
            self._maybe_add_error_listener()

_run_read_callback會根據_read_future和_read_callback是否爲None,決定怎麼返回結果,而且會初始化爲None。

若是_read_future不爲None,就調用future.set_result()返回結果。

若是_read_callback不爲None,就調用_run_callback。

_consume()方法就是從_read_buffer中讀取數據。

def _run_callback(self, callback, *args):
        def wrapper():
            self._pending_callbacks -= 1
            try:
                return callback(*args)
            except Exception:
                app_log.error("Uncaught exception, closing connection.",
                              exc_info=True)
                # Close the socket on an uncaught exception from a user callback
                # (It would eventually get closed when the socket object is
                # gc'd, but we don't want to rely on gc happening before we
                # run out of file descriptors)
                self.close(exc_info=True)
                # Re-raise the exception so that IOLoop.handle_callback_exception
                # can see it and log the error
                raise
            finally:
                self._maybe_add_error_listener()
        # We schedule callbacks to be run on the next IOLoop iteration
        # rather than running them directly for several reasons:
        # * Prevents unbounded stack growth when a callback calls an
        #   IOLoop operation that immediately runs another callback
        # * Provides a predictable execution context for e.g.
        #   non-reentrant mutexes
        # * Ensures that the try/except in wrapper() is run outside
        #   of the application's StackContexts
        with stack_context.NullContext():
            # stack_context was already captured in callback, we don't need to
            # capture it again for IOStream's wrapper.  This is especially
            # important if the callback was pre-wrapped before entry to
            # IOStream (as in HTTPConnection._header_callback), as we could
            # capture and leak the wrong context here.
            self._pending_callbacks += 1
            self.io_loop.add_callback(wrapper)

wrapper是一個閉包函數,主要添加callback執行時,出現異常的處理(輸出錯誤log,關閉鏈接)。

並且使用NullContext,上面的註釋也很清楚,就是由於已經在wrapper中有異常的處理,因此不須要stack_context的異常處理。


上面一段是講了_read_from_buffer的執行流程。回到_try_inline_read的裏面,它若是從_read_buffer中沒有讀到知足條件的數據,就會調用_read_to_buffer_loop。它的做用,就是循環的從socket中讀取數據到_read_buffer中,直到socket沒有數據,或者已經讀到知足條件的數據。

def _read_to_buffer_loop(self):
        # This method is called from _handle_read and _try_inline_read.
        try:
            if self._read_bytes is not None:
                target_bytes = self._read_bytes
            elif self._read_max_bytes is not None:
                target_bytes = self._read_max_bytes
            elif self.reading():
                # For read_until without max_bytes, or
                # read_until_close, read as much as we can before
                # scanning for the delimiter.
                target_bytes = None
            else:
                target_bytes = 0
            next_find_pos = 0
            # Pretend to have a pending callback so that an EOF in
            # _read_to_buffer doesn't trigger an immediate close
            # callback.  At the end of this method we'll either
            # establish a real pending callback via
            # _read_from_buffer or run the close callback.
            #
            # We need two try statements here so that
            # pending_callbacks is decremented before the `except`
            # clause below (which calls `close` and does need to
            # trigger the callback)
            self._pending_callbacks += 1
            while not self.closed():
                # Read from the socket until we get EWOULDBLOCK or equivalent.
                # SSL sockets do some internal buffering, and if the data is
                # sitting in the SSL object's buffer select() and friends
                # can't see it; the only way to find out if it's there is to
                # try to read it.
                if self._read_to_buffer() == 0:
                    break

                self._run_streaming_callback()

                # If we've read all the bytes we can use, break out of
                # this loop.  We can't just call read_from_buffer here
                # because of subtle interactions with the
                # pending_callback and error_listener mechanisms.
                #
                # If we've reached target_bytes, we know we're done.
                if (target_bytes is not None and
                        self._read_buffer_size >= target_bytes):
                    break

                # Otherwise, we need to call the more expensive find_read_pos.
                # It's inefficient to do this on every read, so instead
                # do it on the first read and whenever the read buffer
                # size has doubled.
                if self._read_buffer_size >= next_find_pos:
                    pos = self._find_read_pos()
                    if pos is not None:
                        return pos
                    next_find_pos = self._read_buffer_size * 2
            return self._find_read_pos()
        finally:
            self._pending_callbacks -= 1

self._pending_callbacks屬性,就是爲了延遲鏈接關閉。等到全部_pendding_callabcks回調完了,纔會真正的關閉鏈接。

若是_read_to_buffer_loop仍然不能讀取到知足條件的數據,就會在ioloop上登記此鏈接的READ事件,回調函數是_handle_event。_handle_event調用_handle_read處理可讀事件。若是有足夠的數據,就調用_read_from_buffer。若是沒有,就繼續監聽可讀事件。

相關文章
相關標籤/搜索