Tornado源碼探尋(請求到來)

上一篇中介紹了tornado框架在客戶端請求以前所作的準備(下圖一、2部分),本質上就是建立了一個socket服務端,並進行了IP和端口的綁定,可是未執行 socket的accept方法,也就是未獲取客戶端請求信息。javascript

 

概述:

本篇就來詳細介紹tornado服務器(socket服務端)是如何接收用戶請求數據以及如何根據用戶請求的URL處理並返回數據,也就是上圖的3系列全部步驟,如上圖【start】是一個死循環,其中利用epoll監聽服務端socket句柄,一旦客戶端發送請求,則當即調用HttpServer對象的_handle_events方法來進行請求的處理。html

對於整個3系列按照功能能夠劃分爲四大部分:java

  • 獲取用戶請求數據(上圖3.4)
  • 根據用戶請求URL進行路由匹配,從而使得某個方法處理具體的請求(上圖3.5~3.19)
  • 將處理後的數據返回給客戶端(上圖3.21~3.23)
  • 關閉客戶端socket(上圖3.24~3.26)

 

3.一、HTTPServer對象的_handle_events方法

此處代碼主要有三項任務:python

  一、 socket.accept() 接收了客戶端請求。
  二、建立封裝了客戶端socket對象和IOLoop對象的IOStream實例(用於以後獲取或輸出數據)。
  三、建立HTTPConnection對象,其內容是實現整個功能的邏輯。ios

class HTTPServer(object):
def _handle_events(self, fd, events):
        while True:
            try:
                #======== 獲取客戶端請求 =========#
                connection, address = self._socket.accept()
            except socket.error, e:
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    return
                raise
            if self.ssl_options is not None:
                assert ssl, "Python 2.6+ and OpenSSL required for SSL"
                try:
                    connection = ssl.wrap_socket(connection,
                                                 server_side=True,
                                                 do_handshake_on_connect=False,
                                                 **self.ssl_options)
                except ssl.SSLError, err:
                    if err.args[0] == ssl.SSL_ERROR_EOF:
                        return connection.close()
                    else:
                        raise
                except socket.error, err:
                    if err.args[0] == errno.ECONNABORTED:
                        return connection.close()
                    else:
                        raise
            try:
                #這是的條件是選擇https和http請求方式
                if self.ssl_options is not None:
                    stream = iostream.SSLIOStream(connection, io_loop=self.io_loop)
                else:
                    #將客戶端socket對象和IOLoop對象封裝到IOStream對象中
                    #IOStream用於從客戶端socket中讀取請求信息
                    stream = iostream.IOStream(connection, io_loop=self.io_loop)
                #建立HTTPConnection對象
                #address是客戶端IPdizhi
                #self.request_callback是Application對象,其中包含了:url映射關係和配置文件等..
                #so,HTTPConnection的構造函數就是下一步處理請求的位置了..
                HTTPConnection(stream, address, self.request_callback,self.no_keep_alive, self.xheaders)
            except:
                logging.error("Error in connection callback", exc_info=True)

 

 

3.二、IOStream的__init__方法

此處代碼主要兩項目任務:web

  • 封裝客戶端socket和其餘信息,以便以後執行該對象的其餘方法獲取客戶端請求的數據和響應客戶信息
  • 將客戶端socket對象添加到epoll,而且指定當客戶端socket對象變化時,就去執行 IOStream的_handle_events方法(調用socket.send給用戶響應數據)

 

class IOStream(object):
    def __init__(self, socket, io_loop=None, max_buffer_size=104857600,
                 read_chunk_size=4096):
        #客戶端socket對象
        self.socket = socket
        self.socket.setblocking(False)
        self.io_loop = io_loop or ioloop.IOLoop.instance()
        self.max_buffer_size = max_buffer_size
        self.read_chunk_size = read_chunk_size
        self._read_buffer = collections.deque()
        self._write_buffer = collections.deque()
        self._write_buffer_frozen = False
        self._read_delimiter = None
        self._read_bytes = None
        self._read_callback = None
        self._write_callback = None
        self._close_callback = None
        self._connect_callback = None
        self._connecting = False
        self._state = self.io_loop.ERROR
        with stack_context.NullContext():
            #將客戶端socket句柄添加的epoll中,並將IOStream的_handle_events方法添加到 Start 的While循環中
            #Start 的While循環中監聽客戶端socket句柄的狀態,以便再最後調用IOStream的_handle_events方法把處理後的信息響應給用戶
            self.io_loop.add_handler(self.socket.fileno(), self._handle_events, self._state)

 

