tornado配合celery及rabbitmq實現web request異步非阻塞

Tornado和Celery介紹

1.Tornado

Tornado是一個用python編寫的一個強大的、可擴展的異步HTTP服務器,同時也是一個web開發框架。tornado是一個非阻塞式web服務器,其速度至關快。得利於其非阻塞的方式和對 epoll的運用,tornado每秒能夠處理數以千計的鏈接,這意味着對於實時web服務來講,tornado是一個理想的web框架。它在處理嚴峻的網絡流量時表現得足夠強健,但卻在建立和編寫時有着足夠的輕量級,並可以被用在大量的應用和工具中。
進一步瞭解和學習tornado可移步: tornado官方文檔

2.Celery

Celery 是一個簡單、靈活且可靠的,處理大量消息的分佈式系統,它是一個專一於實時處理的任務隊列, 同時也支持任務調度。Celery 中有兩個比較關鍵的概念:
  • Worker: worker 是一個獨立的進程,它持續監視隊列中是否有須要處理的任務;
  • Broker: broker 也被稱爲中間人或者協調者,broker 負責協調客戶端和 worker 的溝通。客戶端向 隊列添加消息,broker 負責把消息派發給 worker。

3.RabbitMQ

RabbitMQ是實現AMQP(高級消息隊列協議)的消息中間件的一種,最初起源於金融系統,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

RabbitMQ主要是爲了實現系統之間的雙向解耦而實現的。當生產者大量產生數據時,消費者沒法快速消費,那麼須要一箇中間層。保存這個數據。html

例如一個日誌系統,很容易使用RabbitMQ簡化工做量,一個Consumer能夠進行消息的正常處理,另外一個Consumer負責對消息進行日誌記錄,只要在程序中指定兩個Consumer所監聽的queue以相同的方式綁定到同一exchange便可,剩下的消息分發工做由RabbitMQ完成。python

通常狀況下,一個工具庫或者一個框架都是獨立的,有本身的feature或者功能點,可能依賴其餘的庫,但毫不依賴於其餘服務。可是celery是一個特例,若是celery沒有broker這個服務,那就徹底不能用了。celery 支持多種 broker, 但主要以 RabbitMQ 和 Redis 爲主,其餘都是試驗性的,雖然也可使用, 可是沒有專門的維護者。官方推薦使用rabbitmq做爲生產環境下的broker,redis雖然也在官方指名的broker之列,可是實際使用上有可能還會出現如下莫名其妙的問題。mysql

Celery的配置和使用方法詳見:官方文檔git

從Tornado的異步講起

tornado的同步阻塞

用tornado進行web開發的過程當中(實際上用任何語言或者框架開發都會遇到),開發者可能會發現有時候tornado的響應會變慢,追根溯源會發現緣由之一就是由於該請求被其餘請求阻塞了。這就有問題了啊!!!tornado不是標榜本身是異步Http Web Server嗎?不是號稱本身解決了C10K問題了嗎?這是欺騙消費者啊!!!
可是,深刻了解tornado以後才發現,人家說的異步非阻塞是有條件的,只有按照它說的來,才能實現真正的異步非阻塞。。。
咱們先來看一個小例子:github

#!/bin/env python

import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.httpclient
import torndb
import time

from tornado.options import define, options
define("port", default=8000, help="run on the given port", type=int)

db = torndb.Connection('127.0.0.1:3306', 'user_db', 'username', 'passwd')

class MysqlHandler(tornado.web.RequestHandler):
    def get(self, flag):
        self.write(db.query('select * from table where flag=%s', flag))

class NowHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("i want you, right now!")

if __name__ == "__main__":
    tornado.options.parse_command_line()
    app = tornado.web.Application(handlers=[
            (r"/mysql_query/(\d+)", MysqlHandler), 
            (r"/i_want_you_now", NowHandler)])
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()

當咱們先請求/mysql_query接口時再請求/i_want_you_now接口,會發現原來能夠馬上返回的第二個請求卻被一直阻塞到第一個接口執行完以後才返回。爲何?由於大部分web框架都是使用的同步阻塞模型來處理請求的,tornado的默認模型也不例外。可是tornado但是一個異步http服務器啊,不會這麼弱吧?並且不上場景下都有一些至關耗時的操做,這些操做就會阻塞其餘一些普通的請求,應該怎麼解決這個問題?web

相信不少使用過tornado的人會想到@tornado.web.asynchronous這個裝飾器,可是這就是tornado官方雞賊的地方了!!!裝飾器 web.asynchronous 只能用在verb函數以前(即get/post/delete等),而且須要搭配tornado異步客戶端使用,如httpclient.AsyncHTTPClient,或者,你須要異步執行的那個函數(操做)必須也是異步的。。。(我是怨念滿滿的粗體!!!),並且加上這個裝飾器後,開發者必須在異步回調函數裏顯式調用 RequestHandler.finish 纔會結束此次 HTTP 請求。(由於tornado默認在函數處理返回時會自動關閉客戶端的鏈接)redis

什麼意思呢?就是說,tornado:老子只給你提供異步的入口,你要是真想異步操做,要不你就使用我提供的一些異步客戶端來搞,否則你就本身實現一個異步的操做。sql

以操做MongoDB爲例,若是你的函數中含有調用mongo的調用(使用pymongo庫),那麼這時候你加asynchronous這個裝飾器就沒有任何效果了,由於你的mongo調用自己是同步的,若是想作成異步非阻塞的效果,須要使用mongo出品的另外一個python driver -- motor,這個driver支持異步操做mongo,這時候你再加asynchronous裝飾器並操做mongo就能夠實現異步非阻塞的效果了。數據庫

