A Kafka Troubleshooting Story

Environment

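CentOS 7.3
Kafka runs with its default configuration. Zookeeper is started separately; the zk bundled with Kafka is not used.
Kafka and Zookeeper run on the same host, each as a single node.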

Symptom

Testing the queue with Kafka's own tools works fine, but the Java code cannot receive messages from the queue.

Relevant code

 pom.xml

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.6.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
    
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.2.2.RELEASE</version>
</dependency>

application.properties

spring.kafka.consumer.group-id=junbaor-test-group
spring.kafka.bootstrap-servers=10.4.82.141:9092

App.java

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;

@SpringBootApplication
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }

    @KafkaListener(topics = "junbaor-test")
    public void test(String s) {
        System.out.println(s);
    }

}

Log

.   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.5.6.RELEASE)

2017-09-05 14:56:50.971  INFO 52968 --- [           main] com.example.demo.DemoApplication         : Starting DemoApplication on Junbaor-PC with PID 52968 (D:\Project\github\demo\target\classes started by junbaor in D:\Project\github\demo)
2017-09-05 14:56:50.973  INFO 52968 --- [           main] com.example.demo.DemoApplication         : No active profile set, falling back to default profiles: default
2017-09-05 14:56:51.023  INFO 52968 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@6bd61f98: startup date [Tue Sep 05 14:56:51 CST 2017]; root of context hierarchy
2017-09-05 14:56:51.463  INFO 52968 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.kafka.annotation.KafkaBootstrapConfiguration' of type [org.springframework.kafka.annotation.KafkaBootstrapConfiguration$$EnhancerBySpringCGLIB$$a05e7a75] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2017-09-05 14:56:51.714  INFO 52968 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2017-09-05 14:56:51.746  INFO 52968 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
2017-09-05 14:56:51.763  INFO 52968 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [10.4.82.141:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = junbaor-test-group
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2017-09-05 14:56:51.822  INFO 52968 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.11.0.0
2017-09-05 14:56:51.822  INFO 52968 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : cb8625948210849f
2017-09-05 14:56:59.155  INFO 52968 --- [           main] com.example.demo.DemoApplication         : Started DemoApplication in 8.466 seconds (JVM running for 10.586)
2017-09-05 14:56:59.259  INFO 52968 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator wkfg-1:9092 (id: 2147483647 rack: null) for group junbaor-test-group.
2017-09-05 14:57:06.036  INFO 52968 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Marking the coordinator wkfg-1:9092 (id: 2147483647 rack: null) dead for group junbaor-test-group

The last line:

Marking the coordinator wkfg-1:9092 (id: 2147483647 rack: null) dead for group junbaor-test-group

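The coordinator has been marked dead. The consumer most likely cannot receive messages because, from its point of view, the group coordinator is gone.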

Analysis

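The log shows that this line is printed by the AbstractCoordinator class, so we look up the code that prints it.

[Screenshot: the code in AbstractCoordinator that prints this message]

The message is printed because this.coordinator != null. Set a breakpoint and see what coordinator actually holds.

[Screenshot: the coordinator value in the debugger]

wkfg-1 is the hostname of the server running the Kafka instance, and 9092 is Kafka's port, so this appears to be Kafka's connection address.
At first glance nothing looks wrong (in fact, this is exactly where the problem lies).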


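When is this.coordinator assigned? Trace upward to find the calling method
by clicking the previous method in the call stack.

[Screenshot: the call stack in the debugger]

That jumps to here:

[Screenshot: the calling method]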

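Since coordinator is not null, the block must have been entered because client.connectionFailed(coordinator) returned true.
Semantically, this means the client failed to connect to the coordinator.
Whatever the cause, step into the method first.

[Screenshot: the connectionFailed method]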

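The parameter is named node, so this object should hold the Kafka node's information.
Open it to inspect its properties.

[Screenshot: the properties of the node object]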
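
The debugger shows that the client considers its connection to this node failed. A quick way to check the same thing outside the debugger is to attempt a plain TCP connection to the node's host and port. The following is a minimal sketch, not what the Kafka client does internally; the class name CoordinatorProbe, the method canConnect, and the 2-second timeout are illustrative assumptions.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Kafka: probes a host:port over plain TCP.
class CoordinatorProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // An unresolvable host makes connect() throw UnknownHostException,
            // which is an IOException, so it is reported as "not reachable" too.
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are the coordinator host and port taken from the log above.
        String host = args.length > 0 ? args[0] : "wkfg-1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 2000));
    }
}
```

Running it once with the hostname and once with the IP address from bootstrap-servers should make it clear whether the address form matters.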

Locating the problem

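Look at the host property in the screenshot above; host normally means a host name.
On a LAN, a machine cannot be reached by its bare hostname unless the client is able to resolve that name.
The usual options are to use an IP address, a domain name, or a hosts-file entry that maps the hostname to the IP.
With the cause located, let's try the simplest fix first.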
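
Before changing anything, the diagnosis itself can be confirmed with a few lines of Java that ask the operating system to resolve the name. This is only a minimal sketch; HostnameCheck and resolves are hypothetical names, and wkfg-1 is the hostname taken from the log.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper: reports whether this machine can resolve a host name.
class HostnameCheck {

    /** Returns true if the name resolves via the hosts file, DNS, or as an IP literal. */
    static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Default is the broker hostname seen in the debugger.
        String host = args.length > 0 ? args[0] : "wkfg-1";
        System.out.println(host + " resolves: " + resolves(host));
    }
}
```

If the name does not resolve on the development machine, the client has no way to open a connection to the coordinator, whatever address was used for bootstrapping.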

Attempted fix

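Of the options above, the easiest to apply is editing the local hosts file.

On Windows the hosts file is located at C:\Windows\System32\drivers\etc\hosts
Open it with administrator privileges and append the IP-to-hostname mapping: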

10.4.82.141       wkfg-1

Start the project again; the log is as follows:

.   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.5.6.RELEASE)

2017-09-05 16:06:45.862  INFO 53000 --- [           main] com.example.demo.App                     : Starting App on Junbaor-PC with PID 53000 (D:\Project\github\demo\target\classes started by junbaor in D:\Project\github\demo)
2017-09-05 16:06:45.867  INFO 53000 --- [           main] com.example.demo.App                     : No active profile set, falling back to default profiles: default
2017-09-05 16:06:45.963  INFO 53000 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@52b1beb6: startup date [Tue Sep 05 16:06:45 CST 2017]; root of context hierarchy
2017-09-05 16:06:46.838  INFO 53000 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.kafka.annotation.KafkaBootstrapConfiguration' of type [org.springframework.kafka.annotation.KafkaBootstrapConfiguration$$EnhancerBySpringCGLIB$$2436eacd] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2017-09-05 16:06:47.184  INFO 53000 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2017-09-05 16:06:47.248  INFO 53000 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
2017-09-05 16:06:47.308  INFO 53000 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [10.4.82.141:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = junbaor-test-group
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2017-09-05 16:06:47.412  INFO 53000 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.11.0.0
2017-09-05 16:06:47.413  INFO 53000 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : cb8625948210849f
2017-09-05 16:06:47.432  INFO 53000 --- [           main] com.example.demo.App                     : Started App in 1.927 seconds (JVM running for 2.774)
2017-09-05 16:06:47.519  INFO 53000 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator wkfg-1:9092 (id: 2147483647 rack: null) for group junbaor-test-group.
2017-09-05 16:06:47.525  INFO 53000 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group junbaor-test-group
2017-09-05 16:06:47.525  INFO 53000 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2017-09-05 16:06:47.526  INFO 53000 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group junbaor-test-group
2017-09-05 16:06:47.765  INFO 53000 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group junbaor-test-group with generation 1
2017-09-05 16:06:47.766  INFO 53000 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [junbaor-test-0, junbaor-test-1, junbaor-test-2] for group junbaor-test-group
2017-09-05 16:06:47.767  INFO 53000 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[junbaor-test-0, junbaor-test-1, junbaor-test-2]

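The log shows that the consumer has joined the group and partitions have been assigned.

Use Kafka's bundled command to send a message to the topic and see whether it arrives: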

[junbaor@wkfg-1 bin]$ ./kafka-console-producer.sh --topic junbaor-test --broker-list 127.0.0.1:9092
>test
>

[Screenshot: the application console after the message is sent]

At this point, the problem is solved.

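Summary of the fix

The likely cause is that no listener address was configured for Kafka, so the broker falls back to advertising its hostname.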

[Screenshot: the listener options in the Kafka configuration file]

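Sure enough, the configuration contains such an option; according to its comment, it is the address advertised to producers and consumers.
We change it as instructed to advertised.listeners=PLAINTEXT://10.4.82.141:9092
After reverting the local hosts file, testing confirms that this also solves the problem.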
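
To make the structure of that setting concrete, here is a small sketch that splits a single advertised.listeners entry into protocol, host, and port; the host and port are what a client will later try to connect to. The AdvertisedListener class and its parse method are hypothetical and are not Kafka's own parser; the sketch handles one entry only, not a comma-separated list or IPv6 literals.

```java
// Hypothetical value class: one entry of the form PROTOCOL://host:port.
class AdvertisedListener {
    final String protocol;
    final String host;
    final int port;

    AdvertisedListener(String protocol, String host, int port) {
        this.protocol = protocol;
        this.host = host;
        this.port = port;
    }

    /** Parses a single entry such as PLAINTEXT://10.4.82.141:9092. */
    static AdvertisedListener parse(String value) {
        int schemeEnd = value.indexOf("://");
        int portStart = value.lastIndexOf(':');
        // The last colon must come after the "://" separator, otherwise no port is present.
        if (schemeEnd < 0 || portStart <= schemeEnd + 2) {
            throw new IllegalArgumentException("expected PROTOCOL://host:port but got: " + value);
        }
        String protocol = value.substring(0, schemeEnd);
        String host = value.substring(schemeEnd + 3, portStart);
        int port = Integer.parseInt(value.substring(portStart + 1));
        return new AdvertisedListener(protocol, host, port);
    }

    public static void main(String[] args) {
        AdvertisedListener listener = parse("PLAINTEXT://10.4.82.141:9092");
        // Prints: 10.4.82.141 9092
        System.out.println(listener.host + " " + listener.port);
    }
}
```

With the IP address in the host position, clients no longer depend on being able to resolve wkfg-1.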

Takeaway

We already set spring.kafka.bootstrap-servers to an IP address in application.properties, so why does the client still connect by hostname?

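My first guess was that, after connecting to the Kafka instance, the client fetches configuration from zk
and watches a zk node to obtain the address before it starts listening on the queue.
More precisely, the Java consumer never talks to zk itself: the broker registers its advertised address in zk and returns that address to the client in its metadata and coordinator-lookup responses.
The address in bootstrap-servers is only used for the first connection; every later connection uses the advertised address, which here was the hostname wkfg-1.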

[Screenshot]
