python—Celery異步分佈式

1、Celery異步分佈式python

Celery  是一個python開發的異步分佈式任務調度模塊,是一個消息傳輸的中間件,能夠理解爲一個郵箱,每當應用程序調用celery的異步任務時,會向broker傳遞消息,而後celery的worker從中取消息redis

Celery  用於存儲消息以及celery執行的一些消息和結果數據庫


對於brokers,官方推薦是rabbitmq和redisapp

對於backend,也就是指數據庫,爲了簡單通常使用redis異步


clipboard.png


使用redis鏈接url格式:分佈式

redis://:password@hostname:port/db_numberide


1)定義鏈接腳本tasks.py
測試


#!/usr/bin/env python
from celery import Celery
broker = "redis://192.168.2.230:6379/1"
backend = "redis://192.168.2.230:6379/2"
app = Celery("tasks", broker=broker, backend=backend)

@app.task
def add(x,y):
    return x+y


2)安裝啓動celeryurl

pip install celeryspa

pip install redis

啓動方式:celery -A huang tasks -l info  #-l 等同於 --loglevel

1.png


3)執行測試 huang.py 

#!/usr/bin/env python
from tasks import add

re = add.delay(10,20)

print(re.result)   #任務返回值
print(re.ready)     #若是任務被執行返回True,其餘狀況返回False

print(re.get(timeout=1))  #帶參數的等待,最後返回結果
print(re.status)  #任務當前狀態

運行結果:

30

<bound method AsyncResult.ready of <AsyncResult: d2e0a2d8-cdd9-4fe3-a8bb-81fe3c53ba9a>>

30

SUCCESS


4)根據成功返回的key或celery界面輸出的信息,查看redis存儲

blob.png


說明:中止celery服務,執行完huang.py以後,再啓動celery服務也是有保存數據的



2、celery多進程

1.png

1)配置文件 celeryconfig.py

#!/usr/bin/env python
#-*- coding:utf-8 -*-

from kombu import Exchange,Queue

BROKER_URL = "redis://192.168.2.230:6379/3"
CELERY_RESULT_BACKEND = "redis://192.168.2.230:6379/4"

CELERY_QUEUES = (
Queue("default",Exchange("default"),routing_key="default"),
Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"),
Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B")
)

CELERY_ROUTES = {
'tasks.taskA':{"queue":"for_task_A","routing_key":"for_task_A"},
'tasks.taskB':{"queue":"for_task_B","routing_key":"for_task_B"}
}


2)tasks.py

#!/usr/bin/env python
#-*- coding:utf-8 -*-

from celery import Celery

app = Celery()
app.config_from_object("celeryconfig")

@app.task
    def taskA(x,y):
    return x+y
    
@app.task
    def taskB(x,y,z):
    return x+y+z


3)啓動celery

celery -A tasks worker --loglevel info


4)執行腳本huang2.py

#!/usr/bin/env python
#-*- coding:utf-8 -*-

from tasks import taskA,taskB

re = taskA.delay(10,20)

print(re.result)   #任務返回值
print(re.ready)     #若是任務被執行返回True,其餘狀況返回False
print(re.get(timeout=1))  #帶參數的等待,最後返回結果
print(re.status)  #任務當前狀態

re2 = taskB.delay(10,20,30)
print(re2.result)
print(re2.ready)
print(re2.get(timeout=1))
print(re2.status)


5)運行結果

None

<bound method AsyncResult.ready of <AsyncResult: e34a8490-05a7-473e-a082-f4956cabfc99>>

30

SUCCESS

None

<bound method AsyncResult.ready of <AsyncResult: 3c5cd839-dbe2-4e63-ba4e-86e8c79d943f>>

60

SUCCESS

相關文章
相關標籤/搜索