將耗時任務放到後臺異步執行,不影響用戶其餘操做。html
任務隊列是一種跨線程,跨機器的機制。python
任務隊列中包含稱做任務的工做單元。有專門的進程持續不斷的監視任務隊列,並從中獲得新的任務處理。redis
elery經過消息進行通訊,一般使用一個叫Broker(中間人)來協client(任務的發出者)和worker(任務的處理者). clients發出消息到隊列中,broker將隊列中的信息派發給worker來處理。django
一個celery系統能夠包含不少的worker和broker,可加強橫向擴展性和高可用性能。服務器
RabbitMQ是一個功能完備,穩定的而且易於安裝的broker. 它是生產環境中最優的選擇。app
Redis也是一款功能完備的broker可選項,可是其更可能因意外中斷或者電源故障致使數據丟失的狀況。 關因而有那個Redis做爲Broker,可訪下面網址:http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#broker-redis異步
首先建立tasks.py模塊函數
from celery import Celery # 咱們這裏案例使用redis做爲broker app = Celery('demo', broker='redis://:332572@127.0.0.1/1') # 建立任務函數 @app.task def my_task(): print("任務函數正在執行....")
Celery第一個參數是給其設定一個名字, 第二參數咱們設定一箇中間人broker, 在這裏咱們使用Redis做爲中間人。my_task函數是咱們編寫的一個任務函數, 經過加上裝飾器app.task, 將其註冊到broker的隊列中。性能
如今咱們在建立一個worker, 等待處理隊列中的任務.打開終端,cd到tasks.py同級目錄中,執行命令:url
celery -A tasks worker --loglevel=info
任務加入到broker隊列中,以便剛纔咱們建立的celery workder服務器可以從隊列中取出任務並執行。如何將任務函數加入到隊列中,可以使用delay()。
進入python終端, 執行以下代碼:
from tasks import my_task my_task.delay()
若是咱們想跟蹤任務的狀態,Celery須要將結果保存到某個地方。有幾種保存的方案可選:SQLAlchemy、Django ORM、Memcached、 Redis、RPC (RabbitMQ/AMQP)。
例子咱們仍然使用Redis做爲存儲結果的方案,任務結果存儲配置咱們經過Celery的backend參數來設定。咱們將tasks模塊修改以下:
from celery import Celery # 咱們這裏案例使用redis做爲broker app = Celery('demo', backend='redis://:332572@127.0.0.1:6379/2', broker='redis://:332572@127.0.0.1:6379/1') # 建立任務函數 @app.task def my_task(a, b): print("任務函數正在執行....") return a + b
咱們給Celery增長了backend參數,指定redis做爲結果存儲,並將任務函數修改成兩個參數,而且有返回值。
from celery import Celery app = Celery('demo') # 增長配置 app.conf.update( result_backend='redis://:332572@127.0.0.1:6379/2', broker_url='redis://:332572@127.0.0.1:6379/1', )
對於比較大的項目,咱們建議配置信息做爲一個單獨的模塊。咱們能夠經過調用app的函數來告訴Celery使用咱們的配置模塊。
配置模塊的名字咱們取名爲celeryconfig, 這個名字不是固定的,咱們能夠任意取名,建議這麼作。咱們必須保證配置模塊可以被導入。
下面咱們在tasks.py模塊 同級目錄下建立配置模塊celeryconfig.py:
result_backend = 'redis://:332572@127.0.0.1:6379/2' broker_url = 'redis://:332572@127.0.0.1:6379/1'
tasks.py文件修改成:
from celery import Celery import celeryconfig # 咱們這裏案例使用redis做爲broker app = Celery('demo') # 從單獨的配置模塊中加載配置 app.config_from_object('celeryconfig')
1.建立celery_tasks包
建立main.py config.py 具體的任務包eg:sms
在sms包中建立tasks.py
tasks.py 代碼
from celery_tasks.main import app @app.task(name='my_task1') def my_task1(*args, **kwargs): print('執行任務1發送sms短信')
config.py代碼
broker_url = "redis://127.0.0.1/14"
main.py代碼
from celery import Celery # 爲celery使用django配置文件進行設置 import os if not os.getenv('DJANGO_SETTINGS_MODULE'): os.environ['DJANGO_SETTINGS_MODULE'] = 'demo.settings.dev' # 建立celery應用 app = Celery('my_app') # 導入celery配置 app.config_from_object('celery_tasks.config') # 自動註冊celery任務 app.autodiscover_tasks(['celery_tasks.sms'])
啓動celery
celery -A celry_tasks.main worker -l info
在須要調用任務的模塊使用
from celery_tasks.sms import tasks as sms_tasks sms_tasks.send_sms_code.delay(mobile, sms_code, sms_code_expires)