對巡風vulscan的理解

# coding:utf-8
# 漏洞檢測引擎
import urllib2
import thread
import time
import pymongo
import sys
import datetime
import hashlib
import json
import re
import uuid
import os
from kunpeng import kunpeng


# Make the plugin directory importable so script plugins can be loaded by
# module name with __import__().
sys.path.append(sys.path[0] + '/vuldb')
# Also expose the parent directory on the import path.
# NOTE(review): presumably this is where the `config` module imported below
# lives -- confirm against the project layout.
sys.path.append(sys.path[0] + "/../")

from config import ProductionConfig

# Single MongoDB client shared by the whole engine; host, port, database name
# and credentials all come from ProductionConfig.
db_conn = pymongo.MongoClient(ProductionConfig.DB, ProductionConfig.PORT)
na_db = getattr(db_conn, ProductionConfig.DBNAME)
na_db.authenticate(ProductionConfig.DBUSERNAME, ProductionConfig.DBPASSWORD)
# Collection handles used throughout this file.
na_task = na_db.Task  # scan tasks (polled by queue_get)
na_result = na_db.Result  # findings written by vulscan.save_request
na_plugin = na_db.Plugin  # plugin metadata (filled by init / install_kunpeng_plugin)
na_config = na_db.Config  # engine settings (read by get_config)
na_heart = na_db.Heartbeat  # load value written by monitor
na_update = na_db.Update  # kunpeng release notices (kp_check / kp_update)
# Lock taken by vulscan.log() so output from worker threads is not interleaved.
lock = thread.allocate()
# The four settings below are start-up defaults; they are overwritten with the
# values returned by get_config() at start-up and on every monitor() cycle.
PASSWORD_DIC = []
THREAD_COUNT = 50
TIMEOUT = 10
# Cache of imported script-plugin modules, filled by vulscan.start().
PLUGIN_DB = {}
# Maps str(task id) -> datetime at which queue_get() handed the task out.
TASK_DATE_DIC = {}
WHITE_LIST = []
# Shared kunpeng engine instance used for the "KP-" plugins.
kp = kunpeng()

