前言:
每個公司的網絡環境大都劃分 辦公網絡、線上網絡,之所以劃分的主要原因是爲了保證線上操作安全;
對於外部用戶而言也只能訪問線上網絡的特定開放端口,那麼是什麼控制了用戶訪問線上網絡的呢?
防火牆過濾......!
對於內部員工而言對線上系統日常運維、代碼部署如何安全訪問線上業務系統呢?如何監控、記錄技術人員的操作記錄?
堡壘機策略:
1.回收所有遠程登錄Linux主機的用戶名、密碼;
2.中間設置堡壘機(保存所有線上Linux主機的用戶名、密碼);
3.所有技術人員都要通過堡壘機去獲取用戶名、密碼,然後在再去連接 線上系統,並記錄操作日誌;
堡壘機策略優點:
1.記錄用戶操作;
2.實現遠程操作權限集中管理;
一、堡壘機表結構設計
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
from django.db import models from django.contrib.auth.models import User # Create your models here. class IDC(models.Model): name = models.CharField(max_length=64,unique=True) def __str__(self): return self.name class Host(models.Model): """存儲所有主機信息""" hostname = models.CharField(max_length=64,unique=True) ip_addr = models.GenericIPAddressField(unique=True) port = models.IntegerField(default=22) idc = models.ForeignKey("IDC") #host_groups = models.ManyToManyField("HostGroup") #host_users = models.ManyToManyField("HostUser") enabled = models.BooleanField(default=True) def __str__(self): return "%s-%s" %(self.hostname,self.ip_addr) class HostGroup(models.Model): """主機組""" name = models.CharField(max_length=64,unique=True) host_user_binds = models.ManyToManyField("HostUserBind") def __str__(self): return self.name class HostUser(models.Model): """存儲遠程主機的用戶信息 root 123 root abc root sfsfs """ auth_type_choices = ((0,'ssh-password'),(1,'ssh-key')) auth_type = models.SmallIntegerField(choices=auth_type_choices) username = models.CharField(max_length=32) password = models.CharField(blank=True,null=True,max_length=128) def __str__(self): return "%s-%s-%s" %(self.get_auth_type_display(),self.username,self.password) class Meta: unique_together = ('username','password') class HostUserBind(models.Model): """綁定主機和用戶""" host = models.ForeignKey("Host") host_user = models.ForeignKey("HostUser") def __str__(self): return "%s-%s" %(self.host,self.host_user) class Meta: unique_together = ('host','host_user') class SessionLog(models.Model): ''' 記錄每個用戶登錄操作,ID傳給 shell生成文件命名 ''' account=models.ForeignKey('Account') host_user_bind=models.ForeignKey('HostUserBind') start_date=models.DateField(auto_now_add=True) end_date=models.DateField(blank=True,null=True) def __str__(self): return '%s-%s'%(self.account,self.host_user_bind) class AuditLog(models.Model): """審計日誌""" class Account(models.Model): """堡壘機賬戶 1. 擴展 2. 繼承 user.account.host_user_bind """ user = models.OneToOneField(User) name = models.CharField(max_length=64) host_user_binds = models.ManyToManyField("HostUserBind",blank=True) host_groups = models.ManyToManyField("HostGroup",blank=True)
二、通過堡壘機遠程登錄Linux主機
2種堡壘機登錄方式:
命令行登錄堡壘機方式:
方式1:通過 修改open_shh源碼擴展-Z option生成唯一 ssh進程,使用Linux的strace 命令對唯一 ssh進程進行檢測生成日誌文件;
0.用戶執行audit_shell出現交互界面,提示用戶輸入機組和主機;
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import sys,os,django os.environ.setdefault("DJANGO_SETTINGS_MODULE","zhanggen_audit.settings") django.setup() #在Django視圖之外,調用Django功能設置環境變量! from audit.backend import user_interactive if __name__ == '__main__': shell_obj=user_interactive.UserShell(sys.argv) shell_obj.start()
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
from django.contrib.auth import authenticate class UserShell(object): '''用戶登錄堡壘機,啓動自定製shell ''' def __init__(self,sys_argv): self.sys_argv=sys_argv self.user=None def auth(self): count=0 while count < 3: username=input('username:').strip() password=input('password:').strip() user=authenticate(username=username,password=password) #none 代表認證失敗,返回用戶對象認證成功! if not user: count+=1 print('無效的用戶名或者,密碼!') else: self.user=user return True else: print('輸入次數超過3次!') def start(self): """啓動交互程序""" if self.auth(): # print(self.user.account.host_user_binds.all()) #select_related() while True: host_groups = self.user.account.host_groups.all() for index, group in enumerate(host_groups): print("%s.\t%s[%s]" % (index, group, group.host_user_binds.count())) print("%s.\t未分組機器[%s]" % (len(host_groups), self.user.account.host_user_binds.count())) choice = input("select group>:").strip() if choice.isdigit(): choice = int(choice) host_bind_list = None if choice >= 0 and choice < len(host_groups): selected_group = host_groups[choice] host_bind_list = selected_group.host_user_binds.all() elif choice == len(host_groups): # 選擇的未分組機器 # selected_group = self.user.account.host_user_binds.all() host_bind_list = self.user.account.host_user_binds.all() if host_bind_list: while True: for index, host in enumerate(host_bind_list): print("%s.\t%s" % (index, host,)) choice2 = input("select host>:").strip() if choice2.isdigit(): choice2 = int(choice2) if choice2 >= 0 and choice2 < len(host_bind_list): selected_host = host_bind_list[choice2] print("selected host", selected_host) elif choice2 == 'b': break
知識點:
在Django視圖之外,調用Django功能設置環境變量!(切記放在和Django manage.py 同級目錄);
import sys,os,django os.environ.setdefault("DJANGO_SETTINGS_MODULE","zhanggen_audit.settings")
注意:在Django啓動時會自動加載一些 文件,比如每個app中admin.py,不能在這些文件裏面設置加載環境變量,因爲已經加載完了,如果違反這個規則會導致Django程序啓動失敗;
1.實現ssh用戶指令檢測
1.0 修改open_shh源碼,擴展 ssh -Z 唯一標識符;(這樣每次ssh遠程登錄,都可以利用唯一標識符,分辨出 每個ssh會話進程;)
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
修改OpenSsh下的ssh.c文件的608和609行、935行增加; while ((opt = getopt(ac, av, "1246ab:c:e:fgi:kl:m:no:p:qstvxz:" "ACD:E:F:GI:J:KL:MNO:PQ:R:S:TVw:W:XYyZ:")) != -1) { case 'Z': break;
知識點:
OpenSSH 是 SSH (Secure SHell) 協議的免費開源實現項目。
1.1 修改openssh之後,編譯、安裝
chmod 755 configure ./configure --prefix=/usr/local/openssh make chmod 755 mkinstalldirs make install sshpass -p xxxxxx123 /usr/local/openssh/bin/ssh [email protected] -Z s1123ssssd212
1.2 每個ssh會話進程可以唯一標識之後,在堡壘機使用會話腳本shell腳本檢測 ssh會話進程;(strace命令進行監控,並生產 log日誌文件);
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
#!/usr/bin/bash for i in $(seq 1 30);do echo $i $1 process_id=`ps -ef | grep $1 | grep -v 'ession_check.sh' | grep -v grep | grep -v sshpass | awk '{print $2}'` echo "process: $process_id" if [ ! -z "$process_id" ];then echo 'start run strace.....' strace -fp $process_id -t -o $2.log; break; fi sleep 5 done;
知識點:
strace 檢測進程的IO調用,監控用戶shell輸入的命令字符;
sshpass無需提示輸入密碼登錄
[[email protected] sshpass-1.06]# sshpass -p wsnb ssh [email protected] -o StrictHostKeyChecking=no Last login: Tue Jul 10 16:39:53 2018 from 192.168.113.84 [root@ecdb ~]#
python生成唯一標識符
s=string.ascii_lowercase+string.digits random_tag=''.join(random.sample(s,10))
解決普通用戶,無法執行 strace命令;
方式1:執行文件 +s權限
chmod u+s `which strace`
方式2:修改sudo配置文件,使普通用戶sudo時無需輸入密碼!
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
修改sudo配置文件,防止修改出錯,一定要切換到root用戶; %普通用戶 ALL=(ALL) NOPASSWD: ALL wq! #退出
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
#!/usr/bin/python3 # -*- coding: utf-8 -* from django.contrib.auth import authenticate import subprocess,string,random from audit import models from django.conf import settings class UserShell(object): '''用戶登錄堡壘機,啓動自定製shell ''' def __init__(self,sys_argv): self.sys_argv=sys_argv self.user=None def auth(self): count=0 while count < 3: username=input('username:').strip() password=input('password:').strip() user=authenticate(username=username,password=password) #none 代表認證失敗,返回用戶對象認證成功! if not user: count+=1 print('無效的用戶名或者,密碼!') else: self.user=user return True else: print('輸入次數超過3次!') def start(self): """啓動交互程序""" if self.auth(): # print(self.user.account.host_user_binds.all()) #select_related() while True: host_groups = self.user.account.host_groups.all() for index, group in enumerate(host_groups): print("%s.\t%s[%s]" % (index, group, group.host_user_binds.count())) print("%s.\t未分組機器[%s]" % (len(host_groups), self.user.account.host_user_binds.count())) choice = input("select group>:").strip() if choice.isdigit(): choice = int(choice) host_bind_list = None if choice >= 0 and choice < len(host_groups): selected_group = host_groups[choice] host_bind_list = selected_group.host_user_binds.all() elif choice == len(host_groups): # 選擇的未分組機器 # selected_group = self.user.account.host_user_binds.all() host_bind_list = self.user.account.host_user_binds.all() if host_bind_list: while True: for index, host in enumerate(host_bind_list): print("%s.\t%s" % (index, host,)) choice2 = input("select host>:").strip() if choice2.isdigit(): choice2 = int(choice2) if choice2 >= 0 and choice2 < len(host_bind_list): selected_host = host_bind_list[choice2] s = string.ascii_lowercase + string.digits random_tag = ''.join(random.sample(s, 10)) session_obj=models.SessionLog.objects.create(account=self.user.account,host_user_bind=selected_host) session_tracker_scipt='/bin/sh %s %s %s'%(settings.SESSION_TRACKER_SCRIPT,random_tag,session_obj.pk) session_tracker_process=subprocess.Popen(session_tracker_scipt,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) cmd='sshpass -p %s /usr/local/openssh/bin/ssh %[email protected]%s -p %s -o stricthostkeychecking=no -Z %s' % (selected_host.host_user.password, selected_host.host_user.username, selected_host.host.ip_addr, selected_host.host.port,random_tag) subprocess.run(cmd,shell=True)#開啓子進程交互 print(session_tracker_process.stdout.readlines(), session_tracker_process.stderr.readlines()) elif choice2 == 'b': break
2.shell遠程登錄程序檢查日誌文件,分析;
tab補全的命令,需要搜素write(5,該腳本實現思路,按鍵去嘗試,循環多種條件判斷;
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import re class AuditLogHandler(object): '''分析audit log日誌''' def __init__(self,log_file): self.log_file_obj = self._get_file(log_file) def _get_file(self,log_file): return open(log_file) def parse(self): cmd_list = [] cmd_str = '' catch_write5_flag = False #for tab complication for line in self.log_file_obj: #print(line.split()) line = line.split() try: pid,time_clock,io_call,char = line[0:4] if io_call.startswith('read(4'): if char == '"\\177",':#回退 char = '[1<-del]' if char == '"\\33OB",': #vim中下箭頭 char = '[down 1]' if char == '"\\33OA",': #vim中下箭頭 char = '[up 1]' if char == '"\\33OC",': #vim中右移 char = '[->1]' if char == '"\\33OD",': #vim中左移 char = '[1<-]' if char == '"\33[2;2R",': #進入vim模式 continue if char == '"\\33[>1;95;0c",': # 進入vim模式 char = '[----enter vim mode-----]' if char == '"\\33[A",': #命令行向上箭頭 char = '[up 1]' catch_write5_flag = True #取到向上按鍵拿到的歷史命令 if char == '"\\33[B",': # 命令行向上箭頭 char = '[down 1]' catch_write5_flag = True # 取到向下按鍵拿到的歷史命令 if char == '"\\33[C",': # 命令行向右移動1位 char = '[->1]' if char == '"\\33[D",': # 命令行向左移動1位 char = '[1<-]' cmd_str += char.strip('"",') if char == '"\\t",': catch_write5_flag = True continue if char == '"\\r",': cmd_list.append([time_clock,cmd_str]) cmd_str = '' # 重置 if char == '"':#space cmd_str += ' ' if catch_write5_flag: #to catch tab completion if io_call.startswith('write(5'): if io_call == '"\7",': #空鍵,不是空格,是回退不了就是這個鍵 pass else: cmd_str += char.strip('"",') catch_write5_flag = False except ValueError as e: print("\033[031;1mSession log record err,please contact your IT admin,\033[0m",e) #print(cmd_list) for cmd in cmd_list: print(cmd) return cmd_list if __name__ == "__main__": parser = AuditLogHandler(r'D:\zhanggen_audit\log\6.log') parser.parse()
3.修改bashrc文件,限制用戶登錄行爲;
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
alias rm='rm -i' alias cp='cp -i' alias mv='mv -i' # Source global definitions if [ -f /etc/bashrc ]; then . /etc/bashrc fi echo '-----------------------welcome to zhanggen audit --------------------------' python3 /root/zhanggen_audit/audit_shell.py echo 'bye' logout
缺陷:
雖然限制了用戶shell登錄,但無法阻止用戶使用程序(paramiko)上傳惡意文件!
方式2:提取paramiko源碼demos文件,對其進行修改支持交互式操作;
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
from django.db import models from django.contrib.auth.models import User # Create your models here. class IDC(models.Model): name = models.CharField(max_length=64,unique=True) def __str__(self): return self.name class Host(models.Model): """存儲所有主機信息""" hostname = models.CharField(max_length=64,unique=True) ip_addr = models.GenericIPAddressField(unique=True) port = models.IntegerField(default=22) idc = models.ForeignKey("IDC") #host_groups = models.ManyToManyField("HostGroup") #host_users = models.ManyToManyField("HostUser") enabled = models.BooleanField(default=True) def __str__(self): return "%s-%s" %(self.hostname,self.ip_addr) class HostGroup(models.Model): """主機組""" name = models.CharField(max_length=64,unique=True) host_user_binds = models.ManyToManyField("HostUserBind") def __str__(self): return self.name class HostUser(models.Model): """存儲遠程主機的用戶信息 root 123 root abc root sfsfs """ auth_type_choices = ((0,'ssh-password'),(1,'ssh-key')) auth_type = models.SmallIntegerField(choices=auth_type_choices) username = models.CharField(max_length=32) password = models.CharField(blank=True,null=True,max_length=128) def __str__(self): return "%s-%s-%s" %(self.get_auth_type_display(),self.username,self.password) class Meta: unique_together = ('username','password') class HostUserBind(models.Model): """綁定主機和用戶""" host = models.ForeignKey("Host") host_user = models.ForeignKey("HostUser") def __str__(self): return "%s-%s" %(self.host,self.host_user) class Meta: unique_together = ('host','host_user') class AuditLog(models.Model): """審計日誌""" session = models.ForeignKey("SessionLog") cmd = models.TextField() date = models.DateTimeField(auto_now_add=True) def __str__(self): return "%s-%s" %(self.session,self.cmd) class SessionLog(models.Model): account = models.ForeignKey("Account") host_user_bind = models.ForeignKey("HostUserBind") start_date = models.DateTimeField(auto_now_add=True) end_date = models.DateTimeField(blank=True,null=True) def __str__(self): return "%s-%s" %(self.account,self.host_user_bind) class Account(models.Model): """堡壘機賬戶 1. 擴展 2. 繼承 user.account.host_user_bind """ user = models.OneToOneField(User) name = models.CharField(max_length=64) host_user_binds = models.ManyToManyField("HostUserBind",blank=True) host_groups = models.ManyToManyField("HostGroup",blank=True)
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
__author__ = 'Administrator' import subprocess,random,string from django.contrib.auth import authenticate from django.conf import settings from audit import models from audit.backend import ssh_interactive class UserShell(object): """用戶登錄堡壘機後的shell""" def __init__(self,sys_argv): self.sys_argv = sys_argv self.user = None def auth(self): count = 0 while count < 3: username = input("username:").strip() password = input("password:").strip() user = authenticate(username=username,password=password) #None 代表認證不成功 #user object ,認證對象 ,user.name if not user: count += 1 print("Invalid username or password!") else: self.user = user return True else: print("too many attempts.") def start(self): """啓動交互程序""" if self.auth(): #print(self.user.account.host_user_binds.all()) #select_related() while True: host_groups = self.user.account.host_groups.all() for index,group in enumerate(host_groups): print("%s.\t%s[%s]"%(index,group,group.host_user_binds.count())) print("%s.\t未分組機器[%s]"%(len(host_groups),self.user.account.host_user_binds.count())) try: choice = input("select group>:").strip() if choice.isdigit(): choice = int(choice) host_bind_list = None if choice >=0 and choice < len(host_groups): selected_group = host_groups[choice] host_bind_list = selected_group.host_user_binds.all() elif choice == len(host_groups): #選擇的未分組機器 #selected_group = self.user.account.host_user_binds.all() host_bind_list = self.user.account.host_user_binds.all() if host_bind_list: while True: for index,host in enumerate(host_bind_list): print("%s.\t%s"%(index,host,)) choice2 = input("select host>:").strip() if choice2.isdigit(): choice2 = int(choice2) if choice2 >=0 and choice2 < len(host_bind_list): selected_host = host_bind_list[choice2] ssh_interactive.ssh_session(selected_host,self.user) # s = string.ascii_lowercase +string.digits # random_tag = ''.join(random.sample(s,10)) # session_obj = models.SessionLog.objects.create(account=self.user.account,host_user_bind=selected_host) # # cmd = "sshpass -p %s /usr/local/openssh/bin/ssh %[email protected]%s -p %s -o StrictHostKeyChecking=no -Z %s" %(selected_host.host_user.password,selected_host.host_user.username,selected_host.host.ip_addr,selected_host.host.port ,random_tag) # #start strace ,and sleep 1 random_tag, session_obj.id # session_tracker_script = "/bin/sh %s %s %s " %(settings.SESSION_TRACKER_SCRIPT,random_tag,session_obj.id) # # session_tracker_obj =subprocess.Popen(session_tracker_script, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) # # ssh_channel = subprocess.run(cmd,shell=True) # print(session_tracker_obj.stdout.read(), session_tracker_obj.stderr.read()) # elif choice2 == 'b': break except KeyboardInterrupt as e : pass
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
#!/usr/bin/env python # Copyright (C) 2003-2007 Robey Pointer <[email protected]> # # This file is part of paramiko. # # Paramiko is free software; you can redistribute it and/or modify it under the # terms of the GNU Lesser General Public License as published by the Free # Software Foundation; either version 2.1 of the License, or (at your option) # any later version. # # Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more # details. # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. import base64 from binascii import hexlify import getpass import os import select import socket import sys import time import traceback from paramiko.py3compat import input from audit import models import paramiko try: import interactive except ImportError: from . import interactive def manual_auth(t, username, password): # default_auth = 'p' # auth = input('Auth by (p)assword, (r)sa key, or (d)ss key? [%s] ' % default_auth) # if len(auth) == 0: # auth = default_auth # # if auth == 'r': # default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_rsa') # path = input('RSA key [%s]: ' % default_path) # if len(path) == 0: # path = default_path # try: # key = paramiko.RSAKey.from_private_key_file(path) # except paramiko.PasswordRequiredException: # password = getpass.getpass('RSA key password: ') # key = paramiko.RSAKey.from_private_key_file(path, password) # t.auth_publickey(username, key) # elif auth == 'd': # default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_dsa') # path = input('DSS key [%s]: ' % default_path) # if len(path) == 0: # path = default_path # try: # key = paramiko.DSSKey.from_private_key_file(path) # except paramiko.PasswordRequiredException: # password = getpass.getpass('DSS key password: ') # key = paramiko.DSSKey.from_private_key_file(path, password) # t.auth_publickey(username, key) # else: # pw = getpass.getpass('Password for %[email protected]%s: ' % (username, hostname)) t.auth_password(username, password) def ssh_session(bind_host_user, user_obj): # now connect hostname = bind_host_user.host.ip_addr #自動輸入 主機名 port = bind_host_user.host.port #端口 username = bind_host_user.host_user.username password = bind_host_user.host_user.password try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #生成socket連接 sock.connect((hostname, port)) except Exception as e: print('*** Connect failed: ' + str(e)) traceback.print_exc() sys.exit(1) try: t = paramiko.Transport(sock) #使用paramiko的方法去連接服務器執行命令! try: t.start_client() except paramiko.SSHException: print('*** SSH negotiation failed.') sys.exit(1) try: keys = paramiko.util.load_host_keys(os.path.expanduser('~/.ssh/known_hosts')) except IOError: try: keys = paramiko.util.load_host_keys(os.path.expanduser('~/ssh/known_hosts')) except IOError: print('*** Unable to open host keys file') keys = {} # check server's host key -- this is important. key = t.get_remote_server_key() if hostname not in keys: print('*** WARNING: Unknown host key!') elif key.get_name() not in keys[hostname]: print('*** WARNING: Unknown host key!') elif keys[hostname][key.get_name()] != key: print('*** WARNING: Host key has changed!!!') sys.exit(1) else: print('*** Host key OK.') if not t.is_authenticated(): manual_auth(t, username, password) #密碼校驗 if not t.is_authenticated(): print('*** Authentication failed. :(') t.close() sys.exit(1) chan = t.open_session() chan.get_pty() # terminal chan.invoke_shell() print('*** Here we go!\n') session_obj = models.SessionLog.objects.create(account=user_obj.account, host_user_bind=bind_host_user) interactive.interactive_shell(chan, session_obj)#開始進入交換模式· chan.close() t.close() except Exception as e: print('*** Caught exception: ' + str(e.__class__) + ': ' + str(e)) traceback.print_exc() try: t.close() except: pass sys.exit(1)
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
# Copyright (C) 2003-2007 Robey Pointer <[email protected]> # # This file is part of paramiko. # # Paramiko is free software; you can redistribute it and/or modify it under the # terms of the GNU Lesser General Public License as published by the Free # Software Foundation; either version 2.1 of the License, or (at your option) # any later version. # # Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more # details. # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. import socket import sys from paramiko.py3compat import u from audit import models # windows does not have termios... try: import termios import tty has_termios = True except ImportError: has_termios = False def interactive_shell(chan,session_obj): if has_termios: # posix_shell(chan,session_obj) #unix 通用協議標準 else: windows_shell(chan) def posix_shell(chan,session_obj): import select oldtty = termios.tcgetattr(sys.stdin) try: tty.setraw(sys.stdin.fileno()) tty.setcbreak(sys.stdin.fileno()) chan.settimeout(0.0) flag = False cmd = '' while True: #開始輸入命令 r, w, e = select.select([chan, sys.stdin], [], []) #循環檢測 輸入、輸出、錯誤,有反應就返回,沒有就一直夯住! if chan in r:#遠程 由返回 命令結果 try: x = u(chan.recv(1024)) if len(x) == 0: sys.stdout.write('\r\n*** EOF\r\n') break if flag: #如果用戶輸入的Tab補全,服務器端返回 cmd += x flag = False sys.stdout.write(x) sys.stdout.flush() except socket.timeout: pass if sys.stdin in r: #本地輸入 x = sys.stdin.read(1) #輸入1個字符就發送遠程服務器 if len(x) == 0: break if x == '\r': #回車· models.AuditLog.objects.create(session=session_obj,cmd=cmd) cmd = '' elif x == '\t':#tab 本地1個字符+遠程返回的 flag = True else: cmd += x chan.send(x) #發送本地輸入 到遠程服務器 finally: termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty) # thanks to Mike Looijmans for this code def windows_shell(chan): import threading sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n") def writeall(sock): while True: data = sock.recv(256) if not data: sys.stdout.write('\r\n*** EOF ***\r\n\r\n') sys.stdout.flush() break sys.stdout.write(data) sys.stdout.flush() writer = threading.Thread(target=writeall, args=(chan,)) writer.start() try: while True: d = sys.stdin.read(1) if not d: break chan.send(d) except EOFError: # user hit ^Z or F6 pass
程序流程:用戶界面---------->ssh自動輸入用戶&登錄密碼---------->進入shell命令交互模式
知識點:
1對1: 1個 對應 1個 (1個女人嫁給了1個男人,生活慢慢平淡下來,)
1對多: 1個 對應 N個 (這個女人隱瞞丈夫相繼出軌了N個男人,這個男人發現老婆出軌了,很憤懣)
多對多: 雙方都存在1對多關係 (也相繼找了N個女情人,而這些女情人中就有他老婆出軌男人的老婆,故事結束。)
感悟:
這個故事很混亂! 怎麼設計男、女表結構? 其實在做數據庫表關係設計的時候,糾結2張表到底需要設計成什麼關係?到不如加幾張關係綁定表!
完全是出於 你的程序在允許的過程中到底 要向用戶展示什麼信息? 而決定的!
web頁面使用堡壘機方式:
web開發模式
1.MTV/MVC 前後端雜交模式;(面向公司內部OA)
優勢:簡單,一人全棧;
缺陷:前後端耦合性高,性能低、單點壓力
2.前後端分離(面向大衆用戶)
優勢:前、後端開發人員商定好接口和數據格式,並行開發,效率高;解決了後端獨自渲染模板的壓力;
缺陷:招前端得花錢
3.hostlist 展示主機組和主機
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
<div class="panel col-lg-3"> <div class="panel-heading"> <h3 class="panel-title">主機組</h3> </div> <div class="panel-body"> <ul class="list-group"> {% for group in request.user.account.host_groups.all %} <li class="list-group-item " onclick="GetHostlist({{ group.id }},this)"><span class="badge badge-success">{{ group.host_user_binds.count }}</span>{{ group.name }}</li> {% endfor %} <li class="list-group-item " onclick="GetHostlist(-1,this)"> <span class="badge badge-success">{{ request.user.account.host_user_binds.count }}</span>未分組主機</li> </ul> </div> </div>
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
<script> function GetHostlist(gid,self) { $.get("{% url 'get_host_list' %}",{'gid':gid},function(callback){ var data = JSON.parse(callback); console.log(data) var trs = '' $.each(data,function (index,i) { var tr = "<tr><td>" + i.host__hostname + "</td><td>" + i.host__ip_addr +"</td><td>" + i.host__idc__name +"</td><td>" + i.host__port + "</td><td>" + i.host_user__username+ "</td><td>Login</td></tr>"; trs += tr }) $("#hostlist").html(trs); });//end get $(self).addClass("active").siblings().removeClass('active'); } </script>
知識點:
如果給標籤綁定事件,需要傳參數,可以直接在標籤直接綁定。
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
url(r'^get_tocken$', views.get_tocken, name="get_tocken"),
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
function GetToken(self,bind_host_id) { $.post( '{% url "get_tocken" %}', //通過url別名渲染url {'bind_host_id':bind_host_id,'csrfmiddlewaretoken':"{{ csrf_token }}"},//請求攜帶的參數 function (callback) { //回調函數 console.log(callback) } ) }
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
@login_required def get_token(request): bind_host_id=request.POST.get('bind_host_id') time_obj = datetime.datetime.now() - datetime.timedelta(seconds=300) # 5mins ago exist_token_objs = models.Token.objects.filter(account_id=request.user.account.id, host_user_bind_id=bind_host_id, date__gt=time_obj) if exist_token_objs: # has token already token_data = {'token': exist_token_objs[0].val} else: token_val=''.join(random.sample(string.ascii_lowercase+string.digits,8)) token_obj=models.Token.objects.create( host_user_bind_id=bind_host_id, account=request.user.account, val=token_val) token_data={"token":token_val} return HttpResponse(json.dumps(token_data))
4.點擊主機登錄,通過Shellinabox 插件以web頁面的形式遠程登錄Linux主機;
4.0 安裝sehllinabox
yum install git openssl-devel pam-devel zlib-devel autoconf automake libtool git clone https://github.com/shellinabox/shellinabox.git && cd shellinabox autoreconf -i ./configure && make make install shellinaboxd -b -t //-b選項代表在後臺啓動,-t選項表示不使用https方式啓動,默認以nobody用戶身份,監聽TCP4200端口 netstat -ntpl |grep shell
5.django結合sehll inabox
5.1:用戶在Django的hostlist頁面點擊生成tocken(綁定了account+host_bind_user),記錄到數據庫。
5.2: 用戶在Django的hostlist頁面 login跳轉至 sehll inabox由於修改了bashrc跳轉之後,就會執行python用戶交互程序,python用戶交互程序 提示用戶輸入 token;
5.3: 用戶輸入token之後,python 用戶交互程序去數據庫查詢token,進而查詢到host_bind_user的ip、用戶、密碼,調用paramiko的demo.py自動輸入ip、用戶、密碼進入shell交互界面;
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
from django.db import models from django.contrib.auth.models import User # Create your models here. class IDC(models.Model): name = models.CharField(max_length=64,unique=True) def __str__(self): return self.name class Host(models.Model): """存儲所有主機信息""" hostname = models.CharField(max_length=64,unique=True) ip_addr = models.GenericIPAddressField(unique=True) port = models.IntegerField(default=22) idc = models.ForeignKey("IDC") #host_groups = models.ManyToManyField("HostGroup") #host_users = models.ManyToManyField("HostUser") enabled = models.BooleanField(default=True) def __str__(self): return "%s-%s" %(self.hostname,self.ip_addr) class HostGroup(models.Model): """主機組""" name = models.CharField(max_length=64,unique=True) host_user_binds = models.ManyToManyField("HostUserBind") def __str__(self): return self.name class HostUser(models.Model): """存儲遠程主機的用戶信息 root 123 root abc root sfsfs """ auth_type_choices = ((0,'ssh-password'),(1,'ssh-key')) auth_type = models.SmallIntegerField(choices=auth_type_choices) username = models.CharField(max_length=32) password = models.CharField(blank=True,null=True,max_length=128) def __str__(self): return "%s-%s-%s" %(self.get_auth_type_display(),self.username,self.password) class Meta: unique_together = ('username','password') class HostUserBind(models.Model): """綁定主機和用戶""" host = models.ForeignKey("Host") host_user = models.ForeignKey("HostUser") def __str__(self): return "%s-%s" %(self.host,self.host_user) class Meta: unique_together = ('host','host_user') class AuditLog(models.Model): """審計日誌""" session = models.ForeignKey("SessionLog") cmd = models.TextField() date = models.DateTimeField(auto_now_add=True) def __str__(self): return "%s-%s" %(self.session,self.cmd) class SessionLog(models.Model): account = models.ForeignKey("Account") host_user_bind = models.ForeignKey("HostUserBind") start_date = models.DateTimeField(auto_now_add=True) end_date = models.DateTimeField(blank=True,null=True) def __str__(self): return "%s-%s" %(self.account,self.host_user_bind) class Account(models.Model): """堡壘機賬戶 1. 擴展 2. 繼承 user.account.host_user_bind """ user = models.OneToOneField(User) name = models.CharField(max_length=64) host_user_binds = models.ManyToManyField("HostUserBind",blank=True) host_groups = models.ManyToManyField("HostGroup",blank=True) class Token(models.Model): host_user_bind = models.ForeignKey("HostUserBind") val = models.CharField(max_length=128,unique=True) account = models.ForeignKey("Account") expire = models.IntegerField("超時時間(s)",default=300) date = models.DateTimeField(auto_now_add=True) def __str__(self): return "%s-%s" %(self.host_user_bind,self.val)
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
@login_required def get_token(request): bind_host_id=request.POST.get('bind_host_id') time_obj = datetime.datetime.now() - datetime.timedelta(seconds=300) # 5mins ago exist_token_objs = models.Token.objects.filter(account_id=request.user.account.id, host_user_bind_id=bind_host_id, date__gt=time_obj) if exist_token_objs: # has token already token_data = {'token': exist_token_objs[0].val} else: token_val=''.join(random.sample(string.ascii_lowercase+string.digits,8)) token_obj=models.Token.objects.create( host_user_bind_id=bind_host_id, account=request.user.account, val=token_val) token_data={"token":token_val} return HttpResponse(json.dumps(token_data))
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
{% extends 'index.html' %} {% block content-container %} <div id="page-title"> <h1 class="page-header text-overflow">主機列表</h1> <!--Searchbox--> <div class="searchbox"> <div class="input-group custom-search-form"> <input type="text" class="form-control" placeholder="Search.."> <span class="input-group-btn"> <button class="text-muted" type="button"><i class="pli-magnifi-glass"></i></button> </span> </div> </div> </div> <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~--> <!--End page title--> <!--Breadcrumb--> <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~--> <ol class="breadcrumb"> <li><a href="#">Home</a></li> <li><a href="#">Library</a></li> <li class="active">主機列表</li> </ol> <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~--> <!--End breadcrumb--> <div id="page-content"> <div class="panel col-lg-3"> <div class="panel-heading"> <h3 class="panel-title">主機組</h3> </div> <div class="panel-body"> <ul class="list-group"> {% for group in request.user.account.host_groups.all %} <li class="list-group-item " οnclick="GetHostlist({{ group.id }},this)"><span class="badge badge-success">{{ group.host_user_binds.count }}</span>{{ group.name }}</li> {% endfor %} <li class="list-group-item " οnclick="GetHostlist(-1,this)"> <span class="badge badge-success">{{ request.user.account.host_user_binds.count }}</span>未分組主機</li> </ul> </div> </div> <div class="panel col-lg-9"> <div class="panel-heading"> <h3 class="panel-title">主機列表</h3> </div> <div class="panel-body"> <div class="table-responsive"> <table class="table table-striped"> <thead> <tr> <th>Hostname</th> <th>IP</th> <th>IDC</th> <th>Port</th> <th>Username</th> <th>Login</th> <th>Token</th> </tr> </thead> <tbody id="hostlist"> {# <tr>#} {# <td><a href="#fakelink" class="btn-link">Order #53451</a></td>#} {# <td>Scott S. Calabrese</td>#} {# <td>$24.98</td>#} {# </tr>#} </tbody> </table> </div> </div> </div> </div> <script> function GetHostlist(gid,self) { $.get("{% url 'get_host_list' %}",{'gid':gid},function(callback){ var data = JSON.parse(callback); console.log(data); var trs = ''; $.each(data,function (index,i) { var tr = "<tr><td>" + i.host__hostname + "</td><td>" + i.host__ip_addr +"</td><td>" + i.host__idc__name +"</td><td>" + i.host__port + "</td><td>" + i.host_user__username+ "</td><td><a class='btn btn-sm btn-info' οnclick=GetToken(this,'"+i.id +"')>Token</a><a href='http://192.168.226.135:4200/' class='btn btn-sm btn-info'')>login</a></td><td ></td></tr>"; trs += tr }); $("#hostlist").html(trs); });//end get $(self).addClass("active").siblings().removeClass('active'); } function GetToken(self,bind_host_id) { $.post( '{% url "get_token" %}', //通過url別名渲染url {'bind_host_id':bind_host_id,'csrfmiddlewaretoken':"{{ csrf_token }}"},//請求攜帶的參數 function (callback) { //回調函數 console.log(callback); var data = JSON.parse(callback); //django響應的數據 $(self).parent().next().text(data.token); } ) } </script> {% endblock %}
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
import subprocess,random,string,datetime from django.contrib.auth import authenticate from django.conf import settings from audit import models from audit.backend import ssh_interactive class UserShell(object): """用戶登錄堡壘機後的shell""" def __init__(self,sys_argv): self.sys_argv = sys_argv self.user = None def auth(self): count = 0 while count < 3: username = input("username:").strip() password = input("password:").strip() user = authenticate(username=username,password=password) #None 代表認證不成功 #user object ,認證對象 ,user.name if not user: count += 1 print("Invalid username or password!") else: self.user = user return True else: print("too many attempts.") def token_auth(self): count = 0 while count < 3: user_input = input("請輸入token:").strip() if len(user_input) == 0: return if len(user_input) != 8: print("token length is 8") else: time_obj = datetime.datetime.now() - datetime.timedelta(seconds=300) # 5mins ago token_obj = models.Token.objects.filter(val=user_input, date__gt=time_obj).first() if token_obj: if token_obj.val == user_input: # 口令對上了 self.user = token_obj.account.user #進入交互式shll需要用戶認證! return token_obj count+=1 def start(self): """啓動交互程序""" token_obj = self.token_auth() if token_obj: ssh_interactive.ssh_session(token_obj.host_user_bind, self.user) exit() if self.auth(): #print(self.user.account.host_user_binds.all()) #select_related() while True: host_groups = self.user.account.host_groups.all() for index,group in enumerate(host_groups): print("%s.\t%s[%s]"%(index,group,group.host_user_binds.count())) print("%s.\t未分組機器[%s]"%(len(host_groups),self.user.account.host_user_binds.count())) try: choice = input("select group>:").strip() if choice.isdigit(): choice = int(choice) host_bind_list = None if choice >=0 and choice < len(host_groups): selected_group = host_groups[choice] host_bind_list = selected_group.host_user_binds.all() elif choice == len(host_groups): #選擇的未分組機器 #selected_group = self.user.account.host_user_binds.all() host_bind_list = self.user.account.host_user_binds.all() if host_bind_list: while True: for index,host in enumerate(host_bind_list): print("%s.\t%s"%(index,host,)) choice2 = input("select host>:").strip() if choice2.isdigit(): choice2 = int(choice2) if choice2 >=0 and choice2 < len(host_bind_list): selected_host = host_bind_list[choice2] ssh_interactive.ssh_session(selected_host,self.user) # s = string.ascii_lowercase +string.digits # random_tag = ''.join(random.sample(s,10)) # session_obj = models.SessionLog.objects.create(account=self.user.account,host_user_bind=selected_host) # # cmd = "sshpass -p %s /usr/local/openssh/bin/ssh %[email protected]%s -p %s -o StrictHostKeyChecking=no -Z %s" %(selected_host.host_user.password,selected_host.host_user.username,selected_host.host.ip_addr,selected_host.host.port ,random_tag) # #start strace ,and sleep 1 random_tag, session_obj.id # session_tracker_script = "/bin/sh %s %s %s " %(settings.SESSION_TRACKER_SCRIPT,random_tag,session_obj.id) # # session_tracker_obj =subprocess.Popen(session_tracker_script, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) # # ssh_channel = subprocess.run(cmd,shell=True) # print(session_tracker_obj.stdout.read(), session_tracker_obj.stderr.read()) # elif choice2 == 'b': break except KeyboardInterrupt as e : pass
三、通過堡壘機批量執行Linux命令
1.批量執行命令前端頁面
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
{% extends 'index.html' %} {% block content-container %} {# {% csrf_token %}#} <div id="page-title"> <h1 class="page-header text-overflow">主機列表</h1> <!--Searchbox--> <div class="searchbox"> <div class="input-group custom-search-form"> <input type="text" class="form-control" placeholder="Search.."> <span class="input-group-btn"> <button class="text-muted" type="button"><i class="pli-magnifi-glass"></i></button> </span> </div> </div> </div> <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~--> <!--End page title--> <!--Breadcrumb--> <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~--> <ol class="breadcrumb"> <li><a href="#">Home</a></li> <li><a href="#">Library</a></li> <li class="active">主機列表</li> </ol> <!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~--> <!--End breadcrumb--> <div id="page-content"> <div class="panel col-lg-3"> <div class="panel-heading"> <h3 class="panel-title">主機組 <span id="selected_hosts"></span></h3> </div> <div class="panel-body"> <ul class="list-group" id="host_groups"> {% for group in request.user.account.host_groups.all %} <li class="list-group-item " ><span class="badge badge-success">{{ group.host_user_binds.count }}</span> <input type="checkbox" onclick="CheckAll(this)"> <a onclick="DisplayHostList(this)">{{ group.name }}</a> <!--點擊組名,組名下的 主機列表通過toggleclass 展示/隱藏 --> <ul class="hide"> {% for bind_host in group.host_user_binds.all %} <li><input onclick="ShowCheckedHostCount()" type="checkbox" value="{{ bind_host.id }}">{{ bind_host.host.ip_addr }}</li> {% endfor %} </ul> </li> {% endfor %} <li class="list-group-item " > <span class="badge badge-success">{{ request.user.account.host_user_binds.count }}</span> <input type="checkbox" onclick="CheckAll(this)"> <a onclick="DisplayHostList(this)">未分組主機</a> <ul class="hide"> {% for bind_host in request.user.account.host_user_binds.all %} <li><input onclick="ShowCheckedHostCount()" type="checkbox" value="{{ bind_host.id }}">{{ bind_host.host.ip_addr }}</li> {% endfor %} </ul> </li> </ul> </div> </div> <div class="col-lg-9"> <div class="panel"> <div class="panel-heading"> <h3 class="panel-title">命令</h3> </div> <div class="panel-body"> <textarea class="form-control" id="cmd"></textarea> <button onclick="PostTask('cmd')" class="btn btn-info pull-right">執行</button> <button class="btn btn-danger ">終止</button> </div> </div> <div class="panel"> <div class="panel-heading"> <h3 class="panel-title">任務結果</h3> </div> <div class="panel-body"> <div id="task_result"> </div> </div> </div> </div> </div> <script> function DisplayHostList(self) { $(self).next().toggleClass("hide"); } function CheckAll(self){ console.log($(self).prop('checked')); $(self).parent().find("ul :checkbox").prop('checked',$(self).prop('checked')); ShowCheckedHostCount() } function ShowCheckedHostCount(){ var selected_host_count = $("#host_groups ul").find(":checked").length; console.log(selected_host_count); $("#selected_hosts").text(selected_host_count); return selected_host_count } {# function GetTaskResult(task_id) {#} {# $.getJSON("{% url 'get_task_result' %}",{'task_id':task_id},function(callback){#} {##} {# console.log(callback);#} {##} {# var result_ele = '';#} {# $.each(callback,function (index,i) {#} {# var p_ele = "<p>" + i.host_user_bind__host__hostname + "(" +i.host_user_bind__host__ip_addr +") ------" +#} {# i.status + "</p>";#} {# var res_ele = "<pre>" + i.result +"</pre>";#} {##} {# var single_result = p_ele + res_ele;#} {# result_ele += single_result#} {# });#} {##} {# $("#task_result").html(result_ele)#} {##} {##} {# });//end getJSON#} {##} {# }#} function PostTask(task_type) { //1. 驗證主機列表已選,命令已輸入 //2. 提交任務到後臺 var selected_host_ids = []; var selected_host_eles = $("#host_groups ul").find(":checked"); $.each(selected_host_eles,function (index,ele) { selected_host_ids.push($(ele).val()) }); console.log(selected_host_ids); if ( selected_host_ids.length == 0){ alert("主機未選擇!"); return false; } var cmd_text = $.trim($("#cmd").val()); if ( cmd_text.length == 0){ alert("未輸入命令!"); return false; } var task_data = { 'task_type':task_type, 'selected_host_ids': selected_host_ids, 'cmd': cmd_text }; $.post("{% url 'multitask' %}",{'csrfmiddlewaretoken':"{{ csrf_token }}",'task_data':JSON.stringify(task_data)}, function(callback){ console.log(callback) ;// task id var callback = JSON.parse(callback); GetTaskResult(callback.task_id); var result_timer = setInterval(function () { GetTaskResult(callback.task_id) },2000) } );//end post } </script> {% endblock %}
2.前端收集批量執行的主機,通過ajax發送到後臺
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
@login_required def multitask(request): task_obj = task_handler.Task(request) respose=HttpResponse(json.dumps(task_obj.errors)) if task_obj.is_valid(): # 如果驗證成功 result = task_obj.run() #run()去選擇要執行的任務類型,然後通過 getattr()去執行 respose=HttpResponse(json.dumps({'task_id':result})) #返回數據庫pk task_id return respose
3.後端通過is_valid方法驗證數據的合法性
4.驗證失敗響應前端self.errors信息,驗證成功執行run()選擇任務類型;
5.選擇任務類型(cmd/files_transfer)之後初始化數據庫(更新Task、TaskLog表數據)
6.cmd/files_transfer方法開啓新進程(multitask_execute.py)新進程開啓進程池 去執行批量命令;
7.前端使用定時器不斷去後臺獲取數據;
8.程序中斷按鈕
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
""" Django settings for zhanggen_audit project. Generated by 'django-admin startproject' using Django 1.11.4. For more information on this file, see https://docs.djangoproject.com/en/1.11/topics/settings/ For the full list of settings and their values, see https://docs.djangoproject.com/en/1.11/ref/settings/ """ import os # Build paths inside the project like this: os.path.join(BASE_DIR, ...) BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # Quick-start development settings - unsuitable for production # See https://docs.djangoproject.com/en/1.11/howto/deployment/checklist/ # SECURITY WARNING: keep the secret key used in production secret! SECRET_KEY = '[email protected]_3y4vizrcxnnj(&vz2en#edpq%i&jr%99-xxv)&' # SECURITY WARNING: don't run with debug turned on in production! DEBUG = True ALLOWED_HOSTS = ['*'] # Application definition INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'audit.apps.AuditConfig', ] MIDDLEWARE = [ 'django.middleware.security.SecurityMiddleware', 'django.contrib.sessions.middleware.SessionMiddleware', 'django.middleware.common.CommonMiddleware', 'django.middleware.csrf.CsrfViewMiddleware', 'django.contrib.auth.middleware.AuthenticationMiddleware', 'django.contrib.messages.middleware.MessageMiddleware', 'django.middleware.clickjacking.XFrameOptionsMiddleware', ] ROOT_URLCONF = 'zhanggen_audit.urls' TEMPLATES = [ { 'BACKEND': 'django.template.backends.django.DjangoTemplates', 'DIRS': [os.path.join(BASE_DIR, 'templates'),], 'APP_DIRS': True, 'OPTIONS': { 'context_processors': [ 'django.template.context_processors.debug', 'django.template.context_processors.request', 'django.contrib.auth.context_processors.auth', 'django.contrib.messages.context_processors.messages', ], }, }, ] WSGI_APPLICATION = 'zhanggen_audit.wsgi.application' # Database # https://docs.djangoproject.com/en/1.11/ref/settings/#databases DATABASES = { 'default': { 'ENGINE': 'django.db.backends.sqlite3', 'NAME': os.path.join(BASE_DIR, 'db.sqlite3'), } } # Password validation # https://docs.djangoproject.com/en/1.11/ref/settings/#auth-password-validators AUTH_PASSWORD_VALIDATORS = [ { 'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator', }, { 'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator', }, { 'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator', }, { 'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator', }, ] # Internationalization # https://docs.djangoproject.com/en/1.11/topics/i18n/ LANGUAGE_CODE = 'en-us' TIME_ZONE = 'Asia/Shanghai' USE_I18N = True USE_L10N = True USE_TZ = True # Static files (CSS, JavaScript, Images) # https://docs.djangoproject.com/en/1.11/howto/static-files/ STATIC_URL = '/static/' STATICFILES_DIRS=( os.path.join(BASE_DIR,'static'), ) SESSION_TRACKER_SCRIPT=os.path.join(BASE_DIR,'audit%sbackend%ssession_check.sh')%(os.sep,os.sep) SESSION_TRACKER_SCRIPT_LOG_PATH=os.path.join(BASE_DIR,'log')#日誌路徑 MULTI_TASK_SCRIPT = os.path.join(BASE_DIR,'multitask_execute.py') #腳本路徑 CURRENT_PGID=None #進程的 pgid
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
"""zhanggen_audit URL Configuration The `urlpatterns` list routes URLs to views. For more information please see: https://docs.djangoproject.com/en/1.11/topics/http/urls/ Examples: Function views 1. Add an import: from my_app import views 2. Add a URL to urlpatterns: url(r'^$', views.home, name='home') Class-based views 1. Add an import: from other_app.views import Home 2. Add a URL to urlpatterns: url(r'^$', Home.as_view(), name='home') Including another URLconf 1. Import the include() function: from django.conf.urls import url, include 2. Add a URL to urlpatterns: url(r'^blog/', include('blog.urls')) """ from django.conf.urls import url from django.contrib import admin from audit import views urlpatterns = [ url(r'^admin/', admin.site.urls), url(r'^$', views.index ), url(r'^login/$', views.acc_login ), url(r'^logout/$', views.acc_logout ), url(r'^hostlist/$', views.host_list ,name="host_list"), url(r'^multitask/$', views.multitask ,name="multitask"), url(r'^multitask/result/$', views.multitask_result ,name="get_task_result"), url(r'^multitask/cmd/$', views.multi_cmd ,name="multi_cmd"), url(r'^multitask/file_transfer/$', views.multi_file_transfer ,name="multi_file_transfer"), url(r'^api/hostlist/$', views.get_host_list ,name="get_host_list"), url(r'^api/token/$', views.get_token ,name="get_token"), url(r'^api/task/file_upload/$', views.task_file_upload ,name="task_file_upload"), url(r'^api/task/file_download/$', views.task_file_download ,name="task_file_download"), url(r'^end_cmd/$', views.end_cmd,name="end_cmd"), ]
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
from django.shortcuts import render,redirect,HttpResponse from django.contrib.auth import authenticate,login,logout from django.contrib.auth.decorators import login_required from django.views.decorators.csrf import csrf_exempt from django.conf import settings import signal import json,os from audit import models import random,string import datetime from audit import task_handler from django import conf import zipfile from wsgiref.util import FileWrapper #from django.core.servers.basehttp import FileWrapper @login_required def index(request): return render(request,'index.html') def acc_login(request): error = '' if request.method == "POST": username = request.POST.get('username') password = request.POST.get('password') user = authenticate(username=username,password=password) if user: login(request, user) return redirect(request.GET.get('next') or '/') else: error = "Wrong username or password!" return render(request,'), url(r'^multitask/$', views.multitask ,name="multitask"), url(r'^multitask/result/$', views.multitask_result ,name="get_task_result"), url(r'^multitask/cmd/$', views.multi_cmd ,name="multi_cmd"), url(r'^multitask/file_transfer/$', views.multi_file_transfer ,name="multi_file_transfer"), url(r'^api/hostlist/$', views.get_host_list ,name="get_host_list"), url(r'^api/token/$', views.get_token ,name="get_token"), url(r'^api/task/file_upload/$', views.task_file_upload ,name="task_file_upload"), url(r'^api/task/file_download/$', views.task_file_download ,name="task_file_download"), url(r'^end_cmd/$', views.end_cmd,name="end_cmd"), ]
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
from django.shortcuts import render,redirect,HttpResponse from django.contrib.auth import authenticate,login,logout from django.contrib.auth.decorators import login_required from django.views.decorators.csrf import csrf_exempt from django.conf import settings import signal import json,os from audit import models import random,string import datetime from audit import task_handler from django import conf import zipfile from wsgiref.util import FileWrapper #from django.core.servers.basehttp import FileWrapper @login_required def index(request): return render(request,'index.html') def acc_login(request): error = '' if request.method == "POST": username = request.POST.get('username') password = request.POST.get('password') user = authenticate(username=username,password=password) if user: login(request, user) return redirect(request.GET.get('next') or '/') else: error = "Wrong username or password!" return render(request,'login.html',{'error':error }) @login_required def acc_logout(request): logout(request) return redirect('/login/') @login_required def host_list(request): return render(request,'hostlist.html') @login_required def get_host_list(request): gid = request.GET.get('gid') if gid: if gid == '-1':#未分組 host_list = request.user.account.host_user_binds.all() elsepassword') user = authenticate(username=username,password=password) if user: login(request, user) return redirect(request.GET.get('next') or '/') else: error = "Wrong username or password!" return render(request,'login.html',{'error':error }) @login_required def acc_logout(request): logout(request) return redirect('/login/') @login_required def host_list(request): return render(request,'hostlist.html') @login_required def get_host_list(request): gid = request.GET.get('gid') if gid: if gid == '-1':#未分組 host_list = request.user.account.host_user_binds.all() else: group_obj = request.user.account.host_groups.get(id=gid) host_list = group_obj.host_user_binds.all() data = json.dumps(list(host_list.values('id','host__hostname','host__ip_addr','host__idc__name','host__port', 'host_user__username'))) return HttpResponse(data) @login_required def get_token(request): bind_host_id=request.POST.get('bind_host_id') time_obj = datetime.datetime.now() - datetime.timedelta(seconds=300) # 5mins ago exist_token_objs = models.Token.objects.filter(account_id=request.user.account.id, host_user_bind_id=bind_host_id, date__gt=time_obj) if exist_token_objs: # has token already token_data = {'token': exist_token_objs[0].val} else: token_val=''.join(random.sample(string.ascii_lowercase+string.digits,8)) token_obj=models.Token.objects.create( host_user_bind_id=bind_host_id, account=request.user.account, val=token_val) token_data={"token":token_val} return HttpResponse(json.dumps(token_data)) @login_required def multi_cmd(request): """多命令執行頁面""" return render(request,'multi_cmd.html') @login_required def multitask(request): task_obj = task_handler.Task(request) respose=HttpResponse(json.dumps(task_obj.errors)) if task_obj.is_valid(): # 如果驗證成功 task_obj = task_obj.run() #run()去選擇要執行的任務類型,然後通過 getattr()去執行 respose=HttpResponse(json.dumps({'task_id':task_obj.id,'timeout':task_obj.timeout})) #返回數據庫pk task_id return respose @login_required def multitask_result(request): """多任務結果""" task_id = request.GET.get('task_id') ''.join(random.sample(string.ascii_lowercase+string.digits,8)) token_obj=models.Token.objects.create( host_user_bind_id=bind_host_id, account=request.user.account, val=token_val) token_data={"token":token_val} return HttpResponse(json.dumps(token_data)) @login_required def