Python中任務隊列-芹菜celery的使用|Python 主題月

本文正在參加「Python主題月」,詳情查看 活動鏈接html

1、關於celery

芹菜celery是一個python實現的異步任務隊列,能夠用於爬蟲、web後臺查詢、計算等等。經過任務隊列,當一個任務來臨時再也不傻傻等待。python

他的架構以下:web

celery_architecture

  • Broker

咱們的生產者建立任務後會進入celery的任務調度隊列中間件Broker,Broker經過調度規則將消息(任務)調度消息隊列,Broker依賴第三方隊列消息代理如rabbitmqredis等。redis

  • Worker

廣大勞動者,盯着消息隊列,當隊列中有消息時把它拿過來給處理了。docker

  • Backend

用於結果存儲經worker處理的結果,好比經常使用的數據庫等。shell


#### 使用celery數據庫

在本文中我們使用rabbitmq(celery推薦)做爲消息代理中間件。django

咱們建立的celery目錄以下後端

learn_celery/
...celery_env/
...celery.py
...my_task1.py
...my_task2.py
...task1_run.py
...task2_run.py
複製代碼
1. 建立虛擬環境並安裝celery、flower(web監控),這裏不作贅述。
2.安裝我們的消息隊列中間件rabbitmq

這裏以docker的方式運行並配置,指定主機名爲rabbit(rabbitmq是以主機名來訪問的,因此這是必須的),容器名稱爲celery_rabbitmqmarkdown

docker run -d -p 5672:5672 -h rabbit --name celery_rabbitmq rabbitmq
複製代碼

添加用於celery訪問的用戶,以及配置configurewriteread權限,在下面咱們配置rabbit_user擁有全部配置、寫入和讀取權限。

docker exec -it celery_rabbitmq rabbitmqctl add_user rabbit_user rabbit_pass
docker exec -it celery_rabbitmq rabbitmqctl add_vhost rabbit_vhost
docker exec -it celery_rabbitmq rabbitmqctl set_user_tags rabbit_user celery
docker exec -it celery_rabbitmq rabbitmqctl  set_permissions -p rabbit_vhost rabbit_user ".*" ".*" ".*"
複製代碼
3.建立celery應用
#celery.py
from celery import Celery

broker_rabbitmq="amqp://rabbit_user:rabbit_pass@i-k9pwet2d/rabbit_vhost"
app=Celery("learn_celery",broker=broker_rabbitmq,backend="rpc://",include=["learn_celery.my_task2","learn_celery.my_task2"])
複製代碼

咱們經過建立app來實例化Celery,項目包的名稱爲learn_celery,經過broker_rabbitmq來鏈接rabbitmq,rabbitmq的amqp協議格式爲

amqp://userid:password@hostname:port/virtual_host
複製代碼

因爲咱們是在docker中啓動的rabbitmq,因此咱們的hostname應該爲宿主機的hostname。

指定後端經過rpc回傳數據,include加載帶worker處理的任務learn_celery.my_task1learn_celery.my_task2

4.建立兩個任務(消息)
#my_task1.py
from .celery import app
import time

@app.task
def args_add1(x,y):
    print("start task no.1 now!")
    time.sleep(10)
    print("task no.1 end!")
    return x+y

#my_task12.py
from .celery import app
import time

@app.task
def args_add2(x,y):
    print("start task no.2 now!")
    time.sleep(20)
    print("task no.2 end!")
    return x+y
複製代碼

在這裏咱們導入了celery中的app,並用它來裝飾咱們的方法args_add,在args_add中模擬任務處理時間分別爲10s、20s而後返回結果。

5.發送任務給celery
#tasks1_run.py
from .my_task1 import args_add1
import time

reslut=args_add1.delay(11,22)
print("task over?{}".format(reslut.ready()))
print("task reslut:{}".format(reslut.result))
time.sleep(15)
print("task over?{}".format(reslut.ready()))
print("task reslut:{}".format(reslut.result))

#tasks2_run.py
from .my_task2 import args_add2
import time

reslut=args_add2.delay(33,44)
print("task over?{}".format(reslut.ready()))
print("task reslut:{}".format(reslut.result))
time.sleep(25)
print("task over?{}".format(reslut.ready()))
print("task reslut:{}".format(reslut.result))

複製代碼

關於任務的delay,官方文檔(參考)是這樣描述的,我把它理解爲發送任務給celery或者celery調用待進來的任務。

image-20210707162220566

reslut.ready() 返回任務執行是否執行完成True or False

reslut.result 返回任務執行結果

咱們在任務進入celery和結束分別檢查一次。


2、看看結果

1.啓動worker

進入learn_celery的父目錄。啓動learn_celery的這個應用worker,並指定併發數爲10個

celery -A learn_celery worker --loglevel=info --concurrency=10
複製代碼

若celery鏈接rabbitmq正常,咱們能夠看到以下的info

image-20210707112018241

2.執行任務

爲了便於觀察,咱們另外開啓一個窗口2,到learn_celery父目錄運行task1_run模塊

python -m learn_celery.tasks1_run
複製代碼

image-20210707164856051

開啓窗口3,到learn_celery父目錄運行task2_run模塊

python -m learn_celery.tasks2_run
複製代碼

image-20210707165012326

能夠看到通過各自任務的等待時間後,兩個任務都順利執行結束,並獲得結果,接下來咱們到worker上看一下info

因爲celery的併發性,收到任務立刻被調入執行,任務1耗時10s結果爲33,任務2耗時20s結果爲77


3、使用Flower監控celery

1.啓動flower
celery -A learn_celery flower
複製代碼
2. 查看web監控 http://ip:5555

Tasks中能夠查看到當前任務隊列的狀態、參數、接收和啓動、執行時間。 image-20210707170905888Dashborad中查看當前worker節點的相關信息 image-20210707171023610


文章有不足的地方歡迎指出。

歡迎收藏、點贊、提問。關注頂級飲水機管理員,除了管燒熱水,有時還作點別的。


NEXT

  • celery的深刻了解

  • celery在django中的使用

相關文章
相關標籤/搜索