做爲一個Celery使用重度用戶,看到Celery Best Practices這篇文章,不禁得菊花一緊。乾脆翻譯出來,同時也會加入咱們項目中celery的實戰經驗。html
至於Celery爲什麼物,看這裏Celery。mysql
一般在使用Django的時候,你可能須要執行一些長時間的後臺任務,沒準你可能須要使用一些能排序的任務隊列,那麼Celery將會是一個很是好的選擇。web
當把Celery做爲一個任務隊列用於不少項目中後,做者積累了一些最佳實踐方式,譬如如何用合適的方式使用Celery,以及一些Celery提供的可是還未充分使用的特性。redis
數據庫並非天生設計成能用於AMQP broker的,在生產環境下,它頗有可能在某時候當機(PS,當掉這點我以爲任何系統都不能保證不當吧!!!)。sql
做者猜測爲啥不少人使用數據庫做爲broker主要是由於他們已經有一個數據庫用來給web app提供數據存儲了,因而乾脆直接拿來使用,設置成Celery的broker是很容易的,而且不須要再安裝其餘組件(譬如RabbitMQ)。docker
假設有以下場景:你有4個後端workers去獲取並處理放入到數據庫裏面的任務,這意味着你有4個進程爲了獲取最新任務,須要頻繁地去輪詢數據庫,沒準每一個worker同時還有多個本身的併發線程在幹這事情。數據庫
某一天,你發現由於太多的任務產生,4個worker不夠用了,處理任務的速度已經大大落後於生產任務的速度,因而你不停去增長worker的數量。忽然,你的數據庫由於大量進程輪詢任務而變得響應緩慢,磁盤IO一直處於高峯值狀態,你的web應用也開始受到影響。這一切,都由於workers在不停地對數據庫進行DDOS。後端
而當你使用一個合適的AMQP(譬如RabbitMQ)的時候,這一切都不會發生,以RabbitMQ爲例,首先,它將任務隊列放到內存裏面,你不須要去訪問硬盤。其次,consumers(也就是上面的worker)並不須要頻繁地去輪詢由於RabbitMQ能將新的任務推送給consumers。固然,若是RabbitMQ真出現問題了,至少也不會影響到你的web應用。網絡
這也就是做者說的不用數據庫做爲broker的緣由,並且不少地方都提供了編譯好的RabbitMQ鏡像,你都能直接使用,譬如這些。併發
對於這點,我是深表贊同的。咱們系統大量使用Celery處理異步任務,大概平均一天幾百萬的異步任務,之前咱們使用的mysql,而後總會出現任務處理延時太嚴重的問題,即便增長了worker也無論用。因而咱們使用了redis,性能提高了不少。至於爲啥使用mysql很慢,咱們沒去深究,沒準也還真出現了DDOS的問題。
Celery很是容易設置,一般它會使用默認的queue用來存聽任務(除非你顯示指定其餘queue)。一般寫法以下:
@app.task() def my_taskA(a, b, c): print("doing something here...") @app.task() def my_taskB(x, y): print("doing something here...")
這兩個任務都會在同一個queue裏面執行,這樣寫其實頗有吸引力的,由於你只須要使用一個decorator就能實現一個異步任務。做者關心的是taskA和taskB沒準是徹底兩個不一樣的東西,或者一個可能比另外一個更加劇要,那麼爲何要把它們放到一個籃子裏面呢?(雞蛋都不能放到一個籃子裏面,是吧!)沒準taskB其實不怎麼重要,可是量太多,以致於重要的taskA反而不能快速地被worker進行處理。增長workers也解決不了這個問題,由於taskA和taskB仍然在一個queue裏面執行。
爲了解決2裏面出現的問題,咱們須要讓taskA在一個隊列Q1,而taskB在另外一個隊列Q2執行。同時指定x workers去處理隊列Q1的任務,而後使用其它的workers去處理隊列Q2的任務。使用這種方式,taskB可以得到足夠的workers去處理,同時一些優先級workers也能很好地處理taskA而不須要進行長時間的等待。
首先手動定義queue
CELERY_QUEUES = ( Queue('default', Exchange('default'), routing_key='default'), Queue('for_task_A', Exchange('for_task_A'), routing_key='for_task_A'), Queue('for_task_B', Exchange('for_task_B'), routing_key='for_task_B'), )
而後定義routes用來決定不一樣的任務去哪個queue
CELERY_ROUTES = { 'my_taskA': {'queue': 'for_task_A', 'routing_key': 'for_task_A'}, 'my_taskB': {'queue': 'for_task_B', 'routing_key': 'for_task_B'}, }
最後再爲每一個task啓動不一樣的workerscelery worker -E -l INFO -n workerA -Q for_task_A celery worker -E -l INFO -n workerB -Q for_task_B
在咱們項目中,會涉及到大量文件轉換問題,有大量小於1mb的文件轉換,同時也有少許將近20mb的文件轉換,小文件轉換的優先級是最高的,同時不用佔用不少時間,但大文件的轉換很耗時。若是將轉換任務放到一個隊列裏面,那麼頗有可能由於出現轉換大文件,致使耗時太嚴重形成小文件轉換延時的問題。
因此咱們按照文件大小設置了3個優先隊列,而且每一個隊列設置了不一樣的workers,很好地解決了咱們文件轉換的問題。
大多數任務並無使用錯誤處理,若是任務失敗,那就失敗了。在一些狀況下這很不錯,可是做者見到的多數失敗任務都是去調用第三方API而後出現了網絡錯誤,或者資源不可用這些錯誤,而對於這些錯誤,最簡單的方式就是重試一下,也許就是第三方API臨時服務或者網絡出現問題,沒準立刻就行了,那麼爲何不試着重試一下呢?
@app.task(bind=True, default_retry_delay=300, max_retries=5) def my_task_A(): try: print("doing stuff here...") except SomeNetworkException as e: print("maybe do some clenup here....") self.retry(e)
做者喜歡給每個任務定義一個等待多久重試的時間,以及最大的重試次數。固然還有更詳細的參數設置,本身看文檔去。
對於錯誤處理,咱們由於使用場景特殊,例如一個文件轉換失敗,那麼不管多少次重試都會失敗,因此沒有加入重試機制。
Flower是一個很是強大的工具,用來監控celery的tasks和works。
這玩意咱們也沒怎麼使用,由於多數時候咱們都是直接鏈接redis去查看celery相關狀況了。貌似挺傻逼的對不,尤爲是celery在redis裏面存放的數據並不能方便的取出來。
一個任務狀態就是該任務結束的時候成功仍是失敗信息,沒準在一些統計場合,這頗有用。但咱們須要知道,任務退出的狀態並非該任務執行的結果,該任務執行的一些結果由於會對程序有影響,一般會被寫入數據庫(例如更新一個用戶的朋友列表)。
做者見過的多數項目都將任務結束的狀態存放到sqlite或者本身的數據庫,可是存這些真有必要嗎,沒準可能影響到你的web服務的,因此做者一般設置CELERY_IGNORE_RESULT = True
去丟棄。
對於咱們來講,由於是異步任務,知道任務執行完成以後的狀態真沒啥用,因此果斷丟棄。
這個其實就是不要傳遞Database對象(例如一個用戶的實例)給任務,由於沒準序列化以後的數據已是過時的數據了。因此最好仍是直接傳遞一個user id,而後在任務執行的時候實時的從數據庫獲取。
對於這個,咱們也是如此,給任務只傳遞相關id數據,譬如文件轉換的時候,咱們只會傳遞文件的id,而其它文件信息的獲取咱們都是直接經過該id從數據庫裏面取得。
後面就是咱們本身的感觸了,上面做者提到的Celery的使用,真的能夠算是很好地實踐方式,至少如今咱們的Celery沒出過太大的問題,固然小坑仍是有的。至於RabbitMQ,這玩意咱們是真沒用過,效果怎麼樣不知道,至少比mysql好用吧。
最後,附上做者的一個Celery Talk https://denibertovic.com/talks/celery-best-practices/。