轉自:http://www.cnblogs.com/wupeiqi/p/4540398.html
上一篇《白話tornado源碼之待請求階段》中介紹了tornado框架在客戶端請求之前所做的準備(下圖1、2部分),本質上就是創建了一個socket服務端,並進行了IP和端口的綁定,但是未執行socket的accept方法,也就是未獲取客戶端請求信息。
本篇就來詳細介紹tornado服務器(socket服務端)是如何接收用戶請求數據,以及如何根據用戶請求的URL處理並返回數據,也就是上圖中3系列的所有步驟。如上圖所示,【start】是一個死循環,其中利用epoll監聽服務端socket句柄,一旦客戶端發送請求,就立即調用HTTPServer對象的_handle_events方法來處理請求。
對於整個3系列按照功能可以劃分爲四大部分:
此處代碼主要有三項任務:
1、 socket.accept() 接收了客戶端請求。
2、創建封裝了客戶端socket對象和IOLoop對象的IOStream實例(用於之後獲取或輸出數據)。
3、創建HTTPConnection對象,其內容是實現整個功能的邏輯。
class HTTPServer(object):
    def _handle_events(self, fd, events):
        """Accept every pending client connection on the listening socket.

        For each accepted connection this:

        1. obtains the client socket via ``self._socket.accept()``;
        2. wraps it with SSL when ``self.ssl_options`` is set;
        3. wraps it in an ``IOStream`` (``SSLIOStream`` for HTTPS) bound to
           ``self.io_loop``, used later to read the request and write the
           response;
        4. creates an ``HTTPConnection``, whose constructor is where request
           processing continues.

        Returns once ``accept()`` reports EWOULDBLOCK/EAGAIN (no more pending
        connections). Other socket errors from ``accept()`` propagate.
        """
        while True:
            try:
                # ======== accept a client connection ========
                connection, address = self._socket.accept()
            except socket.error as e:
                # Nothing left to accept on the non-blocking listener.
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    return
                raise
            if self.ssl_options is not None:
                assert ssl, "Python 2.6+ and OpenSSL required for SSL"
                try:
                    connection = ssl.wrap_socket(connection,
                                                 server_side=True,
                                                 do_handshake_on_connect=False,
                                                 **self.ssl_options)
                except ssl.SSLError as err:
                    if err.args[0] == ssl.SSL_ERROR_EOF:
                        return connection.close()
                    else:
                        raise
                except socket.error as err:
                    if err.args[0] == errno.ECONNABORTED:
                        return connection.close()
                    else:
                        raise
            try:
                # Pick the stream type: HTTPS vs. plain HTTP.
                if self.ssl_options is not None:
                    stream = iostream.SSLIOStream(connection,
                                                  io_loop=self.io_loop)
                else:
                    # Wrap the client socket and the IOLoop in an IOStream;
                    # the IOStream reads the request from the client socket.
                    stream = iostream.IOStream(connection,
                                               io_loop=self.io_loop)
                # Create the HTTPConnection; its constructor is the next step
                # of request handling. ``address`` is the client address and
                # ``self.request_callback`` is the Application object (URL
                # mapping, settings, ...).
                HTTPConnection(stream, address, self.request_callback,
                               self.no_keep_alive, self.xheaders)
            except Exception:
                # A failure while setting up one connection is logged and the
                # accept loop keeps going; SystemExit/KeyboardInterrupt are no
                # longer swallowed.
                logging.error("Error in connection callback", exc_info=True)
此處代碼主要有兩項任務:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
class IOStream(object):
    def __init__(self, socket, io_loop=None, max_buffer_size=104857600,
                 read_chunk_size=4096):
        """Wrap a connected client socket for non-blocking use on an IOLoop.

        Args:
            socket: the client socket; it is switched to non-blocking mode.
            io_loop: the IOLoop to register with; when falsy,
                ``ioloop.IOLoop.instance()`` is used instead.
            max_buffer_size: stored as ``self.max_buffer_size``
                (104857600 bytes, i.e. 100 MiB, by default).
            read_chunk_size: stored as ``self.read_chunk_size``, the number
                of bytes requested per ``recv`` call (4096 by default).
        """
        # The client socket, in non-blocking mode.
        self.socket = socket
        self.socket.setblocking(False)
        self.io_loop = io_loop or ioloop.IOLoop.instance()
        self.max_buffer_size = max_buffer_size
        self.read_chunk_size = read_chunk_size

        # Buffered incoming and outgoing data.
        self._read_buffer = collections.deque()
        self._write_buffer = collections.deque()
        self._write_buffer_frozen = False

        # No read, write, close or connect operation is pending yet.
        self._read_delimiter = self._read_bytes = None
        self._read_callback = self._write_callback = None
        self._close_callback = self._connect_callback = None
        self._connecting = False

        # Initially registered for error events only.
        self._state = self.io_loop.ERROR
        with stack_context.NullContext():
            # Register the client fd with the IOLoop. When it becomes ready
            # the loop calls self._handle_events, which later sends the
            # processed response back to the client.
            client_fd = self.socket.fileno()
            self.io_loop.add_handler(client_fd, self._handle_events,
                                     self._state)
