kafka告警簡單方案

1、前言

  爲何要設計 Kafka 告警方案?現成的監控項目百度一下一大堆,例如 KafkaOffsetMonitor、KafkaManager、Burrow 等,具體參考:kafka的消息擠壓監控。因爲本小組的項目使用的 Kafka 集羣並沒有被公司的 kafka-manager 管理,因此只能自己簡單做一個告警。

2、告警方案

  

  首先需要兩個定時任務,二者之間的通訊依靠延遲隊列。

  左邊的定時任務按週期掃描配置的 Topic-Consumer 列表,通過 Kafka API 獲取消費詳情並判斷消息積壓量是否已經大於閾值,如果閾值校驗失敗(積壓量超過閾值)則放入延遲隊列。

  右邊的定時任務按照週期從延遲隊列對應的真實隊列中取出一個 Topic-Consumer 關係,再次進行一下閾值的校驗,如果校驗仍然失敗才發送告警短信。

3、準備工作

  一、依賴配置中心

  配置中心是實現告警方案的一個關鍵點,通過配置中心可以動態獲取告警相關的屬性配置,並刷新對應的 Java bean。如下是告警對應的配置 bean。

/**
 * Alarm settings bound from the {@code wmhcontrol.alarm} configuration prefix.
 *
 * <p>Holds the SMS recipients and template, the scheduling parameters (delay and polling
 * period), the URLs of the topic-consumer detail endpoints, the global lag threshold and the
 * list of topic/consumer relations to watch.
 *
 * <p>NOTE(review): the {@code @ToInitial} / {@code @ToRefresh} annotations and
 * {@link #toRefresh()} are presumably driven by the config-center framework (for example an
 * aspect around the annotated getters); that wiring is not visible in this class - confirm
 * before renaming or removing any of them.
 */
