聲明:代碼是從項目中截取的, 爲進行測試python
使用Celery任務隊列,Celery 只是一個任務隊列,須要一個broker媒介,將耗時的任務傳遞給Celery任務隊列執行,執行完畢將結果經過broker媒介返回。官方推薦使用RabbitMQ做爲消息傳遞,redis也能夠web
單個參數配置: app.conf.CELERY_RESULT_BACKEND = ‘redis://localhost:6379/0‘
多個參數配置: app.conf.update( CELERY_BROKER_URL = ‘amqp://guest@localhost//‘, CELERY_RESULT_BACKEND = ‘redis://localhost:6379/0‘ )
從配置文件中獲取:(將配置參數寫在文件app.py中)redis
BROKER_URL=‘amqp://guest@localhost//‘ CELERY_RESULT_BACKEND=‘redis://localhost:6379/0‘ app.config_from_object(‘celeryconfig‘)
啓動一個Celery 任務隊列,也就是消費者:json
from celery import Celery celery = Celery(‘tasks‘, broker=‘amqp://guest:guest@119.29.151.45:5672‘, backend=‘amqp‘) 使用RabbitMQ做爲載體, 回調也是使用rabbit做爲載體 @celery.task(name=‘doing‘) #異步任務,須要命一個獨一無二的名字 def doing(s, b): print(‘開始任務‘) logging.warning(‘開始任務--{}‘.format(s)) time.sleep(s) return s+b
啓動任務生產者app
#!/usr/bin/env python # -*- coding:utf-8 -*- import tcelery from tornado.web import RequestHandler import tornado tcelery.setup_nonblocking_producer() # 設置爲非阻塞生產者,不然沒法獲取回調信息 class MyMainHandler(RequestHandler): @tornado.web.asynchronous @tornado.gen.coroutine def get(self, *args, **kwargs): print('begin') result = yield tornado.gen.Task(sleep.apply_async, args=[10]) # 使用yield 獲取異步返回值,會一直等待可是不阻塞其餘請求 print('ok - -{}'.format(result.result)) # 返回值結果 # sleep.apply_async((10, ), callback=self.on_success) # print(‘ok -- {}‘.format(result.get(timeout=100)))#使用回調的方式獲取返回值,發送任務以後,請求結束,因此不能放在處理tornado的請求任務當中,由於請求已經結束了,與客戶端已經斷開鏈接,沒法再在獲取返回值的回調中繼續向客戶端返回數據 # result = sleep.delay(10) #delay方法只是對apply_async方法的封裝而已 # data = result.get(timeout=100) #使用get方法獲取返回值,會致使阻塞,至關於同步執行 def on_success(self, response): # 回調函數 print('Ok - - {}'.format(response))
=======================less
#!/usr/bin/env python # -*- coding:utf-8 -*- from tornado.web import Application from tornado.ioloop import IOLoop import tcelery from com.analysis.handlers.data_analysis_handlers import * from com.analysis.handlers.data_summary_handlers import * from com.analysis.handlers.data_cid_sumjson_handler import Cid_Sumjson_Handler from com.analysis.handlers.generator_handlers import GeneratorCsv, GeneratorSpss Handlers = [ (r"/single_factor_variance_analysis/(.*)", SingleFactorVarianceAnalysis), # 單因素方差檢驗 ] if __name__ == "__main__": tcelery.setup_nonblocking_producer() application = Application(Handlers) application.listen(port=8888, address="0.0.0.0") IOLoop.instance().start()
#!/usr/bin/env python # -*- coding:utf-8 -*- import tornado.gen import tornado.web from com.analysis.core.base import BaseAnalysisRequest from com.analysis.tasks.data_analysis import * class SingleFactorVarianceAnalysis(BaseAnalysisRequest): @tornado.gen.coroutine def get(self, *args, **kwargs): response = yield self.celery_task(single_factor_variance_analysis.apply_async, params=args) print(response.result) self.write(response.result[2])
#!/usr/bin/env python # -*- coding:utf-8 -*- from collections import defaultdict import pandas as pd import numpy as np import pygal import tornado.gen from pygal.style import LightStyle from tornado.web import RequestHandler import json from com.analysis.db.db_engine import DBEngine from com.analysis.utils.log import LogCF from com.analysis.handlers.data_cid_sumjson_handler import cid_sumjson class BaseRequest(RequestHandler): def __init__(self, application, request, **kwargs): super(BaseRequest, self).__init__(application, request, **kwargs) class BaseAnalysisRequest(BaseRequest): def __init__(self, application, request, **kwargs): super(BaseAnalysisRequest, self).__init__(application, request, **kwargs) @tornado.gen.coroutine def celery_task(self, func, params, queue="default_analysis"): args_list = list(params) args_list.insert(0, "") response = yield tornado.gen.Task(func, args=args_list, queue=queue) raise tornado.gen.Return(response)
#!/usr/bin/env python # -*- coding:utf-8 -*- from celery import Celery from com.analysis.core.chi_square_test import CST from com.analysis.generator.generator import GeneratorCsv, GeneratorSpss celery = Celery( 'com.analysis.tasks.data_analysis', broker='amqp://192.168.1.1:5672', include='com.analysis.tasks.data_analysis' ) celery.conf.CELERY_RESULT_BACKEND = "amqp://192.168.1.1:5672" celery.conf.CELERY_ACCEPT_CONTENT = ['application/json'] celery.conf.CELERY_TASK_SERIALIZER = 'json' celery.conf.CELERY_RESULT_SERIALIZER = 'json' celery.conf.BROKER_HEARTBEAT = 30 celery.conf.CELERY_IGNORE_RESULT = False # this is less important logger = Logger().getLogger() @celery.task() def single_factor_variance_analysis(*args): return SFV().do_(*args)