每一個公司的網絡環境大都劃分 辦公網絡、線上網絡,之因此劃分的主要緣由是爲了保證線上操做安全;css
對於外部用戶而言也只能訪問線上網絡的特定開放端口,那麼是什麼控制了用戶訪問線上網絡的呢?html
防火牆過濾......!前端
對於內部員工而言對線上系統平常運維、代碼部署如何安全訪問線上業務系統呢?如何監控、記錄技術人員的操做記錄?python
堡壘機策略:ios
1.回收全部遠程登陸Linux主機的用戶名、密碼;git
2.中間設置堡壘機(保存全部線上Linux主機的用戶名、密碼);github
3.全部技術人員都要經過堡壘機去獲取用戶名、密碼,而後在再去鏈接 線上系統,並記錄操做日誌;web
堡壘機策略優勢:ajax
1.記錄用戶操做;redis
2.實現遠程操做權限集中管理;
from django.db import models from django.contrib.auth.models import User # Create your models here. class IDC(models.Model): name = models.CharField(max_length=64,unique=True) def __str__(self): return self.name class Host(models.Model): """存儲全部主機信息""" hostname = models.CharField(max_length=64,unique=True) ip_addr = models.GenericIPAddressField(unique=True) port = models.IntegerField(default=22) idc = models.ForeignKey("IDC") #host_groups = models.ManyToManyField("HostGroup") #host_users = models.ManyToManyField("HostUser") enabled = models.BooleanField(default=True) def __str__(self): return "%s-%s" %(self.hostname,self.ip_addr) class HostGroup(models.Model): """主機組""" name = models.CharField(max_length=64,unique=True) host_user_binds = models.ManyToManyField("HostUserBind") def __str__(self): return self.name class HostUser(models.Model): """存儲遠程主機的用戶信息 root 123 root abc root sfsfs """ auth_type_choices = ((0,'ssh-password'),(1,'ssh-key')) auth_type = models.SmallIntegerField(choices=auth_type_choices) username = models.CharField(max_length=32) password = models.CharField(blank=True,null=True,max_length=128) def __str__(self): return "%s-%s-%s" %(self.get_auth_type_display(),self.username,self.password) class Meta: unique_together = ('username','password') class HostUserBind(models.Model): """綁定主機和用戶""" host = models.ForeignKey("Host") host_user = models.ForeignKey("HostUser") def __str__(self): return "%s-%s" %(self.host,self.host_user) class Meta: unique_together = ('host','host_user') class SessionLog(models.Model): ''' 記錄每一個用戶登陸操做,ID傳給 shell生成文件命名 ''' account=models.ForeignKey('Account') host_user_bind=models.ForeignKey('HostUserBind') start_date=models.DateField(auto_now_add=True) end_date=models.DateField(blank=True,null=True) def __str__(self): return '%s-%s'%(self.account,self.host_user_bind) class AuditLog(models.Model): """審計日誌""" class Account(models.Model): """堡壘機帳戶 1. 擴展 2. 繼承 user.account.host_user_bind """ user = models.OneToOneField(User) name = models.CharField(max_length=64) host_user_binds = models.ManyToManyField("HostUserBind",blank=True) host_groups = models.ManyToManyField("HostGroup",blank=True)
2種堡壘機登陸方式:
命令行登陸堡壘機方式:
方式1:經過 修改open_shh源碼擴展-Z option生成惟一 ssh進程,使用Linux的strace 命令對惟一 ssh進程進行檢測生成日誌文件;
0.用戶執行audit_shell出現交互界面,提示用戶輸入機組和主機;
import sys,os,django os.environ.setdefault("DJANGO_SETTINGS_MODULE","zhanggen_audit.settings") django.setup() #在Django視圖以外,調用Django功能設置環境變量! from audit.backend import user_interactive if __name__ == '__main__': shell_obj=user_interactive.UserShell(sys.argv) shell_obj.start()
from django.contrib.auth import authenticate class UserShell(object): '''用戶登陸堡壘機,啓動自定製shell ''' def __init__(self,sys_argv): self.sys_argv=sys_argv self.user=None def auth(self): count=0 while count < 3: username=input('username:').strip() password=input('password:').strip() user=authenticate(username=username,password=password) #none 表明認證失敗,返回用戶對象認證成功! if not user: count+=1 print('無效的用戶名或者,密碼!') else: self.user=user return True else: print('輸入次數超過3次!') def start(self): """啓動交互程序""" if self.auth(): # print(self.user.account.host_user_binds.all()) #select_related() while True: host_groups = self.user.account.host_groups.all() for index, group in enumerate(host_groups): print("%s.\t%s[%s]" % (index, group, group.host_user_binds.count())) print("%s.\t未分組機器[%s]" % (len(host_groups), self.user.account.host_user_binds.count())) choice = input("select group>:").strip() if choice.isdigit(): choice = int(choice) host_bind_list = None if choice >= 0 and choice < len(host_groups): selected_group = host_groups[choice] host_bind_list = selected_group.host_user_binds.all() elif choice == len(host_groups): # 選擇的未分組機器 # selected_group = self.user.account.host_user_binds.all() host_bind_list = self.user.account.host_user_binds.all() if host_bind_list: while True: for index, host in enumerate(host_bind_list): print("%s.\t%s" % (index, host,)) choice2 = input("select host>:").strip() if choice2.isdigit(): choice2 = int(choice2) if choice2 >= 0 and choice2 < len(host_bind_list): selected_host = host_bind_list[choice2] print("selected host", selected_host) elif choice2 == 'b': break
知識點:
在Django視圖以外,調用Django功能設置環境變量!(切記放在和Django manage.py 同級目錄);
import sys,os,django os.environ.setdefault("DJANGO_SETTINGS_MODULE","Sensors_Data.settings") django.setup() # 在Django視圖以外,調用Django功能設置環境變量! from app01 import models objs=models.AlarmInfo.objects.all() for row in objs: print(row.comment)
注意:在Django啓動時會自動加載一些 文件,好比每一個app中admin.py,不能在這些文件裏面設置加載環境變量,由於已經加載完了,若是違反這個規則會致使Django程序啓動失敗;
1.實現ssh用戶指令檢測
1.0 修改open_shh源碼,擴展 ssh -Z 惟一標識符;(這樣每次ssh遠程登陸,均可以利用惟一標識符,分辨出 每一個ssh會話進程;)
修改OpenSsh下的ssh.c文件的608和609行、935行增長; while ((opt = getopt(ac, av, "1246ab:c:e:fgi:kl:m:no:p:qstvxz:" "ACD:E:F:GI:J:KL:MNO:PQ:R:S:TVw:W:XYyZ:")) != -1) { case 'Z': break;
知識點:
OpenSSH 是 SSH (Secure SHell) 協議的免費開源實現項目。
1.1 修改openssh以後,編譯、安裝
chmod 755 configure ./configure --prefix=/usr/local/openssh make chmod 755 mkinstalldirs make install sshpass -p xxxxxx123 /usr/local/openssh/bin/ssh root@172.17.10.112 -Z s1123ssssd212
1.2 每一個ssh會話進程能夠惟一標識以後,在堡壘機使用會話腳本shell腳本檢測 ssh會話進程;(strace命令進行監控,並生產 log日誌文件);
#!/usr/bin/bash for i in $(seq 1 30);do echo $i $1 process_id=`ps -ef | grep $1 | grep -v 'ession_check.sh' | grep -v grep | grep -v sshpass | awk '{print $2}'` echo "process: $process_id" if [ ! -z "$process_id" ];then echo 'start run strace.....' strace -fp $process_id -t -o $2.log; break; fi sleep 5 done;
知識點:
strace 檢測進程的IO調用,監控用戶shell輸入的命令字符;
sshpass無需提示輸入密碼登陸
[root@localhost sshpass-1.06]# sshpass -p wsnb ssh root@172.16.22.1 -o StrictHostKeyChecking=no Last login: Tue Jul 10 16:39:53 2018 from 192.168.113.84 [root@ecdb ~]#
python生成惟一標識符
s=string.ascii_lowercase+string.digits random_tag=''.join(random.sample(s,10))
解決普通用戶,沒法執行 strace命令;
方式1:執行文件 +s權限
chmod u+s `which strace`
方式2:修改sudo配置文件,使普通用戶sudo時無需輸入密碼!
修改sudo配置文件,防止修改出錯,必定要切換到root用戶; %普通用戶 ALL=(ALL) NOPASSWD: ALL wq! #退出
#!/usr/bin/python3 # -*- coding: utf-8 -* from django.contrib.auth import authenticate import subprocess,string,random from audit import models from django.conf import settings class UserShell(object): '''用戶登陸堡壘機,啓動自定製shell ''' def __init__(self,sys_argv): self.sys_argv=sys_argv self.user=None def auth(self): count=0 while count < 3: username=input('username:').strip() password=input('password:').strip() user=authenticate(username=username,password=password) #none 表明認證失敗,返回用戶對象認證成功! if not user: count+=1 print('無效的用戶名或者,密碼!') else: self.user=user return True else: print('輸入次數超過3次!') def start(self): """啓動交互程序""" if self.auth(): # print(self.user.account.host_user_binds.all()) #select_related() while True: host_groups = self.user.account.host_groups.all() for index, group in enumerate(host_groups): print("%s.\t%s[%s]" % (index, group, group.host_user_binds.count())) print("%s.\t未分組機器[%s]" % (len(host_groups), self.user.account.host_user_binds.count())) choice = input("select group>:").strip() if choice.isdigit(): choice = int(choice) host_bind_list = None if choice >= 0 and choice < len(host_groups): selected_group = host_groups[choice] host_bind_list = selected_group.host_user_binds.all() elif choice == len(host_groups): # 選擇的未分組機器 # selected_group = self.user.account.host_user_binds.all() host_bind_list = self.user.account.host_user_binds.all() if host_bind_list: while True: for index, host in enumerate(host_bind_list): print("%s.\t%s" % (index, host,)) choice2 = input("select host>:").strip() if choice2.isdigit(): choice2 = int(choice2) if choice2 >= 0 and choice2 < len(host_bind_list): selected_host = host_bind_list[choice2] s = string.ascii_lowercase + string.digits random_tag = ''.join(random.sample(s, 10)) session_obj=models.SessionLog.objects.create(account=self.user.account,host_user_bind=selected_host) session_tracker_scipt='/bin/sh %s %s %s'%(settings.SESSION_TRACKER_SCRIPT,random_tag,session_obj.pk) session_tracker_process=subprocess.Popen(session_tracker_scipt,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) cmd='sshpass -p %s /usr/local/openssh/bin/ssh %s@%s -p %s -o stricthostkeychecking=no -Z %s' % (selected_host.host_user.password, selected_host.host_user.username, selected_host.host.ip_addr, selected_host.host.port,random_tag) subprocess.run(cmd,shell=True)#開啓子進程交互 print(session_tracker_process.stdout.readlines(), session_tracker_process.stderr.readlines()) elif choice2 == 'b': break
2.shell遠程登陸程序檢查日誌文件,分析;
tab補全的命令,須要搜素write(5,該腳本實現思路,按鍵去嘗試,循環多種條件判斷;
import re class AuditLogHandler(object): '''分析audit log日誌''' def __init__(self,log_file): self.log_file_obj = self._get_file(log_file) def _get_file(self,log_file): return open(log_file) def parse(self): cmd_list = [] cmd_str = '' catch_write5_flag = False #for tab complication for line in self.log_file_obj: #print(line.split()) line = line.split() try: pid,time_clock,io_call,char = line[0:4] if io_call.startswith('read(4'): if char == '"\\177",':#回退 char = '[1<-del]' if char == '"\\33OB",': #vim中下箭頭 char = '[down 1]' if char == '"\\33OA",': #vim中下箭頭 char = '[up 1]' if char == '"\\33OC",': #vim中右移 char = '[->1]' if char == '"\\33OD",': #vim中左移 char = '[1<-]' if char == '"\33[2;2R",': #進入vim模式 continue if char == '"\\33[>1;95;0c",': # 進入vim模式 char = '[----enter vim mode-----]' if char == '"\\33[A",': #命令行向上箭頭 char = '[up 1]' catch_write5_flag = True #取到向上按鍵拿到的歷史命令 if char == '"\\33[B",': # 命令行向上箭頭 char = '[down 1]' catch_write5_flag = True # 取到向下按鍵拿到的歷史命令 if char == '"\\33[C",': # 命令行向右移動1位 char = '[->1]' if char == '"\\33[D",': # 命令行向左移動1位 char = '[1<-]' cmd_str += char.strip('"",') if char == '"\\t",': catch_write5_flag = True continue if char == '"\\r",': cmd_list.append([time_clock,cmd_str]) cmd_str = '' # 重置 if char == '"':#space cmd_str += ' ' if catch_write5_flag: #to catch tab completion if io_call.startswith('write(5'): if io_call == '"\7",': #空鍵,不是空格,是回退不了就是這個鍵 pass else: cmd_str += char.strip('"",') catch_write5_flag = False except ValueError as e: print("\033[031;1mSession log record err,please contact your IT admin,\033[0m",e) #print(cmd_list) for cmd in cmd_list: print(cmd) return cmd_list if __name__ == "__main__": parser = AuditLogHandler(r'D:\zhanggen_audit\log\6.log') parser.parse()
3.修改bashrc文件,限制用戶登陸行爲;
alias rm='rm -i' alias cp='cp -i' alias mv='mv -i' # Source global definitions if [ -f /etc/bashrc ]; then . /etc/bashrc fi echo '-----------------------welcome to zhanggen audit --------------------------' python3 /root/zhanggen_audit/audit_shell.py echo 'bye' logout
缺陷:
雖然限制了用戶shell登陸,但沒法阻止用戶使用程序(paramiko)上傳惡意文件!
方式2:提取paramiko源碼demos文件,對其進行修改支持交互式操做;
from django.db import models from django.contrib.auth.models import User # Create your models here. class IDC(models.Model): name = models.CharField(max_length=64,unique=True) def __str__(self): return self.name class Host(models.Model): """存儲全部主機信息""" hostname = models.CharField(max_length=64,unique=True) ip_addr = models.GenericIPAddressField(unique=True) port = models.IntegerField(default=22) idc = models.ForeignKey("IDC") #host_groups = models.ManyToManyField("HostGroup") #host_users = models.ManyToManyField("HostUser") enabled = models.BooleanField(default=True) def __str__(self): return "%s-%s" %(self.hostname,self.ip_addr) class HostGroup(models.Model): """主機組""" name = models.CharField(max_length=64,unique=True) host_user_binds = models.ManyToManyField("HostUserBind") def __str__(self): return self.name class HostUser(models.Model): """存儲遠程主機的用戶信息 root 123 root abc root sfsfs """ auth_type_choices = ((0,'ssh-password'),(1,'ssh-key')) auth_type = models.SmallIntegerField(choices=auth_type_choices) username = models.CharField(max_length=32) password = models.CharField(blank=True,null=True,max_length=128) def __str__(self): return "%s-%s-%s" %(self.get_auth_type_display(),self.username,self.password) class Meta: unique_together = ('username','password') class HostUserBind(models.Model): """綁定主機和用戶""" host = models.ForeignKey("Host") host_user = models.ForeignKey("HostUser") def __str__(self): return "%s-%s" %(self.host,self.host_user) class Meta: unique_together = ('host','host_user') class AuditLog(models.Model): """審計日誌""" session = models.ForeignKey("SessionLog") cmd = models.TextField() date = models.DateTimeField(auto_now_add=True) def __str__(self): return "%s-%s" %(self.session,self.cmd) class SessionLog(models.Model): account = models.ForeignKey("Account") host_user_bind = models.ForeignKey("HostUserBind") start_date = models.DateTimeField(auto_now_add=True) end_date = models.DateTimeField(blank=True,null=True) def __str__(self): return "%s-%s" %(self.account,self.host_user_bind) class Account(models.Model): """堡壘機帳戶 1. 擴展 2. 繼承 user.account.host_user_bind """ user = models.OneToOneField(User) name = models.CharField(max_length=64) host_user_binds = models.ManyToManyField("HostUserBind",blank=True) host_groups = models.ManyToManyField("HostGroup",blank=True)
__author__ = 'Administrator' import subprocess,random,string from django.contrib.auth import authenticate from django.conf import settings from audit import models from audit.backend import ssh_interactive class UserShell(object): """用戶登陸堡壘機後的shell""" def __init__(self,sys_argv): self.sys_argv = sys_argv self.user = None def auth(self): count = 0 while count < 3: username = input("username:").strip() password = input("password:").strip() user = authenticate(username=username,password=password) #None 表明認證不成功 #user object ,認證對象 ,user.name if not user: count += 1 print("Invalid username or password!") else: self.user = user return True else: print("too many attempts.") def start(self): """啓動交互程序""" if self.auth(): #print(self.user.account.host_user_binds.all()) #select_related() while True: host_groups = self.user.account.host_groups.all() for index,group in enumerate(host_groups): print("%s.\t%s[%s]"%(index,group,group.host_user_binds.count())) print("%s.\t未分組機器[%s]"%(len(host_groups),self.user.account.host_user_binds.count())) try: choice = input("select group>:").strip() if choice.isdigit(): choice = int(choice) host_bind_list = None if choice >=0 and choice < len(host_groups): selected_group = host_groups[choice] host_bind_list = selected_group.host_user_binds.all() elif choice == len(host_groups): #選擇的未分組機器 #selected_group = self.user.account.host_user_binds.all() host_bind_list = self.user.account.host_user_binds.all() if host_bind_list: while True: for index,host in enumerate(host_bind_list): print("%s.\t%s"%(index,host,)) choice2 = input("select host>:").strip() if choice2.isdigit(): choice2 = int(choice2) if choice2 >=0 and choice2 < len(host_bind_list): selected_host = host_bind_list[choice2] ssh_interactive.ssh_session(selected_host,self.user) # s = string.ascii_lowercase +string.digits # random_tag = ''.join(random.sample(s,10)) # session_obj = models.SessionLog.objects.create(account=self.user.account,host_user_bind=selected_host) # # cmd = "sshpass -p %s /usr/local/openssh/bin/ssh %s@%s -p %s -o StrictHostKeyChecking=no -Z %s" %(selected_host.host_user.password,selected_host.host_user.username,selected_host.host.ip_addr,selected_host.host.port ,random_tag) # #start strace ,and sleep 1 random_tag, session_obj.id # session_tracker_script = "/bin/sh %s %s %s " %(settings.SESSION_TRACKER_SCRIPT,random_tag,session_obj.id) # # session_tracker_obj =subprocess.Popen(session_tracker_script, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) # # ssh_channel = subprocess.run(cmd,shell=True) # print(session_tracker_obj.stdout.read(), session_tracker_obj.stderr.read()) # elif choice2 == 'b': break except KeyboardInterrupt as e : pass
#!/usr/bin/env python # Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com> # # This file is part of paramiko. # # Paramiko is free software; you can redistribute it and/or modify it under the # terms of the GNU Lesser General Public License as published by the Free # Software Foundation; either version 2.1 of the License, or (at your option) # any later version. # # Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more # details. # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. import base64 from binascii import hexlify import getpass import os import select import socket import sys import time import traceback from paramiko.py3compat import input from audit import models import paramiko try: import interactive except ImportError: from . import interactive def manual_auth(t, username, password): # default_auth = 'p' # auth = input('Auth by (p)assword, (r)sa key, or (d)ss key? [%s] ' % default_auth) # if len(auth) == 0: # auth = default_auth # # if auth == 'r': # default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_rsa') # path = input('RSA key [%s]: ' % default_path) # if len(path) == 0: # path = default_path # try: # key = paramiko.RSAKey.from_private_key_file(path) # except paramiko.PasswordRequiredException: # password = getpass.getpass('RSA key password: ') # key = paramiko.RSAKey.from_private_key_file(path, password) # t.auth_publickey(username, key) # elif auth == 'd': # default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_dsa') # path = input('DSS key [%s]: ' % default_path) # if len(path) == 0: # path = default_path # try: # key = paramiko.DSSKey.from_private_key_file(path) # except paramiko.PasswordRequiredException: # password = getpass.getpass('DSS key password: ') # key = paramiko.DSSKey.from_private_key_file(path, password) # t.auth_publickey(username, key) # else: # pw = getpass.getpass('Password for %s@%s: ' % (username, hostname)) t.auth_password(username, password) def ssh_session(bind_host_user, user_obj): # now connect hostname = bind_host_user.host.ip_addr #自動輸入 主機名 port = bind_host_user.host.port #端口 username = bind_host_user.host_user.username password = bind_host_user.host_user.password try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #生成socket鏈接 sock.connect((hostname, port)) except Exception as e: print('*** Connect failed: ' + str(e)) traceback.print_exc() sys.exit(1) try: t = paramiko.Transport(sock) #使用paramiko的方法去鏈接服務器執行命令! try: t.start_client() except paramiko.SSHException: print('*** SSH negotiation failed.') sys.exit(1) try: keys = paramiko.util.load_host_keys(os.path.expanduser('~/.ssh/known_hosts')) except IOError: try: keys = paramiko.util.load_host_keys(os.path.expanduser('~/ssh/known_hosts')) except IOError: print('*** Unable to open host keys file') keys = {} # check server's host key -- this is important. key = t.get_remote_server_key() if hostname not in keys: print('*** WARNING: Unknown host key!') elif key.get_name() not in keys[hostname]: print('*** WARNING: Unknown host key!') elif keys[hostname][key.get_name()] != key: print('*** WARNING: Host key has changed!!!') sys.exit(1) else: print('*** Host key OK.') if not t.is_authenticated(): manual_auth(t, username, password) #密碼校驗 if not t.is_authenticated(): print('*** Authentication failed. :(') t.close() sys.exit(1) chan = t.open_session() chan.get_pty() # terminal chan.invoke_shell() print('*** Here we go!\n') session_obj = models.SessionLog.objects.create(account=user_obj.account, host_user_bind=bind_host_user) interactive.interactive_shell(chan, session_obj)#開始進入交換模式· chan.close() t.close() except Exception as e: print('*** Caught exception: ' + str(e.__class__) + ': ' + str(e)) traceback.print_exc() try: t.close() except: pass sys.exit(1)
# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com> # # This file is part of paramiko. # # Paramiko is free software; you can redistribute it and/or modify it under the # terms of the GNU Lesser General Public License as published by the Free # Software Foundation; either version 2.1 of the License, or (at your option) # any later version. # # Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more # details. # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. import socket import sys from paramiko.py3compat import u from audit import models # windows does not have termios... try: import termios import tty has_termios = True except ImportError: has_termios = False def interactive_shell(chan,session_obj): if has_termios: # posix_shell(chan,session_obj) #unix 通用協議標準 else: windows_shell(chan) def posix_shell(chan,session_obj): import select oldtty = termios.tcgetattr(sys.stdin) try: tty.setraw(sys.stdin.fileno()) tty.setcbreak(sys.stdin.fileno()) chan.settimeout(0.0) flag = False cmd = '' while True: #開始輸入命令 r, w, e = select.select([chan, sys.stdin], [], []) #循環檢測 輸入、輸出、錯誤,有反應就返回,沒有就一直夯住! if chan in r:#遠程 由返回 命令結果 try: x = u(chan.recv(1024)) if len(x) == 0: sys.stdout.write('\r\n*** EOF\r\n') break if flag: #若是用戶輸入的Tab補全,服務器端返回 cmd += x flag = False sys.stdout.write(x) sys.stdout.flush() except socket.timeout: pass if sys.stdin in r: #本地輸入 x = sys.stdin.read(1) #輸入1個字符就發送遠程服務器 if len(x) == 0: break if x == '\r': #回車· models.AuditLog.objects.create(session=session_obj,cmd=cmd) cmd = '' elif x == '\t':#tab 本地1個字符+遠程返回的 flag = True else: cmd += x chan.send(x) #發送本地輸入 到遠程服務器 finally: termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty) # thanks to Mike Looijmans for this code def windows_shell(chan): import threading sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n") def writeall(sock): while True: data = sock.recv(256) if not data: sys.stdout.write('\r\n*** EOF ***\r\n\r\n') sys.stdout.flush() break sys.stdout.write(data) sys.stdout.flush() writer = threading.Thread(target=writeall, args=(chan,)) writer.start() try: while True: d = sys.stdin.read(1) if not d: break chan.send(d) except EOFError: # user hit ^Z or F6 pass
程序流程:用戶界面---------->ssh自動輸入用戶&登陸密碼---------->進入shell命令交互模式
知識點:
1對1: 1個 對應 1個 (1個女人嫁給了1個男人,生活慢慢平淡下來,)
1對多: 1個 對應 N個 (這個女人隱瞞丈夫相繼出軌了N個男人,這個男人發現老婆出軌了,很憤懣)
多對多: 雙方都存在1對多關係 (也相繼找了N個女情人,而這些女情人中就有他老婆出軌男人的老婆,故事結束。)
感悟:
這個故事很混亂! 怎麼設計男、女表結構? 其實在作數據庫表關係設計的時候,糾結2張表到底須要設計成什麼關係?到不如加幾張關係綁定表!
徹底是出於 你的程序在容許的過程當中到底 要向用戶展現什麼信息? 而決定的!
web頁面使用堡壘機方式:
web開發模式
1.MTV/MVC 先後端雜交模式;(面向公司內部OA)
優點:簡單,一人全棧;
缺陷:先後端耦合性高,性能低、單點壓力
2.先後端分離(面向大衆用戶)
優點:前、後端開發人員商定好接口和數據格式,並行開發,效率高;解決了後端獨自渲染模板的壓力;
缺陷:招前端得花錢
3.hostlist 展現主機組和主機
<div class="panel col-lg-3"> <div class="panel-heading"> <h3 class="panel-title">主機組</h3> </div> <div class="panel-body"> <ul class="list-group"> {% for group in request.user.account.host_groups.all %} <li class="list-group-item " onclick="GetHostlist({{ group.id }},this)"><span class="badge badge-success">{{ group.host_user_binds.count }}</span>{{ group.name }}</li> {% endfor %} <li class="list-group-item " onclick="GetHostlist(-1,this)"> <span class="badge badge-success">{{ request.user.account.host_user_binds.count }}</span>未分組主機</li> </ul> </div> </div>
<script> function GetHostlist(gid,self) { $.get("{% url 'get_host_list' %}",{'gid':gid},function(callback){ var data = JSON.parse(callback); console.log(data) var trs = '' $.each(data,function (index,i) { var tr = "<tr><td>" + i.host__hostname + "</td><td>" + i.host__ip_addr +"</td><td>" + i.host__idc__name +"</td><td>" + i.host__port + "</td><td>" + i.host_user__username+ "</td><td>Login</td></tr>"; trs += tr }) $("#hostlist").html(trs); });//end get $(self).addClass("active").siblings().removeClass('active'); } </script>
知識點:
若是給標籤綁定事件,須要傳參數,能夠直接在標籤直接綁定。
url(r'^get_tocken$', views.get_tocken, name="get_tocken"),
function GetToken(self,bind_host_id) { $.post( '{% url "get_tocken" %}', //經過url別名渲染url {'bind_host_id':bind_host_id,'csrfmiddlewaretoken':"{{ csrf_token }}"},//請求攜帶的參數 function (callback) { //回調函數 console.log(callback) } ) }
@login_required def get_token(request): bind_host_id=request.POST.get('bind_host_id') time_obj = datetime.datetime.now() - datetime.timedelta(seconds=300) # 5mins ago exist_token_objs = models.Token.objects.filter(account_id=request.user.account.id, host_user_bind_id=bind_host_id, date__gt=time_obj) if exist_token_objs: # has token already token_data = {'token': exist_token_objs[0].val} else: token_val=''.join(random.sample(string.ascii_lowercase+string.digits,8)) token_obj=models.Token.objects.create( host_user_bind_id=bind_host_id, account=request.user.account, val=token_val) token_data={"token":token_val} return HttpResponse(json.dumps(token_data))
4.點擊主機登陸,經過Shellinabox 插件以web頁面的形式遠程登陸Linux主機;
4.0 安裝sehllinabox
yum install git openssl-devel pam-devel zlib-devel autoconf automake libtool git clone https://github.com/shellinabox/shellinabox.git && cd shellinabox autoreconf -i ./configure && make make install shellinaboxd -b -t //-b選項表明在後臺啓動,-t選項表示不使用https方式啓動,默認以nobody用戶身份,監聽TCP4200端口 netstat -ntpl |grep shell
5.django結合sehll inabox
5.1:用戶在Django的hostlist頁面點擊生成tocken(綁定了account+host_bind_user),記錄到數據庫。
5.2: 用戶在Django的hostlist頁面 login跳轉至 sehll inabox因爲修改了bashrc跳轉以後,就會執行python用戶交互程序,python用戶交互程序 提示用戶輸入 token;
5.3: 用戶輸入token以後,python 用戶交互程序去數據庫查詢token,進而查詢到host_bind_user的ip、用戶、密碼,調用paramiko的demo.py自動輸入ip、用戶、密碼進入shell交互界面;
from django.db import models from django.contrib.auth.models import User # Create your models here. class IDC(models.Model): name = models.CharField(max_length=64,unique=True) def __str__(self): return self.name class Host(models.Model): """存儲全部主機信息""" hostname = models.CharField(max_length=64,unique=True) ip_addr = models.GenericIPAddressField(unique=True) port = models.IntegerField(default=22) idc = models.ForeignKey("IDC") #host_groups = models.ManyToManyField("HostGroup") #host_users = models.ManyToManyField("HostUser") enabled = models.BooleanField(default=True) def __str__(self): return "%s-%s" %(self.hostname,self.ip_addr) class HostGroup(models.Model): """主機組""" name = models.CharField(max_length=64,unique=True) host_user_binds = models.ManyToManyField("HostUserBind") def __str__(self): return self.name class HostUser(models.Model): """存儲遠程主機的用戶信息 root 123 root abc root sfsfs """ auth_type_choices = ((0,'ssh-password'),(1,'ssh-key')) auth_type = models.SmallIntegerField(choices=auth_type_choices) username = models.CharField(max_length=32) password = models.CharField(blank=True,null=True,max_length=128) def __str__(self): return "%s-%s-%s" %(self.get_auth_type_display(),self.username,self.password) class Meta: unique_together = ('username','password') class HostUserBind(models.Model): """綁定主機和用戶""" host = models.ForeignKey("Host") host_user = models.ForeignKey("HostUser") def __str__(self): return "%s-%s" %(self.host,self.host_user) class Meta: unique_together = ('host','host_user') class AuditLog(models.Model): """審計日誌""" session = models.ForeignKey("SessionLog") cmd = models.TextField() date = models.DateTimeField(auto_now_add=True) def __str__(self): return "%s-%s" %(self.session,self.cmd) class SessionLog(models.Model): account = models.ForeignKey("Account") host_user_bind = models.ForeignKey("HostUserBind") start_date = models.DateTimeField(auto_now_add=True) end_date = models.DateTimeField(blank=True,null=True) def __str__(self): return "%s-%s" %(self.account,self.host_user_bind) class Account(models.Model): """堡壘機帳戶 1. 擴展 2. 繼承 user.account.host_user_bind """ user = models.OneToOneField(User) name = models.CharField(max_length=64) host_user_binds = models.ManyToManyField("HostUserBind",blank=True) host_groups = models.ManyToManyField("HostGroup",blank=True) class Token(models.Model): host_user_bind = models.ForeignKey("HostUserBind") val = models.CharField(max_length=128,unique=True) account = models.ForeignKey("Account") expire = models.IntegerField("超時時間(s)",default=300) date = models.DateTimeField(auto_now_add=True) def __str__(self): return "%s-%s" %(self.host_user_bind,self.val)
@login_required def get_token(request): bind_host_id=request.POST.get('bind_host_id') time_obj = datetime.datetime.now() - datetime.timedelta(seconds=300) # 5mins ago exist_token_objs = models.Token.objects.filter(account_id=request.user.account.id, host_user_bind_id=bind_host_id, date__gt=time_obj) if exist_token_objs: # has token already token_data = {'token': exist_token_objs[0].val} else: token_val=''.join(random.sample(string.ascii_lowercase+string.digits,8)) token_obj=models.Token.objects.create( host_user_bind_id=bind_host_id, account=request.user.account, val=token_val) token_data={"token":token_val} return HttpResponse(json.dumps(token_data))
{% extends 'index.html' %} {% block content-container %} <div id="page-title"> <h1 class="page-header text-overflow">主機列表</h1> <!--Searchbox--> <div class="searchbox"> <div class="input-group custom-search-form"> <input type="text" class="form-control" placeholder="Search.."> <span class="input-group-btn"> <button class="text-muted" type="button"><i class="pli-magnifi-glass"></i></button> </span> </div> </div> </div> <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~--> <!--End page title--> <!--Breadcrumb--> <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~--> <ol class="breadcrumb"> <li><a href="#">Home</a></li> <li><a href="#">Library</a></li> <li class="active">主機列表</li> </ol> <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~--> <!--End breadcrumb--> <div id="page-content"> <div class="panel col-lg-3"> <div class="panel-heading"> <h3 class="panel-title">主機組</h3> </div> <div class="panel-body"> <ul class="list-group"> {% for group in request.user.account.host_groups.all %} <li class="list-group-item " onclick="GetHostlist({{ group.id }},this)"><span class="badge badge-success">{{ group.host_user_binds.count }}</span>{{ group.name }}</li> {% endfor %} <li class="list-group-item " onclick="GetHostlist(-1,this)"> <span class="badge badge-success">{{ request.user.account.host_user_binds.count }}</span>未分組主機</li> </ul> </div> </div> <div class="panel col-lg-9"> <div class="panel-heading"> <h3 class="panel-title">主機列表</h3> </div> <div class="panel-body"> <div class="table-responsive"> <table class="table table-striped"> <thead> <tr> <th>Hostname</th> <th>IP</th> <th>IDC</th> <th>Port</th> <th>Username</th> <th>Login</th> <th>Token</th> </tr> </thead> <tbody id="hostlist"> {# <tr>#} {# <td><a href="#fakelink" class="btn-link">Order #53451</a></td>#} {# <td>Scott S. Calabrese</td>#} {# <td>$24.98</td>#} {# </tr>#} </tbody> </table> </div> </div> </div> </div> <script> function GetHostlist(gid,self) { $.get("{% url 'get_host_list' %}",{'gid':gid},function(callback){ var data = JSON.parse(callback); console.log(data); var trs = ''; $.each(data,function (index,i) { var tr = "<tr><td>" + i.host__hostname + "</td><td>" + i.host__ip_addr +"</td><td>" + i.host__idc__name +"</td><td>" + i.host__port + "</td><td>" + i.host_user__username+ "</td><td><a class='btn btn-sm btn-info' onclick=GetToken(this,'"+i.id +"')>Token</a><a href='http://192.168.226.135:4200/' class='btn btn-sm btn-info'')>login</a></td><td ></td></tr>"; trs += tr }); $("#hostlist").html(trs); });//end get $(self).addClass("active").siblings().removeClass('active'); } function GetToken(self,bind_host_id) { $.post( '{% url "get_token" %}', //經過url別名渲染url {'bind_host_id':bind_host_id,'csrfmiddlewaretoken':"{{ csrf_token }}"},//請求攜帶的參數 function (callback) { //回調函數 console.log(callback); var data = JSON.parse(callback); //django響應的數據 $(self).parent().next().text(data.token); } ) } </script> {% endblock %}
import subprocess,random,string,datetime from django.contrib.auth import authenticate from django.conf import settings from audit import models from audit.backend import ssh_interactive class UserShell(object): """用戶登陸堡壘機後的shell""" def __init__(self,sys_argv): self.sys_argv = sys_argv self.user = None def auth(self): count = 0 while count < 3: username = input("username:").strip() password = input("password:").strip() user = authenticate(username=username,password=password) #None 表明認證不成功 #user object ,認證對象 ,user.name if not user: count += 1 print("Invalid username or password!") else: self.user = user return True else: print("too many attempts.") def token_auth(self): count = 0 while count < 3: user_input = input("請輸入token:").strip() if len(user_input) == 0: return if len(user_input) != 8: print("token length is 8") else: time_obj = datetime.datetime.now() - datetime.timedelta(seconds=300) # 5mins ago token_obj = models.Token.objects.filter(val=user_input, date__gt=time_obj).first() if token_obj: if token_obj.val == user_input: # 口令對上了 self.user = token_obj.account.user #進入交互式shll須要用戶認證! return token_obj count+=1 def start(self): """啓動交互程序""" token_obj = self.token_auth() if token_obj: ssh_interactive.ssh_session(token_obj.host_user_bind, self.user) exit() if self.auth(): #print(self.user.account.host_user_binds.all()) #select_related() while True: host_groups = self.user.account.host_groups.all() for index,group in enumerate(host_groups): print("%s.\t%s[%s]"%(index,group,group.host_user_binds.count())) print("%s.\t未分組機器[%s]"%(len(host_groups),self.user.account.host_user_binds.count())) try: choice = input("select group>:").strip() if choice.isdigit(): choice = int(choice) host_bind_list = None if choice >=0 and choice < len(host_groups): selected_group = host_groups[choice] host_bind_list = selected_group.host_user_binds.all() elif choice == len(host_groups): #選擇的未分組機器 #selected_group = self.user.account.host_user_binds.all() host_bind_list = self.user.account.host_user_binds.all() if host_bind_list: while True: for index,host in enumerate(host_bind_list): print("%s.\t%s"%(index,host,)) choice2 = input("select host>:").strip() if choice2.isdigit(): choice2 = int(choice2) if choice2 >=0 and choice2 < len(host_bind_list): selected_host = host_bind_list[choice2] ssh_interactive.ssh_session(selected_host,self.user) # s = string.ascii_lowercase +string.digits # random_tag = ''.join(random.sample(s,10)) # session_obj = models.SessionLog.objects.create(account=self.user.account,host_user_bind=selected_host) # # cmd = "sshpass -p %s /usr/local/openssh/bin/ssh %s@%s -p %s -o StrictHostKeyChecking=no -Z %s" %(selected_host.host_user.password,selected_host.host_user.username,selected_host.host.ip_addr,selected_host.host.port ,random_tag) # #start strace ,and sleep 1 random_tag, session_obj.id # session_tracker_script = "/bin/sh %s %s %s " %(settings.SESSION_TRACKER_SCRIPT,random_tag,session_obj.id) # # session_tracker_obj =subprocess.Popen(session_tracker_script, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) # # ssh_channel = subprocess.run(cmd,shell=True) # print(session_tracker_obj.stdout.read(), session_tracker_obj.stderr.read()) # elif choice2 == 'b': break except KeyboardInterrupt as e : pass
1.批量執行命令前端頁面
{% extends 'index.html' %} {% block content-container %} {# {% csrf_token %}#} <div id="page-title"> <h1 class="page-header text-overflow">主機列表</h1> <!--Searchbox--> <div class="searchbox"> <div class="input-group custom-search-form"> <input type="text" class="form-control" placeholder="Search.."> <span class="input-group-btn"> <button class="text-muted" type="button"><i class="pli-magnifi-glass"></i></button> </span> </div> </div> </div> <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~--> <!--End page title--> <!--Breadcrumb--> <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~--> <ol class="breadcrumb"> <li><a href="#">Home</a></li> <li><a href="#">Library</a></li> <li class="active">主機列表</li> </ol> <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~--> <!--End breadcrumb--> <div id="page-content"> <div class="panel col-lg-3"> <div class="panel-heading"> <h3 class="panel-title">主機組 <span id="selected_hosts"></span></h3> </div> <div class="panel-body"> <ul class="list-group" id="host_groups"> {% for group in request.user.account.host_groups.all %} <li class="list-group-item " ><span class="badge badge-success">{{ group.host_user_binds.count }}</span> <input type="checkbox" onclick="CheckAll(this)"> <a onclick="DisplayHostList(this)">{{ group.name }}</a> <!--點擊組名,組名下的 主機列表經過toggleclass 展現/隱藏 --> <ul class="hide"> {% for bind_host in group.host_user_binds.all %} <li><input onclick="ShowCheckedHostCount()" type="checkbox" value="{{ bind_host.id }}">{{ bind_host.host.ip_addr }}</li> {% endfor %} </ul> </li> {% endfor %} <li class="list-group-item " > <span class="badge badge-success">{{ request.user.account.host_user_binds.count }}</span> <input type="checkbox" onclick="CheckAll(this)"> <a onclick="DisplayHostList(this)">未分組主機</a> <ul class="hide"> {% for bind_host in request.user.account.host_user_binds.all %} <li><input onclick="ShowCheckedHostCount()" type="checkbox" value="{{ bind_host.id }}">{{ bind_host.host.ip_addr }}</li> {% endfor %} </ul> </li> </ul> </div> </div> <div class="col-lg-9"> <div class="panel"> <div class="panel-heading"> <h3 class="panel-title">命令</h3> </div> <div class="panel-body"> <textarea class="form-control" id="cmd"></textarea> <button onclick="PostTask('cmd')" class="btn btn-info pull-right">執行</button> <button class="btn btn-danger ">終止</button> </div> </div> <div class="panel"> <div class="panel-heading"> <h3 class="panel-title">任務結果</h3> </div> <div class="panel-body"> <div id="task_result"> </div> </div> </div> </div> </div> <script> function DisplayHostList(self) { $(self).next().toggleClass("hide"); } function CheckAll(self){ console.log($(self).prop('checked')); $(self).parent().find("ul :checkbox").prop('checked',$(self).prop('checked')); ShowCheckedHostCount() } function ShowCheckedHostCount(){ var selected_host_count = $("#host_groups ul").find(":checked").length; console.log(selected_host_count); $("#selected_hosts").text(selected_host_count); return selected_host_count } {# function GetTaskResult(task_id) {#} {# $.getJSON("{% url 'get_task_result' %}",{'task_id':task_id},function(callback){#} {##} {# console.log(callback);#} {##} {# var result_ele = '';#} {# $.each(callback,function (index,i) {#} {# var p_ele = "<p>" + i.host_user_bind__host__hostname + "(" +i.host_user_bind__host__ip_addr +") ------" +#} {# i.status + "</p>";#} {# var res_ele = "<pre>" + i.result +"</pre>";#} {##} {# var single_result = p_ele + res_ele;#} {# result_ele += single_result#} {# });#} {##} {# $("#task_result").html(result_ele)#} {##} {##} {# });//end getJSON#} {##} {# }#} function PostTask(task_type) { //1. 驗證主機列表已選,命令已輸入 //2. 提交任務到後臺 var selected_host_ids = []; var selected_host_eles = $("#host_groups ul").find(":checked"); $.each(selected_host_eles,function (index,ele) { selected_host_ids.push($(ele).val()) }); console.log(selected_host_ids); if ( selected_host_ids.length == 0){ alert("主機未選擇!"); return false; } var cmd_text = $.trim($("#cmd").val()); if ( cmd_text.length == 0){ alert("未輸入命令!"); return false; } var task_data = { 'task_type':task_type, 'selected_host_ids': selected_host_ids, 'cmd': cmd_text }; $.post("{% url 'multitask' %}",{'csrfmiddlewaretoken':"{{ csrf_token }}",'task_data':JSON.stringify(task_data)}, function(callback){ console.log(callback) ;// task id var callback = JSON.parse(callback); GetTaskResult(callback.task_id); var result_timer = setInterval(function () { GetTaskResult(callback.task_id) },2000) } );//end post } </script> {% endblock %}
2.前端收集批量執行的主機,經過ajax發送到後臺
@login_required def multitask(request): task_obj = task_handler.Task(request) respose=HttpResponse(json.dumps(task_obj.errors)) if task_obj.is_valid(): # 若是驗證成功 result = task_obj.run() #run()去選擇要執行的任務類型,而後經過 getattr()去執行 respose=HttpResponse(json.dumps({'task_id':result})) #返回數據庫pk task_id return respose
3.後端經過is_valid方法驗證數據的合法性
4.驗證失敗響應前端self.errors信息,驗證成功執行run()選擇任務類型;
5.選擇任務類型(cmd/files_transfer)以後初始化數據庫(更新Task、TaskLog表數據)
6.cmd/files_transfer方法開啓新進程(multitask_execute.py)新進程開啓進程池 去執行批量命令;
7.前端使用定時器不斷去後臺獲取數據;
8.程序中斷按鈕
""" Django settings for zhanggen_audit project. Generated by 'django-admin startproject' using Django 1.11.4. For more information on this file, see https://docs.djangoproject.com/en/1.11/topics/settings/ For the full list of settings and their values, see https://docs.djangoproject.com/en/1.11/ref/settings/ """ import os # Build paths inside the project like this: os.path.join(BASE_DIR, ...) BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # Quick-start development settings - unsuitable for production # See https://docs.djangoproject.com/en/1.11/howto/deployment/checklist/ # SECURITY WARNING: keep the secret key used in production secret! SECRET_KEY = '5ivlngau4a@_3y4vizrcxnnj(&vz2en#edpq%i&jr%99-xxv)&' # SECURITY WARNING: don't run with debug turned on in production! DEBUG = True ALLOWED_HOSTS = ['*'] # Application definition INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'audit.apps.AuditConfig', ] MIDDLEWARE = [ 'django.middleware.security.SecurityMiddleware', 'django.contrib.sessions.middleware.SessionMiddleware', 'django.middleware.common.CommonMiddleware', 'django.middleware.csrf.CsrfViewMiddleware', 'django.contrib.auth.middleware.AuthenticationMiddleware', 'django.contrib.messages.middleware.MessageMiddleware', 'django.middleware.clickjacking.XFrameOptionsMiddleware', ] ROOT_URLCONF = 'zhanggen_audit.urls' TEMPLATES = [ { 'BACKEND': 'django.template.backends.django.DjangoTemplates', 'DIRS': [os.path.join(BASE_DIR, 'templates'),], 'APP_DIRS': True, 'OPTIONS': { 'context_processors': [ 'django.template.context_processors.debug', 'django.template.context_processors.request', 'django.contrib.auth.context_processors.auth', 'django.contrib.messages.context_processors.messages', ], }, }, ] WSGI_APPLICATION = 'zhanggen_audit.wsgi.application' # Database # https://docs.djangoproject.com/en/1.11/ref/settings/#databases DATABASES = { 'default': { 'ENGINE': 'django.db.backends.sqlite3', 'NAME': os.path.join(BASE_DIR, 'db.sqlite3'), } } # Password validation # https://docs.djangoproject.com/en/1.11/ref/settings/#auth-password-validators AUTH_PASSWORD_VALIDATORS = [ { 'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator', }, { 'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator', }, { 'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator', }, { 'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator', }, ] # Internationalization # https://docs.djangoproject.com/en/1.11/topics/i18n/ LANGUAGE_CODE = 'en-us' TIME_ZONE = 'Asia/Shanghai' USE_I18N = True USE_L10N = True USE_TZ = True # Static files (CSS, JavaScript, Images) # https://docs.djangoproject.com/en/1.11/howto/static-files/ STATIC_URL = '/static/' STATICFILES_DIRS=( os.path.join(BASE_DIR,'static'), ) SESSION_TRACKER_SCRIPT=os.path.join(BASE_DIR,'audit%sbackend%ssession_check.sh')%(os.sep,os.sep) SESSION_TRACKER_SCRIPT_LOG_PATH=os.path.join(BASE_DIR,'log')#日誌路徑 MULTI_TASK_SCRIPT = os.path.join(BASE_DIR,'multitask_execute.py') #腳本路徑 CURRENT_PGID=None #進程的 pgid
"""zhanggen_audit URL Configuration The `urlpatterns` list routes URLs to views. For more information please see: https://docs.djangoproject.com/en/1.11/topics/http/urls/ Examples: Function views 1. Add an import: from my_app import views 2. Add a URL to urlpatterns: url(r'^$', views.home, name='home') Class-based views 1. Add an import: from other_app.views import Home 2. Add a URL to urlpatterns: url(r'^$', Home.as_view(), name='home') Including another URLconf 1. Import the include() function: from django.conf.urls import url, include 2. Add a URL to urlpatterns: url(r'^blog/', include('blog.urls')) """ from django.conf.urls import url from django.contrib import admin from audit import views urlpatterns = [ url(r'^admin/', admin.site.urls), url(r'^$', views.index ), url(r'^login/$', views.acc_login ), url(r'^logout/$', views.acc_logout ), url(r'^hostlist/$', views.host_list ,name="host_list"), url(r'^multitask/$', views.multitask ,name="multitask"), url(r'^multitask/result/$', views.multitask_result ,name="get_task_result"), url(r'^multitask/cmd/$', views.multi_cmd ,name="multi_cmd"), url(r'^multitask/file_transfer/$', views.multi_file_transfer ,name="multi_file_transfer"), url(r'^api/hostlist/$', views.get_host_list ,name="get_host_list"), url(r'^api/token/$', views.get_token ,name="get_token"), url(r'^api/task/file_upload/$', views.task_file_upload ,name="task_file_upload"), url(r'^api/task/file_download/$', views.task_file_download ,name="task_file_download"), url(r'^end_cmd/$', views.end_cmd,name="end_cmd"), ]
from django.shortcuts import render,redirect,HttpResponse from django.contrib.auth import authenticate,login,logout from django.contrib.auth.decorators import login_required from django.views.decorators.csrf import csrf_exempt from django.conf import settings import signal import json,os from audit import models import random,string import datetime from audit import task_handler from django import conf import zipfile from wsgiref.util import FileWrapper #from django.core.servers.basehttp import FileWrapper @login_required def index(request): return render(request,'index.html') def acc_login(request): error = '' if request.method == "POST": username = request.POST.get('username') password = request.POST.get('password') user = authenticate(username=username,password=password) if user: login(request, user) return redirect(request.GET.get('next') or '/') else: error = "Wrong username or password!" return render(request,'login.html',{'error':error }) @login_required def acc_logout(request): logout(request) return redirect('/login/') @login_required def host_list(request): return render(request,'hostlist.html') @login_required def get_host_list(request): gid = request.GET.get('gid') if gid: if gid == '-1':#未分組 host_list = request.user.account.host_user_binds.all() else: group_obj = request.user.account.host_groups.get(id=gid) host_list = group_obj.host_user_binds.all() data = json.dumps(list(host_list.values('id','host__hostname','host__ip_addr','host__idc__name','host__port', 'host_user__username'))) return HttpResponse(data) @login_required def get_token(request): bind_host_id=request.POST.get('bind_host_id') time_obj = datetime.datetime.now() - datetime.timedelta(seconds=300) # 5mins ago exist_token_objs = models.Token.objects.filter(account_id=request.user.account.id, host_user_bind_id=bind_host_id, date__gt=time_obj) if exist_token_objs: # has token already token_data = {'token': exist_token_objs[0].val} else: token_val=''.join(random.sample(string.ascii_lowercase+string.digits,8)) token_obj=models.Token.objects.create( host_user_bind_id=bind_host_id, account=request.user.account, val=token_val) token_data={"token":token_val} return HttpResponse(json.dumps(token_data)) @login_required def multi_cmd(request): """多命令執行頁面""" return render(request,'multi_cmd.html') @login_required def multitask(request): task_obj = task_handler.Task(request) respose=HttpResponse(json.dumps(task_obj.errors)) if task_obj.is_valid(): # 若是驗證成功 task_obj = task_obj.run() #run()去選擇要執行的任務類型,而後經過 getattr()去執行 respose=HttpResponse(json.dumps({'task_id':task_obj.id,'timeout':task_obj.timeout})) #返回數據庫pk task_id return respose @login_required def multitask_result(request): """多任務結果""" task_id = request.GET.get('task_id') # [ { # 'task_log_id':23. # 'hostname': # 'ipaddr' # 'username' # 'status' # } ] task_obj = models.Task.objects.get(id=task_id) results = list(task_obj.tasklog_set.values('id','status', 'host_user_bind__host__hostname', 'host_user_bind__host__ip_addr', 'result' )) return HttpResponse(json.dumps(results)) @login_required def multi_file_transfer(request): random_str = ''.join(random.sample(string.ascii_lowercase + string.digits, 8)) #return render(request,'multi_file_transfer.html',{'random_str':random_str}) return render(request,'multi_file_transfer.html',locals()) @login_required @csrf_exempt def task_file_upload(request): random_str = request.GET.get('random_str') upload_to = "%s/%s/%s" %(conf.settings.FILE_UPLOADS,request.user.account.id,random_str) if not os.path.isdir(upload_to): os.makedirs(upload_to,exist_ok=True) file_obj = request.FILES.get('file') f = open("%s/%s"%(upload_to,file_obj.name),'wb') for chunk in file_obj.chunks(): f.write(chunk) f.close() print(file_obj) return HttpResponse(json.dumps({'status':0})) def send_zipfile(request,task_id,file_path): """ Create a ZIP file on disk and transmit it in chunks of 8KB, without loading the whole file into memory. A similar approach can be used for large dynamic PDF files. """ zip_file_name = 'task_id_%s_files' % task_id archive = zipfile.ZipFile(zip_file_name , 'w', zipfile.ZIP_DEFLATED) file_list = os.listdir(file_path) for filename in file_list: archive.write('%s/%s' %(file_path,filename),arcname=filename) archive.close() wrapper = FileWrapper(open(zip_file_name,'rb')) response = HttpResponse(wrapper, content_type='application/zip') response['Content-Disposition'] = 'attachment; filename=%s.zip' % zip_file_name response['Content-Length'] = os.path.getsize(zip_file_name) #temp.seek(0) return response @login_required def task_file_download(request): task_id = request.GET.get('task_id') print(task_id) task_file_path = "%s/%s"%( conf.settings.FILE_DOWNLOADS,task_id) return send_zipfile(request,task_id,task_file_path) def end_cmd(request): current_task_pgid=settings.CURRENT_PGID os.killpg(current_task_pgid,signal.SIGKILL) return HttpResponse(current_task_pgid)
import json,subprocess,os,signal from audit import models from django.conf import settings from django.db.transaction import atomic class Task(object): ''' ''' def __init__(self,request): self.request=request self.errors=[] self.task_data=None def is_valid(self): task_data=self.request.POST.get('task_data')#{"task_type":"cmd","selected_host_ids":["1","2"],"cmd":"DF"} if task_data: self.task_data=json.loads(task_data) self.task_type=self.task_data.get('task_type') if self.task_type == 'cmd': selected_host_ids=self.task_data.get('selected_host_ids') if selected_host_ids: return True self.errors.append({'invalid_argument': '命令/主機不存在'}) elif self.task_type == 'files_transfer': selected_host_ids =self.task_data.get('selected_host_ids') pass #驗證文件路徑 else: self.errors.append({'invalid_argument': '不支持的任務類型!'}) self.errors.append({'invalid_data': 'task_data不存在!'}) def run(self): task_func = getattr(self, self.task_data.get('task_type')) # task_obj = task_func() #調用執行命令 print(task_obj.pk) # 100 #這裏是任務id是自增的 return task_obj @atomic #事物操做 任務信息和 子任務都要同時建立完成! def cmd(self): task_obj=models.Task.objects.create( task_type=0, account=self.request.user.account, content=self.task_data.get('cmd'), ) #1.增長批量任務信息,並返回批量任務信息的 pk tasklog_objs=[] #2.增長子任務信息(初始化數據庫) host_ids = set(self.task_data.get("selected_host_ids")) # 獲取選中的主機id,並用集合去重 for host_id in host_ids: tasklog_objs.append(models.TaskLog(task_id=task_obj.id, host_user_bind_id=host_id, status = 3)) models.TaskLog.objects.bulk_create(tasklog_objs,100) # 沒100條記錄 commit 1次! task_id=task_obj.pk cmd_str = "python3 %s %s" % (settings.MULTI_TASK_SCRIPT,task_id) # 執行multitask.py腳本路徑 print('------------------>',cmd_str) multitask_obj = subprocess.Popen(cmd_str,stdout=subprocess.PIPE,shell=True,stderr=subprocess.PIPE) #新打開1個新進程 settings.CURRENT_PGID=os.getpgid(multitask_obj.pid) #os.getpgid(multitask_obj.pid) # os.killpg(pgid=pgid,sig=signal.SIGKILL) # print(multitask_obj.stderr.read().decode('utf-8') or multitask_obj.stdout.read().decode('utf-8')) #print("task result :",multitask_obj.stdout.read().decode('utf-8'),multitask_obj.stderr.read().decode('utf-8')) # print(multitask_obj.stdout.read()) # for host_id in self.task_data.get('selected_host_ids'): # t=Thread(target=self.run_cmd,args=(host_id,self.task_data.get('cmd'))) # t.start() return task_obj def run_cmd(self,host_id,cmd): pass def files_transfer(self): pass
{% extends 'index.html' %} {% block content-container %} {# {% csrf_token %}#} <div id="page-title"> <h1 class="page-header text-overflow">主機列表</h1> <!--Searchbox--> <div class="searchbox"> <div class="input-group custom-search-form"> <input type="text" class="form-control" placeholder="Search.."> <span class="input-group-btn"> <button class="text-muted" type="button"><i class="pli-magnifi-glass"></i></button> </span> </div> </div> </div> <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~--> <!--End page title--> <!--Breadcrumb--> <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~--> <ol class="breadcrumb"> <li><a href="#">Home</a></li> <li><a href="#">Library</a></li> <li class="active">主機列表</li> </ol> <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~--> <!--End breadcrumb--> <div id="page-content"> <div class="panel col-lg-3"> <div class="panel-heading"> <h3 class="panel-title">主機組 <span id="selected_hosts"></span></h3> </div> <div class="panel-body"> <ul class="list-group" id="host_groups"> {% for group in request.user.account.host_groups.all %} <li class="list-group-item "><span class="badge badge-success">{{ group.host_user_binds.count }}</span> <input type="checkbox" onclick="CheckAll(this)"> <a onclick="DisplayHostList(this)">{{ group.name }}</a> <!--點擊組名,組名下的 主機列表經過toggleclass 展現/隱藏 --> <ul class="hide"> {% for bind_host in group.host_user_binds.all %} <li><input onclick="ShowCheckedHostCount()" type="checkbox" value="{{ bind_host.id }}">{{ bind_host.host.ip_addr }}</li> {% endfor %} </ul> </li> {% endfor %} <li class="list-group-item "><span class="badge badge-success">{{ request.user.account.host_user_binds.count }}</span> <input type="checkbox" onclick="CheckAll(this)"> <a onclick="DisplayHostList(this)">未分組主機</a> <ul class="hide"> {% for bind_host in request.user.account.host_user_binds.all %} <li><input onclick="ShowCheckedHostCount()" type="checkbox" value="{{ bind_host.id }}">{{ bind_host.host.ip_addr }}</li> {% endfor %} </ul> </li> </ul> </div> </div> <div class="col-lg-9"> <div class="panel"> <div class="panel-heading"> <h3 class="panel-title">命令</h3> </div> <div class="panel-body"> <textarea class="form-control" id="cmd"></textarea> <button onclick="PostTask('cmd')" class="btn btn-info pull-right">執行</button> <button class="btn btn-danger" onclick="End()">終止</button> </div> </div> <div id="task_result_panel" class="panel"> <div class="panel-heading"> <h3 class="panel-title">任務結果</h3> </div> <div class="panel-body"> <div class="progress"> <div id='task_progress' style="width: 0%;" class="progress-bar progress-bar-info"></div> </div> <div id="task_result"></div> </div> </div> </div> <script> function DisplayHostList(self) { $(self).next().toggleClass("hide"); } function CheckAll(self) { console.log($(self).prop('checked')); $(self).parent().find("ul :checkbox").prop('checked', $(self).prop('checked')); ShowCheckedHostCount() } function ShowCheckedHostCount() { var selected_host_count = $("#host_groups ul").find(":checked").length; console.log(selected_host_count); $("#selected_hosts").text(selected_host_count); return selected_host_count } function GetTaskResult(task_id, task_timeout) { $.getJSON("{% url 'get_task_result' %}", {'task_id': task_id}, function (callback) { console.log(callback); var result_ele = ''; var all_task_finished = true; //所有完成flag var finished_task_count = 0; //已完成的任務數量 $.each(callback, function (index, i) { var p_ele = "<p>" + i.host_user_bind__host__hostname + "(" + i.host_user_bind__host__ip_addr + ") ------" + i.status + "</p>"; var res_ele = "<pre>" + i.result + "</pre>"; //<pre> 標籤按後端格式顯示數據 var single_result = p_ele + res_ele; result_ele += single_result; if (i.status == 3) { all_task_finished = false; } else { //task not finished yet finished_task_count += 1; } }); if (task_timeout_counter < task_timeout) { task_timeout_counter += 2; } else { all_task_finished = true } if (all_task_finished) { //完成! clearInterval(result_timer); var unexecuted =callback.length-finished_task_count; $.niftyNoty({ //提示超時 type: 'danger', container: '#task_result_panel', html: '<h4 id="Prompt">'+'執行:'+callback.length +' '+ '完成:'+finished_task_count+' '+'失敗:'+ unexecuted +'</h4>', closeBtn: false }); console.log("timmer canceled....") } $("#task_result").html(result_ele); var total_finished_percent = parseInt(finished_task_count / callback.length * 100); $("#task_progress").text(total_finished_percent + "%"); $("#task_progress").css("width", total_finished_percent + "%"); } ) ;//end getJSON } function PostTask(task_type) { //1. 驗證主機列表已選,命令已輸入 //2. 提交任務到後臺 $('.alert').remove(); var selected_host_ids = []; var selected_host_eles = $("#host_groups ul").find(":checked"); $.each(selected_host_eles, function (index, ele) { selected_host_ids.push($(ele).val()) }); console.log(selected_host_ids); if (selected_host_ids.length == 0) { alert("主機未選擇!"); return false; } var cmd_text = $.trim($("#cmd").val()); if (cmd_text.length == 0) { alert("未輸入命令!"); return false; } var task_data = { 'task_type': task_type, 'selected_host_ids': selected_host_ids, 'cmd': cmd_text }; $.post("{% url 'multitask' %}", { 'csrfmiddlewaretoken': "{{ csrf_token }}", 'task_data': JSON.stringify(task_data) }, function (callback) { console.log(callback);// task id var callback = JSON.parse(callback); task_timeout_counter = 0;// add 2 during each call of GetTaskResult GetTaskResult(callback.task_id, callback.timeout); //那批量任務ID 去獲取子任務的進展!那超時時間作對比 result_timer = setInterval(function () { GetTaskResult(callback.task_id, callback.timeout) }, 2000); //diplay download file btn $("#file-download-btn").removeClass("hide").attr('href', "{% url 'task_file_download' %}?task_id=" + callback.task_id); });//end post } function End(){ $.getJSON("{% url 'end_cmd' %}", function (callback) { console.log(callback) }) } </script> {% endblock %}
import time import sys,os import multiprocessing import paramiko def cmd_run(tasklog_id,cmd_str): try: import django django.setup() from audit import models tasklog_obj = models.TaskLog.objects.get(id=tasklog_id) print(tasklog_obj, cmd_str) ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(tasklog_obj.host_user_bind.host.ip_addr, tasklog_obj.host_user_bind.host.port, tasklog_obj.host_user_bind.host_user.username, tasklog_obj.host_user_bind.host_user.password, timeout=15) #配置超時時間15秒! stdin, stdout, stderr = ssh.exec_command(cmd_str) result = stdout.read() + stderr.read() print('---------%s--------' % tasklog_obj.host_user_bind) print(result) ssh.close() tasklog_obj.result = result or 'cmd has no result output .'#若是沒有 返回結果 /出現錯誤 tasklog_obj.status = 0 tasklog_obj.save() except Exception as e: print(e) def file_transfer(bind_host_obj): pass if __name__ == '__main__': BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.append(BASE_DIR) os.environ.setdefault("DJANGO_SETTINGS_MODULE", "zhanggen_audit.settings") import django django.setup() from audit import models task_id = sys.argv[1] from audit import models task_id=int(sys.argv[1]) # 1. 根據Taskid拿到任務對象, # 2. 拿到任務關聯的全部主機 # 3. 根據任務類型調用多進程 執行不一樣的方法 # 4 . 每一個子任務執行完畢後,本身把 子任務執行結果 寫入數據庫 TaskLog表 task_obj = models.Task.objects.get(id=task_id) pool=multiprocessing.Pool(processes=10) #開啓 1個擁有10個進程的進程池 if task_obj.task_type == 0: task_func=cmd_run else: task_func =file_transfer for task_log in task_obj.tasklog_set.all(): #查詢子任務信息,並更新子任務,進入執行階段! pool.apply_async(task_func,args=(task_log.pk,task_obj.content)) #開啓子進程,把子任務信息的pk、和 批量任務的命令傳進去! pool.close() pool.join()
1.上傳本地文件至多臺服務器(批量上傳)
每次訪問批量上傳頁面上傳惟一字符串
使用filedropzone組件作批量上傳ul,並限制文件大小、個數,文件提交後端時攜帶 惟一字符串
後端生成 /固定上傳路徑/用戶ID/惟一字符串/文件的路徑,並寫入文件;(filedropzone組件把文件拖拽進去以後,自動上傳)
前端點擊執行 驗證堡壘機上的用戶上傳路徑是否合法,而後開啓多進程 分別經過paramiko去發送至遠程服務的路徑
""" Django settings for zhanggen_audit project. Generated by 'django-admin startproject' using Django 1.11.4. For more information on this file, see https://docs.djangoproject.com/en/1.11/topics/settings/ For the full list of settings and their values, see https://docs.djangoproject.com/en/1.11/ref/settings/ """ import os # Build paths inside the project like this: os.path.join(BASE_DIR, ...) BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # Quick-start development settings - unsuitable for production # See https://docs.djangoproject.com/en/1.11/howto/deployment/checklist/ # SECURITY WARNING: keep the secret key used in production secret! SECRET_KEY = '5ivlngau4a@_3y4vizrcxnnj(&vz2en#edpq%i&jr%99-xxv)&' # SECURITY WARNING: don't run with debug turned on in production! DEBUG = True ALLOWED_HOSTS = ['*'] # Application definition INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'audit.apps.AuditConfig', ] MIDDLEWARE = [ 'django.middleware.security.SecurityMiddleware', 'django.contrib.sessions.middleware.SessionMiddleware', 'django.middleware.common.CommonMiddleware', 'django.middleware.csrf.CsrfViewMiddleware', 'django.contrib.auth.middleware.AuthenticationMiddleware', 'django.contrib.messages.middleware.MessageMiddleware', 'django.middleware.clickjacking.XFrameOptionsMiddleware', ] ROOT_URLCONF = 'zhanggen_audit.urls' TEMPLATES = [ { 'BACKEND': 'django.template.backends.django.DjangoTemplates', 'DIRS': [os.path.join(BASE_DIR, 'templates'),], 'APP_DIRS': True, 'OPTIONS': { 'context_processors': [ 'django.template.context_processors.debug', 'django.template.context_processors.request', 'django.contrib.auth.context_processors.auth', 'django.contrib.messages.context_processors.messages', ], }, }, ] WSGI_APPLICATION = 'zhanggen_audit.wsgi.application' # Database # https://docs.djangoproject.com/en/1.11/ref/settings/#databases DATABASES = { 'default': { 'ENGINE': 'django.db.backends.sqlite3', 'NAME': os.path.join(BASE_DIR, 'db.sqlite3'), } } # Password validation # https://docs.djangoproject.com/en/1.11/ref/settings/#auth-password-validators AUTH_PASSWORD_VALIDATORS = [ { 'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator', }, { 'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator', }, { 'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator', }, { 'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator', }, ] # Internationalization # https://docs.djangoproject.com/en/1.11/topics/i18n/ LANGUAGE_CODE = 'en-us' TIME_ZONE = 'Asia/Shanghai' USE_I18N = True USE_L10N = True USE_TZ = True # Static files (CSS, JavaScript, Images) # https://docs.djangoproject.com/en/1.11/howto/static-files/ STATIC_URL = '/static/' STATICFILES_DIRS=( os.path.join(BASE_DIR,'static'), ) SESSION_TRACKER_SCRIPT=os.path.join(BASE_DIR,'audit%sbackend%ssession_check.sh')%(os.sep,os.sep) SESSION_TRACKER_SCRIPT_LOG_PATH=os.path.join(BASE_DIR,'log')#日誌路徑 MULTI_TASK_SCRIPT = os.path.join(BASE_DIR,'multitask_execute.py') #腳本路徑 CURRENT_PGID=None #進程的 pgid FILE_UPLOADS = os.path.join(BASE_DIR,'uploads') #上傳文件的堡壘機路徑 FILE_DOWNLOADS = os.path.join(BASE_DIR,'downloads') #下載文件的堡壘機路徑
<script> function DisplayHostList(self) { $(self).next().toggleClass("hide"); } function CheckAll(self){ console.log($(self).prop('checked')); $(self).parent().find("ul :checkbox").prop('checked',$(self).prop('checked')); ShowCheckedHostCount() } function ShowCheckedHostCount(){ var selected_host_count = $("#host_groups ul").find(":checked").length console.log(selected_host_count); $("#selected_hosts").text(selected_host_count); return selected_host_count } function GetTaskResult(task_id,task_timeout) { $.getJSON("{% url 'get_task_result' %}",{'task_id':task_id},function(callback){ console.log(callback) var result_ele = '' var all_task_finished = true var finished_task_count = 0 ; $.each(callback,function (index,i) { var p_ele = "<p>" + i.host_user_bind__host__hostname + "(" +i.host_user_bind__host__ip_addr +") ------" + i.status + "</p>"; var res_ele = "<pre>" + i.result +"</pre>"; var single_result = p_ele + res_ele; result_ele += single_result; //check if ths sub task is finished. if ( i.status == 3){ all_task_finished = false; }else { //task not finished yet finished_task_count += 1; } });//end each //check if the task_timer_count < task_timeout, otherwise it means the task is timedout, setInterval function need to be cancelled if (task_timeout_counter < task_timeout){ // not timed out yet task_timeout_counter += 2; }else { all_task_finished = true; // set all task to be finished ,because it 's already reached the global timeout $.niftyNoty({ type: 'danger', container : '#task_result_panel', html : '<h4 class="alert-title">Task timed out!</h4><p class="alert-message">The task has timed out!</p><div class="mar-top"><button type="button" class="btn btn-info" data-dismiss="noty">Close this notification</button></div>', closeBtn : false }); } if ( all_task_finished){ clearInterval(result_timer); console.log("timmer canceled....") } $("#task_result").html(result_ele); // set progress bar var total_finished_percent = parseInt(finished_task_count / callback.length * 100 ); $("#task_progress").text(total_finished_percent+"%"); $("#task_progress").css("width",total_finished_percent +"%"); });//end getJSON } function PostTask(task_type) { //1. 驗證主機列表已選,命令已輸入 //2. 提交任務到後臺 var selected_host_ids = []; var selected_host_eles = $("#host_groups ul").find(":checked") $.each(selected_host_eles,function (index,ele) { selected_host_ids.push($(ele).val()) }); console.log(selected_host_ids) if ( selected_host_ids.length == 0){ alert("主機未選擇!") return false } if ( task_type == 'cmd'){ var cmd_text = $.trim($("#cmd").val()) if ( cmd_text.length == 0){ alert("未輸入命令!") return false } }else { //file_transfer var remote_path = $("#remote_path").val(); if ($.trim(remote_path).length == 0){ alert("必須輸入1個遠程路徑") return false } } var task_data = { 'task_type':task_type, 'selected_host_ids': selected_host_ids, //'cmd': cmd_text }; if ( task_type == 'cmd'){ task_data['cmd'] = cmd_text }else { var file_transfer_type = $("select[name='transfer-type']").val(); task_data['file_transfer_type'] = file_transfer_type; task_data['random_str'] = "{{ random_str }}"; task_data['remote_path'] = $("#remote_path").val(); } $.post("{% url 'multitask' %}",{'csrfmiddlewaretoken':"{{ csrf_token }}",'task_data':JSON.stringify(task_data)}, function(callback){ console.log(callback) ;// task id var callback = JSON.parse(callback); GetTaskResult(callback.task_id,callback.timeout); task_timeout_counter = 0; // add 2 during each call of GetTaskResult result_timer = setInterval(function () { GetTaskResult(callback.task_id,callback.timeout) },2000); //diplay download file btn $("#file-download-btn").removeClass("hide").attr('href', "{% url 'task_file_download' %}?task_id="+callback.task_id); } );//end post } </script>
{% extends 'index.html' %} {% block extra-css %} <link href="/static/plugins/dropzone/dropzone.css" rel="stylesheet"> <script src="/static/plugins/dropzone/dropzone.js"></script> {% endblock %} {% block content-container %} <div id="page-title"> <h1 class="page-header text-overflow">主機列表</h1> <!--Searchbox--> <div class="searchbox"> <div class="input-group custom-search-form"> <input type="text" class="form-control" placeholder="Search.."> <span class="input-group-btn"> <button class="text-muted" type="button"><i class="pli-magnifi-glass"></i></button> </span> </div> </div> </div> <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~--> <!--End page title--> <!--Breadcrumb--> <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~--> <ol class="breadcrumb"> <li><a href="#">Home</a></li> <li><a href="#">Library</a></li> <li class="active">主機列表</li> </ol> <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~--> <!--End breadcrumb--> <div id="page-content"> {% include 'components/hostgroups.html' %} <div class="col-lg-9"> <div class="panel"> <div class="panel-heading"> <h3 class="panel-title">文件傳輸</h3> </div> <div class="panel-body"> <select name="transfer-type" onchange="ToggleUploadEle(this)"> <option value="send">發送文件到遠程主機</option> <option value="get">from遠程主機下載文件</option> </select> <form id="filedropzone" class="dropzone"> </form> {# <input type="hidden" value="{{ random_str }}" name="random_str">#} <input id="remote_path" class="form-control" type="text" placeholder="遠程路徑"> <button id="file_count" onclick="PostTask('file_transfer')" class="btn btn-info pull-right">執行</button> <button class="btn btn-danger ">終止</button> <a id="file-download-btn" class="btn btn-info hide" href="">下載任務文件到本地</a> </div> </div> {% include 'components/taskresult.html' %} </div> </div> </div> {% include 'components/multitask_js.html' %} <script> $('#filedropzone').dropzone({ url: "{% url 'task_file_upload' %}?random_str={{ random_str }}", //必須填寫 method: "post", //也可用put maxFiles: 10,//一次性上傳的文件數量上限 maxFilesize: 2, //MB //acceptedFiles: ".jpg,.gif,.png"//限制上傳的類型 dictMaxFilesExceeded: "您最多隻能上傳10個文件!", dictFileTooBig: "文件過大上傳文件最大支持." /* init: function () { this.on("success", function (file) { //文件上傳成功觸發事件 $('#file_count').attr('file_count') }); } */ }); Dropzone.autoDiscover = false; function ToggleUploadEle(self) { console.log($(self).val()); if ($(self).val() == 'get') { $(self).next().addClass("hide") } else { $(self).next().removeClass('hide') } } </script> {% endblock %}
from django.conf.urls import url from django.contrib import admin from audit import views urlpatterns = [ url(r'^admin/', admin.site.urls), url(r'^$', views.index ), url(r'^login/$', views.acc_login ), url(r'^logout/$', views.acc_logout ), url(r'^hostlist/$', views.host_list ,name="host_list"), url(r'^multitask/$', views.multitask ,name="multitask"), url(r'^multitask/result/$', views.multitask_result ,name="get_task_result"), url(r'^multitask/cmd/$', views.multi_cmd ,name="multi_cmd"), url(r'^api/hostlist/$', views.get_host_list ,name="get_host_list"), url(r'^api/token/$', views.get_token ,name="get_token"), url(r'^multitask/file_transfer/$', views.multi_file_transfer, name="multi_file_transfer"), url(r'^api/task/file_upload/$', views.task_file_upload ,name="task_file_upload"), url(r'^api/task/file_download/$', views.task_file_download ,name="task_file_download"), url(r'^end_cmd/$', views.end_cmd,name="end_cmd"), ]
import json,subprocess,os,signal from audit import models from django.conf import settings from django.db.transaction import atomic class Task(object): ''' ''' def __init__(self,request): self.request=request self.errors=[] self.task_data=None def is_valid(self): task_data=self.request.POST.get('task_data')#{"task_type":"cmd","selected_host_ids":["1","2"],"cmd":"DF"} if task_data: self.task_data=json.loads(task_data) self.task_type=self.task_data.get('task_type') if self.task_type == 'cmd': selected_host_ids=self.task_data.get('selected_host_ids') if selected_host_ids: return True self.errors.append({'invalid_argument': '命令/主機不存在'}) elif self.task_type == 'file_transfer': # selected_host_ids =self.task_data.get('selected_host_ids') self.task_type = self.task_data.get('task_type') #驗證文件路徑 user_id=models.Account.objects.filter(user=self.request.user).first().pk random_str=self.task_data.get('random_str') file_path=settings.FILE_UPLOADS+os.sep+str(user_id)+os.sep+random_str if os.path.isdir(file_path): return True if not os.path.isdir(file_path): self.errors.append({'invalid_argument': '上傳路徑失敗,請從新上傳'}) if not selected_host_ids: self.errors.append({'invalid_argument': '遠程主機不存在'}) else: self.errors.append({'invalid_argument': '不支持的任務類型!'}) self.errors.append({'invalid_data': 'task_data不存在!'}) def run(self): task_func = getattr(self, self.task_data.get('task_type')) # task_obj = task_func() #調用執行命令 #print(task_obj.pk) # 100 #這裏是任務id是自增的 return task_obj @atomic #事物操做 任務信息和 子任務都要同時建立完成! def cmd(self): task_obj=models.Task.objects.create( task_type=0, account=self.request.user.account, content=self.task_data.get('cmd'), ) #1.增長批量任務信息,並返回批量任務信息的 pk tasklog_objs=[] #2.增長子任務信息(初始化數據庫) host_ids = set(self.task_data.get("selected_host_ids")) # 獲取選中的主機id,並用集合去重 for host_id in host_ids: tasklog_objs.append(models.TaskLog(task_id=task_obj.id, host_user_bind_id=host_id, status = 3)) models.TaskLog.objects.bulk_create(tasklog_objs,100) # 沒100條記錄 commit 1次! task_id=task_obj.pk cmd_str = "python %s %s" % (settings.MULTI_TASK_SCRIPT,task_id) # 執行multitask.py腳本路徑 print('------------------>',cmd_str) multitask_obj = subprocess.Popen(cmd_str,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) #新打開1個新進程 #settings.CURRENT_PGID=os.getpgid(multitask_obj.pid) #os.getpgid(multitask_obj.pid) # os.killpg(pgid=pgid,sig=signal.SIGKILL) # print(multitask_obj.stderr.read().decode('utf-8') or multitask_obj.stdout.read().decode('utf-8')) #print("task result :",multitask_obj.stdout.read().decode('utf-8'),multitask_obj.stderr.read().decode('utf-8')) # print(multitask_obj.stdout.read()) # for host_id in self.task_data.get('selected_host_ids'): # t=Thread(target=self.run_cmd,args=(host_id,self.task_data.get('cmd'))) # t.start() return task_obj @atomic # 事物操做 任務信息和 子任務都要同時建立完成! def file_transfer(self): print(self.task_data) #{'task_type': 'file_transfer', 'selected_host_ids': ['3'], 'file_transfer_type': 'send', 'random_str': 'iuon9bhm', 'remote_path': '/'} task_obj = models.Task.objects.create( task_type=1, account=self.request.user.account, content=json.dumps(self.task_data), ) # 1.增長批量任務信息,並返回批量任務信息的 pk tasklog_objs = [] # 2.增長子任務信息(初始化數據庫) host_ids = set(self.task_data.get("selected_host_ids")) # 獲取選中的主機id,並用集合去重 for host_id in host_ids: tasklog_objs.append(models.TaskLog(task_id=task_obj.id, host_user_bind_id=host_id, status=3)) models.TaskLog.objects.bulk_create(tasklog_objs, 100) # 沒100條記錄 commit 1次! task_id = task_obj.pk cmd_str = "python %s %s" % (settings.MULTI_TASK_SCRIPT, task_id) # 執行multitask.py腳本路徑 print('------------------>', cmd_str) multitask_obj = subprocess.Popen(cmd_str, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # 新打開1個新進程 return task_obj
import time,json import sys,os import multiprocessing import paramiko def cmd_run(tasklog_id,task_obj_id,cmd_str,): try: import django django.setup() from audit import models tasklog_obj = models.TaskLog.objects.get(id=tasklog_id) print(tasklog_obj, cmd_str) ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(tasklog_obj.host_user_bind.host.ip_addr, tasklog_obj.host_user_bind.host.port, tasklog_obj.host_user_bind.host_user.username, tasklog_obj.host_user_bind.host_user.password, timeout=15) #配置超時時間15秒! stdin, stdout, stderr = ssh.exec_command(cmd_str) result = stdout.read() + stderr.read() print('---------%s--------' % tasklog_obj.host_user_bind) print(result) ssh.close() #修改子任務數據庫結果 tasklog_obj.result = result or 'cmd has no result output .'#若是沒有 返回結果 /出現錯誤 tasklog_obj.status = 0 tasklog_obj.save() except Exception as e: print(e) def file_transfer(tasklog_id,task_id,task_content): import django django.setup() from django.conf import settings from audit import models tasklog_obj = models.TaskLog.objects.get(id=tasklog_id) try: print('task contnt:', tasklog_obj) task_data = json.loads(tasklog_obj.task.content) t = paramiko.Transport((tasklog_obj.host_user_bind.host.ip_addr, tasklog_obj.host_user_bind.host.port)) t.connect(username=tasklog_obj.host_user_bind.host_user.username, password=tasklog_obj.host_user_bind.host_user.password,) sftp = paramiko.SFTPClient.from_transport(t) if task_data.get('file_transfer_type') =='send': local_path = "%s/%s/%s" %( settings.FILE_UPLOADS, tasklog_obj.task.account.id, task_data.get('random_str')) print("local path",local_path) for file_name in os.listdir(local_path): sftp.put('%s/%s' %(local_path,file_name), '%s/%s'%(task_data.get('remote_path'), file_name)) tasklog_obj.result = "send all files done..." else: # 循環到全部的機器上的指定目錄下載文件 download_dir = "{download_base_dir}/{task_id}".format(download_base_dir=settings.FILE_DOWNLOADS, task_id=task_id) if not os.path.exists(download_dir): os.makedirs(download_dir,exist_ok=True) remote_filename = os.path.basename(task_data.get('remote_path')) local_path = "%s/%s.%s" %(download_dir,tasklog_obj.host_user_bind.host.ip_addr,remote_filename) sftp.get(task_data.get('remote_path'),local_path ) #remote path /tmp/test.py tasklog_obj.result = 'get remote file [%s] to local done' %(task_data.get('remote_path')) t.close() tasklog_obj.status = 0 tasklog_obj.save() # ssh = paramiko.SSHClient() # ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) except Exception as e: print("error :",e ) tasklog_obj.result = str(e) tasklog_obj.save() if __name__ == '__main__': BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.append(BASE_DIR) os.environ.setdefault("DJANGO_SETTINGS_MODULE", "zhanggen_audit.settings") import django django.setup() from audit import models task_id = sys.argv[1] from audit import models task_id=int(sys.argv[1]) # 1. 根據Taskid拿到任務對象, # 2. 拿到任務關聯的全部主機 # 3. 根據任務類型調用多進程 執行不一樣的方法 # 4 . 每一個子任務執行完畢後,本身把 子任務執行結果 寫入數據庫 TaskLog表 task_obj = models.Task.objects.get(id=task_id) pool=multiprocessing.Pool(processes=10) #開啓 1個擁有10個進程的進程池 if task_obj.task_type == 0: task_func=cmd_run else: task_func =file_transfer for task_log in task_obj.tasklog_set.all(): #查詢子任務信息,並更新子任務,進入執行階段! pool.apply_async(task_func,args=(task_log.id,task_obj.id,task_obj.content)) #開啓子進程,把子任務信息的pk、和 批量任務的命令傳進去! pool.close() pool.join()
2.從多臺服務器上get文件至本地(批量下載)
用戶輸入遠程服務器文件路徑,堡壘機生成本地下載路徑( /下載文件路徑/task_id/ip.遠程文件名)
開啓多進程 經過paramiko下載遠程主機的文件 到堡壘機下載路徑;
任務執行完畢前端彈出 下載文件到本地按鈕 (攜帶?批量任務ID)
用戶點擊下載文件到本地 a標籤,後端獲取當前批量任務的ID,把當前批量任務下載的files,打包返回給用戶瀏覽器!
def send_zipfile(request,task_id,file_path): zip_file_name = 'task_id_%s_files' % task_id archive = zipfile.ZipFile(zip_file_name , 'w', zipfile.ZIP_DEFLATED) #建立1個zip 包 file_list = os.listdir(file_path) #找到堡壘機目錄下 全部文件 for filename in file_list: #把全部文件寫入 zip包中! archive.write('%s/%s' %(file_path,filename),arcname=filename) archive.close() #-------------------------------------------------------------- #文件打包完畢! wrapper = FileWrapper(open(zip_file_name,'rb')) #在內存中打開 打包好的壓縮包 response = HttpResponse(wrapper, content_type='application/zip') #修改Django的response的content_type response['Content-Disposition'] = 'attachment; filename=%s.zip' % zip_file_name #告訴流量器以 附件形式下載 response['Content-Length'] = os.path.getsize(zip_file_name) #文件大小 #temp.seek(0) return response @login_required def task_file_download(request): #下載文件到本地 task_id = request.GET.get('task_id') print(task_id) task_file_path = "%s/%s"%( conf.settings.FILE_DOWNLOADS,task_id) download_files=os.listdir(task_file_path) print(download_files) return send_zipfile(request,task_id,task_file_path) #調用打包函數
3.架構描述
當前架構缺陷:multitask在堡壘機上開多進程,隨着用戶量的增加,開啓的進程數量也會越多;
將來設想:在Django 和 multitask之間增長隊列,實現用戶大併發!
GitHub:https://github.com/zhanggen3714/zhanggen_audit
GateOne安裝