@ConfigCenterBean
@ConfigurationProperties(prefix = "wmhcontrol.alarm")
@Component
public class AlarmConstants extends BaseConfigCenterBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(AlarmConstants.class);

    /** Phone numbers that receive the alarm SMS. */
    @ConfigField
    private String[] phones;

    /** SMS template id. */
    @ConfigField
    private String templateId;

    /** Delay before an over-threshold relation is re-checked; defaults to 600. */
    @ConfigField
    private Integer delay = 600;

    /** Polling period of the threshold scan; defaults to 60. */
    @ConfigField
    private Integer period = 60;

    /** URL used to fetch the topic-consumer consumption details. */
    @ConfigField
    private String tcsr;

    /** URL used to view the topic-consumer consumption details. */
    @ConfigField
    private String tclr;

    /** Global threshold, used when a relation does not define its own; defaults to 1000. */
    @ConfigField
    private Integer threshold = 1000;

    /** Topic/consumer relations to monitor. */
    @ConfigField
    private List<TCR> tcrs;

    /**
     * Re-binds the configured properties onto this bean and logs the resulting state.
     * Binding failures are logged and swallowed, leaving the current values in place.
     */
    @ToInitial
    private void refreshProperties() {
        try {
            super.doBind();
            LOGGER.info(String.format("%s 刷新成功..., 當前配置=%s...", this.getModuleName(), this));
        } catch (Exception e) {
            LOGGER.error("AlarmConstants 對象屬性綁定失敗...", e);
        }
    }

    /**
     * Refreshes the properties when the config center is in effect and reports that this
     * module's property source should be refreshed. Failures are logged and swallowed.
     *
     * <p>NOTE(review): not called from within this class; presumably invoked by the framework
     * for the {@code @ToRefresh} getters - confirm.
     */
    private void toRefresh() {
        try {
            if (isCfgCenterEffect()) {
                ZookeeperPropertySource propertySource = ConfigHelper.getZookeeperPropertySource();
                if (ConfigCenterUtils.propertySourceShouldRefresh(this.getModuleName(), propertySource)) {
                    this.refreshProperties();
                }
            }
        } catch (Exception e) {
            LOGGER.error("AlarmConstants 對象屬性刷新失敗", e);
        }
    }

    @ToRefresh
    public Integer getThreshold() {
        return threshold;
    }

    public void setThreshold(Integer threshold) {
        this.threshold = threshold;
    }

    @ToRefresh
    public List<TCR> getTcrs() {
        return tcrs;
    }

    public void setTcrs(List<TCR> tcrs) {
        this.tcrs = tcrs;
    }

    @ToRefresh
    public String getTcsr() {
        return tcsr;
    }

    public void setTcsr(String tcsr) {
        this.tcsr = tcsr;
    }

    @ToRefresh
    public Integer getPeriod() {
        return period;
    }

    public void setPeriod(Integer period) {
        this.period = period;
    }

    @ToRefresh
    public Integer getDelay() {
        return delay;
    }

    public void setDelay(Integer delay) {
        this.delay = delay;
    }

    @ToRefresh
    public String[] getPhones() {
        return phones;
    }

    public void setPhones(String[] phones) {
        this.phones = phones;
    }

    @ToRefresh
    public String getTemplateId() {
        return templateId;
    }

    public void setTemplateId(String templateId) {
        this.templateId = templateId;
    }

    @ToRefresh
    public String getTclr() {
        return tclr;
    }

    public void setTclr(String tclr) {
        this.tclr = tclr;
    }

    /** Renders this bean's own fields (no transients, no statics) in JSON style. */
    @Override
    public String toString() {
        return ReflectionToStringBuilder.toString(this
                , ToStringStyle.JSON_STYLE
                , false
                , false
                , AlarmConstants.class);
    }

    @Override
    public String getDefaultResourcePath() {
        return "config/alarm.properties";
    }

    @Override
    public String getConfigPrefix() {
        return "wmhcontrol.alarm";
    }

    @Override
    public String getModuleName() {
        return "告警配置";
    }

    @Override
    public void refreshForEvent() {
        this.refreshProperties();
    }

    /**
     * Relation between a topic and one of its consumers, with an optional per-relation
     * threshold.
     */
    public static class TCR {
        private String topic;
        private String consumer;
        private String channel;
        private Integer threshold;

        public String getTopic() {
            return topic;
        }

        public void setTopic(String topic) {
            this.topic = topic;
        }

        public String getConsumer() {
            return consumer;
        }

        public void setConsumer(String consumer) {
            this.consumer = consumer;
        }

        public String getChannel() {
            return channel;
        }

        public void setChannel(String channel) {
            this.channel = channel;
        }

        public Integer getThreshold() {
            return threshold;
        }

        public void setThreshold(Integer threshold) {
            this.threshold = threshold;
        }

        @Override
        public String toString() {
            return "TCR{" +
                    "topic='" + topic + '\'' +
                    ", consumer='" + consumer + '\'' +
                    ", channel='" + channel + '\'' +
                    ", threshold=" + threshold +
                    '}';
        }
    }

    /**
     * One row of consumption detail for a topic/consumer-group pair: group, topic, pid,
     * offset, logsize, lag and owner.
     */
    public static class TopicConsumerDetail {
        private String group;
        private String topic;
        private Integer pid;
        private Long offset;
        private Long logsize;
        private Long lag;
        private String owner;

        public String getGroup() {
            return group;
        }

        public void setGroup(String group) {
            this.group = group;
        }

        public String getTopic() {
            return topic;
        }

        public void setTopic(String topic) {
            this.topic = topic;
        }

        public Integer getPid() {
            return pid;
        }

        public void setPid(Integer pid) {
            this.pid = pid;
        }

        public Long getOffset() {
            return offset;
        }

        public void setOffset(Long offset) {
            this.offset = offset;
        }

        public Long getLogsize() {
            return logsize;
        }

        public void setLogsize(Long logsize) {
            this.logsize = logsize;
        }

        public Long getLag() {
            return lag;
        }

        public void setLag(Long lag) {
            this.lag = lag;
        }

        public String getOwner() {
            return owner;
        }

        public void setOwner(String owner) {
            this.owner = owner;
        }

        @Override
        public String toString() {
            return "TopicConsumerDetail{" +
                    "group='" + group + '\'' +
                    ", topic='" + topic + '\'' +
                    ", pid=" + pid +
                    ", offset=" + offset +
                    ", logsize=" + logsize +
                    ", lag=" + lag +
                    ", owner='" + owner + '\'' +
                    '}';
        }
    }
}

  告警有個全局統一的閾值,每個 topic 也可以單獨指定不同的閾值(未指定時使用全局閾值)。

  配置中心和 Java bean 建立關聯請參考:依賴配置中心實現注有@ConfigurationProperties的bean相關屬性刷新

  二、定時任務的週期性可動態配置

  藉助 org.springframework.scheduling.annotation.SchedulingConfigurer。

  由@EnableScheduling註解的@Configuration類實現的可選接口。通常用於設置在執行計劃任務時使用的特定TaskScheduler bean,或者用於以編程方式註冊計劃任務,而不是使用@Scheduled註解的聲明性方法。例如,在實現基於觸發器的任務時可能需要這樣做,而@Scheduled註解不支持這些任務。

  基本的代碼輪廓如下。

