#Asynchronous NetWorking#ios
做者:MetalBug 時間:2015-02-28 出處:http://my.oschina.net/u/247728/blog 聲明:版權全部,侵犯必究
tornado.ioloop
— Main event looptornado.iostream
— Convenient wrappers for non-blocking sockets##1.ioloop##後端
IOLoop
是一個非阻塞的IO事件循環。 典型的應用使用一個IOLoop
對象,一般經過IOLoop.instance()
得到。 使用如下三個函數往IOLoop
中註冊事件,回調或者定時器,最後使用IOLoop.start()
便可。數據結構
IOLoop.add_handler(fd, handler, events)
多線程
增長一個IO事件,當事件發生時,hanlder(fd,events)會被調用app
IOLoop.add_callback(callback, *args, **kwargs)
socket
增長一個回調函數,該回調函數將會在下一次IO迭代中被執行。函數
IOLoop.add_timeout(self, deadline, callback)
tornado
增長一個定時器,該定時器會在到達dealline時被執行。oop
如下是個簡單的例子,利用IOLoop
實現了一個TCP Server操作系統
import errno import functools import ioloop import socket def connection_ready(sock, fd, events): while True: try: connection, address = sock.accept() except socket.error, e: if e[0] not in (errno.EWOULDBLOCK, errno.EAGAIN): raise return connection.setblocking(0) handle_connection(connection, address) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setblocking(0) sock.bind(("", port)) sock.listen(128) io_loop = ioloop.IOLoop.instance() callback = functools.partial(connection_ready, sock) io_loop.add_handler(sock.fileno(), callback, io_loop.READ) io_loop.start()
內部實現中IOLoop
根據不一樣的操做系統使用不一樣的IO複用機制。
Linux
-------epoll
:使用的是level-triggered
觸發方式。FreeBSD
---kQuene
other
-----select
###1.1內部實現-數據結構###
self._impl = impl or _poll() self._handlers = {} self._events = {} self._callbacks = set() self._timeouts = [] self._running = False self._stopped = False self._blocking_log_threshold = None
_impl
表示後端使用的IO複用機制,能夠本身指定或者採用系統默認
_handlers
維護了fd與其對應handler關係
_events
維護了fd與其對應event關係
_callback
維護全部在下一個IO迭代中會被調用的回調函數
_timeouts
是一個有序列表,根據deadline排序,保存了未到期的全部定時器
_running
和_stopped
是用於表示IOLoop是否start
_blocking_log_threshold
表示最大阻塞時間
###1.2內部實現-主要函數### IOLoop.start()
實現了事件循環,內部實現爲一個巨大的while
循環,在每次迭代中,會一次檢查如下事件:
_callbacks
_timeouts
poll
返回的_events
一旦事件就緒,就出發對應的回調函數,這個循環會一直持續到IOLoop.stop()
的調用,即_stopped
被置爲True
。
IOLoop.start()
流程圖:
###1.3內部實現-實現細節###
因爲
IOLoop
平時阻塞在poll
調用中,爲了讓IOLoop
可以當即執行callback
函數,須要設法喚醒它。這裏採用的是pipe
,IOLoop
始終監視該管道的readable事件,在須要喚醒的時候,往管道中寫入一個字節,這樣IOLoop
技能從IO複用(poll)中返回。
初始化pipe if os.name != 'nt': r, w = os.pipe() self._set_nonblocking(r) self._set_nonblocking(w) self._set_close_exec(r) self._set_close_exec(w) self._waker_reader = os.fdopen(r, "r", 0) self._waker_writer = os.fdopen(w, "w", 0)
喚醒
IOLoop
def _wake(self): try: self._waker_writer.write("x") except IOError: pass 在add_callback
時,須要喚醒IOLoop
從而使其當即執行callback
def add_callback(self, callback): self._callbacks.add(callback) self._wake()
由於callback函數可以對
_callbacks
進行修改(add, remove)等,因此用一個局部變量存儲當前_callbacks
,對該局部變量進行操做。
對於
_callbacks
的執行,並無反覆執行callback直到_callbacks
爲空,這裏這樣作應該是爲了防止IOLoop
陷入死循環,沒法處理IO時間,並且也設置_blocking_log_threshold
,經過singer
的timer
來防止IOLoop
卡死。 若是_callbacks
不能執行完,這裏會將poll_timeout
設置爲0,即爲當即返回,爲的的在下次IO迭代中可以當即執行_callbacks
。
callbacks = list(self._callbacks) for callback in callbacks: if callback in self._callbacks: self._callbacks.remove(callback) self._run_callback(callback) if self._callbacks: poll_timeout = 0.0
對於
_timeouts
,採用列表存儲,而且按照deadline從小到大排序,這樣才每一個IO迭代中,只須要從頭開始遍歷列表獲得比deadline小於當前時間的事件並執行便可。
#_Timeout的cmp函數 def __cmp__(self, other): return cmp((self.deadline, id(self.callback)), (other.deadline, id(other.callback)))
#往IOLoop中添加timeout,保持有序 def add_timeout(self, deadline, callback): timeout = _Timeout(deadline, callback) bisect.insort(self._timeouts, timeout) return timeout
##2.iostream## iostream
對非阻塞式的 socket 的簡單封裝,以方便經常使用讀寫操做。
###2.1內部實現-數據結構###
IOStream
內部維護了一個read_buffer和write_buffer,將維護的socket註冊到IOLoop
上,利用IOLoop
管理讀寫事件。
def __init__(self, socket, io_loop=None, max_buffer_size=104857600, read_chunk_size=4096): ## self._read_buffer = "" self._write_buffer = "" self.io_loop = io_loop or ioloop.IOLoop.instance() self.io_loop.add_handler( self.socket.fileno(), self._handle_events, self._state) ##
###2.2內部實現-主要函數###
IOStream._handl_events()
根據對應events
的類型,調用不一樣的callback
def _handle_events(self, fd, events): if events & self.io_loop.READ: self._handle_read() if events & self.io_loop.WRITE: self._handle_write() if events & self.io_loop.ERROR: self.close() return state = self.io_loop.ERROR if self._read_delimiter or self._read_bytes: state |= self.io_loop.READ if self._write_buffer: state |= self.io_loop.WRITE if state != self._state: self._state = state self.io_loop.update_handler(self.socket.fileno(), self._state)
#總結 在Tornado1.0版本中,IOLoop
只考慮在單線程下的實現,對於多線程的處理並無考慮,其函數並無考慮跨線程調用對關鍵數據的保護。
例如對於_callbacks
,暴露給全部的線程,單多線程狀況下,可能會出現callback None的狀況。 在Tornado4.1中,對於多線程的狀況有了考慮,具體的見後序博文。
在IOStream
中,主要涉及到的是一個buffer的設計,內部使用了chunk,一個簡易的塊進行加快讀寫。這個的設計沒有什麼出彩之處,對於buffer的設計能夠看看別的庫是怎麼設計的(TODO)。