Python Elasticsearch api

1、介紹

ElasticSearch是一個基於Lucene的搜索服務器。它提供了一個分佈式多用戶能力的全文搜索引擎,基於RESTful web接口。下面介紹了利用Python API接口進行數據查詢,方便其餘系統的調用。html

安裝API

pip3 install elasticsearch

 

創建es鏈接

無用戶名密碼狀態

from elasticsearch import Elasticsearch
es = Elasticsearch([{'host':'10.10.13.12','port':9200}])

 

默認的超時時間是10秒,若是數據量很大,時間設置更長一些。若是端口是9200,直接寫IP便可。代碼以下:java

es = Elasticsearch(['10.10.13.12'], timeout=3600)

 

用戶名密碼狀態

若是Elasticsearch開啓了驗證,須要用戶名和密碼python

es = Elasticsearch(['10.10.13.12'], http_auth=('xiao', '123456'), timeout=3600)

 

數據檢索功能

es.search(index='logstash-2015.08.20', q='http_status_code:5* AND server_name:"web1"', from_='124119')

 

經常使用參數
  • index - 索引名
  • q - 查詢指定匹配 使用Lucene查詢語法
  • from_ - 查詢起始點  默認0
  • doc_type - 文檔類型
  • size - 指定查詢條數 默認10
  • field - 指定字段 逗號分隔
  • sort - 排序  字段:asc/desc
  • body - 使用Query DSL
  • scroll - 滾動查詢

 

統計查詢功能

語法同search大體同樣,但只輸出統計值linux

es.count(index='logstash-2015.08.21', q='http_status_code:500')

輸出:web

{'_shards':{'failed':0, 'successful':5, 'total':5}, 'count':17042}

 

17042 就是統計值!json

 

知識擴展

滾動demo

# Initialize the scroll
page = es.search(
    index ='yourIndex',
    doc_type ='yourType',
    scroll ='2m',
    search_type ='scan',
    size =1000,
    body ={
    # Your query's body
})
 
sid = page['_scroll_id']
scroll_size = page['hits']['total']
 
# Start scrolling
while(scroll_size >0):
    print "Scrolling..."
    page = es.scroll(scroll_id = sid, scroll ='2m')
    # Update the scroll ID
    sid = page['_scroll_id']
    # Get the number of results that we returned in the last scroll
    scroll_size = len(page['hits']['hits'])
    print "scroll size: "+ str(scroll_size)
    # Do something with the obtained page
View Code

 

以上demo實現了一次取若干數據,數據取完以後結束,不會獲取到最新更新的數據。咱們滾動完以後想獲取最新數據怎麼辦?滾動的時候會有一個統計值,如total: 5。跳出循環以後,咱們能夠用_from參數定位到5開始滾動以後的數據。後端

 

可是我用的不是這個,用的是如下方法,連接以下:數組

http://www.javashuo.com/article/p-schlwvfe-w.html服務器

 

在下面的內容中,我會詳細介紹此代碼如何使用!app

 

2、Query DSL

range過濾器查詢範圍

gt: > 大於
lt: < 小於
gte: >= 大於或等於
lte: <= 小於或等於

 

示例代碼1

"range":{
    "money":{
        "gt":20,
        "lt":40
    }
}

 

時間範圍

最近時間段

好比我要查詢最近1分鐘的

"range": {
    '@timestamp': {'gt': 'now-1m'}
}

 

最新1小時

"range": {
    '@timestamp': {'gt': 'now-1h'}
}

 

最新1天的

"range": {
    '@timestamp': {'gt': 'now-1d'}
}

 

指定時間段

那麼問題來了,它是根據當前時間來計算最近的時間。可是有些狀況下,我須要制定時間範圍,精確到分鐘

假設須要查詢早上8點到9點的數據,能夠這樣

