celery 分佈式異步任務框架(celery簡單使用、celery多任務結構、celery定時任務、celery計劃任務、celery在Django項目中使用Python腳本調用Django環境)

1、celery簡介:

Celery 是一個強大的 分佈式任務隊列 的 異步處理框架,它可讓任務的執行徹底脫離主程序,甚至能夠被分配到其餘主機上運行。咱們一般使用它來實現異步任務(async task)和定時任務(crontab)。linux

Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。redis

能夠看到,Celery 主要包含如下幾個模塊:mongodb

  • 任務模塊 Task數據庫

    包含異步任務和定時任務。其中,異步任務一般在業務邏輯中被觸發併發往任務隊列,而定時任務由 Celery Beat 進程週期性地將任務發往任務隊列。django

  • 消息中間件 Brokerwindows

    Broker,即爲任務調度隊列,接收任務生產者發來的消息(即任務),將任務存入隊列。Celery 自己不提供隊列服務,官方推薦使用 RabbitMQ 和 Redis 等。架構

  • 任務執行單元 Worker併發

    Worker 是執行任務的處理單元,它實時監控消息隊列,獲取隊列中調度的任務,並執行它。app

  • 任務結果存儲 Backend框架

    Backend 用於存儲任務的執行結果,以供查詢。同消息中間件同樣,存儲也可以使用 RabbitMQ, redis 和 MongoDB 等。

 因此總結一下celery:它是一個處理大量消息的分佈式系統,能異步任務、定時任務,使用場景通常用於耗時操做的多任務或者定時性的任務

2、celery安裝與使用

pycharm安裝:

pip3 install celery

 初步使用:(建立一個Python項目)

① 實例化一個celery對象,使用該對象.task裝飾須要管理的任務函數:

# celery_task.py

from celery import Celery

"""
# 若是redis沒有設置密碼
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
"""
broker = 'redis://:12345@127.0.0.1:6379/1'
backend = 'redis://:12345@127.0.0.1:6379/2'
# c1是實例化產生的celery的名字,由於會存在多個celery
app = Celery('c1', broker=broker, backend=backend)

# 須要使用一個裝飾器,來管理該任務(函數)
@app.task
def add(x, y):
    import time
    time.sleep(1)
    return x + y

② 將裝飾的任務函數條件到消息隊列中,此時提交的任務函數並無執行,只是提交到worker,它會返回一個標識任務的字符串

# submit.task.py

# 用於提交任務
from celery_task import add
# 提交任務到消息隊列中,這裏只是將任務提交,並無執行

res = add. delay(3, 8)
print(res)
# 結果是標識任務的字符串(id號)
# 7811a028-428c-4dd5-9135-788e26e694a7

③ 使用命令啓動worker去剛纔提交的執行任務

linux: celery worker -A celery_task -l info   
windows下:celery worker -A celery_task -l info -P eventlet

 

④ 查看結果,根據提交任務返回的字符串去查詢

# check_res.py

from celery.result import AsyncResult
from celery_task import app

async = AsyncResult(id='bd600820-9366-4220-a679-3e435ae91e71', app=app)

if async.successful():
    result = async.get()
    print(result)

elif async.failed():
    print('執行失敗')

elif async.status == 'PENDING':
    print('任務等待中')

elif async.status == 'RETRY':
    print('任務異常後重試')

elif async.status == 'STARTED':
    print('任務正在執行')

celery簡單使用流程:

-celery的使用
    -pip3 install celery
    -寫一個py文件:celery_task
        -1 指定broker(消息中間件),指定backend(結果存儲)
        -2 實例化產生一個Celery對象 app=Celery('名字',broker,backend)
        -3 加裝飾器綁定任務,在函數(add)上加裝飾器app.task
        -4 其餘程序提交任務,先導入add,add.delay(參,參數),會將該函數提交到消息中間件,可是並不會執行,有個返回值,直接print會打印出任務的id,之後用id去查詢任務是否執行完成
        -5 啓動worker去執行任務:
        linux: celery worker -A celery_task_s1 -l info   
        windows下:celery worker -A celery_task_s1 -l info -P eventlet
        -6 查看結果:根據id去查詢
            async = AsyncResult(id="bd600820-9366-4220-a679-3e435ae91e71", app=app)
            if async.successful():
                #取出它return的值
                result = async.get()
                print(result)

