本文所剖析的tornado源碼版本爲4.4.2html
ioloop是tornado的關鍵,是他的最底層。git
ioloop就是對I/O多路複用的封裝,它實現了一個單例,將這個單例保存在IOLoop._instance中github
ioloop實現了Reactor模型,將全部要處理的I/O事件註冊到一箇中心I/O多路複用器上,同時主線程/進程阻塞在多路複用器上;一旦有I/O事件到來或是準備就緒(文件描述符或socket可讀、寫),多路複用器返回並將事先註冊的相應I/O事件分發到對應的處理器中。app
另外,ioloop還被用來集中運行回調函數以及集中處理定時任務。socket
1 首先咱們要了解Reactor模型ide
2 其次,咱們要了解I/O多路複用,因爲本文假設系統爲Linux,因此要了解epoll以及Python中的select模塊函數
3 IOLoop類是Configurable類的子類,而Configurable類是一個工廠類,講解在這。tornado
來看IOLoop,它的父類是Configurable類,也就是說:IOLoop是一個直屬配置子類oop
class IOLoop(Configurable): ......
這裏就要結合Configurable類進行講解:.net
1 首先實例化一個該直屬配置子類的'執行類對象',也就是調用該類的configurable_default方法並返回賦值給impl:
@classmethod def configurable_default(cls): if hasattr(select, "epoll"): # 由於咱們假設咱們的系統爲Linux,且支持epoll,因此這裏爲True from tornado.platform.epoll import EPollIOLoop return EPollIOLoop if hasattr(select, "kqueue"): # Python 2.6+ on BSD or Mac from tornado.platform.kqueue import KQueueIOLoop return KQueueIOLoop from tornado.platform.select import SelectIOLoop return SelectIOLoop
2 也就是impl是EPollIOLoop類對象,而後實例化該對象,運行其initialize方法
class EPollIOLoop(PollIOLoop): # 該類只有這麼短短的幾句,可見主要的方法是在其父類PollIOLoop中實現。 def initialize(self, **kwargs): super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs) # 執行了父類PollIOLoop的initialize方法,並將select.epoll()傳入
來看一看PollIOLoop.initialize(EPollIOLoop(),impl=select.epoll())幹了些啥:
class PollIOLoop(IOLoop): # 從屬配置子類 def initialize(self, impl, time_func=None, **kwargs): super(PollIOLoop, self).initialize(**kwargs) # 調用IOLoop的initialize方法 self._impl = impl # self._impl = select.epoll() if hasattr(self._impl, 'fileno'): # 文件描述符的close_on_exec屬性 set_close_exec(self._impl.fileno()) self.time_func = time_func or time.time self._handlers = {} # 文件描述符對應的fileno()做爲key,(文件描述符對象,處理函數)做爲value self._events = {} # 用來存儲epoll_obj.poll()返回的事件,也就是哪一個fd發生了什麼事件{(fd1, event1), (fd2, event2)……} self._callbacks = [] self._callback_lock = threading.Lock() # 添加線程鎖 self._timeouts = [] # 存儲定時任務 self._cancellations = 0 self._running = False self._stopped = False self._closing = False self._thread_ident = None # 得到當前線程標識符 self._blocking_signal_threshold = None self._timeout_counter = itertools.count() # Create a pipe that we send bogus data to when we want to wake # the I/O loop when it is idle self._waker = Waker() self.add_handler(self._waker.fileno(), lambda fd, events: self._waker.consume(), self.READ)
首先調用了IOLoop.initialize(self,**kwargs)方法:
def initialize(self, make_current=None): if make_current is None: if IOLoop.current(instance=False) is None: self.make_current() elif make_current: if IOLoop.current(instance=False) is not None: raise RuntimeError("current IOLoop already exists") self.make_current()
@staticmethod def current(instance=True): current = getattr(IOLoop._current, "instance", None) if current is None and instance: return IOLoop.instance() return current def make_current(self): IOLoop._current.instance = self
咱們能夠看到IOLoop.initialize()主要是對線程作了一些支持和操做。
3 返回該實例
1 處理I/O事件以及其對應handler的相關屬性以及方法
使用self._handlers用來存儲fd與handler的對應關係,文件描述符對應的fileno()做爲key,元組(文件描述符對象,處理函數)做爲value
self._events 用來存儲epoll_obj.poll()返回的事件,也就是哪一個fd發生了什麼事件{(fd1, event1), (fd2, event2)……}
add_handler方法用來添加handler
update_handle方法用來更新handler
remove_handler方法用來移除handler
def add_handler(self, fd, handler, events): # 向epoll中註冊事件 , 並在self._handlers[fd]中爲該文件描述符添加相應處理函數 fd, obj = self.split_fd(fd) # fd.fileno(),fd self._handlers[fd] = (obj, stack_context.wrap(handler)) self._impl.register(fd, events | self.ERROR) def update_handler(self, fd, events): fd, obj = self.split_fd(fd) self._impl.modify(fd, events | self.ERROR) def remove_handler(self, fd): fd, obj = self.split_fd(fd) self._handlers.pop(fd, None) self._events.pop(fd, None) try: self._impl.unregister(fd) except Exception: gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
2 處理回調函數的相關屬性以及方法
self._callbacks用來存儲回調函數
add_callback方法用來直接添加回調函數
add_future方法用來間接的添加回調函數,future對象詳解在這
def add_callback(self, callback, *args, **kwargs): # 由於Python的GIL的限制,致使Python線程並不算高效。加上tornado實現了多進程 + 協程的模式,因此咱們略過源碼中的部分線程相關的一些操做 if self._closing: return self._callbacks.append(functools.partial(stack_context.wrap(callback), *args, **kwargs)) def add_future(self, future, callback): # 爲future對象添加通過包裝後的回調函數,該回調函數會在future對象被set_done後添加至_callbacks中 assert is_future(future) callback = stack_context.wrap(callback) future.add_done_callback( lambda future: self.add_callback(callback, future))
3 處理定時任務的相關屬性以及方法
self._timeouts用來存儲定時任務
self.add_timeout用來添加定時任務(self.call_later self.call_at都是間接調用了該方法)
def add_timeout(self, deadline, callback, *args, **kwargs): """ ``deadline``多是一個數字,表示相對於當前時間的時間(與「IOLoop.time」一般爲「time.time」相同的大小),或者是datetime.timedelta對象。 自從Tornado 4.0以來,`call_later`是一個比較方便的替代方案,由於它不須要timedelta對象。 """ if isinstance(deadline, numbers.Real): return self.call_at(deadline, callback, *args, **kwargs) elif isinstance(deadline, datetime.timedelta): return self.call_at(self.time() + timedelta_to_seconds(deadline), callback, *args, **kwargs) else: raise TypeError("Unsupported deadline %r" % deadline)
4 啓動io多路複用器
啓動也通常就意味着開始循環,那麼循環什麼呢?
1 運行回調函數
2 運行時間已到的定時任務
3 當某個文件描述法發生事件時,運行該事件對應的handler
使用start方法啓動ioloop,看一下其簡化版(去除線程相關,以及一些相對不重要的細節):
def start(self): try: while True: callbacks = self._callbacks self._callbacks = [] due_timeouts = [] # 將時間已到的定時任務放置到due_timeouts中,過程省略 for callback in callbacks: # 執行callback self._run_callback(callback) for timeout in due_timeouts: # 執行定時任務 if timeout.callback is not None: self._run_callback(timeout.callback) callbacks = callback = due_timeouts = timeout = None # 釋放內存 # 根據狀況設置poll_timeout的值,過程省略 if not self._running: # 終止ioloop運行時,在執行完了callback後結束循環 breaktry: event_pairs = self._impl.poll(poll_timeout) except Exception as e: if errno_from_exception(e) == errno.EINTR: # 系統調用被信號處理函數中斷,進行下一次循環 continue else: raise self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() # 獲取一個fd以及對應事件 try: fd_obj, handler_func = self._handlers[fd] # 獲取該fd對應的事件處理函數 handler_func(fd_obj, events) # 運行該事件處理函數 except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: # 當客戶端關閉鏈接時會產生EPIPE錯誤 pass # 其餘異常處理已經省略 fd_obj = handler_func = None # 釋放內存空間
5 關閉io多路複用器
def close(self, all_fds=False): with self._callback_lock: self._closing = True self.remove_handler(self._waker.fileno()) if all_fds: # 該參數若爲True,則表示會關閉全部文件描述符 for fd, handler in self._handlers.values(): self.close_fd(fd) self._waker.close() self._impl.close() self._callbacks = None self._timeouts = None
https://zhu327.github.io/2016/06/14/tornado%E4%BB%A3%E7%A0%81%E9%98%85%E8%AF%BB%E7%AC%94%E8%AE%B0-ioloop/ https://www.zhihu.com/question/20021164 http://stackoverflow.com/questions/12179271/meaning-of-classmethod-and-staticmethod-for-beginner/12179752#12179752 http://blog.csdn.net/benkaoya/article/details/17262053