3.三、HTTPConnections的__init__方法

此處代碼主要兩項任務:正則表達式

  • 獲取請求數據
  • 調用 _on_headers 繼續處理請求

對於獲取請求數據,其實就是執行IOStream的read_until函數來完成,其內部經過socket.recv(4096)方法獲取客戶端請求的數據,並以 【\r\n\r\n】做爲請求信息結束符(http請求頭和內容經過\r\n\r\n分割)。json

class HTTPConnection(object):
     
    def __init__(self, stream, address, request_callback, no_keep_alive=False,xheaders=False):
         
        self.stream = stream    #stream是封裝了客戶端socket和IOLoop實例的IOStream對象
        self.address = address  #address是客戶端IP地址
        self.request_callback = request_callback    #request_callback是封裝了URL映射和配置文件的Application對象。
        self.no_keep_alive = no_keep_alive
        self.xheaders = xheaders
        self._request = None
        self._request_finished = False
        #獲取請求信息(請求頭和內容),而後執行 HTTPConnection的_on_headers方法繼續處理請求
        self._header_callback = stack_context.wrap(self._on_headers)
        self.stream.read_until("\r\n\r\n", self._header_callback)

請求數據格式:安全

GET / HTTP/1.1 Host: localhost:8888 Connection: keep-alive Cache-Control: max-age=0 Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8 User-Agent: Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.118 Safari/537.36 Accept-Encoding: gzip, deflate, sdch Accept-Language: zh-CN,zh;q=0.8,en;q=0.6,zh-TW;q=0.4 If-None-Match: "e02aa1b106d5c7c6a98def2b13005d5b84fd8dc8"

詳細代碼解析:服務器

class IOStream(object):
                
    def read_until(self, delimiter, callback):
        """Call callback when we read the given delimiter."""
        assert not self._read_callback, "Already reading"
        #終止界定 \r\n\r\n
        self._read_delimiter = delimiter
        #回調函數,即:HTTPConnection的 _on_headers 方法
        self._read_callback = stack_context.wrap(callback)
        while True:
            #代碼概述:
            #先從socket中讀取信息並保存到buffer中
            #而後再讀取buffer中的數據,以其爲參數執行回調函數(HTTPConnection的 _on_headers 方法)
            #buffer實際上是一個線程安裝的雙端隊列collections.deque
           
            #從buffer中讀取數據,並執行回調函數。
            #注意:首次執行時buffer中沒有數據
            if self._read_from_buffer():
                return
            self._check_closed()
            #從socket中讀取信息到buffer(線程安全的一個雙向消息隊列)
            if self._read_to_buffer() == 0:
                break
                
        self._add_io_state(self.io_loop.READ)
IOStream.read_until
class IOStream(object):

    def _read_to_buffer(self):
        #省略部分代碼
        chunk = self._read_from_socket()
        self._read_buffer.append(chunk)
        return len(chunk)
        
    def _read_from_socket(self):
        #socket對象的recv函數接收數據
        
        #read_chunk_size在構造函數中默認設置爲:4096
        chunk = self.socket.recv(self.read_chunk_size)
        if not chunk:
            self.close()
            return None
        return chunk
IOStream._read_to_buffer
class IOStream(object):

    def _read_from_buffer(self):
        """Attempts to complete the currently-pending read from the buffer.

        Returns True if the read was completed.
        """
        #構造函數中默認設置爲None
        if self._read_bytes:
            
            if self._read_buffer_size() >= self._read_bytes:
                num_bytes = self._read_bytes
                callback = self._read_callback
                self._read_callback = None
                self._read_bytes = None
                self._run_callback(callback, self._consume(num_bytes))
                return True
        #_read_delimiter的值爲 \r\n\r\n
        elif self._read_delimiter:
            #buffer列表首元素合併,合併詳細見_merge_prefix函數
            _merge_prefix(self._read_buffer, sys.maxint)
            #獲取 \r\n\r\n 所在 buffer 首元素的位置索引
            loc = self._read_buffer[0].find(self._read_delimiter)
            
            if loc != -1:
                #若是在請求中找到了 \r\n\r\n
                
                #self._read_callback 是HTTPConnection對象的 _on_headers 方法
                callback = self._read_callback
                delimiter_len = len(self._read_delimiter) #獲取  \r\n\r\n 的長度
                self._read_callback = None
                self._read_delimiter = None
                #============ 執行HTTPConnection對象的 _on_headers 方法 =============
                #self._consume(loc + delimiter_len)用來獲取 buffer 的首元素(請求的信息其實就被封裝到了buffer的首個元素中)
                self._run_callback(callback,self._consume(loc + delimiter_len))
                return True
        return False
