批量執行(Linux命令,上傳/下載文件)

前言:

                                                   

 

 

每一個公司的網絡環境大都劃分 辦公網絡、線上網絡,之因此劃分的主要緣由是爲了保證線上操做安全;css

對於外部用戶而言也只能訪問線上網絡的特定開放端口,那麼是什麼控制了用戶訪問線上網絡的呢?html

防火牆過濾......!前端

 

對於內部員工而言對線上系統平常運維、代碼部署如何安全訪問線上業務系統呢?如何監控、記錄技術人員的操做記錄?python

 

堡壘機策略:ios

1.回收全部遠程登陸Linux主機的用戶名、密碼;git

2.中間設置堡壘機(保存全部線上Linux主機的用戶名、密碼);github

3.全部技術人員都要經過堡壘機去獲取用戶名、密碼,而後在再去鏈接 線上系統,並記錄操做日誌;web

 

堡壘機策略優勢:ajax

1.記錄用戶操做;redis

2.實現遠程操做權限集中管理;

 

1、堡壘機表結構設計

 

from django.db import models
from django.contrib.auth.models import  User
# Create your models here.


class IDC(models.Model):
    name = models.CharField(max_length=64,unique=True)
    def __str__(self):
        return self.name

class Host(models.Model):
    """存儲全部主機信息"""
    hostname = models.CharField(max_length=64,unique=True)
    ip_addr = models.GenericIPAddressField(unique=True)
    port = models.IntegerField(default=22)
    idc = models.ForeignKey("IDC")
    #host_groups = models.ManyToManyField("HostGroup")
    #host_users = models.ManyToManyField("HostUser")
    enabled = models.BooleanField(default=True)

    def __str__(self):
        return "%s-%s" %(self.hostname,self.ip_addr)

class HostGroup(models.Model):
    """主機組"""
    name = models.CharField(max_length=64,unique=True)
    host_user_binds  = models.ManyToManyField("HostUserBind")
    def __str__(self):
        return self.name


class HostUser(models.Model):
    """存儲遠程主機的用戶信息
    root 123
    root abc
    root sfsfs
    """
    auth_type_choices = ((0,'ssh-password'),(1,'ssh-key'))
    auth_type = models.SmallIntegerField(choices=auth_type_choices)
    username = models.CharField(max_length=32)
    password = models.CharField(blank=True,null=True,max_length=128)

    def __str__(self):
        return "%s-%s-%s" %(self.get_auth_type_display(),self.username,self.password)

    class Meta:
        unique_together = ('username','password')


class HostUserBind(models.Model):
    """綁定主機和用戶"""
    host = models.ForeignKey("Host")
    host_user = models.ForeignKey("HostUser")

    def __str__(self):
        return "%s-%s" %(self.host,self.host_user)

    class Meta:
        unique_together = ('host','host_user')


class SessionLog(models.Model):
    ''' 記錄每一個用戶登陸操做,ID傳給 shell生成文件命名 '''
    account=models.ForeignKey('Account')
    host_user_bind=models.ForeignKey('HostUserBind')
    start_date=models.DateField(auto_now_add=True)
    end_date=models.DateField(blank=True,null=True)

    def __str__(self):
        return '%s-%s'%(self.account,self.host_user_bind)

class AuditLog(models.Model):
    """審計日誌"""


class Account(models.Model):
    """堡壘機帳戶
    1. 擴展
    2. 繼承
    user.account.host_user_bind
    """

    user = models.OneToOneField(User)
    name = models.CharField(max_length=64)

    host_user_binds = models.ManyToManyField("HostUserBind",blank=True)
    host_groups = models.ManyToManyField("HostGroup",blank=True)
models.py

 

 

 2、經過堡壘機遠程登陸Linux主機

2種堡壘機登陸方式:

 

命令行登陸堡壘機方式:

方式1:經過 修改open_shh源碼擴展-Z option生成惟一 ssh進程,使用Linux的strace 命令對惟一 ssh進程進行檢測生成日誌文件;

 

0.用戶執行audit_shell出現交互界面,提示用戶輸入機組和主機;

import sys,os,django
os.environ.setdefault("DJANGO_SETTINGS_MODULE","zhanggen_audit.settings")
django.setup() #在Django視圖以外,調用Django功能設置環境變量!
from audit.backend import user_interactive


if __name__ == '__main__':
    shell_obj=user_interactive.UserShell(sys.argv)
    shell_obj.start()
audit_shell.py
from django.contrib.auth import authenticate

class UserShell(object):
    '''用戶登陸堡壘機,啓動自定製shell  '''
    def __init__(self,sys_argv):
        self.sys_argv=sys_argv
        self.user=None

    def auth(self):
        count=0
        while count < 3:
            username=input('username:').strip()
            password=input('password:').strip()
            user=authenticate(username=username,password=password)
            #none 表明認證失敗,返回用戶對象認證成功!
            if not user:
                count+=1
                print('無效的用戶名或者,密碼!')
            else:
                self.user=user
                return True
        else:
            print('輸入次數超過3次!')


    def start(self):
        """啓動交互程序"""

        if self.auth():
            # print(self.user.account.host_user_binds.all()) #select_related()
            while True:
                host_groups = self.user.account.host_groups.all()
                for index, group in enumerate(host_groups):
                    print("%s.\t%s[%s]" % (index, group, group.host_user_binds.count()))
                print("%s.\t未分組機器[%s]" % (len(host_groups), self.user.account.host_user_binds.count()))

                choice = input("select group>:").strip()
                if choice.isdigit():
                    choice = int(choice)
                    host_bind_list = None
                    if choice >= 0 and choice < len(host_groups):
                        selected_group = host_groups[choice]
                        host_bind_list = selected_group.host_user_binds.all()
                    elif choice == len(host_groups):  # 選擇的未分組機器
                        # selected_group = self.user.account.host_user_binds.all()
                        host_bind_list = self.user.account.host_user_binds.all()

                    if host_bind_list:
                        while True:
                            for index, host in enumerate(host_bind_list):
                                print("%s.\t%s" % (index, host,))
                            choice2 = input("select host>:").strip()
                            if choice2.isdigit():
                                choice2 = int(choice2)
                                if choice2 >= 0 and choice2 < len(host_bind_list):
                                    selected_host = host_bind_list[choice2]
                                    print("selected host", selected_host)
                            elif choice2 == 'b':
                                break
user_interactive.py

 

知識點:

在Django視圖以外,調用Django功能設置環境變量!(切記放在和Django manage.py 同級目錄); 

import sys,os,django
os.environ.setdefault("DJANGO_SETTINGS_MODULE","Sensors_Data.settings")
django.setup()  # 在Django視圖以外,調用Django功能設置環境變量!
from app01 import models
objs=models.AlarmInfo.objects.all()
for row in objs:
    print(row.comment)

 

 

注意:在Django啓動時會自動加載一些 文件,好比每一個app中admin.py,不能在這些文件裏面設置加載環境變量,由於已經加載完了,若是違反這個規則會致使Django程序啓動失敗;

 

 

 

1.實現ssh用戶指令檢測

1.0  修改open_shh源碼,擴展 ssh -Z  惟一標識符;(這樣每次ssh遠程登陸,均可以利用惟一標識符,分辨出 每一個ssh會話進程;)

