Kafka Series 2: Producer and Consumer Errors

1. Starting the producer and consumer processes with 127.0.0.1:

 

1) Start the producer process:

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

Enter a message:

this is msg

 

 

The producer process reports errors:

[2016-06-03 11:33:47,934] WARN Bootstrap broker 127.0.0.1:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2016-06-03 11:33:49,554] WARN Bootstrap broker 127.0.0.1:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2016-06-03 11:33:51,177] WARN Bootstrap broker 127.0.0.1:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2016-06-03 11:33:53,398] WARN Bootstrap broker 127.0.0.1:9092 disconnected (org.apache.kafka.clients.NetworkClient)
 
 

2) Start the consumer process:

bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning

 

The consumer process reports errors:

[2016-06-03 11:34:53,574] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,218.30.64.194,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-06-03 11:34:53,651] WARN [console-consumer-72675_zzs-1464924871670-2192d80a-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,218.30.64.194,9092))] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    ... 3 more
[2016-06-03 11:35:14,916] WARN Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [BrokerEndPoint(0,218.30.64.194,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-06-03 11:35:14,918] WARN [console-consumer-72675_zzs-1464924871670-2192d80a-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,218.30.64.194,9092))] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    ... 3 more
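
The consumer was started against 127.0.0.1, yet every failure in its log names a different broker address. That is expected with the --zookeeper option, because the consumer learns broker addresses from what the broker registered in ZooKeeper. As a rough sketch, assuming the consumer output has been saved to a file, the distinct endpoints it tried can be listed as follows. The helper name broker_endpoints and the file name consumer.log are hypothetical, not part of Kafka.

```shell
#!/bin/sh
# Hypothetical helper: print each distinct host:port found in
# "BrokerEndPoint(id,host,port)" entries read from standard input.
broker_endpoints() {
    grep -o 'BrokerEndPoint([^)]*)' |
        sed 's/^BrokerEndPoint([^,]*,\([^,]*\),\([^)]*\))$/\1:\2/' |
        sort -u
}

# Sample line copied from the consumer log in this post.
sample='[2016-06-03 11:34:53,574] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,218.30.64.194,9092)] failed (kafka.client.ClientUtils$)'
printf '%s\n' "$sample" | broker_endpoints
```

Run against a saved log (broker_endpoints < consumer.log), this should print 218.30.64.194:9092 for the output shown here, which points at the broker's registered address rather than the address typed on the command line.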



 

 

2. Starting the producer and consumer processes with localhost:

 

1) Start the producer process:

[root@zzs kafka_2.11-0.10.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
this is msg
[2016-06-03 11:44:16,932] WARN Error while fetching metadata with correlation id 0 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2016-06-03 11:44:18,255] WARN Error while fetching metadata with correlation id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2016-06-03 11:44:18,648] WARN Error while fetching metadata with correlation id 2 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2016-06-03 11:44:18,801] WARN Error while fetching metadata with correlation id 3 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2016-06-03 11:44:18,928] WARN Error while fetching metadata with correlation id 4 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2016-06-03 11:44:19,035] WARN Error while fetching metadata with correlation id 5 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2016-06-03 11:44:19,180] WARN Error while fetching metadata with correlation id 6 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2016-06-03 11:44:19,308] WARN Error while fetching metadata with correlation id 7 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

 

 

2) Start the consumer process:

[root@zzs kafka_2.11-0.10.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
[2016-06-03 11:45:18,330] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,218.30.64.194,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-06-03 11:45:18,541] WARN [console-consumer-42554_zzs-1464925496436-ae2ee9c7-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,218.30.64.194,9092))] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    ... 3 more



3. Solving the problem

1) Check the Kafka configuration file: cat config/server.properties

zookeeper.connect=localhost:2181

The broker connects to ZooKeeper at localhost, so the producer and consumer processes also need to be started with localhost.

 

2) Check the Kafka startup log, which shows:

Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(218.30.64.194,9092,PLAINTEXT) (kafka.utils.ZkUtils)

Why is the broker registered with the IP 218.30.64.194?

==> No listener host was bound in the Kafka configuration.
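
One way to see where such an address can come from: per the Kafka 0.10 broker configuration documentation, a broker with no listeners, host.name or advertised host configured publishes the value of java.net.InetAddress.getCanonicalHostName(), which goes through the system resolver. The sketch below only illustrates a lookup in a hosts-format listing. The helper name address_for is hypothetical, and the sample mapping of the host name zzs to 218.30.64.194 is an assumption, since the original logs do not show how this machine resolves its name.

```shell
#!/bin/sh
# Hypothetical helper: print the first address that a hosts-format listing
# (address followed by one or more names) on standard input gives for a name.
address_for() {
    awk -v name="$1" '
        { sub(/#.*/, "") }                      # drop trailing comments
        { for (i = 2; i <= NF; i++)
              if ($i == name) { print $1; exit } }
    '
}

# Illustrative listing only; the mapping of zzs to this address is an
# assumption, not something shown in the original logs.
printf '%s\n' '127.0.0.1 localhost' '218.30.64.194 zzs' | address_for zzs
```

On the broker machine, address_for "$(hostname)" < /etc/hosts applies the same lookup to the real file. An empty result would suggest the name is resolved elsewhere, for example through DNS.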

vi config/server.properties

listeners=PLAINTEXT://localhost:9092
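
After editing, it can help to confirm which values the broker will actually read. The following is a minimal sketch, assuming a plain key=value properties file. The helper name prop_value is hypothetical, and the commented-out line in the sample is included only to show that such lines are skipped.

```shell
#!/bin/sh
# Hypothetical helper: print the value of key ($2) from a Java-style
# properties file ($1). Commented-out lines are skipped and the last
# definition wins. The key is used as a regular expression, so a "."
# in it matches any character.
prop_value() {
    sed -n "s/^[[:space:]]*$2[[:space:]]*=[[:space:]]*//p" "$1" | tail -n 1
}

# Demonstrate on a temporary file holding the two settings from this post.
conf=$(mktemp)
printf '%s\n' '#listeners=PLAINTEXT://:9092' \
              'listeners=PLAINTEXT://localhost:9092' \
              'zookeeper.connect=localhost:2181' > "$conf"
prop_value "$conf" listeners
prop_value "$conf" zookeeper.connect
rm -f "$conf"
```

Against the real file the call would be prop_value config/server.properties listeners. If it prints nothing, the listeners line is probably still commented out or missing.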



 

3) Restart ZooKeeper, Kafka, the consumer, and the producer, in that order:

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

./bin/kafka-server-start.sh ./config/server.properties

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

 

Messages entered in the producer can now be consumed by the consumer:

[root@zzs kafka_2.11-0.10.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
this is msg
this is msg2

 

[root@zzs kafka_2.11-0.10.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
this is msg
this is msg2
this is msg3
this is mgs4



 

Problem solved!
