celery是什麼?
個人理解比較簡單,它是一個「任務隊列」,我主要拿他來作兩件事情:html
1.處理異步任務node
2.處理定時任務python
一個簡單任務
安裝相應的pip包
pip install celery[redis]
準備項目文件
項目文件結構以下:web
. ├── caller.py ├── tasks.py
tasks.py
中存聽任務函數:redis
import time from celery import Celery app = Celery('tasks', broker='redis://localhost:6379/0') @app.task def add(x, y): time.sleep(30) print(f'add end, res:{x+y}') return x + y
caller.py
中存放調用函數:shell
from tasks import add for i in range(0, 20): add.delay(4, 4)
運行celery
在當前目錄下運行一個worker:瀏覽器
celery -A tasks worker --loglevel=info
如何查看任務狀態
一些原理
此處使用redis作celery的消息隊列(broker),函數觸發一次任務調用時,發送消息給redis,woker從redis中獲取任務進行處理,若是任務過多,worker處理不過來,worker不會領取任務,任務會先存放在redis中。併發
默認運行的worker啓動的進程數等於cpu核數(引),例如我這裏是4。也就是說此處的celery一次只能處理4個任務,若是一次有多個任務發送過來,有的任務就須要排隊。此處將add
函數的處理時間模擬爲30s。app
celery有一個「prefetch」的動做。例如上面的例子,雖然這個worker一次只能處理4個任務,可是這個worker除了接收它能處理的4個任務當即去執行,還要再接收一些任務準備運行,它打算再接收多少任務預備着呢?這取決於一個配置參數:worker_prefetch_multiplier
,這個參數默認是4(引),那麼他會多接收的數量爲:worker_prefetch_multiplier * 併發進程數
,放在此處就是:4 * 4 = 16
。也就是說,若是redis中有不少等待處理的任務,其實worker運行起來會一次拿走4 + 16 = 20
個任務。異步
測試
查看有多少任務在消息隊列中
celery在redis中的存聽任務的隊列key默認名稱是celery
(引),這個key只有當redis中有積壓任務時纔會存在,若是它不存在就表明當前消息隊列中無消息。
運行caller.py
,而後登錄redis查詢:
127.0.0.1:6379> llen celery (integer) 0
能夠看到,儘管celery一次只能處理4個任務,但它把20個任務全領走了。因爲咱們的任務要30s才能處理完成,當即再運行一次caller.py
,而後redis查詢:
127.0.0.1:6379> llen celery (integer) 20
能夠看到,有20個任務還在消息隊列中等待處理。
查看有多少任務正在運行
celery -A tasks inspect active
查看有多少任務接收了但還未運行
這種任務在celery中叫作reserved task
。
celery -A tasks inspect reserved # 統計個數(數量爲下面的結果-1) celery -A tasks inspect reserved | wc -l
查看worker狀態
運行:
$ celery -A tasks status celery@itscs-MacBook-Pro.local: OK 1 node online.
能夠看到,提示有一個worker(node)是在線的(online)。
使用flower在線查看
flower能夠實時監控celery的狀態,而且還能修改一些配置(生產環境慎用)。(引)
pip install flower celery -A tasks flower
瀏覽器打開http://localhost:5555
便可看到一個現實celery狀態的網頁。