#巡風的漏洞掃描技術要先看初始化的方法,看完初始化的方法,再從main函數開始看,這樣容易理解,分析完main函數以後,再從start函數慢慢看
class vulscan():
    """Run one vulnerability plugin against one target and store any finding.

    Instantiating the class performs the whole scan: ``__init__`` stores the
    arguments and immediately calls ``start()``.  The class is meant to be
    used as a thread entry point, so it reports through ``self.log`` and the
    database rather than through return values.

    Three plugin flavours are supported, chosen from the plugin's
    ``filename``:

    * ``*.json`` -- a declarative HTTP probe (request + response matcher);
    * ``KP-*``   -- a check delegated to the shared kunpeng engine ``kp``;
    * anything else -- a Python module exposing ``check(host, port, timeout)``.
    """

    def __init__(self, task_id, task_netloc, task_plugin):
        """Store the scan parameters and run the scan.

        Args:
            task_id: id of the task this scan belongs to.
            task_netloc: target, indexed as ``[0]`` = host and ``[1]`` = port.
            task_plugin: plugin name, looked up in the ``Plugin`` collection.
        """
        self.task_id = task_id
        self.task_netloc = task_netloc
        self.task_plugin = task_plugin
        self.result_info = ''  # finding text; stays empty when nothing is found
        self.start()

    def start(self):
        """Dispatch to the right detection mode, then persist the result."""
        self.get_plugin_info()
        if not self.plugin_info:
            # Without a plugin record there is no filename to dispatch on;
            # report it instead of failing with an opaque TypeError.
            self.log(str(self.task_netloc) + 'unknown plugin ' +
                     str(self.task_plugin))
            return
        plugin_filename = self.plugin_info['filename']
        if '.json' in plugin_filename:
            # Declarative mode: load the probe description, turn it into an
            # HTTP request, send it and match the response.
            self.load_json_plugin()
            self.set_request()
            self.poc_check()
        elif 'KP-' in plugin_filename:
            self.log(str(self.task_netloc) + 'call kunpeng - ' + plugin_filename)
            kp.set_config(TIMEOUT, PASSWORD_DIC)
            host, port = self.task_netloc[0], self.task_netloc[1]
            # Every port except 80 is first tried as a plain service; the web
            # check runs when that was skipped or produced no result.
            if port != 80:
                self.result_info = kp.check(
                    'service', '{}:{}'.format(host, port), plugin_filename)
            if not self.result_info:
                scheme = 'https' if port == 443 else 'http'
                self.result_info = kp.check(
                    'web', '{}://{}:{}'.format(scheme, host, port),
                    plugin_filename)
        else:
            # Script mode: import the plugin module once and reuse it.
            self.log(str(self.task_netloc) + 'call ' + self.task_plugin)
            # The cache is keyed by filename, so the lookup must use the
            # filename as well (not the plugin name, and not a module global).
            if plugin_filename not in PLUGIN_DB:
                plugin_res = __import__(plugin_filename)
                # Hand the password dictionary to the plugin module.
                setattr(plugin_res, "PASSWORD_DIC", PASSWORD_DIC)
                PLUGIN_DB[plugin_filename] = plugin_res
            self.result_info = PLUGIN_DB[plugin_filename].check(
                str(self.task_netloc[0]), int(self.task_netloc[1]), TIMEOUT)
        self.save_request()

    def get_plugin_info(self):
        """Load the plugin record named ``self.task_plugin`` from the database.

        ``self.plugin_info`` is ``None`` when no such record exists.
        """
        self.plugin_info = na_plugin.find_one({"name": self.task_plugin})

    def load_json_plugin(self):
        """Read the JSON plugin file and attach its ``plugin`` section."""
        path = sys.path[0] + '/vuldb/' + self.plugin_info['filename']
        # The file is read as text and parsed here; `with` guarantees the
        # handle is closed even if reading fails.
        with open(path) as plugin_file:
            json_plugin = plugin_file.read()
        self.plugin_info['plugin'] = json.loads(json_plugin)['plugin']

    def set_request(self):
        """Build ``self.poc_request`` from the JSON plugin description.

        The URL is always ``http://host:port`` plus the plugin's ``url``.  A
        ``GET`` plugin yields a body-less request; any other method sends the
        plugin's ``data`` as the request body.
        """
        plugin = self.plugin_info['plugin']
        url = 'http://' + self.task_netloc[0] + ":" + \
            str(self.task_netloc[1]) + plugin['url']
        if plugin['method'] == 'GET':
            request = urllib2.Request(url)
        else:
            request = urllib2.Request(url, plugin['data'])
        self.poc_request = request

    def get_code(self, header, html):
        """Return the charset declared by a response, or ``None``.

        The HTML ``<meta ... charset=...>`` tag is consulted first, then the
        ``Content-Type`` header.  Failures while inspecting either source are
        ignored so that detection stays best-effort.
        """
        try:
            m = re.search(r'<meta.*?charset=(.*?)"(>| |/)', html, flags=re.I)
            if m:
                return m.group(1).replace('"', '')
        except Exception:
            pass
        try:
            if 'Content-Type' in header:
                m = re.search(r'.*?charset=(.*?)(;|$)',
                              header['Content-Type'], flags=re.I)
                if m:
                    return m.group(1)
        except Exception:
            pass
        return None

    def poc_check(self):
        """Send ``self.poc_request`` and match the response against the plugin.

        At most 204800 bytes of the body are read.  HTTP error responses are
        still analysed; any other failure to fetch aborts the check silently.
        On a match ``self.result_info`` is set to the plugin's ``tag``.
        """
        try:
            res = urllib2.urlopen(self.poc_request, timeout=30)
            res_html = res.read(204800)
            header = res.headers
        except urllib2.HTTPError as e:
            # An error status still carries headers and a body worth matching.
            header = e.headers
            res_html = e.read(204800)
        except Exception:
            return
        # Normalise the body to UTF-8 when a plausible charset was declared;
        # on any decoding problem keep the raw bytes.
        html_code = self.get_code(header, res_html)
        if html_code:
            html_code = html_code.strip()
            if html_code and len(html_code) < 12:
                try:
                    res_html = res_html.decode(html_code).encode('utf-8')
                except Exception:
                    pass
        plugin = self.plugin_info['plugin']
        an_type = plugin['analyzing']
        vul_tag = plugin['tag']
        analyzingdata = plugin['analyzingdata']
        if an_type == 'keyword':
            if analyzingdata.encode("utf-8") in res_html:
                self.result_info = vul_tag
        elif an_type == 'regex':
            if re.search(analyzingdata, res_html, re.I):
                self.result_info = vul_tag
        elif an_type == 'md5':
            md5 = hashlib.md5()
            md5.update(res_html)
            if md5.hexdigest() == analyzingdata:
                self.result_info = vul_tag

    def save_request(self):
        """Persist the finding, if any, to the ``Result`` collection.

        The plugin's hit counter is incremented only the first time this
        exact (ip, port, info) combination is seen; the result row itself is
        inserted every time.
        """
        if not self.result_info:
            return
        time_ = datetime.datetime.now()
        self.log(str(self.task_netloc) + " " + self.result_info)
        v_count = na_result.find(
            {"ip": self.task_netloc[0], "port": self.task_netloc[1],
             "info": self.result_info}).count()
        if not v_count:
            na_plugin.update({"name": self.task_plugin},
                             {"$inc": {'count': 1}})
        vulinfo = {"vul_name": self.plugin_info['name'],
                   "vul_level": self.plugin_info['level'],
                   "vul_type": self.plugin_info['type']}
        w_vul = {"task_id": self.task_id, "ip": self.task_netloc[0],
                 "port": self.task_netloc[1], "vul_info": vulinfo,
                 "info": self.result_info, "time": time_,
                 "task_date": TASK_DATE_DIC[str(self.task_id)]}
        na_result.insert(w_vul)

    def log(self, info):
        """Print a timestamped line while holding the shared output lock."""
        # `with` releases the lock on every exit path.
        with lock:
            try:
                time_str = time.strftime('%X', time.localtime(time.time()))
                print("[%s] %s" % (time_str, info))
            except Exception:
                # Logging must never break a scan.
                pass