/**
 * Skeleton of the alarm scheduler: registers one fixed-rate job and one trigger-driven job
 * whose period is re-read from {@code alarmConstants} each time the next run is computed.
 * Both job bodies are intentionally empty in this outline.
 */
@Configuration
public class MessageCenterAlarmTask implements SchedulingConfigurer {
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        try {
            // Queue check, fired every 5 seconds.
            final long queueCheckIntervalMillis = 5 * 1000L;
            final Runnable queueCheckJob = () -> {

            };
            taskRegistrar.addFixedRateTask(queueCheckJob, queueCheckIntervalMillis);

            // Threshold scan; a fresh PeriodicTrigger is built on every evaluation so that a
            // changed period takes effect on the next run.
            final Runnable thresholdScanJob = () -> {

            };
            taskRegistrar.addTriggerTask(thresholdScanJob, context -> {
                PeriodicTrigger currentPeriod = new PeriodicTrigger(alarmConstants.getPeriod(), TimeUnit.SECONDS);
                return currentPeriod.nextExecutionTime(context);
            });
        } catch (Exception e) {
            LOGGER.error("消息中心topic-consumer定時任務初始化失敗...", e);
        }
    }
}

  上面的代碼中的定時任務分別表示了告警方案中右邊和左邊的定時任務。

  三、延遲隊列的實現

  藉助redisson分佈式延遲隊列 或者 java delayqueue + redistemplate 實現分佈式延遲隊列。

  參考:Redisson分佈式延遲隊列官方文檔

  參考:Redisson DelayedQueue實現原理

  Redisson的集羣模式配置以下。

/**
 * Factory for a cluster-mode {@code RedissonClient}.
 */