|
此處代碼主要有兩項任務:
對於獲取請求數據,其實就是執行IOStream的read_until函數來完成,其內部通過socket.recv(4096)方法獲取客戶端請求的數據,並以【\r\n\r\n】作爲請求頭的結束符(HTTP請求頭與請求體之間以\r\n\r\n分隔)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
class HTTPConnection(object):
    def __init__(self, stream, address, request_callback, no_keep_alive=False,
                 xheaders=False):
        """Set up one client connection and start reading its request headers.

        Args:
            stream: IOStream wrapping the client socket and the IOLoop.
            address: the client address from ``accept()``; element 0 is used
                as the client IP.
            request_callback: the Application object (URL routing and
                settings); it is later called with the parsed request.
            no_keep_alive: stored as ``self.no_keep_alive``.
            xheaders: stored as ``self.xheaders``.
        """
        self.stream, self.address = stream, address
        self.request_callback = request_callback
        self.no_keep_alive, self.xheaders = no_keep_alive, xheaders
        # No request has been parsed yet.
        self._request, self._request_finished = None, False
        # Read up to the blank line that ends the headers; the data read is
        # then handed to self._on_headers for further processing.
        self._header_callback = stack_context.wrap(self._on_headers)
        self.stream.read_until("\r\n\r\n", self._header_callback)
|
請求數據格式:
GET / HTTP/1.1
Host: localhost:8888
Connection: keep-alive
Cache-Control: max-age=0
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8
User-Agent: Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.118 Safari/537.36
Accept-Encoding: gzip, deflate, sdch
Accept-Language: zh-CN,zh;q=0.8,en;q=0.6,zh-TW;q=0.4
If-None-Match: "e02aa1b106d5c7c6a98def2b13005d5b84fd8dc8"
詳細代碼解析:
class IOStream(object):
    def read_until(self, delimiter, callback):
        """Read until ``delimiter`` is seen, then pass the data to ``callback``."""
        assert not self._read_callback, "Already reading"
        # For the header read started by HTTPConnection the delimiter is
        # "\r\n\r\n" and the callback is HTTPConnection._on_headers.
        self._read_delimiter = delimiter
        self._read_callback = stack_context.wrap(callback)
        # First try to satisfy the read from already-buffered data (the
        # buffer is empty on the first pass); otherwise pull another chunk
        # from the socket into the buffer and try again.
        while not self._read_from_buffer():
            self._check_closed()
            if self._read_to_buffer() == 0:
                # Nothing more can be read right now: ask the IOLoop to
                # watch the socket for READ events and finish later.
                self._add_io_state(self.io_loop.READ)
                return
class IOStream(object):
    def _read_to_buffer(self):
        """Read one chunk from the socket and append it to ``_read_buffer``.

        Returns the number of bytes appended, or 0 when nothing was read
        (no data available yet, or the peer closed the connection).
        ``read_until`` stops looping when this returns 0.
        """
        # (The original article omits part of this method.)
        chunk = self._read_from_socket()
        if chunk is None:
            # _read_from_socket signals "nothing read" with None; appending
            # it and calling len(None) would raise TypeError.
            return 0
        self._read_buffer.append(chunk)
        return len(chunk)

    def _read_from_socket(self):
        """Receive up to ``read_chunk_size`` bytes from the client socket.

        Returns the data read, or None when the socket currently has no data
        (EWOULDBLOCK/EAGAIN) or the peer closed the connection (the stream is
        closed first in that case). Other socket errors propagate.
        """
        import errno
        import socket

        try:
            # read_chunk_size defaults to 4096 in the constructor.
            chunk = self.socket.recv(self.read_chunk_size)
        except socket.error as e:
            # The client socket is non-blocking, so "no data yet" surfaces
            # as an error instead of blocking.
            if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                return None
            raise
        if not chunk:
            # Empty read: the peer closed the connection.
            self.close()
            return None
        return chunk
