celery --分佈式任務隊列

1、介紹html

celery是一個基於python開發的分佈式異步消息任務隊列,用於處理大量消息,同時爲操做提供維護此類系統所需的工具。
它是一個任務隊列,專一於實時處理,同時還支持任務調度。若是你的業務場景中須要用到異步任務,就能夠考慮使用celerypython

2、實例場景redis

一、你想對100臺機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程序等着結果返回,而是給你返回 一個任務ID,你過一段時間只須要拿着這個任務id就能夠拿到任務執行結果, 在任務執行ing進行時,你能夠繼續作其它的事情。
二、你想作一個定時任務,好比天天檢測一下大家全部客戶的資料,若是發現今天 是客戶的生日,就給他發個短信祝福django

3、優勢json

  • 一、簡單:一但熟悉了celery的工做流程後,配置和使用仍是比較簡單的
  • 二、高可用:當任務執行失敗或執行過程當中發生鏈接中斷,celery 會自動嘗試從新執行任務
  • 三、快速:一個單進程的celery每分鐘可處理上百萬個任務
  • 四、靈活:幾乎celery的各個組件均可以被擴展及自定製

4、入門後端

celery 須要一個解決方案來發送和接受消息,一般,這是以稱爲消息代理的單獨服務的形式出現的
有如下幾種解決方案,包括:
一:RabbitMQ(消息隊列,一種程序之間的通訊方式)
rabbitmq 功能齊全,穩定,耐用且易於安裝。它是生產環境的絕佳選擇。
若是您正在使用Ubuntu或Debian,請執行如下命令安裝RabbitMQ:服務器

$ sudo apt-get install rabbitmq-server

命令完成後,代理已經在後臺運行,準備爲您移動消息:。Starting rabbitmq-server: SUCCESS
2、redisapp

redis功能齊全,但在忽然停止或者電源故障時更容易丟失數據異步

5、安裝async

$ pip install celery 

6、應用

建立一個tasks.py文件

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def add(x, y):
    return x + y

第一個參數Celery是當前模塊的名稱。只有在__main__模塊中定義任務時才能自動生成名稱。
第二個參數是broker關鍵字參數,指定要使用的消息代理的URL。這裏使用RabbitMQ(也是默認選項)。
您可使用RabbitMQ amqp://localhost,或者您可使用Redis redis://localhost。
您定義了一個名爲add的任務,返回兩個數字的總和。

 1 from __future__ import absolute_import
 2             import os
 3             from celery import Celery
 4             from django.conf import settings
 5             # set the default Django settings module for the 'celery' program.
 6             os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'saruman_server.settings')
 7             app = Celery('saruman_server')
 8 
 9             # Using a string here means the worker will not have to
10             # pickle the object when using Windows.
11             app.config_from_object('django.conf:settings')
12             app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
13 
14             @app.task(bind=True)
15             def debug_task(self):
16                 print('Request: {0!r}'.format(self.request))
和django配合實例

7、運行celery工做服務器

您如今能夠經過使用worker 參數執行咱們的程序來運行worker :

celery -A tasks worker --loglevel=info

有關可用命令行選項的完整列表,請執行如下操做:

$ celery worker --help

還有其餘幾個可用的命令,也能夠提供幫助:

$ celery help

8、調用任務

要調用咱們的任務,您可使用該delay()方法。
apply_async() 能夠更好地控制任務執行

>>> from tasks import add
>>> add.delay(4, 4)

調用任務會返回一個AsyncResult實例。這可用於檢查任務的狀態,等待任務完成,或獲取其返回值(或者若是任務失敗,則獲取異常和回溯)。

9、保持結果

若是您想跟蹤任務的狀態,Celery須要在某處存儲或發送狀態。有幾個內置的結果後端可供選擇:SQLAlchemy / Django ORM, Memcached,Redis,RPC(RabbitMQ / AMQP),以及 - 或者您能夠定義本身的。
在本例中,咱們使用rpc結果後端,它將狀態做爲瞬態消息發回。後端經過backend參數 指定Celery

app = Celery('tasks', backend='rpc://', broker='pyamqp://')

或者,若是您想使用Redis做爲結果後端,但仍然使用RabbitMQ做爲消息代理(一種流行的組合):

app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')

