Kafka Usage and Error Resolution
1. Download and extract Kafka; configure the environment variables

```
vim /etc/profile
export KAFKA_HOME=/root/kafka_2.11-1.0.0
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
```
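The profile lines above only define two variables; as a quick sanity check (using the same paths as this setup), their effect can be reproduced in any shell:

```shell
# Re-create the profile entries in the current shell and print the
# bin directory that ends up on PATH.
KAFKA_HOME=/root/kafka_2.11-1.0.0
PATH="$PATH:$KAFKA_HOME/bin"
echo "$KAFKA_HOME/bin"
```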
2. Kafka requires ZooKeeper

(I) Using the ZooKeeper bundled with Kafka

Start ZooKeeper first. In a pseudo-distributed setup, Kafka already ships with an embedded ZooKeeper; its configuration lives in Kafka's config directory.

Edit config/zookeeper.properties if you need to change the ZooKeeper port.

Start ZooKeeper in the background:

```
[root@mail bin]# nohup zookeeper-server-start.sh ../config/zookeeper.properties &
```
Then start Kafka in the background:

```
[root@mail bin]# nohup kafka-server-start.sh ../config/server.properties &
```
3.測試:模擬消息的消費和生產緩存
(1) Create a topic

```
[root@mail bin]# kafka-topics.sh --create --zookeeper localhost:2281 --topic KafkaTestTopic --partitions 1 --replication-factor 1
Created topic "KafkaTestTopic".
```
(2) Create a producer

```
[root@mail bin]# kafka-console-producer.sh --topic KafkaTestTopic --broker-list localhost:9092
```

Check the `#listeners=PLAINTEXT://:9092` entry in server.properties to find Kafka's port.
(3) Create a consumer

```
[root@mail bin]# kafka-console-consumer.sh --topic KafkaTestTopic --zookeeper localhost:2281
```
(II) Using an external ZooKeeper
Start the standalone ZooKeeper:

```
[root@mail zookeeper-3.4.10]# bin/zkServer.sh start conf/zoo.cfg
ZooKeeper JMX enabled by default
Using config: conf/zoo.cfg
Starting zookeeper ... STARTED
```

(1) Create a topic

```
[root@mail kafka_2.11-1.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic secondTopic --partitions 1 --replication-factor 1
Created topic "secondTopic".
```

(2) Start Kafka

```
[root@mail kafka_2.11-1.0.0]# nohup bin/kafka-server-start.sh config/server.properties &
```

(3) Start a console producer

```
[root@mail kafka_2.11-1.0.0]# kafka-console-producer.sh --topic KafkaTestTopic --broker-list localhost:9092
```

(4) Start a console consumer

```
[root@mail kafka_2.11-1.0.0]# bin/kafka-console-consumer.sh --topic KafkaTestTopic --zookeeper localhost:2181
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
```

(5) Inspect Kafka's data on disk

The data directory is configured via log.dirs in config/server.properties:

```
log.dirs=/root/kafka/kafka_2.11-1.0.0/logs-kafka
```

```
[root@mail kafka_2.11-1.0.0]# ls
bin  config  libs  LICENSE  logs  logs-kafka  nohup.out  NOTICE  site-docs
[root@mail kafka_2.11-1.0.0]# cd logs-kafka/
[root@mail logs-kafka]# ls    # each directory holds one topic partition
cleaner-offset-checkpoint  __consumer_offsets-20  __consumer_offsets-33  __consumer_offsets-46  kafka_test-0
__consumer_offsets-0   __consumer_offsets-21  __consumer_offsets-34  __consumer_offsets-47  KafkaTestTopic-0
__consumer_offsets-1   __consumer_offsets-22  __consumer_offsets-35  __consumer_offsets-48  log-start-offset-checkpoint
__consumer_offsets-10  __consumer_offsets-23  __consumer_offsets-36  __consumer_offsets-49  meta.properties
__consumer_offsets-11  __consumer_offsets-24  __consumer_offsets-37  __consumer_offsets-5   My_LOVE_TOPIC-0
__consumer_offsets-12  __consumer_offsets-25  __consumer_offsets-38  __consumer_offsets-6   mytopic-0
__consumer_offsets-13  __consumer_offsets-26  __consumer_offsets-39  __consumer_offsets-7   recovery-point-offset-checkpoint
__consumer_offsets-14  __consumer_offsets-27  __consumer_offsets-4   __consumer_offsets-8   replication-offset-checkpoint
__consumer_offsets-15  __consumer_offsets-28  __consumer_offsets-40  __consumer_offsets-9   stock-quotation-0
__consumer_offsets-16  __consumer_offsets-29  __consumer_offsets-41  hello-0                stock-quotation-avro-0
__consumer_offsets-17  __consumer_offsets-3   __consumer_offsets-42  hello-1                stock-quotation-partition-0
__consumer_offsets-18  __consumer_offsets-30  __consumer_offsets-43  hello-2                TEST-TOPIC-0
__consumer_offsets-19  __consumer_offsets-31  __consumer_offsets-44  hello-3
__consumer_offsets-2   __consumer_offsets-32  __consumer_offsets-45  hello-4
[root@mail logs-kafka]# cd KafkaTestTopic-0/    # partition 0 of topic KafkaTestTopic
[root@mail KafkaTestTopic-0]# ls
00000000000000000000.index  00000000000000000000.timeindex  leader-epoch-checkpoint
00000000000000000000.log    00000000000000000063.snapshot
[root@mail KafkaTestTopic-0]# tail -f 00000000000000000000.log    # the message data file
```

(6) Increase the partition count and observe the change

```
[root@mail kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic KafkaTestTopic --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
```
The topic KafkaTestTopic now has three partition directories (0, 1 and 2):

```
[root@mail kafka_2.11-1.0.0]# ls
bin  config  libs  LICENSE  logs  logs-kafka  nohup.out  NOTICE  site-docs
[root@mail kafka_2.11-1.0.0]# cd logs-kafka/
[root@mail logs-kafka]# ls
cleaner-offset-checkpoint  __consumer_offsets-20  __consumer_offsets-33  __consumer_offsets-46  kafka_test-0
__consumer_offsets-0   __consumer_offsets-21  __consumer_offsets-34  __consumer_offsets-47  KafkaTestTopic-0
__consumer_offsets-1   __consumer_offsets-22  __consumer_offsets-35  __consumer_offsets-48  KafkaTestTopic-1
__consumer_offsets-10  __consumer_offsets-23  __consumer_offsets-36  __consumer_offsets-49  KafkaTestTopic-2
__consumer_offsets-11  __consumer_offsets-24  __consumer_offsets-37  __consumer_offsets-5   log-start-offset-checkpoint
__consumer_offsets-12  __consumer_offsets-25  __consumer_offsets-38  __consumer_offsets-6   meta.properties
__consumer_offsets-13  __consumer_offsets-26  __consumer_offsets-39  __consumer_offsets-7   My_LOVE_TOPIC-0
__consumer_offsets-14  __consumer_offsets-27  __consumer_offsets-4   __consumer_offsets-8   mytopic-0
__consumer_offsets-15  __consumer_offsets-28  __consumer_offsets-40  __consumer_offsets-9   recovery-point-offset-checkpoint
__consumer_offsets-16  __consumer_offsets-29  __consumer_offsets-41  hello-0                replication-offset-checkpoint
__consumer_offsets-17  __consumer_offsets-3   __consumer_offsets-42  hello-1                stock-quotation-0
__consumer_offsets-18  __consumer_offsets-30  __consumer_offsets-43  hello-2                stock-quotation-avro-0
__consumer_offsets-19  __consumer_offsets-31  __consumer_offsets-44  hello-3                stock-quotation-partition-0
__consumer_offsets-2   __consumer_offsets-32  __consumer_offsets-45  hello-4                TEST-TOPIC-0
[root@mail KafkaTestTopic-1]# ls    # partition 1 of topic KafkaTestTopic
00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex  leader-epoch-checkpoint
[root@mail KafkaTestTopic-1]# tail -f 00000000000000000000.log
```
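The CLI warning above exists because a keyed message's partition is derived from its key: Kafka's default partitioner hashes the serialized key (murmur2) modulo the partition count, so changing the partition count can remap keys. The sketch below illustrates the idea only; it uses `String.hashCode` instead of murmur2 to stay dependency-free, so the resulting numbers differ from a real broker's.

```java
public class PartitionSketch {
    // Illustrative stand-in for Kafka's keyed partition assignment:
    // hash the key, mask off the sign bit, take it modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // With a single partition, every key maps to partition 0.
        System.out.println(partitionFor("order-42", 1));
        // After increasing the topic to 3 partitions, the same key may land
        // on a different partition -- this is why ordering for keyed
        // messages can be affected by the alter operation.
        System.out.println(partitionFor("order-42", 3));
    }
}
```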
3. Possible errors
(1)

```
[root@mail bin]# kafka-topics.sh --create --zookeeper localhost:2281 --topic KafkaTestTopic --partitions 1 --replication-factor 1
Error while executing topic command : Replication factor: 1 larger than available brokers: 0.
[2018-11-20 16:44:16,269] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 1 larger than available brokers: 0.
 (kafka.admin.TopicCommand$)
```

Solution: set zookeeper.connect=localhost:2281 in server.properties so that the port 2281 matches the ZooKeeper port configured in zookeeper.properties, then restart Kafka.
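The two files only need to agree on the port; for this setup (2281 is the port used throughout this article; the ZooKeeper property name is clientPort):

```properties
# config/zookeeper.properties
clientPort=2281

# config/server.properties
zookeeper.connect=localhost:2281
```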
(2)

```
kafka.common.KafkaException: fetching topic metadata for topics [Set(KafkaTestTopic)] from broker [ArrayBuffer(BrokerEndPoint(0,123.125.50.7,9092))] failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:77)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:98)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:67)
```
(3)

```
[2018-11-20 17:28:53,411] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 52 : {KafkaTestTopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2018-11-20 17:28:53,513] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 53 : {KafkaTestTopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2018-11-20 17:28:53,617] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 54 : {KafkaTestTopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2018-11-20 17:28:53,721] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 55 : {KafkaTestTopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
```
Fix for errors (2) and (3): set the following in server.properties:

```
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
```
(4)

```
[2018-11-29 09:44:35,275] WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
```

Solution: a likely cause is that Kafka is not running; restart Kafka.
To check the broker's registration in ZooKeeper from the Kafka distribution:

```
bin/zookeeper-shell.sh localhost:2181 <<< "get /brokers/ids/0"
```
(5)

```
Failed to find leader for Set(KafkaTestTopic-0) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
```

Solution: edit the Kafka configuration file

```
[root@mail config]# vim server.properties
```

and set advertised.host.name to the correct IP address.
4、Kafka相關操做
(1)查看有哪些主題 [root@mail ~]# kafka-topics.sh --describe --zookeeper localhost:2281 Topic:KafkaTestTopic PartitionCount:1 ReplicationFactor:1 Configs: Topic: KafkaTestTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0 在kafka中每一個分區都有一個編號,從0開始; 在kafka中若是有多個副本的話,就會存在leader與follower的關係;Leader表示領導,Leader:0表示當前這個副本爲leader所在的broker是哪個。 (2)只看主題名稱 [root@mail kafka_2.11-1.0.0]# kafka-topics.sh --list --zookeeper localhost:2281 KafkaTestTopic (3)查看指定主題的信息 [root@mail kafka_2.11-1.0.0]# kafka-topics.sh --describe --zookeeper localhost:2281 --topic KafkaTestTopic (4)查看指定的topic是否存在 [root@mail kafka_2.11-1.0.0]# kafka-topics.sh --list --zookeeper localhost:2281 --topic KafkaTestTopic 或 [root@mail kafka_2.11-1.0.0]# kafka-topics.sh --list --zookeeper localhost:2281 | grep KafkaTestTopic (5) 修改主題的分區數 [root@mail kafka_2.11-1.0.0]# kafka-topics.sh --zookeeper localhost:2281 --alter --topic KafkaTestTopic --partitions 3 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded! [root@mail kafka_2.11-1.0.0]# kafka-topics.sh --describe --zookeeper localhost:2281 --topic KafkaTestTopic Topic:KafkaTestTopic PartitionCount:3 ReplicationFactor:1 Configs: Topic: KafkaTestTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: KafkaTestTopic Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: KafkaTestTopic Partition: 2 Leader: 0 Replicas: 0 Isr: 0 (6)修改配置項 [root@mail kafka_2.11-1.0.0]# kafka-topics.sh --zookeeper localhost:2281 --alter --topic KafkaTestTopic --config flush.messages=1 WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases. Going forward, please use kafka-configs.sh for this functionality Updated config for topic "KafkaTestTopic". 
[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --describe --zookeeper localhost:2281 --topic KafkaTestTopic Topic:KafkaTestTopic PartitionCount:3 ReplicationFactor:1 Configs:flush.messages=1 Topic: KafkaTestTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: KafkaTestTopic Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: KafkaTestTopic Partition: 2 Leader: 0 Replicas: 0 Isr: 0 (7)刪除配置項 [root@mail kafka_2.11-1.0.0]# kafka-topics.sh --zookeeper localhost:2281 --alter --topic KafkaTestTopic --delete-config flush.messages WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases. Going forward, please use kafka-configs.sh for this functionality Updated config for topic "KafkaTestTopic". [root@mail kafka_2.11-1.0.0]# kafka-topics.sh --describe --zookeeper localhost:2281 --topic KafkaTestTopic Topic:KafkaTestTopic PartitionCount:3 ReplicationFactor:1 Configs: Topic: KafkaTestTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: KafkaTestTopic Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: KafkaTestTopic Partition: 2 Leader: 0 Replicas: 0 Isr: 0 (8)刪除主題 [root@mail kafka_2.11-1.0.0]# kafka-topics.sh --zookeeper localhost:2281 --delete --topic KafkaTestTopic 注意:刪除只是標記刪除;當服務器重啓就會刪除已標記的topic,這個和kafka的版本有關。
5. Kafka configuration in Java

(1) Producer-side configuration

```java
Properties props = new Properties();
// Kafka broker host:port; separate multiple brokers with commas
props.put("bootstrap.servers", "ip:9092");
// wait for acknowledgement from all replicas
props.put("acks", "all");
// maximum number of send retries
props.put("retries", 0);
// batch size in bytes
props.put("batch.size", "16384");
// linger time before a batch is sent
props.put("linger.ms", 1);
// send buffer size in bytes
props.put("buffer.memory", 33554430);
// key serializer
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value serializer
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
```

(2) Consumer-side configuration

```java
Properties props = new Properties();
// Kafka broker address; not every broker in the cluster needs to be listed
props.put("bootstrap.servers", "ip:9092");
// consumer group id
props.put("group.id", "test1");
// whether to auto-commit offsets
props.put("enable.auto.commit", "true");
// auto-commit interval
props.put("auto.commit.interval.ms", "1000");
// key deserializer
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value deserializer
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
```
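As a self-contained check of the producer settings above (pure JDK, no kafka-clients on the classpath), they can be wrapped in a small factory; the class name `ProducerConfigFactory` and the broker address are illustrative placeholders:

```java
import java.util.Properties;

public class ProducerConfigFactory {
    // Builds the producer Properties shown above; bootstrapServers is the
    // only per-environment value ("host:port", comma-separated for a list).
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "all");
        props.put("retries", "0");
        props.put("batch.size", "16384");
        props.put("linger.ms", "1");
        props.put("buffer.memory", "33554430");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties p = producerProps("localhost:9092");
        System.out.println(p.getProperty("acks"));
        System.out.println(p.getProperty("bootstrap.servers"));
    }
}
```

Centralizing the settings this way keeps the broker address as the single per-environment parameter while the rest of the configuration stays fixed.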