Flask中使用celery隊列處理執行時間較長的請求。html
一. 安裝celerypython
pip install celery flask redis
二. celery簡介redis
Celery是個異步分佈式任務隊列 經過Celery在後臺跑任務並不像線程那麼簡單,可是用Celery的話,可以是應用有較好的擴展性,由於Celery是個分佈式架構,下面介紹Celery的三個核心組件: 1. 生產者(Celery client): 生產者發送消息,在Flask上工做時,生產者在Flask應用內運行 2. 消費者(Celert worker): 消費者用於處理後臺任務。消費者能夠是本地的也能夠是遠程的。咱們能夠在運行Flask的server上運行一個單一的消費者,當業務量上漲以後再去添加更多的消費者 3. 消息傳遞着(Celery broker): 生產者和消費者的信息交互使用的是消息隊列,Celery支持若干方式的消息隊列,其中最長用的是RabbitMQ和Redis, 咱們在使用過程當中使用的Redis
三. redis配置與使用json
redis配置文件/etc/redis.conf 1.設置爲後臺啓動 daemonize yes 2.redis端口設置 port 6379 # default prot 3.日誌文件 logfile /home/liuyadong/work/log/redis.log 4.數據保存文件 dir /home/liuyadong/data/redisData 經過下面命令指定配置文件啓動redis: redis-server /etc/redis.conf 經過下面命令測試是否啓動成功: redis-cli -p 6379 下面這樣表示成功(進入了命令行模式): redis 127.0.0.1:6379> 查看啓動端口: sudo netstat -ntlp | grep 6379 tcp 0 0 127.0.0.1:6379 0.0.0.0:* LISTEN 49380/redis-server
四. celery使用簡介flask
1.Choosing a broker 最經常使用的broker包括: RabbitMQ 和 Redis, 咱們使用Redis, Redis的安裝及啓動等查看第二部分 2.intall celery pip install celery 3.Application 使用celery的第一步是建立一個application, 一般叫作'app'。具體的建立一個app的代碼以下: $ cat tasks.py #!/usr/bin/env python from celery import Celery app = Celery('tasks', broker='redis://localhost') @app.tasks def add(x, y): return x + y Note: Celery第一個參數必須是當前module的模擬購置,本次實例中爲:tasks 4.Running the celery worker server $ celery -A tasks worker --loglevel=info 5.Calling the tasks 能夠經過delay()或者apply_sync()方法來調用一個task >>> from tasks import add >>> add.delay(4, 4) 6. Keeping Results 咱們能夠將task的執行狀態保存起來,能夠保存到broker中, 能夠經過CELERY_RESULT_BACKEND字段來設置保存結果。 也能夠經過Celery的backend參數來設置 app.Celery('tasks', broker='redis://localhost', backend='redis://localhost') >>> result = add.delay(4, 4) 能夠經過ready()方法來判斷程序執行是否完成,執行完成返回True. >>> result.ready() False 下面是AsyncResult對象的其餘調用方法介紹: 1) AsyncResult.get(timeout=None, propagate=True, interval=0.5, no_ack=True, follow_parents=True) timeout : 設置一個等待的預操做時間,單位是s, 方法返回執行結果 propagate : 若是task執行失敗,則Re-taise Exception interval : 等待必定時間從新執行操做,若是使用amqp來存儲backend則此參數無效 no_ack : Enable amqp no ack (automatically acknowledge message) If this is False then the message will not be acked follow_parents : Reraise any exception raised by parent task 2) AsyncResult.state 或 status屬性 方法返回當前task的執行狀態,返回值包括下面多種狀況: PENDING: task正在等待執行 STARTED: task已經開始執行了 RETRY : task從新執行了,這多是因爲第一次執行失敗引發的 FAILURE: task執行引起了異常,而且結果的屬性當中包括了異常是由哪一個task引發的 SUCCESS: task執行成功,結果的屬性當中包括執行結果 3) AsyncResult.success() 若是返回True,則表示task執行成功 4) AsyncResult.traceback() 獲得一個執行失敗的task的traceback 7.Configuration celert 默認的配置對於大多數用戶來講已經足夠好了,可是咱們仍有許多想讓celery按照咱們的想法去work,經過configuration實現是一個好的方式。 configutation能夠經過app設置,也能夠經過一個單獨的模塊進行設置。 好比,經過app設置CELERY_TASK_SERIALIZER屬性:app.conf.CELERY_TASK_SERIALIZER = 'json' 若是你一次性有許多須要配置,則能夠經過update()方法實現: app.conf.update( CELERY_TASK_SERIALIZER='json', CELERY_ACCEPT_CONTENT=['json'], # Ignore other content CELERY_RESULT_SERIALIZER='json', CELERY_TIMEZONE='Europe/Oslo', CELERY_ENABLE_UTC=True, ) 你也能夠經過app.config_from_object() method告訴Celery經過一個模塊來生成configuration: app.config_from_object('celeryconfig') 這個模塊一般叫作 celeryconfig,但實際上你能夠叫任何名字。 $ cat celeryconfig.py CELERY_ROUTES = {'tasks.add': 'low-priority', 'tasks.add': {'rate_limit': '10/m'} 8.Where to go from here 若是你想了解更多請閱讀: http://docs.celeryproject.org/en/latest/getting-started/next-steps.html#next-steps 以後閱讀: http://docs.celeryproject.org/en/latest/userguide/index.html#guide