目錄html
kafka下載地址:https://kafka.apache.org/downloads
須要說明的是,kafka的安裝依賴於zk,zk的部署可直接參考《Zookeeper介紹與基本部署》。固然,kafka默認也內置了zk的啓動腳本,在kafka安裝路徑的bin目錄下,名稱爲zookeeper-server-start.sh
,若是不想獨立安裝zk,可直接使用該腳本。java
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz tar xf kafka_2.12-2.2.0.tgz -C /usr/local/ cd /usr/local ln -s kafka_2.12-2.2.0 kafka
kafka主配置文件爲/usr/local/kafka/config/server.properties
,配置示例以下:apache
broker.id=0 listeners=PLAINTEXT://10.1.60.29:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/data/kafka/logs num.partitions=3 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=10.1.60.29:2181,10.1.61.195:2181,10.1.61.27:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 auto.create.topics.enable=true delete.topics.enable=true
配置說明:json
hostname:port/path
。hostname爲zk的主機名或ip,port爲zk監聽的端口。/path
表示kafka的元數據存儲到zk上的目錄,若是不設置,默認爲根目錄須要說明的是,多個kafka節點依賴zk實現集羣,因此各節點並不須要做特殊配置,只須要broker.id不一樣,並接入到同一個zk集羣便可。bootstrap
#啓動 /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties #檢查java進程 # jps 1394 QuorumPeerMain 13586 Logstash 27591 Kafka 27693 Jps #中止 /usr/local/kafka/bin/kafka-server-start.sh
能夠經過zookeeper查看kafka的元數據信息:服務器
#經過zk客戶端鏈接zookeeper ../zookeeper/bin/zkCli.sh #查看根下多了不少目錄 [zk: localhost:2181(CONNECTED) 1] ls / [cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config] #查看/brokers/ids,能夠看到有三個broker已經加入 [zk: localhost:2181(CONNECTED) 8] ls /brokers/ids [0, 1, 2] #查看/brokers/topics,目前爲空,說明尚未建立任何的topic [zk: localhost:2181(CONNECTED) 3] ls /brokers/topics []
上面完成了kafka的部署,經過驗證部署咱們發現當前沒有topic,因此建立一個topic以下:網絡
# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic myfirsttopic Created topic myfirsttopic.
參數說明:socket
上面經過操做zk就能夠看到topic相關信息,接下來咱們直接經過kafka命令行來進行相關操做:測試
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --list myfirsttopic
#查看myfirsttopic的詳細信息 # ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic myfirsttopic Topic:myfirsttopic PartitionCount:3 ReplicationFactor:2 Configs: Topic: myfirsttopic Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0 Topic: myfirsttopic Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0 Topic: myfirsttopic Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
參數說明:大數據
輸出說明:
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 6 --topic myfirsttopic WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded! # ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic myfirsttopic Topic:myfirsttopic PartitionCount:6 ReplicationFactor:2 Configs: Topic: myfirsttopic Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0 Topic: myfirsttopic Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0 Topic: myfirsttopic Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1 Topic: myfirsttopic Partition: 3 Leader: 0 Replicas: 0,2 Isr: 0,2 Topic: myfirsttopic Partition: 4 Leader: 1 Replicas: 1,0 Isr: 1,0 Topic: myfirsttopic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1
#建立一個topic名爲mysecondtopic,指定分區爲2,副本爲1 # ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 2 --topic mysecondtopic Created topic mysecondtopic. #查看新建立的topic詳細信息 # ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic mysecondtopic Topic:mysecondtopic PartitionCount:2 ReplicationFactor:1 Configs: Topic: mysecondtopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: mysecondtopic Partition: 1 Leader: 1 Replicas: 1 Isr: 1 #將broker.id爲0上的partition的副本由原來的[0]擴充爲[0,2],將broker.id爲1上的partition的副本由原來的[1]擴充爲[1,2]。 #須要先建立一個json文件以下: # cat partitions-to-move.json { "partitions": [ { "topic":"mysecondtopic", "partition": 0, "replicas": [0,2] }, { "topic": "mysecondtopic", "partition": 1, "replicas": [1,2] } ], "version": 1 } #執行副本修改 # ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ./partitions-to-move.json --execute Current partition replica assignment {"version":1,"partitions":[{"topic":"mysecondtopic","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"mysecondtopic","partition":0,"replicas":[0],"log_dirs":["any"]}]} Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions. #再次查看topic狀態,發現副本數由按照預期發生變動 # ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mysecondtopic Topic:mysecondtopic PartitionCount:2 ReplicationFactor:2 Configs: Topic: mysecondtopic Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0 Topic: mysecondtopic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1
#執行刪除操做 # ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myfirsttopic Topic myfirsttopic is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true. #查看topic,能夠看到myfirsttopic已被刪除 # ./bin/kafka-topics.sh --zookeeper localhost:2181 --list __consumer_offsets mysecondtopic
# ./bin/kafka-console-producer.sh --broker-list 10.1.60.29:9092 --topic mysecondtopic >hello kafka! >hello world! >just a test! > >hi world! >hahahaha! >
# ./bin/kafka-console-consumer.sh --bootstrap-server 10.1.60.29:9092 --topic mysecondtopic --from-beginning hello kafka! just a test! hi world! hello world! hahahaha!