tornado+flask實現異步任務

一、tornado是單線程的,同時WSGI應用又是同步的,若是咱們使用Tornado啓動WSGI應用,理論上每次只能處理一個請求都是,任何一個請求有阻塞,都會致使tornado的整個IOLOOP阻塞。以下所示,咱們同時發出兩個GET請求向http://127.0.0.1:5000/html

會發現第一個發出的請求會在大約5s以後返回,而另外一個請求會在10s左右返回,咱們能夠判斷,這兩個請求是順序執行的。python

from tornado.wsgi import WSGIContainer from tornado.httpserver import HTTPServer from tornado.ioloop import IOLoopfrom flask import Flask import time



app = Flask(__name__)

@app.route('/') 
def index():
    time.sleep(5) return 'OK'
if __name__ == '__main__':
    http_server = HTTPServer(WSGIContainer(app))
    http_server.listen(5000)
    IOLoop.instance().start()

二、咱們知道,tornado實現異步運行同步函數,咱們只能使用線程來運行,以下所示:web

import tornado.web
import tornado.ioloop
import time
import tornado

class IndexHandler(tornado.web.RequestHandler):
"""主路由處理類"""
@tornado.gen.coroutine
def get(self):
    """對應http的get請求方式"""
    loop = tornado.ioloop.IOLoop.instance()
    yield loop.run_in_executor(None,self.sleep)
    self.write("Hello You!")

def sleep(self):
    time.sleep(5)
    self.write('sleep OK')


if __name__ == "__main__":
app = tornado.web.Application([
    (r"/", IndexHandler),
])
app.listen(8000)
tornado.ioloop.IOLoop.current().start()

三、對於這種(使用tornado運行Flask的狀況)狀況,咱們如何作呢,查看 WSGIContainer 的代碼咱們發現:flask

class WSGIContainer(object):

def __init__(self, wsgi_application):
    self.wsgi_application = wsgi_application

def __call__(self, request):
    data = {}
    response = []

    def start_response(status, response_headers, exc_info=None):
        data["status"] = status
        data["headers"] = response_headers
        return response.append
    # 修改這裏
    app_response = self.wsgi_application(
        WSGIContainer.environ(request), start_response)
    try:
        response.extend(app_response)
        body = b"".join(response)
    finally:
        if hasattr(app_response, "close"):
            app_response.close()
    if not data:
        raise Exception("WSGI app did not call start_response")

    status_code, reason = data["status"].split(' ', 1)
    status_code = int(status_code)
    headers = data["headers"]
    header_set = set(k.lower() for (k, v) in headers)
    body = escape.utf8(body)
    if status_code != 304:
        if "content-length" not in header_set:
            headers.append(("Content-Length", str(len(body))))
        if "content-type" not in header_set:
            headers.append(("Content-Type", "text/html; charset=UTF-8"))
    if "server" not in header_set:
        headers.append(("Server", "TornadoServer/%s" % tornado.version))

    start_line = httputil.ResponseStartLine("HTTP/1.1", status_code, reason)
    header_obj = httputil.HTTPHeaders()
    for key, value in headers:
        header_obj.add(key, value)
    request.connection.write_headers(start_line, header_obj, chunk=body)
    request.connection.finish()
    self._log(status_code, request)

將 app_response 改成異步獲取(使用yield)多線程

import tornado
from tornado import escape
from tornado import httputil
from typing import List, Tuple, Optional, Callable, Any, Dict
from types import TracebackType


class WSGIContainer_With_Thread(WSGIContainer):
@tornado.gen.coroutine
def __call__(self, request):
    data = {}  # type: Dict[str, Any]
    response = []  # type: List[bytes]

    def start_response(
            status: str,
            headers: List[Tuple[str, str]],
            exc_info: Optional[
                Tuple[
                    "Optional[Type[BaseException]]",
                    Optional[BaseException],
                    Optional[TracebackType],
                ]
            ] = None,
    ) -> Callable[[bytes], Any]:
        data["status"] = status
        data["headers"] = headers
        return response.append

    loop = tornado.ioloop.IOLoop.instance()
    # 修改這裏
    app_response = yield loop.run_in_executor(None, self.wsgi_application, WSGIContainer.environ(request),
                                              start_response)
    # app_response = self.wsgi_application(
    #     WSGIContainer.environ(request), start_response
    # )
    try:
        response.extend(app_response)
        body = b"".join(response)
    finally:
        if hasattr(app_response, "close"):
            app_response.close()  # type: ignore
    if not data:
        raise Exception("WSGI app did not call start_response")

    status_code_str, reason = data["status"].split(" ", 1)
    status_code = int(status_code_str)
    headers = data["headers"]  # type: List[Tuple[str, str]]
    header_set = set(k.lower() for (k, v) in headers)
    body = escape.utf8(body)
    if status_code != 304:
        if "content-length" not in header_set:
            headers.append(("Content-Length", str(len(body))))
        if "content-type" not in header_set:
            headers.append(("Content-Type", "text/html; charset=UTF-8"))
    if "server" not in header_set:
        headers.append(("Server", "TornadoServer/%s" % tornado.version))

    start_line = httputil.ResponseStartLine("HTTP/1.1", status_code, reason)
    header_obj = httputil.HTTPHeaders()
    for key, value in headers:
        header_obj.add(key, value)
    assert request.connection is not None
    request.connection.write_headers(start_line, header_obj, chunk=body)
    request.connection.finish()
    self._log(status_code, request)


if __name__ == '__main__':
http_server = HTTPServer(WSGIContainer_With_Thread(app))
http_server.listen(5000)
IOLoop.instance().start()

注意:app

  1 、這種方法實際上並無提升性能,說到底仍是使用多線程來運行的,因此推薦若是使用tornado仍是和tornado的web框架聯合起來寫出真正的異步代碼,這樣纔會達到tornado異步IO的高性能目的,yield 在python 3.10版本不在支持協程框架

相關文章
相關標籤/搜索