def queue_get():
    """Fetch the next task to run.

    One-shot tasks (``plan`` == 0) that are still unstarted (``status`` == 0)
    are taken first, ordered by ``time``, and marked as started in the same
    database call.  When there is none, periodic tasks (``plan`` != 0) are
    examined and the first one that is due is returned after its ``status``
    counter has been incremented.

    Returns:
        A ``(task_id, plan, target, plugin)`` tuple, or four empty strings
        when nothing is due.
    """
    global TASK_DATE_DIC
    one_shot = na_task.find_and_modify(
        query={"status": 0, "plan": 0},
        update={"$set": {"status": 1}},
        sort={'time': 1})
    if one_shot:
        TASK_DATE_DIC[str(one_shot['_id'])] = datetime.datetime.now()
        return (one_shot['_id'], one_shot['plan'],
                one_shot['target'], one_shot['plugin'])

    periodic_tasks = na_task.find({"plan": {"$ne": 0}})
    if not periodic_tasks:
        return '', '', '', ''
    for task in periodic_tasks:
        # A periodic task is due once the number of elapsed `plan`-day
        # periods has caught up with the number of runs recorded in `status`.
        elapsed_days = (datetime.datetime.now() - task['time']).days
        if elapsed_days / int(task['plan']) < int(task['status']):
            continue
        selector = {"_id": task['_id']}
        if task['isupdate'] == 1:
            # Re-resolve the target list from the stored asset query.
            task['target'] = update_target(json.loads(task['query']))
            na_task.update(selector, {"$set": {"target": task['target']}})
        na_task.update(selector, {"$inc": {"status": 1}})
        TASK_DATE_DIC[str(task['_id'])] = datetime.datetime.now()
        return task['_id'], task['plan'], task['target'], task['plugin']
    return '', '', '', ''


def update_target(query):
    """Resolve an asset query into a list of ``[ip, port]`` targets.

    Args:
        query: filter passed to ``find`` on the ``Info`` collection.

    Returns:
        One ``[ip, port]`` pair per matching record.  Any error is swallowed;
        the pairs collected before the error are still returned.
    """
    targets = []
    try:
        for record in na_db.Info.find(query):
            # Appended one by one so a failure part-way keeps earlier pairs.
            targets.append([record["ip"], record["port"]])
    except:
        pass
    return targets


