做爲一個Celery使用重度用戶。看到Celery Best Practices這篇文章。不禁得菊花一緊。html
乾脆翻譯出來,同一時候也會添加咱們項目中celery的實戰經驗。mysql
至於Celery爲什麼物,看這裏Celery。web
一般在使用Django的時候,你可能需要運行一些長時間的後臺任務,沒準你可能需要使用一些能排序的任務隊列,那麼Celery將會是一個很是好的選擇。redis
當把Celery做爲一個任務隊列用於很是多項目中後,做者積累了一些最佳實踐方式,譬如怎樣用合適的方式使用Celery,以及一些Celery提供的但是還未充分使用的特性。sql
數據庫並不是天生設計成能用於AMQP broker的。在生產環境下,它很是有可能在某時候當機(PS,當掉這點我認爲不論什麼系統都不能保證不當吧!!docker
。)。數據庫
做者猜測爲啥很是多人使用數據庫做爲broker主要是因爲他們已經有一個數據庫用來給web app提供數據存儲了。因而乾脆直接拿來使用。設置成Celery的broker是很是easy的。並且不需要再安裝其它組件(譬如RabbitMQ)。後端
假設有例如如下場景:你有4個後端workers去獲取並處理放入到數據庫裏面的任務,這意味着你有4個進程爲了獲取最新任務,需要頻繁地去輪詢數據庫。沒準每個worker同一時候還有多個本身的併發線程在幹這事情。網絡
某一天。你發現因爲太多的任務產生。4個worker不夠用了,處理任務的速度已經大大落後於生產任務的速度,因而你不停去添加worker的數量。忽然,你的數據庫因爲大量進程輪詢任務而變得響應緩慢,磁盤IO一直處於高峯值狀態,你的web應用也開始受到影響。這一切,都因爲workers在不停地對數據庫進行DDOS。併發
而當你使用一個合適的AMQP(譬如RabbitMQ)的時候,這一切都不會發生,以RabbitMQ爲例。首先,它將任務隊列放到內存裏面。你不需要去訪問硬盤。其次,consumers(也就是上面的worker)並不需要頻繁地去輪詢因爲RabbitMQ能將新的任務推送給consumers。
固然,假設RabbitMQ真出現故障了。至少也不會影響到你的web應用。
這也就是做者說的不用數據庫做爲broker的緣由,並且很是多地方都提供了編譯好的RabbitMQ鏡像,你都能直接使用,譬如這些。
對於這點。我是深表贊同的。咱們系統大量使用Celery處理異步任務,大概平均一天幾百萬的異步任務,曾經咱們使用的mysql。而後總會出現任務處理延時太嚴重的問題,即便添加了worker也無論用。因而咱們使用了redis。性能提高了很是多。至於爲啥使用mysql很是慢,咱們沒去深究,沒準也還真出現了DDOS的問題。
Celery很是easy設置,一般它會使用默認的queue用來存聽任務(除非你顯示指定其它queue)。一般寫法例如如下:
@app.task() def my_taskA(a, b, c): print("doing something here...") @app.task() def my_taskB(x, y): print("doing something here...")
這兩個任務都會在同一個queue裏面運行。這樣寫事實上很是有吸引力的,因爲你僅僅需要使用一個decorator就能實現一個異步任務。做者關心的是taskA和taskB沒準是全然兩個不一樣的東西,或者一個可能比還有一個更加劇要,那麼爲何要把它們放到一個籃子裏面呢?(雞蛋都不能放到一個籃子裏面,是吧!)沒準taskB事實上不怎麼重要,但是量太多,以致於重要的taskA反而不能高速地被worker進行處理。添加workers也解決不了這個問題,因爲taskA和taskB仍然在一個queue裏面運行。
爲了解決2裏面出現的問題,咱們需要讓taskA在一個隊列Q1,而taskB在還有一個隊列Q2運行。同一時候指定x workers去處理隊列Q1的任務,而後使用其它的workers去處理隊列Q2的任務。使用這樣的方式,taskB能夠得到足夠的workers去處理,同一時候一些優先級workers也能很是好地處理taskA而不需要進行長時間的等待。
首先手動定義queue
CELERY_QUEUES = ( Queue('default', Exchange('default'), routing_key='default'), Queue('for_task_A', Exchange('for_task_A'), routing_key='for_task_A'), Queue('for_task_B', Exchange('for_task_B'), routing_key='for_task_B'), )
而後定義routes用來決定不一樣的任務去哪個queue
CELERY_ROUTES = { 'my_taskA': {'queue': 'for_task_A', 'routing_key': 'for_task_A'}, 'my_taskB': {'queue': 'for_task_B', 'routing_key': 'for_task_B'}, }
最後再爲每個task啓動不一樣的workerscelery worker -E -l INFO -n workerA -Q for_task_A celery worker -E -l INFO -n workerB -Q for_task_B
在咱們項目中。會涉及到大量文件轉換問題,有大量小於1mb的文件轉換,同一時候也有少許將近20mb的文件轉換。小文件轉換的優先級是最高的,同一時候不用佔用很是多時間,但大文件的轉換很是耗時。假設將轉換任務放到一個隊列裏面,那麼很是有可能因爲出現轉換大文件,致使耗時太嚴重形成小文件轉換延時的問題。
因此咱們依照文件大小設置了3個優先隊列。並且每個隊列設置了不一樣的workers。很是好地攻克了咱們文件轉換的問題。
大多數任務並無使用錯誤處理,假設任務失敗,那就失敗了。在一些狀況下這很是不錯。但是做者見到的多數失敗任務都是去調用第三方API而後出現了網絡錯誤。或者資源不可用這些錯誤。而對於這些錯誤。最簡單的方式就是重試一下,或許就是第三方API暫時服務或者網絡出現故障,沒準當即就行了,那麼爲何不試着重試一下呢?
@app.task(bind=True, default_retry_delay=300, max_retries=5) def my_task_A(): try: print("doing stuff here...") except SomeNetworkException as e: print("maybe do some clenup here....") self.retry(e)
做者喜歡給每個任務定義一個等待多久重試的時間,以及最大的重試次數。固然還有更具體的參數設置,本身看文檔去。
對於錯誤處理,咱們因爲使用場景特殊,好比一個文件轉換失敗,那麼無論多少次重試都會失敗。因此沒有添加劇試機制。
Flower是一個很是強大的工具,用來監控celery的tasks和works。
這玩意咱們也沒怎麼使用。因爲多數時候咱們都是直接鏈接redis去查看celery相關狀況了。貌似挺傻逼的對不,尤爲是celery在redis裏面存放的數據並不能方便的取出來。
一個任務狀態就是該任務結束的時候成功仍是失敗信息,沒準在一些統計場合,這很是實用。但咱們需要知道。任務退出的狀態並不是該任務運行的結果,該任務運行的一些結果因爲會對程序有影響,通常會被寫入數據庫(好比更新一個用戶的朋友列表)。
做者見過的多數項目都將任務結束的狀態存放到sqlite或者本身的數據庫,但是存這些真有必要嗎,沒準可能影響到你的web服務的。因此做者一般設置CELERY_IGNORE_RESULT = True
去丟棄。
對於咱們來講,因爲是異步任務,知道任務運行完畢以後的狀態真沒啥用。因此果斷丟棄。
這個事實上就是不要傳遞Database對象(好比一個用戶的實例)給任務。因爲沒準序列化以後的數據已是過時的數據了。
因此不妨直接傳遞一個user id,而後在任務運行的時候實時的從數據庫獲取。
對於這個,咱們也是如此,給任務僅僅傳遞相關id數據。譬如文件轉換的時候,咱們僅僅會傳遞文件的id,而其它文件信息的獲取咱們都是直接經過該id從數據庫裏面取得。
後面就是咱們本身的感觸了,上面做者提到的Celery的使用,真的能夠算是很是好地實踐方式。至少現在咱們的Celery沒出過太大的問題,固然小坑仍是有的。至於RabbitMQ,這玩意咱們是真沒用過,效果怎麼樣不知道。至少比mysql好用吧。
最後。附上做者的一個Celery Talk https://denibertovic.com/talks/celery-best-practices/。