kafka經常使用命令

kafka自帶sh腳本使用示例:shell

(1)啓動/關閉kafka服務:json

```shell工具

nohup env JMX_PORT=9999 /path/to/kafka_2.10-0.8.2.2/bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &spa

/path/to/kafka_2.10-0.8.2.2/bin/zookeeper-server-stop.sh config/zookeeper.properties >/dev/null 2>&1 &server

```ip

(2)建立topickafka

/path/to/kafka_2.10-0.8.2.2/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testit

查看topic列表io

/path/to/kafka_2.10-0.8.2.2/bin/kafka-topics.sh --list --zookeeper localhost:2181console

(3)發送msg

/path/to/kafka_2.10-0.8.2.2/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

(4)消費msg

/path/to/kafka_2.10-0.8.2.2/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

(5)查看topic狀態

/path/to/kafka_2.10-0.8.2.2/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

返回值:

Topic:my-replicated-topicPartitionCount:1ReplicationFactor:3Configs:

Topic: my-replicated-topicPartition: 0Leader: 1Replicas: 1,2,0Isr: 1,0,2

(6)刪除topic,刪除的時候須要在server.properties中設置"delete.topic.enable=true"才能刪除,不然topic只是被標記刪除:

/path/to/kafka_2.10-0.8.2.2/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

(7)修改topic

/path/to/kafka_2.10-0.8.2.2/bin/kafka-topics.sh --zookeeper localhost:2181 --alert --topic test --partitions 40

(8)添加/刪除配置

/path/to/kafka_2.10-0.8.2.2/bin/kafka-topics.sh --zookeeper localhost:2181 --alert --topic test --config x=y

/path/to/kafka_2.10-0.8.2.2/bin/kafka-topics.sh --zookeeper localhost:2181 --alert --topic test --deleteConfig x

(9)balance leadership

第一種(CLI):

/path/to/kafka_2.10-0.8.2.2/bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181

第二種(配置文件):

auto.leader.rebalance.enable=true

(10)集羣之間mirror數據

/path/to/kafka_2.10-0.8.2.2/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config consumer-1.properties --consumer.config consumer-2.properties --producer.config producer.properties --whitelist my-topic

(11)check consumer position

/path/to/kafka_2.10-0.8.2.2/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test

(12)自動前一數據到新機器(Automatically migrating data to new machines)

將topic:foo一、foo2上的全部partitions遷移到新的broker:五、6上,遷移結束以後,topic:foo一、foo2上的全部partitions只會在broker:五、6

建立 topic-to-move.json

>cat topic-to-move.json

{"topics":[{"topic":"foo1"},{"topic":"foo2"}],

"version":1

}

建立好json文件以後,使用以下命令:

--generate

/path/to/kafka_2.10-0.8.2.2/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate

--execute(執行操做)

/path/to/kafka_2.10-0.8.2.2/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute

--verify(查看遷移進度)

/path/to/kafka_2.10-0.8.2.2/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify

(13)自定義分區分配和遷移

下面的示例會講topic foo1的partition 0分區遷移到broker5,6;同時把topic foo2的partition 1遷移到broker二、3。

>cat custom-reassignment.json

{

    "version":1,

    "partitions":[

    {"topic":"foo1","partition":0,"replicas":[5,6]},

    {"topic":"foo2","partition":1,"replicas":[2,3]}

    ]

}

 

--execute

/path/to/kafka/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute

--verify

/path/to/kafka/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify

 

(14)增長副本(increase replication factor)

下面的示例會將topic:foo的partition:0的副本數量從1增長到3個;在增長副本數量以前,這個partition的惟一副本是broker:5上,在這裏咱們將會給broker:六、7增長副本。

第一步是手工寫自定義的從新分配計劃的JSON文件:

>cat increase-replication-factor.json

{

    "version":1,

    "partitions":[

        {"topic":"foo","partition":0,"replicas":[5,6,7]}

    ]

}

使用--execute參數和json文件增長副本數量:

/path/to/kafka/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute

或者--verify選項可與工具用來檢查分區從新分配的狀態:

/path/to/kafka/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify

相關文章
相關標籤/搜索