tornado線程阻塞的解決

前言前端

也許有同窗很迷惑:tornado不是標榜異步非阻塞解決10K問題的嘛?可是我卻發現不是torando很差,而是你用錯了. 好比最近發現一個事情:某網站打開頁面很慢,服務器cpu/內存都正常.網絡狀態也良好. 後來發現,打開頁面會有不少請求後端數據庫的訪問,有一個mongodb的數據庫業務api的rest服務.可是它的tornado卻用錯了,一步步的來 研究問題:python

 

說明git

如下的例子都有2個url,一個是耗時的請求,一個是能夠或者說須要馬上返回的請求,我想就算一個對技術不熟,從道理上來講的用戶, 他但願的是他訪問的請求不會影響也不會被其餘人的請求影響github

#!/bin/env python

import tornado.httpserver

import tornado.ioloop

import tornado.options

import tornado.web

import tornado.httpclient

import time

from tornado.options import define, options

define("port", default=8000, help="run on the given port", type=int)

class SleepHandler(tornado.web.RequestHandler):

    def get(self):

        time.sleep(5)

        self.write("when i sleep 5s")

class JustNowHandler(tornado.web.RequestHandler):

    def get(self):

        self.write("i hope just now see you")

if __name__ == "__main__":

    tornado.options.parse_command_line()

    app = tornado.web.Application(handlers=[

            (r"/sleep", SleepHandler), (r"/justnow", JustNowHandler)])

    http_server = tornado.httpserver.HTTPServer(app)

    http_server.listen(options.port)

    tornado.ioloop.IOLoop.instance().start()

假 如你使用頁面請求或者使用哪一個httpie,curl等工具先訪問http://localhost:8000/sleep,再訪問http: //localhost:8000/justnow.你會發現原本能夠馬上返回的/jsutnow的請求會一直阻塞到/sleep請求完才返回.web

 

這是爲啥?爲啥個人請求被/sleep請求阻塞了?若是平時咱們的web請求足夠快咱們可能不會意識到這個問題,可是事實上常常會有一些耗時的進程,意味着應用程序被有效的鎖定直至處理結束.mongodb

 

這 是時候你有沒有想起@tornado.web.asynchronous這個裝飾器?可是使用這個裝飾器有個前提就是你要耗時的執行須要執行異步,好比上 面的time.sleep,你只是加裝飾器是沒有做用的,並且須要注意的是 Tornado默認在函數處理返回時關閉客戶端的鏈接,可是當你使用@tornado.web.asynchonous裝飾器時,Tornado永遠不會 本身關閉鏈接,須要顯式的self.finish()關閉數據庫

 

咱們大部分的函數都是阻塞的, 好比上面的time.sleep其實tornado有個異步的實現:後端

#!/bin/env python

import tornado.httpserver

import tornado.ioloop

import tornado.options

import tornado.web

import tornado.gen

import tornado.httpclient

import tornado.concurrent

import tornado.ioloop

import time

from tornado.options import define, options

define("port", default=8000, help="run on the given port", type=int)

class SleepHandler(tornado.web.RequestHandler):

    @tornado.web.asynchronous

    @tornado.gen.coroutine

    def get(self):

        yield tornado.gen.Task(tornado.ioloop.IOLoop.instance().add_timeout, time.time() + 5)

        self.write("when i sleep 5s")

class JustNowHandler(tornado.web.RequestHandler):

    def get(self):

        self.write("i hope just now see you")

if __name__ == "__main__":

    tornado.options.parse_command_line()

    app = tornado.web.Application(handlers=[

            (r"/sleep", SleepHandler), (r"/justnow", JustNowHandler)])

    http_server = tornado.httpserver.HTTPServer(app)

    http_server.listen(options.port)

    tornado.ioloop.IOLoop.instance().start()

這裏有個新的tornado.gen.coroutine裝飾器, coroutine是3.0以後新增的裝飾器.之前的辦法是用回調,仍是看我這個例子:api

class SleepHandler(tornado.web.RequestHandler):

    @tornado.web.asynchronous

    def get(self):

        tornado.ioloop.IOLoop.instance().add_timeout(time.time() + 5, callback=self.on_response)

    def on_response(self):

        self.write("when i sleep 5s")

        self.finish()

使用了callback, 可是新的裝飾器讓咱們經過yield實現一樣的效果:你在打開/sleep以後再點擊/justnow, justnow的請求都是馬上返回不受影響.可是用了asynchronous的裝飾器你的耗時的函數也須要執行異步服務器

 

剛纔說的都是沒有意義的例子,下面寫個有點用的:讀取mongodb數據庫數據,而後再前端按行write出來

#!/bin/env python

import tornado.httpserver

import tornado.ioloop

import tornado.options

import tornado.web

import tornado.gen

import tornado.httpclient

import tornado.concurrent

import tornado.ioloop

import time

# 一個mongodb出品的支持異步的數據庫的python驅動

import motor

from tornado.options import define, options

