本文正在參加「Python主題月」,詳情查看 活動鏈接html
芹菜celery是一個python實現的異步任務隊列,能夠用於爬蟲、web後臺查詢、計算等等。經過任務隊列,當一個任務來臨時再也不傻傻等待。python
他的架構以下:web
咱們的生產者建立任務後會進入celery的任務調度隊列中間件Broker,Broker經過調度規則將消息(任務)調度消息隊列,Broker依賴第三方隊列消息代理如rabbitmq
、redis
等。redis
廣大勞動者,盯着消息隊列,當隊列中有消息時把它拿過來給處理了。docker
用於結果存儲經worker處理的結果,好比經常使用的數據庫等。shell
#### 使用celery數據庫
在本文中我們使用rabbitmq
(celery推薦)做爲消息代理中間件。django
咱們建立的celery目錄以下後端
learn_celery/
...celery_env/
...celery.py
...my_task1.py
...my_task2.py
...task1_run.py
...task2_run.py
複製代碼
rabbitmq
這裏以docker的方式運行並配置,指定主機名爲rabbit
(rabbitmq是以主機名來訪問的,因此這是必須的),容器名稱爲celery_rabbitmq
markdown
docker run -d -p 5672:5672 -h rabbit --name celery_rabbitmq rabbitmq
複製代碼
添加用於celery訪問的用戶,以及配置configure
、write
和read
權限,在下面咱們配置rabbit_user擁有全部配置、寫入和讀取權限。
docker exec -it celery_rabbitmq rabbitmqctl add_user rabbit_user rabbit_pass
docker exec -it celery_rabbitmq rabbitmqctl add_vhost rabbit_vhost
docker exec -it celery_rabbitmq rabbitmqctl set_user_tags rabbit_user celery
docker exec -it celery_rabbitmq rabbitmqctl set_permissions -p rabbit_vhost rabbit_user ".*" ".*" ".*"
複製代碼
#celery.py
from celery import Celery
broker_rabbitmq="amqp://rabbit_user:rabbit_pass@i-k9pwet2d/rabbit_vhost"
app=Celery("learn_celery",broker=broker_rabbitmq,backend="rpc://",include=["learn_celery.my_task2","learn_celery.my_task2"])
複製代碼
咱們經過建立app來實例化Celery,項目包的名稱爲learn_celery
,經過broker_rabbitmq
來鏈接rabbitmq,rabbitmq的amqp協議格式爲
amqp://userid:password@hostname:port/virtual_host
複製代碼
因爲咱們是在docker中啓動的rabbitmq,因此咱們的hostname應該爲宿主機的hostname。
指定後端經過rpc回傳數據,include加載帶worker處理的任務learn_celery.my_task1
、learn_celery.my_task2
#my_task1.py
from .celery import app
import time
@app.task
def args_add1(x,y):
print("start task no.1 now!")
time.sleep(10)
print("task no.1 end!")
return x+y
#my_task12.py
from .celery import app
import time
@app.task
def args_add2(x,y):
print("start task no.2 now!")
time.sleep(20)
print("task no.2 end!")
return x+y
複製代碼
在這裏咱們導入了celery中的app,並用它來裝飾咱們的方法args_add
,在args_add中模擬任務處理時間分別爲10s、20s而後返回結果。
#tasks1_run.py
from .my_task1 import args_add1
import time
reslut=args_add1.delay(11,22)
print("task over?{}".format(reslut.ready()))
print("task reslut:{}".format(reslut.result))
time.sleep(15)
print("task over?{}".format(reslut.ready()))
print("task reslut:{}".format(reslut.result))
#tasks2_run.py
from .my_task2 import args_add2
import time
reslut=args_add2.delay(33,44)
print("task over?{}".format(reslut.ready()))
print("task reslut:{}".format(reslut.result))
time.sleep(25)
print("task over?{}".format(reslut.ready()))
print("task reslut:{}".format(reslut.result))
複製代碼
關於任務的delay
,官方文檔(參考)是這樣描述的,我把它理解爲發送任務給celery或者celery調用待進來的任務。
reslut.ready()
返回任務執行是否執行完成True
or False
reslut.result
返回任務執行結果
咱們在任務進入celery和結束分別檢查一次。
進入learn_celery的父目錄。啓動learn_celery的這個應用worker,並指定併發數爲10個
celery -A learn_celery worker --loglevel=info --concurrency=10
複製代碼
若celery鏈接rabbitmq正常,咱們能夠看到以下的info
爲了便於觀察,咱們另外開啓一個窗口2,到learn_celery父目錄運行task1_run模塊
python -m learn_celery.tasks1_run
複製代碼
開啓窗口3,到learn_celery父目錄運行task2_run模塊
python -m learn_celery.tasks2_run
複製代碼
能夠看到通過各自任務的等待時間後,兩個任務都順利執行結束,並獲得結果,接下來咱們到worker上看一下info
因爲celery的併發性,收到任務立刻被調入執行,任務1耗時10s結果爲33,任務2耗時20s結果爲77
celery -A learn_celery flower
複製代碼
在Tasks
中能夠查看到當前任務隊列的狀態、參數、接收和啓動、執行時間。 在Dashborad
中查看當前worker節點的相關信息
文章有不足的地方歡迎指出。
歡迎收藏、點贊、提問。關注頂級飲水機管理員,除了管燒熱水,有時還作點別的。
celery的深刻了解
celery在django中的使用