"range": {
    '@timestamp': {
        "gt" : "{}T{}:00:00".format("2018-12-17","08"),
        "lt": "{}T{}:59:59".format("2018-12-17","09"),
        "time_zone": "Asia/Shanghai"
    }
}

 

注意:日期和小時之間,有一個字母T來間隔。不能用空格!

time_zone 表示時區,若是默認的時區不會,可能會影響查詢結果!

 

bool組合過濾器

must:全部分句都必須匹配,與 AND 相同。
must_not:全部分句都必須不匹配,與 NOT 相同。
should:至少有一個分句匹配,與 OR 相同。

 

示例代碼

{
    "bool":{
      "must":[],
      "should":[],
      "must_not":[],
    }
}

 

term過濾器

term單過濾

{
    "terms":{
      "money":20
    }
}

 

表示money包含20的記錄

 

terms複數版本

容許多個匹配條件

{
    "terms":{
      "money": [20,30]
    }
}

 

表示money包含20或者30的記錄

 

結合bool+term來舉一個實際的例子:

查詢path字段中包含applogs最近1分鐘的記錄

"bool": {
    "must": [
        {
            "terms": {
                "path": [
                    "applogs",
                ]
            }
        },
        {
            "range": {
                '@timestamp': {'gt': 'now-1m'}
            }
        }
    ]
}
View Code

 

這裏使用了terms複數版本,能夠隨時添加多個條件!

正則查詢 

{
    "regexp": {
        "http_status_code": "5.*"
    }
}

 

match查詢

match 精確匹配

{
    "match":{
      "email":"123456@qq.com"
    }
}

 

multi_match 多字段搜索

{
    "multi_match":{
      "query":"11",
      "fields":["Tr","Tq"]
    }
}

 

demo

獲取最近一小時的數據

{'query':
    {'filtered':
        {'filter':
            {'range':
                {'@timestamp':{'gt':'now-1h'}}
            }
        }
    }
}
View Code


條件過濾查詢

{
    "query":{
        "filtered":{
            "query":{"match":{"http_status_code":500}},
            "filter":{"term":{"server_name":"vip03"}}
        }
    }
}
View Code

 

Terms Facet 單字段統計

{'facets':
    {'stat':
        {'terms':
            {'field':'http_status_code',
              'order':'count',
        'size':50}
        }
    },
    'size':0
}
View Code

 

一次統計多個字段

{'facets':
    {'cip':
        {'terms':
            {'fields':['client_ip']}},
              'status_facets':{'terms':{'fields':['http_status_code'],
              'order':'term',
              'size':50}}},
        'query':{'query_string':{'query':'*'}},
    'size':0
}
View Code

 

多個字段一塊兒統計

{'facets':
    {'tag':
        {'terms':
            {'fields':['http_status_code','client_ip'],
              'size':10
           }
        }
    },
    'query':
        {'match_all':{}},
    'size':0
}
View Code

 

數據組裝

如下是kibana首頁的demo,用來統計一段時間內的日誌數量

{
  "facets": {
    "0": {
      "date_histogram": {
        "field": "@timestamp",
        "interval": "5m"
      },
      "facet_filter": {
        "fquery": {
          "query": {
            "filtered": {
              "query": {
                "query_string": {
                  "query": "*"
                }
              },
              "filter": {
                "bool": {
                  "must": [
                    {
                      "range": {
                        "@timestamp": {
                          'gt': 'now-1h'
                        }
                      }
                    },
                    {
                      "exists": {
                        "field": "http_status_code.raw"
                      }
                    },
                    # --------------- -------
                    # 此處加匹配條件
                  ]
                }
              }
            }
          }
        }
      }
    }
  },
  "size": 0
}
View Code

 

若是想添加匹配條件,在以上代碼標識部分加上過濾條件,按照如下代碼格式便可

{
"query": {
    "query_string": {"query": "backend_name:baidu.com"}
    }
},
View Code

 

先介紹到這裏,後續會有Query DSL API介紹。

 

3、需求分析

需求

下面是kibana展現的日誌

