There are a few concepts here: task, worker, and broker.
As the names suggest, a task is any of the jobs your boss hands you, and a worker is one of the people under you who actually does the work.
So what is a broker?
When your boss assigns you a task, you need to write it down somewhere. That somewhere could be the notebook you carry with you, a notepad file or Excel sheet on your computer, or whatever time-management tool you use.
The broker is where Celery records its tasks.
As the task manager, you write the work (tasks) that your boss (the front-end program) assigns you into your notebook (the broker). Then you send the programmers working for you (the workers) to the notebook (broker) to pick up the work (tasks).
#tasks.py
from celery import Celery

app = Celery('tasks', broker='amqp://admin:admin@localhost:5672')

@app.task
def add(x, y):
    return x + y
Start the worker:
celery -A tasks worker --loglevel=info
Run:
>>> from tasks import add
>>> add(1, 3)
4
>>> add.delay(1, 3)
<AsyncResult: 07614cef-f314-4c7b-a33f-92c080cadb83>
>>>
Note: delay() runs the task asynchronously, pushing it onto the message queue; calling the function directly does not go through the message queue.
Because the file is named tasks.py, the code reads app = Celery('tasks', broker=...): the first argument to Celery is the project (module) name, and it is also what you reference at startup: celery -A tasks worker --loglevel=info
Note: to push a task onto a specific queue, use apply_async, e.g. add.apply_async((1, 3), queue='queue_add1'); delay() passes all of its arguments to the task itself, so it cannot take routing options.
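A minimal sketch of routing an individual call, assuming the tasks.py app above and a worker consuming a queue named queue_add1 (the queue name here is purely illustrative):

# sketch: per-call routing options with apply_async (queue name is illustrative)
from tasks import add

# run the task on whichever worker consumes queue_add1
result = add.apply_async((1, 3), queue='queue_add1')

# apply_async also accepts scheduling options, e.g. run no earlier than 10 s from now
result = add.apply_async((1, 3), queue='queue_add1', countdown=10)

# a worker bound to that queue would be started with:
#   celery -A tasks worker -Q queue_add1 --loglevel=info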
For comparison, test_2.py:
from celery import Celery

app = Celery('proj', broker='amqp://admin:admin@localhost:5672', include='test_2')

@app.task
def add(x, y):
    return x + y
Example 1:
#test.py
from celery import Celery
import time

app = Celery('test', backend='amqp', broker='amqp://admin:admin@localhost:5672')

@app.task
def add(x, y):
    print "------>"
    time.sleep(5)
    print "<--------------"
    return x + y

if __name__ == "__main__":
    app.start()
Start:
python test.py worker
By default Celery starts one worker process per CPU core; to set the number explicitly, use the -c option, for example:
python test.py worker -c 2
Example 2:
#test.py
from celery import Celery
import time

app = Celery('test', backend='amqp', broker='amqp://admin:admin@localhost:5672')

@app.task
def add(x, y):
    print "------>"
    time.sleep(2)
    print "<--------------"
    return x + y

if __name__ == "__main__":
    app.start()
#eg.py
from test import *
import time

rev = []
for i in range(3):
    rev.append(add.delay(1, 3))
print "len rev:", len(rev)

while 1:
    tag = 1
    for key in rev:
        if not key.ready():
            tag = 0
            time.sleep(1)
            print "sleep 1"
    if tag:
        break
print "_____________________>"
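An alternative to the hand-rolled ready() loop in eg.py is to gather the AsyncResult objects into a ResultSet and wait for all of them at once; a small sketch assuming the same test.py app (which configures the amqp result backend):

# sketch: wait for several results with ResultSet instead of polling ready()
from celery.result import ResultSet
from test import add

rs = ResultSet([add.delay(1, 3) for _ in range(3)])
print "results:", rs.join()          # blocks until every task has finished, e.g. [4, 4, 4]
print "completed:", rs.completed_count()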
#test_redis.py
from celery import Celery
import time

#app = Celery('test_redis', backend='amqp', broker='redis://100.69.201.116:7000')
app = Celery('test_redis', backend='redis', broker='redis://100.69.201.116:7000')

@app.task
def add(x, y):
    print "------>"
    time.sleep(5)
    print "<--------------"
    return x + y

if __name__ == "__main__":
    app.start()
Start:
python test_redis.py worker -c 2
Test:
from celery import group
from test_redis import *

g = group(add.s(2, 3)).apply_async()
g = group(add.s(2, 3)).apply_async()
g = group(add.s(2, 3)).apply_async()
g = group(add.s(2, 3)).apply_async()
g = group(add.s(2, 3)).apply_async()
for ret in g.get():
    print ret
print "end-----------------------------------"
Result:
5
end-----------------------------------
#test_redis.py
from celery import Celery
import time

#app = Celery('test_redis', backend='amqp', broker='redis://100.69.201.116:7000')
app = Celery('test_redis', backend='redis', broker='redis://100.69.201.116:7000')

@app.task
def add(x, y):
    print "------>"
    time.sleep(5)
    print "<--------------"
    return x + y

if __name__ == "__main__":
    app.start()
#test_redis_2.py
from celery import Celery
import time

#app = Celery('test_redis', backend='amqp', broker='redis://100.69.201.116:7000')
app = Celery('test_redis_2', backend='redis', broker='redis://100.69.201.116:7001')

@app.task
def add_2(x, y):
    print "=======>"
    time.sleep(5)
    print "<================="
    return x + y

if __name__ == "__main__":
    app.start()
Test:
from celery import group
from test_redis import *
from test_redis_2 import *

ll = [(1,2), (3,4), (5,6)]
g = group(add.s(key[0], key[1]) for key in ll).apply_async()
for ret in g.get():
    print ret
print "end redis_1 -----------------------------------"

ll = [(1,2), (3,4), (5,6)]
g = group(add_2.s(key[0], key[1]) for key in ll).apply_async()
for ret in g.get():
    print ":", ret
print "end redis_2 -----------------------------------"
Result:
3
7
11
end redis_1 -----------------------------------
: 3
: 7
: 11
end redis_2 -----------------------------------
Note: the queues need to be set up in advance.
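Presumably this refers to the RabbitMQ vhosts that appear in the broker URLs below (hwzh, and later mq4): they have to exist, and the admin user needs permissions on them, before the workers can connect. A sketch of one way to set that up with rabbitmqctl (vhost and user names taken from the examples below):

rabbitmqctl add_vhost hwzh
rabbitmqctl set_permissions -p hwzh admin ".*" ".*" ".*"
rabbitmqctl add_vhost mq4
rabbitmqctl set_permissions -p mq4 admin ".*" ".*" ".*"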
## Example 1
#test.py
from celery import Celery
import time

app = Celery('test', backend='amqp', broker='amqp://admin:admin@localhost:5672//')

@app.task
def add(x, y):
    print "------>"
    time.sleep(5)
    print "<--------------"
    return x + y

if __name__ == "__main__":
    app.start()
#test_2.py
from celery import Celery
import time

app = Celery('test_2', backend='amqp', broker='amqp://admin:admin@localhost:5672//hwzh')

@app.task
def add_2(x, y):
    print "=====>"
    time.sleep(5)
    print "<=========="
    return x + y

if __name__ == "__main__":
    app.start()
Test:
from celery import group
from test import *
from test_2 import *

ll = [(1,2), (3,4), (7,8)]
g = group(add.s(key[0], key[1]) for key in ll).apply_async()
for ret in g.get():
    print ret

ll = [(1,2), (3,4), (7,8)]
g = group(add_2.s(key[0], key[1]) for key in ll).apply_async()
for ret in g.get():
    print ret
Result:
3
7
15
3
7
15
## Example 2
#test.py
from celery import Celery
import time

app = Celery('test', backend='amqp', broker='amqp://admin:admin@localhost:5672//mq4')

@app.task
def add(x, y):
    print "------>"
    time.sleep(2)
    print "<--------------"
    return x + y

@app.task
def sum(x, y):
    print "------>"
    time.sleep(2)
    print "<--------------"
    return x + y

if __name__ == "__main__":
    app.start()
#eg2.py
from test import *
import time

rev = []
for i in range(3):
    rev.append(add.delay(1, 3))
for i in range(3):
    rev.append(sum.delay(1, 3))
print "len rev:", len(rev)

while 1:
    tag = 1
    for key in rev:
        if not key.ready():
            tag = 0
            time.sleep(1)
            print "sleep 1"
    if tag:
        break
print "_____________________>"
#tasks_1.py
from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://admin:admin@localhost')

@app.task
def add(x, y):
    return x + y
Start:
celery -A tasks_1 worker --loglevel=info
Differences from the previous example:
- ** ---------- [config]
- ** ---------- .> app: tasks:0x7f8057931810
- ** ---------- .> transport: amqp://admin:**@localhost:5672//
- ** ---------- .> results: amqp
Run:
>>> from tasks_1 import add
>>> result = add.delay(1, 3)
>>> result.ready()
True
>>> result.get()
4
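Since a result backend is configured here, the AsyncResult offers a bit more than ready()/get(); a short sketch of other commonly used calls:

>>> result = add.delay(1, 3)
>>> result.state             # 'PENDING' until the worker finishes, then 'SUCCESS'
'SUCCESS'
>>> result.successful()
True
>>> result.get(timeout=10)   # raises celery.exceptions.TimeoutError if it takes longer
4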
from celery import Celery
from kombu import Exchange, Queue

BROKER_URL = 'amqp://admin:admin@localhost//'

app = Celery('tasks', backend='amqp', broker=BROKER_URL)
app.conf.update(
    CELERY_ROUTES={
        "add1": {"queue": "queue_add1"},
        "add2": {"queue": "queue_add2"},
        "add3": {"queue": "queue_add3"},
        "add4": {"queue": "queue_add4"},
    },
)

@app.task
def add1(x, y):
    return x + y

@app.task
def add2(x, y):
    return x + y

@app.task
def add3(x, y):
    return x + y

@app.task
def add4(x, y):
    return x + y
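A hypothetical usage sketch for the routed tasks above, assuming the module is saved as tasks.py. Note that task names default to "<module>.<function>", so depending on the module name the route keys may need to be written out in full ("tasks.add1", ...), as the next example does:

# sketch: calling tasks that are routed to dedicated queues
from tasks import add1, add2

add1.apply_async((1, 2))                      # routed via CELERY_ROUTES to queue_add1
add2.apply_async((3, 4), queue='queue_add2')  # or override the queue explicitly per call

# each queue needs a worker consuming it, e.g.:
#   celery -A tasks worker -Q queue_add1 --loglevel=info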
File: tasks.py
from celery import Celery, platforms
import time
import os

app = Celery('proj',
             broker='amqp://admin:admin@ip:5672',
             include=['tasks']
             )
app.conf.update(
    CELERY_ROUTES={
        'tasks.fun_1': {
            'queue': "q_1"
        },
        'tasks.fun_2': {
            'queue': "q_2"
        }
    }
)
platforms.C_FORCE_ROOT = True

@app.task
def fun_1(n):
    print "(((((((((((((((func_1", n
    return 1

@app.task
def fun_2(n):
    print n, ")))))))))))))))"
    return 2

if __name__ == "__main__":
    app.start()
Start:
python tasks.py worker -c 2 -Q q_1
python tasks.py worker -c 2 -Q q_2
Two message queues, q_1 and q_2. Example calls:
>>> from tasks import *
>>> fun_1(1)
(((((((((((((((func_1 1
1
>>> fun_1.delay(1)
<AsyncResult: 528a2ad1-bc16-4bdc-beff-cd166fe3e885>
>>> fun_2.delay(2)
<AsyncResult: ee5881eb-b384-4a39-ba00-08aa8ee53504>
#tasks.py
from celery import Celery
import time
import multiprocessing as mp

app = Celery('proj', broker='amqp://admin:admin@ip:5672', include="tasks")

def test_func(i):
    print "beg...:", i
    time.sleep(5)
    print "....end:", i
    return i * 5

@app.task
def fun_1(n):
    # the worker process is a daemon; temporarily mark it non-daemon so it may create a Pool
    curr_proc = mp.current_process()
    curr_proc.daemon = False
    p = mp.Pool(mp.cpu_count())
    curr_proc.daemon = True
    for i in range(n):
        p.apply_async(test_func, args=(i,))
    p.close()
    p.join()
    return 1

if __name__ == "__main__":
    app.start()
Explanation:
Starting child processes directly inside the task does not work, because the worker process is a daemon process (curr_proc.daemon = True) and daemon processes are not allowed to spawn children. So before creating the pool, the process is explicitly switched to non-daemon (curr_proc.daemon = False), and switched back to daemon once the pool has been created.
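An alternative sketch that avoids the daemon workaround altogether: instead of opening a multiprocessing.Pool inside the task, make test_func a Celery task of its own and let fun_1 fan the work out as subtasks, leaving the parallelism to the worker pool (the results are not waited on here; names reuse the example above):

# sketch: fan out to Celery subtasks instead of a multiprocessing.Pool inside the task
from celery import Celery, group
import time

app = Celery('proj', broker='amqp://admin:admin@ip:5672', include="tasks")

@app.task
def test_func(i):
    print "beg...:", i
    time.sleep(5)
    print "....end:", i
    return i * 5

@app.task
def fun_1(n):
    # dispatch n subtasks; the worker's own pool runs them in parallel
    group(test_func.s(i) for i in range(n)).apply_async()
    return 1

if __name__ == "__main__":
    app.start()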
#tasks_callback.py
from celery import Celery
import time
import multiprocessing as mp

app = Celery('proj', broker='amqp://admin:admin@ip:5672', include="tasks_callback")

rev = []

def test_func(i):
    print "beg...:", i
    time.sleep(5)
    print "....end:", i
    return i * 5

def callback_log(rev_val):
    rev.append(rev_val)

@app.task
def fun_1(n):
    print "before rev:", rev
    curr_proc = mp.current_process()
    curr_proc.daemon = False
    p = mp.Pool(mp.cpu_count())
    curr_proc.daemon = True
    for i in range(n):
        p.apply_async(test_func, args=(i,), callback=callback_log)
    p.close()
    p.join()
    print "after rev:", rev
    return 1

if __name__ == "__main__":
    app.start()
1. CELERYD_PREFETCH_MULTIPLIER
The number of messages prefetched at a time. For example, with CELERYD_PREFETCH_MULTIPLIER=2, a worker that has one task in the STARTED state can hold at most two more in the RECEIVED state (if any are available). This avoids the awkward situation where, with a long backlog, the first worker grabs all of the messages and workers started later have nothing left to take.
Reference code:
from celery import Celery, platforms
import time
import os

app = Celery('proj',
             broker='amqp://admin:admin@localhost:5672',
             include=['tasks']
             )
app.conf.update(
    CELERYD_PREFETCH_MULTIPLIER=2,
    CELERY_ROUTES={
        'tasks.fun_1': {
            'queue': "q_1"
        },
        'tasks.fun_2': {
            'queue': "q_2"
        }
    }
)
platforms.C_FORCE_ROOT = True

@app.task
def fun_1(n):
    print "(((((((((((((((func_1", n
    time.sleep(20)
    return 1

@app.task
def fun_2(n):
    print n, ")))))))))))))))"
    return 2
Calls:
>>> from tasks import *
>>> fun_1.delay(3)
<AsyncResult: 609f2216-6785-409e-9f6f-85ae3fcce084>
>>> fun_1.delay(3)
<AsyncResult: 0230b8bd-b237-40ef-bc73-88929f8f8290>
>>> fun_1.delay(3)
<AsyncResult: 8fce172a-93c9-41f8-8c08-377a4363389c>
>>> fun_1.delay(3)
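While calls like these are queued up, the split between tasks a worker is actually running and tasks it has merely prefetched can be checked through the remote-control inspect API; a small sketch assuming the tasks.py app above:

# sketch: running vs. prefetched tasks as seen by the workers
from tasks import app

insp = app.control.inspect()
print "active:  ", insp.active()    # tasks currently executing
print "reserved:", insp.reserved()  # tasks prefetched but not yet started (RECEIVED)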
Reference: http://windrocblog.sinaapp.com/?p=1585