public class RedissonBuilder {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedissonBuilder.class);

    /**
     * Builds a Redisson client for a Redis cluster.
     *
     * @param cluster comma-separated node addresses, e.g. {@code "host1:6379, host2:6379"};
     *                whitespace around entries is ignored, empty entries are skipped, and an
     *                entry that already carries a scheme (contains {@code "://"}) is used as is,
     *                otherwise {@code redis://} is prepended
     * @return the client, or {@code null} when no usable address was given or the client
     *         could not be created (the failure is logged)
     */
    public static RedissonClient getRedisson(String cluster) {
        if (cluster == null) {
            LOGGER.error("RedissonClient getRedisson errors... cluster address list is null");
            return null;
        }

        String[] parts = cluster.split(",");
        String[] nodes = new String[parts.length];
        int count = 0;
        for (String part : parts) {
            String address = part.trim();
            if (address.isEmpty()) {
                continue;
            }
            // Do not double-prefix addresses that already specify redis:// or rediss://.
            nodes[count++] = address.contains("://") ? address : "redis://" + address;
        }
        if (count == 0) {
            LOGGER.error("RedissonClient getRedisson errors... cluster address list is empty");
            return null;
        }
        if (count < nodes.length) {
            nodes = java.util.Arrays.copyOf(nodes, count);
        }

        Config config = new Config();
        config.useClusterServers() // cluster-server mode
                .setScanInterval(2000) // cluster state scan interval
                .setConnectTimeout(2000)
                .addNodeAddress(nodes);

        try {
            return Redisson.create(config);
        } catch (Exception e) {
            LOGGER.error("RedissonClient getRedisson errors...", e);
            return null;
        }
    }
}


/**
 * Spring configuration that exposes the {@code RedissonClient} bean.
 */
@Configuration
public class RedissonConfig {
    private static Logger LOGGER = LoggerFactory.getLogger(RedissonConfig.class);

    /**
     * Creates the client through {@code RedissonBuilder} and, when one was created, logs its
     * configuration as JSON.
     *
     * @param cacheAddress value of the {@code redisAddress} property
     * @return whatever {@code RedissonBuilder.getRedisson} returned, which may be {@code null}
     */
    @Bean
    public RedissonClient redissonClient(@Value("${redisAddress}") String cacheAddress) {
        final RedissonClient client = RedissonBuilder.getRedisson(cacheAddress);
        if (client == null) {
            return null;
        }
        try {
            LOGGER.info(client.getConfig().toJSON());
        } catch (IOException e) {
            // Failing to render the configuration must not prevent the bean from being returned.
            LOGGER.error("RedissonConfig redissonClient errors... ", e);
        }
        return client;
    }

}

  Redisson建立延遲隊列方式

// Target queue first, then the delayed queue that is obtained from it.
String tcrQueueName = commonRedisKey + "message_center_tcrs";
RQueue<AlarmConstants.TCR> topicConsumerQueue = redissonClient.getQueue(tcrQueueName);
RDelayedQueue<AlarmConstants.TCR> topicConsumerDelayedQueue =
        redissonClient.getDelayedQueue(topicConsumerQueue);

  首先創建目標隊列,然後通過目標隊列拿到延遲隊列。

  四、kafka api返回數據處理

  參考:簡單封裝kafka相關的api

  根據 topic 和 consumer 可以拿到如下數據。其中 Lag 對應的這一列表示未消費的消息數量。

  

  需要做的是把如上數據映射到 AlarmConstants.TopicConsumerDetail 這個 Java bean 中,藉助 Spring BeanWrapperImpl,如下。

/**
 * Parses the whitespace-separated detail table into {@code TopicConsumerDetail} beans.
 *
 * <p>Any {@code <pre>} / {@code </pre>} tags are stripped first. The first line supplies the
 * column names, which are lower-cased and used as bean property names; every following
 * non-blank line supplies one row of values, set through a {@code BeanWrapper}.
 *
 * @param detailResponse raw response text; may be {@code null}
 * @return the parsed rows; empty when the input is {@code null}, when the header line is
 *         blank, or when nothing could be parsed. If parsing fails part-way, the rows parsed
 *         so far are returned and the error is logged.
 */