IOStream._read_from_buffer

 

3.四、HTTPConnnection的 _on_headers 方法(含3.5)

 

上述代碼主要有兩個任務:

  • 根據獲取的請求信息生成響應的請求頭鍵值對,並把信息封裝到HttpRequest對象中
  • 調用Application的__call__方法,繼續處理請求
class HTTPConnection(object):
    def _on_headers(self, data):
        try:
            data = native_str(data.decode('latin1'))
            eol = data.find("\r\n")
            #獲取請求的起始行數據,例如:GET / HTTP/1.1
            start_line = data[:eol]
            try:
                #請求方式、請求地址、http版本號
                method, uri, version = start_line.split(" ")
            except ValueError:
                raise _BadRequestException("Malformed HTTP request line")
            if not version.startswith("HTTP/"):
                raise _BadRequestException("Malformed HTTP version in HTTP Request-Line")
            #把請求頭信息包裝到一個字典中。(不包括第一行)
            headers = httputil.HTTPHeaders.parse(data[eol:])
             
            #把請求信息封裝到一個HTTPRequest對象中
            #注意:self._request = HTTPRequest,
            #HTTPRequest中封裝了HTTPConnection
            #HTTPConnection中封裝了stream和application
            self._request = HTTPRequest(connection=self, method=method, uri=uri, version=version,headers=headers, remote_ip=self.address[0])
            #從請求頭中獲取 Content-Length
            content_length = headers.get("Content-Length")
            if content_length:
                content_length = int(content_length)
                if content_length > self.stream.max_buffer_size:
                    raise _BadRequestException("Content-Length too long")
                if headers.get("Expect") == "100-continue":
                    self.stream.write("HTTP/1.1 100 (Continue)\r\n\r\n")
                self.stream.read_bytes(content_length, self._on_request_body)
                return
            #**************** 執行Application對象的 __call__ 方法,也就是路由系統的入口 *******************
            self.request_callback(self._request)
        except _BadRequestException, e:
            logging.info("Malformed HTTP request from %s: %s",
                         self.address[0], e)
            self.stream.close()
            return
class HTTPRequest(object):

    def __init__(self, method, uri, version="HTTP/1.0", headers=None,
                 body=None, remote_ip=None, protocol=None, host=None,
                 files=None, connection=None):
        self.method = method
        self.uri = uri
        self.version = version
        self.headers = headers or httputil.HTTPHeaders()
        self.body = body or ""
        if connection and connection.xheaders:
            # Squid uses X-Forwarded-For, others use X-Real-Ip
            self.remote_ip = self.headers.get(
                "X-Real-Ip", self.headers.get("X-Forwarded-For", remote_ip))
            # AWS uses X-Forwarded-Proto
            self.protocol = self.headers.get(
                "X-Scheme", self.headers.get("X-Forwarded-Proto", protocol))
            if self.protocol not in ("http", "https"):
                self.protocol = "http"
        else:
            self.remote_ip = remote_ip
            if protocol:
                self.protocol = protocol
            elif connection and isinstance(connection.stream, 
                                           iostream.SSLIOStream):
                self.protocol = "https"
            else:
                self.protocol = "http"
        self.host = host or self.headers.get("Host") or "127.0.0.1"
        self.files = files or {}
        self.connection = connection
        self._start_time = time.time()
        self._finish_time = None

        scheme, netloc, path, query, fragment = urlparse.urlsplit(uri)
        self.path = path
        self.query = query
        arguments = cgi.parse_qs(query)
        self.arguments = {}
        for name, values in arguments.iteritems():
            values = [v for v in values if v]
            if values: self.arguments[name] = values