class IOStream(object):
    def _read_from_buffer(self):
        """Attempts to complete the currently-pending read from the buffer.

        Two kinds of pending read are supported: a fixed byte count
        (``_read_bytes``) and a read up to a delimiter (``_read_delimiter``,
        e.g. "\\r\\n\\r\\n" for the HTTP headers). On completion the pending
        callback is cleared and run with the consumed data.

        Returns True if the read was completed.
        """
        import sys

        # Fixed-size read; _read_bytes is None unless one was requested.
        if self._read_bytes:
            if self._read_buffer_size() >= self._read_bytes:
                num_bytes = self._read_bytes
                callback = self._read_callback
                self._read_callback = None
                self._read_bytes = None
                self._run_callback(callback, self._consume(num_bytes))
                return True
        # Read up to the delimiter.
        elif self._read_delimiter:
            if not self._read_buffer:
                # Nothing buffered yet (e.g. the first pass of read_until).
                return False
            # _merge_prefix (defined elsewhere) is expected to merge the
            # buffered chunks into the first element, so a delimiter that
            # straddles two received chunks can still be found.
            # sys.maxsize replaces the Python-2-only sys.maxint.
            _merge_prefix(self._read_buffer, sys.maxsize)
            loc = self._read_buffer[0].find(self._read_delimiter)
            if loc != -1:
                # For the header read, the callback is
                # HTTPConnection._on_headers.
                callback = self._read_callback
                delimiter_len = len(self._read_delimiter)
                self._read_callback = None
                self._read_delimiter = None
                # Hand everything up to and including the delimiter to the
                # callback.
                self._run_callback(callback,
                                   self._consume(loc + delimiter_len))
                return True
        return False
上述代碼主要有兩個任務:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
class HTTPConnection(object):
    def _on_headers(self, data):
        """Parse the request line and headers, then dispatch the request.

        ``data`` is everything read up to and including the blank line
        (CRLF CRLF) that ends the header block. An HTTPRequest is built from
        it. If a Content-Length header is present, the body is read next
        (``_on_request_body`` continues from there). Otherwise the
        Application is called directly. Malformed requests are logged and
        the connection is closed.
        """
        try:
            data = native_str(data.decode('latin1'))
            eol = data.find("\r\n")
            # Request line, e.g. "GET / HTTP/1.1".
            start_line = data[:eol]
            try:
                # Request method, request URI and HTTP version.
                method, uri, version = start_line.split(" ")
            except ValueError:
                raise _BadRequestException("Malformed HTTP request line")
            if not version.startswith("HTTP/"):
                raise _BadRequestException(
                    "Malformed HTTP version in HTTP Request-Line")
            # Parse the header lines (everything after the request line).
            headers = httputil.HTTPHeaders.parse(data[eol:])
            # Wrap the request in an HTTPRequest. It references this
            # connection, which in turn holds the stream and the application.
            self._request = HTTPRequest(
                connection=self, method=method, uri=uri, version=version,
                headers=headers, remote_ip=self.address[0])

            content_length = headers.get("Content-Length")
            if content_length:
                # The header comes from the client: reject values that are
                # not a non-negative integer instead of letting ValueError
                # escape or passing a negative size to read_bytes.
                try:
                    content_length = int(content_length)
                except ValueError:
                    raise _BadRequestException("Malformed Content-Length")
                if content_length < 0:
                    raise _BadRequestException("Malformed Content-Length")
                if content_length > self.stream.max_buffer_size:
                    raise _BadRequestException("Content-Length too long")
                if headers.get("Expect") == "100-continue":
                    self.stream.write("HTTP/1.1 100 (Continue)\r\n\r\n")
                self.stream.read_bytes(content_length, self._on_request_body)
                return

            # Call Application.__call__, the entry point of URL routing.
            self.request_callback(self._request)
        except _BadRequestException as e:
            logging.info("Malformed HTTP request from %s: %s",
                         self.address[0], e)
            self.stream.close()
            return
