通報漏洞後,開發未能及時修復漏洞,致使被攻擊,領導說我發現被攻擊的時間晚了,因爲一我的安所有精力有限未能及時看IPS告警,因而作了個釘釘告警。php
本人環境介紹css
ubuntu 14.04 python 2.7 kibana-5.5.2 logstash-5.5.2 elasticsearch-5.5.2 paloalto軟件版本7.1.14
elasticsearch下載地址:https://www.elastic.co/cn/downloads/elasticsearchhtml
kibana下載地址:https://www.elastic.co/cn/downloads/kibanapython
Logstash下載地址:https://www.elastic.co/cn/downloads/logstashlinux
elasticsearch和kibana配置就很少說了,比較簡單。git
新建syslog.conf文件,此版本的paloalto的有50多個字段,暴力配置以下,全部轉發過來的日誌直接丟到elasticsearch裏面,最後用kibana展現(能夠大屏裝X)github
啓動logstash:nohup ./bin/logstash -f syslog.conf &web
input{
syslog{#輸入syslog type => "syslog" port => 514 } } filter { grok { match => ["message", "%{DATA:Domain}\,%{DATA:Receive-Time}\,%{DATA:Serial}\,%{DATA:Type}\,%{DATA:Threat-Type}\,%{DATA:Config-Version}\,%{DATA:Generate-Time}\,%{IP:Source-address}\,%{IP:Destination-address}\,%{DATA:NAT-Source-IP}\,%{DATA:NAT-Destination-IP}\,%{DATA:Rule}\,%{DATA:Source-User}\,%{DATA:Destination-User}\,%{DATA:Application}\,%{DATA:Virtual-System}\,%{DATA:Source-Zone}\,%{DATA:Destination-Zone}\,%{DATA:Inbound-Interface}\,%{DATA:Outbound-Interface}\,%{DATA:Log-Action}\,%{DATA:Time-Logged}\,%{DATA:Session-ID}\,%{DATA:Repeat-Count}\,%{DATA:Source-Port}\,%{DATA:Destination-Port}\,%{DATA:NAT-Source-Port}\,%{DATA:NAT-Destination-Port}\,%{DATA:Flags}\,%{DATA:IP-Protocol}\,%{DATA:Action}\,%{DATA:URL}\,%{DATA:Threat-Content-Name}\,%{DATA:Category}\,%{DATA:Severity}\,%{DATA:Direction}\,%{DATA:Sequence-Number}\,%{DATA:Action-Flags}\,%{DATA:Source-Country}\,%{DATA:Destination-Country}\,%{DATA:cpadding}\,%{DATA:contenttype}\,%{DATA:pcap_id}\,%{DATA:filedigest}\,%{DATA:cloud}\,%{DATA:url_idx}\,%{DATA:user_agent}\,%{DATA:filetype}\,%{DATA:xff}\,%{DATA:referer}\,%{DATA:sender}\,%{DATA:subject}\,%{DATA:recipient}\,%{DATA:reportid}\,%{DATA:dg_hier_level_1}\,%{DATA:dg_hier_level_2}\,%{DATA:dg_hier_level_3}\,%{DATA:dg_hier_level_4}\,%{DATA:Virtual-System-Name}\,%{DATA:Device-Name}\,%{DATA:file_url}"] } } output{ elasticsearch{ hosts => ["x.x.x.x:9200"] index => "syslog" } # stdout{#控制檯打印輸出 # codec => rubydebug # } }
一共有寄個地方要注意一下,不然日誌轉發不成功sql
一、建立syslog,轉發到logstash服務器ubuntu
二、配置轉發用syslog
三、配置你想要的日誌類型和嚴重性
四、在安全策略出匹配設置的日誌轉發
五、最後記得提交配置,不然不生效
kibana最終效果
elastalert:https://github.com/Yelp/elastalert.git釘釘告警:https://github.com/xuyaoqiang/elastalert-dingtalk-plugin部分依賴:
sudo apt-get install python-dev libffi-dev
pip install elastalert 或者 git clone https://github.com/Yelp/elastalert.git cd elastalert sudo python setup.py install sudo pip install -r requirements.txt
其中有部分依賴可能安裝錯誤,請單獨下載安裝既可。
sudo elastalert-create-index --index elastalert
根據本身的狀況,填入elasticsearch的相關信息,關於 elastalert_status部分直接回車默認的便可。 以下所示:
Enter Elasticsearch host: localhost
Enter Elasticsearch port: 9200
Use SSL? t/f: Enter optional basic-auth username (or leave blank): Enter optional basic-auth password (or leave blank): Enter optional Elasticsearch URL prefix (prepends a string to the URL of every request): Name of existing index to copy? (Default None) Elastic Version:5 Mapping used for string:{'index': 'not_analyzed', 'type': 'string'} New index elastalert created Done!
3.一、修改elastalert的配置文件
下載https://github.com/xuyaoqiang/elastalert-dingtalk-plugin
把elastalert-dingtalk-plugin中的elastalert_modules、rules和config.yaml複製到elastalert下
修改config.yaml對應配置
es_host: elasticsearch 地址 es_port: elasticsearch 端口
3.二、修改rules的配置文件
官方有不少rules規則能夠去看官方文檔:http://elastalert.readthedocs.io/en/latest/ruletypes.html#rule-types
修改xxx.yaml對應配置:
name: IPS安全告警 #惟一值重複告警規則type: cardinality #es_host: localhost #es_port: 9200 # Index to search, wildcard supported index: syslog cardinality_field: Source-address.keyword #最小5次觸發規則min_cardinality: 5 #max_cardinality: 5 # 60秒內 timeframe: seconds: 60 #5分鐘內重複告警不告警realert: minutes: 30 # ES 查詢,用以過濾 #filter: #- term: # Severity: "high" # (Required) # The alert is use when a match is found alert: * "debug"#你本身定義的釘釘告警腳本 * "elastalert_modules.dingtalk_alert.DingTalkAlerter" dingtalk_webhook: 在釘釘羣中添加機器人能夠獲取dingtalk_msgtype: "text"
5.一、啓用釘釘的報警
原生告警比較不友好
修改成:
修改了elastalert-dingtalk-plugin-master\elastalert_modules\dingtalk_alert.py裏面的代碼,
爲了獲取告警時間,而後對這個時間變爲時間段,到es裏面查詢,獲取對應的字段值,這個裏的時間轉換比較亂,代碼寫的渣,大佬能夠忽略
# 獲取對應時間段,並查詢到對應可疑ip地址 get_time = body.split('\n')[4].split(' ')[1] times = get_time.split('.')[0].split(':')[:2] t = times[0] t1 = int(times[1]) - 2 t2 = int(times[1]) + 2 # 將其轉換爲時間數組 timeStruct = time.strptime(t + ':' + str(t1), "%Y-%m-%dT%H:%M") # 轉換爲時間戳: timeStamp1 = int(time.mktime(timeStruct)) # 時間戳轉換爲指定格式日期 localTime = time.localtime(timeStamp1) gt = time.strftime("%Y-%m-%dT%H:%M", localTime) timeStruct = time.strptime(t + ':' + str(t2), "%Y-%m-%dT%H:%M") # 轉換爲時間戳: timeStamp2 = int(time.mktime(timeStruct)) # 時間戳轉換爲指定格式日期 localTime = time.localtime(timeStamp2) lt = time.strftime("%Y-%m-%dT%H:%M", localTime) # print(gt+'\n'+lt) es = Elasticsearch("10.11.10.245:9200") body = { "query": { "range" : { "@timestamp" : { "gt" : gt, "lt": lt } } } } res = es.search(index="syslog", body=body) text = res['hits']['hits'] if len(text) != 0: sip = text[0]['_source']['Source-address'] dip = text[0]['_source']['Destination-address'] dport = text[0]['_source']['Destination-Port'] atype = text[0]['_source']['Threat-Content-Name'] ntime = text[0]['_source']['Time-Logged'] payload = { "msgtype": self.dingtalk_msgtype, "text": { "content": "IPS安全告警\n發現源ip地址: %s 在30秒內,對服務器ip:%s 的 %s 端口進行了5次攻擊,攻擊類型爲 %s,請排除或確認攻擊!\n(攻擊時間點:%s)" % (sip, dip, dport, atype, ntime) }, "at": { "isAtAll":False } }
./elasticsearch-5.5.2/bin/elasticsearch & ./kibana-5.5.2-linux-x86_64/bin/kibana & ./logstash-5.5.2/bin/logstash -f /xxxx/logstash-5.5.2/syslog.conf & python -m ./elastalert/elastalert.elastalert --verbose &
至此大功告成,其實elk還能夠接收各類日誌,本身作分析,而後告警,本文只是其中一個場景,你們能夠收集全部日誌一塊兒作集中告警。謝謝各位大佬捧場。