tornado的非異步阻塞模式

【優化tornado阻塞任務的三個選擇】

一、優化阻塞的任務,使其執行時間更快。常常因爲是一個DB的慢查詢,或者複雜的上層模板致使的,這個時候首要的是加速這些任務,而不是優化複雜的webserver。能夠提高99%的效率。html

二、開啓一個單獨的線程或者進程執行耗時任務。這意味着對於IOLoop來講,能夠開啓另外一個線程(或進程)處理off-loading任務,這樣它就能夠再接收其餘請求了,而不是阻塞住。python

三、使用異步的驅動或者庫函數來執行任務,例如gevent , motorweb

【例子1】

import time

import tornado.ioloop
import tornado.web


class MainHandler(tornado.web.RequestHandler):

    def get(self):
        self.write("Hello, world %s" % time.time())


class SleepHandler(tornado.web.RequestHandler):

    def get(self, n):
        time.sleep(float(n))
        self.write("Awake! %s" % time.time())


application = tornado.web.Application([
    (r"/", MainHandler),
    (r"/sleep/(\d+)", SleepHandler),
])


if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

這樣在一個tab頁中開啓http://localhost:8888/sleep/10,同時在另外一個tab頁訪問http://localhost:8888/,會發現沒有打印"Hello World"直到第一個頁面完成爲止。實際上,第一個調用將IOLoop阻塞住了,致使其沒法響應第二個請求。mongodb

 

【例子2——非阻塞模式】

from concurrent.futures import ThreadPoolExecutor
from functools import partial, wraps

import tornado.ioloop
import tornado.web


EXECUTOR = ThreadPoolExecutor(max_workers=4)


def unblock(f):

    @tornado.web.asynchronous
    @wraps(f)
    def wrapper(*args, **kwargs):
        self = args[0]

        def callback(future):
            self.write(future.result())
            self.finish()

        EXECUTOR.submit(
            partial(f, *args, **kwargs)
        ).add_done_callback(
            lambda future: tornado.ioloop.IOLoop.instance().add_callback(
                partial(callback, future)))

    return wrapper


class SleepHandler(tornado.web.RequestHandler):

    @unblock
    def get(self, n):
        time.sleep(float(n))
        return "Awake! %s" % time.time()

unblock修飾器將被修飾函數提交給線程池,返回一個future。在future中添加一個callback函數,並將控制權交給IOLoop。安全

這個回調函數最終將調用self.finish,並結束這次請求。app

Note:這個修飾器函數自己還須要被tornado.web.asynchronous修飾,爲了是避免調用self.finish太快。異步

self.write不是線程安全(thread-safe)的,所以避免在主線程中處理future的結果。async

當你使用@tornado.web.asynchonous裝飾器時,Tornado永遠不會本身關閉鏈接,須要顯式的self.finish()關閉函數

 【完整的demo】

from concurrent.futures import ThreadPoolExecutor
from functools import partial, wraps
import time
 
import tornado.ioloop
import tornado.web
 
 
EXECUTOR = ThreadPoolExecutor(max_workers=4)
 
 
def unblock(f):
 
    @tornado.web.asynchronous
    @wraps(f)
    def wrapper(*args, **kwargs):
        self = args[0]
 
        def callback(future):
            self.write(future.result())
            self.finish()
 
        EXECUTOR.submit(
            partial(f, *args, **kwargs)
        ).add_done_callback(
            lambda future: tornado.ioloop.IOLoop.instance().add_callback(
                partial(callback, future)))
 
    return wrapper
 
 
class MainHandler(tornado.web.RequestHandler):
 
    def get(self):
        self.write("Hello, world %s" % time.time())
 
 
class SleepHandler(tornado.web.RequestHandler):
 
    @unblock
    def get(self, n):
        time.sleep(float(n))
        return "Awake! %s" % time.time()
 
 
class SleepAsyncHandler(tornado.web.RequestHandler):
 
    @tornado.web.asynchronous
    def get(self, n):
 
        def callback(future):
            self.write(future.result())
            self.finish()
 
        EXECUTOR.submit(
            partial(self.get_, n)
        ).add_done_callback(
            lambda future: tornado.ioloop.IOLoop.instance().add_callback(
                partial(callback, future)))
 
    def get_(self, n):
        time.sleep(float(n))
        return "Awake! %s" % time.time()
 
 
application = tornado.web.Application([
    (r"/", MainHandler),
    (r"/sleep/(\d+)", SleepHandler),
    (r"/sleep_async/(\d+)", SleepAsyncHandler),
])
 
 
if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

【ThreadPoolExecutor】

上面涉及到ThreadPoolExecutor兩個方法,初始化以及submit,查看幫助tornado

class ThreadPoolExecutor(concurrent.futures._base.Executor)
 |  Method resolution order:
 |      ThreadPoolExecutor
 |      concurrent.futures._base.Executor
 |      __builtin__.object
 |  
 |  Methods defined here:
 |  
 |  __init__(self, max_workers)
 |      Initializes a new ThreadPoolExecutor instance.
 |      
 |      Args:
 |          max_workers: The maximum number of threads that can be used to
 |              execute the given calls.
 |  
 |  submit(self, fn, *args, **kwargs)
 |      Submits a callable to be executed with the given arguments.
 |      
 |      Schedules the callable to be executed as fn(*args, **kwargs) and returns
 |      a Future instance representing the execution of the callable.
 |      
 |      Returns:
 |          A Future representing the given call.

一、max_workers能夠處理給定calls的最大線程數目,若是超過這個數目會怎麼樣呢??

二、submit調用fn(*args, **kwargs),返回一個Future的實例

【Future】

Help on class Future in module concurrent.futures._base:

class Future(__builtin__.object)
 |  Represents the result of an asynchronous computation.
 |  
 |  Methods defined here:
 |  
 |  __init__(self)
 |      Initializes the future. Should not be called by clients.
 |  
 |  __repr__(self)
 |  
 |  add_done_callback(self, fn)
 |      Attaches a callable that will be called when the future finishes.
 |      
 |      Args:
 |          fn: A callable that will be called with this future as its only
 |              argument when the future completes or is cancelled. The callable
 |              will always be called by a thread in the same process in which
 |              it was added. If the future has already completed or been
 |              cancelled then the callable will be called immediately. These
 |              callables are called in the order that they were added.

  

【參考文獻】

一、http://lbolla.info/blog/2013/01/22/blocking-tornado

二、http://www.tuicool.com/articles/36ZzA3

相關文章
相關標籤/搜索