define("port", default=8000, help="run on the given port", type=int)

# db其實就是test數據庫的遊標

db = motor.MotorClient().open_sync().test

class SleepHandler(BaseHandler):

    @tornado.web.asynchronous

    @tornado.gen.coroutine

    def get(self):

        # 這一行執行仍是阻塞須要時間的,個人tt集合有一些數據而且沒有索引

        cursor = db.tt.find().sort([('a', -1)])

        # 這部分會異步非阻塞的執行二不影響其餘頁面請求

        while (yield cursor.fetch_next):

            message = cursor.next_object()

            self.write('<li>%s</li>' % message['a'])

        self.write('</ul>')

        self.finish()

    def _on_response(self, message, error):

        if error:

            raise tornado.web.HTTPError(500, error)

        elif message:

            for i in message:

                self.write('<li>%s</li>' % i['a'])

        else:

            self.write('</ul>')

            self.finish()

class JustNowHandler(BaseHandler):

    def get(self):

        self.write("i hope just now see you")

if __name__ == "__main__":

    tornado.options.parse_command_line()

    app = tornado.web.Application(handlers=[

            (r"/sleep", SleepHandler), (r"/justnow", JustNowHandler)])

    http_server = tornado.httpserver.HTTPServer(app)

    http_server.listen(options.port)

    tornado.ioloop.IOLoop.instance().start()

一個同事提示爲何這個耗時的東西不能異步的丟給某工具去執行而不阻塞個人請求呢?好吧,我也想到了:celery,正好github有這個東西:tornado-celery

 

執行下面的程序首先你要安裝rabbitmq和celery:

#!/bin/env python

import tornado.httpserver

import tornado.ioloop

import tornado.options

import tornado.web

import tornado.gen

import tornado.httpclient

import tcelery, tasks

import time

from tornado.options import define, options

define("port", default=8000, help="run on the given port", type=int)

tcelery.setup_nonblocking_producer()

class SleepHandler(tornado.web.RequestHandler):

    @tornado.web.asynchronous

    @tornado.gen.coroutine

    def get(self):

        # tornado.gen.Task的參數是:要執行的函數, 參數

        yield tornado.gen.Task(tasks.sleep.apply_async, args=[5])

        self.write("when i sleep 5s")

        self.finish()

class JustNowHandler(tornado.web.RequestHandler):

    def get(self):

        self.write("i hope just now see you")

if __name__ == "__main__":

    tornado.options.parse_command_line()

    app = tornado.web.Application(handlers=[

            (r"/sleep", SleepHandler), (r"/justnow", JustNowHandler)])

    http_server = tornado.httpserver.HTTPServer(app)

    http_server.listen(options.port)

    tornado.ioloop.IOLoop.instance().start()

task是celery的任務定義的文件,包含咱們說的time.sleep的函數


import time

from celery import Celery

celery = Celery("tasks", broker="amqp://guest:guest@localhost:5672")

celery.conf.CELERY_RESULT_BACKEND = "amqp"

@celery.task

def sleep(seconds):

    time.sleep(float(seconds))

    return seconds

if __name__ == "__main__":

    celery.start()

而後啓動celelry worker(要否則你的任務怎麼執行呢?確定須要一個消費者取走):

 

celery -A tasks worker --loglevel=info

可是這裏的問題也可能很嚴重:咱們的異步非阻塞依賴於celery,仍是這個隊列的長度,假如任務不少那麼就須要等待,效率很低.有沒有一種辦法把個人同步阻塞函數變爲異步(或者說被tornado的裝飾器理解和識別)呢?

#!/bin/env python

import tornado.httpserver

import tornado.ioloop

import tornado.options

import tornado.web

import tornado.httpclient

import tornado.gen

from tornado.concurrent import run_on_executor

# 這個併發庫在python3自帶在python2須要安裝sudo pip install futures

from concurrent.futures import ThreadPoolExecutor

import time

from tornado.options import define, options

define("port", default=8000, help="run on the given port", type=int)

class SleepHandler(tornado.web.RequestHandler):

    executor = ThreadPoolExecutor(2)

  #executor 是局部變量  不是全局的

    @tornado.web.asynchronous

    @tornado.gen.coroutine

    def get(self):

        # 假如你執行的異步會返回值被繼續調用能夠這樣(只是爲了演示),不然直接yield就行

        res = yield self.sleep()

        self.write("when i sleep %s s" % res)

        self.finish()

    @run_on_executor

    def sleep(self):

        time.sleep(5)

        return 5

class JustNowHandler(tornado.web.RequestHandler):

    def get(self):

        self.write("i hope just now see you")

if __name__ == "__main__":

    tornado.options.parse_command_line()

    app = tornado.web.Application(handlers=[

            (r"/sleep", SleepHandler), (r"/justnow", JustNowHandler)])

    http_server = tornado.httpserver.HTTPServer(app)

    http_server.listen(options.port)

    tornado.ioloop.IOLoop.instance().start()