最近打算學習 tornado 的源碼,因此就創建一個系列主題 「深刻理解 tornado」。 在此記錄學習經歷及我的看法與你們分享。文中必定會出現理解不到位或理解錯誤的地方,還請你們多多指教 html
進入正題:python
tornado 優秀的大併發處理能力得益於它的 web server 從底層開始就本身實現了一整套基於 epoll 的單線程異步架構(其餘 python web 框架的自帶 server 基本是基於 wsgi 寫的簡單服務器,並無本身實現底層結構。 關於 wsgi 詳見以前的文章: 本身寫一個 wsgi 服務器運行 Django 、Tornado 應用)。 那麼 tornado.ioloop
就是 tornado web server 最底層的實現。linux
看 ioloop 以前,咱們須要瞭解一些預備知識,有助於咱們理解 ioloop。git
ioloop 的實現基於 epoll ,那麼什麼是 epoll? epoll 是Linux內核爲處理大批量文件描述符而做了改進的 poll 。
那麼什麼又是 poll ? 首先,咱們回顧一下, socket 通訊時的服務端,當它接受( accept )一個鏈接並創建通訊後( connection )就進行通訊,而此時咱們並不知道鏈接的客戶端有沒有信息發完。 這時候咱們有兩種選擇:github
一直在這裏等着直到收發數據結束;web
每隔必定時間來看看這裏有沒有數據;api
第二種辦法要比第一種好一些,多個鏈接能夠統一在必定時間內輪流看一遍裏面有沒有數據要讀寫,看上去咱們能夠處理多個鏈接了,這個方式就是 poll / select 的解決方案。 看起來彷佛解決了問題,但實際上,隨着鏈接愈來愈多,輪詢所花費的時間將愈來愈長,而服務器鏈接的 socket 大多不是活躍的,因此輪詢所花費的大部分時間將是無用的。爲了解決這個問題, epoll 被創造出來,它的概念和 poll 相似,不過每次輪詢時,他只會把有數據活躍的 socket 挑出來輪詢,這樣在有大量鏈接時輪詢就節省了大量時間。服務器
對於 epoll 的操做,其實也很簡單,只要 4 個 API 就能夠徹底操做它。架構
用來建立一個 epoll 描述符( 就是建立了一個 epoll )併發
操做 epoll 中的 event;可用參數有:
參數 | 含義 |
---|---|
EPOLL_CTL_ADD | 添加一個新的epoll事件 |
EPOLL_CTL_DEL | 刪除一個epoll事件 |
EPOLL_CTL_MOD | 改變一個事件的監聽方式 |
而事件的監聽方式有七種,而咱們只須要關心其中的三種:
宏定義 | 含義 |
---|---|
EPOLLIN | 緩衝區滿,有數據可讀 |
EPOLLOUT | 緩衝區空,可寫數據 |
EPOLLERR | 發生錯誤 |
就是讓 epoll 開始工做,裏面有個參數 timeout,當設置爲非 0 正整數時,會監聽(阻塞) timeout 秒;設置爲 0 時當即返回,設置爲 -1 時一直監聽。
在監聽時有數據活躍的鏈接時其返回活躍的文件句柄列表(此處爲 socket 文件句柄)。
關閉 epoll
如今瞭解了 epoll 後,咱們就能夠來看 ioloop 了 (若是對 epoll 還有疑問能夠看這兩篇資料: epoll 的原理是什麼、百度百科:epoll)
不少初學者必定好奇 tornado 運行服務器最後那一句 tornado.ioloop.IOLoop.current().start()
究竟是幹什麼的。 咱們先不解釋做用,來看看這一句代碼背後到底都在幹什麼。
先貼 ioloop 代碼:
from __future__ import absolute_import, division, print_function, with_statement import datetime import errno import functools import heapq # 最小堆 import itertools import logging import numbers import os import select import sys import threading import time import traceback import math from tornado.concurrent import TracebackFuture, is_future from tornado.log import app_log, gen_log from tornado.platform.auto import set_close_exec, Waker from tornado import stack_context from tornado.util import PY3, Configurable, errno_from_exception, timedelta_to_seconds try: import signal except ImportError: signal = None if PY3: import _thread as thread else: import thread _POLL_TIMEOUT = 3600.0 class TimeoutError(Exception): pass class IOLoop(Configurable): _EPOLLIN = 0x001 _EPOLLPRI = 0x002 _EPOLLOUT = 0x004 _EPOLLERR = 0x008 _EPOLLHUP = 0x010 _EPOLLRDHUP = 0x2000 _EPOLLONESHOT = (1 << 30) _EPOLLET = (1 << 31) # Our events map exactly to the epoll events NONE = 0 READ = _EPOLLIN WRITE = _EPOLLOUT ERROR = _EPOLLERR | _EPOLLHUP # Global lock for creating global IOLoop instance _instance_lock = threading.Lock() _current = threading.local() @staticmethod def instance(): if not hasattr(IOLoop, "_instance"): with IOLoop._instance_lock: if not hasattr(IOLoop, "_instance"): # New instance after double check IOLoop._instance = IOLoop() return IOLoop._instance @staticmethod def initialized(): """Returns true if the singleton instance has been created.""" return hasattr(IOLoop, "_instance") def install(self): assert not IOLoop.initialized() IOLoop._instance = self @staticmethod def clear_instance(): """Clear the global `IOLoop` instance. .. versionadded:: 4.0 """ if hasattr(IOLoop, "_instance"): del IOLoop._instance @staticmethod def current(instance=True): current = getattr(IOLoop._current, "instance", None) if current is None and instance: return IOLoop.instance() return current def make_current(self): IOLoop._current.instance = self @staticmethod def clear_current(): IOLoop._current.instance = None @classmethod def configurable_base(cls): return IOLoop @classmethod def configurable_default(cls): if hasattr(select, "epoll"): from tornado.platform.epoll import EPollIOLoop return EPollIOLoop if hasattr(select, "kqueue"): # Python 2.6+ on BSD or Mac from tornado.platform.kqueue import KQueueIOLoop return KQueueIOLoop from tornado.platform.select import SelectIOLoop return SelectIOLoop def initialize(self, make_current=None): if make_current is None: if IOLoop.current(instance=False) is None: self.make_current() elif make_current: if IOLoop.current(instance=False) is not None: raise RuntimeError("current IOLoop already exists") self.make_current() def close(self, all_fds=False): raise NotImplementedError() def add_handler(self, fd, handler, events): raise NotImplementedError() def update_handler(self, fd, events): raise NotImplementedError() def remove_handler(self, fd): raise NotImplementedError() def set_blocking_signal_threshold(self, seconds, action): raise NotImplementedError() def set_blocking_log_threshold(self, seconds): self.set_blocking_signal_threshold(seconds, self.log_stack) def log_stack(self, signal, frame): gen_log.warning('IOLoop blocked for %f seconds in\n%s', self._blocking_signal_threshold, ''.join(traceback.format_stack(frame))) def start(self): raise NotImplementedError() def _setup_logging(self): if not any([logging.getLogger().handlers, logging.getLogger('tornado').handlers, logging.getLogger('tornado.application').handlers]): logging.basicConfig() def stop(self): raise NotImplementedError() def run_sync(self, func, timeout=None): future_cell = [None] def run(): try: result = func() if result is not None: from tornado.gen import convert_yielded result = convert_yielded(result) except Exception: future_cell[0] = TracebackFuture() future_cell[0].set_exc_info(sys.exc_info()) else: if is_future(result): future_cell[0] = result else: future_cell[0] = TracebackFuture() future_cell[0].set_result(result) self.add_future(future_cell[0], lambda future: self.stop()) self.add_callback(run) if timeout is not None: timeout_handle = self.add_timeout(self.time() + timeout, self.stop) self.start() if timeout is not None: self.remove_timeout(timeout_handle) if not future_cell[0].done(): raise TimeoutError('Operation timed out after %s seconds' % timeout) return future_cell[0].result() def time(self): return time.time() ...
IOLoop 類首先聲明瞭 epoll 監聽事件的宏定義,固然,如前文所說,咱們只要關心其中的 EPOLLIN 、 EPOLLOUT 、 EPOLLERR 就行。
類中的方法有不少,看起來有點暈,但其實咱們只要關心 IOLoop 核心功能的方法便可,其餘的方法在明白核心功能後也就不難理解了。因此接下來咱們着重分析核心代碼。
instance
、 initialized
、 install
、 clear_instance
、 current
、 make_current
、 clear_current
這些方法不用在乎細節,總之如今記住它們都是爲了讓 IOLoop 類變成一個單例,保證從全局上調用的都是同一個 IOLoop 就好。
你必定疑惑 IOLoop 爲什麼沒有 __init__
, 實際上是由於要初始化成爲單例,IOLoop 的 new 函數已經被改寫了,同時指定了 initialize
作爲它的初始化方法,因此此處沒有 __init__
。 說到這,ioloop 的代碼裏好像沒有看到 new
方法,這又是什麼狀況? 咱們先暫時記住這裏。
接着咱們來看這個初始化方法:
def initialize(self, make_current=None): if make_current is None: if IOLoop.current(instance=False) is None: self.make_current() elif make_current: if IOLoop.current(instance=False) is None: raise RuntimeError("current IOLoop already exists") self.make_current() def make_current(self): IOLoop._current.instance = self
what? 裏面只是判斷了是否第一次初始化或者調用 self.make_current()
初始化,而 make_current()
裏也僅僅是把實例指定爲本身,那麼初始化到底去哪了?
而後再看看 start()
、 run()
、 close()
這些關鍵的方法都成了返回 NotImplementedError
錯誤,所有未定義?!跟網上搜到的源碼分析徹底不同啊。 這時候看下 IOLoop 的繼承關係,原來問題出在這裏,以前的 tornado.ioloop 繼承自 object 因此全部的一切都本身實現,而如今版本的 tornado.ioloop 則繼承自 Configurable
看起來如今的 IOLoop 已經成爲了一個基類,只定義了接口。 因此接着看 Configurable
代碼:
class Configurable(object): __impl_class = None __impl_kwargs = None def __new__(cls, *args, **kwargs): base = cls.configurable_base() init_kwargs = {} if cls is base: impl = cls.configured_class() if base.__impl_kwargs: init_kwargs.update(base.__impl_kwargs) else: impl = cls init_kwargs.update(kwargs) instance = super(Configurable, cls).__new__(impl) # initialize vs __init__ chosen for compatibility with AsyncHTTPClient # singleton magic. If we get rid of that we can switch to __init__ # here too. instance.initialize(*args, **init_kwargs) return instance @classmethod def configurable_base(cls): """Returns the base class of a configurable hierarchy. This will normally return the class in which it is defined. (which is *not* necessarily the same as the cls classmethod parameter). """ raise NotImplementedError() @classmethod def configurable_default(cls): """Returns the implementation class to be used if none is configured.""" raise NotImplementedError() def initialize(self): """Initialize a `Configurable` subclass instance. Configurable classes should use `initialize` instead of ``__init__``. .. versionchanged:: 4.2 Now accepts positional arguments in addition to keyword arguments. """ @classmethod def configure(cls, impl, **kwargs): """Sets the class to use when the base class is instantiated. Keyword arguments will be saved and added to the arguments passed to the constructor. This can be used to set global defaults for some parameters. """ base = cls.configurable_base() if isinstance(impl, (unicode_type, bytes)): impl = import_object(impl) if impl is not None and not issubclass(impl, cls): raise ValueError("Invalid subclass of %s" % cls) base.__impl_class = impl base.__impl_kwargs = kwargs @classmethod def configured_class(cls): """Returns the currently configured class.""" base = cls.configurable_base() if cls.__impl_class is None: base.__impl_class = cls.configurable_default() return base.__impl_class @classmethod def _save_configuration(cls): base = cls.configurable_base() return (base.__impl_class, base.__impl_kwargs) @classmethod def _restore_configuration(cls, saved): base = cls.configurable_base() base.__impl_class = saved[0] base.__impl_kwargs = saved[1]
以前咱們尋找的 __new__
出現了! 注意其中這句: impl = cls.configured_class()
impl 在這裏就是 epoll ,它的生成函數是 configured_class()
, 而其方法裏又有 base.__impl_class = cls.configurable_default()
,調用了 configurable_default()
。而 Configurable
的 configurable_default()
:
def configurable_default(cls): """Returns the implementation class to be used if none is configured.""" raise NotImplementedError()
顯然也是個接口,那麼咱們再回頭看 ioloop 的 configurable_default()
:
def configurable_default(cls): if hasattr(select, "epoll"): from tornado.platform.epoll import EPollIOLoop return EPollIOLoop if hasattr(select, "kqueue"): # Python 2.6+ on BSD or Mac from tornado.platform.kqueue import KQueueIOLoop return KQueueIOLoop from tornado.platform.select import SelectIOLoop return SelectIOLoop
原來這是個工廠函數,根據不一樣的操做系統返回不一樣的事件池(linux 就是 epoll, mac 返回 kqueue,其餘就返回普通的 select。 kqueue 基本等同於 epoll, 只是不一樣系統對其的不一樣實現)
如今線索轉移到了 tornado.platform.epoll.EPollIOLoop
上,咱們再來看看 EPollIOLoop
:
import select from tornado.ioloop import PollIOLoop class EPollIOLoop(PollIOLoop): def initialize(self, **kwargs): super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)
EPollIOLoop
徹底繼承自 PollIOLoop
(注意這裏是 PollIOLoop 不是 IOLoop)並只是在初始化時指定了 impl 是 epoll,因此看起來咱們用 IOLoop 初始化最後初始化的其實就是這個 PollIOLoop,因此接下來,咱們真正須要理解和閱讀的內容應該都在這裏:
class PollIOLoop(IOLoop): """Base class for IOLoops built around a select-like function. For concrete implementations, see `tornado.platform.epoll.EPollIOLoop` (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or `tornado.platform.select.SelectIOLoop` (all platforms). """ def initialize(self, impl, time_func=None, **kwargs): super(PollIOLoop, self).initialize(**kwargs) self._impl = impl if hasattr(self._impl, 'fileno'): set_close_exec(self._impl.fileno()) self.time_func = time_func or time.time self._handlers = {} self._events = {} self._callbacks = [] self._callback_lock = threading.Lock() self._timeouts = [] self._cancellations = 0 self._running = False self._stopped = False self._closing = False self._thread_ident = None self._blocking_signal_threshold = None self._timeout_counter = itertools.count() # Create a pipe that we send bogus data to when we want to wake # the I/O loop when it is idle self._waker = Waker() self.add_handler(self._waker.fileno(), lambda fd, events: self._waker.consume(), self.READ) def close(self, all_fds=False): with self._callback_lock: self._closing = True self.remove_handler(self._waker.fileno()) if all_fds: for fd, handler in self._handlers.values(): self.close_fd(fd) self._waker.close() self._impl.close() self._callbacks = None self._timeouts = None def add_handler(self, fd, handler, events): fd, obj = self.split_fd(fd) self._handlers[fd] = (obj, stack_context.wrap(handler)) self._impl.register(fd, events | self.ERROR) def update_handler(self, fd, events): fd, obj = self.split_fd(fd) self._impl.modify(fd, events | self.ERROR) def remove_handler(self, fd): fd, obj = self.split_fd(fd) self._handlers.pop(fd, None) self._events.pop(fd, None) try: self._impl.unregister(fd) except Exception: gen_log.debug("Error deleting fd from IOLoop", exc_info=True) def set_blocking_signal_threshold(self, seconds, action): if not hasattr(signal, "setitimer"): gen_log.error("set_blocking_signal_threshold requires a signal module " "with the setitimer method") return self._blocking_signal_threshold = seconds if seconds is not None: signal.signal(signal.SIGALRM, action if action is not None else signal.SIG_DFL) def start(self): ... try: while True: # Prevent IO event starvation by delaying new callbacks # to the next iteration of the event loop. with self._callback_lock: callbacks = self._callbacks self._callbacks = [] # Add any timeouts that have come due to the callback list. # Do not run anything until we have determined which ones # are ready, so timeouts that call add_timeout cannot # schedule anything in this iteration. due_timeouts = [] if self._timeouts: now = self.time() while self._timeouts: if self._timeouts[0].callback is None: # The timeout was cancelled. Note that the # cancellation check is repeated below for timeouts # that are cancelled by another timeout or callback. heapq.heappop(self._timeouts) self._cancellations -= 1 elif self._timeouts[0].deadline <= now: due_timeouts.append(heapq.heappop(self._timeouts)) else: break if (self._cancellations > 512 and self._cancellations > (len(self._timeouts) >> 1)): # Clean up the timeout queue when it gets large and it's # more than half cancellations. self._cancellations = 0 self._timeouts = [x for x in self._timeouts if x.callback is not None] heapq.heapify(self._timeouts) for callback in callbacks: self._run_callback(callback) for timeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) # Closures may be holding on to a lot of memory, so allow # them to be freed before we go into our poll wait. callbacks = callback = due_timeouts = timeout = None if self._callbacks: # If any callbacks or timeouts called add_callback, # we don't want to wait in poll() before we run them. poll_timeout = 0.0 elif self._timeouts: # If there are any timeouts, schedule the first one. # Use self.time() instead of 'now' to account for time # spent running callbacks. poll_timeout = self._timeouts[0].deadline - self.time() poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) else: # No timeouts and no callbacks, so use the default. poll_timeout = _POLL_TIMEOUT if not self._running: break if self._blocking_signal_threshold is not None: # clear alarm so it doesn't fire while poll is waiting for # events. signal.setitimer(signal.ITIMER_REAL, 0, 0) try: event_pairs = self._impl.poll(poll_timeout) except Exception as e: # Depending on python version and IOLoop implementation, # different exception types may be thrown and there are # two ways EINTR might be signaled: # * e.errno == errno.EINTR # * e.args is like (errno.EINTR, 'Interrupted system call') if errno_from_exception(e) == errno.EINTR: continue else: raise if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, self._blocking_signal_threshold, 0) # Pop one fd at a time from the set of pending fds and run # its handler. Since that handler may perform actions on # other file descriptors, there may be reentrant calls to # this IOLoop that update self._events self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() try: fd_obj, handler_func = self._handlers[fd] handler_func(fd_obj, events) except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: # Happens when the client closes the connection pass else: self.handle_callback_exception(self._handlers.get(fd)) except Exception: self.handle_callback_exception(self._handlers.get(fd)) fd_obj = handler_func = None finally: # reset the stopped flag so another start/stop pair can be issued self._stopped = False if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) IOLoop._current.instance = old_current if old_wakeup_fd is not None: signal.set_wakeup_fd(old_wakeup_fd) def stop(self): self._running = False self._stopped = True self._waker.wake() def time(self): return self.time_func() def call_at(self, deadline, callback, *args, **kwargs): timeout = _Timeout( deadline, functools.partial(stack_context.wrap(callback), *args, **kwargs), self) heapq.heappush(self._timeouts, timeout) return timeout def remove_timeout(self, timeout): # Removing from a heap is complicated, so just leave the defunct # timeout object in the queue (see discussion in # http://docs.python.org/library/heapq.html). # If this turns out to be a problem, we could add a garbage # collection pass whenever there are too many dead timeouts. timeout.callback = None self._cancellations += 1 def add_callback(self, callback, *args, **kwargs): with self._callback_lock: if self._closing: raise RuntimeError("IOLoop is closing") list_empty = not self._callbacks self._callbacks.append(functools.partial( stack_context.wrap(callback), *args, **kwargs)) if list_empty and thread.get_ident() != self._thread_ident: # If we're in the IOLoop's thread, we know it's not currently # polling. If we're not, and we added the first callback to an # empty list, we may need to wake it up (it may wake up on its # own, but an occasional extra wake is harmless). Waking # up a polling IOLoop is relatively expensive, so we try to # avoid it when we can. self._waker.wake() def add_callback_from_signal(self, callback, *args, **kwargs): with stack_context.NullContext(): if thread.get_ident() != self._thread_ident: # if the signal is handled on another thread, we can add # it normally (modulo the NullContext) self.add_callback(callback, *args, **kwargs) else: # If we're on the IOLoop's thread, we cannot use # the regular add_callback because it may deadlock on # _callback_lock. Blindly insert into self._callbacks. # This is safe because the GIL makes list.append atomic. # One subtlety is that if the signal interrupted the # _callback_lock block in IOLoop.start, we may modify # either the old or new version of self._callbacks, # but either way will work. self._callbacks.append(functools.partial( stack_context.wrap(callback), *args, **kwargs))
果真, PollIOLoop 繼承自 IOLoop 並實現了它的全部接口,如今咱們終於能夠進入真正的正題了
首先要看的是關於 epoll 操做的方法,還記得前文說過的 epoll 只須要四個 api 就能徹底操做嘛? 咱們來看 PollIOLoop 的實現:
def add_handler(self, fd, handler, events): fd, obj = self.split_fd(fd) self._handlers[fd] = (obj, stack_context.wrap(handler)) self._impl.register(fd, events | self.ERROR) def update_handler(self, fd, events): fd, obj = self.split_fd(fd) self._impl.modify(fd, events | self.ERROR) def remove_handler(self, fd): fd, obj = self.split_fd(fd) self._handlers.pop(fd, None) self._events.pop(fd, None) try: self._impl.unregister(fd) except Exception: gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
epoll_ctl:這個三個方法分別對應 epoll_ctl 中的 add 、 modify 、 del 參數。 因此這三個方法實現了 epoll 的 epoll_ctl 。
epoll_create:而後 epoll 的生成在前文 EPollIOLoop 的初始化中就已經完成了:super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)
。 這個至關於 epoll_create 。
epoll_wait:epoll_wait 操做則在 start()
中:event_pairs = self._impl.poll(poll_timeout)
epoll_close:而 epoll 的 close 則在 PollIOLoop 中的 close
方法內調用: self._impl.close()
完成。
接下來看 PollIOLoop 的初始化方法中做了什麼:
def initialize(self, impl, time_func=None, **kwargs): super(PollIOLoop, self).initialize(**kwargs) self._impl = impl # 指定 epoll if hasattr(self._impl, 'fileno'): set_close_exec(self._impl.fileno()) # fork 後關閉無用文件描述符 self.time_func = time_func or time.time # 指定獲取當前時間的函數 self._handlers = {} # handler 的字典,儲存被 epoll 監聽的 handler,與打開它的文件描述符 ( file descriptor 簡稱 fd ) 一一對應 self._events = {} # event 的字典,儲存 epoll 返回的活躍的 fd event pairs self._callbacks = [] # 儲存各個 fd 回調函數的列表 self._callback_lock = threading.Lock() # 指定進程鎖 self._timeouts = [] # 將是一個最小堆結構,按照超時時間從小到大排列的 fd 的任務堆( 一般這個任務都會包含一個 callback ) self._cancellations = 0 # 關於 timeout 的計數器 self._running = False # ioloop 是否在運行 self._stopped = False # ioloop 是否中止 self._closing = False # ioloop 是否關閉 self._thread_ident = None # 當前線程堆標識符 ( thread identify ) self._blocking_signal_threshold = None # 系統信號, 主要用來在 epoll_wait 時判斷是否會有 signal alarm 打斷 epoll self._timeout_counter = itertools.count() # 超時計數器 ( 暫時不是很明白具體做用,好像和前面的 _cancellations 有關係? 請大神講講) self._waker = Waker() # 一個 waker 類,主要是對於管道 pipe 的操做,由於 ioloop 屬於底層的數據操做,這裏 epoll 監聽的是 pipe self.add_handler(self._waker.fileno(), lambda fd, events: self._waker.consume(), self.READ) # 將管道加入 epoll 監聽,對於 web server 初始化時只須要關心 READ 事件
除了註釋中的解釋,還有幾點補充:
close_exec 的做用: 子進程在fork出來的時候,使用了寫時複製(COW,Copy-On-Write)方式得到父進程的數據空間、 堆和棧副本,這其中也包括文件描述符。剛剛fork成功時,父子進程中相同的文件描述符指向系統文件表中的同一項,接着,通常咱們會調用exec執行另外一個程序,此時會用全新的程序替換子進程的正文,數據,堆和棧等。此時保存文件描述符的變量固然也不存在了,咱們就沒法關閉無用的文件描述符了。因此一般咱們會fork子進程後在子進程中直接執行close關掉無用的文件描述符,而後再執行exec。 因此 close_exec 執行的其實就是 關閉 + 執行的做用。 詳情能夠查看: 關於linux進程間的close-on-exec機制
Waker(): Waker 封裝了對於管道 pipe 的操做:
def set_close_exec(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFD) fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC) def _set_nonblocking(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) class Waker(interface.Waker): def __init__(self): r, w = os.pipe() _set_nonblocking(r) _set_nonblocking(w) set_close_exec(r) set_close_exec(w) self.reader = os.fdopen(r, "rb", 0) self.writer = os.fdopen(w, "wb", 0) def fileno(self): return self.reader.fileno() def write_fileno(self): return self.writer.fileno() def wake(self): try: self.writer.write(b"x") except IOError: pass def consume(self): try: while True: result = self.reader.read() if not result: break except IOError: pass def close(self): self.reader.close() self.writer.close()
能夠看到 waker 把 pipe 分爲讀、 寫兩個管道並都設置了非阻塞和 close_exec
。 注意wake(self)
方法中:self.writer.write(b"x")
直接向管道中寫入隨意字符從而釋放管道。
ioloop 最核心的部分:
def start(self): if self._running: # 判斷是否已經運行 raise RuntimeError("IOLoop is already running") self._setup_logging() if self._stopped: self._stopped = False # 設置中止爲假 return old_current = getattr(IOLoop._current, "instance", None) IOLoop._current.instance = self self._thread_ident = thread.get_ident() # 得到當前線程標識符 self._running = True # 設置運行 old_wakeup_fd = None if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix': try: old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno()) if old_wakeup_fd != -1: signal.set_wakeup_fd(old_wakeup_fd) old_wakeup_fd = None except ValueError: old_wakeup_fd = None try: while True: # 服務器進程正式開始,相似於其餘服務器的 serve_forever with self._callback_lock: # 加鎖,_callbacks 作爲臨界區不加鎖進行讀寫會產生髒數據 callbacks = self._callbacks # 讀取 _callbacks self._callbacks = []. # 清空 _callbacks due_timeouts = [] # 用於存放這個週期內已過時( 已超時 )的任務 if self._timeouts: # 判斷 _timeouts 裏是否有數據 now = self.time() # 獲取當前時間,用來判斷 _timeouts 裏的任務有沒有超時 while self._timeouts: # _timeouts 有數據時一直循環, _timeouts 是個最小堆,第一個數據永遠是最小的, 這裏第一個數據永遠是最接近超時或已超時的 if self._timeouts[0].callback is None: # 超時任務無回調 heapq.heappop(self._timeouts) # 直接彈出 self._cancellations -= 1 # 超時計數器 -1 elif self._timeouts[0].deadline <= now: # 判斷最小的數據是否超時 due_timeouts.append(heapq.heappop(self._timeouts)) # 超時就加到已超時列表裏。 else: break # 由於最小堆,若是沒超時就直接退出循環( 後面的數據一定未超時 ) if (self._cancellations > 512 and self._cancellations > (len(self._timeouts) >> 1)): # 當超時計數器大於 512 而且 大於 _timeouts 長度一半( >> 爲右移運算, 至關於十進制數據被除 2 )時,清零計數器,並剔除 _timeouts 中無 callbacks 的任務 self._cancellations = 0 self._timeouts = [x for x in self._timeouts if x.callback is not None] heapq.heapify(self._timeouts) # 進行 _timeouts 最小堆化 for callback in callbacks: self._run_callback(callback) # 運行 callbacks 裏全部的 calllback for timeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) # 運行全部已過時任務的 callback callbacks = callback = due_timeouts = timeout = None # 釋放內存 if self._callbacks: # _callbacks 裏有數據時 poll_timeout = 0.0 # 設置 epoll_wait 時間爲0( 當即返回 ) elif self._timeouts: # _timeouts 裏有數據時 poll_timeout = self._timeouts[0].deadline - self.time() # 取最小過時時間當 epoll_wait 等待時間,這樣當第一個任務過時時當即返回 poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) # 若是最小過時時間大於默認等待時間 _POLL_TIMEOUT = 3600,則用 3600,若是最小過時時間小於0 就設置爲0 當即返回。 else: poll_timeout = _POLL_TIMEOUT # 默認 3600 s 等待時間 if not self._running: # 檢查是否有系統信號中斷運行,有則中斷,無則繼續 break if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) # 開始 epoll_wait 以前確保 signal alarm 都被清空( 這樣在 epoll_wait 過程當中不會被 signal alarm 打斷 ) try: event_pairs = self._impl.poll(poll_timeout) # 獲取返回的活躍事件隊 except Exception as e: if errno_from_exception(e) == errno.EINTR: continue else: raise if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, self._blocking_signal_threshold, 0) # epoll_wait 結束, 再設置 signal alarm self._events.update(event_pairs) # 將活躍事件加入 _events while self._events: fd, events = self._events.popitem() # 循環彈出事件 try: fd_obj, handler_func = self._handlers[fd] # 處理事件 handler_func(fd_obj, events) except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: pass else: self.handle_callback_exception(self._handlers.get(fd)) except Exception: self.handle_callback_exception(self._handlers.get(fd)) fd_obj = handler_func = None finally: self._stopped = False # 確保發生異常也繼續運行 if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) # 清空 signal alarm IOLoop._current.instance = old_current if old_wakeup_fd is not None: signal.set_wakeup_fd(old_wakeup_fd) # 和 start 開頭部分對應,可是不是很清楚做用,求老司機帶帶路
def stop(self): self._running = False self._stopped = True self._waker.wake()
這個很簡單,設置判斷條件,而後調用 self._waker.wake()
向 pipe 寫入隨意字符喚醒 ioloop 事件循環。 over!
噗,寫了這麼長,終於寫完了。 通過分析,咱們能夠看到, ioloop 其實是對 epoll 的封裝,並加入了一些對上層事件的處理和 server 相關的底層處理。
最後,感謝你們任勞任怨看到這,文中理解有誤的地方還請多多指教!