設置時區
CELERY_TIMEZONE = 'Asia/Shanghai'
啓動時區設置
CELERY_ENABLE_UTC = True
限制任務的執行頻率
下面這個就是限制tasks模塊下的add函數,每秒鐘只能執行10次
CELERY_ANNOTATIONS = {'tasks.add':{'rate_limit':'10/s'}}
或者限制全部的任務的刷新頻率
CELERY_ANNOTATIONS = {'*':{'rate_limit':'10/s'}}
也能夠設置若是任務執行失敗後調用的函數python
def my_on_failure(self,exc,task_id,args,kwargs,einfo): print('task failed') CELERY_ANNOTATIONS = {'*':{'on_failure':my_on_failure}}
併發的worker數量,也是命令行-c指定的數目
事實上並非worker數量越多越好,保證任務不堆積,加上一些新增任務的預留就能夠了
CELERYD_CONCURRENCY = 20
redis
celery worker每次去redis取任務的數量,默認值就是4
CELERYD_PREFETCH_MULTIPLIER = 4
json
每一個worker執行了多少次任務後就會死掉,建議數量大一些
CELERYD_MAX_TASKS_PER_CHILD = 200
bash
使用redis做爲任務隊列
組成: db+scheme://user:password@host:port/dbname
BROKER_URL = 'redis://127.0.0.1:6379/0'
服務器
celery任務執行結果的超時時間
CELERY_TASK_RESULT_EXPIRES = 1200
單個任務的運行時間限制,不然會被殺死
CELERYD_TASK_TIME_LIMIT = 60
併發
使用redis存儲任務執行結果,默認不使用
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
app
將任務結果使用'pickle'序列化成'json'格式
任務序列化方式
CELERY_TASK_SERIALIZER = 'pickle'
任務執行結果序列化方式
CELERY_RESULT_SERIALIZER = 'json'
也能夠直接在Celery對象中設置序列化方式
app = Celery('tasks', broker='...', task_serializer='yaml')
函數
關閉限速
CELERY_DISABLE_RATE_LIMITS = True
性能
在celery4.x之後,就是BROKER_URL,若是是之前,須要寫成CELERY_BROKER_URL
BROKER_URL = 'redis://127.0.0.1:6379/0'
指定結果的接收地址
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
spa
指定任務序列化方式
CELERY_TASK_SERIALIZER = 'msgpack'
指定結果序列化方式
CELERY_RESULT_SERIALIZER = 'msgpack'
指定任務接受的序列化類型.
CELERY_ACCEPT_CONTENT = ['msgpack']
任務過時時間,celery任務執行結果的超時時間
CELERY_TASK_RESULT_EXPIRES = 24 * 60 * 60
任務發送完成是否須要確認,對性能會稍有影響
CELERY_ACKS_LATE = True
壓縮方案選擇,能夠是zlib, bzip2,默認是發送沒有壓縮的數據
CELERY_MESSAGE_COMPRESSION = 'zlib'
規定完成任務的時間
在5s內完成任務,不然執行該任務的worker將被殺死,任務移交給父進程
CELERYD_TASK_TIME_LIMIT = 5
celery worker的併發數,默認是服務器的內核數目,也是命令行-c參數指定的數目
CELERYD_CONCURRENCY = 4
celery worker 每次去BROKER中預取任務的數量
CELERYD_PREFETCH_MULTIPLIER = 4
每一個worker執行了多少任務就會死掉,默認是無限的
CELERYD_MAX_TASKS_PER_CHILD = 40
設置默認的隊列名稱,若是一個消息不符合其餘的隊列就會放在默認隊列裏面,若是什麼都不設置的話,數據都會發送到默認的隊列中
CELERY_DEFAULT_QUEUE = "default"
隊列的詳細設置
CELERY_QUEUES = {
"default": { # 這是上面指定的默認隊列 "exchange": "default", "exchange_type": "direct", "routing_key": "default" }, "topicqueue": { # 這是一個topic隊列 凡是topictest開頭的routing key都會被放到這個隊列 "routing_key": "topic.#", "exchange": "topic_exchange", "exchange_type": "topic", }, "task_eeg": { # 設置扇形交換機 "exchange": "tasks", "exchange_type": "fanout", "binding_key": "tasks", },
或者配置成下面兩種方式:
# 配置隊列(settings.py) CELERY_QUEUES = ( Queue('default', Exchange('default'), routing_key='default'), Queue('for_task_collect', Exchange('for_task_collect'), routing_key='for_task_collect'), Queue('for_task_compute', Exchange('for_task_compute'), routing_key='for_task_compute'), )
# 路由(哪一個任務放入哪一個隊列) CELERY_ROUTES = { 'umonitor.tasks.multiple_thread_metric_collector': { 'queue': 'for_task_collect', 'routing_key': 'for_task_collect' }, 'compute.tasks.multiple_thread_metric_aggregate': { 'queue': 'for_task_compute', 'routing_key': 'for_task_compute' }, 'compute.tasks.test': { 'queue': 'for_task_compute', 'routing_key': 'for_task_compute' }, }