異步任務分發模塊Celery

Celery簡介

Celery是一個功能完備即插即用的任務隊列。它使得咱們不須要考慮複雜的問題,使用很是簡單。 celery適用異步處理問題,當遇到發送郵件、或者文件上傳, 圖像處理等等一些比較耗時的操做,咱們可將其異步執行,這樣用戶不須要等待好久,提升用戶體驗。html

celery的特色是:python

  • 簡單,易於使用和維護,有豐富的文檔。
  • 高效,單個celery進程每分鐘能夠處理數百萬個任務。
  • 靈活,celery中幾乎每一個部分均可以自定義擴展。

celery很是易於集成到一些web開發框架中。web

使用場景簡介

咱們在作網站後端程序開發時,會碰到這樣的需求:用戶須要在咱們的網站填寫註冊信息,咱們發給用戶一封註冊激活郵件到用戶郵箱,若是因爲各類緣由,這封郵件發送所需時間較長,那麼客戶端將會等待好久,形成很差的用戶體驗。redis

咱們可使用異步任務分發系統Celery解決上面的問題。數據庫

咱們將耗時任務放到後臺異步執行。不會影響用戶其餘操做。除了註冊功能,例如上傳,圖形處理等等耗時的任務,均可以按照這種思路來解決。 如何實現異步執行任務呢?咱們可以使用celery.。django

celery除了剛纔所涉及到的異步執行任務以外,還能夠實現定時處理某些任務。 後端

Celery的概念、配置及使用

任務隊列是一種跨線程、跨機器工做的一種機制。服務器

任務隊列中包含稱做任務的工做單元。有專門的工做進程持續不斷的監視任務隊列,並從中得到新的任務並處理。app

celery經過消息進行通訊,一般使用一個叫Broker(中間人)來協client(任務的發出者)和worker(任務的處理者). clients發出消息到隊列中,broker將隊列中的信息派發給worker來處理。框架

一個celery系統能夠包含不少的worker和broker,可加強橫向擴展性和高可用性能。

安裝celery

直接使用pip包管理工具能夠安裝celery:

pip install celery

也可從官方直接下載安裝包:https://pypi.python.org/pypi/celery/

tar xvfz celery-0.0.0.tar.gz
cd celery-0.0.0
python3 setup.py build
python3 setup.py install

Broker的選擇

Celery須要一種解決消息的發送和接受的方式,咱們把這種用來存儲消息的的中間裝置叫作message broker, 也可叫作消息中間人。

做爲中間人,咱們有幾種方案可選擇:

RabbitMQ

RabbitMQ是一個功能完備,穩定的而且易於安裝的broker. 它是生產環境中最優的選擇。使用RabbitMQ的細節參照如下連接: http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#broker-rabbitmq

若是咱們使用的是Ubuntu或者Debian發行版的Linux,能夠直接經過下面的命令安裝RabbitMQ: sudo apt-get install rabbitmq-server 安裝完畢以後,RabbitMQ-server服務器就已經在後臺運行。若是您用的並非Ubuntu或Debian, 能夠在如下網址: http://www.rabbitmq.com/download.html 去查找本身所須要的版本軟件。

Redis

Redis也是一款功能完備的broker可選項,可是其可能因意外中斷或者電源故障致使數據丟失的狀況。

關因而有那個Redis做爲Broker,可訪下面網址: http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#broker-redis

存儲結果

若是咱們想跟蹤任務的狀態,Celery須要將結果保存到某個地方。有幾種保存的方案可選:SQLAlchemy、Django ORM、Memcached、 Redis、RPC (RabbitMQ/AMQP)。

Celery使用的一個簡單示例

項目的目錄結構以下:

咱們在tasks包中加入須要異步調用的任務,在task_demo.py文件中寫具體的任務:

# -*- coding:utf-8 -*-
from celery import Celery