須要統計某一天的日誌,統計每個小時用戶數,要求用戶id不能重複。一個用戶id就是一個用戶,也稱之爲一個PV。

看一段message字段信息

2018-12-17 12:00:00,533 l=INFO [r=9538381535][s=2] [t=http-xxx-543] [APP=user] [Class=o.s.u.c.AccountController:1189] [Method=findCustomerByLoid]- Operation=find customer by loid,Params=loid:001,Content=start

 

其中有一個[r=9538381535],這個9538381535就是用戶id。那麼用戶登陸手機APP操做,都會帶着這個id,產生一條日誌。

好比user項目,那麼最終要的數據格式以下:

"user":{
    "00":1,
    "01":0,
    ...
    "22":3245,
    "23":765
}

 

這裏使用24小時制來表示每個時間段,有多個個用戶訪問了。注意:已經去重了用戶id,統計用戶數!

 

4、相關技術點

在放出最終代碼以前,先來介紹相關技術點,便於理解代碼。按照代碼從上到下原則,分別來介紹!

項目列表

project_list = ['user',...]

實際的項目是user,可是存儲到elasticsearch中,是userlogs,加了一個logs後綴。這個是java後端代碼定義的,便於識別!

 

判斷日期是否合法

def isVaildDate(self, date):
    try:
        if ":" in date:
            time.strptime(date, "%Y-%m-%d %H:%M:%S")
        else:
            time.strptime(date, "%Y-%m-%d")
        return True
    except:
        return False
View Code

 

由於須要統計一週的數據,因此腳本執行時,須要傳一個日期參數。那麼日期參數,傳給程序是否合法呢?須要有一個函數來判斷!

 

記錄日誌

    def write_log(self, content):
        """
        寫入日誌文件
        :param path:
        :param content:
        :return:
        """
        path = "print.log"
        with open(path, mode='a+', encoding='utf-8') as f:
            content = time.strftime('%Y-%m-%d %H:%M:%S') + ' ' + content + "\n"
            print(content)
            f.write(content)
View Code

 

爲啥不用Python的日誌模塊呢?由於測試發現,它寫入一些,我不想要的信息,太佔用磁盤空間了。因此,我單獨寫了一個記錄日誌方法。

 

獲取elasticsearch數據

def Get_Data_By_Body(self, project, fixed_date, hour):
    """
    獲取數據
    :param project: 項目名
    :param fixed_date: 指定日期
    :param hour: 24小時制中的某一個小時
    :return: object
    """
    # 查詢條件,查詢項目最近1小時的數據。
    doc = {
        "query": {
            "bool": {
                "must": [
                    {
                        "terms": {
                            "path": [
                                project + "logs",
                            ]
                        }
                    },
                    {
                        "range": {
                            '@timestamp': {
                                "gt": "{}T{}:00:00".format(fixed_date, hour),
                                "lt": "{}T{}:59:59".format(fixed_date, hour),
                                "time_zone": "Asia/Shanghai"
                            }
                        }
                    }
                ]
            }
        }
    }
View Code

 

因爲線上數據量過大,所以直接查詢一天的數據,會卡死。因此是切分爲每個小時查詢!

上面的query表示查詢語句,大概就是查詢指定項目(項目名+logs),1小時範圍內的數據

 

scroll獲取數據

因爲1小時內的數據量,也很大。不能直接返回!默認不指定size,是返回10條數據!

size = 1000  # 指定返回1000條
queryData = self.es.search(index=self.index_name, body=doc, size=size, scroll='1m', )

 

參數解釋:

size 指定返回的條數,默認返回10條

index 指定索引名

body 查詢語句

scroll 告訴 Elasticsearch 把搜索上下文再保持一分鐘。1m表示1分鐘

 

返回結果

mdata = queryData.get("hits").get("hits")  # 返回數據,它是一個列表類型
if not mdata:
    self.write_log('%s mdata is empty!' % project)

 

