Tornado1.0源碼分析-Networking

#Asynchronous NetWorking#ios

做者:MetalBug
時間:2015-02-28
出處:http://my.oschina.net/u/247728/blog
聲明:版權全部,侵犯必究
  • tornado.ioloop — Main event loop
  • tornado.iostream — Convenient wrappers for non-blocking sockets

##1.ioloop##後端

IOLoop是一個非阻塞的IO事件循環。 典型的應用使用一個IOLoop對象,一般經過IOLoop.instance()得到。 使用如下三個函數往IOLoop中註冊事件,回調或者定時器,最後使用IOLoop.start()便可。數據結構

IOLoop.add_handler(fd, handler, events)多線程

增長一個IO事件,當事件發生時,hanlder(fd,events)會被調用app

IOLoop.add_callback(callback, *args, **kwargs)socket

增長一個回調函數,該回調函數將會在下一次IO迭代中被執行。函數

IOLoop.add_timeout(self, deadline, callback)tornado

增長一個定時器,該定時器會在到達dealline時被執行。oop

如下是個簡單的例子,利用IOLoop實現了一個TCP Server操作系統

import errno
    import functools
    import ioloop
    import socket

    def connection_ready(sock, fd, events):
        while True:
            try:
                connection, address = sock.accept()
            except socket.error, e:
                if e[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
                    raise
                return
            connection.setblocking(0)
            handle_connection(connection, address)

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.setblocking(0)
    sock.bind(("", port))
    sock.listen(128)

    io_loop = ioloop.IOLoop.instance()
    callback = functools.partial(connection_ready, sock)
    io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
    io_loop.start()

內部實現中IOLoop根據不一樣的操做系統使用不一樣的IO複用機制。

  • Linux -------epoll:使用的是level-triggered觸發方式。
  • FreeBSD ---kQuene
  • other -----select

###1.1內部實現-數據結構###

self._impl = impl or _poll()
    self._handlers = {}
    self._events = {}
    self._callbacks = set()
    self._timeouts = []
    self._running = False
    self._stopped = False
    self._blocking_log_threshold = None

_impl表示後端使用的IO複用機制,能夠本身指定或者採用系統默認

_handlers維護了fd與其對應handler關係

_events維護了fd與其對應event關係

_callback維護全部在下一個IO迭代中會被調用的回調函數

_timeouts是一個有序列表,根據deadline排序,保存了未到期的全部定時器

_running_stopped是用於表示IOLoop是否start

_blocking_log_threshold表示最大阻塞時間

###1.2內部實現-主要函數### IOLoop.start()實現了事件循環,內部實現爲一個巨大的while循環,在每次迭代中,會一次檢查如下事件:

  1. _callbacks
  2. _timeouts
  3. poll返回的_events

一旦事件就緒,就出發對應的回調函數,這個循環會一直持續到IOLoop.stop()的調用,即_stopped被置爲True

IOLoop.start()流程圖:

IOLoop.start()流程圖

###1.3內部實現-實現細節###

  1. 因爲IOLoop平時阻塞在poll調用中,爲了讓IOLoop可以當即執行callback函數,須要設法喚醒它。這裏採用的是pipeIOLoop始終監視該管道的readable事件,在須要喚醒的時候,往管道中寫入一個字節,這樣IOLoop技能從IO複用(poll)中返回。

    初始化pipe if os.name != 'nt': r, w = os.pipe() self._set_nonblocking(r) self._set_nonblocking(w) self._set_close_exec(r) self._set_close_exec(w) self._waker_reader = os.fdopen(r, "r", 0) self._waker_writer = os.fdopen(w, "w", 0)

    喚醒IOLoop def _wake(self): try: self._waker_writer.write("x") except IOError: pass 在add_callback時,須要喚醒IOLoop從而使其當即執行callback def add_callback(self, callback): self._callbacks.add(callback) self._wake()

  1. 由於callback函數可以對_callbacks進行修改(add, remove)等,因此用一個局部變量存儲當前_callbacks,對該局部變量進行操做。

對於_callbacks的執行,並無反覆執行callback直到_callbacks爲空,這裏這樣作應該是爲了防止IOLoop陷入死循環,沒法處理IO時間,並且也設置_blocking_log_threshold,經過singertimer來防止IOLoop卡死。 若是_callbacks不能執行完,這裏會將poll_timeout設置爲0,即爲當即返回,爲的的在下次IO迭代中可以當即執行_callbacks

callbacks = list(self._callbacks)
    for callback in callbacks:
        if callback in self._callbacks:
            self._callbacks.remove(callback)
            self._run_callback(callback)
    if self._callbacks:
        poll_timeout = 0.0
  1. 對於_timeouts,採用列表存儲,而且按照deadline從小到大排序,這樣才每一個IO迭代中,只須要從頭開始遍歷列表獲得比deadline小於當前時間的事件並執行便可。

#_Timeout的cmp函數
    def __cmp__(self, other):
        return cmp((self.deadline, id(self.callback)),
                   (other.deadline, id(other.callback)))
#往IOLoop中添加timeout,保持有序
    def add_timeout(self, deadline, callback):
        timeout = _Timeout(deadline, callback)
        bisect.insort(self._timeouts, timeout)
        return timeout

##2.iostream## iostream對非阻塞式的 socket 的簡單封裝,以方便經常使用讀寫操做。

###2.1內部實現-數據結構###

IOStream內部維護了一個read_buffer和write_buffer,將維護的socket註冊到IOLoop上,利用IOLoop管理讀寫事件。

def __init__(self, socket, io_loop=None, max_buffer_size=104857600,
             read_chunk_size=4096):
    ##
    self._read_buffer = ""
    self._write_buffer = ""
    self.io_loop = io_loop or ioloop.IOLoop.instance()
    self.io_loop.add_handler(
        self.socket.fileno(), self._handle_events, self._state)
    ##

###2.2內部實現-主要函數###

IOStream._handl_events()根據對應events的類型,調用不一樣的callback

def _handle_events(self, fd, events):
    if events & self.io_loop.READ:
        self._handle_read()
    if events & self.io_loop.WRITE:
        self._handle_write()
    if events & self.io_loop.ERROR:
        self.close()
        return
    state = self.io_loop.ERROR
    if self._read_delimiter or self._read_bytes:
        state |= self.io_loop.READ
    if self._write_buffer:
        state |= self.io_loop.WRITE
    if state != self._state:
        self._state = state
        self.io_loop.update_handler(self.socket.fileno(), self._state)

#總結 在Tornado1.0版本中,IOLoop只考慮在單線程下的實現,對於多線程的處理並無考慮,其函數並無考慮跨線程調用對關鍵數據的保護。

例如對於_callbacks,暴露給全部的線程,單多線程狀況下,可能會出現callback None的狀況。 在Tornado4.1中,對於多線程的狀況有了考慮,具體的見後序博文。

IOStream中,主要涉及到的是一個buffer的設計,內部使用了chunk,一個簡易的塊進行加快讀寫。這個的設計沒有什麼出彩之處,對於buffer的設計能夠看看別的庫是怎麼設計的(TODO)。

相關文章
相關標籤/搜索