asyncio 如何處理io事件

asyncio 是如何利用事件循環來監控和處理 IO 事件的?直接看源代碼:

# asyncio.streams.py
async def open_connection(host=None, port=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Open a TCP connection and return a ``(reader, writer)`` pair.

    Thin wrapper around ``loop.create_connection()``. When *loop* is None
    the loop returned by ``events.get_event_loop()`` is used. *limit* is
    forwarded to the StreamReader; *host*, *port* and any extra keyword
    arguments are forwarded to ``create_connection()``.
    """
    loop = events.get_event_loop() if loop is None else loop

    # The reader buffers incoming data; the protocol wraps the reader.
    stream_reader = StreamReader(limit=limit, loop=loop)
    stream_protocol = StreamReaderProtocol(stream_reader, loop=loop)

    def protocol_factory():
        # create_connection() expects a callable returning the protocol.
        return stream_protocol

    # Establish the TCP connection; only the transport half of the
    # returned (transport, protocol) pair is needed here.
    transport, _ = await loop.create_connection(
        protocol_factory, host, port, **kwds)

    stream_writer = StreamWriter(transport, stream_protocol, stream_reader, loop)
    return stream_reader, stream_writer

loop.create_connection 方法:

# asyncio.base_events.py
async def create_connection(
        self, protocol_factory, host=None, port=None,
        *, ssl=None, family=0,
        proto=0, flags=0, sock=None,
        local_addr=None, server_hostname=None,
        ssl_handshake_timeout=None):
    """Connect to a TCP server.

    Create a streaming transport connection to a given Internet host and
    port: socket family AF_INET or socket.AF_INET6 depending on host (or
    family if specified), socket type SOCK_STREAM. protocol_factory must be
    a callable returning a protocol instance.

    This method is a coroutine which will try to establish the connection
    in the background. When successful, the coroutine returns a
    (transport, protocol) pair.

    Raises:
        ValueError: if server_hostname or ssl_handshake_timeout is given
            without ssl, if ssl is used with neither host nor
            server_hostname, if host/port and sock are both given, if
            none of host, port and sock is given, or if sock is not a
            SOCK_STREAM socket.
        OSError: if address resolution returns nothing, or if every
            candidate address fails to bind/connect.
    """
    if server_hostname is not None and not ssl:
        raise ValueError('server_hostname is only meaningful with ssl')

    if server_hostname is None and ssl:
        # Use host as default for server_hostname. It is an error
        # if host is empty or not set, e.g. when an
        # already-connected socket was passed or when only a port
        # is given. To avoid this error, you can pass
        # server_hostname='' -- this will bypass the hostname
        # check. (This also means that if host is a numeric
        # IP/IPv6 address, we will attempt to verify that exact
        # address; this will probably fail, but it is possible to
        # create a certificate for a specific IP address, so we
        # don't judge it here.)
        if not host:
            raise ValueError('You must set server_hostname '
                                'when using ssl without a host')
        server_hostname = host

    # The handshake timeout is only accepted together with ssl.
    if ssl_handshake_timeout is not None and not ssl:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if host is not None or port is not None:
        # host/port and an existing socket are mutually exclusive.
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        # Resolve (host, port) into candidate address infos; each entry
        # unpacks below as (family, type, proto, cname, address).
        infos = await self._ensure_resolved(
            (host, port), family=family,
            type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
        if not infos:
            raise OSError('getaddrinfo() returned empty list')

        # If a local address was given, resolve it the same way.
        if local_addr is not None:
            laddr_infos = await self._ensure_resolved(
                local_addr, family=family,
                type=socket.SOCK_STREAM, proto=proto,
                flags=flags, loop=self)
            if not laddr_infos:
                raise OSError('getaddrinfo() returned empty list')

        # Errors are collected per attempt and only raised if every
        # candidate address fails.
        exceptions = []
        for family, type, proto, cname, address in infos:
            try:
                # Create a non-blocking socket for this candidate.
                sock = socket.socket(family=family, type=type, proto=proto)
                sock.setblocking(False)
                if local_addr is not None:
                    # Bind to the first local address that works. This
                    # only sets the source address; nothing listens.
                    for _, _, _, _, laddr in laddr_infos:
                        try:
                            sock.bind(laddr)
                            break
                        except OSError as exc:
                            msg = (
                                f'error while attempting to bind on '
                                f'address {laddr!r}: '
                                f'{exc.strerror.lower()}'
                            )
                            exc = OSError(exc.errno, msg)
                            exceptions.append(exc)
                    else:
                        # No local address could be bound: drop this
                        # socket and move on to the next remote address.
                        sock.close()
                        sock = None
                        continue
                if self._debug:
                    logger.debug("connect %r to %r", sock, address)
                # Start the TCP connect on the non-blocking socket and
                # wait for it to complete.
                await self.sock_connect(sock, address)
            except OSError as exc:
                # Connection-level failure: remember it, try the next one.
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except:
                # Anything else is not retried: close the socket so it
                # is not leaked, then re-raise.
                if sock is not None:
                    sock.close()
                raise
            else:
                # Connected: stop trying further addresses.
                break
        else:
            # The loop finished without a successful connect.
            if len(exceptions) == 1:
                raise exceptions[0]
            else:
                # If they all have the same str(), raise one.
                model = str(exceptions[0])
                if all(str(exc) == model for exc in exceptions):
                    raise exceptions[0]
                # Raise a combined exception so the user can see all
                # the various error messages.
                raise OSError('Multiple exceptions: {}'.format(
                    ', '.join(str(exc) for exc in exceptions)))

    else:
        if sock is None:
            raise ValueError(
                'host and port was not specified and no sock specified')
        if sock.type != socket.SOCK_STREAM:
            # We allow AF_INET, AF_INET6, AF_UNIX as long as they
            # are SOCK_STREAM.
            # We support passing AF_UNIX sockets even though we have
            # a dedicated API for that: create_unix_connection.
            # Disallowing AF_UNIX in this method, breaks backwards
            # compatibility.
            raise ValueError(
                f'A Stream Socket was expected, got {sock!r}')

    # Wrap the connected socket in a transport and obtain the protocol
    # from protocol_factory.
    # NOTE(review): the transport is presumably handed to the protocol
    # via loop.call_soon(protocol.connection_made, transport) inside the
    # selector transport's constructor -- not visible in this excerpt.
    transport, protocol = await self._create_connection_transport(
        sock, protocol_factory, ssl, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout)
    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        sock = transport.get_extra_info('socket')
        logger.debug("%r connected to %s:%r: (%r, %r)",
                        sock, host, port, transport, protocol)
    # Return the transport together with the protocol instance.
    return transport, protocol

異步處理寫事件

然後簡單看一下 StreamWriter 類:

class StreamWriter:
    """Excerpt of asyncio's StreamWriter (elided parts are marked ``...``).

    Wraps a transport and its protocol. The methods shown read
    ``self._transport``, ``self._protocol`` and ``self._reader``, which
    are set by the elided constructor.
    """
    ...

    def write(self, data):
        """Pass *data* to the transport's ``write()``.

        This is a plain (non-coroutine) call: the transport either sends
        the data on the socket right away or queues it in its buffer.
        """
        self._transport.write(data)

    async def wait_closed(self):
        """Wait on the protocol's ``_closed`` awaitable."""
        await self._protocol._closed

    async def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          await w.drain()

        Raises:
            Exception: whatever exception is stored on the reader, if any.
        """
        if self._reader is not None:
            # Surface an error already recorded on the read side.
            exc = self._reader.exception()
            if exc is not None:
                raise exc
        if self._transport.is_closing():
            # Yield to the event loop for one iteration. The ``loop``
            # argument of sleep() was removed in Python 3.10, so it is
            # not passed; sleep() runs on the current loop.
            await sleep(0)
        # Wait until the protocol is no longer in the paused-writing state.
        await self._protocol._drain_helper()

writer 只有 wait_closed 和 drain 兩個協程方法。

# asyncio.selector_events.BaseSelectorEventLoop
    def _add_writer(self, fd, callback, *args):
        """Watch *fd* for writability and run ``callback(*args)`` when ready.

        If *fd* is not registered with the selector yet, it is registered
        for EVENT_WRITE. Otherwise EVENT_WRITE is added to its existing
        event mask, the read handle is kept, and any previous write
        handle is cancelled.
        """
        self._check_closed()
        new_handle = events.Handle(callback, args, self, None)
        selector = self._selector
        try:
            registered = selector.get_key(fd)
        except KeyError:
            # First registration for this fd: no read handle yet.
            selector.register(fd, selectors.EVENT_WRITE, (None, new_handle))
            return
        read_handle, old_write_handle = registered.data
        selector.modify(fd, registered.events | selectors.EVENT_WRITE,
                        (read_handle, new_handle))
        if old_write_handle is not None:
            # The replaced write callback must never fire.
            old_write_handle.cancel()

# asyncio.selector_events._SelectorSocketTransport
   def write(self, data):
        """Send *data* on the socket, buffering whatever cannot be sent now.

        When nothing is queued, a direct ``send()`` is attempted first.
        Any remainder is appended to ``self._buffer`` and a write handler
        (``self._write_ready``) is registered with the loop.
        """
        ... # The elided part only performs checks.
        if not self._buffer:   # Nothing queued: try the socket directly.
            # Optimization: try to send now.
            try:
                n = self._sock.send(data)
            except (BlockingIOError, InterruptedError):
                # The socket is not writable right now; buffer it all.
                pass
            except Exception as exc:
                self._fatal_error(exc, 'Fatal write error on socket transport')
                return
            else:
                data = data[n:]
                if not data:
                    return   # Everything was sent directly; done.
            # Not all was written; register write handler.
            # _add_writer creates the handle and registers the fd for
            # write events, or updates the fd's existing registration.
            self._loop._add_writer(self._sock_fd, self._write_ready)

        # Add it to the buffer.
        self._buffer.extend(data)  # Queue the unsent data.
        # NOTE(review): presumably pauses the protocol's writing once the
        # buffer exceeds the high-water mark; CPython's default is
        # believed to be 64 KiB (not 64 MB) -- confirm against
        # _FlowControlMixin.
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Callback run when the socket fd becomes writable.

        Sends as much of ``self._buffer`` as the socket accepts. Once the
        buffer is empty the write handler is removed and any pending
        close or EOF is carried out.
        """
        assert self._buffer, 'Data should not be empty'

        if self._conn_lost:
            return
        try:
            n = self._sock.send(self._buffer)      # Send the queued data.
        except (BlockingIOError, InterruptedError):
            # Not writable after all; stay registered and retry later.
            pass
        except Exception as exc:
            # Fatal error: stop watching the fd, drop the queued data and
            # report the failure (also to a waiter on the empty buffer).
            self._loop._remove_writer(self._sock_fd)
            self._buffer.clear()
            self._fatal_error(exc, 'Fatal write error on socket transport')
            if self._empty_waiter is not None:
                self._empty_waiter.set_exception(exc)
        else:
            if n:
                # Drop the bytes that were actually sent.
                del self._buffer[:n]
            # Must run before the emptiness check below: resuming the
            # protocol may cause more data to be written.
            self._maybe_resume_protocol() # May append to buffer.
            if not self._buffer:
                # Fully flushed: no need to watch for writability.
                self._loop._remove_writer(self._sock_fd)
                if self._empty_waiter is not None:
                    self._empty_waiter.set_result(None)
                if self._closing:
                    self._call_connection_lost(None)
                elif self._eof:
                    # Shut down only the write half of the socket.
                    self._sock.shutdown(socket.SHUT_WR)

調用 StreamWriter.write 方法(即通道的 write)時:
首先嘗試直接發送;
若未能全部發送,將剩餘數據放入緩存區,建立事件處理器,然後註冊(或更新)selector 所監聽的 fd(若 selector 發現此 fd 可寫,就將該事件處理器加入到本次事件循環中)。若緩存區超過高水位線(默認 64 KiB,而不是 64 MB),調用 FlowControlMixin.pause_writing() 進入寫暫停狀態;此時調用 writer 的 drain 方法會建立 future 並等待其完成。待緩存中的數據被發送、降到低水位線以下後,調用 FlowControlMixin.resume_writing() 解除暫停狀態並給 future 設置結果,此時就可以再次寫入數據到緩存中。

異步處理讀事件

然後再看一下 StreamReader 類:

class StreamReader:
    """Excerpt of asyncio's StreamReader (elided parts are marked ``...``).

    Buffers incoming bytes in ``self._buffer`` and lets one coroutine at
    a time wait for more data. The methods shown rely on attributes set
    by the elided constructor (``_buffer``, ``_limit``, ``_eof``,
    ``_paused``, ``_waiter``, ``_transport``, ``_loop``).
    """
    ...

    def set_transport(self, transport):
        """Attach the transport this reader reads from (allowed once)."""
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def feed_data(self, data):
        """Append *data* to the buffer and wake up a waiting reader.

        NOTE(review): presumably called from the protocol's
        data_received() with bytes read from the socket; the caller is
        not visible in this excerpt.
        """
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)  # Store the new bytes.
        self._wakeup_waiter()      # Wake the coroutine waiting for data.

        # Back-pressure: once more than twice the limit is buffered, ask
        # the transport to stop reading from the socket.
        if (self._transport is not None and
                not self._paused and
                len(self._buffer) > 2 * self._limit):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True

    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If stream was paused, automatically resume it.

        Raises:
            RuntimeError: if another coroutine is already waiting.
        """
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused will make deadlock, so prevent it.
        # This is essential for readexactly(n) for case when n > self._limit.
        if self._paused:
            self._paused = False
            # Ask the transport to start delivering socket data again.
            self._transport.resume_reading()

        # Park on a fresh future until the waiter is woken up; always
        # clear the waiter afterwards, even on cancellation or error.
        self._waiter = self._loop.create_future()
        try:
            await self._waiter
        finally:
            self._waiter = None

    async def read(self, n=-1):
        """Read up to *n* bytes (excerpt; most of the body is elided)."""
        ...
        # Nothing buffered and no EOF yet: wait for data to arrive.
        if not self._buffer and not self._eof:
            await self._wait_for_data('read')
        ...
# asyncio.selector_events.BaseSelectorEventLoop
    def _add_reader(self, fd, callback, *args):
        """Watch *fd* for readability and run ``callback(*args)`` when ready.

        If *fd* is not registered with the selector yet, it is registered
        for EVENT_READ. Otherwise EVENT_READ is added to its existing
        event mask, the write handle is kept, and any previous read
        handle is cancelled.
        """
        self._check_closed()
        # Handle wrapping the callback to run when the fd is readable.
        new_handle = events.Handle(callback, args, self, None)
        selector = self._selector
        try:
            registered = selector.get_key(fd)
        except KeyError:
            # First registration for this fd: no write handle yet.
            selector.register(fd, selectors.EVENT_READ, (new_handle, None))
            return
        old_read_handle, write_handle = registered.data
        # Already registered: add read interest, keep the write handle.
        selector.modify(fd, registered.events | selectors.EVENT_READ,
                        (new_handle, write_handle))
        if old_read_handle is not None:
            # The replaced read callback must never fire.
            old_read_handle.cancel()

# selector_events._SelectorSocketTransport
    def resume_reading(self):
        """Resume reading from the socket after it was paused.

        Does nothing if the transport is closing or is not paused.
        Otherwise clears the paused flag and registers ``_read_ready``
        as the read callback for the socket fd.
        """
        if not self._closing and self._paused:
            self._paused = False
            # Register (or update) read interest for the socket fd.
            self._add_reader(self._sock_fd, self._read_ready)
            if self._loop.get_debug():
                logger.debug("%r resumes reading", self)

以 read(10) 舉例(readline 同理):讀取 10 字節內容時,如果當前緩存爲空並且尚未遇到 EOF,就 await self._wait_for_data('read') 建立一個 future 並等待。如果當前處於讀暫停狀態,則先解除讀暫停狀態,並調用通道的 resume_reading 方法(_SelectorSocketTransport.resume_reading)建立事件處理器,來註冊(或更新)selector 監控的 fd。回調函數有兩種,這裏只說一種:_SelectorSocketTransport._read_ready__data_received,它接收數據並存儲到 reader 的緩存中,然後喚醒 _wait_for_data 建立的 future。若 selector 監控到有可讀的 fd,就將該 fd 綁定的讀事件處理器加入當前事件循環中。

相關文章
相關標籤/搜索