Python Celery多實例 定時任務

celery是一個分佈式的任務調度模塊,那麼celery是如何和分佈式掛鉤呢?redis

celery能夠支持多臺不一樣的計算機執行不一樣的任務或者相同的任務。vim

若是要說celery的分佈式應用的話,就要提到celery的消息路由機制,提到AMQP協議。服務器

具體能夠查看AMQP文檔詳細瞭解。app

簡單理解:分佈式

能夠有多個"消息隊列"(message Queue),不一樣的消息能夠指定發送給不一樣的Message Queue,code

而這是經過Exchange來實現的,發送消息到"消息隊列"中時,能夠指定routiing_key,Exchange經過routing_key來吧消息路由(routes)到不一樣的"消息隊列"中去。對象

如圖:blog

clipboard.png

exchange 對應 一個消息隊列(queue),即:經過"消息路由"的機制使exchange對應queue,每一個queue對應每一個worker隊列

寫個例子:ip

vim demon3.py

from celery import Celery
app = Celery()
app.config_from_object("celeryconfig")
@app.task
def taskA(x, y):
    return x * y
@app.task
def taskB(x, y, z):
    return x + y + z
@app.task
def add(x, y):
    return x + y

 

vim celeryconfig.py

from kombu import Queue
BORKER_URL = "redis://192.168.48.131:6379/1"				#1庫
CELERY_RESULT_BACKEND = "redis://192.168.48.131:6379/2"	#2庫
CELERY_QUEUES = {
    Queue("default", Exchange("default"), routing_key = "default"),
    Queue("for_task_A", Exchange("for_task_A"), routing_key = "for_task_A"),
    Queue("for_task_B", Exchange("for_task_B"), routing_key = "for_task_B")
}
#路由
CELERY_ROUTES = {
    "demon3.taskA":{"queue": "for_task_A",  "routing_key": "for_task_A"},
    "demon3.taskB":{"queue": "for_task_B",  "routing_key": "for_task_B"}
}

下面把兩個腳本導入服務器:

指定taskA啓動一個worker:

# celery -A demon3 worker -l info -n workerA.%h -Q for_task_A

同理:

# celery -A demon3 worker -l info -n workerB.%h -Q for_task_B

下面遠程客戶端調用:新文件

vim remote.py

from demon3 import *
r1 = taskA.delay(10, 20)
print (r1.result)
print (r1.status)
r2 = taskB.delay(10, 20, 30)
time.sleep(1)
prnit (r2.result)
print (r2.status)
#print (dir(r2))
r3 = add.delay(100, 200)
print (r3.result)
print (r3.status)	#PENDING

看到狀態是PENDING,表示沒有執行,這個是由於沒有celeryconfig.py文件中指定改route到哪個Queue中,因此會被髮動到默認的名字celery的Queue中,可是咱們尚未啓動worker執行celery中的任務。

下面,咱們來啓動一個worker來執行celery隊列中的任務

# celery -A tasks worker -l info -n worker.%h -Q celery 		##默認的

能夠看到這行的結果爲success

print(re3.status)    #SUCCESS

 

定時任務:

Celery 與 定時任務

在celery中執行定時任務很是簡單,只須要設置celery對象中的CELERYBEAT_SCHEDULE屬性便可。

下面咱們接着在配置文件:celeryconfig.py,添加關於 CELERYBEAT_SCHEDULE 變量到腳本中去:

CELERY_TIMEZONE = 'UTC'
CELERYBEAT_SCHEDULE = {
    'taskA_schedule' : {
        'task':'tasks.taskA',
        'schedule':20,
        'args':(5,6)
    },
'taskB_scheduler' : {
    'task':"tasks.taskB",
    "schedule":200,
    "args":(10,20,30)
    },
'add_schedule': {
    "task":"tasks.add",
    "schedule":10,
    "args":(1,2)
    }
}

注意格式,不然會有問題

啓動:

celery -A demon3 worker -l info -n workerA.%h -Q for_task_A

celery -A demon3 worker -l info -n workerB.%h -Q for_task_B

celery -A tasks worker -l info -n worker.%h -Q celery

celery -A demon3 beat

相關文章
相關標籤/搜索