線上出現一臺服務器特別慢,因而關閉了服務器上的kafka broker. 關閉後發現一些kafka consumer沒法正常消費數據了, 日誌錯誤:o.a.kakfa.clients.consumer.internals.AbstractCordinator Marking the coordinator (39.0.2.100) as dead.
apache
通過一番排查,發現consumer group信息:
(kafka.coordinator.GroupMetadataMessageFormatter類型):
groupId::[groupId,Some(consumer),groupState,Map(memberId -> [memberId,clientId,clientHost,sessionTimeoutMs], ...->[]...)],
存到了KAFKA內部topic: __consumer_offsets
裏, , 它的key是 groupId.
同時發現broker 參數 offsets.topic.replication.factor 錯誤地被設置爲1. 這個參數表示TOPIC: __Consumer_offsets
的副本數. 這樣一旦某個broker被關閉, 若是被關閉的Broker 是__Consumer_offsets
的某些partition的Leader. 則致使某些consumer group 不可用. 若是一旦broker已經啓動, 須要手工經過命令行來擴展副本數. json
reassignment.json: {"version":1, "partitions": [{"topic": "xxx", "partition": 0, "replicas": {brokerId1, brokerId2}}] } kafka-reassign-partitions --zookeeper localhost:2818 --reassignment-json-file reassignment.json --execute
客戶端尋找Consumer Coordinator的過程:
客戶端 org.apache.kafka.clients.consumer.internals.AbstractCoordinator
若是Coordinator 未知 (AbstractCoordinator.coordinatorUnknown()), 發起請求 lookupCoordinator,向負載最低的節點發送FindCoordinatorRequest 服務器
服務端 KafkaApis.handleFindCoordinatorRequest 接收請求:
首先調用 GroupMetaManager.partitionFor(consumerGroupId) consunerGroupId 的 hashCode 對 __consumer_offsets
總的分片數取模獲取partition id 再從 __consumer_offset
這個Topic 中找到partition對應的 Partition Metadata, 而且獲取對應的Partition leader 返回給客戶端 session
KAFKA 的failover機制到底是怎麼樣的?假使 __consumer_offset
設置了正確的副本數,重選舉的過程是怎樣的. 若是broker宕機後致使某些副本不可用, 副本會自動遷移到其餘節點嗎?帶着這些問題稍微閱讀了一下KAFKA的相關代碼: ide
當一個Broker 被關掉時, 會有兩步操做:
KafkaController.onBrokerFailure ->KafkaController.onReplicasBecomeOffline
主要是經過 PartitionStateMachine.handleStateChanges 方法通知Partition狀態機將狀態置爲offline. ReplicaStateMachine.handleStateChanges方法會將Replica 狀態修改成OfflineReplica, 同時修改partition ISR. 若是被關閉broker 是partition leader 那麼須要從新觸發partition leader 選舉,最後發送LeaderAndIsrRequest獲取最新的Leader ISR 信息.
KafkaController.unregisterBrokerModificationsHandler 取消註冊的BrokerModificationsHandler 並取消zookeeper 中broker 事件的監聽. 函數
當ISR請求被髮出,KafkaApis.handleLeaderAndIsrRequest() 會被調用. 這裏若是須要變動leader的partition是屬於__consumer_offset
這個特殊的topic,取決於當前的broker節點是否是partition leader. 會分別調用GroupCoordinator.handleGroupImmigration 和 GroupCoordinator.handleGroupEmmigration. 若是是partition leader, GroupCoordinator.handleGroupImmigration -> GroupMetadataManager.loadGroupsForPartition 會從新從 __consumer_offset
讀取group數據到本地metadata cache, 若是是partition follower, GroupCoordniator.handleGroupImigration -> GroupMetadataManager.removeGroupsForPartition 會從metadata cache中移除group信息. 並在onGroupUnloaded回調函數中將group的狀態變動爲dead. 同時通知全部等待join或者sync的組成員..net
KAFKA在Broker關閉時不會自動作partition 副本的遷移. 這時被關閉的Broker上的副本變爲under replicated 狀態. 這種狀態將持續直到Broker被從新拉起而且追上新的數據, 或者用戶經過命令行 手動複製副本到其餘節點. 命令行
官方建議設置兩個參數來保證graceful shutdown. controlled.shutdown.enable=true auto.leader.rebalance.enable=true前者保證關機以前將日誌數據同步到磁盤,並進行重選舉. 後者保證在broker從新恢復後再次得到宕機前leader狀態. 避免leader分配不均勻致使讀寫熱點. 日誌
https://blog.csdn.net/zhanglh046/article/details/72833129
https://blog.csdn.net/huochen1994/article/details/80511038
https://www.jianshu.com/p/1aba6e226763code