Kafka Operations: Pitfalls and Fixes (Repost)

  • Premise: everything here applies only to Kafka 0.9.0.1;
  • Although billed as "operations", the focus is really on problem solving;
  • Most of the solutions came from Google; I only did the porting;
  • Some of these fixes are not necessarily universal; be careful before applying them in production;
  • If anything is missing or wrong, corrections are welcome;
  • Contents:
    1. Replicas fail to sync messages from the leader
    2. Broker connections to the zk cluster keep dropping and reconnecting
    3. Broker restarts take a very long time
    4. Disallowed unclean leader election forcibly shuts down a broker
    5. A replica fetches data from the wrong partition leader
    6. __consumer_offsets logs cannot be cleaned
    7. GC problems
    8. Deploying zk and kafka
    9. Monitoring matters
    10. Large numbers of exceptions: Attempted to decrease connection count for address with no connections
    11. Newer client SDKs against an older Kafka send requests the broker does not support
    12. Frequent full GC
    13. Swap usage

Replicas fail to sync messages from the leader
  • Symptom: a topic on the cluster originally had a single replica; after raising it to two, some partitions never fetched data from the leader, so the newly added replicas never appeared in the ISR list.
  • Log analysis:
[2017-09-20 19:37:05,265] ERROR Found invalid messages during fetch for partition [xxxx,87] offset 1503297 error Message is corrupt (stored crc = 286782282, computed crc = 400317671) (kafka.server.ReplicaFetcherThread)
[2017-09-20 19:37:05,458] ERROR Found invalid messages during fetch for partition [xxxx,75] offset 1501373 error Message found with corrupt size (0) in shallow iterator (kafka.server.ReplicaFetcherThread)
[2017-09-20 19:37:07,455] ERROR [ReplicaFetcherThread-0-5], Error due to (kafka.server.ReplicaFetcherThread)
kafka.common.KafkaException: error processing data for partition [xxxx,87] offset 1503346
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:147)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:122)
    at scala.Option.foreach(Option.scala:257)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:122)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:120)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:120)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:120)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:120)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:93)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.lang.RuntimeException: Offset mismatch: fetched offset = 1503346, log end offset = 1503297.
    at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:110)
    at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:42)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:138)
  • Fix:
    1. This is a Kafka 0.9.0.1 bug: ReplicaFetcherThread stopped after ReplicaFetcherThread received a corrupted message
    2. Upgrade, or apply the simple workaround the reporter describes in that issue to avoid the problem.
  • Digging deeper:
    What actually triggered the bug was the following error:
    ERROR Found invalid messages during fetch for partition [qssnews_download,87] offset 1503297 error Message is corrupt (stored crc = 286782282, computed crc = 400317671) (kafka.server.ReplicaFetcherThread)
    When the bug hit, the disk on the affected broker happened to have developed several bad blocks, though we cannot be completely sure the CRC error is related. There is also a Kafka issue for this: Replication issues
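To double-check whether on-disk corruption is really present, a copy of the suspect segment can be scanned offline. The sketch below is written against the 0.9.0.1 internal classes (kafka.log.FileMessageSet, kafka.message.Message); the segment path is a placeholder and constructor details may differ in other Kafka versions, so treat it as a sketch rather than a supported tool.

import java.io.File
import kafka.log.FileMessageSet

// Offline CRC scan of a copied log segment (the path below is a placeholder).
// For each shallow entry, Message.isValid recomputes the CRC and compares it with the stored one.
object SegmentCrcScan {
  def main(args: Array[String]): Unit = {
    val segment = new FileMessageSet(new File("/tmp/segment-copy/00000000000001500000.log"))
    try {
      for (entry <- segment.iterator) {
        if (!entry.message.isValid)
          println(s"corrupt message at offset ${entry.offset}, stored crc = ${entry.message.checksum}")
      }
    } finally {
      segment.close()
    }
  }
}
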
Broker connections to the zk cluster keep dropping and reconnecting
  • Symptom: brokers keep re-establishing their zk session.
  • Log analysis: the broker log reports zk connection timeouts or an inability to read any data from zk.
  • Fix: increase the broker's zk session timeout. This does not solve the problem completely, but it improves things a lot (a sketch of the settings involved follows this section).
  • Digging deeper:
    1. The Kafka cluster we run is fairly stable overall, but this zk timeout issue remains genuinely puzzling.
      On startup a broker registers an ephemeral node in zk to announce that it is online; once the session times out, that ephemeral node is deleted, which amounts to the broker going offline and inevitably shakes the whole cluster. See KafkaController分析8-broker掛掉.
    2. Why zk times out was never pinned down precisely; so far it appears related to many factors, such as disk IO, CPU load, GC, and so on.
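For reference, these are the broker settings involved, shown as a minimal sketch that loads them into a KafkaConfig (0.9.0.1 property names; runnable in a Scala REPL with the broker jars on the classpath). The broker id, zk connect string, and the 30s value are placeholders, not a recommendation.

import java.util.Properties
import kafka.server.KafkaConfig

// Raise the zk session timeout so short GC or IO stalls do not expire the session.
val props = new Properties()
props.put("broker.id", "0")                                   // placeholder
props.put("zookeeper.connect", "zk1:2181,zk2:2181,zk3:2181")  // placeholder
props.put("zookeeper.session.timeout.ms", "30000")            // default is 6000 in 0.9.0.1
props.put("zookeeper.connection.timeout.ms", "30000")
val config = KafkaConfig.fromProps(props)
println(config.zkSessionTimeoutMs)                            // 30000
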
Broker restarts take a very long time
  • Symptom: restarting a broker takes a very long time.
  • Log analysis: on restart the broker reloads all log segments and rebuilds indexes.
  • Fix: the slowness comes from not shutting the broker down gracefully on stop; the broker was killed with kill -9.
  • Digging deeper:
    1. To stop a broker, use Kafka's own script so it shuts down gracefully (see the sketch after this list);
    2. when shutting down a broker, make sure its zk cluster is available, otherwise the broker may not be able to shut down gracefully.
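Why kill -9 makes the next start so slow: the broker's controlled shutdown and checkpoint flushing run from a JVM shutdown hook, and SIGKILL never gives that hook a chance to run, so the next start has to rescan segments and rebuild indexes. A standalone sketch of the mechanism (not Kafka's actual hook body):

object ShutdownHookDemo {
  def main(args: Array[String]): Unit = {
    Runtime.getRuntime.addShutdownHook(new Thread {
      override def run(): Unit =
        // In the real broker, controlled shutdown and recovery-checkpoint flushing happen here.
        println("flushing logs and writing recovery checkpoints ...")
    })
    println("running; try `kill <pid>` (hook runs) versus `kill -9 <pid>` (hook is skipped)")
    Thread.sleep(600000)
  }
}
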
Disallowed unclean leader election forcibly shuts down a broker
  • Symptom: monitoring reported that a broker in the cluster had gone down.
  • Log analysis:
    [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because log truncation is not allowed for topic test, Current leader 1's latest offset 0 is less than replica 2's latest offset 151 (kafka.server.ReplicaFetcherThread)
  • Fix: this is really a consequence of setting unclean.leader.election.enable=false; the code then reaches the following logic:
if (leaderEndOffset < replica.logEndOffset.messageOffset) {
  // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
  // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
  // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
  if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
      ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
    // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
    fatal("...")
    Runtime.getRuntime.halt(1)
  }

The call to Runtime.getRuntime.halt(1) simply kills the process outright.
See the Kafka issue: Unclean leader election and "Halting because log truncation is not allowed"
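If, for a given topic, availability matters more than the data that would be truncated, the topic-level override can be flipped instead of restarting brokers with a changed default. A sketch against the 0.9.0.1 admin API: the zk connect string is a placeholder, the topic name is the "test" topic from the log above, and enabling unclean election can lose messages, so weigh it carefully.

import kafka.admin.AdminUtils
import kafka.server.ConfigType
import kafka.utils.ZkUtils

// Allow unclean leader election for just this topic so the out-of-sync replica can truncate and rejoin.
val zkUtils = ZkUtils("zk1:2181,zk2:2181,zk3:2181", 6000, 6000, false)  // last arg: zk security disabled
val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, "test")
configs.put("unclean.leader.election.enable", "true")   // trades possible message loss for availability
AdminUtils.changeTopicConfig(zkUtils, "test", configs)
zkUtils.close()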

A replica fetches data from the wrong partition leader
  • Symptom: several machines in the cluster raised disk-space alerts one after another; the culprit was kafka logs taking up a lot of disk space. The server log was full of
WARN [Replica Manager on Broker 3]: While recording the replica LEO, the partition [orderservice.production,0] hasn't been created. (kafka.server.ReplicaManager) 

along with a large number of

ERROR [ReplicaFetcherThread-0-58], Error for partition [reptest,0] to broker 58:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread) 
  • Log analysis:
    Combining the logs above with the topic's current replica and ISR assignments, it was clear that the wrong replica was fetching from the wrong partition leader, which in theory should never happen. The history:
    1. Earlier the cluster had lost a machine to hardware failure, and I wanted to delete one partition hosted on it. Since Kafka itself does not support deleting partitions, I removed that partition's entry directly from the content of the /brokers/topics/[topic] node in zk, but the kafka controller does not handle a shrinking partition count; see KafkaController分析.
    2. To trigger the deletion of that topic's partition, I then migrated other partitions as well;
    3. then I also deleted the /controller ephemeral node in zk;
    4. by the end I had confused even myself;
    5. then the broken machine was repaired and came back online, and the problem appeared.
  • Fix: restarted every broker.
  • Digging deeper:
    1. The root cause was never fully confirmed; by the time the problem was noticed, the earlier kafka debug logs had already been deleted.
    2. There is a similar Kafka issue: can't create as many partitions as brokers exists
    3. Avoid hand-editing the content of Kafka-related zk nodes whenever possible (a read-only inspection sketch follows this list);
    4. Consider adding a delete-partition feature to the Kafka source; it would not be too hard.
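As a safer alternative to hand-editing /brokers/topics/[topic], the assignment, leader, and ISR that the controller actually sees can be inspected read-only. A sketch against the 0.9.0.1 ZkUtils API; the zk connect string is a placeholder and the topic name is taken from the log above.

import kafka.utils.ZkUtils

// Read-only inspection of a topic's partition assignment, leaders, and ISRs.
val zkUtils = ZkUtils("zk1:2181,zk2:2181,zk3:2181", 6000, 6000, false)  // last arg: zk security disabled
val topic = "orderservice.production"
for (partition <- zkUtils.getPartitionsForTopics(Seq(topic))(topic).sorted) {
  val leader   = zkUtils.getLeaderForPartition(topic, partition)        // Option[Int]
  val replicas = zkUtils.getReplicasForPartition(topic, partition)      // assigned replica broker ids
  val isr      = zkUtils.getInSyncReplicasForPartition(topic, partition)
  println(s"$topic-$partition leader=$leader replicas=$replicas isr=$isr")
}
zkUtils.close()
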
__consumer_offsets logs cannot be cleaned
  • Symptom: several machines in the cluster raised disk-space alerts; one partition of __consumer_offsets turned out to occupy tens of GB.
  • Log analysis: the earlier logs had already been rotated away, so nothing useful was left. To debug, I packed up the index and log files of this partition, copied them to a test cluster, and restarted that broker, which produced the following:
[2017-09-30 10:49:36,126] ERROR [kafka-log-cleaner-thread-0], Error due to (kafka.log.LogCleaner)
java.lang.IllegalArgumentException: requirement failed: 138296566648 messages in segment __consumer_offsets-5/00000000000000000000.log but offset map can fit only 5033164. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads
    at scala.Predef$.require(Predef.scala:219)
    at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
    at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
    at scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
    at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
    at kafka.log.Cleaner.clean(LogCleaner.scala:322)
    at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
    at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
  • Problem analysis:
    Reading the LogCleaner source, segment.nextOffset() - segment.baseOffset for the 00000000000000000000.log segment exceeded maxDesiredMapSize, which terminated the LogCleaner thread, so nothing could be cleaned. That should never happen?! (The arithmetic sketch at the end of this section shows where the limit in the error message comes from.)
val segmentSize = segment.nextOffset() - segment.baseOffset
require(segmentSize <= maxDesiredMapSize,
  "%d messages in segment %s/%s but offset map can fit only %d. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads"
    .format(segmentSize, log.name, segment.log.file.getName, maxDesiredMapSize))
if (map.size + segmentSize <= maxDesiredMapSize)
  offset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
else
  full = true
  • Fix: I could not think of anything better, so I brute-force deleted 00000000000000000000.log and 00000000000000000000.index, removed the related entry from cleaner-offset-checkpoint, and restarted the broker; compaction and cleaning then resumed.
  • Digging deeper:
    As for why segment.nextOffset() - segment.baseOffset for this segment exceeded maxDesiredMapSize: the guess is that some application was manually committing offsets to this partition without any throttling, pushing 8-9 MByte of commits per second.
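Where the 5033164 in the error message comes from: with the 0.9 defaults of log.cleaner.dedupe.buffer.size=128MB and log.cleaner.io.buffer.load.factor=0.9, a single cleaner thread, and 24 bytes per offset-map entry (a 16-byte MD5 hash plus an 8-byte offset in SkimpyOffsetMap), the cleaner's maxDesiredMapSize works out to roughly five million entries, so any segment claiming more distinct offsets than that trips the require shown above. A quick check of that arithmetic (REPL-style, values are the assumed defaults):

// Reproduce the limit in the error message, assuming 0.9 defaults and one cleaner thread.
val dedupeBufferSize  = 128 * 1024 * 1024   // log.cleaner.dedupe.buffer.size = 134217728 bytes
val loadFactor        = 0.9                 // log.cleaner.io.buffer.load.factor
val bytesPerEntry     = 16 + 8              // MD5 hash + offset per SkimpyOffsetMap slot
val slots             = dedupeBufferSize / bytesPerEntry
val maxDesiredMapSize = (slots * loadFactor).toInt
println(maxDesiredMapSize)                  // 5033164, matching the log line
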
GC problems
  • Symptom: the cluster alerted that a broker was down, and the broker's registration node was missing from zk.
  • Log analysis:
    1. The broker log reports zk connection timeouts or an inability to read any data from zk; this is really the same symptom as "Broker connections to the zk cluster keep dropping and reconnecting" above.
    2. The broker's GC log shows a very long GC at the corresponding time, causing a stop-the-world pause in which every broker thread stops working, so of course the heartbeat to zk cannot be maintained.
  • Fix: no solution for now. GC is a big headache; I searched around and found nothing that works. My own skills are limited, so if anyone has a good approach, please leave me a comment, thanks!
  • Addendum: on GC, Dr. Zhuang's talk "OS 形成的長時間非典型 JVM GC 停頓:深度分析和解決" (long atypical JVM GC pauses caused by the OS: deep analysis and resolution) is worth watching.
  • Slow GC and the STW pauses it causes lead to many problems; we have also seen them cause OOM and a saturated listen backlog.
Deploying zk and kafka
  • If zk and kafka brokers are deployed on the same machine, put their data and log paths on different disks to avoid competing for disk IO;
  • Kafka is very sensitive to zk instability, so zk is best deployed separately so that it runs stably;
  • Avoid heavy write traffic to zk; zk writes are all ultimately forwarded to the zk leader;
  • If zk and brokers are co-located and there is also heavy zk write traffic (for example an older Storm version committing offsets to zk), zk disk IO can climb. Starting zk with zookeeper.forceSync=no lowers the write IO, but this setting has side effects, so use it in production with caution.
Monitoring matters
  • Real-time monitoring: create a dedicated topic on the cluster and have a monitoring program write to it continuously, alerting when writes fail or write latency crosses a threshold. This canary is genuinely useful; it catches problems first almost every time (a sketch of such a probe follows this list);
  • Basic monitoring: CPU, disk IO, NIC traffic, file descriptors, connection counts, etc.;
  • Topic traffic monitoring: watch produce and consume traffic per topic, especially sudden spikes, to quickly find the bad actor; the data can be pulled from Kafka's JMX metrics and displayed and alerted on with Grafana.
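A minimal sketch of such a canary probe, using the Java producer client from Scala; the bootstrap servers, topic name, thresholds, and the alert() body are placeholders to adapt to your environment.

import java.util.Properties
import java.util.concurrent.TimeUnit
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaCanary {
  def alert(msg: String): Unit = println(s"ALERT: $msg")   // hook up real alerting here

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092,broker2:9092")  // placeholder
    props.put("acks", "all")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    val latencyThresholdMs = 500L
    while (true) {
      val start = System.currentTimeMillis()
      try {
        // Block on the send so both failures and slow writes are caught.
        producer.send(new ProducerRecord("kafka_canary", "probe", start.toString)).get(5, TimeUnit.SECONDS)
        val costMs = System.currentTimeMillis() - start
        if (costMs > latencyThresholdMs) alert(s"canary write took ${costMs}ms")
      } catch {
        case e: Exception => alert(s"canary write failed: ${e.getMessage}")
      }
      Thread.sleep(1000)
    }
  }
}
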
Large numbers of exceptions: Attempted to decrease connection count for address with no connections
  • Symptom: the machine hosting one broker raised a disk alert; server.log had grown very large.
  • Log analysis: the log was being flooded with entries like:
[2016-10-13 00:00:00,495] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.IllegalArgumentException: Attempted to decrease connection count for address with no connections, address: /xxx.xxx.xxx.xxx
    at kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
    at kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
    at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
    at scala.collection.AbstractMap.getOrElse(Map.scala:59)
    at kafka.network.ConnectionQuotas.dec(SocketServer.scala:564)
    at kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:450)
    at kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:445)
    at scala.collection.Iterator$class.foreach(Iterator.scala:742)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.network.Processor.run(SocketServer.scala:445)
    at java.lang.Thread.run(Thread.java:745)
Newer client SDKs against an older Kafka send requests the broker does not support
  • Symptom: the log contains large numbers of entries like:
[2017-10-12 16:52:38,141] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.ArrayIndexOutOfBoundsException: 18
    at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:68)
    at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39)
    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:79)
    at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:426)
    at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:421)
    at scala.collection.Iterator$class.foreach(Iterator.scala:742)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.network.Processor.run(SocketServer.scala:421)
    at java.lang.Thread.run(Thread.java:745)
  • Analysis:
    1. The Kafka version in use is 0.9.0.1, whose highest supported request id is 16; id 18 is the ApiVersions request from newer Kafka clients, hence this exception (a small demo follows this section).
    2. Tracing the code, in SocketServer:
try {
  val channel = selector.channel(receive.source)
  val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
    channel.socketAddress)
  val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
    buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
  requestChannel.sendRequest(req)
} catch {
  case e @ (_: InvalidRequestException | _: SchemaException) =>
    // note that even though we got an exception, we can assume that receive.source is valid.
    // Issues with constructing a valid receive object were handled earlier
    error("Closing socket for " + receive.source + " because of error", e)
    close(selector, receive.source)
}

Request handling does not catch this exception, so it is caught by the outer try...catch and the loop goes straight into the next selector.poll(300). That poll clears all previously received requests, so in this situation some requests can simply go unprocessed, which looks like a fairly serious problem.

  • Fix:
    1. A simple patch:
selector.completedReceives.asScala.foreach { receive =>
  var isClose = false
  try {
    val channel = selector.channel(receive.source)
    val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
      channel.socketAddress)
    val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
      buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
    requestChannel.sendRequest(req)
  } catch {
    case e @ (_: InvalidRequestException | _: SchemaException) =>
      // note that even though we got an exception, we can assume that receive.source is valid.
      // Issues with constructing a valid receive object were handled earlier
      error("Closing socket for " + receive.source + " because of error", e)
      isClose = true
      close(selector, receive.source)
    case e: ArrayIndexOutOfBoundsException =>
      error("NotSupport Request | Closing socket for " + receive.source + " because of error", e)
      isClose = true
      close(selector, receive.source)
  }
  if (!isClose) {
    selector.mute(receive.source)
  }
}
    2. Kafka also has a related fix, Broker does not disconnect client on unknown request, which is considerably larger.
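A quick way to see the mismatch from the 0.9.0.1 client jar (assuming it is on the classpath, e.g. in a Scala REPL): id 18 is past the end of the 0.9 ApiKeys table, which is exactly the ArrayIndexOutOfBoundsException in the log above.

import org.apache.kafka.common.protocol.ApiKeys

println(ApiKeys.MAX_API_KEY)                 // 16 in 0.9.0.1: the highest request type this broker understands
println(ApiKeys.forId(ApiKeys.MAX_API_KEY))  // the last request type 0.9 knows about
println(ApiKeys.forId(18))                   // throws java.lang.ArrayIndexOutOfBoundsException: 18
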
Frequent full GC
  • Symptom: the Kafka broker stops working, the log shows no output, and the whole process hangs.
  • Analysis: kafkaServer-gc.log shows full GCs that fail to reclaim memory, which points to a memory leak.
    We found SocketServer inflightResponses collection leaks memory on client disconnect: inflightResponses caches responses that still need to be sent but have not finished sending, and each response also holds a reference to its request, so under heavy request traffic the memory footprint is substantial.
    In the 0.9.0.1 code, entries are removed from inflightResponses only when handling completedSends; nothing is removed in disconnected or close.
  • Fix:
    1. The most brutal option is to drop the inflightResponses variable altogether, but that has the side effect of breaking some of the metrics.
    2. The cleaner option is to follow the latest Kafka code and also remove entries in disconnected and close (a sketch follows).
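A minimal sketch of the second option, written against the 0.9.0.1 kafka.network.Processor field names (the upstream patch is the authoritative version): whenever a connection is torn down, drop any response still cached for it.

// In Processor, both in close(selector, connectionId) and in the selector.disconnected
// handling inside run(), also release the response cached for the dead connection:
inflightResponses.remove(connectionId)
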
Swap usage
  • Use machines with plenty of memory, and disable swap.

Kafka源碼分析-彙總 (the author's index of Kafka source code analysis posts)

Author: 掃帚的影子. Link: https://www.jianshu.com/p/d2cbaae38014. Source: Jianshu (簡書). Copyright belongs to the author; for any form of reproduction, please contact the author for authorization and credit the source.