# coding:utf-8 # 漏洞检测引擎 import urllib2 import thread import time import pymongo import sys import datetime import hashlib import json import re import uuid import os from kunpeng import kunpeng sys.path.append(sys.path[0] + '/vuldb') # 加载漏洞插件的目录 sys.path.append(sys.path[0] + "/../") from config import ProductionConfig db_conn = pymongo.MongoClient(ProductionConfig.DB, ProductionConfig.PORT) na_db = getattr(db_conn, ProductionConfig.DBNAME) na_db.authenticate(ProductionConfig.DBUSERNAME, ProductionConfig.DBPASSWORD) na_task = na_db.Task na_result = na_db.Result na_plugin = na_db.Plugin na_config = na_db.Config na_heart = na_db.Heartbeat na_update = na_db.Update lock = thread.allocate() PASSWORD_DIC = [] THREAD_COUNT = 50 TIMEOUT = 10 PLUGIN_DB = {} TASK_DATE_DIC = {} WHITE_LIST = [] kp = kunpeng() #巡风的漏洞扫描技术要先看初始化的方法,看完初始化的方法,再从main函数开始看,这样容易理解,分析完main函数之后,再从start函数慢慢看 class vulscan(): # 初始化操作 def __init__(self, task_id, task_netloc, task_plugin): self.task_id = task_id #任务id self.task_netloc = task_netloc #任务端口 self.task_plugin = task_plugin #任务插件 self.result_info = '' #任务结果 self.start() #任务开始 # start ---> 开始检测 def start(self): self.get_plugin_info() #获取插件列表,这个在80多行有个get_plugin_info方法,主要就是info = xxx。 #这个是获取.json格式的漏洞库,主要用来是探测,并不能起到exp的作用 if '.json' in self.plugin_info['filename']: # 标示符检测模式 self.load_json_plugin() # 读取漏洞标示 # 跟踪load_json_plugin()函数,读取的时候,是字符串。所以需要转换成json的形式。 self.set_request() # 标示符转换为请求 # 这个就是发送request的请求了 self.poc_check() # 检测 # 进行poc验证,这个地方需要是check(),里面有py,md5,json三种格式的请求,所以要分别的验证一下 # 如果时kunpeng的脚本开始验证,分为两种一种是web端的http,https,一种是除了web端。没咋懂,就这样吧,有空再看 elif 'KP-' in self.plugin_info['filename']: self.log(str(self.task_netloc) + 'call kunpeng - ' + self.plugin_info['filename']) kp.set_config(TIMEOUT, PASSWORD_DIC) if self.task_netloc[1] != 80: self.result_info = kp.check('service', '{}:{}'.format( self.task_netloc[0], self.task_netloc[1]), self.plugin_info['filename']) if not self.result_info: scheme = 'http' if self.task_netloc[1] == 443: scheme = 'https' self.result_info = kp.check('web', '{}://{}:{}'.format( scheme, self.task_netloc[0], self.task_netloc[1]), self.plugin_info['filename']) else: # 脚本检测模式,这个利用的py脚本的poc,还是比较有意思的,可能能说到说到的就是这一个了,这个和我们平常写的跑py脚本差不多。 plugin_filename = self.plugin_info['filename'] # log里面封装的是一个print方法 self.log(str(self.task_netloc) + 'call ' + self.task_plugin) if task_plugin not in PLUGIN_DB: plugin_res = __import__(plugin_filename) setattr(plugin_res, "PASSWORD_DIC", PASSWORD_DIC) # 给插件声明密码字典 PLUGIN_DB[plugin_filename] = plugin_res self.result_info = PLUGIN_DB[plugin_filename].check( str(self.task_netloc[0]), int(self.task_netloc[1]), TIMEOUT) self.save_request() # 保存结果 def get_plugin_info(self): info = na_plugin.find_one({"name": self.task_plugin}) self.plugin_info = info def load_json_plugin(self): json_plugin = open(sys.path[0] + '/vuldb/' + self.plugin_info['filename']).read() self.plugin_info['plugin'] = json.loads(json_plugin)['plugin'] def set_request(self): url = 'http://' + \ self.task_netloc[0] + ":" + \ str(self.task_netloc[1]) + self.plugin_info['plugin']['url'] if self.plugin_info['plugin']['method'] == 'GET': request = urllib2.Request(url) else: request = urllib2.Request(url, self.plugin_info['plugin']['data']) self.poc_request = request def get_code(self, header, html): try: m = re.search(r'<meta.*?charset=(.*?)"(>| |/)', html, flags=re.I) if m: return m.group(1).replace('"', '') except: pass try: if 'Content-Type' in header: Content_Type = header['Content-Type'] m = re.search(r'.*?charset=(.*?)(;|$)', Content_Type, flags=re.I) if m: return m.group(1) except: pass def poc_check(self): try: res = urllib2.urlopen(self.poc_request, timeout=30) res_html = res.read(204800) header = res.headers # res_code = res.code except urllib2.HTTPError, e: # res_code = e.code header = e.headers res_html = e.read(204800) except Exception, e: return try: html_code = self.get_code(header, res_html).strip() if html_code and len(html_code) < 12: res_html = res_html.decode(html_code).encode('utf-8') except: pass an_type = self.plugin_info['plugin']['analyzing'] vul_tag = self.plugin_info['plugin']['tag'] analyzingdata = self.plugin_info['plugin']['analyzingdata'] if an_type == 'keyword': # print poc['analyzingdata'].encode("utf-8") if analyzingdata.encode("utf-8") in res_html: self.result_info = vul_tag elif an_type == 'regex': if re.search(analyzingdata, res_html, re.I): self.result_info = vul_tag elif an_type == 'md5': md5 = hashlib.md5() md5.update(res_html) if md5.hexdigest() == analyzingdata: self.result_info = vul_tag def save_request(self): if self.result_info: time_ = datetime.datetime.now() self.log(str(self.task_netloc) + " " + self.result_info) v_count = na_result.find( {"ip": self.task_netloc[0], "port": self.task_netloc[1], "info": self.result_info}).count() if not v_count: na_plugin.update({"name": self.task_plugin}, {"$inc": {'count': 1}}) vulinfo = {"vul_name": self.plugin_info['name'], "vul_level": self.plugin_info['level'], "vul_type": self.plugin_info['type']} w_vul = {"task_id": self.task_id, "ip": self.task_netloc[0], "port": self.task_netloc[1], "vul_info": vulinfo, "info": self.result_info, "time": time_, "task_date": TASK_DATE_DIC[str(self.task_id)]} na_result.insert(w_vul) # self.wx_send(w_vul) # 自行定义漏洞提醒 def log(self, info): lock.acquire() try: time_str = time.strftime('%X', time.localtime(time.time())) print "[%s] %s" % (time_str, info) except: pass lock.release() def queue_get(): global TASK_DATE_DIC task_req = na_task.find_and_modify(query={"status": 0, "plan": 0}, update={ "$set": {"status": 1}}, sort={'time': 1}) if task_req: TASK_DATE_DIC[str(task_req['_id'])] = datetime.datetime.now() return task_req['_id'], task_req['plan'], task_req['target'], task_req['plugin'] else: task_req_row = na_task.find({"plan": {"$ne": 0}}) if task_req_row: for task_req in task_req_row: if (datetime.datetime.now() - task_req['time']).days / int(task_req['plan']) >= int(task_req['status']): if task_req['isupdate'] == 1: task_req['target'] = update_target( json.loads(task_req['query'])) na_task.update({"_id": task_req['_id']}, { "$set": {"target": task_req['target']}}) na_task.update({"_id": task_req['_id']}, { "$inc": {"status": 1}}) TASK_DATE_DIC[str(task_req['_id']) ] = datetime.datetime.now() return task_req['_id'], task_req['plan'], task_req['target'], task_req['plugin'] return '', '', '', '' def update_target(query): target_list = [] try: result_list = na_db.Info.find(query) for result in result_list: target = [result["ip"], result["port"]] target_list.append(target) except: pass return target_list def monitor(): global PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST while True: queue_count = na_task.find({"status": 0, "plan": 0}).count() if queue_count: load = 1 else: ac_count = thread._count() load = float(ac_count - 6) / THREAD_COUNT if load > 1: load = 1 if load < 0: load = 0 na_heart.update({"name": "load"}, { "$set": {"value": load, "up_time": datetime.datetime.now()}}) PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST = get_config() if load > 0: time.sleep(8) else: time.sleep(60) def get_config(): try: config_info = na_config.find_one({"type": "vulscan"}) pass_row = config_info['config']['Password_dic'] thread_row = config_info['config']['Thread'] timeout_row = config_info['config']['Timeout'] white_row = config_info['config']['White_list'] password_dic = pass_row['value'].split('\n') thread_count = int(thread_row['value']) timeout = int(timeout_row['value']) white_list = white_row['value'].split('\n') return password_dic, thread_count, timeout, white_list except Exception, e: print e def install_kunpeng_plugin(): time_ = datetime.datetime.now() for plugin in kp.get_plugin_list(): level_list = ['紧急','高危','中危','低危','提示'] plugin_info = { '_id': plugin['references']['kpid'], 'name': 'Kunpeng -' + plugin['name'], 'info': plugin['remarks'] + ' ' + plugin['references']['cve'], 'level': level_list[int(plugin['level'])], 'type': plugin['type'], 'author': plugin['author'], 'url': plugin['references']['url'], 'source': 1, 'keyword': '', 'add_time': time_, 'filename': plugin['references']['kpid'], 'count': 0 } na_plugin.insert(plugin_info) def init(): time_ = datetime.datetime.now() if na_plugin.find().count() >= 1: return script_plugin = [] json_plugin = [] print 'init plugins' file_list = os.listdir(sys.path[0] + '/vuldb') for filename in file_list: try: if filename.split('.')[1] == 'py': script_plugin.append(filename.split('.')[0]) if filename.split('.')[1] == 'json': json_plugin.append(filename) except: pass for plugin_name in script_plugin: try: res_tmp = __import__(plugin_name) plugin_info = res_tmp.get_plugin_info() plugin_info['add_time'] = time_ plugin_info['filename'] = plugin_name plugin_info['count'] = 0 na_plugin.insert(plugin_info) except: pass for plugin_name in json_plugin: try: json_text = open(sys.path[0] + '/vuldb/' + plugin_name, 'r').read() plugin_info = json.loads(json_text) plugin_info['add_time'] = time_ plugin_info['filename'] = plugin_name plugin_info['count'] = 0 del plugin_info['plugin'] na_plugin.insert(plugin_info) except: pass install_kunpeng_plugin() def kp_check(): while True: try: new_release = kp.check_version() print new_release if new_release: info = new_release['body'] if '###' in new_release['body']: info = new_release['body'].split('###')[1] row = { 'info': info, 'isInstall': 0, 'name': new_release['name'], 'author': new_release['author']['login'], 'pushtime': new_release['published_at'], 'location': "", 'unicode': new_release['tag_name'], 'coverage': 0, 'source': 'kunpeng' } na_update.insert(row) time.sleep(60 * 60 * 48) except Exception as e: print e time.sleep(60 * 30) def kp_update(): while True: try: row = na_update.find_one_and_delete( {'source': 'kunpeng', 'isInstall': 1}) if row: kp.update_version(row['unicode']) na_plugin.delete_many({'_id':re.compile('^KP')}) install_kunpeng_plugin() except Exception as e: print e time.sleep(10) if __name__ == '__main__': init() PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST = get_config() #从数据库里面获得配置参数,比如白名单,引擎的线程 thread.start_new_thread(monitor, ()) #巡风的心跳线程 thread.start_new_thread(kp_check, ()) #巡风的Kunpeng库,检查更新去情况,跟踪kp_check-->check_version-->_get_release_lates,这个函数发现的是与云上的库进行对比,如果有更新的话,就提示更新 # 如果没有更新的话,就不提示了 thread.start_new_thread(kp_update, ()) #kunpeng库更新,上面那个只是推送出来,但是还没有更新,这个是巡风更新的线程,就是如果你把kunpeng库确定更新了之后,然后才能是执行这个线程 while True: try: task_id, task_plan, task_target, task_plugin = queue_get() #获取队列,简单来说就是获得任务的参数 #如果status : 0 ,这个是未执行的状态,如果是status : 1就是正在执行的状态 if task_id == '': time.sleep(10) # 每个间隔10秒钟 continue if PLUGIN_DB: del sys.modules[PLUGIN_DB.keys()[0]] # 清理插件缓存,这个我也没太懂学长讲的怎么回事... PLUGIN_DB.clear() for task_netloc in task_target: while True: if int(thread._count()) < THREAD_COUNT: #如果任务的线程数<50 if task_netloc[0] in WHITE_LIST: #如果探测的资产是在白名单里面,那就break,结束 (^-^) break try: thread.start_new_thread( vulscan, (task_id, task_netloc, task_plugin)) except Exception as e: print e break else: time.sleep(2) if task_plan == 0: na_task.update({"_id": task_id}, {"$set": {"status": 2}}) except Exception as e: print e
巡风如果说作为一个检测内网的手段,真的是非常好用。另外分析下巡风的扫描,也能学到不少东西,比如我之前以为扫描摄像头的时候,以为是扫描的数据流,其实扫描的路径,拼接路径就可以了。还有巡风打poc的地方,也可以学习学习。不过自我感觉,还是巡风的资产探测比较好用,主要的还是要把巡风的资产探测学好,巡风的资产探测还是比较好的