Celery

什麼是Clelery

Celery是一個簡單、靈活且可靠的,處理大量消息的分佈式系統redis

專一於實時處理的異步任務隊列數據庫

同時也支持任務調度django

Celery架構

Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。windows

消息中間件

Celery自己不提供消息服務,可是能夠方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等等架構

任務執行單元

Worker是Celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中併發

任務結果存儲

Task result store用來存儲Worker執行的任務的結果,Celery支持以不一樣方式存儲任務的結果,包括AMQP, redis等app

使用場景

異步任務:將耗時操做任務提交給Celeryu去異步執行,好比發送短信/郵件、消息推送、音視頻處理等等異步

定時任務:定時執行某件事情,好比天天數據統計async

Celery的安裝配置

pip install celery分佈式

消息中間件:RabbitMQ/Redis

app=Celery('任務名',backend='xxx',broker='xxx')

Celery執行異步任務

建立py文件:celery_app_task.py

from celery import Celery
# broker='redis://127.0.0.1:6379/2' 不加密碼
backend='redis://:123456@127.0.0.1:6379/0' #最後的0是指定的庫
broker='redis://:123456@127.0.0.1:6379/1'
app = Celery('test',broker=broker,backend=backend) #必定要指定一個名字,每次實例化都要起一個名字

#任務實際上是一個函數
#須要用到一個裝飾器,表示該任務是被celery管理的,而且能夠用celery執行的
@app.task()
def add(x,y):
    import time
    time.sleep(2)
    return x+y

如今須要把任務提交到消息隊列裏,是另外一個程序須要新建一個add_task.py

#提交任務到消息隊列中
#只是把任務提交到消息隊列中,並無執行
ret = celery_task_s1.add.delay(4,5)
print(ret)
#c94054bb-9aac-425f-a4e2-9b315b2a2fe1返回了字符串,這就是任務的標識號,那這個去數據庫查

建立py文件:run.py,執行任務,或者使用命令執行:celery worker -A celery_app_task -l info

注:windows下:celery worker -A celery_app_task -l info -P eventlet

在命令行輸入命令

而後數據庫就有結果了

而後查看任務執行結果建立celery_result.py

from celery.result import AsyncResult
from celery_task_s1 import app

async = AsyncResult(id="c94054bb-9aac-425f-a4e2-9b315b2a2fe1", app=app)

if async.successful():
    #取出return值
    result = async.get()
    print(result)
    # result.forget() # 將結果刪除
elif async.failed():
    print('執行失敗')
elif async.status == 'PENDING':
    print('任務等待中被執行')
elif async.status == 'RETRY':
    print('任務異常後正在重試')
elif async.status == 'STARTED':
    print('任務已經開始被執行')

這就是分佈式

總結流程

-1 指定broker(消息中間件),指定backend(結果存儲)
-2 實例化產生一個Celery對象 app=Celery('名字',broker,backend)
-3 加裝飾器綁定任務,在函數(add)上加裝飾器app.task
-4 其餘程序提交任務,先導入add,add.delay(參,參數),會將該函數提交到消息中間件,可是並不會執行,有個返回值,直接print會打印出任務的id,之後用id去查詢任務是否執行完成
-5 啓動worker去執行任務:celery worker -A celery_task_s1 -l info   windows下:celery worker -A celery_task_s1 -l info -P eventlet
-6 查看結果:根據id去查詢   

多任務結構

pro_cel
    ├── celery_task# celery相關文件夾
    │   ├── celery.py   # celery鏈接和配置相關文件,必須叫這個名字
    │   └── tasks1.py    #  全部任務函數
    │    └── tasks2.py    #  全部任務函數
    ├── check_result.py # 檢查結果
    └── send_task.py    # 觸發任務

celery.py

from celery import Celery

backend='redis://:123456@127.0.0.1:6379/1' #最後的0是指定的庫
broker='redis://:123456@127.0.0.1:6379/0'
app = Celery('test',broker=broker,backend=backend,
             include=['celery_task.order_task',
                      'celery_task.user_task'
                      ])
