Installation steps:
Download: http://kafka.apache.org/downloads.html
Extract the archive:
tar -zxvf kafka_2.10-0.8.1.1.tgz
cd kafka_2.10-0.8.1.1
Start the services. First start the ZooKeeper service:
bin/zookeeper-server-start.sh config/zookeeper.properties &
Start Kafka:
bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
Create a topic. Create a topic named "test" with one partition and one replica:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
List topics:
bin/kafka-topics.sh --list --zookeeper localhost:2181
Show topic details:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Delete a topic:
bin/kafka-run-class.sh kafka.admin.TopicCommand --delete --topic test --zookeeper 10.1.10.77:2181
Create a producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
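Create a consumer (with --from-beginning to read the topic from the start, or without it to read only new messages):
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test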
Increase the number of partitions:
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic my_topic_name --partitions 40
Add a configuration:
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic my_topic_name --config flush.messages=1
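Remove a configuration (--delete-config takes the configuration name only, without a value):
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic my_topic_name --delete-config flush.messages
By default, a delete operation currently only marks the topic for deletion; the topic is actually removed after Kafka is restarted. If you need the topic deleted immediately, set the following in server.properties: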
delete.topic.enable=true
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --delete --topic my_topic_name
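The delete.topic.enable=true setting above has to be an active (uncommented) line in server.properties. As a minimal sketch, the helper below makes sure a properties file contains exactly one active line for a given key. The function name set_property is hypothetical and not part of Kafka; it only assumes a POSIX shell with awk.

```shell
#!/bin/sh
# set_property FILE KEY VALUE
# Hypothetical helper (not shipped with Kafka). Rewrites FILE so that it
# contains exactly one active "KEY=VALUE" line. Commented-out lines and all
# other keys are left untouched. Variables are global because POSIX sh has
# no "local".
set_property() {
    sp_file=$1
    sp_key=$2
    sp_value=$3
    sp_tmp="$sp_file.tmp.$$"
    # Copy every line except active assignments of KEY. The key is compared
    # as a literal string, so the dots in it are not treated as wildcards.
    awk -v k="$sp_key" '{
        i = index($0, "=")
        name = (i > 0) ? substr($0, 1, i - 1) : $0
        gsub(/^[ \t]+|[ \t]+$/, "", name)
        if (name != k) print
    }' "$sp_file" > "$sp_tmp"
    # Append the desired assignment and replace the original file.
    printf '%s=%s\n' "$sp_key" "$sp_value" >> "$sp_tmp"
    mv "$sp_tmp" "$sp_file"
}

# Example call (run from the Kafka directory, then restart the broker):
#   set_property config/server.properties delete.topic.enable true
```

Running it twice leaves a single delete.topic.enable line, so it is safe to repeat. The broker still needs a restart to pick up the change.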
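Problem 1: The first time I installed Kafka, the Java client could never connect. I spent a long time debugging it and it still would not connect. I had followed the steps found online one by one, and the installation reported no errors. I then ran the following commands directly on the Linux host.
Create a producer: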
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
The server side was fine and could send messages.
Create a consumer:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
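This also received the messages.
But the Java client still could not connect, and threw an exception after 3 retries.
Solution:
Edit the file below, add the uncommented lines shown (highlighted in red in the original post), and restart Kafka. That solved the problem. Clients connect to the address the broker advertises, so advertised.listeners must be an address the Java client can actually reach.
[root@localhost kafka_2.11-0.10.1.0]# vi ./config/server.properties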
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://120.77.155.63:9092
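Because the file contains both commented and uncommented versions of the same keys, it is easy to edit the wrong line. The sketch below prints the value that is actually active for a key, which can help confirm the edit before restarting Kafka. The function name get_property is hypothetical; it assumes a POSIX shell with awk and that the last active assignment of a key is the one that takes effect, as with Java properties files.

```shell
#!/bin/sh
# get_property FILE KEY
# Hypothetical helper (not shipped with Kafka). Prints the value of the last
# active (uncommented) "KEY=..." line in FILE, or nothing if KEY is not set.
get_property() {
    awk -v k="$2" '
        /^[ \t]*#/ { next }                  # skip comment lines
        {
            i = index($0, "=")               # split at the first "="
            if (i == 0) next                 # not an assignment
            name = substr($0, 1, i - 1)
            gsub(/^[ \t]+|[ \t]+$/, "", name)
            if (name == k) value = substr($0, i + 1)
        }
        END { if (value != "") print value }
    ' "$1"
}

# Example call (run from the Kafka directory):
#   get_property config/server.properties advertised.listeners
```

With the configuration shown above, the example call should print the PLAINTEXT://120.77.155.63:9092 address. An empty result means the line is still commented out.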
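Reference: http://www.javacoder.cn/?p=849&utm_source=tuicool&utm_medium=referral
Problem 2: After running for a while (usually about 2 hours), Kafka stopped working. The Kafka process was still there, but it could neither send nor consume data, as if it were blocked. ZooKeeper had not gone down.
The error messages were as follows: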
kafka.common.KafkaException: Controller doesn't exist
    at kafka.utils.ZkUtils.getController(ZkUtils.scala:153)
    at kafka.server.KafkaServer.networkClientControlledShutdown$1(KafkaServer.scala:373)
    at kafka.server.KafkaServer.kafka$server$KafkaServer$$controlledShutdown(KafkaServer.scala:522)
    at kafka.server.KafkaServer$$anonfun$shutdown$1.apply$mcV$sp(KafkaServer.scala:544)
    at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:76)
    at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
    at kafka.utils.CoreUtils$.swallowWarn(CoreUtils.scala:47)
    at kafka.utils.Logging$class.swallow(Logging.scala:94)
    at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:47)
    at kafka.server.KafkaServer.shutdown(KafkaServer.scala:544)
    at kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:49)
    at kafka.Kafka$$anon$1.run(Kafka.scala:63)
[2016-12-15 19:53:08,006] INFO [Socket Server on Broker 0], Shutting down (kafka.network.SocketServer)
[2016-12-15 19:53:08,011] INFO [Socket Server on Broker 0], Shutdown completed (kafka.network.SocketServer)
[2016-12-15 19:53:08,011] INFO [Kafka Request Handler on Broker 0], shutting down (kafka.server.KafkaRequestHandlerPool)
[2016-12-15 19:53:08,012] INFO [Kafka Request Handler on Broker 0], shut down completely (kafka.server.KafkaRequestHandlerPool)
[2016-12-15 19:53:08,014] INFO [ThrottledRequestReaper-Produce], Shutting down (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-12-15 19:53:08,020] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2016-12-15 19:53:08,020] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2016-12-15 19:53:08,101] INFO [ThrottledRequestReaper-Produce], Shutdown completed (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-12-15 19:53:08,101] INFO [ThrottledRequestReaper-Produce], Stopped (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-12-15 19:53:08,101] INFO [ThrottledRequestReaper-Fetch], Shutting down (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-12-15 19:53:08,107] INFO [ThrottledRequestReaper-Fetch], Stopped (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-12-15 19:53:08,107] INFO [ThrottledRequestReaper-Fetch], Shutdown completed (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-12-15 19:53:08,108] INFO [KafkaApi-0] Shutdown complete. (kafka.server.KafkaApis)
[2016-12-15 19:53:08,108] INFO [Replica Manager on Broker 0]: Shutting down (kafka.server.ReplicaManager)
[2016-12-15 19:53:08,109] INFO [ReplicaFetcherManager on broker 0] shutting down (kafka.server.ReplicaFetcherManager)
[2016-12-15 19:53:08,109] INFO [ReplicaFetcherManager on broker 0] shutdown completed (kafka.server.ReplicaFetcherManager)
[2016-12-15 19:53:08,109] INFO [ExpirationReaper-0], Shutting down (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-15 19:53:08,120] INFO [ExpirationReaper-0], Stopped (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-15 19:53:08,120] INFO [ExpirationReaper-0], Shutdown completed (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-15 19:53:08,120] INFO [ExpirationReaper-0], Shutting down (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-15 19:53:08,157] INFO [ExpirationReaper-0], Stopped (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-15 19:53:08,157] INFO [ExpirationReaper-0], Shutdown completed (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-15 19:53:08,577] INFO [Replica Manager on Broker 0]: Shut down completely (kafka.server.ReplicaManager)
[2016-12-15 19:53:08,578] INFO Shutting down. (kafka.log.LogManager)
[2016-12-15 19:53:08,813] INFO re-registering broker info in ZK for broker 0 (kafka.server.KafkaHealthcheck$SessionExpireListener)
[2016-12-15 19:53:08,813] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2016-12-15 19:53:08,855] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2016-12-15 19:53:08,869] INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(10.1.10.80,9092,PLAINTEXT) (kafka.utils.ZkUtils)
[2016-12-15 19:53:08,869] INFO done re-registering broker (kafka.server.KafkaHealthcheck$SessionExpireListener)
[2016-12-15 19:53:08,869] INFO Subscribing to /brokers/topics path to watch for new topics (kafka.server.KafkaHealthcheck$SessionExpireListener)
[2016-12-15 19:53:08,881] INFO Shutdown complete. (kafka.log.LogManager)
[2016-12-15 19:53:08,882] INFO [GroupCoordinator 0]: Shutting down. (kafka.coordinator.GroupCoordinator)
[2016-12-15 19:53:08,882] INFO [ExpirationReaper-0], Shutting down (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-15 19:53:09,002] INFO [ExpirationReaper-0], Stopped (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-15 19:53:09,002] INFO [ExpirationReaper-0], Shutdown completed (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-15 19:53:09,002] INFO [ExpirationReaper-0], Shutting down (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-15 19:53:09,157] INFO [ExpirationReaper-0], Stopped (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-15 19:53:09,157] INFO [ExpirationReaper-0], Shutdown completed (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-15 19:53:09,157] INFO [GroupCoordinator 0]: Shutdown complete. (kafka.coordinator.GroupCoordinator)
[2016-12-15 19:53:09,512] INFO [Kafka Server 0], shut down completed (kafka.server.KafkaServer)
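The log above ends with the line "shut down completed (kafka.server.KafkaServer)". To check when this has happened on your own broker, one option is to search the server log for that message. This is only a rough sketch: the function name broker_shutdown_times is hypothetical, and the log path logs/server.log in the example call is an assumption about the log4j setup, so adjust it to wherever your broker writes its log.

```shell
#!/bin/sh
# broker_shutdown_times LOGFILE
# Hypothetical helper (not shipped with Kafka). Prints the timestamp of every
# "shut down completed" line written by kafka.server.KafkaServer in LOGFILE.
broker_shutdown_times() {
    grep 'shut down completed (kafka.server.KafkaServer)' "$1" |
        # Keep only the text between the first "[" and "]" (the timestamp).
        sed 's/^\[\([^]]*\)].*/\1/'
}

# Example call (the log path is an assumption):
#   broker_shutdown_times logs/server.log
```

If the timestamps line up with the moments the broker stopped responding, the broker was shutting itself down rather than merely blocking.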
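The solution was as follows:
Change the previous startup method (running the script with & from a shell) to starting the broker as a background daemon. After this change it did not hang again. A likely explanation, not confirmed during the original troubleshooting, is that the stack trace starts from the JVM shutdown hook (kafka.Kafka$$anon$1.run), which suggests the broker received a termination signal from the shell session it was attached to; the -daemon option starts the broker with nohup instead.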
./bin/kafka-server-start.sh -daemon config/server.properties
If you have any other questions, leave a comment or email me at wei.liu2021@gmail.com