private static List<AlarmConstants.TopicConsumerDetail> retrieveDetail(String detailResponse) {
    List<AlarmConstants.TopicConsumerDetail> result = new ArrayList<>();
    if (detailResponse == null) {
        LOGGER.error("消息中心提取topic-consumer信息異常..." + detailResponse);
        return result;
    }

    String table = detailResponse.replace("<pre>", StringUtils.EMPTY).replace("</pre>", StringUtils.EMPTY);
    try (Scanner scanner = new Scanner(table)) {
        String[] propertyNames = null;
        // The first line holds the Java bean field names.
        if (scanner.hasNextLine()) {
            String nameLine = scanner.nextLine();
            if (StringUtils.isBlank(nameLine)) {
                return result;
            }
            // trim() avoids an empty leading token; Locale.ROOT keeps the lower-casing
            // independent of the JVM default locale.
            propertyNames = Arrays.stream(nameLine.trim().split("\\s+"))
                    .map(propertyName -> propertyName.toLowerCase(java.util.Locale.ROOT))
                    .toArray(String[]::new);
        }
        // The remaining lines hold the Java bean field values.
        while (scanner.hasNextLine()) {
            String valueLine = scanner.nextLine();
            if (StringUtils.isBlank(valueLine)) {
                continue;
            }
            String[] propertyValues = valueLine.trim().split("\\s+");
            AlarmConstants.TopicConsumerDetail tcd = new AlarmConstants.TopicConsumerDetail();
            BeanWrapper br = new BeanWrapperImpl(tcd);
            // Ignore surplus values instead of running past the header columns.
            int columns = Math.min(propertyNames.length, propertyValues.length);
            for (int i = 0; i < columns; i++) {
                br.setPropertyValue(propertyNames[i], propertyValues[i]);
            }
            result.add(tcd);
        }
        LOGGER.info("消息中心提取topic-consumer詳情信息:" + result);
    } catch (Exception e) {
        LOGGER.error("消息中心提取topic-consumer信息異常..." + detailResponse, e);
    }
    return result;
}

  處理之後的效果如下。

[TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=0, offset=10956087, logsize=10956091, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=1, offset=10950487, logsize=10950502, lag=15, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=2, offset=10958523, logsize=10958529, lag=6, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=3, offset=10955709, logsize=10955717, lag=8, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=4, offset=10956550, logsize=10956563, lag=13, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=5, offset=10956343, logsize=10956347, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818202772-32905832-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=6, offset=10954124, logsize=10954128, lag=4, 
owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818202772-32905832-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=7, offset=10949075, logsize=10949082, lag=7, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818202772-32905832-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=8, offset=10963839, logsize=10963843, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818202772-32905832-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=9, offset=10958536, logsize=10958540, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818202772-32905832-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=10, offset=10955316, logsize=10955327, lag=11, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0496-DMZ-A620R-1543818798625-b7a38283-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=11, offset=10957850, logsize=10957856, lag=6, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0496-DMZ-A620R-1543818798625-b7a38283-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=12, offset=10954508, logsize=10954515, lag=7, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0496-DMZ-A620R-1543818798625-b7a38283-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=13, offset=10960468, logsize=10960477, 
lag=9, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0496-DMZ-A620R-1543818798625-b7a38283-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=14, offset=10955540, logsize=10955544, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0496-DMZ-A620R-1543818798625-b7a38283-0'}]

4、告警邏輯

  一、短信發送

/**
 * Builds the JSON payload of the alarm SMS for one topic/consumer relation.
 *
 * <p>The text lists channel, topic, consumer, the effective threshold (the relation's own
 * threshold, or the global one when it has none) and the accumulated lag. A short link to
 * the detail page is appended when the URL shortener answers with {@code Code == 0} and a
 * non-blank {@code ShortUrl}; any failure while creating the link is logged and the message
 * is sent without it.
 *
 * @param tcr the relation that exceeded its threshold
 * @param lag the accumulated lag to report
 * @return JSON string with the keys {@code txt_name} and {@code txt_result}
 */
private String toShortMessage(AlarmConstants.TCR tcr, Long lag) {
    Integer effectiveThreshold = Objects.isNull(tcr.getThreshold()) ? alarmConstants.getThreshold() : tcr.getThreshold();
    String messageDate = LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);

    StringBuilder text = new StringBuilder();
    text.append("【Topic-Consumer閾值告警 ").append(messageDate).append("】\r\n");
    text.append("\t渠道: ").append(tcr.getChannel()).append("\r\n");
    text.append("\t主題: ").append(tcr.getTopic()).append("\r\n");
    text.append("\t消費: ").append(tcr.getConsumer()).append("\r\n");
    text.append("\t閾值: ").append(effectiveThreshold).append("\r\n");
    text.append("\t堆積: ").append(lag).append("\r\n");

    try {
        // Encode the query values so that names containing reserved characters
        // (&, =, #, spaces, ...) still yield a valid link.
        String refUrl = alarmConstants.getTclr()
                + "?topic=" + java.net.URLEncoder.encode(String.valueOf(tcr.getTopic()), "UTF-8")
                + "&group=" + java.net.URLEncoder.encode(String.valueOf(tcr.getConsumer()), "UTF-8");
        JSONObject params = new JSONObject();
        params.put("url", refUrl);
        String shortUrlResponse = HttpClient.post("https://dwz.cn/admin/create", params.toJSONString(), "application/json");
        LOGGER.info("獲取短連接返回內容:" + shortUrlResponse);
        if (StringUtils.isNotBlank(shortUrlResponse)) {
            JSONObject shortUrlJson = JSON.parseObject(shortUrlResponse);
            Integer code = (Integer) FastJsonUtils.search(shortUrlJson, "Code");
            if (Integer.valueOf(0).equals(code)) {
                String shortUrl = (String) FastJsonUtils.search(shortUrlJson, "ShortUrl");
                if (StringUtils.isNotBlank(shortUrl)) {
                    text.append("\t查看: ").append(shortUrl).append("\r\n");
                }
            }
        }
    } catch (Exception e) {
        // Best effort: the alarm is still sent, just without the link.
        LOGGER.error("短連接生成異常...", e);
    }

    JSONObject info = new JSONObject();
    info.put("txt_name", "消息中心");
    info.put("txt_result", text.toString());
    return info.toJSONString();
}

  二、閾值校驗