HTTPRequest.__init__

 

 

3.六、Application的__call__方法(含3.七、3.八、3.9)

此處代碼主要有三個項任務:

  • 根據請求的url和封裝在Application對象中的url映射作匹配,獲取url所對應的Handler對象。ps:Handlers泛指繼承RequestHandler的類
  • 建立Handler對象,即:執行Handler的__init__方法
  • 執行Handler對象的 _execute 方法

注意:

一、執行Application的 __call__ 方法時,其參數request是HTTPRequest對象(其中封裝HTTPConnetion、Stream、Application對象、請求頭信息)

二、Handler泛指就是咱們定義的用於處理請求的類而且她還繼承自RequestHandler

class Application(object):
 
    def __call__(self, request):
        """Called by HTTPServer to execute the request."""
        transforms = [t(request) for t in self.transforms]
        handler = None
        args = []
        kwargs = {}
        #根據請求的目標主機,匹配主機模版對應的正則表達式和Handlers
        handlers = self._get_host_handlers(request)
        if not handlers:
            handler = RedirectHandler(
                self, request, url="http://" + self.default_host + "/")
        else:
            for spec in handlers:
                match = spec.regex.match(request.path)
                if match:
                    # None-safe wrapper around url_unescape to handle
                    # unmatched optional groups correctly
                    def unquote(s):
                        if s is None: return s
                        return escape.url_unescape(s, encoding=None)
                    handler = spec.handler_class(self, request, **spec.kwargs) #建立RquestHandler對象
                    # Pass matched groups to the handler.  Since
                    # match.groups() includes both named and unnamed groups,
                    # we want to use either groups or groupdict but not both.
                    # Note that args are passed as bytes so the handler can
                    # decide what encoding to use.
                    kwargs = dict((k, unquote(v))
                                  for (k, v) in match.groupdict().iteritems())
                    if kwargs:
                        args = []
                    else:
                        args = [unquote(s) for s in match.groups()]
                    break
            if not handler:
                handler = ErrorHandler(self, request, status_code=404)
 
        # In debug mode, re-compile templates and reload static files on every
        # request so you don't need to restart to see changes
        if self.settings.get("debug"):
            if getattr(RequestHandler, "_templates", None):
                for loader in RequestHandler._templates.values():
                    loader.reset()
            RequestHandler._static_hashes = {}
        #==== 執行RequestHandler的_execute方法 ====
        handler._execute(transforms, *args, **kwargs)
        return handler
class Application(object):

    def _get_host_handlers(self, request):
        #將請求的host和handlers中的主機模型進行匹配
        host = request.host.lower().split(':')[0]
        for pattern, handlers in self.handlers:
            if pattern.match(host):
                return handlers
        # Look for default host if not behind load balancer (for debugging)
        if "X-Real-Ip" not in request.headers:
            for pattern, handlers in self.handlers:
                if pattern.match(self.default_host):
                    return handlers
        return None
Application._get_host_handlers
class RequestHandler(object):
    SUPPORTED_METHODS = ("GET", "HEAD", "POST", "DELETE", "PUT", "OPTIONS")
 
    def __init__(self, application, request, **kwargs):
        self.application = application
        self.request = request
        self._headers_written = False
        self._finished = False
        self._auto_finish = True
        self._transforms = None  # will be set in _execute
        #獲取在application中設置的 ui_modules 和ui_method
        self.ui = _O((n, self._ui_method(m)) for n, m in
                     application.ui_methods.iteritems())
        self.ui["modules"] = _O((n, self._ui_module(n, m)) for n, m in
                                application.ui_modules.iteritems())
        self.clear() #設置服務器、內容類型編碼和鏈接
        # Check since connection is not available in WSGI
        #檢查鏈接是否可用,應該是長短鏈接有關。
        if hasattr(self.request, "connection"):
            self.request.connection.stream.set_close_callback(self.on_connection_close)
        self.initialize(**kwargs)
 
    def initialize(self):
        pass

    def clear(self):
        """Resets all headers and content for this response."""
        self._headers = {
            "Server": "TornadoServer/%s" % tornado.version,
            "Content-Type": "text/html; charset=UTF-8",
        }
        if not self.request.supports_http_1_1():
            if self.request.headers.get("Connection") == "Keep-Alive":
                self.set_header("Connection", "Keep-Alive")
        self._write_buffer = []
        self._status_code = 200
