Celery分佈式異步任務框架

執行celery: 

python manage.py celery -A TSDRM work -l info
python manage.py celery -A TSDRM flower -l info

測試項目中的redis是否鏈接:

 

1、什麼是Celery

Celery是一個簡單、靈活且可靠的,處理大量消息的分佈式系統。專一於實時處理的異步任務隊列,同時也支持定時任務python

 

2、Celery架構

一、Celery的架構由三部分組成:

  消息中間件(message broker):redis

  任務執行單元(worker):數據庫

  任務執行結果存儲(task result store):django

user是提交任務的人,user把任務提交到消息中間件broker中,好比用戶提交個3+5的任務,這個程序只負責提交,另外一個程序負責執行。windows

workers負責執行代碼,在消息中間件取一個一個的任務去執行,執行的結果須要存儲在task result store中,用戶user只須要去存儲結果的task result store中看一下就知道這個任務有沒有執行。架構

二、組成部分介紹

消息中間件:Celery自己不提供消息服務,可是能夠方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等等併發

任務執行單元:Worker是Celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中。app

任務結果存儲:Task result store用來存儲Worker執行的任務的結果,Celery支持以不一樣方式存儲任務的結果,包括AMQP, redis等異步

三、版本支持狀況

Celery version 4.0 runs on
        Python ❨2.7, 3.4, 3.5❩
        PyPy ❨5.4, 5.5❩
    This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.

    If you’re running an older version of Python, you need to be running an older version of Celery:

        Python 2.6: Celery series 3.1 or earlier.
        Python 2.5: Celery series 3.0 or earlier.
        Python 2.4 was Celery series 2.2 or earlier.

    Celery is a project with minimal funding, so we don’t support Microsoft Windows.
   Please don’t open any issues related to that platform.

四、場景

異步任務:將耗時操做任務提交給Celery去異步執行,好比發送短信/郵件、消息推送、音視頻處理,統計某個商品近半年的銷量生成折線圖等等

定時任務:定時執行某件事情,好比天天數據統計

場景1:用戶提交一個任務,把視頻轉成MP4格式。傳上去以後轉格式須要耗費很長時間。因此就能夠把任務提交到消息中間件裏,只返回一個消息告訴用戶,視頻正在轉,你該幹嗎就幹嗎去。開啓workes執行傳視頻的任務,視頻轉完以後,把結果放在結果存儲store中。而後用戶再發請求去結果存儲中看一下視頻有沒有轉好。async

場景2:註冊發郵件。一個用戶一旦註冊了以後。把發郵件的任務提交到消息中間件裏。異步調用發郵件發郵件。因此當用戶註冊成功以後,後臺有郵件正在發。

場景3:秒殺商品,把秒殺的任務提交到任務隊列中,起個worker去執行。

一點擊搶購,顯示你正在排隊中。正常的邏輯是,點擊搶購,會到數據庫裏查看是否還有此商品,有的話,數據庫商品就減一,生成一個訂單返回給用戶,用戶進行支付。可是,你有沒有想過,秒殺的時候,一瞬間來了好多好多人,這時候就會考慮到程序運行效率問題,解決方法是會在秒殺商品的時候提早把數據放在redis中,不在直接操做數據庫而是redis。若是程序還不行的時候,就會對任務作緩衝,來我的把任務提交上去,來我的把任務提交上去,後面開啓多個workers來執行任務,這就至關於把整個任務作成異步的,任務提交完成立馬返回,你正在排隊。

 場景4:利用Celery能夠作頁面靜態化。首頁常常被訪問,每次都要查數據庫很是耗時,能夠把首頁作成靜態頁面,數據是寫死的,誰過來訪問,就能看到首頁。

 

3、Celery執行異步任務

安裝依賴

pip3 install celery
pip3 install redis

一、基本使用

①建立celeryDemo項目

②建立任務:新建py文件:celery_task_s1.py

from celery import Celery
# 指定broker(消息中間件),指定backend(結果儲存)
backend = 'redis://127.0.0.1:6379/0'
broker = 'redis://127.0.0.1:6379/1'   # 0,1指定的是redis中的庫