|
此處代碼主要有三項任務:
注意:
1、執行Application的 __call__ 方法時,其參數request是HTTPRequest對象(其中封裝了HTTPConnection、Stream、Application對象和請求頭信息)
2、Handler泛指我們自己定義的、用於處理請求的類,它繼承自RequestHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
class Application(object):
    def __call__(self, request):
        """Called by HTTPServer to execute the request.

        Routing entry point. ``request`` is the HTTPRequest built by
        HTTPConnection. The process is:

        - Look up the handler specs for the request's host.
        - Find the first spec whose regex matches ``request.path``.
        - Instantiate its ``handler_class`` (a RequestHandler subclass).
        - Run the handler's ``_execute`` method with the URL groups as
          arguments.

        With no host specs, a RedirectHandler to ``default_host`` is used;
        with no matching path, an ErrorHandler with status 404.

        Returns the handler instance.
        """
        def unquote(s):
            # None-safe wrapper around url_unescape to handle unmatched
            # optional groups correctly. encoding=None keeps the value as
            # bytes so the handler can decide what encoding to use.
            if s is None:
                return s
            return escape.url_unescape(s, encoding=None)

        transforms = [t(request) for t in self.transforms]
        handler = None
        args = []
        kwargs = {}
        # Match the request's target host against the host patterns and get
        # the corresponding (regex, handler) specs.
        handlers = self._get_host_handlers(request)
        if not handlers:
            handler = RedirectHandler(
                self, request, url="http://" + self.default_host + "/")
        else:
            for spec in handlers:
                match = spec.regex.match(request.path)
                if match:
                    # Create the RequestHandler (subclass) instance.
                    handler = spec.handler_class(self, request, **spec.kwargs)
                    # Pass matched groups to the handler. Since
                    # match.groups() includes both named and unnamed groups,
                    # we want to use either groups or groupdict but not both.
                    # dict.items() replaces the Python-2-only iteritems().
                    kwargs = dict((k, unquote(v))
                                  for (k, v) in match.groupdict().items())
                    if kwargs:
                        args = []
                    else:
                        args = [unquote(s) for s in match.groups()]
                    break
            if not handler:
                handler = ErrorHandler(self, request, status_code=404)

        # In debug mode, re-compile templates and reload static files on every
        # request so you don't need to restart to see changes
        if self.settings.get("debug"):
            if getattr(RequestHandler, "_templates", None):
                for loader in RequestHandler._templates.values():
                    loader.reset()
            RequestHandler._static_hashes = {}

        # ==== run the handler's _execute method ====
        handler._execute(transforms, *args, **kwargs)
        return handler
|
上述過程中,首先根據請求的URL去路由規則中匹配,一旦匹配成功,就創建路由所對應的handler的實例。例如:如果請求的URL是【/index/11】,則會創建IndexHandler實例,然後再執行該對象的 _execute 方法。由於所有的 xxxHandler 類都是RequestHandler的派生類,所以默認會執行 RequestHandler的 _execute 方法。
此處代碼主要有三項任務:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
class RequestHandler(object):
    def _execute(self, transforms, *args, **kwargs):
        """Run this handler for its request using the given output transforms.

        The steps, inside an ExceptionStackContext that routes errors to
        ``_stack_context_handle_exception``, are:

        - Reject unsupported HTTP methods with 405.
        - Check the XSRF cookie for non-GET/HEAD requests when the
          ``xsrf_cookies`` setting is on.
        - Call ``prepare()``.
        - Dispatch to the subclass method named after the HTTP verb
          (get/post/put/...).
        - Auto-finish if still unfinished.
        """
        self._transforms = transforms
        with stack_context.ExceptionStackContext(
                self._stack_context_handle_exception):
            method = self.request.method
            if method not in self.SUPPORTED_METHODS:
                raise HTTPError(405)
            # If XSRF cookies are turned on, reject form submissions without
            # the proper cookie.
            is_safe_method = method in ("GET", "HEAD")
            if not is_safe_method and \
                    self.application.settings.get("xsrf_cookies"):
                self.check_xsrf_cookie()
            self.prepare()
            if self._finished:
                return
            # Reflection: call get()/post()/put()/... on the subclass.
            verb_handler = getattr(self, self.request.method.lower())
            verb_handler(*args, **kwargs)
            if self._auto_finish and not self._finished:
                self.finish()
|
例:用戶發送get請求
class MyHandler(tornado.web.RequestHandler):
    """Example handler: answers every GET request with a fixed greeting."""

    def get(self):
        greeting = "Hello, world"
        self.write(greeting)
