Kafka version: kafka_2.8.0-0.8.1
A production Kafka cluster repeatedly ran into the same problem: the brokers' nodes disappeared from ZooKeeper while the Kafka processes and ports were still alive, and every broker kept logging errors, mainly:
1)
[2017-01-09 12:40:53,832] INFO Partition [topic1,3] on broker 1361: Shrinking ISR for partition [topic1,3] from 1351,1361,1341 to 1361 (kafka.cluster.Partition)
2)
[2017-01-09 12:33:53,858] ERROR Conditional update of path /brokers/topics/topic2/partitions/0/state with data {"controller_epoch":20,"leader":1361,"version":1,"leader_epoch":13,"isr":[1361]} and expected version 23 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/topic2/partitions/0/state (kafka.utils.ZkUtils$)
Clients were reporting errors at the same time.
The most recent occurrence was at noon on January 9, 2017, when three brokers (134/135/136) disappeared from ZooKeeper one after another until the service became unavailable. The sequence of events, reconstructed from the logs:
136 is controller
2017-01-09 12:33:33 134 zk disconnect
2017-01-09 12:33:33 134 zk connect
2017-01-09 12:33:36 135 zk disconnect
2017-01-09 12:33:36 136 zk disconnect[disappear]
2017-01-09 12:33:36 134 become controller
2017-01-09 12:33:37 135 zk connect
2017-01-09 12:33:42 134 zk disconnect[disappear]
2017-01-09 12:33:44 135 become controller
135 zk disconnect[disappear]
134 restart
2017-01-09 12:37:32 134 zk connect
2017-01-09 12:37:32 134 become controller
2017-01-09 12:37:39 134 zk disconnect[disappear]
135 restart
2017-01-09 12:38:14 135 zk connect[ok]
2017-01-09 12:38:14 135 become controller[ok]
134 restart
2017-01-09 12:39:41 134 zk connect[ok]
136 restart
2017-01-09 12:41:19 136 zk connect[ok]
During the 12:33–12:37 window when the service was unavailable, jstack on all three brokers showed the same two stacks:
1)
"delete-topics-thread" prio=10 tid=0x00007f52ec06c800 nid=0x18b8 waiting on condition [0x00007f52b59dd000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000c55a2140> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(Unknown Source)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(Unknown Source)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(Unknown Source)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)
at kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$awaitTopicDeletionNotification(TopicDeletionManager.scala:178)
at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:334)
at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:333)
at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:333)
at kafka.utils.Utils$.inLock(Utils.scala:538)
at kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:333)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
2)
"ZkClient-EventThread-12-kafka-common1.hangzhou-1.zookeeper.internal.lede.com:2182,kafka-common2.hangzhou-1.zookeeper.internal.lede.com:2182,kafka-common3.hangzhou-1.zookeeper.internal.lede.com:2182,kafka-common4.hangzhou-1.zookeeper.internal.lede.com:2182,kafka-common5.hangzhou-1.zookeeper.internal.lede.com:2182" daemon prio=10 tid=0x00007f533435f800 nid=0xbb4 waiting on condition [0x00007f531a8be000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000e41c5cf8> (a java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(Unknown Source)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(Unknown Source)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(Unknown Source)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(Unknown Source)
at java.util.concurrent.CountDownLatch.await(Unknown Source)
at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
at kafka.controller.TopicDeletionManager.shutdown(TopicDeletionManager.scala:93)
at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:340)
at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
at kafka.utils.Utils$.inLock(Utils.scala:538)
at kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:337)
at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1068)
at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067)
at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067)
at kafka.utils.Utils$.inLock(Utils.scala:538)
at kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1067)
at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
Start with the second stack. A Kafka cluster has exactly one controller, and you can see which broker holds the role in the /controller node in zk. The controller starts a DeleteTopicsThread and registers a SessionExpirationListener; when the ZooKeeper connection drops and is re-established, handleNewSession (below) is called back, and its main job is to clean up the current controller state and re-elect:
/**
 * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
 * any ephemeral nodes here.
 *
 * @throws Exception On any error.
 */
@throws(classOf[Exception])
def handleNewSession() {
  info("ZK expired; shut down all controller components and try to re-elect")
  inLock(controllerContext.controllerLock) {
    onControllerResignation()
    controllerElector.elect
  }
}
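As an aside, it is easy to check which broker currently holds the controller role by reading the /controller node. Below is a small sketch using the I0Itec ZkClient library that Kafka 0.8 already ships with; the ZooKeeper address is a placeholder and the object name is ours. The explicit string serializer matters because ZkClient defaults to Java serialization while Kafka stores plain UTF-8 JSON:

import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.serialize.ZkSerializer

object WhoIsController extends App {
  // ZkClient's default serializer is Java serialization; Kafka's znodes
  // hold plain UTF-8 strings, so supply a string serializer.
  val stringSerializer = new ZkSerializer {
    override def serialize(data: Object): Array[Byte] = data.toString.getBytes("UTF-8")
    override def deserialize(bytes: Array[Byte]): Object =
      if (bytes == null) null else new String(bytes, "UTF-8")
  }
  val zk = new ZkClient("zk1:2181", 30000, 30000, stringSerializer)
  // /controller holds JSON like {"version":1,"brokerid":136,"timestamp":"..."}
  val data: String = zk.readData("/controller", true) // null if the path does not exist
  println(if (data == null) "no controller elected" else data)
  zk.close()
}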
The onControllerResignation invoked there is:
/**
 * This callback is invoked by the zookeeper leader elector when the current broker resigns as the controller. This is
 * required to clean up internal controller data structures
 */
def onControllerResignation() {
  inLock(controllerContext.controllerLock) {
    if (config.autoLeaderRebalanceEnable)
      autoRebalanceScheduler.shutdown()
    deleteTopicManager.shutdown()
    Utils.unregisterMBean(KafkaController.MBeanName)
    partitionStateMachine.shutdown()
    replicaStateMachine.shutdown()
    if (controllerContext.controllerChannelManager != null) {
      controllerContext.controllerChannelManager.shutdown()
      controllerContext.controllerChannelManager = null
    }
  }
}
The deleteTopicManager.shutdown it calls is:
/**
 * Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared
 */
def shutdown() {
  deleteTopicsThread.shutdown()
  topicsToBeDeleted.clear()
  topicsIneligibleForDeletion.clear()
}
which in turn calls ShutdownableThread.shutdown:
def shutdown(): Unit = {
  info("Shutting down")
  isRunning.set(false)
  if (isInterruptible)
    interrupt()
  shutdownLatch.await()   // the thread in the second stack is parked here
  info("Shutdown completed")
}
The thread in the second stack is stuck exactly on that shutdownLatch.await() line. Besides the SessionExpirationListener described above, Kafka also starts a KafkaHealthcheck at broker startup:
kafkaController = new KafkaController(config, zkClient)

/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)

Mx4jLoader.maybeLoad()

replicaManager.startup()

kafkaController.startup()

topicConfigManager = new TopicConfigManager(zkClient, logManager)
topicConfigManager.startup()

/* tell everyone we are alive */
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
kafkaHealthcheck.startup()
KafkaHealthcheck registers its own SessionExpireListener (below), whose register() call re-creates the broker's node under /brokers/ids in zk:
/**
 * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
 * any ephemeral nodes here.
 *
 * @throws Exception On any error.
 */
@throws(classOf[Exception])
def handleNewSession() {
  info("re-registering broker info in ZK for broker " + brokerId)
  register()
  info("done re-registering broker")
  info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
}
The catch is that this listener is registered after the controller's. ZkClient keeps its listeners in a list and fires their callbacks serially, one by one, so on the broker that currently holds the controller role, KafkaHealthcheck's handleNewSession only runs after the controller's handleNewSession returns. When the controller's handleNewSession hangs, the broker node is therefore never re-registered under /brokers/ids, and from the outside the broker simply vanishes from zk. The hang itself is the controller waiting for DeleteTopicsThread to shut down.
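To make the serial-dispatch point concrete, here is a minimal sketch, our own model rather than the actual ZkEventThread code (it assumes Scala 2.12+ for the Runnable lambda): a single event thread drains a queue of callbacks in order, so a first listener that never returns starves the second:

import java.util.concurrent.LinkedBlockingQueue

object SerialDispatchSketch extends App {
  val events = new LinkedBlockingQueue[() => Unit]()

  // Single event thread, like ZkClient's ZkEventThread.
  val eventThread = new Thread(() => {
    while (true) events.take().apply() // one callback at a time, in order
  })
  eventThread.setDaemon(true)
  eventThread.start()

  // First in the list: the controller's handleNewSession, which can block forever.
  events.put(() => { println("controller handleNewSession: blocked"); Thread.sleep(Long.MaxValue) })
  // Second: KafkaHealthcheck's handleNewSession, which would re-register /brokers/ids.
  events.put(() => println("healthcheck handleNewSession: register broker"))

  Thread.sleep(1000)
  // Only the first message ever prints: the broker is never re-registered.
}

DeleteTopicsThread, the thread the controller is waiting on, has this work loop: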
override def run(): Unit = {
  info("Starting ")
  try {
    while (isRunning.get()) {
      doWork()
    }
  } catch {
    case e: Throwable =>
      if (isRunning.get())
        error("Error due to ", e)
  }
  shutdownLatch.countDown()
  info("Stopped ")
}
The first stack is parked inside doWork (below):
class DeleteTopicsThread() extends ShutdownableThread("delete-topics-thread") {
  val zkClient = controllerContext.zkClient
  override def doWork() {
    inLock(controllerContext.controllerLock) {
      awaitTopicDeletionNotification()
      val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted
      // ... (rest of doWork omitted)
where awaitTopicDeletionNotification is:
/**
 * Invoked by the delete-topic-thread to wait until events that either trigger, restart or halt topic deletion occur.
 * controllerLock should be acquired before invoking this API
 */
private def awaitTopicDeletionNotification() {
  while (!deleteTopicStateChanged) {
    info("Waiting for signal to start or continue topic deletion")
    deleteTopicsCond.await()   // the thread in the first stack is parked here
  }
  deleteTopicStateChanged = false
}
This condition is only ever signalled in resumeTopicDeletionThread:
/**
 * Signals the delete-topic-thread to process topic deletion
 */
private def resumeTopicDeletionThread() {
  deleteTopicStateChanged = true
  deleteTopicsCond.signal()
}
resumeTopicDeletionThread is invoked in only four situations; in every other case doWork stays blocked on the condition. A minimal repro of the resulting deadlock follows.
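The deadlock can be reduced to a few lines. The sketch below is our own simplified model, not Kafka source (names like DeadlockRepro are ours): one thread parks on a condition just like deleteTopicsCond.await(), while another flips the running flag and waits on a latch just like the 0.8 shutdown(), and nothing ever signals the condition:

import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock

object DeadlockRepro extends App {
  val isRunning = new AtomicBoolean(true)
  val shutdownLatch = new CountDownLatch(1)
  val lock = new ReentrantLock()
  val cond = lock.newCondition()

  val worker = new Thread(() => {
    while (isRunning.get()) {
      lock.lock()
      try cond.await()          // like deleteTopicsCond.await(): parks until signalled
      finally lock.unlock()
    }
    shutdownLatch.countDown()   // never reached: nobody signals the condition
  })
  worker.setDaemon(true)
  worker.start()

  Thread.sleep(100)             // let the worker park
  isRunning.set(false)          // like 0.8's shutdown(): flip the flag only
  val exited = shutdownLatch.await(2, TimeUnit.SECONDS)
  println(s"worker exited: $exited") // prints "worker exited: false"
}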
簡單來講,controller在zookeeper鏈接斷開重連以後,會嘗試onControllerResignation(清理以前的controller狀態)並從新elect,onControllerResignation會等待DeleteTopicsThread退出,而DeleteTopicsThread卡在doWork上致使controller流程被卡住,只有幾種情形下才會正常執行完doWork(這個有隨機性);這個問題在新版的kafka中被修復(0.9及以上),其中shutdown方法被修改成:
/**
 * Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared.
 */
def shutdown() {
  // Only allow one shutdown to go through
  if (isDeleteTopicEnabled && deleteTopicsThread.initiateShutdown()) {
    // Resume the topic deletion so it doesn't block on the condition
    resumeTopicDeletionThread()
    // Await delete topic thread to exit
    deleteTopicsThread.awaitShutdown()
    topicsToBeDeleted.clear()
    partitionsToBeDeleted.clear()
    topicsIneligibleForDeletion.clear()
  }
}
with initiateShutdown and awaitShutdown defined as:
def initiateShutdown(): Boolean = {
  if (isRunning.compareAndSet(true, false)) {
    info("Shutting down")
    isRunning.set(false)
    if (isInterruptible)
      interrupt()
    true
  } else
    false
}

/**
 * After calling initiateShutdown(), use this API to wait until the shutdown is complete
 */
def awaitShutdown(): Unit = {
  shutdownLatch.await()
  info("Shutdown completed")
}
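For contrast, here is a runnable sketch of the fixed two-phase pattern, again our own simplified model rather than Kafka source (Worker and resume are hypothetical names): flip the flag, signal the condition so the worker can leave await(), and only then wait on the latch:

import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock

class Worker extends Thread("worker") {
  private val running = new AtomicBoolean(true)
  private val shutdownLatch = new CountDownLatch(1)
  private val lock = new ReentrantLock()
  private val cond = lock.newCondition()
  private var stateChanged = false

  override def run(): Unit = {
    while (running.get()) {
      lock.lock()
      try {
        // like awaitTopicDeletionNotification: park until signalled
        while (!stateChanged && running.get()) cond.await()
        stateChanged = false
      } finally lock.unlock()
    }
    shutdownLatch.countDown()
  }

  // like resumeTopicDeletionThread: set the flag and wake the worker
  def resume(): Unit = {
    lock.lock()
    try { stateChanged = true; cond.signal() } finally lock.unlock()
  }

  def initiateShutdown(): Boolean = running.compareAndSet(true, false)

  def awaitShutdown(): Unit = shutdownLatch.await()
}

object ShutdownDemo extends App {
  val w = new Worker
  w.start()
  if (w.initiateShutdown()) { // phase 1: flip the running flag
    w.resume()                // wake the worker out of cond.await()
    w.awaitShutdown()         // phase 2: the latch is now guaranteed to count down
  }
  println("worker stopped without deadlock")
}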
In other words, the single shutdown method from 0.8 was split into initiateShutdown and awaitShutdown, with resumeTopicDeletionThread called in between so the delete-topics thread can never stay parked in doWork. Upgrading to at least 0.9 resolves the issue. The official upgrade notes:
0.9.0.0 has potential breaking changes (please review before upgrading) and an inter-broker protocol change from previous versions. This means that upgraded brokers and clients may not be compatible with older versions. It is important that you upgrade your Kafka cluster before upgrading your clients. If you are using MirrorMaker downstream clusters should be upgraded first as well.
For a rolling upgrade:
1. Update server.properties file on all brokers and add the following property: inter.broker.protocol.version=0.8.2.X
2. Upgrade the brokers. This can be done a broker at a time by simply bringing it down, updating the code, and restarting it.
3. Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.9.0.0.
4. Restart the brokers one by one for the new protocol version to take effect
Note: If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.
Note: Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.
For details: http://kafka.apache.org/documentation/#upgrade_9