Tornado + Celery + RabbitMQ

聲明:代碼是從項目中截取的,僅爲進行測試。

 

使用Celery任務隊列。Celery 只是一個任務隊列,須要一個broker媒介,將耗時的任務傳遞給Celery任務隊列執行,執行完畢後將結果經過broker媒介返回。官方推薦使用RabbitMQ做爲消息傳遞,redis也能夠。

 

1、Celery 介紹:

注意:
一、當使用RabbitMQ時,須要安裝pika第三方庫;pika 0.10.0存在bug,沒法得到回調信息,安裝0.9.14版本便可
二、tornado-celery 庫比較舊,沒法適應Celery的最新版,會致使報「沒法導入 task Producer 包」的錯誤,只須要將celery版本安裝爲3.0.25就能夠了
 

2、配置

單個參數配置:
# Single-setting configuration: store task results in Redis DB 0.
app.conf.CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

 

多個參數配置:
# Multi-setting configuration: RabbitMQ as broker, Redis as result backend.
app.conf.update(
    CELERY_BROKER_URL='amqp://guest@localhost//',
    CELERY_RESULT_BACKEND='redis://localhost:6379/0'
)

 

從配置文件中獲取:(將配置參數寫在文件 celeryconfig.py 中)

 BROKER_URL=‘amqp://guest@localhost//
 CELERY_RESULT_BACKEND=‘redis://localhost:6379/0‘
 app.config_from_object(‘celeryconfig‘)

 

3、案例

