Part 3: Tornado Source Code in Plain Language: When a Request Arrives


Reposted from: http://www.cnblogs.com/wupeiqi/p/4540398.html


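The previous article, "Tornado Source Code in Plain Language: The Pre-Request Stage", covered the preparation the Tornado framework does before any client request arrives (parts 1 and 2 in the figure below). In essence, it creates a server socket and binds it to an IP address and port, but it does not yet call the socket's accept method, so no client request data has been received.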

Overview

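This article explains in detail how the Tornado server (the server socket) receives request data, and how it processes the request according to the requested URL and returns a response. That covers every step in series 3 of the figure above. As the figure shows, [start] is an infinite loop that uses epoll to watch the server socket's file descriptor. As soon as a client sends a request, the loop calls the _handle_events method of the HTTPServer object to process it.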

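By function, series 3 can be divided into four parts:

  • Read the request data (3.4 in the figure above)
  • Match the requested URL against the routes so that a specific method handles the request (3.5 to 3.19 in the figure above)
  • Return the processed data to the client (3.21 to 3.23 in the figure above)
  • Close the client socket (3.24 to 3.26 in the figure above)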

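3.1 The _handle_events method of the HTTPServer object

This code has three main tasks:

  1. Call socket.accept() to accept the client connection.
  2. Create an IOStream instance that wraps the client socket and the IOLoop object (used later to read and write data).
  3. Create an HTTPConnection object, which contains the logic that drives the rest of the processing.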

class HTTPServer(object):
    def _handle_events(self, fd, events):
        while True:
            try:
                #======== Accept the client connection =========#
                connection, address = self._socket.accept()
            except socket.error, e:
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    return
                raise
            if self.ssl_options is not None:
                assert ssl, "Python 2.6+ and OpenSSL required for SSL"
                try:
                    connection = ssl.wrap_socket(connection,
                                                 server_side=True,
                                                 do_handshake_on_connect=False,
                                                 **self.ssl_options)
                except ssl.SSLError, err:
                    if err.args[0] == ssl.SSL_ERROR_EOF:
                        return connection.close()
                    else:
                        raise
                except socket.error, err:
                    if err.args[0] == errno.ECONNABORTED:
                        return connection.close()
                    else:
                        raise
            try:
                # This condition chooses between HTTPS and HTTP handling
                if self.ssl_options is not None:
                    stream = iostream.SSLIOStream(connection, io_loop=self.io_loop)
                else:
                    # Wrap the client socket and the IOLoop object in an IOStream object.
                    # IOStream is used to read the request from the client socket.
                    stream = iostream.IOStream(connection, io_loop=self.io_loop)
                # Create the HTTPConnection object.
                # address is the client address (address[0] is the IP).
                # self.request_callback is the Application object, which holds the URL mappings, settings, etc.
                # So the HTTPConnection constructor is where request handling continues.
                HTTPConnection(stream, address, self.request_callback,
                               self.no_keep_alive, self.xheaders)
            except:
                logging.error("Error in connection callback", exc_info=True)
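
The accept loop can be tried out in isolation. The following is a minimal sketch in Python 3 (the Tornado excerpt itself is Python 2), not Tornado's own code. The names accept_all and demo_accept are hypothetical. It only shows how a non-blocking listening socket is drained until accept() reports that nothing else is pending, which Python 3 signals with BlockingIOError instead of a socket.error carrying EWOULDBLOCK or EAGAIN.

```python
import select
import socket


def accept_all(server_sock):
    """Accept every pending connection on a non-blocking listening socket.

    Hypothetical helper for illustration; returns a list of (conn, address).
    """
    accepted = []
    while True:
        try:
            conn, address = server_sock.accept()
        except BlockingIOError:
            # EWOULDBLOCK / EAGAIN: nothing left to accept right now.
            return accepted
        conn.setblocking(False)
        accepted.append((conn, address))


def demo_accept():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    server.listen(5)
    server.setblocking(False)
    client = socket.create_connection(server.getsockname())
    # Wait until the listening socket is readable, i.e. a connection is queued.
    select.select([server], [], [], 1.0)
    accepted = accept_all(server)
    count = len(accepted)
    for conn, _ in accepted:
        conn.close()
    client.close()
    server.close()
    return count
```

Returning from the loop on "would block" is what lets a single readiness event drain several queued connections without ever stalling the event loop.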

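3.2 The __init__ method of IOStream

This code has two main tasks:

  • Wrap the client socket and related state so that the object's other methods can later read the request data and write the response
  • Register the client socket with epoll and specify that whenever the socket's state changes, IOStream's _handle_events method is called (which eventually calls socket.send to deliver the response)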

class IOStream(object):
    def __init__(self, socket, io_loop=None, max_buffer_size=104857600,
                 read_chunk_size=4096):
        # The client socket
        self.socket = socket
        self.socket.setblocking(False)
        self.io_loop = io_loop or ioloop.IOLoop.instance()
        self.max_buffer_size = max_buffer_size
        self.read_chunk_size = read_chunk_size
        self._read_buffer = collections.deque()
        self._write_buffer = collections.deque()
        self._write_buffer_frozen = False
        self._read_delimiter = None
        self._read_bytes = None
        self._read_callback = None
        self._write_callback = None
        self._close_callback = None
        self._connect_callback = None
        self._connecting = False
        self._state = self.io_loop.ERROR
        with stack_context.NullContext():
            # Register the client socket's file descriptor with epoll and hand
            # IOStream._handle_events to the while loop in start.
            # That loop watches the client socket's state so that _handle_events
            # can eventually send the processed response back to the client.
            self.io_loop.add_handler(self.socket.fileno(), self._handle_events, self._state)
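
To make the add_handler idea concrete, here is a small sketch built on Python 3's standard selectors module rather than on Tornado's IOLoop. MiniLoop and demo_loop are hypothetical names, and run_once performs a single pass of what would normally be an endless loop.

```python
import selectors
import socket


class MiniLoop:
    """Hypothetical stand-in for IOLoop: maps sockets to handler callbacks."""

    def __init__(self):
        self._selector = selectors.DefaultSelector()

    def add_handler(self, sock, handler, events):
        # The handler is stored as the selector's opaque `data` for this socket.
        self._selector.register(sock, events, handler)

    def run_once(self, timeout=1.0):
        # One iteration: wait for ready sockets and call their handlers.
        for key, mask in self._selector.select(timeout):
            key.data(key.fd, mask)

    def close(self):
        self._selector.close()


def demo_loop():
    a, b = socket.socketpair()
    a.setblocking(False)
    received = []
    loop = MiniLoop()
    loop.add_handler(a, lambda fd, events: received.append(a.recv(4096)),
                     selectors.EVENT_READ)
    b.sendall(b"ping")  # makes `a` readable
    loop.run_once()
    loop.close()
    a.close()
    b.close()
    return received
```

The handler receives the file descriptor and the event mask, the same two arguments that _handle_events takes in the excerpt.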

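3.3 The __init__ method of HTTPConnection

This code has two main tasks:

  • Read the request data
  • Call _on_headers to continue processing the request

Reading the request data means calling IOStream's read_until function. Internally it calls socket.recv(4096) to read what the client sent and treats "\r\n\r\n" as the terminator of the request header block (in HTTP, the headers and the body are separated by \r\n\r\n).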

class HTTPConnection(object):

    def __init__(self, stream, address, request_callback, no_keep_alive=False, xheaders=False):
        self.stream = stream    # IOStream object wrapping the client socket and the IOLoop instance
        self.address = address  # client address (address[0] is the IP)
        self.request_callback = request_callback  # Application object holding the URL mappings and settings
        self.no_keep_alive = no_keep_alive
        self.xheaders = xheaders
        self._request = None
        self._request_finished = False
        # Read the request data, then continue in HTTPConnection._on_headers
        self._header_callback = stack_context.wrap(self._on_headers)
        self.stream.read_until("\r\n\r\n", self._header_callback)

Format of the request data:

GET / HTTP/1.1
Host: localhost:8888
Connection: keep-alive
Cache-Control: max-age=0
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8
User-Agent: Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.118 Safari/537.36
Accept-Encoding: gzip, deflate, sdch
Accept-Language: zh-CN,zh;q=0.8,en;q=0.6,zh-TW;q=0.4
If-None-Match: "e02aa1b106d5c7c6a98def2b13005d5b84fd8dc8"

Detailed code walkthrough:

class IOStream(object):

    def read_until(self, delimiter, callback):
        """Call callback when we read the given delimiter."""
        assert not self._read_callback, "Already reading"
        # The terminating delimiter: \r\n\r\n
        self._read_delimiter = delimiter
        # The callback, i.e. HTTPConnection._on_headers
        self._read_callback = stack_context.wrap(callback)
        while True:
            # Overview:
            # first read from the socket and store the data in the buffer,
            # then read the data back out of the buffer and pass it to the
            # callback (HTTPConnection._on_headers).
            # The buffer is a collections.deque, a thread-safe double-ended queue.

            # Try to satisfy the read from the buffer and run the callback.
            # Note: on the first pass the buffer is still empty.
            if self._read_from_buffer():
                return
            self._check_closed()
            # Read from the socket into the buffer
            if self._read_to_buffer() == 0:
                break

        self._add_io_state(self.io_loop.READ)

class IOStream(object):

    def _read_to_buffer(self):
        # (some code omitted)
        chunk = self._read_from_socket()
        self._read_buffer.append(chunk)
        return len(chunk)

    def _read_from_socket(self):
        # Receive data with the socket's recv function.
        # read_chunk_size defaults to 4096 in the constructor.
        chunk = self.socket.recv(self.read_chunk_size)
        if not chunk:
            self.close()
            return None
        return chunk

class IOStream(object):

    def _read_from_buffer(self):
        """Attempts to complete the currently-pending read from the buffer.

        Returns True if the read was completed.
        """
        # _read_bytes defaults to None in the constructor
        if self._read_bytes:
            if self._read_buffer_size() >= self._read_bytes:
                num_bytes = self._read_bytes
                callback = self._read_callback
                self._read_callback = None
                self._read_bytes = None
                self._run_callback(callback, self._consume(num_bytes))
                return True
        # _read_delimiter is \r\n\r\n
        elif self._read_delimiter:
            # Merge the buffered chunks into the first element; see _merge_prefix for details
            _merge_prefix(self._read_buffer, sys.maxint)
            # Index of \r\n\r\n within the buffer's first element
            loc = self._read_buffer[0].find(self._read_delimiter)

            if loc != -1:
                # \r\n\r\n was found in the request

                # self._read_callback is HTTPConnection._on_headers
                callback = self._read_callback
                delimiter_len = len(self._read_delimiter)  # length of \r\n\r\n
                self._read_callback = None
                self._read_delimiter = None
                #============ Run HTTPConnection._on_headers =============
                # self._consume(loc + delimiter_len) takes the data up to and including the
                # delimiter (the request data sits in the buffer's first element)
                self._run_callback(callback, self._consume(loc + delimiter_len))
                return True
        return False
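
The excerpt calls _merge_prefix without showing it. The sketch below, in Python 3, is a simplified guess at the same idea and should not be read as Tornado's implementation. The names merge_prefix and read_until here are stand-alone functions invented for illustration. Chunks accumulate in a deque, the leading chunks are merged into one, and the merged chunk is searched for the delimiter.

```python
import collections
import sys


def merge_prefix(chunks, size):
    """Merge leading chunks so that chunks[0] holds at most `size` bytes."""
    prefix = []
    remaining = size
    while chunks and remaining > 0:
        chunk = chunks.popleft()
        if len(chunk) > remaining:
            # Put back the part that does not fit into the prefix.
            chunks.appendleft(chunk[remaining:])
            chunk = chunk[:remaining]
        prefix.append(chunk)
        remaining -= len(chunk)
    chunks.appendleft(b"".join(prefix))


def read_until(chunks, delimiter):
    """Return the bytes up to and including `delimiter`, or None if it is not buffered yet."""
    merge_prefix(chunks, sys.maxsize)  # collapse everything into chunks[0]
    loc = chunks[0].find(delimiter)
    if loc == -1:
        return None
    # Split chunks[0] so that it contains exactly the wanted bytes, then take it.
    merge_prefix(chunks, loc + len(delimiter))
    return chunks.popleft()


buf = collections.deque([b"GET / HT", b"TP/1.1\r\nHost: x\r\n", b"\r\nBODY"])
head = read_until(buf, b"\r\n\r\n")
```

After the call, head holds the request line and headers, and whatever followed the delimiter stays in the deque for the next read.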

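3.4 The _on_headers method of HTTPConnection (includes 3.5)

This code has two main tasks:

  • Build the header key/value pairs from the received request data and wrap the information in an HTTPRequest object
  • Call the Application's __call__ method to continue processing the request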

class HTTPConnection(object):
    def _on_headers(self, data):
        try:
            data = native_str(data.decode('latin1'))
            eol = data.find("\r\n")
            # The request's start line, e.g. GET / HTTP/1.1
            start_line = data[:eol]
            try:
                # Request method, request URI, HTTP version
                method, uri, version = start_line.split(" ")
            except ValueError:
                raise _BadRequestException("Malformed HTTP request line")
            if not version.startswith("HTTP/"):
                raise _BadRequestException("Malformed HTTP version in HTTP Request-Line")
            # Parse the header lines (everything after the first line) into a dict-like object
            headers = httputil.HTTPHeaders.parse(data[eol:])

            # Wrap the request information in an HTTPRequest object.
            # Note: self._request is an HTTPRequest;
            # the HTTPRequest holds the HTTPConnection,
            # and the HTTPConnection holds the stream and the application.
            self._request = HTTPRequest(connection=self, method=method, uri=uri, version=version,
                                        headers=headers, remote_ip=self.address[0])
            # Read Content-Length from the request headers
            content_length = headers.get("Content-Length")
            if content_length:
                content_length = int(content_length)
                if content_length > self.stream.max_buffer_size:
                    raise _BadRequestException("Content-Length too long")
                if headers.get("Expect") == "100-continue":
                    self.stream.write("HTTP/1.1 100 (Continue)\r\n\r\n")
                self.stream.read_bytes(content_length, self._on_request_body)
                return
            #**** Call the Application object's __call__ method, the entry point of the routing system ****
            self.request_callback(self._request)
        except _BadRequestException, e:
            logging.info("Malformed HTTP request from %s: %s",
                         self.address[0], e)
            self.stream.close()
            return
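
The parsing step can be reproduced without Tornado. The sketch below is Python 3 and uses a plain dict where the excerpt uses httputil.HTTPHeaders, so it keeps only the last value of a repeated header and does not handle folded header lines. The function name parse_request_head is hypothetical.

```python
def parse_request_head(raw):
    """Split a raw request head (bytes) into (method, uri, version, headers).

    Hypothetical helper for illustration only.
    """
    text = raw.decode("latin1")
    eol = text.find("\r\n")
    start_line = text[:eol]
    try:
        method, uri, version = start_line.split(" ")
    except ValueError:
        raise ValueError("Malformed HTTP request line")
    if not version.startswith("HTTP/"):
        raise ValueError("Malformed HTTP version in HTTP Request-Line")
    headers = {}
    for line in text[eol:].split("\r\n"):
        if not line:
            continue  # skip the empty strings produced around the separators
        name, _, value = line.partition(":")
        headers[name.strip()] = value.strip()
    return method, uri, version, headers


raw = (b"GET / HTTP/1.1\r\n"
       b"Host: localhost:8888\r\n"
       b"Connection: keep-alive\r\n"
       b"\r\n")
method, uri, version, headers = parse_request_head(raw)
```

Because partition splits on the first colon only, a value such as localhost:8888 keeps its port.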

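3.6 The __call__ method of Application (includes 3.7, 3.8, 3.9)

This code has three main tasks:

  • Match the requested URL against the URL mappings stored in the Application object to find the Handler for that URL. Note: "Handler" here means any class that inherits from RequestHandler
  • Create the Handler object, i.e. run the Handler's __init__ method
  • Call the Handler object's _execute method

Notes:

1. When Application's __call__ method runs, its request argument is an HTTPRequest object (which holds the HTTPConnection, the stream, the Application object, and the request headers).

2. "Handler" means a class that we define to process requests and that inherits from RequestHandler.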

class Application(object):

    def __call__(self, request):
        """Called by HTTPServer to execute the request."""
        transforms = [t(request) for t in self.transforms]
        handler = None
        args = []
        kwargs = {}
        # Based on the request's target host, find the regexes and Handlers registered for that host pattern
        handlers = self._get_host_handlers(request)
        if not handlers:
            handler = RedirectHandler(
                self, request, url="http://" + self.default_host + "/")
        else:
            for spec in handlers:
                match = spec.regex.match(request.path)
                if match:
                    # None-safe wrapper around url_unescape to handle
                    # unmatched optional groups correctly
                    def unquote(s):
                        if s is None: return s
                        return escape.url_unescape(s, encoding=None)
                    handler = spec.handler_class(self, request, **spec.kwargs)  # create the RequestHandler object
                    # Pass matched groups to the handler.  Since
                    # match.groups() includes both named and unnamed groups,
                    # we want to use either groups or groupdict but not both.
                    # Note that args are passed as bytes so the handler can
                    # decide what encoding to use.
                    kwargs = dict((k, unquote(v))
                                  for (k, v) in match.groupdict().iteritems())
                    if kwargs:
                        args = []
                    else:
                        args = [unquote(s) for s in match.groups()]
                    break
            if not handler:
                handler = ErrorHandler(self, request, status_code=404)

        # In debug mode, re-compile templates and reload static files on every
        # request so you don't need to restart to see changes
        if self.settings.get("debug"):
            if getattr(RequestHandler, "_templates", None):
                for loader in RequestHandler._templates.values():
                    loader.reset()
            RequestHandler._static_hashes = {}
        #==== Call RequestHandler's _execute method ====
        handler._execute(transforms, *args, **kwargs)
        return handler

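In the process above, the requested URL is first matched against the routing rules. As soon as a rule matches, an instance of the handler mapped to that route is created. For example, if the requested URL is /index/11, an IndexHandler instance is created, and then that object's _execute method is called. Because every xxxHandler class derives from RequestHandler, the call resolves to RequestHandler's _execute method by default.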
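
The matching step boils down to trying compiled regular expressions in order. Here is a minimal Python 3 sketch of that idea. The ROUTES table, the resolve function, and both handler classes are hypothetical. It returns None for an unmatched path, whereas the excerpt falls back to an ErrorHandler with status 404.

```python
import re


class IndexHandler:
    """Hypothetical handler class, named after the /index/11 example."""


class AboutHandler:
    """Second hypothetical handler class, used to show named groups."""


# (pattern, handler class) pairs, tried in order like the specs in __call__.
ROUTES = [
    (re.compile(r"/index/(\d+)$"), IndexHandler),
    (re.compile(r"/about/(?P<section>\w+)$"), AboutHandler),
]


def resolve(path):
    """Return (handler_class, args, kwargs) for the first matching route, or None."""
    for regex, handler_class in ROUTES:
        match = regex.match(path)
        if match:
            kwargs = match.groupdict()
            # Use named groups if there are any, otherwise positional groups.
            args = [] if kwargs else list(match.groups())
            return handler_class, args, kwargs
    return None
```

With these routes, resolve("/index/11") yields IndexHandler with the positional argument "11", which is what would later be passed to the handler's get method.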

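3.10 The _execute method of RequestHandler (includes 3.11, 3.12, 3.13)

This code has three main tasks:

  • Provide an extension point: self.prepare is an empty method by default, so it can be overridden here
  • Use reflection to call the Handler's get/post/put/delete (etc.) method
  • After the request has been handled, call the finish method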

class RequestHandler(object):

    def _execute(self, transforms, *args, **kwargs):
        """Executes this request with the given output transforms."""
        self._transforms = transforms
        with stack_context.ExceptionStackContext(
            self._stack_context_handle_exception):
            if self.request.method not in self.SUPPORTED_METHODS:
                raise HTTPError(405)
            # If XSRF cookies are turned on, reject form submissions without
            # the proper cookie
            if self.request.method not in ("GET", "HEAD") and \
               self.application.settings.get("xsrf_cookies"):
                self.check_xsrf_cookie()
            self.prepare()
            if not self._finished:
                # Use reflection to call the get, post, put, ... method of the RequestHandler subclass
                getattr(self, self.request.method.lower())(*args, **kwargs)
                if self._auto_finish and not self._finished:
                    self.finish()
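
The reflection step is plain getattr on the lower-cased HTTP method name. The Python 3 sketch below strips it down to that. BaseHandler, IndexHandler, and MethodNotAllowed are hypothetical stand-ins, with MethodNotAllowed playing the role of HTTPError(405).

```python
class MethodNotAllowed(Exception):
    """Stands in for HTTPError(405) in this sketch."""


class BaseHandler:
    SUPPORTED_METHODS = ("GET", "HEAD", "POST", "DELETE", "PUT")

    def __init__(self):
        self.calls = []

    def prepare(self):
        # Extension point: empty by default, subclasses may override it.
        pass

    def execute(self, method, *args, **kwargs):
        if method not in self.SUPPORTED_METHODS:
            raise MethodNotAllowed(method)
        self.prepare()
        # Reflection: "GET" -> self.get, "POST" -> self.post, and so on.
        return getattr(self, method.lower())(*args, **kwargs)


class IndexHandler(BaseHandler):
    def prepare(self):
        self.calls.append("prepare")

    def get(self, item_id):
        self.calls.append("get")
        return "item %s" % item_id


handler = IndexHandler()
result = handler.execute("GET", "11")
```

The overridden prepare runs before get, which is why it is a convenient place for checks that should apply to every method of a handler.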

Example: the user sends a GET request

class MyHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")

class RequestHandler(object):

    def write(self, chunk):
        assert not self._finished
        if isinstance(chunk, dict):
            chunk = escape.json_encode(chunk)
            self.set_header("Content-Type", "text/javascript; charset=UTF-8")
        chunk = _utf8(chunk)
        self._write_buffer.append(chunk)

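When RequestHandler's write method runs as shown above, the data is stored in the Handler object's _write_buffer list. Later, when finish runs, the data is written to the _write_buffer field of the IOStream object, which is a collections.deque() double-ended queue.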

3.14 Running RequestHandler's finish method

This code has two main tasks:

  • Send the data produced by handling the request to IOStream's _write_buffer queue
  • Write the request log

class RequestHandler:

    def finish(self, chunk=None):
        """Finishes this response, ending the HTTP request."""
        assert not self._finished
        if chunk is not None: self.write(chunk)

        if not self._headers_written:
            if (self._status_code == 200 and
                self.request.method in ("GET", "HEAD") and
                "Etag" not in self._headers):
                hasher = hashlib.sha1()
                for part in self._write_buffer:
                    hasher.update(part)
                etag = '"%s"' % hasher.hexdigest()
                inm = self.request.headers.get("If-None-Match")
                if inm and inm.find(etag) != -1:
                    self._write_buffer = []
                    self.set_status(304)
                else:
                    self.set_header("Etag", etag)
            if "Content-Length" not in self._headers:
                content_length = sum(len(part) for part in self._write_buffer)
                self.set_header("Content-Length", content_length)

        if hasattr(self.request, "connection"):
            self.request.connection.stream.set_close_callback(None)

        if not self.application._wsgi:
            # Send the data produced by handling the request to IOStream's _write_buffer queue
            self.flush(include_footers=True)
            self.request.finish()
            # Write the log entry
            self._log()
        self._finished = True
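
The Etag branch in finish is easy to overlook, so here is a small Python 3 sketch of just that logic. The function etag_response is hypothetical and returns plain values instead of mutating a handler. It hashes the buffered body with SHA-1 and answers 304 with an empty body when the client's If-None-Match header already contains the same tag.

```python
import hashlib


def etag_response(body_parts, if_none_match=None):
    """Return (status_code, headers, body_parts) following the Etag logic in finish.

    Hypothetical helper; body_parts is a list of bytes chunks.
    """
    hasher = hashlib.sha1()
    for part in body_parts:
        hasher.update(part)
    etag = '"%s"' % hasher.hexdigest()
    if if_none_match and etag in if_none_match:
        # The client's cached copy is still valid: send no body.
        return 304, {"Content-Length": 0}, []
    headers = {
        "Etag": etag,
        "Content-Length": sum(len(part) for part in body_parts),
    }
    return 200, headers, body_parts


first = etag_response([b"Hello, ", b"world"])
second = etag_response([b"Hello, world"], if_none_match=first[1]["Etag"])
```

The second call simulates a browser revalidating its cache. Because the body is unchanged, the tag matches and nothing needs to be sent again.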

3.15 Running RequestHandler's flush method

This code has one main task:

  • Send the data produced by handling the request to IOStream's _write_buffer queue

def flush(self, include_footers=False):
    """Flushes the current output buffer to the network."""
    if self.application._wsgi:
        raise Exception("WSGI applications do not support flush()")

    chunk = "".join(self._write_buffer)
    self._write_buffer = []
    if not self._headers_written:
        self._headers_written = True
        for transform in self._transforms:
            self._headers, chunk = transform.transform_first_chunk(
                self._headers, chunk, include_footers)
        headers = self._generate_headers()
    else:
        for transform in self._transforms:
            chunk = transform.transform_chunk(chunk, include_footers)
        headers = ""

    # Ignore the chunk and only write the headers for HEAD requests
    if self.request.method == "HEAD":
        if headers: self.request.write(headers)
        return

    if headers or chunk:
        # Call HTTPRequest's write method
        self.request.write(headers + chunk)

class HTTPRequest(object):
    def write(self, chunk):
        """Writes the given chunk to the response stream."""
        assert isinstance(chunk, str)
        # Call HTTPConnection's write method
        self.connection.write(chunk)

class IOStream(object):
    def write(self, data, callback=None):

        self._check_closed()
        # Store the data in _write_buffer, a collections.deque() double-ended queue
        self._write_buffer.append(data)
        self._add_io_state(self.io_loop.WRITE)
        self._write_callback = stack_context.wrap(callback)

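Once the code above has run, handling of the request is essentially complete. What remains is to wait for epoll, which is watching the client socket's file descriptor, to fire. IOStream's _handle_events method then sends the response data to the client.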

3.20 Running RequestHandler's _log method

This code has one main task:

  • Write the request log (using the logging module)

class RequestHandler:
    def _log(self):
        self.application.log_request(self)

class Application:
    def log_request(self, handler):
        if "log_function" in self.settings:
            self.settings["log_function"](handler)
            return
        if handler.get_status() < 400:
            log_method = logging.info
        elif handler.get_status() < 500:
            log_method = logging.warning
        else:
            log_method = logging.error
        request_time = 1000.0 * handler.request.request_time()
        log_method("%d %s %.2fms", handler.get_status(),
                   handler._request_summary(), request_time)
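
To see what such a log line looks like, the status-to-level rule and the format string can be lifted into a stand-alone Python 3 function. This is only a sketch: format_request_log is a hypothetical name, and the summary string passed in below is a made-up example, not the actual output of _request_summary.

```python
import logging


def format_request_log(status_code, summary, seconds):
    """Return (level, message) using the same thresholds and format as log_request."""
    if status_code < 400:
        level = logging.INFO
    elif status_code < 500:
        level = logging.WARNING
    else:
        level = logging.ERROR
    # Request time is reported in milliseconds with two decimals.
    return level, "%d %s %.2fms" % (status_code, summary, 1000.0 * seconds)


ok = format_request_log(200, "GET / (127.0.0.1)", 0.002)
missing = format_request_log(404, "GET /nope (127.0.0.1)", 0.001)
```

Successful and redirect responses log at info level, client errors at warning, and server errors at error, so a log level filter can single out failing requests.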

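3.21 The _handle_events method of IOStream

Because epoll watches not only the server socket's file descriptor but also the client socket's, whenever the client socket's state changes, the IOStream _handle_events method registered earlier is called.

This code has one main task:

  • Send the processed response data to the client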

class IOStream(object):
    def _handle_events(self, fd, events):
        if not self.socket:
            logging.warning("Got events for closed stream %d", fd)
            return
        try:
            if events & self.io_loop.READ:
                self._handle_read()
            if not self.socket:
                return
            if events & self.io_loop.WRITE:
                if self._connecting:
                    self._handle_connect()
                # Call _handle_write, which uses socket.send to deliver the response to the client
                self._handle_write()
            if not self.socket:
                return
            if events & self.io_loop.ERROR:
                self.close()
                return
            state = self.io_loop.ERROR
            if self.reading():
                state |= self.io_loop.READ
            if self.writing():
                state |= self.io_loop.WRITE
            if state != self._state:
                self._state = state
                self.io_loop.update_handler(self.socket.fileno(), self._state)
        except:
            logging.error("Uncaught exception, closing connection.",
                          exc_info=True)
            self.close()
            raise
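
The event argument is a bitmask, and the tail of the method recomputes which events the stream is still interested in. A compact Python 3 sketch of both halves follows. The flag values are assumptions chosen for the example, since the real constants are defined on IOLoop, and the names dispatch and next_state are hypothetical.

```python
# Assumed flag values for this sketch only; the real ones live on IOLoop.
READ = 0x001
WRITE = 0x004
ERROR = 0x008


def dispatch(events):
    """List the actions a stream would take for a given event mask, in order."""
    actions = []
    if events & READ:
        actions.append("read")
    if events & WRITE:
        actions.append("write")
    if events & ERROR:
        actions.append("close")
    return actions


def next_state(reading, writing):
    """Recompute the interest mask: always ERROR, plus READ/WRITE while work is pending."""
    state = ERROR
    if reading:
        state |= READ
    if writing:
        state |= WRITE
    return state
```

Dropping WRITE from the mask once the write buffer is empty matters, because a socket is writable almost all the time and would otherwise wake the loop continuously.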

3.22 The _handle_write method of IOStream

This code has two main tasks:

  • Call socket.send to send the response data to the client
  • Run the callback, HTTPConnection's _on_write_complete method

class IOStream(object):
    def _handle_write(self):
        while self._write_buffer:
            try:
                if not self._write_buffer_frozen:
                    _merge_prefix(self._write_buffer, 128 * 1024)
                # Call the client socket's send method to send the data
                num_bytes = self.socket.send(self._write_buffer[0])
                self._write_buffer_frozen = False
                _merge_prefix(self._write_buffer, num_bytes)
                self._write_buffer.popleft()
            except socket.error, e:
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    self._write_buffer_frozen = True
                    break
                else:
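
The send loop in _handle_write has to cope with two situations: send may accept only part of a chunk, and it may refuse to accept anything when the kernel buffer is full. The Python 3 sketch below handles both with a deque of bytes. It is a simplified illustration, not Tornado's code, and flush_write_buffer and demo_flush are hypothetical names.

```python
import collections
import socket


def flush_write_buffer(sock, write_buffer):
    """Send queued chunks on a non-blocking socket; return True once the queue is empty."""
    while write_buffer:
        try:
            sent = sock.send(write_buffer[0])
        except BlockingIOError:
            # The kernel send buffer is full: stop and wait for the next WRITE event.
            return False
        if sent < len(write_buffer[0]):
            # Partial send: keep the unsent tail at the front of the queue.
            write_buffer[0] = write_buffer[0][sent:]
        else:
            write_buffer.popleft()
    return True


def demo_flush():
    a, b = socket.socketpair()
    a.setblocking(False)
    b.settimeout(1.0)
    queue = collections.deque([b"HTTP/1.1 200 OK\r\n", b"\r\n", b"Hello, world"])
    expected = b"".join(queue)
    done = flush_write_buffer(a, queue)
    received = b""
    while len(received) < len(expected):
        received += b.recv(4096)
    a.close()
    b.close()
    return done, received == expected
```

When the function returns False, the caller keeps WRITE in its interest mask and tries again on the next event. When it returns True, a completion callback such as _on_write_complete can run.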