# coding:utf-8 # 漏洞檢測引擎 import urllib2 import thread import time import pymongo import sys import datetime import hashlib import json import re import uuid import os from kunpeng import kunpeng sys.path.append(sys.path[0] + '/vuldb') # 加載漏洞插件的目錄 sys.path.append(sys.path[0] + "/../") from config import ProductionConfig db_conn = pymongo.MongoClient(ProductionConfig.DB, ProductionConfig.PORT) na_db = getattr(db_conn, ProductionConfig.DBNAME) na_db.authenticate(ProductionConfig.DBUSERNAME, ProductionConfig.DBPASSWORD) na_task = na_db.Task na_result = na_db.Result na_plugin = na_db.Plugin na_config = na_db.Config na_heart = na_db.Heartbeat na_update = na_db.Update lock = thread.allocate() PASSWORD_DIC = [] THREAD_COUNT = 50 TIMEOUT = 10 PLUGIN_DB = {} TASK_DATE_DIC = {} WHITE_LIST = [] kp = kunpeng() #巡風的漏洞掃描技術要先看初始化的方法,看完初始化的方法,再從main函數開始看,這樣容易理解,分析完main函數以後,再從start函數慢慢看 class vulscan(): # 初始化操做 def __init__(self, task_id, task_netloc, task_plugin): self.task_id = task_id #任務id self.task_netloc = task_netloc #任務端口 self.task_plugin = task_plugin #任務插件 self.result_info = '' #任務結果 self.start() #任務開始 # start ---> 開始檢測 def start(self): self.get_plugin_info() #獲取插件列表,這個在80多行有個get_plugin_info方法,主要就是info = xxx。 #這個是獲取.json格式的漏洞庫,主要用來是探測,並不能起到exp的做用 if '.json' in self.plugin_info['filename']: # 標示符檢測模式 self.load_json_plugin() # 讀取漏洞標示 # 跟蹤load_json_plugin()函數,讀取的時候,是字符串。因此須要轉換成json的形式。 self.set_request() # 標示符轉換爲請求 # 這個就是發送request的請求了 self.poc_check() # 檢測 # 進行poc驗證,這個地方須要是check(),裏面有py,md5,json三種格式的請求,因此要分別的驗證一下 # 若是時kunpeng的腳本開始驗證,分爲兩種一種是web端的http,https,一種是除了web端。沒咋懂,就這樣吧,有空再看 elif 'KP-' in self.plugin_info['filename']: self.log(str(self.task_netloc) + 'call kunpeng - ' + self.plugin_info['filename']) kp.set_config(TIMEOUT, PASSWORD_DIC) if self.task_netloc[1] != 80: self.result_info = kp.check('service', '{}:{}'.format( self.task_netloc[0], self.task_netloc[1]), self.plugin_info['filename']) if not self.result_info: scheme = 'http' if self.task_netloc[1] == 443: scheme = 
'https' self.result_info = kp.check('web', '{}://{}:{}'.format( scheme, self.task_netloc[0], self.task_netloc[1]), self.plugin_info['filename']) else: # 腳本檢測模式,這個利用的py腳本的poc,仍是比較有意思的,可能能說到說到的就是這一個了,這個和咱們日常寫的跑py腳本差很少。 plugin_filename = self.plugin_info['filename'] # log裏面封裝的是一個print方法 self.log(str(self.task_netloc) + 'call ' + self.task_plugin) if task_plugin not in PLUGIN_DB: plugin_res = __import__(plugin_filename) setattr(plugin_res, "PASSWORD_DIC", PASSWORD_DIC) # 給插件聲明密碼字典 PLUGIN_DB[plugin_filename] = plugin_res self.result_info = PLUGIN_DB[plugin_filename].check( str(self.task_netloc[0]), int(self.task_netloc[1]), TIMEOUT) self.save_request() # 保存結果 def get_plugin_info(self): info = na_plugin.find_one({"name": self.task_plugin}) self.plugin_info = info def load_json_plugin(self): json_plugin = open(sys.path[0] + '/vuldb/' + self.plugin_info['filename']).read() self.plugin_info['plugin'] = json.loads(json_plugin)['plugin'] def set_request(self): url = 'http://' + \ self.task_netloc[0] + ":" + \ str(self.task_netloc[1]) + self.plugin_info['plugin']['url'] if self.plugin_info['plugin']['method'] == 'GET': request = urllib2.Request(url) else: request = urllib2.Request(url, self.plugin_info['plugin']['data']) self.poc_request = request def get_code(self, header, html): try: m = re.search(r'<meta.*?charset=(.*?)"(>| |/)', html, flags=re.I) if m: return m.group(1).replace('"', '') except: pass try: if 'Content-Type' in header: Content_Type = header['Content-Type'] m = re.search(r'.*?charset=(.*?)(;|$)', Content_Type, flags=re.I) if m: return m.group(1) except: pass def poc_check(self): try: res = urllib2.urlopen(self.poc_request, timeout=30) res_html = res.read(204800) header = res.headers # res_code = res.code except urllib2.HTTPError, e: # res_code = e.code header = e.headers res_html = e.read(204800) except Exception, e: return try: html_code = self.get_code(header, res_html).strip() if html_code and len(html_code) < 12: res_html = 
res_html.decode(html_code).encode('utf-8') except: pass an_type = self.plugin_info['plugin']['analyzing'] vul_tag = self.plugin_info['plugin']['tag'] analyzingdata = self.plugin_info['plugin']['analyzingdata'] if an_type == 'keyword': # print poc['analyzingdata'].encode("utf-8") if analyzingdata.encode("utf-8") in res_html: self.result_info = vul_tag elif an_type == 'regex': if re.search(analyzingdata, res_html, re.I): self.result_info = vul_tag elif an_type == 'md5': md5 = hashlib.md5() md5.update(res_html) if md5.hexdigest() == analyzingdata: self.result_info = vul_tag def save_request(self): if self.result_info: time_ = datetime.datetime.now() self.log(str(self.task_netloc) + " " + self.result_info) v_count = na_result.find( {"ip": self.task_netloc[0], "port": self.task_netloc[1], "info": self.result_info}).count() if not v_count: na_plugin.update({"name": self.task_plugin}, {"$inc": {'count': 1}}) vulinfo = {"vul_name": self.plugin_info['name'], "vul_level": self.plugin_info['level'], "vul_type": self.plugin_info['type']} w_vul = {"task_id": self.task_id, "ip": self.task_netloc[0], "port": self.task_netloc[1], "vul_info": vulinfo, "info": self.result_info, "time": time_, "task_date": TASK_DATE_DIC[str(self.task_id)]} na_result.insert(w_vul) # self.wx_send(w_vul) # 自行定義漏洞提醒 def log(self, info): lock.acquire() try: time_str = time.strftime('%X', time.localtime(time.time())) print "[%s] %s" % (time_str, info) except: pass lock.release() def queue_get(): global TASK_DATE_DIC task_req = na_task.find_and_modify(query={"status": 0, "plan": 0}, update={ "$set": {"status": 1}}, sort={'time': 1}) if task_req: TASK_DATE_DIC[str(task_req['_id'])] = datetime.datetime.now() return task_req['_id'], task_req['plan'], task_req['target'], task_req['plugin'] else: task_req_row = na_task.find({"plan": {"$ne": 0}}) if task_req_row: for task_req in task_req_row: if (datetime.datetime.now() - task_req['time']).days / int(task_req['plan']) >= int(task_req['status']): if 
task_req['isupdate'] == 1: task_req['target'] = update_target( json.loads(task_req['query'])) na_task.update({"_id": task_req['_id']}, { "$set": {"target": task_req['target']}}) na_task.update({"_id": task_req['_id']}, { "$inc": {"status": 1}}) TASK_DATE_DIC[str(task_req['_id']) ] = datetime.datetime.now() return task_req['_id'], task_req['plan'], task_req['target'], task_req['plugin'] return '', '', '', '' def update_target(query): target_list = [] try: result_list = na_db.Info.find(query) for result in result_list: target = [result["ip"], result["port"]] target_list.append(target) except: pass return target_list def monitor(): global PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST while True: queue_count = na_task.find({"status": 0, "plan": 0}).count() if queue_count: load = 1 else: ac_count = thread._count() load = float(ac_count - 6) / THREAD_COUNT if load > 1: load = 1 if load < 0: load = 0 na_heart.update({"name": "load"}, { "$set": {"value": load, "up_time": datetime.datetime.now()}}) PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST = get_config() if load > 0: time.sleep(8) else: time.sleep(60) def get_config(): try: config_info = na_config.find_one({"type": "vulscan"}) pass_row = config_info['config']['Password_dic'] thread_row = config_info['config']['Thread'] timeout_row = config_info['config']['Timeout'] white_row = config_info['config']['White_list'] password_dic = pass_row['value'].split('\n') thread_count = int(thread_row['value']) timeout = int(timeout_row['value']) white_list = white_row['value'].split('\n') return password_dic, thread_count, timeout, white_list except Exception, e: print e def install_kunpeng_plugin(): time_ = datetime.datetime.now() for plugin in kp.get_plugin_list(): level_list = ['緊急','高危','中危','低危','提示'] plugin_info = { '_id': plugin['references']['kpid'], 'name': 'Kunpeng -' + plugin['name'], 'info': plugin['remarks'] + ' ' + plugin['references']['cve'], 'level': level_list[int(plugin['level'])], 'type': plugin['type'], 
'author': plugin['author'], 'url': plugin['references']['url'], 'source': 1, 'keyword': '', 'add_time': time_, 'filename': plugin['references']['kpid'], 'count': 0 } na_plugin.insert(plugin_info) def init(): time_ = datetime.datetime.now() if na_plugin.find().count() >= 1: return script_plugin = [] json_plugin = [] print 'init plugins' file_list = os.listdir(sys.path[0] + '/vuldb') for filename in file_list: try: if filename.split('.')[1] == 'py': script_plugin.append(filename.split('.')[0]) if filename.split('.')[1] == 'json': json_plugin.append(filename) except: pass for plugin_name in script_plugin: try: res_tmp = __import__(plugin_name) plugin_info = res_tmp.get_plugin_info() plugin_info['add_time'] = time_ plugin_info['filename'] = plugin_name plugin_info['count'] = 0 na_plugin.insert(plugin_info) except: pass for plugin_name in json_plugin: try: json_text = open(sys.path[0] + '/vuldb/' + plugin_name, 'r').read() plugin_info = json.loads(json_text) plugin_info['add_time'] = time_ plugin_info['filename'] = plugin_name plugin_info['count'] = 0 del plugin_info['plugin'] na_plugin.insert(plugin_info) except: pass install_kunpeng_plugin() def kp_check(): while True: try: new_release = kp.check_version() print new_release if new_release: info = new_release['body'] if '###' in new_release['body']: info = new_release['body'].split('###')[1] row = { 'info': info, 'isInstall': 0, 'name': new_release['name'], 'author': new_release['author']['login'], 'pushtime': new_release['published_at'], 'location': "", 'unicode': new_release['tag_name'], 'coverage': 0, 'source': 'kunpeng' } na_update.insert(row) time.sleep(60 * 60 * 48) except Exception as e: print e time.sleep(60 * 30) def kp_update(): while True: try: row = na_update.find_one_and_delete( {'source': 'kunpeng', 'isInstall': 1}) if row: kp.update_version(row['unicode']) na_plugin.delete_many({'_id':re.compile('^KP')}) install_kunpeng_plugin() except Exception as e: print e time.sleep(10) if __name__ == '__main__': 
init() PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST = get_config() #從數據庫裏面得到配置參數,好比白名單,引擎的線程 thread.start_new_thread(monitor, ()) #巡風的心跳線程 thread.start_new_thread(kp_check, ()) #巡風的Kunpeng庫,檢查更新去狀況,跟蹤kp_check-->check_version-->_get_release_lates,這個函數發現的是與雲上的庫進行對比,若是有更新的話,就提示更新 # 若是沒有更新的話,就不提示了 thread.start_new_thread(kp_update, ()) #kunpeng庫更新,上面那個只是推送出來,可是尚未更新,這個是巡風更新的線程,就是若是你把kunpeng庫肯定更新了以後,而後才能是執行這個線程 while True: try: task_id, task_plan, task_target, task_plugin = queue_get() #獲取隊列,簡單來講就是得到任務的參數 #若是status : 0 ,這個是未執行的狀態,若是是status : 1就是正在執行的狀態 if task_id == '': time.sleep(10) # 每一個間隔10秒鐘 continue if PLUGIN_DB: del sys.modules[PLUGIN_DB.keys()[0]] # 清理插件緩存,這個我也沒太懂學長講的怎麼回事... PLUGIN_DB.clear() for task_netloc in task_target: while True: if int(thread._count()) < THREAD_COUNT: #若是任務的線程數<50 if task_netloc[0] in WHITE_LIST: #若是探測的資產是在白名單裏面,那就break,結束 (^-^) break try: thread.start_new_thread( vulscan, (task_id, task_netloc, task_plugin)) except Exception as e: print e break else: time.sleep(2) if task_plan == 0: na_task.update({"_id": task_id}, {"$set": {"status": 2}}) except Exception as e: print e
巡風如果作為一個檢測內網的手段,真的非常好用。另外,分析一下巡風的掃描,也能學到不少東西。比如我之前以為掃描攝像頭的時候掃描的是數據流,其實掃描的是路徑,拼接路徑就可以了。還有巡風打 poc 的地方,也可以學習學習。不過我個人感覺,還是巡風的資產探測比較好用,主要還是要把巡風的資產探測學好。