kafka自帶sh腳本使用示例:shell
(1)啓動/關閉kafka服務:json
```shell工具
nohup env JMX_PORT=9999 /path/to/kafka_2.10-0.8.2.2/bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &spa
/path/to/kafka_2.10-0.8.2.2/bin/zookeeper-server-stop.sh config/zookeeper.properties >/dev/null 2>&1 &server
```ip
(2)建立topickafka
/path/to/kafka_2.10-0.8.2.2/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testit
查看topic列表io
/path/to/kafka_2.10-0.8.2.2/bin/kafka-topics.sh --list --zookeeper localhost:2181console
(3)發送msg
/path/to/kafka_2.10-0.8.2.2/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
(4)消費msg
/path/to/kafka_2.10-0.8.2.2/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
(5)查看topic狀態
/path/to/kafka_2.10-0.8.2.2/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
返回值:
Topic:my-replicated-topicPartitionCount:1ReplicationFactor:3Configs:
Topic: my-replicated-topicPartition: 0Leader: 1Replicas: 1,2,0Isr: 1,0,2
(6)刪除topic,刪除的時候須要在server.properties中設置"delete.topic.enable=true"才能刪除,不然topic只是被標記刪除:
/path/to/kafka_2.10-0.8.2.2/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
(7)修改topic
/path/to/kafka_2.10-0.8.2.2/bin/kafka-topics.sh --zookeeper localhost:2181 --alert --topic test --partitions 40
(8)添加/刪除配置
/path/to/kafka_2.10-0.8.2.2/bin/kafka-topics.sh --zookeeper localhost:2181 --alert --topic test --config x=y
/path/to/kafka_2.10-0.8.2.2/bin/kafka-topics.sh --zookeeper localhost:2181 --alert --topic test --deleteConfig x
(9)balance leadership
第一種(CLI):
/path/to/kafka_2.10-0.8.2.2/bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181
第二種(配置文件):
auto.leader.rebalance.enable=true
(10)集羣之間mirror數據
/path/to/kafka_2.10-0.8.2.2/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config consumer-1.properties --consumer.config consumer-2.properties --producer.config producer.properties --whitelist my-topic
(11)check consumer position
/path/to/kafka_2.10-0.8.2.2/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test
(12)自動前一數據到新機器(Automatically migrating data to new machines)
將topic:foo一、foo2上的全部partitions遷移到新的broker:五、6上,遷移結束以後,topic:foo一、foo2上的全部partitions只會在broker:五、6
建立 topic-to-move.json
>cat topic-to-move.json
{"topics":[{"topic":"foo1"},{"topic":"foo2"}],
"version":1
}
建立好json文件以後,使用以下命令:
--generate
/path/to/kafka_2.10-0.8.2.2/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
--execute(執行操做)
/path/to/kafka_2.10-0.8.2.2/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
--verify(查看遷移進度)
/path/to/kafka_2.10-0.8.2.2/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
(13)自定義分區分配和遷移
下面的示例會講topic foo1的partition 0分區遷移到broker5,6;同時把topic foo2的partition 1遷移到broker二、3。
>cat custom-reassignment.json
{
"version":1,
"partitions":[
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[2,3]}
]
}
--execute
/path/to/kafka/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
--verify
/path/to/kafka/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify
(14)增長副本(increase replication factor)
下面的示例會將topic:foo的partition:0的副本數量從1增長到3個;在增長副本數量以前,這個partition的惟一副本是broker:5上,在這裏咱們將會給broker:六、7增長副本。
第一步是手工寫自定義的從新分配計劃的JSON文件:
>cat increase-replication-factor.json
{
"version":1,
"partitions":[
{"topic":"foo","partition":0,"replicas":[5,6,7]}
]
}
使用--execute參數和json文件增長副本數量:
/path/to/kafka/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
或者--verify選項可與工具用來檢查分區從新分配的狀態:
/path/to/kafka/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify