分佈式消息隊列 Celery 的最佳實踐

目錄

不使用數據庫做爲 Broker

Broker 的選擇大體有消息隊列和數據庫兩種,這裏建議儘可能避免使用數據庫做爲 Broker,除非你的業務系統足夠簡單。在併發量很高的複雜系統中,大量 Workers 訪問數據庫的行爲會使得操做系統磁盤 I/O 一直處於高峯值狀態,很是影響系統性能。若是數據庫 Broker 同時還兼顧着後端業務的話,那麼應用程序也很容易被拖垮。python

反觀選擇消息隊列,例如 RabbitMQ,就不存在以上的問題。首先 RabbitMQ 的隊列存放到內存中,速度快且不佔用磁盤 I/O。再一個就是 RabbitMQ 會主動將任務推送給 Worker,因此 Worker 無需頻繁的去輪詢隊列,避免無謂的資源浪費。web

不要過度關注任務結果

Task.delay/Task.apply_async 返回的 AsyncResult 對象用於關聯任務的執行結果,前提是啓用了 Result Backend。不過任務結果的傳遞一樣須要成本,因此 Celery 默認會將其 Disabled。數據庫

  • 全局開啓返回任務結果,默認爲關閉:
app.conf.task_ignore_result = False
  • 局部關閉返回任務結果:
@app.task(ignore_result=True)
def add(...):

若是你僅但願返回並持久化任務執行失敗的異常結果,以便於後續的調查分析,那麼你能夠在使用數據庫做爲 Result Backend 的同時應用下列配置:後端

# Only store task errors in the result backend.
app.conf.task_ignore_result = True
app.conf.task_store_errors_even_if_ignored = True

實現優先級任務

所謂事有輕重緩急,任務如是。例如,用戶的驗證碼短信比較緊急,應及時發送,而宣傳短信則能夠延後再發,以此提供更好的用戶體驗。併發

實現任務優先級最簡單的思路就是,首先將任務進行合理分類,通常的咱們會將實時任務、高頻率任務、短期任務劃分爲高優先級任務;而定時任務、低頻率任務、長時間任務則爲低優先級任務。而後再爲處理高優先級任務的隊列分配更多的 Worker。app

不過這種簡單粗暴的方式還存在一個問題,當高優先級任務被消費完後,相應的Workers 就會空閒下來,很是浪費系統資源。那麼改善的方法就是,「在高優先級任務隊列始終擁有更多 Worker 的前提下,當這些 Worker 空閒時,也能夠用於處理低優先級的任務」。利用 Worker 多隊列訂閱特性便可實現這個效果,例如,如今有 high_queue、low_queue 以及 worker_一、二、3。那麼就可讓 worker_一、二、3 均訂閱 high_queue 的同時,也讓 worker_二、3 訂閱 low_queue。運維

這裏寫圖片描述

應用 Worker 併發池的動態擴展

Celery Worker 支持下列四種併發方式。async

  • celery.concurrency.solo (Single-threaded execution pool)
  • celery.concurrency.prefork (Multiprocessing)
  • celery.concurrency.eventlet
  • celery.concurrency.gevent

經過配置項 worker_pool 指定,默認爲 prefork:ide

# Single-threaded execution pool
app.conf.worker_pool = 'solo'

同時還能夠經過配置項 worker_concurrency 來指定併發池的 size,默認爲運行環境的 CPU 數量:svg

app.conf.worker_concurrency = 10

回到正題,當咱們選擇使用 prefork/gevent 併發方式時,建議應用 Worker 併發池的 autoscale 自動適配功能,在 celery CLI 中使用 --autoscale 選項來指定併發池的上下限。例如:

celery worker -A proj --autoscale=6,3

但須要注意的是,不管是 Worker 的數量仍是併發池的數量都並不是越多越好,畢竟其自身的存在就須要消耗系統資源。但有一個原則是,當你的任務爲 I/O 密集型時,能夠適量增大併發池的 size;若是你的任務爲 CPU 密集型時,默認 size 不失爲一個保險的選擇。總而言之,最佳配比須要結合自身實際狀況不斷的嘗試得出。

應用任務預取數

Prefetch 預取數是繼承至 RabbitMQ 的原語,即爲 Worker 一次從隊列中獲取的任務消息的數量。任務的執行時間有長有短,咱們應該爲短期任務設置更大的任務預取數,以下降獲取任務帶來的資源消耗。

經過配置項 worker_prefetch_multiplier 來指定全局預取數乘子,默認爲 4。當設置爲 1 時,表示 disable 預取功能;當設置爲 0 時,表示 Worker 會盡量多的獲取任務。

# prefetch_count = worker_prefetch_multiplier * concurrent_processes_count
app.conf.worker_prefetch_multiplier = 10

