上一篇從一個簡單的例子大體瞭解到Tornado框架的一個概述,同時也看清了web框架的本質。html
接下來,咱們從tornado程序的起始來分析其源碼:ios
上圖是摘自朋友的博客裏的內容,這張圖很明確的分析了tornado程序啓動以及接收到客戶端請求後的整個過程,對於整個過程能夠分爲兩大部分:web
簡而言之:正則表達式
一、在啓動程序階段,第一步,獲取配置文件而後生成url映射(即:一個url對應一個XXRequestHandler處理方法,從而讓XXRequestHandler來處理指定url發送的請求);第二步,建立服務器socket對象並添加到epoll中;第三步,建立死循環不斷地去監聽epoll。服務器
二、在接收並處理請求階段,第一步,接收客戶端socket發送的請求(socket.accept);第二步,從請求中獲取請求頭信息,再而後根據請求頭中的請求url去匹配某個XXRequestHandler;第三步,匹配成功的XXRequestHandler處理請求;第四步,將處理後的請求發送給客戶端;第五步,關閉客戶端socket。app
本篇的內容主要剖析【啓動程序階段】,下面咱們就來一步一步的剖析整個過程,在此階段主要是有下面重點標註的三個方法來實現。框架
import tornado.ioloop import tornado.web class MainHandler(tornado.web.RequestHandler): def get(self): self.write("Hello, world") application = tornado.web.Application([ (r"/index", MainHandler), ]) if __name__ == "__main__": application.listen(8888) tornado.ioloop.IOLoop.instance().start()
代碼從上往下執行,到application這行時,就是執行Application類的構造函數。能夠看到並傳入的是一個列表類型的參數。這個列表裏保存的是url規則和對應的處理類,即:當客戶端的請求url能夠配置這個規則時,那麼該請求就交由對應的Handler方法去執行。dom
class Application(object): def __init__(self, handlers=None, default_host="", transforms=None,wsgi=False, **settings): #設置響應的編碼和返回方式,對應的http相應頭:Content-Encoding和Transfer-Encoding
#Content-Encoding:gzip 表示對數據進行壓縮,而後再返回給用戶,從而減小流量的傳輸。
#Transfer-Encoding:chunck 表示數據的傳送方式經過一塊一塊的傳輸。
if transforms is None: self.transforms = [] if settings.get("gzip"): self.transforms.append(GZipContentEncoding) self.transforms.append(ChunkedTransferEncoding) else: self.transforms = transforms #將參數賦值爲類的變量
self.handlers = [] self.named_handlers = {} self.default_host = default_host self.settings = settings #ui_modules和ui_methods用於在模版語言中擴展自定義輸出
#這裏將tornado內置的ui_modules和ui_methods添加到類的成員變量self.ui_modules和self.ui_methods中
self.ui_modules = {'linkify': _linkify, 'xsrf_form_html': _xsrf_form_html, 'Template': TemplateModule, } self.ui_methods = {} self._wsgi = wsgi #獲取獲取用戶自定義的ui_modules和ui_methods,並將他們添加到以前建立的成員變量self.ui_modules和self.ui_methods中
self._load_ui_modules(settings.get("ui_modules", {})) self._load_ui_methods(settings.get("ui_methods", {})) #設置靜態文件路徑,設置方式則是經過正則表達式匹配url,讓StaticFileHandler來處理匹配的url
if self.settings.get("static_path"): #從settings中讀取key爲static_path的值,用於設置靜態文件路徑
path = self.settings["static_path"] #獲取參數中傳入的handlers,若是空則設置爲空列表
handlers = list(handlers or []) #靜態文件前綴,默認是/static/
static_url_prefix = settings.get("static_url_prefix","/static/") #在參數中傳入的handlers前再添加三個映射:
#【/static/.*】 --> StaticFileHandler
#【/(favicon\.ico)】 --> StaticFileHandler
#【/(robots\.txt)】 --> StaticFileHandler
handlers = [ (re.escape(static_url_prefix) + r"(.*)", StaticFileHandler,dict(path=path)), (r"/(favicon\.ico)", StaticFileHandler, dict(path=path)), (r"/(robots\.txt)", StaticFileHandler, dict(path=path)), ] + handlers #執行本類的Application的add_handlers方法
#此時,handlers是一個列表,其中的每一個元素都是一個對應關係,即:url正則表達式和處理匹配該正則的url的Handler
if handlers: self.add_handlers(".*$", handlers) # Automatically reload modified modules
#若是settings中設置了 debug 模式,那麼就使用自動加載重啓
if self.settings.get("debug") and not wsgi: import autoreload autoreload.start()
class Application(object): def add_handlers(self, host_pattern, host_handlers): #若是主機模型最後沒有結尾符,那麼就爲他添加一個結尾符。
if not host_pattern.endswith("$"): host_pattern += "$" handlers = [] #對主機名先作一層路由映射,例如:http://www.wupeiqi.com 和 http://safe.wupeiqi.com
#即:safe對應一組url映射,www對應一組url映射,那麼當請求到來時,先根據它作第一層匹配,以後再繼續進入內部匹配。
#對於第一層url映射來講,因爲.*會匹配全部的url,所將 .* 的永遠放在handlers列表的最後,否則 .* 就會截和了...
#re.complie是編譯正則表達式,之後請求來的時候只須要執行編譯結果的match方法就能夠去匹配了
if self.handlers and self.handlers[-1][0].pattern == '.*$': self.handlers.insert(-1, (re.compile(host_pattern), handlers)) else: self.handlers.append((re.compile(host_pattern), handlers)) #遍歷咱們設置的和構造函數中添加的【url->Handler】映射,將url和對應的Handler封裝到URLSpec類中(構造函數中會對url進行編譯)
#並將全部的URLSpec對象添加到handlers列表中,而handlers列表和主機名模型組成一個元祖,添加到self.Handlers列表中。
for spec in host_handlers: if type(spec) is type(()): assert len(spec) in (2, 3) pattern = spec[0] handler = spec[1] if len(spec) == 3: kwargs = spec[2] else: kwargs = {} spec = URLSpec(pattern, handler, kwargs) handlers.append(spec) if spec.name: #未使用該功能,默認spec.name = None
if spec.name in self.named_handlers: logging.warning("Multiple handlers named %s; replacing previous value",spec.name) self.named_handlers[spec.name] = spec
上述代碼主要完成了如下功能:加載配置信息和生成url映射,而且把全部的信息封裝在一個application對象中。socket
加載的配置信息包括:async
以上的全部配置信息,均可以在settings中配置,而後在建立Application對象時候,傳入參數便可。如:application = tornado.web.Application([(r"/index", MainHandler),],**settings)
生成url映射:
封裝數據:
將配置信息和url映射關係封裝到Application對象中,信息分別保存在Application對象的如下字段中:
第一步操做將配置和url映射等信息封裝到了application對象中,而這第二步執行application對象的listen方法,該方法內部又把以前包含各類信息的application對象封裝到了一個HttpServer對象中,而後繼續調用HttpServer對象的liseten方法。
class Application(httputil.HTTPServerConnectionDelegate): # 將application的對象傳入到HTTPServer中,建立socket,綁定IP和端口並添加相應的設置 def listen(self, port, address="", **kwargs): # import is here rather than top level because HTTPServer
# is not importable on appengine
from tornado.httpserver import HTTPServer server = HTTPServer(self, **kwargs) server.listen(port, address) return server
from tornado.netutil import bind_sockets,add_accept_handler class TCPServer(object): def listen(self, port, address=""): # 建立了socket 並綁定的端口和IP
sockets = bind_sockets(port, address=address) self.add_sockets(sockets)
def bind_sockets(port, address=None, family=socket.AF_UNSPEC, backlog=_DEFAULT_BACKLOG, flags=None, reuse_port=False): try: sock = socket.socket(af, socktype, proto) except socket.error as e: if errno_from_exception(e) == errno.EAFNOSUPPORT: continue
raise sock.bind(sockaddr) sock.listen(backlog) bound_port = sock.getsockname()[1] sockets.append(sock) return sockets
class HTTPServer(object): def __init__(self, request_callback, no_keep_alive=False, io_loop=None,xheaders=False, ssl_options=None): #Application對象
self.request_callback = request_callback #是否長鏈接
self.no_keep_alive = no_keep_alive #IO循環
self.io_loop = io_loop self.xheaders = xheaders #Http和Http
self.ssl_options = ssl_options self._socket = None self._started = False def listen(self, port, address=""): self.bind(port, address) self.start(1) def bind(self, port, address=None, family=socket.AF_UNSPEC): assert not self._socket #建立服務端socket對象,IPV4和TCP鏈接
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) flags = fcntl.fcntl(self._socket.fileno(), fcntl.F_GETFD) flags |= fcntl.FD_CLOEXEC fcntl.fcntl(self._socket.fileno(), fcntl.F_SETFD, flags) #配置socket對象
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._socket.setblocking(0) #綁定IP和端口
self._socket.bind((address, port)) #最大阻塞數量
self._socket.listen(128) def start(self, num_processes=1): assert not self._started self._started = True if num_processes is None or num_processes <= 0: num_processes = _cpu_count() if num_processes > 1 and ioloop.IOLoop.initialized(): logging.error("Cannot run in multiple processes: IOLoop instance "
"has already been initialized. You cannot call "
"IOLoop.instance() before calling start()") num_processes = 1
#若是進程數大於1
if num_processes > 1: logging.info("Pre-forking %d server processes", num_processes) for i in range(num_processes): if os.fork() == 0: import random from binascii import hexlify try: # If available, use the same method as
# random.py
seed = long(hexlify(os.urandom(16)), 16) except NotImplementedError: # Include the pid to avoid initializing two
# processes to the same value
seed(int(time.time() * 1000) ^ os.getpid()) random.seed(seed) self.io_loop = ioloop.IOLoop.instance() self.io_loop.add_handler( self._socket.fileno(), self._handle_events, ioloop.IOLoop.READ) return os.waitpid(-1, 0) #進程數等於1,默認
else: if not self.io_loop: #設置成員變量self.io_loop爲IOLoop的實例,注:IOLoop使用methodclass完成了一個單例模式
self.io_loop = ioloop.IOLoop.instance() #執行IOLoop的add_handler方法,將socket句柄、self._handle_events方法和IOLoop.READ當參數傳入
self.io_loop.add_handler(self._socket.fileno(), self._handle_events, ioloop.IOLoop.READ) def _handle_events(self, fd, events): while True: try: #====important=====#
connection, address = self._socket.accept() except socket.error, e: if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN): return
raise
if self.ssl_options is not None: assert ssl, "Python 2.6+ and OpenSSL required for SSL"
try: #====important=====#
connection = ssl.wrap_socket(connection,server_side=True,do_handshake_on_connect=False,**self.ssl_options) except ssl.SSLError, err: if err.args[0] == ssl.SSL_ERROR_EOF: return connection.close() else: raise
except socket.error, err: if err.args[0] == errno.ECONNABORTED: return connection.close() else: raise
try: if self.ssl_options is not None: stream = iostream.SSLIOStream(connection, io_loop=self.io_loop) else: stream = iostream.IOStream(connection, io_loop=self.io_loop) #====important=====#
HTTPConnection(stream, address, self.request_callback,self.no_keep_alive, self.xheaders) except: logging.error("Error in connection callback", exc_info=True)
class IOLoop(Configurable): # 單例模式建立IOLoop對象
@staticmethod def instance(): if not hasattr(IOLoop, "_instance"): with IOLoop._instance_lock: if not hasattr(IOLoop, "_instance"): # New instance after double check
IOLoop._instance = IOLoop() return IOLoop._instance
def wrap(fn): '''Returns a callable object that will resore the current StackContext when executed. Use this whenever saving a callback to be executed later in a different execution context (either in a different thread or asynchronously in the same thread). '''
if fn is None: return None # functools.wraps doesn't appear to work on functools.partial objects
#@functools.wraps(fn)
def wrapped(callback, contexts, *args, **kwargs): # If we're moving down the stack, _state.contexts is a prefix
# of contexts. For each element of contexts not in that prefix,
# create a new StackContext object.
# If we're moving up the stack (or to an entirely different stack),
# _state.contexts will have elements not in contexts. Use
# NullContext to clear the state and then recreate from contexts.
if (len(_state.contexts) > len(contexts) or any(a[1] is not b[1] for a, b in itertools.izip(_state.contexts, contexts))): # contexts have been removed or changed, so start over
new_contexts = ([NullContext()] + [cls(arg) for (cls,arg) in contexts]) else: new_contexts = [cls(arg) for (cls, arg) in contexts[len(_state.contexts):]] if len(new_contexts) > 1: with contextlib.nested(*new_contexts): callback(*args, **kwargs) elif new_contexts: with new_contexts[0]: callback(*args, **kwargs) else: callback(*args, **kwargs) if getattr(fn, 'stack_context_wrapped', False): return fn contexts = _state.contexts result = functools.partial(wrapped, fn, contexts) result.stack_context_wrapped = True return result
備註:stack_context.wrap其實就是對函數進行一下封裝,即:函數在不一樣狀況下上下文信息可能不一樣。
上述代碼本質上就幹了如下這麼四件事:
經過epoll監聽服務端socket事件,一旦請求到達時,則執行3中被封裝了的_handle_events函數,該函數又利用application中封裝了的各類配置信息對客戶端url來指定斷定,而後指定對應的Handler處理該請求。
注意:使用epoll建立服務端socket
import socket, select EOL1 = b'/n/n' EOL2 = b'/n/r/n' response = b'HTTP/1.0 200 OK/r/nDate: Mon, 1 Jan 1996 01:01:01 GMT/r/n' response += b'Content-Type: text/plain/r/nContent-Length: 13/r/n/r/n' response += b'Hello, world!' serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) serversocket.bind(('0.0.0.0', 8080)) serversocket.listen(1) serversocket.setblocking(0) epoll = select.epoll() epoll.register(serversocket.fileno(), select.EPOLLIN) try: connections = {}; requests = {}; responses = {} while True: events = epoll.poll(1) for fileno, event in events: if fileno == serversocket.fileno(): connection, address = serversocket.accept() connection.setblocking(0) epoll.register(connection.fileno(), select.EPOLLIN) connections[connection.fileno()] = connection requests[connection.fileno()] = b'' responses[connection.fileno()] = response elif event & select.EPOLLIN: requests[fileno] += connections[fileno].recv(1024) if EOL1 in requests[fileno] or EOL2 in requests[fileno]: epoll.modify(fileno, select.EPOLLOUT) print('-'*40 + '/n' + requests[fileno].decode()[:-2]) elif event & select.EPOLLOUT: byteswritten = connections[fileno].send(responses[fileno]) responses[fileno] = responses[fileno][byteswritten:] if len(responses[fileno]) == 0: epoll.modify(fileno, 0) connections[fileno].shutdown(socket.SHUT_RDWR) elif event & select.EPOLLHUP: epoll.unregister(fileno) connections[fileno].close() del connections[fileno] finally: epoll.unregister(serversocket.fileno()) epoll.close() serversocket.close()
上一步中建立了socket對象並使得socket對象和epoll創建了關係,該步驟則就來執行epoll的epoll方法去輪詢已經註冊在epoll對象中的socket句柄,當有讀可用信息時,則觸發一些操做什麼的....
class IOLoop(object): def add_handler(self, fd, handler, events): #HttpServer的Start方法中會調用該方法
self._handlers[fd] = stack_context.wrap(handler) self._impl.register(fd, events | self.ERROR) def start(self): while True: poll_timeout = 0.2
try: #epoll中輪詢
event_pairs = self._impl.poll(poll_timeout) except Exception, e: #省略其餘
#若是有讀可用信息,則把該socket對象句柄和Event Code序列添加到self._events中
self._events.update(event_pairs) #遍歷self._events,處理每一個請求
while self._events: fd, events = self._events.popitem() try: #以socket爲句柄爲key,取出self._handlers中的stack_context.wrap(handler),並執行
#stack_context.wrap(handler)包裝了HTTPServer類的_handle_events函數的一個函數
#是在上一步中執行add_handler方法時候,添加到self._handlers中的數據。
self._handlers[fd](fd, events) except: #省略其餘
對於上述代碼,執行start方法後,程序就進入「死循環」,也就是會一直不停的輪詢的去檢查是否有請求到來,若是有請求到達,則執行封裝了HttpServer類的_handle_events方法和相關上下文的stack_context.wrap(handler)(其實就是執行HttpServer類的_handle_events方法)
本篇介紹了「待請求階段」的所做所爲,簡要來講其實就是三件事:其1、把setting中的各類配置以及url和Handler之間的映射關係封裝到來application對象中(application對象又被封裝到了HttpServer對象的request_callback字段中);其2、結合epoll建立服務端socket;其3、當請求到達時交由HttpServer類的_handle_events方法處理請求,即:處理請求的入口。