def monitor():
    """Heartbeat loop: publish the engine load and refresh the settings.

    Runs forever.  Each cycle it

    1. computes a load value in [0, 1] -- 1 whenever one-shot tasks are still
       queued, otherwise the active thread count relative to THREAD_COUNT;
    2. writes that value and the current time to the ``load`` heartbeat row;
    3. reloads PASSWORD_DIC, THREAD_COUNT, TIMEOUT and WHITE_LIST;
    4. sleeps 8 seconds when busy and 60 seconds when idle.
    """
    global PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST
    while True:
        queue_count = na_task.find({"status": 0, "plan": 0}).count()
        if queue_count:
            load = 1
        else:
            ac_count = thread._count()
            # NOTE(review): the 6 presumably discounts the engine's own
            # housekeeping threads -- confirm before changing.
            load = float(ac_count - 6) / THREAD_COUNT
        if load > 1:
            load = 1
        if load < 0:
            load = 0
        na_heart.update({"name": "load"}, {
                        "$set": {"value": load, "up_time": datetime.datetime.now()}})
        # get_config() can come back empty when the settings cannot be read;
        # keep the previous values in that case instead of letting the
        # unpacking error kill this thread.
        config = get_config()
        if config:
            PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST = config
        if load > 0:
            time.sleep(8)
        else:
            time.sleep(60)


def get_config():
    """Read the engine settings from the ``vulscan`` config document.

    Returns:
        A ``(password_dic, thread_count, timeout, white_list)`` tuple.  The
        password dictionary and the white list are the stored values split on
        newlines; the other two are converted to ``int``.

        Every caller unpacks the result into four names, so on any failure
        (missing document, missing key, non-numeric value, database error)
        the error is printed and the currently active settings are returned
        unchanged rather than ``None``.
    """
    try:
        settings = na_config.find_one({"type": "vulscan"})['config']
        password_dic = settings['Password_dic']['value'].split('\n')
        thread_count = int(settings['Thread']['value'])
        timeout = int(settings['Timeout']['value'])
        white_list = settings['White_list']['value'].split('\n')
        return password_dic, thread_count, timeout, white_list
    except Exception as e:
        print(e)
        return PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST

def install_kunpeng_plugin():
    """Insert one ``Plugin`` record for every plugin the kunpeng engine lists.

    The kunpeng id (``references.kpid``) is used both as the record ``_id``
    and as its ``filename``; the numeric kunpeng level is used as an index
    into the list of level labels.
    """
    added_at = datetime.datetime.now()
    # Level labels, indexed by the plugin's numeric level.
    level_names = ['緊急','高危','中危','低危','提示']
    for plugin in kp.get_plugin_list():
        references = plugin['references']
        kunpeng_id = references['kpid']
        record = {
            '_id': kunpeng_id,
            'name': 'Kunpeng -' + plugin['name'],
            'info': plugin['remarks'] + ' ' + references['cve'],
            'level': level_names[int(plugin['level'])],
            'type': plugin['type'],
            'author': plugin['author'],
            'url': references['url'],
            'source': 1,
            'keyword': '',
            'add_time': added_at,
            'filename': kunpeng_id,
            'count': 0
        }
        na_plugin.insert(record)

def init():
    """Populate the ``Plugin`` collection on first start.

    Does nothing when the collection already holds at least one record.
    Otherwise every ``*.py`` and ``*.json`` file in the ``vuldb`` directory is
    registered, followed by the plugins shipped with kunpeng.  A plugin that
    cannot be loaded is reported and skipped so that one broken file does not
    prevent the others from being installed.
    """
    time_ = datetime.datetime.now()
    if na_plugin.find().count() >= 1:
        return
    script_plugin = []
    json_plugin = []
    print('init plugins')
    for filename in os.listdir(sys.path[0] + '/vuldb'):
        parts = filename.split('.')
        if len(parts) < 2:
            # No extension: neither a script nor a JSON plugin.
            continue
        if parts[1] == 'py':
            # Script plugins are registered under their module name.
            script_plugin.append(parts[0])
        elif parts[1] == 'json':
            json_plugin.append(filename)
    for plugin_name in script_plugin:
        try:
            res_tmp = __import__(plugin_name)
            plugin_info = res_tmp.get_plugin_info()
            plugin_info['add_time'] = time_
            plugin_info['filename'] = plugin_name
            plugin_info['count'] = 0
            na_plugin.insert(plugin_info)
        except Exception as e:
            print('load plugin %s failed: %s' % (plugin_name, e))
    for plugin_name in json_plugin:
        try:
            with open(sys.path[0] + '/vuldb/' + plugin_name, 'r') as json_file:
                plugin_info = json.loads(json_file.read())
            plugin_info['add_time'] = time_
            plugin_info['filename'] = plugin_name
            plugin_info['count'] = 0
            # The probe definition stays in the file; only metadata is stored.
            del plugin_info['plugin']
            na_plugin.insert(plugin_info)
        except Exception as e:
            print('load plugin %s failed: %s' % (plugin_name, e))
    install_kunpeng_plugin()


