celery

 


Celery

1.什麼是Celery

Celery是一個簡單、靈活且可靠的,處理大量消息的分佈式系統redis

專門處理實時處理的異步任務隊列數據庫

同時也支持任務調度django

2.Celery架構

Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成windows

消息中間件

Celery自己是不提供消息服務,可是能夠方便的和第三方提供的消息中間價集成,包括RabbitMQ,Redis等markdown

任務執行單元

Worker是Celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中。架構

任務結果存儲

Task result store用來存儲Worker執行的任務結果,Celery支持以不一樣方式存儲任務的結果,包括AMQP,Redis併發

版本支持狀況

Celery version 4.0 runs on
        Python ❨2.7, 3.4, 3.5❩
        PyPy ❨5.4, 5.5❩
    This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.

    If you’re running an older version of Python, you need to be running an older version of Celery:

        Python 2.6: Celery series 3.1 or earlier.
        Python 2.5: Celery series 3.0 or earlier.
        Python 2.4 was Celery series 2.2 or earlier.

    Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.

3.使用場景

異步任務:將耗時操做任務提交給Celery去異步執行,好比發送短信、郵件、消息推送、音視頻處理等等app

定時任務:定時執行某件事情,好比天天數據統計異步

4.Celery的安裝配置

pip install celery

消息中間件:RabbitMQ/Redis

app = Celery('任務名',backend='xxx',broker='xxx')

5.Celery執行異步任務

基本使用

建立項目celerytest

建立py文件:celery_app_task_.py

import celery
import time
broker = 'redis://127.0.0.1:6379/1' 
#消息中間件
#redis不加密,放到第1個庫中
# broker = 'reids://:123@127.0.0.1:6379/1' redis加密,密碼是123

#結果存儲
backend = 'reids://127.0.0.1:6379/2'  # redis不加密,放到第二個庫中

# 實例化產生一個Celery對象,必定要指定名字
cel = celery.Celery('test',backend=backend,broker=broker)

#任務其實就是一個函數
#須要用一個裝飾器裝飾,表示該任務是被celery管理的,而且能夠用celery執行的
@cel.task
def add(x,y):
    time.sleep(2)
    return x+y

提交任務的py文件:add_task.py

import celery_task_s1
res = celery_task_s1.add.delay(3,4)
print(res)
# res就是任務的id號

右擊運行提交任務,

啓動worker,使用命令執行:celery worker -A celery_task_s1 -l info

注意:windows下:celery worker -A celery_task_s1 -l info -P eventlet

存儲任務結果的py文件:celery_result.py

from celery.result import AsyncResult
from celery_task_s1 import cel
# 根據id去查詢它的執行結果
async = AsyncResult(id="a5ea035f-0cc3-44ba-b334-f5d7c7ce681d", app=cel)
if async.successful():
    #取出它return的值
    result = async.get()
    print(result)
    # result.forget() # 將結果刪除
elif async.failed():
    print('執行失敗')
elif async.status == 'PENDING':
    print('任務等待中被執行')
elif async.status == 'RETRY':
    print('任務異常後正在重試')
elif async.status == 'STARTED':
    print('任務已經開始被執行')

執行add_task.py,添加任務,並獲取任務id

執行celery_result.py,檢查任務狀態並獲取結果

執行定時任務,3s鍾之後執行add任務

第一種獲取時間的方法

from datetime import datetime
v1 = datetime(2019,7,12,16,15,56)
print(v1)
#默認用UTC時間
v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)
result = celery_task_s1.add.apply_async(args=[1,3],eta=v2)
print(result,id)

第二種獲取時間的方法

from datetime import timedelta
ctime = datetime.now()
#默認用utc時間
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay = timedelta(seconds=3)
task_time = utc_ctime + time_delay
result = celery_task_s1.add.apply_async(args=[2,8],eta=task_time)

多任務結構

# 項目結構
pro_cel
    ├── celery_task# celery相關文件夾
    │   ├── celery.py   # celery鏈接和配置相關文件,必須叫這個名字
    │   └── tasks1.py    # 全部任務函數
    │   └── tasks2.py    # 全部任務函數
    ├── check_result.py # 檢查結果
    └── add_task.py    # 觸發任務

celery.py文件

