django使用celery搭配redis配置定時任務

已經安裝環境:python

Python3.6   c++

django==2.1.8(用2.2.2須要升級sqlite3)  redis

項目名稱:ceshiproject   APP名稱:ceshi sql

第一步:centos7下首先安裝redis程序django

wget http://download.redis.io/releases/redis-5.0.5.tar.gz 或者 到官網https://redis.io/download 查看教程並下載
tar xzf redis-5.0.5.tar.gz
yum install gcc  gcc-c++  #安裝一下依賴,若是有省略
cd redis-5.0.5
make MALLOC=libc
cd src && make install
---------到此安裝完成---------------
cd redis-5.0.5  #進入安裝目錄
./redis-server  #啓動redis
./redis-cli     #可進入redis命令模式,通常用不上

 第二步:安裝django-celery==3.2.2,並在APP中添加加 'djcelery' (注意:若是超過3.2.2版本程序會報錯:TypeError: can only concatenate list (not "tuple") to list)json

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'ceshi',
    'djcelery'
]

 第三步:在項目ceshiproject下,新建配置文件celeryconfig.py,內容以下centos

# -*- coding:utf-8 -*-
import djcelery
djcelery.setup_loader()
from datetime import timedelta

#設置任務運行時,使用的隊列,以防形成定時,跟按期擁堵
CELERY_QUEUES = {
    'beat_task1':{
        'exchange':'beat_task1',
        'exchange_type':'direct',
        'binding_key':'beat_task1'
    },
    'work_queue':{
        'exchange':'work_queue',
        'exchange_type':'direct',
        'binding_key':'work_queue'
    }
}

#設置默認使用的隊列
CELERY_DEFAULT_QUEUE = 'work_queue'


#導入任務
CELERY_IMPORTS=(
    'ceshi.task1',
)

#有些狀況下防止死鎖
CELERY_FORCE_EXECV = True

#設置併發的worker數量,根據cpu核心數量來定
CELERY_CONCURRENCY = 4

#任務失敗,能夠容許重試
CELERY_ACKS_LATE = True

#設置每個worker能夠執行多少個任務後銷燬,防止內存泄漏
CELERY_MAX_TASKS_PER_CHILD = 100

#單個任務最大的運行時間,這邊設置6分鐘
CELERY_TASK_TIME_LIMIT = 12 * 30



# 定時任務
CELERYBEAT_SCHEDULE = {
    'task1': {
        'task': 'ceshi_task',  # 任務名稱
        'schedule': timedelta(seconds=10),  # 時間週期
        'options': {
            'queue': 'beat_task1'  # 消息隊列
        }
    },
    # 'task2': {
    #     'task': 'update-db',  # 任務名稱
    #     # 'schedule': crontab(minute=0, hour=0),  # 天天凌晨執行一次
    #     'schedule': timedelta(seconds=10),  #
    #     'options': {
    #         'queue': 'beat_tasks'
    #     }
    # }
}

第四步:在APP下面新建任務task1.py,內容以下:緩存

from celery.task import Task  #調用celery中的task模塊
import time
class ceshitask(Task):
    name = "ceshi_task"
    def run(self, *args, **kwargs):
        print ("start ceshi task")
        time.sleep(5)
        print ("args={},kwargs={}".format(args,kwargs))
        print ("end task")

第五步:配置好項目跟APP的路由服務器

項目ceshiproject路由:session

from django.contrib import admin
from django.urls import path,include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('', include('ceshi.urls',namespace='ceshi')),
]

APP(ceshi)的路由

from django.urls import path
from . import views
app_name="ceshi"
urlpatterns = [
    path('do/', views.do,name='do'),
]

第六步:

首先安裝redis驅動:pip install redis==2.10.6 

redis==2.10.6 (高版本會報錯,用於django內部驅動用跟下面安裝程序是不同)

在setting.py中配置redis調用,引入項目下面的celerycofig.py

from .celeryconfig import *
BROKER_BACKEND = 'redis'
BROKER_URL = 'redis://localhost:6379/1'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'

第七步:在views.py中引入APP下面的task1,並調用

from django.http import JsonResponse
from ceshi.task1 import ceshitask

def do(request):
    #執行異步任務
    print ('start request')
    ceshitask.delay()
    print ('end request')
    return JsonResponse({"result":"ok"})

第八步:開啓兩個終端,並在終端中(Terminal)啓動celery中的worker跟定時任務beat模塊,進行測試

python manage.py celery worker -l INFO   #首先開啓worker
python manage.py celerybeat -l INFO   #開啓定時任務

 第九步:上面能夠使用了,若是要在django中調用,進行一序列的操做,請繼續往下走:

1.把django中cache用於redis

首先安裝django-redis==4.10

2.把下面的內容添加到django的settings.py中(redis默認不能超過16,若是

redis://127.0.0.1:6379/10 設置爲17會出現:redis.exceptions.ResponseError: DB index is out of range

CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        # 把這裏緩存你的redis服務器ip和port
        "LOCATION": "redis://127.0.0.1:6379/10",
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
        }
    }
}

# 3.設置redis存儲django的session信息
SESSION_ENGINE = "django.contrib.sessions.backends.cache"
SESSION_CACHE_ALIAS = "default"

3.能夠在task任務,經過下面命令設置redis,(注意:設置cache時候參數加None,即cache永久不失效)

from django.core.cache import cache
class
ceshitask(Task): name = "ceshi_task" def run(self, *args, **kwargs): cache.set("start-ceshi-task",值, None) return "ceshi....."

4.在views中調用

from django.shortcuts import render,HttpResponse
from django.core.cache import cache
import json

def num_recode(request):
    count = cache.get("start-ceshi-task")
    print(json.loads(count))
    return HttpResponse(count)
相關文章
相關標籤/搜索