深刻tornado中的ioLoop

本文所剖析的tornado源碼版本爲4.4.2html

ioloop是tornado的關鍵,是他的最底層。git

ioloop就是對I/O多路複用的封裝,它實現了一個單例,將這個單例保存在IOLoop._instance中github

ioloop實現了Reactor模型,將全部要處理的I/O事件註冊到一箇中心I/O多路複用器上,同時主線程/進程阻塞在多路複用器上;一旦有I/O事件到來或是準備就緒(文件描述符或socket可讀、寫),多路複用器返回並將事先註冊的相應I/O事件分發到對應的處理器中。app

另外,ioloop還被用來集中運行回調函數以及集中處理定時任務。socket

一 準備知識:

  1 首先咱們要了解Reactor模型ide

  2 其次,咱們要了解I/O多路複用,因爲本文假設系統爲Linux,因此要了解epoll以及Python中的select模塊函數

  3 IOLoop類是Configurable類的子類,而Configurable類是一個工廠類,講解在這tornado

二  建立IOLoop實例

來看IOLoop,它的父類是Configurable類,也就是說:IOLoop是一個直屬配置子類oop

class IOLoop(Configurable):
    ......

這裏就要結合Configurable類進行講解:.net

 Configurable中的__new__方法

1 首先實例化一個該直屬配置子類的'執行類對象',也就是調用該類的configurable_default方法並返回賦值給impl:

複製代碼
    @classmethod
    def configurable_default(cls):
        if hasattr(select, "epoll"):     # 由於咱們假設咱們的系統爲Linux,且支持epoll,因此這裏爲True
            from tornado.platform.epoll import EPollIOLoop
            return EPollIOLoop 
        if hasattr(select, "kqueue"):
            # Python 2.6+ on BSD or Mac
            from tornado.platform.kqueue import KQueueIOLoop
            return KQueueIOLoop
        from tornado.platform.select import SelectIOLoop
        return SelectIOLoop
複製代碼

2 也就是impl是EPollIOLoop類對象,而後實例化該對象,運行其initialize方法

class EPollIOLoop(PollIOLoop):  # 該類只有這麼短短的幾句,可見主要的方法是在其父類PollIOLoop中實現。
    def initialize(self, **kwargs):
        super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs) # 執行了父類PollIOLoop的initialize方法,並將select.epoll()傳入

  來看一看PollIOLoop.initialize(EPollIOLoop(),impl=select.epoll())幹了些啥:

複製代碼
class PollIOLoop(IOLoop):  # 從屬配置子類

    def initialize(self, impl, time_func=None, **kwargs):
        super(PollIOLoop, self).initialize(**kwargs)                # 調用IOLoop的initialize方法
        self._impl = impl                               # self._impl = select.epoll()
        if hasattr(self._impl, 'fileno'):               # 文件描述符的close_on_exec屬性
            set_close_exec(self._impl.fileno())
        self.time_func = time_func or time.time
        self._handlers = {}                             # 文件描述符對應的fileno()做爲key,(文件描述符對象,處理函數)做爲value
        self._events = {}                               # 用來存儲epoll_obj.poll()返回的事件,也就是哪一個fd發生了什麼事件{(fd1, event1), (fd2, event2)……}
        self._callbacks = []
        self._callback_lock = threading.Lock()          # 添加線程鎖
        self._timeouts = []                             # 存儲定時任務
        self._cancellations = 0
        self._running = False
        self._stopped = False
        self._closing = False
        self._thread_ident = None                       # 得到當前線程標識符
        self._blocking_signal_threshold = None
        self._timeout_counter = itertools.count()

        # Create a pipe that we send bogus data to when we want to wake
        # the I/O loop when it is idle
        self._waker = Waker()
        self.add_handler(self._waker.fileno(),
                         lambda fd, events: self._waker.consume(),
                         self.READ)
複製代碼

  首先調用了IOLoop.initialize(self,**kwargs)方法:

複製代碼
    def initialize(self, make_current=None):
        if make_current is None:
            if IOLoop.current(instance=False) is None:
                self.make_current()
        elif make_current:
            if IOLoop.current(instance=False) is not None:
                raise RuntimeError("current IOLoop already exists")
            self.make_current()
@staticmethod def current(instance=True): current = getattr(IOLoop._current, "instance", None) if current is None and instance: return IOLoop.instance() return current def make_current(self): IOLoop._current.instance = self
複製代碼

    咱們能夠看到IOLoop.initialize()主要是對線程作了一些支持和操做。

3 返回該實例

三 剖析PollIOLoop

1 處理I/O事件以及其對應handler的相關屬性以及方法

    使用self._handlers用來存儲fd與handler的對應關係,文件描述符對應的fileno()做爲key,元組(文件描述符對象,處理函數)做爲value

  self._events 用來存儲epoll_obj.poll()返回的事件,也就是哪一個fd發生了什麼事件{(fd1, event1), (fd2, event2)……}

    add_handler方法用來添加handler

  update_handle方法用來更新handler

    remove_handler方法用來移除handler

