Celery

Celery 是一個簡單、靈活且可靠的,處理大量消息的分佈式系統,而且提供維護這樣一個系統的必需工具。它是一個專一於實時處理的任務隊列,同時也支持任務調度html

安裝celery

pip install celery

建立一個celery實例

建立s1文件node

from celery import Celery

#tasks:當前模塊的名稱,可隨意寫,可是必須存在
#broker:指定要使用的消息中間件,
#中間件可參考:http://docs.jinkan.org/docs/celery/getting-started/first-steps-with-celery.html#id4
app=Celery("tasks",broker="redis://10.0.0.23:6379/1")

@app.task
def add():
    return "add"
@app.task
def add1():
    return "add1"

運行celery服務python

celery worker -A s1 -l info   #s1爲文件名 info:日誌級別

調用任務(異步任務)

建立一個新文件,使用 delay() 方法來調用任務linux

from s1 import add

s=add.delay()
print(s)

此時s1文件會報錯,由於在celery4.0後再也不執行windows系統,能夠安裝eventlet來解決報錯,當讓也能夠使用4.0之前的版本redis

Traceback (most recent call last):
  File "c:\program files\python36\lib\site-packages\billiard\pool.py", line 358, in workloop
    result = (True, prepare_result(fun(*args, **kwargs)))
  File "c:\program files\python36\lib\site-packages\celery\app\trace.py", line 544, in _fast_trace_task
    tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)

安裝eventlet

pip install eventlet

指定eventlet啓動數據庫

celery worker -A s1 -l info -P eventlet #windows下須要使用 -P eventlet

保存結果

from celery import Celery
#使用backend保存結果,這裏使用了redis,推薦使用RabbitMQ
app=Celery("tasks",broker="redis://10.0.0.23:6379/1",backend="redis://10.0.0.23:6379/2")

@app.task
def add():
    return "add"
@app.task
def add1():
    return "add1"

查看redis數據庫django

[root@node1 ~]# redis-cli -h 10.0.0.23
10.0.0.23:6379> SELECT 2
10.0.0.23:6379[2]> KEYS *
 1) "celery-task-meta-908f60a1-2c23-4403-ab89-518691d76f48"
 2) "celery-task-meta-8172677a-a5ad-48ac-93e6-8473ba564766"
 3) "celery-task-meta-4e116ff9-f51f-4797-beb0-b106ce609bac"
 4) "celery-task-meta-282dafcf-1bca-4414-bbc1-87759140bcd6"
 5) "celery-task-meta-6fe880a6-e9e2-4609-9fc3-6d284169c8f7"
 6) "celery-task-meta-1ad7acd9-bdf7-4a55-a7ca-4a7157975036"
 7) "celery-task-meta-6382ea5d-e328-427b-be22-4a6cca74078a"
 8) "celery-task-meta-7a260767-6246-457f-aa91-5598007a7dc2"
 9) "celery-task-meta-8d7b6686-ed63-473b-b4c9-db3ebc807421"
10) "celery-task-meta-29acbc9d-539c-4db3-b07b-7ce6b7365825"

查看數據windows

10.0.0.23:6379[2]> get celery-task-meta-908f60a1-2c23-4403-ab89-518691d76f48
"{\"status\": \"SUCCESS\", \"result\": \"add\", \"traceback\": null, \"children\": [], \"task_id\": \"908f60a1-2c23-4403-ab89-518691d76f48\", \"date_done\": \"2019-04-16T12:26:04.267013\"}"

celery獲取返回值

from celery.result import AsyncResult #導入AsyncResult
from s1 import add,app #導入s1文件中的app
for i in range(10):
    s = add.delay()
    r = AsyncResult(id=s.id,app=app)
    print(r.get())

帶參數的返回值

s1文件app

from celery import Celery
#使用backend保存結果,這裏使用了redis,推薦使用RabbitMQ
app=Celery("tasks",broker="redis://10.0.0.23:6379/1",backend="redis://10.0.0.23:6379/2")

@app.task
def add(a,b):
    return ("add",a+b)
@app.task
def add1():
    return "add1"

s2文件異步