queryData 返回一個字典,那麼真正的查詢結果在queryData['hits']['hits']中,若是這個值沒有,表示沒有查詢到數據!

注意:它並非返回全部的結果,而是一頁的數據,是一個列表類型。由於咱們使用了scroll獲取數據,只返回一頁!

 

分頁數據

上面只是返回了1頁,我要全部數據,怎麼辦?須要使用分頁,先來看一下分頁公式

divmod(總條數, 每頁大小)

 

注意:divmod返回一個元祖,第一個元素,就是要分頁數

 

總條數,使用

total = queryData['hits']['total']  # 返回數據的總條數

 

每頁大小,就是上面指定的size

size = 1000  # 指定返回1000條

 

那麼遍歷每一頁數據,須要這樣

scroll_id = queryData['_scroll_id']  # 獲取scrollID
total = queryData['hits']['total']  # 返回數據的總條數

# 使用divmod設置分頁查詢
# divmod(total,1000)[0]+1 表示總條數除以1000,結果取整數加1
for i in range(divmod(total, size)[0] + 1):
    res = self.es.scroll(scroll_id=scroll_id, scroll='1m')  # scroll參數必須指定不然會報錯
    mdata += res["hits"]["hits"]  # 擴展列表

 

scroll_id給es.scroll獲取數據使用,這個參數必需要有。

因爲Python中的range是顧頭不顧尾,因此須要加1。使用for循環,就能夠遍歷每個分頁數

es.scroll(scroll_id=scroll_id, scroll='1m') 纔是真正查詢每一頁的數據,必需要指定這2個參數。它的返回結果,就是查詢結果!返回一個列表

上面的mdata是一個列表,res也是列表。所以使用+=就能夠擴展列表,獲得全部數據!

 

建立年月日目錄

def create_folder(self, fixed_date):
    """
    建立年/月/日 文件夾
    :return: path
    """

    # 系統當前時間年份
    # year = time.strftime('%Y', time.localtime(time.time()))
    # # 月份
    # month = time.strftime('%m', time.localtime(time.time()))
    # # 日期
    # day = time.strftime('%d', time.localtime(time.time()))

    # 年月日
    year, month, day = fixed_date.split("-")

    # 具體時間 小時分鐘毫秒
    # mdhms = time.strftime('%m%d%H%M%S', time.localtime(time.time()))

    # 判斷基礎目錄是否存在
    if not os.path.exists(os.path.join(self.BASE_DIR, 'data_files')):
        os.mkdir(os.path.join(self.BASE_DIR, 'data_files'))

    # 年月日
    fileYear = os.path.join(self.BASE_DIR, 'data_files', year)
    fileMonth = os.path.join(fileYear, month)
    fileDay = os.path.join(fileMonth, day)

    # 判斷目錄是否存在,不然建立
    try:
        if not os.path.exists(fileYear):
            os.mkdir(fileYear)
            os.mkdir(fileMonth)
            os.mkdir(fileDay)
        else:
            if not os.path.exists(fileMonth):
                os.mkdir(fileMonth)
                os.mkdir(fileDay)
            else:
                if not os.path.exists(fileDay):
                    os.mkdir(fileDay)

        return fileDay
    except Exception as e:
        print(e)
        return False
View Code

 

統計結果是最終寫入到一個txt裏面,那麼如何存儲呢?使用年月日目錄在區分,能夠知道這個txt文件,是屬於哪一天的。到了必定時間後,能夠按期清理,很是方便!

這裏使用的傳參方式,傳入一個日期。因此使用"-"就能夠切割出年月日

# 年月日
year, month, day = fixed_date.split("-")

 

輸出24小時

使用如下代碼就能夠實現

hour_list = ['{num:02d}'.format(num=i) for i in range(24)]

輸出:

['00', '01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23']

 

項目統計字典

須要統計每個項目的每個小時的用戶id,用戶id必須去重。既然要去重,咱們首先會想到用集合。

