#!/usr/bin/env python # -*- coding: utf-8 -*- #minyt 2018.9.1 #獲取24小時內出現的模塊次數 # 該程序經過elasticsearch python client 獲取相關精簡數據,能夠計算請求數、超時數、錯誤數、正確率、錯誤率等等 import MySQLdb from elasticsearch import Elasticsearch from elasticsearch import helpers #定義elasticsearch集羣索引名 index_name = "logstash-nginxlog-*" #實例化Elasticsearch類,並設置超時間爲180秒,默認是10秒的,若是數據量很大,時間設置更長一些 es = Elasticsearch(['elasticsearch01','elasticsearch02','elasticsearch03'],timeout=180) #DSL(領域特定語言)查詢語法,查詢top50 sname的排列次數 data_sname = { "aggs": { "2": { "terms": { "field": "apistatus.sname.keyword", "size": 100, "order": { "_count": "desc" } } } }, "size": 0, "_source": { "excludes": [] }, "stored_fields": [ "*" ], "script_fields": {}, "docvalue_fields": [ "@timestamp" ], "query": { "bool": { "must": [ { "match_all": {} }, { "range": { "@timestamp": { "gte" : "now-24h/h", "lt" : "now/h" } } } ], "filter": [], "should": [], "must_not": [] } } } #按照DSL(特定領域語言)語法查詢獲取數據 def get_original_data(): try: #根據上面條件搜索數據 res = es.search( index=index_name, size=0, body=data_sname ) return res except: print "get original data failure" #初始化數據庫 def init_mysql(): # 打開數據庫鏈接 db = MySQLdb.connect("localhost", "myuser", "mypassword", "mydb", charset='utf8' ) # 使用cursor()方法獲取操做遊標 cursor = db.cursor() # SQL 更新語句 sql = "update appname set count=0" try: # 執行SQL語句 cursor.execute(sql) # 提交到數據庫執行 db.commit() except: # 發生錯誤時回滾 db.rollback() # 關閉數據庫鏈接 db.close() def updata_mysql(sname_count,sname_list): # 打開數據庫鏈接 db = MySQLdb.connect("localhost", "myuser", "mypassword", "mydb", charset='utf8' ) # 使用cursor()方法獲取操做遊標 cursor = db.cursor() # SQL 更新語句 sql = "update appname set count=%d where sname = '%s'" % (sname_count,sname_list) try: # 執行SQL語句 cursor.execute(sql) # 提交到數據庫執行 db.commit() except: # 發生錯誤時回滾 db.rollback() # 關閉數據庫鏈接 db.close() #根據Index數據結構經過Elasticsearch Python Client上傳數據到新的Index def import_process_data(): try: #列表形式顯示結果 res = get_original_data() #print res res_list = res.get('aggregations').get('2').get('buckets') #print res_list #初始化數據庫 init_mysql() #獲取24小時內出現的SNAME for value in res_list: sname_list = value.get('key') sname_count = value.get('doc_count') print sname_list,sname_count #更新sname_status值 updata_mysql(sname_count,sname_list) except Exception, e: print repr(e) if __name__ == "__main__": import_process_data()
關鍵是DSL語法的編寫涉及查詢與聚合能夠經過kibana的visualize或者devtool先測試出正確語法,而後結合python對列表、字典、除法、字符串等操做便可。下面彙總下各個算法:python
總請求
http_host.keyword: api.mydomain.com mysql
超長請求
http_host.keyword: api.mydomain.com AND request_time: [1 TO 600] NOT apistatus.status.keyword:*錯誤nginx
錯誤請求
apistatus.status.keyword:*錯誤 AND (http_host.keyword: api.mydomain.com OR http_host.keyword: api.yourdomain.com )算法
請求健康度
域名與request_time聚合,域名請求時間小於3秒的次數除以總請求次數對應各個域名健康度sql