分佈式監控系統開發【day38】:報警閾值程序邏輯解析(三)

1、需求討論

一、請問如何解決延遲問題

1000臺機器,每1分鐘循環一次可是恰好第一次循環第一秒剛處理完了,結果還沒等到第二分鐘又出問題,你那必須等到第二次循環,假如我這個服務很重要必須實時知道,
每次客戶端彙報過來的同時,觸發trigger檢測,就能夠實時的實現報警反應前端

二、這樣有什麼問題?

前提是它給你彙報,若是客戶端網絡斷了,客戶端宕機了,就沒法彙報了python

三、你要確保客戶端存活的檢測機制

拿到每臺機器的全部觸發器,檢測閾值,若是超過閾值,存到redis
你不用再連redis我給你傳(從外部調用 時才用的到,爲了不重複調用 redis鏈接)redis

2、那個表達式觸發了報警?

一、開發目標爲紅框內內容

二、觸發報警函數功能

def load_service_data_and_calulating()的功能以下:
	[ iowait.avg(5) >10 and ,idle < 20 or ,mem_usage > 80% and, disk > 90% ]
	exp_res_list = [] #[ True, False.True ]

一、到redis裏取出5分鐘的值,進行平均運算,獲得的結果,與閾值【按定義的運算符】進行比較數據庫

二、拿到,每一個表達式的結果,添加到exp_res_list,爲何要拿到結果?由於不是否是最後一條express

三、拼接完整的表達式字符串 exp= "False and True and False and True"django

四、我爲何不在這裏順手就報警了,這樣不是很簡單?json

  大家學過生產消費者模型嗎?爲了解耦,另外就是異步網絡

  我要考量到報警收斂,我可能一次批量報警,因此我就單獨寫一個報警模塊app

五、將成立的觸發發送到報警隊列異步

三、實現代碼

    def load_service_data_and_calulating(self,host_obj,trigger_obj,redis_obj):
        '''
        fetching out service data from redis db and calculate according to each serivce's trigger configuration
        :param host_obj:
        :param trigger_obj:
        :param redis_obj: #從外面調用此函數時需傳入redis_obj,以減小重複鏈接
        :return:
        '''
        #StatusData_1_LinuxCPU_10mins
        self.redis = redis_obj
        calc_sub_res_list= [] #先把每一個expression的結果算出來放在這個列表裏,最後再統一計算這個列表
        positive_expressions = [] #報警的時候,讓用戶知道,那些條件致使觸發報警器成立
        expression_res_string = '' #最終拼成的表達式運算字符串
        for expression in trigger_obj.triggerexpression_set.select_related().order_by('id'):
            print(expression,expression.logic_type)
            expression_process_obj = ExpressionProcess(self,host_obj,expression)
            single_expression_res = expression_process_obj.process() #獲得單條expression表達式的結果
            if single_expression_res:
                calc_sub_res_list.append(single_expression_res)
                if single_expression_res['expression_obj'].logic_type: #不是最後一條
                    expression_res_string += str(single_expression_res['calc_res']) + ' ' + \
                                             single_expression_res['expression_obj'].logic_type + ' '
                else:
                    expression_res_string += str(single_expression_res['calc_res']) + ' '

                #把全部結果爲True的expression提出來,報警時你得知道是誰出問題致使trigger觸發了
                if single_expression_res['calc_res'] == True:
                    single_expression_res['expression_obj'] = single_expression_res['expression_obj'].id #要存到redis裏,數據庫對象轉成id
                    positive_expressions.append(single_expression_res)
            #else: #single expression不成立,隨便加個東西,別讓程序出錯,這個地方我以爲是個bug
            #    expression_res_string += 'None'
        print("whole trigger res:", trigger_obj.name,expression_res_string)
        if expression_res_string:
            trigger_res = eval(expression_res_string)
            print("whole trigger res:", trigger_res )
            if trigger_res:#終於走到這一步,該觸發報警了
                print("##############trigger alert:",trigger_obj.severity,trigger_res)
                self.trigger_notifier(host_obj,trigger_obj.id, positive_expressions,msg=trigger_obj.name) #msg 須要專門分析後生成, 這裏是臨時寫的

3、故障持續了多久?

一、開發目標

二、功能以下:

一、鏈接上 redis鏈接,雙方已是生產這消費者模式

二、因此我要把傳送接口約定好,內容包括

  1. host_id:那臺主機
  2. trigger_id:哪個觸發器,
  3. positive_expressions:哪個表達式觸發的報警
  4. msg:消息
  5. time: 何時報警的
  6. start_time:什麼時間開始的
  7. duration:故障持續多久了

 

三、發不到訂閱的頻道TRIGGER_CHAN = 'trigger_event_channel'

四、爲何是pickle?

  1. 個人ositive_expressions裏面存的是什麼東西,是實例
  2. redis裏面確定存不成實例,因此我以字符串的形式存進去,json序列化不了實例

五、先把以前的trigger加載回來,獲取上次報警的時間,以統計 故障持續時間
六、同時在redis中記錄這個trigger , 前端頁面展現時要統計trigger 個數
七、一個trigger 記錄5分鐘後會自動清除, 爲了在前端統計trigger個數用的

三、代碼實現

    def trigger_notifier(self,host_obj,trigger_id, positive_expressions,redis_obj=None,msg=None):
        '''
        all the triggers alerts need to be published through here
        :param host_obj:
        :param trigger_id:
        :param positive_expressions: it's list, contains all the expression has True result
        :param redis_obj:
        :return:
        '''

        #alert.sendmail(msg )
        #alert.sendsms(msg)
        if redis_obj: #從外部調用 時才用的到,爲了不重複調用 redis鏈接
            self.redis = redis_obj
        print("\033[43;1mgoing to send alert msg to trigger queue............\033[0m")
        print('trigger_notifier argv:',host_obj,trigger_id, positive_expressions,redis_obj)
        #
        msg_dic = {'host_id':host_obj.id,
                   'trigger_id':trigger_id,
                   'positive_expressions':positive_expressions,
                   'msg':msg,
                   'time': time.strftime("%Y-%m-%d %H:%M:%S",time.localtime()),
                   'start_time':time.time() ,
                   'duration':None
                   }
        self.redis.publish(self.django_settings.TRIGGER_CHAN, pickle.dumps(msg_dic))

        #先把以前的trigger加載回來,獲取上次報警的時間,以統計 故障持續時間
        trigger_redis_key = "host_%s_trigger_%s" % (host_obj.id, trigger_id)
        old_trigger_data = self.redis.get(trigger_redis_key)
        print("old_trigger_data",old_trigger_data)
        if old_trigger_data:
            old_trigger_data = old_trigger_data.decode()
            trigger_startime = json.loads(old_trigger_data)['start_time']
            msg_dic['start_time'] = trigger_startime
            msg_dic['duration'] = round(time.time() - trigger_startime)


        #同時在redis中紀錄這個trigger , 前端頁面展現時要統計trigger 個數

        self.redis.set(trigger_redis_key, json.dumps(msg_dic), 300) #一個trigger 紀錄 5分鐘後會自動清除, 爲了在前端統計trigger個數用的
相關文章
相關標籤/搜索