可是還有一個辦法,使用字典,也能夠去重。由於字典的key是惟一的。

構造24小時字典

先來構造項目user的數據,格式以下:

"basebusiness": {
    "00": {},
    "01": {},
    "02": {},
    "03": {},
    "04": {},
    "05": {},
    "06": {},
    "07": {},
    "08": {},
    "09": {},
    "10": {},
    "11": {},
    "12": {},
    "13": {},
    "14": {},
    "15": {},
    "16": {},
    "17": {},
    "18": {},
    "19": {},
    "20": {},
    "21": {},
    "22": {},
    "23": {},
}
View Code

 

這只是一個項目,實際有不少項目。因此每個字典,都有這樣的24小時數據。相關代碼以下:

project_dic = {}  # 項目統計字典
# 24小時
hour_list = ['{num:02d}'.format(num=i) for i in range(24)]

for hour in hour_list:  # 遍歷24小時
    # print("查詢{}點的數據###############".format(hour))
    self.write_log("查詢{}點的數據###############".format(hour))
    for project in project_list:  # 遍歷項目列表
        if not project_dic.get(project):
            project_dic[project] = {}  # 初始化項目字典

        if not project_dic[project].get(hour):
            project_dic[project][hour] = {}  # 初始化項目小時字典
View Code

 

這裏的每個小時,都是空字典。尚未添加數據,須要添加用戶id,下面會講到!

 

正則匹配用戶id

看這一點字符串

2018-12-17 12:00:00,533 l=INFO [r=9538381535][s=2] [t=http-xxx-543] [APP=user]

 

須要提取出9538381535,思路就是:匹配中括號內容-->提取以r=開頭的內容-->使用等號切割,獲取用戶id

匹配中括號內容

p1 = re.compile(r'[[](.*?)[]]', re.S)  # 最小匹配,匹配中括號的內容

 

注意:這裏要使用最小匹配,不能使用貪婪匹配。這一段正則,我是用網上找的,測試ok

 

提取和切割,就比較簡單了。使用startswith和split方法,就能夠了!

使用字典去重

接下來,須要將用戶id寫入到字典中,須要去重,不然字典添加時,會報錯!

那麼如何使用字典去重呢?只須要遵循一個原則便可! 有則忽略,無則添加

# 判斷字典中rid不存在時,避免字典鍵值重複
if not project_dic[project][hour].get(rid):
    project_dic[project][hour][rid] = True  # 添加值

 

生成器

這裏主要在2個方法中,使用了生成器。生成器的優勢,就是節省內容。

一處在是Get_Data_By_Body方法中,它須要返回全部查詢的數據。數據量很是大,所以必需要生成器,不然服務器內容就溢出!

還有一處,就main方法。它是返回項目的統計結果。注意,它不是最終結果。它裏面保存了每個項目,每個小時中的用戶id,是已經去重的用戶id。

數據量也是比較大,固然,沒有Get_Data_By_Body方法返回的結果大。

 

統計每個小時用戶數

main方法,返回的字典格式以下:

"user":{
    "00":{
        "242412":True,
    }
    "01":{
        "":True,
    },
    ...
    "22":{
        "457577":True,
        "546583":True,
    },
    "23":{
        "457577":True,
        "546583":True,
        "765743":True,
    }
}

 

我須要知道,每個小時的用戶數。怎麼統計呢?用2個方法

1. 遍歷字典的每個小時,使用計數器

2. 使用len方法(推薦)

 

最簡單的方法,就是使用len方法,就能夠知道每個小時有多少個key

for i in dic:  # 遍歷數據
    if not final_dic.get(i):
        final_dic[i] = {}  # 初始化字典

    for h in sorted(dic[i]):  # 遍歷項目的每個小時
        # 統計字典的長度
        final_dic[i][h] = len(dic[i][h])

 

有序字典

看下面的數據

 