#
實例化產生一個Celery對象,必定要指定一個名字 app = Celery('test', broker=broker, backend=backend) # 指定任務,就是一個函數 # 須要用一個裝飾器綁定任務,表示該任務是被celery管理的,而且能夠用celery執行的 @app.task def add(x, y): import time time.sleep(3) return x+y

③提交任務到消息中間件中,獲取任務id:新建py文件:add_task.py

# 用於提交任務的py文件
import celery_task_s1
# res = celery_task_s1.add(3, 7)  # 正常同步執行任務
# print(res)  # 打印結果爲 10

# 提交任務到消息中間件中
res = celery_task_s1.add.delay(3, 7)  # 只是把任務提交到消息中間件中,並無執行
print(res)   # 6e6464e8-3b0a-49f9-ad92-824725c2a573 任務的id號

③任務提交完,啓動worker執行任務

注意:windows下啓動須要安裝依賴

pip3 install eventlet

命令執行:celery worker -A celery_task_s1 -l info

windows下:celery worker -A celery_task_s1 -l info -P eventlet

④獲取任務執行結果:新建py文件:celery_result.py

from celery.result import AsyncResult
from celery_task_s1 import app

async = AsyncResult(id="6e6464e8-3b0a-49f9-ad92-824725c2a573", app=app)   # id是worker執行任務後拿到的id

if async.successful():
    # 取出它return的值
    result = async.get()
    print(result)     # 執行成功,打印的結果爲10
    # result.forget() # 將結果刪除
elif async.failed():
    print('執行失敗')
elif async.status == 'PENDING':
    print('任務等待中被執行')
elif async.status == 'RETRY':
    print('任務異常後正在重試')
elif async.status == 'STARTED':
    print('任務已經開始被執行')

 

二、多任務結構

celery_mul_Demo
    ├── celery_task        # celery相關文件夾
    │   ├── celery.py      # celery鏈接和配置相關文件,必須叫這個名字
    │   └── order_task.py   #  全部任務函數
    │   └── user_task.py    #  全部任務函數
    ├── add_task.py        # 提交任務
    └── celery_result.py    # 查看結果

①建立任務:

新建py文件:celery.py(必須叫這個名字)  

# 這個文件必須叫celery,生成celery對象

from celery import Celery

broker = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/1'
app = Celery('test', broker=broker, backend=backend,
             # 包含如下兩個任務文件,去相應的py文件中找任務,對多個任務作分類
             include=['celery_task.order_task',
                      'celery_task.user_task',
                      ]
             )

新建py文件:order_task.py

from celery_task.celery import app

@app.task
def order_add(x, y):
    import time
    time.sleep(3)
    return x+y

新建py文件:user_task.py

from celery_task.celery import app

@app.task
def user_add(x, y):
    import time
    time.sleep(3)
    return x+y

②提交任務:新建py文件:add_task.py

from celery_task.order_task import order_add
from celery_task.user_task import user_add

# res = order_add.delay(10, 20)   # 打印結果爲 26a025a6-f910-4e71-8173-c119c665df84
res = user_add.delay(100, 200)    # 打印結果爲 c67e5176-3972-49c2-9e13-13f7d6eea4af
print(res)

③任務提交完,啓動worker執行任務

windows下:celery worker -A celery_task -l info -P eventlet

等待提交任務

提交任務order_task

提交任務user_task

④獲取任務執行結果:新建py文件:celery_result.py

from celery.result import AsyncResult
from celery_task.celery import app

async = AsyncResult(id="c67e5176-3972-49c2-9e13-13f7d6eea4af", app=app)   # id是worker執行user_task任務後拿到的id

if async.successful():
    # 取出它return的值
    result = async.get()
    print(result)     # 執行成功,打印的結果爲300
    # result.forget() # 將結果刪除
elif async.failed():
    print('執行失敗')
elif async.status == 'PENDING':
    print('任務等待中被執行')
elif async.status == 'RETRY':
    print('任務異常後正在重試')
