此次研究celery的Next Step部分。python
先建立一個python module:linux
mkdir proj cd proj touch __init__.py
在proj目錄中建立celery.py:json
from __future__ import absolute_import from celery import Celery app = Celery('proj', broker='amqp://', backend='amqp://', include=['proj.tasks']) # Optional configuration, see the application user guide. app.conf.update( CELERY_TASK_RESULT_EXPIRES=3600, CELERY_TASK_SERIALIZER='json', CELERY_ACCEPT_CONTENT=['json'], CELERY_RESULT_SERIALIZER='json' ) if __name__ == '__main__': app.start()
解析:安全
app=Celery('proj'),命名這個模塊爲'proj',詳細可參考User Guide的Main Name部分app
broker='amqp://',指定broker,這裏用的是rabbitmq。由於rabbitmq默認的用戶爲guest(密碼爲guest),你也能夠這樣寫:amqp://guest@localhost//async
backend='amqp://',指定一個backend,若須要檢查worker執行任務完成後的返回內容,你必須設置一個backendide
app.conf.update(....),在該程序中修改相關配置,最佳實踐是把配置放到一個獨立的文件中。修改的內容是當使用amqp爲backend時,result保存的時間。這裏好像用的是秒爲單位函數
include[]是爲了指定要導入的文件測試
備註:默認狀況下,celery使用pickle做爲payload,我測試時用的是root用戶,會提示有安全問題,而不予執行。於是我須要配置CELERY_TASK_SERIALIZER,CELERY_ACCEPT_CONTENT,CELERY_RESULT_SERIALIZER爲json。(另外請參考個人另外一篇blog:http://my.oschina.net/hochikong/blog/393270 ui
同一目錄中,建立tasks.py:
from __future__ import absolute_import from proj.celery import app @app.task def add(x, y): return x + y @app.task def mul(x, y): return x * y @app.task def xsum(numbers): return sum(numbers)
啓動worker:
celery -A proj worker -l info
提示以下:
能夠看到:
RuntimeWarning: You are running the worker with superuser privileges, which is absolutely not recommended!
其餘相關解析請參考celery文檔。
在本機傳遞任務給worker(新開一個終端):
root@workgroup0:~/celeryapp# ls cagent.py cagent.pyc config.py config.pyc proj test.py
能夠看到proj這個目錄
進入python解釋器:
root@workgroup0:~/celeryapp# python Python 2.7.6 (default, Mar 22 2014, 22:59:56) [GCC 4.8.2] on linux2 Type "help", "copyright", "credits" or "license" for more information. >>> from proj.agent import add #個人tasks.py重命名爲agent.py >>> res = add.delay(2,5) >>>
注意import的形式:由於具體的函數保存在tasks.py(個人是agent.py),因此import的方法應爲:module.path:attribute(具體見celery對於import的解釋)
咱們調用delay(2,5)執行add函數(也可使用apply_async(),但參數須要放在元組裏傳入),傳入的參數分別是2和5,返回一個對象res
>>> res <AsyncResult: c08c72ed-8566-4025-b7f5-6ea5a9137966>
調用get()方法獲取運算結果:
>>> res.get() 7
注意,若是任務執行的時間好久,get()須要設置timeout,例如:get(timeout=1)
獲取運算任務的信息:
>>> res.state u'SUCCESS'
返回一個unicode字符串SUCCESS
咱們看看另外一個終端,即啓動worker那個:
[2015-04-04 14:41:29,772: INFO/MainProcess] Received task: proj.agent.add[60eea8f6-0b6a-4bb4-909f-60a377936dcc] [2015-04-04 14:41:29,798: INFO/MainProcess] Task proj.agent.add[60eea8f6-0b6a-4bb4-909f-60[2015-04-04 15:05:38,847: INFO/MainProcess] Received task: proj.agent.add[c08c72ed-8566-4025-b7f5-6ea5a9137966] [2015-04-04 15:05:38,867: INFO/MainProcess] Task proj.agent.add[c08c72ed-8566-4025-b7f5-6ea5a9137966] succeeded in 0.0136815000001s: 7
能夠看到任務執行的信息。
要關閉worker,直接按Ctrl+C便可