能夠發現,24小時,排序是亂的。這樣給領導看時,不太美觀。因此須要對24小時進行排序!

在Python 3.6以前,字典的key是無序的。所以,須要定義一個有序字典,在寫入以前,要對字典的key作一次排序。

這樣順序寫入到有序字典以後,以後再次調用,依然是有序的!

order_dic = OrderedDict()  # 實例化一個有序字典
    final_dic = {}  # 最終統計結果
    for dic in data:  # 遍歷生成器
        for i in dic:  # 遍歷數據
            if not final_dic.get(i):
                final_dic[i] = order_dic  # 初始化字典

            # 有序字典必須先對普通字典key作排序
            for h in sorted(dic[i]):  # 遍歷項目的每個小時
                # 統計字典的長度
                final_dic[i][h] = len(dic[i][h])

 

完整代碼

#!/usr/bin/env python3
# coding: utf-8


import re
import os
import sys
import json
import time
from collections import OrderedDict
from elasticsearch import Elasticsearch

# 項目列表
project_list = ['usercenter', ['login']]


# yesterday = (datetime.datetime.now() + datetime.timedelta(days=-1)).strftime("%Y-%m-%d")
# today = datetime.datetime.now().strftime("%Y-%m-%d")


class ElasticObj:
    def __init__(self, index_name, ip, fixed_date, timeout=3600):
        '''
        :param index_name: 索引名稱
        :param ip: elasticsearch地址
        :param timeout: 設置超時間,默認是10秒的,若是數據量很大,時間要設置更長一些
        '''
        self.index_name = index_name
        self.ip = ip
        self.timeout = timeout
        # 無用戶名密碼狀態
        # self.es = Elasticsearch([self.ip], timeout=self.timeout)
        # 用戶名密碼狀態
        # self.es = Elasticsearch([ip],http_auth=('elastic', 'password'),port=9200)
        self.es = Elasticsearch([self.ip], http_auth=('elastic', '123456'), timeout=self.timeout)

        self.fixed_date = fixed_date  # 指定日期

        # 當前py文件所在的文件夾
        self.BASE_DIR = os.path.dirname(os.path.abspath(__file__))
        self.fileDay = self.create_folder()  # 建立日誌和數據目錄

    @staticmethod
    def isVaildDate(date):
        """
        判斷日期是否合法
        :param date: 日期,好比: 2018-03-30
        :return: bool
        """
        try:
            if ":" in date:
                time.strptime(date, "%Y-%m-%d %H:%M:%S")
            else:
                time.strptime(date, "%Y-%m-%d")
            return True
        except:
            return False

    def write_log(self, content):
        """
        寫入日誌文件
        :param content: 寫入內容
        :return:
        """
        path = os.path.join(self.fileDay,"output_%s.log" %self.fixed_date)
        # path = "output_{}.log".format(self.fixed_date)
        with open(path, mode='a+', encoding='utf-8') as f:
            content = time.strftime('%Y-%m-%d %H:%M:%S') + ' ' + content + "\n"
            print(content)
            f.write(content)

    def Get_Data_By_Body(self, project, hour):
        """
        獲取數據
        :param project: 項目名
        :param hour: 24小時制中的某一個小時
        :return: 生成器
        """
        # doc = {'query': {'match_all': {}}}
        # 查詢條件,查詢項目最近1小時的數據。now-1h表示最近1小時
        # print(type(fixed_date))
        # print("{date}T00:00:00".format(date=fixed_date))
        # 24小時

        doc = {
            "query": {
                "bool": {
                    "must": [
                        {
                            "terms": {
                                "path": [
                                    project + "logs",
                                ]
                            }
                        },
                        {
                            # "range": {
                            #     '@timestamp': {'gt': 'now-1m'}
                            # }
                            "range": {
                                '@timestamp': {
                                    "gt": "{}T{}:00:00".format(self.fixed_date, hour),
                                    "lt": "{}T{}:59:59".format(self.fixed_date, hour),
                                    "time_zone": "Asia/Shanghai"
                                }
                            }
                        }
                    ]
                }
            }
        }
        # queryData = self.es.search(index=self.index_name, body=doc)
        # scroll 參數告訴 Elasticsearch 把搜索上下文再保持一分鐘,1m表示1分鐘
        # size 參數容許咱們配置沒匹配結果返回的最大命中數。每次調用 scroll API 都會返回下一批結果,直到再也不有能夠返回的結果,即命中數組爲空。
        size = 1000  # 指定返回1000條
        queryData = self.es.search(index=self.index_name, body=doc, size=size, scroll='1m', )
        # print(queryData['hits']['total'])

        mdata = queryData.get("hits").get("hits")  # 返回查詢的數據,不是全部數據,而是一頁的數據,它是一個列表類型
        if not mdata:
            self.write_log('%s mdata is empty!' % project)

        # scroll_id 的值就是上一個請求中返回的 _scroll_id 的值
        scroll_id = queryData['_scroll_id']  # 獲取scrollID
        total = queryData['hits']['total']  # 返回數據的總條數
        # print("查詢項目{} {}點的數據,總共有{}條".format(project,hour,total))
        self.write_log("查詢項目{} {}點的數據,總共有{}條".format(project, hour, total))

        # 使用divmod設置分頁查詢
        # divmod(total,1000)[0]+1 表示總條數除以1000,結果取整數加1
        for i in range(divmod(total, size)[0] + 1):
            res = self.es.scroll(scroll_id=scroll_id, scroll='1m')  # scroll參數必須指定不然會報錯
            mdata += res["hits"]["hits"]  # 擴展列表
            # yield mdata

        # print(mdata)
        # return mdata
        yield mdata

    def create_folder(self):
        """
        建立年/月/日 文件夾
        :return: path
        """

        # 系統當前時間年份
        # year = time.strftime('%Y', time.localtime(time.time()))
        # # 月份
        # month = time.strftime('%m', time.localtime(time.time()))
        # # 日期
        # day = time.strftime('%d', time.localtime(time.time()))

        # 年月日
        year, month, day = self.fixed_date.split("-")

        # 具體時間 小時分鐘毫秒
        # mdhms = time.strftime('%m%d%H%M%S', time.localtime(time.time()))

        # 判斷基礎目錄是否存在
        if not os.path.exists(os.path.join(self.BASE_DIR, 'data_files')):
            os.mkdir(os.path.join(self.BASE_DIR, 'data_files'))

        # 年月日
        fileYear = os.path.join(self.BASE_DIR, 'data_files', year)
        fileMonth = os.path.join(fileYear, month)
        fileDay = os.path.join(fileMonth, day)

        # 判斷目錄是否存在,不然建立
        try:
            if not os.path.exists(fileYear):
                os.mkdir(fileYear)
                os.mkdir(fileMonth)
                os.mkdir(fileDay)
            else:
                if not os.path.exists(fileMonth):
                    os.mkdir(fileMonth)
                    os.mkdir(fileDay)
                else:
                    if not os.path.exists(fileDay):
                        os.mkdir(fileDay)

            return fileDay
        except Exception as e:
            print(e)
            return False

    def main(self):
        """
        主要處理邏輯
        :return: 生成器
        """
        project_dic = {}  # 項目統計字典
        # fixed_date = datetime.datetime.strptime(fixed_date, "%Y-%m-%d")
        # strftime("%Y-%m-%d")
        # conv_date = fixed_date.strftime("%Y-%m-%d")
        # print(conv_date, type(conv_date))
        # exit()
        # now_hour = fixed_date.strftime('%H')  # 當前時間的小時
        # print(now_hour)
        # 24小時
        hour_list = ['{num:02d}'.format(num=i) for i in range(24)]
        # hour_list = ['{num:02d}'.format(num=i) for i in range(2)]

        # project="usercenter"
        # project_dic[project] = {now_hour: {}}  # 初始化字典
        for hour in hour_list:  # 遍歷24小時
            # print("查詢{}點的數據###############".format(hour))
            self.write_log("查詢{}點的數據###############".format(hour))
            for project in project_list:  # 遍歷項目列表
                if not project_dic.get(project):
                    project_dic[project] = {}  # 初始化項目字典

                if not project_dic[project].get(hour):
                    project_dic[project][hour] = {}  # 初始化項目小時字典

                mdata = self.Get_Data_By_Body(project, hour)  # 獲取數據
                for item in mdata:  # 遍歷生成器
                    for hit in item:  # 遍歷返回數據
                        # hit是一個字典
                        str1 = hit['_source']['message']  # 查詢指定字典
                        p1 = re.compile(r'[[](.*?)[]]', re.S)  # 最小匹配,匹配中括號的內容
                        for i in re.findall(p1, str1):  # 遍歷結果
                            if i.startswith('r='):  # 判斷以r=開頭的
                                rid = i.split("=")[1]  # 獲取rid
                                # print("rid",rid)
                                # 判斷字典中rid不存在時,避免字典鍵值重複
                                if not project_dic[project][hour].get(rid):
                                    project_dic[project][hour][rid] = True  # 添加值

            time.sleep(1)  # 休眠1秒鐘

        # return project_dic
        yield project_dic