#必須叫celery,生成celery對象

from celery import Celery
from datetime import timedelta

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
cel = Celery('test',broker=broker,backend=backend,
             include=[
                 'celery_task.order_task',
                 'celery_task.user_task'
             ])

#時區
cel.conf.timezone = 'Asia/shanghai'
# 是否使用UTC
cel.conf.enable_utc = False

order_task.py

from celery_task.celery import cel

@cel.task
def order_add(x,y):
    import time
    time.sleep(2)
    return x+y

user_task.py

from celery_task.celery import cel
@cel.task
def user_add(x,y):
    import time
    time.sleep(2)
    return x+y

add_task.py

from celery_task.order_task import order_add
from celery_task.user_task import user_add


res=order_add.delay(5,6)
print(res.id)
res=user_add.delay(10,60)
print(res.id)

celery_result.py

from celery.result import AsyncResult
from celery_task.celery import cel

async = AsyncResult(id="c8815fd0-c126-4fed-b908-805974761381", app=cel)

if async.successful():
    #取出它return的值
    result = async.get()
    print(result)
    # result.forget() # 將結果刪除
    # async.revoke(terminate=True) # 不管如今是何時,都要終止
    # async.revoke(terminate=False) # 若是任務尚未開始執行呢,那麼就能夠終止。 
elif async.failed():
    print('執行失敗')
elif async.status == 'PENDING':
    print('任務等待中被執行')
elif async.status == 'RETRY':
    print('任務異常後正在重試')
elif async.status == 'STARTED':
    print('任務已經開始被執行')

添加任務(執行send_task.py),開啓work:celery worker -A celery_task -l info -P eventlet,檢查任務執行結果(執行check_result.py)

定時任務:多任務結構中celery.py修改以下

from datetime import timedelta
from celery import Celery
from celery.schedules import crontab
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
cel = Celery('test',broker=broker,backend=backend,
             include=[
                 'celery_task.order_task',
                 'celery_task.user_task'
             ])

#時區
cel.conf.timezone = 'Asia/shanghai'
# 是否使用UTC
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # 名字隨意命名
    'add_every_10_seconds':{
        'test':'celery_task.order_task.order_add',
        # 每隔2秒執行一次
        # 'sehedule':1.0,
        # 'schedule':crontab(minute="*/1"),
        'schedule':timedelta(seconds=2),
        # 傳遞參數
        'args':(5,6)
    },
    # 'add-every-12-seconds': {
    # 'task': 'celery_task.tasks1.test_celery',
    # 每一年4月11號,8點42分執行
    # 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    # 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    # 'args': (16, 16)
    # },
}

啓動一個beat:celery beat -A celery_task -l info

啓動work執行:celery worker -A celery_task -l info -P eventlet

6.django中使用celery

多任務結構的celery_task文件夾直接拷到根目錄下

views.py中

from django.shortcuts import render,HttpResponse

from celery_task.user_task import user_add

def index(request):
    result = user_add.delay(8,9)
    return HttpResponse(result.id)

def check_result(request):
    res = request.GET.get('id')
    from celery.result import AsynResult
    from celery_task.celery import cel
    async = AsynResult(id=res,app=cel)
    if async.sucessful():
        result = async.get()
        print(result)
        return HttpResponse('ok')

注意:上述作完後別忘了啓動worker,再配置路由就能夠了

強調:在celery的任務函數中不能直接調用django的環境(也就是不能直接操做數據庫),須要手動添加

os.environ.setdefault('DJANGO_SETTINGS_MODEL','untitled15.settings')

import django

django.setup()

補充

pipreq:生成項目依賴的第三方包

項目依賴:pip3 install pipreqs

生成依賴文件:pipreqs ./

安裝依賴文件:pips install -r requirements.txt

方法和函數

他們是有區別的

方法:好比一個類的對象去調用類內部寫的函數就是方法

函數:類直接調用內部的函數就是函數

偏函數:先給函數傳一個值進去

from functions import partial
def test(x,y,z):
    return x+y+z

tsst = partial(test,1)
print(test(2,3))  # 先傳一個值進去,以後再傳只需傳兩個就能夠了

輪詢和長輪詢

輪詢:不停的發請求

長輪詢:hang一下子再發請求

相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息