class RequestHandler(object):
    def write(self, chunk):
        """Buffer ``chunk`` as part of the response body.

        A dict is serialized with ``escape.json_encode``, and the
        Content-Type header is set to "text/javascript; charset=UTF-8".
        The chunk is then passed through ``_utf8`` and appended to the
        handler's own ``_write_buffer``. Nothing is sent to the client
        here; that happens later in finish()/flush().
        """
        assert not self._finished
        if isinstance(chunk, dict):
            encoded = escape.json_encode(chunk)
            self.set_header("Content-Type", "text/javascript; charset=UTF-8")
            chunk = encoded
        self._write_buffer.append(_utf8(chunk))
上述代碼在執行RequestHandler的write方法時,將數據保存在Handler對象的 _write_buffer 列表中,在之後執行finish時再將數據寫到IOStream對象的_write_buffer字段中,其類型是雙向隊列collections.deque()。
此段代碼主要有兩項任務:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
class RequestHandler:
    def finish(self, chunk=None):
        """Complete the response and end the HTTP request.

        When headers have not been written yet, this:

        - Computes a SHA-1 based Etag for cacheable 200 GET/HEAD responses.
        - Answers 304 with an empty body when the client's If-None-Match
          contains that Etag.
        - Fills in Content-Length.

        Outside WSGI mode it then flushes the buffered output to the
        IOStream, finishes the request and logs it. Finally the handler is
        marked finished.
        """
        assert not self._finished
        if chunk is not None:
            self.write(chunk)
        if not self._headers_written:
            cacheable = (self._status_code == 200
                         and self.request.method in ("GET", "HEAD")
                         and "Etag" not in self._headers)
            if cacheable:
                hasher = hashlib.sha1()
                for part in self._write_buffer:
                    hasher.update(part)
                etag = '"%s"' % hasher.hexdigest()
                if_none_match = self.request.headers.get("If-None-Match")
                if if_none_match and etag in if_none_match:
                    # The client already has this exact body: drop it and
                    # answer 304 Not Modified.
                    self._write_buffer = []
                    self.set_status(304)
                else:
                    self.set_header("Etag", etag)
            if "Content-Length" not in self._headers:
                self.set_header("Content-Length",
                                sum(map(len, self._write_buffer)))
        if hasattr(self.request, "connection"):
            self.request.connection.stream.set_close_callback(None)
        if not self.application._wsgi:
            # Push the buffered response into the IOStream's _write_buffer.
            self.flush(include_footers=True)
            self.request.finish()
            # Log the request.
            self._log()
        self._finished = True
|
此處代碼主要有一項任務:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
def flush(self, include_footers=False):
    """Send the buffered output to the network via the request.

    The handler's ``_write_buffer`` is joined and cleared. On the first
    flush, the output transforms' ``transform_first_chunk`` is applied and
    the headers are generated; on later flushes only ``transform_chunk`` is
    applied and no headers are sent. HEAD requests send headers only.
    Raises Exception in WSGI mode, which does not support flush().
    """
    if self.application._wsgi:
        raise Exception("WSGI applications do not support flush()")
    chunk = "".join(self._write_buffer)
    self._write_buffer = []
    first_flush = not self._headers_written
    if first_flush:
        self._headers_written = True
        for transform in self._transforms:
            self._headers, chunk = transform.transform_first_chunk(
                self._headers, chunk, include_footers)
        headers = self._generate_headers()
    else:
        for transform in self._transforms:
            chunk = transform.transform_chunk(chunk, include_footers)
        headers = ""
    if self.request.method == "HEAD":
        # HEAD responses carry headers only; the body chunk is dropped.
        if headers:
            self.request.write(headers)
        return
    if not (headers or chunk):
        return
    # HTTPRequest.write passes the data on to the connection.
    self.request.write(headers + chunk)
|
class HTTPRequest(object):
    def write(self, chunk):
        """Send ``chunk`` (a ``str``) to the response stream.

        Delegates to the owning HTTPConnection's ``write`` method.
        """
        assert isinstance(chunk, str)
        connection = self.connection
        connection.write(chunk)
class IOStream(object):
    def write(self, data, callback=None):
        """Queue ``data`` for sending to the client.

        The data is appended to ``_write_buffer``, and WRITE interest is added
        to the IOLoop registration via ``_add_io_state``. ``callback`` is
        wrapped with ``stack_context.wrap`` and stored as ``_write_callback``.
        The actual send presumably happens later in ``_handle_write`` when
        the IOLoop reports the socket writable.
        """
        self._check_closed()
        pending = self._write_buffer
        pending.append(data)
        loop = self.io_loop
        self._add_io_state(loop.WRITE)
        wrapped = stack_context.wrap(callback)
        self._write_callback = wrapped