elif async.status == 'STARTED':
    print('任務已經開始被執行')

 

4、celery執行定時任務

一、指定的幾時幾點幾分執行任務

①建立celeryDemo項目

②建立任務:新建py文件:celery_task_s1.py

from celery import Celery
# 指定broker(消息中間件),指定backend(結果儲存)
backend = 'redis://127.0.0.1:6379/0'
broker = 'redis://127.0.0.1:6379/1'   # 0,1指定的是redis中的庫

# 實例化產生一個Celery對象,必定要指定一個名字
app = Celery('test', broker=broker, backend=backend)

# 指定任務,就是一個函數
# 須要用一個裝飾器綁定任務,表示該任務是被celery管理的,而且能夠用celery執行的
@app.task
def add(x, y):
    import time
    time.sleep(3)
    return x+y

③提交任務到消息中間件中,獲取任務id:新建py文件:add_task.py

# 用於提交任務的py文件
import celery_task_s1

# 執行定時任務:指定時間執行add_task任務
from datetime import datetime
v1 = datetime(2019, 7, 12, 20, 59, 56)   # 指定的幾時幾點幾分執行任務
print(v1)   # 2019-07-12 20:59:56
v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)  # 2019-07-12 12:59:56
# 取出要執行任務的時間對象,調用apply_async方法,args是參數,eta是執行的時間
result = celery_task_s1.add.apply_async(args=[1, 3], eta=v2)
print(result.id)   # e28be60b-1546-4b92-987f-dfbf357e623f

③任務提交完,啓動worker執行任務

windows下:celery worker -A celery_task_s1 -l info -P eventlet

指定的時間就會執行任務獲得結果

二、天天的何時執行任務,天天到這個時間點都會執行,循環執行的

①建立項目celery_mul_Demo

②建立任務:新建py文件:celery.py(必須叫這個名字)  

# 這個文件必須叫celery,生成celery對象


from celery import Celery

broker = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/1'
app = Celery('test', broker=broker, backend=backend,
             include=['celery_task.order_task',
                      ]
             )

from datetime import timedelta

app.conf.beat_schedule = {
    # 名字隨意命名
    'add-every-10-seconds': {
        # 執行tasks1下的test_celery函數
        'task': 'celery_task.order_task.order_add',
        # 每隔3秒執行一次
        # 'schedule': 1.0,
        # 'schedule': crontab(minute="*/1"),
        'schedule': timedelta(seconds=3),
        # 傳遞參數
        'args': (5, 6)
    },
    # 每一年什麼時間執行什麼
    # 'add-every-12-seconds': {
    #     'task': 'celery_task.order_task.order_add',
    #     # 每一年4月11號,8點42分執行
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     # 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'args': (16, 16)
    # },
}

新建py文件:order_task.py

from celery_task.celery import app

@app.task
def order_add(x, y):
    import time
    time.sleep(3)
    return x+y

③這裏必須啓動一個beat:celery beat -A celery_task -l info

再啓動work執行:celery worker -A celery_task -l info -P eventlet

 

5、在django中使用celery

①建立項目:django_celery_Demo

②把多任務結構寫的文件夾celery_task直接拷貝到項目中

③views.py

from celery_task.user_task import user_add

def index(request):
    res = user_add.delay(3, 7)
    return HttpResponse(res.id)

def check_result(request):
    res = request.GET.get('id')
    from celery.result import AsyncResult
    from celery_task.celery import app
    async = AsyncResult(id=res, app=app)  # id是worker執行任務後拿到的id
    if async.successful():
        # 取出它return的值
        res = async.get()
        print(res)
        return HttpResponse('ok')

④url.py

url(r'^index/', views.index),
url(r'^check/', views.check_result),

⑤在命令行中先啓動worker:celery worker -A celery_task -l info -P eventlet

⑥啓動項目

 注意:在celery的任務函數中不能直接調用django的環境,須要手動添加

import os
if __name__ == '__main__':
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "untitled15.settings")
    import django
    django.setup()

    from app01 import models

    books = models.Book.objects.all()
    print(books)
相關文章
相關標籤/搜索