RequestHandler.__init__

上述過程當中,首先根據請求的URL去路由規則中匹配,一旦匹配成功,則建立路由相對應的handler的實例。例如:若是請求 的url是【/index/11】則會建立IndexHandler實例,而後再執行該對象的 _execute 方法。因爲全部的 xxxHandler 類是RequestHandler的派生類,因此會默認執行 RequestHandler的 _execute 方法。

 

3.10 RequestHandler的_execute方法 (含有3.十一、3.十二、3.13)

此處代碼主要有三項任務:

  • 擴展點,由於self.prepare默認是空方法,全部能夠在這裏被重寫
  • 經過反射執行Handler的get/post/put/delete等方法
  • 完成請求處理後,執行finish方法
class RequestHandler(object):
 
    def _execute(self, transforms, *args, **kwargs):
        """Executes this request with the given output transforms."""
        self._transforms = transforms
        with stack_context.ExceptionStackContext(
            self._stack_context_handle_exception):
            if self.request.method not in self.SUPPORTED_METHODS:
                raise HTTPError(405)
            # If XSRF cookies are turned on, reject form submissions without
            # the proper cookie
            if self.request.method not in ("GET", "HEAD") and \
               self.application.settings.get("xsrf_cookies"):
                self.check_xsrf_cookie()
            self.prepare()
            if not self._finished:
                #經過反射的方法,執行 RequestHandler 派生類的的 get、post、put方法
                getattr(self, self.request.method.lower())(*args, **kwargs)
                if self._auto_finish and not self._finished:
                    self.finish()

例:用戶發送get請求

class MyHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")
MyHandler.get
class RequestHandler(object):

    def write(self, chunk):
        assert not self._finished
        if isinstance(chunk, dict):
            chunk = escape.json_encode(chunk)
            self.set_header("Content-Type", "text/javascript; charset=UTF-8")
        chunk = _utf8(chunk)
        self._write_buffer.append(chunk)
RequestHandler.write

上述在執行RequestHandler的write方法時,將數據保存在Handler對象的 _write_buffer 列表中,在以後執行finish時再講數據寫到IOStream對象的_write_buffer字段中,其類型是雙向隊列collections.deque()。

 

3.1四、執行RequestHandler的finish

此段代碼主要有兩項任務:

  • 將用戶處理請求後返回的數據經過執行flush()方法發送到IOStream的_write_buffer隊列中
  • 紀錄操做日誌
class RequestHandler(object):
    def finish(self, chunk=None):
        """Finishes this response, ending the HTTP request."""
        if self._finished:
            raise RuntimeError("finish() called twice")

        if chunk is not None:
            self.write(chunk)
.
        if not self._headers_written:
            if (self._status_code == 200 and
                self.request.method in ("GET", "HEAD") and
                    "Etag" not in self._headers):
                self.set_etag_header()
                if self.check_etag_header():
                    self._write_buffer = []
                    self.set_status(304)
            if self._status_code == 304:
                assert not self._write_buffer, "Cannot send body with 304"
                self._clear_headers_for_304()
            elif "Content-Length" not in self._headers:
                content_length = sum(len(part) for part in self._write_buffer)
                self.set_header("Content-Length", content_length)

        if hasattr(self.request, "connection"):
            self.request.connection.set_close_callback(None)

        #執行flush方法,將處理請求返回的數據發送到IOStream的_write_buffer隊列中
        self.flush(include_footers=True)
        self.request.finish()
          #紀錄日誌
        self._log()
        self._finished = True
        self.on_finish()

        self.ui = None

 

3.1五、執行RequestHandler的flush方法

此處代碼主要有一項任務:

  • 將處理請求返回的數據發送到IOStream的_write_buffer隊列中