#必定要指定一個名字,每次實例化都要起一個名字
# 包含如下兩個任務文件,去相應的py文件中找任務,對多個任務作分類
#時區
# app.conf.timezone = 'Asia/Shanghai'
#是否使用UTC
# app.conf.enable_utf = False

order_task.py

from celery_task.celery import app

import time
@app.task
def order_add(x,y):
    time.sleep(1)
    return x+y

user_task.py

from celery_task.celery import app

import time
@app.task
def user_add(x,y):
    time.sleep(1)
    return x+y

add_task.py

from celery_task.order_task import order_add
from celery_task.user_task import user_add

ret = order_add.delay(5,6)
print(ret)
ret2 =user_add.delay(10,6)
print(ret2)

先執行add_task拿到惟一id存到redis裏的backend結果存儲而後再到命令行執行

-啓動worker,celery_task是包的名字
celery worker -A celery_task -l info -P eventle

查詢結果celery_result.py

from celery.result import AsyncResult
from celery_task.celery import app

async = AsyncResult(id="48da55ab-6109-4913-8f43-0d6a6ad4a174", app=app)

if async.successful():
    #取出return值
    result = async.get()
    print(result)
    # result.forget() # 將結果刪除
elif async.failed():
    print('執行失敗')
elif async.status == 'PENDING':
    print('任務等待中被執行')
elif async.status == 'RETRY':
    print('任務異常後正在重試')
elif async.status == 'STARTED':
    print('任務已經開始被執行')

Celery執行定時任務

設定時間讓celery執行一個任務

好比幾點幾分幾秒執行某個任務,添加任務的時候

方式一:

v1 = datetime(2019, 7, 13, 9, 48, 56)
print(v1)
v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)
#取出我要執行任務的時間對象,調用apply_async方法,args是參數,eta是執行的時間
result = celery_task_s1.add.apply_async(args=[1, 3], eta=v2)
print(result.id)

 

 

方式二:第二種取時間的方法,獲取當前之間,定時執行

ctime = datetime.now()
# 默認用utc時間
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
#取10秒以後的時間對象
time_delay = timedelta(seconds=3)
task_time = utc_ctime + time_delay

# 使用apply_async並設定時間
result = celery_task_s1.add.apply_async(args=[4, 3], eta=task_time)
print(result.id)

 相似於contab的定時任務

from celery import Celery
from datetime import timedelta
from celery.schedules import crontab
backend='redis://:123456@127.0.0.1:6379/1' #最後的0是指定的庫
broker='redis://:123456@127.0.0.1:6379/0'
app = Celery('test',broker=broker,backend=backend,
             include=['celery_task.order_task',
                      'celery_task.user_task'
                      ])
#必定要指定一個名字,每次實例化都要起一個名字
# 包含如下兩個任務文件,去相應的py文件中找任務,對多個任務作分類
#時區
app.conf.timezone = 'Asia/Shanghai'
#是否使用UTC
app.conf.enable_utf = False
-----******-----
app.conf.beat_schedule = {
    # 名字隨意命名
    'add-every-10-seconds': {
        # 執行tasks1下的test_celery函數
        'task': 'celery_task.order_task.order_add',
        # 每隔2秒執行一次
        # 'schedule': 1.0,
        # 'schedule': crontab(minute="*/1"),
        'schedule': timedelta(seconds=2),
        # 傳遞參數
        'args': (5,6)
    },
    # 'add-every-12-seconds': {
    #     'task': 'celery_task.order_task.order_add',
    #     # 每一年4月11號,8點42分執行
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     # 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'args': (16, 16)
    # },
}

啓動一個beat:celery beat -A celery_task -l info

啓動work執行:celery worker -A celery_task -l info -P eventlet

Django中使用Celery

安裝包

celery==3.1.25
django-celery==3.1.20

因爲django-celery很差用因此直接把多任務結構中的celery_task文件複製到django目錄下

-******在celery的任務函數中不能直接調用django的環境,須要手動添加
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "untitled15.settings")
import django
django.setup()

不多有人使用django的celery

相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息