[2017-09-20 19:37:05,265] ERROR Found invalid messages during fetch for partition [xxxx,87] offset 1503297 error Message is corrupt (stored crc = 286782282, computed crc = 400317671) (kafka.server.ReplicaFetcherThread)
[2017-09-20 19:37:05,458] ERROR Found invalid messages during fetch for partition [xxxx,75] offset 1501373 error Message found with corrupt size (0) in shallow iterator (kafka.server.ReplicaFetcherThread)
[2017-09-20 19:37:07,455] ERROR [ReplicaFetcherThread-0-5], Error due to (kafka.server.ReplicaFetcherThread)
kafka.common.KafkaException: error processing data for partition [xxxx,87] offset 1503346
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:147)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:122)
    at scala.Option.foreach(Option.scala:257)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:122)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:120)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:120)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:120)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:120)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:93)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.lang.RuntimeException: Offset mismatch: fetched offset = 1503346, log end offset = 1503297.
    at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:110)
    at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:42)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:138)
ERROR Found invalid messages during fetch for partition [qssnews_download,87] offset 1503297 error Message is corrupt (stored crc = 286782282, computed crc = 400317671) (kafka.server.ReplicaFetcherThread)
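For context (this note is mine, not from the original post): each message in this log format carries a CRC32 checksum, and the broker recomputes it over the fetched bytes; a mismatch is what produces the "stored crc / computed crc" error above. Conceptually:

import java.util.zip.CRC32

// Illustrative sketch only, not the actual kafka.message.Message code:
// recompute the checksum over the message bytes and compare with the stored value.
def isCorrupt(storedCrc: Long, messageBytes: Array[Byte]): Boolean = {
  val crc = new CRC32()
  crc.update(messageBytes)
  crc.getValue != storedCrc   // mismatch => "Message is corrupt (stored crc = ..., computed crc = ...)"
}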
[2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because log truncation is not allowed for topic test, Current leader 1's latest offset 0 is less than replica 2's latest offset 151 (kafka.server.ReplicaFetcherThread)
With unclean.leader.election.enable=false set, execution reaches the following logic:

if (leaderEndOffset < replica.logEndOffset.messageOffset) {
  // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
  // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
  // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
  if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
    ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
    // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
    fatal("...")
    Runtime.getRuntime.halt(1)
  }
It calls Runtime.getRuntime.halt(1) and simply kills the process on the spot.
See the Kafka issue: Unclean leader election and "Halting because log truncation is not allowed".
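One possible way out (my sketch, not from the original post; it relies on the 0.9/0.10-era AdminUtils and ZkUtils APIs referenced in the code above, and temporarily enabling unclean leader election accepts exactly the potential data loss this FATAL check is protecting against) is to set the topic-level override so the follower is allowed to truncate and rejoin:

import kafka.admin.AdminUtils
import kafka.server.ConfigType
import kafka.utils.ZkUtils

// Sketch only: the zk connect string and timeouts are placeholders.
val zkUtils = ZkUtils("zk1:2181", 30000, 30000, isZkSecurityEnabled = false)
try {
  // Fetch the existing topic-level overrides for topic "test" and flip the flag.
  val props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, "test")
  props.put("unclean.leader.election.enable", "true")
  AdminUtils.changeTopicConfig(zkUtils, "test", props)
} finally {
  zkUtils.close()
}

Remember to flip the override back afterwards; as the code comment above notes, this situation only arises when the unclean election configuration changes while a replica is down.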
WARN [Replica Manager on Broker 3]: While recording the replica LEO, the partition [orderservice.production,0] hasn't been created. (kafka.server.ReplicaManager)
and
ERROR [ReplicaFetcherThread-0-58], Error for partition [reptest,0] to broker 58:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
The replica was syncing data from the wrong partition leader, which in theory should never happen;
Someone had removed this partition's info directly from the content of the /brokers/[topic] znode, but the kafka controller does not handle a reduction in the number of partitions; see the KafkaController analysis of the /controller ephemeral node.
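For reference (my addition): the hand-edited assignment lives in the JSON content of that znode, whose full path is /brokers/topics/<topic>. A quick way to dump it with the plain ZooKeeper client (connect string and timeout are placeholders; the reptest topic name is taken from the log above):

import org.apache.zookeeper.ZooKeeper

// Sketch: print the partition -> replicas assignment JSON of a topic znode.
// Deleting a partition from this JSON by hand does not make the controller shrink the topic.
val zk = new ZooKeeper("zk1:2181", 30000, null)
try {
  val data = zk.getData("/brokers/topics/reptest", false, null)
  println(new String(data, "UTF-8"))   // e.g. {"version":1,"partitions":{"0":[58,59],"1":[59,60]}}
} finally {
  zk.close()
}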
[2017-09-30 10:49:36,126] ERROR [kafka-log-cleaner-thread-0], Error due to (kafka.log.LogCleaner)
java.lang.IllegalArgumentException: requirement failed: 138296566648 messages in segment __consumer_offsets-5/00000000000000000000.log but offset map can fit only 5033164. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads
    at scala.Predef$.require(Predef.scala:219)
    at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
    at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
    at scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
    at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
    at kafka.log.Cleaner.clean(LogCleaner.scala:322)
    at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
    at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
From the LogCleaner source code, the problem is that for the logSegment 00000000000000000000.log, segment.nextOffset() - segment.baseOffset is larger than maxDesiredMapSize, which kills the LogCleaner thread so that nothing gets cleaned any more. That should not happen?!

val segmentSize = segment.nextOffset() - segment.baseOffset

require(segmentSize <= maxDesiredMapSize, "%d messages in segment %s/%s but offset map can fit only %d. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads".format(segmentSize, log.name, segment.log.file.getName, maxDesiredMapSize))
if (map.size + segmentSize <= maxDesiredMapSize)
  offset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
else
  full = true
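For reference (my derivation, based on the 0.9/0.10-era LogCleaner and SkimpyOffsetMap defaults, not on the original post): maxDesiredMapSize is roughly the per-thread dedupe buffer divided by the bytes needed per offset-map entry, scaled by the load factor. With the defaults this reproduces the 5033164 figure in the log above:

// Hedged reconstruction of the offset-map sizing, using default settings.
val dedupeBufferSize = 128L * 1024 * 1024 // log.cleaner.dedupe.buffer.size default
val cleanerThreads   = 1                  // log.cleaner.threads default
val loadFactor       = 0.9                // log.cleaner.io.buffer.load.factor default
val bytesPerEntry    = 16 + 8             // MD5 hash (16 bytes) + 8-byte offset

val slots             = dedupeBufferSize / cleanerThreads / bytesPerEntry // 5592405
val maxDesiredMapSize = (slots * loadFactor).toInt                        // 5033164

So any segment whose nextOffset() - baseOffset exceeds roughly five million trips the require above; raising log.cleaner.dedupe.buffer.size (or lowering log.cleaner.threads) raises that ceiling.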
The workaround was to remove 00000000000000000000.log and 00000000000000000000.index, delete the related entry from cleaner-offset-checkpoint, and restart the broker; log compaction then started running again.

As for why this logSegment's segment.nextOffset() - segment.baseOffset exceeded maxDesiredMapSize in the first place, the guess is that some application was committing offsets to this partition by hand without any rate control, pushing 8-9 MByte of commits per second.
During GC the JVM stops the world: all broker threads stop working, so naturally the broker cannot keep its heartbeat with zk either. GC is a big headache here; I searched around the web and found no effective solution, and my own skills are limited, so if anyone has a good approach, please leave me a comment, thanks~

Setting zookeeper.forceSync=no reduces disk write IO, but this option has side effects and should be used with caution in production.

Grafana is used for dashboards and alerting.

Attempted to decrease connection count for address with no connections
[2016-10-13 00:00:00,495] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.IllegalArgumentException: Attempted to decrease connection count for address with no connections, address: /xxx.xxx.xxx.xxx
    at kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
    at kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
    at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
    at scala.collection.AbstractMap.getOrElse(Map.scala:59)
    at kafka.network.ConnectionQuotas.dec(SocketServer.scala:564)
    at kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:450)
    at kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:445)
    at scala.collection.Iterator$class.foreach(Iterator.scala:742)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.network.Processor.run(SocketServer.scala:445)
    at java.lang.Thread.run(Thread.java:745)
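Reading the stack trace (my interpretation, not something stated in the original post): ConnectionQuotas keeps a per-address connection count, and dec() throws when asked to decrement an address that has no recorded connections, i.e. the same connection was counted down twice or was never counted up. A self-contained sketch of that bookkeeping:

import java.net.InetAddress
import scala.collection.mutable

// Illustrative bookkeeping: closing the same connection twice (or closing one
// that was never registered) triggers the IllegalArgumentException seen above.
class ConnectionCounts {
  private val counts = mutable.Map[InetAddress, Int]()

  def inc(addr: InetAddress): Unit = counts.synchronized {
    counts(addr) = counts.getOrElse(addr, 0) + 1
  }

  def dec(addr: InetAddress): Unit = counts.synchronized {
    val count = counts.getOrElse(addr,
      throw new IllegalArgumentException(s"Attempted to decrease connection count for address with no connections, address: $addr"))
    if (count == 1) counts.remove(addr) else counts(addr) = count - 1
  }
}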
[2017-10-12 16:52:38,141] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.ArrayIndexOutOfBoundsException: 18
    at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:68)
    at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39)
    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:79)
    at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:426)
    at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:421)
    at scala.collection.Iterator$class.foreach(Iterator.scala:742)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.network.Processor.run(SocketServer.scala:421)
    at java.lang.Thread.run(Thread.java:745)
API key 18 here is a request type introduced in a newer protocol version than this broker understands (typically a newer client talking to an older broker), so ApiKeys.forId throws. In SocketServer:

try {
  val channel = selector.channel(receive.source)
  val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
  val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
  requestChannel.sendRequest(req)
} catch {
  case e @ (_: InvalidRequestException | _: SchemaException) =>
    // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
    error("Closing socket for " + receive.source + " because of error", e)
    close(selector, receive.source)
}
the code does not handle this exception while processing the Request, so it is caught by the outer try...catch... and processing jumps straight into the next selector.poll(300). That selector.poll(300) clears all the Requests received previously, so in this situation some Requests can end up never being processed, which looks like a fairly serious problem.
The fix is to catch this exception explicitly and close the offending connection:

selector.completedReceives.asScala.foreach { receive =>
  var isClose = false
  try {
    val channel = selector.channel(receive.source)
    val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
    val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
    requestChannel.sendRequest(req)
  } catch {
    case e @ (_: InvalidRequestException | _: SchemaException) =>
      // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
      error("Closing socket for " + receive.source + " because of error", e)
      isClose = true
      close(selector, receive.source)
    case e: ArrayIndexOutOfBoundsException =>
      error("NotSupport Request | Closing socket for " + receive.source + " because of error", e)
      isClose = true
      close(selector, receive.source)
  }
  if (!isClose) {
    selector.mute(receive.source)
  }
}
inflightResponses caches responses that have been queued for sending but have not finished being sent, and each such response also holds a reference to its corresponding request, so under heavy request traffic this occupies quite a bit of memory. The 0.9.0.1 code only removes entries from inflightResponses when handling completedSends; nothing is removed in the disconnected and close paths. One option is to drop the inflightResponses variable altogether, but that has a side effect: it affects the Metrics statistics. The other is to also remove the entries in disconnected and close, as sketched below.
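A minimal sketch of that second option (illustrative names; the real 0.9.0.1 Processor keys inflightResponses by connection id, and the exact hook points in the disconnected/close handling are paraphrased here):

import scala.collection.mutable

// Keep inflightResponses from leaking: remove entries not only when a send
// completes, but also when the connection disconnects or is closed.
object InflightCleanupSketch {
  final case class Response(connectionId: String, payload: Array[Byte])

  private val inflightResponses = mutable.Map[String, Response]()

  def onResponseQueued(r: Response): Unit = inflightResponses.put(r.connectionId, r)

  // existing behaviour in 0.9.0.1: removal on completed send
  def onCompletedSend(connectionId: String): Unit = inflightResponses.remove(connectionId)

  // the proposed additions: also drop the cached response on disconnect/close,
  // so the Response (and the Request it references) can be garbage collected
  def onDisconnected(connectionId: String): Unit = inflightResponses.remove(connectionId)
  def onClose(connectionId: String): Unit = inflightResponses.remove(connectionId)
}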