異步任務是web開發中一個很常見的方法。對於一些耗時耗資源的操做,每每從主應用中隔離,經過異步的方式執行。簡而言之,作一個註冊的功能,在用戶使用郵箱註冊成功以後,須要給該郵箱發送一封激活郵件。若是直接放在應用中,則調用發郵件的過程會遇到網絡IO的阻塞,比好優雅的方式則是使用異步任務,應用在業務邏輯中觸發一個異步任務。javascript
實現異步任務的工具備不少,其原理都是使用一個任務隊列,好比使用redis生產消費模型或者發佈訂閱模式實現一個簡單的消息隊列。php
除了redis,還可使用另一個神器---Celery。Celery是一個異步任務的調度工具。它是Python寫的庫,可是它實現的通信協議也可使用ruby,php,javascript等調用。異步任務除了消息隊列的後臺執行的方式,仍是一種則是跟進時間的計劃任務。下面將會介紹如何使用celery實現這兩種需求。java
最先學習celery的時候,冒出了一個rabbitmq,又冒出一個redis。當時一頭霧水。實際上這正是celery的設計奧妙。簡單來講,rabbitmq是一個採用Erlang寫的強大的消息隊列工具。在celery中能夠扮演broker的角色。那麼什麼是broker?python
broker是一個消息傳輸的中間件,能夠理解爲一個郵箱。每當應用程序調用celery的異步任務的時候,會向broker傳遞消息,然後celery的worker將會取到消息,進行對於的程序執行。好吧,這個郵箱能夠當作是一個消息隊列。那麼什麼又是backend,一般程序發送的消息,發完就完了,可能都不知道對方時候接受了。爲此,celery實現了一個backend,用於存儲這些消息以及celery執行的一些消息和結果。對於 brokers,官方推薦是rabbitmq和redis,至於backend,就是數據庫啦。爲了簡單起見,咱們都用redis。web
使用celery包含三個方面,其一是定義任務函數,其二是運行celery服務,最後是客戶應用程序的調用。redis
建立一個文件 tasks.py
數據庫
輸入下列代碼:ruby
from celery import Celery brokers = 'redis://127.0.0.1:6379/5' backend = 'redis://127.0.0.1:6379/6' app = Celery('tasks', broker=broker, backend=backend) @app.task def add(x, y): return x + y
上述代碼導入了celery,而後建立了celery實例app,實力話的過程當中,指定了任務名tasks
(和文件名一致),傳入了broker和backend。而後建立了一個任務函數add
。網絡
下面就啓動celery服務併發
在當前命令行終端運行:
celery -A tasks worker --loglevel=info
此時會看見一對輸出。包括註冊的任務啦。
下面客戶端程序如何調用呢?打開一個命令行,進入Python環境
In [0]:from tasks import add In [1]: r = add.delay(2, 2) In [2]: add.delay(2, 2) Out[2]: <AsyncResult: 6fdb0629-4beb-4eb7-be47-f22be1395e1d> In [3]: r = add.delay(3, 3) In [4]: r.re r.ready r.result r.revoke In [4]: r.ready() Out[4]: True In [6]: r.result Out[6]: 6 In [7]: r.get() Out[7]: 6
在celery命令行能夠看見celery執行的日誌:
[2015-09-20 21:37:06,086: INFO/MainProcess] Task proj.tasks.add[76beb980-0f55-4629-a4fb-4a1776428ea8] succeeded in 0.00089102005586s: 6
打開 backend的redis,也能夠看見celery執行的信息。
如今時在python環境中調用的add函數,實際上一般在應用程序中調用這個方法。須要注意,若是把返回值賦值給一個變量,那麼原來的應用程序也會被阻塞,須要等待異步任務返回的結果。所以,實際使用中,不須要把結果賦值。
上述的使用是簡單的配置,下面介紹一個更健壯的方式來使用celery。首先建立一個python包,celery服務,姑且命名爲proj。目錄文件以下:
☁ proj tree . ├── __init__.py ├── celery.py # 建立 celery 實例 ├── config.py # 配置文件 └── tasks.py # 任務函數 首先是 celery.py #!/usr/bin/env python # -*- coding:utf-8 -*- from __future__ import absolute_import from celery import Celery app = Celery('proj', include=['proj.tasks']) app.config_from_object('proj.config') if __name__ == '__main__': app.start() 這一次建立 app,並無直接指定 broker 和 backend。而是在配置文件中。 config.py #!/usr/bin/env python # -*- coding:utf-8 -*- from __future__ import absolute_import CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5' BROKER_URL = 'redis://127.0.0.1:6379/6' 剩下的就是tasks.py #!/usr/bin/env python # -*- coding:utf-8 -*- from __future__ import absolute_import from proj.celery import app @app.task def add(x, y): return x + y 使用方法也很簡單,在proj的同一級目錄執行celery: celery -A proj worker -l info 如今使用任務也很簡單,直接在客戶端代碼調用 proj.tasks 裏的函數便可。 Scheduler 一種常見的需求是每隔一段時間執行一個任務。配置以下 config.py #!/usr/bin/env python # -*- coding:utf-8 -*- from __future__ import absolute_import CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5' BROKER_URL = 'redis://127.0.0.1:6379/6' CELERY_TIMEZONE = 'Asia/Shanghai' from datetime import timedelta CELERYBEAT_SCHEDULE = { 'add-every-30-seconds': { 'task': 'proj.tasks.add', 'schedule': timedelta(seconds=30), 'args': (16, 16) }, } 注意配置文件須要指定時區。這段代碼表示每隔30秒執行 add 函數。 一旦使用了 scheduler, 啓動 celery須要加上-B 參數 celery -A proj worker -B -l info crontab 計劃任務固然也能夠用crontab實現,celery也有crontab模式。修改 config.py #!/usr/bin/env python # -*- coding:utf-8 -*- from __future__ import absolute_import CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5' BROKER_URL = 'redis://127.0.0.1:6379/6' CELERY_TIMEZONE = 'Asia/Shanghai' from celery.schedules import crontab CELERYBEAT_SCHEDULE = { # Executes every Monday morning at 7:30 A.M 'add-every-monday-morning': { 'task': 'tasks.add', 'schedule': crontab(hour=7, minute=30, day_of_week=1), 'args': (16, 16), }, } 總而言之,scheduler的切分度更細,能夠精確到秒。crontab模式就不用說了。固然celery還有更高級的用法,好比多個機器使用,啓用多個worker併發處理等。