在日誌管理上咱們使用Elasticsearch,Logstash和Kibana技術棧來管理不斷增加的數據和日誌,可是對於錯誤日誌的監控ELK架構並無提供,因此咱們須要使用到第三方工具ElastAlert,來幫助咱們及時發現業務中存在的問題。php
ElastAlert經過按期查詢Elasticsearch,並將數據傳遞到規則類型,該規則類型肯定什麼時候找到匹配項。發生匹配時,將爲該警報提供一個或多個警報,這些警報將根據匹配採起行動。python
這是由一組規則配置的,每一個規則定義一個查詢,一個規則類型和一組警報。linux
ElastAlert支持如下方式報警git
Command (可調用短信接口)github
Emailsql
JIRAjson
OpsGenieapi
SNS緩存
HipChatbash
Slack
Telegram
Debug
Stomp
除了這種基本用法外,還有許多其餘功能使警報更加有用:
警報連接到Kibana儀表板
任意字段的合計計數
將警報合併爲按期報告
經過使用惟一鍵字段來分隔警報
攔截並加強比賽數據
ELK 環境部署
EFK7.5.0+kafka+logstash日誌分析平臺集羣
安裝依賴包
yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libpcap-devel xz-devel
部署python3.6
mkdir -p /usr/local/python3 cd /usr/local/python3 wget https://www.python.org/ftp/python/3.6.9/Python-3.6.9.tgz tar xf Python-3.6.9.tgz cd Python-3.6.9 ./configure --prefix=/usr/local/python3 make && make install
配置環境變量
將python2.7 軟鏈刪除,換成python3.6 rm /usr/bin/python ln -s /usr/local/python3/bin/python3.6 /usr/bin/python rm /usr/bin/pip ln -s /usr/local/python3/bin/pip3 /usr/bin/pip
驗證版本
python Python 3.6.9 (default, Jun 17 2020, 15:20:38) [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux Type "help", "copyright", "credits" or "license" for more information. >>>
pip -V
pip 20.1.1 from /usr/local/python3/lib/python3.6/site-packages/pip (python 3.6)
cd /app git clone https://github.com/Yelp/elastalert.git
安裝模塊
pip install "setuptools>=11.3" cd /app/elastalert/ python setup.py install
根據Elasticsearch的版本,您可能須要手動安裝正確版本的elasticsearch-py。
Elasticsearch 5.0+:
pip install "elasticsearch>=5.0.0"
Elasticsearch 2.X:
pip install "elasticsearch<3.0.0"
配置config.yaml
cp config.yaml.example config.yaml cat config.yaml|egrep -v "^#|^$" rules_folder: example_rules run_every: minutes: 1 buffer_time: minutes: 15 es_host: 192.168.1.234 es_port: 9200 writeback_index: elastalert_status writeback_alias: elastalert_alerts alert_time_limit: days: 2
rules_folder:ElastAlert從中加載規則配置文件的位置。它將嘗試加載文件夾中的每一個.yaml文件。沒有任何有效規則,ElastAlert將沒法啓動。 run_every:ElastAlert多久查詢一次Elasticsearch的時間。 buffer_time:查詢窗口的大小,從運行每一個查詢的時間開始向後延伸。對於其中use_count_query或use_terms_query設置爲true的規則,將忽略此值。 es_host:是Elasticsearch羣集的地址,ElastAlert將在其中存儲有關其狀態,查詢運行,警報和錯誤的數據。 es_port:es對應的端口。 es_username:可選的; 用於鏈接的basic-auth用戶名es_host。 es_password:可選的; 用於鏈接的basic-auth密碼es_host。 es_send_get_body_as:可選的; 方法查詢Elasticsearch - GET,POST或source。默認是GET writeback_index:ElastAlert將在其中存儲數據的索引的名稱。咱們稍後將建立此索引。 alert_time_limit:失敗警報的重試窗口。
建立elastalert-create-index索引
elastalert-create-index
New index name (Default elastalert_status)
Name of existing index to copy (Default None)
New index elastalert_status created
Done!
cd elastalert wget -P ~/elastalert/elastalert_modules/ wget https://raw.githubusercontent.com/anjia0532/elastalert-wechat-plugin/master/elastalert_modules/wechat_qiye_alert.py touch ~/elastalert/elastalert_modules/__init__.py
因爲這個插件是基於python2.x版本開發的,而ElastAlert的最新版本使用的是python3.6版本開發,因此須要改一些代碼,以便正常運行,另外還添添加了轉中文字符功能。
wechat_qiye_alert.py修改後以下:
#!/usr/bin/env python # -*- coding: utf-8 -*- import json import datetime from elastalert.alerts import Alerter, BasicMatchString from requests.exceptions import RequestException from elastalert.util import elastalert_logger,EAException #[感謝minminmsn分享](https://github.com/anjia0532/elastalert-wechat-plugin/issues/2#issuecomment-311014492) import requests from elastalert_modules.MyEncoder import MyEncoder ''' ################################################################# # 微信企業號推送消息 # # # # 做者: AnJia <anjia0532@gmail.com> # # 做者博客: https://anjia.ml/ # # Github: https://github.com/anjia0532/elastalert-wechat-plugin # # # ################################################################# ''' class WeChatAlerter(Alerter): #企業號id,secret,應用id必填 required_options = frozenset(['corp_id','secret','agent_id']) def __init__(self, *args): super(WeChatAlerter, self).__init__(*args) self.corp_id = self.rule.get('corp_id', '') #企業號id self.secret = self.rule.get('secret', '') #secret self.agent_id = self.rule.get('agent_id', '') #應用id self.party_id = self.rule.get('party_id') #部門id self.user_id = self.rule.get('user_id', '') #用戶id,多人用 | 分割,所有用 @all self.tag_id = self.rule.get('tag_id', '') #標籤id self.access_token = '' #微信身份令牌 self.expires_in=datetime.datetime.now() - datetime.timedelta(seconds=60) def create_default_title(self, matches): subject = 'ElastAlert: %s' % (self.rule['name']) return subject def alert(self, matches): if not self.party_id and not self.user_id and not self.tag_id: elastalert_logger.warn("All touser & toparty & totag invalid") # 參考elastalert的寫法 # https://github.com/Yelp/elastalert/blob/master/elastalert/alerts.py#L236-L243 body = self.create_alert_body(matches) #matches 是json格式 #self.create_alert_body(matches)是String格式,詳見 [create_alert_body 函數](https://github.com/Yelp/elastalert/blob/master/elastalert/alerts.py) # 微信企業號獲取Token文檔 # http://qydev.weixin.qq.com/wiki/index.php?title=AccessToken self.get_token() self.senddata(body) elastalert_logger.info("send message to %s" % (self.corp_id)) def get_token(self): #獲取token是有次數限制的,本想本地緩存過時時間和token,可是elastalert每次調用都是一次性的,不能全局緩存 if self.expires_in >= datetime.datetime.now() and self.access_token: return self.access_token #構建獲取token的url get_token_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=%s&corpsecret=%s' %(self.corp_id,self.secret) try: response = requests.get(get_token_url) response.raise_for_status() except RequestException as e: raise EAException("get access_token failed , stacktrace:%s" % e) #sys.exit("get access_token failed, system exit") token_json = response.json() if 'access_token' not in token_json : raise EAException("get access_token failed , , the response is :%s" % response.text()) #sys.exit("get access_token failed, system exit") #獲取access_token和expires_in self.access_token = token_json['access_token'] self.expires_in = datetime.datetime.now() + datetime.timedelta(seconds=token_json['expires_in']) return self.access_token def senddata(self, content): #若是須要原始json,須要傳入matches # http://qydev.weixin.qq.com/wiki/index.php?title=%E6%B6%88%E6%81%AF%E7%B1%BB%E5%9E%8B%E5%8F%8A%E6%95%B0%E6%8D%AE%E6%A0%BC%E5%BC%8F # 微信企業號有字符長度限制(2048),超長自動截斷 # 參考 http://blog.csdn.net/handsomekang/article/details/9397025 #len utf8 3字節,gbk2 字節,ascii 1字節 if len(content) > 512 : content = content[:512] + "..." # 微信發送消息文檔 # http://qydev.weixin.qq.com/wiki/index.php?title=%E6%B6%88%E6%81%AF%E7%B1%BB%E5%9E%8B%E5%8F%8A%E6%95%B0%E6%8D%AE%E6%A0%BC%E5%BC%8F send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s' %( self.access_token) headers = {'content-type': 'application/json'} # 替換消息標題爲中文,下面的字段爲logstash切分的日誌字段 title_dict = { # "At least": "報警規則:At least", "@timestamp": "報警時間", "_index": "索引名稱", "_type": "索引類型", "ServerIP": "報警主機", "hostname": "報警機器", "message": "報警內容", "class": "報錯類", "lineNum": "報錯行", "num_hits": "文檔命中數", "num_matches": "文檔匹配數" } #print(f"type:{type(content)}") for k, v in title_dict.items(): content = content.replace(k, v, 1 ) # 最新微信企業號調整校驗規則,tagid必須是string類型,若是是數字類型會報錯,故而使用str()函數進行轉換 payload = { "touser": self.user_id and str(self.user_id) or '', #用戶帳戶,建議使用tag "toparty": self.party_id and str(self.party_id) or '', #部門id,建議使用tag "totag": self.tag_id and str(self.tag_id) or '', #tag能夠很靈活的控制發送羣體細粒度。比較理想的推送應該是,在heartbeat或者其餘elastic工具自定義字段,添加標籤id。這邊根據自定義的標籤id,進行推送 'msgtype': "text", "agentid": self.agent_id, "text":{ "content": content.encode('UTF-8').decode("latin1") #避免中文字符發送失敗 }, "safe":"0" } # set https proxy, if it was provided # 若是須要設置代理,可修改此參數並傳入requests # proxies = {'https': self.pagerduty_proxy} if self.pagerduty_proxy else None try: #response = requests.post(send_url, data=json.dumps(payload, ensure_ascii=False), headers=headers) response = requests.post(send_url, data=json.dumps(payload, cls=MyEncoder, indent=4, ensure_ascii=False), headers=headers) response.raise_for_status() except RequestException as e: raise EAException("send message has error: %s" % e) elastalert_logger.info("send msg and response: %s" % response.text) def get_info(self): return {'type': 'WeChatAlerter'}
在同級目錄下建立MyEncoder.py文件
#!/usr/bin/env python # -*- coding: utf-8 -*- import json class MyEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, bytes): return str(obj, encoding='utf-8') return json.JSONEncoder.default(self, obj)
3、申請企業微信帳號
step 1: 訪問網站 註冊企業微信帳號(不須要企業認證)。
step 2: 訪問apps 建立第三方應用,點擊建立應用按鈕 -> 填寫應用信息:
Step3: 建立部門,獲取部門ID
配置規則文件
cd elastalert/example_rules/ cp example_frequency.yaml applog.yaml cat sms-applog.yaml | grep -v ^# name: 【日誌報警】 use_strftine_index: true type: frequency index: applog* num_events: 1 timeframe: minutes: 1 filter: - query: query_string: query: "status: =500" alert: - "elastalert_modules.wechat_qiye_alert.WeChatAlerter" corp_id: wwcaedfed4430f8bb3 secret: ZCQ08TVhiddZuaZcm6DjieCipw-2rfyCA6570dCYDWE agent_id: 1000028 party_id: 4
index:要查詢的索引的名稱, ES中存在的索引。
num_events:此參數特定於frequency類型,而且是觸發警報時的閾值。
filter:用於過濾結果的Elasticsearch過濾器列表,這裏的規則定義是除了包含「發送郵件失敗」的錯誤日誌,其餘全部ERROR的日誌都會觸發報警。
alert:定義報警方式,咱們這裏採用企業微信報警。
corp_id:企業微信的接口認證信息
python -m elastalert.elastalert --verbose --config /app/elastalert/config.yaml --rule /app/elastalert/example_rules/sms-applog.yaml
1 rules loaded
INFO:elastalert:Starting up
INFO:elastalert:Disabled rules are: []
INFO:elastalert:Sleeping for 9.999904 seconds INFO:elastalert:Queried rule 【日誌報警】 from 2020-06-05 17:47 CST to 2020-06-05 17:47 CST: 0 / 0 hits INFO:elastalert:Ran 【日誌報警】 from 2020-06-05 17:47 CST to 2020-06-05 17:47 CST: 0 query hits (0 already seen), 0 matches, 0 alerts sent
後臺運行 nohup python -m elastalert.elastalert --verbose --config /app/elastalert/config.yaml --rule /app/elastalert/example_rules/sms-applog.yaml > nohup.txt 2>&1
執行以後報錯:
注意:執行後提示找不到elastalert_modules模塊的話,須要在~/
目錄下建立elastalert文件夾,而後再把elastalert_modules文件夾給放進去,而不是把elastalert_modules文件夾放在elastalert模塊的安裝路徑下
微信端添加企業號報警格式以下: