Celery 入門-簡單任務開發

安裝

pip install celery  #

pip install flower #celery的及時監控組件

配置

默認celery的配置文件名是celeryconfig.py,本例的內容以下:python

BROKER_URL ='amqp://guest:guest[@localhost](https://my.oschina.net/u/570656):5672' 
CELERY_ENABLE_UTC = True 
CELERY_TIMEZONE='Asia/Shanghai' 
CELERY_IMPORTS = ('celery_demo.tasks','celery_demo.tasks2')
# other config option

默認flower的配置文件名是flowerconfig.py,本例的內容以下:api

broker_api = 'http://guest:guest[@localhost](https://my.oschina.net/u/570656):15672/api/'
logging = 'DEBUG'
basic_auth = ['admin:admin'] #加上受權保護

address = '127.0.0.1'
port = 5556

#開發任務 項目結構以下:瀏覽器

├── celery_demo
│   ├── celeryconfig.py 
│   ├── flowerconfig.py
│   ├── __init__.py 
│   ├── README.md
│   ├── tasks2.py 
│   ├── tasks.py 
├── __init__.py     
└── test
    ├── __init__.py 
    ├── tasks_test.py

爲演示目的,tasks.py和tasks2.py的代碼是相同的,tasks.py的代碼以下:app

from celery import Celery
from celery.exceptions import Reject,MaxRetriesExceededError
from celery.utils.log import task_logger

import celeryconfig
app = Celery()
import requests
from requests.exceptions import ConnectTimeout 

class ErrorURLStartException(Exception):
    def __init__(self):
        self.message = '必須是HTTP或HTTPS開頭' 

@app.task(bind=True, acks_late=True)
def task1(self, url):
    print url
    try:
        if url.startswith('http')==False :
            raise ErrorURLStartException()
        res = requests.get(url, timeout=5)
        task_logger.info('status_code:%s' % res.status_code)
    except ConnectTimeout as e:
        raise self.retry(exc=e, countdown=5, max_retries=5)
    except ErrorURLStartException as e:
        raise Reject(reason=e, requeue=False) #requeue=True可能會形成任務一致被循環的處理,永遠不會結束
    else:
        task_logger.info('任務執行完畢') 

if __name__ == '__main__':
    app.config_from_object(celeryconfig)
    app.worker_main(['tasks', '-lDEBUG'])

tasks.task1任務的目的是當一個合法的HTTP URL過來的時候去GET內容,若是出現鏈接超時的異常就重試5次,若是出現非法的URL就直接將任務拋棄。異步

單元測試的代碼以下:async

import unittest
from unittest import TestCase 
from celery_demo.tasks import task1
from celery_demo.tasks2 import task1 as task2




class TasksTestCase(TestCase):
    def test_task1(self):
        print 'test_task1'
        url = 'http://www.baidu.com'
        task1.apply_async(args=[url])
     

    def test_task2(self):
        print 'test_task2'
        url = 'http://169.24.1.100'
        task1.apply_async(args=[url])
    

    def test_task3(self):
        print 'test_task3'
        url = 'adfhttp://169.24.1.100'
        task1.apply_async(args=[url])
    

    def test_task4(self):
        print 'test_task4'
        url = 'http://169.24.1.100'
        task2.apply_async(args=[url])

啓動任務

進入到 celery_demo的上一級目錄單元測試

george@george-pc:~/workspace/work_demo$ ll
總用量 24
drwxr-xr-x 2 george george 4096 9月  16 22:45 celery_demo
-rw-r--r-- 1 george george   56 9月  15 22:36 __init__.py
drwxr-xr-x 2 george george 4096 6月   4 11:29 rabbitmq_demo
-rw-r--r-- 1 george george  254 9月  15 21:40 tasks.py
-rw-r--r-- 1 george george  547 9月  16 00:12 tasks.pyc
drwxr-xr-x 2 george george 4096 9月  16 12:58 test

分別實行下面的三條指令:測試

  • 啓動flower的實例url

    celery flower --conf=celery_demo/flowerconfig.pyspa

  • 啓動celery worker的實例

    celery worker --config=celery_demo.celeryconfig -n worker1.%h -l DEBUG

  • 運行單元測試

    python -m unittest test.tasks_test

運行結果

能夠經過在瀏覽器中輸入http://localhost:5556/ 進行flower的訪問,如圖:輸入圖片說明

總結

使用celery來進行異步任務的處理讓程序擴展性獲得提高然而並無增長應用的複雜性.開發人員幾乎不用關注broker的存在,只把精力關注在如何設計task上。粗淺的理解。

相關文章
相關標籤/搜索