原文:http://python.jobbole.com/87238/html
參考:https://zhuanlan.zhihu.com/p/22304455 python
Celery 是什麼?git
Celery 是一個由 Python 編寫的簡單、靈活、可靠的用來處理大量信息的分佈式系統,它同時提供操做和維護分佈式系統所需的工具。github
Celery 專一於實時任務處理,支持任務調度。redis
說白了,它是一個分佈式隊列的管理工具,咱們能夠用 Celery 提供的接口快速實現並管理一個分佈式的任務隊列。數據庫
Celery包含以下組件:promise
1. Celery Beat:任務調度器,Beat進程會讀取配置文件的內容,週期性地將配置中到期須要執行的任務發送給任務隊列。服務器
2. Celery Worker:執行任務的消費者,一般會在多臺服務器運行多個消費者來提升執行效率。架構
3. Broker:消息代理,或者叫做消息中間件,接受任務生產者發送過來的任務消息,存進隊列再按序分發給任務消費方(一般是消息隊列或者數據庫)。app
4. Producer:調用了Celery提供的API、函數或者裝飾器而產生任務並交給任務隊列處理的都是任務生產者。
5. Result Backend:任務處理完後保存狀態信息和結果,以供查詢。Celery默認已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。
(本文以 Celery4.0 爲基礎進行書寫)
首先,咱們要理解 Celery 自己不是任務隊列,它是管理分佈式任務隊列的工具,或者換一種說法,它封裝好了操做常見任務隊列的各類操做,咱們用它能夠快速進行任務隊列的使用與管理,固然你也能夠本身看 rabbitmq 等隊列的文檔而後本身實現相關操做都是沒有問題的。
Celery 是語言無關的,雖然它是用 Python 實現的,但他提供了其餘常見語言的接口支持。只是若是你剛好使用 Python 進行開發那麼使用 Celery 就天然而然了。
想讓 Celery 運行起來咱們要明白幾個概念:
brokers 中文意思爲中間人,在這裏就是指任務隊列自己,Celery 扮演生產者和消費者的角色,brokers 就是生產者和消費者存放/拿取產品的地方(隊列)
常見的 brokers 有 rabbitmq、redis、Zookeeper 等
顧名思義就是結果儲存的地方,隊列中的任務運行完後的結果或者狀態須要被任務發送者知道,那麼就須要一個地方儲存這些結果,就是 Result Stores 了
常見的 backend 有 redis、Memcached 甚至經常使用的數據均可以。
就是 Celery 中的工做者,相似與生產/消費模型中的消費者,其從隊列中取出任務並執行
就是咱們想在隊列中進行的任務咯,通常由用戶、觸發器或其餘操做將任務入隊,而後交由 workers 進行處理。
理解以上概念後咱們就能夠快速實現一個隊列的操做:
這裏咱們用 redis 當作 celery 的 broker 和 backend。
(其餘 brokers 與 backend 支持看這裏)
安裝 Celery 和 redis 以及 python 的 redis 支持:
1
2
3
|
apt-get install redis-server
pip install redis
pip install celery
|
這裏須要注意若是你的 celery 是 4.0 及以上版本請確保 python 的 redis 庫版本在 2.10.4 及以上,不然會出現 redis 鏈接 timeout 的錯誤,具體參考
而後,咱們須要寫一個task:
1
2
3
4
5
6
7
8
|
#tasks.py
from celery import Celery
app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0') #配置好celery的backend和broker
@app.task #普通函數裝飾爲 celery task
def add(x, y):
return x + y
|
OK,到這裏,broker 咱們有了,backend 咱們有了,task 咱們也有了,如今就該運行 worker 進行工做了,在 tasks.py 所在目錄下運行:
1
|
celery -A tasks worker --loglevel=info
|
意思就是運行 tasks 這個任務集合的 worker 進行工做(固然此時broker中尚未任務,worker此時至關於待命狀態)
最後一步,就是觸發任務啦,最簡單方式就是再寫一個腳本而後調用那個被裝飾成 task 的函數:
1
2
3
4
5
6
|
#trigger.py
from tasks import add
result = add.delay(4, 4) #不要直接 add(4, 4),這裏須要用 celery 提供的接口 delay 進行調用
while not result.ready():
time.sleep(1)
print 'task done: {0}'.format(result.get())
|
運行此腳本
delay 返回的是一個 AsyncResult 對象,裏面存的就是一個異步的結果,當任務完成時result.ready()
爲 true,而後用 result.get()
取結果便可。
到此,一個簡單的 celery 應用就完成啦。
通過快速入門的學習後,咱們已經可以使用 Celery 管理普通任務,但對於實際使用場景來講這是遠遠不夠的,因此咱們須要更深刻的去了解 Celery 更多的使用方式。
首先來看以前的task:
1
2
3
|
@app.task #普通函數裝飾爲 celery task
def add(x, y):
return x + y
|
這裏的裝飾器app.task
其實是將一個正常的函數修飾成了一個 celery task 對象,因此這裏咱們能夠給修飾器加上參數來決定修飾後的 task 對象的一些屬性。
首先,咱們可讓被修飾的函數成爲 task 對象的綁定方法,這樣就至關於被修飾的函數 add 成了 task 的實例方法,能夠調用 self 獲取當前 task 實例的不少狀態及屬性。
其次,咱們也能夠本身複寫 task 類而後讓這個自定義 task 修飾函數 add ,來作一些自定義操做。
任務執行後,根據任務狀態執行不一樣操做須要咱們複寫 task 的 on_failure、on_success
等方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
# tasks.py
class MyTask(Task):
def on_success(self, retval, task_id, args, kwargs):
print 'task done: {0}'.format(retval)
return super(MyTask, self).on_success(retval, task_id, args, kwargs)
def on_failure(self, exc, task_id, args, kwargs, einfo):
print 'task fail, reason: {0}'.format(exc)
return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)
@app.task(base=MyTask)
def add(x, y):
return x + y
|
嗯, 而後繼續運行 worker:
1
|
celery -A tasks worker --loglevel=info
|
運行腳本,獲得:
再修改下tasks:
1
2
3
4
|
@app.task #普通函數裝飾爲 celery task
def add(x, y):
raise KeyError
return x + y
|
從新運行 worker,再運行 trigger.py:
能夠看到,任務執行成功或失敗後分別執行了咱們自定義的 on_failure、on_success
1
2
3
4
5
6
7
8
|
# tasks.py
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task(bind=True)
def add(self, x, y):
logger.info(self.request.__dict__)
return x + y
|
而後從新運行:
執行中的任務獲取到了本身執行任務的各類信息,能夠根據這些信息作不少其餘操做,例如判斷鏈式任務是否到結尾等等。
關於 celery.task.request 對象的詳細數據能夠看這裏
實際場景中得知任務狀態是很常見的需求,對於 Celery 其內建任務狀態有以下幾種:
參數 | 說明 |
---|---|
PENDING | 任務等待中 |
STARTED | 任務已開始 |
SUCCESS | 任務執行成功 |
FAILURE | 任務執行失敗 |
RETRY | 任務將被重試 |
REVOKED | 任務取消 |
當咱們有個耗時時間較長的任務進行時通常咱們想得知它的實時進度,這裏就須要咱們自定義一個任務狀態用來講明進度並手動更新狀態,從而告訴回調當前任務的進度,具體實現:
1
2
3
4
5
6
7
8
9
10
|
# tasks.py
from celery import Celery
import time
@app.task(bind=True)
def test_mes(self):
for i in xrange(1, 11):
time.sleep(0.1)
self.update_state(state="PROGRESS", meta={'p': i*10})
return 'finish'
|
而後在 trigger.py 中增長:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
# trigger.py
from task import add,test_mes
import sys
def pm(body):
res = body.get('result')
if body.get('status') == 'PROGRESS':
sys.stdout.write('\r任務進度: {0}%'.format(res.get('p')))
sys.stdout.flush()
else:
print '\r'
print res
r = test_mes.delay()
print r.get(on_message=pm, propagate=False)
|
而後運行任務:
Celery 進行週期任務也很簡單,只須要在配置中配置好週期任務,而後在運行一個週期任務觸發器( beat )便可:
新建 Celery 配置文件 celery_config.py:
1
2
3
4
5
6
7
8
9
10
11
12
|
# celery_config.py
from datetime import timedelta
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
'ptask': {
'task': 'tasks.period_task',
'schedule': timedelta(seconds=5),
},
}
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
|
配置中 schedule
就是間隔執行的時間,這裏能夠用 datetime.timedelta 或者 crontab 甚至太陽系經緯度座標進行間隔時間配置,具體能夠參考這裏
若是定時任務涉及到 datetime 須要在配置中加入時區信息,不然默認是以 utc 爲準。例如中國能夠加上:
1
|
CELERY_TIMEZONE = 'Asia/Shanghai'
|
而後在 tasks.py 中增長要被週期執行的任務:
1
2
3
4
5
6
7
|
# tasks.py
app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')
app.config_from_object('celery_config')
@app.task(bind=True)
def period_task(self):
print 'period task done: {0}'.format(self.request.id)
|
而後從新運行 worker,接着再運行 beat:
1
|
celery -A task beat
|
能夠看到週期任務運行正常~
有些任務可能需由幾個子任務組成,此時調用各個子任務的方式就變的很重要,儘可能不要以同步阻塞的方式調用子任務,而是用異步回調的方式進行鏈式任務的調用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
@app.task
def update_page_info(url):
page = fetch_page.delay(url).get()
info = parse_page.delay(url, page).get()
store_page_info.delay(url, info)
@app.task
def fetch_page(url):
return myhttplib.get(url)
@app.task
def parse_page(url, page):
return myparser.parse_document(page)
@app.task
def store_page_info(url, info):
return PageInfo.objects.create(url, info)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
def update_page_info(url):
# fetch_page -> parse_page -> store_page
chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
chain()
@app.task()
def fetch_page(url):
return myhttplib.get(url)
@app.task()
def parse_page(page):
return myparser.parse_document(page)
@app.task(ignore_result=True)
def store_page_info(info, url):
PageInfo.objects.create(url=url, info=info)
|
1
|
fetch_page.apply_async((url), link=[parse_page.s(), store_page_info.s(url)])
|
鏈式任務中前一個任務的返回值默認是下一個任務的輸入值之一 ( 不想讓返回值作默認參數能夠用 si() 或者 s(immutable=True) 的方式調用 )。
這裏的 s()
是方法 celery.signature()
的快捷調用方式,signature 具體做用就是生成一個包含調用任務及其調用參數與其餘信息的對象,我的感受有點相似偏函數的概念:先不執行任務,而是把任務與任務參數存起來以供其餘地方調用。
前面講了調用任務不能直接使用普通的調用方式,而是要用相似 add.delay(2, 2)
的方式調用,而鏈式任務中又用到了 apply_async
方法進行調用,實際上 delay
只是 apply_async
的快捷方式,兩者做用相同,只是 apply_async
能夠進行更多的任務屬性設置,好比 callbacks/errbacks 正常回調與錯誤回調、執行超時、重試、重試時間等等,具體參數能夠參考這裏
AsyncResult 主要用來儲存任務執行信息與執行結果,有點相似 tornado 中的 Future 對象,都有儲存異步結果與任務執行狀態的功能,對於寫 js 的朋友,它有點相似 Promise 對象,固然在 Celery 4.0 中已經支持了 promise 協議,只須要配合 gevent 一塊兒使用就能夠像寫 js promise 同樣寫回調:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import gevent.monkey
monkey.patch_all()
import time
from celery import Celery
app = Celery(broker='amqp://', backend='rpc')
@app.task
def add(x, y):
return x + y
def on_result_ready(result):
print('Received result for id %r: %r' % (result.id, result.result,))
add.delay(2, 2).then(on_result_ready)
|
要注意的是這種 promise 寫法如今只能用在 backend 是 RPC (amqp) 或 Redis 時。 而且獨立使用時須要引入 gevent 的猴子補丁,可能會影響其餘代碼。 官方文檔給的建議是這個特性結合異步框架使用更合適,例如 tornado、 twisted 等。
delay
與 apply_async
生成的都是 AsyncResult 對象,此外咱們還能夠根據 task id 直接獲取相關 task 的 AsyncResult: AsyncResult(task_id=xxx)
關於 AsyncResult 更詳細的內容,能夠參考這裏
利用 Celery 進行分佈式隊列管理、開發將會大幅提高開發效率,關於 Celery 更詳細的使用你們能夠去參考詳細的官方文檔