def kp_check():
    """Poll kunpeng for a new release and record it in the ``Update`` collection.

    Runs forever.  The result of ``kp.check_version()`` is printed on every
    poll; when it is truthy a not-yet-installed update row is inserted and the
    loop additionally sleeps 48 hours.  Every iteration ends with a 30 minute
    sleep, and errors are printed without stopping the loop.
    """
    while True:
        try:
            release = kp.check_version()
            print(release)
            if release:
                body = release['body']
                # When the release notes contain '###' markers, keep only the
                # section following the first one.
                if '###' in body:
                    notes = body.split('###')[1]
                else:
                    notes = body
                na_update.insert({
                    'info': notes,
                    'isInstall': 0,
                    'name': release['name'],
                    'author': release['author']['login'],
                    'pushtime': release['published_at'],
                    'location': "",
                    'unicode': release['tag_name'],
                    'coverage': 0,
                    'source': 'kunpeng'
                })
                time.sleep(60 * 60 * 48)
        except Exception as e:
            print(e)
        time.sleep(60 * 30)


def kp_update():
    """Apply kunpeng updates that have been flagged for installation.

    Runs forever, polling every 10 seconds.  An ``Update`` row with
    ``source`` 'kunpeng' and ``isInstall`` 1 is removed from the collection,
    kunpeng is updated to the version stored in that row, and the kunpeng
    plugin records (``_id`` starting with 'KP') are replaced with a fresh
    list.  Errors are printed without stopping the loop.
    """
    install_request = {'source': 'kunpeng', 'isInstall': 1}
    kunpeng_ids = re.compile('^KP')
    while True:
        try:
            approved = na_update.find_one_and_delete(install_request)
            if approved:
                kp.update_version(approved['unicode'])
                na_plugin.delete_many({'_id': kunpeng_ids})
                install_kunpeng_plugin()
        except Exception as e:
            print(e)
        time.sleep(10)


if __name__ == '__main__':
    # Engine entry point: register plugins, load the settings, start the
    # background threads, then keep pulling tasks and fanning each one out to
    # one scanning thread per target.
    init()
    # Load the settings stored in the database (password dictionary, thread
    # limit, timeout, white list).  Keep the built-in defaults when they
    # cannot be read instead of crashing on the unpacking.
    startup_config = get_config()
    if startup_config:
        PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST = startup_config
    thread.start_new_thread(monitor, ())  # heartbeat / settings refresh
    thread.start_new_thread(kp_check, ())  # announces new kunpeng releases
    # Installs a kunpeng release once it has been flagged for installation.
    thread.start_new_thread(kp_update, ())
    while True:
        try:
            # queue_get() hands out the task parameters, or four empty
            # strings when nothing is due.
            task_id, task_plan, task_target, task_plugin = queue_get()
            if task_id == '':
                time.sleep(10)  # idle: poll the queue every 10 seconds
                continue
            if PLUGIN_DB:
                # Drop every cached plugin module so the next task imports a
                # fresh copy.  pop() tolerates a module that is already gone;
                # an exception here would discard a task that has already
                # been claimed from the queue.
                for cached_name in list(PLUGIN_DB):
                    sys.modules.pop(cached_name, None)
                PLUGIN_DB.clear()
            for task_netloc in task_target:
                # White-listed hosts are never scanned, so skip them without
                # first waiting for a free thread slot.
                if task_netloc[0] in WHITE_LIST:
                    continue
                # Wait until the number of running threads drops below the
                # configured limit.
                while int(thread._count()) >= THREAD_COUNT:
                    time.sleep(2)
                try:
                    thread.start_new_thread(
                        vulscan, (task_id, task_netloc, task_plugin))
                except Exception as e:
                    print(e)
            if task_plan == 0:
                # One-shot task: mark it with status 2 once every target has
                # been dispatched.
                na_task.update({"_id": task_id}, {"$set": {"status": 2}})
        except Exception as e:
            print(e)

巡風如果說作為一個檢測內網的手段,真的是非常好用。另外分析下巡風的掃描,也能學到很多東西,比如我之前掃描攝像頭的時候,以為是掃描的數據流,其實掃描的是路徑,拼接路徑就可以了。還有巡風打poc的地方,也可以學習學習。不過自我感覺,還是巡風的資產探測比較好用,主要的還是要把巡風的資產探測學好,巡風的資產探測還是比較好的。

相關文章
相關標籤/搜索