最近閒暇無事,閱讀了一下tornado的源碼,對總體的結構有了初步認識,與你們分享 不知道爲何右邊的目錄一直出不來,很是不舒服. 不如移步到oschina吧....[http://my.oschina.net/abc2001x/blog/476349][1]
`ioloop`是`tornado`的核心模塊,也是個調度模塊,各類異步事件都是由他調度的,因此必須弄清他的執行邏輯
而`ioloop`的核心部分則是 `while True`這個循環內部的邏輯,貼上他的代碼以下
def start(self): if self._running: raise RuntimeError("IOLoop is already running") self._setup_logging() if self._stopped: self._stopped = False return old_current = getattr(IOLoop._current, "instance", None) IOLoop._current.instance = self self._thread_ident = thread.get_ident() self._running = True old_wakeup_fd = None if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix': try: old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno()) if old_wakeup_fd != -1: signal.set_wakeup_fd(old_wakeup_fd) old_wakeup_fd = None except ValueError: old_wakeup_fd = None try: while True: with self._callback_lock: callbacks = self._callbacks self._callbacks = [] due_timeouts = [] if self._timeouts: now = self.time() while self._timeouts: if self._timeouts[0].callback is None: heapq.heappop(self._timeouts) self._cancellations -= 1 elif self._timeouts[0].deadline <= now: due_timeouts.append(heapq.heappop(self._timeouts)) else: break if (self._cancellations > 512 and self._cancellations > (len(self._timeouts) >> 1)): self._cancellations = 0 self._timeouts = [x for x in self._timeouts if x.callback is not None] heapq.heapify(self._timeouts) for callback in callbacks: self._run_callback(callback) for timeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) callbacks = callback = due_timeouts = timeout = None if self._callbacks: poll_timeout = 0.0 elif self._timeouts: poll_timeout = self._timeouts[0].deadline - self.time() poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) else: poll_timeout = _POLL_TIMEOUT if not self._running: break if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) try: event_pairs = self._impl.poll(poll_timeout) except Exception as e: if errno_from_exception(e) == errno.EINTR: continue else: raise if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, self._blocking_signal_threshold, 0) self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() try: fd_obj, handler_func = self._handlers[fd] handler_func(fd_obj, events) except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: pass else: self.handle_callback_exception(self._handlers.get(fd)) except Exception: self.handle_callback_exception(self._handlers.get(fd)) fd_obj = handler_func = None finally: self._stopped = False if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) IOLoop._current.instance = old_current if old_wakeup_fd is not None: signal.set_wakeup_fd(old_wakeup_fd)
除去註釋,代碼其實沒多少行. 由while 內部代碼能夠看出ioloop主要由三部分組成:
他是ioloop
回調的基礎部分,經過IOLoop.instance().add_callback()
添加到self._callbacks
他們將在每一次loop
中被運行.python
主要用途是將邏輯分塊,在適合時機將包裝好的callbac
k添加到self._callbacks
讓其執行.jquery
例如ioloop
中的add_future
ios
def add_future(self, future, callback): """Schedules a callback on the ``IOLoop`` when the given `.Future` is finished. The callback is invoked with one argument, the `.Future`. """ assert is_future(future) callback = stack_context.wrap(callback) future.add_done_callback( lambda future: self.add_callback(callback, future))
future
對象獲得result
的時候會調用future.add_done_callback
添加的callback
,再將其轉至ioloop
執行web
這是定時器,在指定的事件執行callback
.
跟1中的callback
相似,經過IOLoop.instance().add_callback
segmentfault
在每一次循環,會計算timeouts
回調列表裏的事件,運行已到期的callback
.
固然不是無節操的循環.api
由於poll
操做會阻塞到有io
操做發生,因此只要計算最近的timeout
,
而後用這個時間做爲self._impl.poll(poll_timeout)
的 poll_timeout
,
就能夠達到按時運行了app
可是,假設poll_timeout
的時間很大時,self._impl.poll
一直在堵塞中(沒有io事件,但在處理某一個io
事件),
那添加剛纔1中的callback
不是要等好久纔會被運行嗎? 答案固然是不會.ioloop
中有個waker
對象,他是由兩個fd
組成,一個讀一個寫.ioloop
在初始化的時候把waker綁定到epoll
裏了,add_callback
時會觸發waker的讀寫.
這樣ioloop
就會在poll
中被喚醒了,接着就能夠及時處理timeout callback
了異步
用這樣的方式也能夠本身封裝一個小的定時器功能玩玩socket
處理epoll
事件的功能
經過IOLoop.instance().add_handler(fd, handler, events)
綁定fd event
的處理事件
在httpserver.listen
的代碼內,netutil.py
中的netutil.py
的add_accept_handler
綁定accept handler
處理客戶端接入的邏輯ide
如法炮製,其餘的io事件也這樣綁定,業務邏輯的分塊交由ioloop
的callback
和future
處理
關於epoll
的用法的內容.詳情見我第一篇文章吧,哈哈
ioloop由callback
(業務分塊), timeout callback
(定時任務) io event
(io傳輸和解析) 三塊組成,互相配合完成異步的功能,構建gen
,httpclient
,iostream
等功能
串聯大體的流程是,tornado
綁定io event,處理io傳輸解析,傳輸完成後(結合Future)回調(callback)業務處理的邏輯和一些固定操做 . 定時器則是較爲獨立的模塊
我的認爲Future
是tornado
僅此ioloop
重要的模塊,他貫穿全文,全部異步操做都有他的身影
顧名思義,他主要是關注往後要作的事,相似jquery
的Deferred
吧
通常的用法是經過ioloop
的add_future
定義future
的done callback
,
當future
被set_result
的時候,future
的done callback
就會被調用.
從而完成Future
的功能.
具體能夠參考gen.coroutine
的實現,本文後面也會講到
他的組成不復雜,只有幾個重要的方法
最重要的是 add_done_callback
, set_result
tornado
用Future
和ioloop
,yield
實現了gen.coroutine
跟ioloop
的callback
相似 , 存儲事件完成後的callback
在self._callbacks
裏
def add_done_callback(self, fn): if self._done: fn(self) else: self._callbacks.append(fn)
設置事件的結果,並運行以前存儲好的callback
def set_result(self, result): self._result = result self._set_done() def _set_done(self): self._done = True for cb in self._callbacks: try: cb(self) except Exception: app_log.exception('Exception in callback %r for %r', cb, self) self._callbacks = None
爲了驗證以前所說的,上一段測試代碼
#! /usr/bin/env python #coding=utf-8 import tornado.web import tornado.ioloop from tornado.gen import coroutine from tornado.concurrent import Future def test(): def pp(s): print s future = Future() iol = tornado.ioloop.IOLoop.instance() print 'init future %s'%future iol.add_future(future, lambda f: pp('ioloop callback after future done,future is %s'%f)) #模擬io延遲操做 iol.add_timeout(iol.time()+5,lambda:future.set_result('set future is done')) print 'init complete' tornado.ioloop.IOLoop.instance().start() if __name__ == "__main__": test()
運行結果:
接着繼續延伸,看看coroutine
的實現gen.coroutine
實現的功能實際上是將原來的callback
的寫法,用yield
的寫法代替. 即以yield爲分界,將代碼分紅兩部分.
如:
#! /usr/bin/env python #coding=utf-8 import tornado.ioloop from tornado.gen import coroutine from tornado.httpclient import AsyncHTTPClient @coroutine def cotest(): client = AsyncHTTPClient() res = yield client.fetch("http://www.segmentfault.com/") print res if __name__ == "__main__": f = cotest() print f #這裏返回了一個future哦 tornado.ioloop.IOLoop.instance().start()
運行結果:
接下來分析下coroutine
的實現
def _make_coroutine_wrapper(func, replace_callback): @functools.wraps(func) def wrapper(*args, **kwargs): future = TracebackFuture() if replace_callback and 'callback' in kwargs: callback = kwargs.pop('callback') IOLoop.current().add_future( future, lambda future: callback(future.result())) try: result = func(*args, **kwargs) except (Return, StopIteration) as e: result = getattr(e, 'value', None) except Exception: future.set_exc_info(sys.exc_info()) return future else: if isinstance(result, types.GeneratorType): try: orig_stack_contexts = stack_context._state.contexts yielded = next(result) if stack_context._state.contexts is not orig_stack_contexts: yielded = TracebackFuture() yielded.set_exception( stack_context.StackContextInconsistentError( 'stack_context inconsistency (probably caused ' 'by yield within a "with StackContext" block)')) except (StopIteration, Return) as e: future.set_result(getattr(e, 'value', None)) except Exception: future.set_exc_info(sys.exc_info()) else: Runner(result, future, yielded) try: return future finally: future = None future.set_result(result) return future return wrapper
如源碼所示,func
運行的結果是GeneratorType
,yielded = next(result)
,
運行至原函數的yield位置,返回的是原函數func
內部 yield
右邊
返回的對象(必須是Future
或Future
的list
)給yielded
.
通過Runner(result, future, yielded)
對yielded進行處理.
在此就 貼出Runner的代碼了.
Runner初始化過程,調用handle_yield
, 查看yielded
是否已done
了,不然add_future
運行Runner
的run
方法,run
方法中若是yielded
對象已完成,用對它的gen
調用send
,發送完成的結果.
因此yielded
在什麼地方被set_result
很是重要,
當被set_result
的時候,纔會send
結果給原func
,完成整個異步操做
詳情能夠查看tornado 中重要的對象 iostream,源碼中iostream的 _handle_connect,如此設置了鏈接的result.
def _handle_connect(self): err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) if err != 0: self.error = socket.error(err, os.strerror(err)) if self._connect_future is None: gen_log.warning("Connect error on fd %s: %s", self.socket.fileno(), errno.errorcode[err]) self.close() return if self._connect_callback is not None: callback = self._connect_callback self._connect_callback = None self._run_callback(callback) if self._connect_future is not None: future = self._connect_future self._connect_future = None future.set_result(self) self._connecting = False
最後貼上一個簡單的測試代碼,演示coroutine,future的用法
import tornado.ioloop from tornado.gen import coroutine from tornado.concurrent import Future @coroutine def asyn_sum(a, b): print("begin calculate:sum %d+%d"%(a,b)) future = Future() future2 = Future() iol = tornado.ioloop.IOLoop.instance() print future def callback(a, b): print("calculating the sum of %d+%d:"%(a,b)) future.set_result(a+b) iol.add_timeout(iol.time()+3,lambda f:f.set_result(None),future2) iol.add_timeout(iol.time()+3,callback, a, b) result = yield future print("after yielded") print("the %d+%d=%d"%(a, b, result)) yield future2 print 'after future2' def main(): f = asyn_sum(2,3) print '' print f tornado.ioloop.IOLoop.instance().start() if __name__ == "__main__": main()
運行結果:
爲何代碼中個yield都起做用了? 由於Runner.run
裏,最後繼續用handle_yield
處理了send
後返回的yielded
對象,意思是func
裏能夠有n幹個yield
操做
if not self.handle_yield(yielded): return
至此,已完成tornado中重要的幾個模塊的流程,其餘模塊也是由此而來.寫了這麼多,越寫越卡,就到此爲止先吧,
啊~~~~~~好想有份工做
和女友
啊~~~~~