以上代碼執行完成之後,請求的處理基本上就完成了。下面就是等待監聽客戶端socket句柄的epoll觸發,然後執行IOStream的_handle_events方法,將響應數據發送給客戶端。
此處代碼主要有一項任務:
1
2
3
4
|
class RequestHandler:
    def _log(self):
        """Hand this finished request to the application's log_request hook."""
        application = self.application
        application.log_request(self)
|
class Application:
    def log_request(self, handler):
        """Write a completed request to the log.

        If ``settings["log_function"]`` is set, it is called with the handler
        instead. Otherwise the request is logged at INFO for status < 400,
        WARNING for 4xx and ERROR for 5xx and above, as
        "<status> <request summary> <time>ms".
        """
        if "log_function" in self.settings:
            self.settings["log_function"](handler)
            return
        # Read the status once so the chosen level and the logged status
        # always agree (previously get_status() was called up to 3 times).
        status = handler.get_status()
        if status < 400:
            log_method = logging.info
        elif status < 500:
            log_method = logging.warning
        else:
            log_method = logging.error
        # Scaled by 1000 and formatted as milliseconds.
        request_time = 1000.0 * handler.request.request_time()
        log_method("%d %s %.2fms", status,
                   handler._request_summary(), request_time)
由於epoll中不但監聽了服務器socket句柄,還監聽了客戶端socket句柄,所以當客戶端socket對象的狀態變化時,就會去調用之前指定的IOStream的_handle_events方法。
此段代碼主要有一項任務:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
class IOStream(object):
    def _handle_events(self, fd, events):
        """IOLoop callback for the client socket of this stream.

        Dispatches on the ready ``events`` bitmask:

        - READ: calls ``_handle_read()``.
        - WRITE: calls ``_handle_connect()`` while connecting, then
          ``_handle_write()``.
        - ERROR: calls ``close()``.

        Afterwards the event mask is recomputed from ``reading()`` and
        ``writing()``, and the IOLoop registration is updated if it changed.
        Any exception is logged, the stream is closed, and the exception is
        re-raised.

        Args:
            fd: the ready file descriptor (used only in the closed-stream
                warning).
            events: bitmask of ``io_loop.READ`` / ``WRITE`` / ``ERROR``.
        """
        if not self.socket:
            # The stream has already been closed.
            logging.warning("Got events for closed stream %d", fd)
            return
        try:
            if events & self.io_loop.READ:
                self._handle_read()
            # The read handler may have closed the stream.
            if not self.socket:
                return
            if events & self.io_loop.WRITE:
                if self._connecting:
                    self._handle_connect()
                # Send buffered response data; _handle_write calls
                # socket.send on the client socket.
                self._handle_write()
            if not self.socket:
                return
            if events & self.io_loop.ERROR:
                self.close()
                return
            # Always watch for errors; add READ/WRITE according to
            # reading()/writing() (presumably "a read/write is pending").
            state = self.io_loop.ERROR
            if self.reading():
                state |= self.io_loop.READ
            if self.writing():
                state |= self.io_loop.WRITE
            # Only touch the IOLoop registration when the mask changed.
            if state != self._state:
                self._state = state
                self.io_loop.update_handler(self.socket.fileno(), self._state)
        except:
            # Deliberately broad: log, close the stream, then re-raise
            # whatever happened.
            logging.error("Uncaught exception, closing connection.",
                          exc_info=True)
            self.close()
            raise
|
此段代碼主要有兩項任務:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
class
IOStream(
object
):
def
_handle_write(
self
):
while
self
._write_buffer:
try
:
if
not
self
._write_buffer_frozen:
_merge_prefix(
self
._write_buffer,
128
*
1024
)
#調用客戶端socket對象的send方法發送數據
num_bytes
=
self
.socket.send(
self
._write_buffer[
0
])
self
._write_buffer_frozen
=
False
_merge_prefix(
self
._write_buffer, num_bytes)
self
._write_buffer.popleft()
except
socket.error, e:
if
e.args[
0
]
in
(errno.EWOULDBLOCK, errno.EAGAIN):
self
._write_buffer_frozen
=
True
break
else
:
|