/**
 * Fetches the consumption details of a topic/consumer relation and compares the summed lag
 * with the effective threshold (the relation's own threshold, or the global one when it has
 * none).
 *
 * @param tcr the relation to check
 * @return the summed lag when it is strictly greater than the threshold; {@code -1} when it
 *         is not, when no details could be retrieved, or when no threshold is configured
 */
private Long thresholdCheck(AlarmConstants.TCR tcr) {
    // NOTE(review): topic/group are concatenated unencoded; whether HttpClient.get encodes
    // the URL itself is not visible here - confirm before adding encoding.
    String detailUrl = alarmConstants.getTcsr() + "?topic=" + tcr.getTopic() + "&group=" + tcr.getConsumer();
    String detailResponseStr = HttpClient.get(detailUrl);
    LOGGER.info(detailUrl + " " + detailResponseStr);
    List<AlarmConstants.TopicConsumerDetail> detailResponse = retrieveDetail(detailResponseStr);

    if (CollectionUtils.isEmpty(detailResponse)) {
        return -1L;
    }

    Integer threshold = Objects.isNull(tcr.getThreshold()) ? alarmConstants.getThreshold() : tcr.getThreshold();
    if (Objects.isNull(threshold)) {
        LOGGER.error("消息中心topic-consumer閾值未配置..." + tcr);
        return -1L;
    }

    // Rows without a lag value contribute nothing instead of failing on unboxing.
    long lag = detailResponse.stream()
            .map(AlarmConstants.TopicConsumerDetail::getLag)
            .filter(Objects::nonNull)
            .mapToLong(Long::longValue)
            .sum();

    return lag > threshold ? lag : -1L;
}

  三、兩個定時任務邏輯補充

