Kafka installation, administration, and fixes for common problems (e.g. the broker works fine, but a Java client can never connect)

Installation steps:

Download: http://kafka.apache.org/downloads.html

Extract:

tar -zxvf kafka_2.10-0.8.1.1.tgz 
cd kafka_2.10-0.8.1.1

Start the services. First start ZooKeeper:

 bin/zookeeper-server-start.sh config/zookeeper.properties &    

Start Kafka:

bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

Create a topic named "test" with one partition and one replica:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

List topics:

bin/kafka-topics.sh --list --zookeeper localhost:2181

Describe a topic:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Delete a topic:

bin/kafka-run-class.sh kafka.admin.TopicCommand --delete --topic test --zookeeper 10.1.10.77:2181

 

Kafka study notes, part 8 (topic management): adding and modifying partitions, etc.

Create a producer:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Create a consumer that reads from the beginning:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

Create a consumer that reads only new messages:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test

Increase the number of partitions:

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic my_topic_name --partitions 40

Add a configuration override:

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic my_topic_name --config flush.messages=1

Remove a configuration override (the option takes only the name of the setting, not a name=value pair):

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic my_topic_name --delete-config flush.messages

 

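(4) Deleting a topic

By default, a delete operation only marks the topic for deletion, and the topic is actually removed after Kafka restarts. To delete it immediately, set the following in server.properties: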

delete.topic.enable=true

 

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --delete --topic my_topic_name
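
Before relying on immediate deletion, it can help to confirm that the setting is really active in the broker's configuration file. The following is only a minimal sketch and not part of Kafka: the function name topic_delete_enabled is hypothetical, and it assumes the key appears at most once as a plain key=value line.

```shell
# Hypothetical helper (not shipped with Kafka): succeeds only if the given
# properties file contains an uncommented delete.topic.enable=true line.
# Assumes the key appears at most once, as a simple key=value line.
topic_delete_enabled() {
  grep -Eq '^[[:space:]]*delete\.topic\.enable[[:space:]]*=[[:space:]]*true[[:space:]]*$' "$1"
}
```

For example, `topic_delete_enabled config/server.properties && bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --delete --topic my_topic_name` would issue the delete only when the flag is set. The broker still has to be restarted after server.properties is edited for the change to take effect.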

 

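Problem 1: The first time I installed Kafka, the Java client could never connect. I spent a long time debugging and it still failed. I had followed the installation steps found online one by one, and the installation reported no errors. Running the following commands directly on the Linux host worked:

Create a producer: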

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

The server side is fine and messages can be sent.

Create a consumer:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

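This also receives the messages.

But the Java client simply cannot connect: it retries 3 times and then throws an exception.

Solution:

Edit config/server.properties, add the uncommented listeners and advertised.listeners lines shown below, and restart Kafka. That fixes the problem.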

[root@localhost kafka_2.11-0.10.1.0]# vi ./config/server.properties

#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://120.77.155.63:9092

Reference: http://www.javacoder.cn/?p=849&utm_source=tuicool&utm_medium=referral
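
Why this setting matters: a Kafka client uses the address it is configured with only for its first connection. After that it connects to whatever address the broker advertises in its metadata response. If that advertised address is not reachable from the client machine (for example, it resolves to localhost or to an internal hostname), the console tools on the broker host work while a remote Java client fails. The sketch below shows one way to check what the broker will advertise. The helper names get_prop and listener_host are hypothetical and not part of Kafka, and the parsing assumes simple key=value lines and an IPv4 address or hostname.

```shell
# Hypothetical helpers (not part of Kafka) for inspecting server.properties.

# Print the last uncommented value of key $1 in properties file $2.
# Assumes simple key=value lines with no spaces around the key.
get_prop() {
  awk -F= -v k="$1" '$1 == k { sub(/^[^=]*=/, ""); v = $0 } END { if (v != "") print v }' "$2"
}

# Extract the host part of a listener such as PLAINTEXT://host:9092.
# Prints an empty line when no host is given (PLAINTEXT://:9092).
listener_host() {
  hostport="${1#*://}"            # drop the "PROTOCOL://" prefix
  printf '%s\n' "${hostport%:*}"  # drop the trailing ":port"
}
```

Running `listener_host "$(get_prop advertised.listeners config/server.properties)"` against the configuration above prints 120.77.155.63. That is the address the Java client must be able to reach on port 9092. An empty result means no advertised host is set, and the broker falls back to the listeners value or to its canonical hostname, as the comment in the configuration file describes.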

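Problem 2: After running for a while (usually about 2 hours), Kafka stops working. The Kafka process is still there, but it can neither send nor consume data, as if it were blocked. ZooKeeper has not gone down.

The error output is as follows: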

kafka.common.KafkaException: Controller doesn't exist
	at kafka.utils.ZkUtils.getController(ZkUtils.scala:153)
	at kafka.server.KafkaServer.networkClientControlledShutdown$1(KafkaServer.scala:373)
	at kafka.server.KafkaServer.kafka$server$KafkaServer$$controlledShutdown(KafkaServer.scala:522)
	at kafka.server.KafkaServer$$anonfun$shutdown$1.apply$mcV$sp(KafkaServer.scala:544)
	at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:76)
	at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
	at kafka.utils.CoreUtils$.swallowWarn(CoreUtils.scala:47)
	at kafka.utils.Logging$class.swallow(Logging.scala:94)
	at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:47)
	at kafka.server.KafkaServer.shutdown(KafkaServer.scala:544)
	at kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:49)
	at kafka.Kafka$$anon$1.run(Kafka.scala:63)
[2016-12-15 19:53:08,006] INFO [Socket Server on Broker 0], Shutting down (kafka.network.SocketServer)
[2016-12-15 19:53:08,011] INFO [Socket Server on Broker 0], Shutdown completed (kafka.network.SocketServer)
[2016-12-15 19:53:08,011] INFO [Kafka Request Handler on Broker 0], shutting down (kafka.server.KafkaRequestHandlerPool)
[2016-12-15 19:53:08,012] INFO [Kafka Request Handler on Broker 0], shut down completely (kafka.server.KafkaRequestHandlerPool)
[2016-12-15 19:53:08,014] INFO [ThrottledRequestReaper-Produce], Shutting down (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-12-15 19:53:08,020] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2016-12-15 19:53:08,020] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2016-12-15 19:53:08,101] INFO [ThrottledRequestReaper-Produce], Shutdown completed (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-12-15 19:53:08,101] INFO [ThrottledRequestReaper-Produce], Stopped  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-12-15 19:53:08,101] INFO [ThrottledRequestReaper-Fetch], Shutting down (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-12-15 19:53:08,107] INFO [ThrottledRequestReaper-Fetch], Stopped  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-12-15 19:53:08,107] INFO [ThrottledRequestReaper-Fetch], Shutdown completed (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-12-15 19:53:08,108] INFO [KafkaApi-0] Shutdown complete. (kafka.server.KafkaApis)
[2016-12-15 19:53:08,108] INFO [Replica Manager on Broker 0]: Shutting down (kafka.server.ReplicaManager)
[2016-12-15 19:53:08,109] INFO [ReplicaFetcherManager on broker 0] shutting down (kafka.server.ReplicaFetcherManager)
[2016-12-15 19:53:08,109] INFO [ReplicaFetcherManager on broker 0] shutdown completed (kafka.server.ReplicaFetcherManager)
[2016-12-15 19:53:08,109] INFO [ExpirationReaper-0], Shutting down (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-15 19:53:08,120] INFO [ExpirationReaper-0], Stopped  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-15 19:53:08,120] INFO [ExpirationReaper-0], Shutdown completed (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-15 19:53:08,120] INFO [ExpirationReaper-0], Shutting down (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-15 19:53:08,157] INFO [ExpirationReaper-0], Stopped  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-15 19:53:08,157] INFO [ExpirationReaper-0], Shutdown completed (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-15 19:53:08,577] INFO [Replica Manager on Broker 0]: Shut down completely (kafka.server.ReplicaManager)
[2016-12-15 19:53:08,578] INFO Shutting down. (kafka.log.LogManager)
[2016-12-15 19:53:08,813] INFO re-registering broker info in ZK for broker 0 (kafka.server.KafkaHealthcheck$SessionExpireListener)
[2016-12-15 19:53:08,813] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2016-12-15 19:53:08,855] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2016-12-15 19:53:08,869] INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(10.1.10.80,9092,PLAINTEXT) (kafka.utils.ZkUtils)
[2016-12-15 19:53:08,869] INFO done re-registering broker (kafka.server.KafkaHealthcheck$SessionExpireListener)
[2016-12-15 19:53:08,869] INFO Subscribing to /brokers/topics path to watch for new topics (kafka.server.KafkaHealthcheck$SessionExpireListener)
[2016-12-15 19:53:08,881] INFO Shutdown complete. (kafka.log.LogManager)
[2016-12-15 19:53:08,882] INFO [GroupCoordinator 0]: Shutting down. (kafka.coordinator.GroupCoordinator)
[2016-12-15 19:53:08,882] INFO [ExpirationReaper-0], Shutting down (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-15 19:53:09,002] INFO [ExpirationReaper-0], Stopped  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-15 19:53:09,002] INFO [ExpirationReaper-0], Shutdown completed (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-15 19:53:09,002] INFO [ExpirationReaper-0], Shutting down (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-15 19:53:09,157] INFO [ExpirationReaper-0], Stopped  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-15 19:53:09,157] INFO [ExpirationReaper-0], Shutdown completed (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-15 19:53:09,157] INFO [GroupCoordinator 0]: Shutdown complete. (kafka.coordinator.GroupCoordinator)
[2016-12-15 19:53:09,512] INFO [Kafka Server 0], shut down completed (kafka.server.KafkaServer)

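Solution:

Start the broker as a background daemon instead of using the earlier startup command. After this change it no longer went down. The stack trace above passes through KafkaServer.shutdown, which suggests the broker was being told to shut down, possibly when the terminal session that launched it ended. With -daemon, the start script runs the broker under nohup, detached from the terminal.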

./bin/kafka-server-start.sh -daemon config/server.properties

 

If you run into other problems, leave a comment or email me at wei.liu2021@gmail.com
