Broker 的選擇大體有消息隊列和數據庫兩種,這裏建議儘可能避免使用數據庫做爲 Broker,除非你的業務系統足夠簡單。在併發量很高的複雜系統中,大量 Workers 訪問數據庫的行爲會使得操做系統磁盤 I/O 一直處於高峯值狀態,很是影響系統性能。若是數據庫 Broker 同時還兼顧着後端業務的話,那麼應用程序也很容易被拖垮。python
反觀選擇消息隊列,例如 RabbitMQ,就不存在以上的問題。首先 RabbitMQ 的隊列存放到內存中,速度快且不佔用磁盤 I/O。再一個就是 RabbitMQ 會主動將任務推送給 Worker,因此 Worker 無需頻繁的去輪詢隊列,避免無謂的資源浪費。web
Task.delay/Task.apply_async 返回的 AsyncResult 對象用於關聯任務的執行結果,前提是啓用了 Result Backend。不過任務結果的傳遞一樣須要成本,因此 Celery 默認會將其 Disabled。數據庫
app.conf.task_ignore_result = False
@app.task(ignore_result=True)
def add(...):
若是你僅但願返回並持久化任務執行失敗的異常結果,以便於後續的調查分析,那麼你能夠在使用數據庫做爲 Result Backend 的同時應用下列配置:後端
# Only store task errors in the result backend.
app.conf.task_ignore_result = True
app.conf.task_store_errors_even_if_ignored = True
所謂事有輕重緩急,任務如是。例如,用戶的驗證碼短信比較緊急,應及時發送,而宣傳短信則能夠延後再發,以此提供更好的用戶體驗。併發
實現任務優先級最簡單的思路就是,首先將任務進行合理分類,通常的咱們會將實時任務、高頻率任務、短期任務劃分爲高優先級任務;而定時任務、低頻率任務、長時間任務則爲低優先級任務。而後再爲處理高優先級任務的隊列分配更多的 Worker。app
不過這種簡單粗暴的方式還存在一個問題,當高優先級任務被消費完後,相應的Workers 就會空閒下來,很是浪費系統資源。那麼改善的方法就是,「在高優先級任務隊列始終擁有更多 Worker 的前提下,當這些 Worker 空閒時,也能夠用於處理低優先級的任務」。利用 Worker 多隊列訂閱特性便可實現這個效果,例如,如今有 high_queue、low_queue 以及 worker_一、二、3。那麼就可讓 worker_一、二、3 均訂閱 high_queue 的同時,也讓 worker_二、3 訂閱 low_queue。運維
Celery Worker 支持下列四種併發方式。async
經過配置項 worker_pool 指定,默認爲 prefork:ide
# Single-threaded execution pool
app.conf.worker_pool = 'solo'
同時還能夠經過配置項 worker_concurrency 來指定併發池的 size,默認爲運行環境的 CPU 數量:svg
app.conf.worker_concurrency = 10
回到正題,當咱們選擇使用 prefork/gevent 併發方式時,建議應用 Worker 併發池的 autoscale 自動適配功能,在 celery CLI 中使用 --autoscale
選項來指定併發池的上下限。例如:
celery worker -A proj --autoscale=6,3
但須要注意的是,不管是 Worker 的數量仍是併發池的數量都並不是越多越好,畢竟其自身的存在就須要消耗系統資源。但有一個原則是,當你的任務爲 I/O 密集型時,能夠適量增大併發池的 size;若是你的任務爲 CPU 密集型時,默認 size 不失爲一個保險的選擇。總而言之,最佳配比須要結合自身實際狀況不斷的嘗試得出。
Prefetch 預取數是繼承至 RabbitMQ 的原語,即爲 Worker 一次從隊列中獲取的任務消息的數量。任務的執行時間有長有短,咱們應該爲短期任務設置更大的任務預取數,以下降獲取任務帶來的資源消耗。
經過配置項 worker_prefetch_multiplier 來指定全局預取數乘子,默認爲 4。當設置爲 1 時,表示 disable 預取功能;當設置爲 0 時,表示 Worker 會盡量多的獲取任務。
# prefetch_count = worker_prefetch_multiplier * concurrent_processes_count
app.conf.worker_prefetch_multiplier = 10
若是你的任務既有長任務,又有短任務,那麼這裏建議你應用分開配置的 Worker 。以文件上傳爲例,上傳小文件(小於 1MB)的數量要遠大於上傳大文件(大於 20MB)的數量。那麼小文件上傳任務就屬於高頻短任務,而大文件上傳任務則是低頻長任務。分別實現 queue_small/worker_small_一、2 以及 queue_big/worker_big 來處理,同時應該爲 worker_small_一、2 設置更大的 Prefetch。
# filename: big_prefetch.py
CELERYD_PREFETCH_MULTIPLIER = 10
# filename: small_prefetch.py
CELERYD_PREFETCH_MULTIPLIER = 100
celery worker -A proj -Q queue_small --config big_prefetch -n worker_small_1
celery worker -A proj -Q queue_small --config small_prefetch -n worker_small_2
celery worker -A proj -Q queue_big --config big_prefetch -n worker_big
Celery 雖然提供了任務異常重試,但卻沒法保證任務的事務性,即不提供任務狀態的回滾能力。因此爲了讓任務更易於部署和重試,應該儘可能將一個長任務拆解爲多個符合冪等性的短任務。
冪等(idempotent)是一個數學概念,常見於抽象代數。冪等性函數的特徵爲「若是接受到相同的實參,那麼不管重複執行多少次,都能獲得相同的結果」。例如,get_user_name()
和 set_true()
均屬冪等函數。
可見冪等性任務結合任務異常重試,可以很是有效的提升任務執行的健壯性。
避免某些任務一直處於非正常的進行中狀態,阻塞隊列中的其餘任務。應該爲任務執行設置超時時間。若是任務超時未完成,則會將 Worker 殺死,並啓動新的 Worker 來替代。
app.conf.task_time_limit = 1800
@app.task(time_limit=1800)
def add(...):
Celery 支持 group/chain/chord/chunks/map/starmap 等多種工做流原語,基本能夠覆蓋大部分複雜的任務組合需求,善用任務工做流可以更好的應用 Celery 優秀的併發特性。例如,若是下一步任務須要等待上一步任務的執行結果,那麼不該該單純的應用 get 方法來實現同步子任務,而是應該使用 chain 任務鏈。
使用 RabbitMQ 充當 Broker,能夠應用 RabbitMQ 的 ACK 機制來保證任務有效傳遞。但在任務執行要求很是嚴格的場景中,「有效傳遞」顯然是不夠的,「有效執行」才能夠。
爲了支持「有效執行」,Celery 在 ACK 的基礎上提供了 ack_late 機制。即只有當任務完成(成功/失敗)後,再向 Broker 回傳 ACK。而代價就是消息隊列的性能會下降,畢竟任務消息佔用隊列資源的時間變長了。
一般的,對於一些以小時爲單位的長時間任務,我會建議實現一次只保留一項任務的 ack late 方式。
app.conf.task_acks_late = True
app.conf.worker_prefetch_multiplier = 1
@app.task(ack_late=True)
def add(...):
有時候任務執行須要對象的參與,此時建議傳遞對象的惟一標識,而非直接將對象序列化後再傳遞。例如,不要嘗試將數據庫的 ORM 對象做爲任務消息傳遞,而是傳遞 ORM 對象的主鍵 id。當任務執行到須要使用 ORM 對象時,再經過 id 從數據庫實時獲取,避免 ORM 對象由於隊列阻塞致使與數據庫實時記錄不一致的狀況。
同一個 Worker 在執行了大量任務後,會有概率出現內存泄漏的狀況。這裏建議全局設置 Worker 最大的任務執行數,Worker 在完成了最大的任務執行數後就主動退出。
app.conf.worker_max_tasks_per_child = 100
定時任務的調度計劃要通過科學合理的設計,通常的,咱們建議遵照如下幾點原則:
Flower 是 Celery 官方推薦的實時監控工具,用於監控 Tasks 和 Workers 的運行狀態。Flower 提供了下列功能:
相比查看日誌,Flower 的 Web 界面會顯得更加友好。