celery是一個分佈式的任務調度模塊,那麼celery是如何和分佈式掛鉤呢?redis
celery能夠支持多臺不一樣的計算機執行不一樣的任務或者相同的任務。vim
若是要說celery的分佈式應用的話,就要提到celery的消息路由機制,提到AMQP協議。服務器
具體能夠查看AMQP文檔詳細瞭解。app
簡單理解:分佈式
能夠有多個"消息隊列"(message Queue),不一樣的消息能夠指定發送給不一樣的Message Queue,code
而這是經過Exchange來實現的,發送消息到"消息隊列"中時,能夠指定routiing_key,Exchange經過routing_key來吧消息路由(routes)到不一樣的"消息隊列"中去。對象
如圖:blog
exchange 對應 一個消息隊列(queue),即:經過"消息路由"的機制使exchange對應queue,每一個queue對應每一個worker隊列
寫個例子:ip
vim demon3.py
from celery import Celery app = Celery() app.config_from_object("celeryconfig") @app.task def taskA(x, y): return x * y @app.task def taskB(x, y, z): return x + y + z @app.task def add(x, y): return x + y
vim celeryconfig.py
from kombu import Queue BORKER_URL = "redis://192.168.48.131:6379/1" #1庫 CELERY_RESULT_BACKEND = "redis://192.168.48.131:6379/2" #2庫 CELERY_QUEUES = { Queue("default", Exchange("default"), routing_key = "default"), Queue("for_task_A", Exchange("for_task_A"), routing_key = "for_task_A"), Queue("for_task_B", Exchange("for_task_B"), routing_key = "for_task_B") } #路由 CELERY_ROUTES = { "demon3.taskA":{"queue": "for_task_A", "routing_key": "for_task_A"}, "demon3.taskB":{"queue": "for_task_B", "routing_key": "for_task_B"} }
下面把兩個腳本導入服務器:
指定taskA啓動一個worker:
# celery -A demon3 worker -l info -n workerA.%h -Q for_task_A
同理:
# celery -A demon3 worker -l info -n workerB.%h -Q for_task_B
下面遠程客戶端調用:新文件
vim remote.py
from demon3 import * r1 = taskA.delay(10, 20) print (r1.result) print (r1.status) r2 = taskB.delay(10, 20, 30) time.sleep(1) prnit (r2.result) print (r2.status) #print (dir(r2)) r3 = add.delay(100, 200) print (r3.result) print (r3.status) #PENDING
看到狀態是PENDING,表示沒有執行,這個是由於沒有celeryconfig.py文件中指定改route到哪個Queue中,因此會被髮動到默認的名字celery的Queue中,可是咱們尚未啓動worker執行celery中的任務。
下面,咱們來啓動一個worker來執行celery隊列中的任務
# celery -A tasks worker -l info -n worker.%h -Q celery ##默認的
能夠看到這行的結果爲success
print(re3.status) #SUCCESS
定時任務:
Celery 與 定時任務
在celery中執行定時任務很是簡單,只須要設置celery對象中的CELERYBEAT_SCHEDULE屬性便可。
下面咱們接着在配置文件:celeryconfig.py,添加關於 CELERYBEAT_SCHEDULE 變量到腳本中去:
CELERY_TIMEZONE = 'UTC' CELERYBEAT_SCHEDULE = { 'taskA_schedule' : { 'task':'tasks.taskA', 'schedule':20, 'args':(5,6) }, 'taskB_scheduler' : { 'task':"tasks.taskB", "schedule":200, "args":(10,20,30) }, 'add_schedule': { "task":"tasks.add", "schedule":10, "args":(1,2) } }
注意格式,不然會有問題
啓動:
celery -A demon3 worker -l info -n workerA.%h -Q for_task_A
celery -A demon3 worker -l info -n workerB.%h -Q for_task_B
celery -A tasks worker -l info -n worker.%h -Q celery
celery -A demon3 beat