記錄一次kafka consumer 將消息寫入hdfs的經歷

1.大致的一個業務需求:linux

    Logstash吐出來的消息做爲kafka producer,每小時大約有百萬條記錄,kafka consumer 使用springboot 配置kafka,根據業務需求按照topic 日期 小時 這種結構 寫入到Hdfsspring

2.遇到的問題:1.開始的時候每條記錄寫入 設置到 hdfs的create 以及append ,每秒大約能夠消費 1000條記錄,這種消費速度形成kafka消息的堆積,LAG一直很大,特別是有的topic能夠到達數萬條;2.在某一天,Kafka進程忽然死掉,也沒有報錯日誌。springboot

3.解決問題:服務器

    Kafka進程忽然死掉, 在linux服務器上 使用命令  app

nohup top -p kafka pid -b  >> out.txt &   fetch

將服務器的內存信息等 輸出到 指定文件中,發如今kafka進程掛掉的前一秒,free 可以使用的內存還有150m 。初步判斷是服務器內存不足。而後 看另外一臺的服務器 buff/cache 內存很高。而後查資料釋放這臺服務器的cache內存。將kafak consumer遷移到這臺服務器,目前在觀察;後續輸出kafka consumer的堆棧信息優化

   Kafka consumer 業務邏輯進行優化,每次拉取 1000條數據,手動提交offset 以及 按批寫入Hdfs。目前的問題是,在 kafka的offset 的LAG很小或者爲0的時候,每次拉取的數據不固定,基本不超過500,針對該問題 正在處理中spa

4.總結:觀察 進程的佔用服務器的資源; Java進程的堆棧信息;對於寫Hdfs 單次寫入 和批量寫入的速度對比;Kafka Consumer的配置以及坑點日誌

spring.kafka.consumer.auto-commit-interval-ms=3000
spring.kafka.consumer.max-poll-records=1200       //最大拉取消息的條數,當該條不知足的時候,根據spring.kafka.consumer.fetch-min-size來拉取[目前看來這個還有問題]
spring.kafka.consumer.fetch-min-size=900000       //最小拉取的字節數     
spring.kafka.consumer.auto-offset-reset=earliest  //針對沒有提交的offset的consumer 會從頭開始消費;已有提交的offset,從當前的offset開始消費
相關文章
相關標籤/搜索