分佈式監控系統開發【day38】:報警閾值程序邏輯解析(四)

1、計算單條表達式的結果

一、解決了什麼問題

  1. 主機
  2. 表達式
  3. 多長時間進行一次監控
  4. 拼出此服務在redis中存儲的對應key
  5. 獲取要從redis中取多長時間的數據,單位爲minute

二、代碼實現

class ExpressionProcess(object):
    '''
    load data and calc it by different method
    '''
    def __init__(self,main_ins,host_obj,expression_obj,specified_item=None):
        '''
        :param main_ins:   DataHandler 實例
        :param host_obj: 具體的host obj
        :param expression_obj:
        :return:
        計算單條表達式的結果
        '''
        self.host_obj = host_obj
        self.expression_obj = expression_obj
        self.main_ins = main_ins
        self.service_redis_key = "StatusData_%s_%s_latest" %(host_obj.id,expression_obj.service.name) #拼出此服務在redis中存儲的對應key
        self.time_range = self.expression_obj.data_calc_args.split(',')[0] #獲取要從redis中取多長時間的數據,單位爲minute

        print("\033[31;1m------>%s\033[0m" % self.service_redis_key)

2、如何拿到精確的數據

一、功能以下

一、我取出6個數據(下面的+60是默認多取一分鐘數據,寧多勿少,多出來的後面會去掉)
二、approximate_data_range存的是大概的數據,要拿到精確的,我判斷一下
三、把數據集合交給不一樣的方法去處理了
四、根據監控間隔去取數據,若是監控間隔改變了怎嘛辦?python

二、代碼實現

    def load_data_from_redis(self):
        '''load data from redis according to expression's configuration'''
        time_in_sec = int(self.time_range) * 60  #下面的+60是默認多取一分鐘數據,寧多勿少,多出來的後面會去掉
        approximate_data_points = (time_in_sec + 60) / self.expression_obj.service.interval #獲取一個大概要取的值
        #stop_loading_flag = False #循環去redis裏一個點一個點的取數據,直到變成True
        #while not stop_loading_flag:
        print("approximate dataset nums:", approximate_data_points,time_in_sec)
        data_range_raw = self.main_ins.redis.lrange(self.service_redis_key,-int(approximate_data_points),-1)
        #print("\033[31;1m------>%s\033[0m" % data_range)
        approximate_data_range = [json.loads(i.decode()) for i in data_range_raw]
        data_range = [] #精確的須要的數據 列表
        for point  in approximate_data_range:
            #print('bread point:', point)
            val,saving_time = point
            if time.time() - saving_time < time_in_sec :#表明數據有效
                data_range.append(point)
                #print("service index key:",self.expression_obj.service_index.key)
                #print(point)
                '''if val: #確保數據存在
                    if 'data' not in val:#表明這個dict沒有sub_dict
                        print("\033[44;1m%s\033[0m" %val[self.expression_obj.service_index.key])
                        #如何處理這些數據 呢? 是求avg(5), hit(5,3)....? 看來只能把數據集合交給不一樣的方法去處理了
                        #self.process(self.)
                        #data_range.append(
                    else: #像disk , nic這種有多個item的數據
                        for k,v in val['data'].items():
                            print("\033[45;1m%s, %s\033[0m" %(k,v))
                            print("\033[45;1m%s, %s\033[0m" %(k,v[self.expression_obj.service_index.key]))
                '''
            #else:
            #    print("data is invalid")


        print(data_range)
        return data_range

3、算出單條expression表達式的結果

一、功能以下

一、按照用戶的配置把數據 從redis裏取出來了, 好比 最近5分鐘,或10分鐘的數據redis

二、確保上面的條件 有正確的返回express

二、代碼實現

    def process(self):
        """算出單條expression表達式的結果"""
        data_list = self.load_data_from_redis() #已經按照用戶的配置把數據 從redis裏取出來了, 好比 最近5分鐘,或10分鐘的數據
        data_calc_func = getattr(self,'get_%s' % self.expression_obj.data_calc_func)
        #data_calc_func = self.get_avg...
        single_expression_calc_res = data_calc_func(data_list) #[True,43,None]
        print("---res of single_expression_calc_res ",single_expression_calc_res)
        if single_expression_calc_res: #確保上面的條件 有正確的返回
            res_dic = {
                'calc_res':single_expression_calc_res[0],
                'calc_res_val':single_expression_calc_res[1],
                'expression_obj':self.expression_obj,
                'service_item':single_expression_calc_res[2],
            }

            print("\033[41;1msingle_expression_calc_res:%s\033[0m" % single_expression_calc_res)
            return res_dic
        else:
            return False

