分佈式任務隊列Celery

分佈式任務隊列Celery

Celery (芹菜)是基於Python開發的分佈式任務隊列。它支持使用任務隊列的方式在分佈的機器/進程/線程上執行任務調度。python

結構

圖片描述

核心部件

  • brokerweb

    • 消息隊列,由第三方消息中間件完成sql

    • 常見有RabbitMQ, Redis, MongoDB等django

  • workerjson

    • 任務執行器flask

    • 能夠有多個worker進程後端

    • worker又能夠起多個queue來並行消費消息服務器

  • backendapp

    • 後端存儲,用於持久化任務執行結果異步

功能部件

  • beat

    • 定時器,用於週期性調起任務

  • flower

    • web管理界面

任務

基本用法是在程序裏引用celery,並將函數方法綁定到task

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')
app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'

@app.task
def add(x, y):
    return x + y

而後調用相應方法便可(delay與apply_async都是異步調用)

from tasks import add
import time
result = add.delay(4,4)
    
while not result.ready():
  print "not ready yet"
  time.sleep(5)

print result.get()

因爲是採用消息隊列,所以任務提交以後,程序馬上返回一個任務ID。
以後能夠經過該ID查詢該任務的執行狀態和結果。

關聯任務

執行1個任務,完成後再執行第2個,第一個任務的結果作第二個任務的入參

add.apply_async((2, 2), link=add.s(16))
結果:2+2+16=20

還能夠作錯誤處理

@app.task(bind=True)
def error_handler(self, uuid):
    result = self.app.AsyncResult(uuid)
    print('Task {0} raised exception: {1!r}\n{2!r}'.format(
          uuid, result.result, result.traceback))

  add.apply_async((2, 2), link_error=error_handler.s())

定時任務

讓任務在指定的時間執行,與下文敘述的週期性任務是不一樣的。

  • ETA, 指定任務執行時間,注意時區

  • countdown, 倒計時,單位秒

from datetime import datetime, timedelta
tomorrow = datetime.utcnow() + timedelta(seconds=3)
add.apply_async((2, 2), eta=tomorrow)
result = add.apply_async((2, 2), countdown=3)

tip

  • 任務的信息是保存在broker中的,所以關閉worker並不會丟失任務信息

  • 回收任務(revoke)並不是是將隊列中的任務刪除,而是在worker的內存中保存回收的任務task-id,不一樣worker之間會自動同步上述revoked task-id。

  • 因爲信息是保存在內存當中的,所以若是將全部worker都關閉了,revoked task-id信息就丟失了,回收過的任務就又能夠執行了。要防治這點,須要在啓動worker時指定一個文件用於保存信息

celery -A app.celery worker --loglevel=info &> celery_worker.log --statedb=/var/tmp/celery_worker.state

過時時間

expires單位秒,超過過時時間還未開始執行的任務會被回收

add.apply_async((10, 10), expires=60)

重試

  • max_retries:最大重試次數

  • interval_start:重試等待時間

  • interval_step:每次重試疊加時長,假設第一重試等待1s,第二次等待1+n秒

  • interval_max:最大等待時間

    add.apply_async((2, 2), retry=True, retry_policy={
            'max_retries': 3,
            'interval_start': 0,
            'interval_step': 0.2,
            'interval_max': 0.2,
        })

序列化

將任務結果按照必定格式序列化處理,支持pickle, JSON, YAML and msgpack

add.apply_async((10, 10), serializer='json')

壓縮

將任務結果壓縮

add.apply_async((2, 2), compression='zlib')

任務路由

使用-Q參數爲隊列(queue)命名,而後調用任務時能夠指定相應隊列

$ celery -A proj worker -l info -Q celery,priority.high    

add.apply_async(queue='priority.high')

工做流

按照必定關係一次調用多個任務

  • group: 並行調度

  • chain: 串行調度

  • chord: 相似group,但分header和body2個部分,header能夠是一個group任務,執行完成後調用body的任務

  • map: 映射調度,經過輸入多個入參來屢次調度同一個任務

  • starmap: 相似map,入參相似*args

  • chunks:將任務按照必定數量進行分組

group

from celery import group
>>> res = group(add.s(i, i) for i in xrange(10))()
>>> res.get(timeout=1)
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

chain

>>> from celery import chain
# 2 + 2 + 4 + 8
>>> res = chain(add.s(2, 2), add.s(4), add.s(8))()
>>> res.get()
16

能夠用|來表示chain
# ((4 + 16) * 2 + 4) * 8
>>> c2 = (add.s(4, 16) | mul.s(2) | (add.s(4) | mul.s(8)))

>>> res = c2()
>>> res.get()

chord

>>> from celery import chord
#1*2+2*2+...9*2
>>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())()
>>> res.get()
90

map

>>> from proj.tasks import add

>>> ~xsum.map([range(10), range(100)])
[45, 4950]

starmap

>>> ~add.starmap(zip(range(10), range(10)))
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

chunks

>>> from proj.tasks import add

>>> res = add.chunks(zip(range(100), range(100)), 10)()
>>> res.get()
[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],
 [20, 22, 24, 26, 28, 30, 32, 34, 36, 38],
 [40, 42, 44, 46, 48, 50, 52, 54, 56, 58],
 [60, 62, 64, 66, 68, 70, 72, 74, 76, 78],
 [80, 82, 84, 86, 88, 90, 92, 94, 96, 98],
 [100, 102, 104, 106, 108, 110, 112, 114, 116, 118],
 [120, 122, 124, 126, 128, 130, 132, 134, 136, 138],
 [140, 142, 144, 146, 148, 150, 152, 154, 156, 158],
 [160, 162, 164, 166, 168, 170, 172, 174, 176, 178],
 [180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]

週期性任務

週期性任務就是按照必定的時間檢查反覆執行的任務。前面描述的定時任務值的是一次性的任務。
程序中引入並配置好週期性任務後,beat進程就會按期調起相關任務

beat進程是須要單獨啓動的

$ celery -A proj beat

或者在worker啓動時一塊兒拉起

$ celery -A proj worker -B

注意一套celery只能啓一個beat進程

時區配置

因爲python中時間默認是utc時間,所以最簡便的方法是celery也用utc時區

CELERY_TIMEZONE = 'UTC'

這麼配置能夠保證任務調度的時間是準確的,但因爲服務器通常都配置時區,所以flower、以及日誌中的時間可能會有誤差

另一種方法,就是配置正確的時區

CELERY_TIMEZONE = 'Asia/Shanghai'

而後任務調起時,將時間帶入時區配置

local_tz = pytz.timezone(app.config['CELERY_TIMEZONE'])
format_eta = local_tz.localize(datetime.strptime(eta.strip(), '%Y/%m/%d %H:%M:%S'))
add.apply_async((2, 2),eta=format_eta)

週期性任務配置

from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': timedelta(seconds=30),
        'args': (16, 16)
    },
}

週期性任務配置crontab

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    # Executes every Monday morning at 7:30 A.M
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}

總結

celery功能很是強大,與django、flask等web應用配合相得益彰,推薦給你們。

本文參考官方文檔

相關文章
相關標籤/搜索