celery的多任務

# celery的多任務結構
    -項目結構:
        pro_cel
            ├── celery_task# celery相關文件夾
            │   ├── celery.py   # celery鏈接和配置相關文件,必須叫這個名字
            │   └── tasks1.py    #  全部任務函數
            │   └── tasks2.py    #  全部任務函數
            ├── check_result.py # 檢查結果
            └── send_task.py    # 觸發任務
    -啓動worker,celery_task是包的名字
        celery worker -A celery_task -l info -P eventlet

 按照多任務文件結構建立文件:

注意celery.py這個文件的文件名是固定的,不能改,task_1和task_2能夠本身定義,他倆表明自定義的任務分類,還能夠再建立task_3。。。等其它名字的任務文件,send_task.py是提交任務到worker,check_result.py是查看結果的

# celery.py

from celery import Celery
broker = 'redis://:12345@127.0.0.1:6379/1'
backend = 'redis://:12345@127.0.0.1:6379/2'
# c1是實例化產生的celery的名字,由於會存在多個celery
app = Celery('c1', broker=broker, backend=backend,
             # 包含一些2個任務文件,去相應的py文件找任務,對多個任務進行分類
             include=[
                 'celery_task.task_1',
                 'celery_task.task_2',
             ])


# celery提供一些配置,具體可查看官方文檔
# app.conf.timezone = 'Asia/Shanghai'

在send_task.py種右鍵運行,提交任務到worker(這裏打印了提交的2個任務的id)

# task_1.py
from celery_task.celery import app

@app.task
def add1(x, y):
    import time
    time.sleep(0.5)
    return x + y

# task_2.py
from celery_task.celery import app

@app.task
def add2(x, y):
    import time
    time.sleep(1)
    return x * y
# send_task.py
from celery_task.task_1 import add1
from celery_task.task_2 import add2


res1 = add1.delay(3, 8)
print(res1)   # 16e847f3-fc14-4391-89e2-e2b3546872cf

res2 = add2.delay(4, 9)
print(res2)   # 858c0ae5-8516-4473-8be5-7501fb856ff4

啓動worker,celery_task是包的名字
celery worker -A celery_task -l info -P eventlet

而後將打印的2個id在check_result.py中進行查詢結果

# check_reslut.py
from celery.result import AsyncResult
from celery_task.celery import app

for i in ['16e847f3-fc14-4391-89e2-e2b3546872cf', '858c0ae5-8516-4473-8be5-7501fb856ff4']:
    async = AsyncResult(id=i, app=app)    
    if async.successful():
        result = async.get()
        print(result)
    
    elif async.failed():
        print('執行失敗')
    
    elif async.status == 'PENDING':
        print('任務等待中')
    
    elif async.status == 'RETRY':
        print('任務異常後重試')
    
    elif async.status == 'STARTED':
        print('任務正在執行')

 

celery的定時任務

方式一:執行時間在年月日時分秒

在提交任務的地方修改:

# send_task.py

from celery_task.task_1 import add1
from celery_task.task_2 import add2
# 執行定時任務,3s之後執行add一、add2任務
from datetime import datetime
# 設置任務執行時間2019年7月12日21點45分12秒
v1 = datetime(2019, 7, 12, 21, 48, 12)
print(v1)  # 2019-07-12 21:45:12
# 將v1時間轉成utc時間
v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)  # 2019-07-12 13:45:12
# 取出要執行任務的時間對象,調用apply_async方法,args是任務函數傳的參數,eta是執行的時間
result1 = add1.apply_async(args=[3, 8], eta=v2)
result2 = add2.apply_async(args=[4, 9], eta=v2)
print(result1.id)
print(result2.id)

 

