將要執行異步任務腳本 tasks.py:html
from celery import Celery from celery import group # host='10.32.21.52', port=6379, db=3 app = Celery('tasks', backend = 'redis://10.32.21.52:6379/14', broker='redis://10.32.21.52:6379/15') @app.task def add(x, y): return x + y @app.task def tsum(ite): return sum(ite) @app.task(trail=True) def A(how_many): return group(B.s(i) for i in range(how_many))() @app.task(trail=True) def B(i): return pow2.delay(i) @app.task(trail=True) def pow2(i): return i ** 2
其中:app中對celery進行配置;詳細的配置能夠參考文檔:http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#configurationredis
啓動任務:服務器
celery -A tasks worker --loglevel=info
啓動後有如圖所示的輸出,顯示服務器及配置信息和任務運行日誌;app
經過腳本調用任務:異步
from tasks import add from tasks import tsum from tasks import A from celery import chord result = add.delay(4, 4) ~tsum.map([range(10), range(100)]) ~add.starmap([item for item in zip(range(10), range(10))]) # chord(add.s(i, i) for i in xrange(100))(tsum.s()).get() chord((add.s(i, i) for i in xrange(100)), tsum.s())() add.chunks(zip(xrange(100), xrange(100)), 10)() print(result.backend) print result.get() result = A.delay(10)
寫的任務太簡單了,比單機還慢,只能上抓取任務試試了~spa
celery經過裝飾器把各個任務抽象爲消息發送給第三方的插件調度,如:redis;而後把任務分配給各個啓動了celery的服務的機器去執行任務,有空研究一下源碼再繼續分享。插件