python之celery使用詳解(二)

前言

前面咱們瞭解了celery的基本使用後,如今對其經常使用的對象和方法進行分析。html

Celery對象

核心的對象就是Celery了,初始化方法:python

class Celery(object):
    def __init__(self, main=None, loader=None, backend=None,
                 amqp=None, events=None, log=None, control=None,
                 set_as_current=True, accept_magic_kwargs=False,
                 tasks=None, broker=None, include=None, changes=None,
                 config_source=None, fixups=None, task_cls=None,
                 autofinalize=True, **kwargs):

# 經常使用的須要配置的參數   
main:若是做爲__main__運行,則爲主模塊的名稱。用做自動生成的任務名稱的前綴
loader:當前加載器實例。
backend:任務結果url;
amqp:AMQP對象或類名,通常無論;
log:日誌對象或類名;
set_as_current:將本實例設爲全局當前應用
tasks:任務註冊表。
broker:使用的默認代理的URL,任務隊列;
include:每一個worker應該導入的模塊列表,以實例建立的模塊的目錄做爲起始路徑;

這些參數都是celery實例化的配置,咱們也能夠不寫,而後使用config_from_object方法加載配置;web

建立異步任務的方法task

任何被task修飾的方法都會被建立一個Task對象,變成一個可序列化併發送到遠程服務器的任務;它有多種修飾方式:redis

  • 使用默認的參數
@celery.task
def function_name():
    pass
  • 指定相關參數
@celery.task(bind=True, name='name')
def function_name():
    pass

# task方法參數
name:能夠顯式指定任務的名字;默認是模塊的命名空間中本函數的名字。
serializer:指定本任務的序列化的方法;
bind:一個bool值,設置是否綁定一個task的實例,若是綁定,task實例會做爲參數傳遞到任務方法中,能夠訪問task實例的全部的屬性,即前面反序列化中那些屬性
base:定義任務的基類,能夠以此來定義回調函數,默認是Task類,咱們也能夠定義本身的Task類
default_retry_delay:設置該任務重試的延遲時間,當任務執行失敗後,會自動重試,單位是秒,默認3分鐘;
autoretry_for:設置在特定異常時重試任務,默認False即不重試;
retry_backoff:默認False,設置重試時的延遲時間間隔策略;
retry_backoff_max:設置最大延遲重試時間,默認10分鐘,若是失敗則再也不重試;
retry_jitter:默認True,即引入抖動,避免重試任務集中執行;
# 當bind=True時,add函數第一個參數是self,指的是task實例
@task(bind=True)  # 第一個參數是self,使用self.request訪問相關的屬性
def add(self, x, y):
    try:
        logger.info(self.request.id)
    except:
        self.retry() # 當任務失敗則進行重試
  • 自定義Task基類
import celery

class MyTask(celery.Task):
    # 任務失敗時執行
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print('{0!r} failed: {1!r}'.format(task_id, exc))
    # 任務成功時執行
    def on_success(self, retval, task_id, args, kwargs):
        pass
    # 任務重試時執行
    def on_retry(self, exc, task_id, args, kwargs, einfo):
        pass

@task(base=MyTask)
def add(x, y):
    raise KeyError()

#方法相關的參數
exc:失敗時的錯誤的類型;
task_id:任務的id;
args:任務函數的參數;
kwargs:鍵值對參數;
einfo:失敗或重試時的異常詳細信息;
retval:任務成功執行的返回值;

Task的通常屬性

Task.name:任務名稱;
Task.request:當前任務的信息;
Task.max_retries:設置重試的最大次數
Task.throws:預期錯誤類的可選元組,不該被視爲實際錯誤,而是結果失敗;
Task.rate_limit:設置此任務類型的速率限制
Task.time_limit:此任務的硬限時(以秒爲單位)。
Task.ignore_result:不存儲任務狀態。默認False;
Task.store_errors_even_if_ignored:若是True,即便任務配置爲忽略結果,也會存儲錯誤。
Task.serializer:標識要使用的默認序列化方法的字符串。
Task.compression:標識要使用的默認壓縮方案的字符串。默認爲task_compression設置。
Task.backend:指定該任務的結果存儲後端用於此任務。
Task.acks_late:若是設置True爲此任務的消息將在任務執行後確認 ,而不是在執行任務以前(默認行爲),即默認任務執行以前就會發送確認;
Task.track_started:若是True任務在工做人員執行任務時將其狀態報告爲「已啓動」。默認是False;

調用異步任務

調用異步任務有三個方法,以下:json

task.delay():這是apply_async方法的別名,但接受的參數較爲簡單;
task.apply_async(args=[arg1, arg2], kwargs={key:value, key:value}):能夠接受複雜的參數
send_task():能夠發送未被註冊的異步任務,即沒有被celery.task裝飾的任務;

1. app.send_task

# tasks.py
from celery import Celery
app = Celery()
def add(x,y):
    return x+y

