Celery任務隊列

文檔

簡介

Celery 是一個「自帶電池」的的任務隊列。它易於使用,因此你能夠無視其所解決問題的複雜程度而輕鬆入門。它遵守最佳實踐設計,因此你的產品能夠擴展,或與其餘語言集成,而且它自帶了在生產環境中運行這樣一個系統所需的工具和支持。html

Celery 的最基礎部分。包括:redis

  • 選擇和安裝消息傳輸方式(中間人)----broker,如RabbitMQ,redis等。sql

    • RabbitMQ的安裝:sudo apt-get install rabbitmq-server
    • 本文使用redis
    • 官方推薦RabbitMQ
    • 固然部分nosql也能夠
  • 安裝 Celery 並建立第一個任務
  • 運行職程並調用任務。
  • 追蹤任務在不一樣狀態間的遷移,並檢視返回值。

安裝

pip install celery

簡單使用

定義任務

tasks.pydjango

from celery import Celery
#第一個參數是你的celery名稱
#backen 用於存儲結果
#broker 用於存儲消息隊列
app = Celery('tasks',backend='redis://:password@host:port/db', broker='redis://:password@host:port/db')

@app.task
def add(x, y):
    return x + y

Celery 的第一個參數是當前模塊的名稱,這個參數是必須的,這樣的話名稱能夠自動生成。第二個參數是中間人關鍵字參數,指定你所使用的消息中間人的 URL,此處使用了 RabbitMQ,也是默認的選項。更多可選的中間人見上面的 選擇中間人 一節。例如,對於 RabbitMQ 你能夠寫 amqp://localhost ,而對於 Redis 你能夠寫 redis://localhost .json

你定義了一個單一任務,稱爲 add ,返回兩個數字的和。後端

啓動celery服務

步驟:緩存

  • 啓動任務工做者worker
  • 講任務放入celery隊列
  • worker讀取隊列,並執行任務

啓動一個工做者,建立一個任務隊列app

// -A 指定celery名稱,loglevel制定log級別,只有大於或等於該級別纔會輸出到日誌文件
celery -A tasks worker --loglevel=info

若是你沒有安裝redis庫,請先pip install redisnosql

使用celery

如今咱們已經有一個celery隊列了,我門只須要將工做所需的參數放入隊列便可分佈式

from tasks import add
#調用任務會返回一個 AsyncResult 實例,可用於檢查任務的狀態,等待任務完成或獲取返回值(若是任務失敗,則爲異常和回溯)。
#但這個功能默認是不開啓的,你須要設置一個 Celery 的結果後端(即backen,咱們在tasks.py中已經設置了,backen就是用來存儲咱們的計算結果)
result=add.delay(4, 4)
#若是任務已經完成
if(result.ready()):
  #獲取任務執行結果
  print(result.get(timeout=1))

經常使用接口

  • tasks.add(4,6) ---> 本地執行
  • tasks.add.delay(3,4) --> worker執行
  • t=tasks.add.delay(3,4) --> t.get() 獲取結果,或卡住,阻塞
  • t.ready()---> False:未執行完,True:已執行完
  • t.get(propagate=False) 拋出簡單異常,但程序不會中止
  • t.traceback 追蹤完整異常

使用配置

  • 使用配置來運行,對於正式項目來講可維護性更好。配置能夠使用app.config.XXXXX_XXX='XXX'的形式如app.conf.CELERY_TASK_SERIALIZER = 'json'來進行配置
  • 配置資料

配置文件

config.py

#broker
BROKER_URL = 'redis://:password@host:port/db'
#backen
CELERY_RESULT_BACKEND = 'redis://:password@host:port/db'
#導入任務,如tasks.py
CELERY_IMPORTS = ('tasks', )
#列化任務載荷的默認的序列化方式
CELERY_TASK_SERIALIZER = 'json'
#結果序列化方式
CELERY_RESULT_SERIALIZER = 'json'

CELERY_ACCEPT_CONTENT=['json']
#時間地區與形式
CELERY_TIMEZONE = 'Europe/Oslo'
#時間是否使用utc形式
CELERY_ENABLE_UTC = True

#設置任務的優先級或任務每分鐘最多執行次數
CELERY_ROUTES = {
    # 若是設置了低優先級,則可能好久都沒結果
    #'tasks.add': 'low-priority',
    #'tasks.add': {'rate_limit': '10/m'},
    #'tasks.add': {'rate_limit': '10/s'},
    #'*': {'rate_limit': '10/s'}
}
#borker池,默認是10
BROKER_POOL_LIMIT = 10
#任務過時時間,單位爲s,默認爲一天
CELERY_TASK_RESULT_EXPIRES = 3600
#backen緩存結果的數目,默認5000
CELERY_MAX_CACHED_RESULTS = 10000

開啓服務

celery.py

from celery import Celery
#指定名稱
app = Celery('mycelery')
#加載配置模塊
app.config_from_object('config')

if __name__=='__main__':
      app.start()

任務定義

tasks.py

from .celery import app
@app.task
def add(a, b):
  return a + b

啓動

// -l 是 --loglevel的簡寫
celery -A mycelery worker -l info

執行/調用服務

from tasks import add
#調用任務會返回一個 AsyncResult 實例,可用於檢查任務的狀態,等待任務完成或獲取返回值(若是任務失敗,則爲異常和回溯)。
#但這個功能默認是不開啓的,你須要設置一個 Celery 的結果後端(即backen,咱們在tasks.py中已經設置了,backen就是用來存儲咱們的計算結果)
result=add.delay(4, 4)
#若是任務已經完成
if(result.ready()):
  #獲取任務執行結果
  print(result.get(timeout = 1))

分佈式

  • 啓動多個celery worker,這樣即便一個worker掛掉了其餘worker也能繼續提供服務

    • 方法一
    // 啓動三個worker:w1,w2,w3
    celery multi start w1 -A project -l info
    celery multi start w2 -A project -l info
    celery multi start w3 -A project -l info
    // 當即中止w1,w2,即使如今有正在處理的任務
    celery multi stop w1 w2
    // 重啓w1
    celery multi restart w1 -A project -l info
    // celery multi stopwait w1 w2 w3    # 待任務執行完,中止
    • 方法二
    // 啓動多個worker,可是不指定worker名字
    // 你能夠在同一臺機器上運行多個worker,但要爲每一個worker指定一個節點名字,使用--hostname或-n選項
    // concurrency指定處理進程數,默認與cpu數量相同,所以通常無需指定
    $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h
    $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h
    $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3@%h

錯誤處理

celery能夠指定在發生錯誤的狀況下進行自定義的處理
config.py

def my_on_failure(self, exc, task_id, args, kwargs, einfo):
    print('Oh no! Task failed: {0!r}'.format(exc))

// 對全部類型的任務,當發生執行失敗的時候所執行的操做
CELERY_ANNOTATIONS = {'*': {'on_failure': my_on_failure}}
相關文章
相關標籤/搜索