4、如何獲取網卡的平均值

一、解決了什麼問題

一、監控了特定的指標,好比有多個網卡,但這裏只特定監控eth0,就是監控這個特定指標,match上了json

二、在這裏判斷是否超越閾值app

  多是因爲最近這個服務沒有數據彙報過來,取到的數據爲空,因此沒辦法 判斷閾值code

三、監控這個服務的全部項, 好比一臺機器的多個網卡, 任意一個超過了閾值,都算是問題的blog

  1. 後面的循環不用走了,反正 已經成立了一個了
  2. 能走到這一步,表明 上面的循環判段都未成立

二、代碼實現

    def get_avg(self,data_set):
        '''
        return average value of given data set
        :param data_set:
        :return:
        '''
        clean_data_list = []
        clean_data_dic = {}
        for point in data_set:
            val,save_time = point
            #print('---point:>', val)

            if val:
                if 'data' not in val:#沒有子dict
                    clean_data_list.append(val[self.expression_obj.service_index.key])

                else: #has sub dict
                    for k,v in val['data'].items():
                        if k not in clean_data_dic:
                            clean_data_dic[k]=[]

                        clean_data_dic[k].append(v[self.expression_obj.service_index.key])

        if clean_data_list:
            clean_data_list = [float(i) for i in clean_data_list]
            #avg_res = 0 if sum(clean_data_list) == 0 else  sum(clean_data_list)/ len(clean_data_list)
            avg_res = sum(clean_data_list)/ len(clean_data_list)
            print("\033[46;1m----avg res:%s\033[0m" % avg_res)
            return [self.judge(avg_res), avg_res,None]
            #print('clean data list:', clean_data_list)
        elif clean_data_dic:
            for k,v in clean_data_dic.items():
                clean_v_list = [float(i) for i in v]
                avg_res = 0 if sum(clean_v_list) == 0 else sum(clean_v_list) / len(clean_v_list)
                print("\033[46;1m-%s---avg res:%s\033[0m" % (k,avg_res))
                if self.expression_obj.specified_index_key:#監控了特定的指標,好比有多個網卡,但這裏只特定監控eth0
                    if k == self.expression_obj.specified_index_key:#就是監控這個特定指標,match上了
                        #在這裏判斷是否超越閾值
                        print("test res [%s] [%s] [%s]=%s") %(avg_res,
                                                            self.expression_obj.operator_type,
                                                            self.expression_obj.threshold,
                                                            self.judge(avg_res),
                                                            )
                        calc_res = self.judge(avg_res)
                        if calc_res:
                            return  [calc_res,avg_res,k] #後面的循環不用走了,反正 已經成立了一個了
                else:#監控這個服務 的全部項, 好比一臺機器的多個網卡, 任意一個超過了閾值,都 算是有問題的
                    calc_res = self.judge(avg_res)
                    if calc_res:
                        return [calc_res,avg_res,k]
                print('specified monitor key:',self.expression_obj.specified_index_key)
                print('clean data dic:',k,len(clean_v_list), clean_v_list)
            else: #能走到這一步,表明 上面的循環判段都未成立
                return [False,avg_res,k]
        else:#多是因爲最近這個服務 沒有數據 彙報 過來,取到的數據 爲空,因此沒辦法 判斷閾值
            return [False,None,None]

三、監控這個服務的全部項, 好比一臺機器的多個網卡, 任意一個超過了閾值,都算是有問題的

    def judge(self,calculated_val):
        '''
        determine whether the index has reached the alert benchmark
        :param calculated_val: #已經算好的結果,多是avg(5) or ....
        :return:
        '''
        #expression_args = self.expression_obj.data_calc_args.split(',')
        #hit_times = expression_args[1] if len(expression_args)>1 else None
        #if hit_times:#定義了超過閾值幾回的條件
        calc_func = getattr(operator,self.expression_obj.operator_type)
        #calc_func = operator.eq....
        return calc_func(calculated_val,self.expression_obj.threshold)

四、命中次數值返回給定數據集

    def get_hit(self,data_set):
        '''
        return hit times  value of given data set
        :param data_set:
        :return:
        '''
        pass
相關文章
相關標籤/搜索