app.send_task('tasks.add',args=[3,4])  # 參數基本和apply_async函數同樣
# 可是send_task在發送的時候是不會檢查tasks.add函數是否存在的,即便爲空也會發送成功,因此celery執行是可能找不到該函數報錯;

2. Task.delay

delay方法是apply_async方法的簡化版,不支持執行選項,只能傳遞任務的參數。flask

@app.task
def add(x, y, z=0):
    return x + y

add.delay(30,40,z=5) # 包括位置參數和關鍵字參數

3. Task.apply_async

apply_async支持執行選項,它會覆蓋全局的默認參數和定義該任務時指定的執行選項,本質上仍是調用了send_task方法;後端

add.apply_async(args=[30,40], kwargs={'z':5})

# 其餘參數
task_id:爲任務分配惟一id,默認是uuid;
countdown : 設置該任務等待一段時間再執行,單位爲s;
eta : 定義任務的開始時間;eta=time.time()+10;
expires : 設置任務時間,任務在過時時間後尚未執行則被丟棄;
retry : 若是任務失敗後, 是否重試;使用true或false,默認爲true
shadow:從新指定任務的名字str,覆蓋其在日誌中使用的任務名稱;
retry_policy : {},重試策略.以下:
    max_retries : 最大重試次數, 默認爲 3 次.
    interval_start : 重試等待的時間間隔秒數, 默認爲 0 , 表示直接重試不等待.
    interval_step : 每次重試讓重試間隔增長的秒數, 能夠是數字或浮點數, 默認爲 0.2
    interval_max : 重試間隔最大的秒數, 即 經過 interval_step 增大到多少秒以後, 就不在增長了, 能夠是數字或者浮點數, 默認爲 0.2 .

routing_key:自定義路由鍵;
queue:指定發送到哪一個隊列;
exchange:指定發送到哪一個交換機;
priority:任務隊列的優先級,0到255之間,對於rabbitmq來講0是最高優先級;
serializer:任務序列化方法;一般不設置;
compression:壓縮方案,一般有zlib, bzip2
headers:爲任務添加額外的消息;
link:任務成功執行後的回調方法;是一個signature對象;能夠用做關聯任務;
link_error: 任務失敗後的回調方法,是一個signature對象;

# 以下
add.apply_async((2, 2), retry=True, retry_policy={
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
})
  • 自定義發佈者,交換機,路由鍵, 隊列, 優先級,序列方案和壓縮方法:
task.apply_async((2,2), 
    compression='zlib',
    serialize='json',
    queue='priority.high',
    routing_key='web.add',
    priority=0,
    exchange='web_exchange')

獲取任務結果和狀態

因爲celery發送的都是去其餘進程執行的任務,若是須要在客戶端監控任務的狀態,有以下方法:服務器

r = task.apply_async()
r.ready()     # 查看任務狀態,返回布爾值,  任務執行完成, 返回 True, 不然返回 False.
r.wait()      # 會阻塞等待任務完成, 返回任務執行結果,不多使用;
r.get(timeout=1)       # 獲取任務執行結果,能夠設置等待時間,若是超時但任務未完成返回None;
r.result      # 任務執行結果,未完成返回None;
r.state       # PENDING, START, SUCCESS,任務當前的狀態
r.status      # PENDING, START, SUCCESS,任務當前的狀態
r.successful  # 任務成功返回true
r.traceback  # 若是任務拋出了一個異常,能夠獲取原始的回溯信息

可是通常業務中不多用到,由於獲取任務執行的結果須要阻塞,celery使用場景通常是不關心結果的。併發

使用celery

# seting.py
# 設置配置
BROKER_URL =  'amqp://username:password@localhost:5672/yourvhost'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_TASK_SERIALIZER = 'msgpack'
CELERY_RESULT_SERIALIZER = 'msgpack'
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
CELERY_ACCEPT_CONTENT = ["msgpack"]
CELERY_DEFAULT_QUEUE = "default"   
CELERY_QUEUES = {
    "default": { # 這是上面指定的默認隊列
        "exchange": "default",
        "exchange_type": "direct",
        "routing_key": "default"
    }
}

# app.py --- 初始化celery對象 
from celery import Celery
import seting
from task import test_one, test_two

celery = Celery(__name__, include=["task"]) # 設置須要導入的模塊
# 引入配置文件
celery.config_from_object(seting)

if __name__ == '__main__':
    test_one.apply_async((2,2), 
        routing_key='default',
        priority=0,
        exchange='default')

# task.py  --- 定義須要執行的任務
from app import celery

@celery.task
def test_one(x, y):
    return x + y

@celery.task(name="one_name")
def test_two(x, y):
    return x * y

小結

分析了celery任務一些方法參數和相關源碼,接下來咱們去研究celery更復雜的用法。app

參考

  • http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-options

  • http://docs.jinkan.org/docs/celery/getting-started/first-steps-with-celery.html

  • http://www.pythondoc.com/flask-celery/first.html

  • https://blog.csdn.net/kk123a/article/details/74549117

  • https://blog.csdn.net/preyta/article/details/54288870

相關文章
相關標籤/搜索