異步非阻塞的實現

因此,若是要使用tornado的異步調用,第一,使用tornado內置的異步客戶端如httpclient.AsyncHTTPClient等;第二,可參考內置異步客戶端,藉助tornado.ioloop.IOLoop封裝一個本身的異步客戶端,但開發成本並不小。服務器

然而,天無絕人之路,仍是有辦法能夠用較低的成本實現tornado的異步非阻塞的,那就是藉助celery項目。前面說了,它是一個分佈式的實時處理消息隊列調度系統,tornado接到請求後,能夠把全部的複雜業務邏輯處理、數據庫操做以及IO等各類耗時的同步任務交給celery,由這個任務隊列異步處理完後,再返回給tornado。這樣只要保證tornado和celery的交互是異步的,那麼整個服務是徹底異步的。至於如何保證tornado和celery之間的交互是異步的,能夠藉助tornado-celery這個適配器來實現。

celery配合rabbitmq的工做流程以下:

這裏咱們來使用這幾個組件重寫前面的同步阻塞的例子:

#!/bin/env python

import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.httpclient

import time
import tcelery, tasks
from tornado.options import define, options
tcelery.setup_nonblocking_producer()
define("port", default=8000, help="run on the given port", type=int)

class AsyncMysqlHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    @tornado.gen.coroutine
    def get(self, flag):
        res = yield tornado.gen.Task(tasks.query_mysql.apply_async, args=[flag])
        self.write(res.result)
        self.finish()

class NowHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("i want you, right now!")

if __name__ == "__main__":
    tornado.options.parse_command_line()
    app = tornado.web.Application(handlers=[
            (r"/mysql_query/(\d+)", AsyncMysqlHandler), 
            (r"/i_want_you_now", NowHandler)])
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()

這裏有個新的tornado.gen.coroutine裝飾器, coroutine是3.0以後新增的裝飾器.之前的辦法是用回調函數的方式進行異步調用,若是使用回調函數的方式,則代碼以下:

#!/bin/env python

import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.httpclient
import time
import tcelery, tasks
from tornado.options import define, options
tcelery.setup_nonblocking_producer()
define("port", default=8000, help="run on the given port", type=int)

class AsyncMysqlHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self, flag):
        tasks.query_mysql.apply_async(args=[flag], callback=self.on_result)

    def on_result(self, response):
        self.write(response.result)
        self.finish()

class NowHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("i want you, right now!")

if __name__ == "__main__":
    tornado.options.parse_command_line()
    app = tornado.web.Application(handlers=[
            (r"/mysql_query/(\d+)", AsyncMysqlHandler), 
            (r"/i_want_you_now", NowHandler)])
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()

使用callback的話始終以爲會是的代碼結構變得比較混亂,試想若是有大量異步回調,每個都寫一個回調函數的話,勢必致使項目代碼結構變得不那麼清晰和優雅,畢竟回調這種反人類的寫法仍是不少人不喜歡的,但也看我的喜愛,不喜歡callback風格的可使用yield來進行異步調用。

tasks.py集中放置開發者須要異步執行的函數。

import time
import torndb
from celery import Celery

db = torndb.Connection('127.0.0.1:3306', 'user_db', 'username', 'passwd')
app = Celery("tasks", broker="amqp://guest:guest@localhost:5672")
app.conf.CELERY_RESULT_BACKEND = "amqp://guest:guest@localhost:5672"

@app.task(name='task.query_users')
def query_mysql(flag):
    return db.query('select * from table where flag=%s', flag)

if __name__ == "__main__":
    app.start()

而後啓動celery worker監放任務隊列(消費者會從任務隊列中取走一個個的task並執行):

celery -A tasks worker --loglevel=info

自此,依靠這種架構,能夠實現tornado處理請求的徹底異步調用。

問題及優化

1.隊列過長問題

使用上述方案的異步非阻塞可能會依賴於celery的任務隊列長度,若隊列中的任務過多,則可能致使長時間等待,下降效率。
解決方案:

  • 啓動多個celery worker監放任務隊列,使用多進程併發消費任務隊列,celery命令能夠經過-concurrency參數來指定用來執行任務而prefork的worker進程,若是全部的worker都在執行任務,那麼新添加的任務必需要等待有一個正在執行的任務完成後才能被執行,默認的concurrency數量是機器上CPU的數量。另外,celery是支持好幾個併發模式的,有prefork,threading,協程(gevent,eventlet),prefork在celery的介紹是,默認是用了multiprocess來實現的;能夠經過-p參數指定其餘的併發模型,如gevent(需本身配置好gevent環境)。
  • 創建多個任務queue,把大量的任務分發到不一樣的queue中,減輕單個queue時可能出現的任務數量過載。

2.水平擴展優化

前面說了celery是一個分佈式系統,也就是說,基於celery的項目可無痛實現分佈式擴展,前面寫的tornado和celery配合的demo,也能夠實現獨立部署,即tornado server和celery server其實能夠分開部署,即分佈在不一樣的服務器上,celery server部署本身的tasks.py任務,並啓動celery worker監聽,而後在tornado server上添加如下代碼:

from celery import Celery
app = Celery(broker = "amqp://",)

並使用Celery的send_task函數調用任務:

app.send_task('function_name', args=[param1, param2, param3...])

便可實現tornado和celery的徹底解耦。

後續:

另外,瞭解到tornado.concurrent.futures(py3自帶這個庫,py2需單獨安裝)這個module能夠實現自定義函數的異步化,目前尚未深刻了解這個東西,有時間去研究一下這個東西,有心得再分享一下這個module相關的知識。

相關文章
相關標籤/搜索