pip install celery # pip install flower #celery的及時監控組件
默認celery的配置文件名是celeryconfig.py,本例的內容以下:python
BROKER_URL ='amqp://guest:guest[@localhost](https://my.oschina.net/u/570656):5672' CELERY_ENABLE_UTC = True CELERY_TIMEZONE='Asia/Shanghai' CELERY_IMPORTS = ('celery_demo.tasks','celery_demo.tasks2') # other config option
默認flower的配置文件名是flowerconfig.py,本例的內容以下:api
broker_api = 'http://guest:guest[@localhost](https://my.oschina.net/u/570656):15672/api/' logging = 'DEBUG' basic_auth = ['admin:admin'] #加上受權保護 address = '127.0.0.1' port = 5556
#開發任務 項目結構以下:瀏覽器
├── celery_demo │ ├── celeryconfig.py │ ├── flowerconfig.py │ ├── __init__.py │ ├── README.md │ ├── tasks2.py │ ├── tasks.py ├── __init__.py └── test ├── __init__.py ├── tasks_test.py
爲演示目的,tasks.py和tasks2.py的代碼是相同的,tasks.py的代碼以下:app
from celery import Celery from celery.exceptions import Reject,MaxRetriesExceededError from celery.utils.log import task_logger import celeryconfig app = Celery() import requests from requests.exceptions import ConnectTimeout class ErrorURLStartException(Exception): def __init__(self): self.message = '必須是HTTP或HTTPS開頭' @app.task(bind=True, acks_late=True) def task1(self, url): print url try: if url.startswith('http')==False : raise ErrorURLStartException() res = requests.get(url, timeout=5) task_logger.info('status_code:%s' % res.status_code) except ConnectTimeout as e: raise self.retry(exc=e, countdown=5, max_retries=5) except ErrorURLStartException as e: raise Reject(reason=e, requeue=False) #requeue=True可能會形成任務一致被循環的處理,永遠不會結束 else: task_logger.info('任務執行完畢') if __name__ == '__main__': app.config_from_object(celeryconfig) app.worker_main(['tasks', '-lDEBUG'])
tasks.task1任務的目的是當一個合法的HTTP URL過來的時候去GET內容,若是出現鏈接超時的異常就重試5次,若是出現非法的URL就直接將任務拋棄。異步
單元測試的代碼以下:async
import unittest from unittest import TestCase from celery_demo.tasks import task1 from celery_demo.tasks2 import task1 as task2 class TasksTestCase(TestCase): def test_task1(self): print 'test_task1' url = 'http://www.baidu.com' task1.apply_async(args=[url]) def test_task2(self): print 'test_task2' url = 'http://169.24.1.100' task1.apply_async(args=[url]) def test_task3(self): print 'test_task3' url = 'adfhttp://169.24.1.100' task1.apply_async(args=[url]) def test_task4(self): print 'test_task4' url = 'http://169.24.1.100' task2.apply_async(args=[url])
進入到 celery_demo
的上一級目錄單元測試
george@george-pc:~/workspace/work_demo$ ll 總用量 24 drwxr-xr-x 2 george george 4096 9月 16 22:45 celery_demo -rw-r--r-- 1 george george 56 9月 15 22:36 __init__.py drwxr-xr-x 2 george george 4096 6月 4 11:29 rabbitmq_demo -rw-r--r-- 1 george george 254 9月 15 21:40 tasks.py -rw-r--r-- 1 george george 547 9月 16 00:12 tasks.pyc drwxr-xr-x 2 george george 4096 9月 16 12:58 test
分別實行下面的三條指令:測試
啓動flower的實例url
celery flower --conf=celery_demo/flowerconfig.pyspa
啓動celery worker的實例
celery worker --config=celery_demo.celeryconfig -n worker1.%h -l DEBUG
運行單元測試
python -m unittest test.tasks_test
能夠經過在瀏覽器中輸入http://localhost:5556/ 進行flower的訪問,如圖:
使用celery來進行異步任務的處理讓程序擴展性獲得提高然而並無增長應用的複雜性.開發人員幾乎不用關注broker的存在,只把精力關注在如何設計task上。粗淺的理解。