記一次kafka客戶端NOT_COORDINATOR_FOR_GROUP處理過程

轉發請註明原創地址:https://www.cnblogs.com/dongxiao-yang/p/10602799.htmlhtml

某日晚高峯突然集羣某個大流量業務收到lag報警,查看客戶端日誌發現reblance一直沒法成功,日誌以下java

根據客戶端日誌顯示consumer在嘗試joingroup的過程當中收到了服務端COORDINATOR狀態不正常的信息,懷疑是服務端負責這個consumer-group的broker在coordinator元信息管理上出現了問題。算法

因而跑到對應的節點上看一下server日誌,發如今一臺剛纔有太重啓的服務節點上產生以下日誌apache

Failed to append 363 tombstones to __consumer_offsets-38 for expired/deleted offsets and/or metadata for group consumer-group. (kafka.coordinator.GroupMetadataManager)
org.apache.kafka.common.errors.NotLeaderForPartitionException: Leader not local for partition __consumer_offsets-38 on broker 。app

懷疑是這個服務重啓的過程當中__consumer_offset分區有部分數據或者文件有異常致使coordinator沒法提供服務致使,停掉有問題節點後發現客戶端reblance很快就成功了,因而懷疑問題節點產生了壞文件,後續刪除對應分區能夠重啓成功服務,可是對應group的業務又開始報錯jvm


20 Mar 2019 15:31:32,000 INFO  [PollableSourceRunner-KafkaSource-bl_app_event_detail_source] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle:542)  - Offset commit for group consumer-group failed due to NOT_COORDINATOR_FOR_GROUP, will find new coordinator and retry
20 Mar 2019 15:31:32,001 INFO  [PollableSourceRunner-KafkaSource-bl_app_event_detail_source] (org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead:529)  - Marking the coordinator 2147483543 dead.this

 

kafka 自從0.9以來摒棄了consumer把offset存在zk的作法而是都存到了__consumer_offsets這個系統topic裏面,同時consumer端的reblance都是依靠server端的coordinator負責調度協調。至於每一個group怎麼選擇對應broker節點是根據下面這個簡單的hashcode對__consumer_offsets分區數取模的算法得出來的,spa

def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount

因此看上去是重啓節點拉起來後客戶端發現對應的offset分區leader又活了,可是活過來的leader卻告知客戶端NOT_COORDINATOR_FOR_GROUP這個矛盾。可是明明有問題的offset文件已經被手動刪除掉了,從新拉副本也成功了,爲何仍是會有join group不成功的現象呢。線程

繼續查看問題節點,發現問題節點在Loading group metadata for之類的日誌的時候一直沒有輸出對應的問題group相關日誌,初步判斷broker重啓過程當中load group信息的時效出了問題。scala

 

 def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) { // for each new leader or follower, call coordinator to handle consumer group migration. // this callback is invoked under the replica state change lock to ensure proper order of // leadership changes
      updatedLeaders.foreach { partition =>
        if (partition.topic == GROUP_METADATA_TOPIC_NAME) groupCoordinator.handleGroupImmigration(partition.partitionId) else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME) txnCoordinator.handleTxnImmigration(partition.partitionId, partition.getLeaderEpoch) }

如上述代碼所示,kafka在offset分區從新被選舉爲leader的時候纔會去加載對應的group信息,並且全部新leader是foreach單線程循環,若是其中有一個慢的剩下的group都會受到影響。查看問題節點果真除了被刪掉的offset分區還有一個分區offset歷史文件不少,多到500G的體量,這和offset這種只保存最新數據的場景明顯是不符合的,這個大小會致使服務端加載offset信息長到沒法接受的程度。

爲了儘快回覆offset元信息,把問題節點的offset partition全都從新分配到其餘節點,在重分配的過程當中發現新的副本會不斷的刪除同步過來的過時數據最後結束後整個分區的大小隻有幾十M,因而堅決了原來分區大小不正常的判斷 。對於__consumer_offsets這種compact策略的topic,kafka內部是有一個專門的logcleaner線程負責日誌的合併,可是剛開始出問題的節點通過了幾回重啓,原始的現場早已不存在,因而把整個集羣每一個服務挨個查了一遍,果真在另外一臺看似正常的機器上一樣發現了一個很大的offset分區,jstack了一下,發現kafka-log-cleaner-thread這個線程已經沒了!重啓該服務後發現問題分區的日誌也開始正常刪除。惋惜的是因爲服務日誌只保留了最近7天的,kafka-log-cleaner-thread的錯誤日誌已經找不到了,這個有待後續復現確認。

 

回顧了一下處理問題過程當中出現的其餘現象,其實都是有提示的,像是關掉問題節點的時候server日誌會報

WARN Map failed (kafka.utils.CoreUtils$)
java.io.IOException: Map failed
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940)
at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:111)
at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:101)

以及kafka jvm第一次崩掉的hs_err_pid日誌會提示內存不足

Native memory allocation (mmap) failed to map 65536 bytes for committing reserved memory

因爲kafka使用的mmap方式映射了數據文件以及索引,這個mmap failed就已經提示了文件過多。

 

 

結論:kafka的offset數據每一個group會根據hash取模的方式發到一個固定的_consumer_offsets分區中,_consumer_offsets分區的leader負責對應groupid的coordinator服務,_consumer_offsets

的刪除是由kafka-log-cleaner-thread執行的,這個線程個數默認是1,若是線程崩掉了offset歷史分區文件會一直沒法刪除,致使jvm崩掉而且服務恢復的時候group元信息長時間的沒法加載致使reblacne報錯。

相關文章
相關標籤/搜索