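First, look at the read operations that Tornado supports.
read_until_regex(self, regex, callback=None, max_bytes=None): reads until the buffered data matches a regular expression.
read_until(self, delimiter, callback=None, max_bytes=None): reads until a delimiter is found.
read_bytes(self, num_bytes, callback=None, streaming_callback=None, partial=False): reads a given number of bytes.
Take read_bytes as an example; the others work in much the same way.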
    def read_bytes(self, num_bytes, callback=None, streaming_callback=None,
                   partial=False):
        """Asynchronously read a number of bytes.

        If a ``streaming_callback`` is given, it will be called with chunks
        of data as they become available, and the final result will be empty.
        Otherwise, the result is all the data that was read.
        If a callback is given, it will be run with the data as an argument;
        if not, this method returns a `.Future`.

        If ``partial`` is true, the callback is run as soon as we have
        any bytes to return (but never more than ``num_bytes``)

        .. versionchanged:: 4.0
            Added the ``partial`` argument.  The callback argument is now
            optional and a `.Future` will be returned if it is omitted.
        """
        future = self._set_read_callback(callback)
        assert isinstance(num_bytes, numbers.Integral)
        self._read_bytes = num_bytes
        self._read_partial = partial
        self._streaming_callback = stack_context.wrap(streaming_callback)
        try:
            self._try_inline_read()
        except:
            if future is not None:
                future.add_done_callback(lambda f: f.exception())
            raise
        return future
It first calls the _set_read_callback method:
    def _set_read_callback(self, callback):
        assert self._read_callback is None, "Already reading"
        assert self._read_future is None, "Already reading"
        if callback is not None:
            self._read_callback = stack_context.wrap(callback)
        else:
            self._read_future = TracebackFuture()
        return self._read_future
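self._read_callback and self._read_future are both initialized to None in __init__; _run_read_callback later uses them to deliver the result. If a callback is given, self._read_callback is set. If no callback is given, self._read_future is set to a new future, which is returned to the caller.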
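The choice between the two delivery modes can be illustrated with a stripped-down model. This is only a sketch under stated assumptions: MiniStream is a hypothetical class, the standard library's concurrent.futures.Future stands in for TracebackFuture, and the stack_context.wrap step is left out.

```python
from concurrent.futures import Future


class MiniStream:
    """Hypothetical stand-in for a stream; models only the read bookkeeping."""

    def __init__(self):
        self._read_callback = None
        self._read_future = None

    def set_read_callback(self, callback):
        # Only one read may be outstanding at a time.
        assert self._read_callback is None, "Already reading"
        assert self._read_future is None, "Already reading"
        if callback is not None:
            self._read_callback = callback
        else:
            self._read_future = Future()
        # None in callback mode, a Future otherwise.
        return self._read_future


callback_stream = MiniStream()
callback_result = callback_stream.set_read_callback(print)

future_stream = MiniStream()
future_result = future_stream.set_read_callback(None)
```

In callback mode the caller gets None back and waits for the callback; in future mode the caller gets a Future to wait on. A second read on the same stream before the first completes trips the "Already reading" assertion.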
Next, the read attributes are updated:
read_bytes sets self._read_bytes.
read_until_regex sets self._read_regex.
read_until sets self._read_delimiter.
These attributes default to None, and _read_from_buffer resets them to None each time a read completes.
Then _try_inline_read() is called:
    def _try_inline_read(self):
        self._run_streaming_callback()
        pos = self._find_read_pos()
        if pos is not None:
            self._read_from_buffer(pos)
            return
        self._check_closed()
        try:
            pos = self._read_to_buffer_loop()
        except Exception:
            self._maybe_run_close_callback()
            raise
        if pos is not None:
            self._read_from_buffer(pos)
            return
        # We couldn't satisfy the read inline, so either close the stream
        # or listen for new data.
        if self.closed():
            self._maybe_run_close_callback()
        else:
            self._add_io_state(ioloop.IOLoop.READ)
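It first calls _run_streaming_callback, which only has an effect when a streaming_callback was specified.
The overall strategy is to look in _read_buffer first; if that fails, it reads whatever data is available on the socket into _read_buffer and looks again.
The lookup is done by _find_read_pos, which searches _read_buffer for a position that satisfies the read condition. If one is found, it returns the position; otherwise it returns None.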
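The three read conditions can be sketched as a single lookup over a bytes buffer. This is a simplified model, not the library's code: find_read_pos and its keyword arguments are hypothetical names, and the buffer is assumed to be one contiguous bytes object.

```python
import re


def find_read_pos(buf, num_bytes=None, delimiter=None, regex=None, partial=False):
    """Return how many leading bytes of buf satisfy the read, or None."""
    if num_bytes is not None:
        # Enough bytes buffered, or a partial read with at least one byte.
        if len(buf) >= num_bytes or (partial and len(buf) > 0):
            return min(num_bytes, len(buf))
    elif delimiter is not None:
        loc = buf.find(delimiter)
        if loc != -1:
            return loc + len(delimiter)  # include the delimiter itself
    elif regex is not None:
        match = re.search(regex, buf)
        if match is not None:
            return match.end()
    return None


buffered = b"GET / HTTP/1.1\r\nHost: example\r\n\r\n"
first_line_end = find_read_pos(buffered, delimiter=b"\r\n")
headers_end = find_read_pos(buffered, regex=rb"\r\n\r\n")
```

The returned position is a byte count measured from the start of the buffer, so the caller can consume exactly that many bytes.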
_read_from_buffer is called only when self._read_buffer holds enough data. It resets the read attributes and invokes the corresponding callback.
    def _read_from_buffer(self, pos):
        self._read_bytes = self._read_delimiter = self._read_regex = None
        self._read_partial = False
        self._run_read_callback(pos, False)
It resets the read-related attributes and then calls _run_read_callback.
    def _run_read_callback(self, size, streaming):
        if streaming:
            callback = self._streaming_callback
        else:
            callback = self._read_callback
            self._read_callback = self._streaming_callback = None
        if self._read_future is not None:
            assert callback is None
            future = self._read_future
            self._read_future = None
            future.set_result(self._consume(size))
        if callback is not None:
            assert self._read_future is None
            self._run_callback(callback, self._consume(size))
        else:
            # If we scheduled a callback, we will add the error listener
            # afterwards.  If we didn't, we have to do it now.
            self._maybe_add_error_listener()
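_run_read_callback decides how to deliver the result depending on whether _read_future or _read_callback is set, and resets both to None.
If _read_future is not None, the result is delivered through future.set_result().
If _read_callback is not None, _run_callback is called.
_consume() removes the requested number of bytes from _read_buffer and returns them.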
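The delivery step can be sketched as follows. The names consume and deliver are hypothetical, a bytearray stands in for _read_buffer, and the callback is invoked directly here rather than being scheduled on the event loop.

```python
from concurrent.futures import Future


def consume(buf, size):
    """Remove and return the first `size` bytes of a bytearray buffer."""
    data = bytes(buf[:size])
    del buf[:size]
    return data


def deliver(buf, size, callback=None, future=None):
    """Hand `size` bytes to whichever of future/callback is set."""
    if future is not None:
        assert callback is None
        future.set_result(consume(buf, size))
    elif callback is not None:
        callback(consume(buf, size))


read_buffer = bytearray(b"helloworld")

received = []
deliver(read_buffer, 5, callback=received.append)  # callback mode

pending = Future()
deliver(read_buffer, 3, future=pending)            # future mode
```

Each delivery consumes its bytes from the front of the buffer, so whatever was not requested stays available for the next read.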
    def _run_callback(self, callback, *args):
        def wrapper():
            self._pending_callbacks -= 1
            try:
                return callback(*args)
            except Exception:
                app_log.error("Uncaught exception, closing connection.",
                              exc_info=True)
                # Close the socket on an uncaught exception from a user callback
                # (It would eventually get closed when the socket object is
                # gc'd, but we don't want to rely on gc happening before we
                # run out of file descriptors)
                self.close(exc_info=True)
                # Re-raise the exception so that IOLoop.handle_callback_exception
                # can see it and log the error
                raise
            finally:
                self._maybe_add_error_listener()
        # We schedule callbacks to be run on the next IOLoop iteration
        # rather than running them directly for several reasons:
        # * Prevents unbounded stack growth when a callback calls an
        #   IOLoop operation that immediately runs another callback
        # * Provides a predictable execution context for e.g.
        #   non-reentrant mutexes
        # * Ensures that the try/except in wrapper() is run outside
        #   of the application's StackContexts
        with stack_context.NullContext():
            # stack_context was already captured in callback, we don't need to
            # capture it again for IOStream's wrapper.  This is especially
            # important if the callback was pre-wrapped before entry to
            # IOStream (as in HTTPConnection._header_callback), as we could
            # capture and leak the wrong context here.
            self._pending_callbacks += 1
            self.io_loop.add_callback(wrapper)
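wrapper is a closure that adds error handling around the callback: if the callback raises, it logs the error, closes the connection, and re-raises the exception.
The callback is scheduled inside NullContext. As the comments above explain, the callback was already wrapped by stack_context when it was registered, so the context does not need to be captured again for wrapper; it also ensures that the try/except in wrapper runs outside the application's StackContexts.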
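The deferred scheduling can be modeled without Tornado. In this sketch MiniLoop and MiniStream are hypothetical classes: the loop is just a queue drained once per iteration, and closing the stream is reduced to setting a flag.

```python
from collections import deque


class MiniLoop:
    """Hypothetical event loop: callbacks run on the next iteration."""

    def __init__(self):
        self._callbacks = deque()

    def add_callback(self, fn):
        self._callbacks.append(fn)

    def run_once(self):
        # Run only the callbacks that were queued before this iteration.
        for _ in range(len(self._callbacks)):
            self._callbacks.popleft()()


class MiniStream:
    def __init__(self, loop):
        self.loop = loop
        self.pending_callbacks = 0
        self.closed = False

    def run_callback(self, callback, *args):
        def wrapper():
            self.pending_callbacks -= 1
            try:
                return callback(*args)
            except Exception:
                # A failing user callback closes the stream, then propagates.
                self.closed = True
                raise

        self.pending_callbacks += 1
        self.loop.add_callback(wrapper)


loop = MiniLoop()
stream = MiniStream(loop)
seen = []
stream.run_callback(seen.append, b"data")
pending_before = stream.pending_callbacks  # queued, not yet run
seen_before = list(seen)
loop.run_once()
```

Because the callback only runs inside run_once, a callback that starts another read cannot recurse into the first one, which is the stack-growth concern the source comments mention.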
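The section above covered the flow of _read_from_buffer. Back in _try_inline_read: if _read_buffer does not yet hold data that satisfies the read condition, it calls _read_to_buffer_loop. This method repeatedly reads from the socket into _read_buffer until either the socket has no more data or enough data has been read to satisfy the condition.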
    def _read_to_buffer_loop(self):
        # This method is called from _handle_read and _try_inline_read.
        try:
            if self._read_bytes is not None:
                target_bytes = self._read_bytes
            elif self._read_max_bytes is not None:
                target_bytes = self._read_max_bytes
            elif self.reading():
                # For read_until without max_bytes, or
                # read_until_close, read as much as we can before
                # scanning for the delimiter.
                target_bytes = None
            else:
                target_bytes = 0
            next_find_pos = 0
            # Pretend to have a pending callback so that an EOF in
            # _read_to_buffer doesn't trigger an immediate close
            # callback.  At the end of this method we'll either
            # establish a real pending callback via
            # _read_from_buffer or run the close callback.
            #
            # We need two try statements here so that
            # pending_callbacks is decremented before the `except`
            # clause below (which calls `close` and does need to
            # trigger the callback)
            self._pending_callbacks += 1
            while not self.closed():
                # Read from the socket until we get EWOULDBLOCK or equivalent.
                # SSL sockets do some internal buffering, and if the data is
                # sitting in the SSL object's buffer select() and friends
                # can't see it; the only way to find out if it's there is to
                # try to read it.
                if self._read_to_buffer() == 0:
                    break

                self._run_streaming_callback()

                # If we've read all the bytes we can use, break out of
                # this loop.  We can't just call read_from_buffer here
                # because of subtle interactions with the
                # pending_callback and error_listener mechanisms.
                #
                # If we've reached target_bytes, we know we're done.
                if (target_bytes is not None and
                        self._read_buffer_size >= target_bytes):
                    break

                # Otherwise, we need to call the more expensive find_read_pos.
                # It's inefficient to do this on every read, so instead
                # do it on the first read and whenever the read buffer
                # size has doubled.
                if self._read_buffer_size >= next_find_pos:
                    pos = self._find_read_pos()
                    if pos is not None:
                        return pos
                    next_find_pos = self._read_buffer_size * 2
            return self._find_read_pos()
        finally:
            self._pending_callbacks -= 1
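The core of this loop, draining a non-blocking socket until it would block or a byte target is reached, can be tried in isolation. The sketch below is a simplified model using only the standard library: the function name and the chunk_size parameter are hypothetical, and the delimiter scanning and pending-callback bookkeeping are left out.

```python
import socket


def read_to_buffer_loop(sock, buf, target_bytes=None, chunk_size=4096):
    """Drain a non-blocking socket into buf.

    Stops when the socket has no more data, the peer has closed, or
    target_bytes (if given) have been buffered. Returns len(buf).
    """
    while True:
        try:
            chunk = sock.recv(chunk_size)
        except BlockingIOError:
            break  # nothing more to read right now (EWOULDBLOCK)
        if not chunk:
            break  # peer closed the connection
        buf.extend(chunk)
        if target_bytes is not None and len(buf) >= target_bytes:
            break
    return len(buf)


writer, reader = socket.socketpair()
reader.setblocking(False)
writer.sendall(b"x" * 10)

buf = bytearray()
# Stop as soon as 4 bytes are buffered; the rest stays in the socket.
total = read_to_buffer_loop(reader, buf, target_bytes=4, chunk_size=4)
```

Calling the function again without target_bytes picks up the remaining bytes and returns once recv would block.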
The self._pending_callbacks attribute exists to delay close handling: the close callback is run only after all pending callbacks have finished.
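A minimal sketch of this counter, assuming a hypothetical MiniStream class with the method names shown; it models only the rule that the close callback waits for pending callbacks.

```python
class MiniStream:
    """Hypothetical model of the pending-callback counter."""

    def __init__(self, close_callback):
        self._closed = False
        self._pending_callbacks = 0
        self._close_callback = close_callback

    def close(self):
        self._closed = True
        self.maybe_run_close_callback()

    def maybe_run_close_callback(self):
        # Run the close callback at most once, and only when no
        # other callback is still waiting to run.
        if self._closed and self._pending_callbacks == 0:
            callback, self._close_callback = self._close_callback, None
            if callback is not None:
                callback()

    def finish_callback(self):
        # Called after a pending callback has run.
        self._pending_callbacks -= 1
        self.maybe_run_close_callback()


events = []
stream = MiniStream(lambda: events.append("closed"))
stream._pending_callbacks += 1  # a read callback is still queued
stream.close()
events_while_pending = list(events)
stream.finish_callback()
```

While the counter is non-zero, close() only marks the stream as closed; the notification fires once the last pending callback has finished.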
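If _read_to_buffer_loop still cannot read enough data to satisfy the condition, the stream registers for READ events on the IOLoop, with _handle_events as the handler. _handle_events calls _handle_read to process the readable event. If enough data is available, _read_from_buffer is called; otherwise the stream keeps listening for READ events.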
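The fallback of waiting for READ events until the buffer holds enough data can be sketched with the standard selectors module. This is an assumption-laden model, not Tornado's handler: read_bytes_blocking is a hypothetical helper, and it waits synchronously where the IOLoop would invoke a handler.

```python
import selectors
import socket


def read_bytes_blocking(sock, num_bytes, timeout=1.0):
    """Collect num_bytes from sock by waiting on READ events."""
    buf = bytearray()
    sock.setblocking(False)
    with selectors.DefaultSelector() as selector:
        selector.register(sock, selectors.EVENT_READ)
        while len(buf) < num_bytes:
            if not selector.select(timeout):
                raise TimeoutError("no data arrived in time")
            try:
                chunk = sock.recv(4096)
            except BlockingIOError:
                continue  # spurious wakeup; keep listening
            if not chunk:
                break  # peer closed before enough data arrived
            buf.extend(chunk)
    # This sketch drops surplus bytes; a real stream would keep them buffered.
    return bytes(buf[:num_bytes])


writer, reader = socket.socketpair()
writer.sendall(b"abc")
writer.sendall(b"def")
result = read_bytes_blocking(reader, 5)
```

Each readable event adds to the buffer, and the read completes only when the condition (here, a byte count) is met, which mirrors the listen-and-retry behaviour described above.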