By subscribing to publicly available malicious IP feeds (threat intelligence) and matching them against the network traffic logs in the SIEM platform, we can turn the matches into security event alerts.
For example, here is a site that publishes malware URL data for download, updated daily:
https://urlhaus.abuse.ch/browse/
Download the malicious URL data from URLhaus at https://urlhaus.abuse.ch/downloads/text/, tidy it up a little, and turn it into a CSV file so it can be imported into Splunk easily:
Add the lookup file:
Settings » Lookups » Lookup table files » Add new
Check that the lookup file was added correctly by querying it:
| inputlookup myiptest.csv
Suppose we now have IDS logs recording the internal-to-external connections that pass through the IDS, where src_ip is the internal IP and dest_ip is the external destination IP:
sourcetype=ids_log_1 | table src_ip,dest_ip
Now match the dest_ip field of the IDS logs against the threatdomain field of myiptest.csv:
sourcetype=ids_log_1 [| inputlookup myiptest.csv | rename threatdomain as dest_ip | table dest_ip]
The search hits two records:
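The same correlation can also be run from a script through the search jobs REST endpoint; below is a minimal sketch, assuming the admin credentials and management port (127.0.0.1:8089) used by the appendix scripts:

# Run the lookup-vs-IDS correlation as a one-shot search job and print the hits.
import requests

BASE = 'https://127.0.0.1:8089'
AUTH = ('admin', 'xiaoxiaoleo')

spl = ('search sourcetype=ids_log_1 '
       '[| inputlookup myiptest.csv | rename threatdomain as dest_ip | table dest_ip] '
       '| table _time,src_ip,dest_ip')

resp = requests.post(BASE + '/services/search/jobs',
                     auth=AUTH, verify=False,
                     params={'output_mode': 'json'},
                     data={'search': spl, 'exec_mode': 'oneshot',
                           'earliest_time': '-24h', 'latest_time': 'now'})

# A one-shot job returns its results directly; print the matched connections.
for row in resp.json().get('results', []):
    print(row.get('src_ip'), '->', row.get('dest_ip'))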
That completes one search by hand; the next step is to automate the process, which brings in the concept of the Splunk summary index:
A summary index works in a very simple way: it is essentially no different from any other index, except that the data it stores is the statistical output of a periodically scheduled saved search. Through such a saved search, Splunk writes the statistics for events in a short time interval into the summary index, and with a sensible statistics window and search schedule, the statistics for every time period are recorded without gaps. When you later need statistics over a long period, such as the past year, the summary index lets you aggregate all of the stored results very quickly to get the final answer. Many small steps add up to a thousand miles; many small streams merge into a river.
Create a summary index:
Add a saved search that runs every 60 minutes, each run covering the data from 60 minutes ago up to now:
Settings » Searches, reports, and alerts » Add new
http://localhost:8000/en-US/manager/search/data/indexes/_new
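The security_event_hub summary index from the _new page above can also be created through the REST API; a minimal sketch, assuming the admin credentials and management port from the appendix scripts:

# Create the summary index through the data/indexes REST endpoint.
import requests

resp = requests.post('https://127.0.0.1:8089/services/data/indexes',
                     auth=('admin', 'xiaoxiaoleo'), verify=False,
                     data={'name': 'security_event_hub'})
print(resp.status_code)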
Enable summary indexing for the saved search and write its results into the security_event_hub index created earlier:
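The saved search and its summary indexing action can likewise be set up through the saved/searches REST endpoint; a minimal sketch, assuming an hourly schedule, the credentials from the appendix scripts, and a search name of threat_intel_match (the name is only an example):

# Create a scheduled saved search that runs hourly over the last 60 minutes
# and writes its results into the security_event_hub summary index.
import requests

spl = ('sourcetype=ids_log_1 '
       '[| inputlookup myiptest.csv | rename threatdomain as dest_ip | table dest_ip] '
       '| table _time,src_ip,dest_ip')

data = {
    'name': 'threat_intel_match',          # example name for the saved search
    'search': spl,
    'is_scheduled': 1,
    'cron_schedule': '0 * * * *',          # run at the top of every hour
    'dispatch.earliest_time': '-60m',
    'dispatch.latest_time': 'now',
    'action.summary_index': 1,             # enable the summary indexing action
    'action.summary_index._name': 'security_event_hub',
}

resp = requests.post('https://127.0.0.1:8089/servicesNS/admin/search/saved/searches',
                     auth=('admin', 'xiaoxiaoleo'), verify=False, data=data)
print(resp.status_code)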
Save it, and wait for the security alerts to come in:
index="security_event_hub"
Finally, the security alerts can be pushed to a ticketing system, where each new alert is handled and closed; that completes the whole use case.
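A simple way to wire this up is a small script that pulls the newest events out of security_event_hub and opens a ticket for each one; a minimal sketch, where the ticketing endpoint and its JSON fields are hypothetical placeholders for whatever the real ticketing system expects:

# Pull the last hour of alerts from the summary index and open a ticket for each.
import requests

BASE = 'https://127.0.0.1:8089'
AUTH = ('admin', 'xiaoxiaoleo')
TICKET_API = 'https://ticketing.example.com/api/tickets'   # hypothetical ticketing endpoint

resp = requests.post(BASE + '/services/search/jobs',
                     auth=AUTH, verify=False,
                     params={'output_mode': 'json'},
                     data={'search': 'search index="security_event_hub"',
                           'exec_mode': 'oneshot',
                           'earliest_time': '-60m', 'latest_time': 'now'})

for alert in resp.json().get('results', []):
    # Field names in the ticket payload depend on the actual ticketing system.
    ticket = {
        'title': 'Threat intel hit: %s -> %s' % (alert.get('src_ip'), alert.get('dest_ip')),
        'description': str(alert),
    }
    requests.post(TICKET_API, json=ticket)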
Code examples:
Adding a lookup file through the Splunk REST API:
#!/usr/bin/python -u
# Log in to splunkd, grab a session key, then upload a lookup table file
# through the lookup-table-files REST endpoint.
import requests
from xml.dom import minidom

userName = 'admin'
password = 'xiaoxiaoleo'
baseurl = '127.0.0.1:8089'

session = requests.Session()
session.auth = (userName, password)

# Authenticate and extract the session key from the XML response.
data = {'username': userName, 'password': password}
req = session.post('https://' + baseurl + '/services/auth/login', data=data, verify=False)
session_key = minidom.parseString(req.text).getElementsByTagName('sessionKey')[0].childNodes[0].nodeValue
print(session_key)

headers = {'Authorization': 'Splunk %s' % session_key}

def addlookup():
    # 'eai:data' must point to a CSV already placed in the lookup staging
    # directory on the Splunk server; 'name' is the lookup file to create.
    data = {'name': 'haha.csv',
            'eai:data': 'C:\\Program Files\\Splunk\\var\\run\\splunk\\lookup_tmp\\abuse_20190215T1418.csv'}
    req = requests.post('https://127.0.0.1:8089/servicesNS/nobody/search/data/lookup-table-files',
                        headers=headers, data=data, verify=False)
    print(req.text)

if __name__ == '__main__':
    addlookup()
Fetching the malicious URL list from urlhaus.abuse.ch, extracting the domains, and writing them to a CSV:
# Download the URLhaus plain-text feed, extract the host part of each URL,
# and write the hosts into a CSV with a single 'threatdomain' column.
import requests
from datetime import datetime, timedelta

def get_abusechrul():
    domain_list = []
    url_list = []
    req = requests.get('https://urlhaus.abuse.ch/downloads/text/')
    body = req.text
    for i in body.split('\r\n'):
        # Skip blank lines and comment lines in the feed.
        if not i or i.startswith('#'):
            continue
        # Strip the scheme, path, and port to keep only the host.
        domain = i.split("//")[-1].split("/")[0].split(':')[0]
        url = i.replace("http://", "").replace("https://", "")
        url_list.append(url)
        domain_list.append(domain)
    return domain_list, url_list

def generate_csv(source_name, domain_list, url_list):
    # Timestamped file name, e.g. abuse_20190215T1418.csv (UTC+8).
    now = datetime.utcnow() + timedelta(hours=8)
    timestamp = now.strftime('%Y%m%dT%H%M.csv')
    filename = '%s_%s' % (source_name, timestamp)
    with open(filename, "w") as csv_file:
        csv_file.write('threatdomain' + '\n')
        csv_file.write('91.189.91.26' + '\n')   # test entry used to verify the match
        for line in domain_list:
            csv_file.write(line + '\n')

if __name__ == '__main__':
    domain_list, url_list = get_abusechrul()
    generate_csv('abuse', domain_list, url_list)