Asynchronous execution of front-end click events in a Django project with Celery + Redis

Problem scenario:

In a Django project with several apps, the back end implements the business logic while the front-end pages handle the human interaction: the user fills in parameters, clicks a button, the data is sent to the back end for processing, and the response is returned to the page as quickly as possible, completing one input/output round trip. That works as long as the processing is fast, but some requests are inherently long-running. Take Python calling Ansible for automated deployment as an example: the front end submits a deployment request and Python invokes Ansible to carry it out. A simple operation finishes quickly and returns its result, but a complex playbook can run for a long time, during which the front end just sits waiting; requests pile up and the user experience is poor. Django projects have a mature answer to this situation.

Solution:

Analysing the problem above, the requirements are: the user must not be left waiting, the task must be executed asynchronously by the back end, and the user must be notified of the final result once the task completes. The resulting design for the Django project: Celery executes the tasks asynchronously, Redis stores the task queue data, and a callback is made when the task finishes.

Implementation environment: MySQL stores the execution result logs, Redis is used as the cache/broker, Django 1.11.7, Python 3.6.3 (project assets, app assets_cd).

1. Base dependency packages for the Django project: ansible, celery, Django, django-celery-beat (creates the periodic-task tables in the Django project), redis.

(assets) root@python-dev:/application/assets# pip list
Package             Version
------------------- -----------
ansible             2.3.2.0
celery              4.2.0
Django              1.11.7
django-celery-beat  1.1.1
django-filter       1.1.0
django-mysql        2.2.2
django-rest-swagger 2.1.2
djangorestframework 3.7.3
pip                 10.0.1
PyMySQL             0.8.0
redis               2.10.6
requests            2.18.4
requests-ntlm       1.1.0
setuptools          28.8.0
urllib3             1.22
virtualenv          16.0.0

2. Configuration in the assets main project: the Redis database is used to hold the Celery tasks.

CELERY_BROKER_URL = 'redis://localhost:6379/0'
#CELERY_RESULT_BACKEND = 'redis://'
#CELERY_RESULT_PERSISTENT = False
#CELERY_TASK_SERIALIZER = 'json'
#CELERY_RESULT_SERIALIZER = 'json'
#CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = TIME_ZONE
# CELERY_ENABLE_UTC = True

INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'django_celery_beat',
]
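In addition to the settings shown above (the commented-out lines are optional extras), one optional addition that is not part of the original project is to declare the task route once in settings instead of passing queue='priority.high' at every apply_async call in the view. A minimal sketch, assuming the task path assets_cd.tasks.run_playbook_task:

# settings.py (optional, an assumption rather than the project's configuration):
# with namespace='CELERY', this maps to Celery's task_routes setting and sends the
# deploy task to the 'priority.high' queue without passing queue= at the call site.
CELERY_TASK_ROUTES = {
    'assets_cd.tasks.run_playbook_task': {'queue': 'priority.high'},
}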

 

3. In the __init__.py of the assets project package, enable Celery for the whole project:

import pymysql
pymysql.install_as_MySQLdb()

from .celerytask import app as celery_app  # include all celery tasks

__all__ = ['celery_app']

4. Create celerytask.py in the assets project package to hook Django up to Celery:

import os

from celery import Celery
from celery.schedules import crontab

# By default, celery searches for a submodule named celery.py
# http://docs.celeryproject.org/en/latest/getting-started/\
# next-steps.html#about-the-app-argument
#
# -- Refer
# http://docs.celeryproject.org/en/latest/
# https://github.com/celery/celery/tree/master/examples/django/
# http://docs.celeryproject.org/en/latest/userguide/application.html
# http://docs.celeryproject.org/en/latest/userguide/configuration.html

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'assets.settings')
# app = Celery('oms', backend='rpc://', broker='pyamqp://guest@localhost//')

app = Celery('assets')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
# from django.conf import settings
# app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.autodiscover_tasks()

5. In the assets_cd app, create the view (the business class / API that talks to the front end) and tasks.py (the Celery task handlers).

The view: it is long, so only the key part is shown:

from assets_cd import tasks  # import the module containing the Celery task handlers
.............omitted.............
oper_log_id_list = []
ip_list = []
ssh_list = []
path_list = []
with transaction.atomic():
    for deploy in deploy_log_list:
        deploy.save()
        oper_log_id_list.append(deploy.id)
        ip_list.append(deploy.host_ip)
        ssh_list.append(deploy.ssh_user)
        path_list.append(deploy.service_path)
callback_id = ','.join([str(x) for x in oper_log_id_list])
ip_list = ','.join([str(ip) for ip in ip_list])
ssh_list = ','.join([str(ssh) for ssh in ssh_list])
path_list = ','.join([str(path) for path in path_list])
logger.info('callback_id, {}'.format(callback_id))
logger.info('add tasks_deploy, tasks_deploy.run_playbook_task_deploy.delay')
for playbook in play_book_list:
    # Hand the task over to Celery here; it is queued in Redis and executed asynchronously.
    tasks.run_playbook_task.apply_async((playbook,), queue='priority.high')
logger.info('add tasks_deploy completed, congrats')

# Return the following information to the front end immediately.
deployment_result_list.append({
    'id': '{}'.format(callback_id),
    'ip_list': ip_list,
    'ssh_user': ssh_list,
    'path': path_list,
    'status': '任務提交celery成功,請在到日誌列表中查詢相關記錄.',  # "Task submitted to Celery; check the log list for records."
})
.............omitted.............
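For a self-contained picture of the dispatch pattern the elided view implements, here is a minimal sketch. The view name deploy_view and the request fields are hypothetical, not taken from the project; the point is that the view only records the request, queues the task, and returns at once, so the browser never waits for Ansible:

# Hypothetical minimal dispatch view (illustrative names, not the project's actual code).
import json

from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt

from assets_cd import tasks


@csrf_exempt
def deploy_view(request):
    """Accept a deployment request and return immediately; Ansible runs on a Celery worker."""
    payload = json.loads(request.body)
    playbook = {
        'serial_number': payload['serial_number'],
        'callback_url': payload.get('callback_url', ''),
        'ssh_user': payload['ssh_user'],
        'sudo_user': payload['sudo_user'],
        'playbook_path': payload['playbook_path'],
        'playbook_extra_vars': payload.get('playbook_extra_vars', {}),
        'os_type': payload.get('os_type', 'linux'),
        'ip': payload['ip'],
    }
    # The task is only queued in Redis here; a worker listening on 'priority.high' runs it.
    tasks.run_playbook_task.apply_async((playbook,), queue='priority.high')
    return JsonResponse({'status': 'task submitted to celery',
                         'serial_number': payload['serial_number']})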

tasks.py:

# -*- coding: utf-8 -*-
import datetime
import logging
import os
import time
import traceback

try:
    import simplejson as json
except ImportError:
    import json

import requests
from celery import shared_task  # the decorator that turns a function into a Celery task
from django.db import transaction

from assets_cd.models import DeployLog
from util import ansibleutil
from util import deploymentutil

logger = logging.getLogger(__name__)


@shared_task  # decorated as a Celery task, so the function body runs asynchronously on a worker
def run_playbook_task(playbook):
    begin_time = datetime.datetime.now()
    serial_number = playbook['serial_number']
    callback_url = playbook['callback_url']
    try:
        ssh_user = playbook['ssh_user']
        sudo_user = playbook['sudo_user']
        playbook_path = playbook['playbook_path']
        playbook_extra_vars = playbook['playbook_extra_vars']
        os_type = playbook['os_type']
        ip = playbook['ip']
        result = 0
        DeployLog.objects.filter(serial_number=serial_number).update(
            begin_time=begin_time,
            status=-3,
            result=-3,
        )
        # The Ansible run is the time-consuming part. Because the whole task is executed
        # by Celery, the front end gets its response immediately and is never blocked here.
        ansible_result = ansibleutil.run_playbook(
            serial_number=serial_number,
            ssh_user=ssh_user,
            sudo_user=sudo_user,
            playbook_path=playbook_path,
            playbook_extra_vars=playbook_extra_vars,
            os_type=os_type,
        )
        deploymentutil.del_maint_ip_list(ip)
        logger.info('ansible_result {}'.format(ansible_result))
        # Result status after the Ansible run has completed.
        result = ansible_result
        log_file = DeployLog.objects.get(serial_number=serial_number)
        if log_file:
            log_content = log_file.result_log
            # with open(log_file, 'r') as f:
            #     log_content = f.read()
            log_content = str(log_content).replace('\n', '<br>').replace('\t', '&nbsp;' * 8)
            logger.info(log_content)
        else:
            logger.info("result: []")
    except Exception:
        msg = traceback.format_exc()
        logger.error(msg)
        # log_dir = '/var/log/ansible'
        log_file = DeployLog.objects.get(serial_number=serial_number)
        if log_file:
            data = log_file.result_log + '\n' + msg
            with transaction.atomic():
                log_file.result_log = data
                log_file.save()
        else:
            logger.error(msg)
        result = -1

    end_time = datetime.datetime.now()
    diff_time = end_time - begin_time
    elapsed_time = '{}.{}'.format(diff_time.seconds, diff_time.microseconds)

    try:
        logger.info("callback_url:{}".format(callback_url))
        logger.info("result:{}".format(result))
        if result != 0:
            DeployLog.objects.filter(serial_number=serial_number).update(
                begin_time=begin_time,
                end_time=end_time,
                elapsed_time=elapsed_time,
                status=result,
                result=result,
            )
            # Call back the notification URL once the run has finished.
            if callback_url:
                log_file = DeployLog.objects.get(serial_number=serial_number)
                log = -1
                data_log = {
                    "id": "{}".format(log_file.id),
                    "result": int(log),
                }
                logger.info(json.dumps(data_log))
                try:
                    req = requests.post(callback_url, json.dumps(data_log), timeout=120)
                    logger.info(req.status_code)
                    logger.info(req.content)
                    if req.status_code != 200:
                        for i in range(3):
                            time.sleep(5)
                            req = requests.post(callback_url, json.dumps(data_log), timeout=120)
                            if req.status_code == 200:
                                break
                    elif req.status_code == 200:
                        logger.info("callback log url success.")
                except Exception as e:
                    logger.info(e.args[0])
                    for i in range(3):
                        time.sleep(5)
                        try:
                            req = requests.post(callback_url, json.dumps(data_log), timeout=120)
                            if req.status_code == 200:
                                break
                        except Exception as e:
                            logger.info(e.args[0])
        else:
            DeployLog.objects.filter(serial_number=serial_number).update(
                end_time=end_time,
                elapsed_time=elapsed_time,
                status=result,
                result=result,
            )
            if callback_url:
                log_file = DeployLog.objects.get(serial_number=serial_number)
                log = log_file.result
                data_log = {
                    "id": "{}".format(log_file.id),
                    "result": int(log),
                }
                logger.info(json.dumps(data_log))
                try:
                    req = requests.post(callback_url, json.dumps(data_log), timeout=120)
                    if req.status_code != 200:
                        for i in range(3):
                            time.sleep(5)
                            req = requests.post(callback_url, json.dumps(data_log), timeout=120)
                            if req.status_code == 200:
                                break
                    elif req.status_code == 200:
                        logger.info("callback log url success.")
                except Exception as e:
                    logger.info(e.args[0])
                    for i in range(3):
                        time.sleep(5)
                        try:
                            req = requests.post(callback_url, json.dumps(data_log), timeout=120)
                            if req.status_code == 200:
                                break
                        except Exception as e:
                            logger.info(e.args[0])
    except Exception:
        logger.error(traceback.format_exc())
    logger.info('run_ansible_playbook, result {}'.format(result))
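When the task finishes it POSTs a JSON body of the form {"id": ..., "result": ...} to callback_url. The original article does not show the receiving side; the sketch below is only a guess at what a minimal receiver could look like (the view name deploy_callback and the response format are assumptions, not part of the project):

# Hypothetical receiver for the callback POST sent at the end of run_playbook_task.
import json

from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt

from assets_cd.models import DeployLog


@csrf_exempt
def deploy_callback(request):
    """Record the final task status so the front end can display it (e.g. in the log list)."""
    data = json.loads(request.body)              # {"id": "<DeployLog id>", "result": <int>}
    log = DeployLog.objects.get(id=data['id'])
    # What to do with the result is application-specific: update a status field,
    # push a websocket message, send a mail, and so on. Here we only echo it back.
    status = 'success' if int(data['result']) == 0 else 'failed'
    return JsonResponse({'id': log.id, 'status': status})

The front end can then learn the outcome through whatever channel this endpoint triggers, or simply by refreshing the log list.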

6. Execution flow:

Front-end page submits the request ---> back-end view receives the task ---> task data is stored in Redis ---> a running Celery worker picks the task up from Redis and executes it (the worker is started with, for example, "celery -A assets worker -l info -Q priority.high") ---> the result is fed back to the front end through the callback ---> the loop is closed.