if __name__ == '__main__':
    # fixed_date = "2018-12-16"
    fixed_date = sys.argv[1]  # 日期參數
    if not ElasticObj.isVaildDate(fixed_date):
        print("日期不合法!")
        exit()

    startime = time.time()  # 開始時間

    index_name = "common-*"
    es_server = "192.168.92.131"

    obj = ElasticObj(index_name, es_server, fixed_date)  # 鏈接elasticsearch
    print("正在查詢日期%s這一天的數據" % fixed_date)
    obj.write_log("###########################################")
    obj.write_log("正在查詢日期%s這一天的數據" % fixed_date)

    data = obj.main()
    # print("初步結果",data)

    # fileDay = obj.create_folder()  # 建立目錄
    # if not fileDay:
    #     # print("建立目錄失敗!")
    #     obj.write_log("建立目錄失敗!")
    #     exit()

    order_dic = OrderedDict()  # 實例化一個有序字典
    final_dic = {}  # 最終統計結果
    for dic in data:  # 遍歷生成器
        for i in dic:  # 遍歷數據
            if not final_dic.get(i):
                final_dic[i] = order_dic  # 初始化字典

            # 有序字典必須先對普通字典key作排序
            for h in sorted(dic[i]):  # 遍歷項目的每個小時
                # 統計字典的長度
                final_dic[i][h] = len(dic[i][h])

    # print("最終結果",final_dic)  # 統計結果
    obj.write_log("最終結果執行完畢!")

    # 寫入文件
    with open(os.path.join(obj.fileDay, 'access_data.txt'), encoding='utf-8', mode='a') as f:
        f.write(json.dumps(final_dic) + "\n")

    endtime = time.time()
    take_time = endtime - startime

    if take_time < 1:  # 判斷不足1秒時
        take_time = 1  # 設置爲1秒
    # 計算花費時間
    m, s = divmod(take_time, 60)
    h, m = divmod(m, 60)

    # print("本次花費時間 %02d:%02d:%02d" % (h, m, s))
    obj.write_log("統計日期%s這一天的數據完成!請查閱data_files目錄的日誌和數據文件" % fixed_date)
    obj.write_log("本次花費時間 %02d:%02d:%02d" % (h, m, s))
View Code

 

 

日誌文件和數據文件,都在年月日目錄裏面!

 

本文參考連接:

http://www.cnblogs.com/letong/p/4749234.html

http://www.linuxyw.com/790.html

http://www.javashuo.com/article/p-schlwvfe-w.html

相關文章
相關標籤/搜索