Python Elasticsearch API操做ES集羣

環境

  • Centos 7.4
  • Python 2.7
  • Pip 2.7 MySQL-python 1.2.5 Elasticsearc 6.3.1
  • Elasitcsearch6.3.2

知識點

  • 調用Python Elasticsearh API
  • Python Mysqldb使用
  • DSL查詢與聚合
  • Python 列表操做

代碼

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#minyt 2018.9.1
#獲取24小時內出現的模塊次數
# 該程序經過elasticsearch python client 獲取相關精簡數據,能夠計算請求數、超時數、錯誤數、正確率、錯誤率等等
import MySQLdb
from elasticsearch import Elasticsearch
from elasticsearch import helpers

#定義elasticsearch集羣索引名
index_name = "logstash-nginxlog-*"

#實例化Elasticsearch類,並設置超時間爲180秒,默認是10秒的,若是數據量很大,時間設置更長一些
es = Elasticsearch(['elasticsearch01','elasticsearch02','elasticsearch03'],timeout=180)

#DSL(領域特定語言)查詢語法,查詢top50 sname的排列次數
data_sname = {
  "aggs": {
    "2": {
      "terms": {
        "field": "apistatus.sname.keyword",
        "size": 100,
        "order": {
          "_count": "desc"
        }
      }
    }
  },
  "size": 0,
  "_source": {
    "excludes": []
  },
  "stored_fields": [
    "*"
  ],
  "script_fields": {},
  "docvalue_fields": [
    "@timestamp"
  ],
  "query": {
    "bool": {
      "must": [
        {
          "match_all": {}
        },
        {
          "range": {
            "@timestamp": {
              "gte" : "now-24h/h",
              "lt" :  "now/h"
            }
          }
        }
      ],
      "filter": [],
      "should": [],
      "must_not": []
    }
  }
}

#按照DSL(特定領域語言)語法查詢獲取數據
def get_original_data():
    try:
        #根據上面條件搜索數據
        res = es.search(
            index=index_name,
            size=0,
            body=data_sname
        )
        return res

    except:
        print "get original data failure"

#初始化數據庫
def init_mysql():
    # 打開數據庫鏈接
    db = MySQLdb.connect("localhost", "myuser", "mypassword", "mydb", charset='utf8' )

    # 使用cursor()方法獲取操做遊標 
    cursor = db.cursor()

    # SQL 更新語句
    sql = "update appname set count=0"
    try:
        # 執行SQL語句
        cursor.execute(sql)
        # 提交到數據庫執行
        db.commit()
    except:
        # 發生錯誤時回滾
        db.rollback()

    # 關閉數據庫鏈接
    db.close()

def updata_mysql(sname_count,sname_list):
    # 打開數據庫鏈接
       db = MySQLdb.connect("localhost", "myuser", "mypassword", "mydb", charset='utf8' )

    # 使用cursor()方法獲取操做遊標 
    cursor = db.cursor()

    # SQL 更新語句
    sql = "update appname set count=%d where sname = '%s'" % (sname_count,sname_list)
    try:
        # 執行SQL語句
        cursor.execute(sql)
        # 提交到數據庫執行
        db.commit()
    except:
        # 發生錯誤時回滾
        db.rollback()

    # 關閉數據庫鏈接
    db.close()

#根據Index數據結構經過Elasticsearch Python Client上傳數據到新的Index
def import_process_data():
    try:
        #列表形式顯示結果
        res = get_original_data()
        #print res
        res_list = res.get('aggregations').get('2').get('buckets')
        #print res_list

        #初始化數據庫
        init_mysql()

        #獲取24小時內出現的SNAME 
        for value in res_list:
            sname_list = value.get('key')
            sname_count = value.get('doc_count')
            print sname_list,sname_count
            #更新sname_status值
            updata_mysql(sname_count,sname_list)

    except Exception, e:
        print repr(e)

if __name__ == "__main__":
    import_process_data()

總結

關鍵是DSL語法的編寫涉及查詢與聚合能夠經過kibana的visualize或者devtool先測試出正確語法,而後結合python對列表、字典、除法、字符串等操做便可。下面彙總下各個算法:python

  • 總請求
    http_host.keyword: api.mydomain.com mysql

  • 超長請求
    http_host.keyword: api.mydomain.com AND request_time: [1 TO 600] NOT apistatus.status.keyword:*錯誤nginx

  • 錯誤請求
    apistatus.status.keyword:*錯誤 AND (http_host.keyword: api.mydomain.com OR http_host.keyword: api.yourdomain.com )算法

  • 請求健康度
    域名與request_time聚合,域名請求時間小於3秒的次數除以總請求次數對應各個域名健康度sql

  • 請求正確率域名與http狀態碼聚合,域名http狀態碼爲200的次數除以域名總請求數對應各個域名的請求正確率
相關文章
相關標籤/搜索