複製代碼
    def add_handler(self, fd, handler, events):
        # 向epoll中註冊事件 , 並在self._handlers[fd]中爲該文件描述符添加相應處理函數
        fd, obj = self.split_fd(fd)   # fd.fileno(),fd
        self._handlers[fd] = (obj, stack_context.wrap(handler))
        self._impl.register(fd, events | self.ERROR)

    def update_handler(self, fd, events):
        fd, obj = self.split_fd(fd)
        self._impl.modify(fd, events | self.ERROR)

    def remove_handler(self, fd):
        fd, obj = self.split_fd(fd)
        self._handlers.pop(fd, None)
        self._events.pop(fd, None)
        try:
            self._impl.unregister(fd)
        except Exception:
            gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
複製代碼

2 處理回調函數的相關屬性以及方法

  self._callbacks用來存儲回調函數

  add_callback方法用來直接添加回調函數

  add_future方法用來間接的添加回調函數,future對象詳解在這

複製代碼
    def add_callback(self, callback, *args, **kwargs):
        # 由於Python的GIL的限制,致使Python線程並不算高效。加上tornado實現了多進程 + 協程的模式,因此咱們略過源碼中的部分線程相關的一些操做
        if self._closing:
            return
        self._callbacks.append(functools.partial(stack_context.wrap(callback), *args, **kwargs))
    def add_future(self, future, callback):
        # 爲future對象添加通過包裝後的回調函數,該回調函數會在future對象被set_done後添加至_callbacks中
        assert is_future(future)
        callback = stack_context.wrap(callback)
        future.add_done_callback(
            lambda future: self.add_callback(callback, future))
複製代碼

3 處理定時任務的相關屬性以及方法

  self._timeouts用來存儲定時任務

  self.add_timeout用來添加定時任務(self.call_later   self.call_at都是間接調用了該方法)

複製代碼
def add_timeout(self, deadline, callback, *args, **kwargs):
        """
            ``deadline``多是一個數字,表示相對於當前時間的時間(與「IOLoop.time」一般爲「time.time」相同的大小),或者是datetime.timedelta對象。 
            自從Tornado 4.0以來,`call_later`是一個比較方便的替代方案,由於它不須要timedelta對象。

        """
        if isinstance(deadline, numbers.Real):
            return self.call_at(deadline, callback, *args, **kwargs)
        elif isinstance(deadline, datetime.timedelta):
            return self.call_at(self.time() + timedelta_to_seconds(deadline),
                                callback, *args, **kwargs)
        else:
            raise TypeError("Unsupported deadline %r" % deadline)
複製代碼

4 啓動io多路複用器

  啓動也通常就意味着開始循環,那麼循環什麼呢?

    1 運行回調函數

    2 運行時間已到的定時任務

    3 當某個文件描述法發生事件時,運行該事件對應的handler

  使用start方法啓動ioloop,看一下其簡化版(去除線程相關,以及一些相對不重要的細節):

複製代碼
def start(self):
        try:
            while True:    
                callbacks = self._callbacks
                self._callbacks = []
                due_timeouts = []
                # 將時間已到的定時任務放置到due_timeouts中,過程省略
                for callback in callbacks:          # 執行callback
                    self._run_callback(callback)
                for timeout in due_timeouts:        # 執行定時任務
                    if timeout.callback is not None:
                        self._run_callback(timeout.callback)       
                callbacks = callback = due_timeouts = timeout = None    # 釋放內存
                # 根據狀況設置poll_timeout的值,過程省略
                if not self._running:    # 終止ioloop運行時,在執行完了callback後結束循環
                    breaktry:
                    event_pairs = self._impl.poll(poll_timeout)
                except Exception as e:
                    if errno_from_exception(e) == errno.EINTR:  # 系統調用被信號處理函數中斷,進行下一次循環
                        continue
                    else:
                        raise 
                self._events.update(event_pairs)
                while self._events: 
                    fd, events = self._events.popitem()             # 獲取一個fd以及對應事件
                    try:
                        fd_obj, handler_func = self._handlers[fd]   # 獲取該fd對應的事件處理函數
                        handler_func(fd_obj, events)                # 運行該事件處理函數
                    except (OSError, IOError) as e:         
                        if errno_from_exception(e) == errno.EPIPE:     # 當客戶端關閉鏈接時會產生EPIPE錯誤                         
                            pass
                        # 其餘異常處理已經省略
                fd_obj = handler_func = None       # 釋放內存空間          
複製代碼
 start完整版

5 關閉io多路複用器

複製代碼
def close(self, all_fds=False):
        with self._callback_lock:
            self._closing = True
        self.remove_handler(self._waker.fileno())
        if all_fds:    # 該參數若爲True,則表示會關閉全部文件描述符
            for fd, handler in self._handlers.values():
                self.close_fd(fd)
        self._waker.close()
        self._impl.close() 
        self._callbacks = None
        self._timeouts = None
複製代碼

 四 參考 

  https://zhu327.github.io/2016/06/14/tornado%E4%BB%A3%E7%A0%81%E9%98%85%E8%AF%BB%E7%AC%94%E8%AE%B0-ioloop/  https://www.zhihu.com/question/20021164   http://stackoverflow.com/questions/12179271/meaning-of-classmethod-and-staticmethod-for-beginner/12179752#12179752  http://blog.csdn.net/benkaoya/article/details/17262053

相關文章
相關標籤/搜索