# 定義celery對象
# 123是我redis的密碼;使用redis的1號庫做爲broker,2號庫做爲存取結果的地方
celery_app = Celery("demo1",
                    broker="redis://:123@127.0.0.1:6800/1",
                    backend="redis://:123@127.0.0.1:6800/2"
                    )


@celery_app.task
def print_now(now):
    """異步任務的demo"""
    print(now)
    # 定義一個返回值,若是不寫的話會默認返回None
    return "當前時間:{}".format(now)

而後在外部的transfer.py文件中加入調用這個任務的代碼:

# -*- coding:utf-8 -*-
import time

import redis
from tasks.task_demo import print_now


# 建立redis鏈接,選擇2號庫
conn = redis.Redis(host="127.0.0.1",port=6800,password=123,db=2)

# 使用celery異步任務
# delay函數調用後當即返回
now = time.strftime("%Y-%m-%d %X")
# 把參數放在delay中!
ret = print_now.delay(now)
print(ret,type(ret)) # 7af04c3c-8070-4bf1-9563-b220fd5aac9c <class 'celery.result.AsyncResult'>

而後在項目的目錄下啓動celery:

 

啓動的結果及說明以下:

Celery啓動時若是上報AttributeError: async 這個錯誤,建議把Celery的版本升級到4.1.1

啓動成功後,終端會夯住;而後運行transfer.py文件,在終端會出現這樣的提示:

而後咱們再利用redis可視化工具看看結果:

咱們能夠看到,在執行完一次任務後,redis數據庫1存放着celery的broker的信息,在庫2中存放着執行的結果。

Django中使用Celery生成首頁靜態頁面並將結果存在本地數據庫中 *****

在實際中,咱們網站的首頁內容是不怎麼變化的,若是每次請求首頁都對數據庫中的內容進行查詢的話,這樣對數據庫來講負擔會很大。

咱們可使用celery生成一下靜態頁面,當沒有數據變化的時候用戶每次請求主頁咱們都讓他們去訪問這個生成好的「首頁」,有當數據變化的時候再從新生成一下這個「靜態頁面」就能夠了。

項目的目錄以下

 

路由與視圖以下 

路由:

from django.contrib import admin
from django.urls import path

from demo import views

urlpatterns = [
    path('admin/', admin.site.urls),
 path('index/',views.Index.as_view()),
]

視圖:

from django.views import View
from django.shortcuts import render

from tasks.static_index import index_static

class Index(View):

    def get(self,request):

        # 這裏只是模擬一下後臺是否有數據改動的狀況
        # 當數據有修改的時候須要進行判斷,把修改後的數據傳過去再從新進行頁面的生成!
        # 若是a=1表示後臺沒有數據修改
        a = 12
        if a == 1:
            return render(request, "index.html")
        # 有數據修改了,就從新生成一下靜態的頁面
        else:
            index_static.delay()
            return render(request,"index.html")

將結果存儲在本地數據庫中的配置

此處須要用到額外包django_celery_results, 先安裝包:

pip3 install django-celery-results

在celery_demo/settings.py中安裝此應用:

INSTALLED_APPS = [
   xxx

    # 自定義app或第三方app
    'demo.apps.DemoConfig',
    # celery用於存儲結果的應用
    'django_celery_results',
    # celery用於定時任務的應用
    'django_celery_beat'
]

此時,tasks/static_index.py文件中的配置以及生成靜態頁面的代碼以下:

import os

from celery import Celery
from django.conf import settings
from django.template import loader,RequestContext