啓動一個Celery 任務隊列,也就是消費者:

 from celery import Celery
 celery = Celery(‘tasks‘, broker=‘amqp://guest:guest@119.29.151.45:5672‘, backend=‘amqp‘)  使用RabbitMQ做爲載體, 回調也是使用rabbit做爲載體
 
 @celery.task(name=‘doing‘)   #異步任務,須要命一個獨一無二的名字
 def doing(s, b):
     print(‘開始任務‘)
     logging.warning(‘開始任務--{}‘.format(s))
     time.sleep(s)
     return s+b

 

命令行啓動任務隊列守護進程,當隊列中有任務時,自動執行 (命令行能夠放在supervisor中管理)
celery -A tasks worker --loglevel=info --concurrency=5
loglevel:記錄等級,默認是info;concurrency:指定工做進程數量,默認是CPU核心數

 

啓動任務生產者:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import tcelery
from tornado.web import RequestHandler
import tornado

tcelery.setup_nonblocking_producer()  # non-blocking producer; otherwise the callback result can never be received


class MyMainHandler(RequestHandler):
    """Demo handler that dispatches a Celery task and awaits its result."""

    @tornado.web.asynchronous
    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        print('begin')
        # yield waits for the async return value; this request waits but
        # other requests are not blocked.
        result = yield tornado.gen.Task(sleep.apply_async, args=[10])
        print('ok - -{}'.format(result.result))  # the task's return value

        # Alternative: callback style. Note the request finishes as soon as
        # the task is sent, and the client connection is closed — so the
        # callback cannot write anything back to the client:
        # sleep.apply_async((10, ), callback=self.on_success)

        # Alternative: result = sleep.delay(10) — delay() is just a thin
        # wrapper around apply_async().
        # data = result.get(timeout=100) — get() blocks, i.e. synchronous.

    def on_success(self, response):
        """Callback invoked with the task result (see note in get())."""
        print('Ok - - {}'.format(response))

 

=======================

#!/usr/bin/env python

# -*-  coding:utf-8 -*-

from tornado.web import Application



from tornado.ioloop import IOLoop



import tcelery

from com.analysis.handlers.data_analysis_handlers import *

from com.analysis.handlers.data_summary_handlers import *

from com.analysis.handlers.data_cid_sumjson_handler import Cid_Sumjson_Handler

from com.analysis.handlers.generator_handlers import GeneratorCsv, GeneratorSpss





# Route table: URL pattern -> handler class.
Handlers = [
    # single-factor analysis of variance
    (r"/single_factor_variance_analysis/(.*)", SingleFactorVarianceAnalysis),
]


if __name__ == "__main__":
    # The non-blocking Celery producer must be set up before the IOLoop runs.
    tcelery.setup_nonblocking_producer()
    app = Application(Handlers)
    app.listen(port=8888, address="0.0.0.0")
    IOLoop.instance().start()
(以上爲 server 啓動代碼)

 

#!/usr/bin/env python

# -*-  coding:utf-8 -*-



import tornado.gen

import tornado.web

from com.analysis.core.base import BaseAnalysisRequest

from com.analysis.tasks.data_analysis import *





class SingleFactorVarianceAnalysis(BaseAnalysisRequest):
    """Handler that runs the single-factor ANOVA task asynchronously."""

    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        # Dispatch the Celery task and suspend until its result arrives.
        result = yield self.celery_task(
            single_factor_variance_analysis.apply_async, params=args)
        print(result.result)
        # The third element of the task result is the client-facing payload.
        self.write(result.result[2])
(以上爲 handler 代碼)

 

#!/usr/bin/env python

# -*-  coding:utf-8 -*-



from collections import defaultdict



import pandas as pd

import numpy as np

import pygal

import tornado.gen

from pygal.style import LightStyle

from tornado.web import RequestHandler

import json



from com.analysis.db.db_engine import DBEngine

from com.analysis.utils.log import LogCF

from com.analysis.handlers.data_cid_sumjson_handler import cid_sumjson





class BaseRequest(RequestHandler):
    """Common base class for this application's request handlers.

    Currently only forwards construction to RequestHandler — presumably a
    hook for shared behaviour to be added later (verify against the project).
    """

    def __init__(self, application, request, **kwargs):
        super(BaseRequest, self).__init__(application, request, **kwargs)





class BaseAnalysisRequest(BaseRequest):
    """Base handler that dispatches a Celery task and awaits its result."""

    def __init__(self, application, request, **kwargs):
        super(BaseAnalysisRequest, self).__init__(application, request, **kwargs)

    @tornado.gen.coroutine
    def celery_task(self, func, params, queue="default_analysis"):
        """Send *func* to *queue* with *params* and yield until it completes.

        An empty string is prepended to the positional arguments —
        apparently a placeholder the task signature expects (confirm
        against the task definitions).
        """
        task_args = [""] + list(params)
        result = yield tornado.gen.Task(func, args=task_args, queue=queue)
        raise tornado.gen.Return(result)
(以上爲 base handler 代碼)

 

#!/usr/bin/env python

# -*-  coding:utf-8 -*-





import logging

from celery import Celery

from com.analysis.core.chi_square_test import CST
from com.analysis.generator.generator import GeneratorCsv, GeneratorSpss



celery = Celery(
    'com.analysis.tasks.data_analysis',
    broker='amqp://192.168.1.1:5672',
    include='com.analysis.tasks.data_analysis'
)

# Results travel back through RabbitMQ as well, serialized as JSON.
celery.conf.CELERY_RESULT_BACKEND = "amqp://192.168.1.1:5672"
celery.conf.CELERY_ACCEPT_CONTENT = ['application/json']
celery.conf.CELERY_TASK_SERIALIZER = 'json'
celery.conf.CELERY_RESULT_SERIALIZER = 'json'
celery.conf.BROKER_HEARTBEAT = 30
celery.conf.CELERY_IGNORE_RESULT = False  # results are consumed by the web layer

# Fix: the original used an undefined ``Logger`` class; use stdlib logging.
logger = logging.getLogger(__name__)


@celery.task()
def single_factor_variance_analysis(*args):
    """Run a single-factor analysis of variance and return its result."""
    # NOTE(review): ``SFV`` is not imported in this snippet (the imports above
    # only bring in ``CST``) — verify the correct import in the real project.
    return SFV().do_(*args)
(以上爲 task 定義代碼)
相關文章
相關標籤/搜索