方式二:經過延遲執行的時間算出執行的具體utc時間,與方式一基本相同

在提交任務的地方修改:

# send_task.py

# 方式二:實際上和方法一相似,多了一個延遲時間,也就是用如今時間和推遲執行的時間計算出任務執行的最終utc時間
# 而後也是調用apply_async方法。
from datetime import datetime
ctime = datetime.now()
# 默認使用utc時間
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
# 使用timedelta模塊,拿到10秒後的時間對象,這裏參數能夠傳秒、毫秒、微秒、分、小時、周、天
time_delay = timedelta(seconds=10)
# 獲得任務運行時間:
task_time = utc_ctime + time_delay
result1 = add1.apply_async(args=[3, 8], eta=task_time)
result2 = add2.apply_async(args=[4, 9], eta=task_time)
print(result1.id)
print(result2.id)

celery的計劃任務

計劃任務須要在celery.py中添加代碼,而後須要beat一下,才能將計劃開啓

# celery.py中

from celery import Celery
broker = 'redis://:12345@127.0.0.1:6379/1'
backend = 'redis://:12345@127.0.0.1:6379/2'
# c1是實例化產生的celery的名字,由於會存在多個celery
app = Celery('c1', broker=broker, backend=backend,
             # 包含一些2個任務文件,去相應的py文件找任務,對多個任務進行分類
             include=[
                 'celery_task.task_1',
                 'celery_task.task_2',
                 'celery_task.task_3',
             ])


# celery提供一些配置,具體可查看官方文檔
# app.conf.timezone = "Asia/Shanghai"
# app.conf.enable_utc = True


# 計劃任務
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    'submit_every_2_seconds': {
        # 計劃的任務執行函數
        'task': 'celery_task.task_1.add1',
        # 每一個2秒執行一次
        'schedule': timedelta(seconds=2),
        # 傳遞的任務函數參數
        'args': (3, 9)
    },
    'submit_every_3_seconds': {
            # 計劃的任務執行函數
            'task': 'celery_task.task_2.add2',
            # 每一個3秒執行一次
            'schedule': timedelta(seconds=3),
            # 傳遞的任務函數參數
            'args': (4, 7)
    },
    'submit_in_fix_datetime': {
        'task': 'celery_task.task_3.add3',
        # 好比每一年的7月13日10點53分執行
        # 注意:默認使用utc時間,當前的時間中的小時必需要-8個小時纔會到點提交
        'schedule': crontab(minute=53, hour=2, day_of_month=13, month_of_year=7),
        
        '''
        # 若是不想-8,能夠先設置時區,再按正常時間設置
        app.conf.timezone = "Asia/Shanghai"
        app.conf.enable_utc = True
        '''
        'args': ('Hello World',)
    }

}

# 上面寫完後,須要起一個進程,啓動計劃任務
# celery beat -A celery_task -l info

# 啓動worker:
# celery worker -A celery_task -l info -P eventlet

 Django中使用celery

django-celery:因爲djang-celery模塊對版本的要求過於嚴格,並且容易出現不少bug,因此不建議使用

直接使用celery多任務結構的,將celery多任務結構的代碼文件夾celery_task拷貝到Django項目中,而後在視圖函數中進行任務提交、而後進行結構查看。(啓動項目時候記得將worker啓動起來,注意啓動路徑要跟你拷貝的celery_task文件同級)

注意:當咱們在Django項目中使用celery,在celery的任務函數中不能直接調用django的環境(好比orm方法查詢數據庫),須要添加代碼調用Django環境

在Python腳本中調用Django環境

import os
# 加載Django環境,bbs是所在的Django項目名稱
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", 'bbs.settings')
    # 引入Django模塊
    import django
    # 初始化Django環境
    django.setup()
    # 從app當中導入models
    from app01 import models
    # 調用操做,拿到數據庫中的全部Book數據對象
    books = models.Books.objects.all()
相關文章
相關標籤/搜索