1,添加和刪除topicbootstrap
在topic配置中若是設置了auto_created topic 爲true,則當生產者第一次將數據發佈到一個不存在的topic,topic會自動建立。固然,topic也能夠手動建立:ubuntu
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y
參數 replication-factor控制每條寫入的消息會複製到多少個服務器(broker)。服務器的個數要>=replication-factor的個數.不然要掛。官方是建議replication-factor至少要>=2這樣數據消費過程不會被打斷。服務器
參數partition控制topic將被寫入多少log(所謂的partition就是文件夾個數),分區數=最大並行消費者數量。ui
2,修改topiccode
添加分區數orm
t@ubuntu:~/source/kafka_2.10-0.10.0.0/bin$ ./kafka-topics.sh --zookeeper localhost:2181 --alter --topic myTest4 --partitions 8 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded! t@ubuntu:~/source/kafka_2.10-0.10.0.0/bin$
三個broker的數據目錄自動生成三個分區目錄,其中數據文件大小均爲0server
添加配置ip
bin/kafka-configs.sh --zookeeper zk_host:port/chroot --entity-type topics --entity-name my_topic_name --alter --add-config x=y
刪除配置資源
bin/kafka-configs.sh --zookeeper zk_host:port/chroot --entity-type topics --entity-name my_topic_name --alter --delete-config x
刪除topic文檔
首先要把server配置文件添加以下項:
delete.topic.enable=true
若是沒有設置爲true,則有以下結果:
t@ubuntu:~/source/kafka_2.10-0.10.0.0/bin$ ./kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTest4 Topic myTest4 is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.
只是標記而已,myTest4數據依舊能夠讀
3,關閉kafka服務
kafka集羣能自動檢測關閉或者失敗的服務器,並在其餘服務器中選出新的leader。若是關閉服務是爲了維護或者更新配置,能夠選擇一種相對優雅的方式關閉kafka服務。
首先要將配置文件添加以下:
controlled.shutdown.enable=true
經過bin目錄下zookeeper-server-stop.sh關閉,這樣關閉有兩個好處
1,關閉前會吧數據同步到磁盤,避免重啓服務作額外的數據恢復於是啓動耗時;
2,It will migrate any partitions the server is the leader for to other replicas prior to shutting down. This will make the leadership transfer faster and minimize the time each partition is unavailable to a few milliseconds.大意是下降分區不可用時間,具體我也不瞭解。
4,平衡leadership
官網寫的不太好理解。若是本地分別啓動三個broker,第一個broker會成爲全部分區的leader,這樣讀寫所有到broker1,對資源利用很不利,因此要平衡leader到各個服務器上。有兩種方法,
1,利用命令手動平衡一下:
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
配置文件裏面添加以下項目:
auto.leader.rebalance.enable=true
5,機架間phenomenon副本
讓副本相同的分區在不一樣機架上進行復制,避免某個機架掛了致使數據丟失。
須要在配置文件中添加broker歸屬的機架id
broker.rack=my-rack-id
目前在連機房都沒有的小公司,這個估計是用不到了。呵呵。。。。
6,集羣間鏡像數據
呵呵。。。公司過小,暫時是用不到了。
7,檢查消費者位置
有時候查看一下消費者位置是有用的。
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test
t@ubuntu:~/source/kafka_2.10-0.10.0.0$ bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group pm_ws_6d0a289d7d7b11e7a9f20242c0a80010 GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER pm_ws_6d0a289d7d7b11e7a9f20242c0a80010 test 0 98 1113 1015 none
8,管理消費組
官方文檔的命令是
bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --list
本地運行以下,應該是版本有點落後不一致,
t@ubuntu:~/source/kafka_2.10-0.10.0.0$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list Missing required argument "[zookeeper]" Option Description ------ ----------- --bootstrap-server <server to connect REQUIRED (only when using new- to> consumer): The server to connect to. --command-config <command config Property file containing configs to be property file> passed to Admin Client and Consumer. --delete Pass in groups to delete topic partition offsets and ownership information over the entire consumer group. For instance --group g1 -- group g2
對2.10-0.10.0.0命令以下
t@ubuntu:~/source/kafka_2.10-0.10.0.0$ bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list console-consumer-64888 aaa pm_ws_6d0a289d7d7b11e7a9f20242c0a80010 console-consumer-7335 wuwuwu console-consumer-17559 awaken console-consumer-47823 pm_ws_6d0a289d7d7b11e7a9f20242c0a80009 group1 0 test-consumer-group108 console-consumer-95228 console-consumer-51385 console-consumer-32687 group-1 t@ubuntu:~/source/kafka_2.10-0.10.0.0$