1、介紹html
celery是一個基於python開發的分佈式異步消息任務隊列,用於處理大量消息,同時爲操做提供維護此類系統所需的工具。
它是一個任務隊列,專一於實時處理,同時還支持任務調度。若是你的業務場景中須要用到異步任務,就能夠考慮使用celerypython
2、實例場景redis
一、你想對100臺機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程序等着結果返回,而是給你返回 一個任務ID,你過一段時間只須要拿着這個任務id就能夠拿到任務執行結果, 在任務執行ing進行時,你能夠繼續作其它的事情。
二、你想作一個定時任務,好比天天檢測一下大家全部客戶的資料,若是發現今天 是客戶的生日,就給他發個短信祝福django
3、優勢json
4、入門後端
celery 須要一個解決方案來發送和接受消息,一般,這是以稱爲消息代理的單獨服務的形式出現的
有如下幾種解決方案,包括:
一:RabbitMQ(消息隊列,一種程序之間的通訊方式)
rabbitmq 功能齊全,穩定,耐用且易於安裝。它是生產環境的絕佳選擇。
若是您正在使用Ubuntu或Debian,請執行如下命令安裝RabbitMQ:服務器
$ sudo apt-get install rabbitmq-server
命令完成後,代理已經在後臺運行,準備爲您移動消息:。Starting rabbitmq-server: SUCCESS
2、redisapp
redis功能齊全,但在忽然停止或者電源故障時更容易丟失數據異步
5、安裝async
$ pip install celery
6、應用
建立一個tasks.py文件
from celery import Celery app = Celery('tasks', broker='pyamqp://guest@localhost//') @app.task def add(x, y): return x + y
第一個參數Celery是當前模塊的名稱。只有在__main__模塊中定義任務時才能自動生成名稱。
第二個參數是broker關鍵字參數,指定要使用的消息代理的URL。這裏使用RabbitMQ(也是默認選項)。
您可使用RabbitMQ amqp://localhost,或者您可使用Redis redis://localhost。
您定義了一個名爲add的任務,返回兩個數字的總和。
1 from __future__ import absolute_import 2 import os 3 from celery import Celery 4 from django.conf import settings 5 # set the default Django settings module for the 'celery' program. 6 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'saruman_server.settings') 7 app = Celery('saruman_server') 8 9 # Using a string here means the worker will not have to 10 # pickle the object when using Windows. 11 app.config_from_object('django.conf:settings') 12 app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) 13 14 @app.task(bind=True) 15 def debug_task(self): 16 print('Request: {0!r}'.format(self.request))
7、運行celery工做服務器
您如今能夠經過使用worker 參數執行咱們的程序來運行worker :
celery -A tasks worker --loglevel=info
有關可用命令行選項的完整列表,請執行如下操做:
$ celery worker --help
還有其餘幾個可用的命令,也能夠提供幫助:
$ celery help
8、調用任務
要調用咱們的任務,您可使用該delay()方法。
apply_async() 能夠更好地控制任務執行
>>> from tasks import add >>> add.delay(4, 4)
調用任務會返回一個AsyncResult實例。這可用於檢查任務的狀態,等待任務完成,或獲取其返回值(或者若是任務失敗,則獲取異常和回溯)。
9、保持結果
若是您想跟蹤任務的狀態,Celery須要在某處存儲或發送狀態。有幾個內置的結果後端可供選擇:SQLAlchemy / Django ORM, Memcached,Redis,RPC(RabbitMQ / AMQP),以及 - 或者您能夠定義本身的。
在本例中,咱們使用rpc結果後端,它將狀態做爲瞬態消息發回。後端經過backend參數 指定Celery
app = Celery('tasks', backend='rpc://', broker='pyamqp://')
或者,若是您想使用Redis做爲結果後端,但仍然使用RabbitMQ做爲消息代理(一種流行的組合):
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')
如今配置告終果後端,讓咱們再次調用該任務。此次你將保持AsyncResult調用任務時返回的實例:
>>> result = add.delay(4, 4)
該ready()方法返回任務是否已完成處理:
>>> result.ready()
False
10、配置
與消費類電器同樣,celery不須要太多配置便可運行。它有一個輸入和一個輸出。輸入必須鏈接代理,輸出能夠
選擇到結果後端。
能夠直接在應用程序上或使用專用配置模塊設置配置。例如,您能夠經過更改task_serializer設置來配置用於序列化任務有效負載的默認序列化程序:
app.conf.task_serializer = 'json'
若是您一次配置了許多設置,則可使用update:
app.conf.update( task_serializer='json', accept_content=['json'], # Ignore other content result_serializer='json', timezone='Europe/Oslo', enable_utc=True, )
對於大型項目,建議使用專用配置模塊。不鼓勵硬編碼週期性任務間隔和任務路由選項。將它們保存在集中位置要好得多。對於庫來講尤爲如此,由於它使用戶可以控制其任務的行爲方式。集中配置還容許您的SysAdmin在發生系統故障時進行簡單的更改。
您能夠經過調用app.config_from_object()方法告訴Celery實例使用配置模塊:
app.config_from_object('celeryconfig')
此模塊一般稱爲「 celeryconfig」,但您可使用任何模塊名稱。
在上面的例子中,一個名爲的模塊celeryconfig.py必須能夠從當前目錄或Python路徑加載。它可能看起來像這樣:
celeryconfig.py:
broker_url = 'pyamqp://' result_backend = 'rpc://' task_serializer = 'json' result_serializer = 'json' accept_content = ['json'] timezone = 'Europe/Oslo' enable_utc = True
1 from datetime import timedelta 2 3 import djcelery 4 5 djcelery.setup_loader() 6 BROKER_URL = 'amqp://guest@localhost//' #輸入 7 CELERY_RESULT_BACKEND = 'amqp://guest@localhost//' #返回的結果 8 9 #導入指定的任務模塊 10 CELERY_IMPORTS = ( 11 'fir.app.fir.tasks', 12 ) 13 14 CELERYBEAT_SCHEDULE = { 15 'receive_mail': { 16 "task": "fir.app.fir.tasks.receive_mail", 17 "schedule": timedelta(seconds=5), 18 "args": (), 19 }, 20 }
要驗證配置文件是否正常工做且不包含任何語法錯誤,您能夠嘗試導入它:
####################################################
python -m celeryconfig
爲了演示配置文件的強大功能,您能夠將行爲不當的任務路由到專用隊列:
celeryconfig.py:
task_routes = {
'tasks.add': 'low-priority',
}
或者不是路由它,而是能夠對任務進行速率限制,這樣在一分鐘(10 / m)內只能處理10種此類任務:
celeryconfig.py:
task_annotations = {
'tasks.add': {'rate_limit': '10/m'}
}
若是您使用RabbitMQ或Redis做爲代理,那麼您還能夠指示工做人員在運行時爲任務設置新的速率限制:
$ celery -A tasks control rate_limit tasks.add 10/m
worker@example.com: OK
new rate limit set successfully
11、在項目中如何使用celery
一、能夠把celery配置成一個應用
二、目錄結構以下:
proj/__init__.py /celery.py /tasks.py
三、proj/celery.py內容
from __future__ import absolute_import, unicode_literals from celery import Celery app = Celery('proj', broker='amqp://', backend='amqp://', include=['proj.tasks']) # Optional configuration, see the application user guide. app.conf.update( result_expires=3600, ) if __name__ == '__main__': app.start()
四、proj/tasks.py中的內容
from __future__ import absolute_import, unicode_literals from .celery import app @app.task def add(x, y): return x + y @app.task def mul(x, y): return x * y @app.task def xsum(numbers): return sum(numbers)
五、啓動worker
$ celery -A proj worker -l info
輸出
-------------- celery@Alexs-MacBook-Pro.local v4.0.2 (latentcall) ---- **** ----- --- * *** * -- Darwin-15.6.0-x86_64-i386-64bit 2017-01-26 21:50:24 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: proj:0x103a020f0 - ** ---------- .> transport: redis://localhost:6379// - ** ---------- .> results: redis://localhost/ - *** --- * --- .> concurrency: 8 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery
django 中使用celery:參考連接:http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#using-celery-with-django
12、監控工具flower
若是有些任務出現問題,能夠用flower工具監控(基於tornado)
安裝:pip install flower
使用:
三種啓動方式
celery flower celery flower --broker python manage.py celery flower #就能讀取到配置裏的broker_url 默認是rabbitmq
打開運行後的連接 打開worker python manage.py celery worker -l info