修改OpenSsh下的ssh.c文件的608和609行、935行增長;
    while ((opt = getopt(ac, av, "1246ab:c:e:fgi:kl:m:no:p:qstvxz:"
        "ACD:E:F:GI:J:KL:MNO:PQ:R:S:TVw:W:XYyZ:")) != -1) {

        case 'Z':
            break;
ssh.c 

 

知識點:

OpenSSH 是 SSH (Secure SHell) 協議的免費開源實現項目。

 

 

1.1  修改openssh以後,編譯、安裝

chmod 755 configure
./configure --prefix=/usr/local/openssh
make
chmod 755 mkinstalldirs
make install
sshpass -p xxxxxx123 /usr/local/openssh/bin/ssh root@172.17.10.112 -Z s1123ssssd212

 

 

1.2  每一個ssh會話進程能夠惟一標識以後,在堡壘機使用會話腳本shell腳本檢測 ssh會話進程;(strace命令進行監控,並生產 log日誌文件);

#!/usr/bin/bash

for i in $(seq 1 30);do
    echo $i $1
    process_id=`ps -ef | grep $1 | grep -v 'ession_check.sh' | grep -v grep | grep -v sshpass | awk '{print $2}'`

    echo "process: $process_id"

    if [ ! -z "$process_id" ];then
        echo 'start run strace.....'
        strace -fp $process_id -t -o $2.log;
        break;
    fi

    sleep 5

done;
ssh 會話檢測腳本

 

知識點:

strace 檢測進程的IO調用,監控用戶shell輸入的命令字符;

 strace -fp 60864 -o /ssh.log 
 cat /ssh.log |grep 'write(8'
 rz -E #從xshell上傳文件

 

 sshpass無需提示輸入密碼登陸

[root@localhost sshpass-1.06]# sshpass -p wsnb ssh root@172.16.22.1  -o StrictHostKeyChecking=no 
Last login: Tue Jul 10 16:39:53 2018 from 192.168.113.84
[root@ecdb ~]# 

 

python生成惟一標識符

s=string.ascii_lowercase+string.digits
random_tag=''.join(random.sample(s,10))

 

 

解決普通用戶,沒法執行 strace命令;

方式1:執行文件  +s權限

chmod u+s `which strace`

 

方式2:修改sudo配置文件,使普通用戶sudo時無需輸入密碼!

修改sudo配置文件,防止修改出錯,必定要切換到root用戶;


%普通用戶  ALL=(ALL)       NOPASSWD: ALL

wq! #退出
vim /etc/sudoers

 

 

 

 

 

#!/usr/bin/python3
# -*- coding: utf-8 -*
from django.contrib.auth import authenticate
import subprocess,string,random
from audit import models
from django.conf import settings
class UserShell(object):
    '''用戶登陸堡壘機,啓動自定製shell  '''
    def __init__(self,sys_argv):
        self.sys_argv=sys_argv
        self.user=None

    def auth(self):
        count=0
        while count < 3:
            username=input('username:').strip()
            password=input('password:').strip()
            user=authenticate(username=username,password=password)
            #none 表明認證失敗,返回用戶對象認證成功!
            if not user:
                count+=1
                print('無效的用戶名或者,密碼!')
            else:
                self.user=user
                return True
        else:
            print('輸入次數超過3次!')


    def start(self):
        """啓動交互程序"""

        if self.auth():
            # print(self.user.account.host_user_binds.all()) #select_related()
            while True:
                host_groups = self.user.account.host_groups.all()
                for index, group in enumerate(host_groups):
                    print("%s.\t%s[%s]" % (index, group, group.host_user_binds.count()))
                print("%s.\t未分組機器[%s]" % (len(host_groups), self.user.account.host_user_binds.count()))

                choice = input("select group>:").strip()
                if choice.isdigit():
                    choice = int(choice)
                    host_bind_list = None
                    if choice >= 0 and choice < len(host_groups):
                        selected_group = host_groups[choice]
                        host_bind_list = selected_group.host_user_binds.all()
                    elif choice == len(host_groups):  # 選擇的未分組機器
                        # selected_group = self.user.account.host_user_binds.all()
                        host_bind_list = self.user.account.host_user_binds.all()

                    if host_bind_list:
                        while True:
                            for index, host in enumerate(host_bind_list):
                                print("%s.\t%s" % (index, host,))
                            choice2 = input("select host>:").strip()
                            if choice2.isdigit():
                                choice2 = int(choice2)
                                if choice2 >= 0 and choice2 < len(host_bind_list):
                                    selected_host = host_bind_list[choice2]
                                    s = string.ascii_lowercase + string.digits
                                    random_tag = ''.join(random.sample(s, 10))
                                    session_obj=models.SessionLog.objects.create(account=self.user.account,host_user_bind=selected_host)

                                    session_tracker_scipt='/bin/sh %s %s %s'%(settings.SESSION_TRACKER_SCRIPT,random_tag,session_obj.pk)

                                    session_tracker_process=subprocess.Popen(session_tracker_scipt,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
                                    cmd='sshpass -p %s /usr/local/openssh/bin/ssh %s@%s -p %s -o stricthostkeychecking=no -Z %s' % (selected_host.host_user.password,
                                                                                                             selected_host.host_user.username,
                                                                                                             selected_host.host.ip_addr,
                                                                                                             selected_host.host.port,random_tag)
                                    subprocess.run(cmd,shell=True)#開啓子進程交互
                                    print(session_tracker_process.stdout.readlines(),
                                          session_tracker_process.stderr.readlines())


                            elif choice2 == 'b':
                                break
彙總

 

2.shell遠程登陸程序檢查日誌文件,分析;

tab補全的命令,須要搜素write(5,該腳本實現思路,按鍵去嘗試,循環多種條件判斷;

import re


class AuditLogHandler(object):
    '''分析audit log日誌'''

    def __init__(self,log_file):
        self.log_file_obj = self._get_file(log_file)


    def _get_file(self,log_file):

        return open(log_file)

    def parse(self):
        cmd_list = []
        cmd_str = ''
        catch_write5_flag = False #for tab complication
        for line in self.log_file_obj:
            #print(line.split())
            line = line.split()
            try:
                pid,time_clock,io_call,char = line[0:4]
                if io_call.startswith('read(4'):
                    if char == '"\\177",':#回退
                        char = '[1<-del]'
                    if char == '"\\33OB",': #vim中下箭頭
                        char = '[down 1]'
                    if char == '"\\33OA",': #vim中下箭頭
                        char = '[up 1]'
                    if char == '"\\33OC",': #vim中右移
                        char = '[->1]'
                    if char == '"\\33OD",': #vim中左移
                        char = '[1<-]'
                    if char == '"\33[2;2R",': #進入vim模式
                        continue
                    if char == '"\\33[>1;95;0c",':  # 進入vim模式
                        char = '[----enter vim mode-----]'


                    if char == '"\\33[A",': #命令行向上箭頭
                        char = '[up 1]'
                        catch_write5_flag = True #取到向上按鍵拿到的歷史命令
                    if char == '"\\33[B",':  # 命令行向上箭頭
                        char = '[down 1]'
                        catch_write5_flag = True  # 取到向下按鍵拿到的歷史命令
                    if char == '"\\33[C",':  # 命令行向右移動1位
                        char = '[->1]'
                    if char == '"\\33[D",':  # 命令行向左移動1位
                        char = '[1<-]'

                    cmd_str += char.strip('"",')
                    if char == '"\\t",':
                        catch_write5_flag = True
                        continue
                    if char == '"\\r",':
                        cmd_list.append([time_clock,cmd_str])
                        cmd_str = ''  # 重置
                    if char == '"':#space
                        cmd_str += ' '

                if catch_write5_flag: #to catch tab completion
                    if io_call.startswith('write(5'):
                        if io_call == '"\7",': #空鍵,不是空格,是回退不了就是這個鍵
                            pass
                        else:
                            cmd_str += char.strip('"",')
                        catch_write5_flag = False
            except ValueError as e:
                print("\033[031;1mSession log record err,please contact your IT admin,\033[0m",e)

        #print(cmd_list)
        for cmd in cmd_list:
            print(cmd)
        return cmd_list
if __name__ == "__main__":
    parser = AuditLogHandler(r'D:\zhanggen_audit\log\6.log')
    parser.parse()
日誌分析

 

3.修改bashrc文件,限制用戶登陸行爲;

alias rm='rm -i'
alias cp='cp -i'
alias mv='mv -i'

# Source global definitions
if [ -f /etc/bashrc ]; then
        . /etc/bashrc
fi



echo '-----------------------welcome  to  zhanggen  audit  --------------------------'

python3 /root/zhanggen_audit/audit_shell.py

echo 'bye'

logout
vim ~/.bashrc

 

缺陷:

雖然限制了用戶shell登陸,但沒法阻止用戶使用程序(paramiko)上傳惡意文件!

 

 

 

方式2:提取paramiko源碼demos文件,對其進行修改支持交互式操做;

from django.db import models
from django.contrib.auth.models import  User
# Create your models here.


class IDC(models.Model):
    name = models.CharField(max_length=64,unique=True)
    def __str__(self):
        return self.name

class Host(models.Model):
    """存儲全部主機信息"""
    hostname = models.CharField(max_length=64,unique=True)
    ip_addr = models.GenericIPAddressField(unique=True)
    port = models.IntegerField(default=22)
    idc = models.ForeignKey("IDC")
    #host_groups = models.ManyToManyField("HostGroup")
    #host_users = models.ManyToManyField("HostUser")
    enabled = models.BooleanField(default=True)

    def __str__(self):
        return "%s-%s" %(self.hostname,self.ip_addr)

class HostGroup(models.Model):
    """主機組"""
    name = models.CharField(max_length=64,unique=True)
    host_user_binds  = models.ManyToManyField("HostUserBind")
    def __str__(self):
        return self.name


class HostUser(models.Model):
    """存儲遠程主機的用戶信息
    root 123
    root abc
    root sfsfs
    """
    auth_type_choices = ((0,'ssh-password'),(1,'ssh-key'))
    auth_type = models.SmallIntegerField(choices=auth_type_choices)
    username = models.CharField(max_length=32)
    password = models.CharField(blank=True,null=True,max_length=128)

    def __str__(self):
        return "%s-%s-%s" %(self.get_auth_type_display(),self.username,self.password)

    class Meta:
        unique_together = ('username','password')


class HostUserBind(models.Model):
    """綁定主機和用戶"""
    host = models.ForeignKey("Host")
    host_user = models.ForeignKey("HostUser")

    def __str__(self):
        return "%s-%s" %(self.host,self.host_user)

    class Meta:
        unique_together = ('host','host_user')


class AuditLog(models.Model):
    """審計日誌"""
    session = models.ForeignKey("SessionLog")
    cmd = models.TextField()
    date = models.DateTimeField(auto_now_add=True)
    def __str__(self):
        return "%s-%s" %(self.session,self.cmd)


class SessionLog(models.Model):
    account = models.ForeignKey("Account")
    host_user_bind = models.ForeignKey("HostUserBind")
    start_date = models.DateTimeField(auto_now_add=True)
    end_date = models.DateTimeField(blank=True,null=True)

    def __str__(self):
        return "%s-%s" %(self.account,self.host_user_bind)


class Account(models.Model):
    """堡壘機帳戶
    1. 擴展
    2. 繼承
    user.account.host_user_bind
    """

    user = models.OneToOneField(User)
    name = models.CharField(max_length=64)

    host_user_binds = models.ManyToManyField("HostUserBind",blank=True)
    host_groups = models.ManyToManyField("HostGroup",blank=True)
model.py

 

__author__ = 'Administrator'
import subprocess,random,string
from django.contrib.auth import authenticate
from django.conf import settings 
from audit import models
from audit.backend import ssh_interactive 

class UserShell(object):
    """用戶登陸堡壘機後的shell"""

    def __init__(self,sys_argv):
        self.sys_argv = sys_argv
        self.user = None

    def auth(self):

        count = 0
        while count < 3:
            username = input("username:").strip()
            password = input("password:").strip()
            user = authenticate(username=username,password=password)
            #None 表明認證不成功
            #user object ,認證對象 ,user.name
            if not user:
                count += 1
                print("Invalid username or password!")
            else:
                self.user = user
                return  True
        else:
            print("too many attempts.")

    def start(self):
        """啓動交互程序"""

        if self.auth():
            #print(self.user.account.host_user_binds.all()) #select_related()
            while True:
                host_groups = self.user.account.host_groups.all()
                for index,group in enumerate(host_groups):
                    print("%s.\t%s[%s]"%(index,group,group.host_user_binds.count()))
                print("%s.\t未分組機器[%s]"%(len(host_groups),self.user.account.host_user_binds.count()))
                try:
                    choice = input("select group>:").strip()
                    if choice.isdigit():
                        choice = int(choice)
                        host_bind_list = None
                        if choice >=0 and choice < len(host_groups):
                            selected_group = host_groups[choice]
                            host_bind_list = selected_group.host_user_binds.all()
                        elif choice == len(host_groups): #選擇的未分組機器
                            #selected_group = self.user.account.host_user_binds.all()
                            host_bind_list = self.user.account.host_user_binds.all()
                        if host_bind_list:
                            while True:
                                for index,host in enumerate(host_bind_list):
                                    print("%s.\t%s"%(index,host,))
                                choice2 = input("select host>:").strip()
                                if choice2.isdigit():
                                    choice2 = int(choice2)
                                    if choice2 >=0 and choice2 < len(host_bind_list):
                                        selected_host = host_bind_list[choice2]

                                        ssh_interactive.ssh_session(selected_host,self.user)


                                        # s = string.ascii_lowercase +string.digits
                                        # random_tag = ''.join(random.sample(s,10))
                                        # session_obj = models.SessionLog.objects.create(account=self.user.account,host_user_bind=selected_host)
                                        #
                                        # cmd = "sshpass -p %s /usr/local/openssh/bin/ssh %s@%s -p %s -o StrictHostKeyChecking=no -Z %s" %(selected_host.host_user.password,selected_host.host_user.username,selected_host.host.ip_addr,selected_host.host.port ,random_tag)
                                        # #start strace ,and sleep 1 random_tag, session_obj.id
                                        # session_tracker_script = "/bin/sh %s %s %s " %(settings.SESSION_TRACKER_SCRIPT,random_tag,session_obj.id)
                                        #
                                        # session_tracker_obj =subprocess.Popen(session_tracker_script, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
                                        #
                                        # ssh_channel = subprocess.run(cmd,shell=True)
                                        # print(session_tracker_obj.stdout.read(), session_tracker_obj.stderr.read())
                                        #
                                elif choice2 == 'b':
                                    break

                except KeyboardInterrupt as e :
                    pass
user_interactive.py
#!/usr/bin/env python

# Copyright (C) 2003-2007  Robey Pointer <robeypointer@gmail.com>
#
# This file is part of paramiko.
#
# Paramiko is free software; you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 2.1 of the License, or (at your option)
# any later version.
#
# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.


import base64
from binascii import hexlify
import getpass
import os
import select
import socket
import sys
import time
import traceback
from paramiko.py3compat import input
from audit import models
import paramiko

try:
    import interactive
except ImportError:
    from . import interactive


def manual_auth(t, username, password):
    # default_auth = 'p'
    # auth = input('Auth by (p)assword, (r)sa key, or (d)ss key? [%s] ' % default_auth)
    # if len(auth) == 0:
    #     auth = default_auth
    #
    # if auth == 'r':
    #     default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_rsa')
    #     path = input('RSA key [%s]: ' % default_path)
    #     if len(path) == 0:
    #         path = default_path
    #     try:
    #         key = paramiko.RSAKey.from_private_key_file(path)
    #     except paramiko.PasswordRequiredException:
    #         password = getpass.getpass('RSA key password: ')
    #         key = paramiko.RSAKey.from_private_key_file(path, password)
    #     t.auth_publickey(username, key)
    # elif auth == 'd':
    #     default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_dsa')
    #     path = input('DSS key [%s]: ' % default_path)
    #     if len(path) == 0:
    #         path = default_path
    #     try:
    #         key = paramiko.DSSKey.from_private_key_file(path)
    #     except paramiko.PasswordRequiredException:
    #         password = getpass.getpass('DSS key password: ')
    #         key = paramiko.DSSKey.from_private_key_file(path, password)
    #     t.auth_publickey(username, key)
    # else:
    # pw = getpass.getpass('Password for %s@%s: ' % (username, hostname))
    t.auth_password(username, password)


def ssh_session(bind_host_user, user_obj):
    # now connect
    hostname = bind_host_user.host.ip_addr #自動輸入 主機名
    port = bind_host_user.host.port        #端口
    username = bind_host_user.host_user.username
    password = bind_host_user.host_user.password

    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #生成socket鏈接
        sock.connect((hostname, port))
    except Exception as e:
        print('*** Connect failed: ' + str(e))
        traceback.print_exc()
        sys.exit(1)

    try:
        t = paramiko.Transport(sock) #使用paramiko的方法去鏈接服務器執行命令!
        try:
            t.start_client()
        except paramiko.SSHException:
            print('*** SSH negotiation failed.')
            sys.exit(1)

        try:
            keys = paramiko.util.load_host_keys(os.path.expanduser('~/.ssh/known_hosts'))
        except IOError:
            try:
                keys = paramiko.util.load_host_keys(os.path.expanduser('~/ssh/known_hosts'))
            except IOError:
                print('*** Unable to open host keys file')
                keys = {}

        # check server's host key -- this is important.
        key = t.get_remote_server_key()
        if hostname not in keys:
            print('*** WARNING: Unknown host key!')
        elif key.get_name() not in keys[hostname]:
            print('*** WARNING: Unknown host key!')
        elif keys[hostname][key.get_name()] != key:
            print('*** WARNING: Host key has changed!!!')
            sys.exit(1)
        else:
            print('*** Host key OK.')

        if not t.is_authenticated():
            manual_auth(t, username, password) #密碼校驗
        if not t.is_authenticated():
            print('*** Authentication failed. :(')
            t.close()
            sys.exit(1)

        chan = t.open_session()
        chan.get_pty()  # terminal
        chan.invoke_shell()
        print('*** Here we go!\n')

        session_obj = models.SessionLog.objects.create(account=user_obj.account,
                                                       host_user_bind=bind_host_user)
        interactive.interactive_shell(chan, session_obj)#開始進入交換模式·
        chan.close()
        t.close()

    except Exception as e:
        print('*** Caught exception: ' + str(e.__class__) + ': ' + str(e))
        traceback.print_exc()
        try:
            t.close()
        except:
            pass
        sys.exit(1)
ssh_interactive.py
# Copyright (C) 2003-2007  Robey Pointer <robeypointer@gmail.com>
#
# This file is part of paramiko.
#
# Paramiko is free software; you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 2.1 of the License, or (at your option)
# any later version.
#
# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.


import socket
import sys
from paramiko.py3compat import u
from audit import models
# windows does not have termios...
try:
    import termios
    import tty
    has_termios = True
except ImportError:
    has_termios = False


def interactive_shell(chan,session_obj):
    if has_termios: #
        posix_shell(chan,session_obj) #unix 通用協議標準
    else:
        windows_shell(chan)


def posix_shell(chan,session_obj):
    import select
    
    oldtty = termios.tcgetattr(sys.stdin)
    try:
        tty.setraw(sys.stdin.fileno())
        tty.setcbreak(sys.stdin.fileno())
        chan.settimeout(0.0)
        flag = False
        cmd = ''
        while True: #開始輸入命令
            r, w, e = select.select([chan, sys.stdin], [], []) #循環檢測 輸入、輸出、錯誤,有反應就返回,沒有就一直夯住!

            if chan in r:#遠程 由返回 命令結果
                try:
                    x = u(chan.recv(1024))
                    if len(x) == 0:
                        sys.stdout.write('\r\n*** EOF\r\n')
                        break
                    if flag: #若是用戶輸入的Tab補全,服務器端返回
                        cmd += x
                        flag = False
                    sys.stdout.write(x)
                    sys.stdout.flush()
                except socket.timeout:
                    pass


            if sys.stdin in r: #本地輸入
                x = sys.stdin.read(1) #輸入1個字符就發送遠程服務器
                if len(x) == 0:
                    break
                if x == '\r': #回車·
                    models.AuditLog.objects.create(session=session_obj,cmd=cmd)
                    cmd = ''
                elif x == '\t':#tab 本地1個字符+遠程返回的
                    flag = True
                else:
                    cmd += x
                chan.send(x) #發送本地輸入 到遠程服務器

    finally:
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)

    
# thanks to Mike Looijmans for this code
def windows_shell(chan):
    import threading

    sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n")
        
    def writeall(sock):
        while True:
            data = sock.recv(256)
            if not data:
                sys.stdout.write('\r\n*** EOF ***\r\n\r\n')
                sys.stdout.flush()
                break
            sys.stdout.write(data)
            sys.stdout.flush()
        
    writer = threading.Thread(target=writeall, args=(chan,))
    writer.start()
        
    try:
        while True:
            d = sys.stdin.read(1)
            if not d:
                break
            chan.send(d)
    except EOFError:
        # user hit ^Z or F6
        pass
interactive.py

 

 

 程序流程:用戶界面---------->ssh自動輸入用戶&登陸密碼---------->進入shell命令交互模式

 

知識點:

1對1:      1個 對應  1個   (1個女人嫁給了1個男人,生活慢慢平淡下來,)

1對多:      1個 對應  N個   (這個女人隱瞞丈夫相繼出軌了N個男人,這個男人發現老婆出軌了,很憤懣)

多對多:     雙方都存在1對多關係 (也相繼找了N個女情人,而這些女情人中就有他老婆出軌男人的老婆,故事結束。)

 

感悟:

這個故事很混亂! 怎麼設計男、女表結構?  其實在作數據庫表關係設計的時候,糾結2張表到底須要設計成什麼關係?到不如加幾張關係綁定表!

徹底是出於  你的程序在容許的過程當中到底 要向用戶展現什麼信息? 而決定的!

 

 

web頁面使用堡壘機方式:

web開發模式

1.MTV/MVC 先後端雜交模式;(面向公司內部OA)

優點:簡單,一人全棧;

缺陷:先後端耦合性高,性能低、單點壓力

 

2.先後端分離(面向大衆用戶)

優點:前、後端開發人員商定好接口和數據格式,並行開發,效率高;解決了後端獨自渲染模板的壓力;

缺陷:招前端得花錢

 

3.hostlist 展現主機組和主機

  <div class="panel col-lg-3">
            <div class="panel-heading">
                <h3 class="panel-title">主機組</h3>
            </div>
            <div class="panel-body">
                <ul class="list-group">
                {% for group in  request.user.account.host_groups.all %}

                    <li class="list-group-item " onclick="GetHostlist({{ group.id }},this)"><span class="badge badge-success">{{ group.host_user_binds.count }}</span>{{ group.name }}</li>
                {% endfor %}
                    <li class="list-group-item " onclick="GetHostlist(-1,this)"> <span class="badge badge-success">{{ request.user.account.host_user_binds.count }}</span>未分組主機</li>

                </ul>
            </div>
        </div>
在標籤上綁定事件
<script>

function GetHostlist(gid,self) {

    $.get("{% url 'get_host_list' %}",{'gid':gid},function(callback){

        var data  = JSON.parse(callback);
        console.log(data)
        var trs = ''
        $.each(data,function (index,i) {
            var tr = "<tr><td>" + i.host__hostname + "</td><td>" + i.host__ip_addr +"</td><td>" + i.host__idc__name
                    +"</td><td>" + i.host__port  + "</td><td>" + i.host_user__username+ "</td><td>Login</td></tr>";
            trs += tr

        })
        $("#hostlist").html(trs);



    });//end get
    $(self).addClass("active").siblings().removeClass('active');

}

</script>
經過ajax向後端請求數據

 

知識點:

若是給標籤綁定事件,須要傳參數,能夠直接在標籤直接綁定。

url(r'^get_tocken$', views.get_tocken, name="get_tocken"),
Django路由別名
function GetToken(self,bind_host_id) {
    $.post(
        '{% url "get_tocken" %}',     //經過url別名渲染url
        {'bind_host_id':bind_host_id,'csrfmiddlewaretoken':"{{ csrf_token }}"},//請求攜帶的參數
        function (callback) {          //回調函數
            console.log(callback)
        }

        )
}
Django模板語言
@login_required
def get_token(request):
    bind_host_id=request.POST.get('bind_host_id')
    time_obj = datetime.datetime.now() - datetime.timedelta(seconds=300)  # 5mins ago
    exist_token_objs = models.Token.objects.filter(account_id=request.user.account.id,
                                                   host_user_bind_id=bind_host_id,
                                                   date__gt=time_obj)
    if exist_token_objs:  # has token already
        token_data = {'token': exist_token_objs[0].val}
    else:
        token_val=''.join(random.sample(string.ascii_lowercase+string.digits,8))

        token_obj=models.Token.objects.create(
            host_user_bind_id=bind_host_id,
            account=request.user.account,
            val=token_val)
        token_data={"token":token_val}

    return HttpResponse(json.dumps(token_data))
生成5分鐘以內生效的token

 

 

 

 

4.點擊主機登陸,經過Shellinabox 插件以web頁面的形式遠程登陸Linux主機;

 4.0 安裝sehllinabox

yum install git openssl-devel pam-devel zlib-devel autoconf automake libtool

git clone https://github.com/shellinabox/shellinabox.git && cd shellinabox

autoreconf -i

./configure && make

make install

shellinaboxd -b -t  //-b選項表明在後臺啓動,-t選項表示不使用https方式啓動,默認以nobody用戶身份,監聽TCP4200端口

netstat -ntpl |grep shell

 

5.django結合sehll inabox

 

5.1:用戶在Django的hostlist頁面點擊生成tocken(綁定了account+host_bind_user),記錄到數據庫。

5.2: 用戶在Django的hostlist頁面 login跳轉至 sehll inabox因爲修改了bashrc跳轉以後,就會執行python用戶交互程序,python用戶交互程序 提示用戶輸入 token;

5.3: 用戶輸入token以後,python 用戶交互程序去數據庫查詢token,進而查詢到host_bind_user的ip、用戶、密碼,調用paramiko的demo.py自動輸入ip、用戶、密碼進入shell交互界面;

from django.db import models
from django.contrib.auth.models import  User
# Create your models here.


class IDC(models.Model):
    name = models.CharField(max_length=64,unique=True)
    def __str__(self):
        return self.name

class Host(models.Model):
    """存儲全部主機信息"""
    hostname = models.CharField(max_length=64,unique=True)
    ip_addr = models.GenericIPAddressField(unique=True)
    port = models.IntegerField(default=22)
    idc = models.ForeignKey("IDC")
    #host_groups = models.ManyToManyField("HostGroup")
    #host_users = models.ManyToManyField("HostUser")
    enabled = models.BooleanField(default=True)

    def __str__(self):
        return "%s-%s" %(self.hostname,self.ip_addr)

class HostGroup(models.Model):
    """主機組"""
    name = models.CharField(max_length=64,unique=True)
    host_user_binds  = models.ManyToManyField("HostUserBind")
    def __str__(self):
        return self.name


class HostUser(models.Model):
    """存儲遠程主機的用戶信息
    root 123
    root abc
    root sfsfs
    """
    auth_type_choices = ((0,'ssh-password'),(1,'ssh-key'))
    auth_type = models.SmallIntegerField(choices=auth_type_choices)
    username = models.CharField(max_length=32)
    password = models.CharField(blank=True,null=True,max_length=128)

    def __str__(self):
        return "%s-%s-%s" %(self.get_auth_type_display(),self.username,self.password)

    class Meta:
        unique_together = ('username','password')


class HostUserBind(models.Model):
    """綁定主機和用戶"""
    host = models.ForeignKey("Host")
    host_user = models.ForeignKey("HostUser")

    def __str__(self):
        return "%s-%s" %(self.host,self.host_user)

    class Meta:
        unique_together = ('host','host_user')


class AuditLog(models.Model):
    """審計日誌"""
    session = models.ForeignKey("SessionLog")
    cmd = models.TextField()
    date = models.DateTimeField(auto_now_add=True)
    def __str__(self):
        return "%s-%s" %(self.session,self.cmd)


class SessionLog(models.Model):
    account = models.ForeignKey("Account")
    host_user_bind = models.ForeignKey("HostUserBind")
    start_date = models.DateTimeField(auto_now_add=True)
    end_date = models.DateTimeField(blank=True,null=True)

    def __str__(self):
        return "%s-%s" %(self.account,self.host_user_bind)


class Account(models.Model):
    """堡壘機帳戶
    1. 擴展
    2. 繼承
    user.account.host_user_bind
    """

    user = models.OneToOneField(User)
    name = models.CharField(max_length=64)

    host_user_binds = models.ManyToManyField("HostUserBind",blank=True)
    host_groups = models.ManyToManyField("HostGroup",blank=True)



class Token(models.Model):
    host_user_bind = models.ForeignKey("HostUserBind")
    val = models.CharField(max_length=128,unique=True)
    account = models.ForeignKey("Account")
    expire = models.IntegerField("超時時間(s)",default=300)
    date = models.DateTimeField(auto_now_add=True)
    def __str__(self):
        return "%s-%s" %(self.host_user_bind,self.val)
models.py
@login_required
def get_token(request):
    bind_host_id=request.POST.get('bind_host_id')
    time_obj = datetime.datetime.now() - datetime.timedelta(seconds=300)  # 5mins ago
    exist_token_objs = models.Token.objects.filter(account_id=request.user.account.id,
                                                   host_user_bind_id=bind_host_id,
                                                   date__gt=time_obj)
    if exist_token_objs:  # has token already
        token_data = {'token': exist_token_objs[0].val}
    else:
        token_val=''.join(random.sample(string.ascii_lowercase+string.digits,8))

        token_obj=models.Token.objects.create(
            host_user_bind_id=bind_host_id,
            account=request.user.account,
            val=token_val)
        token_data={"token":token_val}

    return HttpResponse(json.dumps(token_data))
View.py
{% extends 'index.html' %}



{% block content-container %}
    <div id="page-title">
        <h1 class="page-header text-overflow">主機列表</h1>

        <!--Searchbox-->
        <div class="searchbox">
            <div class="input-group custom-search-form">
                <input type="text" class="form-control" placeholder="Search..">
                <span class="input-group-btn">
                    <button class="text-muted" type="button"><i class="pli-magnifi-glass"></i></button>
                </span>
            </div>
        </div>
    </div>
    <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
    <!--End page title-->
        <!--Breadcrumb-->
    <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
    <ol class="breadcrumb">
        <li><a href="#">Home</a></li>
        <li><a href="#">Library</a></li>
        <li class="active">主機列表</li>
    </ol>
    <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
    <!--End breadcrumb-->

    <div id="page-content">

        <div class="panel col-lg-3">
            <div class="panel-heading">
                <h3 class="panel-title">主機組</h3>
            </div>
            <div class="panel-body">
                <ul class="list-group">
                {% for group in  request.user.account.host_groups.all %}

                    <li class="list-group-item " onclick="GetHostlist({{ group.id }},this)"><span class="badge badge-success">{{ group.host_user_binds.count }}</span>{{ group.name }}</li>
                {% endfor %}
                    <li class="list-group-item " onclick="GetHostlist(-1,this)"> <span class="badge badge-success">{{ request.user.account.host_user_binds.count }}</span>未分組主機</li>

                </ul>
            </div>
        </div>
        <div class="panel col-lg-9">
            <div class="panel-heading">
                <h3 class="panel-title">主機列表</h3>
            </div>
            <div class="panel-body">

                <div class="table-responsive">
                    <table class="table table-striped">
                        <thead>
                            <tr>
                                <th>Hostname</th>
                                <th>IP</th>
                                <th>IDC</th>
                                <th>Port</th>
                                <th>Username</th>
                                <th>Login</th>
                                <th>Token</th>
                            </tr>
                        </thead>
                        <tbody id="hostlist">
{#                            <tr>#}
{#                                <td><a href="#fakelink" class="btn-link">Order #53451</a></td>#}
{#                                <td>Scott S. Calabrese</td>#}
{#                                <td>$24.98</td>#}
{#                            </tr>#}

                        </tbody>
                    </table>
                </div>

            </div>
        </div>

    </div>



<script>

function GetHostlist(gid,self) {

    $.get("{% url 'get_host_list' %}",{'gid':gid},function(callback){

        var data  = JSON.parse(callback);
        console.log(data);
        var trs = '';
        $.each(data,function (index,i) {
            var tr = "<tr><td>" + i.host__hostname + "</td><td>" + i.host__ip_addr +"</td><td>" + i.host__idc__name
                    +"</td><td>" + i.host__port  + "</td><td>" + i.host_user__username+ "</td><td><a class='btn btn-sm btn-info' onclick=GetToken(this,'"+i.id +"')>Token</a><a href='http://192.168.226.135:4200/' class='btn btn-sm btn-info'')>login</a></td><td ></td></tr>";
            trs += tr

        });
        $("#hostlist").html(trs);



    });//end get
    $(self).addClass("active").siblings().removeClass('active');

}

function GetToken(self,bind_host_id) {
    $.post(
        '{% url "get_token" %}',     //經過url別名渲染url
        {'bind_host_id':bind_host_id,'csrfmiddlewaretoken':"{{ csrf_token }}"},//請求攜帶的參數
        function (callback) {          //回調函數
            console.log(callback);
            var data = JSON.parse(callback); //django響應的數據
            $(self).parent().next().text(data.token);
        }

        )
}



</script>
{% endblock %}
hostlist.html
import subprocess,random,string,datetime
from django.contrib.auth import authenticate
from django.conf import settings
from audit import models
from audit.backend import ssh_interactive

class UserShell(object):
    """用戶登陸堡壘機後的shell"""

    def __init__(self,sys_argv):
        self.sys_argv = sys_argv
        self.user = None

    def auth(self):

        count = 0
        while count < 3:
            username = input("username:").strip()
            password = input("password:").strip()
            user = authenticate(username=username,password=password)
            #None 表明認證不成功
            #user object ,認證對象 ,user.name
            if not user:
                count += 1
                print("Invalid username or password!")
            else:
                self.user = user
                return  True
        else:
            print("too many attempts.")

    def token_auth(self):
        count = 0
        while count < 3:
            user_input = input("請輸入token:").strip()
            if len(user_input) == 0:
                return
            if len(user_input) != 8:
                print("token length is 8")
            else:
                time_obj = datetime.datetime.now() - datetime.timedelta(seconds=300)  # 5mins ago
                token_obj = models.Token.objects.filter(val=user_input, date__gt=time_obj).first()
                if token_obj:
                    if token_obj.val == user_input:  # 口令對上了
                        self.user = token_obj.account.user #進入交互式shll須要用戶認證!
                        return token_obj
            count+=1
    def start(self):
        """啓動交互程序"""
        token_obj = self.token_auth()
        if token_obj:
            ssh_interactive.ssh_session(token_obj.host_user_bind, self.user)
            exit()
        if self.auth():
            #print(self.user.account.host_user_binds.all()) #select_related()
            while True:
                host_groups = self.user.account.host_groups.all()
                for index,group in enumerate(host_groups):
                    print("%s.\t%s[%s]"%(index,group,group.host_user_binds.count()))
                print("%s.\t未分組機器[%s]"%(len(host_groups),self.user.account.host_user_binds.count()))
                try:
                    choice = input("select group>:").strip()
                    if choice.isdigit():
                        choice = int(choice)
                        host_bind_list = None
                        if choice >=0 and choice < len(host_groups):
                            selected_group = host_groups[choice]
                            host_bind_list = selected_group.host_user_binds.all()
                        elif choice == len(host_groups): #選擇的未分組機器
                            #selected_group = self.user.account.host_user_binds.all()
                            host_bind_list = self.user.account.host_user_binds.all()
                        if host_bind_list:
                            while True:
                                for index,host in enumerate(host_bind_list):
                                    print("%s.\t%s"%(index,host,))
                                choice2 = input("select host>:").strip()
                                if choice2.isdigit():
                                    choice2 = int(choice2)
                                    if choice2 >=0 and choice2 < len(host_bind_list):
                                        selected_host = host_bind_list[choice2]

                                        ssh_interactive.ssh_session(selected_host,self.user)


                                        # s = string.ascii_lowercase +string.digits
                                        # random_tag = ''.join(random.sample(s,10))
                                        # session_obj = models.SessionLog.objects.create(account=self.user.account,host_user_bind=selected_host)
                                        #
                                        # cmd = "sshpass -p %s /usr/local/openssh/bin/ssh %s@%s -p %s -o StrictHostKeyChecking=no -Z %s" %(selected_host.host_user.password,selected_host.host_user.username,selected_host.host.ip_addr,selected_host.host.port ,random_tag)
                                        # #start strace ,and sleep 1 random_tag, session_obj.id
                                        # session_tracker_script = "/bin/sh %s %s %s " %(settings.SESSION_TRACKER_SCRIPT,random_tag,session_obj.id)
                                        #
                                        # session_tracker_obj =subprocess.Popen(session_tracker_script, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
                                        #
                                        # ssh_channel = subprocess.run(cmd,shell=True)
                                        # print(session_tracker_obj.stdout.read(), session_tracker_obj.stderr.read())
                                        #
                                elif choice2 == 'b':
                                    break

                except KeyboardInterrupt as e :
                    pass
user_interactive.py

 

 

 

 

 

 3、經過堡壘機批量執行Linux命令

 

 

 

1.批量執行命令前端頁面

{% extends 'index.html' %}



{% block content-container %}
{#    {% csrf_token %}#}
    <div id="page-title">
        <h1 class="page-header text-overflow">主機列表</h1>

        <!--Searchbox-->
        <div class="searchbox">
            <div class="input-group custom-search-form">
                <input type="text" class="form-control" placeholder="Search..">
                <span class="input-group-btn">
                    <button class="text-muted" type="button"><i class="pli-magnifi-glass"></i></button>
                </span>
            </div>
        </div>
    </div>
    <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
    <!--End page title-->
        <!--Breadcrumb-->
    <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
    <ol class="breadcrumb">
        <li><a href="#">Home</a></li>
        <li><a href="#">Library</a></li>
        <li class="active">主機列表</li>
    </ol>
    <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
    <!--End breadcrumb-->

    <div id="page-content">
        <div class="panel col-lg-3">
            <div class="panel-heading">
                <h3 class="panel-title">主機組 <span id="selected_hosts"></span></h3>
            </div>
            <div class="panel-body">

                <ul class="list-group" id="host_groups">
                {% for group in  request.user.account.host_groups.all %}

                    <li class="list-group-item " ><span class="badge badge-success">{{ group.host_user_binds.count }}</span>
                        <input type="checkbox" onclick="CheckAll(this)">
                        <a onclick="DisplayHostList(this)">{{ group.name }}</a>  <!--點擊組名,組名下的 主機列表經過toggleclass 展現/隱藏 -->
                        <ul class="hide">
                            {% for bind_host in group.host_user_binds.all %}
                                <li><input onclick="ShowCheckedHostCount()" type="checkbox" value="{{ bind_host.id }}">{{ bind_host.host.ip_addr }}</li>
                            {% endfor %}
                        </ul>
                    </li>

                {% endfor %}
                    <li class="list-group-item " > <span class="badge badge-success">{{ request.user.account.host_user_binds.count }}</span>
                       <input type="checkbox" onclick="CheckAll(this)">
                        <a onclick="DisplayHostList(this)">未分組主機</a>
                        <ul class="hide">
                            {% for bind_host in request.user.account.host_user_binds.all %}
                                <li><input onclick="ShowCheckedHostCount()" type="checkbox" value="{{ bind_host.id }}">{{ bind_host.host.ip_addr }}</li>
                            {% endfor %}
                        </ul>
                    </li>

                </ul>



            </div>
        </div>

        <div class="col-lg-9">
            <div class="panel">
                <div class="panel-heading">
                    <h3 class="panel-title">命令</h3>
                </div>
                <div class="panel-body">
                    <textarea class="form-control" id="cmd"></textarea>
                    <button onclick="PostTask('cmd')" class="btn btn-info pull-right">執行</button>
                    <button  class="btn btn-danger ">終止</button>

                </div>
            </div>
            <div class="panel">
                <div class="panel-heading">
                    <h3 class="panel-title">任務結果</h3>
                </div>
                <div class="panel-body">

                    <div id="task_result">
                </div>
            </div>
        </div>

        </div>
    </div>


<script>
    function  DisplayHostList(self) {
        $(self).next().toggleClass("hide");
    }

    function CheckAll(self){
        console.log($(self).prop('checked'));
        $(self).parent().find("ul :checkbox").prop('checked',$(self).prop('checked'));

        ShowCheckedHostCount()
    }

    function ShowCheckedHostCount(){
        var selected_host_count = $("#host_groups ul").find(":checked").length;
        console.log(selected_host_count);
        $("#selected_hosts").text(selected_host_count);
        return selected_host_count
    }


{#    function GetTaskResult(task_id) {#}
{#        $.getJSON("{% url 'get_task_result' %}",{'task_id':task_id},function(callback){#}
{##}
{#            console.log(callback);#}
{##}
{#            var result_ele = '';#}
{#            $.each(callback,function (index,i) {#}
{#                var p_ele = "<p>" + i.host_user_bind__host__hostname + "(" +i.host_user_bind__host__ip_addr +") ------" +#}
{#                    i.status + "</p>";#}
{#                var res_ele = "<pre>" + i.result +"</pre>";#}
{##}
{#                var single_result = p_ele + res_ele;#}
{#                result_ele += single_result#}
{#            });#}
{##}
{#            $("#task_result").html(result_ele)#}
{##}
{##}
{#        });//end getJSON#}
{##}
{#    }#}


    function  PostTask(task_type) {
        //1. 驗證主機列表已選,命令已輸入
        //2. 提交任務到後臺
        var selected_host_ids = [];
        var selected_host_eles = $("#host_groups ul").find(":checked");
        $.each(selected_host_eles,function (index,ele) {
            selected_host_ids.push($(ele).val())
        });
        console.log(selected_host_ids);
        if ( selected_host_ids.length == 0){
            alert("主機未選擇!");
            return false;
        }
        var cmd_text = $.trim($("#cmd").val());
        if ( cmd_text.length == 0){
            alert("未輸入命令!");
            return false;

        }


        var task_data = {
            'task_type':task_type,
            'selected_host_ids': selected_host_ids,
            'cmd': cmd_text
        };

        $.post("{% url 'multitask' %}",{'csrfmiddlewaretoken':"{{ csrf_token }}",'task_data':JSON.stringify(task_data)},
            function(callback){
                    console.log(callback) ;// task id
                    var callback = JSON.parse(callback);

                    GetTaskResult(callback.task_id);
                    var result_timer = setInterval(function () {
                        GetTaskResult(callback.task_id)
                    },2000)


            } );//end post

    }
</script>
{% endblock %}
multi_cmd.html

 

2.前端收集批量執行的主機,經過ajax發送到後臺

@login_required
def multitask(request):
    task_obj = task_handler.Task(request)
    respose=HttpResponse(json.dumps(task_obj.errors))
    if task_obj.is_valid():      # 若是驗證成功
        result = task_obj.run()  #run()去選擇要執行的任務類型,而後經過 getattr()去執行
        respose=HttpResponse(json.dumps({'task_id':result})) #返回數據庫pk task_id

    return respose
views.py

 

3.後端經過is_valid方法驗證數據的合法性

 

4.驗證失敗響應前端self.errors信息,驗證成功執行run()選擇任務類型;

 

5.選擇任務類型(cmd/files_transfer)以後初始化數據庫(更新Task、TaskLog表數據)

 

6.cmd/files_transfer方法開啓新進程(multitask_execute.py)新進程開啓進程池 去執行批量命令;

 

7.前端使用定時器不斷去後臺獲取數據;

 

8.程序中斷按鈕

"""
Django settings for zhanggen_audit project.

Generated by 'django-admin startproject' using Django 1.11.4.

For more information on this file, see
https://docs.djangoproject.com/en/1.11/topics/settings/

For the full list of settings and their values, see
https://docs.djangoproject.com/en/1.11/ref/settings/
"""

import os

# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))


# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/1.11/howto/deployment/checklist/

# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = '5ivlngau4a@_3y4vizrcxnnj(&vz2en#edpq%i&jr%99-xxv)&'

# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True

ALLOWED_HOSTS = ['*']


# Application definition

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'audit.apps.AuditConfig',
]

MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
]

ROOT_URLCONF = 'zhanggen_audit.urls'

TEMPLATES = [
    {
        'BACKEND': 'django.template.backends.django.DjangoTemplates',
        'DIRS': [os.path.join(BASE_DIR,  'templates'),],
        'APP_DIRS': True,
        'OPTIONS': {
            'context_processors': [
                'django.template.context_processors.debug',
                'django.template.context_processors.request',
                'django.contrib.auth.context_processors.auth',
                'django.contrib.messages.context_processors.messages',
            ],
        },
    },
]

WSGI_APPLICATION = 'zhanggen_audit.wsgi.application'


# Database
# https://docs.djangoproject.com/en/1.11/ref/settings/#databases

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
    }
}


# Password validation
# https://docs.djangoproject.com/en/1.11/ref/settings/#auth-password-validators

AUTH_PASSWORD_VALIDATORS = [
    {
        'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
    },
]


# Internationalization
# https://docs.djangoproject.com/en/1.11/topics/i18n/

LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'Asia/Shanghai'

USE_I18N = True

USE_L10N = True

USE_TZ = True


# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/1.11/howto/static-files/


STATIC_URL = '/static/'
STATICFILES_DIRS=(
os.path.join(BASE_DIR,'static'),
)


SESSION_TRACKER_SCRIPT=os.path.join(BASE_DIR,'audit%sbackend%ssession_check.sh')%(os.sep,os.sep) 
SESSION_TRACKER_SCRIPT_LOG_PATH=os.path.join(BASE_DIR,'log')#日誌路徑
MULTI_TASK_SCRIPT = os.path.join(BASE_DIR,'multitask_execute.py') #腳本路徑
 
CURRENT_PGID=None #進程的 pgid
settings.py
"""zhanggen_audit URL Configuration

The `urlpatterns` list routes URLs to views. For more information please see:
    https://docs.djangoproject.com/en/1.11/topics/http/urls/
Examples:
Function views
    1. Add an import:  from my_app import views
    2. Add a URL to urlpatterns:  url(r'^$', views.home, name='home')
Class-based views
    1. Add an import:  from other_app.views import Home
    2. Add a URL to urlpatterns:  url(r'^$', Home.as_view(), name='home')
Including another URLconf
    1. Import the include() function: from django.conf.urls import url, include
    2. Add a URL to urlpatterns:  url(r'^blog/', include('blog.urls'))
"""
from django.conf.urls import url
from django.contrib import admin
from audit import views

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^$', views.index ),
    url(r'^login/$', views.acc_login ),
    url(r'^logout/$', views.acc_logout ),
    url(r'^hostlist/$', views.host_list ,name="host_list"),
    url(r'^multitask/$', views.multitask ,name="multitask"),
    url(r'^multitask/result/$', views.multitask_result ,name="get_task_result"),
    url(r'^multitask/cmd/$', views.multi_cmd ,name="multi_cmd"),
    url(r'^multitask/file_transfer/$', views.multi_file_transfer ,name="multi_file_transfer"),
    url(r'^api/hostlist/$', views.get_host_list ,name="get_host_list"),
    url(r'^api/token/$', views.get_token ,name="get_token"),
    url(r'^api/task/file_upload/$', views.task_file_upload ,name="task_file_upload"),
    url(r'^api/task/file_download/$', views.task_file_download ,name="task_file_download"),
    url(r'^end_cmd/$', views.end_cmd,name="end_cmd"),

]
urls.py
from django.shortcuts import render,redirect,HttpResponse
from django.contrib.auth import authenticate,login,logout
from django.contrib.auth.decorators import login_required
from django.views.decorators.csrf import csrf_exempt
from django.conf import settings
import signal

import json,os
from audit import models
import random,string
import datetime
from audit import task_handler
from django import conf
import zipfile
from wsgiref.util import FileWrapper #from django.core.servers.basehttp import FileWrapper

@login_required
def index(request):
    return render(request,'index.html')



def acc_login(request):
    error = ''
    if request.method == "POST":
        username = request.POST.get('username')
        password = request.POST.get('password')
        user = authenticate(username=username,password=password)
        if user:
            login(request, user)
            return  redirect(request.GET.get('next') or  '/')
        else:
            error = "Wrong username or password!"
    return render(request,'login.html',{'error':error })


@login_required
def acc_logout(request):
    logout(request)

    return  redirect('/login/')

@login_required
def host_list(request):

    return render(request,'hostlist.html')


@login_required
def get_host_list(request):
    gid = request.GET.get('gid')
    if gid:
        if gid == '-1':#未分組
            host_list = request.user.account.host_user_binds.all()
        else:
            group_obj = request.user.account.host_groups.get(id=gid)
            host_list = group_obj.host_user_binds.all()

        data = json.dumps(list(host_list.values('id','host__hostname','host__ip_addr','host__idc__name','host__port',
                                'host_user__username')))
        return HttpResponse(data)

@login_required
def get_token(request):
    bind_host_id=request.POST.get('bind_host_id')
    time_obj = datetime.datetime.now() - datetime.timedelta(seconds=300)  # 5mins ago
    exist_token_objs = models.Token.objects.filter(account_id=request.user.account.id,
                                                   host_user_bind_id=bind_host_id,
                                                   date__gt=time_obj)
    if exist_token_objs:  # has token already
        token_data = {'token': exist_token_objs[0].val}
    else:
        token_val=''.join(random.sample(string.ascii_lowercase+string.digits,8))

        token_obj=models.Token.objects.create(
            host_user_bind_id=bind_host_id,
            account=request.user.account,
            val=token_val)
        token_data={"token":token_val}

    return HttpResponse(json.dumps(token_data))



@login_required
def multi_cmd(request):
    """多命令執行頁面"""
    return render(request,'multi_cmd.html')


@login_required
def multitask(request):
    task_obj = task_handler.Task(request)
    respose=HttpResponse(json.dumps(task_obj.errors))
    if task_obj.is_valid():      # 若是驗證成功
        task_obj = task_obj.run()  #run()去選擇要執行的任務類型,而後經過 getattr()去執行
        respose=HttpResponse(json.dumps({'task_id':task_obj.id,'timeout':task_obj.timeout})) #返回數據庫pk task_id

    return respose


@login_required
def multitask_result(request):
    """多任務結果"""
    task_id = request.GET.get('task_id')
    # [ {
    #     'task_log_id':23.
    #     'hostname':
    #     'ipaddr'
    #     'username'
    #     'status'
    # } ]


    task_obj = models.Task.objects.get(id=task_id)

    results = list(task_obj.tasklog_set.values('id','status',
                                'host_user_bind__host__hostname',
                                'host_user_bind__host__ip_addr',
                                'result'
                                ))

    return HttpResponse(json.dumps(results))





@login_required
def multi_file_transfer(request):
    random_str = ''.join(random.sample(string.ascii_lowercase + string.digits, 8))
    #return render(request,'multi_file_transfer.html',{'random_str':random_str})
    return render(request,'multi_file_transfer.html',locals())

@login_required
@csrf_exempt
def task_file_upload(request):
    random_str = request.GET.get('random_str')
    upload_to = "%s/%s/%s" %(conf.settings.FILE_UPLOADS,request.user.account.id,random_str)
    if not os.path.isdir(upload_to):
        os.makedirs(upload_to,exist_ok=True)

    file_obj = request.FILES.get('file')
    f = open("%s/%s"%(upload_to,file_obj.name),'wb')
    for chunk in file_obj.chunks():
        f.write(chunk)
    f.close()
    print(file_obj)

    return HttpResponse(json.dumps({'status':0}))




def send_zipfile(request,task_id,file_path):
    """
    Create a ZIP file on disk and transmit it in chunks of 8KB,
    without loading the whole file into memory. A similar approach can
    be used for large dynamic PDF files.
    """
    zip_file_name = 'task_id_%s_files' % task_id
    archive = zipfile.ZipFile(zip_file_name , 'w', zipfile.ZIP_DEFLATED)
    file_list = os.listdir(file_path)
    for filename in file_list:
        archive.write('%s/%s' %(file_path,filename),arcname=filename)
    archive.close()


    wrapper = FileWrapper(open(zip_file_name,'rb'))
    response = HttpResponse(wrapper, content_type='application/zip')
    response['Content-Disposition'] = 'attachment; filename=%s.zip' % zip_file_name
    response['Content-Length'] = os.path.getsize(zip_file_name)
    #temp.seek(0)
    return response

@login_required
def task_file_download(request):
    task_id = request.GET.get('task_id')
    print(task_id)
    task_file_path = "%s/%s"%( conf.settings.FILE_DOWNLOADS,task_id)
    return send_zipfile(request,task_id,task_file_path)


def end_cmd(request):
    current_task_pgid=settings.CURRENT_PGID
    os.killpg(current_task_pgid,signal.SIGKILL)
    return HttpResponse(current_task_pgid)
views.py
import json,subprocess,os,signal
from audit import models
from django.conf import settings
from django.db.transaction import atomic
class Task(object):
    '''  '''
    def __init__(self,request):
        self.request=request
        self.errors=[]
        self.task_data=None

    def is_valid(self):
        task_data=self.request.POST.get('task_data')#{"task_type":"cmd","selected_host_ids":["1","2"],"cmd":"DF"}
        if task_data:
            self.task_data=json.loads(task_data)
            self.task_type=self.task_data.get('task_type')
            if self.task_type == 'cmd':
                selected_host_ids=self.task_data.get('selected_host_ids')
                if selected_host_ids:
                    return True
                self.errors.append({'invalid_argument': '命令/主機不存在'})

            elif self.task_type == 'files_transfer':
                selected_host_ids =self.task_data.get('selected_host_ids')
                pass
                #驗證文件路徑


            else:
                self.errors.append({'invalid_argument': '不支持的任務類型!'})
        self.errors.append({'invalid_data': 'task_data不存在!'})

    def run(self):
        task_func = getattr(self, self.task_data.get('task_type'))  #
        task_obj = task_func() #調用執行命令
        print(task_obj.pk)  # 100 #這裏是任務id是自增的
        return task_obj


    @atomic #事物操做 任務信息和 子任務都要同時建立完成!
    def cmd(self):
        task_obj=models.Task.objects.create(
            task_type=0,
            account=self.request.user.account,
            content=self.task_data.get('cmd'),
        ) #1.增長批量任務信息,並返回批量任務信息的 pk


        tasklog_objs=[] #2.增長子任務信息(初始化數據庫)
        host_ids = set(self.task_data.get("selected_host_ids"))  # 獲取選中的主機id,並用集合去重
        for host_id in host_ids:
            tasklog_objs.append(models.TaskLog(task_id=task_obj.id,
                               host_user_bind_id=host_id,
                               status = 3))
        models.TaskLog.objects.bulk_create(tasklog_objs,100)  # 沒100條記錄 commit 1次!

        task_id=task_obj.pk
        cmd_str = "python3 %s %s" % (settings.MULTI_TASK_SCRIPT,task_id)  # 執行multitask.py腳本路徑
        print('------------------>',cmd_str)
        multitask_obj = subprocess.Popen(cmd_str,stdout=subprocess.PIPE,shell=True,stderr=subprocess.PIPE) #新打開1個新進程
        settings.CURRENT_PGID=os.getpgid(multitask_obj.pid) #os.getpgid(multitask_obj.pid)

        # os.killpg(pgid=pgid,sig=signal.SIGKILL)

        # print(multitask_obj.stderr.read().decode('utf-8') or multitask_obj.stdout.read().decode('utf-8'))
        #print("task result :",multitask_obj.stdout.read().decode('utf-8'),multitask_obj.stderr.read().decode('utf-8'))
        # print(multitask_obj.stdout.read())

        # for host_id in self.task_data.get('selected_host_ids'):
        #     t=Thread(target=self.run_cmd,args=(host_id,self.task_data.get('cmd')))
        #     t.start()

        return task_obj

    def run_cmd(self,host_id,cmd):
        pass

    def files_transfer(self):
        pass
task_handler.py
{% extends 'index.html' %}



{% block content-container %}
    {#    {% csrf_token %}#}
    <div id="page-title">
        <h1 class="page-header text-overflow">主機列表</h1>

        <!--Searchbox-->
        <div class="searchbox">
            <div class="input-group custom-search-form">
                <input type="text" class="form-control" placeholder="Search..">
                <span class="input-group-btn">
                    <button class="text-muted" type="button"><i class="pli-magnifi-glass"></i></button>
                </span>
            </div>
        </div>
    </div>
    <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
    <!--End page title-->
    <!--Breadcrumb-->
    <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
    <ol class="breadcrumb">
        <li><a href="#">Home</a></li>
        <li><a href="#">Library</a></li>
        <li class="active">主機列表</li>
    </ol>
    <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
    <!--End breadcrumb-->

    <div id="page-content">
        <div class="panel col-lg-3">
            <div class="panel-heading">
                <h3 class="panel-title">主機組 <span id="selected_hosts"></span></h3>
            </div>
            <div class="panel-body">

                <ul class="list-group" id="host_groups">
                    {% for group in  request.user.account.host_groups.all %}

                        <li class="list-group-item "><span
                                class="badge badge-success">{{ group.host_user_binds.count }}</span>
                            <input type="checkbox" onclick="CheckAll(this)">
                            <a onclick="DisplayHostList(this)">{{ group.name }}</a>
                            <!--點擊組名,組名下的 主機列表經過toggleclass 展現/隱藏 -->
                            <ul class="hide">
                                {% for bind_host in group.host_user_binds.all %}
                                    <li><input onclick="ShowCheckedHostCount()" type="checkbox"
                                               value="{{ bind_host.id }}">{{ bind_host.host.ip_addr }}</li>
                                {% endfor %}
                            </ul>
                        </li>

                    {% endfor %}
                    <li class="list-group-item "><span
                            class="badge badge-success">{{ request.user.account.host_user_binds.count }}</span>
                        <input type="checkbox" onclick="CheckAll(this)">
                        <a onclick="DisplayHostList(this)">未分組主機</a>
                        <ul class="hide">
                            {% for bind_host in request.user.account.host_user_binds.all %}
                                <li><input onclick="ShowCheckedHostCount()" type="checkbox"
                                           value="{{ bind_host.id }}">{{ bind_host.host.ip_addr }}</li>
                            {% endfor %}
                        </ul>
                    </li>

                </ul>


            </div>
        </div>

        <div class="col-lg-9">
            <div class="panel">
                <div class="panel-heading">
                    <h3 class="panel-title">命令</h3>
                </div>
                <div class="panel-body">
                    <textarea class="form-control" id="cmd"></textarea>
                    <button onclick="PostTask('cmd')" class="btn btn-info pull-right">執行</button>
                    <button class="btn btn-danger" onclick="End()">終止</button>

                </div>

            </div>

            <div id="task_result_panel" class="panel">
                <div class="panel-heading">
                    <h3 class="panel-title">任務結果</h3>
                </div>
                <div class="panel-body">
                    <div class="progress">
                        <div id='task_progress' style="width: 0%;" class="progress-bar progress-bar-info"></div>
                    </div>
                    <div id="task_result"></div>

                </div>
            </div>

        </div>


        <script>
            function DisplayHostList(self) {
                $(self).next().toggleClass("hide");
            }

            function CheckAll(self) {
                console.log($(self).prop('checked'));
                $(self).parent().find("ul :checkbox").prop('checked', $(self).prop('checked'));

                ShowCheckedHostCount()
            }

            function ShowCheckedHostCount() {
                var selected_host_count = $("#host_groups ul").find(":checked").length;
                console.log(selected_host_count);
                $("#selected_hosts").text(selected_host_count);
                return selected_host_count
            }


            function GetTaskResult(task_id, task_timeout) {
                $.getJSON("{% url 'get_task_result' %}", {'task_id': task_id}, function (callback) {
                        console.log(callback);
                        var result_ele = '';
                        var all_task_finished = true;   //所有完成flag
                        var finished_task_count = 0;   //已完成的任務數量
                        $.each(callback, function (index, i) {
                            var p_ele = "<p>" + i.host_user_bind__host__hostname + "(" + i.host_user_bind__host__ip_addr + ") ------" +
                                i.status + "</p>";
                            var res_ele = "<pre>" + i.result + "</pre>"; //<pre> 標籤按後端格式顯示數據

                            var single_result = p_ele + res_ele;
                            result_ele += single_result;

                            if (i.status == 3) {
                                all_task_finished = false;
                            } else {
                                //task not finished yet
                                finished_task_count += 1;

                            }

                        });

                        if (task_timeout_counter < task_timeout) {
                            task_timeout_counter += 2;
                        }
                        else {
                            all_task_finished = true
                        }
                        if (all_task_finished) {   //完成!

                            clearInterval(result_timer);
                             var unexecuted =callback.length-finished_task_count;
                            $.niftyNoty({   //提示超時
                                type: 'danger',
                                container: '#task_result_panel',
                                html: '<h4 id="Prompt">'+'執行:'+callback.length +'  '+ '完成:'+finished_task_count+'  '+'失敗:'+ unexecuted +'</h4>',
                                closeBtn: false
                            });
                            console.log("timmer canceled....")
                        }
                        $("#task_result").html(result_ele);

                        var total_finished_percent = parseInt(finished_task_count / callback.length * 100);
                        $("#task_progress").text(total_finished_percent + "%");
                        $("#task_progress").css("width", total_finished_percent + "%");


                    }
                )
                ;//end getJSON

            }


            function PostTask(task_type) {
                //1. 驗證主機列表已選,命令已輸入
                //2. 提交任務到後臺
                $('.alert').remove();
                var selected_host_ids = [];
                var selected_host_eles = $("#host_groups ul").find(":checked");
                $.each(selected_host_eles, function (index, ele) {
                    selected_host_ids.push($(ele).val())
                });
                console.log(selected_host_ids);
                if (selected_host_ids.length == 0) {
                    alert("主機未選擇!");
                    return false;
                }
                var cmd_text = $.trim($("#cmd").val());
                if (cmd_text.length == 0) {
                    alert("未輸入命令!");
                    return false;

                }


                var task_data = {
                    'task_type': task_type,
                    'selected_host_ids': selected_host_ids,
                    'cmd': cmd_text
                };

                $.post("{% url 'multitask' %}", {
                        'csrfmiddlewaretoken': "{{ csrf_token }}",
                        'task_data': JSON.stringify(task_data)
                    },
                    function (callback) {
                        console.log(callback);// task id
                        var callback = JSON.parse(callback);

                        task_timeout_counter = 0;// add 2 during each call of GetTaskResult

                        GetTaskResult(callback.task_id, callback.timeout); //那批量任務ID 去獲取子任務的進展!那超時時間作對比

                        result_timer = setInterval(function () {
                            GetTaskResult(callback.task_id, callback.timeout)
                        }, 2000);

                        //diplay download file btn
                        $("#file-download-btn").removeClass("hide").attr('href', "{% url 'task_file_download' %}?task_id=" + callback.task_id);


                    });//end post


            }

            function End(){
                 $.getJSON("{% url 'end_cmd' %}", function (callback) {
                     console.log(callback)
                 })
            }
        </script>
{% endblock %}
multi_cmd.html
import time
import sys,os
import multiprocessing
import paramiko

def cmd_run(tasklog_id,cmd_str):
    try:
        import django
        django.setup()
        from audit import models
        tasklog_obj = models.TaskLog.objects.get(id=tasklog_id)
        print(tasklog_obj, cmd_str)
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(tasklog_obj.host_user_bind.host.ip_addr,
                    tasklog_obj.host_user_bind.host.port,
                    tasklog_obj.host_user_bind.host_user.username,
                    tasklog_obj.host_user_bind.host_user.password,
                    timeout=15) #配置超時時間15秒!
        stdin, stdout, stderr = ssh.exec_command(cmd_str)
        result = stdout.read() + stderr.read()
        print('---------%s--------' % tasklog_obj.host_user_bind)
        print(result)
        ssh.close()
        tasklog_obj.result = result or 'cmd has no result output .'#若是沒有 返回結果 /出現錯誤
        tasklog_obj.status = 0
        tasklog_obj.save()
    except Exception as e:
        print(e)

def file_transfer(bind_host_obj):
    pass


if __name__ == '__main__':
    BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    sys.path.append(BASE_DIR)
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "zhanggen_audit.settings")
    import django
    django.setup()

    from audit import models
    task_id = sys.argv[1]
    from audit import models
    task_id=int(sys.argv[1])
    # 1. 根據Taskid拿到任務對象,
    # 2. 拿到任務關聯的全部主機
    # 3.  根據任務類型調用多進程 執行不一樣的方法
    # 4 . 每一個子任務執行完畢後,本身把 子任務執行結果 寫入數據庫 TaskLog表
    task_obj = models.Task.objects.get(id=task_id)
    pool=multiprocessing.Pool(processes=10) #開啓 1個擁有10個進程的進程池


    if task_obj.task_type == 0:
        task_func=cmd_run
    else:
        task_func =file_transfer

    for task_log in task_obj.tasklog_set.all(): #查詢子任務信息,並更新子任務,進入執行階段!
        pool.apply_async(task_func,args=(task_log.pk,task_obj.content)) #開啓子進程,把子任務信息的pk、和 批量任務的命令傳進去!

    pool.close()
    pool.join()
multitask_execute.py

 

 4、經過堡壘機批量上傳和下載文件

 1.上傳本地文件至多臺服務器(批量上傳)

 

 

每次訪問批量上傳頁面上傳惟一字符串

使用filedropzone組件作批量上傳ul,並限制文件大小、個數,文件提交後端時攜帶 惟一字符串

後端生成   /固定上傳路徑/用戶ID/惟一字符串/文件的路徑,並寫入文件;(filedropzone組件把文件拖拽進去以後,自動上傳)

前端點擊執行 驗證堡壘機上的用戶上傳路徑是否合法,而後開啓多進程 分別經過paramiko去發送至遠程服務的路徑

 

"""
Django settings for zhanggen_audit project.

Generated by 'django-admin startproject' using Django 1.11.4.

For more information on this file, see
https://docs.djangoproject.com/en/1.11/topics/settings/

For the full list of settings and their values, see
https://docs.djangoproject.com/en/1.11/ref/settings/
"""

import os

# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))


# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/1.11/howto/deployment/checklist/

# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = '5ivlngau4a@_3y4vizrcxnnj(&vz2en#edpq%i&jr%99-xxv)&'

# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True

ALLOWED_HOSTS = ['*']


# Application definition

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'audit.apps.AuditConfig',
]

MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
]

ROOT_URLCONF = 'zhanggen_audit.urls'

TEMPLATES = [
    {
        'BACKEND': 'django.template.backends.django.DjangoTemplates',
        'DIRS': [os.path.join(BASE_DIR,  'templates'),],
        'APP_DIRS': True,
        'OPTIONS': {
            'context_processors': [
                'django.template.context_processors.debug',
                'django.template.context_processors.request',
                'django.contrib.auth.context_processors.auth',
                'django.contrib.messages.context_processors.messages',
            ],
        },
    },
]

WSGI_APPLICATION = 'zhanggen_audit.wsgi.application'


# Database
# https://docs.djangoproject.com/en/1.11/ref/settings/#databases

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
    }
}


# Password validation
# https://docs.djangoproject.com/en/1.11/ref/settings/#auth-password-validators

AUTH_PASSWORD_VALIDATORS = [
    {
        'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
    },
]


# Internationalization
# https://docs.djangoproject.com/en/1.11/topics/i18n/

LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'Asia/Shanghai'

USE_I18N = True

USE_L10N = True

USE_TZ = True


# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/1.11/howto/static-files/


STATIC_URL = '/static/'
STATICFILES_DIRS=(
os.path.join(BASE_DIR,'static'),
)


SESSION_TRACKER_SCRIPT=os.path.join(BASE_DIR,'audit%sbackend%ssession_check.sh')%(os.sep,os.sep)
SESSION_TRACKER_SCRIPT_LOG_PATH=os.path.join(BASE_DIR,'log')#日誌路徑
MULTI_TASK_SCRIPT = os.path.join(BASE_DIR,'multitask_execute.py') #腳本路徑

CURRENT_PGID=None #進程的 pgid
FILE_UPLOADS = os.path.join(BASE_DIR,'uploads')     #上傳文件的堡壘機路徑
FILE_DOWNLOADS = os.path.join(BASE_DIR,'downloads') #下載文件的堡壘機路徑
配置堡壘機上傳和下載文件的路徑
<script>
    function  DisplayHostList(self) {
        $(self).next().toggleClass("hide");
    }

    function CheckAll(self){
        console.log($(self).prop('checked'));
        $(self).parent().find("ul :checkbox").prop('checked',$(self).prop('checked'));

        ShowCheckedHostCount()
    }

    function ShowCheckedHostCount(){
        var selected_host_count = $("#host_groups ul").find(":checked").length
        console.log(selected_host_count);
        $("#selected_hosts").text(selected_host_count);
        return selected_host_count
    }


    function GetTaskResult(task_id,task_timeout) {
        $.getJSON("{% url 'get_task_result' %}",{'task_id':task_id},function(callback){

            console.log(callback)

            var result_ele = ''
            var all_task_finished = true
            var finished_task_count = 0 ;
            $.each(callback,function (index,i) {
                var p_ele = "<p>" + i.host_user_bind__host__hostname + "(" +i.host_user_bind__host__ip_addr +") ------" +
                    i.status + "</p>";
                var res_ele = "<pre>" + i.result +"</pre>";

                var single_result = p_ele + res_ele;
                result_ele += single_result;

                //check if ths sub task is finished.
                if ( i.status == 3){
                    all_task_finished = false;
                }else {
                    //task not finished yet
                    finished_task_count += 1;
                }

            });//end each
            //check if the task_timer_count < task_timeout, otherwise it means the task is timedout, setInterval function need to be cancelled
            if (task_timeout_counter < task_timeout){
                // not timed out yet
                task_timeout_counter += 2;

            }else {
                all_task_finished = true; // set all task to be finished ,because it 's already reached the global timeout

                $.niftyNoty({
                    type: 'danger',
                    container : '#task_result_panel',
                    html : '<h4 class="alert-title">Task timed out!</h4><p class="alert-message">The task has timed out!</p><div class="mar-top"><button type="button" class="btn btn-info" data-dismiss="noty">Close this notification</button></div>',
                    closeBtn : false
                });
            }

            if ( all_task_finished){
                clearInterval(result_timer);
                console.log("timmer canceled....")
            }


            $("#task_result").html(result_ele);
            // set progress bar
            var total_finished_percent = parseInt(finished_task_count / callback.length * 100 );
            $("#task_progress").text(total_finished_percent+"%");
            $("#task_progress").css("width",total_finished_percent +"%");
        });//end getJSON

    }


    function  PostTask(task_type) {
        //1. 驗證主機列表已選,命令已輸入
        //2. 提交任務到後臺
        var selected_host_ids = [];
        var selected_host_eles = $("#host_groups ul").find(":checked")
        $.each(selected_host_eles,function (index,ele) {
            selected_host_ids.push($(ele).val())
        });
        console.log(selected_host_ids)
        if ( selected_host_ids.length == 0){
            alert("主機未選擇!")
            return false
        }

        if ( task_type == 'cmd'){
            var cmd_text = $.trim($("#cmd").val())
            if ( cmd_text.length == 0){
                alert("未輸入命令!")
                return false

            }
        }else {
            //file_transfer
            var remote_path = $("#remote_path").val();
            if ($.trim(remote_path).length == 0){
                alert("必須輸入1個遠程路徑")
                return false
            }
        }



        var task_data = {
            'task_type':task_type,
            'selected_host_ids': selected_host_ids,
            //'cmd': cmd_text
        };
        if ( task_type == 'cmd'){
            task_data['cmd'] =  cmd_text

        }else {

            var file_transfer_type = $("select[name='transfer-type']").val();
            task_data['file_transfer_type'] = file_transfer_type;
            task_data['random_str'] = "{{ random_str }}";
            task_data['remote_path'] = $("#remote_path").val();


        }


        $.post("{% url 'multitask' %}",{'csrfmiddlewaretoken':"{{ csrf_token }}",'task_data':JSON.stringify(task_data)},
            function(callback){
                    console.log(callback) ;// task id
                    var callback = JSON.parse(callback);

                    GetTaskResult(callback.task_id,callback.timeout);
                    task_timeout_counter = 0; // add 2 during each call of GetTaskResult
                    result_timer = setInterval(function () {
                        GetTaskResult(callback.task_id,callback.timeout)
                    },2000);

                    //diplay download file btn
                    $("#file-download-btn").removeClass("hide").attr('href', "{% url 'task_file_download' %}?task_id="+callback.task_id);


            } );//end post

    }
</script>
multi_file_transfer.html
{% extends 'index.html' %}
{% block extra-css %}
    <link href="/static/plugins/dropzone/dropzone.css" rel="stylesheet">
    <script src="/static/plugins/dropzone/dropzone.js"></script>
{% endblock %}


{% block content-container %}
    <div id="page-title">
        <h1 class="page-header text-overflow">主機列表</h1>

        <!--Searchbox-->
        <div class="searchbox">
            <div class="input-group custom-search-form">
                <input type="text" class="form-control" placeholder="Search..">
                <span class="input-group-btn">
                    <button class="text-muted" type="button"><i class="pli-magnifi-glass"></i></button>
                </span>
            </div>
        </div>
    </div>
    <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
    <!--End page title-->
    <!--Breadcrumb-->
    <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
    <ol class="breadcrumb">
        <li><a href="#">Home</a></li>
        <li><a href="#">Library</a></li>
        <li class="active">主機列表</li>
    </ol>
    <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
    <!--End breadcrumb-->
    <div id="page-content">
        {% include 'components/hostgroups.html' %}
        <div class="col-lg-9">
            <div class="panel">
                <div class="panel-heading">
                    <h3 class="panel-title">文件傳輸</h3>
                </div>
                <div class="panel-body">
                    <select name="transfer-type" onchange="ToggleUploadEle(this)">
                        <option value="send">發送文件到遠程主機</option>
                        <option value="get">from遠程主機下載文件</option>
                    </select>


                    <form id="filedropzone" class="dropzone">

                    </form>
                    {#                    <input type="hidden" value="{{ random_str }}" name="random_str">#}
                    <input id="remote_path" class="form-control" type="text" placeholder="遠程路徑">

                    <button id="file_count" onclick="PostTask('file_transfer')" class="btn btn-info pull-right">執行</button>
                    <button class="btn btn-danger ">終止</button>
                    <a id="file-download-btn" class="btn btn-info hide" href="">下載任務文件到本地</a>


                </div>
            </div>
            {% include 'components/taskresult.html' %}
        </div>

    </div>
    </div>

    {% include 'components/multitask_js.html' %}
    <script>

        $('#filedropzone').dropzone({
            url: "{% url 'task_file_upload' %}?random_str={{ random_str }}", //必須填寫
            method: "post",  //也可用put
            maxFiles: 10,//一次性上傳的文件數量上限
            maxFilesize: 2, //MB
            //acceptedFiles: ".jpg,.gif,.png"//限制上傳的類型
            dictMaxFilesExceeded: "您最多隻能上傳10個文件!",
            dictFileTooBig: "文件過大上傳文件最大支持."
            /*
            init: function () {
                this.on("success", function (file) { //文件上傳成功觸發事件
                    $('#file_count').attr('file_count')
                });
            }
            */

        });
        Dropzone.autoDiscover = false;


        function ToggleUploadEle(self) {

            console.log($(self).val());
            if ($(self).val() == 'get') {
                $(self).next().addClass("hide")
            } else {
                $(self).next().removeClass('hide')
            }

        }

    </script>

{% endblock %}
multi_file_transfer.html
from django.conf.urls import url
from django.contrib import admin
from audit import views

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^$', views.index ),
    url(r'^login/$', views.acc_login ),
    url(r'^logout/$', views.acc_logout ),
    url(r'^hostlist/$', views.host_list ,name="host_list"),
    url(r'^multitask/$', views.multitask ,name="multitask"),
    url(r'^multitask/result/$', views.multitask_result ,name="get_task_result"),
    url(r'^multitask/cmd/$', views.multi_cmd ,name="multi_cmd"),
    url(r'^api/hostlist/$', views.get_host_list ,name="get_host_list"),
    url(r'^api/token/$', views.get_token ,name="get_token"),
    url(r'^multitask/file_transfer/$', views.multi_file_transfer, name="multi_file_transfer"),
    url(r'^api/task/file_upload/$', views.task_file_upload ,name="task_file_upload"),
    url(r'^api/task/file_download/$', views.task_file_download ,name="task_file_download"),
    url(r'^end_cmd/$', views.end_cmd,name="end_cmd"),

]
urls.py
import json,subprocess,os,signal
from audit import models
from django.conf import settings
from django.db.transaction import atomic
class Task(object):
    '''  '''
    def __init__(self,request):
        self.request=request
        self.errors=[]
        self.task_data=None

    def is_valid(self):
        task_data=self.request.POST.get('task_data')#{"task_type":"cmd","selected_host_ids":["1","2"],"cmd":"DF"}
        if task_data:
            self.task_data=json.loads(task_data)
            self.task_type=self.task_data.get('task_type')
            if self.task_type == 'cmd':
                selected_host_ids=self.task_data.get('selected_host_ids')
                if selected_host_ids:
                    return True
                self.errors.append({'invalid_argument': '命令/主機不存在'})

            elif self.task_type == 'file_transfer': #
                selected_host_ids =self.task_data.get('selected_host_ids')
                self.task_type = self.task_data.get('task_type')
                #驗證文件路徑
                user_id=models.Account.objects.filter(user=self.request.user).first().pk
                random_str=self.task_data.get('random_str')
                file_path=settings.FILE_UPLOADS+os.sep+str(user_id)+os.sep+random_str
                if os.path.isdir(file_path):
                    return True
                if not os.path.isdir(file_path):
                    self.errors.append({'invalid_argument': '上傳路徑失敗,請從新上傳'})
                if not selected_host_ids:
                    self.errors.append({'invalid_argument': '遠程主機不存在'})



            else:
                self.errors.append({'invalid_argument': '不支持的任務類型!'})
        self.errors.append({'invalid_data': 'task_data不存在!'})

    def run(self):
        task_func = getattr(self, self.task_data.get('task_type'))  #
        task_obj = task_func() #調用執行命令
        #print(task_obj.pk)  # 100 #這裏是任務id是自增的
        return task_obj


    @atomic #事物操做 任務信息和 子任務都要同時建立完成!
    def cmd(self):
        task_obj=models.Task.objects.create(
            task_type=0,
            account=self.request.user.account,
            content=self.task_data.get('cmd'),
        ) #1.增長批量任務信息,並返回批量任務信息的 pk


        tasklog_objs=[] #2.增長子任務信息(初始化數據庫)
        host_ids = set(self.task_data.get("selected_host_ids"))  # 獲取選中的主機id,並用集合去重
        for host_id in host_ids:
            tasklog_objs.append(models.TaskLog(task_id=task_obj.id,
                               host_user_bind_id=host_id,
                               status = 3))
        models.TaskLog.objects.bulk_create(tasklog_objs,100)  # 沒100條記錄 commit 1次!

        task_id=task_obj.pk
        cmd_str = "python %s %s" % (settings.MULTI_TASK_SCRIPT,task_id)  # 執行multitask.py腳本路徑
        print('------------------>',cmd_str)
        multitask_obj = subprocess.Popen(cmd_str,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) #新打開1個新進程
        #settings.CURRENT_PGID=os.getpgid(multitask_obj.pid) #os.getpgid(multitask_obj.pid)

        # os.killpg(pgid=pgid,sig=signal.SIGKILL)

        # print(multitask_obj.stderr.read().decode('utf-8') or multitask_obj.stdout.read().decode('utf-8'))
        #print("task result :",multitask_obj.stdout.read().decode('utf-8'),multitask_obj.stderr.read().decode('utf-8'))
        # print(multitask_obj.stdout.read())

        # for host_id in self.task_data.get('selected_host_ids'):
        #     t=Thread(target=self.run_cmd,args=(host_id,self.task_data.get('cmd')))
        #     t.start()

        return task_obj

    @atomic  # 事物操做 任務信息和 子任務都要同時建立完成!
    def file_transfer(self):
        print(self.task_data) #{'task_type': 'file_transfer', 'selected_host_ids': ['3'], 'file_transfer_type': 'send', 'random_str': 'iuon9bhm', 'remote_path': '/'}
        task_obj = models.Task.objects.create(
            task_type=1,
            account=self.request.user.account,
            content=json.dumps(self.task_data),
        )  # 1.增長批量任務信息,並返回批量任務信息的 pk

        tasklog_objs = []  # 2.增長子任務信息(初始化數據庫)
        host_ids = set(self.task_data.get("selected_host_ids"))  # 獲取選中的主機id,並用集合去重
        for host_id in host_ids:
            tasklog_objs.append(models.TaskLog(task_id=task_obj.id,
                                               host_user_bind_id=host_id,
                                               status=3))
        models.TaskLog.objects.bulk_create(tasklog_objs, 100)  # 沒100條記錄 commit 1次!

        task_id = task_obj.pk
        cmd_str = "python %s %s" % (settings.MULTI_TASK_SCRIPT, task_id)  # 執行multitask.py腳本路徑
        print('------------------>', cmd_str)
        multitask_obj = subprocess.Popen(cmd_str, shell=True, stdout=subprocess.PIPE,
                                         stderr=subprocess.PIPE)  # 新打開1個新進程

        return task_obj
task_handler.py
import time,json
import sys,os
import multiprocessing
import paramiko

def cmd_run(tasklog_id,task_obj_id,cmd_str,):
    try:
        import django
        django.setup()
        from audit import models
        tasklog_obj = models.TaskLog.objects.get(id=tasklog_id)
        print(tasklog_obj, cmd_str)
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(tasklog_obj.host_user_bind.host.ip_addr,
                    tasklog_obj.host_user_bind.host.port,
                    tasklog_obj.host_user_bind.host_user.username,
                    tasklog_obj.host_user_bind.host_user.password,
                    timeout=15) #配置超時時間15秒!
        stdin, stdout, stderr = ssh.exec_command(cmd_str)
        result = stdout.read() + stderr.read()
        print('---------%s--------' % tasklog_obj.host_user_bind)
        print(result)
        ssh.close()
        #修改子任務數據庫結果
        tasklog_obj.result = result or 'cmd has no result output .'#若是沒有 返回結果 /出現錯誤
        tasklog_obj.status = 0
        tasklog_obj.save()
    except Exception as e:
        print(e)

def file_transfer(tasklog_id,task_id,task_content):
    import django
    django.setup()
    from django.conf import settings
    from audit import models
    tasklog_obj = models.TaskLog.objects.get(id=tasklog_id)
    try:
        print('task contnt:', tasklog_obj)
        task_data = json.loads(tasklog_obj.task.content)
        t = paramiko.Transport((tasklog_obj.host_user_bind.host.ip_addr, tasklog_obj.host_user_bind.host.port))
        t.connect(username=tasklog_obj.host_user_bind.host_user.username, password=tasklog_obj.host_user_bind.host_user.password,)
        sftp = paramiko.SFTPClient.from_transport(t)

        if task_data.get('file_transfer_type') =='send':
            local_path = "%s/%s/%s" %( settings.FILE_UPLOADS,
                                       tasklog_obj.task.account.id,
                                       task_data.get('random_str'))
            print("local path",local_path)
            for file_name in os.listdir(local_path):
                sftp.put('%s/%s' %(local_path,file_name), '%s/%s'%(task_data.get('remote_path'), file_name))
            tasklog_obj.result = "send all files done..."

        else:
            # 循環到全部的機器上的指定目錄下載文件
            download_dir = "{download_base_dir}/{task_id}".format(download_base_dir=settings.FILE_DOWNLOADS,
                                                                  task_id=task_id)
            if not os.path.exists(download_dir):
                os.makedirs(download_dir,exist_ok=True)

            remote_filename = os.path.basename(task_data.get('remote_path'))
            local_path = "%s/%s.%s" %(download_dir,tasklog_obj.host_user_bind.host.ip_addr,remote_filename)
            sftp.get(task_data.get('remote_path'),local_path )
            #remote path  /tmp/test.py
            tasklog_obj.result = 'get remote file [%s] to local done' %(task_data.get('remote_path'))
        t.close()

        tasklog_obj.status = 0
        tasklog_obj.save()
        # ssh = paramiko.SSHClient()
        # ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    except Exception as e:
        print("error :",e )
        tasklog_obj.result = str(e)
        tasklog_obj.save()




if __name__ == '__main__':
    BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    sys.path.append(BASE_DIR)
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "zhanggen_audit.settings")
    import django
    django.setup()

    from audit import models
    task_id = sys.argv[1]
    from audit import models
    task_id=int(sys.argv[1])
    # 1. 根據Taskid拿到任務對象,
    # 2. 拿到任務關聯的全部主機
    # 3.  根據任務類型調用多進程 執行不一樣的方法
    # 4 . 每一個子任務執行完畢後,本身把 子任務執行結果 寫入數據庫 TaskLog表
    task_obj = models.Task.objects.get(id=task_id)

    pool=multiprocessing.Pool(processes=10) #開啓 1個擁有10個進程的進程池

    if task_obj.task_type == 0:
        task_func=cmd_run
    else:
        task_func =file_transfer

    for task_log in task_obj.tasklog_set.all(): #查詢子任務信息,並更新子任務,進入執行階段!
        pool.apply_async(task_func,args=(task_log.id,task_obj.id,task_obj.content)) #開啓子進程,把子任務信息的pk、和 批量任務的命令傳進去!

    pool.close()
    pool.join()
multitask_execute.py

 

 2.從多臺服務器上get文件至本地(批量下載)

 

 

用戶輸入遠程服務器文件路徑,堡壘機生成本地下載路徑( /下載文件路徑/task_id/ip.遠程文件名)

開啓多進程 經過paramiko下載遠程主機的文件 到堡壘機下載路徑;

任務執行完畢前端彈出 下載文件到本地按鈕 (攜帶?批量任務ID)

用戶點擊下載文件到本地 a標籤,後端獲取當前批量任務的ID,把當前批量任務下載的files,打包返回給用戶瀏覽器!

 

def send_zipfile(request,task_id,file_path):

    zip_file_name = 'task_id_%s_files' % task_id
    archive = zipfile.ZipFile(zip_file_name , 'w', zipfile.ZIP_DEFLATED) #建立1個zip 包

    file_list = os.listdir(file_path) #找到堡壘機目錄下 全部文件

    for filename in file_list:      #把全部文件寫入 zip包中!
        archive.write('%s/%s' %(file_path,filename),arcname=filename)
    archive.close()
    #-------------------------------------------------------------- #文件打包完畢!

    wrapper = FileWrapper(open(zip_file_name,'rb')) #在內存中打開 打包好的壓縮包

    response = HttpResponse(wrapper, content_type='application/zip') #修改Django的response的content_type
    response['Content-Disposition'] = 'attachment; filename=%s.zip' % zip_file_name #告訴流量器以 附件形式下載
    response['Content-Length'] = os.path.getsize(zip_file_name)               #文件大小
    #temp.seek(0)
    return response






@login_required
def task_file_download(request): #下載文件到本地
    task_id = request.GET.get('task_id')
    print(task_id)
    task_file_path = "%s/%s"%( conf.settings.FILE_DOWNLOADS,task_id)
    download_files=os.listdir(task_file_path)
    print(download_files)
    return send_zipfile(request,task_id,task_file_path) #調用打包函數
Django響應壓縮文件

 

 

3.架構描述

 

 

 

當前架構缺陷:multitask在堡壘機上開多進程,隨着用戶量的增加,開啓的進程數量也會越多;

將來設想:在Django 和 multitask之間增長隊列,實現用戶大併發!

 

 

 

GitHub:https://github.com/zhanggen3714/zhanggen_audit

GateOne安裝

相關文章
相關標籤/搜索