from celery.result import AsyncResult #導入AsyncResult
from s1 import add,app #導入s1文件中的app
for i in range(10):
    s = add.delay(3,5)
    # r = AsyncResult(id=str(s),app=app) #能夠使用str
    r = AsyncResult(id=s.id,app=app) #也能夠使用s.id
    print(r.get()) #返回值
    print(r.status) #獲取執行狀態
    print(r.successful()) # 獲取執行狀態

獲取報錯信息

from celery.result import AsyncResult #導入AsyncResult
from s1 import add,app #導入s1文件中的app

s = add.delay(3,5)
print(s.id)
r = AsyncResult(id=s.id,app=app) #也能夠使用s.id

#只獲取報錯信息
print(r.get(propagate=False))
#獲取源文件的報錯信息內容
print(r.traceback)

執行延時任務

apply_async

t=add.apply_async((1,2),countdown=5) #表示延遲5秒鐘執行任務
print(t)
print(t.get())
問題:是延遲5秒發送仍是當即發送,消費者延遲5秒在執行那?

支持的參數 :

  • countdown : 等待一段時間再執行.

    add.apply_async((2,3), countdown=5)
  • eta : 定義任務的開始時間.這裏的時間是UTC時間,這裏有坑

    add.apply_async((2,3), eta=now+tiedelta(second=10))
  • expires : 設置超時時間.

    add.apply_async((2,3), expires=60)
  • retry : 定時若是任務失敗後, 是否重試.

    add.apply_async((2,3), retry=False)
  • retry_policy : 重試策略.    

  • max_retries : 最大重試次數, 默認爲 3 次.
    interval_start : 重試等待的時間間隔秒數, 默認爲 0 , 表示直接重試不等待.
    interval_step : 每次重試讓重試間隔增長的秒數, 能夠是數字或浮點數, 默認爲 0.2
    interval_max : 重試間隔最大的秒數, 即 經過 interval_step 增大到多少秒以後, 就不在增長了, 能夠是數字或者浮點數, 默認爲 0.2 .

週期任務

from c import task
task.conf.beat_schedule={
    timezone='Asia/Shanghai',
    "each10s_task":{
        "task":"c.add",
        "schedule":3, # 每3秒鐘執行一次
        "args":(10,10)
    },

}

其實celery也支持linux裏面的crontab格式的書寫的

from celery.schedules import crontab
task.conf.beat_schedule={
     timezone='Asia/Shanghai',
    "each3m_task":{
        "task":"c.add",
        "schedule":crontab(minute=3), #每小時的第3分鐘執行
        "args":(10,10)
    },
     "each3m_task":{
        "task":"c.add",
        "schedule":crontab(minute=*/3), #每小時的第3分鐘執行
        "args":(10,10)
    },
}

啓動

celery best -A s2 -l info 

與django結合

執行異步任務

在項目目錄下,與settings統計的目錄中建立一個名爲celery的文件 ,在生成的目錄文件中添加celery文件,內容以下

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'tests.settings') #與項目關聯 tests爲項目名稱

app = Celery('tests',backend='redis://10.211.55.19/3',broker='redis://10.211.55.19/4')
# app = Celery('tests',backend='redis://:password@10.211.55.19/3',broker='redis://:password@10.211.55.19/4') #若是redis存在密碼,password爲密碼
#建立celery對象
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
#在django中建立celery的命名空間
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
#自動加載任務

編輯settings.py同級目錄的init.py

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

__all__ = ['celery_app']

在項目中添加tasks文件,用來保存tasks的文件   

from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def mul(x, y):
    return x * y

@shared_task
def xsum(numbers):
    return sum(numbers)

添加views文件內容

from .tasks import add

def index(request):
    result = add.delay(2, 3)
    return HttpResponse('返回數據{}'.format(result.get()))

啓動worker

celery -A tests  worker -l info

添加url並調用

執行週期性任務

須要安裝一個django的組件來完成這個事情

pip install django-celery-beat

將django-celery-beat添加到INSTALLED_APPS裏面

INSTALLED_APPS = (
    ...,
    'django_celery_beat',
)

刷新到數據庫

python3 manage.py makemigrations #不執行這個會有問題
python3 manage.py migrate

 admin配置

啓動beat

celery -A tests beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

 啓動worker

celery -A tests worker -l info 
相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息