Celery 是一個「自帶電池」的的任務隊列。它易於使用,因此你能夠無視其所解決問題的複雜程度而輕鬆入門。它遵守最佳實踐設計,因此你的產品能夠擴展,或與其餘語言集成,而且它自帶了在生產環境中運行這樣一個系統所需的工具和支持。html
Celery 的最基礎部分。包括:redis
選擇和安裝消息傳輸方式(中間人)----broker,如RabbitMQ,redis等。sql
pip install celery
tasks.pydjango
from celery import Celery #第一個參數是你的celery名稱 #backen 用於存儲結果 #broker 用於存儲消息隊列 app = Celery('tasks',backend='redis://:password@host:port/db', broker='redis://:password@host:port/db') @app.task def add(x, y): return x + y
Celery 的第一個參數是當前模塊的名稱,這個參數是必須的,這樣的話名稱能夠自動生成。第二個參數是中間人關鍵字參數,指定你所使用的消息中間人的 URL,此處使用了 RabbitMQ,也是默認的選項。更多可選的中間人見上面的 選擇中間人 一節。例如,對於 RabbitMQ 你能夠寫 amqp://localhost ,而對於 Redis 你能夠寫 redis://localhost .json
你定義了一個單一任務,稱爲 add ,返回兩個數字的和。後端
步驟:緩存
啓動一個工做者,建立一個任務隊列app
// -A 指定celery名稱,loglevel制定log級別,只有大於或等於該級別纔會輸出到日誌文件 celery -A tasks worker --loglevel=info
若是你沒有安裝redis庫,請先pip install redisnosql
如今咱們已經有一個celery隊列了,我門只須要將工做所需的參數放入隊列便可分佈式
from tasks import add #調用任務會返回一個 AsyncResult 實例,可用於檢查任務的狀態,等待任務完成或獲取返回值(若是任務失敗,則爲異常和回溯)。 #但這個功能默認是不開啓的,你須要設置一個 Celery 的結果後端(即backen,咱們在tasks.py中已經設置了,backen就是用來存儲咱們的計算結果) result=add.delay(4, 4) #若是任務已經完成 if(result.ready()): #獲取任務執行結果 print(result.get(timeout=1))
經常使用接口
config.py
#broker BROKER_URL = 'redis://:password@host:port/db' #backen CELERY_RESULT_BACKEND = 'redis://:password@host:port/db' #導入任務,如tasks.py CELERY_IMPORTS = ('tasks', ) #列化任務載荷的默認的序列化方式 CELERY_TASK_SERIALIZER = 'json' #結果序列化方式 CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT=['json'] #時間地區與形式 CELERY_TIMEZONE = 'Europe/Oslo' #時間是否使用utc形式 CELERY_ENABLE_UTC = True #設置任務的優先級或任務每分鐘最多執行次數 CELERY_ROUTES = { # 若是設置了低優先級,則可能好久都沒結果 #'tasks.add': 'low-priority', #'tasks.add': {'rate_limit': '10/m'}, #'tasks.add': {'rate_limit': '10/s'}, #'*': {'rate_limit': '10/s'} } #borker池,默認是10 BROKER_POOL_LIMIT = 10 #任務過時時間,單位爲s,默認爲一天 CELERY_TASK_RESULT_EXPIRES = 3600 #backen緩存結果的數目,默認5000 CELERY_MAX_CACHED_RESULTS = 10000
celery.py
from celery import Celery #指定名稱 app = Celery('mycelery') #加載配置模塊 app.config_from_object('config') if __name__=='__main__': app.start()
tasks.py
from .celery import app @app.task def add(a, b): return a + b
// -l 是 --loglevel的簡寫 celery -A mycelery worker -l info
from tasks import add #調用任務會返回一個 AsyncResult 實例,可用於檢查任務的狀態,等待任務完成或獲取返回值(若是任務失敗,則爲異常和回溯)。 #但這個功能默認是不開啓的,你須要設置一個 Celery 的結果後端(即backen,咱們在tasks.py中已經設置了,backen就是用來存儲咱們的計算結果) result=add.delay(4, 4) #若是任務已經完成 if(result.ready()): #獲取任務執行結果 print(result.get(timeout = 1))
啓動多個celery worker,這樣即便一個worker掛掉了其餘worker也能繼續提供服務
// 啓動三個worker:w1,w2,w3 celery multi start w1 -A project -l info celery multi start w2 -A project -l info celery multi start w3 -A project -l info // 當即中止w1,w2,即使如今有正在處理的任務 celery multi stop w1 w2 // 重啓w1 celery multi restart w1 -A project -l info // celery multi stopwait w1 w2 w3 # 待任務執行完,中止
// 啓動多個worker,可是不指定worker名字 // 你能夠在同一臺機器上運行多個worker,但要爲每一個worker指定一個節點名字,使用--hostname或-n選項 // concurrency指定處理進程數,默認與cpu數量相同,所以通常無需指定 $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3@%h
celery能夠指定在發生錯誤的狀況下進行自定義的處理
config.py
def my_on_failure(self, exc, task_id, args, kwargs, einfo): print('Oh no! Task failed: {0!r}'.format(exc)) // 對全部類型的任務,當發生執行失敗的時候所執行的操做 CELERY_ANNOTATIONS = {'*': {'on_failure': my_on_failure}}