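Tornado's strong high-concurrency performance comes from its web server, which implements, from the lowest layer up, a complete single-threaded asynchronous architecture built on epoll. (The built-in servers of most other Python web frameworks are simple WSGI-based servers that do not implement this low-level machinery themselves.) `tornado.ioloop` is the lowest-level part of the Tornado web server.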
Before we look at ioloop, we need some background that will make it easier to understand.
ioloop is built on epoll. So what is epoll? epoll is the Linux kernel's improved version of poll, designed to handle large numbers of file descriptors efficiently.
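And what is poll? First, recall how a socket server works. Once it accepts a connection and communication is established, it starts exchanging data with the client. At that point, however, it has no way of knowing whether the client has finished sending. We have two options:
1. Wait on this connection (blocking) until the data exchange is complete.
2. Check every so often whether the connection has data to read or write.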
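The first approach works, but a thread or process can then handle only one socket at a time, and every other connection is blocked. Clearly this is impractical for a single-process server.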
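The second approach is better. The server cycles through all of its connections within some time window and checks each one for data to read or write, so it appears to handle many connections at once. This is the approach taken by poll/select. It seems to solve the problem, but as the number of connections grows, each polling pass takes longer. Most of a server's sockets are idle at any given moment, so most of that time is wasted. epoll was created to address this. Its idea is similar to poll, but the kernel keeps track of which sockets have become active. Each wait therefore returns only the ready sockets instead of scanning all of them, which saves a great deal of time when there are many connections.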
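For comparison, here is a minimal sketch of the poll/select style using Python's standard `select.select()`. Every socket is passed in on every call, and the kernel checks each one before reporting the few that are ready. This per-call scan of the whole set is what epoll avoids. The function `ready_sockets` and its parameters are hypothetical, and socket pairs stand in for real client connections.

```python
import select
import socket


def ready_sockets(n_pairs=5, active=(2,), timeout=0.5):
    """Return the indices of the connections that select() reports as readable."""
    pairs = [socket.socketpair() for _ in range(n_pairs)]
    try:
        for i in active:
            pairs[i][1].sendall(b"ping")  # the "client" end sends some data
        readers = [server_end for server_end, _ in pairs]
        # Every reader is handed to select() on every call and the kernel checks
        # each one; it returns once something is readable or the timeout expires.
        readable, _, _ = select.select(readers, [], [], timeout)
        return sorted(readers.index(s) for s in readable)
    finally:
        for a, b in pairs:
            a.close()
            b.close()
```

For example, `ready_sockets(n_pairs=4, active=(0, 3))` reports connections 0 and 3, even though all four sockets had to be examined.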
Working with epoll is simple: four APIs are all you need.
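`epoll_create`: creates an epoll descriptor (that is, creates an epoll instance).
`epoll_ctl`: changes the set of events registered with the epoll instance. The available operations are:
Operation | Meaning |
---|---|
EPOLL_CTL_ADD | Register a new file descriptor and the events to watch on it |
EPOLL_CTL_DEL | Remove a file descriptor from the epoll instance |
EPOLL_CTL_MOD | Change the events watched on an already registered file descriptor |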
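There are quite a few event flags, but we only need to care about three of them:
Flag | Meaning |
---|---|
EPOLLIN | The descriptor is readable (there is data in the receive buffer) |
EPOLLOUT | The descriptor is writable (there is room in the send buffer) |
EPOLLERR | An error occurred on the descriptor |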
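`epoll_wait`: puts epoll to work, i.e. waits for events. Its timeout parameter controls how long it waits. A positive value blocks for up to timeout milliseconds, 0 returns immediately, and -1 blocks until an event arrives. (Python's `select.epoll.poll()` takes its timeout in seconds instead.)
If connections become active while it is waiting, it returns the ready file descriptors (here, socket file descriptors) together with their events.
`close`: closes the epoll instance.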
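Python exposes these calls through `select.epoll`, so the four operations can be tried directly. Below is a minimal sketch that runs on Linux only, since `select.epoll` exists only there. A socket pair stands in for a client connection, and the function name `epoll_demo` is hypothetical. The comments map each Python call to the C API it wraps.

```python
import select
import socket


def epoll_demo():
    """Walk through create / ctl / wait / close with select.epoll (Linux only)."""
    r, w = socket.socketpair()
    fd = r.fileno()
    ep = select.epoll()                   # epoll_create
    try:
        ep.register(fd, select.EPOLLIN)   # epoll_ctl(EPOLL_CTL_ADD)
        idle = ep.poll(0)                 # epoll_wait with timeout 0: return at once
        w.sendall(b"hello")
        readable = ep.poll(1.0)           # epoll_wait, block for up to 1 second
        ep.modify(fd, select.EPOLLOUT)    # epoll_ctl(EPOLL_CTL_MOD)
        writable = ep.poll(1.0)
        ep.unregister(fd)                 # epoll_ctl(EPOLL_CTL_DEL)
        return fd, idle, readable, writable
    finally:
        ep.close()                        # close
        r.close()
        w.close()
```

`poll()` returns a list of `(fd, event_mask)` pairs. Only the events that were asked for are reported (plus errors and hang-ups), which is why the second wait reports `EPOLLOUT` alone even though unread data is still sitting in the socket.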
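Now that we understand epoll, we can turn to ioloop. (If anything about epoll is still unclear, see these two resources: "What is the principle behind epoll?" and the Baidu Baike entry on epoll.)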
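Many beginners wonder what the last line used to start a Tornado server, `tornado.ioloop.IOLoop.current().start()`, actually does. Rather than explaining its purpose up front, let's look at what happens behind this line.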
First, the ioloop code:
```python
from __future__ import absolute_import, division, print_function, with_statement

import datetime
import errno
import functools
import heapq  # min-heap
import itertools
import logging
import numbers
import os
import select
import sys
import threading
import time
import traceback
import math

from tornado.concurrent import TracebackFuture, is_future
from tornado.log import app_log, gen_log
from tornado.platform.auto import set_close_exec, Waker
from tornado import stack_context
from tornado.util import PY3, Configurable, errno_from_exception, timedelta_to_seconds

try:
    import signal
except ImportError:
    signal = None

if PY3:
    import _thread as thread
else:
    import thread

_POLL_TIMEOUT = 3600.0


class TimeoutError(Exception):
    pass


class IOLoop(Configurable):
    _EPOLLIN = 0x001
    _EPOLLPRI = 0x002
    _EPOLLOUT = 0x004
    _EPOLLERR = 0x008
    _EPOLLHUP = 0x010
    _EPOLLRDHUP = 0x2000
    _EPOLLONESHOT = (1 << 30)
    _EPOLLET = (1 << 31)

    # Our events map exactly to the epoll events
    NONE = 0
    READ = _EPOLLIN
    WRITE = _EPOLLOUT
    ERROR = _EPOLLERR | _EPOLLHUP

    # Global lock for creating global IOLoop instance
    _instance_lock = threading.Lock()

    _current = threading.local()

    @staticmethod
    def instance():
        if not hasattr(IOLoop, "_instance"):
            with IOLoop._instance_lock:
                if not hasattr(IOLoop, "_instance"):
                    # New instance after double check
                    IOLoop._instance = IOLoop()
        return IOLoop._instance

    @staticmethod
    def initialized():
        """Returns true if the singleton instance has been created."""
        return hasattr(IOLoop, "_instance")

    def install(self):
        assert not IOLoop.initialized()
        IOLoop._instance = self

    @staticmethod
    def clear_instance():
        """Clear the global `IOLoop` instance.

        .. versionadded:: 4.0
        """
        if hasattr(IOLoop, "_instance"):
            del IOLoop._instance

    @staticmethod
    def current(instance=True):
        current = getattr(IOLoop._current, "instance", None)
        if current is None and instance:
            return IOLoop.instance()
        return current

    def make_current(self):
        IOLoop._current.instance = self

    @staticmethod
    def clear_current():
        IOLoop._current.instance = None

    @classmethod
    def configurable_base(cls):
        return IOLoop

    @classmethod
    def configurable_default(cls):
        if hasattr(select, "epoll"):
            from tornado.platform.epoll import EPollIOLoop
            return EPollIOLoop
        if hasattr(select, "kqueue"):
            # Python 2.6+ on BSD or Mac
            from tornado.platform.kqueue import KQueueIOLoop
            return KQueueIOLoop
        from tornado.platform.select import SelectIOLoop
        return SelectIOLoop

    def initialize(self, make_current=None):
        if make_current is None:
            if IOLoop.current(instance=False) is None:
                self.make_current()
        elif make_current:
            if IOLoop.current(instance=False) is not None:
                raise RuntimeError("current IOLoop already exists")
            self.make_current()

    def close(self, all_fds=False):
        raise NotImplementedError()

    def add_handler(self, fd, handler, events):
        raise NotImplementedError()

    def update_handler(self, fd, events):
        raise NotImplementedError()

    def remove_handler(self, fd):
        raise NotImplementedError()

    def set_blocking_signal_threshold(self, seconds, action):
        raise NotImplementedError()

    def set_blocking_log_threshold(self, seconds):
        self.set_blocking_signal_threshold(seconds, self.log_stack)

    def log_stack(self, signal, frame):
        gen_log.warning('IOLoop blocked for %f seconds in\n%s',
                        self._blocking_signal_threshold,
                        ''.join(traceback.format_stack(frame)))

    def start(self):
        raise NotImplementedError()

    def _setup_logging(self):
        if not any([logging.getLogger().handlers,
                    logging.getLogger('tornado').handlers,
                    logging.getLogger('tornado.application').handlers]):
            logging.basicConfig()

    def stop(self):
        raise NotImplementedError()

    def run_sync(self, func, timeout=None):
        future_cell = [None]

        def run():
            try:
                result = func()
                if result is not None:
                    from tornado.gen import convert_yielded
                    result = convert_yielded(result)
            except Exception:
                future_cell[0] = TracebackFuture()
                future_cell[0].set_exc_info(sys.exc_info())
            else:
                if is_future(result):
                    future_cell[0] = result
                else:
                    future_cell[0] = TracebackFuture()
                    future_cell[0].set_result(result)
            self.add_future(future_cell[0], lambda future: self.stop())
        self.add_callback(run)
        if timeout is not None:
            timeout_handle = self.add_timeout(self.time() + timeout, self.stop)
        self.start()
        if timeout is not None:
            self.remove_timeout(timeout_handle)
        if not future_cell[0].done():
            raise TimeoutError('Operation timed out after %s seconds' % timeout)
        return future_cell[0].result()

    def time(self):
        return time.time()
```
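The IOLoop class first declares constants for the epoll event flags. As mentioned above, we only need to care about EPOLLIN, EPOLLOUT and EPOLLERR among them (note that `IOLoop.ERROR` combines EPOLLERR with EPOLLHUP).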
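These constants are bit flags: they are combined with `|` and tested with `&`. Below is a small sketch using the values from the listing above (`READ`, `WRITE` and `ERROR` are copied from it). The helpers `register_mask` and `describe` are hypothetical. `register_mask` mirrors what `PollIOLoop.add_handler` does later in this article, where it registers `events | self.ERROR` so that errors are always watched.

```python
# Flag values copied from the IOLoop listing (they match Linux's <sys/epoll.h>).
EPOLLIN = 0x001
EPOLLOUT = 0x004
EPOLLERR = 0x008
EPOLLHUP = 0x010

READ = EPOLLIN
WRITE = EPOLLOUT
ERROR = EPOLLERR | EPOLLHUP


def register_mask(events):
    """Mask handed to the poller: errors are always watched, as in `events | self.ERROR`."""
    return events | ERROR


def describe(mask):
    """Decode an event mask back into names by testing each bit group with &."""
    names = []
    if mask & READ:
        names.append("READ")
    if mask & WRITE:
        names.append("WRITE")
    if mask & ERROR:
        names.append("ERROR")
    return names
```

Registering for `READ` therefore passes `0x001 | 0x008 | 0x010 == 0x19` to the poller, and a hang-up (`EPOLLHUP`) is reported through the `ERROR` group.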
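The class has many methods, which can be a little overwhelming. We only need to focus on the ones that implement IOLoop's core functionality, though; the rest are easy to follow once the core is clear. So from here on we concentrate on the core code.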
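`instance`, `initialized`, `install`, `clear_instance`, `current`, `make_current` and `clear_current`: there is no need to dwell on their details. For now, just remember what they manage. The first group maintains a global IOLoop singleton, and `instance()` uses a lock with a double check so that only one instance is ever created. The rest track a per-thread "current" IOLoop stored in a `threading.local`. Together they ensure that code everywhere ends up using the same IOLoop.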
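The core of these methods fits in a few lines. The sketch below copies the double-checked-locking `instance()` and the thread-local `current()` from the listing into a hypothetical class `SingletonLoop`. It leaves out the Configurable machinery, so unlike a real IOLoop, creating one does not automatically make it current.

```python
import threading


class SingletonLoop(object):
    """Hypothetical stand-in that copies IOLoop's instance()/current() bookkeeping."""

    _instance_lock = threading.Lock()
    _current = threading.local()  # each thread sees its own .instance attribute

    @staticmethod
    def instance():
        # Double-checked locking: the lock is only taken while no instance exists yet.
        if not hasattr(SingletonLoop, "_instance"):
            with SingletonLoop._instance_lock:
                if not hasattr(SingletonLoop, "_instance"):
                    SingletonLoop._instance = SingletonLoop()
        return SingletonLoop._instance

    @staticmethod
    def current(instance=True):
        # The calling thread's "current" loop, falling back to the global singleton.
        current = getattr(SingletonLoop._current, "instance", None)
        if current is None and instance:
            return SingletonLoop.instance()
        return current

    def make_current(self):
        SingletonLoop._current.instance = self

    @staticmethod
    def clear_current():
        SingletonLoop._current.instance = None
```

Because `_current` is a `threading.local`, calling `make_current()` in one thread has no effect on what `current(instance=False)` returns in another thread.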
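You are probably wondering why IOLoop has no `__init__`. The reason is that instance creation has been customized: IOLoop's `__new__` has been overridden and designates `initialize` as the initialization method, so no `__init__` is needed here. Yet there is no `__new__` anywhere in the ioloop code either. What is going on? Let's set that question aside for now.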
Next, let's look at this initialization method:
```python
def initialize(self, make_current=None):
    if make_current is None:
        # Become the current IOLoop only if this thread has none yet
        if IOLoop.current(instance=False) is None:
            self.make_current()
    elif make_current:
        # make_current=True demands that no current IOLoop exists yet
        if IOLoop.current(instance=False) is not None:
            raise RuntimeError("current IOLoop already exists")
        self.make_current()

def make_current(self):
    IOLoop._current.instance = self
```
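What? It merely checks whether a current IOLoop already exists and, depending on the `make_current` argument, calls `self.make_current()`. And `make_current()` only records this instance as the current one. So where did the actual initialization go?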
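Then look at `start()`, `stop()` and `close()`: these key methods all just raise `NotImplementedError`. None of them are implemented?! This is completely different from the source-code analyses you find online. The inheritance hierarchy explains it. The old `tornado.ioloop` inherited from `object` and implemented everything itself, whereas the current IOLoop inherits from `Configurable`. IOLoop has evidently become a base class that only defines the interface. So let's look at the `Configurable` code: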
```python
# Excerpt from tornado/util.py; unicode_type and import_object are defined
# elsewhere in that module.
class Configurable(object):
    __impl_class = None
    __impl_kwargs = None

    def __new__(cls, *args, **kwargs):
        base = cls.configurable_base()
        init_kwargs = {}
        if cls is base:
            impl = cls.configured_class()
            if base.__impl_kwargs:
                init_kwargs.update(base.__impl_kwargs)
        else:
            impl = cls
        init_kwargs.update(kwargs)
        instance = super(Configurable, cls).__new__(impl)
        # initialize vs __init__ chosen for compatibility with AsyncHTTPClient
        # singleton magic.  If we get rid of that we can switch to __init__
        # here too.
        instance.initialize(*args, **init_kwargs)
        return instance

    @classmethod
    def configurable_base(cls):
        """Returns the base class of a configurable hierarchy.

        This will normally return the class in which it is defined.
        (which is *not* necessarily the same as the cls classmethod parameter).
        """
        raise NotImplementedError()

    @classmethod
    def configurable_default(cls):
        """Returns the implementation class to be used if none is configured."""
        raise NotImplementedError()

    def initialize(self):
        """Initialize a `Configurable` subclass instance.

        Configurable classes should use `initialize` instead of ``__init__``.

        .. versionchanged:: 4.2
           Now accepts positional arguments in addition to keyword arguments.
        """

    @classmethod
    def configure(cls, impl, **kwargs):
        """Sets the class to use when the base class is instantiated.

        Keyword arguments will be saved and added to the arguments passed
        to the constructor.  This can be used to set global defaults for
        some parameters.
        """
        base = cls.configurable_base()
        if isinstance(impl, (unicode_type, bytes)):
            impl = import_object(impl)
        if impl is not None and not issubclass(impl, cls):
            raise ValueError("Invalid subclass of %s" % cls)
        base.__impl_class = impl
        base.__impl_kwargs = kwargs

    @classmethod
    def configured_class(cls):
        """Returns the currently configured class."""
        base = cls.configurable_base()
        if cls.__impl_class is None:
            base.__impl_class = cls.configurable_default()
        return base.__impl_class

    @classmethod
    def _save_configuration(cls):
        base = cls.configurable_base()
        return (base.__impl_class, base.__impl_kwargs)

    @classmethod
    def _restore_configuration(cls, saved):
        base = cls.configurable_base()
        base.__impl_class = saved[0]
        base.__impl_kwargs = saved[1]
```
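There is the `__new__` we were looking for! Note this line: `impl = cls.configured_class()`. When you instantiate IOLoop itself (`cls is base`), `impl` is the concrete implementation class, which on Linux is the epoll-based one. It comes from `configured_class()`, which contains `base.__impl_class = cls.configurable_default()`. In other words, it calls `configurable_default()` when no implementation has been configured yet. And `Configurable`'s own `configurable_default()` is: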
```python
@classmethod
def configurable_default(cls):
    """Returns the implementation class to be used if none is configured."""
    raise NotImplementedError()
```
That is clearly just an interface as well, so let's go back to ioloop's `configurable_default()`:
```python
@classmethod
def configurable_default(cls):
    if hasattr(select, "epoll"):
        from tornado.platform.epoll import EPollIOLoop
        return EPollIOLoop
    if hasattr(select, "kqueue"):
        # Python 2.6+ on BSD or Mac
        from tornado.platform.kqueue import KQueueIOLoop
        return KQueueIOLoop
    from tornado.platform.select import SelectIOLoop
    return SelectIOLoop
```
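So this is a factory method. It returns a different event-loop implementation depending on what the platform's `select` module supports: epoll on Linux, kqueue on BSD and macOS, and plain select everywhere else. kqueue is broadly equivalent to epoll; the two are different operating systems' implementations of the same idea.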
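The whole trick can be reproduced in a few dozen lines. Below is a stripped-down sketch of the pattern. It drops `configure()`, the saved `__impl_kwargs` and the `configured_class()` caching from the real `Configurable`. The classes `Loop`, `EPollLoop`, `KQueueLoop`, `SelectLoop` and the `backend` attribute are hypothetical. The sketch shows that instantiating the base class hands back a platform-specific subclass, and that `initialize()` runs in place of `__init__`.

```python
import select


class Configurable(object):
    """Stripped-down sketch of tornado.util.Configurable (no configure() support)."""

    def __new__(cls, *args, **kwargs):
        base = cls.configurable_base()
        # Instantiating the base class itself is redirected to the default implementation.
        impl = cls.configurable_default() if cls is base else cls
        instance = super(Configurable, cls).__new__(impl)
        # initialize() is called here in place of __init__
        instance.initialize(*args, **kwargs)
        return instance


class Loop(Configurable):
    @classmethod
    def configurable_base(cls):
        return Loop

    @classmethod
    def configurable_default(cls):
        # Factory: pick a backend according to what the select module offers
        if hasattr(select, "epoll"):
            return EPollLoop
        if hasattr(select, "kqueue"):
            return KQueueLoop
        return SelectLoop

    def initialize(self, backend, **kwargs):
        self.backend = backend
        self.options = kwargs


class EPollLoop(Loop):
    def initialize(self, **kwargs):
        super(EPollLoop, self).initialize(backend="epoll", **kwargs)


class KQueueLoop(Loop):
    def initialize(self, **kwargs):
        super(KQueueLoop, self).initialize(backend="kqueue", **kwargs)


class SelectLoop(Loop):
    def initialize(self, **kwargs):
        super(SelectLoop, self).initialize(backend="select", **kwargs)
```

On Linux, `Loop()` returns an `EPollLoop`, much as `IOLoop()` returns an `EPollIOLoop`. Instantiating a subclass directly, such as `SelectLoop(debug=True)`, skips the factory and uses that class as-is.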
The trail now leads to `tornado.platform.epoll.EPollIOLoop`, so let's look at `EPollIOLoop`:
```python
import select

from tornado.ioloop import PollIOLoop


class EPollIOLoop(PollIOLoop):
    def initialize(self, **kwargs):
        super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)
```
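`EPollIOLoop` inherits everything from `PollIOLoop` (note: `PollIOLoop`, not `IOLoop`) and merely passes an epoll object as `impl` during initialization. So on Linux, instantiating IOLoop actually gives us an EPollIOLoop whose behavior all comes from PollIOLoop. The code we really need to read and understand is therefore all here: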
```python
# Excerpt from tornado/ioloop.py; helpers such as _Timeout, _run_callback and
# split_fd are defined elsewhere in the module and are not shown here.
class PollIOLoop(IOLoop):
    """Base class for IOLoops built around a select-like function.

    For concrete implementations, see `tornado.platform.epoll.EPollIOLoop`
    (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or
    `tornado.platform.select.SelectIOLoop` (all platforms).
    """
    def initialize(self, impl, time_func=None, **kwargs):
        super(PollIOLoop, self).initialize(**kwargs)
        self._impl = impl
        if hasattr(self._impl, 'fileno'):
            set_close_exec(self._impl.fileno())
        self.time_func = time_func or time.time
        self._handlers = {}
        self._events = {}
        self._callbacks = []
        self._callback_lock = threading.Lock()
        self._timeouts = []
        self._cancellations = 0
        self._running = False
        self._stopped = False
        self._closing = False
        self._thread_ident = None
        self._blocking_signal_threshold = None
        self._timeout_counter = itertools.count()

        # Create a pipe that we send bogus data to when we want to wake
        # the I/O loop when it is idle
        self._waker = Waker()
        self.add_handler(self._waker.fileno(),
                         lambda fd, events: self._waker.consume(),
                         self.READ)

    def close(self, all_fds=False):
        with self._callback_lock:
            self._closing = True
        self.remove_handler(self._waker.fileno())
        if all_fds:
            for fd, handler in self._handlers.values():
                self.close_fd(fd)
        self._waker.close()
        self._impl.close()
        self._callbacks = None
        self._timeouts = None

    def add_handler(self, fd, handler, events):
        fd, obj = self.split_fd(fd)
        self._handlers[fd] = (obj, stack_context.wrap(handler))
        self._impl.register(fd, events | self.ERROR)

    def update_handler(self, fd, events):
        fd, obj = self.split_fd(fd)
        self._impl.modify(fd, events | self.ERROR)

    def remove_handler(self, fd):
        fd, obj = self.split_fd(fd)
        self._handlers.pop(fd, None)
        self._events.pop(fd, None)
        try:
            self._impl.unregister(fd)
        except Exception:
            gen_log.debug("Error deleting fd from IOLoop", exc_info=True)

    def set_blocking_signal_threshold(self, seconds, action):
        if not hasattr(signal, "setitimer"):
            gen_log.error("set_blocking_signal_threshold requires a signal module "
                          "with the setitimer method")
            return
        self._blocking_signal_threshold = seconds
        if seconds is not None:
            signal.signal(signal.SIGALRM,
                          action if action is not None else signal.SIG_DFL)

    def start(self):
        ...  # (the beginning of this method is omitted in this excerpt)
        try:
            while True:
                # Prevent IO event starvation by delaying new callbacks
                # to the next iteration of the event loop.
                with self._callback_lock:
                    callbacks = self._callbacks
                    self._callbacks = []

                # Add any timeouts that have come due to the callback list.
                # Do not run anything until we have determined which ones
                # are ready, so timeouts that call add_timeout cannot
                # schedule anything in this iteration.
                due_timeouts = []
                if self._timeouts:
                    now = self.time()
                    while self._timeouts:
                        if self._timeouts[0].callback is None:
                            # The timeout was cancelled.  Note that the
                            # cancellation check is repeated below for timeouts
                            # that are cancelled by another timeout or callback.
                            heapq.heappop(self._timeouts)
                            self._cancellations -= 1
                        elif self._timeouts[0].deadline <= now:
                            due_timeouts.append(heapq.heappop(self._timeouts))
                        else:
                            break
                    if (self._cancellations > 512 and
                            self._cancellations > (len(self._timeouts) >> 1)):
                        # Clean up the timeout queue when it gets large and it's
                        # more than half cancellations.
                        self._cancellations = 0
                        self._timeouts = [x for x in self._timeouts
                                          if x.callback is not None]
                        heapq.heapify(self._timeouts)

                for callback in callbacks:
                    self._run_callback(callback)
                for timeout in due_timeouts:
                    if timeout.callback is not None:
                        self._run_callback(timeout.callback)
                # Closures may be holding on to a lot of memory, so allow
                # them to be freed before we go into our poll wait.
                callbacks = callback = due_timeouts = timeout = None

                if self._callbacks:
                    # If any callbacks or timeouts called add_callback,
                    # we don't want to wait in poll() before we run them.
                    poll_timeout = 0.0
                elif self._timeouts:
                    # If there are any timeouts, schedule the first one.
                    # Use self.time() instead of 'now' to account for time
                    # spent running callbacks.
                    poll_timeout = self._timeouts[0].deadline - self.time()
                    poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
                else:
                    # No timeouts and no callbacks, so use the default.
                    poll_timeout = _POLL_TIMEOUT

                if not self._running:
                    break

                if self._blocking_signal_threshold is not None:
                    # clear alarm so it doesn't fire while poll is waiting for
                    # events.
                    signal.setitimer(signal.ITIMER_REAL, 0, 0)

                try:
                    event_pairs = self._impl.poll(poll_timeout)
                except Exception as e:
                    # Depending on python version and IOLoop implementation,
                    # different exception types may be thrown and there are
                    # two ways EINTR might be signaled:
                    # * e.errno == errno.EINTR
                    # * e.args is like (errno.EINTR, 'Interrupted system call')
                    if errno_from_exception(e) == errno.EINTR:
                        continue
                    else:
                        raise

                if self._blocking_signal_threshold is not None:
                    signal.setitimer(signal.ITIMER_REAL,
                                     self._blocking_signal_threshold, 0)

                # Pop one fd at a time from the set of pending fds and run
                # its handler. Since that handler may perform actions on
                # other file descriptors, there may be reentrant calls to
                # this IOLoop that update self._events
                self._events.update(event_pairs)
                while self._events:
                    fd, events = self._events.popitem()
                    try:
                        fd_obj, handler_func = self._handlers[fd]
                        handler_func(fd_obj, events)
                    except (OSError, IOError) as e:
                        if errno_from_exception(e) == errno.EPIPE:
                            # Happens when the client closes the connection
                            pass
                        else:
                            self.handle_callback_exception(self._handlers.get(fd))
                    except Exception:
                        self.handle_callback_exception(self._handlers.get(fd))
                    fd_obj = handler_func = None

        finally:
            # reset the stopped flag so another start/stop pair can be issued
            self._stopped = False
            if self._blocking_signal_threshold is not None:
                signal.setitimer(signal.ITIMER_REAL, 0, 0)
            IOLoop._current.instance = old_current
            if old_wakeup_fd is not None:
                signal.set_wakeup_fd(old_wakeup_fd)

    def stop(self):
        self._running = False
        self._stopped = True
        self._waker.wake()

    def time(self):
        return self.time_func()

    def call_at(self, deadline, callback, *args, **kwargs):
        timeout = _Timeout(
            deadline,
            functools.partial(stack_context.wrap(callback), *args, **kwargs),
            self)
        heapq.heappush(self._timeouts, timeout)
        return timeout

    def remove_timeout(self, timeout):
        # Removing from a heap is complicated, so just leave the defunct
        # timeout object in the queue (see discussion in
        # http://docs.python.org/library/heapq.html).
        # If this turns out to be a problem, we could add a garbage
        # collection pass whenever there are too many dead timeouts.
        timeout.callback = None
        self._cancellations += 1

    def add_callback(self, callback, *args, **kwargs):
        with self._callback_lock:
            if self._closing:
                raise RuntimeError("IOLoop is closing")
            list_empty = not self._callbacks
            self._callbacks.append(functools.partial(
                stack_context.wrap(callback), *args, **kwargs))
            if list_empty and thread.get_ident() != self._thread_ident:
                # If we're in the IOLoop's thread, we know it's not currently
                # polling.  If we're not, and we added the first callback to an
                # empty list, we may need to wake it up (it may wake up on its
                # own, but an occasional extra wake is harmless).  Waking
                # up a polling IOLoop is relatively expensive, so we try to
                # avoid it when we can.
                self._waker.wake()

    def add_callback_from_signal(self, callback, *args, **kwargs):
        with stack_context.NullContext():
            if thread.get_ident() != self._thread_ident:
                # if the signal is handled on another thread, we can add
                # it normally (modulo the NullContext)
                self.add_callback(callback, *args, **kwargs)
            else:
                # If we're on the IOLoop's thread, we cannot use
                # the regular add_callback because it may deadlock on
                # _callback_lock.  Blindly insert into self._callbacks.
                # This is safe because the GIL makes list.append atomic.
                # One subtlety is that if the signal interrupted the
                # _callback_lock block in IOLoop.start, we may modify
                # either the old or new version of self._callbacks,
                # but either way will work.
                self._callbacks.append(functools.partial(
                    stack_context.wrap(callback), *args, **kwargs))
```
Sure enough, PollIOLoop inherits from IOLoop and implements all of its interface methods. Now we can finally get to the heart of the matter.
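To make the shape of `PollIOLoop.start()` concrete before digging into it, here is a minimal sketch of the same loop structure. Each pass snapshots the queued callbacks, pops due timeouts off a min-heap (skipping cancelled ones, which are deleted lazily), computes how long the poll may block, then waits for I/O and dispatches each ready fd to its handler. This is not Tornado's code, and several parts are assumptions:

- `MiniLoop`, its `call_later` signature and `demo` are hypothetical.
- `selectors.DefaultSelector` stands in for the epoll `impl`.
- Heap entries are `[deadline, seq, callback]` lists instead of Tornado's `_Timeout` objects.
- `time.monotonic()` replaces `time.time`.
- There is no Waker, stack_context, signal handling or exception isolation.

```python
import functools
import heapq
import itertools
import selectors
import socket
import time


class MiniLoop(object):
    """Hypothetical, heavily simplified model of PollIOLoop.start()."""

    def __init__(self):
        self._impl = selectors.DefaultSelector()  # plays the role of select.epoll()
        self._handlers = {}
        self._callbacks = []
        self._timeouts = []  # min-heap of [deadline, seq, callback] lists
        self._seq = itertools.count()  # tie-breaker so callbacks are never compared
        self._running = False

    def add_handler(self, fd, handler, events):
        self._handlers[fd] = handler
        self._impl.register(fd, events)

    def remove_handler(self, fd):
        self._handlers.pop(fd, None)
        self._impl.unregister(fd)

    def add_callback(self, callback, *args):
        self._callbacks.append(functools.partial(callback, *args))

    def call_later(self, delay, callback):
        timeout = [time.monotonic() + delay, next(self._seq), callback]
        heapq.heappush(self._timeouts, timeout)
        return timeout

    def remove_timeout(self, timeout):
        timeout[2] = None  # lazy deletion: left in the heap, skipped by start()

    def stop(self):
        self._running = False

    def start(self):
        self._running = True
        while True:
            # 1. Snapshot queued callbacks; ones added from now on wait a pass.
            callbacks, self._callbacks = self._callbacks, []
            # 2. Pop cancelled and due timeouts off the top of the heap.
            due, now = [], time.monotonic()
            while self._timeouts:
                if self._timeouts[0][2] is None:
                    heapq.heappop(self._timeouts)
                elif self._timeouts[0][0] <= now:
                    due.append(heapq.heappop(self._timeouts))
                else:
                    break
            for callback in callbacks:
                callback()
            for timeout in due:
                if timeout[2] is not None:  # may be cancelled by an earlier callback
                    timeout[2]()
            # 3. Decide how long the poll may block.
            if self._callbacks:
                poll_timeout = 0.0
            elif self._timeouts:
                delay = self._timeouts[0][0] - time.monotonic()
                poll_timeout = max(0.0, min(delay, 3600.0))
            else:
                poll_timeout = 3600.0
            if not self._running:
                break
            # 4. Wait for I/O, then hand each ready fd to its handler.
            for key, events in self._impl.select(poll_timeout):
                handler = self._handlers.get(key.fd)
                if handler is not None:
                    handler(key.fd, events)


def demo():
    """One callback, one cancelled timer, one timer that writes, then a read."""
    log, loop = [], MiniLoop()
    r, w = socket.socketpair()

    def on_readable(fd, events):
        log.append(("read", r.recv(1024)))
        loop.remove_handler(fd)
        loop.stop()

    loop.add_handler(r.fileno(), on_readable, selectors.EVENT_READ)
    loop.remove_timeout(loop.call_later(0.01, lambda: log.append("never")))
    loop.call_later(0.02, lambda: w.sendall(b"ping"))
    loop.add_callback(log.append, "callback")
    loop.start()
    for closable in (loop._impl, r, w):
        closable.close()
    return log
```

In `demo()`, the queued callback runs on the first pass, and the cancelled timer is discarded without ever firing. The poll then blocks only until the next timer is due; that timer writes to the socket, and the following poll hands the readable fd to `on_readable`, which stops the loop. Because `stop()` only clears `_running`, the loop exits at the next check. This is also why the real `stop()` additionally calls `self._waker.wake()`: without it, a loop that is blocked in `poll()` would not notice the request until its timeout expired.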