#SORA#celery實踐1


此次研究celery的Next Step部分。python

先建立一個python module:linux

mkdir proj
cd proj
touch __init__.py


在proj目錄中建立celery.py:json

from __future__ import absolute_import
from celery import Celery
app = Celery('proj',
             broker='amqp://',
             backend='amqp://',
             include=['proj.tasks'])
# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_RESULT_SERIALIZER='json'
)
if __name__ == '__main__':
    app.start()

解析:安全

  1. app=Celery('proj'),命名這個模塊爲'proj',詳細可參考User Guide的Main Name部分app

  2. broker='amqp://',指定broker,這裏用的是rabbitmq。由於rabbitmq默認的用戶爲guest(密碼爲guest),你也能夠這樣寫:amqp://guest@localhost//async

  3. backend='amqp://',指定一個backend,若須要檢查worker執行任務完成後的返回內容,你必須設置一個backendide

  4. app.conf.update(....),在該程序中修改相關配置,最佳實踐是把配置放到一個獨立的文件中。修改的內容是當使用amqp爲backend時,result保存的時間。這裏好像用的是秒爲單位函數

  5. include[]是爲了指定要導入的文件測試


備註:默認狀況下,celery使用pickle做爲payload,我測試時用的是root用戶,會提示有安全問題,而不予執行。於是我須要配置CELERY_TASK_SERIALIZER,CELERY_ACCEPT_CONTENT,CELERY_RESULT_SERIALIZER爲json。(另外請參考個人另外一篇blog:http://my.oschina.net/hochikong/blog/393270 ui



同一目錄中,建立tasks.py:

from __future__ import absolute_import
from proj.celery import app
@app.task
def add(x, y):
    return x + y
@app.task
def mul(x, y):
    return x * y
@app.task
def xsum(numbers):
    return sum(numbers)


啓動worker:

celery -A proj worker -l info


提示以下:

能夠看到:

RuntimeWarning: You are running the worker with superuser privileges, which is
absolutely not recommended!

其餘相關解析請參考celery文檔。


在本機傳遞任務給worker(新開一個終端):

root@workgroup0:~/celeryapp# ls
cagent.py  cagent.pyc  config.py  config.pyc  proj  test.py

能夠看到proj這個目錄

進入python解釋器:

root@workgroup0:~/celeryapp# python
Python 2.7.6 (default, Mar 22 2014, 22:59:56) 
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from proj.agent import add             #個人tasks.py重命名爲agent.py
>>> res = add.delay(2,5)
>>>

注意import的形式:由於具體的函數保存在tasks.py(個人是agent.py),因此import的方法應爲:module.path:attribute(具體見celery對於import的解釋)

咱們調用delay(2,5)執行add函數(也可使用apply_async(),但參數須要放在元組裏傳入),傳入的參數分別是2和5,返回一個對象res

>>> res
<AsyncResult: c08c72ed-8566-4025-b7f5-6ea5a9137966>

調用get()方法獲取運算結果:

>>> res.get()
7

注意,若是任務執行的時間好久,get()須要設置timeout,例如:get(timeout=1)

獲取運算任務的信息:

>>> res.state
u'SUCCESS'

返回一個unicode字符串SUCCESS


咱們看看另外一個終端,即啓動worker那個:

[2015-04-04 14:41:29,772: INFO/MainProcess] Received task: proj.agent.add[60eea8f6-0b6a-4bb4-909f-60a377936dcc]
[2015-04-04 14:41:29,798: INFO/MainProcess] Task proj.agent.add[60eea8f6-0b6a-4bb4-909f-60[2015-04-04 15:05:38,847: INFO/MainProcess] Received task: proj.agent.add[c08c72ed-8566-4025-b7f5-6ea5a9137966]
[2015-04-04 15:05:38,867: INFO/MainProcess] Task proj.agent.add[c08c72ed-8566-4025-b7f5-6ea5a9137966] succeeded in 0.0136815000001s: 7

能夠看到任務執行的信息。


要關閉worker,直接按Ctrl+C便可

相關文章
相關標籤/搜索