若是你的任務既有長任務,又有短任務,那麼這裏建議你應用分開配置的 Worker 。以文件上傳爲例,上傳小文件(小於 1MB)的數量要遠大於上傳大文件(大於 20MB)的數量。那麼小文件上傳任務就屬於高頻短任務,而大文件上傳任務則是低頻長任務。分別實現 queue_small/worker_small_一、2 以及 queue_big/worker_big 來處理,同時應該爲 worker_small_一、2 設置更大的 Prefetch。

  • 設定不一樣的 celeryconfig 配置文件
# filename: big_prefetch.py
CELERYD_PREFETCH_MULTIPLIER = 10

# filename: small_prefetch.py
CELERYD_PREFETCH_MULTIPLIER = 100
  • 使用 celery CLI 的 –config 選項分別爲 worker 指定不一樣的 celeryconfig
celery worker -A proj -Q queue_small --config big_prefetch -n worker_small_1
celery worker -A proj -Q queue_small --config small_prefetch -n worker_small_2
celery worker -A proj -Q queue_big --config big_prefetch -n worker_big

保持任務的冪等性

Celery 雖然提供了任務異常重試,但卻沒法保證任務的事務性,即不提供任務狀態的回滾能力。因此爲了讓任務更易於部署和重試,應該儘可能將一個長任務拆解爲多個符合冪等性的短任務。

冪等(idempotent)是一個數學概念,常見於抽象代數。冪等性函數的特徵爲「若是接受到相同的實參,那麼不管重複執行多少次,都能獲得相同的結果」。例如,get_user_name()set_true() 均屬冪等函數。

可見冪等性任務結合任務異常重試,可以很是有效的提升任務執行的健壯性。

應用任務超時限制

避免某些任務一直處於非正常的進行中狀態,阻塞隊列中的其餘任務。應該爲任務執行設置超時時間。若是任務超時未完成,則會將 Worker 殺死,並啓動新的 Worker 來替代。

  • 全局設置任務超時時間:
app.conf.task_time_limit = 1800
  • 局部設置任務超時時間
@app.task(time_limit=1800)
def add(...):

善用任務工做流

Celery 支持 group/chain/chord/chunks/map/starmap 等多種工做流原語,基本能夠覆蓋大部分複雜的任務組合需求,善用任務工做流可以更好的應用 Celery 優秀的併發特性。例如,若是下一步任務須要等待上一步任務的執行結果,那麼不該該單純的應用 get 方法來實現同步子任務,而是應該使用 chain 任務鏈。

合理應用 ack_late 機制

使用 RabbitMQ 充當 Broker,能夠應用 RabbitMQ 的 ACK 機制來保證任務有效傳遞。但在任務執行要求很是嚴格的場景中,「有效傳遞」顯然是不夠的,「有效執行」才能夠。

爲了支持「有效執行」,Celery 在 ACK 的基礎上提供了 ack_late 機制。即只有當任務完成(成功/失敗)後,再向 Broker 回傳 ACK。而代價就是消息隊列的性能會下降,畢竟任務消息佔用隊列資源的時間變長了。

一般的,對於一些以小時爲單位的長時間任務,我會建議實現一次只保留一項任務的 ack late 方式。

app.conf.task_acks_late = True
app.conf.worker_prefetch_multiplier = 1
  • 局部開啓 ack_late:
@app.task(ack_late=True)
def add(...):

傳遞 ORM 對象的惟一標識

有時候任務執行須要對象的參與,此時建議傳遞對象的惟一標識,而非直接將對象序列化後再傳遞。例如,不要嘗試將數據庫的 ORM 對象做爲任務消息傳遞,而是傳遞 ORM 對象的主鍵 id。當任務執行到須要使用 ORM 對象時,再經過 id 從數據庫實時獲取,避免 ORM 對象由於隊列阻塞致使與數據庫實時記錄不一致的狀況。

預防內存泄漏

同一個 Worker 在執行了大量任務後,會有概率出現內存泄漏的狀況。這裏建議全局設置 Worker 最大的任務執行數,Worker 在完成了最大的任務執行數後就主動退出。

app.conf.worker_max_tasks_per_child = 100

合理安排定時任務的調度計劃

定時任務的調度計劃要通過科學合理的設計,通常的,咱們建議遵照如下幾點原則:

  1. 與系統管理員和數據庫管理員溝通,確保你預期的調度時間不會與他們的定時任務衝突。
  2. 將定時調度任務分散到各個時間點執行,均衡負載。
  3. 要考慮執行定時任務對生產業務系統的影響,儘量在業務低峯期執行。

啓用任務監控

Flower 是 Celery 官方推薦的實時監控工具,用於監控 Tasks 和 Workers 的運行狀態。Flower 提供了下列功能:

  • 查看 Task 清單、歷史記錄、參數、開始時間、執行狀態等
  • 撤銷、終止任務
  • 查看 Worker 清單、狀態
  • 遠程開啓、關閉、重啓 Worker 進程
  • 提供 HTTP API,方便集成到運維繫統

相比查看日誌,Flower 的 Web 界面會顯得更加友好。

相關文章
相關標籤/搜索