### 注意!必須引入Django環境!爲celery設置環境變量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoCeleryDemo.settings') ## 建立應用 # 定義celery對象 # 123是我redis的密碼;使用redis的1號庫做爲broker
celery_app = Celery("demo2") ## 配置應用
celery_app.conf.update( # 使用redis的庫1做爲消息隊列
    BROKER_URL="redis://:123@127.0.0.1:6800/1", # 使用項目數據庫存儲任務執行結果
    CELERY_RESULT_BACKEND='django-db', # 如過想把把結果存在redis的庫2中這樣配置:
    # CELERY_RESULT_BACKEND = "redis://:123@127.0.0.1:6800/2", )

@celery_app.task
def index_static():

    # 這裏可使用ORM查詢的結果等等
    context = { "msg":"這是一個基於Celery實現的首頁靜態頁面" } # 使用模板
    # 加載模板文件,返回模板對象
    temp = loader.get_template("index_static.html") # 模板渲染
    index_static_html = temp.render(context) # 生成首頁靜態頁面 —— 存放在static目錄下
    index_static_html_path = os.path.join(settings.BASE_DIR,'templates','index.html') with open(index_static_html_path,'w')as f: f.write(index_static_html) # 做爲測試,這裏返回一個字符串OK
    return "OK"

建立django_celery_results應用所需數據庫表, 執行遷移文件:

這裏須要注意,因爲後續咱們會用到admin頁面,所以須要先遷移一下「默認註冊應用auth」相關的數據庫:

python3 manage.py migrate

而後遷移django_celery_results的表:

python3 manage.py migrate django_celery_results

啓動celery

進入項目根目錄,啓動celery:

celery -A tasks.static_index worker -l info

訪問index頁面查看結果

此時咱們再訪問index頁面,能夠從結果的表中看到result:

Django中使用celery作定時任務 *****

若是咱們想某日某時執行某個任務,或者每隔一段時間執行某個任務,也可使用celery來完成。

使用定時任務,須要安裝額外包:

pip3 install django_celery_beat

而後在settings.py中安裝此應用: 

INSTALLED_APPS = [
    xxx

    # 自定義app或第三方app
    'demo.apps.DemoConfig',
    # celery用於存儲結果的應用
    'django_celery_results',
    # celery用於定時任務的應用
    'django_celery_beat'
]

而後,tasks/static_index.py文件中的配置以及定時任務的代碼以下:

# -*- coding:utf-8 -*-
import os

from celery import Celery
from django.conf import settings
from django.template import loader,RequestContext

### 注意!必須引入Django環境!爲celery設置環境變量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoCeleryDemo.settings')


## 建立應用
# 定義celery對象
# 123是我redis的密碼;使用redis的1號庫做爲broker
celery_app = Celery("demo2")

## 配置應用
celery_app.conf.update(
    # 使用redis的庫1做爲消息隊列
    BROKER_URL="redis://:123@127.0.0.1:6800/1",
    # 使用項目數據庫存儲任務執行結果
    CELERY_RESULT_BACKEND='django-db',
    # 把結果存在redis的庫2中
    # CELERY_RESULT_BACKEND = "redis://:123@127.0.0.1:6800/2",
    # 配置定時器模塊,定時器信息存儲在數據庫中
    CELERYBEAT_SCHEDULER='django_celery_beat.schedulers.DatabaseScheduler',
)


### 定時任務
@celery_app.task def interval_task(): print("我每隔5秒執行一次...")
# 做爲測試,這裏返回666
return 666

因爲定時器信息存儲在數據庫中,咱們須要先生成對應表, 對diango_celery_beat執行遷移操做,建立對應表:

python3 manage.py migrate django_celery_beat

其餘的表是以前遷移的時候生成的:

因爲咱們須要在數據庫中添加數據才能使用定時任務,所以這裏須要建立一下後臺管理員帳號:

python3 manage.py createsuperuser

而後登錄後臺管理員admin界面:

其中Crontabs用於定時某個具體時間執行某個任務的時間;

Intervals用於每隔多久執行任務的事件;

具體任務的執行在Periodic tasks表中建立。 

咱們要建立每隔5秒執行某個任務,因此在Intervals表名後面點擊Add按鈕:

而後在Periodic tasks表名後面,點擊Add按鈕,添加任務:

 

啓動定時任務須要在後面加上--beat參數!

celery -A tasks.static_index worker -l info --beat

結果以下:

咱們能夠在admin的Task results表中查看結果:

其中上面3條是定時任務的結果,下面那一條是生成靜態頁面的結果。

相關文章
相關標籤/搜索