如今配置告終果後端,讓咱們再次調用該任務。此次你將保持AsyncResult調用任務時返回的實例:

>>> result = add.delay(4, 4)

該ready()方法返回任務是否已完成處理:

>>> result.ready()
False 

10、配置

與消費類電器同樣,celery不須要太多配置便可運行。它有一個輸入和一個輸出。輸入必須鏈接代理,輸出能夠
選擇到結果後端。
能夠直接在應用程序上或使用專用配置模塊設置配置。例如,您能夠經過更改task_serializer設置來配置用於序列化任務有效負載的默認序列化程序:

app.conf.task_serializer = 'json'

若是您一次配置了許多設置,則可使用update:

app.conf.update(
task_serializer='json',
accept_content=['json'], # Ignore other content
result_serializer='json',
timezone='Europe/Oslo',
enable_utc=True,
)

對於大型項目,建議使用專用配置模塊。不鼓勵硬編碼週期性任務間隔和任務路由選項。將它們保存在集中位置要好得多。對於庫來講尤爲如此,由於它使用戶可以控制其任務的行爲方式。集中配置還容許您的SysAdmin在發生系統故障時進行簡單的更改。
您能夠經過調用app.config_from_object()方法告訴Celery實例使用配置模塊:

app.config_from_object('celeryconfig')

此模塊一般稱爲「 celeryconfig」,但您可使用任何模塊名稱。
在上面的例子中,一個名爲的模塊celeryconfig.py必須能夠從當前目錄或Python路徑加載。它可能看起來像這樣:
celeryconfig.py:

broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
 1 from datetime import timedelta
 2 
 3 import djcelery
 4 
 5 djcelery.setup_loader()
 6 BROKER_URL = 'amqp://guest@localhost//'  #輸入
 7 CELERY_RESULT_BACKEND = 'amqp://guest@localhost//'  #返回的結果
 8 
 9 #導入指定的任務模塊
10 CELERY_IMPORTS = (
11     'fir.app.fir.tasks',
12 )
13 
14 CELERYBEAT_SCHEDULE = {
15     'receive_mail': {
16         "task": "fir.app.fir.tasks.receive_mail",
17         "schedule": timedelta(seconds=5),
18         "args": (),
19     },
20 }
View Code  

要驗證配置文件是否正常工做且不包含任何語法錯誤,您能夠嘗試導入它:
####################################################

python -m celeryconfig

爲了演示配置文件的強大功能,您能夠將行爲不當的任務路由到專用隊列:

celeryconfig.py:
task_routes = {
'tasks.add': 'low-priority',
}
或者不是路由它,而是能夠對任務進行速率限制,這樣在一分鐘(10 / m)內只能處理10種此類任務:

celeryconfig.py:
task_annotations = {
'tasks.add': {'rate_limit': '10/m'}
}
若是您使用RabbitMQ或Redis做爲代理,那麼您還能夠指示工做人員在運行時爲任務設置新的速率限制:

$ celery -A tasks control rate_limit tasks.add 10/m
worker@example.com: OK
new rate limit set successfully

11、在項目中如何使用celery

一、能夠把celery配置成一個應用
二、目錄結構以下:

proj/__init__.py
    /celery.py
    /tasks.py

三、proj/celery.py內容

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
    broker='amqp://',
    backend='amqp://',
    include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

四、proj/tasks.py中的內容

from __future__ import absolute_import, unicode_literals
from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

五、啓動worker

$ celery -A proj worker -l info

輸出

-------------- celery@Alexs-MacBook-Pro.local v4.0.2 (latentcall)
---- **** -----
--- * *** * -- Darwin-15.6.0-x86_64-i386-64bit 2017-01-26 21:50:24
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: proj:0x103a020f0
- ** ---------- .> transport: redis://localhost:6379//
- ** ---------- .> results: redis://localhost/
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery

django 中使用celery:參考連接:http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#using-celery-with-django 

12、監控工具flower

若是有些任務出現問題,能夠用flower工具監控(基於tornado)
安裝:pip install flower

使用:
三種啓動方式

celery flower
celery flower --broker 
python manage.py celery flower #就能讀取到配置裏的broker_url 默認是rabbitmq

打開運行後的連接 打開worker python manage.py celery worker -l info

相關文章
相關標籤/搜索