class RequestHandler(object):
    def flush(self, include_footers=False, callback=None):
       
        chunk = b"".join(self._write_buffer)
        self._write_buffer = []
        if not self._headers_written:
            self._headers_written = True
            for transform in self._transforms:
                self._status_code, self._headers, chunk = \
                    transform.transform_first_chunk(
                        self._status_code, self._headers,
                        chunk, include_footers)
            # Ignore the chunk and only write the headers for HEAD requests
            if self.request.method == "HEAD":
                chunk = None


            # is sent).
            if hasattr(self, "_new_cookie"):
                for cookie in self._new_cookie.values():
                    self.add_header("Set-Cookie", cookie.OutputString(None))

            start_line = httputil.ResponseStartLine('',
                                                    self._status_code,
                                                    self._reason)
             # 執行HTTP1Connection下的write——headers方法
            return self.request.connection.write_headers(
                start_line, self._headers, chunk, callback=callback)
        else:
            for transform in self._transforms:
                chunk = transform.transform_chunk(chunk, include_footers)
           
            # Ignore the chunk and only write the headers for HEAD requests
            if self.request.method != "HEAD":
                 # 執行HTTP1Connection下的write方法
                return self.request.connection.write(chunk, callback=callback)
            else:
                future = Future()
                future.set_result(None)
                return future            
RequestHandler.flush()
 1 class HTTP1Connection(httputil.HTTPConnection):
 2     def write_headers(self, start_line, headers, chunk=None, callback=None):
 3         else:
 4             if callback is not None:
 5                 self._write_callback = stack_context.wrap(callback)
 6             else:
 7                 future = self._write_future = Future()
 8             data = b"\r\n".join(lines) + b"\r\n\r\n"
 9             if chunk:
10                 data += self._format_chunk(chunk)
11             self._pending_write = self.stream.write(data)
12             self._pending_write.add_done_callback(self._on_write_complete)
13         return future
14 
15     def write(self, chunk, callback=None):
16         else:
17             if callback is not None:
18                 self._write_callback = stack_context.wrap(callback)
19             else:
20                 future = self._write_future = Future()
21             self._pending_write = self.stream.write(self._format_chunk(chunk))
22             self._pending_write.add_done_callback(self._on_write_complete)
23         return future
HTTP1Connection
class BaseIOStream(object):
    def write(self, data, callback=None):

        if data:
            for i in range(0, len(data), WRITE_BUFFER_CHUNK_SIZE):
        #將數據保存到collections.deque()類型的雙向隊列中_write_buffer中
                self._write_buffer.append(data[i:i + WRITE_BUFFER_CHUNK_SIZE])
        if callback is not None:
             
            self._write_callback = stack_context.wrap(callback)
        if not self._connecting:
            self._handle_write()
            if self._write_buffer:
                self._add_io_state(self.io_loop.WRITE)
BaseIOStream

以上代碼執行完成以後,請求的處理基本上就完成了。下面就是等待監聽客戶端socket句柄的epoll觸發,而後執行IOStream的_handle_event方法來將 響應數據發送給客戶端。

 

3.20、執行RequestHandler的_log方法

此處代碼主要有一項任務:

  • 記錄操做日誌(利用logging模塊)
class RequestHandler:
    def _log(self):
 
        self.application.log_request(self)
class Application:
    def log_request(self, handler):
        if "log_function" in self.settings:
            self.settings["log_function"](handler)
            return
        if handler.get_status() < 400:
            log_method = logging.info
        elif handler.get_status() < 500:
            log_method = logging.warning
        else:
            log_method = logging.error
        request_time = 1000.0 * handler.request.request_time()
        log_method("%d %s %.2fms", handler.get_status(),
                   handler._request_summary(), request_time)
Application.log_request

 

 

3.2一、IOStream的Handle_event方法

因爲epoll中不但監聽了服務器socket句柄還監聽了客戶端sokcet句柄,因此當客戶端socket對象變化時,就會去調用以前指定的IOStream的_handler_events方法。

此段代碼主要有一項任務:

  • 將處理以後的響應數據發送給客戶端
class BaseIOStream(object):
    def _handle_events(self, fd, events):
        try:
            if self._connecting:
                self._handle_connect()
            if self.closed():
                return
            if events & self.io_loop.READ:
                self._handle_read()
            if self.closed():
                return
            if events & self.io_loop.WRITE:
                #執行_handle_write方法,內部調用socket.send將數據響應給客戶端
                self._handle_write()
            if self.closed():
                return
            if events & self.io_loop.ERROR:
                self.error = self.get_fd_error()
IOStream._handle_events

 

 

3.2二、IOStream的_handle_write方法

此段代碼主要有兩項任務:

  • 調用socket.send給客戶端發送響應數據
  • 執行回調函數HTTPConnection的_on_write_complete方法
