SSM(十六) 曲線救國-Kafka消費異常

封面
封面

前言

最近線上遇到一個問題:在消費kafka消息的時候若是長時間(大概半天到一天的時間)隊列裏沒有消息就可能再也消費不了。針對這個問題咱們反覆調試屢次。線下模擬,調整代碼,但貌似仍是沒有找到緣由。可是隻要重啓消費進程就又能夠繼續消費。java

解決方案

因爲線上業務很是依賴kafka的消費,但一時半會也沒有找到緣由,因此最後只能想一個臨時的替換方案:git

基於重啓就能夠消費這個特色,咱們在每次消費的時候都記下當前的時間點,當這個時間點在十分鐘以內都沒有更新咱們就認爲當前隊列中沒有消息了,就須要重啓下消費進程。github

既然是須要重啓,因爲目前尚未上分佈式調度中心因此須要crontab來配合調度:每隔一分鐘會調用一個shell腳本,該腳本會判斷當前進程是否存在,若是存在則什麼都不做,不存在則啓動消費進程。web

具體實現

消費程序:shell

/** * kafka消費 * * @author crossoverJie * @date 2017年6月19日 下午3:15:16 */
public class KafkaMsgConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMsgConsumer.class);

    private static final int CORE_POOL_SIZE = 4;
    private static final int MAXIMUM_POOL_SIZE = 4;
    private static final int BLOCKING_QUEUE_CAPACITY = 4000;
    private static final String KAFKA_CONFIG = "kafkaConfig";
    private static final ExecutorService fixedThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(BLOCKING_QUEUE_CAPACITY));

    //最後更新時間
    private static AtomicLong LAST_MESSAGE_TIME = new AtomicLong(DateUtil.getLongTime());

    private static MsgIterator iter = null;
    private static String topic;//主題名稱

    static {
        Properties properties = new Properties();
        String path = System.getProperty(KAFKA_CONFIG);
        checkArguments(!StringUtils.isBlank(path), "啓動參數中沒有配置kafka_easyframe_msg參數來指定kafka啓動參數,請使用-DkafkaConfig=/path/fileName/easyframe-msg.properties");
        try {
            properties.load(new FileInputStream(new File(path)));
        } catch (IOException e) {
            LOGGER.error("IOException" ,e);
        }
        EasyMsgConfig.setProperties(properties);

    }

    private static void iteratorTopic() {
        if (iter == null) {
            iter = MsgUtil.consume(topic);
        }
        long i = 0L;
        while (iter.hasNext()) {
            i++;
            if (i % 10000 == 0) {
                LOGGER.info("consume i:" + i);
            }
            try {
                String message = iter.next();
                if (StringUtils.isEmpty(message)) {
                    continue;
                }
                LAST_MESSAGE_TIME = new AtomicLong(DateUtil.getLongTime());

                //處理消息
                LOGGER.debug("msg = " + JSON.toJSONString(message));
            } catch (Exception e) {
                LOGGER.error("KafkaMsgConsumer err:", e);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                    LOGGER.error("Thread InterruptedException", e1);
                }
                break;
            }
        }
    }

    public static void main(String[] args) {
        topic = System.getProperty("topic");
        checkArguments(!StringUtils.isBlank(topic), "system property topic or log_path is must!");
        while (true) {
            try {
                iteratorTopic();
            } catch (Exception e) {
                MsgUtil.shutdownConsummer();
                iter = null;

                LOGGER.error("KafkaMsgConsumer err:", e);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                    LOGGER.error("Thread InterruptedException", e1);
                }
            } finally {
                //此處關閉以後,由crontab每分鐘檢查一次,掛掉的話會從新拉起來
                if (DateUtil.getLongTime() - LAST_MESSAGE_TIME.get() > 10 * 60) { //10分鐘
                    fixedThreadPool.shutdown();
                    LOGGER.info("線程池是否關閉:" + fixedThreadPool.isShutdown());
                    try {
                        //當前線程阻塞10ms後,去檢測線程池是否終止,終止則返回true
                        while (!fixedThreadPool.awaitTermination(10, TimeUnit.MILLISECONDS)) {
                            LOGGER.info("檢測線程池是否終止:" + fixedThreadPool.isTerminated());
                        }
                    } catch (InterruptedException e) {
                        LOGGER.error("等待線程池關閉錯誤", e);
                    }
                    LOGGER.info("線程池是否終止:" + fixedThreadPool.isTerminated());
                    LOGGER.info("in 10 min dont have data break");
                    break;
                }
            }
        }
        LOGGER.info("app shutdown");
        System.exit(0);
    }

}複製代碼

在線代碼tomcat

須要配合如下這個shell腳本運行:bash

#!/bin/sh
 #crontab
# * * * * * sh /data/schedule/kafka/run-kafka-consumer.sh >>/data/schedule/kafka/run-sms-log.log
 # 若是進程存在就不啓動
a1=`ps -ef|grep 'KafkaMsgConsumer'|grep -v grep|wc -l`
if [ $a1 -gt 0  ];then
        echo "=======     `date +'%Y-%m-%d %H:%M:%S'` KafkaMsgConsumer  is EXIT...=======     "
        exit
fi
LANG="zh_CN.UTF-8"
nohup /opt/java/jdk1.7.0_80/bin/java -d64 -Djava.security.egd=file:/dev/./urandom
-Djava.ext.dirs=/opt/tomcat/webapps/ROOT/WEB-INF/lib
-Dtopic=TOPIC_A
-Dlogback.configurationFile=/data/schedule/kafka/logback.xml
-DkafkaConfig=/opt/tomcat/iopconf/easyframe-msg.properties
-classpath /opt/tomcat/webapps/ROOT/WEB-INF/classes com.crossoverJie.kafka.SMSMsgConsumer >> /data/schedule/kafka/smslog/kafka.log 2>&1 &

echo "`date +'%Y-%m-%d %H:%M:%S'`  KafkaMsgConsumer running...."複製代碼

在線代碼app

再配合crontab的調度:dom

* * * * * sh /data/schedule/kafka/run-kafka-consumer.sh >>/data/schedule/kafka/run-sms-log.log複製代碼

便可。webapp

總結

雖然說處理起來很簡單,但依然是治標不治本,依賴的東西比較多(shell腳本,調度)。
因此也問問各位有沒有什麼思路:

生產配置:

  • 三臺kafka、ZK組成的集羣。

其中也有其餘團隊的消費程序在正常運行,應該和kafka的配置沒有關係。

項目地址:github.com/crossoverJi…

我的博客:crossoverjie.top

相關文章
相關標籤/搜索