最近在作把elk告警日誌發送到kinesis 流,供後續數據分析處理使用。。。。。。。。json
基於儘可能不修改elastalert ,把修改工做放到接收端服務的原則。計劃把elk的告警數據經過遠程api的接口的形式發送到接收端,而後由接收端處理接收到的數據,並傳送保存到kinesis 中。api
從網上搜索了下elastalert 相關告警配置,搜到的文章大多以郵件告警爲主,從官網扒拉了下資源,簡單實現方式以下:app
一、elastalert 配置(能夠本地測試)post
1)啓動配置config.yaml 測試
rules_folder: rules
run_every:
minutes: 1
buffer_time:
minutes: 5
es_host: es_endpoint
es_port: 9200
es_username: username
es_password: password
use_ssl: False
verify_certs: False
writeback_index: elastalert_status
alert_time_limit:
hours: 2
2)告警規則kinesis.yaml url
name: alertfortest type: frequency num_events: 1 timeframe: minutes: 1 index: debug-* filter: - terms: fields.app: ["app1","app2"] - query: query_string: default_field: "message" query: "error NOT INFO" alert: - "email" - "post" http_post_url: "http://localhost:8000/elastalert/" http_post_static_payload: rule_name: alertfortest smtp_host: "smtp.163.com" smtp_port: 25 from_addr: "elastalert@163.com" smtp_auth_file: /tmp/smtp_auth.yaml email: - "youremail@qq.com"
二、數據接收端spa
def elastalert2kinesis(request): if request.method == 'GET':return HttpResponse(status=400) elif request.method == 'POST': data_dict = { "region":"", "env":"","service":"", "ip":"","endpoint":"", "metric":"", "value":"",
"timestamp":0,
"dataSource":"",
"status":"" } alertbody = json.loads(bytes.decode(request.body)) endpoint_list = alertbody['beat']['hostname'].split('-') data_dict['env'] = endpoint_list[0] data_dict['region'] = endpoint_list[1] data_dict['service'] = "-".join(endpoint_list[2:-1]) data_dict['ip'] = endpoint_list[-1] data_dict['endpoint'] = alertbody['beat']['hostname'] data_dict['dataSource'] = "elk" data_dict['metric'] = alertbody['source'] data_dict['value'] = alertbody['message'] data_dict['timestamp'] = utc_to_local(alertbody['@timestamp'].split('.')[0]+"Z")
Stream().put_to_stream(data_dict['service'],**data_dict) print("data_dict.....................:",data_dict) return HttpResponse(status=200)