asyncio 是如何利用事件循環來監控和處理io事件的,看源代碼:python
# asyncio.streams.py async def open_connection(host=None, port=None, *, loop=None, limit=_DEFAULT_LIMIT, **kwds): """A wrapper for create_connection() returning a (reader, writer) pair. """ if loop is None: loop = events.get_event_loop() reader = StreamReader(limit=limit, loop=loop) # 初始化 reader 實例 protocol = StreamReaderProtocol(reader, loop=loop) # 協議實例 而且對 reader 封裝,提供幾個方法來設置 reader 的讀通道等 transport, _ = await loop.create_connection( # 建立 tcp 鏈接, 返回寫通道 lambda: protocol, host, port, **kwds) writer = StreamWriter(transport, protocol, reader, loop) # 建立 writer 實例 return reader, writer
loop.create_connection
方法緩存
# asyncio.base_events.py async def create_connection( self, protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None): """Connect to a TCP server. Create a streaming transport connection to a given Internet host and port: socket family AF_INET or socket.AF_INET6 depending on host (or family if specified), socket type SOCK_STREAM. protocol_factory must be a callable returning a protocol instance. This method is a coroutine which will try to establish the connection in the background. When successful, the coroutine returns a (transport, protocol) pair. """ if server_hostname is not None and not ssl: raise ValueError('server_hostname is only meaningful with ssl') if server_hostname is None and ssl: # Use host as default for server_hostname. It is an error # if host is empty or not set, e.g. when an # already-connected socket was passed or when only a port # is given. To avoid this error, you can pass # server_hostname='' -- this will bypass the hostname # check. (This also means that if host is a numeric # IP/IPv6 address, we will attempt to verify that exact # address; this will probably fail, but it is possible to # create a certificate for a specific IP address, so we # don't judge it here.) if not host: raise ValueError('You must set server_hostname ' 'when using ssl without a host') server_hostname = host if ssl_handshake_timeout is not None and not ssl: # 只有在 ssl 模式下才能使用 timeout 參數 raise ValueError( 'ssl_handshake_timeout is only meaningful with ssl') if host is not None or port is not None: if sock is not None: # host/port 不能和 socket 同時傳遞 raise ValueError( 'host/port and sock can not be specified at the same time') infos = await self._ensure_resolved( # 根據 host port 獲取socket 信息包括協議族、類型、協議六、cname 、(ip,port) (host, port), family=family, type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self) if not infos: raise OSError('getaddrinfo() returned empty list') if local_addr is not None: # 若是傳遞了本地地址 (ip, port) laddr_infos = await self._ensure_resolved( # 根據 ip, port 獲取 socket 信息 local_addr, family=family, type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self) if not laddr_infos: raise OSError('getaddrinfo() returned empty list') exceptions = [] for family, type, proto, cname, address in infos: try: sock = socket.socket(family=family, type=type, proto=proto) # 建立socket套接字 sock.setblocking(False) # 設置非阻塞 socket if local_addr is not None: # 若是有傳遞本地ip及端口, 則開始監聽端口 for _, _, _, _, laddr in laddr_infos: try: sock.bind(laddr) break except OSError as exc: msg = ( f'error while attempting to bind on ' f'address {laddr!r}: ' f'{exc.strerror.lower()}' ) exc = OSError(exc.errno, msg) exceptions.append(exc) else: sock.close() sock = None continue if self._debug: logger.debug("connect %r to %r", sock, address) await self.sock_connect(sock, address) # 發出 Tcp 鏈接 except OSError as exc: if sock is not None: sock.close() exceptions.append(exc) except: if sock is not None: sock.close() raise else: break else: if len(exceptions) == 1: raise exceptions[0] else: # If they all have the same str(), raise one. model = str(exceptions[0]) if all(str(exc) == model for exc in exceptions): raise exceptions[0] # Raise a combined exception so the user can see all # the various error messages. raise OSError('Multiple exceptions: {}'.format( ', '.join(str(exc) for exc in exceptions))) else: if sock is None: raise ValueError( 'host and port was not specified and no sock specified') if sock.type != socket.SOCK_STREAM: # We allow AF_INET, AF_INET6, AF_UNIX as long as they # are SOCK_STREAM. # We support passing AF_UNIX sockets even though we have # a dedicated API for that: create_unix_connection. # Disallowing AF_UNIX in this method, breaks backwards # compatibility. raise ValueError( f'A Stream Socket was expected, got {sock!r}') # 建立鏈接通道,返回寫通道 #(_SelectorSocketTransport()._loop.call_soon(self._protocol.connection_made, self) 經過該函數綁定寫通道) # 上面 self._protocol 就是 protocol transport, protocol = await self._create_connection_transport( sock, protocol_factory, ssl, server_hostname, ssl_handshake_timeout=ssl_handshake_timeout) if self._debug: # Get the socket from the transport because SSL transport closes # the old socket and creates a new SSL socket sock = transport.get_extra_info('socket') logger.debug("%r connected to %s:%r: (%r, %r)", sock, host, port, transport, protocol) return transport, protocol # 返回 寫通道和協議對象
而後簡單看一下 StreamWriter
類:app
class StreamWriter: ... def write(self, data): self._transport.write(data) # 調用 通道的 write 方法, 將數據寫入到 通道緩存或直接發給socket async def wait_closed(self): await self._protocol._closed async def drain(self): # 等待緩存中的數據所有存儲完畢 """Flush the write buffer. The intended use is to write w.write(data) await w.drain() """ if self._reader is not None: exc = self._reader.exception() if exc is not None: raise exc if self._transport.is_closing(): await sleep(0, loop=self._loop) await self._protocol._drain_helper() # 建立 future 直到 取消暫停狀態
writer 只有 wait_closed
和 drain
兩個協程方法。異步
# asyncio.selector_events.BaseSelectorEventLoop def _add_writer(self, fd, callback, *args): self._check_closed() handle = events.Handle(callback, args, self, None) try: key = self._selector.get_key(fd) except KeyError: self._selector.register(fd, selectors.EVENT_WRITE, (None, handle)) else: mask, (reader, writer) = key.events, key.data self._selector.modify(fd, mask | selectors.EVENT_WRITE, (reader, handle)) if writer is not None: writer.cancel() # asyncio.selector_events._SelectorSocketTransport def write(self, data): ... # 省略的都是一些檢測 if not self._buffer: # 若是緩存是空,直接嘗試經過socket 發送數據 # Optimization: try to send now. try: n = self._sock.send(data) except (BlockingIOError, InterruptedError): pass except Exception as exc: self._fatal_error(exc, 'Fatal write error on socket transport') return else: data = data[n:] if not data: return # 若是直接經過 socket 發送了就結束 # Not all was written; register write handler. # 直接發送失敗 self._loop._add_writer(self._sock_fd, self._write_ready) # 建立寫處理器,修改 監聽已註冊的 fd 狀態或註冊 fd 寫事件 # Add it to the buffer. self._buffer.extend(data) # 若是有緩存 將數據存入緩存 self._maybe_pause_protocol() # 若是緩存大小 達到 64M (默認) 就設置 protocol 寫暫停 def _write_ready(self): # 有可寫的 fd 後的回調函數 assert self._buffer, 'Data should not be empty' if self._conn_lost: return try: n = self._sock.send(self._buffer) # 發送緩存中的數據 except (BlockingIOError, InterruptedError): pass except Exception as exc: self._loop._remove_writer(self._sock_fd) self._buffer.clear() self._fatal_error(exc, 'Fatal write error on socket transport') if self._empty_waiter is not None: self._empty_waiter.set_exception(exc) else: if n: del self._buffer[:n] self._maybe_resume_protocol() # May append to buffer. # 解除寫暫停狀態 if not self._buffer: self._loop._remove_writer(self._sock_fd) if self._empty_waiter is not None: self._empty_waiter.set_result(None) if self._closing: self._call_connection_lost(None) elif self._eof: self._sock.shutdown(socket.SHUT_WR)
若是調用 StreamWriter.write
方法(通道的write)
首先嚐試直接發送
若發送失敗,將數據放入緩存區,建立事件處理器 而後註冊(或更新)selector 所監聽的 fd(若 selector 發現此fd可寫就將該事件處理器加入到這次事件循環中),若緩存區滿(> 64m)調用FlowControlMixin.pause_writing()
暫停寫狀態,在調用 writer 的 drain 方法刷新緩存區時建立 future 而後等待 future 完成,在緩存中有空位置後調用FlowControlMixin.resume_writing()
來解除暫停狀態並給 future 結果。那麼此時就能夠再次寫入數據到緩存中。socket
而後再看一下 StreamReader
類:async
class StreamReader(): ... def set_transport(self, transport): # 設置讀取通道 assert self._transport is None, 'Transport already set' self._transport = transport def feed_data(self, data): # 通道會調用 protocol.data_received 將 socket 數據傳遞給 feed_data, 而後存儲到 緩存中 assert not self._eof, 'feed_data after feed_eof' if not data: return self._buffer.extend(data) # 存入緩存 self._wakeup_waiter() # 喚醒協程 if (self._transport is not None and not self._paused and len(self._buffer) > 2 * self._limit): # 若是緩存大小超過了 limit 兩倍 暫停傳輸? try: self._transport.pause_reading() except NotImplementedError: # The transport can't be paused. # We'll just have to buffer all data. # Forget the transport so we don't keep trying. self._transport = None else: self._paused = True async def _wait_for_data(self, func_name): # 調用通道的 resume_reading 方法,而後等待 數據傳輸過來(等待 feed_data() or feed_eof() 被調用,self._waiter 就能夠結束等待了) """Wait until feed_data() or feed_eof() is called. If stream was paused, automatically resume it. """ if self._waiter is not None: raise RuntimeError( f'{func_name}() called while another coroutine is ' f'already waiting for incoming data') assert not self._eof, '_wait_for_data after EOF' # Waiting for data while paused will make deadlock, so prevent it. # This is essential for readexactly(n) for case when n > self._limit. if self._paused: # 若是如今是暫停狀態 self._paused = False # 修改成False self._transport.resume_reading() # 調用 通道的 resume_reading() 方法恢復讀取狀態,修改通道的暫停狀態, # 而後使用 selector (modify 修改已經註冊的該通道綁定的 fd )或(register註冊該 fd ) self._waiter = self._loop.create_future() try: await self._waiter finally: self._waiter = None async def read(self, n=-1): .... if not self._buffer and not self._eof: # 若是緩存中沒有數據,就等待 _wait_for_data await self._wait_for_data('read') ...
# asyncio.selector_events.BaseSelectorEventLoop def _add_reader(self, fd, callback, *args): self._check_closed() handle = events.Handle(callback, args, self, None) # 建立事件處理器,回調函數是從socket 或 緩存中讀取數據而後再存儲到 reader 的緩存中 try: key = self._selector.get_key(fd) except KeyError: self._selector.register(fd, selectors.EVENT_READ, (handle, None)) # 註冊 selector 監聽的 fd 並傳遞事件處理器, (註冊讀??) else: mask, (reader, writer) = key.events, key.data self._selector.modify(fd, mask | selectors.EVENT_READ, (handle, writer)) # 修改 selector 監聽的fd 並傳遞事件處理器, (註冊寫??) if reader is not None: reader.cancel() # selector_events._SelectorSocketTransport def resume_reading(self): if self._closing or not self._paused: return self._paused = False # 暫停狀態改成 False self._add_reader(self._sock_fd, self._read_ready) # 註冊或修改 fd 監聽事件 if self._loop.get_debug(): logger.debug("%r resumes reading", self)
拿read(10)
舉例(readline 同理)讀取10字節內容,若是當前緩存爲空而且沒有遇到結束符,await self._wait_for_data('read')
建立一個 future 等待,若是當前處於讀暫停
狀態那解除讀暫停
狀態,並調用通道的resume_reading
方法(_SelectorSocketTransport.resume_reading
)建立事件處理器(回調函數有兩種這裏說一種,_SelectorSocketTransport._read_ready__data_received
接收數據並存儲到reader的緩存中,喚醒_wait_for_data
建立的 future)來註冊(或更新)selector
監控的fd,若selector 監控到有可讀的fd,將該 fd 綁定的讀事件處理器加入當前事件循環中。tcp