/**
 * Registers the two alarm jobs.
 *
 * <ul>
 *   <li>A fixed-rate job (every 5 seconds) polls one relation from the target queue,
 *       re-checks its threshold and, when it is still exceeded, sends the alarm SMS to every
 *       configured phone number.</li>
 *   <li>A trigger-driven job, whose period is re-read from {@code alarmConstants} for every
 *       run, scans the configured relations under a distributed lock and offers each relation
 *       that exceeds its threshold, and is not already in the delayed queue, to the delayed
 *       queue with the configured delay.</li>
 * </ul>
 *
 * Failures during registration are logged and swallowed.
 */
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
    try {
        RQueue<AlarmConstants.TCR> topicConsumerQueue = redissonClient.getQueue(commonRedisKey + "message_center_tcrs");
        RDelayedQueue<AlarmConstants.TCR> topicConsumerDelayedQueue = redissonClient.getDelayedQueue(topicConsumerQueue);

        // Check the queue every 5 seconds.
        taskRegistrar.addFixedRateTask(() -> {
            AlarmConstants.TCR tcr = topicConsumerQueue.poll();
            if (Objects.isNull(tcr)) {
                return;
            }
            // Second check: only alarm when the threshold is still exceeded.
            Long lag = thresholdCheck(tcr);
            if (Objects.isNull(lag) || lag <= 0) {
                return;
            }
            // Read the recipients once so the emptiness check and the loop see the same array.
            String[] phones = alarmConstants.getPhones();
            if (!ArrayUtils.isNotEmpty(phones)) {
                return;
            }
            String message = toShortMessage(tcr, lag);
            String templateId = alarmConstants.getTemplateId();
            LOGGER.info("消息中心告警短信內容:" + message);
            for (String phone : phones) {
                try {
                    MessageUtils.sendMessage(phone, messageUrl, message, templateId);
                } catch (Exception e) {
                    // One failing recipient must not stop the remaining ones.
                    LOGGER.error(String.format("消息中心告警短信發送異常...%s %s %s", phone, messageUrl, message), e);
                }
            }
        }, 5 * 1000L);

        // Periodically check the Topic-Consumer thresholds; the period is dynamic.
        taskRegistrar.addTriggerTask(() -> {
            RLock lock = null;
            boolean locked = false;
            try {
                lock = redissonClient.getLock(commonRedisKey + "TopicConsumerForEach");
                // Wait at most 5 seconds for the lock; it is released automatically
                // 5 seconds after it was acquired.
                locked = lock.tryLock(5, 5, TimeUnit.SECONDS);
                if (!locked) {
                    return;
                }
                if (!CollectionUtils.isEmpty(alarmConstants.getTcrs())) {
                    alarmConstants.getTcrs()
                            .stream()
                            .filter(tcr -> !topicConsumerDelayedQueue.contains(tcr) && (thresholdCheck(tcr) > 0))
                            .forEach(tcr -> topicConsumerDelayedQueue.offer(tcr, alarmConstants.getDelay(), TimeUnit.SECONDS));
                }
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the scheduler thread.
                Thread.currentThread().interrupt();
                LOGGER.error("消息中心topic-consumer定時任務執行失敗...", e);
            } catch (Exception e) {
                LOGGER.error("消息中心topic-consumer定時任務執行失敗...", e);
            } finally {
                // Unlock only a lock this thread acquired and still holds: the lock may not
                // have been obtained at all, or its 5-second lease may already have expired,
                // and unlocking in either case would throw from this finally block.
                if (locked && lock.isHeldByCurrentThread()) {
                    lock.unlock();
                }
            }
        }, triggerContext -> new PeriodicTrigger(alarmConstants.getPeriod(), TimeUnit.SECONDS).nextExecutionTime(triggerContext));
    } catch (Exception e) {
        LOGGER.error("消息中心topic-consumer定時任務初始化失敗...", e);
    }
}

5、告警定時任務源碼

  請關注訂閱號(算法和技術SHARING),回覆:kafka告警, 即可查看。

相關文章
相關標籤/搜索