class BaseIOStream(object):
    def _handle_write(self):
        while self._write_buffer:
            try:
                if not self._write_buffer_frozen:
                    _merge_prefix(self._write_buffer, 128 * 1024)
                #write_to_fd()內部調用客戶端socket對象的send方法發送數據
                num_bytes = self.write_to_fd(self._write_buffer[0])

        if not self._write_buffer:
            if self._write_callback:
                callback = self._write_callback
                self._write_callback = None
                 #執行回調函數關閉客戶端socket鏈接(HTTPConnection的_on_write_complete方法)
                self._run_callback(callback)
class IOStream(object):
    def _run_callback(self, callback, *args, **kwargs):
        try:
            with stack_context.NullContext():
                callback(*args, **kwargs)
        except:
            logging.error("Uncaught exception, closing connection.",
                          exc_info=True)
            self.close()
            raise
IOStream._run_callback

注:IOStream的_run_callback方法內部調用了HTTPConnection的_on_write_complete方法

 

3.2三、執行HTTPConnection的_on_write_complete方法

此處代碼主要有一項任務:

  • 更新客戶端socket所在epoll中的狀態爲【READ】,以便以後執行3.24時關閉socket客戶端。
class HTTPConnection(object):
 
    def _on_write_complete(self):
        if self._request_finished:
            self._finish_request()
 
    def _finish_request(self):
        if self.no_keep_alive:
            disconnect = True
        else:
            connection_header = self._request.headers.get("Connection")
            if self._request.supports_http_1_1():
                disconnect = connection_header == "close"
            elif ("Content-Length" in self._request.headers
                    or self._request.method in ("HEAD", "GET")):
                disconnect = connection_header != "Keep-Alive"
            else:
                disconnect = True
        self._request = None
        self._request_finished = False
        if disconnect:
            self.stream.close()
            return
        self.stream.read_until("\r\n\r\n", self._header_callback)
HTTP1Connection
class IOStream(object):
    def read_until(self, delimiter, callback):
        """Call callback when we read the given delimiter."""
        assert not self._read_callback, "Already reading"
        self._read_delimiter = delimiter
        self._read_callback = stack_context.wrap(callback)
        while True:
            # See if we've already got the data from a previous read
            if self._read_from_buffer():
                return
            self._check_closed()
            if self._read_to_buffer() == 0:
                break
        #更新爲READ
        self._add_io_state(self.io_loop.READ)
IOStream.read_until
class IOStream(object):
    def _add_io_state(self, state):
        if self.socket is None:
            # connection has been closed, so there can be no future events
            return
        if not self._state & state:
            self._state = self._state | state
            #執行IOLoop對象的update_handler方法
            self.io_loop.update_handler(self.socket.fileno(), self._state)
IOStream._add_io_state
class IOLoop(object):
    def update_handler(self, fd, events):
        """Changes the events we listen for fd."""
        #self._impl就是epoll對象
        self._impl.modify(fd, events | self.ERROR)
IOLoop.update_handler

 

 

3.2四、IOStream的_handle_write方法(含3.2五、3.26)

此段代碼主要有一項任務:

  • 關閉客戶端socket
class IOStream(object):
    def _handle_events(self, fd, events):
        if not self.socket:
            logging.warning("Got events for closed stream %d", fd)
            return
        try:
            #因爲在 2.23 步驟中已經將epoll的狀態更新爲READ,因此此次會執行_handle_read方法
            if events & self.io_loop.READ:
                self._handle_read()
            #執行完_handle_read後,客戶端socket被關閉且置空,全部此處就會執行return
            if not self.socket:
                return
            #===============================終止===========================
            if events & self.io_loop.WRITE:
                if self._connecting:
                    self._handle_connect()
                self._handle_write()
            if not self.socket:
                return
            if events & self.io_loop.ERROR:
                self.close()
                return
            state = self.io_loop.ERROR
            if self.reading():
                state |= self.io_loop.READ
            if self.writing():
                state |= self.io_loop.WRITE
            if state != self._state:
                self._state = state
                self.io_loop.update_handler(self.socket.fileno(), self._state)
        except:
            logging.error("Uncaught exception, closing connection.",
                          exc_info=True)
            self.close()
            raise
相關文章
相關標籤/搜索